Load Balancing

Scaling event processing horizontally with multiple consumers.

Overview

Consumer groups automatically load-balance events across multiple consumers in the same group. This enables horizontal scaling by simply adding more worker instances.

How Load Balancing Works

Stream: orders (1000 events/sec)

Consumer Group: order-processing
├─ worker-1: Processes events 1, 4, 7, 10, 13... (333 events/sec)
├─ worker-2: Processes events 2, 5, 8, 11, 14... (333 events/sec)
└─ worker-3: Processes events 3, 6, 9, 12, 15... (334 events/sec)

Total throughput: 1000 events/sec
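
Assignment is deterministic and offset-based: each event goes to exactly one consumer in the group (the exact rule is described under Coordination below).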

Scaling Patterns

Vertical Scaling (Single Consumer)

// One consumer processing all events
var options = new ConsumerGroupOptions
{
    BatchSize = 1000,  // Larger batches amortize offset commits
    CommitStrategy = OffsetCommitStrategy.AfterBatch
};

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "single-worker",
    options))  // assumes the overload that accepts options
{
    await ProcessEventAsync(@event);
}
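
Committing after each batch rather than after each event keeps offset-store writes low; the trade-off is that a crash mid-batch replays the uncommitted events on restart.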

Limits:

  • CPU-bound: one sequential processing loop
  • Memory-bound: a single process's working set
  • I/O-bound: a single event-store connection

Horizontal Scaling (Multiple Consumers)

// Three consumers in the same group; each processes roughly
// one third of the events. In practice each worker runs in
// its own process or container.

// Worker 1
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-1"))
{
    await ProcessEventAsync(@event);
}

// Worker 2
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-2"))
{
    await ProcessEventAsync(@event);
}

// Worker 3
await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "order-processing",
    "worker-3"))
{
    await ProcessEventAsync(@event);
}

Benefits:

  • Near-linear scalability (see Performance Scaling below)
  • Fault tolerance: surviving workers absorb a failed worker's share
  • Higher aggregate throughput

Running Multiple Instances

Docker

version: '3.8'
services:
  worker:
    image: myapp:latest
    deploy:
      replicas: 5  # 5 instances
    environment:
      ConnectionStrings__EventStore: "Host=postgres;..."
      WorkerId: "{{.Task.Slot}}"
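
Note: {{.Task.Slot}} is a Swarm service template and is only resolved when the stack is deployed with docker stack deploy; under plain docker compose, derive a unique WorkerId another way (for example, from the container hostname).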

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
spec:
  replicas: 10  # 10 instances
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
      - name: worker
        image: myapp:latest
        env:
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name

Multiple Processes (Local)

# Terminal 1
dotnet run -- --WorkerId=1

# Terminal 2
dotnet run -- --WorkerId=2

# Terminal 3
dotnet run -- --WorkerId=3

# All 3 processes share the workload
# (the -- separator passes --WorkerId to the app, not to the dotnet CLI)
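
However the instances are hosted, each one needs a unique consumer id at startup. A minimal sketch of resolving it with the generic host (the configuration keys mirror the examples above; the machine-name fallback is illustrative, not part of the library):

using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Prefer an explicit --WorkerId argument, then the Kubernetes
// downward-API variable, then fall back to the machine name.
var workerId = builder.Configuration["WorkerId"]
    ?? Environment.GetEnvironmentVariable("POD_NAME")
    ?? Environment.MachineName;

The resolved workerId is then passed as the consumer id to ConsumeAsync, exactly as in the examples above.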

Performance Scaling

Throughput vs Consumer Count

Consumers    Events/sec    Per Consumer
1            1,000         1,000
2            2,000         1,000
4            4,000         1,000
8            7,500         938
16           13,000        813

Note: Diminishing returns due to coordination overhead.

Optimal Consumer Count

Optimal Consumers = Stream Throughput / Target Per-Consumer Throughput

Example:
Stream: 10,000 events/sec
Target per consumer: 1,000 events/sec
Optimal: 10 consumers
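
The same arithmetic in code (a sketch using the example's numbers; measure your real per-consumer throughput first):

// Round up so a fractional result still covers the full stream rate
double streamEventsPerSec = 10_000;
double targetPerConsumerPerSec = 1_000;
int optimalConsumers = (int)Math.Ceiling(streamEventsPerSec / targetPerConsumerPerSec);  // 10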

Dynamic Scaling

Auto-Scaling Based on Lag

public class AutoScaler : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            // Total lag for the group: events behind the head of the stream
            var lag = await GetConsumerLagAsync();

            if (lag > 10000)
            {
                await ScaleUpAsync();  // Add more consumers
            }
            else if (lag < 1000)
            {
                await ScaleDownAsync();  // Remove consumers
            }

            await Task.Delay(TimeSpan.FromMinutes(1), ct);
        }
    }
}
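
The 10x gap between the scale-up threshold (10,000) and the scale-down threshold (1,000) acts as hysteresis, so a group whose lag hovers near one boundary does not flap between sizes. GetConsumerLagAsync, ScaleUpAsync, and ScaleDownAsync stand in for your metrics source and your orchestrator's scaling API.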

Kubernetes HPA (Horizontal Pod Autoscaler)

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-processor-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-processor
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Pods
    pods:
      metric:
        name: consumer_lag
      target:
        type: AverageValue
        averageValue: "1000"
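
Note: consumer_lag is not a metric Kubernetes knows about by default; the cluster needs a custom-metrics adapter (for example, prometheus-adapter) publishing it through the custom metrics API before this HPA can act on it.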

Coordination

Event Distribution

Consumer groups use offset-based distribution:

Event offset % consumer_count = consumer_index

Example (3 consumers):
Event 0: 0 % 3 = 0 → Consumer 0
Event 1: 1 % 3 = 1 → Consumer 1
Event 2: 2 % 3 = 2 → Consumer 2
Event 3: 3 % 3 = 0 → Consumer 0
Event 4: 4 % 3 = 1 → Consumer 1

Guaranteed:

  • Each event processed by exactly one consumer
  • No duplicate processing within group
  • Deterministic distribution
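
A minimal sketch of this routing rule (the helper name is illustrative; the real assignment happens inside the consumer-group coordinator):

// Deterministic, offset-based assignment: each event maps to
// exactly one consumer index, with no per-event coordination.
static int AssignConsumer(long offset, int consumerCount)
    => (int)(offset % consumerCount);

// Reproduces the example above with 3 consumers
for (long offset = 0; offset < 5; offset++)
    Console.WriteLine($"Event {offset} → Consumer {AssignConsumer(offset, 3)}");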

Rebalancing

When a consumer joins or leaves:

Before:
  Worker-1: Events 1, 3, 5, 7...
  Worker-2: Events 2, 4, 6, 8...

Worker-3 joins:

After:
  Worker-1: Events 1, 4, 7, 10...
  Worker-2: Events 2, 5, 8, 11...
  Worker-3: Events 3, 6, 9, 12...

Rebalancing happens automatically and transparently.

Monitoring

Consumer Distribution

public async Task MonitorLoadBalanceAsync()
{
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    foreach (var consumer in consumers)
    {
        var eventsProcessed = consumer.Offset;
        _logger.LogInformation(
            "Consumer {ConsumerId}: {Events} events processed",
            consumer.ConsumerId,
            eventsProcessed);
    }

    // Check for imbalance
    var avgEvents = consumers.Average(c => c.Offset);
    var maxDeviation = consumers.Max(c => Math.Abs(c.Offset - avgEvents));

    if (maxDeviation > avgEvents * 0.2)  // > 20% deviation
    {
        _logger.LogWarning("Load imbalance detected");
    }
}
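
Because distribution is round-robin, every consumer's committed offset should grow at roughly the same rate; a sustained deviation above the 20% threshold usually points at one slow instance (hot host, GC pressure, blocked I/O) rather than a distribution bug.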

Throughput per Consumer

public async Task MeasureThroughputAsync()
{
    var consumers = await _offsetStore.GetConsumersAsync("orders", "order-processing");

    // Snapshot every offset first so all consumers share one measurement window
    // (delaying inside the loop would take 60 seconds per consumer)
    var initialOffsets = consumers.ToDictionary(c => c.ConsumerId, c => c.Offset);

    await Task.Delay(TimeSpan.FromSeconds(60));

    foreach (var consumer in consumers)
    {
        var updatedConsumer = await _offsetStore.GetConsumerAsync(
            "orders",
            "order-processing",
            consumer.ConsumerId);

        var eventsProcessed = updatedConsumer.Offset - initialOffsets[consumer.ConsumerId];
        var throughput = eventsProcessed / 60.0;  // Events per second

        _logger.LogInformation(
            "Consumer {ConsumerId}: {Throughput:F2} events/sec",
            consumer.ConsumerId,
            throughput);
    }
}

Best Practices

DO

  • Start with 2-4 consumers
  • Scale based on lag monitoring
  • Use unique consumer IDs
  • Monitor throughput per consumer
  • Set appropriate batch sizes
  • Test rebalancing scenarios

DON'T

  • Don't over-provision (too many consumers)
  • Don't use same consumer ID for multiple instances
  • Don't ignore load imbalance
  • Don't scale without monitoring
  • Don't forget about database connection limits (see the note below)
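
A quick sanity check on that last point: with 10 replicas each allowed a default Npgsql pool of up to 100 connections, the group could demand 1,000 connections, while PostgreSQL's default max_connections is 100. Cap the pool per instance (for example, Maximum Pool Size=10 in the connection string) or put a pooler such as PgBouncer in front of the database.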

See Also