# Event Streaming Examples

This document provides examples of how to consume events from the Svrnty CQRS event streaming system.

## Table of Contents

1. [Overview](#overview)
2. [In-Process Consumption (IEventSubscriptionClient)](#in-process-consumption)
3. [gRPC Streaming Client](#grpc-streaming-client)
4. [Configuration](#configuration)
5. [Testing with grpcurl](#testing-with-grpcurl)
6. [See Also](#see-also)

---

## Overview

The Svrnty CQRS framework provides two primary ways to consume events:

1. **In-Process via IEventSubscriptionClient** - For consuming events within the same application process
2. **gRPC Bidirectional Streaming** - For consuming events from external clients/services

## In-Process Consumption

### Using IEventSubscriptionClient

The `IEventSubscriptionClient` exposes events as an `IAsyncEnumerable`, so a consumer in the same process can read them with `await foreach`.

**Example: EventConsumerBackgroundService.cs**

```csharp
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class EventConsumerBackgroundService : BackgroundService
{
    private readonly IEventSubscriptionClient _subscriptionClient;
    private readonly ILogger<EventConsumerBackgroundService> _logger;

    // Both dependencies are resolved from the DI container
    public EventConsumerBackgroundService(
        IEventSubscriptionClient subscriptionClient,
        ILogger<EventConsumerBackgroundService> logger)
    {
        _subscriptionClient = subscriptionClient;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Unique ID for this consumer instance
        var consumerId = $"analytics-{Guid.NewGuid():N}";

        // Subscribe to events; the loop ends when stoppingToken is cancelled
        await foreach (var @event in _subscriptionClient.SubscribeAsync(
            "user-analytics",  // Subscription ID
            consumerId,        // Consumer ID
            stoppingToken))
        {
            // Process the event
            _logger.LogInformation("Received: {EventType}", @event.GetType().Name);

            // Type-specific processing
            if (@event is UserAddedEvent userAdded)
            {
                // Handle user added
            }
        }
    }
}
```

### Subscription Modes

**Broadcast Mode** - All consumers receive all events

```csharp
streaming.AddSubscription("user-analytics", sub =>
{
    sub.Mode = SubscriptionMode.Broadcast;
});
```

**Exclusive Mode** - Only one consumer receives each event (load balancing)

```csharp
streaming.AddSubscription("invitation-processor", sub =>
{
    sub.Mode = SubscriptionMode.Exclusive;
});
```

---

## gRPC Streaming Client

External clients can consume events via gRPC bidirectional streaming using the `EventService`.

### Basic Subscription

```csharp
using Grpc.Core;        // ReadAllAsync extension method
using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;

var channel = GrpcChannel.ForAddress("http://localhost:6000");
var client = new EventService.EventServiceClient(channel);

using var call = client.Subscribe();

// Start receiving events
var receiveTask = Task.Run(async () =>
{
    await foreach (var message in call.ResponseStream.ReadAllAsync())
    {
        switch (message.MessageTypeCase)
        {
            case EventMessage.MessageTypeOneofCase.Event:
                var evt = message.Event;
                Console.WriteLine($"Event: {evt.EventType}");
                Console.WriteLine($"  EventId: {evt.EventId}");
                Console.WriteLine($"  CorrelationId: {evt.CorrelationId}");
                break;

            case EventMessage.MessageTypeOneofCase.Completed:
                Console.WriteLine("Subscription completed");
                return;

            case EventMessage.MessageTypeOneofCase.Error:
                Console.WriteLine($"Error: {message.Error.Message}");
                break;
        }
    }
});

// Send subscribe command
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = "my-correlation-id",
        DeliveryMode = DeliveryMode.Immediate,
        ConsumerId = $"client-{Guid.NewGuid():N}"
    }
});

// Wait until the subscription completes, then close the request stream
await receiveTask;
await call.RequestStream.CompleteAsync();
```

### Event Type Filtering

Subscribe only to specific event types:

```csharp
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = correlationId,
        DeliveryMode = DeliveryMode.Immediate,

        // Only receive these event types
        EventTypes = { "UserInvitedEvent", "UserInviteAcceptedEvent" }
    }
});
```

### Terminal Events

Automatically complete the subscription when specific events occur:

```csharp
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = correlationId,
        DeliveryMode = DeliveryMode.Immediate,

        // Subscription completes when any of these events occur
        TerminalEventTypes = { "UserInviteAcceptedEvent", "UserInviteDeclinedEvent" }
    }
});
```

### Manual Acknowledgment

Acknowledge each event after processing it, or send a negative acknowledgment if processing fails:

```csharp
// subscriptionId and consumerId are the values sent in the SubscribeCommand
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
    if (message.MessageTypeCase == EventMessage.MessageTypeOneofCase.Event)
    {
        var evt = message.Event;

        try
        {
            // Process the event
            await ProcessEventAsync(evt);

            // Send acknowledgment
            await call.RequestStream.WriteAsync(new SubscriptionRequest
            {
                Acknowledge = new AcknowledgeCommand
                {
                    SubscriptionId = subscriptionId,
                    EventId = evt.EventId,
                    ConsumerId = consumerId
                }
            });
        }
        catch (Exception)
        {
            // Send negative acknowledgment (requeue for retry)
            await call.RequestStream.WriteAsync(new SubscriptionRequest
            {
                Nack = new NackCommand
                {
                    SubscriptionId = subscriptionId,
                    EventId = evt.EventId,
                    ConsumerId = consumerId,
                    Requeue = true  // true = retry, false = dead letter
                }
            });
        }
    }
}
```

---

## Configuration

### Stream Configuration

Define streams for your workflows:

```csharp
builder.Services.AddEventStreaming(streaming =>
{
    // Configure stream for UserWorkflow
    streaming.AddStream<UserWorkflow>(stream =>
    {
        stream.Type = StreamType.Ephemeral;  // Message queue semantics
        stream.DeliverySemantics = DeliverySemantics.AtLeastOnce;
        stream.Scope = StreamScope.Internal;  // Internal to service
    });

    // Persistent streams (Phase 2+)
    streaming.AddStream(stream =>
    {
        stream.Type = StreamType.Persistent;  // Event log semantics
        stream.EnableReplay = true;  // Support replay
        stream.Retention = TimeSpan.FromDays(30);  // Keep for 30 days
    });
});
```

### Subscription Configuration

Define subscriptions for your consumers:

```csharp
builder.Services.AddEventStreaming(streaming =>
{
    // Broadcast subscription (all consumers get all events)
    streaming.AddSubscription("user-analytics", sub =>
    {
        sub.Mode = SubscriptionMode.Broadcast;
        sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
        sub.EventTypeFilter = new HashSet<string>
        {
            "UserAddedEvent",
            "UserRemovedEvent"
        };
    });

    // Exclusive subscription (load-balanced)
    streaming.AddSubscription("invitation-processor", sub =>
    {
        sub.Mode = SubscriptionMode.Exclusive;
        sub.MaxConcurrentConsumers = 1;
        sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
    });
});
```

---

## Testing with grpcurl

You can test the EventService using `grpcurl`:

```bash
# List available services (requires gRPC server reflection)
grpcurl -plaintext localhost:6000 list

# Subscribe to events
grpcurl -plaintext -d '{
  "subscribe": {
    "subscription_id": "test-sub",
    "correlation_id": "test-correlation",
    "delivery_mode": "DELIVERY_MODE_IMMEDIATE"
  }
}' localhost:6000 svrnty.cqrs.events.EventService.Subscribe
```

---

## See Also

- [EventConsumerBackgroundService.cs](./EventConsumerBackgroundService.cs) - Full example of in-process consumption
- [Program.cs](./Program.cs) - Stream and subscription configuration
- [EVENT-STREAMING-IMPLEMENTATION-PLAN.md](../EVENT-STREAMING-IMPLEMENTATION-PLAN.md) - Implementation roadmap