using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Svrnty.CQRS.Events.Abstractions;

namespace Svrnty.CQRS.Events.RabbitMQ;

/// <summary>
/// RabbitMQ implementation of the domain event publisher.
/// </summary>
/// <remarks>
/// The connection, channel and topic exchange are set up lazily on the first call to
/// <see cref="PublishAsync{TEvent}"/> and re-established when either is found closed.
/// </remarks>
public class RabbitMqDomainEventPublisher : IDomainEventPublisher, IAsyncDisposable
{
    /// <summary>Type-name suffix that is dropped when building the routing key.</summary>
    private const string EventSuffix = "Event";

    private readonly RabbitMqEventOptions _options;
    private readonly ILogger<RabbitMqDomainEventPublisher> _logger;
    private readonly SemaphoreSlim _connectionLock = new(1, 1);
    private IConnection? _connection;
    private IChannel? _channel;
    private bool _disposed;

    /// <summary>
    /// Creates a new RabbitMQ domain event publisher.
    /// </summary>
    /// <param name="options">Broker connection and exchange settings.</param>
    /// <param name="logger">Logger used for connection and publish diagnostics.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public RabbitMqDomainEventPublisher(
        IOptions<RabbitMqEventOptions> options,
        ILogger<RabbitMqDomainEventPublisher> logger)
    {
        _options = (options ?? throw new ArgumentNullException(nameof(options))).Value;
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Serializes the event as JSON and publishes it to the configured exchange, using a
    /// routing key derived from the event's type name.
    /// </summary>
    /// <typeparam name="TEvent">Static type of the event being published.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <param name="cancellationToken">Token used to cancel connecting and publishing.</param>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The publisher has already been disposed.</exception>
    public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IDomainEvent
    {
        if (@event is null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(RabbitMqDomainEventPublisher));
        }

        var channel = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);

        // Use the runtime type, not TEvent: when an event is published through a base
        // type or interface, the static type would yield the wrong routing key and a
        // payload missing the concrete type's properties.
        var eventType = @event.GetType();
        var eventTypeName = eventType.Name;
        var routingKey = GetRoutingKey(eventTypeName);
        var eventId = @event.EventId.ToString();
        var body = JsonSerializer.SerializeToUtf8Bytes(@event, eventType);

        var properties = new BasicProperties
        {
            MessageId = eventId,
            ContentType = "application/json",
            DeliveryMode = _options.Durable ? DeliveryModes.Persistent : DeliveryModes.Transient,
            Timestamp = new AmqpTimestamp(new DateTimeOffset(@event.OccurredAt).ToUnixTimeSeconds()),
            Headers = new Dictionary<string, object?>
            {
                ["event-type"] = eventTypeName,
                ["event-id"] = eventId
            }
        };

        await channel.BasicPublishAsync(
            exchange: _options.Exchange,
            routingKey: routingKey,
            mandatory: false,
            basicProperties: properties,
            body: body,
            cancellationToken: cancellationToken).ConfigureAwait(false);

        _logger.LogDebug(
            "Published domain event {EventType} with ID {EventId} to routing key {RoutingKey}",
            eventTypeName,
            @event.EventId,
            routingKey);
    }

    /// <summary>
    /// Converts a PascalCase event type name to a dot-separated routing key,
    /// e.g. "InventoryMovementEvent" -> "events.inventory.movement".
    /// </summary>
    /// <param name="eventTypeName">Simple (unqualified) name of the event type.</param>
    /// <returns>The routing key, always prefixed with "events.".</returns>
    private static string GetRoutingKey(string eventTypeName)
    {
        // Drop only a trailing "Event" suffix. Removing every occurrence would mangle
        // names such as "EventStoreUpdatedEvent". A type named exactly "Event" is kept
        // as-is so the key never ends in a bare dot.
        var name = eventTypeName.Length > EventSuffix.Length
                   && eventTypeName.EndsWith(EventSuffix, StringComparison.Ordinal)
            ? eventTypeName[..^EventSuffix.Length]
            : eventTypeName;

        var words = new List<string>();
        var currentWord = new StringBuilder();

        foreach (var c in name)
        {
            // Each upper-case letter starts a new word.
            if (char.IsUpper(c) && currentWord.Length > 0)
            {
                words.Add(currentWord.ToString().ToLowerInvariant());
                currentWord.Clear();
            }

            currentWord.Append(c);
        }

        if (currentWord.Length > 0)
        {
            words.Add(currentWord.ToString().ToLowerInvariant());
        }

        return "events." + string.Join(".", words);
    }

    /// <summary>
    /// Returns an open channel, (re)creating the connection, the channel and the topic
    /// exchange declaration when needed.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel waiting for the lock and connecting.</param>
    /// <returns>A channel that was open at the time of the check.</returns>
    private async Task<IChannel> EnsureConnectionAsync(CancellationToken cancellationToken)
    {
        // Fast path: skip the lock when everything is already up.
        var channel = _channel;
        if (_connection?.IsOpen == true && channel?.IsOpen == true)
        {
            return channel;
        }

        await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Another caller may have reconnected while this one waited for the lock.
            channel = _channel;
            var connection = _connection;
            if (connection?.IsOpen == true && channel?.IsOpen == true)
            {
                return channel;
            }

            // The existing channel is unusable from here on; release it before replacing it.
            _channel = null;
            DisposeQuietly(channel, "channel");

            if (connection?.IsOpen != true)
            {
                _connection = null;
                DisposeQuietly(connection, "connection");

                var factory = new ConnectionFactory
                {
                    HostName = _options.HostName,
                    Port = _options.Port,
                    UserName = _options.UserName,
                    Password = _options.Password,
                    VirtualHost = _options.VirtualHost
                };

                connection = await factory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
                _connection = connection;
            }

            channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
            try
            {
                // Declare topic exchange for domain events
                await channel.ExchangeDeclareAsync(
                    exchange: _options.Exchange,
                    type: ExchangeType.Topic,
                    durable: _options.Durable,
                    autoDelete: false,
                    cancellationToken: cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                // Do not leave a half-initialized channel behind.
                DisposeQuietly(channel, "channel");
                throw;
            }

            _channel = channel;

            _logger.LogInformation(
                "Connected to RabbitMQ at {Host}:{Port}, exchange: {Exchange}",
                _options.HostName,
                _options.Port,
                _options.Exchange);

            return channel;
        }
        finally
        {
            _connectionLock.Release();
        }
    }

    /// <summary>
    /// Closes and disposes the channel and connection, then disposes the connection lock.
    /// Failures while closing are logged and do not prevent the remaining cleanup.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        var channel = _channel;
        var connection = _connection;
        _channel = null;
        _connection = null;

        // Close the channel before the connection that owns it.
        if (channel is not null)
        {
            try
            {
                if (channel.IsOpen)
                {
                    await channel.CloseAsync().ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to close RabbitMQ {Resource}", "channel");
            }

            DisposeQuietly(channel, "channel");
        }

        if (connection is not null)
        {
            try
            {
                if (connection.IsOpen)
                {
                    await connection.CloseAsync().ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to close RabbitMQ {Resource}", "connection");
            }

            DisposeQuietly(connection, "connection");
        }

        _connectionLock.Dispose();
    }

    /// <summary>
    /// Disposes <paramref name="resource"/> if it is not <c>null</c>, logging instead of
    /// propagating any exception so that cleanup of other resources can continue.
    /// </summary>
    /// <param name="resource">The resource to dispose; may be <c>null</c>.</param>
    /// <param name="resourceName">Name used in the log message on failure.</param>
    private void DisposeQuietly(IDisposable? resource, string resourceName)
    {
        if (resource is null)
        {
            return;
        }

        try
        {
            resource.Dispose();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to dispose RabbitMQ {Resource}", resourceName);
        }
    }
}