13 KiB
13 KiB
Event Design and Workflows
Learn how to design events and implement workflow patterns with Svrnty.CQRS.
Event Design Principles
1. Events are Immutable Facts
Events represent things that have already happened and cannot be changed:
// ✅ Good: Immutable record, past tense, descriptive
/// <summary>
/// Fact recorded when an order has been placed. All members are init-only,
/// so an instance cannot be reassigned after construction.
/// </summary>
public record OrderPlacedEvent
{
    /// <summary>Id of the order that was placed.</summary>
    public string OrderId { get; init; } = string.Empty;

    /// <summary>Id of the customer who placed the order.</summary>
    public string CustomerId { get; init; } = string.Empty;

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; init; }

    /// <summary>Line items of the order; defaults to an empty list.</summary>
    public List<OrderLineItem> Items { get; init; } = new List<OrderLineItem>();

    /// <summary>Moment the order was placed.</summary>
    public DateTimeOffset PlacedAt { get; init; }
}
// ❌ Bad: Mutable class, present tense, vague
/// <summary>
/// Counter-example: settable properties let the "event" be changed after the
/// fact, and neither the type name nor its members say what happened.
/// </summary>
public class OrderEvent
{
    /// <summary>Identifier of unspecified meaning.</summary>
    public string Id { get; set; }

    /// <summary>Opaque payload.</summary>
    public string Data { get; set; } // Not descriptive
}
2. Include All Relevant Data
Events should contain all information needed to understand what happened:
// ✅ Good: Complete information
/// <summary>
/// Fact recorded when a product's price changed, carrying the previous and new
/// price together with who changed it, why, and when.
/// </summary>
public record ProductPriceChangedEvent
{
    /// <summary>Id of the product whose price changed.</summary>
    public string ProductId { get; init; } = string.Empty;

    /// <summary>Product name, included for context.</summary>
    public string ProductName { get; init; } = string.Empty; // Context

    /// <summary>Price before the change.</summary>
    public decimal OldPrice { get; init; } // Previous state

    /// <summary>Price after the change.</summary>
    public decimal NewPrice { get; init; } // New state

    /// <summary>Who made the change.</summary>
    public string ChangedBy { get; init; } = string.Empty; // Who

    /// <summary>Why the price was changed.</summary>
    public string Reason { get; init; } = string.Empty; // Why

    /// <summary>When the price was changed.</summary>
    public DateTimeOffset ChangedAt { get; init; } // When
}
// ❌ Bad: Minimal information
/// <summary>
/// Counter-example: carries only the product id and the new price, so a reader
/// cannot tell the previous price, who changed it, why, or when.
/// </summary>
public record PriceChangedEvent
{
    /// <summary>Id of the product whose price changed.</summary>
    public string ProductId { get; init; } = string.Empty;

    /// <summary>Price after the change.</summary>
    public decimal NewPrice { get; init; } // Missing context
}
3. Use Business Language
Name events using domain language, not technical terms:
// ✅ Good: Business language
/// <summary>An order was shipped.</summary>
public record OrderShippedEvent;

/// <summary>A payment was received.</summary>
public record PaymentReceivedEvent;

/// <summary>A customer registered.</summary>
public record CustomerRegisteredEvent;
// ❌ Bad: Technical language
/// <summary>Counter-example: the name does not say which status was reached.</summary>
public record OrderStatusUpdatedEvent; // What status?

/// <summary>Counter-example: the name does not say which data changed.</summary>
public record DataChangedEvent; // What data?

/// <summary>Counter-example: the name does not say which entity was created.</summary>
public record EntityCreatedEvent; // What entity?
Event Granularity
Fine-Grained Events
Each event represents a single business fact:
// ✅ Good: Separate events for separate facts
/// <summary>Fact: a user registered with the given name and email.</summary>
public record UserRegisteredEvent
{
    /// <summary>Id of the registered user.</summary>
    public string UserId { get; init; } = string.Empty;

    /// <summary>Name supplied at registration.</summary>
    public string Name { get; init; } = string.Empty;

    /// <summary>Email supplied at registration.</summary>
    public string Email { get; init; } = string.Empty;
}
/// <summary>Fact: a user's email address was verified.</summary>
public record UserEmailVerifiedEvent
{
    /// <summary>Id of the user whose email was verified.</summary>
    public string UserId { get; init; } = string.Empty;

    /// <summary>When the verification happened.</summary>
    public DateTimeOffset VerifiedAt { get; init; }
}
/// <summary>Fact: a user completed their profile with a phone number and address.</summary>
public record UserProfileCompletedEvent
{
    /// <summary>Id of the user who completed the profile.</summary>
    public string UserId { get; init; } = string.Empty;

    /// <summary>Phone number entered in the profile.</summary>
    public string PhoneNumber { get; init; } = string.Empty;

    /// <summary>Address entered in the profile.</summary>
    public string Address { get; init; } = string.Empty;
}
// ❌ Bad: Too coarse-grained
/// <summary>
/// Counter-example: one event that folds email verification and profile
/// completion into flags instead of recording each as its own fact.
/// </summary>
public record UserCreatedEvent
{
    /// <summary>Id of the created user.</summary>
    public string UserId { get; init; } = string.Empty;

    /// <summary>Flag standing in for a separate "email verified" fact.</summary>
    public bool EmailVerified { get; init; } // Mixing concerns

    /// <summary>Flag standing in for a separate "profile completed" fact.</summary>
    public bool ProfileCompleted { get; init; } // Mixing concerns
}
Workflow Pattern
Workflows coordinate multiple aggregates using events:
// Aggregate 1: Order produces event
/// <summary>
/// Order aggregate. Placing an order is recorded by applying an
/// <see cref="OrderPlacedEvent"/>.
/// </summary>
public class Order : AggregateRoot
{
    /// <summary>
    /// Validates the input and applies an <see cref="OrderPlacedEvent"/> for this order.
    /// </summary>
    /// <param name="customerId">Id of the customer placing the order; must not be blank.</param>
    /// <param name="items">
    /// Line items of the order; must contain at least one item. Typed as
    /// <c>List&lt;OrderLineItem&gt;</c> so it can be assigned to <c>OrderPlacedEvent.Items</c>.
    /// </param>
    /// <exception cref="ArgumentException">
    /// <paramref name="customerId"/> is blank or <paramref name="items"/> is empty.
    /// </exception>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
    public void Place(string customerId, List<OrderLineItem> items)
    {
        // Validate first: once applied, the event is a recorded fact.
        if (string.IsNullOrWhiteSpace(customerId))
        {
            throw new ArgumentException("Customer id is required.", nameof(customerId));
        }

        if (items is null)
        {
            throw new ArgumentNullException(nameof(items));
        }

        if (items.Count == 0)
        {
            throw new ArgumentException("An order must contain at least one item.", nameof(items));
        }

        // Copy the list so later changes by the caller cannot alter the event.
        var lineItems = items.ToList();

        ApplyEvent(new OrderPlacedEvent
        {
            OrderId = Id,
            CustomerId = customerId,
            Items = lineItems,
            TotalAmount = lineItems.Sum(i => i.Price * i.Quantity),
            PlacedAt = DateTimeOffset.UtcNow
        });
    }
}
// Workflow: Listen to events and coordinate
/// <summary>
/// Reacts to <see cref="OrderPlacedEvent"/>: reserves inventory, then charges
/// payment, appending an event to the order's stream after each step. If the
/// payment fails, the inventory reservation is released again.
/// </summary>
public class OrderWorkflow : IWorkflow<OrderPlacedEvent>
{
    private readonly IEventStreamStore _eventStore;
    private readonly IInventoryService _inventory;
    private readonly IPaymentService _payment;

    /// <summary>Creates the workflow with its dependencies (constructor injection).</summary>
    /// <exception cref="ArgumentNullException">Any dependency is null.</exception>
    public OrderWorkflow(
        IEventStreamStore eventStore,
        IInventoryService inventory,
        IPaymentService payment)
    {
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
        _inventory = inventory ?? throw new ArgumentNullException(nameof(inventory));
        _payment = payment ?? throw new ArgumentNullException(nameof(payment));
    }

    /// <summary>Runs the reservation and payment steps for a placed order.</summary>
    /// <param name="event">The order that was placed.</param>
    /// <param name="ct">Token passed through to every service and event-store call.</param>
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // One stream per order, named as in the complete fulfillment example.
        var streamName = $"order-{@event.OrderId}";

        // Step 1: Reserve inventory
        var reservationResult = await _inventory.ReserveAsync(@event.OrderId, @event.Items, ct);
        if (!reservationResult.Success)
        {
            await _eventStore.AppendAsync(streamName, new OrderInventoryUnavailableEvent
            {
                OrderId = @event.OrderId,
                UnavailableItems = reservationResult.UnavailableItems,
                NotifiedAt = DateTimeOffset.UtcNow
            }, ct);
            return;
        }

        await _eventStore.AppendAsync(streamName, new InventoryReservedEvent
        {
            OrderId = @event.OrderId,
            ReservationId = reservationResult.ReservationId,
            Items = @event.Items,
            ReservedAt = DateTimeOffset.UtcNow
        }, ct);

        // Step 2: Process payment
        var paymentResult = await _payment.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);
        if (!paymentResult.Success)
        {
            // Compensation: Release inventory
            await _inventory.ReleaseAsync(reservationResult.ReservationId, ct);
            await _eventStore.AppendAsync(streamName, new OrderPaymentFailedEvent
            {
                OrderId = @event.OrderId,
                Reason = paymentResult.ErrorMessage,
                FailedAt = DateTimeOffset.UtcNow
            }, ct);
            return;
        }

        await _eventStore.AppendAsync(streamName, new PaymentProcessedEvent
        {
            OrderId = @event.OrderId,
            PaymentId = paymentResult.PaymentId,
            Amount = @event.TotalAmount,
            ProcessedAt = DateTimeOffset.UtcNow
        }, ct);
    }
}
Event Registration
Register workflows to listen to events:
// In Program.cs
// Fail fast with a clear message instead of handing a null connection string to the store.
var eventStoreConnection = builder.Configuration.GetConnectionString("EventStore")
    ?? throw new InvalidOperationException("Connection string 'EventStore' is not configured.");

builder.Services.AddEventStreaming()
    .AddPostgresEventStore(eventStoreConnection);

// Register workflow
builder.Services.AddWorkflow<OrderPlacedEvent, OrderWorkflow>();

var app = builder.Build();

// Subscribe workflow to event stream
// NOTE(review): Broadcast presumably delivers every event to each subscriber; if several
// application instances run this code, each would run the workflow (and charge the
// payment) for the same order. Confirm the mode fits the deployment.
var subscription = app.Services.GetRequiredService<IEventSubscriptionService>();
await subscription.SubscribeAsync(
    streamName: "orders",
    subscriptionId: "order-workflow",
    mode: SubscriptionMode.Broadcast,
    async (StoredEvent storedEvent, CancellationToken ct) =>
    {
        if (storedEvent.Data is not OrderPlacedEvent orderPlaced)
        {
            return;
        }

        // Resolve the workflow in a per-event scope: this callback outlives any request,
        // and scoped services cannot be resolved from the root provider.
        await using var scope = app.Services.CreateAsyncScope();
        var workflow = scope.ServiceProvider.GetRequiredService<IWorkflow<OrderPlacedEvent>>();
        await workflow.HandleAsync(orderPlaced, ct);
    });
Event Versioning
Events evolve over time. Use versioning to handle schema changes:
// Version 1
/// <summary>First schema of the user-registered event: user id and name only.</summary>
public record UserRegisteredEventV1
{
    /// <summary>Id of the registered user.</summary>
    public string UserId { get; init; } = string.Empty;

    /// <summary>Name supplied at registration.</summary>
    public string Name { get; init; } = string.Empty;
}
// Version 2: Added email field
/// <summary>Second schema of the user-registered event: adds the email address.</summary>
public record UserRegisteredEventV2
{
    /// <summary>Id of the registered user.</summary>
    public string UserId { get; init; } = string.Empty;

    /// <summary>Name supplied at registration.</summary>
    public string Name { get; init; } = string.Empty;

    /// <summary>Email address; not present in version 1.</summary>
    public string Email { get; init; } = string.Empty;
}
// Upcaster: Convert V1 to V2
/// <summary>
/// Converts a stored <see cref="UserRegisteredEventV1"/> into the current
/// <see cref="UserRegisteredEventV2"/> shape.
/// </summary>
public class UserRegisteredEventUpcaster : IEventUpcaster<UserRegisteredEventV1, UserRegisteredEventV2>
{
    /// <summary>
    /// Copies the user id and name from <paramref name="oldEvent"/> and fills the
    /// email, which version 1 never recorded, with a fixed placeholder.
    /// </summary>
    /// <param name="oldEvent">The version 1 event to convert.</param>
    /// <returns>The equivalent version 2 event.</returns>
    public UserRegisteredEventV2 Upcast(UserRegisteredEventV1 oldEvent) => new()
    {
        UserId = oldEvent.UserId,
        Name = oldEvent.Name,
        Email = "unknown@example.com" // Default for old events
    };
}
// When reading
await foreach (var storedEvent in eventStore.ReadStreamAsync("user-123"))
{
    // Normalize to V2: upcast V1 payloads, pass V2 through, skip everything else.
    var registered = storedEvent.Data switch
    {
        UserRegisteredEventV1 legacy => _upcaster.Upcast(legacy),
        UserRegisteredEventV2 current => current,
        _ => null
    };

    // Now work with V2
    if (registered is UserRegisteredEventV2 v2)
    {
        // Process...
    }
}
Complete Workflow Example
Here's a complete order fulfillment workflow:
/// <summary>
/// Reacts to <see cref="OrderPlacedEvent"/> and drives the order through inventory
/// reservation, payment, and shipment creation, appending an event to the order's
/// stream after each step. A failed reservation or payment ends the workflow early;
/// an unexpected exception is logged and recorded as an
/// <see cref="OrderFulfillmentFailedEvent"/>.
/// </summary>
public class OrderFulfillmentWorkflow : IWorkflow<OrderPlacedEvent>
{
    private readonly IEventStreamStore _eventStore;
    private readonly IInventoryService _inventory;
    private readonly IPaymentService _payment;
    private readonly IShippingService _shipping;
    private readonly ILogger<OrderFulfillmentWorkflow> _logger;

    /// <summary>Creates the workflow with its dependencies (constructor injection).</summary>
    /// <exception cref="ArgumentNullException">Any dependency is null.</exception>
    public OrderFulfillmentWorkflow(
        IEventStreamStore eventStore,
        IInventoryService inventory,
        IPaymentService payment,
        IShippingService shipping,
        ILogger<OrderFulfillmentWorkflow> logger)
    {
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
        _inventory = inventory ?? throw new ArgumentNullException(nameof(inventory));
        _payment = payment ?? throw new ArgumentNullException(nameof(payment));
        _shipping = shipping ?? throw new ArgumentNullException(nameof(shipping));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>Runs the fulfillment steps for a placed order.</summary>
    /// <param name="event">The order that was placed.</param>
    /// <param name="ct">
    /// Token passed to every service and event-store call. Cancellation propagates to
    /// the caller and is not recorded as a fulfillment failure.
    /// </param>
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        var streamName = $"order-{@event.OrderId}";
        try
        {
            // Step 1: Reserve inventory
            _logger.LogInformation("Reserving inventory for order {OrderId}", @event.OrderId);
            var reservation = await _inventory.ReserveAsync(
                @event.OrderId,
                @event.Items.Select(i => new InventoryItem
                {
                    ProductId = i.ProductId,
                    Quantity = i.Quantity
                }).ToList(),
                ct);
            if (!reservation.Success)
            {
                await _eventStore.AppendAsync(streamName, new OrderInventoryUnavailableEvent
                {
                    OrderId = @event.OrderId,
                    UnavailableItems = reservation.UnavailableItems,
                    NotifiedAt = DateTimeOffset.UtcNow
                }, ct);
                return;
            }

            await _eventStore.AppendAsync(streamName, new InventoryReservedEvent
            {
                OrderId = @event.OrderId,
                ReservationId = reservation.ReservationId,
                Items = @event.Items,
                ReservedAt = DateTimeOffset.UtcNow
            }, ct);

            // Step 2: Process payment
            _logger.LogInformation("Processing payment for order {OrderId}", @event.OrderId);
            var payment = await _payment.ChargeAsync(
                @event.OrderId,
                @event.TotalAmount,
                ct);
            if (!payment.Success)
            {
                // Compensation: Release inventory
                await _inventory.ReleaseAsync(reservation.ReservationId, ct);
                await _eventStore.AppendAsync(streamName, new OrderPaymentFailedEvent
                {
                    OrderId = @event.OrderId,
                    Reason = payment.ErrorMessage,
                    FailedAt = DateTimeOffset.UtcNow
                }, ct);
                return;
            }

            await _eventStore.AppendAsync(streamName, new PaymentProcessedEvent
            {
                OrderId = @event.OrderId,
                PaymentId = payment.PaymentId,
                Amount = @event.TotalAmount,
                ProcessedAt = DateTimeOffset.UtcNow
            }, ct);

            // Step 3: Create shipment
            _logger.LogInformation("Creating shipment for order {OrderId}", @event.OrderId);
            var shipment = await _shipping.CreateShipmentAsync(
                @event.OrderId,
                @event.Items,
                ct);
            await _eventStore.AppendAsync(streamName, new ShipmentCreatedEvent
            {
                OrderId = @event.OrderId,
                ShipmentId = shipment.ShipmentId,
                TrackingNumber = shipment.TrackingNumber,
                EstimatedDelivery = shipment.EstimatedDelivery,
                CreatedAt = DateTimeOffset.UtcNow
            }, ct);

            // Final: Mark order as fulfilled
            await _eventStore.AppendAsync(streamName, new OrderFulfilledEvent
            {
                OrderId = @event.OrderId,
                FulfilledAt = DateTimeOffset.UtcNow
            }, ct);
            _logger.LogInformation("Order {OrderId} fulfilled successfully", @event.OrderId);
        }
        // Cancellation is not a fulfillment failure; let it propagate to the caller.
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Failed to fulfill order {OrderId}", @event.OrderId);

            // NOTE(review): nothing is compensated here. If the exception came after the
            // reservation or the charge succeeded, inventory stays reserved and the payment
            // stays captured — a handler of OrderFulfillmentFailedEvent presumably has to
            // undo them; confirm.

            // Record the failure even if ct was cancelled in the meantime.
            await _eventStore.AppendAsync(streamName, new OrderFulfillmentFailedEvent
            {
                OrderId = @event.OrderId,
                ErrorMessage = ex.Message,
                FailedAt = DateTimeOffset.UtcNow
            }, CancellationToken.None);
        }
    }
}
Best Practices
✅ DO:
- Use past tense for event names (OrderPlaced, PaymentProcessed)
- Include all relevant context in events
- Keep events immutable (use `init` properties)
- Version events when schema changes
- Emit events after state changes are validated
- Use workflows to coordinate aggregates
❌ DON'T:
- Use imperative, command-style names for events (PlaceOrder, ProcessPayment)
- Include minimal data in events
- Make events mutable
- Change event schema without versioning
- Emit events before validation
- Put coordination logic in aggregates
Next Steps
- 04-projections.md - Build read models from events
- 05-snapshots.md - Optimize with snapshots
- 06-replay-and-rebuild.md - Replay and rebuild projections