449 lines
14 KiB
Markdown
449 lines
14 KiB
Markdown
# Event Streaming
|
|
|
|
Comprehensive event streaming support with event sourcing, message queues, consumer groups, and observability.
|
|
|
|
## Overview
|
|
|
|
Svrnty.CQRS provides production-ready event streaming capabilities for building event-driven architectures. The framework supports both **persistent streams** (event sourcing) and **ephemeral streams** (message queues), with advanced features like consumer groups, retention policies, event replay, and comprehensive monitoring.
|
|
|
|
**Key Features:**
|
|
|
|
- ✅ **Persistent Streams** - Event sourcing with append-only logs
|
|
- ✅ **Ephemeral Streams** - Message queue semantics with at-least-once delivery
|
|
- ✅ **Consumer Groups** - Coordinated consumption with automatic load balancing
|
|
- ✅ **Retention Policies** - Automatic cleanup based on age or size
|
|
- ✅ **Event Replay** - Rebuild projections and reprocess historical events
|
|
- ✅ **Stream Configuration** - Per-stream settings for retention, DLQ, lifecycle
|
|
- ✅ **Projections** - Read models from event streams
|
|
- ✅ **Sagas** - Long-running workflows with compensation logic
|
|
- ✅ **gRPC Streaming** - Real-time bidirectional event delivery
|
|
- ✅ **PostgreSQL Storage** - Production-ready persistent storage
|
|
- ✅ **Health Checks** - Monitor consumer lag and stream health
|
|
- ✅ **Metrics** - OpenTelemetry-compatible telemetry
|
|
- ✅ **Management API** - REST endpoints for operations
|
|
|
|
## Quick Start
|
|
|
|
### Basic Event Streaming
|
|
|
|
```csharp
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Register event streaming with PostgreSQL
|
|
builder.Services.AddPostgresEventStreaming(
|
|
builder.Configuration.GetConnectionString("EventStore"));
|
|
|
|
var app = builder.Build();
|
|
app.Run();
|
|
```
|
|
|
|
### Publishing Events
|
|
|
|
```csharp
|
|
public record OrderPlacedEvent
|
|
{
|
|
public int OrderId { get; init; }
|
|
public string CustomerName { get; init; } = string.Empty;
|
|
public decimal TotalAmount { get; init; }
|
|
public DateTimeOffset PlacedAt { get; init; }
|
|
}
|
|
|
|
// Publish to persistent stream
|
|
var store = serviceProvider.GetRequiredService<IEventStreamStore>();
|
|
|
|
await store.AppendAsync(
|
|
streamName: "orders",
|
|
events: new[] { new OrderPlacedEvent
|
|
{
|
|
OrderId = 123,
|
|
CustomerName = "John Doe",
|
|
TotalAmount = 99.99m,
|
|
PlacedAt = DateTimeOffset.UtcNow
|
|
}});
|
|
```
|
|
|
|
### Consuming Events
|
|
|
|
```csharp
|
|
// Read from persistent stream
|
|
await foreach (var @event in store.ReadStreamAsync("orders", fromOffset: 0))
|
|
{
|
|
Console.WriteLine($"Event: {@event.EventType} at offset {@event.Offset}");
|
|
}
|
|
|
|
// Consume with consumer group (automatic offset tracking)
|
|
var reader = serviceProvider.GetRequiredService<IConsumerGroupReader>();
|
|
|
|
await foreach (var @event in reader.ConsumeAsync(
|
|
streamName: "orders",
|
|
groupId: "email-notifications",
|
|
consumerId: "worker-1"))
|
|
{
|
|
await SendEmailNotificationAsync(@event);
|
|
}
|
|
```
|
|
|
|
## Architecture
|
|
|
|
### Stream Types
|
|
|
|
**Persistent Streams:**
|
|
- Append-only event log
|
|
- Events stored indefinitely (until retention policy)
|
|
- Offset-based reading
|
|
- Ideal for event sourcing and audit logs
|
|
|
|
**Ephemeral Streams:**
|
|
- Message queue semantics
|
|
- Dequeue with visibility timeout
|
|
- At-least-once delivery with ack/nack
|
|
- Ideal for background jobs and notifications
|
|
|
|
### Storage Backends
|
|
|
|
| Backend | Use Case | Features |
|
|
|---------|----------|----------|
|
|
| **PostgreSQL** | Production | Persistent storage, consumer groups, retention policies, event replay |
|
|
| **In-Memory** | Development/Testing | Fast, no persistence |
|
|
|
|
### Delivery Semantics
|
|
|
|
| Mode | Guarantee | Use Case |
|
|
|------|-----------|----------|
|
|
| **Broadcast** | At-least-once | All consumers receive all events |
|
|
| **Queue** | Exactly-once per group | Load-balanced processing |
|
|
|
|
## Core Concepts
|
|
|
|
### Streams
|
|
|
|
Streams are named event channels with configurable properties:
|
|
|
|
```csharp
|
|
// Stream metadata
|
|
public record StreamMetadata
|
|
{
|
|
public string Name { get; init; }
|
|
public StreamType Type { get; init; } // Persistent or Ephemeral
|
|
public DeliverySemantics Semantics { get; init; }
|
|
public StreamScope Scope { get; init; } // Internal, Public, etc.
|
|
}
|
|
```
|
|
|
|
### Events
|
|
|
|
Events are immutable messages with metadata:
|
|
|
|
```csharp
|
|
public record StoredEvent
|
|
{
|
|
public long Offset { get; init; } // Sequence number
|
|
public string EventId { get; init; } // Unique identifier
|
|
public string EventType { get; init; } // Event class name
|
|
public string StreamName { get; init; } // Stream name
|
|
public byte[] Data { get; init; } // JSON payload
|
|
public DateTimeOffset Timestamp { get; init; }
|
|
public string? CorrelationId { get; init; }
|
|
}
|
|
```
|
|
|
|
### Consumer Groups
|
|
|
|
Consumer groups coordinate multiple consumers processing the same stream:
|
|
|
|
```csharp
|
|
// Consumer group ensures each event processed once per group
|
|
await reader.ConsumeAsync(
|
|
streamName: "orders",
|
|
groupId: "order-processing", // Logical consumer group
|
|
consumerId: "worker-1", // This worker instance
|
|
options: new ConsumerGroupOptions
|
|
{
|
|
CommitStrategy = OffsetCommitStrategy.AfterBatch,
|
|
BatchSize = 100
|
|
});
|
|
```
|
|
|
|
## Features
|
|
|
|
### [Fundamentals](fundamentals/)
|
|
|
|
Learn the basics of event streaming:
|
|
|
|
- [Getting Started](fundamentals/getting-started.md) - First event stream
|
|
- [Persistent Streams](fundamentals/persistent-streams.md) - Event sourcing patterns
|
|
- [Ephemeral Streams](fundamentals/ephemeral-streams.md) - Message queue usage
|
|
- [Events and Workflows](fundamentals/events-and-workflows.md) - Event design
|
|
- [Subscriptions](fundamentals/subscriptions.md) - Broadcast vs queue modes
|
|
|
|
### [Storage](storage/)
|
|
|
|
Configure storage backends:
|
|
|
|
- [In-Memory Storage](storage/in-memory-storage.md) - Development setup
|
|
- [PostgreSQL Storage](storage/postgresql-storage.md) - Production deployment
|
|
- [Database Schema](storage/database-schema.md) - Schema details
|
|
- [Connection Pooling](storage/connection-pooling.md) - Performance tuning
|
|
|
|
### [Consumer Groups](consumer-groups/)
|
|
|
|
Coordinate multiple consumers:
|
|
|
|
- [Getting Started](consumer-groups/getting-started.md) - First consumer group
|
|
- [Offset Management](consumer-groups/offset-management.md) - Position tracking
|
|
- [Commit Strategies](consumer-groups/commit-strategies.md) - Manual, AfterEach, AfterBatch, Periodic
|
|
- [Fault Tolerance](consumer-groups/fault-tolerance.md) - Heartbeats and recovery
|
|
- [Load Balancing](consumer-groups/load-balancing.md) - Multiple workers
|
|
|
|
### [Retention Policies](retention-policies/)
|
|
|
|
Automatic event cleanup:
|
|
|
|
- [Time-Based Retention](retention-policies/time-based-retention.md) - MaxAge configuration
|
|
- [Size-Based Retention](retention-policies/size-based-retention.md) - MaxEventCount limits
|
|
- [Cleanup Windows](retention-policies/cleanup-windows.md) - Scheduled maintenance
|
|
- [Wildcard Policies](retention-policies/wildcard-policies.md) - Default policies
|
|
|
|
### [Event Replay](event-replay/)
|
|
|
|
Rebuild projections and reprocess events:
|
|
|
|
- [Replay from Offset](event-replay/replay-from-offset.md) - Offset-based replay
|
|
- [Replay from Time](event-replay/replay-from-time.md) - Time-based replay
|
|
- [Rate Limiting](event-replay/rate-limiting.md) - Controlled replay speed
|
|
- [Progress Tracking](event-replay/progress-tracking.md) - Monitor progress
|
|
|
|
### [Stream Configuration](stream-configuration/)
|
|
|
|
Per-stream settings:
|
|
|
|
- [Retention Config](stream-configuration/retention-config.md) - Stream-specific retention
|
|
- [Dead Letter Queues](stream-configuration/dead-letter-queues.md) - Error handling
|
|
- [Lifecycle Config](stream-configuration/lifecycle-config.md) - Auto-create, archive, delete
|
|
- [Performance Config](stream-configuration/performance-config.md) - Batching, compression
|
|
- [Access Control](stream-configuration/access-control.md) - Stream permissions
|
|
|
|
### [Projections](projections/)
|
|
|
|
Build read models from events:
|
|
|
|
- [Creating Projections](projections/creating-projections.md) - IDynamicProjection
|
|
- [Projection Options](projections/projection-options.md) - Auto-start, batching
|
|
- [Resettable Projections](projections/resettable-projections.md) - Rebuild from scratch
|
|
- [Checkpoint Stores](projections/checkpoint-stores.md) - PostgreSQL vs in-memory
|
|
|
|
### [Sagas](sagas/)
|
|
|
|
Long-running workflows:
|
|
|
|
- [Saga Pattern](sagas/saga-pattern.md) - Fundamentals
|
|
- [Creating Sagas](sagas/creating-sagas.md) - ISaga implementation
|
|
- [Compensation](sagas/compensation.md) - Rollback logic
|
|
- [Saga Context](sagas/saga-context.md) - State sharing
|
|
|
|
### [gRPC Streaming](grpc-streaming/)
|
|
|
|
Real-time event delivery via gRPC:
|
|
|
|
- [Persistent Subscriptions](grpc-streaming/persistent-subscriptions.md) - Subscribe to persistent streams
|
|
- [Queue Subscriptions](grpc-streaming/queue-subscriptions.md) - Queue mode with ack/nack
|
|
- [gRPC Clients](grpc-streaming/grpc-clients.md) - Building streaming clients
|
|
|
|
## Observability
|
|
|
|
The framework includes comprehensive monitoring and management features:
|
|
|
|
**Health Checks:**
|
|
```csharp
|
|
builder.Services.AddStreamHealthChecks();
|
|
|
|
var healthCheck = serviceProvider.GetRequiredService<IStreamHealthCheck>();
|
|
var result = await healthCheck.CheckStreamHealthAsync("orders");
|
|
```
|
|
|
|
**Metrics (OpenTelemetry):**
|
|
```csharp
|
|
builder.Services.AddEventStreamMetrics();
|
|
|
|
builder.Services.AddOpenTelemetry()
|
|
.WithMetrics(metrics => metrics
|
|
.AddMeter("Svrnty.CQRS.Events")
|
|
.AddPrometheusExporter());
|
|
```
|
|
|
|
**Management API:**
|
|
```csharp
|
|
app.MapEventStreamManagementApi();
|
|
|
|
// Endpoints:
|
|
// GET /api/event-streams
|
|
// GET /api/event-streams/{name}
|
|
// POST /api/event-streams/subscriptions/{id}/consumers/{consumerId}/reset-offset
|
|
```
|
|
|
|
**Structured Logging:**
|
|
```csharp
|
|
using Svrnty.CQRS.Events.Logging;
|
|
|
|
using (CorrelationContext.Begin(correlationId))
|
|
{
|
|
_logger.LogEventPublished(eventId, eventType, streamName, CorrelationContext.Current);
|
|
}
|
|
```
|
|
|
|
## Packages
|
|
|
|
| Package | Purpose |
|
|
|---------|---------|
|
|
| `Svrnty.CQRS.Events.Abstractions` | Core interfaces and models |
|
|
| `Svrnty.CQRS.Events` | In-memory implementation |
|
|
| `Svrnty.CQRS.Events.PostgreSQL` | PostgreSQL storage |
|
|
| `Svrnty.CQRS.Events.ConsumerGroups.Abstractions` | Consumer group interfaces |
|
|
| `Svrnty.CQRS.Events.ConsumerGroups` | PostgreSQL consumer groups |
|
|
| `Svrnty.CQRS.Events.Grpc` | gRPC streaming support |
|
|
|
|
## Installation
|
|
|
|
```bash
|
|
# PostgreSQL event streaming
|
|
dotnet add package Svrnty.CQRS.Events.PostgreSQL
|
|
|
|
# Consumer groups
|
|
dotnet add package Svrnty.CQRS.Events.ConsumerGroups
|
|
|
|
# gRPC streaming
|
|
dotnet add package Svrnty.CQRS.Events.Grpc
|
|
|
|
# In-memory (development)
|
|
dotnet add package Svrnty.CQRS.Events
|
|
```
|
|
|
|
## Complete Example
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events;
|
|
using Svrnty.CQRS.Events.ConsumerGroups;
|
|
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Event streaming with PostgreSQL
|
|
builder.Services.AddPostgresEventStreaming(
|
|
builder.Configuration.GetConnectionString("EventStore"));
|
|
|
|
// Consumer groups
|
|
builder.Services.AddPostgresConsumerGroups(
|
|
builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));
|
|
|
|
// Retention policies
|
|
builder.Services.AddPostgresRetentionPolicies(options =>
|
|
{
|
|
options.Enabled = true;
|
|
options.CleanupInterval = TimeSpan.FromHours(1);
|
|
});
|
|
|
|
// Event replay
|
|
builder.Services.AddPostgresEventReplay();
|
|
|
|
// Observability
|
|
builder.Services.AddStreamHealthChecks();
|
|
builder.Services.AddEventStreamMetrics();
|
|
|
|
// Management API
|
|
var app = builder.Build();
|
|
app.MapEventStreamManagementApi();
|
|
|
|
// Health checks
|
|
app.MapHealthChecks("/health");
|
|
|
|
app.Run();
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Use consumer groups for load-balanced processing
|
|
- Configure retention policies for cleanup
|
|
- Monitor consumer lag with health checks
|
|
- Use correlation IDs for distributed tracing
|
|
- Implement idempotent event handlers
|
|
- Version your events for schema evolution
|
|
- Use projections for read models
|
|
- Enable metrics for production observability
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't process same event multiple times without idempotency
|
|
- Don't ignore consumer lag warnings
|
|
- Don't store large payloads in events (use references)
|
|
- Don't modify events after appending
|
|
- Don't skip error handling in event handlers
|
|
- Don't forget to commit consumer offsets
|
|
- Don't block event processing with synchronous I/O
|
|
|
|
## Common Patterns
|
|
|
|
**Event Sourcing:**
|
|
```csharp
|
|
// Append events to persistent stream
|
|
await store.AppendAsync("orders", new[] { orderPlacedEvent, paymentReceivedEvent });
|
|
|
|
// Rebuild state from events
|
|
await foreach (var @event in store.ReadStreamAsync("orders", fromOffset: 0))
|
|
{
|
|
aggregate.Apply(@event);
|
|
}
|
|
```
|
|
|
|
**Message Queue:**
|
|
```csharp
|
|
// Enqueue background job
|
|
await store.EnqueueAsync("email-queue", new SendEmailCommand { ... });
|
|
|
|
// Dequeue and process
|
|
var message = await store.DequeueAsync("email-queue", visibilityTimeout: TimeSpan.FromMinutes(5));
|
|
await SendEmailAsync(message);
|
|
await store.AcknowledgeAsync("email-queue", message.MessageId);
|
|
```
|
|
|
|
**CQRS with Events:**
|
|
```csharp
|
|
// Command publishes domain event
|
|
public class PlaceOrderCommandHandler : ICommandHandler<PlaceOrderCommand, int>
|
|
{
|
|
public async Task<int> HandleAsync(PlaceOrderCommand command, CancellationToken ct)
|
|
{
|
|
var order = Order.Create(command);
|
|
|
|
// Persist to write model
|
|
await _repository.AddAsync(order);
|
|
|
|
// Publish event for projections
|
|
await _eventStore.AppendAsync("orders", order.DomainEvents);
|
|
|
|
return order.Id;
|
|
}
|
|
}
|
|
|
|
// Projection builds read model
|
|
public class OrderSummaryProjection : IDynamicProjection
|
|
{
|
|
public async Task HandleAsync(OrderPlacedEvent @event, CancellationToken ct)
|
|
{
|
|
await _readRepository.AddOrderSummaryAsync(new OrderSummary
|
|
{
|
|
OrderId = @event.OrderId,
|
|
CustomerName = @event.CustomerName,
|
|
TotalAmount = @event.TotalAmount
|
|
});
|
|
}
|
|
}
|
|
```
|
|
|
|
## See Also
|
|
|
|
- [CQRS Overview](../getting-started/01-introduction.md)
|
|
- [Observability](../observability/README.md)
|
|
- [Event Sourcing Tutorial](../tutorials/event-sourcing/README.md)
|
|
- [E-commerce Example](../tutorials/ecommerce-example/README.md)
|