dotnet-cqrs/docs/event-streaming/consumer-groups/fault-tolerance.md

8.2 KiB

Fault Tolerance

Heartbeat monitoring, stale consumer cleanup, and recovery mechanisms.

Overview

Consumer groups provide fault tolerance through heartbeat monitoring, automatic stale consumer cleanup, and offset-based recovery. These mechanisms ensure reliable event processing even when consumers fail.

Heartbeat Monitoring

How It Works

Consumer registers → Send heartbeat every 10s → Last heartbeat tracked in DB

If no heartbeat for 30s → Consumer marked as stale → Removed from group

Configuration

// Heartbeat settings for this consumer. With these values the session timeout
// is three heartbeat intervals, so a couple of missed heartbeats is tolerated
// before the consumer is considered stale.
var options = new ConsumerGroupOptions
{
    HeartbeatInterval = TimeSpan.FromSeconds(10),   // Send a heartbeat every 10s
    SessionTimeout = TimeSpan.FromSeconds(30)        // Mark the consumer stale after 30s without a heartbeat
};

// The leading arguments are elided ("..."); options is passed as the last argument.
await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    // Heartbeats are sent automatically in the background while events are
    // processed; no explicit heartbeat call is needed here.
    await ProcessEventAsync(@event);
}

Heartbeat Storage

SELECT * FROM consumer_registrations;
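 stream_name | group_id    | consumer_id | last_heartbeat      | session_timeout_ms
-------------+-------------+-------------+---------------------+--------------------
 orders      | email-notif | worker-1    | 2025-12-10 10:30:00 |              30000
 orders      | email-notif | worker-2    | 2025-12-10 10:29:45 |              30000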

Stale Consumer Cleanup

Automatic Cleanup

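A background service periodically removes stale consumers. Because cleanup runs on an interval, a failed consumer can remain registered for up to SessionTimeout + CleanupInterval before it is actually removed: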

// Register the PostgreSQL-backed consumer groups and configure how often the
// stale-consumer cleanup runs.
builder.Services.AddPostgresConsumerGroups(options =>
{
    options.CleanupInterval = TimeSpan.FromMinutes(1);  // Run cleanup every minute
});

Cleanup Function

-- Deletes every registration whose last heartbeat is older than that row's own
-- session timeout, and returns the number of rows removed.
CREATE OR REPLACE FUNCTION cleanup_stale_consumers()
RETURNS INTEGER AS $$
DECLARE
    removed INTEGER;
BEGIN
    DELETE FROM consumer_registrations AS reg
    WHERE reg.last_heartbeat
          < NOW() - reg.session_timeout_ms * INTERVAL '1 millisecond';

    -- ROW_COUNT reports how many rows the DELETE above affected.
    GET DIAGNOSTICS removed = ROW_COUNT;
    RETURN removed;
END;
$$ LANGUAGE plpgsql;

Manual Cleanup

/// <summary>
/// Triggers stale-consumer cleanup on demand through the offset store.
/// </summary>
public class ConsumerCleanup
{
    private readonly IConsumerOffsetStore _offsetStore;
    private readonly ILogger<ConsumerCleanup> _logger;

    /// <summary>
    /// Creates the cleanup helper with its dependencies.
    /// </summary>
    /// <param name="offsetStore">Store used to run the cleanup.</param>
    /// <param name="logger">Logger that receives the cleanup result.</param>
    public ConsumerCleanup(IConsumerOffsetStore offsetStore, ILogger<ConsumerCleanup> logger)
    {
        _offsetStore = offsetStore ?? throw new ArgumentNullException(nameof(offsetStore));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Runs the store's stale-consumer cleanup and logs the value it returns.
    /// </summary>
    public async Task RemoveStaleConsumersAsync()
    {
        var removed = await _offsetStore.CleanupStaleConsumersAsync();
        _logger.LogInformation("Removed {Count} stale consumers", removed);
    }
}

Consumer Failure Recovery

Scenario 1: Graceful Shutdown

Consumer stops cleanly:

// Consumes events until the host signals shutdown through stoppingToken.
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    try
    {
        // The leading arguments are elided ("..."); stoppingToken is passed last
        // so that cancellation ends the enumeration.
        await foreach (var @event in _consumerGroup.ConsumeAsync(..., stoppingToken))
        {
            await ProcessEventAsync(@event);
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation is the expected shutdown path, so it is logged rather than rethrown.
        _logger.LogInformation("Shutting down gracefully");
        // Consumer unregisters, offset committed
    }
}

Recovery:

  • Offset committed before shutdown
  • Consumer unregistered from group
  • No reprocessing needed

Scenario 2: Unexpected Crash

Consumer crashes without cleanup:

T=0:  Consumer processes events 1-500, commits offset 500
T=10: Consumer processes events 501-1000 (not committed)
T=15: Consumer crashes
T=45: Stale consumer removed (30s timeout)
T=46: New consumer starts, resumes from offset 501

Recovery:

  • Events 501-1000 reprocessed
  • Requires idempotent handlers

Scenario 3: Network Partition

Consumer loses connectivity:

T=0:  Consumer processing normally
T=10: Network partition (no heartbeats sent)
T=40: Consumer marked stale, removed from group
T=45: Network restored
T=46: Consumer attempts to commit offset → Fails (not registered)
T=47: Consumer re-registers and resumes

Recovery:

  • Consumer re-registers automatically
  • Resumes from last committed offset
  • Some events may be reprocessed

Idempotent Event Handlers

Track Processed Events

/// <summary>
/// Processes order events at most once per event ID by recording handled
/// events in an <see cref="IProcessedEventRepository"/>.
/// </summary>
public class IdempotentOrderProcessor
{
    private readonly IProcessedEventRepository _repository;
    private readonly ILogger<IdempotentOrderProcessor> _logger;

    /// <summary>
    /// Creates the processor with its dependencies.
    /// </summary>
    /// <param name="repository">Store of already-processed event IDs.</param>
    /// <param name="logger">Logger used to report skipped events.</param>
    public IdempotentOrderProcessor(
        IProcessedEventRepository repository,
        ILogger<IdempotentOrderProcessor> logger)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Handles <paramref name="event"/> unless its ID is already recorded as processed.
    /// </summary>
    /// <param name="event">The event to process.</param>
    public async Task ProcessEventAsync(StoredEvent @event)
    {
        // Check if already processed
        if (await _repository.IsProcessedAsync(@event.EventId))
        {
            _logger.LogInformation(
                "Event {EventId} already processed, skipping",
                @event.EventId);
            return;
        }

        // Process event (ProcessOrderAsync is the application-specific handler,
        // not shown in this example).
        await ProcessOrderAsync(@event);

        // Mark as processed.
        // NOTE: the check, the processing and this write are three separate steps.
        // A crash between processing and marking means the event is processed again
        // on redelivery, so ProcessOrderAsync should itself tolerate a repeat, or
        // both steps should share one transaction.
        await _repository.MarkProcessedAsync(@event.EventId, @event.Offset);
    }
}

Database Schema

-- One row per event that has been handled; event_id is the idempotency key.
-- "offset" must be double-quoted: OFFSET is a reserved keyword in PostgreSQL,
-- so the unquoted name is a syntax error in both statements.
CREATE TABLE processed_events (
    event_id TEXT PRIMARY KEY,
    "offset" BIGINT NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_processed_events_offset ON processed_events("offset");

With EF Core

/// <summary>
/// Entity recording that an event has been processed.
/// </summary>
public class ProcessedEvent
{
    /// <summary>Identifier of the processed event; never null, empty by default.</summary>
    public string EventId { get; set; } = "";

    /// <summary>Offset associated with the processed event.</summary>
    public long Offset { get; set; }

    /// <summary>Timestamp at which the event was processed.</summary>
    public DateTimeOffset ProcessedAt { get; set; }
}

/// <summary>
/// EF Core context exposing the processed-event records.
/// </summary>
public class ApplicationDbContext : DbContext
{
    /// <summary>Processed-event records used for idempotency checks.</summary>
    public DbSet<ProcessedEvent> ProcessedEvents { get; set; }

    /// <summary>
    /// Configures the primary key for <see cref="ProcessedEvent"/>.
    /// </summary>
    /// <param name="modelBuilder">Builder used to configure the model.</param>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        // EF Core only discovers keys named "Id" or "<TypeName>Id" by convention.
        // EventId matches neither, so without this the model fails validation
        // with "requires a primary key to be defined".
        modelBuilder.Entity<ProcessedEvent>().HasKey(e => e.EventId);

        // NOTE(review): table and column names follow EF Core defaults here;
        // confirm they map onto the database schema in use.
    }
}

Monitoring Stale Consumers

Health Check

/// <summary>
/// Health check that reports <c>Degraded</c> when any consumer in the
/// "orders" / "email-notifications" group has not sent a heartbeat for more
/// than one minute.
/// </summary>
public class ConsumerHealthCheck : IHealthCheck
{
    // Heartbeat age beyond which a consumer is reported as stale.
    private static readonly TimeSpan StaleThreshold = TimeSpan.FromMinutes(1);

    private readonly IConsumerOffsetStore _offsetStore;

    /// <summary>
    /// Creates the health check.
    /// </summary>
    /// <param name="offsetStore">Store used to list the registered consumers.</param>
    public ConsumerHealthCheck(IConsumerOffsetStore offsetStore)
    {
        _offsetStore = offsetStore ?? throw new ArgumentNullException(nameof(offsetStore));
    }

    /// <summary>
    /// Lists the group's consumers and flags those whose heartbeat is older than the threshold.
    /// </summary>
    /// <param name="context">Health check context (unused).</param>
    /// <param name="cancellationToken">Cancellation token (not forwarded to the store call).</param>
    /// <returns>
    /// <c>Degraded</c> with the stale consumer IDs, or <c>Healthy</c> with the consumer count.
    /// </returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var consumers = await _offsetStore.GetConsumersAsync("orders", "email-notifications");

        // Take one timestamp and materialize the result once. The original deferred
        // query was evaluated twice (Any() and Select()), each time against a
        // different "now", so the two passes could disagree.
        var now = DateTimeOffset.UtcNow;
        var staleConsumerIds = consumers
            .Where(c => now - c.LastHeartbeat > StaleThreshold)
            .Select(c => c.ConsumerId)
            .ToList();

        if (staleConsumerIds.Count > 0)
        {
            return HealthCheckResult.Degraded(
                $"Stale consumers: {string.Join(", ", staleConsumerIds)}");
        }

        return HealthCheckResult.Healthy($"{consumers.Count} active consumers");
    }
}

Monitoring Service

/// <summary>
/// Background service that polls the "orders" / "email-notifications" group
/// every 10 seconds, warns about consumers whose last heartbeat is more than
/// 20 seconds old, and records each consumer's heartbeat age as a gauge.
/// </summary>
public class ConsumerMonitor : BackgroundService
{
    // _offsetStore, _logger and _metrics are expected to be injected through the
    // constructor; their declarations are omitted from this example.

    /// <summary>
    /// Runs the polling loop until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var consumers = await _offsetStore.GetConsumersAsync("orders", "email-notifications");

                foreach (var consumer in consumers)
                {
                    var timeSinceHeartbeat = DateTimeOffset.UtcNow - consumer.LastHeartbeat;

                    if (timeSinceHeartbeat > TimeSpan.FromSeconds(20))
                    {
                        _logger.LogWarning(
                            "Consumer {ConsumerId} has not sent heartbeat for {Seconds}s",
                            consumer.ConsumerId,
                            timeSinceHeartbeat.TotalSeconds);
                    }

                    _metrics.RecordGauge(
                        "consumer.heartbeat.age",
                        timeSinceHeartbeat.TotalSeconds,
                        new[] { new KeyValuePair<string, object>("consumer_id", consumer.ConsumerId) });
                }
            }
            catch (Exception ex)
            {
                // A failed pass (for example a transient store error) must not end
                // monitoring: an exception escaping ExecuteAsync stops this service.
                _logger.LogError(ex, "Consumer heartbeat monitoring pass failed");
            }

            // Kept outside the try block so that cancellation during the delay
            // still propagates and ends the loop on shutdown.
            await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
        }
    }
}

Split-Brain Prevention

Consumer ID Uniqueness

Always use unique consumer IDs:

// ✅ Good - Unique per instance: machine name + process ID + a fresh GUID,
// so two instances never produce the same ID, even on the same host.
var consumerId = $"{Environment.MachineName}-{Process.GetCurrentProcess().Id}-{Guid.NewGuid():N}";

// ❌ Bad - Same ID across instances (shown as an alternative to the line above,
// not meant to be declared in the same scope)
var consumerId = "worker";  // Multiple instances will conflict

Registration Check

/// <summary>
/// Determines whether the given consumer is registered in the group and its
/// last heartbeat is still within its session timeout.
/// </summary>
/// <param name="streamName">Stream the consumer group reads from.</param>
/// <param name="groupId">Consumer group to inspect.</param>
/// <param name="consumerId">Consumer to look up.</param>
/// <returns>
/// <c>false</c> when the consumer is not registered or its heartbeat age has
/// reached the session timeout; otherwise <c>true</c>.
/// </returns>
public async Task<bool> IsConsumerActiveAsync(string streamName, string groupId, string consumerId)
{
    var registered = await _offsetStore.GetConsumersAsync(streamName, groupId);
    var match = registered.FirstOrDefault(c => c.ConsumerId == consumerId);

    // Not registered means not active; otherwise compare heartbeat age to the timeout.
    return match != null
        && DateTimeOffset.UtcNow - match.LastHeartbeat < match.SessionTimeout;
}

Best Practices

DO

  • Use unique consumer IDs
  • Implement idempotent event handlers
  • Monitor stale consumers
  • Set appropriate session timeouts (30-60s)
  • Log heartbeat failures
  • Test failure scenarios

DON'T

  • Don't reuse consumer IDs
  • Don't ignore stale consumer warnings
  • Don't set very short timeouts (< 10s)
  • Don't skip idempotency checks
  • Don't process without error handling

See Also