using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;

namespace Svrnty.CQRS.Events.PostgreSQL.Subscriptions;

// NOTE(review): the generic type arguments used below (IOptions<...>, ILogger<...>,
// Task<IReadOnlyList<PersistentSubscription>>, HashSet<string>, GetFieldValue<DateTimeOffset>)
// are inferred from how the values are used in this file. Confirm them against
// IPersistentSubscriptionStore and PersistentSubscription before merging.

/// <summary>
/// PostgreSQL implementation of subscription store.
/// Every operation opens its own <see cref="NpgsqlConnection"/> from the configured
/// connection string and works against the <c>persistent_subscriptions</c> table.
/// </summary>
public sealed class PostgresSubscriptionStore : IPersistentSubscriptionStore
{
    /// <summary>
    /// Shared SELECT/FROM clause. The column order is load-bearing: it must stay in sync
    /// with the <see cref="Ordinal"/> constants consumed by <see cref="MapSubscription"/>.
    /// </summary>
    private const string SelectSql = """
        SELECT id, subscriber_id, correlation_id, event_types, terminal_event_types,
               delivery_mode, created_at, expires_at, completed_at,
               last_delivered_sequence, status, connection_id, data_source_id
        FROM persistent_subscriptions
        """;

    // These properties are assigned through reflection in MapSubscription (the original
    // code notes they have private setters). Resolving the PropertyInfo once avoids
    // repeating the lookup for every row that is materialized.
    private static readonly PropertyInfo CompletedAtProperty =
        GetRequiredProperty(nameof(PersistentSubscription.CompletedAt));

    private static readonly PropertyInfo LastDeliveredSequenceProperty =
        GetRequiredProperty(nameof(PersistentSubscription.LastDeliveredSequence));

    private static readonly PropertyInfo StatusProperty =
        GetRequiredProperty(nameof(PersistentSubscription.Status));

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresSubscriptionStore> _logger;

    /// <summary>
    /// Creates the store.
    /// </summary>
    /// <param name="options">Options wrapper; its value supplies the connection string.</param>
    /// <param name="logger">Logger instance kept for this store.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public PostgresSubscriptionStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresSubscriptionStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Inserts a new row for <paramref name="subscription"/>.
    /// </summary>
    /// <param name="subscription">The subscription to insert.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The same <paramref name="subscription"/> instance that was passed in.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public async Task<PersistentSubscription> CreateAsync(
        PersistentSubscription subscription,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(subscription);

        const string sql = """
            INSERT INTO persistent_subscriptions (
                id, subscriber_id, correlation_id, event_types, terminal_event_types,
                delivery_mode, created_at, expires_at, last_delivered_sequence,
                status, connection_id, data_source_id
            ) VALUES (
                @Id, @SubscriberId, @CorrelationId, @EventTypes::jsonb, @TerminalEventTypes::jsonb,
                @DeliveryMode, @CreatedAt, @ExpiresAt, @LastDeliveredSequence,
                @Status, @ConnectionId, @DataSourceId
            )
            """;

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var cmd = new NpgsqlCommand(sql, connection);

        AddSharedParameters(cmd, subscription);
        // completed_at is not part of the INSERT; created_at is only written here.
        cmd.Parameters.AddWithValue("CreatedAt", subscription.CreatedAt);

        await cmd.ExecuteNonQueryAsync(cancellationToken);
        return subscription;
    }

    /// <summary>
    /// Loads the subscription whose <c>id</c> column equals <paramref name="id"/>.
    /// </summary>
    /// <param name="id">Subscription identifier.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The mapped subscription, or <c>null</c> when no row matches.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> is null.</exception>
    public async Task<PersistentSubscription?> GetByIdAsync(
        string id,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(id);

        const string sql = SelectSql + " WHERE id = @Id";

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var cmd = new NpgsqlCommand(sql, connection);
        cmd.Parameters.AddWithValue("Id", id);

        await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
        if (!await reader.ReadAsync(cancellationToken))
        {
            return null;
        }

        return MapSubscription(reader);
    }

    /// <summary>
    /// Lists subscriptions for a subscriber, newest <c>created_at</c> first.
    /// </summary>
    /// <param name="subscriberId">Value matched against <c>subscriber_id</c>.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions; empty when none match.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscriberId"/> is null.</exception>
    public async Task<IReadOnlyList<PersistentSubscription>> GetBySubscriberIdAsync(
        string subscriberId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(subscriberId);

        const string sql = SelectSql + " WHERE subscriber_id = @SubscriberId ORDER BY created_at DESC";
        return await QueryListAsync(sql, cancellationToken, ("SubscriberId", subscriberId));
    }

    /// <summary>
    /// Lists subscriptions for a correlation id, oldest <c>created_at</c> first.
    /// </summary>
    /// <param name="correlationId">Value matched against <c>correlation_id</c>.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions; empty when none match.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="correlationId"/> is null.</exception>
    public async Task<IReadOnlyList<PersistentSubscription>> GetByCorrelationIdAsync(
        string correlationId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(correlationId);

        const string sql = SelectSql + " WHERE correlation_id = @CorrelationId ORDER BY created_at ASC";
        return await QueryListAsync(sql, cancellationToken, ("CorrelationId", correlationId));
    }

    /// <summary>
    /// Lists subscriptions in the given status, newest <c>created_at</c> first.
    /// </summary>
    /// <param name="status">Status to filter on; stored and compared as its integer value.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions; empty when none match.</returns>
    public async Task<IReadOnlyList<PersistentSubscription>> GetByStatusAsync(
        SubscriptionStatus status,
        CancellationToken cancellationToken = default)
    {
        const string sql = SelectSql + " WHERE status = @Status ORDER BY created_at DESC";
        return await QueryListAsync(sql, cancellationToken, ("Status", (int)status));
    }

    /// <summary>
    /// Lists subscriptions bound to a connection id, oldest <c>created_at</c> first.
    /// </summary>
    /// <param name="connectionId">Value matched against <c>connection_id</c>.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions; empty when none match.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="connectionId"/> is null.</exception>
    public async Task<IReadOnlyList<PersistentSubscription>> GetByConnectionIdAsync(
        string connectionId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(connectionId);

        const string sql = SelectSql + " WHERE connection_id = @ConnectionId ORDER BY created_at ASC";
        return await QueryListAsync(sql, cancellationToken, ("ConnectionId", connectionId));
    }

    /// <summary>
    /// Overwrites every mutable column of the row identified by the subscription's id.
    /// <c>created_at</c> is not touched. No error is raised when no row matches.
    /// </summary>
    /// <param name="subscription">The subscription whose current state is written.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public async Task UpdateAsync(
        PersistentSubscription subscription,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(subscription);

        const string sql = """
            UPDATE persistent_subscriptions
            SET subscriber_id = @SubscriberId,
                correlation_id = @CorrelationId,
                event_types = @EventTypes::jsonb,
                terminal_event_types = @TerminalEventTypes::jsonb,
                delivery_mode = @DeliveryMode,
                expires_at = @ExpiresAt,
                completed_at = @CompletedAt,
                last_delivered_sequence = @LastDeliveredSequence,
                status = @Status,
                connection_id = @ConnectionId,
                data_source_id = @DataSourceId
            WHERE id = @Id
            """;

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var cmd = new NpgsqlCommand(sql, connection);

        AddSharedParameters(cmd, subscription);
        cmd.Parameters.AddWithValue("CompletedAt", (object?)subscription.CompletedAt ?? DBNull.Value);

        await cmd.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>
    /// Deletes the row whose <c>id</c> column equals <paramref name="id"/>, if any.
    /// </summary>
    /// <param name="id">Subscription identifier.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> is null.</exception>
    public async Task DeleteAsync(
        string id,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(id);

        const string sql = "DELETE FROM persistent_subscriptions WHERE id = @Id";

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var cmd = new NpgsqlCommand(sql, connection);
        cmd.Parameters.AddWithValue("Id", id);

        await cmd.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>
    /// Lists subscriptions whose <c>expires_at</c> is set and earlier than the database's
    /// <c>NOW()</c>, restricted to rows with <c>status = 0</c>, earliest expiry first.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions; empty when none match.</returns>
    public async Task<IReadOnlyList<PersistentSubscription>> GetExpiredSubscriptionsAsync(
        CancellationToken cancellationToken = default)
    {
        // NOTE(review): status = 0 is presumably the "active" SubscriptionStatus member;
        // confirm against the enum definition before replacing the literal with a parameter.
        const string sql = SelectSql
            + " WHERE expires_at IS NOT NULL AND expires_at < NOW() AND status = 0"
            + " ORDER BY expires_at ASC";

        return await QueryListAsync(sql, cancellationToken);
    }

    /// <summary>Creates a connection from the configured connection string and opens it.</summary>
    private async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken)
    {
        var connection = new NpgsqlConnection(_options.ConnectionString);
        try
        {
            await connection.OpenAsync(cancellationToken);
            return connection;
        }
        catch
        {
            // The caller never receives the connection on failure, so dispose it here.
            await connection.DisposeAsync();
            throw;
        }
    }

    /// <summary>Runs a SELECT built on <see cref="SelectSql"/> and maps every returned row.</summary>
    private async Task<List<PersistentSubscription>> QueryListAsync(
        string sql,
        CancellationToken cancellationToken,
        params (string Name, object Value)[] parameters)
    {
        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var cmd = new NpgsqlCommand(sql, connection);

        foreach (var (name, value) in parameters)
        {
            cmd.Parameters.AddWithValue(name, value);
        }

        await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

        var subscriptions = new List<PersistentSubscription>();
        while (await reader.ReadAsync(cancellationToken))
        {
            subscriptions.Add(MapSubscription(reader));
        }

        return subscriptions;
    }

    /// <summary>Binds the parameters that the INSERT and UPDATE statements have in common.</summary>
    private static void AddSharedParameters(NpgsqlCommand cmd, PersistentSubscription subscription)
    {
        var parameters = cmd.Parameters;

        parameters.AddWithValue("Id", subscription.Id);
        parameters.AddWithValue("SubscriberId", subscription.SubscriberId);
        parameters.AddWithValue("CorrelationId", subscription.CorrelationId);
        // The event type sets are sent as JSON text and cast to jsonb inside the SQL.
        parameters.AddWithValue("EventTypes", JsonSerializer.Serialize(subscription.EventTypes));
        parameters.AddWithValue("TerminalEventTypes", JsonSerializer.Serialize(subscription.TerminalEventTypes));
        // Enums are stored as their underlying integer values.
        parameters.AddWithValue("DeliveryMode", (int)subscription.DeliveryMode);
        parameters.AddWithValue("ExpiresAt", (object?)subscription.ExpiresAt ?? DBNull.Value);
        parameters.AddWithValue("LastDeliveredSequence", subscription.LastDeliveredSequence);
        parameters.AddWithValue("Status", (int)subscription.Status);
        parameters.AddWithValue("ConnectionId", subscription.ConnectionId ?? (object)DBNull.Value);
        parameters.AddWithValue("DataSourceId", subscription.DataSourceId ?? (object)DBNull.Value);
    }

    /// <summary>
    /// Builds a <see cref="PersistentSubscription"/> from the reader's current row, which
    /// must have been produced by a query using <see cref="SelectSql"/>.
    /// </summary>
    private static PersistentSubscription MapSubscription(NpgsqlDataReader reader)
    {
        var eventTypesJson = reader.GetString(Ordinal.EventTypes);
        var terminalEventTypesJson = reader.GetString(Ordinal.TerminalEventTypes);

        var subscription = new PersistentSubscription
        {
            Id = reader.GetString(Ordinal.Id),
            SubscriberId = reader.GetString(Ordinal.SubscriberId),
            CorrelationId = reader.GetString(Ordinal.CorrelationId),
            EventTypes = JsonSerializer.Deserialize<HashSet<string>>(eventTypesJson)
                ?? new HashSet<string>(),
            TerminalEventTypes = JsonSerializer.Deserialize<HashSet<string>>(terminalEventTypesJson)
                ?? new HashSet<string>(),
            DeliveryMode = (DeliveryMode)reader.GetInt32(Ordinal.DeliveryMode),
            CreatedAt = reader.GetFieldValue<DateTimeOffset>(Ordinal.CreatedAt),
            ExpiresAt = reader.IsDBNull(Ordinal.ExpiresAt)
                ? null
                : reader.GetFieldValue<DateTimeOffset>(Ordinal.ExpiresAt),
            ConnectionId = reader.IsDBNull(Ordinal.ConnectionId)
                ? null
                : reader.GetString(Ordinal.ConnectionId),
            DataSourceId = reader.IsDBNull(Ordinal.DataSourceId)
                ? null
                : reader.GetString(Ordinal.DataSourceId)
        };

        var completedAt = reader.IsDBNull(Ordinal.CompletedAt)
            ? (DateTimeOffset?)null
            : reader.GetFieldValue<DateTimeOffset>(Ordinal.CompletedAt);
        var lastDeliveredSequence = reader.GetInt64(Ordinal.LastDeliveredSequence);
        var status = (SubscriptionStatus)reader.GetInt32(Ordinal.Status);

        // Use reflection to set private setters (PropertyInfo instances are cached above).
        CompletedAtProperty.SetValue(subscription, completedAt);
        LastDeliveredSequenceProperty.SetValue(subscription, lastDeliveredSequence);
        StatusProperty.SetValue(subscription, status);

        return subscription;
    }

    /// <summary>Looks up a public instance property of <see cref="PersistentSubscription"/> by name.</summary>
    private static PropertyInfo GetRequiredProperty(string name) =>
        typeof(PersistentSubscription).GetProperty(name)
        ?? throw new InvalidOperationException(
            $"Property '{name}' was not found on {nameof(PersistentSubscription)}.");

    /// <summary>Zero-based column positions of <see cref="SelectSql"/>.</summary>
    private static class Ordinal
    {
        public const int Id = 0;
        public const int SubscriberId = 1;
        public const int CorrelationId = 2;
        public const int EventTypes = 3;
        public const int TerminalEventTypes = 4;
        public const int DeliveryMode = 5;
        public const int CreatedAt = 6;
        public const int ExpiresAt = 7;
        public const int CompletedAt = 8;
        public const int LastDeliveredSequence = 9;
        public const int Status = 10;
        public const int ConnectionId = 11;
        public const int DataSourceId = 12;
    }
}