# Consumer Groups
Coordinated event consumption with automatic offset tracking and load balancing.
## Overview
Consumer groups enable multiple consumers to process the same event stream in a coordinated manner. The framework automatically load-balances events across group members and tracks processing offsets for fault tolerance.
**Key Features:**
- **Load Balancing** - Events distributed across consumers
- **Offset Tracking** - Automatic position management
- **Fault Tolerance** - Resume from last committed position
- **Exactly-Once per Group** - Each event is assigned to a single consumer within a group; keep handlers idempotent, because events after the last committed offset are delivered again on restart
- **Heartbeat Monitoring** - Detect and remove stale consumers
- **Flexible Commit Strategies** - Manual, AfterEach, AfterBatch, Periodic
## Quick Start
```csharp
using Svrnty.CQRS.Events.ConsumerGroups;

var builder = WebApplication.CreateBuilder(args);

// Register consumer groups
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

var app = builder.Build();
app.Run();
```
**appsettings.json:**
```json
{
  "EventStreaming": {
    "ConsumerGroups": {
      "HeartbeatInterval": "00:00:10",
      "SessionTimeout": "00:00:30",
      "CleanupInterval": "00:01:00"
    }
  }
}
```
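The three timing values depend on each other, so it can help to sanity-check them at startup. The following is a minimal sketch, not part of the framework: `ConsumerGroupTimingCheck` is a hypothetical helper, and the rule that `SessionTimeout` should be at least twice `HeartbeatInterval` is an assumption chosen for illustration, not a documented requirement.

```csharp
using System;
using System.Globalization;

// Hypothetical helper (not part of Svrnty.CQRS): validates the "hh:mm:ss"
// strings from the EventStreaming:ConsumerGroups configuration section.
public static class ConsumerGroupTimingCheck
{
    public static bool IsValid(string heartbeatInterval, string sessionTimeout, string cleanupInterval)
    {
        var heartbeat = TimeSpan.Parse(heartbeatInterval, CultureInfo.InvariantCulture);
        var session = TimeSpan.Parse(sessionTimeout, CultureInfo.InvariantCulture);
        var cleanup = TimeSpan.Parse(cleanupInterval, CultureInfo.InvariantCulture);

        // Assumed rule: a consumer gets at least two heartbeat intervals
        // before its session can time out.
        var minimumSession = TimeSpan.FromTicks(heartbeat.Ticks * 2);

        return heartbeat > TimeSpan.Zero
            && session >= minimumSession
            && cleanup > TimeSpan.Zero;
    }
}
```

With the values shown above (10s heartbeat, 30s session timeout, 1 minute cleanup), `IsValid` returns `true`; setting the heartbeat interval equal to the session timeout would make it return `false`.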
## Core Concepts
### Consumer Group
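A consumer group is a logical set of consumers that share the work of processing one stream. Every group sees all events of the stream, and within a group each event goes to one consumer: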
```
Stream: orders
├─ Group: email-notifications
│  ├─ Consumer: worker-1 (processes events 1, 4, 7...)
│  ├─ Consumer: worker-2 (processes events 2, 5, 8...)
│  └─ Consumer: worker-3 (processes events 3, 6, 9...)
└─ Group: analytics
   └─ Consumer: analytics-1 (processes all events)
```
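The distribution in the diagram matches a simple round-robin over event numbers. The sketch below only reproduces that picture; `RoundRobinSketch` is a hypothetical name, and the framework's actual assignment logic may differ.

```csharp
using System;

// Illustration only: reproduces the 1,4,7 / 2,5,8 / 3,6,9 split from the diagram.
public static class RoundRobinSketch
{
    // Returns the zero-based index of the consumer that would receive the event.
    // Event numbers are assumed to start at 1, as in the diagram.
    public static int AssignConsumer(long eventNumber, int consumerCount)
    {
        if (eventNumber < 1) throw new ArgumentOutOfRangeException(nameof(eventNumber));
        if (consumerCount <= 0) throw new ArgumentOutOfRangeException(nameof(consumerCount));

        return (int)((eventNumber - 1) % consumerCount);
    }
}
```

With three consumers, events 1, 4 and 7 map to index 0 (`worker-1`), events 2, 5 and 8 to index 1, and events 3, 6 and 9 to index 2. A group with a single consumer always gets index 0, which is the `analytics` case.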
### Offset Management
Consumer groups track the last processed offset per consumer:
```sql
stream_name | group_id    | consumer_id | offset | updated_at
------------|-------------|-------------|--------|--------------------
orders      | email-notif | worker-1    | 1542   | 2025-12-10 10:30:00
orders      | email-notif | worker-2    | 1540   | 2025-12-10 10:30:01
orders      | analytics   | worker-1    | 1520   | 2025-12-10 10:29:50
```
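Stored offsets are also the basis for measuring consumer lag, that is, how far a consumer is behind the newest event in the stream. A minimal sketch follows, assuming the latest stream offset is known from elsewhere; `ConsumerLagSketch` and the head offset used in the example are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustration only: computes lag from committed offsets like those in the table.
public static class ConsumerLagSketch
{
    // committedOffsets: consumer ID -> last committed offset.
    // streamHeadOffset: offset of the newest event in the stream (assumed known).
    public static Dictionary<string, long> LagByConsumer(
        IReadOnlyDictionary<string, long> committedOffsets,
        long streamHeadOffset)
    {
        return committedOffsets.ToDictionary(
            pair => pair.Key,
            pair => Math.Max(0L, streamHeadOffset - pair.Value));
    }
}
```

For the `email-notif` rows above and an assumed head offset of 1550, `worker-1` would have a lag of 8 and `worker-2` a lag of 10.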
### Heartbeats
Consumers send periodic heartbeats to prove they're alive:
```
Consumer registers → Sends heartbeats every 10s → Removed if no heartbeat for 30s
```
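The staleness rule above comes down to comparing the time since the last heartbeat with the session timeout. The sketch below shows that comparison only; `HeartbeatSketch` is a hypothetical name and the framework's cleanup job is not shown.

```csharp
using System;

// Illustration only: the check a cleanup pass could apply to each consumer.
public static class HeartbeatSketch
{
    // A consumer is stale once more than sessionTimeout has passed
    // since its last recorded heartbeat.
    public static bool IsStale(DateTimeOffset lastHeartbeat, DateTimeOffset now, TimeSpan sessionTimeout)
    {
        return now - lastHeartbeat > sessionTimeout;
    }
}
```

With a 30 second session timeout, a consumer whose last heartbeat is 20 seconds old is still considered alive, while one that has been silent for 31 seconds is stale.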
## Features
### [Getting Started](getting-started.md)
Create your first consumer group with automatic offset management.
### [Offset Management](offset-management.md)
Learn how offsets are tracked and committed for fault tolerance.
### [Commit Strategies](commit-strategies.md)
Choose between Manual, AfterEach, AfterBatch, and Periodic commit strategies.
### [Fault Tolerance](fault-tolerance.md)
Understand heartbeat monitoring, stale consumer cleanup, and recovery.
### [Load Balancing](load-balancing.md)
Scale horizontally by adding consumers to a group.
## Usage Pattern
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Svrnty.CQRS.Events.ConsumerGroups;

public class OrderProcessingWorker : BackgroundService
{
    private readonly IConsumerGroupReader _consumerGroup;
    private readonly string _consumerId;

    public OrderProcessingWorker(IConsumerGroupReader consumerGroup)
    {
        _consumerGroup = consumerGroup;
        // Unique per instance so two workers never share a consumer ID
        _consumerId = $"worker-{Environment.MachineName}-{Guid.NewGuid():N}";
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var @event in _consumerGroup.ConsumeAsync(
            streamName: "orders",
            groupId: "order-processing",
            consumerId: _consumerId,
            options: new ConsumerGroupOptions
            {
                BatchSize = 100,
                CommitStrategy = OffsetCommitStrategy.AfterBatch,
                HeartbeatInterval = TimeSpan.FromSeconds(10),
                SessionTimeout = TimeSpan.FromSeconds(30)
            },
            cancellationToken: stoppingToken))
        {
            // ProcessOrderEventAsync is your application's handler (not shown)
            await ProcessOrderEventAsync(@event);
            // Offset committed automatically after batch
        }
    }
}
```
## Comparison: Consumer Groups vs Broadcast
| Feature | Consumer Group | Broadcast |
|---------|---------------|-----------|
| **Load Balancing** | ✅ Events split across consumers | ❌ All consumers get all events |
| **Scalability** | ✅ Add more consumers | ❌ Fixed processing capacity |
| **Use Case** | Background jobs, order processing | Independent projections, analytics |
| **Offset Tracking** | ✅ Automatic | Manual checkpoint required |
| **Exactly-Once** | ✅ Per group (one consumer per event; handlers should still be idempotent) | ❌ Must implement idempotency |
## Configuration
### Options
```csharp
public class ConsumerGroupOptions
{
    public int BatchSize { get; set; } = 100;
    public OffsetCommitStrategy CommitStrategy { get; set; } = OffsetCommitStrategy.AfterBatch;
    public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10);
    public TimeSpan SessionTimeout { get; set; } = TimeSpan.FromSeconds(30);
}
```
### Commit Strategies
```csharp
public enum OffsetCommitStrategy
{
    Manual,     // Explicit commit calls
    AfterEach,  // Commit after each event
    AfterBatch, // Commit after batch (default)
    Periodic    // Commit every N seconds
}
```
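To make the difference between the strategies concrete, the sketch below decides whether a commit is due after an event has been handled. It is an illustration under assumptions, not the framework's implementation: the enum is redeclared locally so the block compiles on its own, and `CommitDecisionSketch`, its parameters, and the rule that `Periodic` skips commits when nothing new was processed are all hypothetical.

```csharp
using System;

// Local copy of the enum above so the sketch compiles on its own.
public enum OffsetCommitStrategy { Manual, AfterEach, AfterBatch, Periodic }

public static class CommitDecisionSketch
{
    // eventsSinceCommit: events handled since the last commit.
    // sinceLastCommit / period: only relevant for the Periodic strategy.
    public static bool ShouldCommit(
        OffsetCommitStrategy strategy,
        int eventsSinceCommit,
        int batchSize,
        TimeSpan sinceLastCommit,
        TimeSpan period)
    {
        return strategy switch
        {
            // Manual: the application commits explicitly, never automatically.
            OffsetCommitStrategy.Manual => false,
            OffsetCommitStrategy.AfterEach => eventsSinceCommit >= 1,
            OffsetCommitStrategy.AfterBatch => eventsSinceCommit >= batchSize,
            // Assumed: nothing to commit if no event was handled in the period.
            OffsetCommitStrategy.Periodic => eventsSinceCommit > 0 && sinceLastCommit >= period,
            _ => throw new ArgumentOutOfRangeException(nameof(strategy))
        };
    }
}
```

The trade-off is the usual one: `AfterEach` limits reprocessing after a crash to a single event at the cost of one commit per event, while `AfterBatch` and `Periodic` commit less often and may replay more events on restart.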
## Best Practices
### ✅ DO
- Use consumer groups for scalable processing
- Set appropriate batch sizes (100-1000)
- Monitor consumer lag
- Use AfterBatch for best performance
- Implement idempotent handlers
- Use unique consumer IDs
- Monitor stale consumers
### ❌ DON'T
- Don't use the same consumer ID for multiple instances
- Don't skip error handling
- Don't set very large batch sizes (> 10000)
- Don't ignore consumer lag warnings
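- Don't forget to commit offsets when using the Manual strategy
- Don't assume exactly-once delivery; events after the last committed offset can be delivered again after a failure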
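
Because an event can reach a handler more than once, an idempotent handler remembers what it has already processed and skips repeats. The sketch below is a minimal in-memory illustration: `IdempotentHandlerSketch` is a hypothetical class, and it assumes each event carries a unique ID. A real handler would store processed IDs durably, ideally in the same transaction as the side effect.

```csharp
using System;
using System.Collections.Generic;

// Illustration only: in-memory de-duplication keyed by an assumed unique event ID.
public sealed class IdempotentHandlerSketch
{
    private readonly HashSet<string> _processedEventIds = new HashSet<string>();

    // Counts how many times the side effect actually ran.
    public int SideEffects { get; private set; }

    // Returns true if the event was processed, false if it was a duplicate.
    public bool Handle(string eventId)
    {
        // HashSet.Add returns false when the ID is already present.
        if (!_processedEventIds.Add(eventId))
        {
            return false;
        }

        SideEffects++; // stand-in for sending an email, updating a row, etc.
        return true;
    }
}
```

Handling the same event ID twice runs the side effect once; the second call returns `false` and changes nothing.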
## See Also
- [Event Streaming Overview](../README.md)
- [Subscriptions](../fundamentals/subscriptions.md)
- [PostgreSQL Storage](../storage/postgresql-storage.md)
- [Health Checks](../observability/health-checks/consumer-health.md)