332 lines
8.2 KiB
Markdown
332 lines
8.2 KiB
Markdown
# Fault Tolerance
|
|
|
|
Heartbeat monitoring, stale consumer cleanup, and recovery mechanisms.
|
|
|
|
## Overview
|
|
|
|
Consumer groups provide fault tolerance through heartbeat monitoring, automatic stale consumer cleanup, and offset-based recovery. These mechanisms ensure reliable event processing even when consumers fail.
|
|
|
|
## Heartbeat Monitoring
|
|
|
|
### How It Works
|
|
|
|
```
|
|
Consumer registers → Send heartbeat every 10s → Last heartbeat tracked in DB
|
|
|
|
If no heartbeat for 30s → Consumer marked as stale → Removed from group
|
|
```
|
|
|
|
### Configuration
|
|
|
|
```csharp
|
|
// Heartbeat cadence and staleness threshold for this consumer.
var options = new ConsumerGroupOptions
{
    // Send heartbeat every 10s
    HeartbeatInterval = TimeSpan.FromSeconds(10),

    // Mark stale after 30s
    SessionTimeout = TimeSpan.FromSeconds(30)
};

// Heartbeats sent automatically in background while the stream is being consumed.
await foreach (var @event in _consumerGroup.ConsumeAsync(..., options))
{
    await ProcessEventAsync(@event);
}
|
|
```
|
|
|
|
### Heartbeat Storage
|
|
|
|
```sql
|
|
-- Show all consumer registrations, including each consumer's last_heartbeat.
SELECT * FROM consumer_registrations;
|
|
```
|
|
|
|
| stream_name | group_id | consumer_id | last_heartbeat | session_timeout_ms |
|-------------|----------|-------------|----------------|---------------------|
| orders | email-notifications | worker-1 | 2025-12-10 10:30:00 | 30000 |
| orders | email-notifications | worker-2 | 2025-12-10 10:29:45 | 30000 |
|
|
|
|
## Stale Consumer Cleanup
|
|
|
|
### Automatic Cleanup
|
|
|
|
A background service removes stale consumers at a configurable interval:
|
|
|
|
```csharp
|
|
// Register consumer groups and schedule the background sweep for stale consumers.
builder.Services.AddPostgresConsumerGroups(cfg =>
{
    // Run cleanup every minute
    cfg.CleanupInterval = TimeSpan.FromMinutes(1);
});
|
|
```
|
|
|
|
### Cleanup Function
|
|
|
|
```sql
|
|
-- Deletes every registration whose last heartbeat is older than its own
-- session_timeout_ms and returns the number of rows removed.
CREATE OR REPLACE FUNCTION cleanup_stale_consumers()
RETURNS INTEGER AS $$
DECLARE
    deleted_count INTEGER;
BEGIN
    -- Scale a unit interval by the per-row timeout instead of casting text.
    DELETE FROM consumer_registrations
    WHERE last_heartbeat < NOW() - session_timeout_ms * INTERVAL '1 millisecond';

    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
|
|
```
|
|
|
|
### Manual Cleanup
|
|
|
|
```csharp
|
|
/// <summary>
/// Triggers stale-consumer cleanup on demand and logs how many registrations were removed.
/// </summary>
public class ConsumerCleanup
{
    private readonly IConsumerOffsetStore _offsetStore;
    private readonly ILogger<ConsumerCleanup> _logger;

    /// <summary>Creates the helper with its store and logger dependencies.</summary>
    /// <exception cref="ArgumentNullException">Either dependency is <c>null</c>.</exception>
    public ConsumerCleanup(IConsumerOffsetStore offsetStore, ILogger<ConsumerCleanup> logger)
    {
        _offsetStore = offsetStore ?? throw new ArgumentNullException(nameof(offsetStore));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Asks the offset store to clean up stale consumers and logs the value it returns.
    /// </summary>
    public async Task RemoveStaleConsumersAsync()
    {
        var removed = await _offsetStore.CleanupStaleConsumersAsync();
        _logger.LogInformation("Removed {Count} stale consumers", removed);
    }
}
|
|
```
|
|
|
|
## Consumer Failure Recovery
|
|
|
|
### Scenario 1: Graceful Shutdown
|
|
|
|
Consumer stops cleanly:
|
|
|
|
```csharp
|
|
/// <summary>
/// Consumes events until the host requests shutdown, then exits quietly.
/// </summary>
/// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    try
    {
        await foreach (var @event in _consumerGroup.ConsumeAsync(..., stoppingToken))
        {
            await ProcessEventAsync(@event);
        }
    }
    // Only treat cancellation as graceful when the host actually asked us to stop;
    // an OperationCanceledException from any other source (e.g. an inner timeout)
    // is a real failure and must propagate.
    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
    {
        _logger.LogInformation("Shutting down gracefully");
        // Consumer unregisters, offset committed
    }
}
|
|
```
|
|
|
|
**Recovery:**
|
|
- Offset committed before shutdown
|
|
- Consumer unregistered from group
|
|
- No reprocessing needed
|
|
|
|
### Scenario 2: Unexpected Crash
|
|
|
|
Consumer crashes without cleanup:
|
|
|
|
```
|
|
T=0: Consumer processes events 1-500, commits offset 500
|
|
T=10: Consumer processes events 501-1000 (not committed)
|
|
T=15: Consumer crashes
|
|
T=45: Stale consumer removed (30s timeout)
|
|
T=46: New consumer starts, resumes from offset 501
|
|
```
|
|
|
|
**Recovery:**
|
|
- Events 501-1000 reprocessed
|
|
- Requires idempotent handlers
|
|
|
|
### Scenario 3: Network Partition
|
|
|
|
Consumer loses connectivity:
|
|
|
|
```
|
|
T=0: Consumer processing normally
|
|
T=10: Network partition (no heartbeats sent)
|
|
T=40: Consumer marked stale, removed from group
|
|
T=45: Network restored
|
|
T=46: Consumer attempts to commit offset → Fails (not registered)
|
|
T=47: Consumer re-registers and resumes
|
|
```
|
|
|
|
**Recovery:**
|
|
- Consumer re-registers automatically
|
|
- Resumes from last committed offset
|
|
- Some events may be reprocessed
|
|
|
|
## Idempotent Event Handlers
|
|
|
|
### Track Processed Events
|
|
|
|
```csharp
|
|
/// <summary>
/// Processes order events at most once by recording each handled event ID.
/// </summary>
public class IdempotentOrderProcessor
{
    private readonly IProcessedEventRepository _repository;
    private readonly ILogger<IdempotentOrderProcessor> _logger;

    /// <summary>Creates the processor with its repository and logger dependencies.</summary>
    /// <exception cref="ArgumentNullException">Either dependency is <c>null</c>.</exception>
    public IdempotentOrderProcessor(
        IProcessedEventRepository repository,
        ILogger<IdempotentOrderProcessor> logger)
    {
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Handles <paramref name="event"/> unless its ID has already been recorded as processed.
    /// </summary>
    public async Task ProcessEventAsync(StoredEvent @event)
    {
        // Check if already processed
        if (await _repository.IsProcessedAsync(@event.EventId))
        {
            _logger.LogInformation(
                "Event {EventId} already processed, skipping",
                @event.EventId);
            return;
        }

        // Process event
        await ProcessOrderAsync(@event);

        // Mark as processed.
        // NOTE(review): the check above and this write are separate calls, so a crash
        // between processing and marking still leads to a retry of this event.
        // Keep ProcessOrderAsync safe to repeat, or run both steps in one transaction.
        await _repository.MarkProcessedAsync(@event.EventId, @event.Offset);
    }
}
|
|
```
|
|
|
|
### Database Schema
|
|
|
|
```sql
|
|
-- Records which events have been handled so redelivered events can be skipped.
-- OFFSET is a reserved keyword in PostgreSQL, so the column name must be quoted.
CREATE TABLE processed_events (
    event_id TEXT PRIMARY KEY,
    "offset" BIGINT NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_processed_events_offset ON processed_events("offset");
|
|
```
|
|
|
|
### With EF Core
|
|
|
|
```csharp
|
|
/// <summary>
/// One row per event that has been handled.
/// </summary>
public class ProcessedEvent
{
    // "EventId" does not match EF Core's key naming conventions ("Id" / "ProcessedEventId"),
    // so the primary key has to be declared explicitly.
    [System.ComponentModel.DataAnnotations.Key]
    public string EventId { get; set; } = string.Empty;

    public long Offset { get; set; }

    public DateTimeOffset ProcessedAt { get; set; }
}

/// <summary>
/// Application context exposing the processed-event set.
/// </summary>
public class ApplicationDbContext : DbContext
{
    public DbSet<ProcessedEvent> ProcessedEvents { get; set; }
}
|
|
```
|
|
|
|
## Monitoring Stale Consumers
|
|
|
|
### Health Check
|
|
|
|
```csharp
|
|
/// <summary>
/// Reports <c>Degraded</c> when any consumer of the "orders" / "email-notifications"
/// group has not sent a heartbeat within <see cref="StaleThreshold"/>.
/// </summary>
public class ConsumerHealthCheck : IHealthCheck
{
    /// <summary>Heartbeat age beyond which a consumer is reported as stale.</summary>
    private static readonly TimeSpan StaleThreshold = TimeSpan.FromMinutes(1);

    private readonly IConsumerOffsetStore _offsetStore;

    /// <exception cref="ArgumentNullException"><paramref name="offsetStore"/> is <c>null</c>.</exception>
    public ConsumerHealthCheck(IConsumerOffsetStore offsetStore)
    {
        _offsetStore = offsetStore ?? throw new ArgumentNullException(nameof(offsetStore));
    }

    /// <summary>Checks heartbeat freshness for every registered consumer in the group.</summary>
    /// <returns>
    /// <c>Degraded</c> listing the stale consumer IDs, otherwise <c>Healthy</c> with the consumer count.
    /// </returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var consumers = await _offsetStore.GetConsumersAsync("orders", "email-notifications");

        // Take one timestamp and materialize the result so every consumer is judged
        // against the same instant and the query is not re-evaluated.
        var now = DateTimeOffset.UtcNow;
        var staleIds = consumers
            .Where(c => now - c.LastHeartbeat > StaleThreshold)
            .Select(c => c.ConsumerId)
            .ToList();

        if (staleIds.Count > 0)
        {
            return HealthCheckResult.Degraded(
                $"Stale consumers: {string.Join(", ", staleIds)}");
        }

        return HealthCheckResult.Healthy($"{consumers.Count} active consumers");
    }
}
|
|
```
|
|
|
|
### Monitoring Service
|
|
|
|
```csharp
|
|
/// <summary>
/// Periodically samples heartbeat age for each consumer in the "orders" /
/// "email-notifications" group, warning on late heartbeats and recording a gauge.
/// </summary>
/// <remarks>
/// The <c>_offsetStore</c>, <c>_logger</c> and <c>_metrics</c> fields are not shown in
/// this snippet; they are expected to be supplied by the enclosing application.
/// </remarks>
public class ConsumerMonitor : BackgroundService
{
    /// <summary>Heartbeat age that triggers a warning.</summary>
    private static readonly TimeSpan WarnAfter = TimeSpan.FromSeconds(20);

    /// <summary>Delay between two sampling passes.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(10);

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var consumers = await _offsetStore.GetConsumersAsync("orders", "email-notifications");

                foreach (var consumer in consumers)
                {
                    var timeSinceHeartbeat = DateTimeOffset.UtcNow - consumer.LastHeartbeat;

                    if (timeSinceHeartbeat > WarnAfter)
                    {
                        _logger.LogWarning(
                            "Consumer {ConsumerId} has not sent heartbeat for {Seconds}s",
                            consumer.ConsumerId,
                            timeSinceHeartbeat.TotalSeconds);
                    }

                    _metrics.RecordGauge(
                        "consumer.heartbeat.age",
                        timeSinceHeartbeat.TotalSeconds,
                        new[] { new KeyValuePair<string, object>("consumer_id", consumer.ConsumerId) });
                }
            }
            // A failed sampling pass must not stop monitoring for good; log it and
            // try again on the next tick. Cancellation is left to propagate.
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                _logger.LogError(ex, "Consumer heartbeat monitoring pass failed");
            }

            await Task.Delay(PollInterval, stoppingToken);
        }
    }
}
|
|
```
|
|
|
|
## Split-Brain Prevention
|
|
|
|
### Consumer ID Uniqueness
|
|
|
|
Always use unique consumer IDs:
|
|
|
|
```csharp
|
|
// ✅ Good - Unique per instance
var consumerId = $"{Environment.MachineName}-{Process.GetCurrentProcess().Id}-{Guid.NewGuid():N}";

// ❌ Bad - Same ID across instances
// (commented out so the snippet compiles: `consumerId` is already declared above)
// var consumerId = "worker"; // Multiple instances will conflict
|
|
```
|
|
|
|
### Registration Check
|
|
|
|
```csharp
|
|
/// <summary>
/// Tells whether <paramref name="consumerId"/> is registered in the group and its
/// last heartbeat is more recent than its session timeout.
/// </summary>
/// <returns>
/// <c>false</c> when no matching registration exists or the heartbeat is too old; otherwise <c>true</c>.
/// </returns>
public async Task<bool> IsConsumerActiveAsync(string streamName, string groupId, string consumerId)
{
    var registrations = await _offsetStore.GetConsumersAsync(streamName, groupId);
    var match = registrations.FirstOrDefault(c => c.ConsumerId == consumerId);

    // Unknown consumer: not registered, so not active.
    if (match is null)
    {
        return false;
    }

    var heartbeatAge = DateTimeOffset.UtcNow - match.LastHeartbeat;
    return heartbeatAge < match.SessionTimeout;
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Use unique consumer IDs
|
|
- Implement idempotent event handlers
|
|
- Monitor stale consumers
|
|
- Set appropriate session timeouts (30-60s)
|
|
- Log heartbeat failures
|
|
- Test failure scenarios
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't reuse consumer IDs
|
|
- Don't ignore stale consumer warnings
|
|
- Don't set very short timeouts (< 10s)
|
|
- Don't skip idempotency checks
|
|
- Don't process events without error handling
|
|
|
|
## See Also
|
|
|
|
- [Consumer Groups Overview](README.md)
|
|
- [Offset Management](offset-management.md)
|
|
- [Health Checks](../../observability/health-checks/consumer-health.md)
|
|
- [Best Practices](../../best-practices/README.md)
|