Event Streaming

Comprehensive event streaming support with event sourcing, message queues, consumer groups, and observability.

Overview

Svrnty.CQRS provides production-ready event streaming capabilities for building event-driven architectures. The framework supports both persistent streams (event sourcing) and ephemeral streams (message queues), with advanced features like consumer groups, retention policies, event replay, and comprehensive monitoring.

Key Features:

  • Persistent Streams - Event sourcing with append-only logs
  • Ephemeral Streams - Message queue semantics with at-least-once delivery
  • Consumer Groups - Coordinated consumption with automatic load balancing
  • Retention Policies - Automatic cleanup based on age or size
  • Event Replay - Rebuild projections and reprocess historical events
  • Stream Configuration - Per-stream settings for retention, dead-letter queues (DLQ), and lifecycle
  • Projections - Read models from event streams
  • Sagas - Long-running workflows with compensation logic
  • gRPC Streaming - Real-time bidirectional event delivery
  • PostgreSQL Storage - Production-ready persistent storage
  • Health Checks - Monitor consumer lag and stream health
  • Metrics - OpenTelemetry-compatible telemetry
  • Management API - REST endpoints for operations

Quick Start

Basic Event Streaming

var builder = WebApplication.CreateBuilder(args);

// Register event streaming with PostgreSQL
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetConnectionString("EventStore"));

var app = builder.Build();
app.Run();

Publishing Events

public record OrderPlacedEvent
{
    public int OrderId { get; init; }
    public string CustomerName { get; init; } = string.Empty;
    public decimal TotalAmount { get; init; }
    public DateTimeOffset PlacedAt { get; init; }
}

// Publish to persistent stream
var store = serviceProvider.GetRequiredService<IEventStreamStore>();

await store.AppendAsync(
    streamName: "orders",
    events: new[] { new OrderPlacedEvent
    {
        OrderId = 123,
        CustomerName = "John Doe",
        TotalAmount = 99.99m,
        PlacedAt = DateTimeOffset.UtcNow
    }});

Consuming Events

// Read from persistent stream
await foreach (var @event in store.ReadStreamAsync("orders", fromOffset: 0))
{
    Console.WriteLine($"Event: {@event.EventType} at offset {@event.Offset}");
}

// Consume with consumer group (automatic offset tracking)
var reader = serviceProvider.GetRequiredService<IConsumerGroupReader>();

await foreach (var @event in reader.ConsumeAsync(
    streamName: "orders",
    groupId: "email-notifications",
    consumerId: "worker-1"))
{
    await SendEmailNotificationAsync(@event);
}

Architecture

Stream Types

Persistent Streams:

  • Append-only event log
  • Events retained indefinitely unless a retention policy removes them
  • Offset-based reading
  • Ideal for event sourcing and audit logs
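
To make the append-only, offset-based model concrete, here is a minimal in-memory sketch. `AppendOnlyLog<T>` is a hypothetical type written for illustration; it is not part of Svrnty.CQRS and ignores persistence and concurrency.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a persistent stream (illustration only).
var log = new AppendOnlyLog<string>();
log.Append("OrderPlaced");
log.Append("PaymentReceived");
log.Append("OrderShipped");

// Reading from offset 1 skips the first event and never mutates the log.
foreach (var (offset, payload) in log.ReadFrom(1))
{
    Console.WriteLine($"{offset}: {payload}");
}

public sealed class AppendOnlyLog<T>
{
    private readonly List<T> _events = new();

    // Appending assigns the next sequential offset (starting at 0) and returns it.
    public long Append(T payload)
    {
        _events.Add(payload);
        return _events.Count - 1;
    }

    // Yields every event at or after fromOffset, in append order.
    public IEnumerable<(long Offset, T Payload)> ReadFrom(long fromOffset)
    {
        for (var i = (int)Math.Max(fromOffset, 0); i < _events.Count; i++)
        {
            yield return (i, _events[i]);
        }
    }
}
```

Because existing entries are never rewritten, a reader that remembers its last offset can resume later and see the same events in the same order.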

Ephemeral Streams:

  • Message queue semantics
  • Dequeue with visibility timeout
  • At-least-once delivery with ack/nack
  • Ideal for background jobs and notifications
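
The visibility-timeout and ack/nack behaviour can be sketched with a small in-memory queue. `VisibilityQueue<T>` and `QueueMessage<T>` are hypothetical names for illustration, and the clock is passed in explicitly so the example is deterministic; the real store may behave differently.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

var queue = new VisibilityQueue<string>();
queue.Enqueue("send-welcome-email");

var now = DateTimeOffset.UtcNow;
var timeout = TimeSpan.FromMinutes(5);

// Dequeuing hides the message for the visibility timeout instead of deleting it.
var first = queue.TryDequeue(now, timeout);
var whileHidden = queue.TryDequeue(now.AddMinutes(1), timeout);
Console.WriteLine(whileHidden is null);   // hidden while in flight

// With no ack before the timeout, the message is delivered again (at-least-once).
var redelivered = queue.TryDequeue(now.AddMinutes(6), timeout);
if (redelivered is not null)
{
    queue.Acknowledge(redelivered.Id);    // ack removes it permanently
}

public sealed record QueueMessage<T>(Guid Id, T Payload);

public sealed class VisibilityQueue<T>
{
    private sealed class Entry
    {
        public Entry(QueueMessage<T> message) { Message = message; }
        public QueueMessage<T> Message { get; }
        public DateTimeOffset VisibleAt { get; set; }   // default: visible immediately
    }

    private readonly List<Entry> _entries = new();

    public void Enqueue(T payload) =>
        _entries.Add(new Entry(new QueueMessage<T>(Guid.NewGuid(), payload)));

    // Returns the first visible message and hides it until now + visibilityTimeout.
    public QueueMessage<T>? TryDequeue(DateTimeOffset now, TimeSpan visibilityTimeout)
    {
        foreach (var entry in _entries)
        {
            if (entry.VisibleAt <= now)
            {
                entry.VisibleAt = now + visibilityTimeout;
                return entry.Message;
            }
        }
        return null;
    }

    // Ack: the message was handled, so drop it.
    public bool Acknowledge(Guid id) => _entries.RemoveAll(e => e.Message.Id == id) > 0;

    // Nack: handling failed, so make the message visible again right away.
    public void Nack(Guid id)
    {
        foreach (var entry in _entries)
        {
            if (entry.Message.Id == id) entry.VisibleAt = DateTimeOffset.MinValue;
        }
    }
}
```

The redelivery step is why handlers on ephemeral streams should tolerate seeing the same message twice.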

Storage Backends

| Backend | Use Case | Features |
| --- | --- | --- |
| PostgreSQL | Production | Persistent storage, consumer groups, retention policies, event replay |
| In-Memory | Development/Testing | Fast, no persistence |

Delivery Semantics

| Mode | Guarantee | Use Case |
| --- | --- | --- |
| Broadcast | At-least-once | All consumers receive all events |
| Queue | Exactly-once per group | Load-balanced processing |
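
The difference between the two modes can be shown with a small simulation. The `Delivery` helper is hypothetical, and round-robin assignment is an assumption made for illustration; the framework's actual balancing strategy is not described here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var events = new[] { "e0", "e1", "e2", "e3" };
var consumers = new[] { "worker-1", "worker-2" };

var broadcast = Delivery.Broadcast(events, consumers);
var queued = Delivery.Queue(events, consumers);

foreach (var c in consumers)
{
    Console.WriteLine($"{c} broadcast: {string.Join(",", broadcast[c])} | queue: {string.Join(",", queued[c])}");
}
// worker-1 broadcast: e0,e1,e2,e3 | queue: e0,e2
// worker-2 broadcast: e0,e1,e2,e3 | queue: e1,e3

public static class Delivery
{
    // Broadcast: every consumer receives its own copy of every event.
    public static Dictionary<string, List<string>> Broadcast(
        IReadOnlyList<string> events, IReadOnlyList<string> consumers) =>
        consumers.ToDictionary(c => c, _ => events.ToList());

    // Queue: each event goes to exactly one consumer in the group
    // (round-robin here, purely as an illustrative assumption).
    public static Dictionary<string, List<string>> Queue(
        IReadOnlyList<string> events, IReadOnlyList<string> consumers)
    {
        var result = consumers.ToDictionary(c => c, _ => new List<string>());
        for (var i = 0; i < events.Count; i++)
        {
            result[consumers[i % consumers.Count]].Add(events[i]);
        }
        return result;
    }
}
```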

Core Concepts

Streams

Streams are named event channels with configurable properties:

// Stream metadata
public record StreamMetadata
{
    public string Name { get; init; }
    public StreamType Type { get; init; }  // Persistent or Ephemeral
    public DeliverySemantics Semantics { get; init; }
    public StreamScope Scope { get; init; }  // Internal, Public, etc.
}

Events

Events are immutable messages with metadata:

public record StoredEvent
{
    public long Offset { get; init; }          // Sequence number
    public string EventId { get; init; }       // Unique identifier
    public string EventType { get; init; }     // Event class name
    public string StreamName { get; init; }    // Stream name
    public byte[] Data { get; init; }          // JSON payload
    public DateTimeOffset Timestamp { get; init; }
    public string? CorrelationId { get; init; }
}
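
Since `Data` carries the JSON payload and `EventType` names the event class, a consumer typically maps the type name to a CLR type before deserializing. The following sketch assumes the payload was written with `System.Text.Json`; `StoredEventSketch` is a reduced, hypothetical mirror of the record above, and `EventDecoder` is not a framework type.

```csharp
using System;
using System.Text.Json;

var original = new OrderPlacedEvent(123, "John Doe", 99.99m);

// Reduced stand-in for StoredEvent, holding only the fields used here.
var stored = new StoredEventSketch(
    Offset: 0,
    EventType: nameof(OrderPlacedEvent),
    Data: JsonSerializer.SerializeToUtf8Bytes(original));

var decoded = EventDecoder.Decode(stored);
Console.WriteLine(decoded.Equals(original));   // records compare by value

public record OrderPlacedEvent(int OrderId, string CustomerName, decimal TotalAmount);
public record StoredEventSketch(long Offset, string EventType, byte[] Data);

public static class EventDecoder
{
    // Uses EventType as a discriminator, then deserializes the UTF-8 JSON bytes.
    public static object Decode(StoredEventSketch stored) => stored.EventType switch
    {
        nameof(OrderPlacedEvent) =>
            JsonSerializer.Deserialize<OrderPlacedEvent>(stored.Data)
            ?? throw new InvalidOperationException("Payload deserialized to null."),
        _ => throw new NotSupportedException($"Unknown event type '{stored.EventType}'.")
    };
}
```

An explicit mapping like this keeps unknown or renamed event types from being deserialized silently into the wrong shape.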

Consumer Groups

Consumer groups coordinate multiple consumers processing the same stream:

// A consumer group ensures each event is processed once per group
await foreach (var @event in reader.ConsumeAsync(
    streamName: "orders",
    groupId: "order-processing",     // Logical consumer group
    consumerId: "worker-1",          // This worker instance
    options: new ConsumerGroupOptions
    {
        CommitStrategy = OffsetCommitStrategy.AfterBatch,
        BatchSize = 100
    }))
{
    // ProcessOrderEventAsync is a placeholder for your own handler
    await ProcessOrderEventAsync(@event);
}
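
To illustrate what committing after each batch implies, here is a self-contained simulation. `CommittedOffsets` and `BatchConsumer` are hypothetical helpers and do not reflect how the framework stores offsets; the point is only that a restart resumes from the last committed position.

```csharp
using System;
using System.Collections.Generic;

var events = new List<string> { "e0", "e1", "e2", "e3", "e4" };
var offsets = new CommittedOffsets();

// First run: handle one batch of 2, then stop (simulating a crash or shutdown).
var handled = BatchConsumer.Run(events, offsets, "order-processing", batchSize: 2, maxBatches: 1);
Console.WriteLine($"handled {handled}, next offset {offsets.Get("order-processing")}");
// handled 2, next offset 2

// Restart: the group resumes from the committed offset, not from the beginning.
handled = BatchConsumer.Run(events, offsets, "order-processing", batchSize: 2, maxBatches: int.MaxValue);
Console.WriteLine($"handled {handled}, next offset {offsets.Get("order-processing")}");
// handled 3, next offset 5

public sealed class CommittedOffsets
{
    private readonly Dictionary<string, long> _next = new();

    // Next offset to read for the group; 0 when nothing was committed yet.
    public long Get(string groupId) => _next.TryGetValue(groupId, out var o) ? o : 0;

    public void Commit(string groupId, long nextOffset) => _next[groupId] = nextOffset;
}

public static class BatchConsumer
{
    // Processes up to maxBatches batches and returns the number of events handled.
    public static int Run(IReadOnlyList<string> stream, CommittedOffsets offsets,
        string groupId, int batchSize, int maxBatches)
    {
        var handled = 0;
        for (var batch = 0; batch < maxBatches; batch++)
        {
            var start = (int)offsets.Get(groupId);
            if (start >= stream.Count) break;

            var end = Math.Min(start + batchSize, stream.Count);
            for (var i = start; i < end; i++)
            {
                handled++;   // a real handler would process stream[i] here
            }

            // Commit only after the whole batch succeeded.
            offsets.Commit(groupId, end);
        }
        return handled;
    }
}
```

If a worker fails midway through a batch, nothing from that batch is committed, so the whole batch is read again on restart. Larger batches mean fewer commits but more potential reprocessing.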

Features

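Fundamentals

Learn the basics of event streaming. See the fundamentals/ directory.

Storage

Configure storage backends. See the storage/ directory.

Consumer Groups

Coordinate multiple consumers. See the consumer-groups/ directory.

Retention Policies

Automatic event cleanup. See the retention-policies/ directory.

Event Replay

Rebuild projections and reprocess events. See the event-replay/ directory.

Stream Configuration

Per-stream settings. See the stream-configuration/ directory.

Projections

Build read models from events. See the projections/ directory.

Sagas

Long-running workflows. See the sagas/ directory.

gRPC Streaming

Real-time event delivery via gRPC. See the grpc-streaming/ directory.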

Observability

The framework includes comprehensive monitoring and management features:

Health Checks:

builder.Services.AddStreamHealthChecks();

var healthCheck = serviceProvider.GetRequiredService<IStreamHealthCheck>();
var result = await healthCheck.CheckStreamHealthAsync("orders");
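
Consumer lag is usually understood as the distance between the newest offset in a stream and the offset a group has committed. The sketch below shows that calculation with hypothetical thresholds; `LagCheck` and `LagStatus` are illustrative names, and how `IStreamHealthCheck` actually evaluates health is not specified in this document.

```csharp
using System;

Console.WriteLine(LagCheck.Evaluate(headOffset: 1_000, committedOffset: 990, degradedAt: 100, unhealthyAt: 1_000));
// Healthy
Console.WriteLine(LagCheck.Evaluate(headOffset: 1_000, committedOffset: 500, degradedAt: 100, unhealthyAt: 1_000));
// Degraded

public enum LagStatus { Healthy, Degraded, Unhealthy }

public static class LagCheck
{
    // Lag = newest offset in the stream minus the group's committed offset, never negative.
    public static long Lag(long headOffset, long committedOffset) =>
        Math.Max(0, headOffset - committedOffset);

    // Thresholds are assumptions for illustration; tune them per stream.
    public static LagStatus Evaluate(long headOffset, long committedOffset, long degradedAt, long unhealthyAt)
    {
        var lag = Lag(headOffset, committedOffset);
        if (lag >= unhealthyAt) return LagStatus.Unhealthy;
        if (lag >= degradedAt) return LagStatus.Degraded;
        return LagStatus.Healthy;
    }
}
```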

Metrics (OpenTelemetry):

builder.Services.AddEventStreamMetrics();

builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics => metrics
        .AddMeter("Svrnty.CQRS.Events")
        .AddPrometheusExporter());

Management API:

app.MapEventStreamManagementApi();

// Endpoints:
// GET /api/event-streams
// GET /api/event-streams/{name}
// POST /api/event-streams/subscriptions/{id}/consumers/{consumerId}/reset-offset

Structured Logging:

using Svrnty.CQRS.Events.Logging;

using (CorrelationContext.Begin(correlationId))
{
    _logger.LogEventPublished(eventId, eventType, streamName, CorrelationContext.Current);
}

Packages

Package Purpose
Svrnty.CQRS.Events.Abstractions Core interfaces and models
Svrnty.CQRS.Events In-memory implementation
Svrnty.CQRS.Events.PostgreSQL PostgreSQL storage
Svrnty.CQRS.Events.ConsumerGroups.Abstractions Consumer group interfaces
Svrnty.CQRS.Events.ConsumerGroups PostgreSQL consumer groups
Svrnty.CQRS.Events.Grpc gRPC streaming support

Installation

# PostgreSQL event streaming
dotnet add package Svrnty.CQRS.Events.PostgreSQL

# Consumer groups
dotnet add package Svrnty.CQRS.Events.ConsumerGroups

# gRPC streaming
dotnet add package Svrnty.CQRS.Events.Grpc

# In-memory (development)
dotnet add package Svrnty.CQRS.Events

Complete Example

using Svrnty.CQRS.Events;
using Svrnty.CQRS.Events.ConsumerGroups;

var builder = WebApplication.CreateBuilder(args);

// Event streaming with PostgreSQL
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetConnectionString("EventStore"));

// Consumer groups
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

// Retention policies
builder.Services.AddPostgresRetentionPolicies(options =>
{
    options.Enabled = true;
    options.CleanupInterval = TimeSpan.FromHours(1);
});

// Event replay
builder.Services.AddPostgresEventReplay();

// Observability
builder.Services.AddStreamHealthChecks();
builder.Services.AddEventStreamMetrics();

var app = builder.Build();

// Management API
app.MapEventStreamManagementApi();

// Health checks
app.MapHealthChecks("/health");

app.Run();

Best Practices

DO

  • Use consumer groups for load-balanced processing
  • Configure retention policies for cleanup
  • Monitor consumer lag with health checks
  • Use correlation IDs for distributed tracing
  • Implement idempotent event handlers
  • Version your events for schema evolution
  • Use projections for read models
  • Enable metrics for production observability
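
As one way to read the idempotency advice above, a handler can remember which event IDs it has already applied and skip repeats. `IdempotentHandler` is a hypothetical example; in a real system the set of processed IDs would need to be persisted, ideally in the same transaction as the handler's side effect.

```csharp
using System;
using System.Collections.Generic;

var handler = new IdempotentHandler();
handler.Handle("evt-1", 40m);
handler.Handle("evt-2", 60m);
handler.Handle("evt-1", 40m);   // redelivery of evt-1 is ignored

Console.WriteLine(handler.Total);   // 100

public sealed class IdempotentHandler
{
    private readonly HashSet<string> _processed = new();

    public decimal Total { get; private set; }

    // Returns true when the event was applied, false when it was a duplicate.
    public bool Handle(string eventId, decimal amount)
    {
        // HashSet.Add returns false if the id is already present.
        if (!_processed.Add(eventId)) return false;

        Total += amount;
        return true;
    }
}
```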

DON'T

  • Don't process the same event more than once unless the handler is idempotent
  • Don't ignore consumer lag warnings
  • Don't store large payloads in events (use references)
  • Don't modify events after appending
  • Don't skip error handling in event handlers
  • Don't forget to commit consumer offsets
  • Don't block event processing with synchronous I/O

Common Patterns

Event Sourcing:

// Append events to persistent stream
await store.AppendAsync("orders", new[] { orderPlacedEvent, paymentReceivedEvent });

// Rebuild state from events
await foreach (var @event in store.ReadStreamAsync("orders", fromOffset: 0))
{
    aggregate.Apply(@event);
}
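
The `aggregate.Apply(@event)` call above folds each event into the current state. A minimal sketch of such an aggregate follows; `OrderAggregate`, `OrderPlaced`, and `PaymentReceived` are hypothetical types, and the sketch works on already-deserialized events rather than on `StoredEvent` records.

```csharp
using System;

var history = new object[]
{
    new OrderPlaced(123, 99.99m),
    new PaymentReceived(123, 99.99m),
};

// Replaying the history in order rebuilds the current state.
var order = new OrderAggregate();
foreach (var e in history)
{
    order.Apply(e);
}

Console.WriteLine($"{order.OrderId} paid={order.IsPaid}");   // 123 paid=True

public record OrderPlaced(int OrderId, decimal TotalAmount);
public record PaymentReceived(int OrderId, decimal Amount);

public sealed class OrderAggregate
{
    public int OrderId { get; private set; }
    public decimal Outstanding { get; private set; }
    public bool IsPaid => OrderId != 0 && Outstanding <= 0m;

    // Each case is a pure state transition; no I/O happens during replay.
    public void Apply(object @event)
    {
        switch (@event)
        {
            case OrderPlaced placed:
                OrderId = placed.OrderId;
                Outstanding = placed.TotalAmount;
                break;
            case PaymentReceived payment:
                Outstanding -= payment.Amount;
                break;
            // Unknown event types are ignored in this sketch.
        }
    }
}
```

Keeping `Apply` free of side effects is what makes replay safe: running the same history twice yields the same state.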

Message Queue:

// Enqueue background job
await store.EnqueueAsync("email-queue", new SendEmailCommand { ... });

// Dequeue and process
var message = await store.DequeueAsync("email-queue", visibilityTimeout: TimeSpan.FromMinutes(5));
await SendEmailAsync(message);
await store.AcknowledgeAsync("email-queue", message.MessageId);

CQRS with Events:

// Command publishes domain event
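public class PlaceOrderCommandHandler : ICommandHandler<PlaceOrderCommand, int>
{
    // IOrderRepository is a placeholder for your own write-model repository
    private readonly IOrderRepository _repository;
    private readonly IEventStreamStore _eventStore;

    public PlaceOrderCommandHandler(IOrderRepository repository, IEventStreamStore eventStore)
    {
        _repository = repository;
        _eventStore = eventStore;
    }

    public async Task<int> HandleAsync(PlaceOrderCommand command, CancellationToken ct)
    {
        var order = Order.Create(command);

        // Persist to write model
        await _repository.AddAsync(order);

        // Publish event for projections
        await _eventStore.AppendAsync("orders", order.DomainEvents);

        return order.Id;
    }
}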

// Projection builds read model
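public class OrderSummaryProjection : IDynamicProjection
{
    // IOrderSummaryReadRepository is a placeholder for your own read-model store
    private readonly IOrderSummaryReadRepository _readRepository;

    public OrderSummaryProjection(IOrderSummaryReadRepository readRepository)
    {
        _readRepository = readRepository;
    }

    public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
    {
        await _readRepository.AddOrderSummaryAsync(new OrderSummary
        {
            OrderId = @event.OrderId,
            CustomerName = @event.CustomerName,
            TotalAmount = @event.TotalAmount
        });
    }
}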

See Also