468 lines
13 KiB
Markdown
468 lines
13 KiB
Markdown
# gRPC Persistent Subscriptions
|
|
|
|
Subscribe to persistent event streams via gRPC bidirectional streaming.
|
|
|
|
## Overview
|
|
|
|
gRPC persistent subscriptions provide real-time event delivery for persistent streams:
|
|
- **Bidirectional Streaming** - Full-duplex communication
|
|
- **Offset-Based** - Resume from specific positions
|
|
- **Broadcast Mode** - All consumers receive all events
|
|
- **Real-Time Delivery** - Low-latency event propagation
|
|
|
|
## Quick Start
|
|
|
|
```csharp
|
|
using Grpc.Net.Client;
|
|
using Svrnty.CQRS.Events.Grpc;
|
|
|
|
// Open a channel to the server and build the generated client on top of it
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);

// Describe the subscription: which stream, where to start, and a unique ID
var subscription = new PersistentSubscriptionRequest
{
    StreamName = "orders",
    StartOffset = 0, // Start from beginning
    SubscriptionId = Guid.NewGuid().ToString()
};

// Start the bidirectional call and send the subscription request
using var streamingCall = client.SubscribeToPersistent();
await streamingCall.RequestStream.WriteAsync(subscription);

// Consume events as the server sends them
var incoming = streamingCall.ResponseStream.ReadAllAsync();

await foreach (var @event in incoming)
{
    Console.WriteLine($"Received: {@event.EventType} at offset {@event.Offset}");
    await ProcessEventAsync(@event);
}
|
|
```
|
|
|
|
## Service Implementation
|
|
|
|
### EventStreamServiceImpl
|
|
|
|
```csharp
|
|
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
/// <summary>
/// gRPC service that streams the events of a persistent stream to a subscriber,
/// starting at the offset named in the subscriber's first request message.
/// </summary>
public class EventStreamServiceImpl : EventStreamService.EventStreamServiceBase
{
    private readonly IEventStreamStore _eventStore;
    private readonly ILogger<EventStreamServiceImpl> _logger;

    /// <summary>Creates the service with its event store and logger.</summary>
    /// <exception cref="ArgumentNullException">Either dependency is <c>null</c>.</exception>
    public EventStreamServiceImpl(
        IEventStreamStore eventStore,
        ILogger<EventStreamServiceImpl> logger)
    {
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Reads one <see cref="PersistentSubscriptionRequest"/> from the client, then writes
    /// every event read from the store (from the requested offset) to the response stream.
    /// </summary>
    /// <param name="requestStream">Client messages; only the first one is read.</param>
    /// <param name="responseStream">Stream the converted events are written to.</param>
    /// <param name="context">Call context; its cancellation token is passed to every read and write.</param>
    /// <exception cref="RpcException">
    /// <see cref="StatusCode.InvalidArgument"/> when the client completes the request
    /// stream without sending a subscription request.
    /// </exception>
    public override async Task SubscribeToPersistent(
        IAsyncStreamReader<PersistentSubscriptionRequest> requestStream,
        IServerStreamWriter<StreamEventProto> responseStream,
        ServerCallContext context)
    {
        // Read initial subscription request. MoveNext returns false when the client
        // closes the stream without sending anything; Current must not be read then.
        if (!await requestStream.MoveNext(context.CancellationToken))
        {
            throw new RpcException(new Status(
                StatusCode.InvalidArgument,
                "A subscription request must be sent before the request stream is completed."));
        }

        var request = requestStream.Current;

        _logger.LogInformation(
            "Persistent subscription started: stream={Stream}, offset={Offset}, subscription={SubscriptionId}",
            request.StreamName,
            request.StartOffset,
            request.SubscriptionId);

        try
        {
            // Stream events from offset
            await foreach (var @event in _eventStore.ReadStreamAsync(
                request.StreamName,
                fromOffset: request.StartOffset,
                cancellationToken: context.CancellationToken))
            {
                // Convert to proto message
                var eventProto = new StreamEventProto
                {
                    EventId = @event.EventId,
                    EventType = @event.EventType,
                    StreamName = @event.StreamName,
                    Offset = @event.Offset,
                    Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTimeOffset(@event.Timestamp),
                    Data = @event.Data,
                    Metadata = { @event.Metadata }
                };

                // Send to client
                await responseStream.WriteAsync(eventProto, context.CancellationToken);
            }
        }
        catch (OperationCanceledException) when (context.CancellationToken.IsCancellationRequested)
        {
            // Cancelling the call token makes the token-aware read/write throw
            // OperationCanceledException; that is a normal end of subscription, not an error.
            _logger.LogInformation("Subscription cancelled by client: {SubscriptionId}", request.SubscriptionId);
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
        {
            _logger.LogInformation("Subscription cancelled by client: {SubscriptionId}", request.SubscriptionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in persistent subscription: {SubscriptionId}", request.SubscriptionId);
            throw;
        }
    }
}
|
|
```
|
|
|
|
## Client Subscription
|
|
|
|
### Basic Subscription
|
|
|
|
```csharp
|
|
/// <summary>
/// Client-side subscriber that opens a persistent subscription and dispatches
/// each received event to a handler chosen by its event type.
/// </summary>
public class PersistentStreamSubscriber
{
    private readonly EventStreamService.EventStreamServiceClient _client;
    private readonly ILogger<PersistentStreamSubscriber> _logger;

    /// <summary>Creates the subscriber with the generated gRPC client and a logger.</summary>
    /// <exception cref="ArgumentNullException">Either dependency is <c>null</c>.</exception>
    public PersistentStreamSubscriber(
        EventStreamService.EventStreamServiceClient client,
        ILogger<PersistentStreamSubscriber> logger)
    {
        _client = client ?? throw new ArgumentNullException(nameof(client));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Subscribes to <paramref name="streamName"/> starting at <paramref name="startOffset"/>
    /// and processes events until the response stream ends or <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="streamName">Name of the stream to subscribe to.</param>
    /// <param name="startOffset">Offset sent to the server as the starting position.</param>
    /// <param name="ct">Token passed to the request write, the response read and event processing.</param>
    public async Task SubscribeAsync(
        string streamName,
        long startOffset,
        CancellationToken ct)
    {
        using var call = _client.SubscribeToPersistent();

        // Send subscription request; a fresh subscription ID is generated for every call
        await call.RequestStream.WriteAsync(new PersistentSubscriptionRequest
        {
            StreamName = streamName,
            StartOffset = startOffset,
            SubscriptionId = Guid.NewGuid().ToString()
        }, ct);

        // Process events
        await foreach (var @event in call.ResponseStream.ReadAllAsync(ct))
        {
            await ProcessEventAsync(@event, ct);
        }
    }

    /// <summary>Logs the event, then deserializes its payload and calls the matching handler.</summary>
    private async Task ProcessEventAsync(StreamEventProto @event, CancellationToken ct)
    {
        _logger.LogInformation(
            "Processing event: {EventType} at offset {Offset}",
            @event.EventType,
            @event.Offset);

        // Handle event
        switch (@event.EventType)
        {
            case "OrderPlaced":
                var orderPlaced = JsonSerializer.Deserialize<OrderPlacedEvent>(@event.Data);
                await HandleOrderPlacedAsync(orderPlaced, ct);
                break;

            case "OrderShipped":
                var orderShipped = JsonSerializer.Deserialize<OrderShippedEvent>(@event.Data);
                await HandleOrderShippedAsync(orderShipped, ct);
                break;

            default:
                // Make skipped event types visible instead of dropping them silently
                _logger.LogDebug(
                    "No handler for event type {EventType} at offset {Offset}, skipping",
                    @event.EventType,
                    @event.Offset);
                break;
        }
    }
}
|
|
```
|
|
|
|
### Resume from Checkpoint
|
|
|
|
```csharp
|
|
/// <summary>
/// Resumes a subscription one offset past its saved checkpoint and saves a new
/// checkpoint after each event has been processed.
/// </summary>
/// <param name="streamName">Name of the stream to subscribe to.</param>
/// <param name="subscriptionId">Key under which the checkpoint is loaded and saved; also sent to the server.</param>
/// <param name="ct">Token passed to every store call, the request write and the response read.</param>
public async Task SubscribeWithCheckpointAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    // Load checkpoint
    var checkpoint = await _checkpointStore.GetCheckpointAsync(subscriptionId, ct);

    // The checkpoint is saved after an event is processed, so resume one past it.
    // NOTE(review): assumes the store returns a value that makes checkpoint + 1 the first
    // offset (e.g. -1) when no checkpoint exists yet; otherwise offset 0 is skipped — confirm.
    var resumeOffset = checkpoint + 1;

    // Log the offset actually requested, not the last processed one
    _logger.LogInformation(
        "Resuming subscription {SubscriptionId} from offset {Offset}",
        subscriptionId,
        resumeOffset);

    using var call = _client.SubscribeToPersistent();

    // Subscribe from checkpoint
    await call.RequestStream.WriteAsync(new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = resumeOffset, // Resume after last processed
        SubscriptionId = subscriptionId
    }, ct);

    // Process and checkpoint
    await foreach (var @event in call.ResponseStream.ReadAllAsync(ct))
    {
        await ProcessEventAsync(@event, ct);

        // Save checkpoint after processing
        await _checkpointStore.SaveCheckpointAsync(subscriptionId, @event.Offset, ct);
    }
}
|
|
```
|
|
|
|
## Filtering Events
|
|
|
|
### Client-Side Filtering
|
|
|
|
```csharp
|
|
/// <summary>
/// Subscribes to a stream from offset 0 and processes only the events whose
/// type is contained in <paramref name="eventTypes"/>; all others are skipped.
/// </summary>
/// <param name="streamName">Name of the stream to subscribe to.</param>
/// <param name="eventTypes">Event types that should be processed.</param>
/// <param name="ct">Token passed to the request write, the response read and event processing.</param>
public async Task SubscribeWithFilterAsync(
    string streamName,
    HashSet<string> eventTypes,
    CancellationToken ct)
{
    var subscription = new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = 0,
        SubscriptionId = Guid.NewGuid().ToString()
    };

    using var call = _client.SubscribeToPersistent();
    await call.RequestStream.WriteAsync(subscription, ct);

    await foreach (var received in call.ResponseStream.ReadAllAsync(ct))
    {
        // Filter by event type: only wanted types reach the handler
        if (eventTypes.Contains(received.EventType))
        {
            await ProcessEventAsync(received, ct);
        }
    }
}
|
|
|
|
// Usage: only these event types are passed on to ProcessEventAsync
var orderEventTypes = new HashSet<string>
{
    "OrderPlaced",
    "OrderShipped",
    "OrderCancelled"
};

await SubscribeWithFilterAsync("orders", orderEventTypes, ct);
|
|
```
|
|
|
|
### Server-Side Filtering
|
|
|
|
```csharp
|
|
/// <summary>
/// Streams events from the requested offset, sending only the event types listed in the
/// comma-separated <c>event-type-filter</c> request header. A missing or empty header
/// means no filtering.
/// </summary>
/// <param name="requestStream">Client messages; only the first one is read.</param>
/// <param name="responseStream">Stream the converted events are written to.</param>
/// <param name="context">Call context supplying the request headers and the cancellation token.</param>
/// <exception cref="RpcException">
/// <see cref="StatusCode.InvalidArgument"/> when the client completes the request
/// stream without sending a subscription request.
/// </exception>
public override async Task SubscribeToPersistent(
    IAsyncStreamReader<PersistentSubscriptionRequest> requestStream,
    IServerStreamWriter<StreamEventProto> responseStream,
    ServerCallContext context)
{
    // MoveNext returns false when the client closes the stream without sending
    // a request; Current must not be read in that case.
    if (!await requestStream.MoveNext(context.CancellationToken))
    {
        throw new RpcException(new Status(
            StatusCode.InvalidArgument,
            "A subscription request must be sent before the request stream is completed."));
    }

    var request = requestStream.Current;

    // Parse event type filter from metadata
    var filterHeader = context.RequestHeaders
        .FirstOrDefault(h => h.Key == "event-type-filter")
        ?.Value;

    // Trim entries and drop empty ones so "OrderPlaced, OrderShipped" and trailing
    // commas work as expected. An empty set means "no filter".
    var eventTypeFilter = (filterHeader ?? string.Empty)
        .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
        .ToHashSet();

    await foreach (var @event in _eventStore.ReadStreamAsync(
        request.StreamName,
        fromOffset: request.StartOffset,
        cancellationToken: context.CancellationToken))
    {
        // Server-side filtering
        if (eventTypeFilter.Count > 0 && !eventTypeFilter.Contains(@event.EventType))
        {
            continue;
        }

        var eventProto = ConvertToProto(@event);
        await responseStream.WriteAsync(eventProto, context.CancellationToken);
    }
}
|
|
|
|
// Client side: attach the filter as a request header when opening the call
var headers = new Metadata();
headers.Add("event-type-filter", "OrderPlaced,OrderShipped");

using var call = _client.SubscribeToPersistent(headers);
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
### Reconnection Logic
|
|
|
|
```csharp
|
|
/// <summary>
/// Runs a checkpointed subscription and reconnects with exponential backoff when the
/// server is unavailable, giving up after a fixed number of retries.
/// </summary>
/// <param name="streamName">Name of the stream to subscribe to.</param>
/// <param name="subscriptionId">Subscription whose checkpoint is used on every (re)connect.</param>
/// <param name="ct">Token that stops the subscription and any pending backoff delay.</param>
/// <exception cref="RpcException">
/// Rethrown when the call fails with a status other than <see cref="StatusCode.Unavailable"/>,
/// or with <see cref="StatusCode.Unavailable"/> after all retries have been used.
/// </exception>
public async Task SubscribeWithRetryAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    const int maxRetries = 10;
    var retryCount = 0;

    while (!ct.IsCancellationRequested)
    {
        try
        {
            // Subscribe via the checkpointed variant: it takes a subscription ID and
            // resumes after the last saved checkpoint, which is what a reconnect needs.
            await SubscribeWithCheckpointAsync(streamName, subscriptionId, ct);

            // If we reach here, stream ended normally
            break;
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable && retryCount < maxRetries)
        {
            retryCount++;
            var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));

            _logger.LogWarning(
                "Connection lost, retrying in {Delay} (attempt {Attempt}/{Max})",
                delay,
                retryCount,
                maxRetries);

            await Task.Delay(delay, ct);
        }
        catch (Exception ex) when (!ct.IsCancellationRequested)
        {
            // Reached for non-retryable failures and for Unavailable once the retries are
            // exhausted, so the caller sees the failure instead of a silent return.
            // Exceptions raised while the caller is cancelling propagate without this log.
            _logger.LogError(ex, "Fatal error in subscription");
            throw;
        }
    }
}
|
|
```
|
|
|
|
### Idempotent Processing
|
|
|
|
```csharp
|
|
/// <summary>
/// Handles an event at most once per event ID: events already recorded in the
/// processed-events store are skipped, others are handled and then recorded.
/// </summary>
/// <param name="event">Event received from the subscription.</param>
/// <param name="ct">Token passed to the store calls and the handler.</param>
private async Task ProcessEventAsync(StreamEventProto @event, CancellationToken ct)
{
    // Idempotency guard: skip events this consumer has already recorded
    if (await _processedEventsStore.ExistsAsync(@event.EventId, ct))
    {
        _logger.LogDebug("Event {EventId} already processed, skipping", @event.EventId);
        return;
    }

    // Handle first, record second
    await HandleEventAsync(@event, ct);
    await _processedEventsStore.AddAsync(@event.EventId, ct);
}
|
|
```
|
|
|
|
## Multiple Streams
|
|
|
|
Subscribe to multiple streams concurrently:
|
|
|
|
```csharp
|
|
/// <summary>
/// Subscribes to the "orders", "payments" and "shipments" streams concurrently and
/// completes when all three subscriptions have completed.
/// </summary>
/// <param name="ct">Token passed to every subscription.</param>
public async Task SubscribeToMultipleStreamsAsync(CancellationToken ct)
{
    var streams = new[] { "orders", "payments", "shipments" };

    // Use the checkpointed variant, which accepts a subscription ID; SubscribeAsync
    // takes a numeric start offset and has no subscriptionId parameter.
    var tasks = streams.Select(streamName =>
        SubscribeWithCheckpointAsync(streamName, $"multi-{streamName}", ct));

    await Task.WhenAll(tasks);
}
|
|
```
|
|
|
|
## Monitoring
|
|
|
|
### Subscription Health
|
|
|
|
```csharp
|
|
/// <summary>
/// Tracks when the last event was received and periodically logs a warning
/// when no event has arrived for longer than the stale threshold.
/// </summary>
public class SubscriptionHealthMonitor
{
    private static readonly TimeSpan CheckInterval = TimeSpan.FromSeconds(30);
    private static readonly TimeSpan StaleThreshold = TimeSpan.FromMinutes(5);

    private readonly ILogger<SubscriptionHealthMonitor> _logger;

    // Both values are written by RecordEvent and read by MonitorAsync, potentially on
    // different threads, so they are stored as longs and accessed via Interlocked.
    private long _lastReceivedOffset;

    // Starts at construction time rather than DateTimeOffset.MinValue, so the first
    // check does not report a gap of thousands of years before any event has arrived.
    private long _lastReceivedAtUtcTicks = DateTimeOffset.UtcNow.UtcTicks;

    /// <summary>Creates the monitor with the logger used for stale-subscription warnings.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is <c>null</c>.</exception>
    public SubscriptionHealthMonitor(ILogger<SubscriptionHealthMonitor> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Checks every 30 seconds and logs a warning while more than 5 minutes have
    /// passed since the last recorded event. Runs until <paramref name="ct"/> is cancelled.
    /// </summary>
    public async Task MonitorAsync(CancellationToken ct)
    {
        using var timer = new PeriodicTimer(CheckInterval);

        while (await timer.WaitForNextTickAsync(ct))
        {
            var lastReceivedAt = new DateTimeOffset(
                Interlocked.Read(ref _lastReceivedAtUtcTicks),
                TimeSpan.Zero);
            var timeSinceLastEvent = DateTimeOffset.UtcNow - lastReceivedAt;

            if (timeSinceLastEvent > StaleThreshold)
            {
                _logger.LogWarning(
                    "No events received for {Duration}, last offset: {Offset}",
                    timeSinceLastEvent,
                    Interlocked.Read(ref _lastReceivedOffset));
            }
        }
    }

    /// <summary>Records the offset of an event and the current UTC time as the time of receipt.</summary>
    /// <param name="offset">Offset of the event that was just received.</param>
    public void RecordEvent(long offset)
    {
        Interlocked.Exchange(ref _lastReceivedOffset, offset);
        Interlocked.Exchange(ref _lastReceivedAtUtcTicks, DateTimeOffset.UtcNow.UtcTicks);
    }
}
|
|
```
|
|
|
|
### Metrics
|
|
|
|
```csharp
|
|
/// <summary>
/// Subscribes to a stream from offset 0, processes every event, and logs and records
/// the running count and average processing rate after every 1000 events.
/// </summary>
/// <param name="streamName">Name of the stream to subscribe to; also used as the metrics key.</param>
/// <param name="subscriptionId">Subscription ID sent to the server.</param>
/// <param name="ct">Token passed to the request write, the response read and event processing.</param>
public async Task SubscribeWithMetricsAsync(
    string streamName,
    string subscriptionId,
    CancellationToken ct)
{
    const long reportEvery = 1000;

    var eventsReceived = 0L;

    // Stopwatch is monotonic; measuring with the wall clock can give a negative or
    // inflated rate when the system clock is adjusted.
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();

    using var call = _client.SubscribeToPersistent();

    await call.RequestStream.WriteAsync(new PersistentSubscriptionRequest
    {
        StreamName = streamName,
        StartOffset = 0,
        SubscriptionId = subscriptionId
    }, ct);

    await foreach (var @event in call.ResponseStream.ReadAllAsync(ct))
    {
        await ProcessEventAsync(@event, ct);

        eventsReceived++;

        if (eventsReceived % reportEvery == 0)
        {
            // Average rate since the subscription started; guard against a zero
            // elapsed time so the rate is never reported as infinity.
            var elapsedSeconds = stopwatch.Elapsed.TotalSeconds;
            var rate = elapsedSeconds > 0 ? eventsReceived / elapsedSeconds : 0d;

            _logger.LogInformation(
                "Processed {Count} events at {Rate:F0} events/sec",
                eventsReceived,
                rate);

            _metrics.RecordEventsProcessed(streamName, eventsReceived);
            _metrics.RecordProcessingRate(streamName, rate);
        }
    }
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Save checkpoints after processing events
|
|
- Implement reconnection logic
|
|
- Use idempotent event processing
|
|
- Monitor subscription health
|
|
- Filter events when possible
|
|
- Handle cancellation gracefully
|
|
- Log subscription lifecycle events
|
|
- Use structured logging with correlation IDs
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't skip checkpoint saves
|
|
- Don't retry indefinitely without backoff
|
|
- Don't process events multiple times
|
|
- Don't ignore connection errors
|
|
- Don't subscribe from an offset beyond the end of the stream (validate the offset first)
|
|
- Don't forget to dispose calls
|
|
- Don't block event processing
|
|
- Don't ignore cancellation tokens
|
|
|
|
## See Also
|
|
|
|
- [gRPC Streaming Overview](README.md)
|
|
- [Queue Subscriptions](queue-subscriptions.md)
|
|
- [gRPC Clients](grpc-clients.md)
|
|
- [Consumer Groups](../consumer-groups/README.md)
|
|
- [Subscriptions](../fundamentals/subscriptions.md)
|