507 lines
14 KiB
Markdown
507 lines
14 KiB
Markdown
# gRPC Queue Subscriptions
|
|
|
|
Subscribe to ephemeral streams with acknowledge/nack semantics via gRPC.
|
|
|
|
## Overview
|
|
|
|
gRPC queue subscriptions provide reliable message queue delivery:
|
|
- **At-Least-Once Delivery** - Messages acknowledged after processing
|
|
- **Visibility Timeout** - A message that is not acknowledged before its visibility timeout expires is redelivered automatically
|
|
- **Ack/Nack** - Explicit message acknowledgment
|
|
- **Concurrent Consumers** - Multiple consumers process in parallel
|
|
|
|
## Quick Start
|
|
|
|
```csharp
|
|
using Grpc.Net.Client;
|
|
using Svrnty.CQRS.Events.Grpc;
|
|
|
|
// Open a channel to the server and build the generated client on top of it.
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);

// Start the duplex call; subscription, acks and nacks all travel on its request stream.
using var call = client.SubscribeToQueue();
var requests = call.RequestStream;

// The first request describes the subscription itself.
var subscribe = new QueueSubscriptionRequest
{
    StreamName = "task-queue",
    SubscriptionId = Guid.NewGuid().ToString(),
    VisibilityTimeout = 30 // 30 seconds
};
await requests.WriteAsync(subscribe);

// Answer each delivered message with an ack on success or a nack on failure.
await foreach (var delivery in call.ResponseStream.ReadAllAsync())
{
    try
    {
        await ProcessMessageAsync(delivery);

        // Processing succeeded: acknowledge the message.
        var ack = new QueueSubscriptionRequest
        {
            MessageId = delivery.MessageId,
            Action = QueueAction.Acknowledge
        };
        await requests.WriteAsync(ack);
    }
    catch (Exception error)
    {
        _logger.LogError(error, "Failed to process message {MessageId}", delivery.MessageId);

        // Processing (or the ack write) failed: negative acknowledge (requeue).
        var nack = new QueueSubscriptionRequest
        {
            MessageId = delivery.MessageId,
            Action = QueueAction.Nack
        };
        await requests.WriteAsync(nack);
    }
}
|
|
```
|
|
|
|
## Service Implementation
|
|
|
|
### Queue Subscription Service
|
|
|
|
```csharp
|
|
/// <summary>
/// Serves one bidirectional queue subscription. The first request on the stream
/// supplies the stream name, subscription id and visibility timeout (in seconds);
/// after that, messages are pushed to the client while its ack/nack requests are consumed.
/// </summary>
/// <param name="requestStream">Incoming client requests: the subscription first, then acks/nacks.</param>
/// <param name="responseStream">Outgoing stream that dequeued messages are written to.</param>
/// <param name="context">Call context; its cancellation token is passed to both loops.</param>
public override async Task SubscribeToQueue(
    IAsyncStreamReader<QueueSubscriptionRequest> requestStream,
    IServerStreamWriter<QueueMessageProto> responseStream,
    ServerCallContext context)
{
    // Read initial subscription. MoveNext yields false when the client ends the
    // request stream without sending anything; Current must not be read in that case.
    if (!await requestStream.MoveNext(context.CancellationToken))
    {
        _logger.LogWarning("Queue subscription closed before a subscription request was received");
        return;
    }

    var initialRequest = requestStream.Current;

    _logger.LogInformation(
        "Queue subscription started: stream={Stream}, subscription={SubscriptionId}",
        initialRequest.StreamName,
        initialRequest.SubscriptionId);

    var visibilityTimeout = TimeSpan.FromSeconds(initialRequest.VisibilityTimeout);

    // Start background task to send messages
    var sendTask = SendMessagesAsync(
        initialRequest.StreamName,
        initialRequest.SubscriptionId,
        visibilityTimeout,
        responseStream,
        context.CancellationToken);

    // Process acks/nacks
    var receiveTask = ReceiveAcksAsync(
        initialRequest.SubscriptionId,
        requestStream,
        context.CancellationToken);

    // NOTE(review): WhenAll waits for BOTH loops, so a failure in one is only
    // observed once the other has also finished — confirm this is intended.
    await Task.WhenAll(sendTask, receiveTask);
}
|
|
|
|
/// <summary>
/// Polls the event store for queued messages and writes each one to the client
/// until cancellation is requested.
/// </summary>
/// <param name="streamName">Stream to dequeue from.</param>
/// <param name="subscriptionId">Subscription id, used for logging only.</param>
/// <param name="visibilityTimeout">Visibility timeout passed to the dequeue call.</param>
/// <param name="responseStream">Stream the messages are written to.</param>
/// <param name="ct">Token that stops the loop.</param>
private async Task SendMessagesAsync(
    string streamName,
    string subscriptionId,
    TimeSpan visibilityTimeout,
    IServerStreamWriter<QueueMessageProto> responseStream,
    CancellationToken ct)
{
    // Pause applied after an empty dequeue, before polling again.
    var idlePause = TimeSpan.FromMilliseconds(100);

    for (;;)
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        var dequeued = await _eventStore.DequeueAsync(
            streamName,
            visibilityTimeout,
            batchSize: 10,
            ct);

        if (dequeued.Count == 0)
        {
            // Wait before polling again
            await Task.Delay(idlePause, ct);
            continue;
        }

        foreach (var queued in dequeued)
        {
            // Copy the stored message into its wire representation.
            var outgoing = new QueueMessageProto
            {
                MessageId = queued.MessageId,
                EventType = queued.EventType,
                Data = queued.Data,
                Metadata = { queued.Metadata },
                DeliveryAttempt = queued.DeliveryAttempt
            };

            await responseStream.WriteAsync(outgoing, ct);

            _logger.LogDebug(
                "Sent message {MessageId} to subscription {SubscriptionId}",
                queued.MessageId,
                subscriptionId);
        }
    }
}
|
|
|
|
/// <summary>
/// Reads client requests until the request stream ends and applies each
/// acknowledge or nack to the event store.
/// </summary>
/// <param name="subscriptionId">Subscription id, used when logging unsupported actions.</param>
/// <param name="requestStream">Stream of client requests following the initial subscription.</param>
/// <param name="ct">Token passed to the stream read and to the event store calls.</param>
private async Task ReceiveAcksAsync(
    string subscriptionId,
    IAsyncStreamReader<QueueSubscriptionRequest> requestStream,
    CancellationToken ct)
{
    while (await requestStream.MoveNext(ct))
    {
        var request = requestStream.Current;

        switch (request.Action)
        {
            case QueueAction.Acknowledge:
                await _eventStore.AcknowledgeAsync(request.MessageId, ct);
                _logger.LogDebug("Message {MessageId} acknowledged", request.MessageId);
                break;

            case QueueAction.Nack:
                await _eventStore.NackAsync(request.MessageId, ct);
                _logger.LogDebug("Message {MessageId} nacked", request.MessageId);
                break;

            default:
                // Any other action is not handled by this loop. Log it so a client
                // sending it can see why nothing happened, instead of dropping it silently.
                _logger.LogWarning(
                    "Unsupported queue action {Action} for message {MessageId} on subscription {SubscriptionId}",
                    request.Action,
                    request.MessageId,
                    subscriptionId);
                break;
        }
    }
}
|
|
```
|
|
|
|
## Client Implementation
|
|
|
|
### Basic Queue Consumer
|
|
|
|
```csharp
|
|
/// <summary>
/// Consumes a queue over a single duplex gRPC call and answers every delivered
/// message with an acknowledge on success or a nack on failure.
/// </summary>
/// <remarks>
/// <c>ProcessMessageAsync(message, ct)</c> is the application-specific handler and
/// is not shown here.
/// </remarks>
public class QueueConsumer
{
    private readonly EventStreamService.EventStreamServiceClient _client;
    private readonly ILogger<QueueConsumer> _logger;

    /// <summary>Creates a consumer on top of an existing gRPC client.</summary>
    /// <param name="client">Generated client used to open the subscription call.</param>
    /// <param name="logger">Logger for processing progress and failures.</param>
    public QueueConsumer(
        EventStreamService.EventStreamServiceClient client,
        ILogger<QueueConsumer> logger)
    {
        _client = client ?? throw new ArgumentNullException(nameof(client));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Subscribes to <paramref name="streamName"/> with a 30 second visibility
    /// timeout and processes messages until the response stream ends or
    /// <paramref name="ct"/> is cancelled.
    /// </summary>
    /// <param name="streamName">Queue stream to subscribe to.</param>
    /// <param name="subscriptionId">Identifier sent with the subscription request.</param>
    /// <param name="ct">Token that cancels reading and writing on the call.</param>
    public async Task ConsumeAsync(
        string streamName,
        string subscriptionId,
        CancellationToken ct)
    {
        using var call = _client.SubscribeToQueue();

        // Subscribe
        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            StreamName = streamName,
            SubscriptionId = subscriptionId,
            VisibilityTimeout = 30
        }, ct);

        // Process messages
        await foreach (var message in call.ResponseStream.ReadAllAsync(ct))
        {
            await ProcessWithAckAsync(call, message, ct);
        }
    }

    /// <summary>
    /// Processes one message, then writes an acknowledge on success or a nack on failure.
    /// </summary>
    /// <param name="call">Duplex call whose request stream carries the ack/nack.</param>
    /// <param name="message">Message to process.</param>
    /// <param name="ct">Token passed to processing and to the request-stream writes.</param>
    private async Task ProcessWithAckAsync(
        AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
        QueueMessageProto message,
        CancellationToken ct)
    {
        try
        {
            _logger.LogInformation(
                "Processing message {MessageId} (attempt {Attempt})",
                message.MessageId,
                message.DeliveryAttempt);

            await ProcessMessageAsync(message, ct);

            // Acknowledge
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Acknowledge
            }, ct);
        }
        // Once ct is cancelled the nack write below would be attempted with a
        // cancelled token, so let the exception propagate instead; the message
        // is left without an ack or nack.
        catch (Exception ex) when (!ct.IsCancellationRequested)
        {
            _logger.LogError(ex, "Error processing message {MessageId}", message.MessageId);

            // Nack - will be redelivered
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Nack
            }, ct);
        }
    }
}
|
|
```
|
|
|
|
### Batch Processing
|
|
|
|
```csharp
|
|
/// <summary>
/// Subscribes with a 60 second visibility timeout, collects delivered messages
/// into groups of <paramref name="batchSize"/> and hands each full group to
/// <c>ProcessBatchAsync</c>. Whatever is left when the stream ends is processed last.
/// </summary>
/// <param name="streamName">Queue stream to subscribe to.</param>
/// <param name="batchSize">Number of messages per batch; also sent to the server as BatchSize.</param>
/// <param name="ct">Token that cancels reading and writing on the call.</param>
public async Task ConsumeBatchAsync(
    string streamName,
    int batchSize,
    CancellationToken ct)
{
    using var call = _client.SubscribeToQueue();

    var subscription = new QueueSubscriptionRequest
    {
        StreamName = streamName,
        SubscriptionId = Guid.NewGuid().ToString(),
        VisibilityTimeout = 60, // Longer timeout for batch
        BatchSize = batchSize
    };
    await call.RequestStream.WriteAsync(subscription, ct);

    var pending = new List<QueueMessageProto>();

    // NOTE(review): a partial batch is flushed only when the stream ends, so
    // messages may wait here longer than the visibility timeout — confirm acceptable.
    await foreach (var incoming in call.ResponseStream.ReadAllAsync(ct))
    {
        pending.Add(incoming);

        if (pending.Count < batchSize)
        {
            continue;
        }

        await ProcessBatchAsync(call, pending, ct);
        pending.Clear();
    }

    // Process remaining
    if (pending.Count == 0)
    {
        return;
    }

    await ProcessBatchAsync(call, pending, ct);
}
|
|
|
|
/// <summary>
/// Processes every message in <paramref name="batch"/> and then acknowledges
/// them all. If processing fails, every message in the batch is nacked and the
/// exception is rethrown.
/// </summary>
/// <param name="call">Duplex call whose request stream carries the acks/nacks.</param>
/// <param name="batch">Messages to process as one unit.</param>
/// <param name="ct">Token passed to processing and to the request-stream writes.</param>
private async Task ProcessBatchAsync(
    AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
    List<QueueMessageProto> batch,
    CancellationToken ct)
{
    try
    {
        // Process batch
        foreach (var message in batch)
        {
            await ProcessMessageAsync(message, ct);
        }
    }
    // Skip the nack writes once ct is cancelled: they would be attempted with a
    // cancelled token and their failure would replace the original exception.
    catch when (!ct.IsCancellationRequested)
    {
        // Nack all on batch failure
        foreach (var message in batch)
        {
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Nack
            }, ct);
        }

        throw;
    }

    // Acknowledge all. This runs outside the try so that a failed ack write
    // cannot trigger nacks for messages that were already acknowledged.
    foreach (var message in batch)
    {
        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Acknowledge
        }, ct);
    }
}
|
|
```
|
|
|
|
## Visibility Timeout
|
|
|
|
### Handling Timeout
|
|
|
|
```csharp
|
|
/// <summary>
/// Processes a message under a local deadline that expires shortly before the
/// visibility timeout, acknowledging on success and nacking when the deadline is hit.
/// </summary>
/// <param name="call">Duplex call whose request stream carries the ack/nack.</param>
/// <param name="message">Message to process.</param>
/// <param name="visibilityTimeout">Visibility timeout of the subscription; must be positive.</param>
/// <param name="ct">Caller's token; cancelling it aborts processing without a nack.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="visibilityTimeout"/> is zero or negative.
/// </exception>
private async Task ProcessWithTimeoutAsync(
    AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
    QueueMessageProto message,
    TimeSpan visibilityTimeout,
    CancellationToken ct)
{
    if (visibilityTimeout <= TimeSpan.Zero)
    {
        throw new ArgumentOutOfRangeException(
            nameof(visibilityTimeout),
            visibilityTimeout,
            "Visibility timeout must be positive.");
    }

    // Keep a 5s buffer before the visibility timeout. For timeouts of 10s or
    // less, subtracting 5s would leave little or nothing (CancelAfter throws on
    // a negative delay and cancels immediately on zero), so use half instead.
    var buffer = TimeSpan.FromSeconds(5);
    var processingBudget = visibilityTimeout > buffer + buffer
        ? visibilityTimeout - buffer
        : TimeSpan.FromTicks(visibilityTimeout.Ticks / 2);

    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(processingBudget);

    try
    {
        await ProcessMessageAsync(message, cts.Token);

        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Acknowledge
        }, ct);
    }
    // Only the local deadline is handled here; cancellation requested through
    // ct propagates to the caller.
    catch (OperationCanceledException) when (!ct.IsCancellationRequested)
    {
        _logger.LogWarning(
            "Message {MessageId} processing timed out after {Timeout}",
            message.MessageId,
            processingBudget);

        // Nack - visibility timeout will expire anyway
        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Nack
        }, ct);
    }
}
|
|
```
|
|
|
|
### Extending Visibility
|
|
|
|
```csharp
|
|
/// <summary>
/// Processes a long-running message while a background loop sends an
/// ExtendVisibility request every 15 seconds, then acknowledges the message.
/// </summary>
/// <param name="call">Duplex call whose request stream carries the extend and ack requests.</param>
/// <param name="message">Message to process.</param>
/// <param name="ct">Caller's token; cancelling it stops both processing and the extender.</param>
private async Task ProcessLongRunningAsync(
    AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
    QueueMessageProto message,
    CancellationToken ct)
{
    // Linked to ct so that caller cancellation also stops the extender.
    using var extendCts = CancellationTokenSource.CreateLinkedTokenSource(ct);

    // Start background loop to extend visibility. The local function returns at
    // its first await, so no Task.Run is needed.
    var extendTask = KeepVisibleAsync(extendCts.Token);

    try
    {
        // Process message (may take a long time)
        await ProcessMessageAsync(message, ct);
    }
    finally
    {
        // Stop the extender and wait for it to finish BEFORE writing the ack,
        // so the two writers never use the request stream at the same time.
        extendCts.Cancel();
        await extendTask;
    }

    // Acknowledge
    await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
    {
        MessageId = message.MessageId,
        Action = QueueAction.Acknowledge
    }, ct);

    async Task KeepVisibleAsync(CancellationToken stop)
    {
        try
        {
            while (!stop.IsCancellationRequested)
            {
                await Task.Delay(TimeSpan.FromSeconds(15), stop);

                await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
                {
                    MessageId = message.MessageId,
                    Action = QueueAction.ExtendVisibility,
                    VisibilityTimeout = 30 // Extend by 30 seconds
                }, stop);
            }
        }
        catch (Exception) when (stop.IsCancellationRequested)
        {
            // Stopping is the normal way for this loop to end; the interrupted
            // delay or write must not surface as a failure of the whole method.
        }
    }
}
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
### Retry with Dead Letter Queue
|
|
|
|
```csharp
|
|
/// <summary>
/// Processes a message and acknowledges it on success. On failure the message is
/// nacked for retry, or sent to the dead letter queue once its delivery attempt
/// count has reached <paramref name="maxAttempts"/>.
/// </summary>
/// <param name="call">Duplex call whose request stream carries the ack/nack/dead-letter request.</param>
/// <param name="message">Message to process.</param>
/// <param name="maxAttempts">Delivery attempt count at which the message is dead-lettered.</param>
/// <param name="ct">Token passed to processing and to the request-stream writes.</param>
private async Task ProcessWithDlqAsync(
    AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
    QueueMessageProto message,
    int maxAttempts,
    CancellationToken ct)
{
    try
    {
        // Only processing is guarded: a failed ack write must not be mistaken
        // for a processing failure and dead-letter a message that succeeded.
        await ProcessMessageAsync(message, ct);
    }
    // Cancellation via ct is not a processing failure; let it propagate so it
    // is not handled as a retry or dead-letter case.
    catch (Exception ex) when (!ct.IsCancellationRequested)
    {
        if (message.DeliveryAttempt >= maxAttempts)
        {
            _logger.LogError(ex,
                "Message {MessageId} failed after {Attempts} attempts, moving to DLQ",
                message.MessageId,
                message.DeliveryAttempt);

            // Move to dead letter queue
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.DeadLetter,
                Reason = ex.Message
            }, ct);
        }
        else
        {
            _logger.LogWarning(ex,
                "Message {MessageId} failed (attempt {Attempt}/{Max}), will retry",
                message.MessageId,
                message.DeliveryAttempt,
                maxAttempts);

            // Nack for retry
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Nack
            }, ct);
        }

        return;
    }

    await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
    {
        MessageId = message.MessageId,
        Action = QueueAction.Acknowledge
    }, ct);
}
|
|
```
|
|
|
|
## Concurrent Consumers
|
|
|
|
Run multiple consumers for parallel processing:
|
|
|
|
```csharp
|
|
/// <summary>
/// Starts <paramref name="consumerCount"/> consumers on the same stream, each
/// with its own subscription id ("consumer-0", "consumer-1", ...), and waits
/// for all of them to finish.
/// </summary>
/// <param name="streamName">Queue stream every consumer subscribes to.</param>
/// <param name="consumerCount">Number of consumers to start.</param>
/// <param name="ct">Token handed to every consumer.</param>
public async Task RunMultipleConsumersAsync(
    string streamName,
    int consumerCount,
    CancellationToken ct)
{
    // ToArray forces the query, so every consumer is started before waiting.
    Task[] consumers =
        (from index in Enumerable.Range(0, consumerCount)
         select ConsumeAsync(streamName, $"consumer-{index}", ct))
        .ToArray();

    await Task.WhenAll(consumers);
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Always acknowledge or nack messages
|
|
- Use appropriate visibility timeouts
|
|
- Handle timeouts gracefully
|
|
- Implement dead letter queues
|
|
- Use batch processing for throughput
|
|
- Run multiple consumers for scale
|
|
- Monitor delivery attempts
|
|
- Log ack/nack operations
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't forget to ack/nack
|
|
- Don't use very short visibility timeouts
|
|
- Don't process indefinitely without extending visibility
|
|
- Don't retry permanently failed messages indefinitely
|
|
- Don't skip error logging
|
|
- Don't ignore delivery attempt counts
|
|
- Don't block the message loop with synchronous waits (`.Result`, `.Wait()`, `Thread.Sleep`)
|
|
- Don't forget cancellation token handling
|
|
|
|
## See Also
|
|
|
|
- [gRPC Streaming Overview](README.md)
|
|
- [Persistent Subscriptions](persistent-subscriptions.md)
|
|
- [gRPC Clients](grpc-clients.md)
|
|
- [Ephemeral Streams](../fundamentals/ephemeral-streams.md)
|
|
- [Dead Letter Queues](../stream-configuration/dead-letter-queues.md)
|