Commit Strategies

Choosing the right offset commit strategy for your workload.

Overview

Commit strategies determine when consumer offsets are persisted to storage. The choice affects performance, fault tolerance, and potential reprocessing.
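
The snippets on this page assume option and enum shapes roughly like the following (a sketch inferred from the examples below, not a guaranteed public API):

```csharp
// Hypothetical shapes matching the snippets on this page.
public enum OffsetCommitStrategy
{
    AfterBatch,  // commit once per processed batch (default)
    AfterEach,   // commit after every event
    Periodic,    // commit on a timer
    Manual       // caller commits explicitly via the offset store
}

public sealed class ConsumerGroupOptions
{
    public int BatchSize { get; set; } = 100;
    public OffsetCommitStrategy CommitStrategy { get; set; } = OffsetCommitStrategy.AfterBatch;
    public TimeSpan CommitInterval { get; set; } = TimeSpan.FromSeconds(30);
}
```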

Strategy Comparison

Strategy     Performance              Reprocessing          Use Case
AfterBatch   High (~10k-50k ev/sec)   ≤ BatchSize events    Production (recommended)
AfterEach    Lower (~1k-5k ev/sec)    ≤ 1 event             Critical data, low volume
Periodic     High                     Variable              Time-based workflows
Manual       Varies                   Depends on impl       Custom control needed

AfterBatch Strategy

Default and recommended for most scenarios.

Configuration

var options = new ConsumerGroupOptions
{
    BatchSize = 100,
    CommitStrategy = OffsetCommitStrategy.AfterBatch
};

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-1",
    options))
{
    await ProcessEventAsync(@event);
    // Offset committed after processing 100 events
}

Behavior

Process event 1 → Process event 2 → ... → Process event 100 → Commit offset 100
Process event 101 → Process event 102 → ... → Process event 200 → Commit offset 200

Performance

  • Database writes: 1 per batch
  • Throughput: ~10,000-50,000 events/sec
  • Latency: Low

Reprocessing

On failure, up to BatchSize events may be reprocessed:

Processed: events 1-150
Committed: offset 100
Crash
On restart: Reprocess events 101-150
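
The arithmetic behind this trace is simple: the consumer resumes one past the last committed offset, and everything processed beyond that commit is replayed. A small sketch (hypothetical helper names, not part of the library):

```csharp
using System;

// Illustrative helpers for the restart arithmetic shown above.
public static class RestartMath
{
    // First offset to process after a restart.
    public static long NextOffset(long committedOffset) => committedOffset + 1;

    // Number of events replayed: everything processed past the last commit.
    public static long ReplayCount(long processedUpTo, long committedOffset)
        => Math.Max(0, processedUpTo - committedOffset);
}
```

For the trace above, NextOffset(100) is 101 and ReplayCount(150, 100) is 50, i.e. events 101-150 are reprocessed.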

Best For

  • High-throughput scenarios
  • Idempotent event handlers
  • Production workloads
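
Because AfterBatch can replay up to BatchSize events after a crash, handlers should tolerate duplicates. A minimal idempotency sketch, deduplicating by event ID (in-memory here for illustration; a real implementation would persist the processed-ID set, and `eventId`/`apply` are illustrative names):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch: skip side effects for events already handled in a replayed batch.
public sealed class IdempotentHandler
{
    // In production this set would live in a durable store, not in memory.
    private readonly HashSet<Guid> _processed = new();

    public async Task<bool> HandleAsync(Guid eventId, Func<Task> apply)
    {
        if (!_processed.Add(eventId))
            return false; // duplicate from a replayed batch: skip

        await apply();
        return true;
    }
}
```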

AfterEach Strategy

Commit after every single event.

Configuration

var options = new ConsumerGroupOptions
{
    CommitStrategy = OffsetCommitStrategy.AfterEach
};

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed immediately after each event
}

Behavior

Process event 1 → Commit offset 1
Process event 2 → Commit offset 2
Process event 3 → Commit offset 3

Performance

  • Database writes: 1 per event
  • Throughput: ~1,000-5,000 events/sec
  • Latency: Higher (DB roundtrip per event)

Reprocessing

Minimal reprocessing on failure:

Processed: events 1-150
Committed: offset 150
Crash
On restart: Resume from event 151 (no reprocessing)

Best For

  • Critical financial transactions
  • Non-idempotent operations
  • Low-volume streams (< 1000 events/sec)
  • When reprocessing is unacceptable

Periodic Strategy

Commit every N seconds.

Configuration

var options = new ConsumerGroupOptions
{
    CommitStrategy = OffsetCommitStrategy.Periodic,
    CommitInterval = TimeSpan.FromSeconds(30)
};

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
    // Offset committed every 30 seconds
}

Behavior

T=0:  Process events 1-500
T=30: Commit offset 500
T=60: Commit offset 1200 (processed events 501-1200)
T=90: Commit offset 1800

Performance

  • Database writes: 1 per time interval
  • Throughput: High
  • Latency: Low

Reprocessing

Variable based on processing speed:

T=0:  Commit offset 500 (end of previous interval)
T=15: Processed events 501-1000
T=29: Processed events 1001-1500 (not committed yet)
Crash (before the T=30 commit)
On restart: Reprocess events 501-1500

Best For

  • Time-based workflows
  • Analytics pipelines
  • When consistent commit intervals matter
  • Reducing database load

Manual Strategy

Explicit control over commits.

Configuration

var options = new ConsumerGroupOptions
{
    CommitStrategy = OffsetCommitStrategy.Manual
};

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);

    // Explicit commit decision
    if (ShouldCommit(@event))
    {
        await _offsetStore.CommitOffsetAsync(
            "orders",
            "order-processing",
            "worker-1",
            @event.Offset);
    }
}

Use Cases

Conditional commits:

// Commit only on specific events
if (@event.EventType == "OrderCompletedEvent")
{
    await _offsetStore.CommitOffsetAsync(...);
}

Transaction-based commits:

using var transaction = await _dbContext.Database.BeginTransactionAsync();

try
{
    await ProcessEventAsync(@event);
    await _offsetStore.CommitOffsetAsync(...);
    await transaction.CommitAsync();
}
catch
{
    await transaction.RollbackAsync();
    throw;
}

Batch with custom size:

var processedCount = 0;

await foreach (var @event in _consumerGroup.ConsumeAsync(...))
{
    await ProcessEventAsync(@event);
    processedCount++;

    // Custom batch size
    if (processedCount >= 500)
    {
        await _offsetStore.CommitOffsetAsync(..., @event.Offset);
        processedCount = 0;
    }
}
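
Interval-based commits (a sketch in the same elided style as the examples above, approximating the Periodic strategy from Manual mode with a Stopwatch):

```csharp
var stopwatch = System.Diagnostics.Stopwatch.StartNew();
var commitInterval = TimeSpan.FromSeconds(30);

await foreach (var @event in _consumerGroup.ConsumeAsync(...))
{
    await ProcessEventAsync(@event);

    // Commit at most once per interval, like the Periodic strategy.
    if (stopwatch.Elapsed >= commitInterval)
    {
        await _offsetStore.CommitOffsetAsync(..., @event.Offset);
        stopwatch.Restart();
    }
}
```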

Best For

  • Complex commit logic
  • Transaction coordination
  • Custom batching requirements
  • Advanced scenarios

Choosing a Strategy

Decision Tree

Is throughput > 10,000 events/sec?
├─ Yes → Use AfterBatch (BatchSize: 1000-5000)
└─ No
    └─ Can you handle reprocessing?
        ├─ Yes → Use AfterBatch (BatchSize: 100-1000)
        └─ No
            └─ Is volume < 1000 events/sec?
                ├─ Yes → Use AfterEach
                └─ No → Use AfterBatch + idempotency
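
The tree above can be captured in a small helper if you want the choice in code (a hypothetical sketch; thresholds and names follow this page, not a library API):

```csharp
// Translates the decision tree: throughput first, then reprocessing tolerance.
public static class StrategyChooser
{
    public static string Choose(int eventsPerSec, bool canHandleReprocessing)
    {
        if (eventsPerSec > 10_000)
            return "AfterBatch (BatchSize: 1000-5000)";
        if (canHandleReprocessing)
            return "AfterBatch (BatchSize: 100-1000)";
        if (eventsPerSec < 1_000)
            return "AfterEach";
        return "AfterBatch + idempotency";
    }
}
```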

Recommendations

High-throughput (> 10k events/sec):

new ConsumerGroupOptions
{
    BatchSize = 5000,
    CommitStrategy = OffsetCommitStrategy.AfterBatch
}

Medium-throughput (1k-10k events/sec):

new ConsumerGroupOptions
{
    BatchSize = 1000,
    CommitStrategy = OffsetCommitStrategy.AfterBatch
}

Low-throughput (< 1k events/sec):

new ConsumerGroupOptions
{
    BatchSize = 100,
    CommitStrategy = OffsetCommitStrategy.AfterEach  // Or AfterBatch with small batch
}

Time-sensitive workflows:

new ConsumerGroupOptions
{
    CommitStrategy = OffsetCommitStrategy.Periodic,
    CommitInterval = TimeSpan.FromMinutes(1)
}

Best Practices

DO

  • Use AfterBatch for production
  • Set appropriate batch sizes (100-5000)
  • Implement idempotent handlers
  • Monitor commit lag
  • Test reprocessing scenarios
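
Commit lag is the distance between the newest offset in the stream and the group's last committed offset; a persistently growing lag means consumers are falling behind or have stalled. A minimal sketch of the computation (how you obtain the two offsets is store-specific):

```csharp
using System;

// Lag = newest offset in the stream minus the group's committed offset.
public static class CommitLagMonitor
{
    public static long Compute(long latestOffset, long committedOffset)
        => Math.Max(0, latestOffset - committedOffset);
}
```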

DON'T

  • Don't use AfterEach for high-volume streams
  • Don't set very large batch sizes (> 10000)
  • Don't forget to commit in Manual mode
  • Don't ignore reprocessing implications

See Also