using System; using Svrnty.CQRS.Events.PostgreSQL.Replay; using Svrnty.CQRS.Events.PostgreSQL.Migration; using Svrnty.CQRS.Events.PostgreSQL.Retention; using Svrnty.CQRS.Events.PostgreSQL.Stores; using Svrnty.CQRS.Events.PostgreSQL.Configuration; using Svrnty.CQRS.Events.Abstractions.Streaming; using Svrnty.CQRS.Events.Abstractions.Replay; using Svrnty.CQRS.Events.Abstractions.Schema; using Svrnty.CQRS.Events.Abstractions.Storage; using Svrnty.CQRS.Events.Abstractions.EventStore; using Svrnty.CQRS.Events.Abstractions.Configuration; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Svrnty.CQRS.Events.Abstractions; using Svrnty.CQRS.Events.Abstractions.Projections; using Svrnty.CQRS.Events.Abstractions.Sagas; namespace Svrnty.CQRS.Events.PostgreSQL; /// /// Extension methods for registering PostgreSQL event streaming services. /// public static class ServiceCollectionExtensions { /// /// Registers PostgreSQL-based event stream storage. /// /// The service collection. /// Configuration action for PostgreSQL options. /// The service collection for method chaining. /// /// /// services.AddPostgresEventStreaming(options => /// { /// options.ConnectionString = "Host=localhost;Database=mydb;Username=user;Password=pass"; /// options.SchemaName = "event_streaming"; /// options.AutoMigrate = true; /// }); /// /// public static IServiceCollection AddPostgresEventStreaming( this IServiceCollection services, Action configure) { if (services == null) throw new ArgumentNullException(nameof(services)); if (configure == null) throw new ArgumentNullException(nameof(configure)); // Configure options services.Configure(configure); // Register PostgresEventStreamStore as IEventStreamStore services.Replace(ServiceDescriptor.Singleton()); // Phase 3.1: Register idempotency store for exactly-once delivery services.Replace(ServiceDescriptor.Singleton()); // Phase 3.3: Register read receipt store for consumer progress tracking services.Replace(ServiceDescriptor.Singleton()); // Register DatabaseMigrator services.AddSingleton(); // Run migrations on startup (if AutoMigrate is enabled) services.AddHostedService(); return services; } /// /// Registers PostgreSQL-based event stream storage with connection string. /// /// The service collection. /// PostgreSQL connection string. /// Optional additional configuration. /// The service collection for method chaining. /// /// /// services.AddPostgresEventStreaming( /// "Host=localhost;Database=mydb;Username=user;Password=pass"); /// /// public static IServiceCollection AddPostgresEventStreaming( this IServiceCollection services, string connectionString, Action? configure = null) { if (services == null) throw new ArgumentNullException(nameof(services)); if (string.IsNullOrWhiteSpace(connectionString)) throw new ArgumentException("Connection string cannot be null or whitespace.", nameof(connectionString)); return services.AddPostgresEventStreaming(options => { options.ConnectionString = connectionString; configure?.Invoke(options); }); } /// /// Registers PostgreSQL-based event stream storage from configuration. /// /// The service collection. /// Configuration section containing PostgreSQL options. /// The service collection for method chaining. /// /// Expects a configuration section with the following structure: /// /// { /// "ConnectionString": "Host=localhost;Database=mydb;...", /// "SchemaName": "event_streaming", /// "AutoMigrate": true /// } /// /// public static IServiceCollection AddPostgresEventStreaming( this IServiceCollection services, IConfiguration configuration) { if (services == null) throw new ArgumentNullException(nameof(services)); if (configuration == null) throw new ArgumentNullException(nameof(configuration)); services.Configure(configuration); // Register PostgresEventStreamStore as IEventStreamStore services.Replace(ServiceDescriptor.Singleton()); return services; } /// /// Registers PostgreSQL-based retention policy storage and background service. /// /// The service collection. /// Configuration action for retention service options. /// The service collection for method chaining. /// /// /// services.AddPostgresRetentionPolicies(options => /// { /// options.Enabled = true; /// options.CleanupInterval = TimeSpan.FromHours(1); /// options.CleanupWindowStart = TimeSpan.FromHours(2); /// options.CleanupWindowEnd = TimeSpan.FromHours(6); /// options.UseCleanupWindow = true; /// }); /// /// public static IServiceCollection AddPostgresRetentionPolicies( this IServiceCollection services, Action? configure = null) { if (services == null) throw new ArgumentNullException(nameof(services)); // Configure options if provided if (configure != null) { services.Configure(configure); } // Register PostgresRetentionPolicyStore as IRetentionPolicyStore services.Replace(ServiceDescriptor.Singleton()); // Register the background service services.AddHostedService(); return services; } /// /// Registers PostgreSQL-based retention policy storage and background service from configuration. /// /// The service collection. /// Configuration section containing retention service options. /// The service collection for method chaining. /// /// Expects a configuration section with the following structure: /// /// { /// "Enabled": true, /// "CleanupInterval": "01:00:00", /// "CleanupWindowStart": "02:00:00", /// "CleanupWindowEnd": "06:00:00", /// "UseCleanupWindow": true /// } /// /// public static IServiceCollection AddPostgresRetentionPolicies( this IServiceCollection services, IConfiguration configuration) { if (services == null) throw new ArgumentNullException(nameof(services)); if (configuration == null) throw new ArgumentNullException(nameof(configuration)); services.Configure(configuration); // Register PostgresRetentionPolicyStore as IRetentionPolicyStore services.Replace(ServiceDescriptor.Singleton()); // Register the background service services.AddHostedService(); return services; } /// /// Registers PostgreSQL-based event replay service. /// /// The service collection. /// The service collection for method chaining. /// /// /// services.AddPostgresEventReplay(); /// /// // Then use in your code: /// var replayService = serviceProvider.GetRequiredService<IEventReplayService>(); /// await foreach (var @event in replayService.ReplayFromOffsetAsync("orders", 1000)) /// { /// await ProcessEventAsync(@event); /// } /// /// public static IServiceCollection AddPostgresEventReplay(this IServiceCollection services) { if (services == null) throw new ArgumentNullException(nameof(services)); // Register PostgresEventReplayService as IEventReplayService services.Replace(ServiceDescriptor.Singleton()); return services; } /// /// Registers PostgreSQL-based stream configuration store and provider. /// /// The service collection. /// The service collection for method chaining. /// /// /// services.AddPostgresStreamConfiguration(); /// /// // Then use in your code: /// var configStore = serviceProvider.GetRequiredService<IStreamConfigurationStore>(); /// var config = new StreamConfiguration /// { /// StreamName = "orders", /// Retention = new RetentionConfiguration /// { /// MaxAge = TimeSpan.FromDays(90) /// } /// }; /// await configStore.SetConfigurationAsync(config); /// /// public static IServiceCollection AddPostgresStreamConfiguration(this IServiceCollection services) { if (services == null) throw new ArgumentNullException(nameof(services)); // Register PostgresStreamConfigurationStore as IStreamConfigurationStore services.Replace(ServiceDescriptor.Singleton()); // Register PostgresStreamConfigurationProvider as IStreamConfigurationProvider services.Replace(ServiceDescriptor.Singleton()); return services; } // ======================================================================== // Phase 5: Schema Evolution & Versioning // ======================================================================== /// /// Registers PostgreSQL-based schema store for event versioning. /// /// The service collection. /// The service collection for method chaining. /// /// /// This replaces the in-memory schema store with a PostgreSQL-backed implementation. /// Schemas will be persisted across application restarts. /// /// /// Prerequisites: /// - Call AddSchemaEvolution() first to register core schema services /// - Ensure PostgreSQL connection is configured via AddPostgresEventStreaming() /// /// /// /// /// services.AddSchemaEvolution(); // Register core services /// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;..."); /// services.AddPostgresSchemaStore(); // Use PostgreSQL for schema storage /// /// public static IServiceCollection AddPostgresSchemaStore(this IServiceCollection services) { if (services == null) throw new ArgumentNullException(nameof(services)); // Replace in-memory schema store with PostgreSQL implementation // Use factory to get connection string and schema name from options services.Replace(ServiceDescriptor.Singleton(sp => { var options = sp.GetRequiredService>().Value; var logger = sp.GetRequiredService>(); return new PostgresSchemaStore(options.ConnectionString, options.SchemaName, logger); })); return services; } // ======================================================================== // Phase 7: Advanced Features - Event Sourcing Projections // ======================================================================== /// /// Registers PostgreSQL-based projection checkpoint store. /// /// The service collection. /// The service collection for method chaining. /// /// /// This replaces the in-memory projection checkpoint store with a PostgreSQL-backed implementation. /// Projection progress will be persisted across application restarts. /// /// /// Prerequisites: /// - Call AddProjections() first to register core projection services /// - Ensure PostgreSQL connection is configured via AddPostgresEventStreaming() /// /// /// /// /// services.AddProjections(); // Register core projection services /// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;..."); /// services.AddPostgresProjectionCheckpointStore(); // Use PostgreSQL for checkpoints /// /// // Register a projection /// services.AddProjection<UserStatisticsProjection, UserRegisteredEvent>( /// projectionName: "user-statistics", /// streamName: "user-events", /// configure: options => /// { /// options.BatchSize = 100; /// options.AutoStart = true; /// }); /// /// public static IServiceCollection AddPostgresProjectionCheckpointStore(this IServiceCollection services) { if (services == null) throw new ArgumentNullException(nameof(services)); // Replace in-memory checkpoint store with PostgreSQL implementation services.Replace(ServiceDescriptor.Singleton()); return services; } // ======================================================================== // Phase 7: Advanced Features - Saga Orchestration // ======================================================================== /// /// Registers PostgreSQL-based saga state store. /// /// The service collection. /// The service collection for method chaining. /// /// /// This replaces the in-memory saga state store with a PostgreSQL-backed implementation. /// Saga state will be persisted across application restarts, enabling long-running workflows. /// /// /// Prerequisites: /// - Call AddSagaOrchestration() first to register core saga services /// - Ensure PostgreSQL connection is configured via AddPostgresEventStreaming() /// /// /// /// /// services.AddSagaOrchestration(useInMemoryStateStore: false); // Register core saga services /// services.AddPostgresEventStreaming("Host=localhost;Database=mydb;..."); /// services.AddPostgresSagaStateStore(); // Use PostgreSQL for saga state /// /// // Register a saga /// services.AddSaga<OrderFulfillmentSaga>( /// sagaName: "order-fulfillment", /// configure: definition => /// { /// definition.AddStep("ReserveInventory", ReserveInventoryAsync, CompensateReserveInventoryAsync); /// definition.AddStep("ProcessPayment", ProcessPaymentAsync, CompensateProcessPaymentAsync); /// definition.AddStep("ShipOrder", ShipOrderAsync, CompensateShipOrderAsync); /// }); /// /// public static IServiceCollection AddPostgresSagaStateStore(this IServiceCollection services) { if (services == null) throw new ArgumentNullException(nameof(services)); // Replace in-memory state store with PostgreSQL implementation services.Replace(ServiceDescriptor.Singleton()); return services; } }