dotnet-cqrs/docs/tutorials/event-sourcing/03-events-and-workflows.md

425 lines
13 KiB
Markdown

# Event Design and Workflows
Learn how to design events and implement workflow patterns with Svrnty.CQRS.
## Event Design Principles
### 1. Events are Immutable Facts
Events represent things that have already happened and cannot be changed:
```csharp
// ✅ Good: Immutable record, past tense, descriptive
public record OrderPlacedEvent
{
public string OrderId { get; init; } = string.Empty;
public string CustomerId { get; init; } = string.Empty;
public decimal TotalAmount { get; init; }
public List<OrderLineItem> Items { get; init; } = new();
public DateTimeOffset PlacedAt { get; init; }
}
// ❌ Bad: Mutable class, present tense, vague
public class OrderEvent
{
public string Id { get; set; }
public string Data { get; set; } // Not descriptive
}
```
### 2. Include All Relevant Data
Events should contain all information needed to understand what happened:
```csharp
// ✅ Good: Complete information
public record ProductPriceChangedEvent
{
public string ProductId { get; init; } = string.Empty;
public string ProductName { get; init; } = string.Empty; // Context
public decimal OldPrice { get; init; } // Previous state
public decimal NewPrice { get; init; } // New state
public string ChangedBy { get; init; } = string.Empty; // Who
public string Reason { get; init; } = string.Empty; // Why
public DateTimeOffset ChangedAt { get; init; } // When
}
// ❌ Bad: Minimal information
public record PriceChangedEvent
{
public string ProductId { get; init; } = string.Empty;
public decimal NewPrice { get; init; } // Missing context
}
```
### 3. Use Business Language
Name events using domain language, not technical terms:
```csharp
// ✅ Good: Business language
public record OrderShippedEvent { }
public record PaymentReceivedEvent { }
public record CustomerRegisteredEvent { }
// ❌ Bad: Technical language
public record OrderStatusUpdatedEvent { } // What status?
public record DataChangedEvent { } // What data?
public record EntityCreatedEvent { } // What entity?
```
## Event Granularity
### Fine-Grained Events
Each event represents a single business fact:
```csharp
// ✅ Good: Separate events for separate facts
public record UserRegisteredEvent
{
public string UserId { get; init; } = string.Empty;
public string Name { get; init; } = string.Empty;
public string Email { get; init; } = string.Empty;
}
public record UserEmailVerifiedEvent
{
public string UserId { get; init; } = string.Empty;
public DateTimeOffset VerifiedAt { get; init; }
}
public record UserProfileCompletedEvent
{
public string UserId { get; init; } = string.Empty;
public string PhoneNumber { get; init; } = string.Empty;
public string Address { get; init; } = string.Empty;
}
// ❌ Bad: Too coarse-grained
public record UserCreatedEvent
{
public string UserId { get; init; } = string.Empty;
public bool EmailVerified { get; init; } // Mixing concerns
public bool ProfileCompleted { get; init; } // Mixing concerns
}
```
## Workflow Pattern
Workflows coordinate multiple aggregates using events:
```csharp
// Aggregate 1: Order produces event
public class Order : AggregateRoot
{
public void Place(string customerId, List<OrderLine> items)
{
// Validate and produce event
ApplyEvent(new OrderPlacedEvent
{
OrderId = Id,
CustomerId = customerId,
Items = items,
TotalAmount = items.Sum(i => i.Price * i.Quantity),
PlacedAt = DateTimeOffset.UtcNow
});
}
}
// Workflow: Listen to events and coordinate
public class OrderWorkflow : IWorkflow<OrderPlacedEvent>
{
private readonly IEventStreamStore _eventStore;
private readonly IInventoryService _inventory;
private readonly IPaymentService _payment;
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
// Step 1: Reserve inventory
var reservationResult = await _inventory.ReserveAsync(@event.OrderId, @event.Items, ct);
if (reservationResult.Success)
{
await _eventStore.AppendAsync(@event.OrderId, new InventoryReservedEvent
{
OrderId = @event.OrderId,
ReservationId = reservationResult.ReservationId,
Items = @event.Items,
ReservedAt = DateTimeOffset.UtcNow
}, ct);
// Step 2: Process payment
var paymentResult = await _payment.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
if (paymentResult.Success)
{
await _eventStore.AppendAsync(@event.OrderId, new PaymentProcessedEvent
{
OrderId = @event.OrderId,
PaymentId = paymentResult.PaymentId,
Amount = @event.TotalAmount,
ProcessedAt = DateTimeOffset.UtcNow
}, ct);
}
else
{
// Compensation: Release inventory
await _inventory.ReleaseAsync(reservationResult.ReservationId, ct);
await _eventStore.AppendAsync(@event.OrderId, new OrderPaymentFailedEvent
{
OrderId = @event.OrderId,
Reason = paymentResult.ErrorMessage,
FailedAt = DateTimeOffset.UtcNow
}, ct);
}
}
else
{
await _eventStore.AppendAsync(@event.OrderId, new OrderInventoryUnavailableEvent
{
OrderId = @event.OrderId,
UnavailableItems = reservationResult.UnavailableItems,
NotifiedAt = DateTimeOffset.UtcNow
}, ct);
}
}
}
```
## Event Registration
Register workflows to listen to events:
```csharp
// In Program.cs
builder.Services.AddEventStreaming()
.AddPostgresEventStore(builder.Configuration.GetConnectionString("EventStore"));
// Register workflow
builder.Services.AddWorkflow<OrderPlacedEvent, OrderWorkflow>();
var app = builder.Build();
// Subscribe workflow to event stream
var subscription = app.Services.GetRequiredService<IEventSubscriptionService>();
await subscription.SubscribeAsync(
streamName: "orders",
subscriptionId: "order-workflow",
mode: SubscriptionMode.Broadcast,
async (StoredEvent storedEvent, CancellationToken ct) =>
{
if (storedEvent.Data is OrderPlacedEvent orderPlaced)
{
var workflow = app.Services.GetRequiredService<IWorkflow<OrderPlacedEvent>>();
await workflow.HandleAsync(orderPlaced, ct);
}
});
```
## Event Versioning
Events evolve over time. Use versioning to handle schema changes:
```csharp
// Version 1
public record UserRegisteredEventV1
{
public string UserId { get; init; } = string.Empty;
public string Name { get; init; } = string.Empty;
}
// Version 2: Added email field
public record UserRegisteredEventV2
{
public string UserId { get; init; } = string.Empty;
public string Name { get; init; } = string.Empty;
public string Email { get; init; } = string.Empty;
}
// Upcaster: Convert V1 to V2
public class UserRegisteredEventUpcaster : IEventUpcaster<UserRegisteredEventV1, UserRegisteredEventV2>
{
public UserRegisteredEventV2 Upcast(UserRegisteredEventV1 oldEvent)
{
return new UserRegisteredEventV2
{
UserId = oldEvent.UserId,
Name = oldEvent.Name,
Email = "unknown@example.com" // Default for old events
};
}
}
// When reading events
await foreach (var storedEvent in eventStore.ReadStreamAsync("user-123"))
{
var @event = storedEvent.Data;
// Upcast if needed
if (@event is UserRegisteredEventV1 v1)
{
@event = _upcaster.Upcast(v1);
}
// Now work with V2
if (@event is UserRegisteredEventV2 v2)
{
// Process...
}
}
```
## Complete Workflow Example
Here's a complete order fulfillment workflow:
```csharp
public class OrderFulfillmentWorkflow : IWorkflow<OrderPlacedEvent>
{
private readonly IEventStreamStore _eventStore;
private readonly IInventoryService _inventory;
private readonly IPaymentService _payment;
private readonly IShippingService _shipping;
private readonly ILogger<OrderFulfillmentWorkflow> _logger;
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
var streamName = $"order-{@event.OrderId}";
try
{
// Step 1: Reserve inventory
_logger.LogInformation("Reserving inventory for order {OrderId}", @event.OrderId);
var reservation = await _inventory.ReserveAsync(
@event.OrderId,
@event.Items.Select(i => new InventoryItem
{
ProductId = i.ProductId,
Quantity = i.Quantity
}).ToList(),
ct);
if (!reservation.Success)
{
await _eventStore.AppendAsync(streamName, new OrderInventoryUnavailableEvent
{
OrderId = @event.OrderId,
UnavailableItems = reservation.UnavailableItems,
NotifiedAt = DateTimeOffset.UtcNow
}, ct);
return;
}
await _eventStore.AppendAsync(streamName, new InventoryReservedEvent
{
OrderId = @event.OrderId,
ReservationId = reservation.ReservationId,
Items = @event.Items,
ReservedAt = DateTimeOffset.UtcNow
}, ct);
// Step 2: Process payment
_logger.LogInformation("Processing payment for order {OrderId}", @event.OrderId);
var payment = await _payment.ChargeAsync(
@event.OrderId,
@event.TotalAmount,
ct);
if (!payment.Success)
{
// Compensation: Release inventory
await _inventory.ReleaseAsync(reservation.ReservationId, ct);
await _eventStore.AppendAsync(streamName, new OrderPaymentFailedEvent
{
OrderId = @event.OrderId,
Reason = payment.ErrorMessage,
FailedAt = DateTimeOffset.UtcNow
}, ct);
return;
}
await _eventStore.AppendAsync(streamName, new PaymentProcessedEvent
{
OrderId = @event.OrderId,
PaymentId = payment.PaymentId,
Amount = @event.TotalAmount,
ProcessedAt = DateTimeOffset.UtcNow
}, ct);
// Step 3: Create shipment
_logger.LogInformation("Creating shipment for order {OrderId}", @event.OrderId);
var shipment = await _shipping.CreateShipmentAsync(
@event.OrderId,
@event.Items,
ct);
await _eventStore.AppendAsync(streamName, new ShipmentCreatedEvent
{
OrderId = @event.OrderId,
ShipmentId = shipment.ShipmentId,
TrackingNumber = shipment.TrackingNumber,
EstimatedDelivery = shipment.EstimatedDelivery,
CreatedAt = DateTimeOffset.UtcNow
}, ct);
// Final: Mark order as fulfilled
await _eventStore.AppendAsync(streamName, new OrderFulfilledEvent
{
OrderId = @event.OrderId,
FulfilledAt = DateTimeOffset.UtcNow
}, ct);
_logger.LogInformation("Order {OrderId} fulfilled successfully", @event.OrderId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to fulfill order {OrderId}", @event.OrderId);
await _eventStore.AppendAsync(streamName, new OrderFulfillmentFailedEvent
{
OrderId = @event.OrderId,
ErrorMessage = ex.Message,
FailedAt = DateTimeOffset.UtcNow
}, ct);
}
}
}
```
## Best Practices
**DO:**
- Use past tense for event names (OrderPlaced, PaymentProcessed)
- Include all relevant context in events
- Keep events immutable (use `init` properties)
- Version events when schema changes
- Emit events after state changes are validated
- Use workflows to coordinate aggregates
**DON'T:**
- Use present tense (PlaceOrder, ProcessPayment)
- Include minimal data in events
- Make events mutable
- Change event schema without versioning
- Emit events before validation
- Put coordination logic in aggregates
## Next Steps
- [04-projections.md](04-projections.md) - Build read models from events
- [05-snapshots.md](05-snapshots.md) - Optimize with snapshots
- [06-replay-and-rebuild.md](06-replay-and-rebuild.md) - Replay and rebuild projections
## See Also
- [Events and Workflows](../../event-streaming/fundamentals/events-and-workflows.md)
- [Event Design Best Practices](../../best-practices/event-design.md)
- [Sagas](../../event-streaming/sagas/creating-sagas.md)