# Saga Pattern
Long-running business processes with multiple steps and compensation logic.
## Overview
The Saga pattern coordinates a business transaction that spans multiple services:
- **Multi-Step Workflows** - Orchestrate complex business processes
- **Compensation Logic** - Roll back completed steps on failure
- **Event-Driven** - React to domain events
- **Eventually Consistent** - Maintain consistency without distributed transactions
## What is a Saga?
A saga is a sequence of local transactions. Each one updates data within a single service and publishes an event or message that triggers the next step. If a step fails, compensating transactions undo the steps that already completed, in reverse order.
### Example: Order Fulfillment
```
1. Reserve Inventory → Success
2. Process Payment → Success
3. Ship Order → FAILED
Compensate:
- Refund Payment
- Release Inventory
```
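The reverse-order compensation in this example can be sketched as a tiny in-memory runner. This is an illustration, not the library's saga API: `SagaStepDefinition` and `SagaRunner` are hypothetical names, and each step is a pair of plain delegates (do, undo).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical step definition: an action plus the compensation that undoes it.
public sealed record SagaStepDefinition(
    string Name,
    Func<CancellationToken, Task> Execute,
    Func<CancellationToken, Task> Compensate);

// Hypothetical minimal runner: runs steps in order and, if one fails,
// compensates the already-completed steps in reverse order.
public static class SagaRunner
{
    public static async Task<bool> RunAsync(
        IReadOnlyList<SagaStepDefinition> steps, CancellationToken ct = default)
    {
        var completed = new Stack<SagaStepDefinition>();

        foreach (var step in steps)
        {
            try
            {
                await step.Execute(ct);
                completed.Push(step); // only successful steps need undoing
            }
            catch (Exception)
            {
                // The last completed step is popped (and compensated) first.
                while (completed.Count > 0)
                {
                    await completed.Pop().Compensate(ct);
                }
                return false;
            }
        }

        return true;
    }
}
```

With the third step failing as in the example, the steps run and are undone in this order: Reserve Inventory, Process Payment, Refund Payment, Release Inventory. A stack fits naturally because the last step to complete is the first to be compensated.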
## Saga Types
### Choreography
Services react to events independently:
```csharp
// Order Service
public async Task HandleOrderPlacedAsync(OrderPlacedEvent @event)
{
    // Publish event for inventory service
    await PublishAsync(new ReserveInventoryRequested
    {
        OrderId = @event.OrderId,
        Items = @event.Items
    });
}

// Inventory Service (independent)
public async Task HandleReserveInventoryRequestedAsync(ReserveInventoryRequested @event)
{
    await ReserveInventoryAsync(@event.OrderId, @event.Items);

    // Publish event for payment service
    await PublishAsync(new InventoryReservedEvent
    {
        OrderId = @event.OrderId
    });
}

// Payment Service (independent)
public async Task HandleInventoryReservedAsync(InventoryReservedEvent @event)
{
    await ProcessPaymentAsync(@event.OrderId);
    // And so on...
}
```
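A minimal way to see that the choreography chain is driven purely by events is an in-process bus. Everything here is hypothetical: `InMemoryEventBus` stands in for a real message broker, the record types stand in for the document's event classes, and handlers run sequentially in one process, which a real broker would not guarantee.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical event records standing in for the document's event classes.
public sealed record OrderPlaced(int OrderId);
public sealed record InventoryReserved(int OrderId);
public sealed record PaymentProcessed(int OrderId);

// Hypothetical in-process bus: services subscribe to the event types they care about.
public sealed class InMemoryEventBus
{
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public void Subscribe<TEvent>(Func<TEvent, Task> handler)
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<object, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        list.Add(e => handler((TEvent)e));
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : notnull
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list)) return;
        foreach (var handler in list)
        {
            await handler(@event); // a handler may publish the next event in the chain
        }
    }
}

// Each "service" only knows which event it reacts to and which one it publishes.
public static class ChoreographyDemo
{
    public static void Wire(InMemoryEventBus bus, List<string> log)
    {
        // Inventory service
        bus.Subscribe<OrderPlaced>(async e =>
        {
            log.Add($"reserve {e.OrderId}");
            await bus.PublishAsync(new InventoryReserved(e.OrderId));
        });

        // Payment service
        bus.Subscribe<InventoryReserved>(async e =>
        {
            log.Add($"charge {e.OrderId}");
            await bus.PublishAsync(new PaymentProcessed(e.OrderId));
        });

        // Shipping service
        bus.Subscribe<PaymentProcessed>(e =>
        {
            log.Add($"ship {e.OrderId}");
            return Task.CompletedTask;
        });
    }
}
```

Publishing `OrderPlaced(42)` produces the log `reserve 42`, `charge 42`, `ship 42`, yet no single component describes the whole sequence, which is why choreographed flows are harder to follow.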
**Pros:**

- No central coordinator
- Loosely coupled services
- Good for simple workflows

**Cons:**

- Hard to follow the end-to-end flow, since it is spread across services
- Difficult to debug
- Complex error handling
### Orchestration
A central coordinator drives each step and decides when to compensate:
```csharp
public class OrderFulfillmentSaga : ISaga
{
    // _inventoryService, _paymentService, _shippingService and _logger are
    // injected through the constructor (omitted for brevity).

    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        var sagaState = new OrderFulfillmentState
        {
            OrderId = @event.OrderId,
            CurrentStep = SagaStep.ReserveInventory,
            StartedAt = DateTimeOffset.UtcNow
        };

        try
        {
            // Step 1: Reserve Inventory
            await _inventoryService.ReserveAsync(@event.OrderId, @event.Items, ct);
            sagaState.InventoryReserved = true;
            sagaState.CurrentStep = SagaStep.ProcessPayment;

            // Step 2: Process Payment
            await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
            sagaState.PaymentProcessed = true;
            sagaState.CurrentStep = SagaStep.ShipOrder;

            // Step 3: Ship Order
            await _shippingService.ShipAsync(@event.OrderId, ct);
            sagaState.Shipped = true;
            sagaState.CurrentStep = SagaStep.Completed;
            sagaState.CompletedAt = DateTimeOffset.UtcNow;

            await PublishAsync(new OrderFulfilledEvent { OrderId = @event.OrderId });
        }
        catch (Exception ex)
        {
            // CurrentStep still names the step that was in progress when it failed
            _logger.LogError(ex, "Saga failed at step {Step}", sagaState.CurrentStep);

            // Compensate completed steps. CancellationToken.None lets the rollback
            // finish even if the saga itself was cancelled.
            await CompensateAsync(sagaState, CancellationToken.None);

            await PublishAsync(new OrderFulfillmentFailedEvent
            {
                OrderId = @event.OrderId,
                FailedStep = sagaState.CurrentStep,
                Reason = ex.Message
            });
        }
    }

    private async Task CompensateAsync(OrderFulfillmentState state, CancellationToken ct)
    {
        // Compensate in reverse order of completion
        if (state.PaymentProcessed)
        {
            await _paymentService.RefundAsync(state.OrderId, ct);
        }

        if (state.InventoryReserved)
        {
            await _inventoryService.ReleaseAsync(state.OrderId, ct);
        }
    }
}
```
**Pros:**

- Clear workflow definition
- Easier to debug
- Centralized error handling
- Better observability

**Cons:**

- The coordinator can become a single point of failure unless its state is persisted
- More complex implementation
- Tighter coupling to the coordinator
## Saga State
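A saga tracks its progress in a state object that records the current step, the steps that completed, and the compensations already applied: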
```csharp
public enum SagaStep
{
    ReserveInventory,
    ProcessPayment,
    ShipOrder,
    Completed,
    Compensating,
    Failed
}

public class OrderFulfillmentState
{
    public int OrderId { get; set; }
    public SagaStep CurrentStep { get; set; }

    // Completed steps
    public bool InventoryReserved { get; set; }
    public bool PaymentProcessed { get; set; }
    public bool Shipped { get; set; }

    // Compensation tracking
    public bool PaymentRefunded { get; set; }
    public bool InventoryReleased { get; set; }

    // Metadata
    public DateTimeOffset StartedAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }
    public string? FailureReason { get; set; }
}
```
## Compensation Strategies
### Forward Recovery
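Retry a failed step until it succeeds instead of undoing earlier work. This suits transient failures such as timeouts or throttling: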
```csharp
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    const int maxRetries = 3;

    for (int attempt = 1; attempt <= maxRetries; attempt++)
    {
        try
        {
            await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
            return; // Success
        }
        catch (TransientException) when (attempt < maxRetries)
        {
            // Retry only transient errors; on the last attempt the filter is
            // false and the exception propagates to the saga
            await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)), ct); // 2s, then 4s
        }
    }
}
```
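The retry loop can also be pulled out into a reusable helper so every forward-recoverable step shares one policy. A sketch under assumptions: `RetryPolicy.ExecuteAsync` and its `isTransient` predicate are hypothetical, and the delay doubles from `baseDelay` (a 2-second base reproduces the 2s, 4s schedule of the loop above).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: forward recovery by retrying a step with exponential backoff.
public static class RetryPolicy
{
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> step,
        Func<Exception, bool> isTransient,
        int maxAttempts,
        TimeSpan baseDelay,
        CancellationToken ct = default)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await step(ct);
                return; // success
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                // Non-transient errors, or the final attempt, skip this block and propagate.
                // Backoff doubles each time: baseDelay, 2x baseDelay, 4x baseDelay, ...
                var delay = TimeSpan.FromTicks(baseDelay.Ticks * (1L << (attempt - 1)));
                await Task.Delay(delay, ct);
            }
        }
    }
}
```

Keeping the "is this retryable?" decision in a predicate means a permanent error such as a declined card fails on the first attempt and goes straight to compensation.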
### Backward Recovery
Undo completed steps:
```csharp
private async Task CompensateAsync(OrderFulfillmentState state, CancellationToken ct)
{
    state.CurrentStep = SagaStep.Compensating;

    try
    {
        // Undo in reverse order. The "done" flags let a retried compensation
        // skip undo actions that already succeeded.
        if (state.Shipped)
        {
            // No flag for this one, so CancelShipmentAsync must be idempotent itself
            await _shippingService.CancelShipmentAsync(state.OrderId, ct);
        }

        if (state.PaymentProcessed && !state.PaymentRefunded)
        {
            await _paymentService.RefundAsync(state.OrderId, ct);
            state.PaymentRefunded = true;
        }

        if (state.InventoryReserved && !state.InventoryReleased)
        {
            await _inventoryService.ReleaseAsync(state.OrderId, ct);
            state.InventoryReleased = true;
        }

        state.CurrentStep = SagaStep.Failed;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Compensation failed for order {OrderId}", state.OrderId);

        // Manual intervention required; the flags show which undo steps succeeded
        await NotifyOpsTeamAsync(state, ex);
    }
}
```
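Compensation can itself fail partway, for example when the refund succeeds but the inventory service is down, and the saga will then retry the rollback. Recording a "done" flag after each undo lets the second run skip what already happened. A minimal sketch with hypothetical in-memory fakes (`RollbackState`, `FakeServices`, `Compensator`), where `RollbackState` mirrors the relevant flags of `OrderFulfillmentState`:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical, trimmed-down saga state (mirrors flags in OrderFulfillmentState).
public sealed class RollbackState
{
    public int OrderId { get; init; }
    public bool PaymentProcessed { get; set; }
    public bool InventoryReserved { get; set; }
    public bool PaymentRefunded { get; set; }
    public bool InventoryReleased { get; set; }
}

// Hypothetical fake services that count calls; ReleaseAsync can be told to fail once.
public sealed class FakeServices
{
    public int Refunds { get; private set; }
    public int Releases { get; private set; }
    public bool FailNextRelease { get; set; }

    public Task RefundAsync(int orderId)
    {
        Refunds++;
        return Task.CompletedTask;
    }

    public Task ReleaseAsync(int orderId)
    {
        if (FailNextRelease)
        {
            FailNextRelease = false;
            throw new InvalidOperationException("inventory service down");
        }
        Releases++;
        return Task.CompletedTask;
    }
}

public static class Compensator
{
    // Safe to call repeatedly: each undo is skipped once its "done" flag is set.
    public static async Task CompensateAsync(RollbackState state, FakeServices svc)
    {
        if (state.PaymentProcessed && !state.PaymentRefunded)
        {
            await svc.RefundAsync(state.OrderId);
            state.PaymentRefunded = true; // a real saga would persist this before moving on
        }

        if (state.InventoryReserved && !state.InventoryReleased)
        {
            await svc.ReleaseAsync(state.OrderId);
            state.InventoryReleased = true;
        }
    }
}
```

Running `CompensateAsync` once (the release fails) and then again issues exactly one refund and one release, because the first run already marked the refund as done.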
## Saga Persistence
Persist saga state after each step so that a restarted process can resume or compensate an in-flight saga:
```csharp
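public class SagaStateStore
{
    // _dbContext is an EF Core DbContext exposing a SagaStates DbSet (setup omitted)

    public async Task SaveStateAsync<T>(string sagaId, T state, CancellationToken ct)
    {
        var json = JsonSerializer.Serialize(state);

        // Upsert: update the existing row for this saga, or insert a new one.
        // Always inserting would fail (or duplicate) on the second save.
        var entity = await _dbContext.SagaStates
            .FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);

        if (entity == null)
        {
            entity = new SagaStateEntity { SagaId = sagaId };
            _dbContext.SagaStates.Add(entity);
        }

        entity.StateJson = json;
        entity.UpdatedAt = DateTimeOffset.UtcNow;

        await _dbContext.SaveChangesAsync(ct);
    }

    public async Task<T?> GetStateAsync<T>(string sagaId, CancellationToken ct)
    {
        var entity = await _dbContext.SagaStates
            .FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);

        return entity == null
            ? default
            : JsonSerializer.Deserialize<T>(entity.StateJson);
    }
}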
```
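Whatever the storage, saving state for a saga that already has a row should overwrite that row rather than insert a second one. A minimal in-memory sketch of that last-write-wins upsert, using `System.Text.Json`; `InMemorySagaStateStore` is hypothetical and stands in for a table keyed by saga ID (useful, for example, in unit tests):

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical store: one JSON snapshot per saga ID, last write wins.
public sealed class InMemorySagaStateStore
{
    private readonly ConcurrentDictionary<string, (string Json, DateTimeOffset UpdatedAt)> _rows = new();

    public Task SaveStateAsync<T>(string sagaId, T state, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        var json = JsonSerializer.Serialize(state);
        _rows[sagaId] = (json, DateTimeOffset.UtcNow); // insert or overwrite (upsert)
        return Task.CompletedTask;
    }

    public Task<T?> GetStateAsync<T>(string sagaId, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        return Task.FromResult(_rows.TryGetValue(sagaId, out var row)
            ? JsonSerializer.Deserialize<T>(row.Json)
            : default);
    }

    public int Count => _rows.Count;
}
```

Storing a serialized snapshot rather than live objects means the loaded state is a fresh copy, just as it would be after a process restart.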
## Idempotency
Messages can be redelivered and a resumed saga may repeat its last step, so every step must be safe to run more than once:
```csharp
public async Task ReserveInventoryAsync(int orderId, List<OrderItem> items, CancellationToken ct)
{
    // Check if already reserved (idempotency)
    var existing = await _dbContext.InventoryReservations
        .FirstOrDefaultAsync(r => r.OrderId == orderId, ct);

    if (existing != null)
    {
        _logger.LogInformation("Inventory already reserved for order {OrderId}", orderId);
        return; // Already done
    }

    // Reserve inventory
    foreach (var item in items)
    {
        var product = await _dbContext.Products.FindAsync(new object[] { item.ProductId }, ct)
            ?? throw new InvalidOperationException($"Unknown product {item.ProductId}");

        if (product.Stock < item.Quantity)
        {
            throw new InsufficientStockException(item.ProductId);
        }

        product.Stock -= item.Quantity;
    }

    // Record reservation. Assumes a unique index on OrderId, so a concurrent
    // duplicate attempt fails at SaveChanges instead of reserving stock twice.
    _dbContext.InventoryReservations.Add(new InventoryReservation
    {
        OrderId = orderId,
        ReservedAt = DateTimeOffset.UtcNow
    });

    // Stock changes and the reservation record are saved together
    await _dbContext.SaveChangesAsync(ct);
}
```
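A more general form of the same idea, often called an inbox or deduplication table, records the ID of every message a handler has processed and ignores redeliveries. A minimal single-threaded sketch; `IdempotentHandler<TMessage>` is hypothetical, and in a real service the processed ID and the state change should be committed in the same transaction, otherwise a crash between them reopens the gap:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical dedup wrapper: runs the inner handler at most once per message ID.
// Not thread-safe; a real inbox would live in the service's database.
public sealed class IdempotentHandler<TMessage>
{
    private readonly HashSet<Guid> _processed = new();
    private readonly Func<TMessage, Task> _inner;

    public IdempotentHandler(Func<TMessage, Task> inner) => _inner = inner;

    // Returns true if the message was handled now, false if it was a duplicate.
    public async Task<bool> HandleAsync(Guid messageId, TMessage message)
    {
        if (_processed.Contains(messageId))
        {
            return false; // duplicate delivery: already handled, do nothing
        }

        await _inner(message);
        _processed.Add(messageId); // mark only after the work succeeded
        return true;
    }
}
```

Marking the ID only after the handler succeeds means a failed attempt can be retried, while a successful one is never applied twice.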
## Timeouts
Give each step a deadline so that a hung dependency cannot stall the saga indefinitely:
```csharp
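public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    // Load the saga state saved by earlier steps (keyed by order ID here; see Saga Persistence)
    var state = await _stateStore.GetStateAsync<OrderFulfillmentState>(@event.OrderId.ToString(), ct)
        ?? throw new InvalidOperationException($"No saga state for order {@event.OrderId}");

    using var stepCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    stepCts.CancelAfter(TimeSpan.FromMinutes(5)); // 5-minute timeout for this step

    try
    {
        await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, stepCts.Token);
    }
    catch (OperationCanceledException) when (!ct.IsCancellationRequested)
    {
        // Our deadline fired, not the caller's token
        _logger.LogWarning("Payment step timed out for order {OrderId}", @event.OrderId);

        // Caution: a timeout does not prove the charge failed, so the payment
        // may need reconciling with the payment service
        await CompensateAsync(state, ct);
        throw new SagaTimeoutException("Payment step timed out");
    }
}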
```
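The essential detail is telling the step's own deadline apart from the caller cancelling the whole saga. A self-contained sketch of just that logic; `StepTimeout.RunAsync` and `StepOutcome` are hypothetical names, built only on `CancellationTokenSource.CreateLinkedTokenSource` and `CancelAfter`:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public enum StepOutcome { Completed, TimedOut }

// Hypothetical helper: runs one saga step with its own deadline.
public static class StepTimeout
{
    public static async Task<StepOutcome> RunAsync(
        Func<CancellationToken, Task> step, TimeSpan timeout, CancellationToken ct)
    {
        // Linked source: cancelled by the caller's token OR by our deadline.
        using var stepCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        stepCts.CancelAfter(timeout);

        try
        {
            await step(stepCts.Token);
            return StepOutcome.Completed;
        }
        catch (OperationCanceledException)
            when (stepCts.IsCancellationRequested && !ct.IsCancellationRequested)
        {
            // Only our deadline fired; if ct was cancelled, the filter is false
            // and the exception propagates to the caller unchanged.
            return StepOutcome.TimedOut;
        }
    }
}
```

Because caller cancellation propagates as an `OperationCanceledException`, a service shutdown is not mistaken for a slow dependency and does not trigger compensation by itself.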
## Common Patterns
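### Two-Phase Commit (2PC)
Shown for contrast with sagas. In 2PC every participant holds its prepared resources (typically locks) until the coordinator decides, so one slow or failed participant, or the coordinator itself, can block the others. This is the coupling sagas are designed to avoid.
```csharp
// Phase 1: Prepare (each participant votes and holds its resources)
await _inventoryService.PrepareReservationAsync(orderId);
await _paymentService.PrepareChargeAsync(orderId);

// Phase 2: Commit (only if every prepare succeeded; otherwise abort all participants)
await _inventoryService.CommitReservationAsync(orderId);
await _paymentService.CommitChargeAsync(orderId);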
```
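What makes 2PC work is an abort path: if any participant votes no during prepare, the coordinator must abort every participant. A toy in-memory sketch for comparison only; `IParticipant` and `TwoPhaseCoordinator` are hypothetical, and a real implementation also needs a durable coordinator log and participants that keep their locks until the decision arrives:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical participant contract for the toy coordinator below.
public interface IParticipant
{
    Task<bool> PrepareAsync(int orderId); // vote: true = ready to commit
    Task CommitAsync(int orderId);
    Task AbortAsync(int orderId);
}

public static class TwoPhaseCoordinator
{
    public static async Task<bool> ExecuteAsync(IReadOnlyList<IParticipant> participants, int orderId)
    {
        // Phase 1: collect a vote from every participant.
        var votes = new List<bool>();
        foreach (var p in participants)
        {
            votes.Add(await p.PrepareAsync(orderId));
        }

        // Phase 2: commit only on a unanimous yes, otherwise abort everyone.
        if (votes.All(v => v))
        {
            foreach (var p in participants) await p.CommitAsync(orderId);
            return true;
        }

        foreach (var p in participants) await p.AbortAsync(orderId);
        return false;
    }
}
```

Note that nothing here is ever undone after commit; a saga instead commits each local step immediately and relies on compensation.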
### Reservation Pattern
Place a temporary hold that expires on its own, then confirm it once the later steps succeed or cancel it if they fail. An abandoned saga therefore cannot tie up stock indefinitely.
```csharp
// Reserve resources with an expiry
var reservationId = await _inventoryService.ReserveAsync(items, expiresIn: TimeSpan.FromMinutes(10));

try
{
    await _paymentService.ChargeAsync(orderId, amount);

    // Confirm reservation
    await _inventoryService.ConfirmReservationAsync(reservationId);
}
catch
{
    // Cancel reservation (it auto-expires after 10 minutes anyway)
    await _inventoryService.CancelReservationAsync(reservationId);
    throw;
}
```
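The expiry rule can be sketched with an in-memory reservation table that receives the current time as a parameter, which keeps it deterministic. `ReservationBook` and its members are hypothetical; a real inventory service would also need a background sweep that returns expired holds to stock:

```csharp
using System;
using System.Collections.Generic;

public enum ReservationStatus { Held, Confirmed, Cancelled }

// Hypothetical reservation table: a hold can only be confirmed before it expires.
public sealed class ReservationBook
{
    private readonly Dictionary<Guid, (DateTimeOffset ExpiresAt, ReservationStatus Status)> _holds = new();

    public Guid Reserve(DateTimeOffset now, TimeSpan expiresIn)
    {
        var id = Guid.NewGuid();
        _holds[id] = (now + expiresIn, ReservationStatus.Held);
        return id;
    }

    // Returns false if the hold is unknown, no longer held, or already expired.
    public bool TryConfirm(Guid id, DateTimeOffset now)
    {
        if (!_holds.TryGetValue(id, out var hold)
            || hold.Status != ReservationStatus.Held
            || now >= hold.ExpiresAt)
        {
            return false;
        }

        _holds[id] = (hold.ExpiresAt, ReservationStatus.Confirmed);
        return true;
    }

    // Idempotent: cancelling twice, or cancelling a confirmed hold, changes nothing.
    public void Cancel(Guid id)
    {
        if (_holds.TryGetValue(id, out var hold) && hold.Status == ReservationStatus.Held)
        {
            _holds[id] = (hold.ExpiresAt, ReservationStatus.Cancelled);
        }
    }
}
```

With a 10-minute hold placed at `t0`, confirming at `t0 + 5 min` succeeds while confirming at `t0 + 11 min` is rejected, so a late confirmation cannot claim stock that may already have been released.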
## Best Practices
### ✅ DO
- Use orchestration for complex workflows
- Make saga steps idempotent
- Persist saga state
- Implement compensation logic
- Handle timeouts
- Log all saga steps
- Monitor saga completion rates
- Design for failure
- Use unique saga IDs
- Version saga definitions
### ❌ DON'T
- Don't rely on distributed transactions (2PC) across services
- Don't assume steps always succeed
- Don't forget compensation logic
- Don't ignore partial failures
- Don't write compensations that are unsafe to run more than once
- Don't skip saga state persistence
- Don't block threads on async calls (for example `.Result` or `.Wait()`)
- Don't forget timeout handling
## See Also
- [Sagas Overview](README.md)
- [Creating Sagas](creating-sagas.md)
- [Compensation](compensation.md)
- [Saga Context](saga-context.md)
- [Event Streaming Overview](../README.md)