Offset Management

Understanding how consumer group offsets are tracked and committed.

Overview

Offsets represent the position of each consumer in a stream. The consumer group framework automatically tracks and commits offsets, enabling fault-tolerant event processing: a consumer resumes from its last committed offset after a failure, which gives at-least-once processing that becomes effectively exactly-once per group when handlers are idempotent (see Idempotent Event Handlers below).

How Offsets Work

Stream: orders
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│  0  │  1  │  2  │  3  │  4  │  5  │  6  │ ← Events
└─────┴─────┴─────┴─────┴─────┴─────┴─────┘
         ↑           ↑
         │           └─ Consumer B (offset: 3)
         └─ Consumer A (offset: 1)

Consumer A has processed events 0-1. Next read starts at offset 2. Consumer B has processed events 0-3. Next read starts at offset 4.
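
Conceptually, the read loop is driven by the committed offset: the next read always starts one position past it. The framework does this for you; the snippet below only illustrates the offset arithmetic, using the IConsumerOffsetStore API covered later on this page.

// Illustration only – the framework performs this lookup internally
var committed = await _offsetStore.GetOffsetAsync("orders", "email-notifications", "worker-1");
var nextRead = committed + 1;  // e.g. committed offset 1 → next read at offset 2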

Offset Storage

Offsets are stored in PostgreSQL:

SELECT * FROM consumer_offsets;
 stream_name |  group_id   | consumer_id | offset |     updated_at
-------------+-------------+-------------+--------+---------------------
 orders      | email-notif | worker-1    |   1542 | 2025-12-10 10:30:00
 orders      | email-notif | worker-2    |   1540 | 2025-12-10 10:30:01

Commit Strategies

AfterBatch

Commit after processing a batch of events:

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "email-notifications",
    "worker-1",
    options: new ConsumerGroupOptions
    {
        BatchSize = 100,
        CommitStrategy = OffsetCommitStrategy.AfterBatch
    }))
{
    await ProcessEventAsync(@event);
    // Offset committed automatically after batch of 100
}

Pros:

  • High throughput (one offset commit per batch instead of per event)
  • Good balance between safety and performance

Cons:

  • May reprocess up to BatchSize events on failure

AfterEach

Commit after processing each event:

options.CommitStrategy = OffsetCommitStrategy.AfterEach;

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed after each event
}

Pros:

  • Minimal reprocessing on failure
  • Precise offset tracking

Cons:

  • Lower throughput (many database writes)
  • Higher database load

Periodic

Commit every N seconds:

options.CommitStrategy = OffsetCommitStrategy.Periodic;
options.CommitInterval = TimeSpan.FromSeconds(30);

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed every 30 seconds
}

Pros:

  • Predictable commit frequency
  • Reduces database writes

Cons:

  • May reprocess many events on failure
  • Offset lag during processing

Manual

Explicit commit control:

options.CommitStrategy = OffsetCommitStrategy.Manual;

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);

    // Explicit commit
    await _offsetStore.CommitOffsetAsync(
        streamName: "orders",
        groupId: "email-notifications",
        consumerId: "worker-1",
        offset: @event.Offset);
}

Pros:

  • Full control over when to commit
  • Can commit conditionally (see the sketch after this list)

Cons:

  • More complex code
  • Easy to forget commits
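
For example, manual control lets you commit only at points that make sense for your workload, such as every N events. A minimal sketch, assuming an illustrative commit-every-50-events rule (the threshold and tail-commit handling are not framework features):

options.CommitStrategy = OffsetCommitStrategy.Manual;

var eventsSinceCommit = 0;

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "email-notifications",
    "worker-1",
    options: options))
{
    await ProcessEventAsync(@event);
    eventsSinceCommit++;

    // Commit only every 50 events to cut down on database writes
    if (eventsSinceCommit >= 50)
    {
        await _offsetStore.CommitOffsetAsync(
            streamName: "orders",
            groupId: "email-notifications",
            consumerId: "worker-1",
            offset: @event.Offset);

        eventsSinceCommit = 0;
    }
}

A production version should also commit the final offset when the loop exits or the consumer shuts down; otherwise up to 49 events would be reprocessed on restart.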

Fault Tolerance

Resume from Last Committed Offset

When a consumer restarts:

// Consumer crashes at offset 1500
// Last committed offset: 1400

// On restart
await foreach (var @event in _consumerGroup.ConsumeAsync(...))
{
    // Resumes from offset 1401
    // Events 1401-1500 will be reprocessed
}

Idempotent Event Handlers

Handle reprocessing gracefully:

private async Task ProcessEventAsync(StoredEvent @event)
{
    // Check if already processed (idempotency)
    if (await _repository.IsEventProcessedAsync(@event.EventId))
    {
        _logger.LogInformation("Event {EventId} already processed, skipping", @event.EventId);
        return;
    }

    // Process event
    await SendEmailAsync(@event);

    // Mark as processed
    await _repository.MarkEventProcessedAsync(@event.EventId);
}
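
The IsEventProcessedAsync and MarkEventProcessedAsync calls above assume some kind of deduplication store. A minimal sketch of such a repository, assuming a processed_events table keyed by event ID and using Npgsql directly (the table, class shape, and connection handling here are illustrative, not part of the framework):

// Requires the Npgsql package
public class ProcessedEventRepository
{
    private readonly string _connectionString;

    public ProcessedEventRepository(string connectionString)
    {
        _connectionString = connectionString;
    }

    public async Task<bool> IsEventProcessedAsync(Guid eventId)
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync();

        await using var command = new NpgsqlCommand(
            "SELECT EXISTS (SELECT 1 FROM processed_events WHERE event_id = @event_id)",
            connection);
        command.Parameters.AddWithValue("event_id", eventId);

        var result = await command.ExecuteScalarAsync();
        return result is true;
    }

    public async Task MarkEventProcessedAsync(Guid eventId)
    {
        await using var connection = new NpgsqlConnection(_connectionString);
        await connection.OpenAsync();

        // ON CONFLICT DO NOTHING makes the marker write itself idempotent
        await using var command = new NpgsqlCommand(
            "INSERT INTO processed_events (event_id) VALUES (@event_id) ON CONFLICT DO NOTHING",
            connection);
        command.Parameters.AddWithValue("event_id", eventId);

        await command.ExecuteNonQueryAsync();
    }
}

Note that the handler is still only effectively-once: if the process crashes between SendEmailAsync and MarkEventProcessedAsync, the email is sent again on reprocessing, so side effects should be kept as safe to repeat as possible.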

Querying Offsets

Get Consumer Offset

public class OffsetQuery
{
    private readonly IConsumerOffsetStore _offsetStore;

    public OffsetQuery(IConsumerOffsetStore offsetStore)
    {
        _offsetStore = offsetStore;
    }

    public async Task<long> GetConsumerOffsetAsync(
        string streamName,
        string groupId,
        string consumerId)
    {
        return await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);
    }
}

Get All Consumers in Group

public async Task<List<ConsumerInfo>> GetGroupConsumersAsync(
    string streamName,
    string groupId)
{
    return await _offsetStore.GetConsumersAsync(streamName, groupId);
}

Calculate Consumer Lag

public async Task<long> CalculateLagAsync(
    string streamName,
    string groupId,
    string consumerId)
{
    // Offset of the newest event in the stream (helper not shown on this page)
    var streamHead = await GetStreamHeadAsync(streamName);
    var consumerOffset = await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);

    return streamHead - consumerOffset;
}
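
In practice lag is usually monitored continuously rather than queried ad hoc. A sketch of a simple background monitor built on CalculateLagAsync above (the interval, threshold, and hosted-service setup are assumptions, not part of the framework):

public class ConsumerLagMonitor : BackgroundService
{
    private readonly ILogger<ConsumerLagMonitor> _logger;

    public ConsumerLagMonitor(ILogger<ConsumerLagMonitor> logger)
    {
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // CalculateLagAsync is the method shown above, assumed to be available here
            var lag = await CalculateLagAsync("orders", "email-notifications", "worker-1");

            if (lag > 10_000)
            {
                _logger.LogWarning(
                    "Consumer worker-1 in group email-notifications is {Lag} events behind", lag);
            }

            await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
        }
    }
}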

Resetting Offsets

Reset to Beginning

await _offsetStore.ResetOffsetAsync(
    streamName: "orders",
    groupId: "email-notifications",
    consumerId: "worker-1",
    newOffset: 0);  // Start from beginning

Reset to Specific Offset

await _offsetStore.ResetOffsetAsync(
    "orders",
    "email-notifications",
    "worker-1",
    newOffset: 5000);  // Skip to offset 5000

Reset to Latest

var streamHead = await GetStreamHeadAsync("orders");

await _offsetStore.ResetOffsetAsync(
    "orders",
    "email-notifications",
    "worker-1",
    newOffset: streamHead);  // Skip all lag

Best Practices

DO

  • Use AfterBatch in production (high throughput with a bounded reprocessing window)
  • Implement idempotent event handlers
  • Monitor consumer lag
  • Reset offsets only when necessary
  • Track offset commits in metrics (see the sketch below)
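
With the Manual strategy, one straightforward way to track commits is a counter from System.Diagnostics.Metrics recorded alongside each commit (the meter name, instrument name, and wrapper method below are illustrative):

private static readonly Meter CommitMeter = new("EventStreaming.ConsumerGroups");
private static readonly Counter<long> OffsetCommits =
    CommitMeter.CreateCounter<long>("consumer.offset.commits");

private async Task CommitWithMetricsAsync(
    string streamName, string groupId, string consumerId, long offset)
{
    await _offsetStore.CommitOffsetAsync(streamName, groupId, consumerId, offset);

    // One increment per successful commit, tagged by stream and group
    OffsetCommits.Add(1,
        new KeyValuePair<string, object?>("stream", streamName),
        new KeyValuePair<string, object?>("group", groupId));
}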

DON'T

  • Don't use AfterEach unless absolutely necessary
  • Don't modify offsets manually in database
  • Don't skip error handling
  • Don't forget to test reprocessing scenarios

See Also