dotnet-cqrs/docs/tutorials/event-sourcing/03-events-and-workflows.md

13 KiB

Event Design and Workflows

Learn how to design events and implement workflow patterns with Svrnty.CQRS.

Event Design Principles

1. Events are Immutable Facts

Events represent things that have already happened and cannot be changed:

// ✅ Good: immutable record, past-tense name, self-describing fields
/// <summary>Records that an order was placed.</summary>
public record OrderPlacedEvent
{
    /// <summary>Order identifier; defaults to an empty string.</summary>
    public string OrderId { get; init; } = "";

    /// <summary>Customer identifier; defaults to an empty string.</summary>
    public string CustomerId { get; init; } = "";

    /// <summary>Order total carried on the event.</summary>
    public decimal TotalAmount { get; init; }

    /// <summary>Line items of the order; defaults to an empty list.</summary>
    public List<OrderLineItem> Items { get; init; } = new List<OrderLineItem>();

    /// <summary>Timestamp carried on the event for when the order was placed.</summary>
    public DateTimeOffset PlacedAt { get; init; }
}

// ❌ Bad: mutable class, vague name, opaque payload
public class OrderEvent
{
    // Settable after construction, so the recorded "fact" can be rewritten.
    public string Id { get; set; }

    // Not descriptive: nothing says what this string holds.
    public string Data { get; set; }
}

2. Include All Relevant Data

Events should contain all information needed to understand what happened:

// ✅ Good: the event alone answers what, who, why and when
/// <summary>Records a product price change together with its full context.</summary>
public record ProductPriceChangedEvent
{
    /// <summary>Product identifier; defaults to an empty string.</summary>
    public string ProductId { get; init; } = "";

    /// <summary>Context: product name carried on the event.</summary>
    public string ProductName { get; init; } = "";

    /// <summary>Previous state: the price before the change.</summary>
    public decimal OldPrice { get; init; }

    /// <summary>New state: the price after the change.</summary>
    public decimal NewPrice { get; init; }

    /// <summary>Who made the change.</summary>
    public string ChangedBy { get; init; } = "";

    /// <summary>Why the price was changed.</summary>
    public string Reason { get; init; } = "";

    /// <summary>When the change happened.</summary>
    public DateTimeOffset ChangedAt { get; init; }
}

// ❌ Bad: minimal information
public record PriceChangedEvent
{
    public string ProductId { get; init; } = "";

    // Missing context: no old price, actor, reason or timestamp.
    public decimal NewPrice { get; init; }
}

3. Use Business Language

Name events using domain language, not technical terms:

// ✅ Good: names come from the business domain
public record OrderShippedEvent;
public record PaymentReceivedEvent;
public record CustomerRegisteredEvent;

// ❌ Bad: names come from the implementation
public record OrderStatusUpdatedEvent;  // What status?
public record DataChangedEvent;  // What data?
public record EntityCreatedEvent;  // What entity?

Event Granularity

Fine-Grained Events

Each event represents a single business fact:

// ✅ Good: one event per business fact
/// <summary>Records that a user registered.</summary>
public record UserRegisteredEvent
{
    public string UserId { get; init; } = "";
    public string Name { get; init; } = "";
    public string Email { get; init; } = "";
}

/// <summary>Records that a user's email address was verified.</summary>
public record UserEmailVerifiedEvent
{
    public string UserId { get; init; } = "";
    public DateTimeOffset VerifiedAt { get; init; }
}

/// <summary>Records that a user completed their profile.</summary>
public record UserProfileCompletedEvent
{
    public string UserId { get; init; } = "";
    public string PhoneNumber { get; init; } = "";
    public string Address { get; init; } = "";
}

// ❌ Bad: too coarse-grained — several facts folded into one event
public record UserCreatedEvent
{
    public string UserId { get; init; } = "";

    // Mixing concerns: verification is a separate fact.
    public bool EmailVerified { get; init; }

    // Mixing concerns: profile completion is a separate fact.
    public bool ProfileCompleted { get; init; }
}

Workflow Pattern

Workflows coordinate multiple aggregates using events:

// Aggregate 1: Order produces event
/// <summary>
/// Order aggregate that applies an <see cref="OrderPlacedEvent"/> when an order is placed.
/// </summary>
public class Order : AggregateRoot
{
    /// <summary>
    /// Validates the input, then applies an <see cref="OrderPlacedEvent"/> for this order.
    /// </summary>
    /// <param name="customerId">Customer placing the order; must not be null or blank.</param>
    /// <param name="items">Line items of the order; must contain at least one entry.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="customerId"/> is blank or <paramref name="items"/> is empty.
    /// </exception>
    public void Place(string customerId, List<OrderLineItem> items)
    {
        // Validate first: an event is only emitted for input that passed validation.
        if (string.IsNullOrWhiteSpace(customerId))
        {
            throw new ArgumentException("Customer id is required.", nameof(customerId));
        }

        if (items is null)
        {
            throw new ArgumentNullException(nameof(items));
        }

        if (items.Count == 0)
        {
            throw new ArgumentException("An order needs at least one item.", nameof(items));
        }

        // Snapshot the list so later changes by the caller cannot alter the recorded event.
        var placedItems = new List<OrderLineItem>(items);

        ApplyEvent(new OrderPlacedEvent
        {
            OrderId = Id,
            CustomerId = customerId,
            Items = placedItems,
            TotalAmount = placedItems.Sum(i => i.Price * i.Quantity),
            PlacedAt = DateTimeOffset.UtcNow
        });
    }
}

// Workflow: Listen to events and coordinate
/// <summary>
/// Reacts to <see cref="OrderPlacedEvent"/> by reserving inventory and then charging payment,
/// appending an event for each outcome.
/// </summary>
public class OrderWorkflow : IWorkflow<OrderPlacedEvent>
{
    private readonly IEventStreamStore _eventStore;
    private readonly IInventoryService _inventory;
    private readonly IPaymentService _payment;

    /// <summary>Creates the workflow with the services it coordinates.</summary>
    /// <exception cref="ArgumentNullException">Any dependency is null.</exception>
    public OrderWorkflow(
        IEventStreamStore eventStore,
        IInventoryService inventory,
        IPaymentService payment)
    {
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
        _inventory = inventory ?? throw new ArgumentNullException(nameof(inventory));
        _payment = payment ?? throw new ArgumentNullException(nameof(payment));
    }

    /// <summary>
    /// Runs the reservation and payment steps for a placed order.
    /// </summary>
    /// <param name="event">The placed order to process.</param>
    /// <param name="ct">Token passed to every service and store call.</param>
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        // Step 1: Reserve inventory
        var reservationResult = await _inventory.ReserveAsync(@event.OrderId, @event.Items, ct);

        if (!reservationResult.Success)
        {
            await _eventStore.AppendAsync(@event.OrderId, new OrderInventoryUnavailableEvent
            {
                OrderId = @event.OrderId,
                UnavailableItems = reservationResult.UnavailableItems,
                NotifiedAt = DateTimeOffset.UtcNow
            }, ct);
            return;
        }

        await _eventStore.AppendAsync(@event.OrderId, new InventoryReservedEvent
        {
            OrderId = @event.OrderId,
            ReservationId = reservationResult.ReservationId,
            Items = @event.Items,
            ReservedAt = DateTimeOffset.UtcNow
        }, ct);

        // Step 2: Process payment
        var paymentResult = await _payment.ChargeAsync(@event.OrderId, @event.TotalAmount, ct);

        if (!paymentResult.Success)
        {
            // Compensation: release the reservation made in step 1 before recording the failure.
            await _inventory.ReleaseAsync(reservationResult.ReservationId, ct);

            await _eventStore.AppendAsync(@event.OrderId, new OrderPaymentFailedEvent
            {
                OrderId = @event.OrderId,
                Reason = paymentResult.ErrorMessage,
                FailedAt = DateTimeOffset.UtcNow
            }, ct);
            return;
        }

        await _eventStore.AppendAsync(@event.OrderId, new PaymentProcessedEvent
        {
            OrderId = @event.OrderId,
            PaymentId = paymentResult.PaymentId,
            Amount = @event.TotalAmount,
            ProcessedAt = DateTimeOffset.UtcNow
        }, ct);
    }
}

Event Registration

Register workflows to listen to events:

// In Program.cs
builder.Services.AddEventStreaming()
    .AddPostgresEventStore(builder.Configuration.GetConnectionString("EventStore"));

// Register workflow
builder.Services.AddWorkflow<OrderPlacedEvent, OrderWorkflow>();

var app = builder.Build();

// Subscribe workflow to event stream
var subscription = app.Services.GetRequiredService<IEventSubscriptionService>();

await subscription.SubscribeAsync(
    streamName: "orders",
    subscriptionId: "order-workflow",
    mode: SubscriptionMode.Broadcast,
    async (StoredEvent storedEvent, CancellationToken ct) =>
    {
        // Only OrderPlacedEvent payloads are handled by this subscription.
        if (storedEvent.Data is not OrderPlacedEvent orderPlaced)
        {
            return;
        }

        // Resolve the workflow from a per-event scope instead of the root provider:
        // this works whatever lifetime the workflow and its dependencies are registered
        // with, and disposes anything created for this event once handling finishes.
        await using var scope = app.Services.CreateAsyncScope();
        var workflow = scope.ServiceProvider.GetRequiredService<IWorkflow<OrderPlacedEvent>>();
        await workflow.HandleAsync(orderPlaced, ct);
    });

Event Versioning

Events evolve over time. Use versioning to handle schema changes:

// Version 1
/// <summary>Original schema of the user-registered event.</summary>
public record UserRegisteredEventV1
{
    public string UserId { get; init; } = "";
    public string Name { get; init; } = "";
}

// Version 2: Added email field
/// <summary>Second schema of the user-registered event; adds <see cref="Email"/>.</summary>
public record UserRegisteredEventV2
{
    public string UserId { get; init; } = "";
    public string Name { get; init; } = "";

    /// <summary>New in V2; defaults to an empty string.</summary>
    public string Email { get; init; } = "";
}

// Upcaster: Convert V1 to V2
/// <summary>
/// Converts a <see cref="UserRegisteredEventV1"/> into a <see cref="UserRegisteredEventV2"/>.
/// </summary>
public class UserRegisteredEventUpcaster : IEventUpcaster<UserRegisteredEventV1, UserRegisteredEventV2>
{
    /// <summary>
    /// Copies the V1 fields and fills the V2-only email with a placeholder.
    /// </summary>
    /// <param name="oldEvent">The V1 event to convert.</param>
    /// <returns>A V2 event with the same user id and name.</returns>
    public UserRegisteredEventV2 Upcast(UserRegisteredEventV1 oldEvent) => new()
    {
        UserId = oldEvent.UserId,
        Name = oldEvent.Name,
        // V1 never stored an email, so old events get a fixed default.
        Email = "unknown@example.com"
    };
}

// When reading events
await foreach (var stored in eventStore.ReadStreamAsync("user-123"))
{
    var payload = stored.Data;

    // Upcast if needed: V1 payloads are converted so only V2 is handled below.
    if (payload is UserRegisteredEventV1 legacy)
    {
        payload = _upcaster.Upcast(legacy);
    }

    // Anything that is not a V2 registration is skipped.
    if (payload is not UserRegisteredEventV2 current)
    {
        continue;
    }

    // Now work with V2 via `current`
    // Process...
}

Complete Workflow Example

Here's a complete order fulfillment workflow:

/// <summary>
/// Reacts to <see cref="OrderPlacedEvent"/> by reserving inventory, charging payment and
/// creating a shipment, appending an event to the order's stream after each step.
/// </summary>
public class OrderFulfillmentWorkflow : IWorkflow<OrderPlacedEvent>
{
    private readonly IEventStreamStore _eventStore;
    private readonly IInventoryService _inventory;
    private readonly IPaymentService _payment;
    private readonly IShippingService _shipping;
    private readonly ILogger<OrderFulfillmentWorkflow> _logger;

    /// <summary>Creates the workflow with the services it coordinates.</summary>
    /// <exception cref="ArgumentNullException">Any dependency is null.</exception>
    public OrderFulfillmentWorkflow(
        IEventStreamStore eventStore,
        IInventoryService inventory,
        IPaymentService payment,
        IShippingService shipping,
        ILogger<OrderFulfillmentWorkflow> logger)
    {
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
        _inventory = inventory ?? throw new ArgumentNullException(nameof(inventory));
        _payment = payment ?? throw new ArgumentNullException(nameof(payment));
        _shipping = shipping ?? throw new ArgumentNullException(nameof(shipping));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Runs the fulfillment steps for a placed order. A failed reservation or charge ends
    /// the workflow with the matching event; any other exception is logged and recorded as
    /// an <c>OrderFulfillmentFailedEvent</c>. Cancellation is not recorded as a failure and
    /// propagates to the caller.
    /// </summary>
    /// <param name="event">The placed order to fulfill.</param>
    /// <param name="ct">Token passed to every service and store call.</param>
    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        var streamName = $"order-{@event.OrderId}";

        try
        {
            // Step 1: Reserve inventory
            _logger.LogInformation("Reserving inventory for order {OrderId}", @event.OrderId);

            var reservation = await _inventory.ReserveAsync(
                @event.OrderId,
                @event.Items.Select(i => new InventoryItem
                {
                    ProductId = i.ProductId,
                    Quantity = i.Quantity
                }).ToList(),
                ct);

            if (!reservation.Success)
            {
                await _eventStore.AppendAsync(streamName, new OrderInventoryUnavailableEvent
                {
                    OrderId = @event.OrderId,
                    UnavailableItems = reservation.UnavailableItems,
                    NotifiedAt = DateTimeOffset.UtcNow
                }, ct);
                return;
            }

            await _eventStore.AppendAsync(streamName, new InventoryReservedEvent
            {
                OrderId = @event.OrderId,
                ReservationId = reservation.ReservationId,
                Items = @event.Items,
                ReservedAt = DateTimeOffset.UtcNow
            }, ct);

            // Step 2: Process payment
            _logger.LogInformation("Processing payment for order {OrderId}", @event.OrderId);

            var payment = await _payment.ChargeAsync(
                @event.OrderId,
                @event.TotalAmount,
                ct);

            if (!payment.Success)
            {
                // Compensation: release the reservation made in step 1 before recording the failure.
                await _inventory.ReleaseAsync(reservation.ReservationId, ct);

                await _eventStore.AppendAsync(streamName, new OrderPaymentFailedEvent
                {
                    OrderId = @event.OrderId,
                    Reason = payment.ErrorMessage,
                    FailedAt = DateTimeOffset.UtcNow
                }, ct);
                return;
            }

            await _eventStore.AppendAsync(streamName, new PaymentProcessedEvent
            {
                OrderId = @event.OrderId,
                PaymentId = payment.PaymentId,
                Amount = @event.TotalAmount,
                ProcessedAt = DateTimeOffset.UtcNow
            }, ct);

            // Step 3: Create shipment
            _logger.LogInformation("Creating shipment for order {OrderId}", @event.OrderId);

            var shipment = await _shipping.CreateShipmentAsync(
                @event.OrderId,
                @event.Items,
                ct);

            await _eventStore.AppendAsync(streamName, new ShipmentCreatedEvent
            {
                OrderId = @event.OrderId,
                ShipmentId = shipment.ShipmentId,
                TrackingNumber = shipment.TrackingNumber,
                EstimatedDelivery = shipment.EstimatedDelivery,
                CreatedAt = DateTimeOffset.UtcNow
            }, ct);

            // Final: Mark order as fulfilled
            await _eventStore.AppendAsync(streamName, new OrderFulfilledEvent
            {
                OrderId = @event.OrderId,
                FulfilledAt = DateTimeOffset.UtcNow
            }, ct);

            _logger.LogInformation("Order {OrderId} fulfilled successfully", @event.OrderId);
        }
        // Cancellation is excluded: it is not a fulfillment failure, and the append below
        // would be attempted with the token that was just cancelled.
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Failed to fulfill order {OrderId}", @event.OrderId);

            // NOTE(review): an exception thrown after the reservation or the charge leaves
            // them in place; no release/refund is attempted here — confirm whether
            // compensation is handled by a consumer of this failure event.
            await _eventStore.AppendAsync(streamName, new OrderFulfillmentFailedEvent
            {
                OrderId = @event.OrderId,
                ErrorMessage = ex.Message,
                FailedAt = DateTimeOffset.UtcNow
            }, ct);
        }
    }
}

Best Practices

DO:

  • Use past tense for event names (OrderPlaced, PaymentProcessed)
  • Include all relevant context in events
  • Keep events immutable (use init properties)
  • Version events when schema changes
  • Emit events after state changes are validated
  • Use workflows to coordinate aggregates

DON'T:

  • Use imperative or present-tense names (PlaceOrder, ProcessPayment) — those describe commands, not facts that already happened
  • Strip events down to minimal data that leaves out needed context
  • Make events mutable
  • Change event schema without versioning
  • Emit events before validation
  • Put coordination logic in aggregates

Next Steps

See Also