# Load Balancing

Scaling event processing horizontally with multiple consumers.

## Overview

Consumer groups automatically load-balance events across multiple consumers in the same group. This enables horizontal scaling by simply adding more worker instances.

## How Load Balancing Works

```
Stream: orders (1000 events/sec)

Consumer Group: order-processing
├─ worker-1: Processes events 1, 4, 7, 10, 13... (333 events/sec)
├─ worker-2: Processes events 2, 5, 8, 11, 14... (333 events/sec)
└─ worker-3: Processes events 3, 6, 9, 12, 15... (334 events/sec)

Total throughput: 1000 events/sec
```

## Scaling Patterns

### Vertical Scaling (Single Consumer)

```csharp
// One consumer processing all events
var options = new ConsumerGroupOptions
{
    BatchSize = 1000,  // Larger batches
    CommitStrategy = OffsetCommitStrategy.AfterBatch
};

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders", "order-processing", "single-worker"))
{
    await ProcessEventAsync(@event);
}
```

**Limits:**

- CPU-bound (single core)
- Memory-bound (single process)
- I/O-bound (single connection)

### Horizontal Scaling (Multiple Consumers)

```csharp
// Three consumers in the same group; each processes 1/3 of the events.

// Worker 1
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders", "order-processing", "worker-1"))
{
    await ProcessEventAsync(@event);
}

// Worker 2
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders", "order-processing", "worker-2"))
{
    await ProcessEventAsync(@event);
}

// Worker 3
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders", "order-processing", "worker-3"))
{
    await ProcessEventAsync(@event);
}
```

**Benefits:**

- Linear scalability
- Fault tolerance
- Faster processing

## Running Multiple Instances

### Docker

```yaml
version: '3.8'
services:
  worker:
    image: myapp:latest
    deploy:
      replicas: 5  # 5 instances
    environment:
      ConnectionStrings__EventStore: "Host=postgres;..."
      WorkerId: "{{.Task.Slot}}"
```
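However the instances are launched, the near-even split shown in the overview diagram falls out of a deterministic assignment rule rather than chance. A quick model (Python here purely for illustration; the `assign` helper is hypothetical, not part of the library) of distributing 1,000 events across 3 consumers by offset:

```python
from collections import Counter

def assign(offset: int, consumer_count: int) -> int:
    """Deterministic assignment: each offset maps to exactly one consumer."""
    return offset % consumer_count

# Count how many of 1,000 events each of 3 consumers receives.
load = Counter(assign(offset, 3) for offset in range(1000))
print(sorted(load.values()))  # → [333, 333, 334]
```

Because the rule is a pure function of the offset, no coordination between workers is needed at assignment time; each consumer can derive its share independently.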
### Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
spec:
  replicas: 10  # 10 instances
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
        - name: worker
          image: myapp:latest
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
```

### Multiple Processes (Local)

```bash
# Terminal 1
dotnet run --WorkerId=1 &

# Terminal 2
dotnet run --WorkerId=2 &

# Terminal 3
dotnet run --WorkerId=3 &

# All 3 processes share the workload
```

## Performance Scaling

### Throughput vs Consumer Count

| Consumers | Events/sec | Per Consumer |
|-----------|------------|--------------|
| 1         | 1,000      | 1,000        |
| 2         | 2,000      | 1,000        |
| 4         | 4,000      | 1,000        |
| 8         | 7,500      | 938          |
| 16        | 13,000     | 813          |

**Note:** Diminishing returns due to coordination overhead.

### Optimal Consumer Count

```
Optimal Consumers = Stream Throughput / Target Per-Consumer Throughput

Example:
  Stream: 10,000 events/sec
  Target per consumer: 1,000 events/sec
  Optimal: 10 consumers
```

## Dynamic Scaling

### Auto-Scaling Based on Lag

```csharp
public class AutoScaler : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var lag = await GetConsumerLagAsync();

            if (lag > 10000)
            {
                await ScaleUpAsync();    // Add more consumers
            }
            else if (lag < 1000)
            {
                await ScaleDownAsync();  // Remove consumers
            }

            await Task.Delay(TimeSpan.FromMinutes(1), ct);
        }
    }
}
```

### Kubernetes HPA (Horizontal Pod Autoscaler)

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-processor-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-processor
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: consumer_lag
        target:
          type: AverageValue
          averageValue: "1000"
```
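Whichever mechanism drives it, the scaling decision itself reduces to a small pure function. A sketch in Python (the function name is illustrative; the thresholds mirror the `AutoScaler` above and the bounds mirror the HPA config):

```python
def desired_replicas(current: int, lag: int,
                     scale_up_at: int = 10_000,
                     scale_down_at: int = 1_000,
                     min_replicas: int = 2,
                     max_replicas: int = 20) -> int:
    """One scaling step: grow on high lag, shrink on low lag, clamp to bounds."""
    if lag > scale_up_at:
        return min(current + 1, max_replicas)
    if lag < scale_down_at:
        return max(current - 1, min_replicas)
    return current

print(desired_replicas(4, lag=50_000))   # → 5
print(desired_replicas(4, lag=500))      # → 3
print(desired_replicas(20, lag=50_000))  # → 20 (capped at max_replicas)
```

Stepping by one replica per evaluation interval keeps scaling smooth and avoids thrashing when lag hovers near a threshold.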
## Coordination

### Event Distribution

Consumer groups use offset-based distribution:

```
Event offset % consumer_count = consumer_index

Example (3 consumers):
  Event 0: 0 % 3 = 0 → Consumer 0
  Event 1: 1 % 3 = 1 → Consumer 1
  Event 2: 2 % 3 = 2 → Consumer 2
  Event 3: 3 % 3 = 0 → Consumer 0
  Event 4: 4 % 3 = 1 → Consumer 1
```

**Guaranteed:**

- Each event processed by exactly one consumer
- No duplicate processing within a group
- Deterministic distribution

### Rebalancing

When a consumer joins or leaves:

```
Before:
  Worker-1: Events 1, 3, 5, 7...
  Worker-2: Events 2, 4, 6, 8...

Worker-3 joins:

After:
  Worker-1: Events 1, 4, 7, 10...
  Worker-2: Events 2, 5, 8, 11...
  Worker-3: Events 3, 6, 9, 12...
```

Rebalancing happens automatically and transparently.

## Monitoring

### Consumer Distribution

```csharp
public async Task MonitorLoadBalanceAsync()
{
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    foreach (var consumer in consumers)
    {
        var eventsProcessed = consumer.Offset;
        _logger.LogInformation(
            "Consumer {ConsumerId}: {Events} events processed",
            consumer.ConsumerId, eventsProcessed);
    }

    // Check for imbalance
    var avgEvents = consumers.Average(c => c.Offset);
    var maxDeviation = consumers.Max(c => Math.Abs(c.Offset - avgEvents));

    if (maxDeviation > avgEvents * 0.2)  // > 20% deviation
    {
        _logger.LogWarning("Load imbalance detected");
    }
}
```

### Throughput per Consumer

```csharp
public async Task MeasureThroughputAsync()
{
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    foreach (var consumer in consumers)
    {
        var initialOffset = consumer.Offset;
        await Task.Delay(TimeSpan.FromSeconds(60));

        var updatedConsumer = await _offsetStore.GetConsumerAsync(
            "orders", "order-processing", consumer.ConsumerId);

        var eventsProcessed = updatedConsumer.Offset - initialOffset;
        var throughput = eventsProcessed / 60.0;  // Events per second

        _logger.LogInformation(
            "Consumer {ConsumerId}: {Throughput:F2} events/sec",
            consumer.ConsumerId, throughput);
    }
}
```
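The imbalance check in `MonitorLoadBalanceAsync` boils down to a one-line calculation that is worth understanding on its own. A Python sketch of the same 20% rule (function name and inputs illustrative; `offsets` stands in for each consumer's committed offset):

```python
def is_imbalanced(offsets: list[int], tolerance: float = 0.2) -> bool:
    """True when any consumer's processed count deviates from the mean
    by more than `tolerance` (20% by default)."""
    mean = sum(offsets) / len(offsets)
    return any(abs(offset - mean) > mean * tolerance for offset in offsets)

print(is_imbalanced([1000, 1010, 990]))  # → False (max deviation 1%)
print(is_imbalanced([1500, 1500, 300]))  # → True  (one consumer far behind)
```

A consumer that persistently trips this check usually points at a slow host or an oversized batch rather than a distribution bug, since the assignment rule itself is deterministic.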
## Best Practices

### ✅ DO

- Start with 2-4 consumers
- Scale based on lag monitoring
- Use unique consumer IDs
- Monitor throughput per consumer
- Set appropriate batch sizes
- Test rebalancing scenarios

### ❌ DON'T

- Don't over-provision (too many consumers)
- Don't use the same consumer ID for multiple instances
- Don't ignore load imbalance
- Don't scale without monitoring
- Don't forget about database connection limits

## See Also

- [Consumer Groups Overview](README.md)
- [Fault Tolerance](fault-tolerance.md)
- [Health Checks](../../observability/health-checks/consumer-health.md)
- [Performance Best Practices](../../best-practices/performance.md)