using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Svrnty.CQRS.Events.Abstractions.Sagas; namespace Svrnty.CQRS.Events.Sagas; /// /// Orchestrates saga execution with compensation logic. /// public sealed class SagaOrchestrator : ISagaOrchestrator { private readonly IServiceProvider _serviceProvider; private readonly ISagaStateStore _stateStore; private readonly ISagaRegistry _sagaRegistry; private readonly ILogger _logger; public SagaOrchestrator( IServiceProvider serviceProvider, ISagaStateStore stateStore, ISagaRegistry sagaRegistry, ILogger logger) { _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); _stateStore = stateStore ?? throw new ArgumentNullException(nameof(stateStore)); _sagaRegistry = sagaRegistry ?? throw new ArgumentNullException(nameof(sagaRegistry)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } /// public async Task StartSagaAsync( string correlationId, ISagaData? initialData = null, CancellationToken cancellationToken = default) where TSaga : ISaga { if (string.IsNullOrWhiteSpace(correlationId)) throw new ArgumentException("Correlation ID cannot be null or empty", nameof(correlationId)); var sagaDefinition = _sagaRegistry.GetDefinition(); if (sagaDefinition == null) throw new InvalidOperationException($"Saga '{typeof(TSaga).Name}' is not registered"); // Create saga instance var sagaId = Guid.NewGuid().ToString(); var saga = ActivatorUtilities.CreateInstance(_serviceProvider); // Set saga properties via reflection (since ISaga only defines getters) var sagaType = typeof(TSaga); sagaType.GetProperty(nameof(ISaga.SagaId))?.SetValue(saga, sagaId); sagaType.GetProperty(nameof(ISaga.CorrelationId))?.SetValue(saga, correlationId); sagaType.GetProperty(nameof(ISaga.SagaName))?.SetValue(saga, sagaDefinition.SagaName); var data = initialData ?? new SagaData(); var context = new SagaContext(saga, SagaState.NotStarted, data); // Save initial state var snapshot = CreateSnapshot(context, sagaDefinition, 0, new List(), null); await _stateStore.SaveStateAsync(snapshot, cancellationToken); _logger.LogInformation( "Starting saga '{SagaName}' with ID '{SagaId}' and correlation '{CorrelationId}'", sagaDefinition.SagaName, sagaId, correlationId); // Execute saga asynchronously (don't await - fire and forget) _ = Task.Run(async () => await ExecuteSagaAsync(saga, context, sagaDefinition, cancellationToken), cancellationToken); return sagaId; } /// public async Task ResumeSagaAsync(string sagaId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(sagaId)) throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId)); var snapshot = await _stateStore.LoadStateAsync(sagaId, cancellationToken); if (snapshot == null) throw new InvalidOperationException($"Saga '{sagaId}' not found"); if (snapshot.State != SagaState.Paused) throw new InvalidOperationException($"Saga '{sagaId}' is not in Paused state (current: {snapshot.State})"); _logger.LogInformation("Resuming saga '{SagaName}' with ID '{SagaId}'", snapshot.SagaName, sagaId); // Reconstruct saga and context from snapshot var sagaDefinition = _sagaRegistry.GetDefinitionByName(snapshot.SagaName); if (sagaDefinition == null) throw new InvalidOperationException($"Saga definition '{snapshot.SagaName}' not found in registry"); var sagaType = _sagaRegistry.GetSagaType(snapshot.SagaName); if (sagaType == null) throw new InvalidOperationException($"Saga type for '{snapshot.SagaName}' not found"); var saga = (ISaga)ActivatorUtilities.CreateInstance(_serviceProvider, sagaType); sagaType.GetProperty(nameof(ISaga.SagaId))?.SetValue(saga, snapshot.SagaId); sagaType.GetProperty(nameof(ISaga.CorrelationId))?.SetValue(saga, snapshot.CorrelationId); sagaType.GetProperty(nameof(ISaga.SagaName))?.SetValue(saga, snapshot.SagaName); var data = new SagaData(); data.LoadFrom(snapshot.Data); var context = new SagaContext(saga, SagaState.Running, data); // Update state to Running snapshot = snapshot with { State = SagaState.Running, LastUpdated = DateTimeOffset.UtcNow }; await _stateStore.SaveStateAsync(snapshot, cancellationToken); // Resume execution _ = Task.Run(async () => await ExecuteSagaAsync(saga, context, sagaDefinition, cancellationToken, snapshot.CurrentStep), cancellationToken); } /// public async Task CancelSagaAsync(string sagaId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(sagaId)) throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId)); var snapshot = await _stateStore.LoadStateAsync(sagaId, cancellationToken); if (snapshot == null) throw new InvalidOperationException($"Saga '{sagaId}' not found"); if (snapshot.State == SagaState.Completed || snapshot.State == SagaState.Compensated) throw new InvalidOperationException($"Cannot cancel saga '{sagaId}' in {snapshot.State} state"); _logger.LogWarning("Cancelling saga '{SagaName}' with ID '{SagaId}'", snapshot.SagaName, sagaId); // Reconstruct context for compensation var sagaDefinition = _sagaRegistry.GetDefinitionByName(snapshot.SagaName); if (sagaDefinition == null) throw new InvalidOperationException($"Saga definition '{snapshot.SagaName}' not found in registry"); var sagaType = _sagaRegistry.GetSagaType(snapshot.SagaName); if (sagaType == null) throw new InvalidOperationException($"Saga type for '{snapshot.SagaName}' not found"); var saga = (ISaga)ActivatorUtilities.CreateInstance(_serviceProvider, sagaType); sagaType.GetProperty(nameof(ISaga.SagaId))?.SetValue(saga, snapshot.SagaId); sagaType.GetProperty(nameof(ISaga.CorrelationId))?.SetValue(saga, snapshot.CorrelationId); sagaType.GetProperty(nameof(ISaga.SagaName))?.SetValue(saga, snapshot.SagaName); var data = new SagaData(); data.LoadFrom(snapshot.Data); var context = new SagaContext(saga, SagaState.Compensating, data); // Compensate completed steps await CompensateSagaAsync(context, sagaDefinition, snapshot.CompletedSteps, "Saga cancelled by user", cancellationToken); } /// public async Task GetStatusAsync(string sagaId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(sagaId)) throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId)); var snapshot = await _stateStore.LoadStateAsync(sagaId, cancellationToken); if (snapshot == null) throw new InvalidOperationException($"Saga '{sagaId}' not found"); return new SagaStatus { SagaId = snapshot.SagaId, CorrelationId = snapshot.CorrelationId, SagaName = snapshot.SagaName, State = snapshot.State, CurrentStep = snapshot.CurrentStep, TotalSteps = snapshot.TotalSteps, StartedAt = snapshot.StartedAt, LastUpdated = snapshot.LastUpdated, CompletedAt = snapshot.CompletedAt, ErrorMessage = snapshot.ErrorMessage, Data = snapshot.Data }; } private async Task ExecuteSagaAsync( ISaga saga, SagaContext context, SagaDefinition definition, CancellationToken cancellationToken, int startFromStep = 0) { var completedSteps = new List(); context.State = SagaState.Running; try { for (int i = startFromStep; i < definition.Steps.Count; i++) { var step = definition.Steps[i]; _logger.LogInformation( "Executing saga '{SagaName}' step {StepIndex}/{TotalSteps}: {StepName}", definition.SagaName, i + 1, definition.Steps.Count, step.StepName); try { await step.ExecuteAsync(context, cancellationToken); completedSteps.Add(step.StepName); // Save checkpoint after each step var snapshot = CreateSnapshot(context, definition, i + 1, completedSteps, null); await _stateStore.SaveStateAsync(snapshot, cancellationToken); } catch (Exception ex) { _logger.LogError( ex, "Saga '{SagaName}' step {StepIndex} '{StepName}' failed", definition.SagaName, i + 1, step.StepName); // Compensate completed steps await CompensateSagaAsync(context, definition, completedSteps, ex.Message, cancellationToken); return; } } // All steps completed successfully context.State = SagaState.Completed; var completedSnapshot = CreateSnapshot(context, definition, definition.Steps.Count, completedSteps, null); completedSnapshot = completedSnapshot with { CompletedAt = DateTimeOffset.UtcNow }; await _stateStore.SaveStateAsync(completedSnapshot, cancellationToken); _logger.LogInformation("Saga '{SagaName}' completed successfully", definition.SagaName); } catch (Exception ex) { _logger.LogError(ex, "Saga '{SagaName}' execution failed", definition.SagaName); await CompensateSagaAsync(context, definition, completedSteps, ex.Message, cancellationToken); } } private async Task CompensateSagaAsync( SagaContext context, SagaDefinition definition, List completedSteps, string errorMessage, CancellationToken cancellationToken) { context.State = SagaState.Compensating; _logger.LogWarning( "Compensating saga '{SagaName}', rolling back {StepCount} completed steps", definition.SagaName, completedSteps.Count); // Save compensating state var compensatingSnapshot = CreateSnapshot(context, definition, 0, completedSteps, errorMessage); await _stateStore.SaveStateAsync(compensatingSnapshot, cancellationToken); // Compensate in reverse order for (int i = completedSteps.Count - 1; i >= 0; i--) { var stepName = completedSteps[i]; var step = definition.Steps.FirstOrDefault(s => s.StepName == stepName); if (step == null) { _logger.LogWarning("Cannot find step '{StepName}' for compensation", stepName); continue; } try { _logger.LogInformation("Compensating step: {StepName}", stepName); await step.CompensateAsync(context, cancellationToken); } catch (Exception ex) { _logger.LogError( ex, "Compensation failed for step '{StepName}' in saga '{SagaName}'", stepName, definition.SagaName); // Continue compensation even if one step fails } } // Mark as compensated context.State = SagaState.Compensated; var compensatedSnapshot = CreateSnapshot(context, definition, 0, new List(), errorMessage); compensatedSnapshot = compensatedSnapshot with { CompletedAt = DateTimeOffset.UtcNow }; await _stateStore.SaveStateAsync(compensatedSnapshot, cancellationToken); _logger.LogInformation("Saga '{SagaName}' compensation completed", definition.SagaName); } private SagaStateSnapshot CreateSnapshot( SagaContext context, SagaDefinition definition, int currentStep, List completedSteps, string? errorMessage) { return new SagaStateSnapshot { SagaId = context.Saga.SagaId, CorrelationId = context.Saga.CorrelationId, SagaName = context.Saga.SagaName, State = context.State, CurrentStep = currentStep, TotalSteps = definition.Steps.Count, CompletedSteps = new List(completedSteps), StartedAt = DateTimeOffset.UtcNow, LastUpdated = DateTimeOffset.UtcNow, ErrorMessage = errorMessage, Data = new Dictionary(context.Data.GetAll()) }; } }