dotnet-cqrs/Svrnty.Sample/EVENT_STREAMING_EXAMPLES.md

8.2 KiB

Event Streaming Examples

This document provides examples of how to consume events from the Svrnty CQRS event streaming system.

Table of Contents

  1. Overview
  2. In-Process Consumption (IEventSubscriptionClient)
  3. gRPC Streaming Client
  4. Configuration

Overview

The Svrnty CQRS framework provides two primary ways to consume events:

  1. In-Process via IEventSubscriptionClient - For consuming events within the same application process
  2. gRPC Bidirectional Streaming - For consuming events from external clients/services

In-Process Consumption

Using IEventSubscriptionClient

The IEventSubscriptionClient provides an IAsyncEnumerable interface for consuming events within the same process.

Example: EventConsumerBackgroundService.cs

public class EventConsumerBackgroundService : BackgroundService
{
    private readonly IEventSubscriptionClient _subscriptionClient;
    private readonly ILogger _logger;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var consumerId = $"analytics-{Guid.NewGuid():N}";

        // Subscribe to events
        await foreach (var @event in _subscriptionClient.SubscribeAsync(
            "user-analytics",       // Subscription ID
            consumerId,             // Consumer ID
            stoppingToken))
        {
            // Process the event
            _logger.LogInformation("Received: {EventType}", @event.GetType().Name);

            // Type-specific processing
            if (@event is UserAddedEvent userAdded)
            {
                // Handle user added
            }
        }
    }
}

Subscription Modes

Broadcast Mode - All consumers receive all events

streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
{
    sub.Mode = SubscriptionMode.Broadcast;
});

Exclusive Mode - Only one consumer receives each event (load balancing)

streaming.AddSubscription<InvitationWorkflow>("invitation-processor", sub =>
{
    sub.Mode = SubscriptionMode.Exclusive;
});

gRPC Streaming Client

External clients can consume events via gRPC bidirectional streaming using the EventService.

Basic Subscription

using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;

var channel = GrpcChannel.ForAddress("http://localhost:6000");
var client = new EventService.EventServiceClient(channel);

using var call = client.Subscribe();

// Start receiving events
var receiveTask = Task.Run(async () =>
{
    await foreach (var message in call.ResponseStream.ReadAllAsync())
    {
        switch (message.MessageTypeCase)
        {
            case EventMessage.MessageTypeOneofCase.Event:
                var evt = message.Event;
                Console.WriteLine($"Event: {evt.EventType}");
                Console.WriteLine($"  EventId: {evt.EventId}");
                Console.WriteLine($"  CorrelationId: {evt.CorrelationId}");
                break;

            case EventMessage.MessageTypeOneofCase.Completed:
                Console.WriteLine("Subscription completed");
                return;

            case EventMessage.MessageTypeOneofCase.Error:
                Console.WriteLine($"Error: {message.Error.Message}");
                break;
        }
    }
});

// Send subscribe command
await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = "my-correlation-id",
        DeliveryMode = DeliveryMode.Immediate,
        ConsumerId = $"client-{Guid.NewGuid():N}"
    }
});

await receiveTask;
await call.RequestStream.CompleteAsync();

Event Type Filtering

Subscribe only to specific event types:

await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = correlationId,
        DeliveryMode = DeliveryMode.Immediate,

        // Only receive these event types
        EventTypes =
        {
            "UserInvitedEvent",
            "UserInviteAcceptedEvent"
        }
    }
});

Terminal Events

Automatically complete subscription when specific events occur:

await call.RequestStream.WriteAsync(new SubscriptionRequest
{
    Subscribe = new SubscribeCommand
    {
        SubscriptionId = Guid.NewGuid().ToString(),
        CorrelationId = correlationId,
        DeliveryMode = DeliveryMode.Immediate,

        // Subscription completes when any of these events occur
        TerminalEventTypes =
        {
            "UserInviteAcceptedEvent",
            "UserInviteDeclinedEvent"
        }
    }
});

Manual Acknowledgment

Send acknowledgments for processed events:

await foreach (var message in call.ResponseStream.ReadAllAsync())
{
    if (message.MessageTypeCase == EventMessage.MessageTypeOneofCase.Event)
    {
        var evt = message.Event;

        try
        {
            // Process the event
            await ProcessEventAsync(evt);

            // Send acknowledgment
            await call.RequestStream.WriteAsync(new SubscriptionRequest
            {
                Acknowledge = new AcknowledgeCommand
                {
                    SubscriptionId = subscriptionId,
                    EventId = evt.EventId,
                    ConsumerId = consumerId
                }
            });
        }
        catch (Exception)
        {
            // Send negative acknowledgment (requeue for retry)
            await call.RequestStream.WriteAsync(new SubscriptionRequest
            {
                Nack = new NackCommand
                {
                    SubscriptionId = subscriptionId,
                    EventId = evt.EventId,
                    ConsumerId = consumerId,
                    Requeue = true // true = retry, false = dead letter
                }
            });
        }
    }
}

Configuration

Stream Configuration

Define streams for your workflows:

builder.Services.AddEventStreaming(streaming =>
{
    // Configure stream for UserWorkflow
    streaming.AddStream<UserWorkflow>(stream =>
    {
        stream.Type = StreamType.Ephemeral;              // Message queue semantics
        stream.DeliverySemantics = DeliverySemantics.AtLeastOnce;
        stream.Scope = StreamScope.Internal;             // Internal to service
    });

    // Persistent streams (Phase 2+)
    streaming.AddStream<AuditWorkflow>(stream =>
    {
        stream.Type = StreamType.Persistent;             // Event log semantics
        stream.EnableReplay = true;                      // Support replay
        stream.Retention = TimeSpan.FromDays(30);        // Keep for 30 days
    });
});

Subscription Configuration

Define subscriptions for your consumers:

builder.Services.AddEventStreaming(streaming =>
{
    // Broadcast subscription (all consumers get all events)
    streaming.AddSubscription<UserWorkflow>("user-analytics", sub =>
    {
        sub.Mode = SubscriptionMode.Broadcast;
        sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
        sub.EventTypeFilter = new HashSet<string> { "UserAddedEvent", "UserRemovedEvent" };
    });

    // Exclusive subscription (load balanced)
    streaming.AddSubscription<InvitationWorkflow>("invitation-processor", sub =>
    {
        sub.Mode = SubscriptionMode.Exclusive;
        sub.MaxConcurrentConsumers = 1;
        sub.VisibilityTimeout = TimeSpan.FromSeconds(30);
    });
});

Testing with grpcurl

You can test the EventService using grpcurl:

# List available services
grpcurl -plaintext localhost:6000 list

# Subscribe to events
grpcurl -plaintext -d '{
  "subscribe": {
    "subscription_id": "test-sub",
    "correlation_id": "test-correlation",
    "delivery_mode": "DELIVERY_MODE_IMMEDIATE"
  }
}' localhost:6000 svrnty.cqrs.events.EventService.Subscribe

See Also