Add Svrnty.CQRS.Events.Kafka package

Kafka domain event publisher implementing IDomainEventPublisher,
sibling to Svrnty.CQRS.Events.RabbitMQ. Uses Confluent.Kafka 2.6.1,
targets net10.0 with C# 14.

Features:
- Configurable bootstrap servers, client id, idempotence, acks, retries
- Security protocol + SASL config (Plaintext/SSL/SASL_SSL etc.)
- Topic mapper (default lowercase event-type-name, custom func override)
- IAsyncDisposable producer cleanup
- Two registration overloads via AddKafkaDomainEvents

Project added to solution. Builds with 0 warnings.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mathias Beaulieu-Duncan 2026-04-20 19:20:49 -04:00
parent 55f1324286
commit 7fc680cd93
5 changed files with 385 additions and 0 deletions

View File

@ -0,0 +1,205 @@
using System.Text;
using System.Text.Json;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.Kafka;
/// <summary>
/// Apache Kafka implementation of the domain event publisher.
/// </summary>
public class KafkaDomainEventPublisher : IDomainEventPublisher, IAsyncDisposable
{
private readonly KafkaEventOptions _options;
private readonly ILogger<KafkaDomainEventPublisher> _logger;
private readonly IProducer<string, string> _producer;
private bool _disposed;
/// <summary>
/// Creates a new Kafka domain event publisher.
/// </summary>
public KafkaDomainEventPublisher(
IOptions<KafkaEventOptions> options,
ILogger<KafkaDomainEventPublisher> logger)
{
_options = options.Value;
_logger = logger;
var config = BuildProducerConfig();
_producer = new ProducerBuilder<string, string>(config)
.SetErrorHandler((_, e) => _logger.LogError("Kafka producer error: {Reason}", e.Reason))
.SetLogHandler((_, log) => _logger.LogDebug("Kafka: {Message}", log.Message))
.Build();
}
private ProducerConfig BuildProducerConfig()
{
var config = new ProducerConfig
{
BootstrapServers = _options.BootstrapServers,
ClientId = _options.ClientId,
EnableIdempotence = _options.EnableIdempotence,
MessageTimeoutMs = _options.MessageTimeoutMs,
Acks = (Acks)_options.Acks,
MessageSendMaxRetries = _options.Retries
};
// Configure security if specified
if (!string.IsNullOrEmpty(_options.SecurityProtocol) &&
_options.SecurityProtocol != "Plaintext")
{
config.SecurityProtocol = Enum.Parse<SecurityProtocol>(_options.SecurityProtocol);
}
if (!string.IsNullOrEmpty(_options.SaslMechanism))
{
config.SaslMechanism = Enum.Parse<SaslMechanism>(_options.SaslMechanism);
config.SaslUsername = _options.SaslUsername;
config.SaslPassword = _options.SaslPassword;
}
return config;
}
/// <inheritdoc />
public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
where TEvent : IDomainEvent
{
ObjectDisposedException.ThrowIf(_disposed, this);
var eventTypeName = typeof(TEvent).Name;
var topic = GetTopicName(eventTypeName);
var key = GetEventKey(@event);
var value = JsonSerializer.Serialize(@event);
var message = new Message<string, string>
{
Key = key,
Value = value,
Headers = new Headers
{
{ "event-type", Encoding.UTF8.GetBytes(eventTypeName) },
{ "event-id", Encoding.UTF8.GetBytes(@event.EventId.ToString()) },
{ "occurred-at", Encoding.UTF8.GetBytes(@event.OccurredAt.ToString("O")) },
{ "content-type", Encoding.UTF8.GetBytes("application/json") }
}
};
try
{
var result = await _producer.ProduceAsync(topic, message, cancellationToken);
_logger.LogDebug(
"Published event {EventType} with ID {EventId} to topic {Topic}, partition {Partition}, offset {Offset}",
eventTypeName,
@event.EventId,
topic,
result.Partition.Value,
result.Offset.Value);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex,
"Failed to publish event {EventType} with ID {EventId} to topic {Topic}: {Error}",
eventTypeName,
@event.EventId,
topic,
ex.Error.Reason);
throw;
}
}
private string GetTopicName(string eventTypeName)
{
// Use custom mapper if provided
if (_options.TopicMapper != null)
{
return _options.TopicMapper(eventTypeName);
}
// Default mapping: group events by category
// e.g., RequestCreatedEvent -> {prefix}.requests
// e.g., ContainerSpawnedEvent -> {prefix}.containers
var category = GetEventCategory(eventTypeName);
return $"{_options.TopicPrefix}.{category}";
}
private static string GetEventCategory(string eventTypeName)
{
// Remove "Event" suffix
var name = eventTypeName.Replace("Event", "");
// Map to categories based on prefix
return name switch
{
var n when n.StartsWith("Request") => "requests",
var n when n.StartsWith("Container") => "containers",
var n when n.StartsWith("Plan") => "plans",
var n when n.StartsWith("Project") => "projects",
var n when n.StartsWith("Solution") => "solutions",
var n when n.StartsWith("Claude") || n.StartsWith("Session") => "sessions",
var n when n.StartsWith("Worker") => "workers",
var n when n.StartsWith("Deployment") => "deployments",
_ => "events" // Default catch-all topic
};
}
private static string GetEventKey<TEvent>(TEvent @event) where TEvent : IDomainEvent
{
// Use reflection to find the best partition key
// Priority: RequestId > ProjectId > SolutionId > EventId
var type = typeof(TEvent);
// Try RequestId first (most common for request-scoped events)
var requestIdProp = type.GetProperty("RequestId");
if (requestIdProp?.GetValue(@event) is Guid requestId && requestId != Guid.Empty)
{
return requestId.ToString();
}
// Try ContainerId
var containerIdProp = type.GetProperty("ContainerId");
if (containerIdProp?.GetValue(@event) is Guid containerId && containerId != Guid.Empty)
{
return containerId.ToString();
}
// Try ProjectId
var projectIdProp = type.GetProperty("ProjectId");
if (projectIdProp?.GetValue(@event) is Guid projectId && projectId != Guid.Empty)
{
return projectId.ToString();
}
// Try SolutionId
var solutionIdProp = type.GetProperty("SolutionId");
if (solutionIdProp?.GetValue(@event) is Guid solutionId && solutionId != Guid.Empty)
{
return solutionId.ToString();
}
// Fall back to EventId
return @event.EventId.ToString();
}
/// <inheritdoc />
public ValueTask DisposeAsync()
{
if (_disposed)
{
return ValueTask.CompletedTask;
}
_disposed = true;
// Flush pending messages with timeout
_producer.Flush(TimeSpan.FromSeconds(10));
_producer.Dispose();
_logger.LogDebug("Kafka domain event publisher disposed");
return ValueTask.CompletedTask;
}
}

View File

@ -0,0 +1,72 @@
namespace Svrnty.CQRS.Events.Kafka;
/// <summary>
/// Configuration options for Kafka domain event publishing.
/// </summary>
public class KafkaEventOptions
{
/// <summary>
/// Kafka bootstrap servers. Default: localhost:9092
/// </summary>
public string BootstrapServers { get; set; } = "localhost:9092";
/// <summary>
/// Prefix for Kafka topics. Default: domain
/// Events will be published to topics like {TopicPrefix}.{category}
/// </summary>
public string TopicPrefix { get; set; } = "domain";
/// <summary>
/// Client identifier for Kafka producer. Default: cqrs-events
/// </summary>
public string ClientId { get; set; } = "cqrs-events";
/// <summary>
/// Enable idempotent producer for exactly-once semantics. Default: true
/// </summary>
public bool EnableIdempotence { get; set; } = true;
/// <summary>
/// Message timeout in milliseconds. Default: 30000 (30 seconds)
/// </summary>
public int MessageTimeoutMs { get; set; } = 30000;
/// <summary>
/// Security protocol. Default: Plaintext
/// Options: Plaintext, Ssl, SaslPlaintext, SaslSsl
/// </summary>
public string SecurityProtocol { get; set; } = "Plaintext";
/// <summary>
/// SASL mechanism for authentication. Optional.
/// Options: Plain, ScramSha256, ScramSha512
/// </summary>
public string? SaslMechanism { get; set; }
/// <summary>
/// SASL username for authentication. Optional.
/// </summary>
public string? SaslUsername { get; set; }
/// <summary>
/// SASL password for authentication. Optional.
/// </summary>
public string? SaslPassword { get; set; }
/// <summary>
/// Number of acknowledgements required. Default: All (-1)
/// Options: None (0), Leader (1), All (-1)
/// </summary>
public int Acks { get; set; } = -1;
/// <summary>
/// Maximum number of retries. Default: 3
/// </summary>
public int Retries { get; set; } = 3;
/// <summary>
/// Custom topic mapping function. If not set, default mapping is used.
/// Maps event type name to topic name.
/// </summary>
public Func<string, string>? TopicMapper { get; set; }
}

View File

@ -0,0 +1,53 @@
using Microsoft.Extensions.DependencyInjection;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.Kafka;
/// <summary>
/// Extension methods for registering Kafka domain event publishing.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Adds Kafka domain event publishing to the service collection.
/// </summary>
/// <param name="services">The service collection.</param>
/// <param name="configure">Optional configuration action for Kafka options.</param>
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddKafkaDomainEvents(
this IServiceCollection services,
Action<KafkaEventOptions>? configure = null)
{
if (configure != null)
{
services.Configure(configure);
}
services.AddSingleton<IDomainEventPublisher, KafkaDomainEventPublisher>();
return services;
}
/// <summary>
/// Adds Kafka domain event publishing with custom topic mapping.
/// </summary>
/// <param name="services">The service collection.</param>
/// <param name="topicMapper">Custom function to map event type names to topic names.</param>
/// <param name="configure">Optional configuration action for other Kafka options.</param>
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddKafkaDomainEvents(
this IServiceCollection services,
Func<string, string> topicMapper,
Action<KafkaEventOptions>? configure = null)
{
services.Configure<KafkaEventOptions>(options =>
{
options.TopicMapper = topicMapper;
configure?.Invoke(options);
});
services.AddSingleton<IDomainEventPublisher, KafkaDomainEventPublisher>();
return services;
}
}

View File

@ -0,0 +1,41 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<IsAotCompatible>false</IsAotCompatible>
<LangVersion>14</LangVersion>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<Company>Svrnty</Company>
<Authors>David Lebee, Mathias Beaulieu-Duncan</Authors>
<PackageIcon>icon.png</PackageIcon>
<PackageReadmeFile>README.md</PackageReadmeFile>
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<Description>Apache Kafka domain event publishing for Svrnty.CQRS framework</Description>
<DebugType>portable</DebugType>
<DebugSymbols>true</DebugSymbols>
<IncludeSymbols>true</IncludeSymbols>
<IncludeSource>true</IncludeSource>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup>
<ItemGroup>
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Svrnty.CQRS.Events.Abstractions\Svrnty.CQRS.Events.Abstractions.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.6.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" />
</ItemGroup>
</Project>

View File

@ -43,6 +43,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.Abstract
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.RabbitMQ", "Svrnty.CQRS.Events.RabbitMQ\Svrnty.CQRS.Events.RabbitMQ.csproj", "{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.Kafka", "Svrnty.CQRS.Events.Kafka\Svrnty.CQRS.Events.Kafka.csproj", "{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -257,6 +259,18 @@ Global
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x64.Build.0 = Release|Any CPU
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x86.ActiveCfg = Release|Any CPU
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x86.Build.0 = Release|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x64.ActiveCfg = Debug|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x64.Build.0 = Debug|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x86.ActiveCfg = Debug|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Debug|x86.Build.0 = Debug|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|Any CPU.Build.0 = Release|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x64.ActiveCfg = Release|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x64.Build.0 = Release|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x86.ActiveCfg = Release|Any CPU
{D367A93B-2DE7-41AC-9BF1-A49AC4617AFD}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE