dotnet-cqrs/docs/event-streaming/sagas/creating-sagas.md

483 lines
13 KiB
Markdown

# Creating Sagas
Implement ISaga to orchestrate long-running business processes.
## Overview
Creating sagas involves defining workflow steps, state management, compensation logic, and error handling for distributed transactions.
## Quick Start
```csharp
using Svrnty.CQRS.Events.Abstractions;
public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
{
private readonly IInventoryService _inventoryService;
private readonly IPaymentService _paymentService;
private readonly IShippingService _shippingService;
private readonly ISagaStateStore _stateStore;
private readonly IEventPublisher _eventPublisher;
public string SagaName => "order-fulfillment";
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
var sagaId = $"order-{@event.OrderId}";
var state = new OrderFulfillmentState
{
OrderId = @event.OrderId,
SagaId = sagaId,
StartedAt = DateTimeOffset.UtcNow
};
try
{
// Step 1: Reserve Inventory
await ExecuteStepAsync(state, SagaStep.ReserveInventory, async () =>
{
await _inventoryService.ReserveAsync(@event.OrderId, @event.Items, ct);
state.InventoryReserved = true;
}, ct);
// Step 2: Process Payment
await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
{
await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
state.PaymentProcessed = true;
}, ct);
// Step 3: Ship Order
await ExecuteStepAsync(state, SagaStep.ShipOrder, async () =>
{
await _shippingService.ShipAsync(@event.OrderId, ct);
state.Shipped = true;
}, ct);
state.CurrentStep = SagaStep.Completed;
state.CompletedAt = DateTimeOffset.UtcNow;
await _stateStore.SaveStateAsync(sagaId, state, ct);
await _eventPublisher.PublishAsync(new OrderFulfilledEvent { OrderId = @event.OrderId });
}
catch (Exception ex)
{
await HandleFailureAsync(state, ex, ct);
}
}
private async Task ExecuteStepAsync(
OrderFulfillmentState state,
SagaStep step,
Func<Task> action,
CancellationToken ct)
{
state.CurrentStep = step;
await _stateStore.SaveStateAsync(state.SagaId, state, ct);
try
{
await action();
}
catch (Exception ex)
{
_logger.LogError(ex, "Saga step {Step} failed for {SagaId}", step, state.SagaId);
throw;
}
}
private async Task HandleFailureAsync(
OrderFulfillmentState state,
Exception ex,
CancellationToken ct)
{
_logger.LogError(ex, "Saga {SagaId} failed at step {Step}", state.SagaId, state.CurrentStep);
state.FailureReason = ex.Message;
await CompensateAsync(state, ct);
await _eventPublisher.PublishAsync(new OrderFulfillmentFailedEvent
{
OrderId = state.OrderId,
FailedStep = state.CurrentStep.ToString(),
Reason = ex.Message
});
}
private async Task CompensateAsync(OrderFulfillmentState state, CancellationToken ct)
{
state.CurrentStep = SagaStep.Compensating;
await _stateStore.SaveStateAsync(state.SagaId, state, ct);
// Compensate in reverse order
if (state.Shipped)
{
await _shippingService.CancelShipmentAsync(state.OrderId, ct);
}
if (state.PaymentProcessed && !state.PaymentRefunded)
{
await _paymentService.RefundAsync(state.OrderId, ct);
state.PaymentRefunded = true;
}
if (state.InventoryReserved && !state.InventoryReleased)
{
await _inventoryService.ReleaseAsync(state.OrderId, ct);
state.InventoryReleased = true;
}
state.CurrentStep = SagaStep.Failed;
await _stateStore.SaveStateAsync(state.SagaId, state, ct);
}
}
```
## ISaga Interface
```csharp
public interface ISaga<TEvent> where TEvent : class
{
string SagaName { get; }
Task HandleAsync(TEvent @event, CancellationToken cancellationToken);
}
```
## Registration
```csharp
using Svrnty.CQRS.Events;
var builder = WebApplication.CreateBuilder(args);
// Register saga
builder.Services.AddSingleton<ISaga<OrderPlacedEvent>, OrderFulfillmentSaga>();
// Register saga orchestrator
builder.Services.AddSagaOrchestrator();
var app = builder.Build();
app.Run();
```
## Saga State
Define comprehensive state for your saga:
```csharp
public class OrderFulfillmentState
{
public string SagaId { get; set; } = string.Empty;
public int OrderId { get; set; }
public SagaStep CurrentStep { get; set; }
// Step completion tracking
public bool InventoryReserved { get; set; }
public bool PaymentProcessed { get; set; }
public bool Shipped { get; set; }
// Compensation tracking
public bool InventoryReleased { get; set; }
public bool PaymentRefunded { get; set; }
// Metadata
public DateTimeOffset StartedAt { get; set; }
public DateTimeOffset? CompletedAt { get; set; }
public string? FailureReason { get; set; }
// Data needed for compensation
public string? PaymentTransactionId { get; set; }
public string? ShipmentTrackingNumber { get; set; }
}
public enum SagaStep
{
Started,
ReserveInventory,
ProcessPayment,
ShipOrder,
Completed,
Compensating,
Failed
}
```
## Step-by-Step Workflow
### Step Execution Template
```csharp
private async Task<TResult> ExecuteStepAsync<TResult>(
OrderFulfillmentState state,
SagaStep step,
Func<Task<TResult>> action,
CancellationToken ct)
{
state.CurrentStep = step;
await _stateStore.SaveStateAsync(state.SagaId, state, ct);
_logger.LogInformation("Executing saga step {Step} for {SagaId}", step, state.SagaId);
try
{
var result = await action();
_logger.LogInformation("Saga step {Step} completed for {SagaId}", step, state.SagaId);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Saga step {Step} failed for {SagaId}", step, state.SagaId);
throw;
}
}
```
### Using the Template
```csharp
// Step with return value
var transactionId = await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
{
var txId = await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
state.PaymentProcessed = true;
state.PaymentTransactionId = txId;
return txId;
}, ct);
// Step without return value
await ExecuteStepAsync(state, SagaStep.ShipOrder, async () =>
{
await _shippingService.ShipAsync(@event.OrderId, ct);
state.Shipped = true;
return Task.CompletedTask;
}, ct);
```
## State Persistence
Persist saga state after each step:
```csharp
public interface ISagaStateStore
{
Task SaveStateAsync(string sagaId, object state, CancellationToken ct);
Task<T?> GetStateAsync<T>(string sagaId, CancellationToken ct);
Task DeleteStateAsync(string sagaId, CancellationToken ct);
}
public class PostgresSagaStateStore : ISagaStateStore
{
public async Task SaveStateAsync(string sagaId, object state, CancellationToken ct)
{
var json = JsonSerializer.Serialize(state);
var entity = await _dbContext.SagaStates
.FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);
if (entity == null)
{
entity = new SagaStateEntity
{
SagaId = sagaId,
StateJson = json,
CreatedAt = DateTimeOffset.UtcNow
};
_dbContext.SagaStates.Add(entity);
}
else
{
entity.StateJson = json;
entity.UpdatedAt = DateTimeOffset.UtcNow;
}
await _dbContext.SaveChangesAsync(ct);
}
}
```
## Recovery from Failure
Resume sagas after application restart:
```csharp
public class SagaRecoveryService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Wait for startup
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
// Find incomplete sagas
var incompleteSagas = await _stateStore.GetIncompleteSagasAsync(stoppingToken);
foreach (var sagaState in incompleteSagas)
{
_logger.LogInformation("Resuming saga {SagaId} from step {Step}",
sagaState.SagaId,
sagaState.CurrentStep);
try
{
await ResumeSagaAsync(sagaState, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to resume saga {SagaId}", sagaState.SagaId);
}
}
}
private async Task ResumeSagaAsync(OrderFulfillmentState state, CancellationToken ct)
{
var saga = _serviceProvider.GetRequiredService<OrderFulfillmentSaga>();
// Resume from current step
switch (state.CurrentStep)
{
case SagaStep.ReserveInventory:
if (!state.InventoryReserved)
{
await saga.ExecuteReserveInventoryAsync(state, ct);
}
goto case SagaStep.ProcessPayment;
case SagaStep.ProcessPayment:
if (!state.PaymentProcessed)
{
await saga.ExecuteProcessPaymentAsync(state, ct);
}
goto case SagaStep.ShipOrder;
case SagaStep.ShipOrder:
if (!state.Shipped)
{
await saga.ExecuteShipOrderAsync(state, ct);
}
break;
case SagaStep.Compensating:
await saga.CompensateAsync(state, ct);
break;
}
}
}
```
## Timeout Handling
Add timeouts to prevent stuck sagas:
```csharp
private async Task ExecuteStepWithTimeoutAsync(
OrderFulfillmentState state,
SagaStep step,
Func<Task> action,
TimeSpan timeout,
CancellationToken ct)
{
using var stepCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
stepCts.CancelAfter(timeout);
state.CurrentStep = step;
await _stateStore.SaveStateAsync(state.SagaId, state, ct);
try
{
await action();
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
_logger.LogWarning("Saga step {Step} timed out after {Timeout} for {SagaId}",
step,
timeout,
state.SagaId);
throw new SagaTimeoutException($"Step {step} timed out");
}
}
// Usage
await ExecuteStepWithTimeoutAsync(
state,
SagaStep.ProcessPayment,
async () => await _paymentService.ChargeAsync(orderId, amount, ct),
timeout: TimeSpan.FromMinutes(2),
ct);
```
## Parallel Steps
Execute independent steps in parallel:
```csharp
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
var state = new OrderFulfillmentState { OrderId = @event.OrderId };
// Sequential step
await ExecuteStepAsync(state, SagaStep.ValidateOrder, async () =>
{
await _orderService.ValidateAsync(@event.OrderId, ct);
state.OrderValidated = true;
}, ct);
// Parallel steps (independent)
await Task.WhenAll(
ExecuteStepAsync(state, SagaStep.ReserveInventory, async () =>
{
await _inventoryService.ReserveAsync(@event.OrderId, @event.Items, ct);
state.InventoryReserved = true;
}, ct),
ExecuteStepAsync(state, SagaStep.CheckCredit, async () =>
{
await _creditService.CheckAsync(@event.CustomerId, @event.TotalAmount, ct);
state.CreditChecked = true;
}, ct)
);
// Continue with dependent steps
await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
{
await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
state.PaymentProcessed = true;
}, ct);
}
```
## Best Practices
### ✅ DO
- Persist state after each step
- Make steps idempotent
- Implement comprehensive compensation
- Log all saga operations
- Use unique saga IDs
- Handle timeouts
- Version saga definitions
- Test failure scenarios
- Monitor saga completion rates
- Implement recovery logic
### ❌ DON'T
- Don't skip state persistence
- Don't assume steps always succeed
- Don't forget compensation for each step
- Don't use blocking operations
- Don't ignore timeout scenarios
- Don't make steps non-idempotent
- Don't forget error logging
- Don't couple sagas tightly to services
## See Also
- [Sagas Overview](README.md)
- [Saga Pattern](saga-pattern.md)
- [Compensation](compensation.md)
- [Saga Context](saga-context.md)
- [Event Streaming Overview](../README.md)