diff --git a/Svrnty.CQRS.Events.Kafka/KafkaDomainEventPublisher.cs b/Svrnty.CQRS.Events.Kafka/KafkaDomainEventPublisher.cs
new file mode 100644
index 0000000..7d20e42
--- /dev/null
+++ b/Svrnty.CQRS.Events.Kafka/KafkaDomainEventPublisher.cs
@@ -0,0 +1,205 @@
+using System.Text;
+using System.Text.Json;
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Svrnty.CQRS.Events.Abstractions;
+
+namespace Svrnty.CQRS.Events.Kafka;
+
+///
+/// Apache Kafka implementation of the domain event publisher.
+///
+public class KafkaDomainEventPublisher : IDomainEventPublisher, IAsyncDisposable
+{
+ private readonly KafkaEventOptions _options;
+ private readonly ILogger _logger;
+ private readonly IProducer _producer;
+ private bool _disposed;
+
+ ///
+ /// Creates a new Kafka domain event publisher.
+ ///
+ public KafkaDomainEventPublisher(
+ IOptions options,
+ ILogger logger)
+ {
+ _options = options.Value;
+ _logger = logger;
+
+ var config = BuildProducerConfig();
+ _producer = new ProducerBuilder(config)
+ .SetErrorHandler((_, e) => _logger.LogError("Kafka producer error: {Reason}", e.Reason))
+ .SetLogHandler((_, log) => _logger.LogDebug("Kafka: {Message}", log.Message))
+ .Build();
+ }
+
+ private ProducerConfig BuildProducerConfig()
+ {
+ var config = new ProducerConfig
+ {
+ BootstrapServers = _options.BootstrapServers,
+ ClientId = _options.ClientId,
+ EnableIdempotence = _options.EnableIdempotence,
+ MessageTimeoutMs = _options.MessageTimeoutMs,
+ Acks = (Acks)_options.Acks,
+ MessageSendMaxRetries = _options.Retries
+ };
+
+ // Configure security if specified
+ if (!string.IsNullOrEmpty(_options.SecurityProtocol) &&
+ _options.SecurityProtocol != "Plaintext")
+ {
+ config.SecurityProtocol = Enum.Parse(_options.SecurityProtocol);
+ }
+
+ if (!string.IsNullOrEmpty(_options.SaslMechanism))
+ {
+ config.SaslMechanism = Enum.Parse(_options.SaslMechanism);
+ config.SaslUsername = _options.SaslUsername;
+ config.SaslPassword = _options.SaslPassword;
+ }
+
+ return config;
+ }
+
+ ///
+ public async Task PublishAsync(TEvent @event, CancellationToken cancellationToken = default)
+ where TEvent : IDomainEvent
+ {
+ ObjectDisposedException.ThrowIf(_disposed, this);
+
+ var eventTypeName = typeof(TEvent).Name;
+ var topic = GetTopicName(eventTypeName);
+ var key = GetEventKey(@event);
+ var value = JsonSerializer.Serialize(@event);
+
+ var message = new Message
+ {
+ Key = key,
+ Value = value,
+ Headers = new Headers
+ {
+ { "event-type", Encoding.UTF8.GetBytes(eventTypeName) },
+ { "event-id", Encoding.UTF8.GetBytes(@event.EventId.ToString()) },
+ { "occurred-at", Encoding.UTF8.GetBytes(@event.OccurredAt.ToString("O")) },
+ { "content-type", Encoding.UTF8.GetBytes("application/json") }
+ }
+ };
+
+ try
+ {
+ var result = await _producer.ProduceAsync(topic, message, cancellationToken);
+
+ _logger.LogDebug(
+ "Published event {EventType} with ID {EventId} to topic {Topic}, partition {Partition}, offset {Offset}",
+ eventTypeName,
+ @event.EventId,
+ topic,
+ result.Partition.Value,
+ result.Offset.Value);
+ }
+ catch (ProduceException ex)
+ {
+ _logger.LogError(ex,
+ "Failed to publish event {EventType} with ID {EventId} to topic {Topic}: {Error}",
+ eventTypeName,
+ @event.EventId,
+ topic,
+ ex.Error.Reason);
+ throw;
+ }
+ }
+
+ private string GetTopicName(string eventTypeName)
+ {
+ // Use custom mapper if provided
+ if (_options.TopicMapper != null)
+ {
+ return _options.TopicMapper(eventTypeName);
+ }
+
+ // Default mapping: group events by category
+ // e.g., RequestCreatedEvent -> {prefix}.requests
+ // e.g., ContainerSpawnedEvent -> {prefix}.containers
+ var category = GetEventCategory(eventTypeName);
+ return $"{_options.TopicPrefix}.{category}";
+ }
+
+ private static string GetEventCategory(string eventTypeName)
+ {
+ // Remove "Event" suffix
+ var name = eventTypeName.Replace("Event", "");
+
+ // Map to categories based on prefix
+ return name switch
+ {
+ var n when n.StartsWith("Request") => "requests",
+ var n when n.StartsWith("Container") => "containers",
+ var n when n.StartsWith("Plan") => "plans",
+ var n when n.StartsWith("Project") => "projects",
+ var n when n.StartsWith("Solution") => "solutions",
+ var n when n.StartsWith("Claude") || n.StartsWith("Session") => "sessions",
+ var n when n.StartsWith("Worker") => "workers",
+ var n when n.StartsWith("Deployment") => "deployments",
+ _ => "events" // Default catch-all topic
+ };
+ }
+
+ private static string GetEventKey(TEvent @event) where TEvent : IDomainEvent
+ {
+ // Use reflection to find the best partition key
+ // Priority: RequestId > ProjectId > SolutionId > EventId
+ var type = typeof(TEvent);
+
+ // Try RequestId first (most common for request-scoped events)
+ var requestIdProp = type.GetProperty("RequestId");
+ if (requestIdProp?.GetValue(@event) is Guid requestId && requestId != Guid.Empty)
+ {
+ return requestId.ToString();
+ }
+
+ // Try ContainerId
+ var containerIdProp = type.GetProperty("ContainerId");
+ if (containerIdProp?.GetValue(@event) is Guid containerId && containerId != Guid.Empty)
+ {
+ return containerId.ToString();
+ }
+
+ // Try ProjectId
+ var projectIdProp = type.GetProperty("ProjectId");
+ if (projectIdProp?.GetValue(@event) is Guid projectId && projectId != Guid.Empty)
+ {
+ return projectId.ToString();
+ }
+
+ // Try SolutionId
+ var solutionIdProp = type.GetProperty("SolutionId");
+ if (solutionIdProp?.GetValue(@event) is Guid solutionId && solutionId != Guid.Empty)
+ {
+ return solutionId.ToString();
+ }
+
+ // Fall back to EventId
+ return @event.EventId.ToString();
+ }
+
+ ///
+ public ValueTask DisposeAsync()
+ {
+ if (_disposed)
+ {
+ return ValueTask.CompletedTask;
+ }
+
+ _disposed = true;
+
+ // Flush pending messages with timeout
+ _producer.Flush(TimeSpan.FromSeconds(10));
+ _producer.Dispose();
+
+ _logger.LogDebug("Kafka domain event publisher disposed");
+
+ return ValueTask.CompletedTask;
+ }
+}
diff --git a/Svrnty.CQRS.Events.Kafka/KafkaEventOptions.cs b/Svrnty.CQRS.Events.Kafka/KafkaEventOptions.cs
new file mode 100644
index 0000000..1a8cb90
--- /dev/null
+++ b/Svrnty.CQRS.Events.Kafka/KafkaEventOptions.cs
@@ -0,0 +1,72 @@
+namespace Svrnty.CQRS.Events.Kafka;
+
+///
+/// Configuration options for Kafka domain event publishing.
+///
+public class KafkaEventOptions
+{
+ ///
+ /// Kafka bootstrap servers. Default: localhost:9092
+ ///
+ public string BootstrapServers { get; set; } = "localhost:9092";
+
+ ///
+ /// Prefix for Kafka topics. Default: domain
+ /// Events will be published to topics like {TopicPrefix}.{category}
+ ///
+ public string TopicPrefix { get; set; } = "domain";
+
+ ///
+ /// Client identifier for Kafka producer. Default: cqrs-events
+ ///
+ public string ClientId { get; set; } = "cqrs-events";
+
+ ///
+ /// Enable idempotent producer for exactly-once semantics. Default: true
+ ///
+ public bool EnableIdempotence { get; set; } = true;
+
+ ///
+ /// Message timeout in milliseconds. Default: 30000 (30 seconds)
+ ///
+ public int MessageTimeoutMs { get; set; } = 30000;
+
+ ///
+ /// Security protocol. Default: Plaintext
+ /// Options: Plaintext, Ssl, SaslPlaintext, SaslSsl
+ ///
+ public string SecurityProtocol { get; set; } = "Plaintext";
+
+ ///
+ /// SASL mechanism for authentication. Optional.
+ /// Options: Plain, ScramSha256, ScramSha512
+ ///
+ public string? SaslMechanism { get; set; }
+
+ ///
+ /// SASL username for authentication. Optional.
+ ///
+ public string? SaslUsername { get; set; }
+
+ ///
+ /// SASL password for authentication. Optional.
+ ///
+ public string? SaslPassword { get; set; }
+
+ ///
+ /// Number of acknowledgements required. Default: All (-1)
+ /// Options: None (0), Leader (1), All (-1)
+ ///
+ public int Acks { get; set; } = -1;
+
+ ///
+ /// Maximum number of retries. Default: 3
+ ///
+ public int Retries { get; set; } = 3;
+
+ ///
+ /// Custom topic mapping function. If not set, default mapping is used.
+ /// Maps event type name to topic name.
+ ///
+ public Func? TopicMapper { get; set; }
+}
diff --git a/Svrnty.CQRS.Events.Kafka/ServiceCollectionExtensions.cs b/Svrnty.CQRS.Events.Kafka/ServiceCollectionExtensions.cs
new file mode 100644
index 0000000..54de714
--- /dev/null
+++ b/Svrnty.CQRS.Events.Kafka/ServiceCollectionExtensions.cs
@@ -0,0 +1,53 @@
+using Microsoft.Extensions.DependencyInjection;
+using Svrnty.CQRS.Events.Abstractions;
+
+namespace Svrnty.CQRS.Events.Kafka;
+
+///
+/// Extension methods for registering Kafka domain event publishing.
+///
+public static class ServiceCollectionExtensions
+{
+ ///
+ /// Adds Kafka domain event publishing to the service collection.
+ ///
+ /// The service collection.
+ /// Optional configuration action for Kafka options.
+ /// The service collection for chaining.
+ public static IServiceCollection AddKafkaDomainEvents(
+ this IServiceCollection services,
+ Action? configure = null)
+ {
+ if (configure != null)
+ {
+ services.Configure(configure);
+ }
+
+ services.AddSingleton();
+
+ return services;
+ }
+
+ ///
+ /// Adds Kafka domain event publishing with custom topic mapping.
+ ///
+ /// The service collection.
+ /// Custom function to map event type names to topic names.
+ /// Optional configuration action for other Kafka options.
+ /// The service collection for chaining.
+ public static IServiceCollection AddKafkaDomainEvents(
+ this IServiceCollection services,
+ Func topicMapper,
+ Action? configure = null)
+ {
+ services.Configure(options =>
+ {
+ options.TopicMapper = topicMapper;
+ configure?.Invoke(options);
+ });
+
+ services.AddSingleton();
+
+ return services;
+ }
+}
diff --git a/Svrnty.CQRS.Events.Kafka/Svrnty.CQRS.Events.Kafka.csproj b/Svrnty.CQRS.Events.Kafka/Svrnty.CQRS.Events.Kafka.csproj
new file mode 100644
index 0000000..fe13a97
--- /dev/null
+++ b/Svrnty.CQRS.Events.Kafka/Svrnty.CQRS.Events.Kafka.csproj
@@ -0,0 +1,41 @@
+
+
+ net10.0
+ false
+ 14
+ enable
+ enable
+
+ Svrnty
+ David Lebee, Mathias Beaulieu-Duncan
+ icon.png
+ README.md
+ https://git.openharbor.io/svrnty/dotnet-cqrs
+ git
+ true
+ MIT
+ Apache Kafka domain event publishing for Svrnty.CQRS framework
+
+ portable
+ true
+ true
+ true
+ snupkg
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/Svrnty.CQRS.sln b/Svrnty.CQRS.sln
index 37176b1..a3fdff2 100644
--- a/Svrnty.CQRS.sln
+++ b/Svrnty.CQRS.sln
@@ -43,6 +43,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.Abstract
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.RabbitMQ", "Svrnty.CQRS.Events.RabbitMQ\Svrnty.CQRS.Events.RabbitMQ.csproj", "{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.Kafka", "Svrnty.CQRS.Events.Kafka\Svrnty.CQRS.Events.Kafka.csproj", "{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -257,6 +259,18 @@ Global
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x64.Build.0 = Release|Any CPU
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x86.ActiveCfg = Release|Any CPU
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x86.Build.0 = Release|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x64.Build.0 = Debug|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x86.Build.0 = Debug|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x64.ActiveCfg = Release|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x64.Build.0 = Release|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x86.ActiveCfg = Release|Any CPU
+ {D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE