# Event Streaming Examples

This document provides examples of how to consume events from the Svrnty CQRS event streaming system.

## Table of Contents

1. [Overview](#overview)
2. [In-Process Consumption (IEventSubscriptionClient)](#in-process-consumption)
3. [gRPC Streaming Client](#grpc-streaming-client)
4. [Configuration](#configuration)
5. [Testing with grpcurl](#testing-with-grpcurl)
6. [See Also](#see-also)

---

## Overview

The Svrnty CQRS framework provides two primary ways to consume events:

1. **In-Process via IEventSubscriptionClient** - For consuming events within the same application process
2. **gRPC Bidirectional Streaming** - For consuming events from external clients/services

## In-Process Consumption

### Using IEventSubscriptionClient

`IEventSubscriptionClient` exposes events as an `IAsyncEnumerable`, so a consumer in the same process can read them with `await foreach`.

**Example: EventConsumerBackgroundService.cs**

```csharp
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class EventConsumerBackgroundService : BackgroundService
{
    private readonly IEventSubscriptionClient _subscriptionClient;
    private readonly ILogger<EventConsumerBackgroundService> _logger;

    // Both dependencies are resolved from the DI container.
    public EventConsumerBackgroundService(
        IEventSubscriptionClient subscriptionClient,
        ILogger<EventConsumerBackgroundService> logger)
    {
        _subscriptionClient = subscriptionClient;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // A unique consumer ID per service instance
        var consumerId = $"analytics-{Guid.NewGuid():N}";

        // Subscribe to events; the loop ends when stoppingToken is cancelled
        await foreach (var @event in _subscriptionClient.SubscribeAsync(
            "user-analytics",  // Subscription ID
            consumerId,        // Consumer ID
            stoppingToken))
        {
            // Process the event
            _logger.LogInformation("Received: {EventType}", @event.GetType().Name);

            // Type-specific processing
            if (@event is UserAddedEvent userAdded)
            {
                // Handle user added
            }
        }
    }
}
```
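
The background service above only runs once it is registered with the host. The following is a minimal sketch of that wiring, not the project's actual `Program.cs`. It assumes a `WebApplication`-based host and assumes that `AddEventStreaming` (shown under Configuration) is what makes `IEventSubscriptionClient` resolvable from DI; neither is confirmed by this document. `AddHostedService` is the standard Microsoft.Extensions.Hosting registration call.

```csharp
// Program.cs (sketch under the assumptions stated above)
var builder = WebApplication.CreateBuilder(args);

// Assumption: this call also registers IEventSubscriptionClient.
builder.Services.AddEventStreaming(streaming =>
{
    streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
    {
        sub.Mode = SubscriptionMode.Broadcast;
    });
});

// The host calls ExecuteAsync on startup and cancels stoppingToken on shutdown.
builder.Services.AddHostedService<EventConsumerBackgroundService>();

var app = builder.Build();
app.Run();
```

The subscription ID passed to `SubscribeAsync` (`"user-analytics"`) should presumably match the ID used in `AddSubscription`.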

### Subscription Modes

**Broadcast Mode** - All consumers receive all events

```csharp
streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
{
    sub.Mode = SubscriptionMode.Broadcast;
});
```

**Exclusive Mode** - Only one consumer receives each event (load balancing)

```csharp
streaming.AddSubscription<InvitationWorkflow>("invitation-processor", sub =>
{
    sub.Mode = SubscriptionMode.Exclusive;
});
```
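
To make the difference between the two modes concrete, here is a small conceptual model that uses no Svrnty types. It distributes four events to two consumers. The round-robin choice in the exclusive case is an assumption made for illustration; how the framework actually picks a consumer is not described in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual model only: plain collections, no Svrnty types.
string[] events = { "e1", "e2", "e3", "e4" };
string[] consumers = { "consumer-a", "consumer-b" };

// Broadcast: every consumer receives every event.
Dictionary<string, List<string>> Broadcast(string[] evts, string[] cons) =>
    cons.ToDictionary(c => c, _ => evts.ToList());

// Exclusive: each event goes to exactly one consumer.
// Round-robin assignment is an assumption for this sketch.
Dictionary<string, List<string>> Exclusive(string[] evts, string[] cons)
{
    var result = cons.ToDictionary(c => c, _ => new List<string>());
    for (var i = 0; i < evts.Length; i++)
    {
        result[cons[i % cons.Length]].Add(evts[i]);
    }
    return result;
}

var broadcast = Broadcast(events, consumers);
var exclusive = Exclusive(events, consumers);

foreach (var c in consumers)
{
    Console.WriteLine(
        $"{c}: broadcast=[{string.Join(",", broadcast[c])}] " +
        $"exclusive=[{string.Join(",", exclusive[c])}]");
}
// consumer-a: broadcast=[e1,e2,e3,e4] exclusive=[e1,e3]
// consumer-b: broadcast=[e1,e2,e3,e4] exclusive=[e2,e4]
```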

---

## gRPC Streaming Client

External clients can consume events via gRPC bidirectional streaming using the `EventService`.

### Basic Subscription

```csharp
using Grpc.Core;        // ReadAllAsync extension on the response stream
using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;

var channel = GrpcChannel.ForAddress("http://localhost:6000");
var client = new EventService.EventServiceClient(channel);

// Open the bidirectional stream
using var call = client.Subscribe();

// Start receiving events on a background task
var receiveTask = Task.Run(async () =>
{
    await foreach (var message in call.ResponseStream.ReadAllAsync())
    {
        switch (message.MessageTypeCase)
        {
            case EventMessage.MessageTypeOneofCase.Event:
                var evt = message.Event;
                Console.WriteLine($"Event: {evt.EventType}");
                Console.WriteLine($"  EventId: {evt.EventId}");
                Console.WriteLine($"  CorrelationId: {evt.CorrelationId}");
                break;

            case EventMessage.MessageTypeOneofCase.Completed:
                Console.WriteLine("Subscription completed");
                return;

            case EventMessage.MessageTypeOneofCase.Error:
                Console.WriteLine($"Error: {message.Error.Message}");
                break;
        }
    }
});

// Send subscribe command
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = "my-correlation-id",
        DeliveryMode = DeliveryMode.Immediate,
        ConsumerId = $"client-{Guid.NewGuid():N}"
    }
});

// Wait until the server completes the subscription, then close the request stream
await receiveTask;
await call.RequestStream.CompleteAsync();
```

### Event Type Filtering

Subscribe only to specific event types:

```csharp
// `call` is the duplex call opened in the Basic Subscription example;
// `correlationId` is supplied by the caller.
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = correlationId,
        DeliveryMode = DeliveryMode.Immediate,

        // Only receive these event types
        EventTypes =
        {
            "UserInvitedEvent",
            "UserInviteAcceptedEvent"
        }
    }
});
```

### Terminal Events

Complete the subscription automatically when one of the listed events occurs:

```csharp
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = correlationId,
        DeliveryMode = DeliveryMode.Immediate,

        // Subscription completes when any of these events occur
        TerminalEventTypes =
        {
            "UserInviteAcceptedEvent",
            "UserInviteDeclinedEvent"
        }
    }
});
```
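
The combined effect of `EventTypes` and `TerminalEventTypes` can be pictured with a small conceptual model that uses no Svrnty types. It rests on assumptions this document does not confirm: that an empty filter means "all event types", that the terminal check is independent of the filter, and that a terminal event that passes the filter is still delivered before the subscription completes. `Deliver` is a hypothetical helper written for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model: returns the event types a client would see.
List<string> Deliver(
    IEnumerable<string> stream,
    ISet<string> eventTypes,
    ISet<string> terminalEventTypes)
{
    var delivered = new List<string>();
    foreach (var eventType in stream)
    {
        // Assumption: an empty filter lets every event type through.
        var passesFilter = eventTypes.Count == 0 || eventTypes.Contains(eventType);
        if (passesFilter)
        {
            delivered.Add(eventType);
        }

        // Assumption: a terminal event ends the subscription right after it.
        if (terminalEventTypes.Contains(eventType))
        {
            break;
        }
    }
    return delivered;
}

var stream = new[]
{
    "UserAddedEvent",           // filtered out
    "UserInvitedEvent",         // delivered
    "UserInviteAcceptedEvent",  // delivered, then the subscription completes
    "UserInvitedEvent"          // never seen
};

var seen = Deliver(
    stream,
    new HashSet<string> { "UserInvitedEvent", "UserInviteAcceptedEvent" },
    new HashSet<string> { "UserInviteAcceptedEvent", "UserInviteDeclinedEvent" });

Console.WriteLine(string.Join(", ", seen));
// UserInvitedEvent, UserInviteAcceptedEvent
```

In the real client, the end of the subscription is signalled by the `Completed` message handled in the Basic Subscription example.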

### Manual Acknowledgment

Acknowledge each event after it has been processed, or send a negative acknowledgment if processing fails:

```csharp
// subscriptionId and consumerId are assumed to be the values
// sent earlier in the SubscribeCommand.
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
    if (message.MessageTypeCase == EventMessage.MessageTypeOneofCase.Event)
    {
        var evt = message.Event;

        try
        {
            // Process the event
            await ProcessEventAsync(evt);

            // Send acknowledgment
            await call.RequestStream.WriteAsync(new SubscriptionRequest
            {
                Acknowledge = new AcknowledgeCommand
                {
                    SubscriptionId = subscriptionId,
                    EventId = evt.EventId,
                    ConsumerId = consumerId
                }
            });
        }
        catch (Exception)
        {
            // Send negative acknowledgment (requeue for retry)
            await call.RequestStream.WriteAsync(new SubscriptionRequest
            {
                Nack = new NackCommand
                {
                    SubscriptionId = subscriptionId,
                    EventId = evt.EventId,
                    ConsumerId = consumerId,
                    Requeue = true // true = retry, false = dead letter
                }
            });
        }
    }
}
```
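
Always sending `Requeue = true` can retry a permanently failing event forever. One way to bound this on the client side is to count failures per event ID and switch to `Requeue = false` after a limit. The sketch below shows only that decision logic; `ShouldRequeue` and `maxAttempts` are hypothetical names introduced here, not part of the Svrnty API, and the sketch assumes a requeued event is redelivered with the same `EventId`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical client-side retry budget; the limit of 3 is arbitrary.
const int maxAttempts = 3;
var attempts = new Dictionary<string, int>();

// Records one failed attempt and returns the value to send as NackCommand.Requeue.
bool ShouldRequeue(string eventId)
{
    attempts.TryGetValue(eventId, out var count); // count is 0 if the ID is unseen
    count++;
    attempts[eventId] = count;
    return count < maxAttempts;
}

// Three consecutive failures of the same event:
Console.WriteLine(ShouldRequeue("evt-1")); // True  (attempt 1, retry)
Console.WriteLine(ShouldRequeue("evt-1")); // True  (attempt 2, retry)
Console.WriteLine(ShouldRequeue("evt-1")); // False (attempt 3, dead letter)
```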

---

## Configuration

### Stream Configuration

Define streams for your workflows:

```csharp
builder.Services.AddEventStreaming(streaming =>
{
    // Configure stream for UserWorkflow
    streaming.AddStream<UserWorkflow>(stream =>
    {
        stream.Type = StreamType.Ephemeral;                        // Message queue semantics
        stream.DeliverySemantics = DeliverySemantics.AtLeastOnce;
        stream.Scope = StreamScope.Internal;                       // Internal to service
    });

    // Persistent streams (Phase 2+)
    streaming.AddStream<AuditWorkflow>(stream =>
    {
        stream.Type = StreamType.Persistent;       // Event log semantics
        stream.EnableReplay = true;                // Support replay
        stream.Retention = TimeSpan.FromDays(30);  // Keep for 30 days
    });
});
```

### Subscription Configuration

Define subscriptions for your consumers:

```csharp
builder.Services.AddEventStreaming(streaming =>
{
    // Broadcast subscription (all consumers get all events)
    streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
    {
        sub.Mode = SubscriptionMode.Broadcast;
        sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
        sub.EventTypeFilter = new HashSet<string> { "UserAddedEvent", "UserRemovedEvent" };
    });

    // Exclusive subscription (load balanced)
    streaming.AddSubscription<InvitationWorkflow>("invitation-processor", sub =>
    {
        sub.Mode = SubscriptionMode.Exclusive;
        sub.MaxConcurrentConsumers = 1;
        sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
    });
});
```

---

## Testing with grpcurl

You can test the `EventService` using `grpcurl`:

```bash
# List available services
# (requires gRPC server reflection; otherwise pass the .proto file with -proto)
grpcurl -plaintext localhost:6000 list

# Subscribe to events
grpcurl -plaintext -d '{
  "subscribe": {
    "subscription_id": "test-sub",
    "correlation_id": "test-correlation",
    "delivery_mode": "DELIVERY_MODE_IMMEDIATE"
  }
}' localhost:6000 svrnty.cqrs.events.EventService.Subscribe
```

---

## See Also

- [EventConsumerBackgroundService.cs](./EventConsumerBackgroundService.cs) - Full example of in-process consumption
- [Program.cs](./Program.cs) - Stream and subscription configuration
- [EVENT-STREAMING-IMPLEMENTATION-PLAN.md](../EVENT-STREAMING-IMPLEMENTATION-PLAN.md) - Implementation roadmap