dotnet-cqrs/docs/event-streaming/README.md

449 lines
14 KiB
Markdown

# Event Streaming
Comprehensive event streaming support with event sourcing, message queues, consumer groups, and observability.
## Overview
Svrnty.CQRS provides production-ready event streaming capabilities for building event-driven architectures. The framework supports both **persistent streams** (event sourcing) and **ephemeral streams** (message queues), with advanced features like consumer groups, retention policies, event replay, and comprehensive monitoring.
**Key Features:**
-**Persistent Streams** - Event sourcing with append-only logs
-**Ephemeral Streams** - Message queue semantics with at-least-once delivery
-**Consumer Groups** - Coordinated consumption with automatic load balancing
-**Retention Policies** - Automatic cleanup based on age or size
-**Event Replay** - Rebuild projections and reprocess historical events
-**Stream Configuration** - Per-stream settings for retention, DLQ, lifecycle
-**Projections** - Read models from event streams
-**Sagas** - Long-running workflows with compensation logic
-**gRPC Streaming** - Real-time bidirectional event delivery
-**PostgreSQL Storage** - Production-ready persistent storage
-**Health Checks** - Monitor consumer lag and stream health
-**Metrics** - OpenTelemetry-compatible telemetry
-**Management API** - REST endpoints for operations
## Quick Start
### Basic Event Streaming
```csharp
var builder = WebApplication.CreateBuilder(args);
// Register event streaming with PostgreSQL
builder.Services.AddPostgresEventStreaming(
builder.Configuration.GetConnectionString("EventStore"));
var app = builder.Build();
app.Run();
```
### Publishing Events
```csharp
public record OrderPlacedEvent
{
public int OrderId { get; init; }
public string CustomerName { get; init; } = string.Empty;
public decimal TotalAmount { get; init; }
public DateTimeOffset PlacedAt { get; init; }
}
// Publish to persistent stream
var store = serviceProvider.GetRequiredService<IEventStreamStore>();
await store.AppendAsync(
streamName: "orders",
events: new[] { new OrderPlacedEvent
{
OrderId = 123,
CustomerName = "John Doe",
TotalAmount = 99.99m,
PlacedAt = DateTimeOffset.UtcNow
}});
```
### Consuming Events
```csharp
// Read from persistent stream
await foreach (var @event in store.ReadStreamAsync("orders", fromOffset: 0))
{
Console.WriteLine($"Event: {@event.EventType} at offset {@event.Offset}");
}
// Consume with consumer group (automatic offset tracking)
var reader = serviceProvider.GetRequiredService<IConsumerGroupReader>();
await foreach (var @event in reader.ConsumeAsync(
streamName: "orders",
groupId: "email-notifications",
consumerId: "worker-1"))
{
await SendEmailNotificationAsync(@event);
}
```
## Architecture
### Stream Types
**Persistent Streams:**
- Append-only event log
- Events stored indefinitely (until retention policy)
- Offset-based reading
- Ideal for event sourcing and audit logs
**Ephemeral Streams:**
- Message queue semantics
- Dequeue with visibility timeout
- At-least-once delivery with ack/nack
- Ideal for background jobs and notifications
### Storage Backends
| Backend | Use Case | Features |
|---------|----------|----------|
| **PostgreSQL** | Production | Persistent storage, consumer groups, retention policies, event replay |
| **In-Memory** | Development/Testing | Fast, no persistence |
### Delivery Semantics
| Mode | Guarantee | Use Case |
|------|-----------|----------|
| **Broadcast** | At-least-once | All consumers receive all events |
| **Queue** | Exactly-once per group | Load-balanced processing |
## Core Concepts
### Streams
Streams are named event channels with configurable properties:
```csharp
// Stream metadata
public record StreamMetadata
{
public string Name { get; init; }
public StreamType Type { get; init; } // Persistent or Ephemeral
public DeliverySemantics Semantics { get; init; }
public StreamScope Scope { get; init; } // Internal, Public, etc.
}
```
### Events
Events are immutable messages with metadata:
```csharp
public record StoredEvent
{
public long Offset { get; init; } // Sequence number
public string EventId { get; init; } // Unique identifier
public string EventType { get; init; } // Event class name
public string StreamName { get; init; } // Stream name
public byte[] Data { get; init; } // JSON payload
public DateTimeOffset Timestamp { get; init; }
public string? CorrelationId { get; init; }
}
```
### Consumer Groups
Consumer groups coordinate multiple consumers processing the same stream:
```csharp
// Consumer group ensures each event processed once per group
await reader.ConsumeAsync(
streamName: "orders",
groupId: "order-processing", // Logical consumer group
consumerId: "worker-1", // This worker instance
options: new ConsumerGroupOptions
{
CommitStrategy = OffsetCommitStrategy.AfterBatch,
BatchSize = 100
});
```
## Features
### [Fundamentals](fundamentals/)
Learn the basics of event streaming:
- [Getting Started](fundamentals/getting-started.md) - First event stream
- [Persistent Streams](fundamentals/persistent-streams.md) - Event sourcing patterns
- [Ephemeral Streams](fundamentals/ephemeral-streams.md) - Message queue usage
- [Events and Workflows](fundamentals/events-and-workflows.md) - Event design
- [Subscriptions](fundamentals/subscriptions.md) - Broadcast vs queue modes
### [Storage](storage/)
Configure storage backends:
- [In-Memory Storage](storage/in-memory-storage.md) - Development setup
- [PostgreSQL Storage](storage/postgresql-storage.md) - Production deployment
- [Database Schema](storage/database-schema.md) - Schema details
- [Connection Pooling](storage/connection-pooling.md) - Performance tuning
### [Consumer Groups](consumer-groups/)
Coordinate multiple consumers:
- [Getting Started](consumer-groups/getting-started.md) - First consumer group
- [Offset Management](consumer-groups/offset-management.md) - Position tracking
- [Commit Strategies](consumer-groups/commit-strategies.md) - Manual, AfterEach, AfterBatch, Periodic
- [Fault Tolerance](consumer-groups/fault-tolerance.md) - Heartbeats and recovery
- [Load Balancing](consumer-groups/load-balancing.md) - Multiple workers
### [Retention Policies](retention-policies/)
Automatic event cleanup:
- [Time-Based Retention](retention-policies/time-based-retention.md) - MaxAge configuration
- [Size-Based Retention](retention-policies/size-based-retention.md) - MaxEventCount limits
- [Cleanup Windows](retention-policies/cleanup-windows.md) - Scheduled maintenance
- [Wildcard Policies](retention-policies/wildcard-policies.md) - Default policies
### [Event Replay](event-replay/)
Rebuild projections and reprocess events:
- [Replay from Offset](event-replay/replay-from-offset.md) - Offset-based replay
- [Replay from Time](event-replay/replay-from-time.md) - Time-based replay
- [Rate Limiting](event-replay/rate-limiting.md) - Controlled replay speed
- [Progress Tracking](event-replay/progress-tracking.md) - Monitor progress
### [Stream Configuration](stream-configuration/)
Per-stream settings:
- [Retention Config](stream-configuration/retention-config.md) - Stream-specific retention
- [Dead Letter Queues](stream-configuration/dead-letter-queues.md) - Error handling
- [Lifecycle Config](stream-configuration/lifecycle-config.md) - Auto-create, archive, delete
- [Performance Config](stream-configuration/performance-config.md) - Batching, compression
- [Access Control](stream-configuration/access-control.md) - Stream permissions
### [Projections](projections/)
Build read models from events:
- [Creating Projections](projections/creating-projections.md) - IDynamicProjection
- [Projection Options](projections/projection-options.md) - Auto-start, batching
- [Resettable Projections](projections/resettable-projections.md) - Rebuild from scratch
- [Checkpoint Stores](projections/checkpoint-stores.md) - PostgreSQL vs in-memory
### [Sagas](sagas/)
Long-running workflows:
- [Saga Pattern](sagas/saga-pattern.md) - Fundamentals
- [Creating Sagas](sagas/creating-sagas.md) - ISaga implementation
- [Compensation](sagas/compensation.md) - Rollback logic
- [Saga Context](sagas/saga-context.md) - State sharing
### [gRPC Streaming](grpc-streaming/)
Real-time event delivery via gRPC:
- [Persistent Subscriptions](grpc-streaming/persistent-subscriptions.md) - Subscribe to persistent streams
- [Queue Subscriptions](grpc-streaming/queue-subscriptions.md) - Queue mode with ack/nack
- [gRPC Clients](grpc-streaming/grpc-clients.md) - Building streaming clients
## Observability
The framework includes comprehensive monitoring and management features:
**Health Checks:**
```csharp
builder.Services.AddStreamHealthChecks();
var healthCheck = serviceProvider.GetRequiredService<IStreamHealthCheck>();
var result = await healthCheck.CheckStreamHealthAsync("orders");
```
**Metrics (OpenTelemetry):**
```csharp
builder.Services.AddEventStreamMetrics();
builder.Services.AddOpenTelemetry()
.WithMetrics(metrics => metrics
.AddMeter("Svrnty.CQRS.Events")
.AddPrometheusExporter());
```
**Management API:**
```csharp
app.MapEventStreamManagementApi();
// Endpoints:
// GET /api/event-streams
// GET /api/event-streams/{name}
// POST /api/event-streams/subscriptions/{id}/consumers/{consumerId}/reset-offset
```
**Structured Logging:**
```csharp
using Svrnty.CQRS.Events.Logging;
using (CorrelationContext.Begin(correlationId))
{
_logger.LogEventPublished(eventId, eventType, streamName, CorrelationContext.Current);
}
```
## Packages
| Package | Purpose |
|---------|---------|
| `Svrnty.CQRS.Events.Abstractions` | Core interfaces and models |
| `Svrnty.CQRS.Events` | In-memory implementation |
| `Svrnty.CQRS.Events.PostgreSQL` | PostgreSQL storage |
| `Svrnty.CQRS.Events.ConsumerGroups.Abstractions` | Consumer group interfaces |
| `Svrnty.CQRS.Events.ConsumerGroups` | PostgreSQL consumer groups |
| `Svrnty.CQRS.Events.Grpc` | gRPC streaming support |
## Installation
```bash
# PostgreSQL event streaming
dotnet add package Svrnty.CQRS.Events.PostgreSQL
# Consumer groups
dotnet add package Svrnty.CQRS.Events.ConsumerGroups
# gRPC streaming
dotnet add package Svrnty.CQRS.Events.Grpc
# In-memory (development)
dotnet add package Svrnty.CQRS.Events
```
## Complete Example
```csharp
using Svrnty.CQRS.Events;
using Svrnty.CQRS.Events.ConsumerGroups;
var builder = WebApplication.CreateBuilder(args);
// Event streaming with PostgreSQL
builder.Services.AddPostgresEventStreaming(
builder.Configuration.GetConnectionString("EventStore"));
// Consumer groups
builder.Services.AddPostgresConsumerGroups(
builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));
// Retention policies
builder.Services.AddPostgresRetentionPolicies(options =>
{
options.Enabled = true;
options.CleanupInterval = TimeSpan.FromHours(1);
});
// Event replay
builder.Services.AddPostgresEventReplay();
// Observability
builder.Services.AddStreamHealthChecks();
builder.Services.AddEventStreamMetrics();
// Management API
var app = builder.Build();
app.MapEventStreamManagementApi();
// Health checks
app.MapHealthChecks("/health");
app.Run();
```
## Best Practices
### ✅ DO
- Use consumer groups for load-balanced processing
- Configure retention policies for cleanup
- Monitor consumer lag with health checks
- Use correlation IDs for distributed tracing
- Implement idempotent event handlers
- Version your events for schema evolution
- Use projections for read models
- Enable metrics for production observability
### ❌ DON'T
- Don't process same event multiple times without idempotency
- Don't ignore consumer lag warnings
- Don't store large payloads in events (use references)
- Don't modify events after appending
- Don't skip error handling in event handlers
- Don't forget to commit consumer offsets
- Don't block event processing with synchronous I/O
## Common Patterns
**Event Sourcing:**
```csharp
// Append events to persistent stream
await store.AppendAsync("orders", new[] { orderPlacedEvent, paymentReceivedEvent });
// Rebuild state from events
await foreach (var @event in store.ReadStreamAsync("orders", fromOffset: 0))
{
aggregate.Apply(@event);
}
```
**Message Queue:**
```csharp
// Enqueue background job
await store.EnqueueAsync("email-queue", new SendEmailCommand { ... });
// Dequeue and process
var message = await store.DequeueAsync("email-queue", visibilityTimeout: TimeSpan.FromMinutes(5));
await SendEmailAsync(message);
await store.AcknowledgeAsync("email-queue", message.MessageId);
```
**CQRS with Events:**
```csharp
// Command publishes domain event
public class PlaceOrderCommandHandler : ICommandHandler<PlaceOrderCommand, int>
{
public async Task<int> HandleAsync(PlaceOrderCommand command, CancellationToken ct)
{
var order = Order.Create(command);
// Persist to write model
await _repository.AddAsync(order);
// Publish event for projections
await _eventStore.AppendAsync("orders", order.DomainEvents);
return order.Id;
}
}
// Projection builds read model
public class OrderSummaryProjection : IDynamicProjection
{
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
{
await _readRepository.AddOrderSummaryAsync(new OrderSummary
{
OrderId = @event.OrderId,
CustomerName = @event.CustomerName,
TotalAmount = @event.TotalAmount
});
}
}
```
## See Also
- [CQRS Overview](../getting-started/01-introduction.md)
- [Observability](../observability/README.md)
- [Event Sourcing Tutorial](../tutorials/event-sourcing/README.md)
- [E-commerce Example](../tutorials/ecommerce-example/README.md)