dotnet-cqrs/docs/event-streaming/grpc-streaming/persistent-subscriptions.md

13 KiB

gRPC Persistent Subscriptions

Subscribe to persistent event streams via gRPC bidirectional streaming.

Overview

gRPC persistent subscriptions provide real-time event delivery for persistent streams:

  • Bidirectional Streaming - Full-duplex communication
  • Offset-Based - Resume from specific positions
  • Broadcast Mode - All consumers receive all events
  • Real-Time Delivery - Low-latency event propagation

Quick Start

using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;

// Open a channel to the server and build the generated client on top of it
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);

// Describe the subscription: which stream, where to start, and a unique subscription ID
var subscription = new PersistentSubscriptionRequest
{
    StreamName = "orders",
    StartOffset = 0,  // Offset 0 replays the stream from the beginning
    SubscriptionId = Guid.NewGuid().ToString()
};

// Open the bidirectional call and send the subscription as the first message
using var streamingCall = client.SubscribeToPersistent();
await streamingCall.RequestStream.WriteAsync(subscription);

// Handle each event as the server delivers it
var incomingEvents = streamingCall.ResponseStream.ReadAllAsync();
await foreach (var @event in incomingEvents)
{
    Console.WriteLine($"Received: {@event.EventType} at offset {@event.Offset}");
    await ProcessEventAsync(@event);
}

Service Implementation

EventStreamServiceImpl

using Grpc.Core;
using Svrnty.CQRS.Events.Abstractions;

/// <summary>
/// Server-side implementation of the persistent subscription RPC: reads one
/// subscription request from the client, then streams events from the event
/// store to the client starting at the requested offset.
/// </summary>
public class EventStreamServiceImpl : EventStreamService.EventStreamServiceBase
{
    private readonly IEventStreamStore _eventStore;

    /// <summary>
    /// Handles a bidirectional persistent subscription call.
    /// </summary>
    /// <param name="requestStream">Client messages; the first one must be the subscription request.</param>
    /// <param name="responseStream">Writer used to push events to the client.</param>
    /// <param name="context">Call context; its cancellation token is passed to every await.</param>
    /// <exception cref="RpcException">
    /// <see cref="StatusCode.InvalidArgument"/> when the client completes the request
    /// stream without sending a subscription request.
    /// </exception>
    public override async Task SubscribeToPersistent(
        IAsyncStreamReader<PersistentSubscriptionRequest> requestStream,
        IServerStreamWriter<StreamEventProto> responseStream,
        ServerCallContext context)
    {
        // Read initial subscription request. MoveNext returns false when the
        // client closes its side without sending anything; Current must not be
        // used in that case.
        if (!await requestStream.MoveNext(context.CancellationToken))
        {
            throw new RpcException(new Status(
                StatusCode.InvalidArgument,
                "A PersistentSubscriptionRequest must be sent before the request stream is completed."));
        }

        var request = requestStream.Current;

        _logger.LogInformation(
            "Persistent subscription started: stream={Stream}, offset={Offset}, subscription={SubscriptionId}",
            request.StreamName,
            request.StartOffset,
            request.SubscriptionId);

        try
        {
            // Stream events from offset
            await foreach (var @event in _eventStore.ReadStreamAsync(
                request.StreamName,
                fromOffset: request.StartOffset,
                cancellationToken: context.CancellationToken))
            {
                // Convert to proto message
                var eventProto = new StreamEventProto
                {
                    EventId = @event.EventId,
                    EventType = @event.EventType,
                    StreamName = @event.StreamName,
                    Offset = @event.Offset,
                    Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTimeOffset(@event.Timestamp),
                    Data = @event.Data,
                    Metadata = { @event.Metadata }
                };

                // Send to client
                await responseStream.WriteAsync(eventProto, context.CancellationToken);
            }
        }
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
        {
            // Awaits that observe the call's cancellation token report cancellation
            // as OperationCanceledException; this is the normal end of a subscription,
            // not an error.
            _logger.LogInformation("Subscription cancelled by client: {SubscriptionId}", request.SubscriptionId);
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            _logger.LogInformation("Subscription cancelled by client: {SubscriptionId}", request.SubscriptionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in persistent subscription: {SubscriptionId}", request.SubscriptionId);
            throw;
        }
    }
}

Client Subscription

Basic Subscription

/// <summary>
/// Client-side consumer that opens a persistent subscription and dispatches
/// each received event to a type-specific handler.
/// </summary>
public class PersistentStreamSubscriber
{
    private readonly EventStreamService.EventStreamServiceClient _client;

    /// <summary>
    /// Subscribes to <paramref name="streamName"/> starting at
    /// <paramref name="startOffset"/> and processes events until the stream
    /// ends or <paramref name="ct"/> is cancelled. A fresh subscription ID is
    /// generated for every call.
    /// </summary>
    public async Task SubscribeAsync(
        string streamName,
        long startOffset,
        CancellationToken ct)
    {
        var subscription = new PersistentSubscriptionRequest
        {
            StreamName = streamName,
            StartOffset = startOffset,
            SubscriptionId = Guid.NewGuid().ToString()
        };

        // The subscription request is the first message on the call
        using var call = _client.SubscribeToPersistent();
        await call.RequestStream.WriteAsync(subscription, ct);

        // Events are handled one at a time, in delivery order
        var incomingEvents = call.ResponseStream.ReadAllAsync(ct);
        await foreach (var received in incomingEvents)
        {
            await ProcessEventAsync(received, ct);
        }
    }

    /// <summary>
    /// Logs the event and routes it by event type; event types without a
    /// branch below are logged but otherwise ignored.
    /// </summary>
    private async Task ProcessEventAsync(StreamEventProto @event, CancellationToken ct)
    {
        _logger.LogInformation(
            "Processing event: {EventType} at offset {Offset}",
            @event.EventType,
            @event.Offset);

        var eventType = @event.EventType;

        if (eventType == "OrderPlaced")
        {
            var placed = JsonSerializer.Deserialize<OrderPlacedEvent>(@event.Data);
            await HandleOrderPlacedAsync(placed, ct);
        }
        else if (eventType == "OrderShipped")
        {
            var shipped = JsonSerializer.Deserialize<OrderShippedEvent>(@event.Data);
            await HandleOrderShippedAsync(shipped, ct);
        }
    }
}

Resume from Checkpoint

/// <summary>
/// Subscribes to <paramref name="streamName"/> starting just after the last
/// checkpointed offset for <paramref name="subscriptionId"/>, and saves a new
/// checkpoint after each event has been processed.
/// </summary>
/// <param name="streamName">Stream to subscribe to.</param>
/// <param name="subscriptionId">Stable ID used both as the checkpoint key and the subscription ID.</param>
/// <param name="ct">Cancels the checkpoint calls, the request write and the event loop.</param>
public async Task SubscribeWithCheckpointAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    // Load checkpoint
    var checkpoint = await _checkpointStore.GetCheckpointAsync(subscriptionId, ct);

    // The checkpoint holds the last processed offset, so delivery resumes one past it.
    // NOTE(review): what GetCheckpointAsync returns when nothing has been saved yet is
    // not visible here — confirm that "checkpoint + 1" does not skip offset 0 in that case.
    var resumeOffset = checkpoint + 1;

    // Log the offset that is actually requested, not the stored checkpoint value
    _logger.LogInformation(
        "Resuming subscription {SubscriptionId} from offset {Offset}",
        subscriptionId,
        resumeOffset);

    using var call = _client.SubscribeToPersistent();

    // Subscribe from checkpoint
    await call.RequestStream.WriteAsync(new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = resumeOffset,
        SubscriptionId = subscriptionId
    }, ct);

    // Process and checkpoint
    await foreach (var @event in call.ResponseStream.ReadAllAsync(ct))
    {
        await ProcessEventAsync(@event, ct);

        // Save the checkpoint only after processing succeeded, so a failure
        // before this point leaves the previous checkpoint in place
        await _checkpointStore.SaveCheckpointAsync(subscriptionId, @event.Offset, ct);
    }
}

Filtering Events

Client-Side Filtering

/// <summary>
/// Subscribes to <paramref name="streamName"/> from offset 0 and processes only
/// events whose type is contained in <paramref name="eventTypes"/>. Filtering
/// happens on the client, after each event has been received.
/// </summary>
public async Task SubscribeWithFilterAsync(
    string streamName,
    HashSet<string> eventTypes,
    CancellationToken ct)
{
    var subscription = new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = 0,
        SubscriptionId = Guid.NewGuid().ToString()
    };

    using var call = _client.SubscribeToPersistent();
    await call.RequestStream.WriteAsync(subscription, ct);

    await foreach (var @event in call.ResponseStream.ReadAllAsync(ct))
    {
        // Only event types the caller asked for are processed; the rest are dropped
        var isWanted = eventTypes.Contains(@event.EventType);
        if (isWanted)
        {
            await ProcessEventAsync(@event, ct);
        }
    }
}

// Usage: process only these three order event types from the "orders" stream
var orderEventTypes = new HashSet<string>
{
    "OrderPlaced",
    "OrderShipped",
    "OrderCancelled"
};

await SubscribeWithFilterAsync("orders", orderEventTypes, ct);

Server-Side Filtering

/// <summary>
/// Persistent subscription handler that filters events on the server. The
/// optional "event-type-filter" request header carries a comma-separated list
/// of event types; when it is absent or blank, every event is sent.
/// </summary>
/// <exception cref="RpcException">
/// <see cref="StatusCode.InvalidArgument"/> when the client completes the request
/// stream without sending a subscription request.
/// </exception>
public override async Task SubscribeToPersistent(
    IAsyncStreamReader<PersistentSubscriptionRequest> requestStream,
    IServerStreamWriter<StreamEventProto> responseStream,
    ServerCallContext context)
{
    // MoveNext returns false when the client closes its side without sending
    // a request; Current must not be used in that case.
    if (!await requestStream.MoveNext(context.CancellationToken))
    {
        throw new RpcException(new Status(
            StatusCode.InvalidArgument,
            "A PersistentSubscriptionRequest must be sent before the request stream is completed."));
    }

    var request = requestStream.Current;

    // Parse event type filter from metadata. Entries are trimmed and empty
    // entries dropped so that "OrderPlaced, OrderShipped" and trailing commas
    // match the same types as the compact form.
    var eventTypeFilter = context.RequestHeaders
        .FirstOrDefault(h => h.Key == "event-type-filter")
        ?.Value
        ?.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
        .ToHashSet();

    // A header that is present but blank means "no filter", not "match nothing"
    var hasFilter = eventTypeFilter != null && eventTypeFilter.Count > 0;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        request.StreamName,
        fromOffset: request.StartOffset,
        cancellationToken: context.CancellationToken))
    {
        // Server-side filtering
        if (hasFilter && !eventTypeFilter.Contains(@event.EventType))
        {
            continue;
        }

        var eventProto = ConvertToProto(@event);
        await responseStream.WriteAsync(eventProto, context.CancellationToken);
    }
}

// Client side: the comma-separated event type filter is sent as a request header
var filterHeaders = new Metadata();
filterHeaders.Add("event-type-filter", "OrderPlaced,OrderShipped");

using var call = _client.SubscribeToPersistent(filterHeaders);

Error Handling

Reconnection Logic

/// <summary>
/// Runs a checkpointed subscription and reconnects with capped exponential
/// backoff whenever the server reports <see cref="StatusCode.Unavailable"/>.
/// </summary>
/// <param name="streamName">Stream to subscribe to.</param>
/// <param name="subscriptionId">Stable subscription ID; also the checkpoint key used when resuming.</param>
/// <param name="ct">Stops the subscription and any pending backoff delay.</param>
/// <exception cref="RpcException">
/// Rethrown when the failure is not <see cref="StatusCode.Unavailable"/>, or when
/// the server is still unavailable after all retries have been used.
/// </exception>
public async Task SubscribeWithRetryAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    const int maxRetries = 10;
    var maxDelay = TimeSpan.FromMinutes(1);
    var retryCount = 0;

    while (!ct.IsCancellationRequested)
    {
        try
        {
            // Resume from the saved checkpoint on every attempt so a reconnect
            // continues after the last processed event instead of starting over.
            await SubscribeWithCheckpointAsync(streamName, subscriptionId, ct);

            // If we reach here, stream ended normally
            break;
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable && retryCount < maxRetries)
        {
            retryCount++;

            // Exponential backoff (2s, 4s, 8s, ...) capped so late retries
            // do not wait many minutes between attempts
            var backoffSeconds = Math.Min(Math.Pow(2, retryCount), maxDelay.TotalSeconds);
            var delay = TimeSpan.FromSeconds(backoffSeconds);

            _logger.LogWarning(
                "Connection lost, retrying in {Delay} (attempt {Attempt}/{Max})",
                delay,
                retryCount,
                maxRetries);

            await Task.Delay(delay, ct);
        }
        catch (Exception ex) when (!ct.IsCancellationRequested)
        {
            // Reached for non-retryable failures and for Unavailable once the
            // retry budget is spent, so the caller always sees the failure.
            // Exceptions caused by the caller's own cancellation skip this
            // handler and propagate without being logged as fatal.
            _logger.LogError(ex, "Fatal error in subscription");
            throw;
        }
    }
}

Idempotent Processing

/// <summary>
/// Handles an event at most once per event ID: events already recorded in the
/// processed-events store are skipped, all others are handled and then recorded.
/// </summary>
private async Task ProcessEventAsync(StreamEventProto @event, CancellationToken ct)
{
    // Look the event up first so a redelivered event is not handled again
    var seenBefore = await _processedEventsStore.ExistsAsync(@event.EventId, ct);

    if (!seenBefore)
    {
        // New event: handle it, then record it as processed
        await HandleEventAsync(@event, ct);
        await _processedEventsStore.AddAsync(@event.EventId, ct);
        return;
    }

    _logger.LogDebug("Event {EventId} already processed, skipping", @event.EventId);
}

Multiple Streams

Subscribe to multiple streams concurrently:

/// <summary>
/// Subscribes to the "orders", "payments" and "shipments" streams concurrently,
/// one checkpointed subscription per stream, and completes when all of them
/// have finished.
/// </summary>
/// <param name="ct">Shared cancellation token passed to every subscription.</param>
public async Task SubscribeToMultipleStreamsAsync(CancellationToken ct)
{
    var streams = new[] { "orders", "payments", "shipments" };

    // Each stream gets a stable subscription ID ("multi-<stream>"), which is
    // what the checkpointed subscription uses as its checkpoint key.
    // SubscribeWithCheckpointAsync is used because it is the overload that
    // takes (streamName, subscriptionId, ct).
    var tasks = streams.Select(streamName =>
        SubscribeWithCheckpointAsync(streamName, subscriptionId: $"multi-{streamName}", ct));

    await Task.WhenAll(tasks);
}

Monitoring

Subscription Health

/// <summary>
/// Tracks when the last event was received and logs a warning when the
/// subscription has been silent for longer than the stale threshold.
/// </summary>
public class SubscriptionHealthMonitor
{
    private static readonly TimeSpan CheckInterval = TimeSpan.FromSeconds(30);
    private static readonly TimeSpan StaleThreshold = TimeSpan.FromMinutes(5);

    // Both fields are written by RecordEvent and read by MonitorAsync, which may
    // run on different threads, so they are plain longs accessed via Interlocked.
    private long _lastReceivedOffset;

    // Starts at construction time rather than DateTimeOffset.MinValue, so the
    // monitor does not warn about a centuries-long gap before the first event.
    private long _lastReceivedAtUtcTicks = DateTimeOffset.UtcNow.UtcTicks;

    /// <summary>
    /// Checks every 30 seconds and logs a warning while no event has been
    /// recorded for more than 5 minutes. Runs until <paramref name="ct"/> is cancelled.
    /// </summary>
    public async Task MonitorAsync(CancellationToken ct)
    {
        using var timer = new PeriodicTimer(CheckInterval);

        while (await timer.WaitForNextTickAsync(ct))
        {
            var lastReceivedAt = new DateTimeOffset(
                Interlocked.Read(ref _lastReceivedAtUtcTicks),
                TimeSpan.Zero);
            var timeSinceLastEvent = DateTimeOffset.UtcNow - lastReceivedAt;

            if (timeSinceLastEvent > StaleThreshold)
            {
                _logger.LogWarning(
                    "No events received for {Duration}, last offset: {Offset}",
                    timeSinceLastEvent,
                    Interlocked.Read(ref _lastReceivedOffset));
            }
        }
    }

    /// <summary>
    /// Records that an event at <paramref name="offset"/> was just received.
    /// </summary>
    public void RecordEvent(long offset)
    {
        Interlocked.Exchange(ref _lastReceivedOffset, offset);
        Interlocked.Exchange(ref _lastReceivedAtUtcTicks, DateTimeOffset.UtcNow.UtcTicks);
    }
}

Metrics

/// <summary>
/// Subscribes to <paramref name="streamName"/> from offset 0, processes every
/// event, and reports the running event count and throughput after each
/// block of 1000 events.
/// </summary>
/// <param name="streamName">Stream to subscribe to; also used as the metrics label.</param>
/// <param name="subscriptionId">Subscription ID sent to the server.</param>
/// <param name="ct">Cancels the request write and the event loop.</param>
public async Task SubscribeWithMetricsAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    var eventsReceived = 0L;

    // Stopwatch is monotonic, so the measured rate is not distorted by
    // system clock adjustments the way a DateTimeOffset.UtcNow difference is.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();

    using var call = _client.SubscribeToPersistent();

    await call.RequestStream.WriteAsync(new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = 0,
        SubscriptionId = subscriptionId
    }, ct);

    await foreach (var @event in call.ResponseStream.ReadAllAsync(ct))
    {
        await ProcessEventAsync(@event, ct);

        eventsReceived++;

        // Report once per 1000 events to keep logging and metrics overhead low
        if (eventsReceived % 1000 == 0)
        {
            var elapsedSeconds = stopwatch.Elapsed.TotalSeconds;

            // Guard against a zero elapsed time, which would yield an infinite rate
            var rate = elapsedSeconds > 0 ? eventsReceived / elapsedSeconds : 0d;

            _logger.LogInformation(
                "Processed {Count} events at {Rate:F0} events/sec",
                eventsReceived,
                rate);

            _metrics.RecordEventsProcessed(streamName, eventsReceived);
            _metrics.RecordProcessingRate(streamName, rate);
        }
    }
}

Best Practices

DO

  • Save checkpoints after processing events
  • Implement reconnection logic
  • Use idempotent event processing
  • Monitor subscription health
  • Filter events when possible
  • Handle cancellation gracefully
  • Log subscription lifecycle events
  • Use structured logging with correlation IDs

DON'T

  • Don't skip checkpoint saves
  • Don't retry indefinitely without backoff
  • Don't process events multiple times
  • Don't ignore connection errors
  • Don't subscribe from an offset beyond the end of the stream (validate the start offset first)
  • Don't forget to dispose calls
  • Don't block event processing
  • Don't ignore cancellation tokens

See Also