14 KiB
14 KiB
gRPC Queue Subscriptions
Subscribe to ephemeral streams with acknowledge/nack semantics via gRPC.
Overview
gRPC queue subscriptions provide reliable message queue delivery:
- At-Least-Once Delivery - Messages acknowledged after processing
- Visibility Timeout - Auto-redelivery on failure
- Ack/Nack - Explicit message acknowledgment
- Concurrent Consumers - Multiple consumers process in parallel
Quick Start
using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;
// Create gRPC client
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);
// Subscribe to queue
using var call = client.SubscribeToQueue();
// Send subscription request
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
StreamName = "task-queue",
SubscriptionId = Guid.NewGuid().ToString(),
VisibilityTimeout = 30 // 30 seconds
});
// Process messages with ack/nack
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
try
{
await ProcessMessageAsync(message);
// Acknowledge success
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Acknowledge
});
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process message {MessageId}", message.MessageId);
// Negative acknowledge (requeue)
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Nack
});
}
}
Service Implementation
Queue Subscription Service
public override async Task SubscribeToQueue(
IAsyncStreamReader<QueueSubscriptionRequest> requestStream,
IServerStreamWriter<QueueMessageProto> responseStream,
ServerCallContext context)
{
// Read initial subscription
await requestStream.MoveNext(context.CancellationToken);
var initialRequest = requestStream.Current;
_logger.LogInformation(
"Queue subscription started: stream={Stream}, subscription={SubscriptionId}",
initialRequest.StreamName,
initialRequest.SubscriptionId);
var visibilityTimeout = TimeSpan.FromSeconds(initialRequest.VisibilityTimeout);
// Start background task to send messages
var sendTask = SendMessagesAsync(
initialRequest.StreamName,
initialRequest.SubscriptionId,
visibilityTimeout,
responseStream,
context.CancellationToken);
// Process acks/nacks
var receiveTask = ReceiveAcksAsync(
initialRequest.SubscriptionId,
requestStream,
context.CancellationToken);
await Task.WhenAll(sendTask, receiveTask);
}
private async Task SendMessagesAsync(
string streamName,
string subscriptionId,
TimeSpan visibilityTimeout,
IServerStreamWriter<QueueMessageProto> responseStream,
CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var messages = await _eventStore.DequeueAsync(
streamName,
visibilityTimeout,
batchSize: 10,
ct);
foreach (var message in messages)
{
var messageProto = new QueueMessageProto
{
MessageId = message.MessageId,
EventType = message.EventType,
Data = message.Data,
Metadata = { message.Metadata },
DeliveryAttempt = message.DeliveryAttempt
};
await responseStream.WriteAsync(messageProto, ct);
_logger.LogDebug(
"Sent message {MessageId} to subscription {SubscriptionId}",
message.MessageId,
subscriptionId);
}
// Wait before polling again
if (messages.Count == 0)
{
await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
}
}
}
private async Task ReceiveAcksAsync(
string subscriptionId,
IAsyncStreamReader<QueueSubscriptionRequest> requestStream,
CancellationToken ct)
{
while (await requestStream.MoveNext(ct))
{
var request = requestStream.Current;
switch (request.Action)
{
case QueueAction.Acknowledge:
await _eventStore.AcknowledgeAsync(request.MessageId, ct);
_logger.LogDebug("Message {MessageId} acknowledged", request.MessageId);
break;
case QueueAction.Nack:
await _eventStore.NackAsync(request.MessageId, ct);
_logger.LogDebug("Message {MessageId} nacked", request.MessageId);
break;
}
}
}
Client Implementation
Basic Queue Consumer
public class QueueConsumer
{
private readonly EventStreamService.EventStreamServiceClient _client;
public async Task ConsumeAsync(
string streamName,
string subscriptionId,
CancellationToken ct)
{
using var call = _client.SubscribeToQueue();
// Subscribe
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
StreamName = streamName,
SubscriptionId = subscriptionId,
VisibilityTimeout = 30
}, ct);
// Process messages
await foreach (var message in call.ResponseStream.ReadAllAsync(ct))
{
await ProcessWithAckAsync(call, message, ct);
}
}
private async Task ProcessWithAckAsync(
AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
QueueMessageProto message,
CancellationToken ct)
{
try
{
_logger.LogInformation(
"Processing message {MessageId} (attempt {Attempt})",
message.MessageId,
message.DeliveryAttempt);
await ProcessMessageAsync(message, ct);
// Acknowledge
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Acknowledge
}, ct);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing message {MessageId}", message.MessageId);
// Nack - will be redelivered
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Nack
}, ct);
}
}
}
Batch Processing
public async Task ConsumeBatchAsync(
string streamName,
int batchSize,
CancellationToken ct)
{
using var call = _client.SubscribeToQueue();
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
StreamName = streamName,
SubscriptionId = Guid.NewGuid().ToString(),
VisibilityTimeout = 60, // Longer timeout for batch
BatchSize = batchSize
}, ct);
var batch = new List<QueueMessageProto>();
await foreach (var message in call.ResponseStream.ReadAllAsync(ct))
{
batch.Add(message);
if (batch.Count >= batchSize)
{
await ProcessBatchAsync(call, batch, ct);
batch.Clear();
}
}
// Process remaining
if (batch.Count > 0)
{
await ProcessBatchAsync(call, batch, ct);
}
}
private async Task ProcessBatchAsync(
AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
List<QueueMessageProto> batch,
CancellationToken ct)
{
try
{
// Process batch
foreach (var message in batch)
{
await ProcessMessageAsync(message, ct);
}
// Acknowledge all
foreach (var message in batch)
{
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Acknowledge
}, ct);
}
}
catch
{
// Nack all on batch failure
foreach (var message in batch)
{
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Nack
}, ct);
}
throw;
}
}
Visibility Timeout
Handling Timeout
private async Task ProcessWithTimeoutAsync(
AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
QueueMessageProto message,
TimeSpan visibilityTimeout,
CancellationToken ct)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
cts.CancelAfter(visibilityTimeout - TimeSpan.FromSeconds(5)); // 5s buffer
try
{
await ProcessMessageAsync(message, cts.Token);
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Acknowledge
}, ct);
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
_logger.LogWarning(
"Message {MessageId} processing timed out after {Timeout}",
message.MessageId,
visibilityTimeout);
// Nack - visibility timeout will expire anyway
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Nack
}, ct);
}
}
Extending Visibility
private async Task ProcessLongRunningAsync(
AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
QueueMessageProto message,
CancellationToken ct)
{
// Start background task to extend visibility
using var extendCts = new CancellationTokenSource();
var extendTask = Task.Run(async () =>
{
while (!extendCts.Token.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(15), extendCts.Token);
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.ExtendVisibility,
VisibilityTimeout = 30 // Extend by 30 seconds
}, extendCts.Token);
}
}, extendCts.Token);
try
{
// Process message (may take a long time)
await ProcessMessageAsync(message, ct);
// Acknowledge
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Acknowledge
}, ct);
}
finally
{
extendCts.Cancel();
await extendTask;
}
}
Error Handling
Retry with Dead Letter Queue
private async Task ProcessWithDlqAsync(
AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
QueueMessageProto message,
int maxAttempts,
CancellationToken ct)
{
try
{
await ProcessMessageAsync(message, ct);
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Acknowledge
}, ct);
}
catch (Exception ex)
{
if (message.DeliveryAttempt >= maxAttempts)
{
_logger.LogError(ex,
"Message {MessageId} failed after {Attempts} attempts, moving to DLQ",
message.MessageId,
message.DeliveryAttempt);
// Move to dead letter queue
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.DeadLetter,
Reason = ex.Message
}, ct);
}
else
{
_logger.LogWarning(ex,
"Message {MessageId} failed (attempt {Attempt}/{Max}), will retry",
message.MessageId,
message.DeliveryAttempt,
maxAttempts);
// Nack for retry
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
MessageId = message.MessageId,
Action = QueueAction.Nack
}, ct);
}
}
}
Concurrent Consumers
Run multiple consumers for parallel processing:
public async Task RunMultipleConsumersAsync(
string streamName,
int consumerCount,
CancellationToken ct)
{
var tasks = Enumerable.Range(0, consumerCount)
.Select(i => ConsumeAsync(
streamName,
subscriptionId: $"consumer-{i}",
ct))
.ToArray();
await Task.WhenAll(tasks);
}
Best Practices
✅ DO
- Always acknowledge or nack messages
- Use appropriate visibility timeouts
- Handle timeouts gracefully
- Implement dead letter queues
- Use batch processing for throughput
- Run multiple consumers for scale
- Monitor delivery attempts
- Log ack/nack operations
❌ DON'T
- Don't forget to ack/nack
- Don't use very short visibility timeouts
- Don't process indefinitely without extending visibility
- Don't retry permanently failed messages indefinitely
- Don't skip error logging
- Don't ignore delivery attempt counts
- Don't block message processing
- Don't forget cancellation token handling