using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions.Projections;

namespace Svrnty.CQRS.Events.PostgreSQL.Stores;

/// <summary>
/// PostgreSQL implementation of projection checkpoint storage.
/// </summary>
/// <remarks>
/// <para>
/// Stores projection checkpoints in the <c>projection_checkpoints</c> table of the configured schema.
/// Every operation opens its own connection, so the store itself keeps no per-call state.
/// </para>
/// <para>
/// Concurrency: saves use PostgreSQL's UPSERT (INSERT ... ON CONFLICT) so that insert-or-update
/// happens in a single statement.
/// </para>
/// </remarks>
public sealed class PostgresProjectionCheckpointStore : IProjectionCheckpointStore
{
    private const string CheckpointsTable = "projection_checkpoints";

    // Column order is shared by every SELECT and must match the ordinals used in ReadCheckpoint.
    private const string SelectColumns =
        "projection_name, stream_name, last_processed_offset, last_updated, events_processed, last_error, last_error_at";

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresProjectionCheckpointStore> _logger;

    /// <summary>
    /// Creates a checkpoint store that reads its connection string and schema name from <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Options wrapper; its <c>Value</c> supplies the connection string and schema name.</param>
    /// <param name="logger">Logger used for debug, information and error messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public PostgresProjectionCheckpointStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresProjectionCheckpointStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Loads the checkpoint stored for one projection/stream pair.
    /// </summary>
    /// <param name="projectionName">Name of the projection; must not be null, empty or whitespace.</param>
    /// <param name="streamName">Name of the stream; must not be null, empty or whitespace.</param>
    /// <param name="cancellationToken">Token forwarded to every database call.</param>
    /// <returns>The stored checkpoint, or <c>null</c> when no row matches.</returns>
    /// <exception cref="ArgumentException">A name is null, empty or whitespace.</exception>
    public async Task<ProjectionCheckpoint?> GetCheckpointAsync(
        string projectionName,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(projectionName))
            throw new ArgumentException("Projection name cannot be null or empty", nameof(projectionName));
        if (string.IsNullOrWhiteSpace(streamName))
            throw new ArgumentException("Stream name cannot be null or empty", nameof(streamName));

        try
        {
            await using var connection = new NpgsqlConnection(_options.ConnectionString);
            await connection.OpenAsync(cancellationToken);

            var sql = $@"
                SELECT {SelectColumns}
                FROM {SchemaQualified(CheckpointsTable)}
                WHERE projection_name = @projection_name AND stream_name = @stream_name;";

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("@projection_name", projectionName);
            command.Parameters.AddWithValue("@stream_name", streamName);

            await using var reader = await command.ExecuteReaderAsync(cancellationToken);

            if (await reader.ReadAsync(cancellationToken))
            {
                return ReadCheckpoint(reader);
            }

            return null;
        }
        // Cancellation is a caller decision, not a storage failure: let it propagate without an error log.
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Failed to get checkpoint for projection {ProjectionName} on stream {StreamName}",
                projectionName, streamName);
            throw;
        }
    }

    /// <summary>
    /// Inserts the checkpoint, or overwrites the existing row with the same projection and stream name.
    /// </summary>
    /// <param name="checkpoint">Checkpoint values to persist.</param>
    /// <param name="cancellationToken">Token forwarded to every database call.</param>
    /// <returns>A task that completes when the row has been written.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="checkpoint"/> is <c>null</c>.</exception>
    public async Task SaveCheckpointAsync(
        ProjectionCheckpoint checkpoint,
        CancellationToken cancellationToken = default)
    {
        if (checkpoint is null)
            throw new ArgumentNullException(nameof(checkpoint));

        try
        {
            await using var connection = new NpgsqlConnection(_options.ConnectionString);
            await connection.OpenAsync(cancellationToken);

            var sql = $@"
                INSERT INTO {SchemaQualified(CheckpointsTable)}
                    (projection_name, stream_name, last_processed_offset, last_updated, events_processed, last_error, last_error_at)
                VALUES
                    (@projection_name, @stream_name, @last_processed_offset, @last_updated, @events_processed, @last_error, @last_error_at)
                ON CONFLICT (projection_name, stream_name)
                DO UPDATE SET
                    last_processed_offset = EXCLUDED.last_processed_offset,
                    last_updated = EXCLUDED.last_updated,
                    events_processed = EXCLUDED.events_processed,
                    last_error = EXCLUDED.last_error,
                    last_error_at = EXCLUDED.last_error_at;";

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("@projection_name", checkpoint.ProjectionName);
            command.Parameters.AddWithValue("@stream_name", checkpoint.StreamName);
            command.Parameters.AddWithValue("@last_processed_offset", checkpoint.LastProcessedOffset);
            // The stored timestamp is the moment of the save; checkpoint.LastUpdated is not read here.
            command.Parameters.AddWithValue("@last_updated", DateTimeOffset.UtcNow);
            command.Parameters.AddWithValue("@events_processed", checkpoint.EventsProcessed);
            // Null CLR values must be sent as DBNull so the columns are written as SQL NULL.
            command.Parameters.AddWithValue("@last_error", (object?)checkpoint.LastError ?? DBNull.Value);
            command.Parameters.AddWithValue("@last_error_at", (object?)checkpoint.LastErrorAt ?? DBNull.Value);

            await command.ExecuteNonQueryAsync(cancellationToken);

            _logger.LogDebug(
                "Saved checkpoint for projection {ProjectionName} on stream {StreamName} at offset {Offset}",
                checkpoint.ProjectionName, checkpoint.StreamName, checkpoint.LastProcessedOffset);
        }
        // Cancellation is a caller decision, not a storage failure: let it propagate without an error log.
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Failed to save checkpoint for projection {ProjectionName} on stream {StreamName}",
                checkpoint.ProjectionName, checkpoint.StreamName);
            throw;
        }
    }

    /// <summary>
    /// Deletes the checkpoint row for one projection/stream pair, if it exists.
    /// </summary>
    /// <param name="projectionName">Name of the projection; must not be null, empty or whitespace.</param>
    /// <param name="streamName">Name of the stream; must not be null, empty or whitespace.</param>
    /// <param name="cancellationToken">Token forwarded to every database call.</param>
    /// <returns>A task that completes when the delete has run; the affected row count is only logged.</returns>
    /// <exception cref="ArgumentException">A name is null, empty or whitespace.</exception>
    public async Task ResetCheckpointAsync(
        string projectionName,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(projectionName))
            throw new ArgumentException("Projection name cannot be null or empty", nameof(projectionName));
        if (string.IsNullOrWhiteSpace(streamName))
            throw new ArgumentException("Stream name cannot be null or empty", nameof(streamName));

        try
        {
            await using var connection = new NpgsqlConnection(_options.ConnectionString);
            await connection.OpenAsync(cancellationToken);

            var sql = $@"
                DELETE FROM {SchemaQualified(CheckpointsTable)}
                WHERE projection_name = @projection_name AND stream_name = @stream_name;";

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("@projection_name", projectionName);
            command.Parameters.AddWithValue("@stream_name", streamName);

            var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);

            _logger.LogInformation(
                "Reset checkpoint for projection {ProjectionName} on stream {StreamName} (rows affected: {RowsAffected})",
                projectionName, streamName, rowsAffected);
        }
        // Cancellation is a caller decision, not a storage failure: let it propagate without an error log.
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Failed to reset checkpoint for projection {ProjectionName} on stream {StreamName}",
                projectionName, streamName);
            throw;
        }
    }

    /// <summary>
    /// Loads every checkpoint stored for a projection, ordered by stream name.
    /// </summary>
    /// <param name="projectionName">Name of the projection; must not be null, empty or whitespace.</param>
    /// <param name="cancellationToken">Token forwarded to every database call.</param>
    /// <returns>The matching checkpoints; an empty array when the projection has none.</returns>
    /// <exception cref="ArgumentException"><paramref name="projectionName"/> is null, empty or whitespace.</exception>
    public async Task<ProjectionCheckpoint[]> GetAllCheckpointsAsync(
        string projectionName,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(projectionName))
            throw new ArgumentException("Projection name cannot be null or empty", nameof(projectionName));

        try
        {
            await using var connection = new NpgsqlConnection(_options.ConnectionString);
            await connection.OpenAsync(cancellationToken);

            var sql = $@"
                SELECT {SelectColumns}
                FROM {SchemaQualified(CheckpointsTable)}
                WHERE projection_name = @projection_name
                ORDER BY stream_name;";

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("@projection_name", projectionName);

            await using var reader = await command.ExecuteReaderAsync(cancellationToken);

            var checkpoints = new System.Collections.Generic.List<ProjectionCheckpoint>();
            while (await reader.ReadAsync(cancellationToken))
            {
                checkpoints.Add(ReadCheckpoint(reader));
            }

            return checkpoints.ToArray();
        }
        // Cancellation is a caller decision, not a storage failure: let it propagate without an error log.
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Failed to get all checkpoints for projection {ProjectionName}", projectionName);
            throw;
        }
    }

    /// <summary>
    /// Maps the reader's current row to a <see cref="ProjectionCheckpoint"/>.
    /// Ordinals follow the column order of <see cref="SelectColumns"/>.
    /// </summary>
    private static ProjectionCheckpoint ReadCheckpoint(NpgsqlDataReader reader)
    {
        return new ProjectionCheckpoint
        {
            ProjectionName = reader.GetString(0),
            StreamName = reader.GetString(1),
            LastProcessedOffset = reader.GetInt64(2),
            LastUpdated = reader.GetFieldValue<DateTimeOffset>(3),
            EventsProcessed = reader.GetInt64(4),
            LastError = reader.IsDBNull(5) ? null : reader.GetString(5),
            LastErrorAt = reader.IsDBNull(6) ? null : reader.GetFieldValue<DateTimeOffset>(6)
        };
    }

    /// <summary>
    /// Builds the double-quoted <c>"schema"."table"</c> identifier for the configured schema.
    /// </summary>
    private string SchemaQualified(string tableName)
    {
        // Inside a quoted identifier a literal double quote is written as two double quotes;
        // doubling it keeps an unusual schema name from terminating the identifier early.
        var schema = _options.SchemaName.Replace("\"", "\"\"");
        return $"\"{schema}\".\"{tableName}\"";
    }
}