418 lines
17 KiB
C#
418 lines
17 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Replay;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Migration;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Retention;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Stores;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
|
|
using Svrnty.CQRS.Events.Abstractions.Streaming;
|
|
using Svrnty.CQRS.Events.Abstractions.Replay;
|
|
using Svrnty.CQRS.Events.Abstractions.Schema;
|
|
using Svrnty.CQRS.Events.Abstractions.Storage;
|
|
using Svrnty.CQRS.Events.Abstractions.EventStore;
|
|
using Svrnty.CQRS.Events.Abstractions.Configuration;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.DependencyInjection.Extensions;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
using Svrnty.CQRS.Events.Abstractions.Projections;
|
|
using Svrnty.CQRS.Events.Abstractions.Sagas;
|
|
|
|
namespace Svrnty.CQRS.Events.PostgreSQL;
|
|
|
|
/// <summary>
|
|
/// Extension methods for registering PostgreSQL event streaming services.
|
|
/// </summary>
|
|
public static class ServiceCollectionExtensions
|
|
{
|
|
/// <summary>
/// Wires up the PostgreSQL implementations of the event streaming stores and applies
/// <paramref name="configure"/> to <see cref="PostgresEventStreamStoreOptions"/>.
/// </summary>
/// <remarks>
/// <see cref="IEventStreamStore"/>, <see cref="IIdempotencyStore"/> and
/// <see cref="IReadReceiptStore"/> are registered through <c>Replace</c>, so an existing
/// registration of the same service type is swapped for the PostgreSQL singleton.
/// A <see cref="DatabaseMigrator"/> singleton and the <see cref="MigrationHostedService"/>
/// hosted service are added as well.
/// </remarks>
/// <param name="services">The service collection that receives the registrations.</param>
/// <param name="configure">Delegate that populates the PostgreSQL store options.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="services"/> or <paramref name="configure"/> is <c>null</c>.
/// </exception>
/// <example>
/// <code>
/// services.AddPostgresEventStreaming(options =&gt;
/// {
///     options.ConnectionString = "Host=localhost;Database=mydb;Username=user;Password=pass";
///     options.SchemaName = "event_streaming";
///     options.AutoMigrate = true;
/// });
/// </code>
/// </example>
public static IServiceCollection AddPostgresEventStreaming(
    this IServiceCollection services,
    Action<PostgresEventStreamStoreOptions> configure)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    if (configure is null)
    {
        throw new ArgumentNullException(nameof(configure));
    }

    // Bind the caller-supplied option values.
    services.Configure<PostgresEventStreamStoreOptions>(configure);

    // Stores, in order: event stream, idempotency (Phase 3.1, exactly-once delivery),
    // read receipts (Phase 3.3, consumer progress tracking). Then the migrator and the
    // hosted service meant to run migrations at startup when AutoMigrate is enabled.
    services
        .Replace(ServiceDescriptor.Singleton<IEventStreamStore, PostgresEventStreamStore>())
        .Replace(ServiceDescriptor.Singleton<IIdempotencyStore, PostgresIdempotencyStore>())
        .Replace(ServiceDescriptor.Singleton<IReadReceiptStore, PostgresReadReceiptStore>())
        .AddSingleton<DatabaseMigrator>()
        .AddHostedService<MigrationHostedService>();

    return services;
}
|
|
|
|
/// <summary>
/// Convenience overload: registers PostgreSQL-based event stream storage for a given
/// connection string, optionally followed by further option tweaks.
/// </summary>
/// <remarks>
/// The connection string is assigned to the options first; <paramref name="configure"/>
/// (when supplied) runs afterwards and may therefore override it.
/// </remarks>
/// <param name="services">The service collection that receives the registrations.</param>
/// <param name="connectionString">PostgreSQL connection string.</param>
/// <param name="configure">Optional additional configuration applied after the connection string.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">
/// <paramref name="connectionString"/> is <c>null</c>, empty or whitespace.
/// </exception>
/// <example>
/// <code>
/// services.AddPostgresEventStreaming(
///     "Host=localhost;Database=mydb;Username=user;Password=pass");
/// </code>
/// </example>
public static IServiceCollection AddPostgresEventStreaming(
    this IServiceCollection services,
    string connectionString,
    Action<PostgresEventStreamStoreOptions>? configure = null)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    if (string.IsNullOrWhiteSpace(connectionString))
    {
        throw new ArgumentException("Connection string cannot be null or whitespace.", nameof(connectionString));
    }

    // Compose one options delegate and hand it to the delegate-based overload,
    // which performs the actual service registrations.
    Action<PostgresEventStreamStoreOptions> applyOptions = storeOptions =>
    {
        storeOptions.ConnectionString = connectionString;

        if (configure is not null)
        {
            configure(storeOptions);
        }
    };

    return services.AddPostgresEventStreaming(applyOptions);
}
|
|
|
|
/// <summary>
/// Registers PostgreSQL-based event stream storage, binding
/// <see cref="PostgresEventStreamStoreOptions"/> from configuration.
/// </summary>
/// <remarks>
/// <para>
/// Registers the same services as the delegate-based overload:
/// <see cref="IEventStreamStore"/>, <see cref="IIdempotencyStore"/> and
/// <see cref="IReadReceiptStore"/> (each via <c>Replace</c>), plus
/// <see cref="DatabaseMigrator"/> and the <see cref="MigrationHostedService"/> hosted service.
/// </para>
/// <para>
/// Expects a configuration section with the following structure:
/// </para>
/// <code>
/// {
///   "ConnectionString": "Host=localhost;Database=mydb;...",
///   "SchemaName": "event_streaming",
///   "AutoMigrate": true
/// }
/// </code>
/// </remarks>
/// <param name="services">The service collection that receives the registrations.</param>
/// <param name="configuration">Configuration section containing PostgreSQL options.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="services"/> or <paramref name="configuration"/> is <c>null</c>.
/// </exception>
public static IServiceCollection AddPostgresEventStreaming(
    this IServiceCollection services,
    IConfiguration configuration)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    if (configuration is null)
    {
        throw new ArgumentNullException(nameof(configuration));
    }

    // Bind options from the supplied configuration section.
    services.Configure<PostgresEventStreamStoreOptions>(configuration);

    // Register PostgresEventStreamStore as IEventStreamStore
    services.Replace(ServiceDescriptor.Singleton<IEventStreamStore, PostgresEventStreamStore>());

    // The registrations below were previously missing from this overload, leaving the
    // configuration-based path without idempotency/read-receipt stores and without any
    // migration service (so the documented "AutoMigrate" setting had nothing to act on).
    // They mirror the delegate-based overload so both entry points behave the same.

    // Phase 3.1: idempotency store for exactly-once delivery
    services.Replace(ServiceDescriptor.Singleton<IIdempotencyStore, PostgresIdempotencyStore>());

    // Phase 3.3: read receipt store for consumer progress tracking
    services.Replace(ServiceDescriptor.Singleton<IReadReceiptStore, PostgresReadReceiptStore>());

    // TryAdd keeps a single migrator descriptor if another registration path already added one.
    services.TryAddSingleton<DatabaseMigrator>();

    // Hosted service intended to run migrations on startup (if AutoMigrate is enabled)
    services.AddHostedService<MigrationHostedService>();

    return services;
}
|
|
|
|
/// <summary>
/// Adds the PostgreSQL retention policy store together with the
/// <see cref="RetentionPolicyService"/> hosted service.
/// </summary>
/// <remarks>
/// <see cref="IRetentionPolicyStore"/> is registered through <c>Replace</c>, so an existing
/// registration of that service type is swapped for the PostgreSQL singleton.
/// <see cref="RetentionServiceOptions"/> is only configured when a delegate is supplied.
/// </remarks>
/// <param name="services">The service collection that receives the registrations.</param>
/// <param name="configure">Optional configuration action for retention service options.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
/// <example>
/// <code>
/// services.AddPostgresRetentionPolicies(options =&gt;
/// {
///     options.Enabled = true;
///     options.CleanupInterval = TimeSpan.FromHours(1);
///     options.CleanupWindowStart = TimeSpan.FromHours(2);
///     options.CleanupWindowEnd = TimeSpan.FromHours(6);
///     options.UseCleanupWindow = true;
/// });
/// </code>
/// </example>
public static IServiceCollection AddPostgresRetentionPolicies(
    this IServiceCollection services,
    Action<RetentionServiceOptions>? configure = null)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    // Options are left untouched when the caller passes no delegate.
    if (configure is not null)
    {
        services.Configure<RetentionServiceOptions>(configure);
    }

    // Store first, then the hosted background service.
    services
        .Replace(ServiceDescriptor.Singleton<IRetentionPolicyStore, PostgresRetentionPolicyStore>())
        .AddHostedService<RetentionPolicyService>();

    return services;
}
|
|
|
|
/// <summary>
/// Adds the PostgreSQL retention policy store and the <see cref="RetentionPolicyService"/>
/// hosted service, binding <see cref="RetentionServiceOptions"/> from configuration.
/// </summary>
/// <remarks>
/// Expects a configuration section with the following structure:
/// <code>
/// {
///   "Enabled": true,
///   "CleanupInterval": "01:00:00",
///   "CleanupWindowStart": "02:00:00",
///   "CleanupWindowEnd": "06:00:00",
///   "UseCleanupWindow": true
/// }
/// </code>
/// </remarks>
/// <param name="services">The service collection that receives the registrations.</param>
/// <param name="configuration">Configuration section containing retention service options.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="services"/> or <paramref name="configuration"/> is <c>null</c>.
/// </exception>
public static IServiceCollection AddPostgresRetentionPolicies(
    this IServiceCollection services,
    IConfiguration configuration)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    if (configuration is null)
    {
        throw new ArgumentNullException(nameof(configuration));
    }

    // Bind options from the supplied configuration section.
    services.Configure<RetentionServiceOptions>(configuration);

    // Store first, then the hosted background service.
    services
        .Replace(ServiceDescriptor.Singleton<IRetentionPolicyStore, PostgresRetentionPolicyStore>())
        .AddHostedService<RetentionPolicyService>();

    return services;
}
|
|
|
|
/// <summary>
/// Makes <see cref="PostgresEventReplayService"/> the singleton implementation of
/// <see cref="IEventReplayService"/>.
/// </summary>
/// <param name="services">The service collection that receives the registration.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
/// <example>
/// <code>
/// services.AddPostgresEventReplay();
///
/// // Then use in your code:
/// var replayService = serviceProvider.GetRequiredService&lt;IEventReplayService&gt;();
/// await foreach (var @event in replayService.ReplayFromOffsetAsync("orders", 1000))
/// {
///     await ProcessEventAsync(@event);
/// }
/// </code>
/// </example>
public static IServiceCollection AddPostgresEventReplay(this IServiceCollection services)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    // Replace hands back the collection it was called on, so it can be returned directly.
    return services.Replace(
        ServiceDescriptor.Singleton<IEventReplayService, PostgresEventReplayService>());
}
|
|
|
|
/// <summary>
/// Registers the PostgreSQL stream configuration store and provider.
/// </summary>
/// <remarks>
/// Both <see cref="IStreamConfigurationStore"/> and <see cref="IStreamConfigurationProvider"/>
/// are registered through <c>Replace</c> as singletons.
/// </remarks>
/// <param name="services">The service collection that receives the registrations.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
/// <example>
/// <code>
/// services.AddPostgresStreamConfiguration();
///
/// // Then use in your code:
/// var configStore = serviceProvider.GetRequiredService&lt;IStreamConfigurationStore&gt;();
/// var config = new StreamConfiguration
/// {
///     StreamName = "orders",
///     Retention = new RetentionConfiguration
///     {
///         MaxAge = TimeSpan.FromDays(90)
///     }
/// };
/// await configStore.SetConfigurationAsync(config);
/// </code>
/// </example>
public static IServiceCollection AddPostgresStreamConfiguration(this IServiceCollection services)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    ServiceDescriptor storeDescriptor =
        ServiceDescriptor.Singleton<IStreamConfigurationStore, PostgresStreamConfigurationStore>();
    ServiceDescriptor providerDescriptor =
        ServiceDescriptor.Singleton<IStreamConfigurationProvider, PostgresStreamConfigurationProvider>();

    // Store is registered before the provider, as in the original ordering.
    return services
        .Replace(storeDescriptor)
        .Replace(providerDescriptor);
}
|
|
|
|
// ========================================================================
|
|
// Phase 5: Schema Evolution & Versioning
|
|
// ========================================================================
|
|
|
|
/// <summary>
/// Registers a PostgreSQL-backed <see cref="ISchemaStore"/> for event versioning.
/// </summary>
/// <remarks>
/// <para>
/// The store is created by a factory that reads <c>ConnectionString</c> and
/// <c>SchemaName</c> from <see cref="PostgresEventStreamStoreOptions"/> and resolves a
/// logger from the container. It is registered through <c>Replace</c> as a singleton,
/// taking the place of a previously registered schema store (for example the in-memory one).
/// </para>
/// <para>
/// <strong>Prerequisites:</strong>
/// - Call <c>AddSchemaEvolution()</c> first to register core schema services
/// - Ensure PostgreSQL connection is configured via <c>AddPostgresEventStreaming()</c>
/// </para>
/// </remarks>
/// <param name="services">The service collection that receives the registration.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
/// <example>
/// <code>
/// services.AddSchemaEvolution();           // Register core services
/// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;...");
/// services.AddPostgresSchemaStore();       // Use PostgreSQL for schema storage
/// </code>
/// </example>
public static IServiceCollection AddPostgresSchemaStore(this IServiceCollection services)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    return services.Replace(ServiceDescriptor.Singleton<ISchemaStore>(CreateSchemaStore));

    // Factory invoked by the container: pulls connection settings from the bound options
    // and a typed logger, then constructs the PostgreSQL schema store.
    static ISchemaStore CreateSchemaStore(IServiceProvider provider)
    {
        PostgresEventStreamStoreOptions storeOptions = provider
            .GetRequiredService<Microsoft.Extensions.Options.IOptions<PostgresEventStreamStoreOptions>>()
            .Value;
        Microsoft.Extensions.Logging.ILogger<PostgresSchemaStore> schemaLogger = provider
            .GetRequiredService<Microsoft.Extensions.Logging.ILogger<PostgresSchemaStore>>();

        return new PostgresSchemaStore(storeOptions.ConnectionString, storeOptions.SchemaName, schemaLogger);
    }
}
|
|
|
|
// ========================================================================
|
|
// Phase 7: Advanced Features - Event Sourcing Projections
|
|
// ========================================================================
|
|
|
|
/// <summary>
/// Makes <see cref="PostgresProjectionCheckpointStore"/> the singleton implementation of
/// <see cref="IProjectionCheckpointStore"/>.
/// </summary>
/// <remarks>
/// <para>
/// The registration goes through <c>Replace</c>, so it takes the place of a previously
/// registered checkpoint store (for example the in-memory one).
/// </para>
/// <para>
/// <strong>Prerequisites:</strong>
/// - Call <c>AddProjections()</c> first to register core projection services
/// - Ensure PostgreSQL connection is configured via <c>AddPostgresEventStreaming()</c>
/// </para>
/// </remarks>
/// <param name="services">The service collection that receives the registration.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
/// <example>
/// <code>
/// services.AddProjections();                        // Register core projection services
/// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;...");
/// services.AddPostgresProjectionCheckpointStore();  // Use PostgreSQL for checkpoints
///
/// // Register a projection
/// services.AddProjection&lt;UserStatisticsProjection, UserRegisteredEvent&gt;(
///     projectionName: "user-statistics",
///     streamName: "user-events",
///     configure: options =&gt;
///     {
///         options.BatchSize = 100;
///         options.AutoStart = true;
///     });
/// </code>
/// </example>
public static IServiceCollection AddPostgresProjectionCheckpointStore(this IServiceCollection services)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    // Replace hands back the collection it was called on, so it can be returned directly.
    return services.Replace(
        ServiceDescriptor.Singleton<IProjectionCheckpointStore, PostgresProjectionCheckpointStore>());
}
|
|
|
|
// ========================================================================
|
|
// Phase 7: Advanced Features - Saga Orchestration
|
|
// ========================================================================
|
|
|
|
/// <summary>
/// Makes <see cref="PostgresSagaStateStore"/> the singleton implementation of
/// <see cref="ISagaStateStore"/>.
/// </summary>
/// <remarks>
/// <para>
/// The registration goes through <c>Replace</c>, so it takes the place of a previously
/// registered saga state store (for example the in-memory one).
/// </para>
/// <para>
/// <strong>Prerequisites:</strong>
/// - Call <c>AddSagaOrchestration()</c> first to register core saga services
/// - Ensure PostgreSQL connection is configured via <c>AddPostgresEventStreaming()</c>
/// </para>
/// </remarks>
/// <param name="services">The service collection that receives the registration.</param>
/// <returns>The same <paramref name="services"/> instance, so calls can be chained.</returns>
/// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
/// <example>
/// <code>
/// services.AddSagaOrchestration(useInMemoryStateStore: false);  // Register core saga services
/// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;...");
/// services.AddPostgresSagaStateStore();                         // Use PostgreSQL for saga state
///
/// // Register a saga
/// services.AddSaga&lt;OrderFulfillmentSaga&gt;(
///     sagaName: "order-fulfillment",
///     configure: definition =&gt;
///     {
///         definition.AddStep("ReserveInventory", ReserveInventoryAsync, CompensateReserveInventoryAsync);
///         definition.AddStep("ProcessPayment", ProcessPaymentAsync, CompensateProcessPaymentAsync);
///         definition.AddStep("ShipOrder", ShipOrderAsync, CompensateShipOrderAsync);
///     });
/// </code>
/// </example>
public static IServiceCollection AddPostgresSagaStateStore(this IServiceCollection services)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    // Replace hands back the collection it was called on, so it can be returned directly.
    return services.Replace(
        ServiceDescriptor.Singleton<ISagaStateStore, PostgresSagaStateStore>());
}
|
|
}
|