# Load Balancing

Scaling event processing horizontally with multiple consumers.

## Overview

Consumer groups automatically load-balance events across multiple consumers in the same group. This enables horizontal scaling by simply adding more worker instances.

## How Load Balancing Works

```
Stream: orders (1000 events/sec)

Consumer Group: order-processing
├─ worker-1: Processes events 1, 4, 7, 10, 13... (333 events/sec)
├─ worker-2: Processes events 2, 5, 8, 11, 14... (333 events/sec)
└─ worker-3: Processes events 3, 6, 9, 12, 15... (334 events/sec)

Total throughput: 1000 events/sec
```

## Scaling Patterns

### Vertical Scaling (Single Consumer)

```csharp
// One consumer processing all events
var options = new ConsumerGroupOptions
{
    BatchSize = 1000, // Larger batches
    CommitStrategy = OffsetCommitStrategy.AfterBatch
};

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "single-worker"))
{
    await ProcessEventAsync(@event);
}
```

**Limits:**
- CPU-bound (single core)
- Memory-bound (single process)
- I/O-bound (single connection)

### Horizontal Scaling (Multiple Consumers)

```csharp
// Three consumers in the same group
// Each processes 1/3 of the events

// Worker 1
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-1"))
{
    await ProcessEventAsync(@event);
}

// Worker 2
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-2"))
{
    await ProcessEventAsync(@event);
}

// Worker 3
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-3"))
{
    await ProcessEventAsync(@event);
}
```

**Benefits:**
- Linear scalability
- Fault tolerance
- Faster processing

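In production the three workers run as separate processes or containers (see the next section), but the distribution is easy to observe in a single process by running one consume loop per worker ID concurrently. A minimal sketch, assuming the same `ConsumeAsync` API shown above:

```csharp
// Run three consumers of the same group concurrently in one process.
// Each loop receives a disjoint share of the stream's events.
var workerIds = new[] { "worker-1", "worker-2", "worker-3" };

var workers = workerIds.Select(async workerId =>
{
    await foreach (var @event in _consumerGroup.ConsumeAsync(
        "orders",
        "order-processing",
        workerId))
    {
        await ProcessEventAsync(@event);
    }
});

await Task.WhenAll(workers);
```

Note that in-process concurrency still shares one process's CPU, memory, and connections, so it does not remove the vertical-scaling limits listed above.
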
## Running Multiple Instances

### Docker

```yaml
version: '3.8'
services:
  worker:
    image: myapp:latest
    deploy:
      replicas: 5  # 5 instances
    environment:
      ConnectionStrings__EventStore: "Host=postgres;..."
      WorkerId: "{{.Task.Slot}}"
```

The `{{.Task.Slot}}` placeholder is a Docker Swarm service template, so each replica receives a distinct `WorkerId`; deploy this file with `docker stack deploy` rather than plain `docker compose up`.

### Kubernetes

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
spec:
  replicas: 10  # 10 instances
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
        - name: worker
          image: myapp:latest
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
```
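
On the application side, the injected `POD_NAME` can serve as the unique consumer ID. A minimal sketch, assuming the same `ConsumeAsync(stream, group, consumerId)` signature used throughout this page:

```csharp
// Use the pod name injected by the Deployment as the consumer ID, falling
// back to the machine name when running outside Kubernetes.
var consumerId = Environment.GetEnvironmentVariable("POD_NAME")
                 ?? Environment.MachineName;

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    consumerId))
{
    await ProcessEventAsync(@event);
}
```
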
### Multiple Processes (Local)

```bash
# Terminal 1
dotnet run --WorkerId=1

# Terminal 2
dotnet run --WorkerId=2

# Terminal 3
dotnet run --WorkerId=3

# All 3 processes share the workload
```

## Performance Scaling

### Throughput vs Consumer Count

| Consumers | Events/sec | Per Consumer |
|-----------|------------|--------------|
| 1         | 1,000      | 1,000        |
| 2         | 2,000      | 1,000        |
| 4         | 4,000      | 1,000        |
| 8         | 7,500      | 938          |
| 16        | 13,000     | 813          |

**Note:** Scaling is near-linear at first; at higher consumer counts, coordination overhead causes diminishing returns per consumer.

### Optimal Consumer Count

```
Optimal Consumers = Stream Throughput / Target Per-Consumer Throughput

Example:
  Stream: 10,000 events/sec
  Target per consumer: 1,000 events/sec
  Optimal: 10 consumers
```
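
The same calculation as a code helper. `OptimalConsumerCount` is a hypothetical convenience function, not part of the library; it rounds up so no consumer is pushed past the target rate:

```csharp
// Hypothetical helper: the smallest consumer count that keeps each consumer
// at or below the target per-consumer throughput.
static int OptimalConsumerCount(double streamEventsPerSec, double targetPerConsumerPerSec) =>
    (int)Math.Ceiling(streamEventsPerSec / targetPerConsumerPerSec);

// 10,000 events/sec with a 1,000 events/sec target → 10 consumers
var optimal = OptimalConsumerCount(10_000, 1_000);
```
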
## Dynamic Scaling

### Auto-Scaling Based on Lag

```csharp
public class AutoScaler : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            // GetConsumerLagAsync, ScaleUpAsync, and ScaleDownAsync are
            // application-specific helpers; see the lag sketch below.
            var lag = await GetConsumerLagAsync();

            if (lag > 10000)
            {
                await ScaleUpAsync();   // Add more consumers
            }
            else if (lag < 1000)
            {
                await ScaleDownAsync(); // Remove consumers
            }

            await Task.Delay(TimeSpan.FromMinutes(1), ct);
        }
    }
}
```
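
`GetConsumerLagAsync` above is application-specific. A minimal sketch of one way to compute it, assuming hypothetical `GetLatestOffsetAsync` and `GetCommittedOffsetAsync` methods; substitute whatever your event store and offset store actually expose:

```csharp
// Sketch only: lag = events written to the stream but not yet processed
// by the group. The two store calls below are assumed, not library API.
private async Task<long> GetConsumerLagAsync()
{
    // Highest offset written to the stream so far
    var latest = await _eventStore.GetLatestOffsetAsync("orders");

    // Offset the group has committed up to
    var committed = await _offsetStore.GetCommittedOffsetAsync(
        "orders", "order-processing");

    return latest - committed;
}
```
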
### Kubernetes HPA (Horizontal Pod Autoscaler)

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-processor-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-processor
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: consumer_lag
        target:
          type: AverageValue
          averageValue: "1000"
```

`consumer_lag` is a custom metric: the cluster needs a metrics pipeline (for example, Prometheus plus a custom-metrics adapter) to expose it to the HPA.

## Coordination

### Event Distribution

Consumer groups use offset-based distribution:

```
Event offset % consumer_count = consumer_index

Example (3 consumers):
  Event 0: 0 % 3 = 0 → Consumer 0
  Event 1: 1 % 3 = 1 → Consumer 1
  Event 2: 2 % 3 = 2 → Consumer 2
  Event 3: 3 % 3 = 0 → Consumer 0
  Event 4: 4 % 3 = 1 → Consumer 1
```
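
The same rule in code. `ConsumerIndexFor` is purely illustrative (the group performs this routing internally); it just shows that the mapping is deterministic:

```csharp
// Illustrative only: reproduce the offset-based routing rule.
static int ConsumerIndexFor(long offset, int consumerCount) =>
    (int)(offset % consumerCount);

// With 3 consumers, offsets 0..4 map to consumers 0, 1, 2, 0, 1
for (long offset = 0; offset < 5; offset++)
{
    Console.WriteLine($"Event {offset} → Consumer {ConsumerIndexFor(offset, 3)}");
}
```
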
**Guaranteed:**
- Each event is processed by exactly one consumer
- No duplicate processing within a group
- Deterministic distribution

### Rebalancing

When a consumer joins or leaves, assignments are redistributed:

```
Before:
  Worker-1: Events 1, 3, 5, 7...
  Worker-2: Events 2, 4, 6, 8...

Worker-3 joins:

After:
  Worker-1: Events 1, 4, 7, 10...
  Worker-2: Events 2, 5, 8, 11...
  Worker-3: Events 3, 6, 9, 12...
```

Rebalancing happens automatically and transparently.

## Monitoring

### Consumer Distribution

```csharp
public async Task MonitorLoadBalanceAsync()
{
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    foreach (var consumer in consumers)
    {
        var eventsProcessed = consumer.Offset;
        _logger.LogInformation(
            "Consumer {ConsumerId}: {Events} events processed",
            consumer.ConsumerId,
            eventsProcessed);
    }

    // Check for imbalance
    var avgEvents = consumers.Average(c => c.Offset);
    var maxDeviation = consumers.Max(c => Math.Abs(c.Offset - avgEvents));

    if (maxDeviation > avgEvents * 0.2) // > 20% deviation
    {
        _logger.LogWarning("Load imbalance detected");
    }
}
```

### Throughput per Consumer

```csharp
public async Task MeasureThroughputAsync()
{
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    // Snapshot all offsets first so every consumer is measured over the same window
    var initialOffsets = consumers.ToDictionary(c => c.ConsumerId, c => c.Offset);

    await Task.Delay(TimeSpan.FromSeconds(60));

    foreach (var consumer in consumers)
    {
        var updatedConsumer = await _offsetStore.GetConsumerAsync(
            "orders",
            "order-processing",
            consumer.ConsumerId);

        var eventsProcessed = updatedConsumer.Offset - initialOffsets[consumer.ConsumerId];
        var throughput = eventsProcessed / 60.0; // Events per second

        _logger.LogInformation(
            "Consumer {ConsumerId}: {Throughput:F2} events/sec",
            consumer.ConsumerId,
            throughput);
    }
}
```

## Best Practices

### ✅ DO

- Start with 2-4 consumers
- Scale based on lag monitoring
- Use unique consumer IDs
- Monitor throughput per consumer
- Set appropriate batch sizes
- Test rebalancing scenarios

### ❌ DON'T

- Don't over-provision (too many consumers)
- Don't reuse the same consumer ID across multiple instances
- Don't ignore load imbalance
- Don't scale without monitoring
- Don't forget about database connection limits

## See Also

- [Consumer Groups Overview](README.md)
- [Fault Tolerance](fault-tolerance.md)
- [Health Checks](../../observability/health-checks/consumer-health.md)
- [Performance Best Practices](../../best-practices/performance.md)