# Events and Workflows
Designing events and implementing workflow patterns.
## Overview
Events are immutable messages that describe facts that have occurred. The workflow pattern allows command handlers to publish domain events, which are then processed by projections, sagas, and other subscribers.
**Key Concepts:**
- **Events are facts** - Describe what happened (past tense)
- **Events are immutable** - Cannot be changed after creation
- **Events are self-contained** - Include all necessary data
- **Events enable reactions** - Trigger downstream processing
- **Events provide audit trail** - Complete history of changes
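The immutability point can be illustrated with a C# record. The sketch below is a minimal example, not code from the library; `PriceChangedEvent` and `ImmutabilityDemo` are hypothetical names used only for this illustration.

```csharp
using System;

// Hypothetical event used only for this illustration.
public record PriceChangedEvent
{
    public int ProductId { get; init; }
    public decimal NewPrice { get; init; }
}

public static class ImmutabilityDemo
{
    public static (PriceChangedEvent Original, PriceChangedEvent Corrected) Run()
    {
        var original = new PriceChangedEvent { ProductId = 42, NewPrice = 9.99m };

        // original.NewPrice = 10.99m; // Compile error: init-only property

        // A correction is expressed as a new event; the original fact is untouched.
        var corrected = original with { NewPrice = 10.99m };
        return (original, corrected);
    }
}
```

Note that `init` properties give shallow immutability only: a `List<T>` property can still be modified through its reference, so exposing `IReadOnlyList<T>` is one way to tighten that.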
## Event Design
### Naming Conventions
Use past tense to describe what happened:
```csharp
// ✅ Good - Past tense
public record UserRegisteredEvent { }
public record OrderPlacedEvent { }
public record PaymentProcessedEvent { }
public record InventoryReducedEvent { }
// ❌ Bad - Present tense or imperative
public record UserRegisterEvent { } // Present tense
public record PlaceOrderEvent { } // Imperative
public record ProcessPayment { } // Command, not event
```
### Event Structure
Include all data needed to process the event:
```csharp
public record OrderPlacedEvent
{
    // Identity
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public int OrderId { get; init; }

    // Business data
    public int CustomerId { get; init; }
    public string CustomerName { get; init; } = string.Empty;
    public string CustomerEmail { get; init; } = string.Empty;
    public decimal TotalAmount { get; init; }
    public List<OrderLineItem> Items { get; init; } = new();
    public string ShippingAddress { get; init; } = string.Empty;
    public string PaymentMethod { get; init; } = string.Empty;

    // Metadata
    public DateTimeOffset PlacedAt { get; init; }
    public string? CorrelationId { get; init; }
    public string? CausationId { get; init; }
    public int Version { get; init; } = 1;
}

public record OrderLineItem
{
    public int ProductId { get; init; }
    public string ProductName { get; init; } = string.Empty;
    public int Quantity { get; init; }
    public decimal UnitPrice { get; init; }
    public decimal LineTotal { get; init; }
}
```
### Domain Events vs Integration Events
**Domain Events:**
- Internal to bounded context
- Rich domain language
- May contain domain objects
```csharp
public record ProductInventoryReducedEvent
{
    public int ProductId { get; init; }
    public int QuantityReduced { get; init; }
    public int NewStockLevel { get; init; }
    public string Reason { get; init; } = string.Empty;
}
```
**Integration Events:**
- Cross bounded context
- Simple DTOs
- Technology agnostic
```csharp
public record OrderPlacedIntegrationEvent
{
    public int OrderId { get; init; }
    public int CustomerId { get; init; }
    public decimal TotalAmount { get; init; }
    public DateTimeOffset PlacedAt { get; init; }
}
```
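One way to keep the two kinds of events separate is to translate at the boundary of the bounded context. The sketch below assumes a hypothetical `OrderEventMapper` extension class (not part of the library) and uses trimmed copies of the records so it compiles on its own.

```csharp
using System;

// Trimmed copy of the domain event, kept small so the sketch is self-contained.
public record OrderPlacedEvent
{
    public int OrderId { get; init; }
    public int CustomerId { get; init; }
    public string CustomerEmail { get; init; } = string.Empty;
    public decimal TotalAmount { get; init; }
    public DateTimeOffset PlacedAt { get; init; }
}

public record OrderPlacedIntegrationEvent
{
    public int OrderId { get; init; }
    public int CustomerId { get; init; }
    public decimal TotalAmount { get; init; }
    public DateTimeOffset PlacedAt { get; init; }
}

// Hypothetical mapper: copies only the fields other contexts are allowed to see.
public static class OrderEventMapper
{
    public static OrderPlacedIntegrationEvent ToIntegrationEvent(this OrderPlacedEvent domainEvent) =>
        new()
        {
            OrderId = domainEvent.OrderId,
            CustomerId = domainEvent.CustomerId,
            TotalAmount = domainEvent.TotalAmount,
            PlacedAt = domainEvent.PlacedAt
        };
}
```

Mapping explicitly, rather than publishing the domain event as-is, means internal fields such as `CustomerEmail` can change without affecting consumers in other contexts.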
## Workflow Pattern
### Command → Events → Projections
```
┌────────────┐      ┌──────────────┐      ┌─────────────┐
│  Command   │ ───▶ │   Handler    │ ───▶ │   Events    │
└────────────┘      └──────────────┘      └─────────────┘
                           │                     │
                           ▼                     ▼
                    ┌──────────────┐      ┌─────────────┐
                    │ Write Model  │      │ Projections │
                    └──────────────┘      └─────────────┘
```
### Implementing Workflows
**1. Define Domain Events:**
```csharp
public record UserRegisteredEvent
{
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
    public string Name { get; init; } = string.Empty;
    public DateTimeOffset RegisteredAt { get; init; }
}

public record WelcomeEmailSentEvent
{
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
    public DateTimeOffset SentAt { get; init; }
}
```
**2. Command Handler Publishes Events:**
```csharp
public class RegisterUserCommandHandler : ICommandHandler<RegisterUserCommand, int>, IWorkflow
{
    private readonly IUserRepository _repository;

    // Events recorded while handling. The handler instance must not be shared
    // across requests, or events from different commands would mix.
    private readonly List<object> _events = new();

    public RegisterUserCommandHandler(IUserRepository repository) => _repository = repository;

    public IReadOnlyList<object> Events => _events;

    public async Task<int> HandleAsync(RegisterUserCommand command, CancellationToken ct)
    {
        // Create user
        var user = new User
        {
            Email = command.Email,
            Name = command.Name,
            CreatedAt = DateTimeOffset.UtcNow
        };
        await _repository.AddAsync(user); // Assumes the repository assigns user.Id

        // Record domain event (not published yet)
        var @event = new UserRegisteredEvent
        {
            UserId = user.Id,
            Email = user.Email,
            Name = user.Name,
            RegisteredAt = user.CreatedAt
        };
        _events.Add(@event);

        // Events will be published by workflow dispatcher
        return user.Id;
    }
}
```
**3. Workflow Dispatcher Publishes Events:**
```csharp
public class WorkflowDispatcher
{
    private readonly IEventStreamStore _eventStore;

    public WorkflowDispatcher(IEventStreamStore eventStore) => _eventStore = eventStore;

    public async Task<TResult> ExecuteAsync<TCommand, TResult>(
        ICommandHandler<TCommand, TResult> handler,
        TCommand command,
        CancellationToken ct)
    {
        // Execute command
        var result = await handler.HandleAsync(command, ct);

        // If handler implements IWorkflow, publish the events it recorded
        if (handler is IWorkflow workflow && workflow.Events.Any())
        {
            await _eventStore.AppendAsync("domain-events", workflow.Events.ToArray());
        }

        return result;
    }
}
```
**4. Event Subscribers React:**
```csharp
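// Projection updates read model
public class UserProjection
{
    // IUserReadRepository is an assumed name for the read-model repository
    private readonly IUserReadRepository _readRepository;

    public UserProjection(IUserReadRepository readRepository) => _readRepository = readRepository;

    public async Task HandleAsync(UserRegisteredEvent @event, CancellationToken ct)
    {
        await _readRepository.AddUserSummaryAsync(new UserSummary
        {
            UserId = @event.UserId,
            Email = @event.Email,
            Name = @event.Name,
            RegisteredAt = @event.RegisteredAt
        });
    }
}

// Saga sends welcome email
public class UserOnboardingSaga
{
    // IEmailService is an assumed name for the email abstraction
    private readonly IEmailService _emailService;
    private readonly IEventStreamStore _eventStore;

    public UserOnboardingSaga(IEmailService emailService, IEventStreamStore eventStore)
    {
        _emailService = emailService;
        _eventStore = eventStore;
    }

    public async Task HandleAsync(UserRegisteredEvent @event, CancellationToken ct)
    {
        // Send welcome email
        await _emailService.SendWelcomeEmailAsync(@event.Email, @event.Name);

        // Publish follow-up event
        await _eventStore.AppendAsync("domain-events", new[]
        {
            new WelcomeEmailSentEvent
            {
                UserId = @event.UserId,
                Email = @event.Email,
                SentAt = DateTimeOffset.UtcNow
            }
        });
    }
}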
```
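The fan-out in step 4, where one published event reaches several independent subscribers, can be tried without any infrastructure. The following is a minimal in-memory sketch; `InMemoryEventBus` and `BusDemo` are hypothetical stand-ins for illustration and do not reflect how `IEventStreamStore` delivers events.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Trimmed copy of the event defined in step 1.
public record UserRegisteredEvent
{
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
}

// Hypothetical in-memory dispatcher, for illustration only.
public class InMemoryEventBus
{
    private readonly Dictionary<Type, List<Func<object, CancellationToken, Task>>> _handlers = new();

    public void Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler)
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<object, CancellationToken, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        // Wrap the typed handler so all handlers share one delegate shape.
        list.Add((e, ct) => handler((TEvent)e, ct));
    }

    public async Task PublishAsync(IEnumerable<object> events, CancellationToken ct = default)
    {
        foreach (var e in events)
        {
            if (!_handlers.TryGetValue(e.GetType(), out var list)) continue;
            // Handlers run in subscription order, one after another.
            foreach (var handler in list) await handler(e, ct);
        }
    }
}

public static class BusDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var bus = new InMemoryEventBus();

        // Two subscribers react to the same event without knowing about each other.
        bus.Subscribe<UserRegisteredEvent>((e, _) => { log.Add($"projection:{e.UserId}"); return Task.CompletedTask; });
        bus.Subscribe<UserRegisteredEvent>((e, _) => { log.Add($"saga:{e.Email}"); return Task.CompletedTask; });

        await bus.PublishAsync(new object[] { new UserRegisteredEvent { UserId = 1, Email = "a@example.com" } });
        return log;
    }
}
```

The publisher never references the projection or the saga, which is the decoupling the workflow pattern relies on.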
## Event Correlation
### Correlation and Causation IDs
Track related events across workflows:
```csharp
public record OrderPlacedEvent
{
    public string EventId { get; init; } = Guid.NewGuid().ToString();
    public int OrderId { get; init; }
    public List<OrderLineItem> Items { get; init; } = new();

    // Links all events in same business transaction
    public string CorrelationId { get; init; } = string.Empty;

    // Links this event to the command/event that caused it
    public string CausationId { get; init; } = string.Empty;
}

// Usage in handler
public class PlaceOrderCommandHandler : ICommandHandler<PlaceOrderCommand, int>
{
    private readonly IEventStreamStore _eventStore;

    public PlaceOrderCommandHandler(IEventStreamStore eventStore) => _eventStore = eventStore;

    public async Task<int> HandleAsync(PlaceOrderCommand command, CancellationToken ct)
    {
        var orderId = GenerateOrderId();
        var correlationId = command.CorrelationId ?? Guid.NewGuid().ToString();

        await _eventStore.AppendAsync("orders", new[]
        {
            new OrderPlacedEvent
            {
                OrderId = orderId,
                CorrelationId = correlationId,  // Same for all related events
                CausationId = command.CommandId // This command caused this event
            }
        });

        return orderId;
    }
}

// Downstream saga maintains correlation
public class OrderFulfillmentSaga
{
    // IInventoryService is an assumed name for the inventory abstraction
    private readonly IInventoryService _inventoryService;
    private readonly IEventStreamStore _eventStore;

    public OrderFulfillmentSaga(IInventoryService inventoryService, IEventStreamStore eventStore)
    {
        _inventoryService = inventoryService;
        _eventStore = eventStore;
    }

    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // Reserve inventory
        await _inventoryService.ReserveAsync(@event.OrderId, @event.Items);

        // Publish event with same correlation ID
        await _eventStore.AppendAsync("orders", new[]
        {
            new InventoryReservedEvent
            {
                OrderId = @event.OrderId,
                CorrelationId = @event.CorrelationId, // Same correlation
                CausationId = @event.EventId          // OrderPlaced caused this
            }
        });
    }
}
```
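Once every event carries both IDs, the chain of cause and effect can be reconstructed by following `CausationId` links backwards. The sketch below assumes a hypothetical `TracedEvent` shape and `CausationTrace` helper; neither is part of the library.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical minimal view of a stored event, for illustration only.
public record TracedEvent(string EventId, string Name, string CorrelationId, string CausationId);

public static class CausationTrace
{
    // Walks CausationId links backwards from the given event until the cause
    // is no longer an event in the set (for example, the originating command).
    public static IReadOnlyList<string> ChainTo(string eventId, IEnumerable<TracedEvent> events)
    {
        var byId = events.ToDictionary(e => e.EventId);
        var chain = new List<string>();
        var visited = new HashSet<string>(); // Guards against accidental cycles
        var current = eventId;

        while (byId.TryGetValue(current, out var e) && visited.Add(current))
        {
            chain.Add(e.Name);
            current = e.CausationId;
        }

        chain.Reverse(); // Oldest cause first
        return chain;
    }
}
```

For three events where `evt-1` (OrderPlaced) was caused by `cmd-1`, `evt-2` (InventoryReserved) by `evt-1`, and `evt-3` (PaymentProcessed) by `evt-2`, `ChainTo("evt-3", events)` yields OrderPlaced, InventoryReserved, PaymentProcessed. The correlation ID answers "which events belong together", while the causation ID answers "in what order did they trigger each other".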
### Using Correlation Context
Automatic correlation propagation:
```csharp
using Svrnty.CQRS.Events.Logging;
// Set correlation context
using (CorrelationContext.Begin(correlationId))
{
    // Events published within this scope share the ambient correlation ID
    await _eventStore.AppendAsync("orders", new[]
    {
        new OrderPlacedEvent
        {
            OrderId = orderId,
            CorrelationId = CorrelationContext.Current // Read from the ambient scope
        }
    });

    // Logs also include correlation ID
    _logger.LogInformation("Order placed: {OrderId}", orderId);
}
```
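An ambient scope of this kind is commonly built on `AsyncLocal<T>`, which flows a value across `await` boundaries. The sketch below shows that general technique under a different, hypothetical name (`AmbientCorrelation`); it is not the implementation of `CorrelationContext` in `Svrnty.CQRS.Events.Logging`, whose internals are not shown here.

```csharp
using System;
using System.Threading;

// Hypothetical ambient correlation scope built on AsyncLocal<T>.
public static class AmbientCorrelation
{
    private static readonly AsyncLocal<string?> _current = new();

    // The correlation ID of the innermost active scope, or null outside any scope.
    public static string? Current => _current.Value;

    public static IDisposable Begin(string correlationId)
    {
        var previous = _current.Value;
        _current.Value = correlationId;
        return new Scope(previous);
    }

    private sealed class Scope : IDisposable
    {
        private readonly string? _previous;
        private bool _disposed;

        public Scope(string? previous) => _previous = previous;

        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;
            // Restore whatever was active before this scope, so scopes can nest.
            _current.Value = _previous;
        }
    }
}
```

Because the value lives in the execution context rather than on a thread, code that resumes on a different thread after an `await` still sees the same correlation ID.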
## Event Versioning
Plan for schema evolution:
### Version 1
```csharp
public record UserRegisteredEventV1
{
    public int Version { get; init; } = 1;
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
    public string Name { get; init; } = string.Empty;
}
```
### Version 2 (Split Name, Add Phone Number)
```csharp
public record UserRegisteredEventV2
{
    public int Version { get; init; } = 2;
    public int UserId { get; init; }
    public string Email { get; init; } = string.Empty;
    public string FirstName { get; init; } = string.Empty;   // New (replaces Name)
    public string LastName { get; init; } = string.Empty;    // New (replaces Name)
    public string PhoneNumber { get; init; } = string.Empty; // New
}
```
### Upcasting
Convert old events to new schema:
```csharp
public class UserRegisteredEventUpcaster
{
    public object Upcast(StoredEvent storedEvent)
    {
        var version = GetVersion(storedEvent);
        return version switch
        {
            1 => UpcastV1ToV2(storedEvent),
            // Deserialize returns null for a JSON "null" payload, so guard against it
            2 => JsonSerializer.Deserialize<UserRegisteredEventV2>(storedEvent.Data)
                 ?? throw new InvalidOperationException("Event payload deserialized to null"),
            _ => throw new NotSupportedException($"Version {version} not supported")
        };
    }

    private UserRegisteredEventV2 UpcastV1ToV2(StoredEvent storedEvent)
    {
        var v1 = JsonSerializer.Deserialize<UserRegisteredEventV1>(storedEvent.Data)
                 ?? throw new InvalidOperationException("Event payload deserialized to null");

        // Split name into first/last at the first space
        var nameParts = v1.Name.Split(' ', 2);

        return new UserRegisteredEventV2
        {
            Version = 2,
            UserId = v1.UserId,
            Email = v1.Email,
            FirstName = nameParts[0], // Split always returns at least one element
            LastName = nameParts.Length > 1 ? nameParts[1] : string.Empty,
            PhoneNumber = string.Empty // Not available in V1
        };
    }
}
```
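The upcaster above calls a `GetVersion` helper that is not shown. One possible way to write it, assuming the stored payload is a JSON object with a top-level `Version` property, is sketched below with `System.Text.Json`. The `EventVersionReader` name and the choice to treat a missing field as version 1 are assumptions made for this illustration.

```csharp
using System.Text.Json;

// Hypothetical helper: reads the schema version without deserializing the whole event.
public static class EventVersionReader
{
    public static int GetVersion(string json)
    {
        using var document = JsonDocument.Parse(json);
        var root = document.RootElement;

        // Property lookup is case-sensitive; "Version" matches the records above
        // when they are serialized with default options.
        if (root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("Version", out var versionElement)
            && versionElement.ValueKind == JsonValueKind.Number
            && versionElement.TryGetInt32(out var version))
        {
            return version;
        }

        // Assumption: events written before the field existed are version 1.
        return 1;
    }
}
```

Reading the version from the raw JSON first lets the upcaster pick the right target type before committing to a full deserialization.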
## Idempotency
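Events may be delivered more than once (for example, after a retry or a subscriber restart), so handlers must be idempotent: processing the same event twice must leave the same result as processing it once.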
```csharp
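public class OrderSummaryProjection
{
    private readonly IOrderSummaryRepository _repository;
    private readonly ILogger<OrderSummaryProjection> _logger;

    public OrderSummaryProjection(
        IOrderSummaryRepository repository,
        ILogger<OrderSummaryProjection> logger)
    {
        _repository = repository;
        _logger = logger;
    }

    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // Check if already processed (idempotency).
        // Note: check-then-insert can race under concurrent delivery; a unique
        // constraint on ProcessedEventId in the store closes that gap.
        var existing = await _repository.FindByEventIdAsync(@event.EventId);
        if (existing != null)
        {
            _logger.LogInformation("Event {EventId} already processed, skipping", @event.EventId);
            return;
        }

        // Create summary
        var summary = new OrderSummary
        {
            OrderId = @event.OrderId,
            CustomerName = @event.CustomerName,
            TotalAmount = @event.TotalAmount,
            ProcessedEventId = @event.EventId // Track which event was processed
        };
        await _repository.AddAsync(summary);
    }
}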
```
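Idempotency matters most when the handler's effect is not naturally repeatable, such as adding to a running total. The sketch below is an in-memory illustration with hypothetical names (`RevenueProjection`, and a trimmed `OrderPlacedEvent`); it uses an atomic "claim" of the event ID instead of a separate check followed by an insert.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Trimmed copy of the event, kept small so the sketch is self-contained.
public record OrderPlacedEvent
{
    public string EventId { get; init; } = string.Empty;
    public decimal TotalAmount { get; init; }
}

// Hypothetical projection that keeps a running revenue total.
public class RevenueProjection
{
    private readonly ConcurrentDictionary<string, byte> _processedEventIds = new();
    private readonly object _lock = new();
    private decimal _revenue;

    public decimal Revenue
    {
        get { lock (_lock) { return _revenue; } }
    }

    public Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct = default)
    {
        // TryAdd is atomic: only the first delivery of a given EventId succeeds,
        // even if two deliveries arrive concurrently.
        if (!_processedEventIds.TryAdd(@event.EventId, 0))
        {
            return Task.CompletedTask; // Duplicate delivery, nothing to do
        }

        lock (_lock)
        {
            _revenue += @event.TotalAmount;
        }
        return Task.CompletedTask;
    }
}
```

With a real database, the same idea is usually expressed as a unique constraint on the processed event ID, written in the same transaction as the read-model update.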
## Best Practices
### ✅ DO
- Use past tense for event names
- Include all data needed to process the event
- Add correlation and causation IDs
- Version events from the start
- Design for idempotent handlers
- Use small, focused events
- Document event schemas
### ❌ DON'T
- Don't use imperative or present tense names
- Don't modify events after publishing
- Don't include behavior in events (only data)
- Don't couple events to specific subscribers
- Don't store large binary data in events
- Don't break existing event schemas
- Don't assume events are processed exactly once
## See Also
- [Getting Started](getting-started.md)
- [Persistent Streams](persistent-streams.md)
- [Projections](../projections/README.md)
- [Sagas](../sagas/README.md)
- [Event Sourcing Tutorial](../../tutorials/event-sourcing/README.md)