483 lines
13 KiB
Markdown
483 lines
13 KiB
Markdown
# Creating Sagas
|
|
|
|
Implement `ISaga<TEvent>` to orchestrate long-running business processes.
|
|
|
|
## Overview
|
|
|
|
Creating sagas involves defining workflow steps, state management, compensation logic, and error handling for distributed transactions.
|
|
|
|
## Quick Start
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
/// <summary>
/// Saga that fulfils an order in three sequential steps (reserve inventory,
/// charge payment, ship) and compensates the completed steps in reverse
/// order when any step fails.
/// </summary>
public class OrderFulfillmentSaga : ISaga<OrderPlacedEvent>
{
    private readonly IInventoryService _inventoryService;
    private readonly IPaymentService _paymentService;
    private readonly IShippingService _shippingService;
    private readonly ISagaStateStore _stateStore;
    private readonly IEventPublisher _eventPublisher;
    private readonly ILogger<OrderFulfillmentSaga> _logger;

    /// <summary>Creates the saga with the services each step and its compensation call.</summary>
    public OrderFulfillmentSaga(
        IInventoryService inventoryService,
        IPaymentService paymentService,
        IShippingService shippingService,
        ISagaStateStore stateStore,
        IEventPublisher eventPublisher,
        ILogger<OrderFulfillmentSaga> logger)
    {
        _inventoryService = inventoryService;
        _paymentService = paymentService;
        _shippingService = shippingService;
        _stateStore = stateStore;
        _eventPublisher = eventPublisher;
        _logger = logger;
    }

    public string SagaName => "order-fulfillment";

    /// <summary>
    /// Runs the whole workflow for one placed order. Any exception thrown by a
    /// step is handled here (compensation + failure event) and not rethrown.
    /// </summary>
    /// <param name="event">The order that was placed.</param>
    /// <param name="ct">Token passed to every service and state-store call.</param>
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        var sagaId = $"order-{@event.OrderId}";
        var state = new OrderFulfillmentState
        {
            OrderId = @event.OrderId,
            SagaId = sagaId,
            StartedAt = DateTimeOffset.UtcNow
        };

        try
        {
            // Step 1: Reserve Inventory
            await ExecuteStepAsync(state, SagaStep.ReserveInventory, async () =>
            {
                await _inventoryService.ReserveAsync(@event.OrderId, @event.Items, ct);
                state.InventoryReserved = true;
            }, ct);

            // Step 2: Process Payment
            await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
            {
                await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
                state.PaymentProcessed = true;
            }, ct);

            // Step 3: Ship Order
            await ExecuteStepAsync(state, SagaStep.ShipOrder, async () =>
            {
                await _shippingService.ShipAsync(@event.OrderId, ct);
                state.Shipped = true;
            }, ct);

            state.CurrentStep = SagaStep.Completed;
            state.CompletedAt = DateTimeOffset.UtcNow;

            await _stateStore.SaveStateAsync(sagaId, state, ct);
            await _eventPublisher.PublishAsync(new OrderFulfilledEvent { OrderId = @event.OrderId });
        }
        catch (Exception ex)
        {
            // NOTE(review): this also catches OperationCanceledException. If `ct` is
            // already cancelled, the compensation calls below receive a cancelled
            // token and will likely fail immediately - confirm the intended policy.
            await HandleFailureAsync(state, ex, ct);
        }
    }

    /// <summary>
    /// Records <paramref name="step"/> as the current step, persists the state,
    /// then runs <paramref name="action"/>. Failures are logged and rethrown.
    /// </summary>
    private async Task ExecuteStepAsync(
        OrderFulfillmentState state,
        SagaStep step,
        Func<Task> action,
        CancellationToken ct)
    {
        // Persist before running so the stored state shows which step was in flight.
        state.CurrentStep = step;
        await _stateStore.SaveStateAsync(state.SagaId, state, ct);

        try
        {
            await action();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Saga step {Step} failed for {SagaId}", step, state.SagaId);
            throw;
        }
    }

    /// <summary>
    /// Logs the failure, compensates completed steps and publishes
    /// <c>OrderFulfillmentFailedEvent</c> naming the step that failed.
    /// </summary>
    private async Task HandleFailureAsync(
        OrderFulfillmentState state,
        Exception ex,
        CancellationToken ct)
    {
        // Capture the failing step first: CompensateAsync overwrites CurrentStep with
        // Compensating and then Failed, which would otherwise be reported in the event.
        var failedStep = state.CurrentStep;

        _logger.LogError(ex, "Saga {SagaId} failed at step {Step}", state.SagaId, failedStep);

        state.FailureReason = ex.Message;
        await CompensateAsync(state, ct);

        await _eventPublisher.PublishAsync(new OrderFulfillmentFailedEvent
        {
            OrderId = state.OrderId,
            FailedStep = failedStep.ToString(),
            Reason = ex.Message
        });
    }

    /// <summary>
    /// Undoes completed steps in reverse order of execution, then marks the
    /// saga as <see cref="SagaStep.Failed"/> and persists the state.
    /// </summary>
    private async Task CompensateAsync(OrderFulfillmentState state, CancellationToken ct)
    {
        state.CurrentStep = SagaStep.Compensating;
        await _stateStore.SaveStateAsync(state.SagaId, state, ct);

        // Compensate in reverse order
        if (state.Shipped)
        {
            await _shippingService.CancelShipmentAsync(state.OrderId, ct);
        }

        // The *Refunded / *Released flags keep a re-run of compensation from
        // refunding or releasing twice.
        if (state.PaymentProcessed && !state.PaymentRefunded)
        {
            await _paymentService.RefundAsync(state.OrderId, ct);
            state.PaymentRefunded = true;
        }

        if (state.InventoryReserved && !state.InventoryReleased)
        {
            await _inventoryService.ReleaseAsync(state.OrderId, ct);
            state.InventoryReleased = true;
        }

        state.CurrentStep = SagaStep.Failed;
        await _stateStore.SaveStateAsync(state.SagaId, state, ct);
    }
}
|
|
```
|
|
|
|
## ISaga Interface
|
|
|
|
```csharp
|
|
/// <summary>
/// Contract for a saga that is started by an event of type <typeparamref name="TEvent"/>.
/// </summary>
/// <typeparam name="TEvent">Event type the saga handles; must be a reference type.</typeparam>
public interface ISaga<TEvent>
    where TEvent : class
{
    /// <summary>Name of the saga (for example <c>"order-fulfillment"</c>).</summary>
    string SagaName { get; }

    /// <summary>Handles <paramref name="event"/> by running the saga's workflow.</summary>
    /// <param name="event">The event to handle.</param>
    /// <param name="cancellationToken">Token for cancelling the work.</param>
    Task HandleAsync(TEvent @event, CancellationToken cancellationToken);
}
|
|
```
|
|
|
|
## Registration
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events;

var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;

// Register the saga as the singleton handler for OrderPlacedEvent
services.AddSingleton<ISaga<OrderPlacedEvent>, OrderFulfillmentSaga>();

// Register the saga orchestrator
services.AddSagaOrchestrator();

var app = builder.Build();
app.Run();
|
|
```
|
|
|
|
## Saga State
|
|
|
|
Define comprehensive state for your saga:
|
|
|
|
```csharp
|
|
/// <summary>
/// State of one order-fulfillment saga: which step it is on, which steps
/// have completed, which compensations have run, and the data needed to compensate.
/// </summary>
public class OrderFulfillmentState
{
    public string SagaId { get; set; } = string.Empty;
    public int OrderId { get; set; }
    public SagaStep CurrentStep { get; set; }

    // Step completion tracking
    public bool OrderValidated { get; set; }
    public bool InventoryReserved { get; set; }
    public bool CreditChecked { get; set; }
    public bool PaymentProcessed { get; set; }
    public bool Shipped { get; set; }

    // Compensation tracking
    public bool InventoryReleased { get; set; }
    public bool PaymentRefunded { get; set; }

    // Metadata
    public DateTimeOffset StartedAt { get; set; }
    public DateTimeOffset? CompletedAt { get; set; }
    public string? FailureReason { get; set; }

    // Data needed for compensation
    public string? PaymentTransactionId { get; set; }
    public string? ShipmentTrackingNumber { get; set; }
}

/// <summary>Steps and terminal states a saga can be in.</summary>
public enum SagaStep
{
    // Values are explicit so that state persisted with numeric enum values
    // keeps its meaning when members are added.
    Started = 0,
    ReserveInventory = 1,
    ProcessPayment = 2,
    ShipOrder = 3,
    Completed = 4,
    Compensating = 5,
    Failed = 6,

    // Steps used by the extended (parallel) workflow; appended so the
    // values above do not shift.
    ValidateOrder = 7,
    CheckCredit = 8
}
|
|
```
|
|
|
|
## Step-by-Step Workflow
|
|
|
|
### Step Execution Template
|
|
|
|
```csharp
|
|
/// <summary>
/// Marks <paramref name="step"/> as the saga's current step, persists the state,
/// runs <paramref name="action"/> and returns its result.
/// </summary>
/// <typeparam name="TResult">Type of the value produced by the step.</typeparam>
/// <param name="state">Saga state; its <c>CurrentStep</c> is updated and saved.</param>
/// <param name="step">The step being executed.</param>
/// <param name="action">The work of the step.</param>
/// <param name="ct">Token passed to the state store.</param>
/// <returns>The value returned by <paramref name="action"/>.</returns>
/// <remarks>Exceptions from <paramref name="action"/> are logged and rethrown unchanged.</remarks>
private async Task<TResult> ExecuteStepAsync<TResult>(
    OrderFulfillmentState state,
    SagaStep step,
    Func<Task<TResult>> action,
    CancellationToken ct)
{
    // Persist first so the stored state shows which step was in flight.
    state.CurrentStep = step;
    await _stateStore.SaveStateAsync(state.SagaId, state, ct);

    _logger.LogInformation("Executing saga step {Step} for {SagaId}", step, state.SagaId);

    TResult outcome;
    try
    {
        outcome = await action();
        _logger.LogInformation("Saga step {Step} completed for {SagaId}", step, state.SagaId);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Saga step {Step} failed for {SagaId}", step, state.SagaId);
        throw;
    }

    return outcome;
}
|
|
```
|
|
|
|
### Using the Template
|
|
|
|
```csharp
|
|
// Step with return value
var transactionId = await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
{
    var txId = await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
    state.PaymentProcessed = true;
    state.PaymentTransactionId = txId;
    return txId;
}, ct);

// Step without return value: the generic template still needs a TResult, so
// return a placeholder. Returning Task.CompletedTask from an async lambda would
// make TResult a Task and produce a nested Task<Task>.
await ExecuteStepAsync(state, SagaStep.ShipOrder, async () =>
{
    await _shippingService.ShipAsync(@event.OrderId, ct);
    state.Shipped = true;
    return true;
}, ct);
|
|
```
|
|
|
|
## State Persistence
|
|
|
|
Persist saga state after each step:
|
|
|
|
```csharp
|
|
/// <summary>Persistence contract for saga state, keyed by saga ID.</summary>
public interface ISagaStateStore
{
    /// <summary>Stores <paramref name="state"/> under <paramref name="sagaId"/>.</summary>
    Task SaveStateAsync(string sagaId, object state, CancellationToken ct);

    /// <summary>Loads the state stored under <paramref name="sagaId"/> as <typeparamref name="T"/>.</summary>
    /// <returns>The stored state; the nullable return type suggests <c>default</c> when nothing is stored (confirm per implementation).</returns>
    Task<T?> GetStateAsync<T>(string sagaId, CancellationToken ct);

    /// <summary>Removes the state stored under <paramref name="sagaId"/>.</summary>
    Task DeleteStateAsync(string sagaId, CancellationToken ct);
}
|
|
|
|
/// <summary>
/// <see cref="ISagaStateStore"/> that keeps each saga's state as a JSON string in
/// one <c>SagaStateEntity</c> row, looked up by saga ID through <c>_dbContext.SagaStates</c>.
/// </summary>
public class PostgresSagaStateStore : ISagaStateStore
{
    // _dbContext (declaration not shown) is the database context that exposes the SagaStates set.

    /// <summary>Inserts the row for <paramref name="sagaId"/> if it does not exist, otherwise updates it.</summary>
    public async Task SaveStateAsync(string sagaId, object state, CancellationToken ct)
    {
        var json = JsonSerializer.Serialize(state);

        var entity = await _dbContext.SagaStates
            .FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);

        if (entity == null)
        {
            entity = new SagaStateEntity
            {
                SagaId = sagaId,
                StateJson = json,
                CreatedAt = DateTimeOffset.UtcNow
            };
            _dbContext.SagaStates.Add(entity);
        }
        else
        {
            entity.StateJson = json;
            entity.UpdatedAt = DateTimeOffset.UtcNow;
        }

        await _dbContext.SaveChangesAsync(ct);
    }

    /// <summary>Loads and deserializes the state stored for <paramref name="sagaId"/>.</summary>
    /// <returns>The state, or <c>default</c> when no row exists for the saga.</returns>
    public async Task<T?> GetStateAsync<T>(string sagaId, CancellationToken ct)
    {
        var entity = await _dbContext.SagaStates
            .FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);

        if (entity == null)
        {
            return default;
        }

        return JsonSerializer.Deserialize<T>(entity.StateJson);
    }

    /// <summary>Deletes the row for <paramref name="sagaId"/>; does nothing when none exists.</summary>
    public async Task DeleteStateAsync(string sagaId, CancellationToken ct)
    {
        var entity = await _dbContext.SagaStates
            .FirstOrDefaultAsync(s => s.SagaId == sagaId, ct);

        if (entity == null)
        {
            return;
        }

        _dbContext.SagaStates.Remove(entity);
        await _dbContext.SaveChangesAsync(ct);
    }
}
|
|
```
|
|
|
|
## Recovery from Failure
|
|
|
|
Resume sagas after application restart:
|
|
|
|
```csharp
|
|
/// <summary>
/// Background service that runs once after startup: it loads sagas that did not
/// finish and resumes each one from its recorded step.
/// </summary>
public class SagaRecoveryService : BackgroundService
{
    // NOTE(review): this sample relies on members not shown elsewhere on this page -
    // _stateStore.GetIncompleteSagasAsync and the saga's ExecuteReserveInventoryAsync /
    // ExecuteProcessPaymentAsync / ExecuteShipOrderAsync; it also needs CompensateAsync
    // to be accessible from here. Confirm they exist before copying.

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Wait for startup
        await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);

        // Find incomplete sagas
        var incompleteSagas = await _stateStore.GetIncompleteSagasAsync(stoppingToken);

        foreach (var sagaState in incompleteSagas)
        {
            if (stoppingToken.IsCancellationRequested)
            {
                break;
            }

            _logger.LogInformation("Resuming saga {SagaId} from step {Step}",
                sagaState.SagaId,
                sagaState.CurrentStep);

            try
            {
                await ResumeSagaAsync(sagaState, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Host is shutting down: stop here rather than logging every
                // remaining saga as a failure. Their state stays persisted.
                break;
            }
            catch (Exception ex)
            {
                // One saga failing to resume must not prevent the others from resuming.
                _logger.LogError(ex, "Failed to resume saga {SagaId}", sagaState.SagaId);
            }
        }
    }

    /// <summary>
    /// Continues <paramref name="state"/> from its current step, skipping steps whose
    /// completion flag is already set, or re-runs compensation if it was interrupted.
    /// </summary>
    private async Task ResumeSagaAsync(OrderFulfillmentState state, CancellationToken ct)
    {
        // The saga is registered as ISaga<OrderPlacedEvent>, not under its concrete
        // type, so resolve through the interface and pick the concrete instance.
        var saga = _serviceProvider
            .GetServices<ISaga<OrderPlacedEvent>>()
            .OfType<OrderFulfillmentSaga>()
            .FirstOrDefault()
            ?? throw new InvalidOperationException(
                "OrderFulfillmentSaga is not registered as ISaga<OrderPlacedEvent>.");

        // Resume from current step; each case falls through to the next step
        switch (state.CurrentStep)
        {
            case SagaStep.ReserveInventory:
                if (!state.InventoryReserved)
                {
                    await saga.ExecuteReserveInventoryAsync(state, ct);
                }
                goto case SagaStep.ProcessPayment;

            case SagaStep.ProcessPayment:
                if (!state.PaymentProcessed)
                {
                    await saga.ExecuteProcessPaymentAsync(state, ct);
                }
                goto case SagaStep.ShipOrder;

            case SagaStep.ShipOrder:
                if (!state.Shipped)
                {
                    await saga.ExecuteShipOrderAsync(state, ct);
                }
                break;

            case SagaStep.Compensating:
                await saga.CompensateAsync(state, ct);
                break;
        }
    }
}
|
|
```
|
|
|
|
## Timeout Handling
|
|
|
|
Add timeouts to prevent stuck sagas:
|
|
|
|
```csharp
|
|
/// <summary>
/// Runs a saga step with a time limit. Overload for actions that do not accept a
/// token: the wait is abandoned on timeout, but the action itself is not cancelled.
/// Prefer the overload that takes <c>Func&lt;CancellationToken, Task&gt;</c>.
/// </summary>
private Task ExecuteStepWithTimeoutAsync(
    OrderFulfillmentState state,
    SagaStep step,
    Func<Task> action,
    TimeSpan timeout,
    CancellationToken ct)
    => ExecuteStepWithTimeoutAsync(state, step, _ => action(), timeout, ct);

/// <summary>
/// Records <paramref name="step"/> as the current step, persists the state, then runs
/// <paramref name="action"/> with a token that is cancelled when <paramref name="timeout"/>
/// elapses or <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="state">Saga state; its <c>CurrentStep</c> is updated and saved.</param>
/// <param name="step">The step being executed.</param>
/// <param name="action">The work of the step; it should pass the supplied token on to the calls it makes.</param>
/// <param name="timeout">Maximum time the step may take.</param>
/// <param name="ct">Caller's cancellation token.</param>
/// <exception cref="SagaTimeoutException">The step did not finish within <paramref name="timeout"/>.</exception>
private async Task ExecuteStepWithTimeoutAsync(
    OrderFulfillmentState state,
    SagaStep step,
    Func<CancellationToken, Task> action,
    TimeSpan timeout,
    CancellationToken ct)
{
    state.CurrentStep = step;
    await _stateStore.SaveStateAsync(state.SagaId, state, ct);

    // Start the clock only after the state is saved so the budget covers the step itself.
    using var stepCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    stepCts.CancelAfter(timeout);

    try
    {
        // WaitAsync enforces the timeout even if the action ignores the token it is given.
        await action(stepCts.Token).WaitAsync(stepCts.Token);
    }
    catch (OperationCanceledException) when (stepCts.IsCancellationRequested && !ct.IsCancellationRequested)
    {
        // The linked source fired but the caller's token did not: this is the timeout.
        _logger.LogWarning("Saga step {Step} timed out after {Timeout} for {SagaId}",
            step,
            timeout,
            state.SagaId);

        throw new SagaTimeoutException($"Step {step} timed out");
    }
}

// Usage: pass the step token to the service call so the timeout can cancel it
await ExecuteStepWithTimeoutAsync(
    state,
    SagaStep.ProcessPayment,
    async stepCt => await _paymentService.ChargeAsync(orderId, amount, stepCt),
    timeout: TimeSpan.FromMinutes(2),
    ct);
|
|
```
|
|
|
|
## Parallel Steps
|
|
|
|
Execute independent steps in parallel:
|
|
|
|
```csharp
|
|
/// <summary>
/// Workflow variant that validates the order, then reserves inventory and checks
/// credit concurrently, and finally charges payment.
/// </summary>
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
    var state = new OrderFulfillmentState
    {
        OrderId = @event.OrderId,
        SagaId = $"order-{@event.OrderId}" // the state store is keyed by SagaId
    };

    // Sequential step
    await ExecuteStepAsync(state, SagaStep.ValidateOrder, async () =>
    {
        await _orderService.ValidateAsync(@event.OrderId, ct);
        state.OrderValidated = true;
    }, ct);

    // Parallel steps (independent). Each branch sets only its own flag, so a
    // branch that succeeds is still recorded (and can be compensated) when the
    // other one fails.
    async Task ReserveInventoryAsync()
    {
        await _inventoryService.ReserveAsync(@event.OrderId, @event.Items, ct);
        state.InventoryReserved = true;
    }

    async Task CheckCreditAsync()
    {
        await _creditService.CheckAsync(@event.CustomerId, @event.TotalAmount, ct);
        state.CreditChecked = true;
    }

    // Run both branches inside ONE ExecuteStepAsync call. Calling ExecuteStepAsync
    // once per branch would race on state.CurrentStep and call the state store
    // concurrently. CurrentStep holds a single value, so the parallel group is
    // recorded under one step; the flags show which branch finished.
    await ExecuteStepAsync(state, SagaStep.ReserveInventory, async () =>
    {
        await Task.WhenAll(ReserveInventoryAsync(), CheckCreditAsync());
    }, ct);

    // Continue with dependent steps
    await ExecuteStepAsync(state, SagaStep.ProcessPayment, async () =>
    {
        await _paymentService.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
        state.PaymentProcessed = true;
    }, ct);
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Persist state after each step
|
|
- Make steps idempotent
|
|
- Implement comprehensive compensation
|
|
- Log all saga operations
|
|
- Use unique saga IDs
|
|
- Handle timeouts
|
|
- Version saga definitions
|
|
- Test failure scenarios
|
|
- Monitor saga completion rates
|
|
- Implement recovery logic
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't skip state persistence
|
|
- Don't assume steps always succeed
|
|
- Don't forget compensation for each step
|
|
- Don't use blocking operations
|
|
- Don't ignore timeout scenarios
|
|
- Don't write steps that are unsafe to retry (non-idempotent)
|
|
- Don't forget error logging
|
|
- Don't couple sagas tightly to services
|
|
|
|
## See Also
|
|
|
|
- [Sagas Overview](README.md)
|
|
- [Saga Pattern](saga-pattern.md)
|
|
- [Compensation](compensation.md)
|
|
- [Saga Context](saga-context.md)
|
|
- [Event Streaming Overview](../README.md)
|