dotnet-cqrs/Svrnty.Sample/EVENT_STREAMING_EXAMPLES.md

306 lines
8.2 KiB
Markdown

# Event Streaming Examples
This document provides examples of how to consume events from the Svrnty CQRS event streaming system.
## Table of Contents
1. [Overview](#overview)
2. [In-Process Consumption (IEventSubscriptionClient)](#in-process-consumption)
3. [gRPC Streaming Client](#grpc-streaming-client)
4. [Configuration](#configuration)
---
## Overview
The Svrnty CQRS framework provides two primary ways to consume events:
1. **In-Process via IEventSubscriptionClient** - For consuming events within the same application process
2. **gRPC Bidirectional Streaming** - For consuming events from external clients/services
## In-Process Consumption
### Using IEventSubscriptionClient
The `IEventSubscriptionClient` provides an `IAsyncEnumerable` interface for consuming events within the same process.
**Example: EventConsumerBackgroundService.cs**
```csharp
public class EventConsumerBackgroundService : BackgroundService
{
private readonly IEventSubscriptionClient _subscriptionClient;
private readonly ILogger _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var consumerId = $"analytics-{Guid.NewGuid():N}";
// Subscribe to events
await foreach (var @event in _subscriptionClient.SubscribeAsync(
"user-analytics", // Subscription ID
consumerId, // Consumer ID
stoppingToken))
{
// Process the event
_logger.LogInformation("Received: {EventType}", @event.GetType().Name);
// Type-specific processing
if (@event is UserAddedEvent userAdded)
{
// Handle user added
}
}
}
}
```
### Subscription Modes
**Broadcast Mode** - All consumers receive all events
```csharp
streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
{
sub.Mode = SubscriptionMode.Broadcast;
});
```
**Exclusive Mode** - Only one consumer receives each event (load balancing)
```csharp
streaming.AddSubscription<InvitationWorkflow>("invitation-processor", sub =>
{
sub.Mode = SubscriptionMode.Exclusive;
});
```
---
## gRPC Streaming Client
External clients can consume events via gRPC bidirectional streaming using the `EventService`.
### Basic Subscription
```csharp
using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;
var channel = GrpcChannel.ForAddress("http://localhost:6000");
var client = new EventService.EventServiceClient(channel);
using var call = client.Subscribe();
// Start receiving events
var receiveTask = Task.Run(async () =>
{
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
switch (message.MessageTypeCase)
{
case EventMessage.MessageTypeOneofCase.Event:
var evt = message.Event;
Console.WriteLine($"Event: {evt.EventType}");
Console.WriteLine($" EventId: {evt.EventId}");
Console.WriteLine($" CorrelationId: {evt.CorrelationId}");
break;
case EventMessage.MessageTypeOneofCase.Completed:
Console.WriteLine("Subscription completed");
return;
case EventMessage.MessageTypeOneofCase.Error:
Console.WriteLine($"Error: {message.Error.Message}");
break;
}
}
});
// Send subscribe command
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Subscribe = new SubscribeCommand
{
SubscriptionId = Guid.NewGuid().ToString(),
CorrelationId = "my-correlation-id",
DeliveryMode = DeliveryMode.Immediate,
ConsumerId = $"client-{Guid.NewGuid():N}"
}
});
await receiveTask;
await call.RequestStream.CompleteAsync();
```
### Event Type Filtering
Subscribe only to specific event types:
```csharp
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Subscribe = new SubscribeCommand
{
SubscriptionId = Guid.NewGuid().ToString(),
CorrelationId = correlationId,
DeliveryMode = DeliveryMode.Immediate,
// Only receive these event types
EventTypes =
{
"UserInvitedEvent",
"UserInviteAcceptedEvent"
}
}
});
```
### Terminal Events
Automatically complete subscription when specific events occur:
```csharp
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Subscribe = new SubscribeCommand
{
SubscriptionId = Guid.NewGuid().ToString(),
CorrelationId = correlationId,
DeliveryMode = DeliveryMode.Immediate,
// Subscription completes when any of these events occur
TerminalEventTypes =
{
"UserInviteAcceptedEvent",
"UserInviteDeclinedEvent"
}
}
});
```
### Manual Acknowledgment
Send acknowledgments for processed events:
```csharp
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
if (message.MessageTypeCase == EventMessage.MessageTypeOneofCase.Event)
{
var evt = message.Event;
try
{
// Process the event
await ProcessEventAsync(evt);
// Send acknowledgment
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Acknowledge = new AcknowledgeCommand
{
SubscriptionId = subscriptionId,
EventId = evt.EventId,
ConsumerId = consumerId
}
});
}
catch (Exception)
{
// Send negative acknowledgment (requeue for retry)
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Nack = new NackCommand
{
SubscriptionId = subscriptionId,
EventId = evt.EventId,
ConsumerId = consumerId,
Requeue = true // true = retry, false = dead letter
}
});
}
}
}
```
---
## Configuration
### Stream Configuration
Define streams for your workflows:
```csharp
builder.Services.AddEventStreaming(streaming =>
{
// Configure stream for UserWorkflow
streaming.AddStream<UserWorkflow>(stream =>
{
stream.Type = StreamType.Ephemeral; // Message queue semantics
stream.DeliverySemantics = DeliverySemantics.AtLeastOnce;
stream.Scope = StreamScope.Internal; // Internal to service
});
// Persistent streams (Phase 2+)
streaming.AddStream<AuditWorkflow>(stream =>
{
stream.Type = StreamType.Persistent; // Event log semantics
stream.EnableReplay = true; // Support replay
stream.Retention = TimeSpan.FromDays(30); // Keep for 30 days
});
});
```
### Subscription Configuration
Define subscriptions for your consumers:
```csharp
builder.Services.AddEventStreaming(streaming =>
{
// Broadcast subscription (all consumers get all events)
streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
{
sub.Mode = SubscriptionMode.Broadcast;
sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
sub.EventTypeFilter = new HashSet<string> { "UserAddedEvent", "UserRemovedEvent" };
});
// Exclusive subscription (load balanced)
streaming.AddSubscription<InvitationWorkflow>("invitation-processor", sub =>
{
sub.Mode = SubscriptionMode.Exclusive;
sub.MaxConcurrentConsumers = 1;
sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
});
});
```
---
## Testing with grpcurl
You can test the EventService using `grpcurl`:
```bash
# List available services
grpcurl -plaintext localhost:6000 list
# Subscribe to events
grpcurl -plaintext -d '{
"subscribe": {
"subscription_id": "test-sub",
"correlation_id": "test-correlation",
"delivery_mode": "DELIVERY_MODE_IMMEDIATE"
}
}' localhost:6000 svrnty.cqrs.events.EventService.Subscribe
```
---
## See Also
- [EventConsumerBackgroundService.cs](./EventConsumerBackgroundService.cs) - Full example of in-process consumption
- [Program.cs](./Program.cs) - Stream and subscription configuration
- [EVENT-STREAMING-IMPLEMENTATION-PLAN.md](../EVENT-STREAMING-IMPLEMENTATION-PLAN.md) - Implementation roadmap