dotnet-cqrs/docs/event-streaming/sagas/saga-context.md

14 KiB

Saga Context

Share data and state across saga steps with SagaContext.

Overview

Saga context provides a way to share data between saga steps:

  • Shared State - Pass data between steps
  • Correlation - Track related events
  • Metadata - Store contextual information
  • Scoped Services - Dependency injection per saga execution

Quick Start

public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
{
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // Create saga context
        var context = new SagaContext
        {
            SagaId = $"order-{@event.OrderId}",
            CorrelationId = @event.CorrelationId,
            UserId = @event.CustomerId.ToString(),
            StartedAt = DateTimeOffset.UtcNow
        };

        // Set custom data
        context.Set("OrderId", @event.OrderId);
        context.Set("CustomerEmail", await GetCustomerEmailAsync(@event.CustomerId, ct));
        context.Set("TotalAmount", @event.TotalAmount);

        try
        {
            // Step 1 - context available to all methods
            await ReserveInventoryAsync(context, @event.Items, ct);

            // Step 2 - uses data from context
            var email = context.Get<string>("CustomerEmail");
            await SendConfirmationEmailAsync(email, context, ct);

            // Step 3
            await ProcessPaymentAsync(context, ct);
        }
        catch (Exception ex)
        {
            await CompensateAsync(context, ct);
        }
    }
}

SagaContext Class

public class SagaContext
{
    public string SagaId { get; set; } = string.Empty;
    public string CorrelationId { get; set; } = string.Empty;
    public string? UserId { get; set; }
    public DateTimeOffset StartedAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }

    private readonly Dictionary<string, object> _data = new();
    private readonly Dictionary<string, string> _metadata = new();

    // Data storage
    public void Set<T>(string key, T value) => _data[key] = value!;
    public T? Get<T>(string key) => _data.TryGetValue(key, out var value) ? (T)value : default;
    public bool TryGet<T>(string key, out T? value)
    {
        if (_data.TryGetValue(key, out var obj))
        {
            value = (T)obj;
            return true;
        }
        value = default;
        return false;
    }

    // Metadata
    public void SetMetadata(string key, string value) => _metadata[key] = value;
    public string? GetMetadata(string key) => _metadata.TryGetValue(key, out var value) ? value : null;

    // Convenience properties
    public IDictionary<string, object> Data => _data;
    public IDictionary<string, string> Metadata => _metadata;
}

Sharing Data Between Steps

Storing Step Results

public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    var context = new SagaContext { SagaId = $"order-{@event.OrderId}" };

    // Step 1: Reserve inventory (store reservation ID)
    var reservationId = await _inventoryService.ReserveAsync(@event.Items, ct);
    context.Set("ReservationId", reservationId);

    // Step 2: Process payment (store transaction ID)
    var transactionId = await _paymentService.ChargeAsync(@event.TotalAmount, ct);
    context.Set("TransactionId", transactionId);

    // Step 3: Ship order (use data from previous steps)
    var trackingNumber = await _shippingService.ShipAsync(
        reservationId: context.Get<string>("ReservationId"),
        paymentConfirmation: context.Get<string>("TransactionId"),
        ct);
    context.Set("TrackingNumber", trackingNumber);

    // Final step uses all collected data
    await SendShippedNotificationAsync(context, ct);
}

private async Task SendShippedNotificationAsync(SagaContext context, CancellationToken ct)
{
    var email = context.Get<string>("CustomerEmail");
    var trackingNumber = context.Get<string>("TrackingNumber");

    await _emailService.SendAsync(email, $"Your order has shipped! Tracking: {trackingNumber}", ct);
}

Computed Values

public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    var context = new SagaContext();

    // Store base values
    context.Set("Subtotal", @event.Subtotal);
    context.Set("Tax", @event.Tax);
    context.Set("Shipping", @event.Shipping);

    // Compute derived value
    var total = context.Get<decimal>("Subtotal")
              + context.Get<decimal>("Tax")
              + context.Get<decimal>("Shipping");

    context.Set("Total", total);

    await ProcessPaymentAsync(context, ct);
}

Correlation and Tracing

Correlation IDs

public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    var context = new SagaContext
    {
        SagaId = $"order-{@event.OrderId}",
        CorrelationId = @event.CorrelationId ?? Guid.NewGuid().ToString(),
        UserId = @event.CustomerId.ToString()
    };

    // All events published include correlation ID
    await PublishEventAsync(new InventoryReservedEvent
    {
        OrderId = @event.OrderId,
        CorrelationId = context.CorrelationId
    });

    await PublishEventAsync(new PaymentProcessedEvent
    {
        OrderId = @event.OrderId,
        CorrelationId = context.CorrelationId
    });
}

Distributed Tracing

public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    using var activity = ActivitySource.StartActivity("OrderFulfillmentSaga");
    activity?.SetTag("saga.id", context.SagaId);
    activity?.SetTag("order.id", @event.OrderId);

    var context = new SagaContext
    {
        SagaId = $"order-{@event.OrderId}",
        CorrelationId = @event.CorrelationId ?? activity?.TraceId.ToString() ?? Guid.NewGuid().ToString()
    };

    // Store trace context
    context.SetMetadata("TraceId", activity?.TraceId.ToString() ?? "");
    context.SetMetadata("SpanId", activity?.SpanId.ToString() ?? "");

    await ExecuteStepsAsync(context, ct);
}

Metadata Storage

Storing Context Information

public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    var context = new SagaContext();

    // Metadata for observability
    context.SetMetadata("Environment", _environment.EnvironmentName);
    context.SetMetadata("Version", Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? "");
    context.SetMetadata("MachineName", Environment.MachineName);

    // Business metadata
    context.SetMetadata("Channel", @event.Channel);  // Web, Mobile, API
    context.SetMetadata("Region", @event.ShippingAddress.Country);
    context.SetMetadata("CustomerTier", await GetCustomerTierAsync(@event.CustomerId, ct));

    await ExecuteStepsAsync(context, ct);
}

Querying Metadata

private async Task ProcessPaymentAsync(SagaContext context, CancellationToken ct)
{
    var customerTier = context.GetMetadata("CustomerTier");

    // Apply tier-specific logic
    var discount = customerTier switch
    {
        "Premium" => 0.10m,  // 10% discount
        "Gold" => 0.05m,     // 5% discount
        _ => 0.0m
    };

    var amount = context.Get<decimal>("Total") * (1 - discount);

    await _paymentService.ChargeAsync(amount, ct);
}

Scoped Services

Dependency Injection

public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
{
    private readonly IServiceProvider _serviceProvider;

    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // Create scope for this saga execution
        using var scope = _serviceProvider.CreateScope();

        var context = new SagaContext();
        context.Set("ServiceScope", scope);

        // Each step uses scoped services
        await ReserveInventoryAsync(context, @event.Items, ct);
        await ProcessPaymentAsync(context, ct);
        await ShipOrderAsync(context, ct);
    }

    private async Task ReserveInventoryAsync(
        SagaContext context,
        List<OrderItem> items,
        CancellationToken ct)
    {
        var scope = context.Get<IServiceScope>("ServiceScope");
        var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();

        await inventoryService.ReserveAsync(items, ct);
    }
}

Scoped Database Connections

public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    using var scope = _serviceProvider.CreateScope();
    var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();

    var context = new SagaContext();
    context.Set("DbContext", dbContext);

    using var transaction = await dbContext.Database.BeginTransactionAsync(ct);
    context.Set("Transaction", transaction);

    try
    {
        await ExecuteStepsAsync(context, ct);

        await transaction.CommitAsync(ct);
    }
    catch
    {
        await transaction.RollbackAsync(ct);
        throw;
    }
}

Context in Compensation

private async Task CompensateAsync(SagaContext context, CancellationToken ct)
{
    _logger.LogWarning("Compensating saga {SagaId}", context.SagaId);

    // Use context data for compensation
    if (context.TryGet<string>("TransactionId", out var transactionId))
    {
        await _paymentService.RefundAsync(transactionId, ct);
    }

    if (context.TryGet<string>("ReservationId", out var reservationId))
    {
        await _inventoryService.ReleaseReservationAsync(reservationId, ct);
    }

    if (context.TryGet<string>("TrackingNumber", out var trackingNumber))
    {
        await _shippingService.CancelShipmentAsync(trackingNumber, ct);
    }
}

Persisting Context

Save Context with State

public class PersistedSagaContext : SagaContext
{
    public async Task SaveAsync(ISagaStateStore stateStore, CancellationToken ct)
    {
        var state = new
        {
            SagaId,
            CorrelationId,
            UserId,
            StartedAt,
            CompletedAt,
            Data,
            Metadata
        };

        await stateStore.SaveStateAsync(SagaId, state, ct);
    }

    public static async Task<PersistedSagaContext?> LoadAsync(
        string sagaId,
        ISagaStateStore stateStore,
        CancellationToken ct)
    {
        var state = await stateStore.GetStateAsync<dynamic>(sagaId, ct);

        if (state == null)
            return null;

        var context = new PersistedSagaContext
        {
            SagaId = state.SagaId,
            CorrelationId = state.CorrelationId,
            UserId = state.UserId,
            StartedAt = state.StartedAt,
            CompletedAt = state.CompletedAt
        };

        foreach (var (key, value) in state.Data)
        {
            context.Set(key, value);
        }

        foreach (var (key, value) in state.Metadata)
        {
            context.SetMetadata(key, value);
        }

        return context;
    }
}

Resume from Saved Context

public async Task ResumeAsync(string sagaId, CancellationToken ct)
{
    // Load saved context
    var context = await PersistedSagaContext.LoadAsync(sagaId, _stateStore, ct);

    if (context == null)
    {
        _logger.LogWarning("Cannot resume saga {SagaId} - context not found", sagaId);
        return;
    }

    _logger.LogInformation("Resuming saga {SagaId} from saved context", sagaId);

    // Resume execution with restored context
    await ContinueExecutionAsync(context, ct);
}

Context Extensions

Fluent API

public static class SagaContextExtensions
{
    public static SagaContext WithOrderData(this SagaContext context, OrderPlacedEvent @event)
    {
        context.Set("OrderId", @event.OrderId);
        context.Set("CustomerId", @event.CustomerId);
        context.Set("TotalAmount", @event.TotalAmount);
        context.Set("Items", @event.Items);
        return context;
    }

    public static SagaContext WithCustomerData(this SagaContext context, Customer customer)
    {
        context.Set("CustomerEmail", customer.Email);
        context.Set("CustomerName", customer.Name);
        context.SetMetadata("CustomerTier", customer.Tier);
        return context;
    }
}

// Usage
var context = new SagaContext()
    .WithOrderData(@event)
    .WithCustomerData(customer);

Typed Context

public class OrderFulfillmentContext : SagaContext
{
    public int OrderId
    {
        get => Get<int>("OrderId");
        set => Set("OrderId", value);
    }

    public decimal TotalAmount
    {
        get => Get<decimal>("TotalAmount");
        set => Set("TotalAmount", value);
    }

    public string? TransactionId
    {
        get => Get<string>("TransactionId");
        set => Set("TransactionId", value);
    }

    public string? TrackingNumber
    {
        get => Get<string>("TrackingNumber");
        set => Set("TrackingNumber", value);
    }
}

// Usage with type safety
var context = new OrderFulfillmentContext
{
    OrderId = @event.OrderId,
    TotalAmount = @event.TotalAmount
};

await ProcessPaymentAsync(context, ct);

// Type-safe access
var transactionId = context.TransactionId;  // string?

Best Practices

DO

  • Use context to share data between steps
  • Include correlation IDs for tracing
  • Store step results in context
  • Use metadata for observability
  • Persist context with saga state
  • Use typed context for complex sagas
  • Clear sensitive data after use
  • Use scoped services via context

DON'T

  • Don't store large objects in context
  • Don't store sensitive data without encryption
  • Don't mutate context from multiple threads
  • Don't skip correlation IDs
  • Don't forget to persist context
  • Don't leak database connections
  • Don't use context for business logic
  • Don't ignore context when compensating

See Also