# Consumer Groups

Coordinated event consumption with automatic offset tracking and load balancing.

## Overview

Consumer groups enable multiple consumers to process the same event stream in a coordinated manner. The framework automatically load-balances events across group members and tracks processing offsets for fault tolerance.

**Key Features:**

- ✅ **Load Balancing** - Events distributed across consumers
- ✅ **Offset Tracking** - Automatic position management
- ✅ **Fault Tolerance** - Resume from last committed position
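- ✅ **Exactly-Once per Group** - Each event is assigned to a single consumer within the group (keep handlers idempotent, since events after the last committed offset may be redelivered after a failure)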
- ✅ **Heartbeat Monitoring** - Detect and remove stale consumers
- ✅ **Flexible Commit Strategies** - Manual, AfterEach, AfterBatch, Periodic

## Quick Start

```csharp
using Svrnty.CQRS.Events.ConsumerGroups;

var builder = WebApplication.CreateBuilder(args);

// Register consumer groups
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

var app = builder.Build();
app.Run();
```

**appsettings.json:**
```json
{
  "EventStreaming": {
    "ConsumerGroups": {
      "HeartbeatInterval": "00:00:10",
      "SessionTimeout": "00:00:30",
      "CleanupInterval": "00:01:00"
    }
  }
}
```

## Core Concepts

### Consumer Group

A consumer group is a logical set of consumers that share the work of processing one stream. Each group reads the stream independently of the others:

```
Stream: orders
├─ Group: email-notifications
│  ├─ Consumer: worker-1 (processes events 1, 4, 7...)
│  ├─ Consumer: worker-2 (processes events 2, 5, 8...)
│  └─ Consumer: worker-3 (processes events 3, 6, 9...)
└─ Group: analytics
   └─ Consumer: analytics-1 (processes all events)
```
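
The worker assignments in the diagram follow a round-robin pattern. Below is a minimal sketch of that pattern, assuming 1-based event numbers and a fixed member list. `RoundRobinSketch` and `AssignConsumer` are hypothetical names used only for illustration; the framework's actual distribution logic may differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only: not part of the Svrnty API.
public static class RoundRobinSketch
{
    // Maps a 1-based event number to a group member by rotating through the list.
    public static string AssignConsumer(long eventNumber, IReadOnlyList<string> consumers)
    {
        if (consumers.Count == 0)
            throw new ArgumentException("Group has no consumers.", nameof(consumers));
        if (eventNumber < 1)
            throw new ArgumentOutOfRangeException(nameof(eventNumber));

        int index = (int)((eventNumber - 1) % consumers.Count);
        return consumers[index];
    }
}
```

With the members `worker-1`, `worker-2`, `worker-3`, events 1, 4 and 7 map to `worker-1`, events 2, 5 and 8 to `worker-2`, and events 3, 6 and 9 to `worker-3`, matching the diagram.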

### Offset Management

Consumer groups track the last processed offset per consumer:

```sql
stream_name | group_id            | consumer_id | offset | updated_at
------------|---------------------|-------------|--------|--------------------
orders      | email-notifications | worker-1    | 1542   | 2025-12-10 10:30:00
orders      | email-notifications | worker-2    | 1540   | 2025-12-10 10:30:01
orders      | analytics           | analytics-1 | 1520   | 2025-12-10 10:29:50
```
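
To make the table concrete: a restarted consumer resumes just after its committed offset, and its lag is how far that offset trails the end of the stream. The sketch below assumes offsets are consecutive integers and that the stream's latest offset is known. `OffsetMath`, `NextOffsetToRead`, and `Lag` are hypothetical helpers, not framework APIs.

```csharp
using System;

// Hypothetical helpers for reasoning about committed offsets.
public static class OffsetMath
{
    // The first event a consumer should read after a restart.
    public static long NextOffsetToRead(long committedOffset) => committedOffset + 1;

    // Number of events written to the stream but not yet committed by this consumer.
    public static long Lag(long latestStreamOffset, long committedOffset)
        => Math.Max(0, latestStreamOffset - committedOffset);
}
```

If the latest offset in `orders` were 1550 (an assumed figure), the consumer committed at 1540 would resume at 1541 with a lag of 10.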

### Heartbeats

Consumers send periodic heartbeats to prove they're alive. The intervals shown correspond to the `HeartbeatInterval` (10s) and `SessionTimeout` (30s) settings:

```
Consumer registers → Sends heartbeats every 10s → Removed if no heartbeat for 30s
```
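
The removal rule above boils down to a timestamp comparison. Here is a minimal sketch, assuming the last heartbeat time is stored per consumer and that a consumer counts as stale only once the gap strictly exceeds the session timeout. `HeartbeatSketch.IsStale` is a hypothetical helper; the framework's cleanup job may treat the boundary differently.

```csharp
using System;

// Hypothetical illustration of stale-consumer detection.
public static class HeartbeatSketch
{
    // True when the consumer has been silent for longer than the session timeout.
    public static bool IsStale(DateTimeOffset lastHeartbeat, DateTimeOffset now, TimeSpan sessionTimeout)
        => now - lastHeartbeat > sessionTimeout;
}
```

With a 30-second session timeout, a consumer last heard from 20 seconds ago is kept, while one silent for 31 seconds is eligible for removal.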

## Features

### [Getting Started](getting-started.md)
Create your first consumer group with automatic offset management.

### [Offset Management](offset-management.md)
Learn how offsets are tracked and committed for fault tolerance.

### [Commit Strategies](commit-strategies.md)
Choose between Manual, AfterEach, AfterBatch, and Periodic commit strategies.

### [Fault Tolerance](fault-tolerance.md)
Understand heartbeat monitoring, stale consumer cleanup, and recovery.

### [Load Balancing](load-balancing.md)
Scale horizontally by adding consumers to a group.

## Usage Pattern

```csharp
public class OrderProcessingWorker : BackgroundService
{
    private readonly IConsumerGroupReader _consumerGroup;
    private readonly string _consumerId;

    public OrderProcessingWorker(IConsumerGroupReader consumerGroup)
    {
        _consumerGroup = consumerGroup;
        _consumerId = $"worker-{Environment.MachineName}-{Guid.NewGuid():N}";
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var @event in _consumerGroup.ConsumeAsync(
            streamName: "orders",
            groupId: "order-processing",
            consumerId: _consumerId,
            options: new ConsumerGroupOptions
            {
                BatchSize = 100,
                CommitStrategy = OffsetCommitStrategy.AfterBatch,
                HeartbeatInterval = TimeSpan.FromSeconds(10),
                SessionTimeout = TimeSpan.FromSeconds(30)
            },
            cancellationToken: stoppingToken))
        {
            await ProcessOrderEventAsync(@event);
            // Offset committed automatically after batch
        }
    }
}
```

## Comparison: Consumer Groups vs Broadcast

| Feature | Consumer Group | Broadcast |
|---------|---------------|-----------|
| **Load Balancing** | ✅ Events split across consumers | ❌ All consumers get all events |
| **Scalability** | ✅ Add more consumers | ❌ Fixed processing capacity |
| **Use Case** | Background jobs, order processing | Independent projections, analytics |
| **Offset Tracking** | ✅ Automatic | Manual checkpoint required |
| **Exactly-Once** | ✅ Per group | ❌ Must implement idempotency |

## Configuration

### Options

```csharp
public class ConsumerGroupOptions
{
    public int BatchSize { get; set; } = 100;
    public OffsetCommitStrategy CommitStrategy { get; set; } = OffsetCommitStrategy.AfterBatch;
    public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10);
    public TimeSpan SessionTimeout { get; set; } = TimeSpan.FromSeconds(30);
}
```

### Commit Strategies

```csharp
public enum OffsetCommitStrategy
{
    Manual,      // Explicit commit calls
    AfterEach,   // Commit after each event
    AfterBatch,  // Commit after batch (default)
    Periodic     // Commit every N seconds
}
```
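
One way to read the four strategies is as a rule for when an automatic commit becomes due. The sketch below is an assumption-laden illustration, not the framework's implementation: `CommitPolicySketch.ShouldCommit` is hypothetical, and `commitPeriod` is an invented parameter, since the documented options do not show a period setting.

```csharp
using System;

// Mirrors the enum above so the sketch compiles on its own.
public enum OffsetCommitStrategy { Manual, AfterEach, AfterBatch, Periodic }

// Hypothetical illustration of when each strategy would trigger a commit.
public static class CommitPolicySketch
{
    // Decides whether an automatic commit is due after an event has been handled.
    public static bool ShouldCommit(
        OffsetCommitStrategy strategy,
        int eventsSinceLastCommit,
        int batchSize,
        TimeSpan sinceLastCommit,
        TimeSpan commitPeriod)
    {
        return strategy switch
        {
            OffsetCommitStrategy.Manual => false, // the caller commits explicitly
            OffsetCommitStrategy.AfterEach => eventsSinceLastCommit >= 1,
            OffsetCommitStrategy.AfterBatch => eventsSinceLastCommit >= batchSize,
            OffsetCommitStrategy.Periodic => sinceLastCommit >= commitPeriod,
            _ => throw new ArgumentOutOfRangeException(nameof(strategy))
        };
    }
}
```

The trade-off this exposes: committing less often means fewer writes to the offset store, but more events may be replayed after a crash, because processing resumes from the last committed position.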

## Best Practices

### ✅ DO

- Use consumer groups for scalable processing
- Set appropriate batch sizes (100-1000)
- Monitor consumer lag
- Use AfterBatch for best performance
- Implement idempotent handlers
- Use unique consumer IDs
- Monitor stale consumers

### ❌ DON'T

- Don't use the same consumer ID for multiple instances
- Don't skip error handling
- Don't set very large batch sizes (> 10000)
- Don't ignore consumer lag warnings
- Don't forget to commit offsets when using the Manual strategy
- Don't assume exactly-once delivery
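
Since an event can be delivered again after a failure, idempotent handlers are what make reprocessing safe. The sketch below shows the idea with an in-memory set of processed IDs. It assumes each event carries a stable unique ID, which this page does not confirm, and `IdempotentHandlerSketch` is a hypothetical class. It is not thread-safe, and a real deployment would likely record processed IDs durably, for example in the same database transaction as the handler's side effects.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of an idempotent handler wrapper.
public sealed class IdempotentHandlerSketch
{
    private readonly HashSet<string> _processed = new HashSet<string>();

    // Runs the action only the first time a given event ID is seen.
    // Returns true if the action ran, false if the event was a duplicate.
    public bool HandleOnce(string eventId, Action handle)
    {
        if (_processed.Contains(eventId))
            return false;

        handle();
        _processed.Add(eventId); // recorded only after the handler succeeds
        return true;
    }
}
```

If the handler throws, the ID is not recorded, so a later redelivery of the same event is processed again rather than silently skipped.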

## See Also

- [Event Streaming Overview](../README.md)
- [Subscriptions](../fundamentals/subscriptions.md)
- [PostgreSQL Storage](../storage/postgresql-storage.md)
- [Health Checks](../observability/health-checks/consumer-health.md)