249 lines
8.8 KiB
C#
249 lines
8.8 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
|
|
using Svrnty.CQRS.Events.Abstractions.Storage;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Npgsql;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
|
|
|
|
/// <summary>
/// PostgreSQL-based implementation of <see cref="IIdempotencyStore"/> for exactly-once delivery.
/// </summary>
/// <remarks>
/// <para>
/// <strong>Persistence:</strong>
/// Processed-event markers are stored in the <c>processed_events</c> table and locks in the
/// <c>idempotency_locks</c> table of the configured schema, so both survive application restarts.
/// </para>
/// <para>
/// <strong>Distributed Locking:</strong>
/// A lock is a row keyed by <c>lock_key</c> that carries an <c>expires_at</c> timestamp. Acquisition is a
/// single <c>INSERT ... ON CONFLICT DO UPDATE</c> statement that only takes over a row whose expiry has
/// passed. Expiry is computed and compared using the database clock, so application instances with
/// skewed clocks still agree on when a lock ends.
/// </para>
/// <para>
/// <strong>Connections:</strong>
/// Every operation opens its own <see cref="NpgsqlConnection"/> from the configured connection string
/// and disposes it when the operation completes.
/// </para>
/// </remarks>
public sealed class PostgresIdempotencyStore : IIdempotencyStore
{
    private const string ProcessedEventsTable = "processed_events";
    private const string IdempotencyLocksTable = "idempotency_locks";

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresIdempotencyStore> _logger;

    /// <summary>
    /// Creates a store that uses the connection string, schema name and command timeout from
    /// <paramref name="options"/>.
    /// </summary>
    /// <param name="options">Store configuration; neither the wrapper nor its value may be null.</param>
    /// <param name="logger">Logger used for diagnostic messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public PostgresIdempotencyStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresIdempotencyStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    /// <exception cref="ArgumentException">
    /// <paramref name="consumerId"/> or <paramref name="eventId"/> is null or whitespace.
    /// </exception>
    public async Task<bool> WasProcessedAsync(
        string consumerId,
        string eventId,
        CancellationToken cancellationToken = default)
    {
        ThrowIfBlank(consumerId, "Consumer ID cannot be null or whitespace.", nameof(consumerId));
        ThrowIfBlank(eventId, "Event ID cannot be null or whitespace.", nameof(eventId));

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT EXISTS (
                SELECT 1
                FROM {SchemaQualifiedTable(ProcessedEventsTable)}
                WHERE consumer_id = @ConsumerId
                  AND event_id = @EventId
            )";

        // Uses the configured command timeout like every other statement issued by this store.
        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@EventId", eventId);

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result is true;
    }

    /// <inheritdoc />
    /// <remarks>
    /// The insert is <c>ON CONFLICT DO NOTHING</c>, so marking the same consumer/event pair again
    /// leaves the existing row (and its original timestamp) untouched.
    /// </remarks>
    /// <exception cref="ArgumentException">
    /// <paramref name="consumerId"/> or <paramref name="eventId"/> is null or whitespace.
    /// </exception>
    public async Task MarkProcessedAsync(
        string consumerId,
        string eventId,
        DateTimeOffset processedAt,
        CancellationToken cancellationToken = default)
    {
        ThrowIfBlank(consumerId, "Consumer ID cannot be null or whitespace.", nameof(consumerId));
        ThrowIfBlank(eventId, "Event ID cannot be null or whitespace.", nameof(eventId));

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            INSERT INTO {SchemaQualifiedTable(ProcessedEventsTable)}
                (consumer_id, event_id, processed_at)
            VALUES
                (@ConsumerId, @EventId, @ProcessedAt)
            ON CONFLICT (consumer_id, event_id) DO NOTHING";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@EventId", eventId);
        // Npgsql 6+ only accepts offset-zero DateTimeOffset values for timestamptz; normalising to UTC
        // keeps the same instant while accepting callers that pass a local offset.
        command.Parameters.AddWithValue("@ProcessedAt", processedAt.ToUniversalTime());

        await command.ExecuteNonQueryAsync(cancellationToken);

        _logger.LogDebug(
            "Marked event {EventId} as processed by consumer {ConsumerId} at {ProcessedAt}",
            eventId,
            consumerId,
            processedAt);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Inserts a new lock row, or takes over an existing row whose <c>expires_at</c> is not in the
    /// future. When an unexpired row exists the statement changes nothing and returns no row, which is
    /// reported as <c>false</c>.
    /// </remarks>
    /// <exception cref="ArgumentException">
    /// <paramref name="idempotencyKey"/> is null or whitespace, or
    /// <paramref name="lockDuration"/> is zero or negative.
    /// </exception>
    public async Task<bool> TryAcquireIdempotencyLockAsync(
        string idempotencyKey,
        TimeSpan lockDuration,
        CancellationToken cancellationToken = default)
    {
        ThrowIfBlank(idempotencyKey, "Idempotency key cannot be null or whitespace.", nameof(idempotencyKey));
        if (lockDuration <= TimeSpan.Zero)
        {
            throw new ArgumentException("Lock duration must be positive.", nameof(lockDuration));
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var locksTable = SchemaQualifiedTable(IdempotencyLocksTable);

        // The expiry is derived from the server's NOW() rather than the application clock: the
        // takeover condition below also uses NOW(), and mixing two clocks would let skew between
        // application hosts and the database shorten or extend locks.
        var sql = $@"
            INSERT INTO {locksTable}
                (lock_key, acquired_at, expires_at)
            VALUES
                (@LockKey, NOW(), NOW() + @LockDuration)
            ON CONFLICT (lock_key) DO UPDATE
            SET
                acquired_at = NOW(),
                expires_at = NOW() + @LockDuration
            WHERE {locksTable}.expires_at <= NOW()
            RETURNING expires_at";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@LockKey", idempotencyKey);
        // Npgsql sends a TimeSpan as a PostgreSQL interval.
        command.Parameters.AddWithValue("@LockDuration", lockDuration);

        // A row comes back only when the insert or the takeover update actually happened.
        var storedExpiry = await command.ExecuteScalarAsync(cancellationToken);
        var lockAcquired = storedExpiry is not null;

        if (lockAcquired)
        {
            _logger.LogDebug(
                "Acquired idempotency lock for key {IdempotencyKey}, expires at {ExpiresAt}",
                idempotencyKey,
                storedExpiry);
        }
        else
        {
            _logger.LogDebug(
                "Failed to acquire idempotency lock for key {IdempotencyKey} (lock already held)",
                idempotencyKey);
        }

        return lockAcquired;
    }

    /// <inheritdoc />
    /// <remarks>
    /// Deletes the lock row for the key if one exists. The delete is not conditional on who acquired
    /// the lock, and releasing a key with no row is a no-op.
    /// </remarks>
    /// <exception cref="ArgumentException">
    /// <paramref name="idempotencyKey"/> is null or whitespace.
    /// </exception>
    public async Task ReleaseIdempotencyLockAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default)
    {
        ThrowIfBlank(idempotencyKey, "Idempotency key cannot be null or whitespace.", nameof(idempotencyKey));

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            DELETE FROM {SchemaQualifiedTable(IdempotencyLocksTable)}
            WHERE lock_key = @LockKey";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@LockKey", idempotencyKey);

        var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);
        if (rowsAffected > 0)
        {
            _logger.LogDebug("Released idempotency lock for key {IdempotencyKey}", idempotencyKey);
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Removes processed-event rows with <c>processed_at</c> earlier than
    /// <paramref name="olderThan"/>, then removes every lock whose expiry has passed. The two deletes
    /// run as separate statements on one connection.
    /// </remarks>
    /// <returns>
    /// The number of processed-event rows deleted; expired locks removed are logged but not counted.
    /// </returns>
    public async Task<int> CleanupAsync(
        DateTimeOffset olderThan,
        CancellationToken cancellationToken = default)
    {
        await using var connection = await OpenConnectionAsync(cancellationToken);

        // Clean up old processed events.
        var processedSql = $@"
            DELETE FROM {SchemaQualifiedTable(ProcessedEventsTable)}
            WHERE processed_at < @OlderThan";

        await using var processedCommand = CreateCommand(processedSql, connection);
        // Normalised to UTC for the same reason as in MarkProcessedAsync: Npgsql 6+ rejects
        // non-zero offsets for timestamptz parameters.
        processedCommand.Parameters.AddWithValue("@OlderThan", olderThan.ToUniversalTime());

        var deletedCount = await processedCommand.ExecuteNonQueryAsync(cancellationToken);
        if (deletedCount > 0)
        {
            _logger.LogInformation(
                "Cleaned up {DeletedCount} processed event records older than {OlderThan}",
                deletedCount,
                olderThan);
        }

        // Also clean up expired locks.
        var expiredLocksSql = $@"
            DELETE FROM {SchemaQualifiedTable(IdempotencyLocksTable)}
            WHERE expires_at <= NOW()";

        await using var expiredLocksCommand = CreateCommand(expiredLocksSql, connection);

        var expiredLocksDeleted = await expiredLocksCommand.ExecuteNonQueryAsync(cancellationToken);
        if (expiredLocksDeleted > 0)
        {
            _logger.LogDebug("Cleaned up {ExpiredLocksCount} expired idempotency locks", expiredLocksDeleted);
        }

        return deletedCount;
    }

    /// <summary>Builds the double-quoted <c>"schema"."table"</c> name for the configured schema.</summary>
    private string SchemaQualifiedTable(string tableName) =>
        $"{QuoteIdentifier(_options.SchemaName)}.{QuoteIdentifier(tableName)}";

    /// <summary>
    /// Wraps an identifier in double quotes, doubling any embedded double quote so the result is
    /// always a single well-formed quoted identifier.
    /// </summary>
    private static string QuoteIdentifier(string identifier) =>
        "\"" + identifier?.Replace("\"", "\"\"") + "\"";

    /// <summary>Throws <see cref="ArgumentException"/> when <paramref name="value"/> is null or whitespace.</summary>
    private static void ThrowIfBlank(string value, string message, string paramName)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException(message, paramName);
        }
    }

    /// <summary>Opens a new connection; the connection is disposed here if opening fails.</summary>
    private async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken)
    {
        var connection = new NpgsqlConnection(_options.ConnectionString);
        try
        {
            await connection.OpenAsync(cancellationToken);
            return connection;
        }
        catch
        {
            await connection.DisposeAsync();
            throw;
        }
    }

    /// <summary>Creates a command on <paramref name="connection"/> with the configured command timeout.</summary>
    private NpgsqlCommand CreateCommand(string sql, NpgsqlConnection connection) =>
        new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = _options.CommandTimeout
        };
}
|