using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Storage;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;

namespace Svrnty.CQRS.Events.PostgreSQL.Stores;

/// <summary>
/// PostgreSQL-based implementation of <see cref="IReadReceiptStore"/>.
/// </summary>
/// <remarks>
/// <para>
/// <strong>Persistence:</strong>
/// Stores read receipts in PostgreSQL for durability across restarts.
/// </para>
/// <para>
/// <strong>Distributed Support:</strong>
/// Safe for multiple application instances tracking the same stream.
/// </para>
/// </remarks>
public sealed class PostgresReadReceiptStore : IReadReceiptStore
{
    // NOTE(review): the generic type arguments in this file (IOptions<>, ILogger<>, Task<>,
    // List<>, GetFieldValue<>) were reconstructed from how the values are used; confirm them
    // against IReadReceiptStore and ConsumerProgress.

    private const string ReadReceiptsTableName = "read_receipts";

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresReadReceiptStore> _logger;

    /// <summary>
    /// Builds the double-quoted, schema-qualified identifier for <paramref name="tableName"/>
    /// using the configured schema name.
    /// </summary>
    private string SchemaQualifiedTable(string tableName) => $"\"{_options.SchemaName}\".\"{tableName}\"";

    /// <summary>
    /// Creates the store from the configured options and a logger.
    /// </summary>
    /// <param name="options">Options wrapper; its <c>Value</c> supplies the connection string, schema name and command timeout.</param>
    /// <param name="logger">Logger used for debug/information messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public PostgresReadReceiptStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresReadReceiptStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Records an acknowledgement for a consumer on a stream using a single upsert.
    /// </summary>
    /// <remarks>
    /// On first acknowledgement a row is inserted with <c>total_acknowledged = 1</c>. On conflict
    /// with an existing (consumer, stream) row, <c>last_event_id</c>, <c>last_offset</c> and
    /// <c>last_acknowledged_at</c> are only replaced when <paramref name="offset"/> is greater than
    /// the stored offset; <c>total_acknowledged</c> is incremented on every call.
    /// </remarks>
    /// <param name="consumerId">Consumer identifier; must not be null or whitespace.</param>
    /// <param name="streamName">Stream name; must not be null or whitespace.</param>
    /// <param name="eventId">Identifier of the acknowledged event; must not be null or whitespace.</param>
    /// <param name="offset">Offset of the acknowledged event.</param>
    /// <param name="acknowledgedAt">Timestamp stored for the acknowledgement.</param>
    /// <param name="cancellationToken">Token passed to the connection and command calls.</param>
    /// <exception cref="ArgumentException">A string argument is null or whitespace.</exception>
    public async Task AcknowledgeEventAsync(
        string consumerId,
        string streamName,
        string eventId,
        long offset,
        DateTimeOffset acknowledgedAt,
        CancellationToken cancellationToken = default)
    {
        RequireConsumerId(consumerId);
        RequireStreamName(streamName);
        if (string.IsNullOrWhiteSpace(eventId))
        {
            throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId));
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);

        // Use INSERT ... ON CONFLICT to handle both insert and update
        var table = SchemaQualifiedTable(ReadReceiptsTableName);
        var sql = $@"
            INSERT INTO {table}
                (consumer_id, stream_name, last_event_id, last_offset, last_acknowledged_at, first_acknowledged_at, total_acknowledged)
            VALUES
                (@ConsumerId, @StreamName, @EventId, @Offset, @AcknowledgedAt, @AcknowledgedAt, 1)
            ON CONFLICT (consumer_id, stream_name)
            DO UPDATE SET
                last_event_id = CASE WHEN @Offset > {table}.last_offset THEN @EventId ELSE {table}.last_event_id END,
                last_offset = CASE WHEN @Offset > {table}.last_offset THEN @Offset ELSE {table}.last_offset END,
                last_acknowledged_at = CASE WHEN @Offset > {table}.last_offset THEN @AcknowledgedAt ELSE {table}.last_acknowledged_at END,
                total_acknowledged = {table}.total_acknowledged + 1";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@StreamName", streamName);
        command.Parameters.AddWithValue("@EventId", eventId);
        command.Parameters.AddWithValue("@Offset", offset);
        command.Parameters.AddWithValue("@AcknowledgedAt", acknowledgedAt);

        await command.ExecuteNonQueryAsync(cancellationToken);

        _logger.LogDebug(
            "Acknowledged event {EventId} at offset {Offset} for consumer {ConsumerId} on stream {StreamName}",
            eventId, offset, consumerId, streamName);
    }

    /// <summary>
    /// Reads the stored <c>last_offset</c> for a consumer on a stream.
    /// </summary>
    /// <param name="consumerId">Consumer identifier; must not be null or whitespace.</param>
    /// <param name="streamName">Stream name; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Token passed to the connection and command calls.</param>
    /// <returns>
    /// The stored offset, or <c>null</c> when the scalar query yields no <see cref="long"/> value
    /// (for example, when no row matches).
    /// </returns>
    /// <exception cref="ArgumentException">A string argument is null or whitespace.</exception>
    public async Task<long?> GetLastAcknowledgedOffsetAsync(
        string consumerId,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        RequireConsumerId(consumerId);
        RequireStreamName(streamName);

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT last_offset
            FROM {SchemaQualifiedTable(ReadReceiptsTableName)}
            WHERE consumer_id = @ConsumerId AND stream_name = @StreamName";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@StreamName", streamName);

        var result = await command.ExecuteScalarAsync(cancellationToken);

        // `as long?` maps both "no row" (null) and DBNull to null.
        return result as long?;
    }

    /// <summary>
    /// Loads the progress row for a consumer on a stream.
    /// </summary>
    /// <param name="consumerId">Consumer identifier; must not be null or whitespace.</param>
    /// <param name="streamName">Stream name; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Token passed to the connection, command and reader calls.</param>
    /// <returns>A populated <see cref="ConsumerProgress"/>, or <c>null</c> when no row matches.</returns>
    /// <exception cref="ArgumentException">A string argument is null or whitespace.</exception>
    public async Task<ConsumerProgress?> GetConsumerProgressAsync(
        string consumerId,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        RequireConsumerId(consumerId);
        RequireStreamName(streamName);

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT consumer_id, stream_name, last_offset, last_acknowledged_at, total_acknowledged, first_acknowledged_at
            FROM {SchemaQualifiedTable(ReadReceiptsTableName)}
            WHERE consumer_id = @ConsumerId AND stream_name = @StreamName";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@StreamName", streamName);

        await using var reader = await command.ExecuteReaderAsync(cancellationToken);

        if (!await reader.ReadAsync(cancellationToken))
        {
            return null;
        }

        // Ordinals follow the SELECT column order above.
        // NOTE(review): timestamp columns are read as DateTimeOffset to mirror the type written
        // by AcknowledgeEventAsync — confirm against the ConsumerProgress property types.
        return new ConsumerProgress
        {
            ConsumerId = reader.GetString(0),
            StreamName = reader.GetString(1),
            LastOffset = reader.GetInt64(2),
            LastAcknowledgedAt = reader.GetFieldValue<DateTimeOffset>(3),
            TotalAcknowledged = reader.GetInt64(4),
            FirstAcknowledgedAt = reader.IsDBNull(5) ? null : reader.GetFieldValue<DateTimeOffset>(5)
        };
    }

    /// <summary>
    /// Lists the distinct consumer identifiers that have a receipt row for a stream,
    /// ordered by consumer identifier.
    /// </summary>
    /// <param name="streamName">Stream name; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Token passed to the connection, command and reader calls.</param>
    /// <returns>The consumer identifiers; empty when no row matches.</returns>
    /// <exception cref="ArgumentException"><paramref name="streamName"/> is null or whitespace.</exception>
    // NOTE(review): the declared return type lost its generic argument in this view of the file;
    // Task<IReadOnlyList<string>> is assumed — confirm against IReadReceiptStore.
    public async Task<IReadOnlyList<string>> GetConsumersForStreamAsync(
        string streamName,
        CancellationToken cancellationToken = default)
    {
        RequireStreamName(streamName);

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT DISTINCT consumer_id
            FROM {SchemaQualifiedTable(ReadReceiptsTableName)}
            WHERE stream_name = @StreamName
            ORDER BY consumer_id";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@StreamName", streamName);

        var consumers = new List<string>();
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);

        while (await reader.ReadAsync(cancellationToken))
        {
            consumers.Add(reader.GetString(0));
        }

        return consumers;
    }

    /// <summary>
    /// Deletes receipt rows whose <c>last_acknowledged_at</c> is earlier than <paramref name="olderThan"/>.
    /// </summary>
    /// <param name="olderThan">Exclusive cut-off; rows strictly older than this are removed.</param>
    /// <param name="cancellationToken">Token passed to the connection and command calls.</param>
    /// <returns>The number of rows reported as affected by the DELETE.</returns>
    public async Task<int> CleanupAsync(
        DateTimeOffset olderThan,
        CancellationToken cancellationToken = default)
    {
        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            DELETE FROM {SchemaQualifiedTable(ReadReceiptsTableName)}
            WHERE last_acknowledged_at < @OlderThan";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@OlderThan", olderThan);

        var deletedCount = await command.ExecuteNonQueryAsync(cancellationToken);

        // Only log when something was actually removed.
        if (deletedCount > 0)
        {
            _logger.LogInformation(
                "Cleaned up {DeletedCount} read receipt records older than {OlderThan}",
                deletedCount, olderThan);
        }

        return deletedCount;
    }

    /// <summary>
    /// Creates and opens a connection from the configured connection string; the connection is
    /// disposed if opening fails so the caller never receives a half-initialised instance.
    /// </summary>
    private async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken)
    {
        var connection = new NpgsqlConnection(_options.ConnectionString);
        try
        {
            await connection.OpenAsync(cancellationToken);
            return connection;
        }
        catch
        {
            await connection.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Creates a command with the configured command timeout so every query (reads included)
    /// honours the same setting.
    /// </summary>
    private NpgsqlCommand CreateCommand(string sql, NpgsqlConnection connection) =>
        new NpgsqlCommand(sql, connection) { CommandTimeout = _options.CommandTimeout };

    /// <summary>Throws <see cref="ArgumentException"/> when the consumer identifier is null or whitespace.</summary>
    private static void RequireConsumerId(string consumerId)
    {
        if (string.IsNullOrWhiteSpace(consumerId))
        {
            throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
        }
    }

    /// <summary>Throws <see cref="ArgumentException"/> when the stream name is null or whitespace.</summary>
    private static void RequireStreamName(string streamName)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
        }
    }
}