gRPC Streaming
Real-time event delivery via gRPC bidirectional streaming.
Overview
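gRPC streaming provides real-time event delivery from server to clients. Clients can subscribe to persistent or ephemeral streams and receive events as they occur. In the service definition shown here, both subscription RPCs are server-streaming (a single request followed by a stream of responses), and the client's acknowledgements travel over the separate unary AcknowledgeMessage call.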
Key Features:
- ✅ Bidirectional Streaming - Full duplex communication
- ✅ Real-Time Delivery - Events pushed as they occur
- ✅ Persistent Subscriptions - Subscribe to event-sourced streams
- ✅ Queue Subscriptions - Dequeue with ack/nack
- ✅ Automatic Reconnection - Client handles disconnects
Proto Definition
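syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

service EventStreamService {
  // Server-streaming: one request, then events are pushed as they occur.
  rpc SubscribeToPersistentStream (PersistentSubscriptionRequest) returns (stream EventMessage);
  // Server-streaming: each message is acknowledged via AcknowledgeMessage.
  rpc SubscribeToQueue (QueueSubscriptionRequest) returns (stream QueueMessage);
  // Unary: acks (success = true) or nacks (success = false) a queue message.
  rpc AcknowledgeMessage (AckRequest) returns (google.protobuf.Empty);
}

message PersistentSubscriptionRequest {
  string stream_name = 1;
  int64 from_offset = 2;  // offset to start reading from
}

message EventMessage {
  int64 offset = 1;
  string event_id = 2;
  string event_type = 3;
  bytes data = 4;
  google.protobuf.Timestamp timestamp = 5;
}

message QueueSubscriptionRequest {
  string stream_name = 1;
  int32 visibility_timeout_seconds = 2;
}

message QueueMessage {
  string message_id = 1;
  bytes data = 2;
  int32 delivery_attempts = 3;
}

message AckRequest {
  string stream_name = 1;
  string message_id = 2;
  bool success = 3;  // true = ack, false = nack (message is redelivered)
}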
Server Setup
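var builder = WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.Services.AddSingleton<EventStreamServiceImpl>(); // one shared instance for all calls
var app = builder.Build();
app.MapGrpcService<EventStreamServiceImpl>();
app.Run();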
Client Usage
Subscribe to Persistent Stream
using Grpc.Core;        // ReadAllAsync extension method
using Grpc.Net.Client;  // GrpcChannel

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);

var request = new PersistentSubscriptionRequest
{
    StreamName = "orders",
    FromOffset = 0
};

// Disposing the call object ends the subscription.
using var stream = client.SubscribeToPersistentStream(request);
await foreach (var eventMessage in stream.ResponseStream.ReadAllAsync())
{
    Console.WriteLine($"Event: {eventMessage.EventType} at offset {eventMessage.Offset}");
    // ProcessEventAsync is the application's own handler.
    await ProcessEventAsync(eventMessage);
}
Subscribe to Queue
// Reuses the client created in the previous example.
var request = new QueueSubscriptionRequest
{
    StreamName = "email-queue",
    VisibilityTimeoutSeconds = 300 // 5 minutes
};

using var stream = client.SubscribeToQueue(request);
await foreach (var message in stream.ResponseStream.ReadAllAsync())
{
    try
    {
        // ProcessMessageAsync is the application's own handler.
        await ProcessMessageAsync(message);

        // Acknowledge success
        await client.AcknowledgeMessageAsync(new AckRequest
        {
            StreamName = request.StreamName,
            MessageId = message.MessageId,
            Success = true
        });
    }
    catch (Exception ex)
    {
        Console.Error.WriteLine($"Processing failed for {message.MessageId}: {ex.Message}");

        // Nack - message will be redelivered
        await client.AcknowledgeMessageAsync(new AckRequest
        {
            StreamName = request.StreamName,
            MessageId = message.MessageId,
            Success = false
        });
    }
}
Features
Persistent Subscriptions
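Subscribe to event-sourced streams for real-time event delivery. A subscription starts at the from_offset given in the request, and every EventMessage carries its own offset, so a client can record its position and resume from there after a disconnect.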
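One way to resume after a disconnect is to remember the offset of the last event handled and resubscribe from the next one. The sketch below is not part of any client library for this service: ResumableSubscription, RunAsync, and all of its parameters are hypothetical names. It assumes from_offset is inclusive, so resuming at the last handled offset plus one avoids replaying that event. The subscribe delegate stands in for a call to SubscribeToPersistentStream followed by ResponseStream.ReadAllAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the service's API.
public static class ResumableSubscription
{
    // subscribe(fromOffset, ct) must open a fresh stream starting at fromOffset.
    // Returns the offset the next subscription should start from.
    public static async Task<long> RunAsync<T>(
        Func<long, CancellationToken, IAsyncEnumerable<T>> subscribe,
        Func<T, long> getOffset,
        Func<T, Task> handle,
        long startOffset,
        int maxReconnects,
        TimeSpan retryDelay,
        CancellationToken ct = default)
    {
        long nextOffset = startOffset;
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                await foreach (var evt in subscribe(nextOffset, ct).WithCancellation(ct))
                {
                    await handle(evt);
                    // Assumption: from_offset is inclusive, so resume one past this event.
                    nextOffset = getOffset(evt) + 1;
                }
                return nextOffset; // the stream completed normally
            }
            catch (Exception ex) when (ex is not OperationCanceledException && attempt < maxReconnects)
            {
                // Stream failed: wait, then resubscribe from nextOffset.
                await Task.Delay(retryDelay, ct);
            }
        }
    }
}
```

With the generated client, subscribe could be written as (offset, ct) => client.SubscribeToPersistentStream(new PersistentSubscriptionRequest { StreamName = "orders", FromOffset = offset }, cancellationToken: ct).ResponseStream.ReadAllAsync(ct), with getOffset as e => e.Offset. A production client would also dispose the call object, would likely catch RpcException specifically, and might replace the fixed retryDelay with a backoff.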
Queue Subscriptions
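Dequeue messages with ack/nack for reliable processing. Each QueueMessage is acknowledged through AcknowledgeMessage: success = true acks the message, and success = false nacks it so that it is redelivered. The delivery_attempts field on each message lets a consumer see redeliveries.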
Best Practices
✅ DO
- Implement automatic reconnection
- Handle stream cancellation gracefully
- Use heartbeats for connection monitoring
- Set appropriate timeouts
- Process events idempotently
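Because events can be delivered again after a reconnect or a nack, idempotent processing often comes down to recognising an event that was already handled. The sketch below shows one simple approach, a bounded in-memory set of recent event IDs. ProcessedEventTracker and its members are hypothetical names used for illustration only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: remembers the most recent event IDs so that a
// redelivered event can be recognised and skipped.
public sealed class ProcessedEventTracker
{
    private readonly int _capacity;
    private readonly HashSet<string> _seen = new HashSet<string>();
    private readonly Queue<string> _order = new Queue<string>();

    public ProcessedEventTracker(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    // True if this ID was marked as processed and has not been evicted yet.
    public bool HasProcessed(string eventId) => _seen.Contains(eventId);

    // Records an ID after its event was handled successfully.
    public void MarkProcessed(string eventId)
    {
        if (!_seen.Add(eventId)) return;       // already recorded
        _order.Enqueue(eventId);
        if (_order.Count > _capacity)
            _seen.Remove(_order.Dequeue());    // evict the oldest ID
    }
}
```

In the persistent stream loop, a consumer could skip the event when HasProcessed(eventMessage.EventId) returns true, and call MarkProcessed only after ProcessEventAsync succeeds. An in-memory set does not survive a restart, so a durable store or naturally idempotent handlers would be needed for stronger guarantees.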
❌ DON'T
- Don't block stream processing
- Don't forget to acknowledge messages
- Don't ignore connection errors
- Don't skip error handling
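To keep a slow handler from stalling the read loop, one option is to hand events to a bounded in-process buffer and process them on a separate task. The following is a minimal sketch using System.Threading.Channels. StreamPump and PumpAsync are hypothetical names, and source stands in for stream.ResponseStream.ReadAllAsync(). A single consumer keeps events in order, and the bounded capacity applies backpressure once the handler falls behind.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, not part of the service's API.
public static class StreamPump
{
    public static async Task PumpAsync<T>(
        IAsyncEnumerable<T> source,
        Func<T, Task> handle,
        int capacity,
        CancellationToken ct = default)
    {
        var channel = Channel.CreateBounded<T>(capacity);
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct);

        // Consumer: runs the handler off the read loop, one event at a time.
        async Task ConsumeAsync()
        {
            try
            {
                await foreach (var item in channel.Reader.ReadAllAsync(linked.Token))
                    await handle(item);
            }
            catch
            {
                linked.Cancel(); // stop the read loop if the handler fails
                throw;
            }
        }

        var consumer = Task.Run(ConsumeAsync);

        // Producer: reads from the stream and buffers events.
        try
        {
            await foreach (var item in source.WithCancellation(linked.Token))
                await channel.Writer.WriteAsync(item, linked.Token);
            channel.Writer.TryComplete();
        }
        catch (Exception ex)
        {
            // Pass the failure to the consumer, which rethrows it after draining.
            channel.Writer.TryComplete(ex);
        }

        await consumer; // surfaces handler and stream errors to the caller
    }
}
```

Errors from either side reach the caller through the awaited consumer task, so connection failures and handler failures are not silently dropped.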