dotnet-cqrs/docs/event-streaming/grpc-streaming/README.md

158 lines
3.9 KiB
Markdown

# gRPC Streaming
Real-time event delivery via gRPC bidirectional streaming.
## Overview
gRPC streaming provides real-time event delivery from server to clients using bidirectional streams. Clients can subscribe to persistent or ephemeral streams and receive events as they occur.
**Key Features:**
-**Bidirectional Streaming** - Full duplex communication
-**Real-Time Delivery** - Events pushed as they occur
-**Persistent Subscriptions** - Subscribe to event sourced streams
-**Queue Subscriptions** - Dequeue with ack/nack
-**Automatic Reconnection** - Client handles disconnects
## Proto Definition
```protobuf
service EventStreamService {
rpc SubscribeToPersistentStream (PersistentSubscriptionRequest) returns (stream EventMessage);
rpc SubscribeToQueue (QueueSubscriptionRequest) returns (stream QueueMessage);
rpc AcknowledgeMessage (AckRequest) returns (google.protobuf.Empty);
}
message PersistentSubscriptionRequest {
string stream_name = 1;
int64 from_offset = 2;
}
message EventMessage {
int64 offset = 1;
string event_id = 2;
string event_type = 3;
bytes data = 4;
google.protobuf.Timestamp timestamp = 5;
}
message QueueSubscriptionRequest {
string stream_name = 1;
int32 visibility_timeout_seconds = 2;
}
message QueueMessage {
string message_id = 1;
bytes data = 2;
int32 delivery_attempts = 3;
}
message AckRequest {
string stream_name = 1;
string message_id = 2;
bool success = 3;
}
```
## Server Setup
```csharp
builder.Services.AddGrpc();
builder.Services.AddSingleton<EventStreamServiceImpl>();
app.MapGrpcService<EventStreamServiceImpl>();
```
## Client Usage
### Subscribe to Persistent Stream
```csharp
var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);
var request = new PersistentSubscriptionRequest
{
StreamName = "orders",
FromOffset = 0
};
using var stream = client.SubscribeToPersistentStream(request);
await foreach (var eventMessage in stream.ResponseStream.ReadAllAsync())
{
Console.WriteLine($"Event: {eventMessage.EventType} at offset {eventMessage.Offset}");
await ProcessEventAsync(eventMessage);
}
```
### Subscribe to Queue
```csharp
var request = new QueueSubscriptionRequest
{
StreamName = "email-queue",
VisibilityTimeoutSeconds = 300 // 5 minutes
};
using var stream = client.SubscribeToQueue(request);
await foreach (var message in stream.ResponseStream.ReadAllAsync())
{
try
{
await ProcessMessageAsync(message);
// Acknowledge success
await client.AcknowledgeMessageAsync(new AckRequest
{
StreamName = "email-queue",
MessageId = message.MessageId,
Success = true
});
}
catch (Exception ex)
{
// Nack - message will be redelivered
await client.AcknowledgeMessageAsync(new AckRequest
{
StreamName = "email-queue",
MessageId = message.MessageId,
Success = false
});
}
}
```
## Features
### [Persistent Subscriptions](persistent-subscriptions.md)
Subscribe to event-sourced streams for real-time event delivery.
### [Queue Subscriptions](queue-subscriptions.md)
Dequeue messages with ack/nack for reliable processing.
## Best Practices
### ✅ DO
- Implement automatic reconnection
- Handle stream cancellation gracefully
- Use heartbeats for connection monitoring
- Set appropriate timeouts
- Process events idempotently
### ❌ DON'T
- Don't block stream processing
- Don't forget to acknowledge messages
- Don't ignore connection errors
- Don't skip error handling
## See Also
- [Event Streaming Overview](../README.md)
- [gRPC Integration](../../grpc-integration/README.md)
- [Persistent Streams](../fundamentals/persistent-streams.md)
- [Ephemeral Streams](../fundamentals/ephemeral-streams.md)