// Listing metadata: 266 lines, 9.5 KiB, C#
using System;
|
|
using Svrnty.CQRS.Events.RabbitMQ.Configuration;
|
|
using System.Collections.Generic;
|
|
|
|
namespace Svrnty.CQRS.Events.RabbitMQ.Configuration;
|
|
|
|
/// <summary>
/// Configuration for the RabbitMQ event delivery provider.
/// </summary>
public sealed class RabbitMQConfiguration
{
    // Exchange types accepted by Validate(). Compared against the lower-cased
    // DefaultExchangeType, so the check itself is case-insensitive.
    private static readonly string[] ValidExchangeTypes = { "topic", "fanout", "direct", "headers" };

    // Routing key strategies accepted by Validate(). Compared case-sensitively.
    private static readonly string[] ValidRoutingStrategies = { "EventType", "StreamName", "Wildcard" };

    /// <summary>
    /// Gets or sets the RabbitMQ connection string.
    /// </summary>
    /// <remarks>
    /// Format: amqp://username:password@hostname:port/virtualhost
    /// Example: amqp://guest:guest@localhost:5672/
    /// <see cref="Validate"/> requires an absolute URI whose scheme is amqp or amqps.
    /// </remarks>
    public string ConnectionString { get; set; } = "amqp://guest:guest@localhost:5672/";

    /// <summary>
    /// Gets or sets the exchange name prefix.
    /// </summary>
    /// <remarks>
    /// The actual exchange name will be: {ExchangePrefix}.{stream-name}
    /// Example: "myapp" results in "myapp.user-events"
    /// Default: Empty string (no prefix)
    /// </remarks>
    public string ExchangePrefix { get; set; } = string.Empty;

    /// <summary>
    /// Gets or sets the default exchange type.
    /// </summary>
    /// <remarks>
    /// Supported values: "topic", "fanout", "direct", "headers"
    /// Default: "topic" (recommended for event streaming)
    /// <see cref="Validate"/> lower-cases the value before checking it, so casing is not significant for validation.
    /// </remarks>
    public string DefaultExchangeType { get; set; } = "topic";

    /// <summary>
    /// Gets or sets the default routing key strategy.
    /// </summary>
    /// <remarks>
    /// <list type="bullet">
    /// <item><term>EventType</term><description>Route by event type name (e.g., "UserCreatedEvent")</description></item>
    /// <item><term>StreamName</term><description>Route by stream name (e.g., "user-events")</description></item>
    /// <item><term>Wildcard</term><description>Route to all consumers (use "#" routing key for topic exchange)</description></item>
    /// </list>
    /// Default: "EventType"
    /// <see cref="Validate"/> matches the value case-sensitively.
    /// </remarks>
    public string DefaultRoutingKeyStrategy { get; set; } = "EventType";

    /// <summary>
    /// Gets or sets whether to automatically declare exchanges and queues.
    /// </summary>
    /// <remarks>
    /// Default: true (recommended for development)
    /// Set to false in production if topology is managed externally.
    /// </remarks>
    public bool AutoDeclareTopology { get; set; } = true;

    /// <summary>
    /// Gets or sets whether exchanges should be durable (survive broker restart).
    /// </summary>
    /// <remarks>
    /// Default: true (recommended for production)
    /// </remarks>
    public bool DurableExchanges { get; set; } = true;

    /// <summary>
    /// Gets or sets whether queues should be durable.
    /// </summary>
    /// <remarks>
    /// Default: true (recommended for production)
    /// </remarks>
    public bool DurableQueues { get; set; } = true;

    /// <summary>
    /// Gets or sets whether messages should be persistent.
    /// </summary>
    /// <remarks>
    /// Default: true (messages survive broker restart)
    /// Set to false for fire-and-forget scenarios.
    /// </remarks>
    public bool PersistentMessages { get; set; } = true;

    /// <summary>
    /// Gets or sets the prefetch count for consumers.
    /// </summary>
    /// <remarks>
    /// Number of unacknowledged messages each consumer can receive.
    /// Higher values = better throughput, more memory usage.
    /// Default: 10
    /// <see cref="Validate"/> rejects 0.
    /// </remarks>
    public ushort PrefetchCount { get; set; } = 10;

    /// <summary>
    /// Gets or sets the maximum number of connection retry attempts.
    /// </summary>
    /// <remarks>
    /// Default: 5
    /// Set to 0 to disable retries.
    /// </remarks>
    public int MaxConnectionRetries { get; set; } = 5;

    /// <summary>
    /// Gets or sets the delay between connection retry attempts.
    /// </summary>
    /// <remarks>
    /// Default: 5 seconds
    /// Exponential backoff is applied.
    /// </remarks>
    public TimeSpan ConnectionRetryDelay { get; set; } = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Gets or sets the maximum number of publish retry attempts.
    /// </summary>
    /// <remarks>
    /// Default: 3
    /// Set to 0 to disable retries.
    /// </remarks>
    public int MaxPublishRetries { get; set; } = 3;

    /// <summary>
    /// Gets or sets the delay between publish retry attempts.
    /// </summary>
    /// <remarks>
    /// Default: 1 second
    /// </remarks>
    public TimeSpan PublishRetryDelay { get; set; } = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Gets or sets whether to enable publisher confirms.
    /// </summary>
    /// <remarks>
    /// When enabled, RabbitMQ confirms message receipt.
    /// Slower but more reliable.
    /// Default: true (recommended for production)
    /// </remarks>
    public bool EnablePublisherConfirms { get; set; } = true;

    /// <summary>
    /// Gets or sets the timeout for publisher confirms.
    /// </summary>
    /// <remarks>
    /// Default: 5 seconds
    /// </remarks>
    public TimeSpan PublisherConfirmTimeout { get; set; } = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Gets or sets the heartbeat interval for connection health checks.
    /// </summary>
    /// <remarks>
    /// Default: 60 seconds
    /// RabbitMQ will close connections that don't send data within 2x heartbeat.
    /// </remarks>
    public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(60);

    /// <summary>
    /// Gets or sets whether to automatically recover from connection failures.
    /// </summary>
    /// <remarks>
    /// Default: true (recommended)
    /// </remarks>
    public bool AutoRecovery { get; set; } = true;

    /// <summary>
    /// Gets or sets the interval between recovery attempts.
    /// </summary>
    /// <remarks>
    /// Default: 10 seconds
    /// </remarks>
    public TimeSpan RecoveryInterval { get; set; } = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Gets or sets additional connection properties.
    /// </summary>
    /// <remarks>
    /// These are sent to RabbitMQ during connection handshake.
    /// Useful for debugging and monitoring.
    /// </remarks>
    public Dictionary<string, string> ConnectionProperties { get; set; } = new()
    {
        { "connection_name", "Svrnty.CQRS.Events" },
        { "product", "Svrnty.CQRS" }
    };

    /// <summary>
    /// Gets or sets the dead letter exchange name for failed messages.
    /// </summary>
    /// <remarks>
    /// If null, no dead letter exchange is configured.
    /// Example: "dlx.events"
    /// </remarks>
    public string? DeadLetterExchange { get; set; }

    /// <summary>
    /// Gets or sets the time-to-live for messages in queues.
    /// </summary>
    /// <remarks>
    /// If null, messages don't expire.
    /// Example: TimeSpan.FromDays(7)
    /// </remarks>
    public TimeSpan? MessageTTL { get; set; }

    /// <summary>
    /// Gets or sets the maximum queue length.
    /// </summary>
    /// <remarks>
    /// If null, no limit.
    /// Oldest messages are dropped when limit is reached.
    /// </remarks>
    public int? MaxQueueLength { get; set; }

    /// <summary>
    /// Validates the configuration.
    /// </summary>
    /// <remarks>
    /// Checks run in a fixed order and the first failing check throws; later
    /// problems are not reported until the earlier ones are fixed.
    /// </remarks>
    /// <exception cref="InvalidOperationException">Thrown if configuration is invalid.</exception>
    public void Validate()
    {
        if (string.IsNullOrWhiteSpace(ConnectionString))
        {
            throw new InvalidOperationException("ConnectionString cannot be null or whitespace.");
        }

        // Parenthesised for readability: reject when the string is not an absolute
        // URI at all, or when it parses but is neither amqp nor amqps.
        if (!Uri.TryCreate(ConnectionString, UriKind.Absolute, out var uri)
            || (uri.Scheme != "amqp" && uri.Scheme != "amqps"))
        {
            throw new InvalidOperationException("ConnectionString must be a valid AMQP URI (amqp:// or amqps://).");
        }

        // A null value is reported as an invalid configuration rather than
        // surfacing as a NullReferenceException from ToLowerInvariant().
        if (DefaultExchangeType is null
            || Array.IndexOf(ValidExchangeTypes, DefaultExchangeType.ToLowerInvariant()) < 0)
        {
            throw new InvalidOperationException($"DefaultExchangeType must be one of: {string.Join(", ", ValidExchangeTypes)}");
        }

        // Array.IndexOf returns -1 for null as well, so a null strategy is rejected here.
        if (Array.IndexOf(ValidRoutingStrategies, DefaultRoutingKeyStrategy) < 0)
        {
            throw new InvalidOperationException($"DefaultRoutingKeyStrategy must be one of: {string.Join(", ", ValidRoutingStrategies)}");
        }

        if (PrefetchCount == 0)
        {
            throw new InvalidOperationException("PrefetchCount must be greater than 0.");
        }

        if (MaxConnectionRetries < 0)
        {
            throw new InvalidOperationException("MaxConnectionRetries cannot be negative.");
        }

        if (MaxPublishRetries < 0)
        {
            throw new InvalidOperationException("MaxPublishRetries cannot be negative.");
        }

        if (ConnectionRetryDelay <= TimeSpan.Zero)
        {
            throw new InvalidOperationException("ConnectionRetryDelay must be positive.");
        }

        if (PublishRetryDelay <= TimeSpan.Zero)
        {
            throw new InvalidOperationException("PublishRetryDelay must be positive.");
        }

        if (PublisherConfirmTimeout <= TimeSpan.Zero)
        {
            throw new InvalidOperationException("PublisherConfirmTimeout must be positive.");
        }

        if (HeartbeatInterval <= TimeSpan.Zero)
        {
            throw new InvalidOperationException("HeartbeatInterval must be positive.");
        }

        if (RecoveryInterval <= TimeSpan.Zero)
        {
            throw new InvalidOperationException("RecoveryInterval must be positive.");
        }

        if (MessageTTL.HasValue && MessageTTL.Value <= TimeSpan.Zero)
        {
            throw new InvalidOperationException("MessageTTL must be positive if specified.");
        }

        if (MaxQueueLength.HasValue && MaxQueueLength.Value <= 0)
        {
            throw new InvalidOperationException("MaxQueueLength must be positive if specified.");
        }
    }
}
|