using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.Abstractions.Storage;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
/// <summary>
/// PostgreSQL-based implementation of <see cref="IIdempotencyStore"/> for exactly-once delivery.
/// </summary>
/// <remarks>
/// <para>
/// <strong>Persistence:</strong>
/// Stores processed events and idempotency locks in PostgreSQL for durability across restarts.
/// </para>
/// <para>
/// <strong>Distributed Locking:</strong>
/// Supports distributed locking across multiple application instances using database row-level locks.
/// </para>
/// </remarks>
public sealed class PostgresIdempotencyStore : IIdempotencyStore
{
    private const string ProcessedEventsTable = "processed_events";
    private const string IdempotencyLocksTable = "idempotency_locks";

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresIdempotencyStore> _logger;

    /// <summary>
    /// Creates a store that reads its connection string, schema name and command timeout
    /// from the supplied options.
    /// </summary>
    /// <param name="options">Options wrapper; its <c>Value</c> must not be null.</param>
    /// <param name="logger">Logger used for debug and informational messages.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public PostgresIdempotencyStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresIdempotencyStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Checks whether a (consumer, event) pair has a row in the <c>processed_events</c> table.
    /// </summary>
    /// <param name="consumerId">Consumer identifier; must not be null or whitespace.</param>
    /// <param name="eventId">Event identifier; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Token used to cancel the database round trip.</param>
    /// <returns><c>true</c> when a matching row exists; otherwise <c>false</c>.</returns>
    /// <exception cref="ArgumentException">An identifier is null or whitespace.</exception>
    public async Task<bool> WasProcessedAsync(
        string consumerId,
        string eventId,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(consumerId))
            throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
        if (string.IsNullOrWhiteSpace(eventId))
            throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId));

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var sql = $@"
            SELECT EXISTS (
                SELECT 1
                FROM {SchemaQualifiedTable(ProcessedEventsTable)}
                WHERE consumer_id = @ConsumerId
                  AND event_id = @EventId
            )";

        // CreateCommand applies the configured command timeout, which this query
        // previously skipped (it silently used the driver default instead).
        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@EventId", eventId);

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result is true;
    }

    /// <summary>
    /// Records that a consumer processed an event. Inserting a pair that already exists
    /// is a no-op (<c>ON CONFLICT ... DO NOTHING</c>), so the call can be repeated safely.
    /// </summary>
    /// <param name="consumerId">Consumer identifier; must not be null or whitespace.</param>
    /// <param name="eventId">Event identifier; must not be null or whitespace.</param>
    /// <param name="processedAt">Timestamp stored in the <c>processed_at</c> column.</param>
    /// <param name="cancellationToken">Token used to cancel the database round trip.</param>
    /// <exception cref="ArgumentException">An identifier is null or whitespace.</exception>
    public async Task MarkProcessedAsync(
        string consumerId,
        string eventId,
        DateTimeOffset processedAt,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(consumerId))
            throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
        if (string.IsNullOrWhiteSpace(eventId))
            throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId));

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var sql = $@"
            INSERT INTO {SchemaQualifiedTable(ProcessedEventsTable)}
                (consumer_id, event_id, processed_at)
            VALUES
                (@ConsumerId, @EventId, @ProcessedAt)
            ON CONFLICT (consumer_id, event_id) DO NOTHING";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@ConsumerId", consumerId);
        command.Parameters.AddWithValue("@EventId", eventId);
        command.Parameters.AddWithValue("@ProcessedAt", processedAt);

        await command.ExecuteNonQueryAsync(cancellationToken);

        _logger.LogDebug(
            "Marked event {EventId} as processed by consumer {ConsumerId} at {ProcessedAt}",
            eventId,
            consumerId,
            processedAt);
    }

    /// <summary>
    /// Attempts to take the lock identified by <paramref name="idempotencyKey"/> by inserting
    /// a row into <c>idempotency_locks</c>, or by taking over an existing row whose
    /// <c>expires_at</c> has already passed.
    /// </summary>
    /// <param name="idempotencyKey">Lock key; must not be null or whitespace.</param>
    /// <param name="lockDuration">How long the lock stays valid; must be positive.</param>
    /// <param name="cancellationToken">Token used to cancel the database round trip.</param>
    /// <returns>
    /// <c>true</c> when the row was inserted or an expired row was taken over;
    /// <c>false</c> when an unexpired lock row already exists for the key.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The key is null or whitespace, or <paramref name="lockDuration"/> is not positive.
    /// </exception>
    public async Task<bool> TryAcquireIdempotencyLockAsync(
        string idempotencyKey,
        TimeSpan lockDuration,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(idempotencyKey))
            throw new ArgumentException("Idempotency key cannot be null or whitespace.", nameof(idempotencyKey));
        if (lockDuration <= TimeSpan.Zero)
            throw new ArgumentException("Lock duration must be positive.", nameof(lockDuration));

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        // The expiry is computed by the database (NOW() + interval) rather than from the
        // application clock. Expiry is *checked* against NOW() below, so deriving it from
        // the same clock keeps the lock lifetime correct even when an application
        // instance's clock drifts from the database server's.
        var sql = $@"
            INSERT INTO {SchemaQualifiedTable(IdempotencyLocksTable)}
                (lock_key, acquired_at, expires_at)
            VALUES
                (@LockKey, NOW(), NOW() + @LockDuration)
            ON CONFLICT (lock_key) DO UPDATE
            SET
                acquired_at = NOW(),
                expires_at = NOW() + @LockDuration
            WHERE {SchemaQualifiedTable(IdempotencyLocksTable)}.expires_at <= NOW()
            RETURNING expires_at";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@LockKey", idempotencyKey);
        // Npgsql sends TimeSpan as a PostgreSQL interval.
        command.Parameters.AddWithValue("@LockDuration", lockDuration);

        // RETURNING yields a row only when the INSERT or the conditional UPDATE took
        // effect; a live lock held by someone else produces no row, hence a null scalar.
        var expiresAt = await command.ExecuteScalarAsync(cancellationToken);
        var lockAcquired = expiresAt is not null;

        if (lockAcquired)
        {
            _logger.LogDebug(
                "Acquired idempotency lock for key {IdempotencyKey}, expires at {ExpiresAt}",
                idempotencyKey,
                expiresAt);
        }
        else
        {
            _logger.LogDebug(
                "Failed to acquire idempotency lock for key {IdempotencyKey} (lock already held)",
                idempotencyKey);
        }

        return lockAcquired;
    }

    /// <summary>
    /// Deletes the lock row for <paramref name="idempotencyKey"/>, if one exists.
    /// The delete is keyed only on the lock key; no ownership check is performed.
    /// </summary>
    /// <param name="idempotencyKey">Lock key; must not be null or whitespace.</param>
    /// <param name="cancellationToken">Token used to cancel the database round trip.</param>
    /// <exception cref="ArgumentException">The key is null or whitespace.</exception>
    public async Task ReleaseIdempotencyLockAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(idempotencyKey))
            throw new ArgumentException("Idempotency key cannot be null or whitespace.", nameof(idempotencyKey));

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        var sql = $@"
            DELETE FROM {SchemaQualifiedTable(IdempotencyLocksTable)}
            WHERE lock_key = @LockKey";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@LockKey", idempotencyKey);

        var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);
        if (rowsAffected > 0)
        {
            _logger.LogDebug("Released idempotency lock for key {IdempotencyKey}", idempotencyKey);
        }
    }

    /// <summary>
    /// Deletes processed-event records with <c>processed_at</c> earlier than
    /// <paramref name="olderThan"/>, then deletes every lock row that has expired.
    /// </summary>
    /// <param name="olderThan">Cut-off; records processed strictly before it are removed.</param>
    /// <param name="cancellationToken">Token used to cancel the database round trips.</param>
    /// <returns>
    /// The number of processed-event records deleted. Expired locks removed in the same
    /// call are logged but not included in this count.
    /// </returns>
    public async Task<int> CleanupAsync(
        DateTimeOffset olderThan,
        CancellationToken cancellationToken = default)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        // Clean up old processed events.
        var sql = $@"
            DELETE FROM {SchemaQualifiedTable(ProcessedEventsTable)}
            WHERE processed_at < @OlderThan";

        await using var command = CreateCommand(sql, connection);
        command.Parameters.AddWithValue("@OlderThan", olderThan);

        var deletedCount = await command.ExecuteNonQueryAsync(cancellationToken);
        if (deletedCount > 0)
        {
            _logger.LogInformation(
                "Cleaned up {DeletedCount} processed event records older than {OlderThan}",
                deletedCount,
                olderThan);
        }

        // Also clean up expired locks, reusing the same open connection.
        var expiredLocksSql = $@"
            DELETE FROM {SchemaQualifiedTable(IdempotencyLocksTable)}
            WHERE expires_at <= NOW()";

        await using var expiredLocksCommand = CreateCommand(expiredLocksSql, connection);

        var expiredLocksDeleted = await expiredLocksCommand.ExecuteNonQueryAsync(cancellationToken);
        if (expiredLocksDeleted > 0)
        {
            _logger.LogDebug("Cleaned up {ExpiredLocksCount} expired idempotency locks", expiredLocksDeleted);
        }

        return deletedCount;
    }

    /// <summary>
    /// Builds the double-quoted <c>"schema"."table"</c> identifier for a table in the configured schema.
    /// </summary>
    private string SchemaQualifiedTable(string tableName) =>
        $"\"{_options.SchemaName}\".\"{tableName}\"";

    /// <summary>
    /// Creates a command on <paramref name="connection"/> with the configured command timeout,
    /// so every statement issued by this store honours the same timeout.
    /// </summary>
    private NpgsqlCommand CreateCommand(string sql, NpgsqlConnection connection) =>
        new NpgsqlCommand(sql, connection)
        {
            CommandTimeout = _options.CommandTimeout
        };
}