520 lines
14 KiB
Markdown
520 lines
14 KiB
Markdown
# Saga Context
|
|
|
|
Share data and state across saga steps with SagaContext.
|
|
|
|
## Overview
|
|
|
|
Saga context provides a way to share data between saga steps:
|
|
- **Shared State** - Pass data between steps
|
|
- **Correlation** - Track related events
|
|
- **Metadata** - Store contextual information
|
|
- **Scoped Services** - Dependency injection per saga execution
|
|
|
|
## Quick Start
|
|
|
|
```csharp
|
|
public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
|
|
{
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
// Create saga context
|
|
var context = new SagaContext
|
|
{
|
|
SagaId = $"order-{@event.OrderId}",
|
|
CorrelationId = @event.CorrelationId,
|
|
UserId = @event.CustomerId.ToString(),
|
|
StartedAt = DateTimeOffset.UtcNow
|
|
};
|
|
|
|
// Set custom data
|
|
context.Set("OrderId", @event.OrderId);
|
|
context.Set("CustomerEmail", await GetCustomerEmailAsync(@event.CustomerId, ct));
|
|
context.Set("TotalAmount", @event.TotalAmount);
|
|
|
|
try
|
|
{
|
|
// Step 1 - context available to all methods
|
|
await ReserveInventoryAsync(context, @event.Items, ct);
|
|
|
|
// Step 2 - uses data from context
|
|
var email = context.Get<string>("CustomerEmail");
|
|
await SendConfirmationEmailAsync(email, context, ct);
|
|
|
|
// Step 3
|
|
await ProcessPaymentAsync(context, ct);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
await CompensateAsync(context, ct);
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
## SagaContext Class
|
|
|
|
```csharp
|
|
public class SagaContext
|
|
{
|
|
public string SagaId { get; set; } = string.Empty;
|
|
public string CorrelationId { get; set; } = string.Empty;
|
|
public string? UserId { get; set; }
|
|
public DateTimeOffset StartedAt { get; set; }
|
|
public DateTimeOffset? CompletedAt { get; set; }
|
|
|
|
private readonly Dictionary<string, object> _data = new();
|
|
private readonly Dictionary<string, string> _metadata = new();
|
|
|
|
// Data storage
|
|
public void Set<T>(string key, T value) => _data[key] = value!;
|
|
public T? Get<T>(string key) => _data.TryGetValue(key, out var value) ? (T)value : default;
|
|
public bool TryGet<T>(string key, out T? value)
|
|
{
|
|
if (_data.TryGetValue(key, out var obj))
|
|
{
|
|
value = (T)obj;
|
|
return true;
|
|
}
|
|
value = default;
|
|
return false;
|
|
}
|
|
|
|
// Metadata
|
|
public void SetMetadata(string key, string value) => _metadata[key] = value;
|
|
public string? GetMetadata(string key) => _metadata.TryGetValue(key, out var value) ? value : null;
|
|
|
|
// Convenience properties
|
|
public IDictionary<string, object> Data => _data;
|
|
public IDictionary<string, string> Metadata => _metadata;
|
|
}
|
|
```
|
|
|
|
## Sharing Data Between Steps
|
|
|
|
### Storing Step Results
|
|
|
|
```csharp
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
var context = new SagaContext { SagaId = $"order-{@event.OrderId}" };
|
|
|
|
// Step 1: Reserve inventory (store reservation ID)
|
|
var reservationId = await _inventoryService.ReserveAsync(@event.Items, ct);
|
|
context.Set("ReservationId", reservationId);
|
|
|
|
// Step 2: Process payment (store transaction ID)
|
|
var transactionId = await _paymentService.ChargeAsync(@event.TotalAmount, ct);
|
|
context.Set("TransactionId", transactionId);
|
|
|
|
// Step 3: Ship order (use data from previous steps)
|
|
var trackingNumber = await _shippingService.ShipAsync(
|
|
reservationId: context.Get<string>("ReservationId"),
|
|
paymentConfirmation: context.Get<string>("TransactionId"),
|
|
ct);
|
|
context.Set("TrackingNumber", trackingNumber);
|
|
|
|
// Final step uses all collected data
|
|
await SendShippedNotificationAsync(context, ct);
|
|
}
|
|
|
|
private async Task SendShippedNotificationAsync(SagaContext context, CancellationToken ct)
|
|
{
|
|
var email = context.Get<string>("CustomerEmail");
|
|
var trackingNumber = context.Get<string>("TrackingNumber");
|
|
|
|
await _emailService.SendAsync(email, $"Your order has shipped! Tracking: {trackingNumber}", ct);
|
|
}
|
|
```
|
|
|
|
### Computed Values
|
|
|
|
```csharp
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
var context = new SagaContext();
|
|
|
|
// Store base values
|
|
context.Set("Subtotal", @event.Subtotal);
|
|
context.Set("Tax", @event.Tax);
|
|
context.Set("Shipping", @event.Shipping);
|
|
|
|
// Compute derived value
|
|
var total = context.Get<decimal>("Subtotal")
|
|
+ context.Get<decimal>("Tax")
|
|
+ context.Get<decimal>("Shipping");
|
|
|
|
context.Set("Total", total);
|
|
|
|
await ProcessPaymentAsync(context, ct);
|
|
}
|
|
```
|
|
|
|
## Correlation and Tracing
|
|
|
|
### Correlation IDs
|
|
|
|
```csharp
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
var context = new SagaContext
|
|
{
|
|
SagaId = $"order-{@event.OrderId}",
|
|
CorrelationId = @event.CorrelationId ?? Guid.NewGuid().ToString(),
|
|
UserId = @event.CustomerId.ToString()
|
|
};
|
|
|
|
// All events published include correlation ID
|
|
await PublishEventAsync(new InventoryReservedEvent
|
|
{
|
|
OrderId = @event.OrderId,
|
|
CorrelationId = context.CorrelationId
|
|
});
|
|
|
|
await PublishEventAsync(new PaymentProcessedEvent
|
|
{
|
|
OrderId = @event.OrderId,
|
|
CorrelationId = context.CorrelationId
|
|
});
|
|
}
|
|
```
|
|
|
|
### Distributed Tracing
|
|
|
|
```csharp
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
using var activity = ActivitySource.StartActivity("OrderFulfillmentSaga");
|
|
activity?.SetTag("saga.id", context.SagaId);
|
|
activity?.SetTag("order.id", @event.OrderId);
|
|
|
|
var context = new SagaContext
|
|
{
|
|
SagaId = $"order-{@event.OrderId}",
|
|
CorrelationId = @event.CorrelationId ?? activity?.TraceId.ToString() ?? Guid.NewGuid().ToString()
|
|
};
|
|
|
|
// Store trace context
|
|
context.SetMetadata("TraceId", activity?.TraceId.ToString() ?? "");
|
|
context.SetMetadata("SpanId", activity?.SpanId.ToString() ?? "");
|
|
|
|
await ExecuteStepsAsync(context, ct);
|
|
}
|
|
```
|
|
|
|
## Metadata Storage
|
|
|
|
### Storing Context Information
|
|
|
|
```csharp
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
var context = new SagaContext();
|
|
|
|
// Metadata for observability
|
|
context.SetMetadata("Environment", _environment.EnvironmentName);
|
|
context.SetMetadata("Version", Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? "");
|
|
context.SetMetadata("MachineName", Environment.MachineName);
|
|
|
|
// Business metadata
|
|
context.SetMetadata("Channel", @event.Channel); // Web, Mobile, API
|
|
context.SetMetadata("Region", @event.ShippingAddress.Country);
|
|
context.SetMetadata("CustomerTier", await GetCustomerTierAsync(@event.CustomerId, ct));
|
|
|
|
await ExecuteStepsAsync(context, ct);
|
|
}
|
|
```
|
|
|
|
### Querying Metadata
|
|
|
|
```csharp
|
|
private async Task ProcessPaymentAsync(SagaContext context, CancellationToken ct)
|
|
{
|
|
var customerTier = context.GetMetadata("CustomerTier");
|
|
|
|
// Apply tier-specific logic
|
|
var discount = customerTier switch
|
|
{
|
|
"Premium" => 0.10m, // 10% discount
|
|
"Gold" => 0.05m, // 5% discount
|
|
_ => 0.0m
|
|
};
|
|
|
|
var amount = context.Get<decimal>("Total") * (1 - discount);
|
|
|
|
await _paymentService.ChargeAsync(amount, ct);
|
|
}
|
|
```
|
|
|
|
## Scoped Services
|
|
|
|
### Dependency Injection
|
|
|
|
```csharp
|
|
public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
|
|
{
|
|
private readonly IServiceProvider _serviceProvider;
|
|
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
// Create scope for this saga execution
|
|
using var scope = _serviceProvider.CreateScope();
|
|
|
|
var context = new SagaContext();
|
|
context.Set("ServiceScope", scope);
|
|
|
|
// Each step uses scoped services
|
|
await ReserveInventoryAsync(context, @event.Items, ct);
|
|
await ProcessPaymentAsync(context, ct);
|
|
await ShipOrderAsync(context, ct);
|
|
}
|
|
|
|
private async Task ReserveInventoryAsync(
|
|
SagaContext context,
|
|
List<OrderItem> items,
|
|
CancellationToken ct)
|
|
{
|
|
var scope = context.Get<IServiceScope>("ServiceScope");
|
|
var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();
|
|
|
|
await inventoryService.ReserveAsync(items, ct);
|
|
}
|
|
}
|
|
```
|
|
|
|
### Scoped Database Connections
|
|
|
|
```csharp
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
using var scope = _serviceProvider.CreateScope();
|
|
var dbContext = scope.ServiceProvider.GetRequiredService<ApplicationDbContext>();
|
|
|
|
var context = new SagaContext();
|
|
context.Set("DbContext", dbContext);
|
|
|
|
using var transaction = await dbContext.Database.BeginTransactionAsync(ct);
|
|
context.Set("Transaction", transaction);
|
|
|
|
try
|
|
{
|
|
await ExecuteStepsAsync(context, ct);
|
|
|
|
await transaction.CommitAsync(ct);
|
|
}
|
|
catch
|
|
{
|
|
await transaction.RollbackAsync(ct);
|
|
throw;
|
|
}
|
|
}
|
|
```
|
|
|
|
## Context in Compensation
|
|
|
|
```csharp
|
|
private async Task CompensateAsync(SagaContext context, CancellationToken ct)
|
|
{
|
|
_logger.LogWarning("Compensating saga {SagaId}", context.SagaId);
|
|
|
|
// Use context data for compensation
|
|
if (context.TryGet<string>("TransactionId", out var transactionId))
|
|
{
|
|
await _paymentService.RefundAsync(transactionId, ct);
|
|
}
|
|
|
|
if (context.TryGet<string>("ReservationId", out var reservationId))
|
|
{
|
|
await _inventoryService.ReleaseReservationAsync(reservationId, ct);
|
|
}
|
|
|
|
if (context.TryGet<string>("TrackingNumber", out var trackingNumber))
|
|
{
|
|
await _shippingService.CancelShipmentAsync(trackingNumber, ct);
|
|
}
|
|
}
|
|
```
|
|
|
|
## Persisting Context
|
|
|
|
### Save Context with State
|
|
|
|
```csharp
|
|
public class PersistedSagaContext : SagaContext
|
|
{
|
|
public async Task SaveAsync(ISagaStateStore stateStore, CancellationToken ct)
|
|
{
|
|
var state = new
|
|
{
|
|
SagaId,
|
|
CorrelationId,
|
|
UserId,
|
|
StartedAt,
|
|
CompletedAt,
|
|
Data,
|
|
Metadata
|
|
};
|
|
|
|
await stateStore.SaveStateAsync(SagaId, state, ct);
|
|
}
|
|
|
|
public static async Task<PersistedSagaContext?> LoadAsync(
|
|
string sagaId,
|
|
ISagaStateStore stateStore,
|
|
CancellationToken ct)
|
|
{
|
|
var state = await stateStore.GetStateAsync<dynamic>(sagaId, ct);
|
|
|
|
if (state == null)
|
|
return null;
|
|
|
|
var context = new PersistedSagaContext
|
|
{
|
|
SagaId = state.SagaId,
|
|
CorrelationId = state.CorrelationId,
|
|
UserId = state.UserId,
|
|
StartedAt = state.StartedAt,
|
|
CompletedAt = state.CompletedAt
|
|
};
|
|
|
|
foreach (var (key, value) in state.Data)
|
|
{
|
|
context.Set(key, value);
|
|
}
|
|
|
|
foreach (var (key, value) in state.Metadata)
|
|
{
|
|
context.SetMetadata(key, value);
|
|
}
|
|
|
|
return context;
|
|
}
|
|
}
|
|
```
|
|
|
|
### Resume from Saved Context
|
|
|
|
```csharp
|
|
public async Task ResumeAsync(string sagaId, CancellationToken ct)
|
|
{
|
|
// Load saved context
|
|
var context = await PersistedSagaContext.LoadAsync(sagaId, _stateStore, ct);
|
|
|
|
if (context == null)
|
|
{
|
|
_logger.LogWarning("Cannot resume saga {SagaId} - context not found", sagaId);
|
|
return;
|
|
}
|
|
|
|
_logger.LogInformation("Resuming saga {SagaId} from saved context", sagaId);
|
|
|
|
// Resume execution with restored context
|
|
await ContinueExecutionAsync(context, ct);
|
|
}
|
|
```
|
|
|
|
## Context Extensions
|
|
|
|
### Fluent API
|
|
|
|
```csharp
|
|
public static class SagaContextExtensions
|
|
{
|
|
public static SagaContext WithOrderData(this SagaContext context, OrderPlacedEvent @event)
|
|
{
|
|
context.Set("OrderId", @event.OrderId);
|
|
context.Set("CustomerId", @event.CustomerId);
|
|
context.Set("TotalAmount", @event.TotalAmount);
|
|
context.Set("Items", @event.Items);
|
|
return context;
|
|
}
|
|
|
|
public static SagaContext WithCustomerData(this SagaContext context, Customer customer)
|
|
{
|
|
context.Set("CustomerEmail", customer.Email);
|
|
context.Set("CustomerName", customer.Name);
|
|
context.SetMetadata("CustomerTier", customer.Tier);
|
|
return context;
|
|
}
|
|
}
|
|
|
|
// Usage
|
|
var context = new SagaContext()
|
|
.WithOrderData(@event)
|
|
.WithCustomerData(customer);
|
|
```
|
|
|
|
### Typed Context
|
|
|
|
```csharp
|
|
public class OrderFulfillmentContext : SagaContext
|
|
{
|
|
public int OrderId
|
|
{
|
|
get => Get<int>("OrderId");
|
|
set => Set("OrderId", value);
|
|
}
|
|
|
|
public decimal TotalAmount
|
|
{
|
|
get => Get<decimal>("TotalAmount");
|
|
set => Set("TotalAmount", value);
|
|
}
|
|
|
|
public string? TransactionId
|
|
{
|
|
get => Get<string>("TransactionId");
|
|
set => Set("TransactionId", value);
|
|
}
|
|
|
|
public string? TrackingNumber
|
|
{
|
|
get => Get<string>("TrackingNumber");
|
|
set => Set("TrackingNumber", value);
|
|
}
|
|
}
|
|
|
|
// Usage with type safety
|
|
var context = new OrderFulfillmentContext
|
|
{
|
|
OrderId = @event.OrderId,
|
|
TotalAmount = @event.TotalAmount
|
|
};
|
|
|
|
await ProcessPaymentAsync(context, ct);
|
|
|
|
// Type-safe access
|
|
var transactionId = context.TransactionId; // string?
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Use context to share data between steps
|
|
- Include correlation IDs for tracing
|
|
- Store step results in context
|
|
- Use metadata for observability
|
|
- Persist context with saga state
|
|
- Use typed context for complex sagas
|
|
- Clear sensitive data after use
|
|
- Use scoped services via context
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't store large objects in context
|
|
- Don't store sensitive data without encryption
|
|
- Don't mutate context from multiple threads
|
|
- Don't skip correlation IDs
|
|
- Don't forget to persist context
|
|
- Don't leak database connections
|
|
- Don't use context for business logic
|
|
- Don't ignore context when compensating
|
|
|
|
## See Also
|
|
|
|
- [Sagas Overview](README.md)
|
|
- [Saga Pattern](saga-pattern.md)
|
|
- [Creating Sagas](creating-sagas.md)
|
|
- [Compensation](compensation.md)
|
|
- [Event Streaming Overview](../README.md)
|