410 lines
14 KiB
C#
410 lines
14 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.RabbitMQ.Serialization;
|
|
using Svrnty.CQRS.Events.Abstractions.Delivery;
|
|
using Svrnty.CQRS.Events.RabbitMQ.Configuration;
|
|
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
|
|
using Svrnty.CQRS.Events.Abstractions.Configuration;
|
|
using Svrnty.CQRS.Events.Abstractions.EventStore;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.RabbitMQ.Delivery;
|
|
|
|
/// <summary>
|
|
/// RabbitMQ implementation of external event delivery provider.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// This provider publishes events to RabbitMQ exchanges and subscribes to queues
|
|
/// for cross-service event streaming.
|
|
/// </para>
|
|
/// <para>
|
|
/// <strong>Features:</strong>
|
|
/// - Automatic topology management (exchanges, queues, bindings)
|
|
/// - Connection resilience with automatic recovery
|
|
/// - Publisher confirms for reliable delivery
|
|
/// - Consumer acknowledgments with redelivery
|
|
/// - Dead letter queue support
|
|
/// </para>
|
|
/// </remarks>
|
|
public sealed class RabbitMQEventDeliveryProvider : IExternalEventDeliveryProvider, IDisposable
|
|
{
|
|
// Collaborators assigned once by the constructor.
private readonly RabbitMQConfiguration _config;
private readonly RabbitMQTopologyManager _topologyManager;
private readonly RabbitMQEventSerializer _serializer;
private readonly ILogger<RabbitMQEventDeliveryProvider> _logger;

// Lifecycle flags toggled by StartAsync / StopAsync / Dispose.
private bool _isStarted;
private bool _isDisposed;

// Broker connection and the channel used for publishing; both are null until a connection is established.
private IConnection? _connection;
private IChannel? _publishChannel;

// Single-permit async lock guarding connection setup and provider start/stop.
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(1, 1);

// Registered consumers, keyed by "{subscriptionId}:{consumerId}".
private readonly ConcurrentDictionary<string, ConsumerInfo> _activeConsumers =
    new ConcurrentDictionary<string, ConsumerInfo>();
|
|
|
|
/// <summary>
/// Gets the fixed name identifying this delivery provider.
/// </summary>
public string ProviderName
{
    get { return "RabbitMQ"; }
}
|
|
|
|
/// <summary>
/// Bookkeeping entry for one registered queue consumer.
/// </summary>
private class ConsumerInfo
{
    /// <summary>Token source that is cancelled when the consumer is stopped or unsubscribed.</summary>
    public required CancellationTokenSource CancellationTokenSource { get; init; }

    /// <summary>Tag returned by the broker when the consumer was registered.</summary>
    public required string ConsumerTag { get; init; }

    /// <summary>Name of the queue the consumer reads from.</summary>
    public required string QueueName { get; init; }

    /// <summary>Channel dedicated to this consumer.</summary>
    public required IChannel Channel { get; init; }
}
|
|
|
|
/// <summary>
/// Creates the provider, validates its configuration and builds the topology manager and serializer.
/// </summary>
/// <param name="config">Options wrapper holding the RabbitMQ configuration.</param>
/// <param name="logger">Logger used by the provider itself.</param>
/// <param name="loggerFactory">Factory used to create loggers for the topology manager and the serializer.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="config"/> (or its value), <paramref name="logger"/> or <paramref name="loggerFactory"/> is null.
/// </exception>
public RabbitMQEventDeliveryProvider(
    IOptions<RabbitMQConfiguration> config,
    ILogger<RabbitMQEventDeliveryProvider> logger,
    ILoggerFactory loggerFactory)
{
    _config = config?.Value ?? throw new ArgumentNullException(nameof(config));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    // Previously a null factory surfaced as a NullReferenceException further down.
    if (loggerFactory is null)
    {
        throw new ArgumentNullException(nameof(loggerFactory));
    }

    _config.Validate();

    _topologyManager = new RabbitMQTopologyManager(_config, loggerFactory.CreateLogger<RabbitMQTopologyManager>());
    _serializer = new RabbitMQEventSerializer(loggerFactory.CreateLogger<RabbitMQEventSerializer>());
}
|
|
|
|
/// <summary>
/// Starts the provider: establishes the broker connection and, when a dead letter exchange is configured,
/// declares it. Calling it again after a successful start is a no-op.
/// </summary>
/// <param name="cancellationToken">Token used to abort connecting or waiting for the lock.</param>
public async Task StartAsync(CancellationToken cancellationToken = default)
{
    if (_isStarted)
    {
        return;
    }

    _logger.LogInformation("Starting RabbitMQ event delivery provider");

    // EnsureConnectionAsync acquires _connectionLock itself. SemaphoreSlim is not re-entrant, so the
    // connection must be established BEFORE this method takes the lock; calling it while holding the
    // lock (as before) blocked forever on the first start.
    await EnsureConnectionAsync(cancellationToken);

    await _connectionLock.WaitAsync(cancellationToken);
    try
    {
        // Another caller may have completed the start while we were waiting.
        if (_isStarted)
        {
            return;
        }

        // Declare dead letter exchange if configured
        var channel = _publishChannel;
        if (channel != null && !string.IsNullOrWhiteSpace(_config.DeadLetterExchange))
        {
            _topologyManager.DeclareDeadLetterExchange(channel);
        }

        _isStarted = true;
        _logger.LogInformation("RabbitMQ event delivery provider started successfully");
    }
    finally
    {
        _connectionLock.Release();
    }
}
|
|
|
|
/// <summary>
/// Stops the provider: cancels and closes every registered consumer, then closes the publish channel
/// and the connection. Shutdown is best-effort: a failure while closing one resource is logged and
/// the remaining resources are still released.
/// </summary>
/// <remarks>
/// Also tears down a connection that was created lazily by publish/subscribe calls even if
/// <c>StartAsync</c> was never invoked; it is a no-op only when nothing is held.
/// </remarks>
/// <param name="cancellationToken">Token used to abort waiting for the lock.</param>
public async Task StopAsync(CancellationToken cancellationToken = default)
{
    if (HasNothingToStop())
    {
        return;
    }

    await _connectionLock.WaitAsync(cancellationToken);
    try
    {
        if (HasNothingToStop())
        {
            return;
        }

        _logger.LogInformation("Stopping RabbitMQ event delivery provider");

        // Stop all consumers. Entries are removed one by one so that a consumer registered
        // concurrently is never dropped from the map without being closed.
        foreach (var consumerKey in _activeConsumers.Keys)
        {
            if (!_activeConsumers.TryRemove(consumerKey, out var consumer))
            {
                continue;
            }

            try
            {
                consumer.CancellationTokenSource.Cancel();
                await consumer.Channel.CloseAsync();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Error while closing consumer channel for queue {QueueName}", consumer.QueueName);
            }
            finally
            {
                consumer.Channel.Dispose();
            }
        }

        // Close publish channel
        var publishChannel = _publishChannel;
        _publishChannel = null;
        if (publishChannel != null)
        {
            try
            {
                await publishChannel.CloseAsync();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Error while closing RabbitMQ publish channel");
            }
            finally
            {
                publishChannel.Dispose();
            }
        }

        // Close connection
        var connection = _connection;
        _connection = null;
        if (connection != null)
        {
            try
            {
                await connection.CloseAsync();
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Error while closing RabbitMQ connection");
            }
            finally
            {
                connection.Dispose();
            }
        }

        _isStarted = false;
        _logger.LogInformation("RabbitMQ event delivery provider stopped");
    }
    finally
    {
        _connectionLock.Release();
    }

    // True when the provider is not started and holds no connection, channel or consumer.
    bool HasNothingToStop() =>
        !_isStarted && _connection == null && _publishChannel == null && _activeConsumers.IsEmpty;
}
|
|
|
|
/// <summary>
/// Intentionally does nothing and completes immediately.
/// </summary>
/// <remarks>
/// This hook serves internal notifications (gRPC push). RabbitMQ events are published
/// explicitly through <c>PublishExternalAsync</c>, so there is nothing to do here.
/// </remarks>
/// <param name="streamName">Ignored.</param>
/// <param name="event">Ignored.</param>
/// <param name="cancellationToken">Ignored.</param>
/// <returns>An already completed task.</returns>
public Task NotifyEventAvailableAsync(
    string streamName,
    ICorrelatedEvent @event,
    CancellationToken cancellationToken = default)
    => Task.CompletedTask;
|
|
|
|
/// <summary>
/// Publishes an event to the exchange of the given stream, retrying failed attempts with a
/// linearly growing delay.
/// </summary>
/// <param name="streamName">Stream whose exchange receives the event.</param>
/// <param name="event">Event to publish.</param>
/// <param name="metadata">Optional metadata handed to the serializer.</param>
/// <param name="cancellationToken">Token that aborts connecting, publishing and retry delays.</param>
/// <exception cref="ArgumentException"><paramref name="streamName"/> is null or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
/// <exception cref="InvalidOperationException">No publish channel is available.</exception>
public async Task PublishExternalAsync(
    string streamName,
    ICorrelatedEvent @event,
    IDictionary<string, string>? metadata = null,
    CancellationToken cancellationToken = default)
{
    if (string.IsNullOrWhiteSpace(streamName))
    {
        throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
    }

    if (@event == null)
    {
        throw new ArgumentNullException(nameof(@event));
    }

    // The channel is captured in a local so a concurrent StopAsync nulling the field
    // cannot cause a NullReferenceException half-way through this method.
    var channel = await AcquireChannelAsync();

    var exchangeName = _topologyManager.GetExchangeName(streamName);
    var routingKey = _topologyManager.GetRoutingKey(streamName, @event);

    // Ensure exchange exists
    _topologyManager.DeclareExchange(channel, streamName);

    // Serialize event
    // NOTE(review): 'properties' is produced here but never handed to BasicPublishAsync, while the
    // consumer side deserializes from args.BasicProperties. This looks like the message properties
    // are dropped on publish - confirm against RabbitMQEventSerializer and, if so, publish with the
    // basicProperties overload.
    var (body, properties) = _serializer.Serialize(@event, metadata);

    // Publish with retries
    var maxRetries = _config.MaxPublishRetries;
    for (var attempt = 0; attempt <= maxRetries; attempt++)
    {
        try
        {
            if (attempt > 0)
            {
                // The previous failure may have closed the channel; get a usable one again.
                channel = await AcquireChannelAsync();
            }

            await channel.BasicPublishAsync(
                exchange: exchangeName,
                routingKey: routingKey,
                mandatory: false,
                body: body,
                cancellationToken: cancellationToken);

            _logger.LogDebug(
                "Published event {EventType} (ID: {EventId}) to exchange {ExchangeName} with routing key {RoutingKey}",
                @event.GetType().Name,
                @event.EventId,
                exchangeName,
                routingKey);

            return;
        }
        // Cancellation requested by the caller is not a publish failure: let it propagate untouched.
        catch (Exception ex) when (attempt < maxRetries && !cancellationToken.IsCancellationRequested)
        {
            var failedAttempts = attempt + 1;

            _logger.LogWarning(
                ex,
                "Failed to publish event {EventId}, attempt {Attempt}/{MaxAttempts}",
                @event.EventId,
                failedAttempts,
                maxRetries);

            await Task.Delay(_config.PublishRetryDelay * failedAttempts, cancellationToken);
        }
    }

    // Only reachable when the loop never ran (negative MaxPublishRetries); the last failing
    // attempt rethrows its own exception because the catch filter no longer matches.
    throw new InvalidOperationException($"Failed to publish event {@event.EventId} after {_config.MaxPublishRetries} attempts");

    // Makes sure a connection exists and returns the current publish channel.
    async Task<IChannel> AcquireChannelAsync()
    {
        await EnsureConnectionAsync(cancellationToken);
        return _publishChannel ?? throw new InvalidOperationException("Publish channel is not available");
    }
}
|
|
|
|
/// <summary>
/// Subscribes a consumer to a stream: opens a dedicated channel, declares exchange, queue and
/// binding, starts consuming and then waits until the subscription is cancelled.
/// </summary>
/// <remarks>
/// The returned task completes only when <paramref name="cancellationToken"/> is cancelled or the
/// consumer is removed by <c>UnsubscribeExternalAsync</c> / <c>StopAsync</c>. Messages are
/// acknowledged after <paramref name="eventHandler"/> returns, rejected without requeue when they
/// cannot be deserialized, and requeued when processing throws.
/// </remarks>
/// <param name="streamName">Stream to consume from.</param>
/// <param name="subscriptionId">Subscription identifier; part of the consumer key.</param>
/// <param name="consumerId">Consumer identifier; part of the consumer key.</param>
/// <param name="eventHandler">Callback invoked for every deserialized event.</param>
/// <param name="cancellationToken">Token that ends the subscription.</param>
/// <exception cref="ArgumentException">A string argument is null or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
/// <exception cref="InvalidOperationException">No connection is available.</exception>
public async Task SubscribeExternalAsync(
    string streamName,
    string subscriptionId,
    string consumerId,
    Func<ICorrelatedEvent, IDictionary<string, string>, CancellationToken, Task> eventHandler,
    CancellationToken cancellationToken = default)
{
    if (string.IsNullOrWhiteSpace(streamName))
    {
        throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
    }

    if (string.IsNullOrWhiteSpace(subscriptionId))
    {
        throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId));
    }

    if (string.IsNullOrWhiteSpace(consumerId))
    {
        throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
    }

    if (eventHandler == null)
    {
        throw new ArgumentNullException(nameof(eventHandler));
    }

    await EnsureConnectionAsync(cancellationToken);

    var connection = _connection ?? throw new InvalidOperationException("Connection is not available");

    var consumerKey = $"{subscriptionId}:{consumerId}";
    if (_activeConsumers.ContainsKey(consumerKey))
    {
        _logger.LogWarning("Consumer {ConsumerKey} is already subscribed", consumerKey);
        return;
    }

    var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
    var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

    // Captured once so the message callback never touches the token source itself.
    var consumerToken = cts.Token;

    ConsumerInfo? consumerInfo = null;
    var registered = false;

    try
    {
        await channel.BasicQosAsync(0, _config.PrefetchCount, false, cancellationToken);

        // Declare exchange
        _topologyManager.DeclareExchange(channel, streamName);

        // Declare queue (assume ConsumerGroup mode for simplicity)
        var queueName = _topologyManager.DeclareQueue(channel, subscriptionId, consumerId, SubscriptionMode.ConsumerGroup);

        // Bind queue with wildcard routing key to receive all events
        _topologyManager.BindQueue(channel, streamName, queueName, new[] { "#" });

        // Create consumer
        var consumer = new AsyncEventingBasicConsumer(channel);

        consumer.ReceivedAsync += async (sender, args) =>
        {
            try
            {
                var @event = _serializer.Deserialize(args.Body, args.BasicProperties, out var metadata);

                if (@event != null)
                {
                    await eventHandler(@event, metadata, consumerToken);

                    // Ack/nack deliberately ignore the subscription token: a message that was already
                    // handled must still be acknowledged if cancellation races with it.
                    await channel.BasicAckAsync(args.DeliveryTag, false, CancellationToken.None);
                }
                else
                {
                    _logger.LogWarning("Failed to deserialize message, sending NACK");
                    await channel.BasicNackAsync(args.DeliveryTag, false, false, CancellationToken.None);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing message from queue {QueueName}", queueName);

                try
                {
                    await channel.BasicNackAsync(args.DeliveryTag, false, true, CancellationToken.None); // Requeue
                }
                catch (Exception nackException)
                {
                    // Typically the channel is already closed; the broker requeues unacked messages itself.
                    _logger.LogWarning(nackException, "Failed to NACK message from queue {QueueName}", queueName);
                }
            }
        };

        var consumerTag = await channel.BasicConsumeAsync(
            queue: queueName,
            autoAck: false,
            consumer: consumer,
            cancellationToken: cancellationToken);

        consumerInfo = new ConsumerInfo
        {
            Channel = channel,
            QueueName = queueName,
            ConsumerTag = consumerTag,
            CancellationTokenSource = cts
        };

        // TryAdd instead of the indexer: a concurrent subscribe with the same key must not
        // silently overwrite (and thereby leak) the other registration.
        registered = _activeConsumers.TryAdd(consumerKey, consumerInfo);
        if (!registered)
        {
            _logger.LogWarning("Consumer {ConsumerKey} is already subscribed", consumerKey);
            return;
        }

        _logger.LogInformation(
            "Subscribed to stream {StreamName} (queue: {QueueName}, consumer: {ConsumerId})",
            streamName,
            queueName,
            consumerId);

        // Wait for cancellation
        try
        {
            await Task.Delay(Timeout.Infinite, consumerToken);
        }
        catch (OperationCanceledException)
        {
            // Expected: this is how the subscription ends.
        }
    }
    finally
    {
        // This method owns the channel unless Unsubscribe/Stop already took the entry out of the map
        // (in which case they close the channel). The pair-based Remove only succeeds for exactly
        // this registration, never for a newer one stored under the same key.
        var ownsCleanup = !registered
            || ((ICollection<KeyValuePair<string, ConsumerInfo>>)_activeConsumers)
                .Remove(new KeyValuePair<string, ConsumerInfo>(consumerKey, consumerInfo!));

        if (ownsCleanup)
        {
            try
            {
                if (channel.IsOpen)
                {
                    await channel.CloseAsync();
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Error while closing channel of consumer {ConsumerKey}", consumerKey);
            }
            finally
            {
                channel.Dispose();
                cts.Dispose();
            }
        }
    }
}
|
|
|
|
/// <summary>
/// Removes a registered consumer: cancels its token, cancels the broker-side consumer, then closes
/// and disposes its channel. Does nothing when no consumer is registered under the key.
/// </summary>
/// <param name="streamName">Stream name; used for logging only.</param>
/// <param name="subscriptionId">Subscription identifier; part of the consumer key.</param>
/// <param name="consumerId">Consumer identifier; part of the consumer key.</param>
/// <param name="cancellationToken">Token passed to the broker cancel and close calls.</param>
public async Task UnsubscribeExternalAsync(
    string streamName,
    string subscriptionId,
    string consumerId,
    CancellationToken cancellationToken = default)
{
    var consumerKey = $"{subscriptionId}:{consumerId}";

    if (!_activeConsumers.TryRemove(consumerKey, out var consumerInfo))
    {
        return;
    }

    consumerInfo.CancellationTokenSource.Cancel();

    try
    {
        // A channel already closed (e.g. after connection loss) has nothing left to cancel or close.
        if (consumerInfo.Channel.IsOpen)
        {
            await consumerInfo.Channel.BasicCancelAsync(consumerInfo.ConsumerTag, false, cancellationToken);
            await consumerInfo.Channel.CloseAsync(cancellationToken);
        }
    }
    finally
    {
        // Dispose even when cancel/close throws; the entry is already gone from the map,
        // so nobody else would ever release this channel.
        consumerInfo.Channel.Dispose();
    }

    _logger.LogInformation("Unsubscribed consumer {ConsumerKey} from stream {StreamName}", consumerKey, streamName);
}
|
|
|
|
/// <summary>
/// Reports whether this provider can handle the given stream.
/// </summary>
/// <param name="streamName">Stream name; not inspected.</param>
/// <returns>Always <c>true</c>: the RabbitMQ provider supports all streams by default.</returns>
public bool SupportsStream(string streamName) => true;
|
|
|
|
/// <summary>
/// Gets the number of consumers currently registered with this provider.
/// </summary>
/// <returns>The number of entries in the active consumer map.</returns>
public int GetActiveConsumerCount() => _activeConsumers.Count;
|
|
|
|
/// <summary>
/// Reports whether the provider currently holds an open broker connection.
/// </summary>
/// <returns><c>true</c> when a connection exists and is open; otherwise <c>false</c>.</returns>
public bool IsHealthy() => _connection is { IsOpen: true };
|
|
|
|
/// <summary>
/// Makes sure an open connection and an open publish channel exist, creating whichever is missing.
/// Returns immediately when both are already open.
/// </summary>
/// <remarks>
/// Acquires <c>_connectionLock</c>; it must therefore not be called by code that already holds
/// that lock.
/// </remarks>
/// <param name="cancellationToken">Token used to abort waiting for the lock or connecting.</param>
private async Task EnsureConnectionAsync(CancellationToken cancellationToken)
{
    if (_connection?.IsOpen == true && _publishChannel?.IsOpen == true)
    {
        return;
    }

    await _connectionLock.WaitAsync(cancellationToken);
    try
    {
        // Re-check under the lock: another caller may have connected in the meantime.
        if (_connection?.IsOpen == true && _publishChannel?.IsOpen == true)
        {
            return;
        }

        var connection = _connection;
        if (connection is not { IsOpen: true })
        {
            var endpoint = new Uri(_config.ConnectionString);

            // Connection strings may embed credentials (amqp://user:password@host); never log them.
            _logger.LogInformation(
                "Establishing connection to RabbitMQ at {ConnectionString}",
                endpoint.GetComponents(UriComponents.AbsoluteUri & ~UriComponents.UserInfo, UriFormat.UriEscaped));

            var factory = new ConnectionFactory
            {
                Uri = endpoint,
                AutomaticRecoveryEnabled = _config.AutoRecovery,
                NetworkRecoveryInterval = _config.RecoveryInterval,
                RequestedHeartbeat = _config.HeartbeatInterval,
                ClientProperties = _config.ConnectionProperties.ToDictionary<KeyValuePair<string, string>, string, object?>(
                    kvp => kvp.Key,
                    kvp => kvp.Value)
            };

            // NOTE(review): a previous, non-open connection is replaced without being disposed, as
            // before. Consumer channels may still live on it (and it may be auto-recovering), so
            // disposing it here is not obviously safe - confirm the intended recovery strategy.
            connection = await factory.CreateConnectionAsync(cancellationToken);
            _connection = connection;
        }

        // Only the channel is recreated when the connection itself is still open; opening a whole
        // new connection in that case would orphan the existing one together with its consumers.
        var staleChannel = _publishChannel;
        _publishChannel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
        staleChannel?.Dispose();

        _logger.LogInformation("Connected to RabbitMQ successfully");
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to connect to RabbitMQ");
        throw;
    }
    finally
    {
        _connectionLock.Release();
    }
}
|
|
|
|
/// <summary>
/// Stops the provider and releases every resource it holds. Safe to call more than once and
/// never throws.
/// </summary>
/// <remarks>
/// Blocks on <c>StopAsync</c> because <see cref="IDisposable.Dispose"/> is synchronous; prefer
/// awaiting <c>StopAsync</c> before disposing where possible.
/// </remarks>
public void Dispose()
{
    if (_isDisposed)
    {
        return;
    }

    // Set first so a failure below can never lead to a second teardown attempt.
    _isDisposed = true;

    try
    {
        StopAsync().GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Error while stopping RabbitMQ event delivery provider during dispose");
    }

    // Release whatever is still held, e.g. a connection created lazily by a publish or subscribe
    // call on a provider that was never started, or leftovers of a failed stop.
    foreach (var consumer in _activeConsumers.Values)
    {
        ReleaseQuietly(() => consumer.CancellationTokenSource.Cancel());
        ReleaseQuietly(consumer.Channel.Dispose);
    }

    _activeConsumers.Clear();

    var publishChannel = _publishChannel;
    _publishChannel = null;
    if (publishChannel != null)
    {
        ReleaseQuietly(publishChannel.Dispose);
    }

    var connection = _connection;
    _connection = null;
    if (connection != null)
    {
        ReleaseQuietly(connection.Dispose);
    }

    _connectionLock.Dispose();

    // Runs one cleanup step and logs, rather than propagates, any failure.
    void ReleaseQuietly(Action release)
    {
        try
        {
            release();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Error while releasing RabbitMQ resources during dispose");
        }
    }
}
|
|
}
|