using System; using Svrnty.CQRS.Events.RabbitMQ.Serialization; using Svrnty.CQRS.Events.Abstractions.Delivery; using Svrnty.CQRS.Events.RabbitMQ.Configuration; using Svrnty.CQRS.Events.Abstractions.Subscriptions; using Svrnty.CQRS.Events.Abstractions.Configuration; using Svrnty.CQRS.Events.Abstractions.EventStore; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using RabbitMQ.Client; using RabbitMQ.Client.Events; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.RabbitMQ.Delivery; /// /// RabbitMQ implementation of external event delivery provider. /// /// /// /// This provider publishes events to RabbitMQ exchanges and subscribes to queues /// for cross-service event streaming. /// /// /// Features: /// - Automatic topology management (exchanges, queues, bindings) /// - Connection resilience with automatic recovery /// - Publisher confirms for reliable delivery /// - Consumer acknowledgments with redelivery /// - Dead letter queue support /// /// public sealed class RabbitMQEventDeliveryProvider : IExternalEventDeliveryProvider, IDisposable { private readonly RabbitMQConfiguration _config; private readonly RabbitMQTopologyManager _topologyManager; private readonly RabbitMQEventSerializer _serializer; private readonly ILogger _logger; private IConnection? _connection; private IChannel? _publishChannel; private readonly SemaphoreSlim _connectionLock = new(1, 1); private readonly ConcurrentDictionary _activeConsumers = new(); private bool _isStarted; private bool _isDisposed; public string ProviderName => "RabbitMQ"; private class ConsumerInfo { public required IChannel Channel { get; init; } public required string QueueName { get; init; } public required string ConsumerTag { get; init; } public required CancellationTokenSource CancellationTokenSource { get; init; } } public RabbitMQEventDeliveryProvider( IOptions config, ILogger logger, ILoggerFactory loggerFactory) { _config = config?.Value ?? throw new ArgumentNullException(nameof(config)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _config.Validate(); _topologyManager = new RabbitMQTopologyManager(_config, loggerFactory.CreateLogger()); _serializer = new RabbitMQEventSerializer(loggerFactory.CreateLogger()); } public async Task StartAsync(CancellationToken cancellationToken = default) { if (_isStarted) return; await _connectionLock.WaitAsync(cancellationToken); try { if (_isStarted) return; _logger.LogInformation("Starting RabbitMQ event delivery provider"); await EnsureConnectionAsync(cancellationToken); // Declare dead letter exchange if configured if (_publishChannel != null && !string.IsNullOrWhiteSpace(_config.DeadLetterExchange)) { _topologyManager.DeclareDeadLetterExchange(_publishChannel); } _isStarted = true; _logger.LogInformation("RabbitMQ event delivery provider started successfully"); } finally { _connectionLock.Release(); } } public async Task StopAsync(CancellationToken cancellationToken = default) { if (!_isStarted) return; await _connectionLock.WaitAsync(cancellationToken); try { if (!_isStarted) return; _logger.LogInformation("Stopping RabbitMQ event delivery provider"); // Stop all consumers foreach (var consumer in _activeConsumers.Values) { consumer.CancellationTokenSource.Cancel(); await consumer.Channel.CloseAsync(); consumer.Channel.Dispose(); } _activeConsumers.Clear(); // Close channels if (_publishChannel != null) { await _publishChannel.CloseAsync(); _publishChannel.Dispose(); _publishChannel = null; } // Close connection if (_connection != null) { await _connection.CloseAsync(); _connection.Dispose(); _connection = null; } _isStarted = false; _logger.LogInformation("RabbitMQ event delivery provider stopped"); } finally { _connectionLock.Release(); } } public Task NotifyEventAvailableAsync( string streamName, ICorrelatedEvent @event, CancellationToken cancellationToken = default) { // This is for internal notifications (gRPC push), not used for RabbitMQ // RabbitMQ events are explicitly published via PublishExternalAsync return Task.CompletedTask; } public async Task PublishExternalAsync( string streamName, ICorrelatedEvent @event, IDictionary? metadata = null, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName)); if (@event == null) throw new ArgumentNullException(nameof(@event)); await EnsureConnectionAsync(cancellationToken); if (_publishChannel == null) throw new InvalidOperationException("Publish channel is not available"); var exchangeName = _topologyManager.GetExchangeName(streamName); var routingKey = _topologyManager.GetRoutingKey(streamName, @event); // Ensure exchange exists _topologyManager.DeclareExchange(_publishChannel, streamName); // Serialize event var (body, properties) = _serializer.Serialize(@event, metadata); // Publish with retries var attempt = 0; while (attempt <= _config.MaxPublishRetries) { try { await _publishChannel.BasicPublishAsync( exchange: exchangeName, routingKey: routingKey, mandatory: false, body: body, cancellationToken: cancellationToken); _logger.LogDebug( "Published event {EventType} (ID: {EventId}) to exchange {ExchangeName} with routing key {RoutingKey}", @event.GetType().Name, @event.EventId, exchangeName, routingKey); return; } catch (Exception ex) when (attempt < _config.MaxPublishRetries) { attempt++; _logger.LogWarning( ex, "Failed to publish event {EventId}, attempt {Attempt}/{MaxAttempts}", @event.EventId, attempt, _config.MaxPublishRetries); await Task.Delay(_config.PublishRetryDelay * attempt, cancellationToken); } } throw new InvalidOperationException($"Failed to publish event {@event.EventId} after {_config.MaxPublishRetries} attempts"); } public async Task SubscribeExternalAsync( string streamName, string subscriptionId, string consumerId, Func, CancellationToken, Task> eventHandler, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName)); if (string.IsNullOrWhiteSpace(subscriptionId)) throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId)); if (eventHandler == null) throw new ArgumentNullException(nameof(eventHandler)); await EnsureConnectionAsync(cancellationToken); if (_connection == null) throw new InvalidOperationException("Connection is not available"); var consumerKey = $"{subscriptionId}:{consumerId}"; if (_activeConsumers.ContainsKey(consumerKey)) { _logger.LogWarning("Consumer {ConsumerKey} is already subscribed", consumerKey); return; } var channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); await channel.BasicQosAsync(0, _config.PrefetchCount, false, cancellationToken); var exchangeName = _topologyManager.GetExchangeName(streamName); // Declare exchange _topologyManager.DeclareExchange(channel, streamName); // Declare queue (assume ConsumerGroup mode for simplicity) var queueName = _topologyManager.DeclareQueue(channel, subscriptionId, consumerId, SubscriptionMode.ConsumerGroup); // Bind queue with wildcard routing key to receive all events _topologyManager.BindQueue(channel, streamName, queueName, new[] { "#" }); // Create consumer var consumer = new AsyncEventingBasicConsumer(channel); var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); consumer.ReceivedAsync += async (sender, args) => { try { var @event = _serializer.Deserialize(args.Body, args.BasicProperties, out var metadata); if (@event != null) { await eventHandler(@event, metadata, cts.Token); await channel.BasicAckAsync(args.DeliveryTag, false, cts.Token); } else { _logger.LogWarning("Failed to deserialize message, sending NACK"); await channel.BasicNackAsync(args.DeliveryTag, false, false, cts.Token); } } catch (Exception ex) { _logger.LogError(ex, "Error processing message from queue {QueueName}", queueName); await channel.BasicNackAsync(args.DeliveryTag, false, true, cts.Token); // Requeue } }; var consumerTag = await channel.BasicConsumeAsync( queue: queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken); var consumerInfo = new ConsumerInfo { Channel = channel, QueueName = queueName, ConsumerTag = consumerTag, CancellationTokenSource = cts }; _activeConsumers[consumerKey] = consumerInfo; _logger.LogInformation( "Subscribed to stream {StreamName} (queue: {QueueName}, consumer: {ConsumerId})", streamName, queueName, consumerId); // Wait for cancellation await Task.Delay(Timeout.Infinite, cts.Token).ContinueWith(_ => { }, TaskContinuationOptions.OnlyOnCanceled); } public async Task UnsubscribeExternalAsync( string streamName, string subscriptionId, string consumerId, CancellationToken cancellationToken = default) { var consumerKey = $"{subscriptionId}:{consumerId}"; if (_activeConsumers.TryRemove(consumerKey, out var consumerInfo)) { consumerInfo.CancellationTokenSource.Cancel(); await consumerInfo.Channel.BasicCancelAsync(consumerInfo.ConsumerTag, false, cancellationToken); await consumerInfo.Channel.CloseAsync(cancellationToken); consumerInfo.Channel.Dispose(); _logger.LogInformation("Unsubscribed consumer {ConsumerKey} from stream {StreamName}", consumerKey, streamName); } } public bool SupportsStream(string streamName) { // RabbitMQ provider supports all streams by default return true; } public int GetActiveConsumerCount() { return _activeConsumers.Count; } public bool IsHealthy() { return _connection?.IsOpen ?? false; } private async Task EnsureConnectionAsync(CancellationToken cancellationToken) { if (_connection?.IsOpen == true && _publishChannel?.IsOpen == true) return; await _connectionLock.WaitAsync(cancellationToken); try { if (_connection?.IsOpen == true && _publishChannel?.IsOpen == true) return; _logger.LogInformation("Establishing connection to RabbitMQ at {ConnectionString}", _config.ConnectionString); var factory = new ConnectionFactory { Uri = new Uri(_config.ConnectionString), AutomaticRecoveryEnabled = _config.AutoRecovery, NetworkRecoveryInterval = _config.RecoveryInterval, RequestedHeartbeat = _config.HeartbeatInterval, ClientProperties = _config.ConnectionProperties.ToDictionary, string, object?>( kvp => kvp.Key, kvp => kvp.Value) }; _connection = await factory.CreateConnectionAsync(cancellationToken); _publishChannel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken); _logger.LogInformation("Connected to RabbitMQ successfully"); } catch (Exception ex) { _logger.LogError(ex, "Failed to connect to RabbitMQ"); throw; } finally { _connectionLock.Release(); } } public void Dispose() { if (_isDisposed) return; StopAsync().GetAwaiter().GetResult(); _connectionLock.Dispose(); _isDisposed = true; } }