# Offset Management

Understanding how consumer group offsets are tracked and committed.

## Overview

Offsets represent the position of each consumer in a stream. The consumer group framework tracks and commits offsets automatically, so a consumer that restarts resumes from its last committed position. Events processed after the last commit are redelivered on restart, so delivery is at-least-once per group; idempotent handlers (see Idempotent Event Handlers) are needed to get exactly-once effects.

## How Offsets Work

```
Stream: orders
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│  0  │  1  │  2  │  3  │  4  │  5  │  6  │  ← Events
└─────┴─────┴─────┴─────┴─────┴─────┴─────┘
         ↑           ↑
         │           └─ Consumer B (offset: 3)
         └─ Consumer A (offset: 1)
```

- Consumer A has processed events 0-1. Its next read starts at offset 2.
- Consumer B has processed events 0-3. Its next read starts at offset 4.

## Offset Storage

Offsets are stored in PostgreSQL:

```sql
SELECT * FROM consumer_offsets;
```

| stream_name | group_id    | consumer_id | offset | updated_at          |
|-------------|-------------|-------------|--------|---------------------|
| orders      | email-notif | worker-1    | 1542   | 2025-12-10 10:30:00 |
| orders      | email-notif | worker-2    | 1540   | 2025-12-10 10:30:01 |

## Commit Strategies

### AfterBatch (Recommended)

Commit after processing a batch of events:

```csharp
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "email-notifications",
    "worker-1",
    options: new ConsumerGroupOptions
    {
        BatchSize = 100,
        CommitStrategy = OffsetCommitStrategy.AfterBatch
    }))
{
    await ProcessEventAsync(@event);
    // Offset committed automatically after batch of 100
}
```

**Pros:**
- ✅ Best performance (fewer database writes)
- ✅ Good balance between safety and throughput

**Cons:**
- ❌ May reprocess up to BatchSize events on failure

### AfterEach

Commit after processing each event:

```csharp
options.CommitStrategy = OffsetCommitStrategy.AfterEach;

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed after each event
}
```

**Pros:**
- ✅ Minimal reprocessing on failure
- ✅ Precise offset tracking

**Cons:**
- ❌ Lower throughput (many database writes)
- ❌ Higher database load

### Periodic

Commit at a fixed time interval:

```csharp
options.CommitStrategy = OffsetCommitStrategy.Periodic;
options.CommitInterval = TimeSpan.FromSeconds(30);

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed every 30 seconds
}
```

**Pros:**
- ✅ Predictable commit frequency
- ✅ Reduces database writes

**Cons:**
- ❌ May reprocess every event handled since the last commit on failure
- ❌ Committed offset lags behind actual progress between commits

### Manual

Explicit commit control:

```csharp
options.CommitStrategy = OffsetCommitStrategy.Manual;

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);

    // Explicit commit
    await _offsetStore.CommitOffsetAsync(
        streamName: "orders",
        groupId: "email-notifications",
        consumerId: "worker-1",
        offset: @event.Offset);
}
```

**Pros:**
- ✅ Full control over when to commit
- ✅ Can commit conditionally

**Cons:**
- ❌ More complex code
- ❌ Easy to forget commits

## Fault Tolerance

### Resume from Last Committed Offset

When a consumer restarts, it resumes from the event after its last committed offset:

```csharp
// Consumer crashes at offset 1500
// Last committed offset: 1400

// On restart
await foreach (var @event in _consumerGroup.ConsumeAsync(...))
{
    // Resumes from offset 1401
    // Events 1401-1500 will be reprocessed
}
```

### Idempotent Event Handlers

Handle reprocessing gracefully:

```csharp
private async Task ProcessEventAsync(StoredEvent @event)
{
    // Check if already processed (idempotency)
    if (await _repository.IsEventProcessedAsync(@event.EventId))
    {
        _logger.LogInformation("Event {EventId} already processed, skipping", @event.EventId);
        return;
    }

    // Process event
    await SendEmailAsync(@event);

    // Mark as processed. A crash between sending and marking
    // can still produce one duplicate email on restart.
    await _repository.MarkEventProcessedAsync(@event.EventId);
}
```

## Querying Offsets

### Get Consumer Offset

```csharp
public class OffsetQuery
{
    private readonly IConsumerOffsetStore _offsetStore;

    public OffsetQuery(IConsumerOffsetStore offsetStore)
    {
        _offsetStore = offsetStore;
    }

    // Return type assumed to be long; the generic argument is missing in the source.
    public async Task<long> GetConsumerOffsetAsync(
        string streamName,
        string groupId,
        string consumerId)
    {
        return await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);
    }
}
```

### Get All Consumers in Group

```csharp
// The element type is missing in the source; ConsumerOffsetInfo is a placeholder name.
public async Task<IEnumerable<ConsumerOffsetInfo>> GetGroupConsumersAsync(
    string streamName,
    string groupId)
{
    return await _offsetStore.GetConsumersAsync(streamName, groupId);
}
```

### Calculate Consumer Lag

```csharp
// Return type assumed to be long; the generic argument is missing in the source.
public async Task<long> CalculateLagAsync(
    string streamName,
    string groupId,
    string consumerId)
{
    var streamHead = await GetStreamHeadAsync(streamName);
    var consumerOffset = await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);

    // Lag is the distance between the stream head and the committed offset
    return streamHead - consumerOffset;
}
```

## Resetting Offsets

### Reset to Beginning

```csharp
await _offsetStore.ResetOffsetAsync(
    streamName: "orders",
    groupId: "email-notifications",
    consumerId: "worker-1",
    newOffset: 0); // Start from beginning
```

### Reset to Specific Offset

```csharp
await _offsetStore.ResetOffsetAsync(
    "orders",
    "email-notifications",
    "worker-1",
    newOffset: 5000); // Skip to offset 5000
```

### Reset to Latest

```csharp
var streamHead = await GetStreamHeadAsync("orders");

await _offsetStore.ResetOffsetAsync(
    "orders",
    "email-notifications",
    "worker-1",
    newOffset: streamHead); // Skip all lag
```

## Best Practices

### ✅ DO

- Use AfterBatch for production (best performance)
- Implement idempotent event handlers
- Monitor consumer lag
- Reset offsets only when necessary
- Track offset commits in metrics

### ❌ DON'T

- Don't use AfterEach unless absolutely necessary
- Don't modify offsets manually in the database
- Don't skip error handling
- Don't forget to test reprocessing scenarios

## See Also

- [Consumer Groups Overview](README.md)
- [Commit Strategies](commit-strategies.md)
- [Fault Tolerance](fault-tolerance.md)
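
## Worked Example: Offset Arithmetic

The resume, reprocessing, and lag rules described in this page can be checked with a small in-memory model. This is a minimal sketch, not the framework's implementation: `InMemoryOffsetStore` and `OffsetMath` are hypothetical names introduced only for illustration, and the sketch assumes that a committed offset identifies the last processed event, so the next read starts one past it (as in the diagram under How Offsets Work).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the PostgreSQL-backed offset store.
// It is not part of the framework; it only models the offset arithmetic.
public sealed class InMemoryOffsetStore
{
    // Key: (stream, group, consumer). Value: offset of the last processed event.
    private readonly Dictionary<(string Stream, string Group, string Consumer), long> _offsets = new();

    // Records the offset of the last successfully processed event.
    public void Commit(string stream, string group, string consumer, long offset)
    {
        _offsets[(stream, group, consumer)] = offset;
    }

    // Returns the last committed offset, or null if nothing was committed yet.
    public long? GetCommitted(string stream, string group, string consumer)
    {
        return _offsets.TryGetValue((stream, group, consumer), out var offset)
            ? offset
            : (long?)null;
    }

    // Next offset to read: one past the last commit, or 0 for a new consumer.
    public long NextOffsetToRead(string stream, string group, string consumer)
    {
        var committed = GetCommitted(stream, group, consumer);
        return committed.HasValue ? committed.Value + 1 : 0;
    }
}

public static class OffsetMath
{
    // Lag as computed in "Calculate Consumer Lag": stream head minus committed offset.
    public static long Lag(long streamHead, long committedOffset)
    {
        return streamHead - committedOffset;
    }

    // Number of events redelivered after a crash: everything processed
    // after the last commit, up to and including the crash position.
    public static long ReprocessedCount(long lastCommitted, long crashedAt)
    {
        return Math.Max(0, crashedAt - lastCommitted);
    }
}
```

With the numbers from Resume from Last Committed Offset, committing 1400 makes `NextOffsetToRead` return 1401, and `OffsetMath.ReprocessedCount(1400, 1500)` returns 100, matching the reprocessed range 1401-1500. The same count is the upper bound that the commit strategy controls: a smaller batch or a shorter commit interval shrinks it at the cost of more writes.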