using System;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.RabbitMQ.Serialization;
///
/// Serializes and deserializes events for RabbitMQ transport.
///
public sealed class RabbitMQEventSerializer
{
private readonly ILogger _logger;
private readonly JsonSerializerOptions _jsonOptions;
// Header keys
private const string EventTypeHeader = "event-type";
private const string EventIdHeader = "event-id";
private const string CorrelationIdHeader = "correlation-id";
private const string TimestampHeader = "timestamp";
private const string AssemblyQualifiedNameHeader = "assembly-qualified-name";
public RabbitMQEventSerializer(ILogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_jsonOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
WriteIndented = false
};
}
///
/// Serializes an event to a RabbitMQ message.
///
/// The event to serialize.
/// Additional metadata to include in headers.
/// Message body and properties.
public (byte[] Body, IReadOnlyBasicProperties Properties) Serialize(
ICorrelatedEvent @event,
IDictionary? additionalMetadata = null)
{
if (@event == null)
throw new ArgumentNullException(nameof(@event));
try
{
// Serialize event to JSON
var eventType = @event.GetType();
var json = JsonSerializer.Serialize(@event, eventType, _jsonOptions);
var body = Encoding.UTF8.GetBytes(json);
// Create properties with headers
var properties = new BasicProperties
{
Persistent = true,
ContentType = "application/json",
ContentEncoding = "utf-8",
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
Headers = new Dictionary()
};
// Add event metadata to headers
properties.Headers[EventTypeHeader] = eventType.Name;
properties.Headers[EventIdHeader] = @event.EventId;
properties.Headers[CorrelationIdHeader] = @event.CorrelationId ?? string.Empty;
properties.Headers[TimestampHeader] = DateTimeOffset.UtcNow.ToString("O");
properties.Headers[AssemblyQualifiedNameHeader] = eventType.AssemblyQualifiedName ?? eventType.FullName ?? eventType.Name;
// Add additional metadata
if (additionalMetadata != null)
{
foreach (var kvp in additionalMetadata)
{
properties.Headers[kvp.Key] = kvp.Value;
}
}
_logger.LogDebug(
"Serialized event {EventType} (ID: {EventId}) to {Bytes} bytes",
eventType.Name,
@event.EventId,
body.Length);
return (body, properties);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to serialize event {EventType}", @event.GetType().Name);
throw;
}
}
///
/// Deserializes a RabbitMQ message to an event.
///
/// The message body.
/// The message properties.
/// Output dictionary containing message metadata.
/// The deserialized event, or null if deserialization fails.
public ICorrelatedEvent? Deserialize(
ReadOnlyMemory body,
IReadOnlyBasicProperties properties,
out Dictionary metadata)
{
metadata = new Dictionary();
try
{
// Extract metadata from headers
if (properties.Headers != null)
{
foreach (var header in properties.Headers)
{
var value = header.Value?.ToString() ?? string.Empty;
metadata[header.Key] = value;
}
}
// Get event type from headers
if (!metadata.TryGetValue(AssemblyQualifiedNameHeader, out var assemblyQualifiedName) ||
string.IsNullOrWhiteSpace(assemblyQualifiedName))
{
_logger.LogWarning("Message missing assembly-qualified-name header, cannot deserialize");
return null;
}
// Resolve type
var eventType = Type.GetType(assemblyQualifiedName);
if (eventType == null)
{
_logger.LogWarning(
"Could not resolve event type {TypeName}, event may be from different assembly version",
assemblyQualifiedName);
return null;
}
// Deserialize JSON
var json = Encoding.UTF8.GetString(body.Span);
var @event = JsonSerializer.Deserialize(json, eventType, _jsonOptions) as ICorrelatedEvent;
if (@event == null)
{
_logger.LogWarning(
"Deserialized object is not an ICorrelatedEvent (type: {TypeName})",
eventType.Name);
return null;
}
_logger.LogDebug(
"Deserialized event {EventType} (ID: {EventId})",
eventType.Name,
@event.EventId);
return @event;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to deserialize message");
return null;
}
}
///
/// Extracts metadata from message properties without deserializing the body.
///
/// The message properties.
/// Dictionary containing message metadata.
public Dictionary ExtractMetadata(IReadOnlyBasicProperties properties)
{
var metadata = new Dictionary();
if (properties.Headers == null)
return metadata;
foreach (var header in properties.Headers)
{
var value = header.Value?.ToString() ?? string.Empty;
metadata[header.Key] = value;
}
return metadata;
}
}