# Events and Workflows

Designing events and implementing workflow patterns.

## Overview

Events are immutable messages that describe facts that have occurred. The workflow pattern allows command handlers to publish domain events, which are then processed by projections, sagas, and other subscribers.

**Key Concepts:**

- ✅ **Events are facts** - Describe what happened (past tense)
- ✅ **Events are immutable** - Cannot be changed after creation
- ✅ **Events are self-contained** - Include all necessary data
- ✅ **Events enable reactions** - Trigger downstream processing
- ✅ **Events provide audit trail** - Complete history of changes

## Event Design

### Naming Conventions

Use past tense to describe what happened:

```csharp
// ✅ Good - Past tense
public record UserRegisteredEvent { }
public record OrderPlacedEvent { }
public record PaymentProcessedEvent { }
public record InventoryReducedEvent { }

// ❌ Bad - Present tense or imperative
public record UserRegisterEvent { }  // Present tense
public record PlaceOrderEvent { }    // Imperative
public record ProcessPayment { }     // Command, not event
```

### Event Structure

Include all data needed to process the event:

```csharp
public record OrderPlacedEvent
{
    // Identity
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public int OrderId { get; init; }

    // Business data
    public int CustomerId { get; init; }
    public string CustomerName { get; init; } = string.Empty;
    public string CustomerEmail { get; init; } = string.Empty;
    public decimal TotalAmount { get; init; }
    public List<OrderLineItem> Items { get; init; } = new();
    public string ShippingAddress { get; init; } = string.Empty;
    public string PaymentMethod { get; init; } = string.Empty;

    // Metadata
    public DateTimeOffset PlacedAt { get; init; }
    public string? CorrelationId { get; init; }
    public string? CausationId { get; init; }
    public int Version { get; init; } = 1;
}

public record OrderLineItem
{
    public int ProductId { get; init; }
    public string ProductName { get; init; } = string.Empty;
    public int Quantity { get; init; }
    public decimal UnitPrice { get; init; }
    public decimal LineTotal { get; init; }
}
```

### Domain Events vs Integration Events

**Domain Events:**

- Internal to a bounded context
- Rich domain language
- May contain domain objects

```csharp
public record ProductInventoryReducedEvent
{
    public int ProductId { get; init; }
    public int QuantityReduced { get; init; }
    public int NewStockLevel { get; init; }
    public string Reason { get; init; } = string.Empty;
}
```

**Integration Events:**

- Cross bounded contexts
- Simple DTOs
- Technology agnostic

```csharp
public record OrderPlacedIntegrationEvent
{
    public int OrderId { get; init; }
    public int CustomerId { get; init; }
    public decimal TotalAmount { get; init; }
    public DateTimeOffset PlacedAt { get; init; }
}
```

## Workflow Pattern

### Command → Events → Projections

```
┌────────────┐      ┌──────────────┐      ┌─────────────┐
│  Command   │ ───▶ │   Handler    │ ───▶ │   Events    │
└────────────┘      └──────────────┘      └─────────────┘
                           │                     │
                           ▼                     ▼
                    ┌──────────────┐      ┌─────────────┐
                    │ Write Model  │      │ Projections │
                    └──────────────┘      └─────────────┘
```

### Implementing Workflows

**1. Define Domain Events:**

```csharp
public record UserRegisteredEvent
{
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
    public string Name { get; init; } = string.Empty;
    public DateTimeOffset RegisteredAt { get; init; }
}

public record WelcomeEmailSentEvent
{
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
    public DateTimeOffset SentAt { get; init; }
}
```

**2. Command Handler Publishes Events:**

```csharp
public class RegisterUserCommandHandler : ICommandHandler<RegisterUserCommand, int>, IWorkflow
{
    private readonly IUserRepository _repository;
    private readonly IEventStreamStore _eventStore;

    // Element type is an assumption (the original type argument was lost);
    // use your own event base type if you have one.
    private readonly List<object> _events = new();

    public IReadOnlyList<object> Events => _events;

    public async Task<int> HandleAsync(RegisterUserCommand command, CancellationToken ct)
    {
        // Create user
        var user = new User
        {
            Email = command.Email,
            Name = command.Name,
            CreatedAt = DateTimeOffset.UtcNow
        };

        await _repository.AddAsync(user);

        // Record the domain event
        var @event = new UserRegisteredEvent
        {
            UserId = user.Id,
            Email = user.Email,
            Name = user.Name,
            RegisteredAt = user.CreatedAt
        };

        _events.Add(@event);

        // The workflow dispatcher publishes the recorded events after the handler returns
        return user.Id;
    }
}
```

**3. Workflow Dispatcher Publishes Events:**

```csharp
public class WorkflowDispatcher
{
    private readonly IEventStreamStore _eventStore;

    public async Task<TResult> ExecuteAsync<TCommand, TResult>(
        ICommandHandler<TCommand, TResult> handler,
        TCommand command,
        CancellationToken ct)
    {
        // Execute command
        var result = await handler.HandleAsync(command, ct);

        // If handler implements IWorkflow, publish events
        if (handler is IWorkflow workflow && workflow.Events.Any())
        {
            await _eventStore.AppendAsync("domain-events", workflow.Events.ToArray());
        }

        return result;
    }
}
```

**4. Event Subscribers React:**

```csharp
// Projection updates read model
public class UserProjection
{
    public async Task HandleAsync(UserRegisteredEvent @event, CancellationToken ct)
    {
        await _readRepository.AddUserSummaryAsync(new UserSummary
        {
            UserId = @event.UserId,
            Email = @event.Email,
            Name = @event.Name,
            RegisteredAt = @event.RegisteredAt
        });
    }
}

// Saga sends welcome email
public class UserOnboardingSaga
{
    public async Task HandleAsync(UserRegisteredEvent @event, CancellationToken ct)
    {
        // Send welcome email
        await _emailService.SendWelcomeEmailAsync(@event.Email, @event.Name);

        // Publish follow-up event
        await _eventStore.AppendAsync("domain-events", new[]
        {
            new WelcomeEmailSentEvent
            {
                UserId = @event.UserId,
                Email = @event.Email,
                SentAt = DateTimeOffset.UtcNow
            }
        });
    }
}
```

## Event Correlation

### Correlation and Causation IDs

Track related events across workflows:

```csharp
// Abridged: EventId, Items, and the other members are as shown under Event Structure
public record OrderPlacedEvent
{
    public int OrderId { get; init; }

    // Links all events in same business transaction
    public string CorrelationId { get; init; } = string.Empty;

    // Links this event to the command/event that caused it
    public string CausationId { get; init; } = string.Empty;
}

// Usage in handler
public class PlaceOrderCommandHandler : ICommandHandler<PlaceOrderCommand, int>
{
    public async Task<int> HandleAsync(PlaceOrderCommand command, CancellationToken ct)
    {
        var orderId = GenerateOrderId();
        var correlationId = command.CorrelationId ?? Guid.NewGuid().ToString();

        await _eventStore.AppendAsync("orders", new[]
        {
            new OrderPlacedEvent
            {
                OrderId = orderId,
                CorrelationId = correlationId,   // Same for all related events
                CausationId = command.CommandId  // This command caused this event
            }
        });

        return orderId;
    }
}

// Downstream saga maintains correlation
public class OrderFulfillmentSaga
{
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // Reserve inventory
        await _inventoryService.ReserveAsync(@event.OrderId, @event.Items);

        // Publish event with same correlation ID
        await _eventStore.AppendAsync("orders", new[]
        {
            new InventoryReservedEvent
            {
                OrderId = @event.OrderId,
                CorrelationId = @event.CorrelationId,  // Same correlation
                CausationId = @event.EventId           // OrderPlaced caused this
            }
        });
    }
}
```

### Using Correlation Context

Automatic correlation propagation:

```csharp
using Svrnty.CQRS.Events.Logging;

// Set correlation context
using (CorrelationContext.Begin(correlationId))
{
    // All events published within this scope inherit correlation ID
    await _eventStore.AppendAsync("orders", new[]
    {
        new OrderPlacedEvent
        {
            OrderId = orderId,
            CorrelationId = CorrelationContext.Current  // Automatically set
        }
    });

    // Logs also include correlation ID
    _logger.LogInformation("Order placed: {OrderId}", orderId);
}
```

## Event Versioning

Plan for schema evolution:

### Version 1

```csharp
public record UserRegisteredEventV1
{
    public int Version { get; init; } = 1;
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
    public string Name { get; init; } = string.Empty;
}
```

### Version 2 (Add Field)

```csharp
public record UserRegisteredEventV2
{
    public int Version { get; init; } = 2;
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
    public string FirstName { get; init; } = string.Empty;    // New (replaces Name)
    public string LastName { get; init; } = string.Empty;     // New (replaces Name)
    public string PhoneNumber { get; init; } = string.Empty;  // New
}
```

### Upcasting

Convert old events to new schema:

```csharp
using System.Text.Json;

public class UserRegisteredEventUpcaster
{
    public object Upcast(StoredEvent storedEvent)
    {
        var version = GetVersion(storedEvent);

        return version switch
        {
            1 => UpcastV1ToV2(storedEvent),
            2 => JsonSerializer.Deserialize<UserRegisteredEventV2>(storedEvent.Data)
                 ?? throw new InvalidOperationException("Stored V2 event payload is null"),
            _ => throw new NotSupportedException($"Version {version} not supported")
        };
    }

    private UserRegisteredEventV2 UpcastV1ToV2(StoredEvent storedEvent)
    {
        var v1 = JsonSerializer.Deserialize<UserRegisteredEventV1>(storedEvent.Data)
                 ?? throw new InvalidOperationException("Stored V1 event payload is null");

        // Split name into first/last at the first space
        var nameParts = v1.Name.Split(' ', 2);

        return new UserRegisteredEventV2
        {
            Version = 2,
            UserId = v1.UserId,
            Email = v1.Email,
            FirstName = nameParts.Length > 0 ? nameParts[0] : string.Empty,
            LastName = nameParts.Length > 1 ? nameParts[1] : string.Empty,
            PhoneNumber = string.Empty  // Not available in V1
        };
    }
}
```

## Idempotency

Events may be delivered more than once, so handlers must be idempotent:

```csharp
public class OrderSummaryProjection
{
    private readonly IOrderSummaryRepository _repository;
    private readonly ILogger<OrderSummaryProjection> _logger;

    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // Check if already processed (idempotency)
        var existing = await _repository.FindByEventIdAsync(@event.EventId);
        if (existing != null)
        {
            _logger.LogInformation("Event {EventId} already processed, skipping", @event.EventId);
            return;
        }

        // Create summary
        var summary = new OrderSummary
        {
            OrderId = @event.OrderId,
            CustomerName = @event.CustomerName,
            TotalAmount = @event.TotalAmount,
            ProcessedEventId = @event.EventId  // Track which event was processed
        };

        await _repository.AddAsync(summary);
    }
}
```

## Best Practices

### ✅ DO

- Use past tense for event names
- Include all data needed to process the event
- Add correlation and causation IDs
- Version events from the start
- Design for idempotent handlers
- Use small, focused events
- Document event schemas

### ❌ DON'T

- Don't use imperative or present tense names
- Don't modify events after publishing
- Don't include behavior in events (only data)
- Don't couple events to specific subscribers
- Don't store large binary data in events
- Don't break existing event schemas
- Don't assume events are processed exactly once

## See Also

- [Getting Started](getting-started.md)
- [Persistent Streams](persistent-streams.md)
- [Projections](../projections/README.md)
- [Sagas](../sagas/README.md)
- [Event Sourcing Tutorial](../../tutorials/event-sourcing/README.md)