# Fault Tolerance

Heartbeat monitoring, stale consumer cleanup, and recovery mechanisms.

## Overview

Consumer groups provide fault tolerance through heartbeat monitoring, automatic stale consumer cleanup, and offset-based recovery. These mechanisms keep event processing reliable even when individual consumers fail.

## Heartbeat Monitoring

### How It Works

```
Consumer registers → Send heartbeat every 10s → Last heartbeat tracked in DB
If no heartbeat for 30s → Consumer marked as stale → Removed from group
```

### Configuration

```csharp
var options = new ConsumerGroupOptions
{
    HeartbeatInterval = TimeSpan.FromSeconds(10), // Send a heartbeat every 10s
    SessionTimeout = TimeSpan.FromSeconds(30)     // Mark stale after 30s without a heartbeat
};

await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    // Heartbeats are sent automatically in the background
    await ProcessEventAsync(@event);
}
```

Keep `HeartbeatInterval` well below `SessionTimeout` so that one late heartbeat does not get a healthy consumer marked stale. With the values above, a consumer has to miss roughly three consecutive heartbeats before its session times out.

### Heartbeat Storage

Each registered consumer has a row in `consumer_registrations` that records its most recent heartbeat and its session timeout:

```sql
SELECT * FROM consumer_registrations;
```

| stream_name | group_id | consumer_id | last_heartbeat | session_timeout_ms |
|-------------|----------|-------------|----------------|--------------------|
| orders | email-notif | worker-1 | 2025-12-10 10:30:00 | 30000 |
| orders | email-notif | worker-2 | 2025-12-10 10:29:45 | 30000 |

## Stale Consumer Cleanup

### Automatic Cleanup

A background service periodically removes stale consumers:

```csharp
builder.Services.AddPostgresConsumerGroups(options =>
{
    options.CleanupInterval = TimeSpan.FromMinutes(1); // Run cleanup every minute
});
```

Because cleanup runs on an interval, a consumer that stops sending heartbeats is removed between `SessionTimeout` and `SessionTimeout + CleanupInterval` after its last heartbeat (30 to 90 seconds with the settings shown).

### Cleanup Function

```sql
CREATE OR REPLACE FUNCTION cleanup_stale_consumers()
RETURNS INTEGER AS $$
DECLARE
    deleted_count INTEGER;
BEGIN
    -- Delete consumers whose last heartbeat is older than their own session timeout
    DELETE FROM consumer_registrations
    WHERE last_heartbeat < NOW() - (session_timeout_ms || ' milliseconds')::INTERVAL;

    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
```

### Manual Cleanup

```csharp
using Microsoft.Extensions.Logging;

public class ConsumerCleanup
{
    private readonly IConsumerOffsetStore _offsetStore;
    private readonly ILogger<ConsumerCleanup> _logger;

    public ConsumerCleanup(IConsumerOffsetStore offsetStore, ILogger<ConsumerCleanup> logger)
    {
        _offsetStore = offsetStore;
        _logger = logger;
    }

    public async Task RemoveStaleConsumersAsync()
    {
        // Returns the number of registrations deleted
        var removed = await _offsetStore.CleanupStaleConsumersAsync();
        _logger.LogInformation("Removed {Count} stale consumers", removed);
    }
}
```

## Consumer Failure Recovery

### Scenario 1: Graceful Shutdown

The consumer stops cleanly:

```csharp
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    try
    {
        await foreach (var @event in _consumerGroup.ConsumeAsync(..., stoppingToken))
        {
            await ProcessEventAsync(@event);
        }
    }
    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
    {
        _logger.LogInformation("Shutting down gracefully");
        // The consumer unregisters and its offset is committed
    }
}
```

**Recovery:**
- Offset committed before shutdown
- Consumer unregistered from the group
- No reprocessing needed

### Scenario 2: Unexpected Crash

The consumer crashes without cleaning up (times in seconds):

```
T=0:  Consumer processes events 1-500, commits offset 500
T=10: Consumer processes events 501-1000 (not committed)
T=15: Consumer crashes
T=45: Stale consumer removed (30s timeout)
T=46: New consumer starts, resumes from offset 501
```

**Recovery:**
- Events 501-1000 are processed a second time
- Handlers must therefore be idempotent

### Scenario 3: Network Partition

The consumer loses connectivity:

```
T=0:  Consumer processing normally
T=10: Network partition (no heartbeats sent)
T=40: Consumer marked stale, removed from group
T=45: Network restored
T=46: Consumer attempts to commit offset → Fails (not registered)
T=47: Consumer re-registers and resumes
```

**Recovery:**
- The consumer re-registers automatically
- It resumes from the last committed offset
- Events processed after that offset may be processed again

## Idempotent Event Handlers

### Track Processed Events

```csharp
using Microsoft.Extensions.Logging;

public class IdempotentOrderProcessor
{
    private readonly IProcessedEventRepository _repository;
    private readonly ILogger<IdempotentOrderProcessor> _logger;

    public IdempotentOrderProcessor(
        IProcessedEventRepository repository,
        ILogger<IdempotentOrderProcessor> logger)
    {
        _repository = repository;
        _logger = logger;
    }

    public async Task ProcessEventAsync(StoredEvent @event)
    {
        // Skip events that were already processed
        if (await _repository.IsProcessedAsync(@event.EventId))
        {
            _logger.LogInformation(
                "Event {EventId} already processed, skipping", @event.EventId);
            return;
        }

        // Process the event
        await ProcessOrderAsync(@event);

        // Record the event as processed
        await _repository.MarkProcessedAsync(@event.EventId, @event.Offset);
    }
}
```

If the process crashes after `ProcessOrderAsync` succeeds but before `MarkProcessedAsync` completes, the event is processed again when it is redelivered. Where possible, perform both steps in the same database transaction.

### Database Schema

```sql
CREATE TABLE processed_events (
    event_id     TEXT PRIMARY KEY,
    "offset"     BIGINT NOT NULL,  -- OFFSET is a reserved word in PostgreSQL, so it must be quoted
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_processed_events_offset ON processed_events ("offset");
```

### With EF Core

```csharp
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using Microsoft.EntityFrameworkCore;

[Table("processed_events")]
public class ProcessedEvent
{
    // EventId is not a conventional key name, so it is marked as the key explicitly
    [Key, Column("event_id")]
    public string EventId { get; set; } = string.Empty;

    [Column("offset")]
    public long Offset { get; set; }

    [Column("processed_at")]
    public DateTimeOffset ProcessedAt { get; set; } = DateTimeOffset.UtcNow;
}

public class ApplicationDbContext : DbContext
{
    public DbSet<ProcessedEvent> ProcessedEvents { get; set; } = null!;
}
```

## Monitoring Stale Consumers

### Health Check

```csharp
using Microsoft.Extensions.Diagnostics.HealthChecks;

public class ConsumerHealthCheck : IHealthCheck
{
    private readonly IConsumerOffsetStore _offsetStore;

    public ConsumerHealthCheck(IConsumerOffsetStore offsetStore) => _offsetStore = offsetStore;

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var consumers = await _offsetStore.GetConsumersAsync("orders", "email-notifications");

        // Consumers whose last heartbeat is more than one minute old
        var staleConsumers = consumers
            .Where(c => DateTimeOffset.UtcNow - c.LastHeartbeat > TimeSpan.FromMinutes(1))
            .ToList();

        if (staleConsumers.Count > 0)
        {
            return HealthCheckResult.Degraded(
                $"Stale consumers: {string.Join(", ", staleConsumers.Select(c => c.ConsumerId))}");
        }

        return HealthCheckResult.Healthy($"{consumers.Count} active consumers");
    }
}
```

### Monitoring Service

```csharp
public class ConsumerMonitor : BackgroundService
{
    // _offsetStore, _logger, and _metrics are injected through the constructor (omitted)

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var consumers = await _offsetStore.GetConsumersAsync("orders", "email-notifications");

            foreach (var consumer in consumers)
            {
                var timeSinceHeartbeat = DateTimeOffset.UtcNow - consumer.LastHeartbeat;

                // Warn after roughly two missed heartbeats, before the 30s session timeout expires
                if (timeSinceHeartbeat > TimeSpan.FromSeconds(20))
                {
                    _logger.LogWarning(
                        "Consumer {ConsumerId} has not sent a heartbeat for {Seconds:F0}s",
                        consumer.ConsumerId,
                        timeSinceHeartbeat.TotalSeconds);
                }

                _metrics.RecordGauge(
                    "consumer.heartbeat.age",
                    timeSinceHeartbeat.TotalSeconds,
                    new[] { new KeyValuePair<string, object?>("consumer_id", consumer.ConsumerId) });
            }

            await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
        }
    }
}
```

## Split-Brain Prevention

### Consumer ID Uniqueness

Always use unique consumer IDs:

```csharp
using System.Diagnostics;

// ✅ Good - unique per instance
var consumerId = $"{Environment.MachineName}-{Process.GetCurrentProcess().Id}-{Guid.NewGuid():N}";

// ❌ Bad - same ID across instances
var consumerId = "worker"; // Multiple instances will conflict
```

### Registration Check

A consumer can confirm that it is still a registered, live member of its group, for example before committing an offset after a network partition (Scenario 3):

```csharp
public async Task<bool> IsConsumerActiveAsync(string streamName, string groupId, string consumerId)
{
    var consumers = await _offsetStore.GetConsumersAsync(streamName, groupId);
    var consumer = consumers.FirstOrDefault(c => c.ConsumerId == consumerId);

    if (consumer == null)
        return false; // Not registered, e.g. removed by stale consumer cleanup

    // Active only if the last heartbeat falls within the session timeout
    var timeSinceHeartbeat = DateTimeOffset.UtcNow - consumer.LastHeartbeat;
    return timeSinceHeartbeat < consumer.SessionTimeout;
}
```

## Best Practices

### ✅ DO

- Use unique consumer IDs
- Implement idempotent event handlers
- Monitor stale consumers
- Set appropriate session timeouts (30-60s)
- Log heartbeat failures
- Test failure scenarios

### ❌ DON'T

- Don't reuse consumer IDs
- Don't ignore stale consumer warnings
- Don't set very short timeouts (< 10s)
- Don't skip idempotency checks
- Don't process events without error handling

## See Also

- [Consumer Groups Overview](README.md)
- [Offset Management](offset-management.md)
- [Health Checks](../../observability/health-checks/consumer-health.md)
- [Best Practices](../../best-practices/README.md)