248 lines
9.4 KiB
C#
248 lines
9.4 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
|
|
using Svrnty.CQRS.Events.Abstractions.Storage;
|
|
using System.Collections.Generic;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Npgsql;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
|
|
|
|
/// <summary>
|
|
/// PostgreSQL-based implementation of <see cref="IReadReceiptStore"/>.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// <strong>Persistence:</strong>
|
|
/// Stores read receipts in PostgreSQL for durability across restarts.
|
|
/// </para>
|
|
/// <para>
|
|
/// <strong>Distributed Support:</strong>
|
|
/// Safe for multiple application instances tracking the same stream.
|
|
/// </para>
|
|
/// </remarks>
|
|
public sealed class PostgresReadReceiptStore : IReadReceiptStore
|
|
{
|
|
private readonly PostgresEventStreamStoreOptions _options;
|
|
private readonly ILogger<PostgresReadReceiptStore> _logger;
|
|
|
|
private string SchemaQualifiedTable(string tableName) =>
|
|
$"\"{_options.SchemaName}\".\"{tableName}\"";
|
|
|
|
public PostgresReadReceiptStore(
|
|
IOptions<PostgresEventStreamStoreOptions> options,
|
|
ILogger<PostgresReadReceiptStore> logger)
|
|
{
|
|
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task AcknowledgeEventAsync(
|
|
string consumerId,
|
|
string streamName,
|
|
string eventId,
|
|
long offset,
|
|
DateTimeOffset acknowledgedAt,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(consumerId))
|
|
throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
|
|
if (string.IsNullOrWhiteSpace(eventId))
|
|
throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId));
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
// Use INSERT ... ON CONFLICT to handle both insert and update
|
|
var sql = $@"
|
|
INSERT INTO {SchemaQualifiedTable("read_receipts")}
|
|
(consumer_id, stream_name, last_event_id, last_offset, last_acknowledged_at, first_acknowledged_at, total_acknowledged)
|
|
VALUES
|
|
(@ConsumerId, @StreamName, @EventId, @Offset, @AcknowledgedAt, @AcknowledgedAt, 1)
|
|
ON CONFLICT (consumer_id, stream_name) DO UPDATE
|
|
SET
|
|
last_event_id = CASE
|
|
WHEN @Offset > {SchemaQualifiedTable("read_receipts")}.last_offset
|
|
THEN @EventId
|
|
ELSE {SchemaQualifiedTable("read_receipts")}.last_event_id
|
|
END,
|
|
last_offset = CASE
|
|
WHEN @Offset > {SchemaQualifiedTable("read_receipts")}.last_offset
|
|
THEN @Offset
|
|
ELSE {SchemaQualifiedTable("read_receipts")}.last_offset
|
|
END,
|
|
last_acknowledged_at = CASE
|
|
WHEN @Offset > {SchemaQualifiedTable("read_receipts")}.last_offset
|
|
THEN @AcknowledgedAt
|
|
ELSE {SchemaQualifiedTable("read_receipts")}.last_acknowledged_at
|
|
END,
|
|
total_acknowledged = {SchemaQualifiedTable("read_receipts")}.total_acknowledged + 1";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection)
|
|
{
|
|
CommandTimeout = _options.CommandTimeout
|
|
};
|
|
|
|
command.Parameters.AddWithValue("@ConsumerId", consumerId);
|
|
command.Parameters.AddWithValue("@StreamName", streamName);
|
|
command.Parameters.AddWithValue("@EventId", eventId);
|
|
command.Parameters.AddWithValue("@Offset", offset);
|
|
command.Parameters.AddWithValue("@AcknowledgedAt", acknowledgedAt);
|
|
|
|
await command.ExecuteNonQueryAsync(cancellationToken);
|
|
|
|
_logger.LogDebug(
|
|
"Acknowledged event {EventId} at offset {Offset} for consumer {ConsumerId} on stream {StreamName}",
|
|
eventId,
|
|
offset,
|
|
consumerId,
|
|
streamName);
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<long?> GetLastAcknowledgedOffsetAsync(
|
|
string consumerId,
|
|
string streamName,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(consumerId))
|
|
throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
SELECT last_offset
|
|
FROM {SchemaQualifiedTable("read_receipts")}
|
|
WHERE consumer_id = @ConsumerId
|
|
AND stream_name = @StreamName";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("@ConsumerId", consumerId);
|
|
command.Parameters.AddWithValue("@StreamName", streamName);
|
|
|
|
var result = await command.ExecuteScalarAsync(cancellationToken);
|
|
|
|
return result as long?;
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<ConsumerProgress?> GetConsumerProgressAsync(
|
|
string consumerId,
|
|
string streamName,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(consumerId))
|
|
throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
SELECT
|
|
consumer_id,
|
|
stream_name,
|
|
last_offset,
|
|
last_acknowledged_at,
|
|
total_acknowledged,
|
|
first_acknowledged_at
|
|
FROM {SchemaQualifiedTable("read_receipts")}
|
|
WHERE consumer_id = @ConsumerId
|
|
AND stream_name = @StreamName";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("@ConsumerId", consumerId);
|
|
command.Parameters.AddWithValue("@StreamName", streamName);
|
|
|
|
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
|
|
|
if (await reader.ReadAsync(cancellationToken))
|
|
{
|
|
return new ConsumerProgress
|
|
{
|
|
ConsumerId = reader.GetString(0),
|
|
StreamName = reader.GetString(1),
|
|
LastOffset = reader.GetInt64(2),
|
|
LastAcknowledgedAt = reader.GetFieldValue<DateTimeOffset>(3),
|
|
TotalAcknowledged = reader.GetInt64(4),
|
|
FirstAcknowledgedAt = reader.IsDBNull(5) ? null : reader.GetFieldValue<DateTimeOffset>(5)
|
|
};
|
|
}
|
|
|
|
return null;
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<IReadOnlyList<string>> GetConsumersForStreamAsync(
|
|
string streamName,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
SELECT DISTINCT consumer_id
|
|
FROM {SchemaQualifiedTable("read_receipts")}
|
|
WHERE stream_name = @StreamName
|
|
ORDER BY consumer_id";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("@StreamName", streamName);
|
|
|
|
var consumers = new List<string>();
|
|
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
|
|
|
while (await reader.ReadAsync(cancellationToken))
|
|
{
|
|
consumers.Add(reader.GetString(0));
|
|
}
|
|
|
|
return consumers;
|
|
}
|
|
|
|
/// <inheritdoc />
|
|
public async Task<int> CleanupAsync(
|
|
DateTimeOffset olderThan,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
var sql = $@"
|
|
DELETE FROM {SchemaQualifiedTable("read_receipts")}
|
|
WHERE last_acknowledged_at < @OlderThan";
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection)
|
|
{
|
|
CommandTimeout = _options.CommandTimeout
|
|
};
|
|
|
|
command.Parameters.AddWithValue("@OlderThan", olderThan);
|
|
|
|
var deletedCount = await command.ExecuteNonQueryAsync(cancellationToken);
|
|
|
|
if (deletedCount > 0)
|
|
{
|
|
_logger.LogInformation(
|
|
"Cleaned up {DeletedCount} read receipt records older than {OlderThan}",
|
|
deletedCount,
|
|
olderThan);
|
|
}
|
|
|
|
return deletedCount;
|
|
}
|
|
}
|