# Consumer Groups

Coordinated event consumption with automatic offset tracking and load balancing.

## Overview

Consumer groups let multiple consumers process the same event stream in a coordinated manner. The framework automatically load-balances events across group members and tracks processing offsets so that a consumer can resume after a failure.

**Key Features:**

- ✅ **Load Balancing** - Events distributed across consumers
- ✅ **Offset Tracking** - Automatic position management
- ✅ **Fault Tolerance** - Resume from last committed position
- ✅ **Exactly-Once per Group** - No duplicate processing within a group under normal operation (handlers should still be idempotent, see Best Practices)
- ✅ **Heartbeat Monitoring** - Detect and remove stale consumers
- ✅ **Flexible Commit Strategies** - Manual, AfterEach, AfterBatch, Periodic

## Quick Start

```csharp
using Svrnty.CQRS.Events.ConsumerGroups;

var builder = WebApplication.CreateBuilder(args);

// Register consumer groups
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

var app = builder.Build();
app.Run();
```

**appsettings.json:**

```json
{
  "EventStreaming": {
    "ConsumerGroups": {
      "HeartbeatInterval": "00:00:10",
      "SessionTimeout": "00:00:30",
      "CleanupInterval": "00:01:00"
    }
  }
}
```

## Core Concepts

### Consumer Group

A consumer group is a logical set of consumers that share the work of processing one stream. Each group receives every event in the stream, and within a group each event goes to one consumer:

```
Stream: orders
├─ Group: email-notifications
│  ├─ Consumer: worker-1 (processes events 1, 4, 7...)
│  ├─ Consumer: worker-2 (processes events 2, 5, 8...)
│  └─ Consumer: worker-3 (processes events 3, 6, 9...)
└─ Group: analytics
   └─ Consumer: analytics-1 (processes all events)
```

### Offset Management

Consumer groups track the last processed offset per consumer:

```sql
stream_name | group_id    | consumer_id | offset | updated_at
------------|-------------|-------------|--------|--------------------
orders      | email-notif | worker-1    | 1542   | 2025-12-10 10:30:00
orders      | email-notif | worker-2    | 1540   | 2025-12-10 10:30:01
orders      | analytics   | worker-1    | 1520   | 2025-12-10 10:29:50
```

### Heartbeats

Consumers send periodic heartbeats to prove they're alive. With the settings shown in Quick Start, a consumer sends a heartbeat every 10 seconds and is removed from the group if none arrives for 30 seconds:

```
Consumer registers → Sends heartbeats every 10s → Removed if no heartbeat for 30s
```

## Features

### [Getting Started](getting-started.md)

Create your first consumer group with automatic offset management.

### [Offset Management](offset-management.md)

Learn how offsets are tracked and committed for fault tolerance.

### [Commit Strategies](commit-strategies.md)

Choose between Manual, AfterEach, AfterBatch, and Periodic commit strategies.

### [Fault Tolerance](fault-tolerance.md)

Understand heartbeat monitoring, stale consumer cleanup, and recovery.

### [Load Balancing](load-balancing.md)

Scale horizontally by adding consumers to a group.

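
To make the split in the Consumer Group diagram concrete, here is a minimal sketch of a round-robin assignment that reproduces it (events 1, 4, 7 to the first worker, and so on). `RoundRobinSketch`, `AssignConsumer`, and `Distribute` are hypothetical names used only for illustration; they are not part of the library, and the framework's actual assignment strategy may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, NOT part of Svrnty.CQRS: it only illustrates the
// round-robin split drawn in the Consumer Group diagram.
public static class RoundRobinSketch
{
    // Maps a 1-based event number to one of the active consumers.
    public static string AssignConsumer(long eventNumber, IReadOnlyList<string> consumers)
    {
        if (consumers.Count == 0)
            throw new InvalidOperationException("No active consumers in the group.");
        if (eventNumber < 1)
            throw new ArgumentOutOfRangeException(nameof(eventNumber));

        // Event 1 goes to the first consumer, event 2 to the second, and so on,
        // wrapping around once every consumer has received one event.
        return consumers[(int)((eventNumber - 1) % consumers.Count)];
    }

    // Groups event numbers 1..eventCount by the consumer they are assigned to.
    // Assumes consumer IDs are unique, as the Best Practices section requires.
    public static Dictionary<string, List<long>> Distribute(
        int eventCount, IReadOnlyList<string> consumers)
    {
        var result = consumers.ToDictionary(c => c, _ => new List<long>());
        for (long n = 1; n <= eventCount; n++)
            result[AssignConsumer(n, consumers)].Add(n);
        return result;
    }
}
```

Calling `RoundRobinSketch.Distribute(9, new[] { "worker-1", "worker-2", "worker-3" })` gives each worker three events, matching the diagram. Adding a fourth ID to the list lowers each worker's share, which is the effect of scaling a group horizontally.
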
## Usage Pattern

```csharp
using Microsoft.Extensions.Hosting;
// Namespace taken from the Quick Start; adjust if IConsumerGroupReader lives elsewhere.
using Svrnty.CQRS.Events.ConsumerGroups;

public class OrderProcessingWorker : BackgroundService
{
    private readonly IConsumerGroupReader _consumerGroup;
    private readonly string _consumerId;

    public OrderProcessingWorker(IConsumerGroupReader consumerGroup)
    {
        _consumerGroup = consumerGroup;
        // Unique per instance, so two workers never share a consumer ID
        _consumerId = $"worker-{Environment.MachineName}-{Guid.NewGuid():N}";
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var @event in _consumerGroup.ConsumeAsync(
            streamName: "orders",
            groupId: "order-processing",
            consumerId: _consumerId,
            options: new ConsumerGroupOptions
            {
                BatchSize = 100,
                CommitStrategy = OffsetCommitStrategy.AfterBatch,
                HeartbeatInterval = TimeSpan.FromSeconds(10),
                SessionTimeout = TimeSpan.FromSeconds(30)
            },
            cancellationToken: stoppingToken))
        {
            await ProcessOrderEventAsync(@event);
            // Offset committed automatically after batch
        }
    }

    // Placeholder handler: replace with your domain logic
    // (and replace object with the framework's event type).
    private Task ProcessOrderEventAsync(object @event) => Task.CompletedTask;
}
```

## Comparison: Consumer Groups vs Broadcast

| Feature | Consumer Group | Broadcast |
|---------|---------------|-----------|
| **Load Balancing** | ✅ Events split across consumers | ❌ All consumers get all events |
| **Scalability** | ✅ Add more consumers | ❌ Fixed processing capacity |
| **Use Case** | Background jobs, order processing | Independent projections, analytics |
| **Offset Tracking** | ✅ Automatic | Manual checkpoint required |
| **Exactly-Once** | ✅ Per group | ❌ Must implement idempotency |

## Configuration

### Options

```csharp
public class ConsumerGroupOptions
{
    public int BatchSize { get; set; } = 100;
    public OffsetCommitStrategy CommitStrategy { get; set; } = OffsetCommitStrategy.AfterBatch;
    public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10);
    public TimeSpan SessionTimeout { get; set; } = TimeSpan.FromSeconds(30);
}
```

### Commit Strategies

```csharp
public enum OffsetCommitStrategy
{
    Manual,      // Explicit commit calls
    AfterEach,   // Commit after each event
    AfterBatch,  // Commit after batch (default)
    Periodic     // Commit every N seconds
}
```

## Best Practices

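
Offsets are committed after processing, so a consumer that stops between handling an event and committing its offset will probably see that event again when it resumes from the last committed position. The sketch below shows one way to make a handler tolerate such redelivery. It assumes every event carries a stable unique ID, which this page does not confirm. `IdempotentHandler` is a hypothetical wrapper, not a library type, and its in-memory set stands in for the durable store a real deployment would need.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical wrapper, NOT part of Svrnty.CQRS. It skips events whose ID has
// already been handled, so a redelivered event has no second effect.
public sealed class IdempotentHandler
{
    // In-memory record of handled IDs. Assumption: production code would keep
    // this in durable storage, ideally updated together with the side effect.
    private readonly ConcurrentDictionary<string, bool> _processed = new();
    private readonly Func<string, Task> _inner;

    public IdempotentHandler(Func<string, Task> inner) => _inner = inner;

    // Returns true if the inner handler ran, false if the event was a duplicate.
    public async Task<bool> HandleAsync(string eventId)
    {
        // TryAdd is atomic: only the first caller for a given ID proceeds.
        if (!_processed.TryAdd(eventId, true))
            return false;

        try
        {
            await _inner(eventId);
            return true;
        }
        catch
        {
            // Forget the ID on failure so the event can be retried later.
            _processed.TryRemove(eventId, out _);
            throw;
        }
    }
}
```

Wrapping the call to `ProcessOrderEventAsync` in a handler like this means a replayed batch only repeats work for events that did not complete the first time.
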
### ✅ DO

- Use consumer groups for scalable processing
- Set appropriate batch sizes (100-1000)
- Monitor consumer lag
- Use AfterBatch for best performance
- Implement idempotent handlers
- Use unique consumer IDs
- Monitor stale consumers

### ❌ DON'T

- Don't reuse the same consumer ID across multiple instances
- Don't skip error handling
- Don't set very large batch sizes (> 10000)
- Don't ignore consumer lag warnings
- Don't forget to commit offsets when using the Manual strategy
- Don't assume exactly-once delivery

## See Also

- [Event Streaming Overview](../README.md)
- [Subscriptions](../fundamentals/subscriptions.md)
- [PostgreSQL Storage](../storage/postgresql-storage.md)
- [Health Checks](../observability/health-checks/consumer-health.md)