# Offset Management
Understanding how consumer group offsets are tracked and committed.
## Overview
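Offsets represent the position of each consumer in a stream. The consumer group framework tracks and commits offsets automatically, so a consumer that restarts resumes from its last committed position. Delivery is at-least-once: events processed after the last commit are delivered again after a failure (see Fault Tolerance below), so handlers should be idempotent to get effectively-once processing per group.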
## How Offsets Work
```
Stream: orders
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│  0  │  1  │  2  │  3  │  4  │  5  │  6  │ ← Events
└─────┴─────┴─────┴─────┴─────┴─────┴─────┘
         ↑           ↑
         │           └─ Consumer B (offset: 3)
         └─ Consumer A (offset: 1)
```
Consumer A has processed events 0-1. Next read starts at offset 2.
Consumer B has processed events 0-3. Next read starts at offset 4.
## Offset Storage
Offsets are stored in PostgreSQL:
```sql
SELECT * FROM consumer_offsets;
```
| stream_name | group_id | consumer_id | offset | updated_at |
|-------------|----------|-------------|--------|------------|
| orders | email-notif | worker-1 | 1542 | 2025-12-10 10:30:00 |
| orders | email-notif | worker-2 | 1540 | 2025-12-10 10:30:01 |
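
The table holds one row per consumer in each group, so a commit behaves like an upsert on (stream_name, group_id, consumer_id). The sketch below mirrors that behavior with an in-memory dictionary. `InMemoryOffsetTable` and its methods are hypothetical, and the uniqueness of that three-column key is an assumption about the schema rather than something this page specifies.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the consumer_offsets table.
// Assumption: (stream_name, group_id, consumer_id) is unique, so committing
// inserts a row the first time and overwrites it afterwards (an upsert).
public sealed class InMemoryOffsetTable
{
    private readonly Dictionary<(string Stream, string Group, string Consumer), (long Offset, DateTimeOffset UpdatedAt)> _rows = new();

    public void Commit(string stream, string group, string consumer, long offset, DateTimeOffset updatedAt)
    {
        _rows[(stream, group, consumer)] = (offset, updatedAt);
    }

    // Returns null when the consumer has never committed an offset.
    public long? GetOffset(string stream, string group, string consumer)
    {
        return _rows.TryGetValue((stream, group, consumer), out var row) ? row.Offset : (long?)null;
    }

    public int RowCount => _rows.Count;
}
```

Committing 1542 for worker-1 and 1540 for worker-2 produces two rows; a later commit from worker-1 overwrites only its own row.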
## Commit Strategies
### AfterBatch (Recommended)
Commit after processing a batch of events:
```csharp
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "email-notifications",
    "worker-1",
    options: new ConsumerGroupOptions
    {
        BatchSize = 100,
        CommitStrategy = OffsetCommitStrategy.AfterBatch
    }))
{
    await ProcessEventAsync(@event);
    // Offset committed automatically after each batch of 100 events
}
```
**Pros:**
- ✅ Fewer database writes (one commit per batch instead of one per event)
- ✅ Good balance between safety and throughput

**Cons:**
- ❌ May reprocess up to BatchSize events on failure
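
To see where the "up to BatchSize" bound comes from, here is a minimal model of batch-based committing. It is not the framework's implementation: `AfterBatchCommitter`, `OnProcessed`, and `Flush` are hypothetical names, and -1 is used as a "nothing committed yet" sentinel for this sketch only.

```csharp
using System;

// Hypothetical model of the AfterBatch strategy (not the framework's code):
// the offset of the last event in a batch is committed once BatchSize events
// have been processed. -1 means "nothing committed yet" in this sketch.
public sealed class AfterBatchCommitter
{
    private readonly int _batchSize;
    private int _processedInBatch;
    private long _lastProcessed = -1;

    public AfterBatchCommitter(int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        _batchSize = batchSize;
    }

    public long CommittedOffset { get; private set; } = -1;
    public int CommitCount { get; private set; }

    // Events processed since the last commit; these would be redelivered
    // after a crash. Assumes offsets are contiguous.
    public long UncommittedCount => _lastProcessed - CommittedOffset;

    // Call after an event has been processed successfully.
    public void OnProcessed(long offset)
    {
        _lastProcessed = offset;
        _processedInBatch++;
        if (_processedInBatch == _batchSize)
        {
            Commit();
        }
    }

    // Hypothetical: commit a partial batch, e.g. on graceful shutdown.
    public void Flush()
    {
        if (_processedInBatch > 0)
        {
            Commit();
        }
    }

    private void Commit()
    {
        CommittedOffset = _lastProcessed; // a real store would persist this value
        CommitCount++;
        _processedInBatch = 0;
    }
}
```

After 250 events with a batch size of 100, offsets up to 199 are committed and events 200 to 249 would be redelivered after a crash; the hypothetical `Flush` would commit that partial batch.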
### AfterEach
Commit after processing each event:
```csharp
options.CommitStrategy = OffsetCommitStrategy.AfterEach;
await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed after each event
}
```
**Pros:**
- ✅ Minimal reprocessing on failure (at most the one event that was processed but not yet committed)
- ✅ Precise offset tracking

**Cons:**
- ❌ Lower throughput (one database write per event)
- ❌ Higher database load
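
The write-count difference is simple arithmetic: one commit covers one event under AfterEach and up to BatchSize events under AfterBatch. The hypothetical helper below makes the comparison concrete; it assumes a final partial batch is still committed with one write.

```csharp
using System;

// Hypothetical helper: how many offset writes are needed for a number of
// events when each write covers eventsPerCommit events
// (1 for AfterEach, BatchSize for AfterBatch). Assumes a final partial
// batch is still committed with one write.
public static class CommitCost
{
    public static long Writes(long events, int eventsPerCommit)
    {
        if (events < 0) throw new ArgumentOutOfRangeException(nameof(events));
        if (eventsPerCommit <= 0) throw new ArgumentOutOfRangeException(nameof(eventsPerCommit));

        // Ceiling division: 250 events in batches of 100 need 3 writes.
        return (events + eventsPerCommit - 1) / eventsPerCommit;
    }
}
```

For 10,000 events, AfterEach issues 10,000 offset writes, while AfterBatch with `BatchSize = 100` issues 100.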
### Periodic
Commit every N seconds:
```csharp
options.CommitStrategy = OffsetCommitStrategy.Periodic;
options.CommitInterval = TimeSpan.FromSeconds(30);
await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed every 30 seconds
}
```
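**Pros:**
- ✅ Predictable commit frequency
- ✅ Fewer database writes than AfterEach

**Cons:**
- ❌ On failure, may reprocess every event handled since the last commit (up to one CommitInterval of work)
- ❌ The committed offset trails actual progress by up to CommitInterval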
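
A periodic committer can be modeled by comparing the time since the last commit with CommitInterval. The sketch below is hypothetical: `PeriodicCommitter` takes the current time as a parameter so it can be tested deterministically, and it only checks the interval when an event arrives, whereas a real implementation might also commit from a background timer. `WorstCaseReprocessed` estimates the reprocessing exposure as throughput multiplied by the interval.

```csharp
using System;

// Hypothetical model of the Periodic strategy. The current time is passed in
// so the logic is deterministic; the interval is only checked when an event
// is processed (a real implementation might also commit from a timer).
public sealed class PeriodicCommitter
{
    private readonly TimeSpan _interval;
    private DateTimeOffset _lastCommitAt;
    private long _lastProcessed = -1;

    public PeriodicCommitter(TimeSpan interval, DateTimeOffset start)
    {
        if (interval <= TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(interval));
        _interval = interval;
        _lastCommitAt = start;
    }

    // -1 means "nothing committed yet" in this sketch.
    public long CommittedOffset { get; private set; } = -1;

    public void OnProcessed(long offset, DateTimeOffset now)
    {
        _lastProcessed = offset;
        if (now - _lastCommitAt >= _interval)
        {
            CommittedOffset = _lastProcessed; // a real store would persist this value
            _lastCommitAt = now;
        }
    }

    // Rough upper bound on events redelivered after a crash:
    // everything processed during one interval.
    public static long WorstCaseReprocessed(double eventsPerSecond, TimeSpan interval) =>
        (long)Math.Ceiling(eventsPerSecond * interval.TotalSeconds);
}
```

At 500 events per second with a 30-second interval, roughly 15,000 events could be redelivered after a crash.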
### Manual
Explicit commit control:
```csharp
options.CommitStrategy = OffsetCommitStrategy.Manual;
await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Explicit commit
    await _offsetStore.CommitOffsetAsync(
        streamName: "orders",
        groupId: "email-notifications",
        consumerId: "worker-1",
        offset: @event.Offset);
}
```
**Pros:**
- ✅ Full control over when to commit
- ✅ Can commit conditionally

**Cons:**
- ❌ More complex code
- ❌ Easy to forget commits
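
One reason to choose Manual is to commit only after a handler succeeds and to stop at the first failure, so the failed event is redelivered on restart. The sketch below shows that control flow in isolation; `ManualCommitLoop`, the `tryProcess` delegate, and the `commit` callback (which would wrap `CommitOffsetAsync` in real code) are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class ManualCommitLoop
{
    // Hypothetical sketch: processes offsets in order and commits each one
    // only after its handler succeeds. On the first failure it stops without
    // committing, so that event is redelivered when the consumer restarts.
    // Returns the last committed offset (-1 if nothing was committed).
    public static long Run(
        IEnumerable<long> offsets,
        Func<long, bool> tryProcess,
        Action<long> commit)
    {
        long committed = -1;
        foreach (var offset in offsets)
        {
            if (!tryProcess(offset))
            {
                break; // leave the offset uncommitted so it is retried
            }

            commit(offset);
            committed = offset;
        }

        return committed;
    }
}
```

With offsets 0 to 4 and a handler that fails on offset 3, offsets 0, 1, and 2 are committed and the loop stops; after a restart, consumption would resume at offset 3.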
## Fault Tolerance
### Resume from Last Committed Offset
When a consumer restarts:
```csharp
// Consumer crashes at offset 1500
// Last committed offset: 1400

// On restart
await foreach (var @event in _consumerGroup.ConsumeAsync(...))
{
    // Resumes from offset 1401
    // Events 1401-1500 will be reprocessed
}
```
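
The replay window follows directly from the committed offset: consumption resumes at the committed offset + 1, and everything up to the last event handled before the crash is seen again. A small hypothetical helper expresses that arithmetic:

```csharp
// Hypothetical helper for the resume arithmetic described above.
public readonly record struct ReplayRange(long First, long Last)
{
    // Number of events that will be seen again
    // (0 if nothing was processed after the last commit).
    public long Count => Last >= First ? Last - First + 1 : 0;
}

public static class ResumePoint
{
    // committedOffset: last committed offset.
    // lastProcessed: last offset handled before the crash.
    public static ReplayRange AfterCrash(long committedOffset, long lastProcessed) =>
        new ReplayRange(committedOffset + 1, lastProcessed);
}
```

For the example above, `ResumePoint.AfterCrash(1400, 1500)` gives the range 1401 to 1500, which is 100 events.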
### Idempotent Event Handlers
Handle reprocessing gracefully:
```csharp
private async Task ProcessEventAsync(StoredEvent @event)
{
    // Check if already processed (idempotency)
    if (await _repository.IsEventProcessedAsync(@event.EventId))
    {
        _logger.LogInformation("Event {EventId} already processed, skipping", @event.EventId);
        return;
    }

    // Process event
    await SendEmailAsync(@event);

    // Mark as processed
    await _repository.MarkEventProcessedAsync(@event.EventId);
}
```
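
In the handler above, the check and the mark are separate calls, so two deliveries of the same event running concurrently could both pass `IsEventProcessedAsync`. One way to close that gap is an atomic claim; in a database this is typically a unique constraint on the event ID. The in-memory sketch below uses `ConcurrentDictionary.TryAdd` as the atomic step and releases the claim if the side effect throws, so a redelivery can retry. `IdempotentHandler` is a hypothetical name, and this is one approach among several.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for a processed-events table.
// In a database, the same idea is a unique constraint on the event id.
public sealed class IdempotentHandler
{
    private readonly ConcurrentDictionary<Guid, bool> _processed = new();
    private readonly Action<Guid> _sideEffect;

    public IdempotentHandler(Action<Guid> sideEffect) => _sideEffect = sideEffect;

    // Returns true if the side effect ran, false if the event was a duplicate.
    public bool Handle(Guid eventId)
    {
        // TryAdd is atomic, so two concurrent deliveries cannot both claim the id.
        if (!_processed.TryAdd(eventId, true))
        {
            return false;
        }

        try
        {
            _sideEffect(eventId);
            return true;
        }
        catch
        {
            // Release the claim so a later redelivery can retry the side effect.
            _processed.TryRemove(eventId, out _);
            throw;
        }
    }
}
```

The trade-off: while a claim is held, a concurrent duplicate is skipped even if the first attempt later fails; the retry then relies on the event being redelivered because its offset was not committed.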
## Querying Offsets
### Get Consumer Offset
```csharp
public class OffsetQuery
{
    private readonly IConsumerOffsetStore _offsetStore;

    public OffsetQuery(IConsumerOffsetStore offsetStore)
    {
        _offsetStore = offsetStore;
    }

    public async Task<long> GetConsumerOffsetAsync(
        string streamName,
        string groupId,
        string consumerId)
    {
        return await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);
    }
}
```
### Get All Consumers in Group
```csharp
public async Task<List<ConsumerInfo>> GetGroupConsumersAsync(
    string streamName,
    string groupId)
{
    return await _offsetStore.GetConsumersAsync(streamName, groupId);
}
```
### Calculate Consumer Lag
```csharp
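public async Task<long> CalculateLagAsync(
    string streamName,
    string groupId,
    string consumerId)
{
    // GetStreamHeadAsync (not shown) is assumed to return the offset of the
    // latest event in the stream, so head - offset counts unprocessed events
    var streamHead = await GetStreamHeadAsync(streamName);
    var consumerOffset = await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);
    return streamHead - consumerOffset;
}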
```
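
Applied to the diagram at the top of this page (events 0 to 6, so the stream head is 6), Consumer A at offset 1 has a lag of 5 and Consumer B at offset 3 has a lag of 3. The hypothetical helpers below compute per-consumer lag and the worst lag in a group, assuming the head is the offset of the latest event. Lag is clamped at zero in case the head was read before a consumer's most recent commit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ConsumerLag
{
    // Assumes streamHead is the offset of the latest event and consumerOffset
    // is the last offset the consumer committed.
    public static long Lag(long streamHead, long consumerOffset) =>
        Math.Max(0, streamHead - consumerOffset);

    // Worst lag across a group's consumers (0 for an empty group).
    public static long MaxLag(long streamHead, IEnumerable<long> consumerOffsets) =>
        consumerOffsets.Select(offset => Lag(streamHead, offset)).DefaultIfEmpty(0).Max();
}
```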
## Resetting Offsets
### Reset to Beginning
```csharp
await _offsetStore.ResetOffsetAsync(
    streamName: "orders",
    groupId: "email-notifications",
    consumerId: "worker-1",
    newOffset: 0); // Start from beginning
```
### Reset to Specific Offset
```csharp
await _offsetStore.ResetOffsetAsync(
    "orders",
    "email-notifications",
    "worker-1",
    newOffset: 5000); // Skip to offset 5000
```
### Reset to Latest
```csharp
var streamHead = await GetStreamHeadAsync("orders");
await _offsetStore.ResetOffsetAsync(
    "orders",
    "email-notifications",
    "worker-1",
    newOffset: streamHead); // Skip all lag
```
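
Before resetting, it can help to preview how many events will be replayed or skipped. Relative to the current committed offset, moving the offset back by N replays N events and moving it forward by N skips N events. `OffsetReset.Preview` and `ResetImpact` are hypothetical; the sketch assumes the committed offset and the stream head both refer to individual events (last processed and latest written), and it rejects targets outside 0 to the stream head.

```csharp
using System;

// Hypothetical result type: how many events a reset replays or skips.
public readonly record struct ResetImpact(long Replayed, long Skipped);

public static class OffsetReset
{
    // current: the consumer's committed offset; target: the newOffset to set;
    // head: offset of the latest event in the stream.
    public static ResetImpact Preview(long current, long target, long head)
    {
        if (target < 0 || target > head)
        {
            throw new ArgumentOutOfRangeException(nameof(target), "Target must be between 0 and the stream head.");
        }

        long delta = target - current;
        return delta >= 0
            ? new ResetImpact(Replayed: 0, Skipped: delta)
            : new ResetImpact(Replayed: -delta, Skipped: 0);
    }
}
```

For example, a consumer at offset 1542 reset to 5000 would skip 3,458 events, while a reset to 0 would move it back by 1,542.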
## Best Practices
### ✅ DO
- Use AfterBatch in production (fewer database writes, bounded reprocessing)
- Implement idempotent event handlers
- Monitor consumer lag
- Reset offsets only when necessary
- Track offset commits in metrics
### ❌ DON'T
- Don't use AfterEach unless absolutely necessary
- Don't modify offsets directly in the database; use `ResetOffsetAsync` instead
- Don't skip error handling
- Don't forget to test reprocessing scenarios
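
For tracking offset commits in metrics, the built-in `System.Diagnostics.Metrics` API (available since .NET 6) is one option. In the sketch below, `OffsetCommitMetrics`, the meter name, the instrument name, and the tag keys are all hypothetical; `RecordCommit` would be called wherever an offset is committed, for example right after `CommitOffsetAsync` under the Manual strategy.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical metrics wrapper; meter, instrument, and tag names are illustrative.
public sealed class OffsetCommitMetrics : IDisposable
{
    public const string MeterName = "DotnetCqrs.ConsumerGroups";

    private readonly Meter _meter = new(MeterName);
    private readonly Counter<long> _commits;

    public OffsetCommitMetrics()
    {
        _commits = _meter.CreateCounter<long>(
            "consumer_offset_commits",
            description: "Offset commits per consumer");
    }

    // Count one commit, tagged so dashboards can break it down per consumer.
    public void RecordCommit(string stream, string groupId, string consumerId)
    {
        _commits.Add(1,
            new KeyValuePair<string, object?>("stream", stream),
            new KeyValuePair<string, object?>("group", groupId),
            new KeyValuePair<string, object?>("consumer", consumerId));
    }

    public void Dispose() => _meter.Dispose();
}
```

Any listener that subscribes to the meter (for example an OpenTelemetry exporter) can then chart commit rates per group and consumer alongside consumer lag.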
## See Also
- [Consumer Groups Overview](README.md)
- [Commit Strategies](commit-strategies.md)
- [Fault Tolerance](fault-tolerance.md)