// dotnet-cqrs/Svrnty.CQRS.Events.RabbitMQ/RabbitMqDomainEventPublisher.cs
//
// 164 lines
// 5.0 KiB
// C#

using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.RabbitMQ;
/// <summary>
/// RabbitMQ implementation of the domain event publisher.
/// </summary>
/// <remarks>
/// Events are serialized to JSON and published to the topic exchange configured in
/// <see cref="RabbitMqEventOptions"/>. The connection and channel are opened lazily on the
/// first publish and re-established whenever either of them is found closed.
/// </remarks>
public class RabbitMqDomainEventPublisher : IDomainEventPublisher, IAsyncDisposable
{
    /// <summary>Prefix prepended to every routing key produced by <see cref="GetRoutingKey"/>.</summary>
    private const string RoutingKeyPrefix = "events.";

    /// <summary>Conventional type-name suffix that is stripped before building the routing key.</summary>
    private const string EventSuffix = "Event";

    private readonly RabbitMqEventOptions _options;
    private readonly ILogger<RabbitMqDomainEventPublisher> _logger;

    // Serializes connection/channel (re)creation so only one caller connects at a time.
    private readonly SemaphoreSlim _connectionLock = new(1, 1);

    private IConnection? _connection;
    private IChannel? _channel;
    private bool _disposed;

    /// <summary>
    /// Creates a new RabbitMQ domain event publisher.
    /// </summary>
    /// <param name="options">Connection and exchange settings; <c>options.Value</c> is captured once.</param>
    /// <param name="logger">Logger used for connection and publish diagnostics.</param>
    /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
    public RabbitMqDomainEventPublisher(
        IOptions<RabbitMqEventOptions> options,
        ILogger<RabbitMqDomainEventPublisher> logger)
    {
        ArgumentNullException.ThrowIfNull(options);

        _options = options.Value;
        _logger = logger;
    }

    /// <inheritdoc />
    /// <remarks>
    /// The routing key, the <c>event-type</c> header and the JSON payload are all derived from the
    /// runtime type of <paramref name="event"/>, so publishing through a base type or interface
    /// still routes and serializes the concrete event.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The publisher has been disposed.</exception>
    public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IDomainEvent
    {
        ArgumentNullException.ThrowIfNull(@event, nameof(@event));
        ThrowIfDisposed();

        // Use the channel returned here rather than re-reading the field, which may be
        // swapped by a concurrent reconnect between the check and the publish.
        var channel = await EnsureConnectionAsync(cancellationToken).ConfigureAwait(false);

        // typeof(TEvent) would yield the *static* type (e.g. IDomainEvent when events are
        // dispatched from a heterogeneous list), producing a wrong routing key and a payload
        // that lacks the concrete event's properties.
        var eventType = @event.GetType();
        var eventTypeName = eventType.Name;
        var routingKey = GetRoutingKey(eventTypeName);
        var eventId = @event.EventId.ToString();
        var body = JsonSerializer.SerializeToUtf8Bytes(@event, eventType);

        var properties = new BasicProperties
        {
            MessageId = eventId,
            ContentType = "application/json",
            DeliveryMode = _options.Durable ? DeliveryModes.Persistent : DeliveryModes.Transient,
            Timestamp = new AmqpTimestamp(new DateTimeOffset(@event.OccurredAt).ToUnixTimeSeconds()),
            Headers = new Dictionary<string, object?>
            {
                ["event-type"] = eventTypeName,
                ["event-id"] = eventId
            }
        };

        await channel.BasicPublishAsync(
            exchange: _options.Exchange,
            routingKey: routingKey,
            mandatory: false,
            basicProperties: properties,
            body: body,
            cancellationToken: cancellationToken).ConfigureAwait(false);

        _logger.LogDebug(
            "Published domain event {EventType} with ID {EventId} to routing key {RoutingKey}",
            eventTypeName, @event.EventId, routingKey);
    }

    /// <summary>
    /// Converts a PascalCase event type name to a dot-separated routing key,
    /// e.g. <c>InventoryMovementEvent</c> becomes <c>events.inventory.movement</c>.
    /// </summary>
    /// <param name="eventTypeName">Simple (unqualified) name of the event type.</param>
    /// <returns>The routing key, always starting with <c>events.</c>.</returns>
    private static string GetRoutingKey(string eventTypeName)
    {
        // Strip only a trailing "Event" suffix. Removing every occurrence would mangle names
        // such as "EventStoreUpdatedEvent", and a type named exactly "Event" would end up
        // with an empty key segment.
        // NOTE(review): consumers that derive binding keys from type names must use the same rule.
        var name = eventTypeName.Length > EventSuffix.Length
                   && eventTypeName.EndsWith(EventSuffix, StringComparison.Ordinal)
            ? eventTypeName[..^EventSuffix.Length]
            : eventTypeName;

        var words = new List<string>();
        var currentWord = new StringBuilder();

        foreach (var c in name)
        {
            // An upper-case letter starts a new word (unless it is the very first character).
            if (char.IsUpper(c) && currentWord.Length > 0)
            {
                words.Add(currentWord.ToString().ToLowerInvariant());
                currentWord.Clear();
            }

            currentWord.Append(c);
        }

        if (currentWord.Length > 0)
        {
            words.Add(currentWord.ToString().ToLowerInvariant());
        }

        return RoutingKeyPrefix + string.Join(".", words);
    }

    /// <summary>
    /// Returns an open channel, creating the connection, the channel and the topic exchange
    /// declaration when they do not exist yet or have been closed.
    /// </summary>
    /// <param name="cancellationToken">Cancels waiting for the lock and the broker round-trips.</param>
    /// <returns>A channel that was open at the time of the check.</returns>
    private async Task<IChannel> EnsureConnectionAsync(CancellationToken cancellationToken)
    {
        // Fast path: nothing to do when both are already open.
        if (_connection is { IsOpen: true } && _channel is { IsOpen: true } readyChannel)
        {
            return readyChannel;
        }

        await _connectionLock.WaitAsync(cancellationToken).ConfigureAwait(false);
        try
        {
            // Re-check: another caller may have connected while we were waiting for the lock.
            if (_connection is { IsOpen: true } && _channel is { IsOpen: true } currentChannel)
            {
                return currentChannel;
            }

            // A closed channel cannot be reused; release it instead of just overwriting the field.
            var staleChannel = _channel;
            _channel = null;
            staleChannel?.Dispose();

            var connection = _connection;
            if (connection is not { IsOpen: true })
            {
                // Dispose the dead connection so it is not leaked when the field is replaced.
                _connection = null;
                connection?.Dispose();

                var factory = new ConnectionFactory
                {
                    HostName = _options.HostName,
                    Port = _options.Port,
                    UserName = _options.UserName,
                    Password = _options.Password,
                    VirtualHost = _options.VirtualHost
                };

                connection = await factory.CreateConnectionAsync(cancellationToken).ConfigureAwait(false);
                _connection = connection;
            }

            var channel = await connection
                .CreateChannelAsync(cancellationToken: cancellationToken)
                .ConfigureAwait(false);

            try
            {
                // Declare topic exchange for domain events
                await channel.ExchangeDeclareAsync(
                    exchange: _options.Exchange,
                    type: ExchangeType.Topic,
                    durable: _options.Durable,
                    autoDelete: false,
                    cancellationToken: cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                // Do not keep a channel whose exchange declaration failed.
                channel.Dispose();
                throw;
            }

            // Publish the channel to the field only once it is fully usable.
            _channel = channel;

            _logger.LogInformation(
                "Connected to RabbitMQ at {Host}:{Port}, exchange: {Exchange}",
                _options.HostName, _options.Port, _options.Exchange);

            return channel;
        }
        finally
        {
            _connectionLock.Release();
        }
    }

    /// <summary>Throws <see cref="ObjectDisposedException"/> when the publisher has been disposed.</summary>
    private void ThrowIfDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(RabbitMqDomainEventPublisher));
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Closes the channel and connection gracefully when they are open, then disposes them.
    /// A failure during the graceful close is logged rather than thrown so that the remaining
    /// resources are still released. Calling this method more than once is a no-op.
    /// </remarks>
    public async ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        var channel = _channel;
        var connection = _connection;
        _channel = null;
        _connection = null;

        try
        {
            if (channel is { IsOpen: true })
            {
                await channel.CloseAsync().ConfigureAwait(false);
            }

            if (connection is { IsOpen: true })
            {
                await connection.CloseAsync().ConfigureAwait(false);
            }
        }
        catch (Exception ex)
        {
            // Disposal should not throw; the finally block below still releases everything.
            _logger.LogWarning(ex, "Error while closing RabbitMQ channel/connection during disposal");
        }
        finally
        {
            channel?.Dispose();
            connection?.Dispose();
            _connectionLock.Dispose();
        }

        GC.SuppressFinalize(this);
    }
}