using System;
using Svrnty.CQRS.Events.RabbitMQ.Serialization;
using Svrnty.CQRS.Events.Abstractions.Delivery;
using Svrnty.CQRS.Events.RabbitMQ.Configuration;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
using Svrnty.CQRS.Events.Abstractions.Configuration;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.RabbitMQ.Delivery;
/// <summary>
/// RabbitMQ implementation of the external event delivery provider.
/// </summary>
/// <remarks>
/// <para>
/// This provider publishes events to RabbitMQ exchanges and subscribes to queues
/// for cross-service event streaming.
/// </para>
/// <para>
/// Features:
/// - Automatic topology management (exchanges, queues, bindings)
/// - Connection resilience with automatic recovery
/// - Publisher confirms for reliable delivery (NOTE(review): no confirm setup is visible in this class; verify)
/// - Consumer acknowledgments with redelivery
/// - Dead letter queue support
/// </para>
/// </remarks>
public sealed class RabbitMQEventDeliveryProvider : IExternalEventDeliveryProvider, IDisposable
{
// Provider settings; validated once in the constructor.
private readonly RabbitMQConfiguration _config;
// Declares exchanges/queues/bindings and derives exchange names and routing keys.
private readonly RabbitMQTopologyManager _topologyManager;
// Converts events to and from message body + properties.
private readonly RabbitMQEventSerializer _serializer;
private readonly ILogger _logger;
// Shared connection; created lazily by EnsureConnectionAsync and closed by StopAsync.
private IConnection? _connection;
// Channel used only for publishing; every consumer gets its own channel.
private IChannel? _publishChannel;
// Async mutex serializing start/stop and (re)connection.
private readonly SemaphoreSlim _connectionLock = new(1, 1);
// Active consumers keyed by "{subscriptionId}:{consumerId}".
// The type arguments are required for the target-typed new() to compile.
private readonly ConcurrentDictionary<string, ConsumerInfo> _activeConsumers = new();
private bool _isStarted;
private bool _isDisposed;
/// <summary>Gets the name identifying this delivery provider.</summary>
public string ProviderName => "RabbitMQ";
/// <summary>
/// Bookkeeping for one active consumer: everything needed to cancel it and release its channel.
/// </summary>
private sealed class ConsumerInfo
{
    /// <summary>Channel dedicated to this consumer.</summary>
    public required IChannel Channel { get; init; }

    /// <summary>Name of the queue the consumer reads from.</summary>
    public required string QueueName { get; init; }

    /// <summary>Tag returned by the broker when the consumer was registered; used to cancel it.</summary>
    public required string ConsumerTag { get; init; }

    /// <summary>Source whose token is handed to the event handler; cancelled on unsubscribe/stop.</summary>
    public required CancellationTokenSource CancellationTokenSource { get; init; }
}
/// <summary>
/// Creates the provider, validates its configuration and builds the topology manager and serializer.
/// </summary>
/// <param name="config">Options wrapper whose value supplies the RabbitMQ settings.</param>
/// <param name="logger">Logger used by this provider.</param>
/// <param name="loggerFactory">Factory used to create the loggers handed to the topology manager and serializer.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="config"/> (or its value), <paramref name="logger"/> or <paramref name="loggerFactory"/> is null.
/// </exception>
/// <remarks>Any exception thrown by the configuration's <c>Validate()</c> call propagates to the caller.</remarks>
public RabbitMQEventDeliveryProvider(
    IOptions<RabbitMQConfiguration> config,
    ILogger<RabbitMQEventDeliveryProvider> logger,
    ILoggerFactory loggerFactory)
{
    _config = config?.Value ?? throw new ArgumentNullException(nameof(config));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    // Fail with a descriptive ArgumentNullException instead of a NullReferenceException further down.
    if (loggerFactory == null)
    {
        throw new ArgumentNullException(nameof(loggerFactory));
    }

    _config.Validate();

    _topologyManager = new RabbitMQTopologyManager(_config, loggerFactory.CreateLogger<RabbitMQTopologyManager>());
    _serializer = new RabbitMQEventSerializer(loggerFactory.CreateLogger<RabbitMQEventSerializer>());
}
/// <summary>
/// Starts the provider: opens the RabbitMQ connection and, when a dead letter exchange is
/// configured, declares it. Calling this on an already started provider does nothing.
/// </summary>
/// <param name="cancellationToken">Cancels waiting for the connection or for the start/stop lock.</param>
public async Task StartAsync(CancellationToken cancellationToken = default)
{
    if (_isStarted)
    {
        return;
    }

    _logger.LogInformation("Starting RabbitMQ event delivery provider");

    // Connect BEFORE taking _connectionLock. EnsureConnectionAsync acquires the same
    // SemaphoreSlim(1, 1), which is not reentrant, so calling it while holding the lock
    // would block forever on the first start.
    await EnsureConnectionAsync(cancellationToken);

    await _connectionLock.WaitAsync(cancellationToken);
    try
    {
        // Re-check under the lock: another caller may have finished starting meanwhile.
        if (_isStarted)
        {
            return;
        }

        // Declare dead letter exchange if configured
        if (_publishChannel != null && !string.IsNullOrWhiteSpace(_config.DeadLetterExchange))
        {
            _topologyManager.DeclareDeadLetterExchange(_publishChannel);
        }

        _isStarted = true;
        _logger.LogInformation("RabbitMQ event delivery provider started successfully");
    }
    finally
    {
        _connectionLock.Release();
    }
}
/// <summary>
/// Stops the provider: cancels and closes every active consumer, then closes the publish
/// channel and the connection. Does nothing when the provider is not started.
/// </summary>
/// <param name="cancellationToken">Cancels waiting for the lock and the channel close operations.</param>
/// <remarks>
/// Close failures still propagate to the caller, but the affected channel/connection is
/// disposed first so a failed close does not leak it.
/// </remarks>
public async Task StopAsync(CancellationToken cancellationToken = default)
{
    if (!_isStarted)
    {
        return;
    }

    await _connectionLock.WaitAsync(cancellationToken);
    try
    {
        if (!_isStarted)
        {
            return;
        }

        _logger.LogInformation("Stopping RabbitMQ event delivery provider");

        // Stop all consumers
        foreach (var consumer in _activeConsumers.Values)
        {
            consumer.CancellationTokenSource.Cancel();
            try
            {
                await consumer.Channel.CloseAsync(cancellationToken);
            }
            finally
            {
                consumer.Channel.Dispose();
            }
        }

        _activeConsumers.Clear();

        // Close the publish channel; clear the field first so it never points at a disposed channel.
        var publishChannel = _publishChannel;
        if (publishChannel != null)
        {
            _publishChannel = null;
            try
            {
                await publishChannel.CloseAsync(cancellationToken);
            }
            finally
            {
                publishChannel.Dispose();
            }
        }

        // Close connection
        var connection = _connection;
        if (connection != null)
        {
            _connection = null;
            try
            {
                await connection.CloseAsync();
            }
            finally
            {
                connection.Dispose();
            }
        }

        _isStarted = false;
        _logger.LogInformation("RabbitMQ event delivery provider stopped");
    }
    finally
    {
        _connectionLock.Release();
    }
}
/// <summary>
/// Internal "event available" notification hook. It is intentionally a no-op for this
/// provider: it returns an already completed task and ignores all arguments.
/// </summary>
/// <param name="streamName">Ignored.</param>
/// <param name="event">Ignored.</param>
/// <param name="cancellationToken">Ignored.</param>
/// <returns>A completed task.</returns>
public Task NotifyEventAvailableAsync(
    string streamName,
    ICorrelatedEvent @event,
    CancellationToken cancellationToken = default)
    // Internal notifications (gRPC push) are not routed through RabbitMQ;
    // RabbitMQ events are published explicitly via PublishExternalAsync.
    => Task.CompletedTask;
/// <summary>
/// Publishes an event to the exchange derived from <paramref name="streamName"/>, retrying
/// failed publishes up to the configured maximum with a linearly growing delay.
/// </summary>
/// <param name="streamName">Stream whose exchange and routing key are resolved by the topology manager.</param>
/// <param name="event">Event to serialize and publish.</param>
/// <param name="metadata">Optional metadata handed to the serializer together with the event.</param>
/// <param name="cancellationToken">Cancels connecting, publishing and the retry delay.</param>
/// <exception cref="ArgumentException"><paramref name="streamName"/> is null or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
/// <exception cref="InvalidOperationException">No publish channel is available.</exception>
/// <remarks>
/// NOTE(review): the generic type arguments of the <paramref name="metadata"/> parameter type are
/// missing in this file as written; restore them to match the interface declaration.
/// </remarks>
public async Task PublishExternalAsync(
    string streamName,
    ICorrelatedEvent @event,
    IDictionary? metadata = null,
    CancellationToken cancellationToken = default)
{
    if (string.IsNullOrWhiteSpace(streamName))
        throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
    if (@event == null)
        throw new ArgumentNullException(nameof(@event));

    await EnsureConnectionAsync(cancellationToken);

    // Read the field once: StopAsync may null it concurrently, and every later use
    // must refer to the same channel instance.
    var channel = _publishChannel
        ?? throw new InvalidOperationException("Publish channel is not available");

    var exchangeName = _topologyManager.GetExchangeName(streamName);
    var routingKey = _topologyManager.GetRoutingKey(streamName, @event);

    // Ensure exchange exists
    _topologyManager.DeclareExchange(channel, streamName);

    // Serialize event
    var (body, properties) = _serializer.Serialize(@event, metadata);

    // Publish with retries
    var attempt = 0;
    while (attempt <= _config.MaxPublishRetries)
    {
        try
        {
            // The serializer's properties must travel with the message: the consumer side
            // deserializes from both the body and the delivered basic properties.
            // NOTE(review): assumes Serialize returns a properties type accepted by
            // BasicPublishAsync (e.g. BasicProperties); confirm against the serializer.
            await channel.BasicPublishAsync(
                exchange: exchangeName,
                routingKey: routingKey,
                mandatory: false,
                basicProperties: properties,
                body: body,
                cancellationToken: cancellationToken);

            _logger.LogDebug(
                "Published event {EventType} (ID: {EventId}) to exchange {ExchangeName} with routing key {RoutingKey}",
                @event.GetType().Name,
                @event.EventId,
                exchangeName,
                routingKey);
            return;
        }
        // Cancellation is not a publish failure: let it propagate instead of logging and retrying.
        // On the last attempt the filter is false, so the original exception reaches the caller.
        catch (Exception ex) when (ex is not OperationCanceledException && attempt < _config.MaxPublishRetries)
        {
            attempt++;
            _logger.LogWarning(
                ex,
                "Failed to publish event {EventId}, attempt {Attempt}/{MaxAttempts}",
                @event.EventId,
                attempt,
                _config.MaxPublishRetries);

            // Linear back-off: delay grows with the attempt number.
            await Task.Delay(_config.PublishRetryDelay * attempt, cancellationToken);
        }
    }

    // Only reachable when the loop body never ran (e.g. a negative MaxPublishRetries).
    throw new InvalidOperationException($"Failed to publish event {@event.EventId} after {_config.MaxPublishRetries} attempts");
}
/// <summary>
/// Subscribes a consumer to a stream: creates a dedicated channel, declares the exchange and
/// queue, binds the queue with the catch-all routing key and starts consuming with manual
/// acknowledgements. The returned task completes only once the subscription is cancelled
/// (through <paramref name="cancellationToken"/>, an unsubscribe, or a provider stop).
/// </summary>
/// <param name="streamName">Stream to consume from.</param>
/// <param name="subscriptionId">Subscription identifier; part of the consumer key and queue declaration.</param>
/// <param name="consumerId">Consumer identifier; part of the consumer key and queue declaration.</param>
/// <param name="eventHandler">Callback invoked for every successfully deserialized event.</param>
/// <param name="cancellationToken">Cancels setup and, later, the running subscription.</param>
/// <exception cref="ArgumentException">A string argument is null or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
/// <exception cref="InvalidOperationException">No connection is available.</exception>
/// <remarks>
/// If the same subscription/consumer pair is already registered, a warning is logged and the
/// method returns immediately.
/// NOTE(review): the generic type arguments of the <paramref name="eventHandler"/> delegate type are
/// missing in this file as written; restore them to match the interface declaration.
/// </remarks>
public async Task SubscribeExternalAsync(
    string streamName,
    string subscriptionId,
    string consumerId,
    Func, CancellationToken, Task> eventHandler,
    CancellationToken cancellationToken = default)
{
    if (string.IsNullOrWhiteSpace(streamName))
        throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
    if (string.IsNullOrWhiteSpace(subscriptionId))
        throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId));
    if (string.IsNullOrWhiteSpace(consumerId))
        throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
    if (eventHandler == null)
        throw new ArgumentNullException(nameof(eventHandler));

    await EnsureConnectionAsync(cancellationToken);

    var connection = _connection
        ?? throw new InvalidOperationException("Connection is not available");

    var consumerKey = $"{subscriptionId}:{consumerId}";
    if (_activeConsumers.ContainsKey(consumerKey))
    {
        _logger.LogWarning("Consumer {ConsumerKey} is already subscribed", consumerKey);
        return;
    }

    var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
    var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    ConsumerInfo consumerInfo;

    try
    {
        await channel.BasicQosAsync(0, _config.PrefetchCount, false, cancellationToken);

        // Declare exchange
        _topologyManager.DeclareExchange(channel, streamName);

        // Declare queue (assume ConsumerGroup mode for simplicity)
        var queueName = _topologyManager.DeclareQueue(channel, subscriptionId, consumerId, SubscriptionMode.ConsumerGroup);

        // Bind queue with wildcard routing key to receive all events
        _topologyManager.BindQueue(channel, streamName, queueName, new[] { "#" });

        // Create consumer
        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (sender, args) =>
        {
            try
            {
                var @event = _serializer.Deserialize(args.Body, args.BasicProperties, out var metadata);
                if (@event != null)
                {
                    await eventHandler(@event, metadata, cts.Token);
                    await channel.BasicAckAsync(args.DeliveryTag, false, cts.Token);
                }
                else
                {
                    // Undeserializable payloads are rejected without requeue.
                    _logger.LogWarning("Failed to deserialize message, sending NACK");
                    await channel.BasicNackAsync(args.DeliveryTag, false, false, cts.Token);
                }
            }
            catch (OperationCanceledException) when (cts.IsCancellationRequested)
            {
                // The subscription is shutting down. A NACK with the cancelled token would only
                // throw again; the broker returns unacknowledged deliveries to the queue once
                // the channel is closed.
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing message from queue {QueueName}", queueName);
                try
                {
                    await channel.BasicNackAsync(args.DeliveryTag, false, true, cts.Token); // Requeue
                }
                catch (Exception nackEx)
                {
                    // Keep the original failure as the reported error; a failed NACK
                    // (closed channel, cancelled token) must not escape the handler.
                    _logger.LogWarning(nackEx, "Failed to NACK message from queue {QueueName}", queueName);
                }
            }
        };

        var consumerTag = await channel.BasicConsumeAsync(
            queue: queueName,
            autoAck: false,
            consumer: consumer,
            cancellationToken: cancellationToken);

        consumerInfo = new ConsumerInfo
        {
            Channel = channel,
            QueueName = queueName,
            ConsumerTag = consumerTag,
            CancellationTokenSource = cts
        };
        _activeConsumers[consumerKey] = consumerInfo;
    }
    catch
    {
        // Setup failed before the consumer was registered: nothing else owns these
        // resources, so release them here instead of leaking a channel per failed attempt.
        cts.Dispose();
        channel.Dispose();
        throw;
    }

    _logger.LogInformation(
        "Subscribed to stream {StreamName} (queue: {QueueName}, consumer: {ConsumerId})",
        streamName,
        consumerInfo.QueueName,
        consumerId);

    // Wait for cancellation; the infinite delay can only end by being cancelled.
    try
    {
        await Task.Delay(Timeout.Infinite, cts.Token);
    }
    catch (OperationCanceledException)
    {
        // Expected: the subscription was cancelled.
    }
}
/// <summary>
/// Removes a consumer registration, cancels its token, cancels the broker-side consumer and
/// closes its channel. Does nothing when the consumer is not registered.
/// </summary>
/// <param name="streamName">Stream name; used for logging only.</param>
/// <param name="subscriptionId">Subscription identifier used when subscribing.</param>
/// <param name="consumerId">Consumer identifier used when subscribing.</param>
/// <param name="cancellationToken">Cancels the broker cancel/close operations.</param>
public async Task UnsubscribeExternalAsync(
    string streamName,
    string subscriptionId,
    string consumerId,
    CancellationToken cancellationToken = default)
{
    var consumerKey = $"{subscriptionId}:{consumerId}";
    if (!_activeConsumers.TryRemove(consumerKey, out var consumerInfo))
    {
        return;
    }

    consumerInfo.CancellationTokenSource.Cancel();
    try
    {
        await consumerInfo.Channel.BasicCancelAsync(consumerInfo.ConsumerTag, false, cancellationToken);
        await consumerInfo.Channel.CloseAsync(cancellationToken);
    }
    finally
    {
        // The registration is already gone, so this is the last chance to release the
        // channel, even when cancel/close throws (e.g. the channel was already closed).
        consumerInfo.Channel.Dispose();
    }

    _logger.LogInformation("Unsubscribed consumer {ConsumerKey} from stream {StreamName}", consumerKey, streamName);
}
/// <summary>
/// Reports whether this provider can handle the given stream. The RabbitMQ provider
/// accepts every stream, so the argument is not inspected.
/// </summary>
/// <param name="streamName">Stream name (ignored).</param>
/// <returns>Always <c>true</c>.</returns>
public bool SupportsStream(string streamName) => true;
/// <summary>Gets the number of consumers currently registered with this provider.</summary>
/// <returns>The current size of the active-consumer registry.</returns>
public int GetActiveConsumerCount() => _activeConsumers.Count;
/// <summary>Reports whether a connection exists and is currently open.</summary>
/// <returns><c>true</c> when the connection is non-null and open; otherwise <c>false</c>.</returns>
public bool IsHealthy() => _connection is { IsOpen: true };
/// <summary>
/// Makes sure an open connection and an open publish channel exist, creating whichever is
/// missing. Returns immediately when both are already open.
/// </summary>
/// <param name="cancellationToken">Cancels waiting for the lock and the connect/channel operations.</param>
/// <remarks>
/// Acquires <c>_connectionLock</c>; callers must not already hold it, because the semaphore is
/// not reentrant. Connection failures are logged and rethrown.
/// </remarks>
private async Task EnsureConnectionAsync(CancellationToken cancellationToken)
{
    if (_connection?.IsOpen == true && _publishChannel?.IsOpen == true)
        return;

    await _connectionLock.WaitAsync(cancellationToken);
    try
    {
        // Re-check under the lock: another caller may have connected meanwhile.
        if (_connection?.IsOpen == true && _publishChannel?.IsOpen == true)
            return;

        if (_connection?.IsOpen != true)
        {
            var uri = new Uri(_config.ConnectionString);

            // AMQP URIs usually carry "user:password@"; log the endpoint without the
            // user-info part so credentials never reach the logs.
            var endpoint = uri.GetComponents(
                UriComponents.SchemeAndServer | UriComponents.Path,
                UriFormat.UriEscaped);
            _logger.LogInformation("Establishing connection to RabbitMQ at {ConnectionString}", endpoint);

            var factory = new ConnectionFactory
            {
                Uri = uri,
                AutomaticRecoveryEnabled = _config.AutoRecovery,
                NetworkRecoveryInterval = _config.RecoveryInterval,
                RequestedHeartbeat = _config.HeartbeatInterval,
                // Type arguments are inferred; the cast widens each value to object?.
                ClientProperties = _config.ConnectionProperties.ToDictionary(
                    kvp => kvp.Key,
                    kvp => (object?)kvp.Value)
            };

            // NOTE(review): a previous, non-open connection is replaced without being disposed
            // because consumer channels created from it may still be attached; confirm the
            // intended ownership before disposing it here.
            _connection = await factory.CreateConnectionAsync(cancellationToken);
            _logger.LogInformation("Connected to RabbitMQ successfully");
        }

        // Reaching this point means the publish channel is missing, closed, or belongs to a
        // replaced connection. Release the stale one and open a new channel on the current
        // connection instead of opening a whole new connection for it.
        _publishChannel?.Dispose();
        _publishChannel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to connect to RabbitMQ");
        throw;
    }
    finally
    {
        _connectionLock.Release();
    }
}
/// <summary>
/// Stops the provider and releases every resource it still holds. Safe to call more than once;
/// failures while stopping are logged rather than thrown.
/// </summary>
public void Dispose()
{
    if (_isDisposed)
    {
        return;
    }

    _isDisposed = true;

    try
    {
        // IDisposable is synchronous, so the async stop has to be awaited by blocking here.
        StopAsync().GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        // Dispose should not throw; report the problem and continue releasing resources.
        _logger.LogWarning(ex, "Error while stopping RabbitMQ event delivery provider during dispose");
    }

    // StopAsync returns early when StartAsync was never called, yet publishing and subscribing
    // connect lazily. Release whatever is still around; after a successful stop this is a no-op.
    foreach (var consumer in _activeConsumers.Values)
    {
        consumer.CancellationTokenSource.Cancel();
        consumer.Channel.Dispose();
    }

    _activeConsumers.Clear();

    _publishChannel?.Dispose();
    _publishChannel = null;

    _connection?.Dispose();
    _connection = null;

    _connectionLock.Dispose();
}
}