From 7fc680cd93587a45120d75aa2c28ebc3ba487f2c Mon Sep 17 00:00:00 2001 From: Mathias Beaulieu-Duncan Date: Mon, 20 Apr 2026 19:20:49 -0400 Subject: [PATCH] Add Svrnty.CQRS.Events.Kafka package Kafka domain event publisher implementing IDomainEventPublisher, sibling to Svrnty.CQRS.Events.RabbitMQ. Uses Confluent.Kafka 2.6.1, targets net10.0 with C# 14. Features: - Configurable bootstrap servers, client id, idempotence, acks, retries - Security protocol + SASL config (Plaintext/SSL/SASL_SSL etc.) - Topic mapper (default lowercase event-type-name, custom func override) - IAsyncDisposable producer cleanup - Two registration overloads via AddKafkaDomainEvents Project added to solution. Builds with 0 warnings. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../KafkaDomainEventPublisher.cs | 205 ++++++++++++++++++ Svrnty.CQRS.Events.Kafka/KafkaEventOptions.cs | 72 ++++++ .../ServiceCollectionExtensions.cs | 53 +++++ .../Svrnty.CQRS.Events.Kafka.csproj | 41 ++++ Svrnty.CQRS.sln | 14 ++ 5 files changed, 385 insertions(+) create mode 100644 Svrnty.CQRS.Events.Kafka/KafkaDomainEventPublisher.cs create mode 100644 Svrnty.CQRS.Events.Kafka/KafkaEventOptions.cs create mode 100644 Svrnty.CQRS.Events.Kafka/ServiceCollectionExtensions.cs create mode 100644 Svrnty.CQRS.Events.Kafka/Svrnty.CQRS.Events.Kafka.csproj diff --git a/Svrnty.CQRS.Events.Kafka/KafkaDomainEventPublisher.cs b/Svrnty.CQRS.Events.Kafka/KafkaDomainEventPublisher.cs new file mode 100644 index 0000000..7d20e42 --- /dev/null +++ b/Svrnty.CQRS.Events.Kafka/KafkaDomainEventPublisher.cs @@ -0,0 +1,205 @@ +using System.Text; +using System.Text.Json; +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Svrnty.CQRS.Events.Abstractions; + +namespace Svrnty.CQRS.Events.Kafka; + +/// +/// Apache Kafka implementation of the domain event publisher. +/// +public class KafkaDomainEventPublisher : IDomainEventPublisher, IAsyncDisposable +{ + private readonly KafkaEventOptions _options; + private readonly ILogger _logger; + private readonly IProducer _producer; + private bool _disposed; + + /// + /// Creates a new Kafka domain event publisher. + /// + public KafkaDomainEventPublisher( + IOptions options, + ILogger logger) + { + _options = options.Value; + _logger = logger; + + var config = BuildProducerConfig(); + _producer = new ProducerBuilder(config) + .SetErrorHandler((_, e) => _logger.LogError("Kafka producer error: {Reason}", e.Reason)) + .SetLogHandler((_, log) => _logger.LogDebug("Kafka: {Message}", log.Message)) + .Build(); + } + + private ProducerConfig BuildProducerConfig() + { + var config = new ProducerConfig + { + BootstrapServers = _options.BootstrapServers, + ClientId = _options.ClientId, + EnableIdempotence = _options.EnableIdempotence, + MessageTimeoutMs = _options.MessageTimeoutMs, + Acks = (Acks)_options.Acks, + MessageSendMaxRetries = _options.Retries + }; + + // Configure security if specified + if (!string.IsNullOrEmpty(_options.SecurityProtocol) && + _options.SecurityProtocol != "Plaintext") + { + config.SecurityProtocol = Enum.Parse(_options.SecurityProtocol); + } + + if (!string.IsNullOrEmpty(_options.SaslMechanism)) + { + config.SaslMechanism = Enum.Parse(_options.SaslMechanism); + config.SaslUsername = _options.SaslUsername; + config.SaslPassword = _options.SaslPassword; + } + + return config; + } + + /// + public async Task PublishAsync(TEvent @event, CancellationToken cancellationToken = default) + where TEvent : IDomainEvent + { + ObjectDisposedException.ThrowIf(_disposed, this); + + var eventTypeName = typeof(TEvent).Name; + var topic = GetTopicName(eventTypeName); + var key = GetEventKey(@event); + var value = JsonSerializer.Serialize(@event); + + var message = new Message + { + Key = key, + Value = value, + Headers = new Headers + { + { "event-type", Encoding.UTF8.GetBytes(eventTypeName) }, + { "event-id", Encoding.UTF8.GetBytes(@event.EventId.ToString()) }, + { "occurred-at", Encoding.UTF8.GetBytes(@event.OccurredAt.ToString("O")) }, + { "content-type", Encoding.UTF8.GetBytes("application/json") } + } + }; + + try + { + var result = await _producer.ProduceAsync(topic, message, cancellationToken); + + _logger.LogDebug( + "Published event {EventType} with ID {EventId} to topic {Topic}, partition {Partition}, offset {Offset}", + eventTypeName, + @event.EventId, + topic, + result.Partition.Value, + result.Offset.Value); + } + catch (ProduceException ex) + { + _logger.LogError(ex, + "Failed to publish event {EventType} with ID {EventId} to topic {Topic}: {Error}", + eventTypeName, + @event.EventId, + topic, + ex.Error.Reason); + throw; + } + } + + private string GetTopicName(string eventTypeName) + { + // Use custom mapper if provided + if (_options.TopicMapper != null) + { + return _options.TopicMapper(eventTypeName); + } + + // Default mapping: group events by category + // e.g., RequestCreatedEvent -> {prefix}.requests + // e.g., ContainerSpawnedEvent -> {prefix}.containers + var category = GetEventCategory(eventTypeName); + return $"{_options.TopicPrefix}.{category}"; + } + + private static string GetEventCategory(string eventTypeName) + { + // Remove "Event" suffix + var name = eventTypeName.Replace("Event", ""); + + // Map to categories based on prefix + return name switch + { + var n when n.StartsWith("Request") => "requests", + var n when n.StartsWith("Container") => "containers", + var n when n.StartsWith("Plan") => "plans", + var n when n.StartsWith("Project") => "projects", + var n when n.StartsWith("Solution") => "solutions", + var n when n.StartsWith("Claude") || n.StartsWith("Session") => "sessions", + var n when n.StartsWith("Worker") => "workers", + var n when n.StartsWith("Deployment") => "deployments", + _ => "events" // Default catch-all topic + }; + } + + private static string GetEventKey(TEvent @event) where TEvent : IDomainEvent + { + // Use reflection to find the best partition key + // Priority: RequestId > ProjectId > SolutionId > EventId + var type = typeof(TEvent); + + // Try RequestId first (most common for request-scoped events) + var requestIdProp = type.GetProperty("RequestId"); + if (requestIdProp?.GetValue(@event) is Guid requestId && requestId != Guid.Empty) + { + return requestId.ToString(); + } + + // Try ContainerId + var containerIdProp = type.GetProperty("ContainerId"); + if (containerIdProp?.GetValue(@event) is Guid containerId && containerId != Guid.Empty) + { + return containerId.ToString(); + } + + // Try ProjectId + var projectIdProp = type.GetProperty("ProjectId"); + if (projectIdProp?.GetValue(@event) is Guid projectId && projectId != Guid.Empty) + { + return projectId.ToString(); + } + + // Try SolutionId + var solutionIdProp = type.GetProperty("SolutionId"); + if (solutionIdProp?.GetValue(@event) is Guid solutionId && solutionId != Guid.Empty) + { + return solutionId.ToString(); + } + + // Fall back to EventId + return @event.EventId.ToString(); + } + + /// + public ValueTask DisposeAsync() + { + if (_disposed) + { + return ValueTask.CompletedTask; + } + + _disposed = true; + + // Flush pending messages with timeout + _producer.Flush(TimeSpan.FromSeconds(10)); + _producer.Dispose(); + + _logger.LogDebug("Kafka domain event publisher disposed"); + + return ValueTask.CompletedTask; + } +} diff --git a/Svrnty.CQRS.Events.Kafka/KafkaEventOptions.cs b/Svrnty.CQRS.Events.Kafka/KafkaEventOptions.cs new file mode 100644 index 0000000..1a8cb90 --- /dev/null +++ b/Svrnty.CQRS.Events.Kafka/KafkaEventOptions.cs @@ -0,0 +1,72 @@ +namespace Svrnty.CQRS.Events.Kafka; + +/// +/// Configuration options for Kafka domain event publishing. +/// +public class KafkaEventOptions +{ + /// + /// Kafka bootstrap servers. Default: localhost:9092 + /// + public string BootstrapServers { get; set; } = "localhost:9092"; + + /// + /// Prefix for Kafka topics. Default: domain + /// Events will be published to topics like {TopicPrefix}.{category} + /// + public string TopicPrefix { get; set; } = "domain"; + + /// + /// Client identifier for Kafka producer. Default: cqrs-events + /// + public string ClientId { get; set; } = "cqrs-events"; + + /// + /// Enable idempotent producer for exactly-once semantics. Default: true + /// + public bool EnableIdempotence { get; set; } = true; + + /// + /// Message timeout in milliseconds. Default: 30000 (30 seconds) + /// + public int MessageTimeoutMs { get; set; } = 30000; + + /// + /// Security protocol. Default: Plaintext + /// Options: Plaintext, Ssl, SaslPlaintext, SaslSsl + /// + public string SecurityProtocol { get; set; } = "Plaintext"; + + /// + /// SASL mechanism for authentication. Optional. + /// Options: Plain, ScramSha256, ScramSha512 + /// + public string? SaslMechanism { get; set; } + + /// + /// SASL username for authentication. Optional. + /// + public string? SaslUsername { get; set; } + + /// + /// SASL password for authentication. Optional. + /// + public string? SaslPassword { get; set; } + + /// + /// Number of acknowledgements required. Default: All (-1) + /// Options: None (0), Leader (1), All (-1) + /// + public int Acks { get; set; } = -1; + + /// + /// Maximum number of retries. Default: 3 + /// + public int Retries { get; set; } = 3; + + /// + /// Custom topic mapping function. If not set, default mapping is used. + /// Maps event type name to topic name. + /// + public Func? TopicMapper { get; set; } +} diff --git a/Svrnty.CQRS.Events.Kafka/ServiceCollectionExtensions.cs b/Svrnty.CQRS.Events.Kafka/ServiceCollectionExtensions.cs new file mode 100644 index 0000000..54de714 --- /dev/null +++ b/Svrnty.CQRS.Events.Kafka/ServiceCollectionExtensions.cs @@ -0,0 +1,53 @@ +using Microsoft.Extensions.DependencyInjection; +using Svrnty.CQRS.Events.Abstractions; + +namespace Svrnty.CQRS.Events.Kafka; + +/// +/// Extension methods for registering Kafka domain event publishing. +/// +public static class ServiceCollectionExtensions +{ + /// + /// Adds Kafka domain event publishing to the service collection. + /// + /// The service collection. + /// Optional configuration action for Kafka options. + /// The service collection for chaining. + public static IServiceCollection AddKafkaDomainEvents( + this IServiceCollection services, + Action? configure = null) + { + if (configure != null) + { + services.Configure(configure); + } + + services.AddSingleton(); + + return services; + } + + /// + /// Adds Kafka domain event publishing with custom topic mapping. + /// + /// The service collection. + /// Custom function to map event type names to topic names. + /// Optional configuration action for other Kafka options. + /// The service collection for chaining. + public static IServiceCollection AddKafkaDomainEvents( + this IServiceCollection services, + Func topicMapper, + Action? configure = null) + { + services.Configure(options => + { + options.TopicMapper = topicMapper; + configure?.Invoke(options); + }); + + services.AddSingleton(); + + return services; + } +} diff --git a/Svrnty.CQRS.Events.Kafka/Svrnty.CQRS.Events.Kafka.csproj b/Svrnty.CQRS.Events.Kafka/Svrnty.CQRS.Events.Kafka.csproj new file mode 100644 index 0000000..fe13a97 --- /dev/null +++ b/Svrnty.CQRS.Events.Kafka/Svrnty.CQRS.Events.Kafka.csproj @@ -0,0 +1,41 @@ + + + net10.0 + false + 14 + enable + enable + + Svrnty + David Lebee, Mathias Beaulieu-Duncan + icon.png + README.md + https://git.openharbor.io/svrnty/dotnet-cqrs + git + true + MIT + Apache Kafka domain event publishing for Svrnty.CQRS framework + + portable + true + true + true + snupkg + + + + + + + + + + + + + + + + + + diff --git a/Svrnty.CQRS.sln b/Svrnty.CQRS.sln index 37176b1..a3fdff2 100644 --- a/Svrnty.CQRS.sln +++ b/Svrnty.CQRS.sln @@ -43,6 +43,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.Abstract EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.RabbitMQ", "Svrnty.CQRS.Events.RabbitMQ\Svrnty.CQRS.Events.RabbitMQ.csproj", "{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.Kafka", "Svrnty.CQRS.Events.Kafka\Svrnty.CQRS.Events.Kafka.csproj", "{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -257,6 +259,18 @@ Global {3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x64.Build.0 = Release|Any CPU {3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x86.ActiveCfg = Release|Any CPU {3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x86.Build.0 = Release|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x64.ActiveCfg = Debug|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x64.Build.0 = Debug|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x86.ActiveCfg = Debug|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x86.Build.0 = Debug|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|Any CPU.Build.0 = Release|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x64.ActiveCfg = Release|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x64.Build.0 = Release|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x86.ActiveCfg = Release|Any CPU + {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE