316 lines
7.8 KiB
Markdown
316 lines
7.8 KiB
Markdown
# Getting Started with Consumer Groups
|
|
|
|
Create your first consumer group for scalable event processing.
|
|
|
|
## Installation
|
|
|
|
```bash
|
|
dotnet add package Svrnty.CQRS.Events.PostgreSQL
|
|
dotnet add package Svrnty.CQRS.Events.ConsumerGroups
|
|
```
|
|
|
|
## Configuration
|
|
|
|
**appsettings.json:**
|
|
```json
|
|
{
|
|
"ConnectionStrings": {
|
|
"EventStore": "Host=localhost;Database=eventstore;Username=postgres;Password=postgres"
|
|
},
|
|
"EventStreaming": {
|
|
"ConsumerGroups": {
|
|
"HeartbeatInterval": "00:00:10",
|
|
"SessionTimeout": "00:00:30",
|
|
"CleanupInterval": "00:01:00"
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
**Program.cs:**
|
|
```csharp
|
|
using Svrnty.CQRS.Events.PostgreSQL;
|
|
using Svrnty.CQRS.Events.ConsumerGroups;
|
|
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// PostgreSQL event streaming
|
|
builder.Services.AddPostgresEventStreaming(
|
|
builder.Configuration.GetConnectionString("EventStore"));
|
|
|
|
// Consumer groups
|
|
builder.Services.AddPostgresConsumerGroups(
|
|
builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));
|
|
|
|
var app = builder.Build();
|
|
app.Run();
|
|
```
|
|
|
|
## Basic Consumer
|
|
|
|
```csharp
|
|
public class EmailNotificationWorker : BackgroundService
|
|
{
|
|
private readonly IConsumerGroupReader _consumerGroup;
|
|
private readonly IEmailService _emailService;
|
|
private readonly ILogger<EmailNotificationWorker> _logger;
|
|
private readonly string _consumerId;
|
|
|
|
public EmailNotificationWorker(
|
|
IConsumerGroupReader consumerGroup,
|
|
IEmailService emailService,
|
|
ILogger<EmailNotificationWorker> logger)
|
|
{
|
|
_consumerGroup = consumerGroup;
|
|
_emailService = emailService;
|
|
_logger = logger;
|
|
_consumerId = $"{Environment.MachineName}-{Guid.NewGuid():N}";
|
|
}
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
_logger.LogInformation("Email worker {ConsumerId} started", _consumerId);
|
|
|
|
try
|
|
{
|
|
await foreach (var @event in _consumerGroup.ConsumeAsync(
|
|
streamName: "orders",
|
|
groupId: "email-notifications",
|
|
consumerId: _consumerId,
|
|
options: new ConsumerGroupOptions
|
|
{
|
|
BatchSize = 100,
|
|
CommitStrategy = OffsetCommitStrategy.AfterBatch
|
|
},
|
|
cancellationToken: stoppingToken))
|
|
{
|
|
try
|
|
{
|
|
await ProcessEventAsync(@event);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error processing event {EventId}", @event.EventId);
|
|
// Offset not committed, will retry
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
_logger.LogInformation("Email worker {ConsumerId} stopping", _consumerId);
|
|
}
|
|
finally
|
|
{
|
|
_logger.LogInformation("Email worker {ConsumerId} stopped", _consumerId);
|
|
}
|
|
}
|
|
|
|
private async Task ProcessEventAsync(StoredEvent @event)
|
|
{
|
|
var eventData = JsonSerializer.Deserialize(
|
|
@event.Data,
|
|
Type.GetType(@event.EventType));
|
|
|
|
switch (eventData)
|
|
{
|
|
case OrderPlacedEvent placed:
|
|
await _emailService.SendOrderConfirmationAsync(
|
|
placed.CustomerEmail,
|
|
placed.OrderId);
|
|
break;
|
|
|
|
case OrderShippedEvent shipped:
|
|
await _emailService.SendShippingNotificationAsync(
|
|
shipped.CustomerEmail,
|
|
shipped.OrderId,
|
|
shipped.TrackingNumber);
|
|
break;
|
|
}
|
|
|
|
_logger.LogInformation(
|
|
"Processed {EventType} at offset {Offset}",
|
|
@event.EventType,
|
|
@event.Offset);
|
|
}
|
|
}
|
|
```
|
|
|
|
## Registration
|
|
|
|
```csharp
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Event streaming
|
|
builder.Services.AddPostgresEventStreaming(
|
|
builder.Configuration.GetConnectionString("EventStore"));
|
|
|
|
// Consumer groups
|
|
builder.Services.AddPostgresConsumerGroups(
|
|
builder.Configuration.GetSection("EventStreaming:ConsumerGroups"));
|
|
|
|
// Services
|
|
builder.Services.AddSingleton<IEmailService, EmailService>();
|
|
|
|
// Background worker
|
|
builder.Services.AddHostedService<EmailNotificationWorker>();
|
|
|
|
var app = builder.Build();
|
|
app.Run();
|
|
```
|
|
|
|
## Running Multiple Workers
|
|
|
|
Scale horizontally by running multiple instances:
|
|
|
|
```bash
|
|
# Terminal 1
|
|
dotnet run --WorkerId=1
|
|
|
|
# Terminal 2
|
|
dotnet run --WorkerId=2
|
|
|
|
# Terminal 3
|
|
dotnet run --WorkerId=3
|
|
|
|
# Events automatically load-balanced across all 3 workers
|
|
```
|
|
|
|
Each worker processes different events:
|
|
- Worker 1: Events 1, 4, 7, 10...
|
|
- Worker 2: Events 2, 5, 8, 11...
|
|
- Worker 3: Events 3, 6, 9, 12...
|
|
|
|
## Consumer Group Options
|
|
|
|
```csharp
|
|
var options = new ConsumerGroupOptions
|
|
{
|
|
// Batch size (events per read)
|
|
BatchSize = 100,
|
|
|
|
// When to commit offsets
|
|
CommitStrategy = OffsetCommitStrategy.AfterBatch,
|
|
|
|
// Heartbeat interval
|
|
HeartbeatInterval = TimeSpan.FromSeconds(10),
|
|
|
|
// Session timeout
|
|
SessionTimeout = TimeSpan.FromSeconds(30)
|
|
};
|
|
|
|
await foreach (var @event in _consumerGroup.ConsumeAsync(
|
|
"orders",
|
|
"email-notifications",
|
|
_consumerId,
|
|
options))
|
|
{
|
|
// Process event
|
|
}
|
|
```
|
|
|
|
## Monitoring
|
|
|
|
### Check Consumer Status
|
|
|
|
```csharp
|
|
public class ConsumerMonitor
|
|
{
|
|
private readonly IConsumerOffsetStore _offsetStore;
|
|
|
|
public async Task MonitorConsumersAsync()
|
|
{
|
|
// Get all consumers in group
|
|
var consumers = await _offsetStore.GetConsumersAsync(
|
|
streamName: "orders",
|
|
groupId: "email-notifications");
|
|
|
|
foreach (var consumer in consumers)
|
|
{
|
|
Console.WriteLine($"Consumer: {consumer.ConsumerId}");
|
|
Console.WriteLine($" Offset: {consumer.Offset}");
|
|
Console.WriteLine($" Updated: {consumer.UpdatedAt}");
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
### Check Consumer Lag
|
|
|
|
```csharp
|
|
public async Task<long> GetConsumerLagAsync(
|
|
string streamName,
|
|
string groupId,
|
|
string consumerId)
|
|
{
|
|
// Get stream head
|
|
var streamHead = await GetStreamHeadAsync(streamName);
|
|
|
|
// Get consumer offset
|
|
var consumerOffset = await _offsetStore.GetOffsetAsync(
|
|
streamName,
|
|
groupId,
|
|
consumerId);
|
|
|
|
// Calculate lag
|
|
return streamHead - consumerOffset;
|
|
}
|
|
```
|
|
|
|
## Testing
|
|
|
|
### Unit Testing with Mock
|
|
|
|
```csharp
|
|
public class EmailNotificationWorkerTests
|
|
{
|
|
[Fact]
|
|
public async Task ProcessEvent_SendsEmail()
|
|
{
|
|
// Arrange
|
|
var mockConsumerGroup = new Mock<IConsumerGroupReader>();
|
|
var mockEmailService = new Mock<IEmailService>();
|
|
|
|
var events = new[]
|
|
{
|
|
CreateStoredEvent(new OrderPlacedEvent
|
|
{
|
|
OrderId = 123,
|
|
CustomerEmail = "test@example.com"
|
|
})
|
|
}.ToAsyncEnumerable();
|
|
|
|
mockConsumerGroup
|
|
.Setup(x => x.ConsumeAsync(
|
|
It.IsAny<string>(),
|
|
It.IsAny<string>(),
|
|
It.IsAny<string>(),
|
|
It.IsAny<ConsumerGroupOptions>(),
|
|
It.IsAny<CancellationToken>()))
|
|
.Returns(events);
|
|
|
|
var worker = new EmailNotificationWorker(
|
|
mockConsumerGroup.Object,
|
|
mockEmailService.Object,
|
|
Mock.Of<ILogger<EmailNotificationWorker>>());
|
|
|
|
// Act
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
|
|
await worker.StartAsync(cts.Token);
|
|
await Task.Delay(500);
|
|
await worker.StopAsync(CancellationToken.None);
|
|
|
|
// Assert
|
|
mockEmailService.Verify(
|
|
x => x.SendOrderConfirmationAsync("test@example.com", 123),
|
|
Times.Once);
|
|
}
|
|
}
|
|
```
|
|
|
|
## See Also
|
|
|
|
- [Consumer Groups Overview](README.md)
|
|
- [Offset Management](offset-management.md)
|
|
- [Commit Strategies](commit-strategies.md)
|
|
- [Load Balancing](load-balancing.md)
|