using System.Text;
using System.Text.Json;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Svrnty.CQRS.Events.Abstractions;

namespace Svrnty.CQRS.Events.Kafka;

/// <summary>
/// Apache Kafka implementation of the domain event publisher.
/// </summary>
/// <remarks>
/// Each event is serialized to JSON and produced to a topic derived from the event's
/// type name (or from <c>KafkaEventOptions.TopicMapper</c> when one is configured).
/// The message key is taken from the first non-empty <see cref="Guid"/> identifier
/// property found on the event, falling back to the event ID.
/// </remarks>
public class KafkaDomainEventPublisher : IDomainEventPublisher, IAsyncDisposable
{
    /// <summary>Maximum time <see cref="DisposeAsync"/> waits for queued messages to be delivered.</summary>
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Property names probed (in priority order) when choosing the message key.
    /// </summary>
    private static readonly string[] PartitionKeyProperties =
    {
        "RequestId",
        "ContainerId",
        "ProjectId",
        "SolutionId"
    };

    private readonly KafkaEventOptions _options;
    // NOTE(review): the logger category type is assumed to be this class (standard DI convention) - confirm.
    private readonly ILogger<KafkaDomainEventPublisher> _logger;
    private readonly IProducer<string, string> _producer;
    private bool _disposed;

    /// <summary>
    /// Creates a new Kafka domain event publisher and builds the underlying producer.
    /// </summary>
    /// <param name="options">Kafka connection and publishing options.</param>
    /// <param name="logger">Logger used for producer diagnostics and publish results.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> or <paramref name="logger"/> is <see langword="null"/>.
    /// </exception>
    public KafkaDomainEventPublisher(
        IOptions<KafkaEventOptions> options,
        ILogger<KafkaDomainEventPublisher> logger)
    {
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(logger);

        _options = options.Value;
        _logger = logger;

        var config = BuildProducerConfig();

        _producer = new ProducerBuilder<string, string>(config)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka producer error: {Reason}", e.Reason))
            .SetLogHandler((_, log) => _logger.LogDebug("Kafka: {Message}", log.Message))
            .Build();
    }

    /// <summary>
    /// Translates <see cref="KafkaEventOptions"/> into a Confluent <see cref="ProducerConfig"/>.
    /// </summary>
    /// <returns>The producer configuration, including security settings when specified.</returns>
    private ProducerConfig BuildProducerConfig()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = _options.BootstrapServers,
            ClientId = _options.ClientId,
            EnableIdempotence = _options.EnableIdempotence,
            MessageTimeoutMs = _options.MessageTimeoutMs,
            Acks = (Acks)_options.Acks,
            MessageSendMaxRetries = _options.Retries
        };

        // Configure security if specified. "Plaintext" is skipped so the client default is left untouched.
        // Parsing ignores case so values such as "SASL_SSL"/"sasl_ssl" are not rejected on casing alone.
        if (!string.IsNullOrEmpty(_options.SecurityProtocol)
            && !string.Equals(_options.SecurityProtocol, "Plaintext", StringComparison.OrdinalIgnoreCase))
        {
            config.SecurityProtocol = Enum.Parse<SecurityProtocol>(_options.SecurityProtocol, ignoreCase: true);
        }

        if (!string.IsNullOrEmpty(_options.SaslMechanism))
        {
            config.SaslMechanism = Enum.Parse<SaslMechanism>(_options.SaslMechanism, ignoreCase: true);
            config.SaslUsername = _options.SaslUsername;
            config.SaslPassword = _options.SaslPassword;
        }

        return config;
    }

    /// <summary>
    /// Serializes the event to JSON and produces it to the Kafka topic mapped from its type name.
    /// </summary>
    /// <typeparam name="TEvent">The static type the caller publishes the event as.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <param name="cancellationToken">Token forwarded to the producer.</param>
    /// <exception cref="ObjectDisposedException">The publisher has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is <see langword="null"/>.</exception>
    /// <exception cref="ProduceException{TKey, TValue}">The producer failed to deliver the message (logged, then rethrown).</exception>
    public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IDomainEvent
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        ArgumentNullException.ThrowIfNull(@event);

        // Use the runtime type rather than typeof(TEvent): when the caller publishes through a base
        // type or interface, the static type would yield the wrong topic/header and a JSON payload
        // containing only the members declared on that static type.
        var eventType = @event.GetType();
        var eventTypeName = eventType.Name;
        var topic = GetTopicName(eventTypeName);
        var key = GetEventKey(@event);
        var value = JsonSerializer.Serialize(@event, eventType);

        var message = new Message<string, string>
        {
            Key = key,
            Value = value,
            Headers = new Headers
            {
                { "event-type", Encoding.UTF8.GetBytes(eventTypeName) },
                { "event-id", Encoding.UTF8.GetBytes(@event.EventId.ToString()) },
                { "occurred-at", Encoding.UTF8.GetBytes(@event.OccurredAt.ToString("O")) },
                { "content-type", Encoding.UTF8.GetBytes("application/json") }
            }
        };

        try
        {
            var result = await _producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false);

            _logger.LogDebug(
                "Published event {EventType} with ID {EventId} to topic {Topic}, partition {Partition}, offset {Offset}",
                eventTypeName,
                @event.EventId,
                topic,
                result.Partition.Value,
                result.Offset.Value);
        }
        catch (ProduceException<string, string> ex)
        {
            _logger.LogError(
                ex,
                "Failed to publish event {EventType} with ID {EventId} to topic {Topic}: {Error}",
                eventTypeName,
                @event.EventId,
                topic,
                ex.Error.Reason);
            throw;
        }
    }

    /// <summary>
    /// Resolves the destination topic for an event type name.
    /// </summary>
    /// <param name="eventTypeName">Simple (unqualified) name of the event type.</param>
    /// <returns>
    /// The custom mapper's result when a mapper is configured; otherwise
    /// <c>{TopicPrefix}.{category}</c>.
    /// </returns>
    private string GetTopicName(string eventTypeName)
    {
        // Use custom mapper if provided
        if (_options.TopicMapper is not null)
        {
            return _options.TopicMapper(eventTypeName);
        }

        // Default mapping: group events by category
        // e.g., RequestCreatedEvent   -> {prefix}.requests
        // e.g., ContainerSpawnedEvent -> {prefix}.containers
        var category = GetEventCategory(eventTypeName);
        return $"{_options.TopicPrefix}.{category}";
    }

    /// <summary>
    /// Maps an event type name to a topic category based on the name's prefix.
    /// </summary>
    /// <param name="eventTypeName">Simple name of the event type, e.g. <c>RequestCreatedEvent</c>.</param>
    /// <returns>The category segment of the topic name; <c>"events"</c> when no prefix matches.</returns>
    private static string GetEventCategory(string eventTypeName)
    {
        const string suffix = "Event";

        // Remove only a trailing "Event" suffix; occurrences elsewhere in the name are part of the
        // identifier and must not be stripped.
        var name = eventTypeName.EndsWith(suffix, StringComparison.Ordinal)
            ? eventTypeName[..^suffix.Length]
            : eventTypeName;

        // Type names are identifiers, so compare ordinally rather than with the current culture.
        static bool HasPrefix(string candidate, string prefix) =>
            candidate.StartsWith(prefix, StringComparison.Ordinal);

        // Map to categories based on prefix
        return name switch
        {
            _ when HasPrefix(name, "Request") => "requests",
            _ when HasPrefix(name, "Container") => "containers",
            _ when HasPrefix(name, "Plan") => "plans",
            _ when HasPrefix(name, "Project") => "projects",
            _ when HasPrefix(name, "Solution") => "solutions",
            _ when HasPrefix(name, "Claude") || HasPrefix(name, "Session") => "sessions",
            _ when HasPrefix(name, "Worker") => "workers",
            _ when HasPrefix(name, "Deployment") => "deployments",
            _ => "events" // Default catch-all topic
        };
    }

    /// <summary>
    /// Chooses the Kafka message key for an event.
    /// </summary>
    /// <typeparam name="TEvent">The static type the event was published as.</typeparam>
    /// <param name="event">The event being published.</param>
    /// <returns>
    /// The string form of the first non-empty <see cref="Guid"/> found among
    /// <c>RequestId</c>, <c>ContainerId</c>, <c>ProjectId</c> and <c>SolutionId</c> (in that order),
    /// or the event ID when none is present.
    /// </returns>
    private static string GetEventKey<TEvent>(TEvent @event)
        where TEvent : IDomainEvent
    {
        // Reflect over the runtime type so identifier properties declared on a derived event are
        // still found when the event is published through a base type or interface.
        var type = @event.GetType();

        // Priority: RequestId > ContainerId > ProjectId > SolutionId > EventId
        foreach (var propertyName in PartitionKeyProperties)
        {
            if (type.GetProperty(propertyName)?.GetValue(@event) is Guid id && id != Guid.Empty)
            {
                return id.ToString();
            }
        }

        // Fall back to EventId
        return @event.EventId.ToString();
    }

    /// <summary>
    /// Flushes pending messages (waiting up to 10 seconds) and disposes the underlying producer.
    /// Subsequent calls are no-ops.
    /// </summary>
    /// <returns>A completed <see cref="ValueTask"/>; the flush itself runs synchronously.</returns>
    public ValueTask DisposeAsync()
    {
        if (_disposed)
        {
            return ValueTask.CompletedTask;
        }

        _disposed = true;

        try
        {
            // Flush pending messages with timeout; the return value is the number still queued.
            var remaining = _producer.Flush(FlushTimeout);
            if (remaining > 0)
            {
                _logger.LogWarning(
                    "Kafka producer flush timed out with {Remaining} message(s) still queued",
                    remaining);
            }
        }
        finally
        {
            // Always release the producer, even if the flush throws.
            _producer.Dispose();
        }

        _logger.LogDebug("Kafka domain event publisher disposed");

        return ValueTask.CompletedTask;
    }
}