using System; using Svrnty.CQRS.Events.Abstractions.EventStore; using System.Collections.Generic; using System.Text; using System.Text.Json; using Microsoft.Extensions.Logging; using RabbitMQ.Client; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.RabbitMQ.Serialization; /// /// Serializes and deserializes events for RabbitMQ transport. /// public sealed class RabbitMQEventSerializer { private readonly ILogger _logger; private readonly JsonSerializerOptions _jsonOptions; // Header keys private const string EventTypeHeader = "event-type"; private const string EventIdHeader = "event-id"; private const string CorrelationIdHeader = "correlation-id"; private const string TimestampHeader = "timestamp"; private const string AssemblyQualifiedNameHeader = "assembly-qualified-name"; public RabbitMQEventSerializer(ILogger logger) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true, WriteIndented = false }; } /// /// Serializes an event to a RabbitMQ message. /// /// The event to serialize. /// Additional metadata to include in headers. /// Message body and properties. public (byte[] Body, IReadOnlyBasicProperties Properties) Serialize( ICorrelatedEvent @event, IDictionary? additionalMetadata = null) { if (@event == null) throw new ArgumentNullException(nameof(@event)); try { // Serialize event to JSON var eventType = @event.GetType(); var json = JsonSerializer.Serialize(@event, eventType, _jsonOptions); var body = Encoding.UTF8.GetBytes(json); // Create properties with headers var properties = new BasicProperties { Persistent = true, ContentType = "application/json", ContentEncoding = "utf-8", Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()), Headers = new Dictionary() }; // Add event metadata to headers properties.Headers[EventTypeHeader] = eventType.Name; properties.Headers[EventIdHeader] = @event.EventId; properties.Headers[CorrelationIdHeader] = @event.CorrelationId ?? string.Empty; properties.Headers[TimestampHeader] = DateTimeOffset.UtcNow.ToString("O"); properties.Headers[AssemblyQualifiedNameHeader] = eventType.AssemblyQualifiedName ?? eventType.FullName ?? eventType.Name; // Add additional metadata if (additionalMetadata != null) { foreach (var kvp in additionalMetadata) { properties.Headers[kvp.Key] = kvp.Value; } } _logger.LogDebug( "Serialized event {EventType} (ID: {EventId}) to {Bytes} bytes", eventType.Name, @event.EventId, body.Length); return (body, properties); } catch (Exception ex) { _logger.LogError(ex, "Failed to serialize event {EventType}", @event.GetType().Name); throw; } } /// /// Deserializes a RabbitMQ message to an event. /// /// The message body. /// The message properties. /// Output dictionary containing message metadata. /// The deserialized event, or null if deserialization fails. public ICorrelatedEvent? Deserialize( ReadOnlyMemory body, IReadOnlyBasicProperties properties, out Dictionary metadata) { metadata = new Dictionary(); try { // Extract metadata from headers if (properties.Headers != null) { foreach (var header in properties.Headers) { var value = header.Value?.ToString() ?? string.Empty; metadata[header.Key] = value; } } // Get event type from headers if (!metadata.TryGetValue(AssemblyQualifiedNameHeader, out var assemblyQualifiedName) || string.IsNullOrWhiteSpace(assemblyQualifiedName)) { _logger.LogWarning("Message missing assembly-qualified-name header, cannot deserialize"); return null; } // Resolve type var eventType = Type.GetType(assemblyQualifiedName); if (eventType == null) { _logger.LogWarning( "Could not resolve event type {TypeName}, event may be from different assembly version", assemblyQualifiedName); return null; } // Deserialize JSON var json = Encoding.UTF8.GetString(body.Span); var @event = JsonSerializer.Deserialize(json, eventType, _jsonOptions) as ICorrelatedEvent; if (@event == null) { _logger.LogWarning( "Deserialized object is not an ICorrelatedEvent (type: {TypeName})", eventType.Name); return null; } _logger.LogDebug( "Deserialized event {EventType} (ID: {EventId})", eventType.Name, @event.EventId); return @event; } catch (Exception ex) { _logger.LogError(ex, "Failed to deserialize message"); return null; } } /// /// Extracts metadata from message properties without deserializing the body. /// /// The message properties. /// Dictionary containing message metadata. public Dictionary ExtractMetadata(IReadOnlyBasicProperties properties) { var metadata = new Dictionary(); if (properties.Headers == null) return metadata; foreach (var header in properties.Headers) { var value = header.Value?.ToString() ?? string.Empty; metadata[header.Key] = value; } return metadata; } }