# Offset Management

Understanding how consumer group offsets are tracked and committed.

## Overview
|
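Offsets represent the position of each consumer in a stream. The consumer group framework automatically tracks and commits offsets, enabling fault-tolerant event processing with at-least-once semantics: after a failure, a consumer resumes from its last committed offset, so events processed after that commit are delivered again.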
|
## How Offsets Work
|
```
Stream: orders
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│  0  │  1  │  2  │  3  │  4  │  5  │  6  │  ← Events
└─────┴─────┴─────┴─────┴─────┴─────┴─────┘
         ↑           ↑
         │           └─ Consumer B (offset: 3)
         └─ Consumer A (offset: 1)
```
|
The stored offset is the offset of the last event the consumer has processed. Consumer A has processed events 0-1, so its next read starts at offset 2. Consumer B has processed events 0-3, so its next read starts at offset 4.
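A minimal sketch of this arithmetic, assuming (as the diagram and the later examples on this page suggest) that the stored offset is the last processed event. `OffsetSemantics` is a hypothetical helper for illustration, not part of the consumer group API.

```csharp
// Hypothetical helper, not part of the consumer group API. It treats the
// stored offset as the offset of the last event the consumer processed.
public static class OffsetSemantics
{
    // The next read starts immediately after the last processed event.
    public static long NextReadOffset(long committedOffset) => committedOffset + 1;

    // An event has been processed if it is at or before the committed offset.
    public static bool IsProcessed(long eventOffset, long committedOffset) =>
        eventOffset <= committedOffset;
}
```

For the diagram above, `OffsetSemantics.NextReadOffset(1)` returns 2 for Consumer A and `OffsetSemantics.NextReadOffset(3)` returns 4 for Consumer B.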
|
## Offset Storage

Offsets are stored in PostgreSQL:

```sql
SELECT * FROM consumer_offsets;
```

| stream_name | group_id | consumer_id | offset | updated_at |
|-------------|----------|-------------|--------|------------|
| orders | email-notif | worker-1 | 1542 | 2025-12-10 10:30:00 |
| orders | email-notif | worker-2 | 1540 | 2025-12-10 10:30:01 |
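The example rows suggest one row per stream, group, and consumer, updated in place on each commit. The sketch below models that shape in memory, assuming upsert semantics on the `(stream_name, group_id, consumer_id)` key; `OffsetKey`, `OffsetRow`, and `InMemoryOffsetTable` are hypothetical names, not the PostgreSQL-backed store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical key mirroring the (stream_name, group_id, consumer_id) columns.
public readonly record struct OffsetKey(string StreamName, string GroupId, string ConsumerId);

// Hypothetical row value: committed offset plus the time it was written.
public readonly record struct OffsetRow(long Offset, DateTimeOffset UpdatedAt);

// In-memory stand-in for the consumer_offsets table; assumes upsert semantics.
public sealed class InMemoryOffsetTable
{
    private readonly Dictionary<OffsetKey, OffsetRow> _rows = new();

    // Insert a row for a new key, or overwrite the existing row for that key.
    public void Commit(OffsetKey key, long offset, DateTimeOffset now) =>
        _rows[key] = new OffsetRow(offset, now);

    // Returns null if this consumer has never committed an offset.
    public long? GetOffset(OffsetKey key) =>
        _rows.TryGetValue(key, out var row) ? row.Offset : (long?)null;

    public int RowCount => _rows.Count;
}
```

Committing twice for the same key leaves a single row holding the newer offset, which is why the table shows one current offset per worker rather than a history.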
|
## Commit Strategies

### AfterBatch (Recommended)

Commit once after each batch of `BatchSize` events has been processed:
|
```csharp
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "email-notifications",
    "worker-1",
    options: new ConsumerGroupOptions
    {
        BatchSize = 100,
        CommitStrategy = OffsetCommitStrategy.AfterBatch
    }))
{
    await ProcessEventAsync(@event);
    // Offset committed automatically after each batch of 100 events
}
```
|
**Pros:**
- ✅ High throughput: one database write per batch instead of one per event
- ✅ Good balance between safety and throughput

**Cons:**
- ❌ May reprocess up to `BatchSize` events after a failure
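The bound on reprocessing follows from when commits happen. A minimal sketch of an after-batch loop, assuming the framework commits the last processed offset once per full batch and flushes a final partial batch at the end; `BatchCommitLoop` and its `process`/`commit` callbacks are hypothetical and synchronous for brevity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical after-batch commit loop; not the library's implementation.
public static class BatchCommitLoop
{
    // Processes events in order, commits the last processed offset after every
    // full batch, and commits once more for a trailing partial batch.
    // Returns the number of commits performed.
    public static int Run(IEnumerable<long> eventOffsets, int batchSize,
                          Action<long> process, Action<long> commit)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        int commits = 0, inBatch = 0;
        long lastProcessed = -1;

        foreach (var offset in eventOffsets)
        {
            process(offset);
            lastProcessed = offset;
            if (++inBatch == batchSize)
            {
                commit(lastProcessed);
                commits++;
                inBatch = 0;
            }
        }

        // Flush a partial batch so its progress is not lost on a clean shutdown.
        if (inBatch > 0)
        {
            commit(lastProcessed);
            commits++;
        }
        return commits;
    }
}
```

With 250 events and a batch size of 100, this sketch commits offsets 99, 199, and 249. A crash after processing an event but before its batch commits leaves up to `batchSize` events uncommitted, which is the reprocessing window listed in the cons above.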
|
### AfterEach

Commit after processing each event:
|
```csharp
var options = new ConsumerGroupOptions
{
    CommitStrategy = OffsetCommitStrategy.AfterEach
};

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed after each event
}
```
|
**Pros:**
- ✅ Minimal reprocessing on failure (at most the event in flight when the failure occurred)
- ✅ Precise offset tracking

**Cons:**
- ❌ Lower throughput (one database write per event)
- ❌ Higher database load
|
### Periodic

Commit on a fixed time interval (`CommitInterval`):
|
```csharp
var options = new ConsumerGroupOptions
{
    CommitStrategy = OffsetCommitStrategy.Periodic,
    CommitInterval = TimeSpan.FromSeconds(30)
};

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Latest processed offset committed every 30 seconds
}
```
|
**Pros:**
- ✅ Predictable commit frequency
- ✅ Reduces database writes

**Cons:**
- ❌ May reprocess every event handled since the last commit (up to one `CommitInterval` of work) after a failure
- ❌ The committed offset can trail actual progress by up to one interval
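A time-based committer only needs the last processed offset and the time of the previous commit. A minimal sketch, assuming commits are checked as events arrive (a production implementation might also use a timer); `PeriodicCommitter` is hypothetical, and the clock is injected so the behavior can be tested without waiting.

```csharp
using System;

// Hypothetical time-based committer; the clock is injected for testability.
public sealed class PeriodicCommitter
{
    private readonly TimeSpan _interval;
    private readonly Func<DateTimeOffset> _clock;
    private readonly Action<long> _commit;
    private DateTimeOffset _lastCommit;
    private long? _pending; // last processed offset not yet committed

    public PeriodicCommitter(TimeSpan interval, Func<DateTimeOffset> clock, Action<long> commit)
    {
        _interval = interval;
        _clock = clock;
        _commit = commit;
        _lastCommit = clock();
    }

    // Record progress; commit only if at least one interval has elapsed.
    public void OnProcessed(long offset)
    {
        _pending = offset;
        var now = _clock();
        if (now - _lastCommit >= _interval)
        {
            _commit(offset);
            _pending = null;
            _lastCommit = now;
        }
    }

    // Commit any outstanding progress, e.g. on graceful shutdown.
    public void Flush()
    {
        if (_pending is long offset)
        {
            _commit(offset);
            _pending = null;
            _lastCommit = _clock();
        }
    }
}
```

With a 30-second interval, events processed at 10 s and 20 s are not committed; the event processed at 35 s triggers a commit, and `Flush` commits whatever progress remains at shutdown. Everything processed between commits is the reprocessing window described in the cons above.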
|
### Manual

Commit explicitly by calling the offset store from your own code:
|
```csharp
var options = new ConsumerGroupOptions
{
    CommitStrategy = OffsetCommitStrategy.Manual
};

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);

    // Explicitly commit the offset of the event that was just processed
    await _offsetStore.CommitOffsetAsync(
        streamName: "orders",
        groupId: "email-notifications",
        consumerId: "worker-1",
        offset: @event.Offset);
}
```
|
**Pros:**
- ✅ Full control over when to commit
- ✅ Can commit conditionally (for example, only after successful processing)

**Cons:**
- ❌ More complex code
- ❌ Easy to forget a commit, which causes events to be reprocessed after a restart
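One use of conditional commits is to advance the offset only past events that were handled successfully. A minimal synchronous sketch; `ManualCommitLoop` and the `tryProcess` callback are hypothetical, and in real code `commit` would wrap a call such as `_offsetStore.CommitOffsetAsync`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical manual-commit loop: commits only after an event succeeds and
// stops at the first failure so the failed event is read again next time.
public static class ManualCommitLoop
{
    // Returns the last committed offset, or null if nothing was committed.
    public static long? Run(IEnumerable<long> eventOffsets,
                            Func<long, bool> tryProcess,
                            Action<long> commit)
    {
        long? lastCommitted = null;
        foreach (var offset in eventOffsets)
        {
            if (!tryProcess(offset))
            {
                // Do not commit and do not skip ahead: committing past a
                // failed event would mean it is never retried.
                break;
            }
            commit(offset);
            lastCommitted = offset;
        }
        return lastCommitted;
    }
}
```

For offsets 10-13 where processing of 12 fails, the sketch commits 10 and 11 and stops, so the next run resumes at offset 12.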
|
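## Fault Tolerance

### Resume from Last Committed Offset

When a consumer restarts, it resumes from the event after its last committed offset. Events processed after that commit but before the crash are delivered again: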
|
```csharp
// Consumer crashes after processing offset 1500
// Last committed offset: 1400

// On restart
await foreach (var @event in _consumerGroup.ConsumeAsync(...))
{
    // Resumes from offset 1401
    // Events 1401-1500 will be reprocessed
}
```
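The replay window is the range between the committed offset and the crash point. A minimal sketch that computes it, assuming the committed offset is the last processed event; `ReplaySimulation` is a hypothetical helper.

```csharp
using System.Collections.Generic;

// Hypothetical helper illustrating at-least-once replay; not the library's consumer.
public static class ReplaySimulation
{
    // Returns the offsets a restarted consumer reads again, given the last
    // committed offset and the last offset processed before the crash.
    public static List<long> ReprocessedOnRestart(long lastCommitted, long lastProcessedBeforeCrash)
    {
        var replayed = new List<long>();
        // Resume one past the committed offset; everything up to the crash
        // point was processed but never committed, so it is seen again.
        for (long offset = lastCommitted + 1; offset <= lastProcessedBeforeCrash; offset++)
            replayed.Add(offset);
        return replayed;
    }
}
```

`ReplaySimulation.ReprocessedOnRestart(1400, 1500)` returns the 100 offsets 1401 through 1500, matching the example above.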
|
### Idempotent Event Handlers

Because events can be delivered more than once, make handlers idempotent so that reprocessing is harmless:
|
```csharp
private async Task ProcessEventAsync(StoredEvent @event)
{
    // Check if already processed (idempotency)
    if (await _repository.IsEventProcessedAsync(@event.EventId))
    {
        _logger.LogInformation("Event {EventId} already processed, skipping", @event.EventId);
        return;
    }

    // Process event
    await SendEmailAsync(@event);

    // Mark as processed. If the process crashes between SendEmailAsync and
    // this call, the event is replayed and the email is sent again; the
    // check above can also race if two workers receive the same event.
    await _repository.MarkEventProcessedAsync(@event.EventId);
}
```
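The same check-then-mark flow can be exercised in memory. A minimal sketch, assuming `Guid` event IDs; `IdempotentHandler` is hypothetical, process-local, and not thread-safe, so it illustrates the control flow rather than replacing the durable `_repository` check.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory idempotent handler, for illustration only.
// Process-local and single-threaded; real handlers need durable storage.
public sealed class IdempotentHandler
{
    private readonly HashSet<Guid> _processed = new();
    private readonly Action<Guid> _sideEffect;

    public IdempotentHandler(Action<Guid> sideEffect) => _sideEffect = sideEffect;

    // Returns true if the side effect ran, false if the event was a duplicate.
    public bool Handle(Guid eventId)
    {
        if (_processed.Contains(eventId))
            return false; // already handled: skip the side effect

        _sideEffect(eventId);
        // Mark only after the side effect succeeds, so a failure leaves the
        // event eligible for retry (at the cost of a possible duplicate).
        _processed.Add(eventId);
        return true;
    }
}
```

Handling the same event ID twice runs the side effect once and returns `false` for the duplicate.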
|
## Querying Offsets

### Get Consumer Offset
|
```csharp
public class OffsetQuery
{
    private readonly IConsumerOffsetStore _offsetStore;

    public OffsetQuery(IConsumerOffsetStore offsetStore)
    {
        _offsetStore = offsetStore;
    }

    public async Task<long> GetConsumerOffsetAsync(
        string streamName,
        string groupId,
        string consumerId)
    {
        return await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);
    }
}
```
|
### Get All Consumers in Group
|
```csharp
public async Task<List<ConsumerInfo>> GetGroupConsumersAsync(
    string streamName,
    string groupId)
{
    return await _offsetStore.GetConsumersAsync(streamName, groupId);
}
```
|
### Calculate Consumer Lag

Lag is the number of events between the stream head and the consumer's committed offset:
|
```csharp
public async Task<long> CalculateLagAsync(
    string streamName,
    string groupId,
    string consumerId)
{
    // Assumed: offset of the latest event written to the stream
    var streamHead = await GetStreamHeadAsync(streamName);
    // Offset of the last event this consumer has committed
    var consumerOffset = await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);

    // Events written but not yet committed by this consumer
    return streamHead - consumerOffset;
}
```
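For a whole group, the slowest consumer usually matters most. A minimal sketch of per-consumer and maximum lag, assuming (as `CalculateLagAsync` implies) that the stream head is the offset of the latest event; `GroupLag` is a hypothetical helper, and the head value in the usage below is made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical lag summary for one consumer group.
public static class GroupLag
{
    // Lag = stream head minus committed offset, clamped at zero in case a
    // consumer's offset was reset past the head.
    public static long LagFor(long streamHead, long committedOffset) =>
        Math.Max(0, streamHead - committedOffset);

    // The slowest consumer determines how far behind the group is.
    public static long MaxLag(long streamHead, IReadOnlyDictionary<string, long> offsetsByConsumer) =>
        offsetsByConsumer.Count == 0
            ? 0
            : offsetsByConsumer.Values.Max(offset => LagFor(streamHead, offset));
}
```

Using the offsets from the storage table and a hypothetical stream head of 1550, worker-1 lags by 8 events, worker-2 by 10, and the group's maximum lag is 10.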
|
## Resetting Offsets

### Reset to Beginning
|
```csharp
await _offsetStore.ResetOffsetAsync(
    streamName: "orders",
    groupId: "email-notifications",
    consumerId: "worker-1",
    newOffset: 0); // Start from beginning
```
|
### Reset to Specific Offset
|
```csharp
await _offsetStore.ResetOffsetAsync(
    "orders",
    "email-notifications",
    "worker-1",
    newOffset: 5000); // Skip to offset 5000
```
|
### Reset to Latest
|
```csharp
var streamHead = await GetStreamHeadAsync("orders");

await _offsetStore.ResetOffsetAsync(
    "orders",
    "email-notifications",
    "worker-1",
    newOffset: streamHead); // Skip all lag
```
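Because a reset changes which events a consumer will see, it can help to validate the target before calling `ResetOffsetAsync`. A minimal sketch, assuming valid targets lie between 0 and the stream head; `OffsetResetGuard` is hypothetical, and the real store may apply its own rules.

```csharp
using System;

// Hypothetical guard to run before ResetOffsetAsync; not part of the library.
public static class OffsetResetGuard
{
    // Rejects targets outside [0, streamHead]. Resetting beyond the head would
    // silently skip events that are written later.
    public static long Validate(long newOffset, long streamHead)
    {
        if (newOffset < 0 || newOffset > streamHead)
            throw new ArgumentOutOfRangeException(nameof(newOffset), newOffset,
                $"Offset must be between 0 and the stream head ({streamHead}).");
        return newOffset;
    }
}
```

`OffsetResetGuard.Validate(5000, streamHead)` returns 5000 when the head is at or beyond 5000 and throws `ArgumentOutOfRangeException` otherwise.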
|
## Best Practices

### ✅ DO

- Use AfterBatch as the default in production (high throughput with a bounded reprocessing window)
- Implement idempotent event handlers
- Monitor consumer lag
- Reset offsets only when necessary
- Track offset commits in metrics
|
### ❌ DON'T

- Don't use AfterEach unless minimal reprocessing is worth the extra database writes
- Don't modify offsets directly in the database; use `ResetOffsetAsync` instead
- Don't skip error handling
- Don't forget to test reprocessing scenarios
|
## See Also

- [Consumer Groups Overview](README.md)
- [Commit Strategies](commit-strategies.md)
- [Fault Tolerance](fault-tolerance.md)