Offset Management
Understanding how consumer group offsets are tracked and committed.
Overview
Offsets represent the position of each consumer within a stream. The consumer group framework tracks and commits offsets automatically, enabling fault-tolerant event processing: each group processes every event at least once, and combined with idempotent handlers this yields effectively exactly-once results per group.
How Offsets Work
Stream: orders
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│  0  │  1  │  2  │  3  │  4  │  5  │  6  │  ← Events
└─────┴─────┴─────┴─────┴─────┴─────┴─────┘
         ↑           ↑
         │           └─ Consumer B (offset: 3)
         └─ Consumer A (offset: 1)
Consumer A has processed events 0-1. Next read starts at offset 2. Consumer B has processed events 0-3. Next read starts at offset 4.
Offset Storage
Offsets are stored in PostgreSQL:
SELECT * FROM consumer_offsets;
| stream_name | group_id | consumer_id | offset | updated_at |
|---|---|---|---|---|
| orders | email-notif | worker-1 | 1542 | 2025-12-10 10:30:00 |
| orders | email-notif | worker-2 | 1540 | 2025-12-10 10:30:01 |
Commit Strategies
AfterBatch (Recommended)
Commit after processing a batch of events:
await foreach (var @event in _consumerGroup.ConsumeAsync(
"orders",
"email-notifications",
"worker-1",
options: new ConsumerGroupOptions
{
BatchSize = 100,
CommitStrategy = OffsetCommitStrategy.AfterBatch
}))
{
await ProcessEventAsync(@event);
// Offset committed automatically after batch of 100
}
Pros:
- ✅ Best performance (fewer database writes)
- ✅ Good balance between safety and throughput
Cons:
- ❌ May reprocess up to BatchSize events on failure
AfterEach
Commit after processing each event:
options.CommitStrategy = OffsetCommitStrategy.AfterEach;
await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
await ProcessEventAsync(@event);
// Offset committed after each event
}
Pros:
- ✅ Minimal reprocessing on failure
- ✅ Precise offset tracking
Cons:
- ❌ Lower throughput (many database writes)
- ❌ Higher database load
Periodic
Commit every N seconds:
options.CommitStrategy = OffsetCommitStrategy.Periodic;
options.CommitInterval = TimeSpan.FromSeconds(30);
await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
await ProcessEventAsync(@event);
// Offset committed every 30 seconds
}
Pros:
- ✅ Predictable commit frequency
- ✅ Reduces database writes
Cons:
- ❌ May reprocess many events on failure
- ❌ Committed offset lags behind actual progress between commits
Manual
Explicit commit control:
options.CommitStrategy = OffsetCommitStrategy.Manual;
await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
await ProcessEventAsync(@event);
// Explicit commit
await _offsetStore.CommitOffsetAsync(
streamName: "orders",
groupId: "email-notifications",
consumerId: "worker-1",
offset: @event.Offset);
}
Pros:
- ✅ Full control over when to commit
- ✅ Can commit conditionally (see the sketch after this list)
Cons:
- ❌ More complex code
- ❌ Easy to forget commits
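For example, a consumer can batch its own commits. The following is a minimal sketch, assuming the Manual strategy configured above and the same `_consumerGroup`, `_offsetStore`, and `ProcessEventAsync` from the earlier examples; the threshold of 50 events is illustrative.

```csharp
var processedSinceCommit = 0;

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders", "email-notifications", "worker-1", options))
{
    await ProcessEventAsync(@event);
    processedSinceCommit++;

    // Commit only every 50 events to limit database writes;
    // at most 50 events are reprocessed after a crash.
    if (processedSinceCommit >= 50)
    {
        await _offsetStore.CommitOffsetAsync(
            streamName: "orders",
            groupId: "email-notifications",
            consumerId: "worker-1",
            offset: @event.Offset);
        processedSinceCommit = 0;
    }
}
```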
Fault Tolerance
Resume from Last Committed Offset
When a consumer restarts:
// Consumer crashes at offset 1500
// Last committed offset: 1400
// On restart
await foreach (var @event in _consumerGroup.ConsumeAsync(...))
{
// Resumes from offset 1401
// Events 1401-1500 will be reprocessed
}
Idempotent Event Handlers
Handle reprocessing gracefully:
private async Task ProcessEventAsync(StoredEvent @event)
{
// Check if already processed (idempotency)
if (await _repository.IsEventProcessedAsync(@event.EventId))
{
_logger.LogInformation("Event {EventId} already processed, skipping", @event.EventId);
return;
}
// Process event
await SendEmailAsync(@event);
// Mark as processed
await _repository.MarkEventProcessedAsync(@event.EventId);
}
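The repository methods used above are not part of the framework. Below is a minimal sketch of what they might look like on top of the same PostgreSQL database, assuming Npgsql, a `processed_events` table whose primary key is `event_id`, and that `EventId` is a `Guid`; all of these are illustrative choices, not framework APIs.

```csharp
using System;
using System.Threading.Tasks;
using Npgsql;

public class ProcessedEventRepository
{
    private readonly NpgsqlDataSource _dataSource;

    public ProcessedEventRepository(NpgsqlDataSource dataSource) => _dataSource = dataSource;

    public async Task<bool> IsEventProcessedAsync(Guid eventId)
    {
        await using var cmd = _dataSource.CreateCommand(
            "SELECT EXISTS (SELECT 1 FROM processed_events WHERE event_id = $1)");
        cmd.Parameters.Add(new NpgsqlParameter { Value = eventId });
        var exists = await cmd.ExecuteScalarAsync();
        return exists is true;
    }

    public async Task MarkEventProcessedAsync(Guid eventId)
    {
        // ON CONFLICT DO NOTHING keeps the call safe if another worker
        // marked the same event concurrently.
        await using var cmd = _dataSource.CreateCommand(
            "INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING");
        cmd.Parameters.Add(new NpgsqlParameter { Value = eventId });
        await cmd.ExecuteNonQueryAsync();
    }
}
```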
Querying Offsets
Get Consumer Offset
public class OffsetQuery
{
    private readonly IConsumerOffsetStore _offsetStore;

    public OffsetQuery(IConsumerOffsetStore offsetStore)
    {
        _offsetStore = offsetStore;
    }

    public async Task<long> GetConsumerOffsetAsync(
        string streamName,
        string groupId,
        string consumerId)
    {
        return await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);
    }
}
Get All Consumers in Group
public async Task<List<ConsumerInfo>> GetGroupConsumersAsync(
string streamName,
string groupId)
{
return await _offsetStore.GetConsumersAsync(streamName, groupId);
}
Calculate Consumer Lag
public async Task<long> CalculateLagAsync(
    string streamName,
    string groupId,
    string consumerId)
{
    // Lag = latest offset in the stream (the head) minus the consumer's last committed offset.
    // GetStreamHeadAsync returns the newest offset in the stream (helper not shown here).
    var streamHead = await GetStreamHeadAsync(streamName);
    var consumerOffset = await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);
    return streamHead - consumerOffset;
}
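Lag is most useful when it is watched continuously. Below is a minimal monitoring sketch, assuming the generic host's `BackgroundService` and `ILogger` abstractions; the stream/group/consumer names, the 1,000-event threshold, the 30-second interval, and the `GetStreamHeadAsync` placeholder are illustrative, not part of the framework.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class ConsumerLagMonitor : BackgroundService
{
    private readonly IConsumerOffsetStore _offsetStore;
    private readonly ILogger<ConsumerLagMonitor> _logger;

    public ConsumerLagMonitor(IConsumerOffsetStore offsetStore, ILogger<ConsumerLagMonitor> logger)
    {
        _offsetStore = offsetStore;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var streamHead = await GetStreamHeadAsync("orders");
            var offset = await _offsetStore.GetOffsetAsync("orders", "email-notifications", "worker-1");
            var lag = streamHead - offset;

            if (lag > 1000)
                _logger.LogWarning("Consumer lag for {Group} is {Lag} events", "email-notifications", lag);

            await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
        }
    }

    // Placeholder for the stream-head lookup used in CalculateLagAsync above;
    // replace with the framework's actual query for the latest stream offset.
    private Task<long> GetStreamHeadAsync(string streamName) => Task.FromResult(0L);
}
```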
Resetting Offsets
Reset to Beginning
await _offsetStore.ResetOffsetAsync(
streamName: "orders",
groupId: "email-notifications",
consumerId: "worker-1",
newOffset: 0); // Start from beginning
Reset to Specific Offset
await _offsetStore.ResetOffsetAsync(
"orders",
"email-notifications",
"worker-1",
newOffset: 5000); // Skip to offset 5000
Reset to Latest
var streamHead = await GetStreamHeadAsync("orders");
await _offsetStore.ResetOffsetAsync(
"orders",
"email-notifications",
"worker-1",
newOffset: streamHead); // Skip all lag
Best Practices
✅ DO
- Use AfterBatch for production (best performance)
- Implement idempotent event handlers
- Monitor consumer lag
- Reset offsets only when necessary
- Track offset commits in metrics (see the sketch below)
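As a starting point for the metrics item above, here is a minimal sketch using System.Diagnostics.Metrics; the meter and counter names are illustrative, and `RecordCommit` would be called after each successful `CommitOffsetAsync`.

```csharp
using System.Collections.Generic;
using System.Diagnostics.Metrics;

public static class ConsumerMetrics
{
    private static readonly Meter CommitMeter = new("EventStreaming.ConsumerGroups");

    private static readonly Counter<long> OffsetCommits =
        CommitMeter.CreateCounter<long>("consumer_offset_commits");

    // Call after every successful offset commit; tags allow per-stream/per-group dashboards.
    public static void RecordCommit(string stream, string group) =>
        OffsetCommits.Add(1,
            new KeyValuePair<string, object?>("stream", stream),
            new KeyValuePair<string, object?>("group", group));
}
```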
❌ DON'T
- Don't use AfterEach unless absolutely necessary
- Don't modify offsets manually in database
- Don't skip error handling
- Don't forget to test reprocessing scenarios