8.2 KiB
Event Streaming Examples
This document provides examples of how to consume events from the Svrnty CQRS event streaming system.
Table of Contents
Overview
The Svrnty CQRS framework provides two primary ways to consume events:
- In-Process via IEventSubscriptionClient - For consuming events within the same application process
- gRPC Bidirectional Streaming - For consuming events from external clients/services
In-Process Consumption
Using IEventSubscriptionClient
The IEventSubscriptionClient provides an IAsyncEnumerable interface for consuming events within the same process.
Example: EventConsumerBackgroundService.cs
public class EventConsumerBackgroundService : BackgroundService
{
private readonly IEventSubscriptionClient _subscriptionClient;
private readonly ILogger _logger;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var consumerId = $"analytics-{Guid.NewGuid():N}";
// Subscribe to events
await foreach (var @event in _subscriptionClient.SubscribeAsync(
"user-analytics", // Subscription ID
consumerId, // Consumer ID
stoppingToken))
{
// Process the event
_logger.LogInformation("Received: {EventType}", @event.GetType().Name);
// Type-specific processing
if (@event is UserAddedEvent userAdded)
{
// Handle user added
}
}
}
}
Subscription Modes
Broadcast Mode - All consumers receive all events
streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
{
sub.Mode = SubscriptionMode.Broadcast;
});
Exclusive Mode - Only one consumer receives each event (load balancing)
streaming.AddSubscription<InvitationWorkflow>("invitation-processor", sub =>
{
sub.Mode = SubscriptionMode.Exclusive;
});
gRPC Streaming Client
External clients can consume events via gRPC bidirectional streaming using the EventService.
Basic Subscription
using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;
var channel = GrpcChannel.ForAddress("http://localhost:6000");
var client = new EventService.EventServiceClient(channel);
using var call = client.Subscribe();
// Start receiving events
var receiveTask = Task.Run(async () =>
{
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
switch (message.MessageTypeCase)
{
case EventMessage.MessageTypeOneofCase.Event:
var evt = message.Event;
Console.WriteLine($"Event: {evt.EventType}");
Console.WriteLine($" EventId: {evt.EventId}");
Console.WriteLine($" CorrelationId: {evt.CorrelationId}");
break;
case EventMessage.MessageTypeOneofCase.Completed:
Console.WriteLine("Subscription completed");
return;
case EventMessage.MessageTypeOneofCase.Error:
Console.WriteLine($"Error: {message.Error.Message}");
break;
}
}
});
// Send subscribe command
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Subscribe = new SubscribeCommand
{
SubscriptionId = Guid.NewGuid().ToString(),
CorrelationId = "my-correlation-id",
DeliveryMode = DeliveryMode.Immediate,
ConsumerId = $"client-{Guid.NewGuid():N}"
}
});
await receiveTask;
await call.RequestStream.CompleteAsync();
Event Type Filtering
Subscribe only to specific event types:
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Subscribe = new SubscribeCommand
{
SubscriptionId = Guid.NewGuid().ToString(),
CorrelationId = correlationId,
DeliveryMode = DeliveryMode.Immediate,
// Only receive these event types
EventTypes =
{
"UserInvitedEvent",
"UserInviteAcceptedEvent"
}
}
});
Terminal Events
Automatically complete subscription when specific events occur:
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Subscribe = new SubscribeCommand
{
SubscriptionId = Guid.NewGuid().ToString(),
CorrelationId = correlationId,
DeliveryMode = DeliveryMode.Immediate,
// Subscription completes when any of these events occur
TerminalEventTypes =
{
"UserInviteAcceptedEvent",
"UserInviteDeclinedEvent"
}
}
});
Manual Acknowledgment
Send acknowledgments for processed events:
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
if (message.MessageTypeCase == EventMessage.MessageTypeOneofCase.Event)
{
var evt = message.Event;
try
{
// Process the event
await ProcessEventAsync(evt);
// Send acknowledgment
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Acknowledge = new AcknowledgeCommand
{
SubscriptionId = subscriptionId,
EventId = evt.EventId,
ConsumerId = consumerId
}
});
}
catch (Exception)
{
// Send negative acknowledgment (requeue for retry)
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
Nack = new NackCommand
{
SubscriptionId = subscriptionId,
EventId = evt.EventId,
ConsumerId = consumerId,
Requeue = true // true = retry, false = dead letter
}
});
}
}
}
Configuration
Stream Configuration
Define streams for your workflows:
builder.Services.AddEventStreaming(streaming =>
{
// Configure stream for UserWorkflow
streaming.AddStream<UserWorkflow>(stream =>
{
stream.Type = StreamType.Ephemeral; // Message queue semantics
stream.DeliverySemantics = DeliverySemantics.AtLeastOnce;
stream.Scope = StreamScope.Internal; // Internal to service
});
// Persistent streams (Phase 2+)
streaming.AddStream<AuditWorkflow>(stream =>
{
stream.Type = StreamType.Persistent; // Event log semantics
stream.EnableReplay = true; // Support replay
stream.Retention = TimeSpan.FromDays(30); // Keep for 30 days
});
});
Subscription Configuration
Define subscriptions for your consumers:
builder.Services.AddEventStreaming(streaming =>
{
// Broadcast subscription (all consumers get all events)
streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
{
sub.Mode = SubscriptionMode.Broadcast;
sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
sub.EventTypeFilter = new HashSet<string> { "UserAddedEvent", "UserRemovedEvent" };
});
// Exclusive subscription (load balanced)
streaming.AddSubscription<InvitationWorkflow>("invitation-processor", sub =>
{
sub.Mode = SubscriptionMode.Exclusive;
sub.MaxConcurrentConsumers = 1;
sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
});
});
Testing with grpcurl
You can test the EventService using grpcurl:
# List available services
grpcurl -plaintext localhost:6000 list
# Subscribe to events
grpcurl -plaintext -d '{
"subscribe": {
"subscription_id": "test-sub",
"correlation_id": "test-correlation",
"delivery_mode": "DELIVERY_MODE_IMMEDIATE"
}
}' localhost:6000 svrnty.cqrs.events.EventService.Subscribe
See Also
- EventConsumerBackgroundService.cs - Full example of in-process consumption
- Program.cs - Stream and subscription configuration
- EVENT-STREAMING-IMPLEMENTATION-PLAN.md - Implementation roadmap