dotnet-cqrs/docs/event-streaming/grpc-streaming/persistent-subscriptions.md

468 lines
13 KiB
Markdown

# gRPC Persistent Subscriptions
Subscribe to persistent event streams via gRPC bidirectional streaming.
## Overview
gRPC persistent subscriptions provide real-time event delivery for persistent streams:
- **Bidirectional Streaming** - Full-duplex communication
- **Offset-Based** - Resume from specific positions
- **Broadcast Mode** - All consumers receive all events
- **Real-Time Delivery** - Low-latency event propagation
## Quick Start
```csharp
using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;
// Open a channel to the event-streaming endpoint and build the generated client
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);

// Open the bidirectional call for the persistent stream
using var streamingCall = client.SubscribeToPersistent();

// The first request message tells the server which stream to send and from where
var subscription = new PersistentSubscriptionRequest
{
    StreamName = "orders",
    StartOffset = 0, // 0 = start from the beginning of the stream
    SubscriptionId = Guid.NewGuid().ToString()
};
await streamingCall.RequestStream.WriteAsync(subscription);

// Handle each event as the server pushes it
var incomingEvents = streamingCall.ResponseStream.ReadAllAsync();
await foreach (var @event in incomingEvents)
{
    Console.WriteLine($"Received: {@event.EventType} at offset {@event.Offset}");
    await ProcessEventAsync(@event);
}
```
## Service Implementation
### EventStreamServiceImpl
```csharp
using Grpc.Core;
using Svrnty.CQRS.Events.Abstractions;
/// <summary>
/// gRPC service that streams the events of a persistent stream to a subscriber,
/// starting at the offset named in the subscriber's first request message.
/// </summary>
public class EventStreamServiceImpl : EventStreamService.EventStreamServiceBase
{
    private readonly IEventStreamStore _eventStore;

    /// <summary>
    /// Reads the initial <see cref="PersistentSubscriptionRequest"/> from the client and then
    /// writes every event read from the store, from <c>StartOffset</c> onwards, to the response stream.
    /// </summary>
    /// <param name="requestStream">Client-to-server stream; only its first message is read.</param>
    /// <param name="responseStream">Server-to-client stream the events are written to.</param>
    /// <param name="context">Call context; its cancellation token is passed to every read and write.</param>
    /// <exception cref="RpcException">
    /// <see cref="StatusCode.InvalidArgument"/> when the client completes the request stream
    /// without sending a subscription request.
    /// </exception>
    public override async Task SubscribeToPersistent(
        IAsyncStreamReader<PersistentSubscriptionRequest> requestStream,
        IServerStreamWriter<StreamEventProto> responseStream,
        ServerCallContext context)
    {
        // MoveNext returns false when the client closes the request stream without
        // sending anything; Current must not be read in that case.
        if (!await requestStream.MoveNext(context.CancellationToken))
        {
            throw new RpcException(new Status(
                StatusCode.InvalidArgument,
                "A PersistentSubscriptionRequest must be sent before the request stream is completed."));
        }

        var request = requestStream.Current;

        _logger.LogInformation(
            "Persistent subscription started: stream={Stream}, offset={Offset}, subscription={SubscriptionId}",
            request.StreamName,
            request.StartOffset,
            request.SubscriptionId);

        try
        {
            // Stream events from the requested offset
            await foreach (var @event in _eventStore.ReadStreamAsync(
                request.StreamName,
                fromOffset: request.StartOffset,
                cancellationToken: context.CancellationToken))
            {
                // Convert the stored event to its proto message
                var eventProto = new StreamEventProto
                {
                    EventId = @event.EventId,
                    EventType = @event.EventType,
                    StreamName = @event.StreamName,
                    Offset = @event.Offset,
                    Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTimeOffset(@event.Timestamp),
                    Data = @event.Data,
                    Metadata = { @event.Metadata }
                };

                // Send to client
                await responseStream.WriteAsync(eventProto, context.CancellationToken);
            }
        }
        catch (Exception ex) when (
            ex is RpcException { StatusCode: StatusCode.Cancelled } ||
            (ex is OperationCanceledException && context.CancellationToken.IsCancellationRequested))
        {
            // A client disconnect cancels context.CancellationToken, which surfaces here as
            // OperationCanceledException rather than RpcException; both mean the same thing.
            _logger.LogInformation("Subscription cancelled by client: {SubscriptionId}", request.SubscriptionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in persistent subscription: {SubscriptionId}", request.SubscriptionId);
            throw;
        }
    }
}
```
## Client Subscription
### Basic Subscription
```csharp
/// <summary>
/// Client-side subscriber that opens a persistent subscription and dispatches
/// each received event to a handler chosen by event type.
/// </summary>
public class PersistentStreamSubscriber
{
    private readonly EventStreamService.EventStreamServiceClient _client;

    /// <summary>
    /// Subscribes to <paramref name="streamName"/> from <paramref name="startOffset"/> and
    /// processes events until the server ends the stream or <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="streamName">Name of the persistent stream to read.</param>
    /// <param name="startOffset">Offset of the first event to receive.</param>
    /// <param name="ct">Token that cancels the call and any in-flight processing.</param>
    public async Task SubscribeAsync(
        string streamName,
        long startOffset,
        CancellationToken ct)
    {
        // Bind the call itself to the token so cancellation tears down the RPC,
        // not only the pending write or read.
        using var call = _client.SubscribeToPersistent(cancellationToken: ct);

        // Send subscription request
        await call.RequestStream.WriteAsync(new PersistentSubscriptionRequest
        {
            StreamName = streamName,
            StartOffset = startOffset,
            SubscriptionId = Guid.NewGuid().ToString()
        }, ct);

        // Process events
        await foreach (var @event in call.ResponseStream.ReadAllAsync(ct))
        {
            await ProcessEventAsync(@event, ct);
        }
    }

    /// <summary>
    /// Deserializes the event payload according to its type and invokes the matching handler.
    /// </summary>
    /// <exception cref="InvalidOperationException">The payload deserializes to <c>null</c>.</exception>
    private async Task ProcessEventAsync(StreamEventProto @event, CancellationToken ct)
    {
        _logger.LogInformation(
            "Processing event: {EventType} at offset {Offset}",
            @event.EventType,
            @event.Offset);

        // Handle event
        switch (@event.EventType)
        {
            case "OrderPlaced":
                // Deserialize returns null for a JSON "null" payload; fail here with a clear
                // message instead of passing null into the handler.
                var orderPlaced = JsonSerializer.Deserialize<OrderPlacedEvent>(@event.Data)
                    ?? throw new InvalidOperationException(
                        $"Event at offset {@event.Offset} has a null OrderPlaced payload.");
                await HandleOrderPlacedAsync(orderPlaced, ct);
                break;

            case "OrderShipped":
                var orderShipped = JsonSerializer.Deserialize<OrderShippedEvent>(@event.Data)
                    ?? throw new InvalidOperationException(
                        $"Event at offset {@event.Offset} has a null OrderShipped payload.");
                await HandleOrderShippedAsync(orderShipped, ct);
                break;

            default:
                // Event types without a handler are skipped; leave a trace so that is visible.
                _logger.LogDebug(
                    "No handler for event type {EventType} at offset {Offset}, skipping",
                    @event.EventType,
                    @event.Offset);
                break;
        }
    }
}
```
### Resume from Checkpoint
```csharp
/// <summary>
/// Resumes a subscription from the offset after its saved checkpoint and saves a new
/// checkpoint after each event has been processed.
/// </summary>
/// <param name="streamName">Name of the persistent stream to read.</param>
/// <param name="subscriptionId">Key under which the checkpoint is loaded and saved.</param>
/// <param name="ct">Token passed to every store and gRPC operation.</param>
public async Task SubscribeWithCheckpointAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    // NOTE(review): what the store returns when no checkpoint has been saved yet is not
    // visible here — confirm that "+ 1" below yields the intended first-run offset.
    var lastProcessedOffset = await _checkpointStore.GetCheckpointAsync(subscriptionId, ct);

    _logger.LogInformation(
        "Resuming subscription {SubscriptionId} from offset {Offset}",
        subscriptionId,
        lastProcessedOffset);

    using var call = _client.SubscribeToPersistent();

    // Ask for the event right after the last one that was processed
    var resumeRequest = new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = lastProcessedOffset + 1,
        SubscriptionId = subscriptionId
    };
    await call.RequestStream.WriteAsync(resumeRequest, ct);

    var incomingEvents = call.ResponseStream.ReadAllAsync(ct);
    await foreach (var @event in incomingEvents)
    {
        await ProcessEventAsync(@event, ct);

        // Checkpoint only once the event has been processed
        await _checkpointStore.SaveCheckpointAsync(subscriptionId, @event.Offset, ct);
    }
}
```
## Filtering Events
### Client-Side Filtering
```csharp
/// <summary>
/// Subscribes to <paramref name="streamName"/> from offset 0 and processes only the events
/// whose type is in <paramref name="eventTypes"/>; all other events are received and dropped.
/// </summary>
/// <param name="streamName">Name of the persistent stream to read.</param>
/// <param name="eventTypes">Event type names to process.</param>
/// <param name="ct">Token passed to the gRPC write, the read loop and the processing call.</param>
public async Task SubscribeWithFilterAsync(
    string streamName,
    HashSet<string> eventTypes,
    CancellationToken ct)
{
    using var call = _client.SubscribeToPersistent();

    var request = new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = 0,
        SubscriptionId = Guid.NewGuid().ToString()
    };
    await call.RequestStream.WriteAsync(request, ct);

    var incomingEvents = call.ResponseStream.ReadAllAsync(ct);
    await foreach (var @event in incomingEvents)
    {
        // The server still sends every event; the filter is applied here on the client
        if (eventTypes.Contains(@event.EventType))
        {
            await ProcessEventAsync(@event, ct);
        }
    }
}
// Usage: only these three event types are passed on to ProcessEventAsync
var orderEventTypes = new HashSet<string> { "OrderPlaced", "OrderShipped", "OrderCancelled" };
await SubscribeWithFilterAsync("orders", orderEventTypes, ct);
```
### Server-Side Filtering
```csharp
/// <summary>
/// Streams events from the requested offset, sending only those whose type is listed in the
/// comma-separated <c>event-type-filter</c> request header. An absent or blank header
/// disables filtering.
/// </summary>
/// <param name="requestStream">Client-to-server stream; only its first message is read.</param>
/// <param name="responseStream">Server-to-client stream the matching events are written to.</param>
/// <param name="context">Call context supplying the request headers and cancellation token.</param>
/// <exception cref="RpcException">
/// <see cref="StatusCode.InvalidArgument"/> when the client completes the request stream
/// without sending a subscription request.
/// </exception>
public override async Task SubscribeToPersistent(
    IAsyncStreamReader<PersistentSubscriptionRequest> requestStream,
    IServerStreamWriter<StreamEventProto> responseStream,
    ServerCallContext context)
{
    // MoveNext returns false when the client closes the request stream without
    // sending anything; Current must not be read in that case.
    if (!await requestStream.MoveNext(context.CancellationToken))
    {
        throw new RpcException(new Status(
            StatusCode.InvalidArgument,
            "A PersistentSubscriptionRequest must be sent before the request stream is completed."));
    }

    var request = requestStream.Current;

    // Parse the event type filter from the request headers. Entries are trimmed and empty
    // ones dropped so that "OrderPlaced, OrderShipped" and trailing commas behave as expected.
    var eventTypeFilter = context.RequestHeaders
        .FirstOrDefault(h => h.Key == "event-type-filter")
        ?.Value
        .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
        .ToHashSet();

    // A blank header would otherwise produce a filter that matches nothing
    var filterActive = eventTypeFilter != null && eventTypeFilter.Count > 0;

    await foreach (var @event in _eventStore.ReadStreamAsync(
        request.StreamName,
        fromOffset: request.StartOffset,
        cancellationToken: context.CancellationToken))
    {
        // Server-side filtering
        if (filterActive && !eventTypeFilter.Contains(@event.EventType))
        {
            continue;
        }

        var eventProto = ConvertToProto(@event);
        await responseStream.WriteAsync(eventProto, context.CancellationToken);
    }
}
// Client side: pass the comma-separated filter as a request header when opening the call
var metadata = new Metadata();
metadata.Add("event-type-filter", "OrderPlaced,OrderShipped");
using var call = _client.SubscribeToPersistent(metadata);
```
## Error Handling
### Reconnection Logic
```csharp
/// <summary>
/// Runs a checkpointed subscription and reconnects with capped exponential backoff when
/// the server is unavailable.
/// </summary>
/// <param name="streamName">Name of the persistent stream to read.</param>
/// <param name="subscriptionId">Identifier of the subscription whose checkpoint is resumed.</param>
/// <param name="ct">Token that stops the subscription and any pending backoff delay.</param>
/// <exception cref="RpcException">
/// The call failed with a status other than <see cref="StatusCode.Unavailable"/>, or it was
/// still unavailable after the last retry.
/// </exception>
public async Task SubscribeWithRetryAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    const int maxRetries = 10;
    var maxDelay = TimeSpan.FromMinutes(1);
    var retryCount = 0;

    while (!ct.IsCancellationRequested)
    {
        try
        {
            // SubscribeWithCheckpointAsync takes (streamName, subscriptionId, ct) and resumes
            // after the last saved offset, so a reconnect does not replay the whole stream.
            await SubscribeWithCheckpointAsync(streamName, subscriptionId, ct);

            // If we reach here, stream ended normally
            break;
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable && retryCount < maxRetries)
        {
            retryCount++;

            // Exponential backoff, capped so late attempts do not wait for many minutes
            var delay = TimeSpan.FromSeconds(Math.Min(Math.Pow(2, retryCount), maxDelay.TotalSeconds));

            _logger.LogWarning(
                "Connection lost, retrying in {Delay} (attempt {Attempt}/{Max})",
                delay,
                retryCount,
                maxRetries);

            await Task.Delay(delay, ct);
        }
        catch (Exception ex)
        {
            // Also reached when the server is still unavailable after the last retry, so
            // that exhausting the retries is reported to the caller instead of returning
            // as if the stream had ended normally.
            _logger.LogError(ex, "Fatal error in subscription");
            throw;
        }
    }
}
```
### Idempotent Processing
```csharp
/// <summary>
/// Handles an event at most once: events whose id is already recorded in the processed-events
/// store are skipped, and the id is recorded after the handler has completed.
/// </summary>
/// <param name="event">Event received from the stream.</param>
/// <param name="ct">Token passed to the store and handler calls.</param>
private async Task ProcessEventAsync(StreamEventProto @event, CancellationToken ct)
{
    var eventId = @event.EventId;

    // Idempotency guard: a redelivered event is ignored
    if (await _processedEventsStore.ExistsAsync(eventId, ct))
    {
        _logger.LogDebug("Event {EventId} already processed, skipping", eventId);
        return;
    }

    await HandleEventAsync(@event, ct);

    // Record the id only after the handler finished without throwing
    await _processedEventsStore.AddAsync(eventId, ct);
}
```
## Multiple Streams
Subscribe to multiple streams concurrently:
```csharp
/// <summary>
/// Subscribes to the "orders", "payments" and "shipments" streams concurrently and completes
/// when all three subscriptions have finished.
/// </summary>
/// <param name="ct">Token passed to every subscription.</param>
public async Task SubscribeToMultipleStreamsAsync(CancellationToken ct)
{
    var streams = new[] { "orders", "payments", "shipments" };

    // SubscribeWithCheckpointAsync is the overload that accepts a subscription id
    // (SubscribeAsync takes a start offset instead). Each stream gets its own id.
    var tasks = streams.Select(streamName =>
        SubscribeWithCheckpointAsync(streamName, subscriptionId: $"multi-{streamName}", ct));

    await Task.WhenAll(tasks);
}
```
## Monitoring
### Subscription Health
```csharp
/// <summary>
/// Tracks when the last event was received and periodically logs a warning when the
/// subscription has been silent for too long.
/// </summary>
public class SubscriptionHealthMonitor
{
    private static readonly TimeSpan CheckInterval = TimeSpan.FromSeconds(30);
    private static readonly TimeSpan SilenceThreshold = TimeSpan.FromMinutes(5);

    // RecordEvent and MonitorAsync are meant to run on different loops, so both values are
    // kept as longs and accessed through Interlocked to avoid torn reads.
    private long _lastReceivedOffset;

    // Starts at construction time: with the default value the first check would report
    // silence measured from DateTimeOffset.MinValue.
    private long _lastReceivedAtUtcTicks = DateTimeOffset.UtcNow.UtcTicks;

    /// <summary>
    /// Checks every 30 seconds and logs a warning when no event has been recorded for more
    /// than 5 minutes. Runs until <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="ct">Token that stops the monitoring loop.</param>
    public async Task MonitorAsync(CancellationToken ct)
    {
        using var timer = new PeriodicTimer(CheckInterval);

        while (await timer.WaitForNextTickAsync(ct))
        {
            var lastReceivedAt = new DateTimeOffset(
                Interlocked.Read(ref _lastReceivedAtUtcTicks),
                TimeSpan.Zero);
            var timeSinceLastEvent = DateTimeOffset.UtcNow - lastReceivedAt;

            if (timeSinceLastEvent > SilenceThreshold)
            {
                _logger.LogWarning(
                    "No events received for {Duration}, last offset: {Offset}",
                    timeSinceLastEvent,
                    Interlocked.Read(ref _lastReceivedOffset));
            }
        }
    }

    /// <summary>
    /// Records that an event at <paramref name="offset"/> was received just now.
    /// </summary>
    /// <param name="offset">Offset of the event that was received.</param>
    public void RecordEvent(long offset)
    {
        Interlocked.Exchange(ref _lastReceivedOffset, offset);
        Interlocked.Exchange(ref _lastReceivedAtUtcTicks, DateTimeOffset.UtcNow.UtcTicks);
    }
}
```
### Metrics
```csharp
/// <summary>
/// Subscribes to <paramref name="streamName"/> from offset 0 and, after every 1000 processed
/// events, logs and records the running event count and the average processing rate.
/// </summary>
/// <param name="streamName">Name of the persistent stream to read; also used as the metric key.</param>
/// <param name="subscriptionId">Identifier sent in the subscription request.</param>
/// <param name="ct">Token passed to the gRPC write, the read loop and the processing call.</param>
public async Task SubscribeWithMetricsAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    var eventsReceived = 0L;

    // Stopwatch is monotonic; wall-clock differences can jump when the system clock is adjusted
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();

    using var call = _client.SubscribeToPersistent();

    await call.RequestStream.WriteAsync(new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = 0,
        SubscriptionId = subscriptionId
    }, ct);

    await foreach (var @event in call.ResponseStream.ReadAllAsync(ct))
    {
        await ProcessEventAsync(@event, ct);
        eventsReceived++;

        if (eventsReceived % 1000 == 0)
        {
            var elapsedSeconds = stopwatch.Elapsed.TotalSeconds;

            // Guard against a zero interval so the reported rate is never Infinity
            var rate = elapsedSeconds > 0 ? eventsReceived / elapsedSeconds : 0d;

            _logger.LogInformation(
                "Processed {Count} events at {Rate:F0} events/sec",
                eventsReceived,
                rate);

            _metrics.RecordEventsProcessed(streamName, eventsReceived);
            _metrics.RecordProcessingRate(streamName, rate);
        }
    }
}
```
## Best Practices
### ✅ DO
- Save checkpoints after processing events
- Implement reconnection logic
- Use idempotent event processing
- Monitor subscription health
- Filter events when possible
- Handle cancellation gracefully
- Log subscription lifecycle events
- Use structured logging with correlation IDs
### ❌ DON'T
- Don't skip checkpoint saves
- Don't retry indefinitely without backoff
- Don't apply the same event more than once (deduplicate by event ID)
- Don't ignore connection errors
- Don't subscribe from an arbitrarily high offset (validate it against the stream first)
- Don't forget to dispose calls
- Don't block event processing
- Don't ignore cancellation tokens
## See Also
- [gRPC Streaming Overview](README.md)
- [Queue Subscriptions](queue-subscriptions.md)
- [gRPC Clients](grpc-clients.md)
- [Consumer Groups](../consumer-groups/README.md)
- [Subscriptions](../fundamentals/subscriptions.md)