190 lines
6.6 KiB
C#
190 lines
6.6 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.Abstractions.EventStore;
|
|
using System.Collections.Generic;
|
|
using System.Text;
|
|
using System.Text.Json;
|
|
using Microsoft.Extensions.Logging;
|
|
using RabbitMQ.Client;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.RabbitMQ.Serialization;
|
|
|
|
/// <summary>
/// Serializes and deserializes events for RabbitMQ transport.
/// </summary>
/// <remarks>
/// The event payload travels as UTF-8 JSON in the message body. The event's type name,
/// assembly-qualified type name, event id, correlation id and a timestamp travel in the
/// message headers.
/// </remarks>
public sealed class RabbitMQEventSerializer
{
    private readonly ILogger<RabbitMQEventSerializer> _logger;
    private readonly JsonSerializerOptions _jsonOptions;

    // Header keys
    private const string EventTypeHeader = "event-type";
    private const string EventIdHeader = "event-id";
    private const string CorrelationIdHeader = "correlation-id";
    private const string TimestampHeader = "timestamp";
    private const string AssemblyQualifiedNameHeader = "assembly-qualified-name";

    /// <summary>
    /// Creates a serializer that writes compact JSON and matches property names
    /// case-insensitively when reading.
    /// </summary>
    /// <param name="logger">Logger used for serialization diagnostics.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public RabbitMQEventSerializer(ILogger<RabbitMQEventSerializer> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        _jsonOptions = new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true,
            WriteIndented = false
        };
    }

    /// <summary>
    /// Serializes an event to a RabbitMQ message.
    /// </summary>
    /// <param name="event">The event to serialize.</param>
    /// <param name="additionalMetadata">
    /// Additional metadata to include in headers. Entries are written after the standard
    /// headers, so an entry that uses a standard header key replaces that header.
    /// </param>
    /// <returns>Message body and properties.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    /// <remarks>
    /// Any exception raised while serializing is logged and then rethrown unchanged.
    /// </remarks>
    public (byte[] Body, IReadOnlyBasicProperties Properties) Serialize(
        ICorrelatedEvent @event,
        IDictionary<string, string>? additionalMetadata = null)
    {
        if (@event == null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        try
        {
            // Serialize with the runtime type so derived-type members are included,
            // writing UTF-8 directly instead of going through an intermediate string.
            var eventType = @event.GetType();
            var body = JsonSerializer.SerializeToUtf8Bytes(@event, eventType, _jsonOptions);

            // Read the clock once so the AMQP timestamp and the timestamp header
            // describe the same instant.
            var now = DateTimeOffset.UtcNow;

            var headers = new Dictionary<string, object?>
            {
                [EventTypeHeader] = eventType.Name,
                [EventIdHeader] = @event.EventId,
                [CorrelationIdHeader] = @event.CorrelationId ?? string.Empty,
                [TimestampHeader] = now.ToString("O"),
                [AssemblyQualifiedNameHeader] =
                    eventType.AssemblyQualifiedName ?? eventType.FullName ?? eventType.Name
            };

            if (additionalMetadata != null)
            {
                foreach (var kvp in additionalMetadata)
                {
                    headers[kvp.Key] = kvp.Value;
                }
            }

            var properties = new BasicProperties
            {
                Persistent = true,
                ContentType = "application/json",
                ContentEncoding = "utf-8",
                Timestamp = new AmqpTimestamp(now.ToUnixTimeSeconds()),
                Headers = headers
            };

            _logger.LogDebug(
                "Serialized event {EventType} (ID: {EventId}) to {Bytes} bytes",
                eventType.Name,
                @event.EventId,
                body.Length);

            return (body, properties);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to serialize event {EventType}", @event.GetType().Name);
            throw;
        }
    }

    /// <summary>
    /// Deserializes a RabbitMQ message to an event.
    /// </summary>
    /// <param name="body">The message body (UTF-8 JSON).</param>
    /// <param name="properties">The message properties.</param>
    /// <param name="metadata">
    /// Output dictionary containing the message headers as strings. It is always assigned,
    /// and may be empty or partially filled when deserialization fails.
    /// </param>
    /// <returns>
    /// The deserialized event, or null if the type header is missing, the type cannot be
    /// resolved or is not an <see cref="ICorrelatedEvent"/>, the body is JSON <c>null</c>,
    /// or any exception occurs (exceptions are logged, not propagated).
    /// </returns>
    public ICorrelatedEvent? Deserialize(
        ReadOnlyMemory<byte> body,
        IReadOnlyBasicProperties properties,
        out Dictionary<string, string> metadata)
    {
        metadata = new Dictionary<string, string>();

        try
        {
            // Extract metadata from headers
            CopyHeaders(properties.Headers, metadata);

            // Get event type from headers
            if (!metadata.TryGetValue(AssemblyQualifiedNameHeader, out var assemblyQualifiedName) ||
                string.IsNullOrWhiteSpace(assemblyQualifiedName))
            {
                _logger.LogWarning("Message missing assembly-qualified-name header, cannot deserialize");
                return null;
            }

            // Resolve type
            var eventType = Type.GetType(assemblyQualifiedName);
            if (eventType == null)
            {
                _logger.LogWarning(
                    "Could not resolve event type {TypeName}, event may be from different assembly version",
                    assemblyQualifiedName);
                return null;
            }

            // The type name comes from a message header, so verify it is an event type
            // BEFORE handing it to the JSON deserializer rather than after.
            if (!typeof(ICorrelatedEvent).IsAssignableFrom(eventType))
            {
                _logger.LogWarning(
                    "Resolved type is not an ICorrelatedEvent, refusing to deserialize (type: {TypeName})",
                    eventType.Name);
                return null;
            }

            // Deserialize JSON straight from the UTF-8 payload.
            var @event = JsonSerializer.Deserialize(body.Span, eventType, _jsonOptions) as ICorrelatedEvent;

            if (@event == null)
            {
                // With the type already verified, this only happens for a JSON null payload.
                _logger.LogWarning(
                    "Message body deserialized to null (type: {TypeName})",
                    eventType.Name);
                return null;
            }

            _logger.LogDebug(
                "Deserialized event {EventType} (ID: {EventId})",
                eventType.Name,
                @event.EventId);

            return @event;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to deserialize message");
            return null;
        }
    }

    /// <summary>
    /// Extracts metadata from message properties without deserializing the body.
    /// </summary>
    /// <param name="properties">The message properties.</param>
    /// <returns>
    /// Dictionary containing the message headers as strings; empty when there are no headers.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="properties"/> is null.</exception>
    public Dictionary<string, string> ExtractMetadata(IReadOnlyBasicProperties properties)
    {
        if (properties == null)
        {
            throw new ArgumentNullException(nameof(properties));
        }

        var metadata = new Dictionary<string, string>();
        CopyHeaders(properties.Headers, metadata);
        return metadata;
    }

    /// <summary>
    /// Copies every header into <paramref name="target"/> as a string; does nothing when
    /// <paramref name="headers"/> is null.
    /// </summary>
    private static void CopyHeaders(
        IEnumerable<KeyValuePair<string, object?>>? headers,
        Dictionary<string, string> target)
    {
        if (headers == null)
        {
            return;
        }

        foreach (var header in headers)
        {
            target[header.Key] = HeaderValueToString(header.Value);
        }
    }

    /// <summary>
    /// Converts a single header value to its string form.
    /// </summary>
    /// <remarks>
    /// Messages received from a broker carry string header values as raw bytes, so
    /// <c>byte[]</c> is decoded as UTF-8; calling <c>ToString()</c> on it would yield the
    /// literal text "System.Byte[]" instead of the header's content.
    /// </remarks>
    private static string HeaderValueToString(object? value)
    {
        return value switch
        {
            null => string.Empty,
            string text => text,
            byte[] bytes => Encoding.UTF8.GetString(bytes),
            _ => value.ToString() ?? string.Empty
        };
    }
}
|