241 lines
10 KiB
C#
241 lines
10 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Npgsql;
|
|
using Svrnty.CQRS.Events.Abstractions.Projections;
|
|
|
|
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
|
|
|
|
/// <summary>
/// PostgreSQL implementation of projection checkpoint storage.
/// </summary>
/// <remarks>
/// <para>
/// Stores projection checkpoints in the <c>projection_checkpoints</c> table of the configured schema.
/// Thread-safe for concurrent checkpoint updates from multiple projection instances:
/// every operation opens its own <see cref="NpgsqlConnection"/>, so no connection state is shared between calls.
/// </para>
/// <para>
/// <strong>Concurrency:</strong> Uses PostgreSQL's UPSERT (INSERT ... ON CONFLICT) for atomic updates.
/// </para>
/// <para>
/// <strong>Errors:</strong> Failures are logged and rethrown unchanged. Cancellation
/// (<see cref="OperationCanceledException"/>) is propagated without being logged as an error.
/// </para>
/// </remarks>
public sealed class PostgresProjectionCheckpointStore : IProjectionCheckpointStore
{
    private const string CheckpointsTable = "projection_checkpoints";

    // Column order must stay in sync with the ordinals used by ReadCheckpoint.
    private const string CheckpointColumns =
        "projection_name, stream_name, last_processed_offset, " +
        "last_updated, events_processed, last_error, last_error_at";

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresProjectionCheckpointStore> _logger;

    /// <summary>
    /// Creates a checkpoint store that connects using the supplied options.
    /// </summary>
    /// <param name="options">Options providing the connection string and schema name.</param>
    /// <param name="logger">Logger used for diagnostics and failure reporting.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public PostgresProjectionCheckpointStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresProjectionCheckpointStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    /// <remarks>Returns <c>null</c> when no row exists for the projection/stream pair.</remarks>
    public async Task<ProjectionCheckpoint?> GetCheckpointAsync(
        string projectionName,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        EnsureNotBlank(projectionName, "Projection name cannot be null or empty", nameof(projectionName));
        EnsureNotBlank(streamName, "Stream name cannot be null or empty", nameof(streamName));

        try
        {
            await using var connection = new NpgsqlConnection(_options.ConnectionString);
            await connection.OpenAsync(cancellationToken);

            var sql = $@"
                SELECT {CheckpointColumns}
                FROM {SchemaQualified(CheckpointsTable)}
                WHERE projection_name = @projection_name AND stream_name = @stream_name;";

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("@projection_name", projectionName);
            command.Parameters.AddWithValue("@stream_name", streamName);

            await using var reader = await command.ExecuteReaderAsync(cancellationToken);

            return await reader.ReadAsync(cancellationToken)
                ? ReadCheckpoint(reader)
                : null;
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex,
                "Failed to get checkpoint for projection {ProjectionName} on stream {StreamName}",
                projectionName, streamName);
            throw;
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// The stored <c>last_updated</c> value is always the current UTC time at the moment of the call;
    /// the checkpoint's own <c>LastUpdated</c> value is not written.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="checkpoint"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentException">
    /// The checkpoint's projection name or stream name is null, empty or whitespace.
    /// </exception>
    public async Task SaveCheckpointAsync(
        ProjectionCheckpoint checkpoint,
        CancellationToken cancellationToken = default)
    {
        if (checkpoint == null)
        {
            throw new ArgumentNullException(nameof(checkpoint));
        }

        // The read/reset methods reject blank names, so a row saved with a blank key could
        // never be retrieved or reset again. Reject it up front instead.
        EnsureNotBlank(checkpoint.ProjectionName, "Checkpoint projection name cannot be null or empty", nameof(checkpoint));
        EnsureNotBlank(checkpoint.StreamName, "Checkpoint stream name cannot be null or empty", nameof(checkpoint));

        try
        {
            await using var connection = new NpgsqlConnection(_options.ConnectionString);
            await connection.OpenAsync(cancellationToken);

            var sql = $@"
                INSERT INTO {SchemaQualified(CheckpointsTable)}
                    (projection_name, stream_name, last_processed_offset, last_updated,
                     events_processed, last_error, last_error_at)
                VALUES
                    (@projection_name, @stream_name, @last_processed_offset, @last_updated,
                     @events_processed, @last_error, @last_error_at)
                ON CONFLICT (projection_name, stream_name)
                DO UPDATE SET
                    last_processed_offset = EXCLUDED.last_processed_offset,
                    last_updated = EXCLUDED.last_updated,
                    events_processed = EXCLUDED.events_processed,
                    last_error = EXCLUDED.last_error,
                    last_error_at = EXCLUDED.last_error_at;";

            // Normalise to UTC: recent Npgsql versions only accept a DateTimeOffset with offset 0
            // for "timestamp with time zone" parameters. The instant in time is unchanged.
            // NOTE(review): assumes last_error_at is a timestamptz column - confirm against the schema.
            DateTimeOffset? lastErrorAtUtc = checkpoint.LastErrorAt?.ToUniversalTime();

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("@projection_name", checkpoint.ProjectionName);
            command.Parameters.AddWithValue("@stream_name", checkpoint.StreamName);
            command.Parameters.AddWithValue("@last_processed_offset", checkpoint.LastProcessedOffset);
            command.Parameters.AddWithValue("@last_updated", DateTimeOffset.UtcNow);
            command.Parameters.AddWithValue("@events_processed", checkpoint.EventsProcessed);
            command.Parameters.AddWithValue("@last_error", (object?)checkpoint.LastError ?? DBNull.Value);
            command.Parameters.AddWithValue("@last_error_at", (object?)lastErrorAtUtc ?? DBNull.Value);

            await command.ExecuteNonQueryAsync(cancellationToken);

            _logger.LogDebug(
                "Saved checkpoint for projection {ProjectionName} on stream {StreamName} at offset {Offset}",
                checkpoint.ProjectionName, checkpoint.StreamName, checkpoint.LastProcessedOffset);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex,
                "Failed to save checkpoint for projection {ProjectionName} on stream {StreamName}",
                checkpoint.ProjectionName, checkpoint.StreamName);
            throw;
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Deletes the checkpoint row. Succeeds (with zero rows affected) when no checkpoint exists.
    /// </remarks>
    public async Task ResetCheckpointAsync(
        string projectionName,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        EnsureNotBlank(projectionName, "Projection name cannot be null or empty", nameof(projectionName));
        EnsureNotBlank(streamName, "Stream name cannot be null or empty", nameof(streamName));

        try
        {
            await using var connection = new NpgsqlConnection(_options.ConnectionString);
            await connection.OpenAsync(cancellationToken);

            var sql = $@"
                DELETE FROM {SchemaQualified(CheckpointsTable)}
                WHERE projection_name = @projection_name AND stream_name = @stream_name;";

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("@projection_name", projectionName);
            command.Parameters.AddWithValue("@stream_name", streamName);

            var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);

            _logger.LogInformation(
                "Reset checkpoint for projection {ProjectionName} on stream {StreamName} (rows affected: {RowsAffected})",
                projectionName, streamName, rowsAffected);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex,
                "Failed to reset checkpoint for projection {ProjectionName} on stream {StreamName}",
                projectionName, streamName);
            throw;
        }
    }

    /// <inheritdoc />
    /// <remarks>Results are ordered by stream name; an empty array is returned when there are none.</remarks>
    public async Task<ProjectionCheckpoint[]> GetAllCheckpointsAsync(
        string projectionName,
        CancellationToken cancellationToken = default)
    {
        EnsureNotBlank(projectionName, "Projection name cannot be null or empty", nameof(projectionName));

        try
        {
            await using var connection = new NpgsqlConnection(_options.ConnectionString);
            await connection.OpenAsync(cancellationToken);

            var sql = $@"
                SELECT {CheckpointColumns}
                FROM {SchemaQualified(CheckpointsTable)}
                WHERE projection_name = @projection_name
                ORDER BY stream_name;";

            await using var command = new NpgsqlCommand(sql, connection);
            command.Parameters.AddWithValue("@projection_name", projectionName);

            await using var reader = await command.ExecuteReaderAsync(cancellationToken);

            var checkpoints = new System.Collections.Generic.List<ProjectionCheckpoint>();

            while (await reader.ReadAsync(cancellationToken))
            {
                checkpoints.Add(ReadCheckpoint(reader));
            }

            return checkpoints.ToArray();
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex,
                "Failed to get all checkpoints for projection {ProjectionName}",
                projectionName);
            throw;
        }
    }

    /// <summary>
    /// Materialises the current row of <paramref name="reader"/> into a <see cref="ProjectionCheckpoint"/>.
    /// The reader must be positioned on a row selected with <see cref="CheckpointColumns"/>.
    /// </summary>
    private static ProjectionCheckpoint ReadCheckpoint(System.Data.Common.DbDataReader reader)
    {
        return new ProjectionCheckpoint
        {
            ProjectionName = reader.GetString(0),
            StreamName = reader.GetString(1),
            LastProcessedOffset = reader.GetInt64(2),
            LastUpdated = reader.GetFieldValue<DateTimeOffset>(3),
            EventsProcessed = reader.GetInt64(4),
            // last_error and last_error_at are nullable columns.
            LastError = reader.IsDBNull(5) ? null : reader.GetString(5),
            LastErrorAt = reader.IsDBNull(6) ? null : reader.GetFieldValue<DateTimeOffset>(6)
        };
    }

    /// <summary>
    /// Throws <see cref="ArgumentException"/> with <paramref name="message"/> when
    /// <paramref name="value"/> is null, empty or whitespace.
    /// </summary>
    private static void EnsureNotBlank(string? value, string message, string paramName)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException(message, paramName);
        }
    }

    /// <summary>
    /// Returns <paramref name="tableName"/> qualified with the configured schema,
    /// both rendered as quoted PostgreSQL identifiers.
    /// </summary>
    private string SchemaQualified(string tableName)
    {
        return $"{QuoteIdentifier(_options.SchemaName)}.{QuoteIdentifier(tableName)}";
    }

    /// <summary>
    /// Wraps an identifier in double quotes, doubling any embedded double quote so the
    /// value cannot terminate the quoted identifier early.
    /// </summary>
    private static string QuoteIdentifier(string? identifier)
    {
        var escaped = (identifier ?? string.Empty).Replace("\"", "\"\"", StringComparison.Ordinal);
        return "\"" + escaped + "\"";
    }
}
|