328 lines
13 KiB
C#
328 lines
13 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging;
|
|
using Svrnty.CQRS.Events.Abstractions.Sagas;
|
|
|
|
namespace Svrnty.CQRS.Events.Sagas;
|
|
|
|
/// <summary>
/// Orchestrates saga execution with compensation logic.
/// </summary>
/// <remarks>
/// Steps of a saga definition are executed in order on a background task. After every
/// successful step a checkpoint snapshot is written to the state store. When a step (or a
/// checkpoint write) fails, the steps completed so far are compensated in reverse order.
/// </remarks>
public sealed class SagaOrchestrator : ISagaOrchestrator
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ISagaStateStore _stateStore;
    private readonly ISagaRegistry _sagaRegistry;
    private readonly ILogger<SagaOrchestrator> _logger;

    /// <summary>
    /// Creates a new orchestrator.
    /// </summary>
    /// <param name="serviceProvider">Provider used to instantiate saga types.</param>
    /// <param name="stateStore">Store used to load and save saga state snapshots.</param>
    /// <param name="sagaRegistry">Registry used to look up saga definitions and saga types.</param>
    /// <param name="logger">Logger for orchestration events.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public SagaOrchestrator(
        IServiceProvider serviceProvider,
        ISagaStateStore stateStore,
        ISagaRegistry sagaRegistry,
        ILogger<SagaOrchestrator> logger)
    {
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _stateStore = stateStore ?? throw new ArgumentNullException(nameof(stateStore));
        _sagaRegistry = sagaRegistry ?? throw new ArgumentNullException(nameof(sagaRegistry));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    public async Task<string> StartSagaAsync<TSaga>(
        string correlationId,
        ISagaData? initialData = null,
        CancellationToken cancellationToken = default)
        where TSaga : ISaga
    {
        if (string.IsNullOrWhiteSpace(correlationId))
        {
            throw new ArgumentException("Correlation ID cannot be null or empty", nameof(correlationId));
        }

        var sagaDefinition = _sagaRegistry.GetDefinition<TSaga>();
        if (sagaDefinition is null)
        {
            throw new InvalidOperationException($"Saga '{typeof(TSaga).Name}' is not registered");
        }

        // Create the saga instance and stamp its identity.
        var sagaId = Guid.NewGuid().ToString();
        var saga = ActivatorUtilities.CreateInstance<TSaga>(_serviceProvider);
        AssignIdentity(saga, typeof(TSaga), sagaId, correlationId, sagaDefinition.SagaName);

        var data = initialData ?? new SagaData();
        var context = new SagaContext(saga, SagaState.NotStarted, data);

        // Persist the initial state before any step runs.
        var completedSteps = new List<string>();
        var snapshot = CreateSnapshot(context, sagaDefinition, 0, completedSteps, null, null);
        await _stateStore.SaveStateAsync(snapshot, cancellationToken);

        _logger.LogInformation(
            "Starting saga '{SagaName}' with ID '{SagaId}' and correlation '{CorrelationId}'",
            sagaDefinition.SagaName,
            sagaId,
            correlationId);

        // The start time recorded in the initial snapshot is carried into every later snapshot.
        var startedAt = snapshot.StartedAt;

        // Fire and forget: the caller gets the saga ID back without waiting for the steps.
        // ExecuteSagaAsync handles its own failures, so the discarded task does not fault.
        _ = Task.Run(
            () => ExecuteSagaAsync(context, sagaDefinition, 0, completedSteps, startedAt, cancellationToken),
            cancellationToken);

        return sagaId;
    }

    /// <inheritdoc />
    public async Task ResumeSagaAsync(string sagaId, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(sagaId))
        {
            throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId));
        }

        var snapshot = await _stateStore.LoadStateAsync(sagaId, cancellationToken);
        if (snapshot is null)
        {
            throw new InvalidOperationException($"Saga '{sagaId}' not found");
        }

        if (snapshot.State != SagaState.Paused)
        {
            throw new InvalidOperationException($"Saga '{sagaId}' is not in Paused state (current: {snapshot.State})");
        }

        _logger.LogInformation("Resuming saga '{SagaName}' with ID '{SagaId}'", snapshot.SagaName, sagaId);

        // Reconstruct saga and context from the snapshot.
        var (sagaDefinition, context) = RehydrateSaga(snapshot, SagaState.Running);

        // Persist the transition to Running before execution continues.
        var runningSnapshot = snapshot with
        {
            State = SagaState.Running,
            LastUpdated = DateTimeOffset.UtcNow
        };
        await _stateStore.SaveStateAsync(runningSnapshot, cancellationToken);

        // Seed the execution with the steps completed before the pause. Without this, later
        // checkpoints would drop them and a later failure would not compensate them.
        var completedSteps = new List<string>(runningSnapshot.CompletedSteps);
        var resumeFromStep = runningSnapshot.CurrentStep;
        var startedAt = runningSnapshot.StartedAt;

        // Resume execution in the background (fire and forget).
        _ = Task.Run(
            () => ExecuteSagaAsync(context, sagaDefinition, resumeFromStep, completedSteps, startedAt, cancellationToken),
            cancellationToken);
    }

    /// <inheritdoc />
    public async Task CancelSagaAsync(string sagaId, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(sagaId))
        {
            throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId));
        }

        var snapshot = await _stateStore.LoadStateAsync(sagaId, cancellationToken);
        if (snapshot is null)
        {
            throw new InvalidOperationException($"Saga '{sagaId}' not found");
        }

        if (snapshot.State == SagaState.Completed || snapshot.State == SagaState.Compensated)
        {
            throw new InvalidOperationException($"Cannot cancel saga '{sagaId}' in {snapshot.State} state");
        }

        _logger.LogWarning("Cancelling saga '{SagaName}' with ID '{SagaId}'", snapshot.SagaName, sagaId);

        // Reconstruct the context needed for compensation.
        var (sagaDefinition, context) = RehydrateSaga(snapshot, SagaState.Compensating);

        // Compensate the steps recorded as completed; failures here propagate to the caller.
        await CompensateSagaAsync(
            context,
            sagaDefinition,
            snapshot.CompletedSteps,
            "Saga cancelled by user",
            snapshot.StartedAt,
            cancellationToken);
    }

    /// <inheritdoc />
    public async Task<SagaStatus> GetStatusAsync(string sagaId, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(sagaId))
        {
            throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId));
        }

        var snapshot = await _stateStore.LoadStateAsync(sagaId, cancellationToken);
        if (snapshot is null)
        {
            throw new InvalidOperationException($"Saga '{sagaId}' not found");
        }

        return new SagaStatus
        {
            SagaId = snapshot.SagaId,
            CorrelationId = snapshot.CorrelationId,
            SagaName = snapshot.SagaName,
            State = snapshot.State,
            CurrentStep = snapshot.CurrentStep,
            TotalSteps = snapshot.TotalSteps,
            StartedAt = snapshot.StartedAt,
            LastUpdated = snapshot.LastUpdated,
            CompletedAt = snapshot.CompletedAt,
            ErrorMessage = snapshot.ErrorMessage,
            Data = snapshot.Data
        };
    }

    /// <summary>
    /// Sets the identity properties on a saga instance via reflection, since
    /// <see cref="ISaga"/> only exposes getters for them.
    /// </summary>
    /// <remarks>
    /// A property that cannot be found by name on <paramref name="sagaType"/> is skipped.
    /// </remarks>
    private static void AssignIdentity(
        ISaga saga,
        Type sagaType,
        string sagaId,
        string correlationId,
        string sagaName)
    {
        sagaType.GetProperty(nameof(ISaga.SagaId))?.SetValue(saga, sagaId);
        sagaType.GetProperty(nameof(ISaga.CorrelationId))?.SetValue(saga, correlationId);
        sagaType.GetProperty(nameof(ISaga.SagaName))?.SetValue(saga, sagaName);
    }

    /// <summary>
    /// Rebuilds the saga definition and execution context for a persisted snapshot.
    /// </summary>
    /// <param name="snapshot">The persisted state to rebuild from.</param>
    /// <param name="state">The state the rebuilt context starts in.</param>
    /// <exception cref="InvalidOperationException">
    /// The registry has no definition or no saga type for the snapshot's saga name.
    /// </exception>
    private (SagaDefinition Definition, SagaContext Context) RehydrateSaga(
        SagaStateSnapshot snapshot,
        SagaState state)
    {
        var definition = _sagaRegistry.GetDefinitionByName(snapshot.SagaName);
        if (definition is null)
        {
            throw new InvalidOperationException($"Saga definition '{snapshot.SagaName}' not found in registry");
        }

        var sagaType = _sagaRegistry.GetSagaType(snapshot.SagaName);
        if (sagaType is null)
        {
            throw new InvalidOperationException($"Saga type for '{snapshot.SagaName}' not found");
        }

        var saga = (ISaga)ActivatorUtilities.CreateInstance(_serviceProvider, sagaType);
        AssignIdentity(saga, sagaType, snapshot.SagaId, snapshot.CorrelationId, snapshot.SagaName);

        var data = new SagaData();
        data.LoadFrom(snapshot.Data);

        return (definition, new SagaContext(saga, state, data));
    }

    /// <summary>
    /// Runs the saga's steps in order, starting at <paramref name="startFromStep"/>, writing a
    /// checkpoint after each step and compensating on the first failure.
    /// </summary>
    /// <param name="context">Execution context shared by all steps.</param>
    /// <param name="definition">Definition that supplies the ordered steps.</param>
    /// <param name="startFromStep">Zero-based index of the first step to run.</param>
    /// <param name="completedSteps">
    /// Names of steps already completed; steps that succeed here are appended to it.
    /// </param>
    /// <param name="startedAt">Original start time to keep in every snapshot.</param>
    /// <param name="cancellationToken">Token passed to the steps and checkpoint writes.</param>
    /// <remarks>
    /// This method runs as a fire-and-forget task, so it does not let exceptions escape:
    /// failures are logged and trigger compensation exactly once.
    /// </remarks>
    private async Task ExecuteSagaAsync(
        SagaContext context,
        SagaDefinition definition,
        int startFromStep,
        List<string> completedSteps,
        DateTimeOffset? startedAt,
        CancellationToken cancellationToken)
    {
        context.State = SagaState.Running;

        try
        {
            for (var i = startFromStep; i < definition.Steps.Count; i++)
            {
                var step = definition.Steps[i];

                _logger.LogInformation(
                    "Executing saga '{SagaName}' step {StepIndex}/{TotalSteps}: {StepName}",
                    definition.SagaName,
                    i + 1,
                    definition.Steps.Count,
                    step.StepName);

                try
                {
                    await step.ExecuteAsync(context, cancellationToken);
                    completedSteps.Add(step.StepName);

                    // Save a checkpoint after each step.
                    var checkpoint = CreateSnapshot(context, definition, i + 1, completedSteps, null, startedAt);
                    await _stateStore.SaveStateAsync(checkpoint, cancellationToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(
                        ex,
                        "Saga '{SagaName}' step {StepIndex} '{StepName}' failed",
                        definition.SagaName,
                        i + 1,
                        step.StepName);

                    // The wrapper never throws, so this cannot fall into the outer catch
                    // and trigger a second round of compensation.
                    await CompensateAfterFailureAsync(context, definition, completedSteps, ex.Message, startedAt);
                    return;
                }
            }

            // All steps completed successfully.
            context.State = SagaState.Completed;
            var completedSnapshot = CreateSnapshot(
                context,
                definition,
                definition.Steps.Count,
                completedSteps,
                null,
                startedAt);
            completedSnapshot = completedSnapshot with { CompletedAt = DateTimeOffset.UtcNow };
            await _stateStore.SaveStateAsync(completedSnapshot, cancellationToken);

            _logger.LogInformation("Saga '{SagaName}' completed successfully", definition.SagaName);
        }
        catch (Exception ex)
        {
            // Reached for failures outside a step's own try block, such as saving the final snapshot.
            _logger.LogError(ex, "Saga '{SagaName}' execution failed", definition.SagaName);
            await CompensateAfterFailureAsync(context, definition, completedSteps, ex.Message, startedAt);
        }
    }

    /// <summary>
    /// Runs compensation for a failed background execution without letting exceptions escape.
    /// </summary>
    /// <remarks>
    /// Compensation runs with <see cref="CancellationToken.None"/>: the token that drove the
    /// execution may already be cancelled (and may be the cause of the failure), which would
    /// otherwise abort the rollback at its first awaited call.
    /// </remarks>
    private async Task CompensateAfterFailureAsync(
        SagaContext context,
        SagaDefinition definition,
        List<string> completedSteps,
        string errorMessage,
        DateTimeOffset? startedAt)
    {
        try
        {
            await CompensateSagaAsync(
                context,
                definition,
                completedSteps,
                errorMessage,
                startedAt,
                CancellationToken.None);
        }
        catch (Exception ex)
        {
            // Nobody awaits the background task, so logging is the only way to surface this.
            _logger.LogError(
                ex,
                "Saga '{SagaName}' compensation could not be completed",
                definition.SagaName);
        }
    }

    /// <summary>
    /// Compensates the given completed steps in reverse order and records the
    /// Compensating and Compensated states in the state store.
    /// </summary>
    /// <param name="context">Execution context passed to each step's compensation.</param>
    /// <param name="definition">Definition used to resolve steps by name.</param>
    /// <param name="completedSteps">Names of the steps to roll back, in completion order.</param>
    /// <param name="errorMessage">Reason for the compensation, stored in the snapshots.</param>
    /// <param name="startedAt">Original start time to keep in the snapshots.</param>
    /// <param name="cancellationToken">Token passed to compensations and state-store writes.</param>
    /// <remarks>
    /// A failing compensation step is logged and skipped so the remaining steps still roll
    /// back. State-store failures are not caught here and propagate to the caller.
    /// </remarks>
    private async Task CompensateSagaAsync(
        SagaContext context,
        SagaDefinition definition,
        List<string> completedSteps,
        string errorMessage,
        DateTimeOffset? startedAt,
        CancellationToken cancellationToken)
    {
        context.State = SagaState.Compensating;

        _logger.LogWarning(
            "Compensating saga '{SagaName}', rolling back {StepCount} completed steps",
            definition.SagaName,
            completedSteps.Count);

        // Save the compensating state.
        var compensatingSnapshot = CreateSnapshot(context, definition, 0, completedSteps, errorMessage, startedAt);
        await _stateStore.SaveStateAsync(compensatingSnapshot, cancellationToken);

        // Compensate in reverse order of completion.
        for (var i = completedSteps.Count - 1; i >= 0; i--)
        {
            var stepName = completedSteps[i];
            var step = definition.Steps.FirstOrDefault(s => s.StepName == stepName);

            if (step == null)
            {
                _logger.LogWarning("Cannot find step '{StepName}' for compensation", stepName);
                continue;
            }

            try
            {
                _logger.LogInformation("Compensating step: {StepName}", stepName);
                await step.CompensateAsync(context, cancellationToken);
            }
            catch (Exception ex)
            {
                // Continue compensation even if one step fails.
                _logger.LogError(
                    ex,
                    "Compensation failed for step '{StepName}' in saga '{SagaName}'",
                    stepName,
                    definition.SagaName);
            }
        }

        // Mark as compensated.
        context.State = SagaState.Compensated;
        var compensatedSnapshot = CreateSnapshot(context, definition, 0, new List<string>(), errorMessage, startedAt);
        compensatedSnapshot = compensatedSnapshot with { CompletedAt = DateTimeOffset.UtcNow };
        await _stateStore.SaveStateAsync(compensatedSnapshot, cancellationToken);

        _logger.LogInformation("Saga '{SagaName}' compensation completed", definition.SagaName);
    }

    /// <summary>
    /// Builds a state snapshot from the current context.
    /// </summary>
    /// <param name="context">Source of the saga identity, state and data.</param>
    /// <param name="definition">Definition that supplies the total step count.</param>
    /// <param name="currentStep">Step index to record in the snapshot.</param>
    /// <param name="completedSteps">Completed step names; copied into the snapshot.</param>
    /// <param name="errorMessage">Error to record, or <c>null</c>.</param>
    /// <param name="startedAt">
    /// Original start time of the saga. When <c>null</c> (the very first snapshot) the
    /// current UTC time is used.
    /// </param>
    private static SagaStateSnapshot CreateSnapshot(
        SagaContext context,
        SagaDefinition definition,
        int currentStep,
        List<string> completedSteps,
        string? errorMessage,
        DateTimeOffset? startedAt)
    {
        var now = DateTimeOffset.UtcNow;

        return new SagaStateSnapshot
        {
            SagaId = context.Saga.SagaId,
            CorrelationId = context.Saga.CorrelationId,
            SagaName = context.Saga.SagaName,
            State = context.State,
            CurrentStep = currentStep,
            TotalSteps = definition.Steps.Count,
            CompletedSteps = new List<string>(completedSteps),
            // Keep the original start time; only LastUpdated moves with each snapshot.
            StartedAt = startedAt ?? now,
            LastUpdated = now,
            ErrorMessage = errorMessage,
            Data = new Dictionary<string, object>(context.Data.GetAll())
        };
    }
}
|