dotnet-cqrs/docs/event-streaming/consumer-groups/getting-started.md

7.8 KiB

Getting Started with Consumer Groups

Create your first consumer group for scalable event processing.

Installation

dotnet add package Svrnty.CQRS.Events.PostgreSQL
dotnet add package Svrnty.CQRS.Events.ConsumerGroups

Configuration

appsettings.json:

{
  "ConnectionStrings": {
    "EventStore": "Host=localhost;Database=eventstore;Username=postgres;Password=postgres"
  },
  "EventStreaming": {
    "ConsumerGroups": {
      "HeartbeatInterval": "00:00:10",
      "SessionTimeout": "00:00:30",
      "CleanupInterval": "00:01:00"
    }
  }
}

Program.cs:

using Svrnty.CQRS.Events.PostgreSQL;
using Svrnty.CQRS.Events.ConsumerGroups;

var builder = WebApplication.CreateBuilder(args);

// PostgreSQL event streaming
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetConnectionString("EventStore"));

// Consumer groups
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

var app = builder.Build();
app.Run();

Basic Consumer

public class EmailNotificationWorker : BackgroundService
{
    private readonly IConsumerGroupReader _consumerGroup;
    private readonly IEmailService _emailService;
    private readonly ILogger<EmailNotificationWorker> _logger;
    private readonly string _consumerId;

    public EmailNotificationWorker(
        IConsumerGroupReader consumerGroup,
        IEmailService emailService,
        ILogger<EmailNotificationWorker> logger)
    {
        _consumerGroup = consumerGroup;
        _emailService = emailService;
        _logger = logger;
        _consumerId = $"{Environment.MachineName}-{Guid.NewGuid():N}";
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Email worker {ConsumerId} started", _consumerId);

        try
        {
            await foreach (var @event in _consumerGroup.ConsumeAsync(
                streamName: "orders",
                groupId: "email-notifications",
                consumerId: _consumerId,
                options: new ConsumerGroupOptions
                {
                    BatchSize = 100,
                    CommitStrategy = OffsetCommitStrategy.AfterBatch
                },
                cancellationToken: stoppingToken))
            {
                try
                {
                    await ProcessEventAsync(@event);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing event {EventId}", @event.EventId);
                    // Offset not committed, will retry
                }
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Email worker {ConsumerId} stopping", _consumerId);
        }
        finally
        {
            _logger.LogInformation("Email worker {ConsumerId} stopped", _consumerId);
        }
    }

    private async Task ProcessEventAsync(StoredEvent @event)
    {
        var eventData = JsonSerializer.Deserialize(
            @event.Data,
            Type.GetType(@event.EventType));

        switch (eventData)
        {
            case OrderPlacedEvent placed:
                await _emailService.SendOrderConfirmationAsync(
                    placed.CustomerEmail,
                    placed.OrderId);
                break;

            case OrderShippedEvent shipped:
                await _emailService.SendShippingNotificationAsync(
                    shipped.CustomerEmail,
                    shipped.OrderId,
                    shipped.TrackingNumber);
                break;
        }

        _logger.LogInformation(
            "Processed {EventType} at offset {Offset}",
            @event.EventType,
            @event.Offset);
    }
}

Registration

var builder = WebApplication.CreateBuilder(args);

// Event streaming
builder.Services.AddPostgresEventStreaming(
    builder.Configuration.GetConnectionString("EventStore"));

// Consumer groups
builder.Services.AddPostgresConsumerGroups(
    builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));

// Services
builder.Services.AddSingleton<IEmailService, EmailService>();

// Background worker
builder.Services.AddHostedService<EmailNotificationWorker>();

var app = builder.Build();
app.Run();

Running Multiple Workers

Scale horizontally by running multiple instances:

# Terminal 1
dotnet run --WorkerId=1

# Terminal 2
dotnet run --WorkerId=2

# Terminal 3
dotnet run --WorkerId=3

# Events automatically load-balanced across all 3 workers

Each worker processes different events:

  • Worker 1: Events 1, 4, 7, 10...
  • Worker 2: Events 2, 5, 8, 11...
  • Worker 3: Events 3, 6, 9, 12...

Consumer Group Options

var options = new ConsumerGroupOptions
{
    // Batch size (events per read)
    BatchSize = 100,

    // When to commit offsets
    CommitStrategy = OffsetCommitStrategy.AfterBatch,

    // Heartbeat interval
    HeartbeatInterval = TimeSpan.FromSeconds(10),

    // Session timeout
    SessionTimeout = TimeSpan.FromSeconds(30)
};

await foreach (var @event in _consumerGroup.ConsumeAsync(
    "orders",
    "email-notifications",
    _consumerId,
    options))
{
    // Process event
}

Monitoring

Check Consumer Status

public class ConsumerMonitor
{
    private readonly IConsumerOffsetStore _offsetStore;

    public async Task MonitorConsumersAsync()
    {
        // Get all consumers in group
        var consumers = await _offsetStore.GetConsumersAsync(
            streamName: "orders",
            groupId: "email-notifications");

        foreach (var consumer in consumers)
        {
            Console.WriteLine($"Consumer: {consumer.ConsumerId}");
            Console.WriteLine($"  Offset: {consumer.Offset}");
            Console.WriteLine($"  Updated: {consumer.UpdatedAt}");
        }
    }
}

Check Consumer Lag

public async Task<long> GetConsumerLagAsync(
    string streamName,
    string groupId,
    string consumerId)
{
    // Get stream head
    var streamHead = await GetStreamHeadAsync(streamName);

    // Get consumer offset
    var consumerOffset = await _offsetStore.GetOffsetAsync(
        streamName,
        groupId,
        consumerId);

    // Calculate lag
    return streamHead - consumerOffset;
}

Testing

Unit Testing with Mock

public class EmailNotificationWorkerTests
{
    [Fact]
    public async Task ProcessEvent_SendsEmail()
    {
        // Arrange
        var mockConsumerGroup = new Mock<IConsumerGroupReader>();
        var mockEmailService = new Mock<IEmailService>();

        var events = new[]
        {
            CreateStoredEvent(new OrderPlacedEvent
            {
                OrderId = 123,
                CustomerEmail = "test@example.com"
            })
        }.ToAsyncEnumerable();

        mockConsumerGroup
            .Setup(x => x.ConsumeAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<ConsumerGroupOptions>(),
                It.IsAny<CancellationToken>()))
            .Returns(events);

        var worker = new EmailNotificationWorker(
            mockConsumerGroup.Object,
            mockEmailService.Object,
            Mock.Of<ILogger<EmailNotificationWorker>>());

        // Act
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
        await worker.StartAsync(cts.Token);
        await Task.Delay(500);
        await worker.StopAsync(CancellationToken.None);

        // Assert
        mockEmailService.Verify(
            x => x.SendOrderConfirmationAsync("test@example.com", 123),
            Times.Once);
    }
}

See Also