dotnet-cqrs/Svrnty.CQRS.Events.PostgreSQL/Stores/PostgresReadReceiptStore.cs

248 lines
9.4 KiB
C#

using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.Abstractions.Storage;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
/// <summary>
/// PostgreSQL-based implementation of <see cref="IReadReceiptStore"/>.
/// </summary>
/// <remarks>
/// <para>
/// <strong>Persistence:</strong>
/// Stores read receipts in PostgreSQL for durability across restarts.
/// One row is kept per (consumer_id, stream_name) pair in the <c>read_receipts</c> table
/// of the configured schema.
/// </para>
/// <para>
/// <strong>Distributed Support:</strong>
/// Safe for multiple application instances tracking the same stream: acknowledgements are
/// written with a single <c>INSERT ... ON CONFLICT DO UPDATE</c> statement that only moves
/// the stored offset forward.
/// </para>
/// <para>
/// <strong>Timestamps:</strong>
/// <see cref="DateTimeOffset"/> arguments are normalised to UTC before being sent to the
/// database, because Npgsql 6+ rejects non-zero offsets for <c>timestamp with time zone</c>.
/// </para>
/// </remarks>
public sealed class PostgresReadReceiptStore : IReadReceiptStore
{
    private const string ReadReceiptsTableName = "read_receipts";

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresReadReceiptStore> _logger;

    /// <summary>
    /// Creates a store that uses the connection string, schema name and command timeout
    /// from <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Store options; the wrapper and its value must not be null.</param>
    /// <param name="logger">Logger used for acknowledgement and cleanup messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public PostgresReadReceiptStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresReadReceiptStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    /// <remarks>
    /// Inserts the receipt row on first acknowledgement. On later calls the last event id,
    /// offset and timestamp are only overwritten when <paramref name="offset"/> is strictly
    /// greater than the stored offset; <c>total_acknowledged</c> is incremented on every call.
    /// </remarks>
    /// <exception cref="ArgumentException">
    /// <paramref name="consumerId"/>, <paramref name="streamName"/> or
    /// <paramref name="eventId"/> is null or whitespace.
    /// </exception>
    public async Task AcknowledgeEventAsync(
        string consumerId,
        string streamName,
        string eventId,
        long offset,
        DateTimeOffset acknowledgedAt,
        CancellationToken cancellationToken = default)
    {
        ThrowIfBlank(consumerId, "Consumer ID", nameof(consumerId));
        ThrowIfBlank(streamName, "Stream name", nameof(streamName));
        ThrowIfBlank(eventId, "Event ID", nameof(eventId));

        var table = SchemaQualifiedTable(ReadReceiptsTableName);

        // Use INSERT ... ON CONFLICT to handle both insert and update.
        // The CASE expressions keep the stored high-water mark when a stale or
        // duplicate offset is acknowledged.
        var sql = $@"
            INSERT INTO {table}
                (consumer_id, stream_name, last_event_id, last_offset, last_acknowledged_at, first_acknowledged_at, total_acknowledged)
            VALUES
                (@ConsumerId, @StreamName, @EventId, @Offset, @AcknowledgedAt, @AcknowledgedAt, 1)
            ON CONFLICT (consumer_id, stream_name) DO UPDATE
            SET
                last_event_id = CASE
                    WHEN @Offset > {table}.last_offset
                    THEN @EventId
                    ELSE {table}.last_event_id
                END,
                last_offset = CASE
                    WHEN @Offset > {table}.last_offset
                    THEN @Offset
                    ELSE {table}.last_offset
                END,
                last_acknowledged_at = CASE
                    WHEN @Offset > {table}.last_offset
                    THEN @AcknowledgedAt
                    ELSE {table}.last_acknowledged_at
                END,
                total_acknowledged = {table}.total_acknowledged + 1";

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@StreamName", streamName);
        command.Parameters.AddWithValue("@EventId", eventId);
        command.Parameters.AddWithValue("@Offset", offset);
        // Same instant, offset 0: required by Npgsql 6+ for timestamptz parameters.
        command.Parameters.AddWithValue("@AcknowledgedAt", acknowledgedAt.ToUniversalTime());

        await command.ExecuteNonQueryAsync(cancellationToken);

        _logger.LogDebug(
            "Acknowledged event {EventId} at offset {Offset} for consumer {ConsumerId} on stream {StreamName}",
            eventId,
            offset,
            consumerId,
            streamName);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Returns <c>null</c> when no receipt row exists for the consumer/stream pair.
    /// </remarks>
    /// <exception cref="ArgumentException">
    /// <paramref name="consumerId"/> or <paramref name="streamName"/> is null or whitespace.
    /// </exception>
    public async Task<long?> GetLastAcknowledgedOffsetAsync(
        string consumerId,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        ThrowIfBlank(consumerId, "Consumer ID", nameof(consumerId));
        ThrowIfBlank(streamName, "Stream name", nameof(streamName));

        var sql = $@"
            SELECT last_offset
            FROM {SchemaQualifiedTable(ReadReceiptsTableName)}
            WHERE consumer_id = @ConsumerId
              AND stream_name = @StreamName";

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@StreamName", streamName);

        // ExecuteScalarAsync yields null when there is no row and DBNull for a NULL value;
        // both map to "no offset".
        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result is long lastOffset ? lastOffset : (long?)null;
    }

    /// <inheritdoc />
    /// <remarks>
    /// Returns <c>null</c> when no receipt row exists for the consumer/stream pair.
    /// </remarks>
    /// <exception cref="ArgumentException">
    /// <paramref name="consumerId"/> or <paramref name="streamName"/> is null or whitespace.
    /// </exception>
    public async Task<ConsumerProgress?> GetConsumerProgressAsync(
        string consumerId,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        ThrowIfBlank(consumerId, "Consumer ID", nameof(consumerId));
        ThrowIfBlank(streamName, "Stream name", nameof(streamName));

        // Column order here fixes the reader ordinals used below.
        var sql = $@"
            SELECT
                consumer_id,
                stream_name,
                last_offset,
                last_acknowledged_at,
                total_acknowledged,
                first_acknowledged_at
            FROM {SchemaQualifiedTable(ReadReceiptsTableName)}
            WHERE consumer_id = @ConsumerId
              AND stream_name = @StreamName";

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@StreamName", streamName);

        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        if (!await reader.ReadAsync(cancellationToken))
        {
            return null;
        }

        return new ConsumerProgress
        {
            ConsumerId = reader.GetString(0),
            StreamName = reader.GetString(1),
            LastOffset = reader.GetInt64(2),
            LastAcknowledgedAt = reader.GetFieldValue<DateTimeOffset>(3),
            TotalAcknowledged = reader.GetInt64(4),
            FirstAcknowledgedAt = reader.IsDBNull(5) ? null : reader.GetFieldValue<DateTimeOffset>(5)
        };
    }

    /// <inheritdoc />
    /// <remarks>
    /// Consumer ids are returned de-duplicated and ordered by <c>consumer_id</c>;
    /// the list is empty when the stream has no receipts.
    /// </remarks>
    /// <exception cref="ArgumentException">
    /// <paramref name="streamName"/> is null or whitespace.
    /// </exception>
    public async Task<IReadOnlyList<string>> GetConsumersForStreamAsync(
        string streamName,
        CancellationToken cancellationToken = default)
    {
        ThrowIfBlank(streamName, "Stream name", nameof(streamName));

        var sql = $@"
            SELECT DISTINCT consumer_id
            FROM {SchemaQualifiedTable(ReadReceiptsTableName)}
            WHERE stream_name = @StreamName
            ORDER BY consumer_id";

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@StreamName", streamName);

        var consumers = new List<string>();
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        while (await reader.ReadAsync(cancellationToken))
        {
            consumers.Add(reader.GetString(0));
        }

        return consumers;
    }

    /// <inheritdoc />
    /// <remarks>
    /// Deletes every receipt whose <c>last_acknowledged_at</c> is earlier than
    /// <paramref name="olderThan"/> and returns the number of rows removed. An
    /// informational log entry is written only when at least one row was deleted.
    /// </remarks>
    public async Task<int> CleanupAsync(
        DateTimeOffset olderThan,
        CancellationToken cancellationToken = default)
    {
        var sql = $@"
            DELETE FROM {SchemaQualifiedTable(ReadReceiptsTableName)}
            WHERE last_acknowledged_at < @OlderThan";

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var command = CreateCommand(sql, connection);
        // Same instant, offset 0: required by Npgsql 6+ for timestamptz parameters.
        command.Parameters.AddWithValue("@OlderThan", olderThan.ToUniversalTime());

        var deletedCount = await command.ExecuteNonQueryAsync(cancellationToken);
        if (deletedCount > 0)
        {
            _logger.LogInformation(
                "Cleaned up {DeletedCount} read receipt records older than {OlderThan}",
                deletedCount,
                olderThan);
        }

        return deletedCount;
    }

    /// <summary>
    /// Builds the double-quoted <c>"schema"."table"</c> reference for the configured schema.
    /// </summary>
    private string SchemaQualifiedTable(string tableName) =>
        $"{QuoteIdentifier(_options.SchemaName)}.{QuoteIdentifier(tableName)}";

    /// <summary>
    /// Wraps an identifier in double quotes, doubling any embedded double quote so the
    /// value cannot terminate the quoted identifier early.
    /// </summary>
    private static string QuoteIdentifier(string identifier) =>
        "\"" + (identifier ?? string.Empty).Replace("\"", "\"\"") + "\"";

    /// <summary>
    /// Throws the same <see cref="ArgumentException"/> the public methods have always
    /// raised when a required string argument is null, empty or whitespace.
    /// </summary>
    private static void ThrowIfBlank(string value, string description, string paramName)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException($"{description} cannot be null or whitespace.", paramName);
        }
    }

    /// <summary>
    /// Creates and opens a connection from the configured connection string. The
    /// connection is disposed here if opening fails; otherwise the caller owns it.
    /// </summary>
    private async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken)
    {
        var connection = new NpgsqlConnection(_options.ConnectionString);
        try
        {
            await connection.OpenAsync(cancellationToken);
            return connection;
        }
        catch
        {
            await connection.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Creates a command on <paramref name="connection"/> with the configured command
    /// timeout, so reads and writes honour the same setting.
    /// </summary>
    private NpgsqlCommand CreateCommand(string sql, NpgsqlConnection connection) =>
        new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = _options.CommandTimeout
        };
}