dotnet-cqrs/docs/event-streaming/sagas/saga-context.md

520 lines
14 KiB
Markdown

# Saga Context
Share data and state across saga steps with SagaContext.
## Overview
Saga context provides a way to share data between saga steps:
- **Shared State** - Pass data between steps
- **Correlation** - Track related events
- **Metadata** - Store contextual information
- **Scoped Services** - Dependency injection per saga execution
## Quick Start
```csharp
public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
{
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
// Create saga context
var context = new SagaContext
{
SagaId = $"order-{@event.OrderId}",
CorrelationId = @event.CorrelationId,
UserId = @event.CustomerId.ToString(),
StartedAt = DateTimeOffset.UtcNow
};
// Set custom data
context.Set("OrderId", @event.OrderId);
context.Set("CustomerEmail", await GetCustomerEmailAsync(@event.CustomerId, ct));
context.Set("TotalAmount", @event.TotalAmount);
try
{
// Step 1 - context available to all methods
await ReserveInventoryAsync(context, @event.Items, ct);
// Step 2 - uses data from context
var email = context.Get<string>("CustomerEmail");
await SendConfirmationEmailAsync(email, context, ct);
// Step 3
await ProcessPaymentAsync(context, ct);
}
catch (Exception ex)
{
await CompensateAsync(context, ct);
}
}
}
```
## SagaContext Class
```csharp
public class SagaContext
{
public string SagaId { get; set; } = string.Empty;
public string CorrelationId { get; set; } = string.Empty;
public string? UserId { get; set; }
public DateTimeOffset StartedAt { get; set; }
public DateTimeOffset? CompletedAt { get; set; }
private readonly Dictionary<string, object> _data = new();
private readonly Dictionary<string, string> _metadata = new();
// Data storage
public void Set<T>(string key, T value) => _data[key] = value!;
public T? Get<T>(string key) => _data.TryGetValue(key, out var value) ? (T)value : default;
public bool TryGet<T>(string key, out T? value)
{
if (_data.TryGetValue(key, out var obj))
{
value = (T)obj;
return true;
}
value = default;
return false;
}
// Metadata
public void SetMetadata(string key, string value) => _metadata[key] = value;
public string? GetMetadata(string key) => _metadata.TryGetValue(key, out var value) ? value : null;
// Convenience properties
public IDictionary<string, object> Data => _data;
public IDictionary<string, string> Metadata => _metadata;
}
```
## Sharing Data Between Steps
### Storing Step Results
```csharp
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
var context = new SagaContext { SagaId = $"order-{@event.OrderId}" };
// Step 1: Reserve inventory (store reservation ID)
var reservationId = await _inventoryService.ReserveAsync(@event.Items, ct);
context.Set("ReservationId", reservationId);
// Step 2: Process payment (store transaction ID)
var transactionId = await _paymentService.ChargeAsync(@event.TotalAmount, ct);
context.Set("TransactionId", transactionId);
// Step 3: Ship order (use data from previous steps)
var trackingNumber = await _shippingService.ShipAsync(
reservationId: context.Get<string>("ReservationId"),
paymentConfirmation: context.Get<string>("TransactionId"),
ct);
context.Set("TrackingNumber", trackingNumber);
// Final step uses all collected data
await SendShippedNotificationAsync(context, ct);
}
private async Task SendShippedNotificationAsync(SagaContext context, CancellationToken ct)
{
var email = context.Get<string>("CustomerEmail");
var trackingNumber = context.Get<string>("TrackingNumber");
await _emailService.SendAsync(email, $"Your order has shipped! Tracking: {trackingNumber}", ct);
}
```
### Computed Values
```csharp
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
var context = new SagaContext();
// Store base values
context.Set("Subtotal", @event.Subtotal);
context.Set("Tax", @event.Tax);
context.Set("Shipping", @event.Shipping);
// Compute derived value
var total = context.Get<decimal>("Subtotal")
+ context.Get<decimal>("Tax")
+ context.Get<decimal>("Shipping");
context.Set("Total", total);
await ProcessPaymentAsync(context, ct);
}
```
## Correlation and Tracing
### Correlation IDs
```csharp
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
var context = new SagaContext
{
SagaId = $"order-{@event.OrderId}",
CorrelationId = @event.CorrelationId ?? Guid.NewGuid().ToString(),
UserId = @event.CustomerId.ToString()
};
// All events published include correlation ID
await PublishEventAsync(new InventoryReservedEvent
{
OrderId = @event.OrderId,
CorrelationId = context.CorrelationId
});
await PublishEventAsync(new PaymentProcessedEvent
{
OrderId = @event.OrderId,
CorrelationId = context.CorrelationId
});
}
```
### Distributed Tracing
```csharp
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
using var activity = ActivitySource.StartActivity("OrderFulfillmentSaga");
activity?.SetTag("saga.id", context.SagaId);
activity?.SetTag("order.id", @event.OrderId);
var context = new SagaContext
{
SagaId = $"order-{@event.OrderId}",
CorrelationId = @event.CorrelationId ?? activity?.TraceId.ToString() ?? Guid.NewGuid().ToString()
};
// Store trace context
context.SetMetadata("TraceId", activity?.TraceId.ToString() ?? "");
context.SetMetadata("SpanId", activity?.SpanId.ToString() ?? "");
await ExecuteStepsAsync(context, ct);
}
```
## Metadata Storage
### Storing Context Information
```csharp
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
var context = new SagaContext();
// Metadata for observability
context.SetMetadata("Environment", _environment.EnvironmentName);
context.SetMetadata("Version", Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? "");
context.SetMetadata("MachineName", Environment.MachineName);
// Business metadata
context.SetMetadata("Channel", @event.Channel); // Web, Mobile, API
context.SetMetadata("Region", @event.ShippingAddress.Country);
context.SetMetadata("CustomerTier", await GetCustomerTierAsync(@event.CustomerId, ct));
await ExecuteStepsAsync(context, ct);
}
```
### Querying Metadata
```csharp
private async Task ProcessPaymentAsync(SagaContext context, CancellationToken ct)
{
var customerTier = context.GetMetadata("CustomerTier");
// Apply tier-specific logic
var discount = customerTier switch
{
"Premium" => 0.10m, // 10% discount
"Gold" => 0.05m, // 5% discount
_ => 0.0m
};
var amount = context.Get<decimal>("Total") * (1 - discount);
await _paymentService.ChargeAsync(amount, ct);
}
```
## Scoped Services
### Dependency Injection
```csharp
public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
{
private readonly IServiceProvider _serviceProvider;
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
// Create scope for this saga execution
using var scope = _serviceProvider.CreateScope();
var context = new SagaContext();
context.Set("ServiceScope", scope);
// Each step uses scoped services
await ReserveInventoryAsync(context, @event.Items, ct);
await ProcessPaymentAsync(context, ct);
await ShipOrderAsync(context, ct);
}
private async Task ReserveInventoryAsync(
SagaContext context,
List<OrderItem> items,
CancellationToken ct)
{
var scope = context.Get<IServiceScope>("ServiceScope");
var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();
await inventoryService.ReserveAsync(items, ct);
}
}
```
### Scoped Database Connections
```csharp
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
using var scope = _serviceProvider.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
var context = new SagaContext();
context.Set("DbContext", dbContext);
using var transaction = await dbContext.Database.BeginTransactionAsync(ct);
context.Set("Transaction", transaction);
try
{
await ExecuteStepsAsync(context, ct);
await transaction.CommitAsync(ct);
}
catch
{
await transaction.RollbackAsync(ct);
throw;
}
}
```
## Context in Compensation
```csharp
private async Task CompensateAsync(SagaContext context, CancellationToken ct)
{
_logger.LogWarning("Compensating saga {SagaId}", context.SagaId);
// Use context data for compensation
if (context.TryGet<string>("TransactionId", out var transactionId))
{
await _paymentService.RefundAsync(transactionId, ct);
}
if (context.TryGet<string>("ReservationId", out var reservationId))
{
await _inventoryService.ReleaseReservationAsync(reservationId, ct);
}
if (context.TryGet<string>("TrackingNumber", out var trackingNumber))
{
await _shippingService.CancelShipmentAsync(trackingNumber, ct);
}
}
```
## Persisting Context
### Save Context with State
```csharp
public class PersistedSagaContext : SagaContext
{
public async Task SaveAsync(ISagaStateStore stateStore, CancellationToken ct)
{
var state = new
{
SagaId,
CorrelationId,
UserId,
StartedAt,
CompletedAt,
Data,
Metadata
};
await stateStore.SaveStateAsync(SagaId, state, ct);
}
public static async Task<PersistedSagaContext?> LoadAsync(
string sagaId,
ISagaStateStore stateStore,
CancellationToken ct)
{
var state = await stateStore.GetStateAsync<dynamic>(sagaId, ct);
if (state == null)
return null;
var context = new PersistedSagaContext
{
SagaId = state.SagaId,
CorrelationId = state.CorrelationId,
UserId = state.UserId,
StartedAt = state.StartedAt,
CompletedAt = state.CompletedAt
};
foreach (var (key, value) in state.Data)
{
context.Set(key, value);
}
foreach (var (key, value) in state.Metadata)
{
context.SetMetadata(key, value);
}
return context;
}
}
```
### Resume from Saved Context
```csharp
public async Task ResumeAsync(string sagaId, CancellationToken ct)
{
// Load saved context
var context = await PersistedSagaContext.LoadAsync(sagaId, _stateStore, ct);
if (context == null)
{
_logger.LogWarning("Cannot resume saga {SagaId} - context not found", sagaId);
return;
}
_logger.LogInformation("Resuming saga {SagaId} from saved context", sagaId);
// Resume execution with restored context
await ContinueExecutionAsync(context, ct);
}
```
## Context Extensions
### Fluent API
```csharp
public static class SagaContextExtensions
{
public static SagaContext WithOrderData(this SagaContext context, OrderPlacedEvent @event)
{
context.Set("OrderId", @event.OrderId);
context.Set("CustomerId", @event.CustomerId);
context.Set("TotalAmount", @event.TotalAmount);
context.Set("Items", @event.Items);
return context;
}
public static SagaContext WithCustomerData(this SagaContext context, Customer customer)
{
context.Set("CustomerEmail", customer.Email);
context.Set("CustomerName", customer.Name);
context.SetMetadata("CustomerTier", customer.Tier);
return context;
}
}
// Usage
var context = new SagaContext()
.WithOrderData(@event)
.WithCustomerData(customer);
```
### Typed Context
```csharp
public class OrderFulfillmentContext : SagaContext
{
public int OrderId
{
get => Get<int>("OrderId");
set => Set("OrderId", value);
}
public decimal TotalAmount
{
get => Get<decimal>("TotalAmount");
set => Set("TotalAmount", value);
}
public string? TransactionId
{
get => Get<string>("TransactionId");
set => Set("TransactionId", value);
}
public string? TrackingNumber
{
get => Get<string>("TrackingNumber");
set => Set("TrackingNumber", value);
}
}
// Usage with type safety
var context = new OrderFulfillmentContext
{
OrderId = @event.OrderId,
TotalAmount = @event.TotalAmount
};
await ProcessPaymentAsync(context, ct);
// Type-safe access
var transactionId = context.TransactionId; // string?
```
## Best Practices
### ✅ DO
- Use context to share data between steps
- Include correlation IDs for tracing
- Store step results in context
- Use metadata for observability
- Persist context with saga state
- Use typed context for complex sagas
- Clear sensitive data after use
- Use scoped services via context
### ❌ DON'T
- Don't store large objects in context
- Don't store sensitive data without encryption
- Don't mutate context from multiple threads
- Don't skip correlation IDs
- Don't forget to persist context
- Don't leak database connections
- Don't use context for business logic
- Don't ignore context when compensating
## See Also
- [Sagas Overview](README.md)
- [Saga Pattern](saga-pattern.md)
- [Creating Sagas](creating-sagas.md)
- [Compensation](compensation.md)
- [Event Streaming Overview](../README.md)