using System;
using Svrnty.CQRS.Events.PostgreSQL.Replay;
using Svrnty.CQRS.Events.PostgreSQL.Migration;
using Svrnty.CQRS.Events.PostgreSQL.Retention;
using Svrnty.CQRS.Events.PostgreSQL.Stores;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.Abstractions.Streaming;
using Svrnty.CQRS.Events.Abstractions.Replay;
using Svrnty.CQRS.Events.Abstractions.Schema;
using Svrnty.CQRS.Events.Abstractions.Storage;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using Svrnty.CQRS.Events.Abstractions.Configuration;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Projections;
using Svrnty.CQRS.Events.Abstractions.Sagas;
namespace Svrnty.CQRS.Events.PostgreSQL;
/// <summary>
/// Extension methods for registering PostgreSQL event streaming services.
/// </summary>
public static class ServiceCollectionExtensions
{
/// <summary>
/// Registers PostgreSQL-based event stream storage, together with the idempotency store,
/// the read receipt store, the database migrator and the startup migration hosted service.
/// </summary>
/// <param name="services">The service collection.</param>
/// <param name="configure">Configuration action for PostgreSQL options.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> or <paramref name="configure"/> is <c>null</c>.
/// </exception>
/// <example>
/// <code>
/// services.AddPostgresEventStreaming(options =>
/// {
///     options.ConnectionString = "Host=localhost;Database=mydb;Username=user;Password=pass";
///     options.SchemaName = "event_streaming";
///     options.AutoMigrate = true;
/// });
/// </code>
/// </example>
public static IServiceCollection AddPostgresEventStreaming(
this IServiceCollection services,
Action configure)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
if (configure == null)
throw new ArgumentNullException(nameof(configure));
// Configure options
services.Configure(configure);
// Register PostgresEventStreamStore as IEventStreamStore.
// Replace (rather than Add) so this registration takes the place of any earlier
// registration for the same service type.
services.Replace(ServiceDescriptor.Singleton());
// Phase 3.1: Register idempotency store for exactly-once delivery
services.Replace(ServiceDescriptor.Singleton());
// Phase 3.3: Register read receipt store for consumer progress tracking
services.Replace(ServiceDescriptor.Singleton());
// Register DatabaseMigrator
services.AddSingleton();
// Run migrations on startup (if AutoMigrate is enabled)
services.AddHostedService();
return services;
}
/// <summary>
/// Registers PostgreSQL-based event stream storage with connection string.
/// </summary>
/// <param name="services">The service collection.</param>
/// <param name="connectionString">PostgreSQL connection string.</param>
/// <param name="configure">Optional additional configuration.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="connectionString"/> is <c>null</c>, empty or whitespace.
/// </exception>
/// <example>
/// <code>
/// services.AddPostgresEventStreaming(
///     "Host=localhost;Database=mydb;Username=user;Password=pass");
/// </code>
/// </example>
public static IServiceCollection AddPostgresEventStreaming(
this IServiceCollection services,
string connectionString,
Action? configure = null)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
if (string.IsNullOrWhiteSpace(connectionString))
throw new ArgumentException("Connection string cannot be null or whitespace.", nameof(connectionString));
// Delegate to the options-based overload so all registrations live in one place.
return services.AddPostgresEventStreaming(options =>
{
// The connection string is applied first; the optional callback runs afterwards,
// so it can set further options (or override the connection string).
options.ConnectionString = connectionString;
configure?.Invoke(options);
});
}
/// <summary>
/// Registers PostgreSQL-based event stream storage from configuration.
/// </summary>
/// <param name="services">The service collection.</param>
/// <param name="configuration">Configuration section containing PostgreSQL options.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> or <paramref name="configuration"/> is <c>null</c>.
/// </exception>
/// <remarks>
/// Expects a configuration section with the following structure:
/// <code>
/// {
///   "ConnectionString": "Host=localhost;Database=mydb;...",
///   "SchemaName": "event_streaming",
///   "AutoMigrate": true
/// }
/// </code>
/// The options are bound from <paramref name="configuration"/>, after which the
/// delegate-based overload performs the service registrations. This keeps both entry
/// points in agreement: the event stream store, idempotency store, read receipt store,
/// database migrator and the startup migration hosted service are all registered.
/// </remarks>
public static IServiceCollection AddPostgresEventStreaming(
this IServiceCollection services,
IConfiguration configuration)
{
    if (services == null)
        throw new ArgumentNullException(nameof(services));
    if (configuration == null)
        throw new ArgumentNullException(nameof(configuration));

    // Bind the options from the supplied configuration section.
    services.Configure(configuration);

    // Reuse the delegate-based overload for the registrations. Previously only the event
    // stream store was registered here, so the migrator/hosted service behind the
    // "AutoMigrate" setting (and the idempotency and read receipt stores) were missing.
    // The no-op callback leaves the configuration-bound option values untouched.
    return services.AddPostgresEventStreaming(_ => { });
}
/// <summary>
/// Registers PostgreSQL-based retention policy storage and background service.
/// </summary>
/// <param name="services">The service collection.</param>
/// <param name="configure">
/// Configuration action for retention service options. When <c>null</c>, no options
/// configuration is registered by this call.
/// </param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> is <c>null</c>.
/// </exception>
/// <example>
/// <code>
/// services.AddPostgresRetentionPolicies(options =>
/// {
///     options.Enabled = true;
///     options.CleanupInterval = TimeSpan.FromHours(1);
///     options.CleanupWindowStart = TimeSpan.FromHours(2);
///     options.CleanupWindowEnd = TimeSpan.FromHours(6);
///     options.UseCleanupWindow = true;
/// });
/// </code>
/// </example>
public static IServiceCollection AddPostgresRetentionPolicies(
this IServiceCollection services,
Action? configure = null)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
// Configure options if provided
if (configure != null)
{
services.Configure(configure);
}
// Register PostgresRetentionPolicyStore as IRetentionPolicyStore
services.Replace(ServiceDescriptor.Singleton());
// Register the background service
services.AddHostedService();
return services;
}
/// <summary>
/// Registers PostgreSQL-based retention policy storage and background service from configuration.
/// </summary>
/// <param name="services">The service collection.</param>
/// <param name="configuration">Configuration section containing retention service options.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> or <paramref name="configuration"/> is <c>null</c>.
/// </exception>
/// <remarks>
/// Expects a configuration section with the following structure:
/// <code>
/// {
///   "Enabled": true,
///   "CleanupInterval": "01:00:00",
///   "CleanupWindowStart": "02:00:00",
///   "CleanupWindowEnd": "06:00:00",
///   "UseCleanupWindow": true
/// }
/// </code>
/// </remarks>
public static IServiceCollection AddPostgresRetentionPolicies(
this IServiceCollection services,
IConfiguration configuration)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
if (configuration == null)
throw new ArgumentNullException(nameof(configuration));
// Bind the retention service options from the supplied configuration section
services.Configure(configuration);
// Register PostgresRetentionPolicyStore as IRetentionPolicyStore
services.Replace(ServiceDescriptor.Singleton());
// Register the background service
services.AddHostedService();
return services;
}
/// <summary>
/// Registers PostgreSQL-based event replay service.
/// </summary>
/// <param name="services">The service collection.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> is <c>null</c>.
/// </exception>
/// <example>
/// <code>
/// services.AddPostgresEventReplay();
///
/// // Then use in your code:
/// var replayService = serviceProvider.GetRequiredService&lt;IEventReplayService&gt;();
/// await foreach (var @event in replayService.ReplayFromOffsetAsync("orders", 1000))
/// {
///     await ProcessEventAsync(@event);
/// }
/// </code>
/// </example>
public static IServiceCollection AddPostgresEventReplay(this IServiceCollection services)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
// Register PostgresEventReplayService as IEventReplayService
services.Replace(ServiceDescriptor.Singleton());
return services;
}
/// <summary>
/// Registers PostgreSQL-based stream configuration store and provider.
/// </summary>
/// <param name="services">The service collection.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> is <c>null</c>.
/// </exception>
/// <example>
/// <code>
/// services.AddPostgresStreamConfiguration();
///
/// // Then use in your code:
/// var configStore = serviceProvider.GetRequiredService&lt;IStreamConfigurationStore&gt;();
/// var config = new StreamConfiguration
/// {
///     StreamName = "orders",
///     Retention = new RetentionConfiguration
///     {
///         MaxAge = TimeSpan.FromDays(90)
///     }
/// };
/// await configStore.SetConfigurationAsync(config);
/// </code>
/// </example>
public static IServiceCollection AddPostgresStreamConfiguration(this IServiceCollection services)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
// Register PostgresStreamConfigurationStore as IStreamConfigurationStore
services.Replace(ServiceDescriptor.Singleton());
// Register PostgresStreamConfigurationProvider as IStreamConfigurationProvider
services.Replace(ServiceDescriptor.Singleton());
return services;
}
// ========================================================================
// Phase 5: Schema Evolution & Versioning
// ========================================================================
/// <summary>
/// Registers PostgreSQL-based schema store for event versioning.
/// </summary>
/// <param name="services">The service collection.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> is <c>null</c>.
/// </exception>
/// <remarks>
/// <para>
/// This replaces the in-memory schema store with a PostgreSQL-backed implementation.
/// Schemas will be persisted across application restarts.
/// </para>
/// <para>
/// Prerequisites:
/// - Call AddSchemaEvolution() first to register core schema services
/// - Ensure PostgreSQL connection is configured via AddPostgresEventStreaming()
/// </para>
/// </remarks>
/// <example>
/// <code>
/// services.AddSchemaEvolution(); // Register core services
/// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;...");
/// services.AddPostgresSchemaStore(); // Use PostgreSQL for schema storage
/// </code>
/// </example>
public static IServiceCollection AddPostgresSchemaStore(this IServiceCollection services)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
// Replace in-memory schema store with PostgreSQL implementation
// Use factory to get connection string and schema name from options
services.Replace(ServiceDescriptor.Singleton(sp =>
{
// Both values are read from the resolved options instance, which is why the
// PostgreSQL options must have been configured before the store is resolved.
var options = sp.GetRequiredService>().Value;
var logger = sp.GetRequiredService>();
return new PostgresSchemaStore(options.ConnectionString, options.SchemaName, logger);
}));
return services;
}
// ========================================================================
// Phase 7: Advanced Features - Event Sourcing Projections
// ========================================================================
/// <summary>
/// Registers PostgreSQL-based projection checkpoint store.
/// </summary>
/// <param name="services">The service collection.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> is <c>null</c>.
/// </exception>
/// <remarks>
/// <para>
/// This replaces the in-memory projection checkpoint store with a PostgreSQL-backed implementation.
/// Projection progress will be persisted across application restarts.
/// </para>
/// <para>
/// Prerequisites:
/// - Call AddProjections() first to register core projection services
/// - Ensure PostgreSQL connection is configured via AddPostgresEventStreaming()
/// </para>
/// </remarks>
/// <example>
/// <code>
/// services.AddProjections(); // Register core projection services
/// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;...");
/// services.AddPostgresProjectionCheckpointStore(); // Use PostgreSQL for checkpoints
///
/// // Register a projection
/// services.AddProjection&lt;UserStatisticsProjection, UserRegisteredEvent&gt;(
///     projectionName: "user-statistics",
///     streamName: "user-events",
///     configure: options =>
///     {
///         options.BatchSize = 100;
///         options.AutoStart = true;
///     });
/// </code>
/// </example>
public static IServiceCollection AddPostgresProjectionCheckpointStore(this IServiceCollection services)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
// Replace in-memory checkpoint store with PostgreSQL implementation
services.Replace(ServiceDescriptor.Singleton());
return services;
}
// ========================================================================
// Phase 7: Advanced Features - Saga Orchestration
// ========================================================================
/// <summary>
/// Registers PostgreSQL-based saga state store.
/// </summary>
/// <param name="services">The service collection.</param>
/// <returns>The service collection for method chaining.</returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="services"/> is <c>null</c>.
/// </exception>
/// <remarks>
/// <para>
/// This replaces the in-memory saga state store with a PostgreSQL-backed implementation.
/// Saga state will be persisted across application restarts, enabling long-running workflows.
/// </para>
/// <para>
/// Prerequisites:
/// - Call AddSagaOrchestration() first to register core saga services
/// - Ensure PostgreSQL connection is configured via AddPostgresEventStreaming()
/// </para>
/// </remarks>
/// <example>
/// <code>
/// services.AddSagaOrchestration(useInMemoryStateStore: false); // Register core saga services
/// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;...");
/// services.AddPostgresSagaStateStore(); // Use PostgreSQL for saga state
///
/// // Register a saga
/// services.AddSaga&lt;OrderFulfillmentSaga&gt;(
///     sagaName: "order-fulfillment",
///     configure: definition =>
///     {
///         definition.AddStep("ReserveInventory", ReserveInventoryAsync, CompensateReserveInventoryAsync);
///         definition.AddStep("ProcessPayment", ProcessPaymentAsync, CompensateProcessPaymentAsync);
///         definition.AddStep("ShipOrder", ShipOrderAsync, CompensateShipOrderAsync);
///     });
/// </code>
/// </example>
public static IServiceCollection AddPostgresSagaStateStore(this IServiceCollection services)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
// Replace in-memory state store with PostgreSQL implementation
services.Replace(ServiceDescriptor.Singleton());
return services;
}
}