462 lines
13 KiB
Markdown
462 lines
13 KiB
Markdown
# Events and Workflows
|
|
|
|
Designing events and implementing workflow patterns.
|
|
|
|
## Overview
|
|
|
|
Events are immutable messages that describe facts that have occurred. The workflow pattern allows command handlers to publish domain events, which are then processed by projections, sagas, and other subscribers.
|
|
|
|
**Key Concepts:**
|
|
|
|
- ✅ **Events are facts** - Describe what happened (past tense)
|
|
- ✅ **Events are immutable** - Cannot be changed after creation
|
|
- ✅ **Events are self-contained** - Include all necessary data
|
|
- ✅ **Events enable reactions** - Trigger downstream processing
|
|
- ✅ **Events provide audit trail** - Complete history of changes
|
|
|
|
## Event Design
|
|
|
|
### Naming Conventions
|
|
|
|
Use past tense to describe what happened:
|
|
|
|
```csharp
|
|
// ✅ Good - Past tense
|
|
public record UserRegisteredEvent { }
|
|
public record OrderPlacedEvent { }
|
|
public record PaymentProcessedEvent { }
|
|
public record InventoryReducedEvent { }
|
|
|
|
// ❌ Bad - Present tense or imperative
|
|
public record UserRegisterEvent { } // Present tense
|
|
public record PlaceOrderEvent { } // Imperative
|
|
public record ProcessPayment { } // Command, not event
|
|
```
|
|
|
|
### Event Structure
|
|
|
|
Include all data needed to process the event:
|
|
|
|
```csharp
|
|
public record OrderPlacedEvent
|
|
{
|
|
// Identity
|
|
public string EventId { get; init; } = Guid.NewGuid().ToString();
|
|
public int OrderId { get; init; }
|
|
|
|
// Business data
|
|
public int CustomerId { get; init; }
|
|
public string CustomerName { get; init; } = string.Empty;
|
|
public string CustomerEmail { get; init; } = string.Empty;
|
|
public decimal TotalAmount { get; init; }
|
|
public List<OrderLineItem> Items { get; init; } = new();
|
|
public string ShippingAddress { get; init; } = string.Empty;
|
|
public string PaymentMethod { get; init; } = string.Empty;
|
|
|
|
// Metadata
|
|
public DateTimeOffset PlacedAt { get; init; }
|
|
public string? CorrelationId { get; init; }
|
|
public string? CausationId { get; init; }
|
|
public int Version { get; init; } = 1;
|
|
}
|
|
|
|
public record OrderLineItem
|
|
{
|
|
public int ProductId { get; init; }
|
|
public string ProductName { get; init; } = string.Empty;
|
|
public int Quantity { get; init; }
|
|
public decimal UnitPrice { get; init; }
|
|
public decimal LineTotal { get; init; }
|
|
}
|
|
```
|
|
|
|
### Domain Events vs Integration Events
|
|
|
|
**Domain Events:**
|
|
- Internal to bounded context
|
|
- Rich domain language
|
|
- May contain domain objects
|
|
|
|
```csharp
|
|
public record ProductInventoryReducedEvent
|
|
{
|
|
public int ProductId { get; init; }
|
|
public int QuantityReduced { get; init; }
|
|
public int NewStockLevel { get; init; }
|
|
public string Reason { get; init; } = string.Empty;
|
|
}
|
|
```
|
|
|
|
**Integration Events:**
|
|
- Cross bounded context
|
|
- Simple DTOs
|
|
- Technology agnostic
|
|
|
|
```csharp
|
|
public record OrderPlacedIntegrationEvent
|
|
{
|
|
public int OrderId { get; init; }
|
|
public int CustomerId { get; init; }
|
|
public decimal TotalAmount { get; init; }
|
|
public DateTimeOffset PlacedAt { get; init; }
|
|
}
|
|
```
|
|
|
|
## Workflow Pattern
|
|
|
|
### Command → Events → Projections
|
|
|
|
```
|
|
┌────────────┐ ┌──────────────┐ ┌─────────────┐
|
|
│ Command │ ───▶ │ Handler │ ───▶ │ Events │
|
|
└────────────┘ └──────────────┘ └─────────────┘
|
|
│ │
|
|
▼ ▼
|
|
┌──────────────┐ ┌─────────────┐
|
|
│ Write Model │ │ Projections │
|
|
└──────────────┘ └─────────────┘
|
|
```
|
|
|
|
### Implementing Workflows
|
|
|
|
**1. Define Domain Events:**
|
|
```csharp
|
|
public record UserRegisteredEvent
|
|
{
|
|
public int UserId { get; init; }
|
|
public string Email { get; init; } = string.Empty;
|
|
public string Name { get; init; } = string.Empty;
|
|
public DateTimeOffset RegisteredAt { get; init; }
|
|
}
|
|
|
|
public record WelcomeEmailSentEvent
|
|
{
|
|
public int UserId { get; init; }
|
|
public string Email { get; init; } = string.Empty;
|
|
public DateTimeOffset SentAt { get; init; }
|
|
}
|
|
```
|
|
|
|
**2. Command Handler Publishes Events:**
|
|
```csharp
|
|
public class RegisterUserCommandHandler : ICommandHandler<RegisterUserCommand, int>,
|
|
IWorkflow
|
|
{
|
|
private readonly IUserRepository _repository;
|
|
private readonly IEventStreamStore _eventStore;
|
|
private readonly List<object> _events = new();
|
|
|
|
public IReadOnlyList<object> Events => _events;
|
|
|
|
public async Task<int> HandleAsync(RegisterUserCommand command, CancellationToken ct)
|
|
{
|
|
// Create user
|
|
var user = new User
|
|
{
|
|
Email = command.Email,
|
|
Name = command.Name,
|
|
CreatedAt = DateTimeOffset.UtcNow
|
|
};
|
|
|
|
await _repository.AddAsync(user);
|
|
|
|
// Publish domain event
|
|
var @event = new UserRegisteredEvent
|
|
{
|
|
UserId = user.Id,
|
|
Email = user.Email,
|
|
Name = user.Name,
|
|
RegisteredAt = user.CreatedAt
|
|
};
|
|
|
|
_events.Add(@event);
|
|
|
|
// Events will be published by workflow dispatcher
|
|
return user.Id;
|
|
}
|
|
}
|
|
```
|
|
|
|
**3. Workflow Dispatcher Publishes Events:**
|
|
```csharp
|
|
public class WorkflowDispatcher
|
|
{
|
|
private readonly IEventStreamStore _eventStore;
|
|
|
|
public async Task<TResult> ExecuteAsync<TCommand, TResult>(
|
|
ICommandHandler<TCommand, TResult> handler,
|
|
TCommand command,
|
|
CancellationToken ct)
|
|
{
|
|
// Execute command
|
|
var result = await handler.HandleAsync(command, ct);
|
|
|
|
// If handler implements IWorkflow, publish events
|
|
if (handler is IWorkflow workflow && workflow.Events.Any())
|
|
{
|
|
await _eventStore.AppendAsync("domain-events", workflow.Events.ToArray());
|
|
}
|
|
|
|
return result;
|
|
}
|
|
}
|
|
```
|
|
|
|
**4. Event Subscribers React:**
|
|
```csharp
|
|
// Projection updates read model
|
|
public class UserProjection
|
|
{
|
|
public async Task HandleAsync(UserRegisteredEvent @event, CancellationToken ct)
|
|
{
|
|
await _readRepository.AddUserSummaryAsync(new UserSummary
|
|
{
|
|
UserId = @event.UserId,
|
|
Email = @event.Email,
|
|
Name = @event.Name,
|
|
RegisteredAt = @event.RegisteredAt
|
|
});
|
|
}
|
|
}
|
|
|
|
// Saga sends welcome email
|
|
public class UserOnboardingSaga
|
|
{
|
|
public async Task HandleAsync(UserRegisteredEvent @event, CancellationToken ct)
|
|
{
|
|
// Send welcome email
|
|
await _emailService.SendWelcomeEmailAsync(@event.Email, @event.Name);
|
|
|
|
// Publish follow-up event
|
|
await _eventStore.AppendAsync("domain-events", new[]
|
|
{
|
|
new WelcomeEmailSentEvent
|
|
{
|
|
UserId = @event.UserId,
|
|
Email = @event.Email,
|
|
SentAt = DateTimeOffset.UtcNow
|
|
}
|
|
});
|
|
}
|
|
}
|
|
```
|
|
|
|
## Event Correlation
|
|
|
|
### Correlation and Causation IDs
|
|
|
|
Track related events across workflows:
|
|
|
|
```csharp
|
|
public record OrderPlacedEvent
|
|
{
|
|
public int OrderId { get; init; }
|
|
|
|
// Links all events in same business transaction
|
|
public string CorrelationId { get; init; } = string.Empty;
|
|
|
|
// Links this event to the command/event that caused it
|
|
public string CausationId { get; init; } = string.Empty;
|
|
}
|
|
|
|
// Usage in handler
|
|
public class PlaceOrderCommandHandler : ICommandHandler<PlaceOrderCommand, int>
|
|
{
|
|
public async Task<int> HandleAsync(PlaceOrderCommand command, CancellationToken ct)
|
|
{
|
|
var orderId = GenerateOrderId();
|
|
var correlationId = command.CorrelationId ?? Guid.NewGuid().ToString();
|
|
|
|
await _eventStore.AppendAsync("orders", new[]
|
|
{
|
|
new OrderPlacedEvent
|
|
{
|
|
OrderId = orderId,
|
|
CorrelationId = correlationId, // Same for all related events
|
|
CausationId = command.CommandId // This command caused this event
|
|
}
|
|
});
|
|
|
|
return orderId;
|
|
}
|
|
}
|
|
|
|
// Downstream saga maintains correlation
|
|
public class OrderFulfillmentSaga
|
|
{
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
// Reserve inventory
|
|
await _inventoryService.ReserveAsync(@event.OrderId, @event.Items);
|
|
|
|
// Publish event with same correlation ID
|
|
await _eventStore.AppendAsync("orders", new[]
|
|
{
|
|
new InventoryReservedEvent
|
|
{
|
|
OrderId = @event.OrderId,
|
|
CorrelationId = @event.CorrelationId, // Same correlation
|
|
CausationId = @event.EventId // OrderPlaced caused this
|
|
}
|
|
});
|
|
}
|
|
}
|
|
```
|
|
|
|
### Using Correlation Context
|
|
|
|
Automatic correlation propagation:
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events.Logging;
|
|
|
|
// Set correlation context
|
|
using (CorrelationContext.Begin(correlationId))
|
|
{
|
|
// All events published within this scope inherit correlation ID
|
|
await _eventStore.AppendAsync("orders", new[]
|
|
{
|
|
new OrderPlacedEvent
|
|
{
|
|
OrderId = orderId,
|
|
CorrelationId = CorrelationContext.Current // Automatically set
|
|
}
|
|
});
|
|
|
|
// Logs also include correlation ID
|
|
_logger.LogInformation("Order placed: {OrderId}", orderId);
|
|
}
|
|
```
|
|
|
|
## Event Versioning
|
|
|
|
Plan for schema evolution:
|
|
|
|
### Version 1
|
|
|
|
```csharp
|
|
public record UserRegisteredEventV1
|
|
{
|
|
public int Version { get; init; } = 1;
|
|
public int UserId { get; init; }
|
|
public string Email { get; init; } = string.Empty;
|
|
public string Name { get; init; } = string.Empty;
|
|
}
|
|
```
|
|
|
|
### Version 2 (Add Field)
|
|
|
|
```csharp
|
|
public record UserRegisteredEventV2
|
|
{
|
|
public int Version { get; init; } = 2;
|
|
public int UserId { get; init; }
|
|
public string Email { get; init; } = string.Empty;
|
|
public string FirstName { get; init; } = string.Empty; // New
|
|
public string LastName { get; init; } = string.Empty; // New
|
|
public string PhoneNumber { get; init; } = string.Empty; // New
|
|
}
|
|
```
|
|
|
|
### Upcasting
|
|
|
|
Convert old events to new schema:
|
|
|
|
```csharp
|
|
public class UserRegisteredEventUpcaster
|
|
{
|
|
public object Upcast(StoredEvent storedEvent)
|
|
{
|
|
var version = GetVersion(storedEvent);
|
|
|
|
return version switch
|
|
{
|
|
1 => UpcastV1ToV2(storedEvent),
|
|
2 => JsonSerializer.Deserialize<UserRegisteredEventV2>(storedEvent.Data),
|
|
_ => throw new NotSupportedException($"Version {version} not supported")
|
|
};
|
|
}
|
|
|
|
private UserRegisteredEventV2 UpcastV1ToV2(StoredEvent storedEvent)
|
|
{
|
|
var v1 = JsonSerializer.Deserialize<UserRegisteredEventV1>(storedEvent.Data);
|
|
|
|
// Split name into first/last
|
|
var nameParts = v1.Name.Split(' ', 2);
|
|
|
|
return new UserRegisteredEventV2
|
|
{
|
|
Version = 2,
|
|
UserId = v1.UserId,
|
|
Email = v1.Email,
|
|
FirstName = nameParts.Length > 0 ? nameParts[0] : string.Empty,
|
|
LastName = nameParts.Length > 1 ? nameParts[1] : string.Empty,
|
|
PhoneNumber = string.Empty // Not available in V1
|
|
};
|
|
}
|
|
}
|
|
```
|
|
|
|
## Idempotency
|
|
|
|
Events may be processed multiple times - handlers must be idempotent:
|
|
|
|
```csharp
|
|
public class OrderSummaryProjection
|
|
{
|
|
private readonly IOrderSummaryRepository _repository;
|
|
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
// Check if already processed (idempotency)
|
|
var existing = await _repository.FindByEventIdAsync(@event.EventId);
|
|
if (existing != null)
|
|
{
|
|
_logger.LogInformation("Event {EventId} already processed, skipping", @event.EventId);
|
|
return;
|
|
}
|
|
|
|
// Create summary
|
|
var summary = new OrderSummary
|
|
{
|
|
OrderId = @event.OrderId,
|
|
CustomerName = @event.CustomerName,
|
|
TotalAmount = @event.TotalAmount,
|
|
ProcessedEventId = @event.EventId // Track which event was processed
|
|
};
|
|
|
|
await _repository.AddAsync(summary);
|
|
}
|
|
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Use past tense for event names
|
|
- Include all data needed to process event
|
|
- Add correlation and causation IDs
|
|
- Version events from the start
|
|
- Design for idempotent handlers
|
|
- Use small, focused events
|
|
- Document event schemas
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't use imperative or present tense names
|
|
- Don't modify events after publishing
|
|
- Don't include behavior in events (only data)
|
|
- Don't couple events to specific subscribers
|
|
- Don't store large binary data in events
|
|
- Don't break existing event schemas
|
|
- Don't assume events processed exactly once
|
|
|
|
## See Also
|
|
|
|
- [Getting Started](getting-started.md)
|
|
- [Persistent Streams](persistent-streams.md)
|
|
- [Projections](../projections/README.md)
|
|
- [Sagas](../sagas/README.md)
|
|
- [Event Sourcing Tutorial](../../tutorials/event-sourcing/README.md)
|