Load Balancing
Scaling event processing horizontally with multiple consumers.
Overview
Consumer groups automatically load-balance events across multiple consumers in the same group. This enables horizontal scaling by simply adding more worker instances.
How Load Balancing Works
Stream: orders (1000 events/sec)
Consumer Group: order-processing
├─ worker-1: Processes events 1, 4, 7, 10, 13... (333 events/sec)
├─ worker-2: Processes events 2, 5, 8, 11, 14... (333 events/sec)
└─ worker-3: Processes events 3, 6, 9, 12, 15... (334 events/sec)
Total throughput: 1000 events/sec
Scaling Patterns
Vertical Scaling (Single Consumer)
// One consumer processing all events
var options = new ConsumerGroupOptions
{
    BatchSize = 1000,                                // Larger batches
    CommitStrategy = OffsetCommitStrategy.AfterBatch
};

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "single-worker"))
{
    await ProcessEventAsync(@event);
}
Limits:
- CPU: bound by a single process (often a single core)
- Memory: bound by a single process
- I/O: bound by a single connection
Horizontal Scaling (Multiple Consumers)
// Three consumers in same group
// Each processes 1/3 of events

// Worker 1
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-1"))
{
    await ProcessEventAsync(@event);
}

// Worker 2
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-2"))
{
    await ProcessEventAsync(@event);
}

// Worker 3
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-3"))
{
    await ProcessEventAsync(@event);
}
Benefits:
- Near-linear scalability (see the throughput table below)
- Fault tolerance: remaining consumers absorb the work if one fails
- Higher aggregate throughput
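The three loops above normally run as separate processes (see the next section), but nothing requires that: the same API can host several group members inside one process by running each loop on its own task. A minimal sketch, assuming the ConsumeAsync and ProcessEventAsync members shown above:

// Hosting several consumers of the same group in one process:
// each loop runs on its own task.
async Task RunConsumerAsync(string consumerId)
{
    await foreach (var @event in _consumerGroup.ConsumeAsync(
        "orders",
        "order-processing",
        consumerId))
    {
        await ProcessEventAsync(@event);
    }
}

// The group sees three members, exactly as if they were separate processes.
await Task.WhenAll(
    RunConsumerAsync("worker-1"),
    RunConsumerAsync("worker-2"),
    RunConsumerAsync("worker-3"));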
Running Multiple Instances
Docker
version: '3.8'
services:
  worker:
    image: myapp:latest
    deploy:
      replicas: 5  # 5 instances
    environment:
      ConnectionStrings__EventStore: "Host=postgres;..."
      WorkerId: "{{.Task.Slot}}"  # task-slot templating requires Swarm mode (docker stack deploy)
Kubernetes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
spec:
  replicas: 10  # 10 instances
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
        - name: worker
          image: myapp:latest
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
Multiple Processes (Local)
# Terminal 1
dotnet run -- --WorkerId=1

# Terminal 2
dotnet run -- --WorkerId=2

# Terminal 3
dotnet run -- --WorkerId=3

# All 3 processes share the workload
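Whichever option you deploy with, every instance must join the group with a unique consumer ID. A minimal sketch of resolving it from the WorkerId setting or the POD_NAME variable used above; the helper name and the hard failure when neither is set are illustrative choices, not part of the library:

using Microsoft.Extensions.Configuration;

public static string ResolveConsumerId(IConfiguration configuration)
{
    // Prefer an explicit WorkerId (command line / compose), then the Kubernetes pod name.
    return configuration["WorkerId"]
        ?? Environment.GetEnvironmentVariable("POD_NAME")
        ?? throw new InvalidOperationException(
            "Set WorkerId or POD_NAME so each instance gets a unique consumer ID.");
}

// Usage:
// var consumerId = ResolveConsumerId(builder.Configuration);
// await foreach (var @event in _consumerGroup.ConsumeAsync("orders", "order-processing", consumerId)) { ... }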
Performance Scaling
Throughput vs Consumer Count
| Consumers | Events/sec | Per Consumer |
|---|---|---|
| 1 | 1,000 | 1,000 |
| 2 | 2,000 | 1,000 |
| 4 | 4,000 | 1,000 |
| 8 | 7,500 | 938 |
| 16 | 13,000 | 813 |
Note: Diminishing returns due to coordination overhead.
Optimal Consumer Count
Optimal Consumers = Stream Throughput / Target Per-Consumer Throughput
Example:
Stream: 10,000 events/sec
Target per consumer: 1,000 events/sec
Optimal: 10 consumers
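The same arithmetic as a small sketch (the helper name is illustrative); rounding up keeps the group ahead of the stream:

static int OptimalConsumerCount(double streamEventsPerSecond, double targetPerConsumer)
    => Math.Max(1, (int)Math.Ceiling(streamEventsPerSecond / targetPerConsumer));

// OptimalConsumerCount(10_000, 1_000) == 10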
Dynamic Scaling
Auto-Scaling Based on Lag
public class AutoScaler : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var lag = await GetConsumerLagAsync();

            if (lag > 10000)
            {
                await ScaleUpAsync();   // Add more consumers
            }
            else if (lag < 1000)
            {
                await ScaleDownAsync(); // Remove consumers
            }

            await Task.Delay(TimeSpan.FromMinutes(1), ct);
        }
    }
}
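GetConsumerLagAsync is left abstract above. One way to compute lag is the distance between the newest stream offset and the lowest offset the group has committed. A sketch that reuses GetConsumersAsync from the monitoring examples below; GetLatestOffsetAsync is a placeholder for however your store exposes the stream head:

private async Task<long> GetConsumerLagAsync()
{
    // Newest offset in the stream (placeholder for your store's "latest offset" query).
    var headOffset = await _eventStore.GetLatestOffsetAsync("orders");

    // Committed offsets for every member of the group.
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    // The group is only as far along as its slowest member.
    var slowest = consumers.Any() ? consumers.Min(c => c.Offset) : 0;

    return Math.Max(0, headOffset - slowest);
}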
Kubernetes HPA (Horizontal Pod Autoscaler)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-processor-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-processor
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: consumer_lag
        target:
          type: AverageValue
          averageValue: "1000"
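The HPA above scales on a consumer_lag pod metric, so the workers must export it and a metrics adapter (for example Prometheus Adapter or KEDA) must surface it to Kubernetes. A minimal sketch using the prometheus-net package, reusing the GetConsumerLagAsync sketch via an injected delegate:

using Prometheus;

public class LagMetricExporter : BackgroundService
{
    // Scraped from /metrics and fed to the HPA through a metrics adapter.
    private static readonly Gauge ConsumerLag =
        Metrics.CreateGauge("consumer_lag", "Events not yet processed by the order-processing group");

    private readonly Func<Task<long>> _getLag; // e.g. the GetConsumerLagAsync sketch above

    public LagMetricExporter(Func<Task<long>> getLag) => _getLag = getLag;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            ConsumerLag.Set(await _getLag());
            await Task.Delay(TimeSpan.FromSeconds(15), ct);
        }
    }
}

// Expose the scrape endpoint, e.g. app.MapMetrics() from prometheus-net.AspNetCore.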
Coordination
Event Distribution
Consumer groups use offset-based distribution:
Event offset % consumer_count = consumer_index
Example (3 consumers):
Event 0: 0 % 3 = 0 → Consumer 0
Event 1: 1 % 3 = 1 → Consumer 1
Event 2: 2 % 3 = 2 → Consumer 2
Event 3: 3 % 3 = 0 → Consumer 0
Event 4: 4 % 3 = 1 → Consumer 1
Guaranteed:
- Each event processed by exactly one consumer
- No duplicate processing within group
- Deterministic distribution
Rebalancing
When a consumer joins or leaves:
Before:
Worker-1: Events 1, 3, 5, 7...
Worker-2: Events 2, 4, 6, 8...
Worker-3 joins:
After:
Worker-1: Events 1, 4, 7, 10...
Worker-2: Events 2, 5, 8, 11...
Worker-3: Events 3, 6, 9, 12...
Rebalancing happens automatically and transparently.
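Because assignment is just offset % consumer_count, the before/after split can be reproduced in a few lines (illustrative only; the library performs this assignment internally):

// Which group member handles a given offset, for a given group size.
static int AssignedConsumer(long offset, int consumerCount) => (int)(offset % consumerCount);

for (long offset = 0; offset < 6; offset++)
{
    Console.WriteLine(
        $"Event {offset}: 2 consumers -> worker-{AssignedConsumer(offset, 2) + 1}, " +
        $"3 consumers -> worker-{AssignedConsumer(offset, 3) + 1}");
}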
Monitoring
Consumer Distribution
public async Task MonitorLoadBalanceAsync()
{
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    foreach (var consumer in consumers)
    {
        var eventsProcessed = consumer.Offset;

        _logger.LogInformation(
            "Consumer {ConsumerId}: {Events} events processed",
            consumer.ConsumerId,
            eventsProcessed);
    }

    // Check for imbalance
    var avgEvents = consumers.Average(c => c.Offset);
    var maxDeviation = consumers.Max(c => Math.Abs(c.Offset - avgEvents));

    if (maxDeviation > avgEvents * 0.2) // > 20% deviation
    {
        _logger.LogWarning("Load imbalance detected");
    }
}
Throughput per Consumer
public async Task MeasureThroughputAsync()
{
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    foreach (var consumer in consumers)
    {
        var initialOffset = consumer.Offset;

        await Task.Delay(TimeSpan.FromSeconds(60));

        var updatedConsumer = await _offsetStore.GetConsumerAsync(
            "orders",
            "order-processing",
            consumer.ConsumerId);

        var eventsProcessed = updatedConsumer.Offset - initialOffset;
        var throughput = eventsProcessed / 60.0; // Events per second

        _logger.LogInformation(
            "Consumer {ConsumerId}: {Throughput:F2} events/sec",
            consumer.ConsumerId,
            throughput);
    }
}
Best Practices
✅ DO
- Start with 2-4 consumers
- Scale based on lag monitoring
- Use unique consumer IDs
- Monitor throughput per consumer
- Set appropriate batch sizes
- Test rebalancing scenarios
❌ DON'T
- Don't over-provision (too many consumers)
- Don't use same consumer ID for multiple instances
- Don't ignore load imbalance
- Don't scale without monitoring
- Don't forget about database connection limits