Kafka domain event publisher implementing IDomainEventPublisher, sibling to Svrnty.CQRS.Events.RabbitMQ. Uses Confluent.Kafka 2.6.1, targets net10.0 with C# 14. Features: - Configurable bootstrap servers, client id, idempotence, acks, retries - Security protocol + SASL config (Plaintext/SSL/SASL_SSL etc.) - Topic mapper (default lowercase event-type-name, custom func override) - IAsyncDisposable producer cleanup - Two registration overloads via AddKafkaDomainEvents Project added to solution. Builds with 0 warnings. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
206 lines
6.9 KiB
C#
206 lines
6.9 KiB
C#
using System.Text;
|
|
using System.Text.Json;
|
|
using Confluent.Kafka;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.Kafka;
|
|
|
|
/// <summary>
|
|
/// Apache Kafka implementation of the domain event publisher.
|
|
/// </summary>
|
|
public class KafkaDomainEventPublisher : IDomainEventPublisher, IAsyncDisposable
|
|
{
|
|
private readonly KafkaEventOptions _options;
|
|
private readonly ILogger<KafkaDomainEventPublisher> _logger;
|
|
private readonly IProducer<string, string> _producer;
|
|
private bool _disposed;
|
|
|
|
/// <summary>
|
|
/// Creates a new Kafka domain event publisher.
|
|
/// </summary>
|
|
public KafkaDomainEventPublisher(
|
|
IOptions<KafkaEventOptions> options,
|
|
ILogger<KafkaDomainEventPublisher> logger)
|
|
{
|
|
_options = options.Value;
|
|
_logger = logger;
|
|
|
|
var config = BuildProducerConfig();
|
|
_producer = new ProducerBuilder<string, string>(config)
|
|
.SetErrorHandler((_, e) => _logger.LogError("Kafka producer error: {Reason}", e.Reason))
|
|
.SetLogHandler((_, log) => _logger.LogDebug("Kafka: {Message}", log.Message))
|
|
.Build();
|
|
}
|
|
|
|
private ProducerConfig BuildProducerConfig()
|
|
{
|
|
var config = new ProducerConfig
|
|
{
|
|
BootstrapServers = _options.BootstrapServers,
|
|
ClientId = _options.ClientId,
|
|
EnableIdempotence = _options.EnableIdempotence,
|
|
MessageTimeoutMs = _options.MessageTimeoutMs,
|
|
Acks = (Acks)_options.Acks,
|
|
MessageSendMaxRetries = _options.Retries
|
|
};
|
|
|
|
// Configure security if specified
|
|
if (!string.IsNullOrEmpty(_options.SecurityProtocol) &&
|
|
_options.SecurityProtocol != "Plaintext")
|
|
{
|
|
config.SecurityProtocol = Enum.Parse<SecurityProtocol>(_options.SecurityProtocol);
|
|
}
|
|
|
|
if (!string.IsNullOrEmpty(_options.SaslMechanism))
|
|
{
|
|
config.SaslMechanism = Enum.Parse<SaslMechanism>(_options.SaslMechanism);
|
|
config.SaslUsername = _options.SaslUsername;
|
|
config.SaslPassword = _options.SaslPassword;
|
|
}
|
|
|
|
return config;
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
|
|
where TEvent : IDomainEvent
|
|
{
|
|
ObjectDisposedException.ThrowIf(_disposed, this);
|
|
|
|
var eventTypeName = typeof(TEvent).Name;
|
|
var topic = GetTopicName(eventTypeName);
|
|
var key = GetEventKey(@event);
|
|
var value = JsonSerializer.Serialize(@event);
|
|
|
|
var message = new Message<string, string>
|
|
{
|
|
Key = key,
|
|
Value = value,
|
|
Headers = new Headers
|
|
{
|
|
{ "event-type", Encoding.UTF8.GetBytes(eventTypeName) },
|
|
{ "event-id", Encoding.UTF8.GetBytes(@event.EventId.ToString()) },
|
|
{ "occurred-at", Encoding.UTF8.GetBytes(@event.OccurredAt.ToString("O")) },
|
|
{ "content-type", Encoding.UTF8.GetBytes("application/json") }
|
|
}
|
|
};
|
|
|
|
try
|
|
{
|
|
var result = await _producer.ProduceAsync(topic, message, cancellationToken);
|
|
|
|
_logger.LogDebug(
|
|
"Published event {EventType} with ID {EventId} to topic {Topic}, partition {Partition}, offset {Offset}",
|
|
eventTypeName,
|
|
@event.EventId,
|
|
topic,
|
|
result.Partition.Value,
|
|
result.Offset.Value);
|
|
}
|
|
catch (ProduceException<string, string> ex)
|
|
{
|
|
_logger.LogError(ex,
|
|
"Failed to publish event {EventType} with ID {EventId} to topic {Topic}: {Error}",
|
|
eventTypeName,
|
|
@event.EventId,
|
|
topic,
|
|
ex.Error.Reason);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
private string GetTopicName(string eventTypeName)
|
|
{
|
|
// Use custom mapper if provided
|
|
if (_options.TopicMapper != null)
|
|
{
|
|
return _options.TopicMapper(eventTypeName);
|
|
}
|
|
|
|
// Default mapping: group events by category
|
|
// e.g., RequestCreatedEvent -> {prefix}.requests
|
|
// e.g., ContainerSpawnedEvent -> {prefix}.containers
|
|
var category = GetEventCategory(eventTypeName);
|
|
return $"{_options.TopicPrefix}.{category}";
|
|
}
|
|
|
|
private static string GetEventCategory(string eventTypeName)
|
|
{
|
|
// Remove "Event" suffix
|
|
var name = eventTypeName.Replace("Event", "");
|
|
|
|
// Map to categories based on prefix
|
|
return name switch
|
|
{
|
|
var n when n.StartsWith("Request") => "requests",
|
|
var n when n.StartsWith("Container") => "containers",
|
|
var n when n.StartsWith("Plan") => "plans",
|
|
var n when n.StartsWith("Project") => "projects",
|
|
var n when n.StartsWith("Solution") => "solutions",
|
|
var n when n.StartsWith("Claude") || n.StartsWith("Session") => "sessions",
|
|
var n when n.StartsWith("Worker") => "workers",
|
|
var n when n.StartsWith("Deployment") => "deployments",
|
|
_ => "events" // Default catch-all topic
|
|
};
|
|
}
|
|
|
|
private static string GetEventKey<TEvent>(TEvent @event) where TEvent : IDomainEvent
|
|
{
|
|
// Use reflection to find the best partition key
|
|
// Priority: RequestId > ProjectId > SolutionId > EventId
|
|
var type = typeof(TEvent);
|
|
|
|
// Try RequestId first (most common for request-scoped events)
|
|
var requestIdProp = type.GetProperty("RequestId");
|
|
if (requestIdProp?.GetValue(@event) is Guid requestId && requestId != Guid.Empty)
|
|
{
|
|
return requestId.ToString();
|
|
}
|
|
|
|
// Try ContainerId
|
|
var containerIdProp = type.GetProperty("ContainerId");
|
|
if (containerIdProp?.GetValue(@event) is Guid containerId && containerId != Guid.Empty)
|
|
{
|
|
return containerId.ToString();
|
|
}
|
|
|
|
// Try ProjectId
|
|
var projectIdProp = type.GetProperty("ProjectId");
|
|
if (projectIdProp?.GetValue(@event) is Guid projectId && projectId != Guid.Empty)
|
|
{
|
|
return projectId.ToString();
|
|
}
|
|
|
|
// Try SolutionId
|
|
var solutionIdProp = type.GetProperty("SolutionId");
|
|
if (solutionIdProp?.GetValue(@event) is Guid solutionId && solutionId != Guid.Empty)
|
|
{
|
|
return solutionId.ToString();
|
|
}
|
|
|
|
// Fall back to EventId
|
|
return @event.EventId.ToString();
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public ValueTask DisposeAsync()
|
|
{
|
|
if (_disposed)
|
|
{
|
|
return ValueTask.CompletedTask;
|
|
}
|
|
|
|
_disposed = true;
|
|
|
|
// Flush pending messages with timeout
|
|
_producer.Flush(TimeSpan.FromSeconds(10));
|
|
_producer.Dispose();
|
|
|
|
_logger.LogDebug("Kafka domain event publisher disposed");
|
|
|
|
return ValueTask.CompletedTask;
|
|
}
|
|
}
|