dotnet-cqrs/docs/event-streaming/grpc-streaming/queue-subscriptions.md

14 KiB

gRPC Queue Subscriptions

Subscribe to ephemeral streams with acknowledge/nack semantics via gRPC.

Overview

gRPC queue subscriptions provide reliable message queue delivery:

  • At-Least-Once Delivery - Messages acknowledged after processing
  • Visibility Timeout - Auto-redelivery on failure
  • Ack/Nack - Explicit message acknowledgment
  • Concurrent Consumers - Multiple consumers process in parallel

Quick Start

using Grpc.Net.Client;
using Svrnty.CQRS.Events.Grpc;

// Create gRPC client
using var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = new EventStreamService.EventStreamServiceClient(channel);

// Subscribe to queue
using var call = client.SubscribeToQueue();

// Send subscription request
await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
{
    StreamName = "task-queue",
    SubscriptionId = Guid.NewGuid().ToString(),
    VisibilityTimeout = 30  // 30 seconds
});

// Process messages with ack/nack
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
    try
    {
        await ProcessMessageAsync(message);

        // Acknowledge success
        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Acknowledge
        });
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to process message {MessageId}", message.MessageId);

        // Negative acknowledge (requeue)
        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Nack
        });
    }
}

Service Implementation

Queue Subscription Service

public override async Task SubscribeToQueue(
    IAsyncStreamReader<QueueSubscriptionRequest> requestStream,
    IServerStreamWriter<QueueMessageProto> responseStream,
    ServerCallContext context)
{
    // Read initial subscription
    await requestStream.MoveNext(context.CancellationToken);
    var initialRequest = requestStream.Current;

    _logger.LogInformation(
        "Queue subscription started: stream={Stream}, subscription={SubscriptionId}",
        initialRequest.StreamName,
        initialRequest.SubscriptionId);

    var visibilityTimeout = TimeSpan.FromSeconds(initialRequest.VisibilityTimeout);

    // Start background task to send messages
    var sendTask = SendMessagesAsync(
        initialRequest.StreamName,
        initialRequest.SubscriptionId,
        visibilityTimeout,
        responseStream,
        context.CancellationToken);

    // Process acks/nacks
    var receiveTask = ReceiveAcksAsync(
        initialRequest.SubscriptionId,
        requestStream,
        context.CancellationToken);

    await Task.WhenAll(sendTask, receiveTask);
}

private async Task SendMessagesAsync(
    string streamName,
    string subscriptionId,
    TimeSpan visibilityTimeout,
    IServerStreamWriter<QueueMessageProto> responseStream,
    CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        var messages = await _eventStore.DequeueAsync(
            streamName,
            visibilityTimeout,
            batchSize: 10,
            ct);

        foreach (var message in messages)
        {
            var messageProto = new QueueMessageProto
            {
                MessageId = message.MessageId,
                EventType = message.EventType,
                Data = message.Data,
                Metadata = { message.Metadata },
                DeliveryAttempt = message.DeliveryAttempt
            };

            await responseStream.WriteAsync(messageProto, ct);

            _logger.LogDebug(
                "Sent message {MessageId} to subscription {SubscriptionId}",
                message.MessageId,
                subscriptionId);
        }

        // Wait before polling again
        if (messages.Count == 0)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
        }
    }
}

private async Task ReceiveAcksAsync(
    string subscriptionId,
    IAsyncStreamReader<QueueSubscriptionRequest> requestStream,
    CancellationToken ct)
{
    while (await requestStream.MoveNext(ct))
    {
        var request = requestStream.Current;

        switch (request.Action)
        {
            case QueueAction.Acknowledge:
                await _eventStore.AcknowledgeAsync(request.MessageId, ct);
                _logger.LogDebug("Message {MessageId} acknowledged", request.MessageId);
                break;

            case QueueAction.Nack:
                await _eventStore.NackAsync(request.MessageId, ct);
                _logger.LogDebug("Message {MessageId} nacked", request.MessageId);
                break;
        }
    }
}

Client Implementation

Basic Queue Consumer

public class QueueConsumer
{
    private readonly EventStreamService.EventStreamServiceClient _client;

    public async Task ConsumeAsync(
        string streamName,
        string subscriptionId,
        CancellationToken ct)
    {
        using var call = _client.SubscribeToQueue();

        // Subscribe
        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            StreamName = streamName,
            SubscriptionId = subscriptionId,
            VisibilityTimeout = 30
        }, ct);

        // Process messages
        await foreach (var message in call.ResponseStream.ReadAllAsync(ct))
        {
            await ProcessWithAckAsync(call, message, ct);
        }
    }

    private async Task ProcessWithAckAsync(
        AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
        QueueMessageProto message,
        CancellationToken ct)
    {
        try
        {
            _logger.LogInformation(
                "Processing message {MessageId} (attempt {Attempt})",
                message.MessageId,
                message.DeliveryAttempt);

            await ProcessMessageAsync(message, ct);

            // Acknowledge
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Acknowledge
            }, ct);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message {MessageId}", message.MessageId);

            // Nack - will be redelivered
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Nack
            }, ct);
        }
    }
}

Batch Processing

public async Task ConsumeBatchAsync(
    string streamName,
    int batchSize,
    CancellationToken ct)
{
    using var call = _client.SubscribeToQueue();

    await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
    {
        StreamName = streamName,
        SubscriptionId = Guid.NewGuid().ToString(),
        VisibilityTimeout = 60,  // Longer timeout for batch
        BatchSize = batchSize
    }, ct);

    var batch = new List<QueueMessageProto>();

    await foreach (var message in call.ResponseStream.ReadAllAsync(ct))
    {
        batch.Add(message);

        if (batch.Count >= batchSize)
        {
            await ProcessBatchAsync(call, batch, ct);
            batch.Clear();
        }
    }

    // Process remaining
    if (batch.Count > 0)
    {
        await ProcessBatchAsync(call, batch, ct);
    }
}

private async Task ProcessBatchAsync(
    AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
    List<QueueMessageProto> batch,
    CancellationToken ct)
{
    try
    {
        // Process batch
        foreach (var message in batch)
        {
            await ProcessMessageAsync(message, ct);
        }

        // Acknowledge all
        foreach (var message in batch)
        {
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Acknowledge
            }, ct);
        }
    }
    catch
    {
        // Nack all on batch failure
        foreach (var message in batch)
        {
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Nack
            }, ct);
        }
        throw;
    }
}

Visibility Timeout

Handling Timeout

private async Task ProcessWithTimeoutAsync(
    AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
    QueueMessageProto message,
    TimeSpan visibilityTimeout,
    CancellationToken ct)
{
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(visibilityTimeout - TimeSpan.FromSeconds(5));  // 5s buffer

    try
    {
        await ProcessMessageAsync(message, cts.Token);

        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Acknowledge
        }, ct);
    }
    catch (OperationCanceledException) when (!ct.IsCancellationRequested)
    {
        _logger.LogWarning(
            "Message {MessageId} processing timed out after {Timeout}",
            message.MessageId,
            visibilityTimeout);

        // Nack - visibility timeout will expire anyway
        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Nack
        }, ct);
    }
}

Extending Visibility

private async Task ProcessLongRunningAsync(
    AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
    QueueMessageProto message,
    CancellationToken ct)
{
    // Start background task to extend visibility
    using var extendCts = new CancellationTokenSource();

    var extendTask = Task.Run(async () =>
    {
        while (!extendCts.Token.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(15), extendCts.Token);

            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.ExtendVisibility,
                VisibilityTimeout = 30  // Extend by 30 seconds
            }, extendCts.Token);
        }
    }, extendCts.Token);

    try
    {
        // Process message (may take a long time)
        await ProcessMessageAsync(message, ct);

        // Acknowledge
        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Acknowledge
        }, ct);
    }
    finally
    {
        extendCts.Cancel();
        await extendTask;
    }
}

Error Handling

Retry with Dead Letter Queue

private async Task ProcessWithDlqAsync(
    AsyncDuplexStreamingCall<QueueSubscriptionRequest, QueueMessageProto> call,
    QueueMessageProto message,
    int maxAttempts,
    CancellationToken ct)
{
    try
    {
        await ProcessMessageAsync(message, ct);

        await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
        {
            MessageId = message.MessageId,
            Action = QueueAction.Acknowledge
        }, ct);
    }
    catch (Exception ex)
    {
        if (message.DeliveryAttempt >= maxAttempts)
        {
            _logger.LogError(ex,
                "Message {MessageId} failed after {Attempts} attempts, moving to DLQ",
                message.MessageId,
                message.DeliveryAttempt);

            // Move to dead letter queue
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.DeadLetter,
                Reason = ex.Message
            }, ct);
        }
        else
        {
            _logger.LogWarning(ex,
                "Message {MessageId} failed (attempt {Attempt}/{Max}), will retry",
                message.MessageId,
                message.DeliveryAttempt,
                maxAttempts);

            // Nack for retry
            await call.RequestStream.WriteAsync(new QueueSubscriptionRequest
            {
                MessageId = message.MessageId,
                Action = QueueAction.Nack
            }, ct);
        }
    }
}

Concurrent Consumers

Run multiple consumers for parallel processing:

public async Task RunMultipleConsumersAsync(
    string streamName,
    int consumerCount,
    CancellationToken ct)
{
    var tasks = Enumerable.Range(0, consumerCount)
        .Select(i => ConsumeAsync(
            streamName,
            subscriptionId: $"consumer-{i}",
            ct))
        .ToArray();

    await Task.WhenAll(tasks);
}

Best Practices

DO

  • Always acknowledge or nack messages
  • Use appropriate visibility timeouts
  • Handle timeouts gracefully
  • Implement dead letter queues
  • Use batch processing for throughput
  • Run multiple consumers for scale
  • Monitor delivery attempts
  • Log ack/nack operations

DON'T

  • Don't forget to ack/nack
  • Don't use very short visibility timeouts
  • Don't process indefinitely without extending visibility
  • Don't retry permanently failed messages indefinitely
  • Don't skip error logging
  • Don't ignore delivery attempt counts
  • Don't block message processing
  • Don't forget cancellation token handling

See Also