dotnet-cqrs/docs/event-streaming/consumer-groups/getting-started.md

316 lines
7.8 KiB
Markdown

# Getting Started with Consumer Groups
Create your first consumer group for scalable event processing.
## Installation
```bash
# Add the two NuGet packages used throughout this guide to the current project.
dotnet add package Svrnty.CQRS.Events.PostgreSQL
dotnet add package Svrnty.CQRS.Events.ConsumerGroups
```
## Configuration
**appsettings.json:**
```json
{
"ConnectionStrings": {
"EventStore": "Host=localhost;Database=eventstore;Username=postgres;Password=postgres"
},
"EventStreaming": {
"ConsumerGroups": {
"HeartbeatInterval": "00:00:10",
"SessionTimeout": "00:00:30",
"CleanupInterval": "00:01:00"
}
}
}
```
**Program.cs:**
```csharp
using Svrnty.CQRS.Events.PostgreSQL;
using Svrnty.CQRS.Events.ConsumerGroups;
var builder = WebApplication.CreateBuilder(args);

// Read the two settings this setup needs: the "EventStore" connection string
// and the "EventStreaming:ConsumerGroups" section.
var eventStoreConnection = builder.Configuration.GetConnectionString("EventStore");
var consumerGroupSettings = builder.Configuration.GetSection("EventStreaming:ConsumerGroups");

// Register PostgreSQL event streaming, then consumer groups.
builder.Services.AddPostgresEventStreaming(eventStoreConnection);
builder.Services.AddPostgresConsumerGroups(consumerGroupSettings);

var app = builder.Build();
app.Run();
```
## Basic Consumer
```csharp
/// <summary>
/// Background worker that consumes the <c>orders</c> stream as a member of the
/// <c>email-notifications</c> consumer group and sends an email for each
/// order event it recognizes.
/// </summary>
public class EmailNotificationWorker : BackgroundService
{
    private readonly IConsumerGroupReader _consumerGroup;
    private readonly IEmailService _emailService;
    private readonly ILogger<EmailNotificationWorker> _logger;
    private readonly string _consumerId;

    public EmailNotificationWorker(
        IConsumerGroupReader consumerGroup,
        IEmailService emailService,
        ILogger<EmailNotificationWorker> logger)
    {
        _consumerGroup = consumerGroup;
        _emailService = emailService;
        _logger = logger;

        // Machine name plus a fresh GUID gives every running instance its own consumer ID.
        _consumerId = $"{Environment.MachineName}-{Guid.NewGuid():N}";
    }

    /// <summary>
    /// Reads events from the consumer group until <paramref name="stoppingToken"/> is
    /// cancelled or the stream ends, handing each event to <see cref="ProcessEventAsync"/>.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Email worker {ConsumerId} started", _consumerId);

        try
        {
            await foreach (var @event in _consumerGroup.ConsumeAsync(
                streamName: "orders",
                groupId: "email-notifications",
                consumerId: _consumerId,
                options: new ConsumerGroupOptions
                {
                    BatchSize = 100,
                    CommitStrategy = OffsetCommitStrategy.AfterBatch
                },
                cancellationToken: stoppingToken))
            {
                try
                {
                    await ProcessEventAsync(@event);
                }
                catch (Exception ex)
                {
                    // A single failing event must not take the whole worker down.
                    _logger.LogError(ex, "Error processing event {EventId}", @event.EventId);
                    // Offset not committed, will retry
                }
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Email worker {ConsumerId} stopping", _consumerId);
        }
        finally
        {
            _logger.LogInformation("Email worker {ConsumerId} stopped", _consumerId);
        }
    }

    /// <summary>
    /// Deserializes the event payload to the CLR type named by the event and sends
    /// the matching email. Events whose type cannot be resolved are logged and skipped.
    /// </summary>
    private async Task ProcessEventAsync(StoredEvent @event)
    {
        // Type.GetType returns null when the name cannot be resolved; passing that null
        // to JsonSerializer.Deserialize would throw an opaque ArgumentNullException.
        // NOTE(review): the type name comes from stored event data; consider resolving it
        // through an allow-list of known event types instead of Type.GetType.
        var eventType = Type.GetType(@event.EventType);
        if (eventType is null)
        {
            _logger.LogWarning(
                "Skipping event {EventId}: unknown event type {EventType}",
                @event.EventId,
                @event.EventType);
            return;
        }

        var eventData = JsonSerializer.Deserialize(@event.Data, eventType);

        switch (eventData)
        {
            case OrderPlacedEvent placed:
                await _emailService.SendOrderConfirmationAsync(
                    placed.CustomerEmail,
                    placed.OrderId);
                break;

            case OrderShippedEvent shipped:
                await _emailService.SendShippingNotificationAsync(
                    shipped.CustomerEmail,
                    shipped.OrderId,
                    shipped.TrackingNumber);
                break;
        }

        _logger.LogInformation(
            "Processed {EventType} at offset {Offset}",
            @event.EventType,
            @event.Offset);
    }
}
```
## Registration
```csharp
var builder = WebApplication.CreateBuilder(args);
var configuration = builder.Configuration;
var services = builder.Services;

// Event streaming and consumer groups, both driven by configuration.
services.AddPostgresEventStreaming(configuration.GetConnectionString("EventStore"));
services.AddPostgresConsumerGroups(configuration.GetSection("EventStreaming:ConsumerGroups"));

// Application services the worker depends on.
services.AddSingleton<IEmailService, EmailService>();

// The worker itself, registered as a hosted service.
services.AddHostedService<EmailNotificationWorker>();

var app = builder.Build();
app.Run();
```
## Running Multiple Workers
Scale horizontally by running multiple instances:
```bash
# Start one instance per terminal; every instance uses the same group ID.
# NOTE(review): the example worker builds its consumer ID from the machine name and a
# GUID and does not read WorkerId — confirm whether this switch is still needed.
# Terminal 1
dotnet run --WorkerId=1
# Terminal 2
dotnet run --WorkerId=2
# Terminal 3
dotnet run --WorkerId=3
# Events are automatically load-balanced across all 3 workers
```
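Each worker processes a different subset of the events. The exact assignment depends on the load-balancing strategy (see [Load Balancing](load-balancing.md)); an even round-robin split would look like this: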
- Worker 1: Events 1, 4, 7, 10...
- Worker 2: Events 2, 5, 8, 11...
- Worker 3: Events 3, 6, 9, 12...
## Consumer Group Options
```csharp
var options = new ConsumerGroupOptions
{
    BatchSize = 100,                                   // events per read
    CommitStrategy = OffsetCommitStrategy.AfterBatch,  // when to commit offsets
    HeartbeatInterval = TimeSpan.FromSeconds(10),
    SessionTimeout = TimeSpan.FromSeconds(30),
};

// Named arguments make it clear which string is the stream, the group and the consumer.
var events = _consumerGroup.ConsumeAsync(
    streamName: "orders",
    groupId: "email-notifications",
    consumerId: _consumerId,
    options: options);

await foreach (var @event in events)
{
    // Process event
}
```
## Monitoring
### Check Consumer Status
```csharp
/// <summary>
/// Prints the ID, offset and last-update time of every consumer in the
/// <c>email-notifications</c> group on the <c>orders</c> stream.
/// </summary>
public class ConsumerMonitor
{
    private readonly IConsumerOffsetStore _offsetStore;

    // Without this constructor the readonly field is never assigned and
    // MonitorConsumersAsync fails with a NullReferenceException.
    public ConsumerMonitor(IConsumerOffsetStore offsetStore)
    {
        _offsetStore = offsetStore;
    }

    /// <summary>
    /// Fetches the consumers registered in the group and writes one entry per
    /// consumer to the console.
    /// </summary>
    public async Task MonitorConsumersAsync()
    {
        // Get all consumers in group
        var consumers = await _offsetStore.GetConsumersAsync(
            streamName: "orders",
            groupId: "email-notifications");

        foreach (var consumer in consumers)
        {
            Console.WriteLine($"Consumer: {consumer.ConsumerId}");
            Console.WriteLine($" Offset: {consumer.Offset}");
            Console.WriteLine($" Updated: {consumer.UpdatedAt}");
        }
    }
}
```
### Check Consumer Lag
```csharp
/// <summary>
/// Computes how far a consumer is behind the head of a stream.
/// </summary>
/// <param name="streamName">Stream whose head is looked up.</param>
/// <param name="groupId">Consumer group the consumer belongs to.</param>
/// <param name="consumerId">Consumer whose stored offset is read.</param>
/// <returns>The stream head minus the consumer's stored offset.</returns>
public async Task<long> GetConsumerLagAsync(
    string streamName,
    string groupId,
    string consumerId)
{
    var head = await GetStreamHeadAsync(streamName);
    var consumerPosition = await _offsetStore.GetOffsetAsync(streamName, groupId, consumerId);

    return head - consumerPosition;
}
```
## Testing
### Unit Testing with Mock
```csharp
/// <summary>
/// Unit tests for <see cref="EmailNotificationWorker"/> using mocked dependencies.
/// </summary>
public class EmailNotificationWorkerTests
{
    /// <summary>
    /// Feeds the worker a single <see cref="OrderPlacedEvent"/> and verifies that
    /// exactly one order confirmation email is sent for it.
    /// </summary>
    [Fact]
    public async Task ProcessEvent_SendsEmail()
    {
        // Arrange
        var mockConsumerGroup = new Mock<IConsumerGroupReader>();
        var mockEmailService = new Mock<IEmailService>();

        var events = new[]
        {
            CreateStoredEvent(new OrderPlacedEvent
            {
                OrderId = 123,
                CustomerEmail = "test@example.com"
            })
        }.ToAsyncEnumerable();

        mockConsumerGroup
            .Setup(x => x.ConsumeAsync(
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<string>(),
                It.IsAny<ConsumerGroupOptions>(),
                It.IsAny<CancellationToken>()))
            .Returns(events);

        var worker = new EmailNotificationWorker(
            mockConsumerGroup.Object,
            mockEmailService.Object,
            Mock.Of<ILogger<EmailNotificationWorker>>());

        // Act
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
        await worker.StartAsync(cts.Token);

        // The mocked stream ends after one event, so wait for the worker loop itself to
        // finish instead of sleeping for a fixed interval. The timeout turns a hang into
        // a test failure rather than a stuck test run.
        await worker.ExecuteTask!.WaitAsync(TimeSpan.FromSeconds(5));
        await worker.StopAsync(CancellationToken.None);

        // Assert
        mockEmailService.Verify(
            x => x.SendOrderConfirmationAsync("test@example.com", 123),
            Times.Once);
    }
}
```
## See Also
- [Consumer Groups Overview](README.md)
- [Offset Management](offset-management.md)
- [Commit Strategies](commit-strategies.md)
- [Load Balancing](load-balancing.md)