dotnet-cqrs/docs/event-streaming/sagas/creating-sagas.md

13 KiB

Creating Sagas

Implement `ISaga<TEvent>` to orchestrate long-running business processes that span multiple services.

Overview

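Creating a saga involves defining its workflow steps, the state it persists between steps, the compensation logic that undoes completed steps, and the error handling that decides when to compensate. Together these give you consistency across distributed transactions without a distributed lock or two-phase commit.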

Quick Start

using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions;

/// <summary>
/// Orchestrates order fulfillment (reserve inventory, charge payment, ship)
/// in response to an <see cref="OrderPlacedEvent"/>. When a step fails, the
/// steps that already completed are compensated in reverse order.
/// </summary>
public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
{
    private readonly IInventoryService _inventoryService;
    private readonly IPaymentService _paymentService;
    private readonly IShippingService _shippingService;
    private readonly ISagaStateStore _stateStore;
    private readonly IEventPublisher _eventPublisher;
    private readonly ILogger<OrderFulfillmentSaga> _logger;

    public OrderFulfillmentSaga(
        IInventoryService inventoryService,
        IPaymentService paymentService,
        IShippingService shippingService,
        ISagaStateStore stateStore,
        IEventPublisher eventPublisher,
        ILogger<OrderFulfillmentSaga> logger)
    {
        _inventoryService = inventoryService;
        _paymentService = paymentService;
        _shippingService = shippingService;
        _stateStore = stateStore;
        _eventPublisher = eventPublisher;
        _logger = logger;
    }

    public string SagaName => "order-fulfillment";

    /// <summary>
    /// Runs the fulfillment workflow for one order. A failing step triggers
    /// compensation and an <c>OrderFulfillmentFailedEvent</c>. Cancellation of
    /// <paramref name="ct"/> (e.g. host shutdown) does not: the persisted state
    /// is left as-is so a recovery service can resume the saga later.
    /// </summary>
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(@event);

        var sagaId = $"order-{@event.OrderId}";
        var state = new OrderFulfillmentState
        {
            OrderId = @event.OrderId,
            SagaId = sagaId,
            StartedAt = DateTimeOffset.UtcNow
        };

        try
        {
            // Step 1: Reserve Inventory
            await ExecuteStepAsync(state, SagaStep.ReserveInventory, async () =>
            {
                await _inventoryService.ReserveAsync(@event.OrderId, @event.Items, ct);
                state.InventoryReserved = true;
            }, ct);

            // Step 2: Process Payment
            await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
            {
                await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
                state.PaymentProcessed = true;
            }, ct);

            // Step 3: Ship Order
            await ExecuteStepAsync(state, SagaStep.ShipOrder, async () =>
            {
                await _shippingService.ShipAsync(@event.OrderId, ct);
                state.Shipped = true;
            }, ct);

            state.CurrentStep = SagaStep.Completed;
            state.CompletedAt = DateTimeOffset.UtcNow;

            await _stateStore.SaveStateAsync(sagaId, state, ct);
            await _eventPublisher.PublishAsync(new OrderFulfilledEvent { OrderId = @event.OrderId });
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Shutdown is not a business failure. Compensating here would run
            // with an already-cancelled token anyway; leave the saga for recovery.
            throw;
        }
        catch (Exception ex)
        {
            await HandleFailureAsync(state, ex, ct);
        }
    }

    /// <summary>
    /// Records <paramref name="step"/> as the current step, runs it, and
    /// persists the state again once it succeeds. The second save captures
    /// completion flags set by <paramref name="action"/>, so a crash right
    /// after the step does not cause it to be re-executed on recovery.
    /// </summary>
    private async Task ExecuteStepAsync(
        OrderFulfillmentState state,
        SagaStep step,
        Func<Task> action,
        CancellationToken ct)
    {
        state.CurrentStep = step;
        await _stateStore.SaveStateAsync(state.SagaId, state, ct);

        try
        {
            await action();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Saga step {Step} failed for {SagaId}", step, state.SagaId);
            throw;
        }

        await _stateStore.SaveStateAsync(state.SagaId, state, ct);
    }

    /// <summary>
    /// Logs the failure, compensates completed steps and publishes an
    /// <c>OrderFulfillmentFailedEvent</c> naming the step that failed.
    /// </summary>
    private async Task HandleFailureAsync(
        OrderFulfillmentState state,
        Exception ex,
        CancellationToken ct)
    {
        // Capture the failing step first: CompensateAsync overwrites
        // state.CurrentStep with Compensating and then Failed.
        var failedStep = state.CurrentStep;

        _logger.LogError(ex, "Saga {SagaId} failed at step {Step}", state.SagaId, failedStep);

        state.FailureReason = ex.Message;
        await CompensateAsync(state, ct);

        await _eventPublisher.PublishAsync(new OrderFulfillmentFailedEvent
        {
            OrderId = state.OrderId,
            FailedStep = failedStep.ToString(),
            Reason = ex.Message
        });
    }

    /// <summary>
    /// Undoes completed steps in reverse order and marks the saga as Failed.
    /// Public so a recovery service can resume a saga that crashed while
    /// compensating. State is persisted after each compensating action, so a
    /// retry skips the refund/release work that already succeeded.
    /// </summary>
    public async Task CompensateAsync(OrderFulfillmentState state, CancellationToken ct)
    {
        state.CurrentStep = SagaStep.Compensating;
        await _stateStore.SaveStateAsync(state.SagaId, state, ct);

        // Compensate in reverse order
        if (state.Shipped)
        {
            // No "cancelled" flag is tracked for shipping, so a retried
            // compensation calls this again: the shipping service should
            // treat repeated cancellations as a no-op.
            await _shippingService.CancelShipmentAsync(state.OrderId, ct);
        }

        if (state.PaymentProcessed && !state.PaymentRefunded)
        {
            await _paymentService.RefundAsync(state.OrderId, ct);
            state.PaymentRefunded = true;
            await _stateStore.SaveStateAsync(state.SagaId, state, ct);
        }

        if (state.InventoryReserved && !state.InventoryReleased)
        {
            await _inventoryService.ReleaseAsync(state.OrderId, ct);
            state.InventoryReleased = true;
            await _stateStore.SaveStateAsync(state.SagaId, state, ct);
        }

        state.CurrentStep = SagaStep.Failed;
        await _stateStore.SaveStateAsync(state.SagaId, state, ct);
    }
}

ISaga Interface

/// <summary>
/// A saga that reacts to events of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">The event type delivered to the saga.</typeparam>
public interface ISaga<TEvent>
    where TEvent : class
{
    /// <summary>Processes a single event, advancing the saga's workflow.</summary>
    /// <param name="event">The event to handle.</param>
    /// <param name="cancellationToken">Signals that processing should stop.</param>
    Task HandleAsync(TEvent @event, CancellationToken cancellationToken);

    /// <summary>Name identifying this saga, e.g. <c>"order-fulfillment"</c>.</summary>
    string SagaName { get; }
}

Registration

using Svrnty.CQRS.Events;

var builder = WebApplication.CreateBuilder(args);

// Register the saga under its concrete type so code that needs the concrete
// API (such as the recovery service) can resolve it, and forward the
// ISaga<OrderPlacedEvent> registration to that same instance.
// NOTE: a singleton saga holds on to its dependencies for the app lifetime;
// if ISagaStateStore is scoped (e.g. EF Core-backed), choose compatible lifetimes.
builder.Services.AddSingleton<OrderFulfillmentSaga>();
builder.Services.AddSingleton<ISaga<OrderPlacedEvent>>(
    sp => sp.GetRequiredService<OrderFulfillmentSaga>());

// Register saga orchestrator
builder.Services.AddSagaOrchestrator();

// Resume incomplete sagas on startup (see "Recovery from Failure")
builder.Services.AddHostedService<SagaRecoveryService>();

var app = builder.Build();
app.Run();

Saga State

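Define a state class that records the current step, which steps have completed (this drives compensation), and any data that compensation needs later: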

/// <summary>
/// Persisted state of one order-fulfillment saga. Completion flags tell
/// compensation which steps must be undone; compensation flags make
/// compensation safe to retry.
/// </summary>
public class OrderFulfillmentState
{
    /// <summary>Unique key the state is persisted under (e.g. "order-42").</summary>
    public string SagaId { get; set; } = string.Empty;
    public int OrderId { get; set; }
    public SagaStep CurrentStep { get; set; }

    // Step completion tracking
    public bool InventoryReserved { get; set; }
    public bool PaymentProcessed { get; set; }
    public bool Shipped { get; set; }

    // Optional steps used by the "Parallel Steps" variant
    public bool OrderValidated { get; set; }
    public bool CreditChecked { get; set; }

    // Compensation tracking
    public bool InventoryReleased { get; set; }
    public bool PaymentRefunded { get; set; }
    public bool ShipmentCancelled { get; set; }

    // Metadata
    public DateTimeOffset StartedAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }
    public string? FailureReason { get; set; }

    // Data needed for compensation
    public string? PaymentTransactionId { get; set; }
    public string? ShipmentTrackingNumber { get; set; }
}

/// <summary>
/// Position of a saga in its workflow. Values are explicit because saga state
/// is persisted as JSON, where enums serialize as their numeric value by
/// default; never renumber existing members.
/// </summary>
public enum SagaStep
{
    Started = 0,
    ReserveInventory = 1,
    ProcessPayment = 2,
    ShipOrder = 3,
    Completed = 4,
    Compensating = 5,
    Failed = 6,

    // Used by the "Parallel Steps" variant. Appended rather than inserted so
    // already-persisted values keep their meaning.
    ValidateOrder = 7,
    CheckCredit = 8
}

Step-by-Step Workflow

Step Execution Template

/// <summary>
/// Runs one saga step that produces a result. The state is persisted before
/// the step (to record where the saga is) and again after it succeeds, so any
/// completion flags or data the step stored in <paramref name="state"/>
/// (e.g. a payment transaction ID) survive a crash.
/// </summary>
/// <returns>The value produced by <paramref name="action"/>.</returns>
private async Task<TResult> ExecuteStepAsync<TResult>(
    OrderFulfillmentState state,
    SagaStep step,
    Func<Task<TResult>> action,
    CancellationToken ct)
{
    state.CurrentStep = step;
    await _stateStore.SaveStateAsync(state.SagaId, state, ct);

    _logger.LogInformation("Executing saga step {Step} for {SagaId}", step, state.SagaId);

    TResult result;
    try
    {
        result = await action();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Saga step {Step} failed for {SagaId}", step, state.SagaId);
        throw;
    }

    // Persist what the step recorded; without this a crash before the next
    // step would lose e.g. PaymentTransactionId and re-run the step.
    await _stateStore.SaveStateAsync(state.SagaId, state, ct);

    _logger.LogInformation("Saga step {Step} completed for {SagaId}", step, state.SagaId);

    return result;
}

Using the Template

// Step with return value: the generic template returns whatever the lambda returns
var transactionId = await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
{
    var txId = await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
    state.PaymentProcessed = true;
    state.PaymentTransactionId = txId;
    return txId;
}, ct);

// Step without return value: a lambda that returns nothing binds to the
// non-generic ExecuteStepAsync(..., Func<Task>, ...) overload from the Quick
// Start. Returning Task.CompletedTask here would instead produce a Task<Task>.
await ExecuteStepAsync(state, SagaStep.ShipOrder, async () =>
{
    await _shippingService.ShipAsync(@event.OrderId, ct);
    state.Shipped = true;
}, ct);

State Persistence

Persist saga state after each step:

/// <summary>
/// Durable storage for saga state, keyed by saga ID.
/// </summary>
public interface ISagaStateStore
{
    /// <summary>Loads the state stored for <paramref name="sagaId"/>.</summary>
    Task<T?> GetStateAsync<T>(string sagaId, CancellationToken ct);

    /// <summary>Creates or overwrites the state stored for <paramref name="sagaId"/>.</summary>
    Task SaveStateAsync(string sagaId, object state, CancellationToken ct);

    /// <summary>Removes the state stored for <paramref name="sagaId"/>.</summary>
    Task DeleteStateAsync(string sagaId, CancellationToken ct);
}

/// <summary>
/// EF Core-backed <see cref="ISagaStateStore"/> that stores each saga's state
/// as a JSON document in a <c>SagaStates</c> table.
/// </summary>
/// <remarks>
/// <c>_dbContext</c> is the application's EF Core DbContext exposing a
/// <c>SagaStates</c> DbSet; its declaration and constructor injection are
/// omitted for brevity. A DbContext does not support concurrent operations,
/// so do not call one store instance from parallel branches.
/// </remarks>
public class PostgresSagaStateStore : ISagaStateStore
{
    /// <summary>Inserts or updates the JSON state for <paramref name="sagaId"/>.</summary>
    /// <exception cref="ArgumentException"><paramref name="sagaId"/> is null or empty.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is null.</exception>
    public async Task SaveStateAsync(string sagaId, object state, CancellationToken ct)
    {
        // An empty ID would make every saga overwrite the same row
        if (string.IsNullOrEmpty(sagaId))
        {
            throw new ArgumentException("A saga ID is required.", nameof(sagaId));
        }
        ArgumentNullException.ThrowIfNull(state);

        // Serialize using the runtime type so all state properties are stored
        var json = JsonSerializer.Serialize(state, state.GetType());

        var entity = await _dbContext.SagaStates
            .FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);

        if (entity == null)
        {
            entity = new SagaStateEntity
            {
                SagaId = sagaId,
                StateJson = json,
                CreatedAt = DateTimeOffset.UtcNow
            };
            _dbContext.SagaStates.Add(entity);
        }
        else
        {
            entity.StateJson = json;
            entity.UpdatedAt = DateTimeOffset.UtcNow;
        }

        await _dbContext.SaveChangesAsync(ct);
    }

    /// <summary>
    /// Loads and deserializes the state for <paramref name="sagaId"/>, or
    /// returns <c>default</c> when none is stored.
    /// </summary>
    public async Task<T?> GetStateAsync<T>(string sagaId, CancellationToken ct)
    {
        var entity = await _dbContext.SagaStates
            .AsNoTracking()
            .FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);

        return entity is null ? default : JsonSerializer.Deserialize<T>(entity.StateJson);
    }

    /// <summary>Deletes the state for <paramref name="sagaId"/>; a missing row is a no-op.</summary>
    public async Task DeleteStateAsync(string sagaId, CancellationToken ct)
    {
        var entity = await _dbContext.SagaStates
            .FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);

        if (entity is null)
        {
            return;
        }

        _dbContext.SagaStates.Remove(entity);
        await _dbContext.SaveChangesAsync(ct);
    }
}

Recovery from Failure

Resume incomplete sagas after an application restart:

/// <summary>
/// Background service that, shortly after startup, resumes every order
/// fulfillment saga whose persisted state shows it did not finish.
/// </summary>
public class SagaRecoveryService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Wait for startup
        await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);

        // Find incomplete sagas
        var incompleteSagas = await _stateStore.GetIncompleteSagasAsync(stoppingToken);

        foreach (var sagaState in incompleteSagas)
        {
            _logger.LogInformation("Resuming saga {SagaId} from step {Step}",
                sagaState.SagaId,
                sagaState.CurrentStep);

            try
            {
                await ResumeSagaAsync(sagaState, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutting down: the remaining sagas stay incomplete and are
                // picked up on the next start; don't log them as failures.
                return;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to resume saga {SagaId}", sagaState.SagaId);
            }
        }
    }

    /// <summary>
    /// Continues a saga from its persisted step: forward steps fall through to
    /// the end of the workflow, skipping any already completed; a saga that
    /// crashed while compensating finishes compensation.
    /// </summary>
    private async Task ResumeSagaAsync(OrderFulfillmentState state, CancellationToken ct)
    {
        // A hosted service is a singleton: resolve inside a scope so scoped
        // dependencies (e.g. an EF Core-backed state store) are not resolved
        // from the root provider. The saga is registered as
        // ISaga<OrderPlacedEvent>, so resolve it through that service type.
        using var scope = _serviceProvider.CreateScope();
        var saga = (OrderFulfillmentSaga)scope.ServiceProvider
            .GetRequiredService<ISaga<OrderPlacedEvent>>();

        // Resume from current step.
        // NOTE(review): assumes OrderFulfillmentSaga exposes one public method
        // per step plus a public CompensateAsync.
        switch (state.CurrentStep)
        {
            case SagaStep.ReserveInventory:
                if (!state.InventoryReserved)
                {
                    await saga.ExecuteReserveInventoryAsync(state, ct);
                }
                goto case SagaStep.ProcessPayment;

            case SagaStep.ProcessPayment:
                if (!state.PaymentProcessed)
                {
                    await saga.ExecuteProcessPaymentAsync(state, ct);
                }
                goto case SagaStep.ShipOrder;

            case SagaStep.ShipOrder:
                if (!state.Shipped)
                {
                    await saga.ExecuteShipOrderAsync(state, ct);
                }
                break;

            case SagaStep.Compensating:
                await saga.CompensateAsync(state, ct);
                break;
        }
    }
}

Timeout Handling

Add timeouts so a saga cannot get stuck on a step that never completes:

/// <summary>
/// Runs a saga step and fails it with <c>SagaTimeoutException</c> if it does
/// not complete within <paramref name="timeout"/>.
/// </summary>
/// <remarks>
/// <paramref name="action"/> receives no token, so it cannot be cancelled
/// cooperatively. On timeout the saga stops waiting, but the underlying call
/// may still complete later. Prefer the token-aware overload, and keep the
/// step idempotent.
/// </remarks>
private Task ExecuteStepWithTimeoutAsync(
    OrderFulfillmentState state,
    SagaStep step,
    Func<Task> action,
    TimeSpan timeout,
    CancellationToken ct)
    => ExecuteStepWithTimeoutAsync(state, step, _ => action(), timeout, ct);

/// <summary>
/// Runs a saga step with a deadline. <paramref name="action"/> receives a
/// token that is cancelled when <paramref name="timeout"/> elapses or
/// <paramref name="ct"/> is cancelled; pass it to the downstream calls.
/// </summary>
/// <exception cref="SagaTimeoutException">The step exceeded <paramref name="timeout"/>.</exception>
/// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
private async Task ExecuteStepWithTimeoutAsync(
    OrderFulfillmentState state,
    SagaStep step,
    Func<CancellationToken, Task> action,
    TimeSpan timeout,
    CancellationToken ct)
{
    state.CurrentStep = step;
    await _stateStore.SaveStateAsync(state.SagaId, state, ct);

    // Start the clock after persisting, so the timeout covers only the step itself
    using var stepCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    stepCts.CancelAfter(timeout);

    try
    {
        // WaitAsync enforces the deadline even if the action ignores the token
        await action(stepCts.Token).WaitAsync(stepCts.Token);
    }
    catch (OperationCanceledException) when (stepCts.IsCancellationRequested && !ct.IsCancellationRequested)
    {
        _logger.LogWarning("Saga step {Step} timed out after {Timeout} for {SagaId}",
            step,
            timeout,
            state.SagaId);

        throw new SagaTimeoutException($"Step {step} timed out");
    }

    // Persist completion flags the step recorded
    await _stateStore.SaveStateAsync(state.SagaId, state, ct);
}

// Usage: give the payment step at most two minutes
await ExecuteStepWithTimeoutAsync(
    state: state,
    step: SagaStep.ProcessPayment,
    action: async () => await _paymentService.ChargeAsync(orderId, amount, ct),
    timeout: TimeSpan.FromMinutes(2),
    ct: ct);

Parallel Steps

Execute independent steps in parallel:

/// <summary>
/// Variant of the fulfillment workflow that runs the independent inventory
/// reservation and credit check concurrently.
/// </summary>
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    var state = new OrderFulfillmentState
    {
        OrderId = @event.OrderId,
        // The saga ID is the key the state is persisted under; without it
        // every saga would be saved under the same (empty) key.
        SagaId = $"order-{@event.OrderId}",
        StartedAt = DateTimeOffset.UtcNow
    };

    // Sequential step
    await ExecuteStepAsync(state, SagaStep.ValidateOrder, async () =>
    {
        await _orderService.ValidateAsync(@event.OrderId, ct);
        state.OrderValidated = true;
    }, ct);

    // Each branch sets only its own completion flag. If one branch fails, the
    // work the other finished is still visible to compensation.
    async Task ReserveInventoryAsync()
    {
        await _inventoryService.ReserveAsync(@event.OrderId, @event.Items, ct);
        state.InventoryReserved = true;
    }

    async Task CheckCreditAsync()
    {
        await _creditService.CheckAsync(@event.CustomerId, @event.TotalAmount, ct);
        state.CreditChecked = true;
    }

    // Parallel work (independent): run both service calls concurrently inside
    // ONE persisted step. Two ExecuteStepAsync calls under Task.WhenAll would
    // save state through the same store at the same time (an EF Core DbContext
    // throws on concurrent operations) and race on state.CurrentStep.
    // Recovery must check both InventoryReserved and CreditChecked for this step.
    await ExecuteStepAsync(state, SagaStep.ReserveInventory, async () =>
    {
        await Task.WhenAll(ReserveInventoryAsync(), CheckCreditAsync());
    }, ct);

    // Continue with dependent steps
    await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
    {
        await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
        state.PaymentProcessed = true;
    }, ct);
}

Best Practices

DO

  • Persist state after each step
  • Make steps idempotent
  • Implement comprehensive compensation
  • Log all saga operations
  • Use unique saga IDs
  • Handle timeouts
  • Version saga definitions
  • Test failure scenarios
  • Monitor saga completion rates
  • Implement recovery logic

DON'T

  • Don't skip state persistence
  • Don't assume steps always succeed
  • Don't forget compensation for each step
  • Don't block on async calls (`.Result`, `.Wait()`)
  • Don't ignore timeout scenarios
  • Don't make steps non-idempotent
  • Don't forget error logging
  • Don't couple sagas tightly to services

See Also