dotnet-cqrs/Svrnty.CQRS.Events.PostgreSQL/Stores/PostgresProjectionCheckpointStore.cs

241 lines
10 KiB
C#

using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions.Projections;
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
/// <summary>
/// PostgreSQL implementation of projection checkpoint storage.
/// </summary>
/// <remarks>
/// <para>
/// Stores projection checkpoints in the <c>projection_checkpoints</c> table.
/// Thread-safe for concurrent checkpoint updates from multiple projection instances.
/// </para>
/// <para>
/// <strong>Concurrency:</strong> Uses PostgreSQL's UPSERT (INSERT ... ON CONFLICT) for atomic updates.
/// </para>
/// </remarks>
public sealed class PostgresProjectionCheckpointStore : IProjectionCheckpointStore
{
private readonly PostgresEventStreamStoreOptions _options;
private readonly ILogger<PostgresProjectionCheckpointStore> _logger;
public PostgresProjectionCheckpointStore(
IOptions<PostgresEventStreamStoreOptions> options,
ILogger<PostgresProjectionCheckpointStore> logger)
{
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc />
public async Task<ProjectionCheckpoint?> GetCheckpointAsync(
string projectionName,
string streamName,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(projectionName))
throw new ArgumentException("Projection name cannot be null or empty", nameof(projectionName));
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or empty", nameof(streamName));
try
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT projection_name, stream_name, last_processed_offset,
last_updated, events_processed, last_error, last_error_at
FROM {SchemaQualified("projection_checkpoints")}
WHERE projection_name = @projection_name AND stream_name = @stream_name;";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("@projection_name", projectionName);
command.Parameters.AddWithValue("@stream_name", streamName);
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
if (await reader.ReadAsync(cancellationToken))
{
return new ProjectionCheckpoint
{
ProjectionName = reader.GetString(0),
StreamName = reader.GetString(1),
LastProcessedOffset = reader.GetInt64(2),
LastUpdated = reader.GetFieldValue<DateTimeOffset>(3),
EventsProcessed = reader.GetInt64(4),
LastError = reader.IsDBNull(5) ? null : reader.GetString(5),
LastErrorAt = reader.IsDBNull(6) ? null : reader.GetFieldValue<DateTimeOffset>(6)
};
}
return null;
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to get checkpoint for projection {ProjectionName} on stream {StreamName}",
projectionName, streamName);
throw;
}
}
/// <inheritdoc />
public async Task SaveCheckpointAsync(
ProjectionCheckpoint checkpoint,
CancellationToken cancellationToken = default)
{
if (checkpoint == null)
throw new ArgumentNullException(nameof(checkpoint));
try
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
INSERT INTO {SchemaQualified("projection_checkpoints")}
(projection_name, stream_name, last_processed_offset, last_updated,
events_processed, last_error, last_error_at)
VALUES
(@projection_name, @stream_name, @last_processed_offset, @last_updated,
@events_processed, @last_error, @last_error_at)
ON CONFLICT (projection_name, stream_name)
DO UPDATE SET
last_processed_offset = EXCLUDED.last_processed_offset,
last_updated = EXCLUDED.last_updated,
events_processed = EXCLUDED.events_processed,
last_error = EXCLUDED.last_error,
last_error_at = EXCLUDED.last_error_at;";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("@projection_name", checkpoint.ProjectionName);
command.Parameters.AddWithValue("@stream_name", checkpoint.StreamName);
command.Parameters.AddWithValue("@last_processed_offset", checkpoint.LastProcessedOffset);
command.Parameters.AddWithValue("@last_updated", DateTimeOffset.UtcNow);
command.Parameters.AddWithValue("@events_processed", checkpoint.EventsProcessed);
command.Parameters.AddWithValue("@last_error", (object?)checkpoint.LastError ?? DBNull.Value);
command.Parameters.AddWithValue("@last_error_at", (object?)checkpoint.LastErrorAt ?? DBNull.Value);
await command.ExecuteNonQueryAsync(cancellationToken);
_logger.LogDebug(
"Saved checkpoint for projection {ProjectionName} on stream {StreamName} at offset {Offset}",
checkpoint.ProjectionName, checkpoint.StreamName, checkpoint.LastProcessedOffset);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to save checkpoint for projection {ProjectionName} on stream {StreamName}",
checkpoint.ProjectionName, checkpoint.StreamName);
throw;
}
}
/// <inheritdoc />
public async Task ResetCheckpointAsync(
string projectionName,
string streamName,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(projectionName))
throw new ArgumentException("Projection name cannot be null or empty", nameof(projectionName));
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or empty", nameof(streamName));
try
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
DELETE FROM {SchemaQualified("projection_checkpoints")}
WHERE projection_name = @projection_name AND stream_name = @stream_name;";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("@projection_name", projectionName);
command.Parameters.AddWithValue("@stream_name", streamName);
var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);
_logger.LogInformation(
"Reset checkpoint for projection {ProjectionName} on stream {StreamName} (rows affected: {RowsAffected})",
projectionName, streamName, rowsAffected);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to reset checkpoint for projection {ProjectionName} on stream {StreamName}",
projectionName, streamName);
throw;
}
}
/// <inheritdoc />
public async Task<ProjectionCheckpoint[]> GetAllCheckpointsAsync(
string projectionName,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(projectionName))
throw new ArgumentException("Projection name cannot be null or empty", nameof(projectionName));
try
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT projection_name, stream_name, last_processed_offset,
last_updated, events_processed, last_error, last_error_at
FROM {SchemaQualified("projection_checkpoints")}
WHERE projection_name = @projection_name
ORDER BY stream_name;";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("@projection_name", projectionName);
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
var checkpoints = new System.Collections.Generic.List<ProjectionCheckpoint>();
while (await reader.ReadAsync(cancellationToken))
{
checkpoints.Add(new ProjectionCheckpoint
{
ProjectionName = reader.GetString(0),
StreamName = reader.GetString(1),
LastProcessedOffset = reader.GetInt64(2),
LastUpdated = reader.GetFieldValue<DateTimeOffset>(3),
EventsProcessed = reader.GetInt64(4),
LastError = reader.IsDBNull(5) ? null : reader.GetString(5),
LastErrorAt = reader.IsDBNull(6) ? null : reader.GetFieldValue<DateTimeOffset>(6)
});
}
return checkpoints.ToArray();
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to get all checkpoints for projection {ProjectionName}",
projectionName);
throw;
}
}
private string SchemaQualified(string tableName)
{
return $"\"{_options.SchemaName}\".\"{tableName}\"";
}
}