using System;
using Svrnty.CQRS.Events.PostgreSQL.Stores;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.Abstractions.Delivery;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Models;
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
///
/// PostgreSQL-based implementation of supporting both
/// persistent (event sourcing) and ephemeral (message queue) stream types.
///
///
///
/// Persistent Streams:
/// Events are stored in an append-only log with sequential offsets.
/// Supports event replay, consumer offset tracking, and retention policies.
///
///
/// Ephemeral Streams:
/// Events are stored temporarily with visibility timeout semantics.
/// Events are permanently deleted after acknowledgment.
///
///
/// Concurrency:
/// Uses optimistic concurrency control for persistent streams.
/// Thread-safe for concurrent read and write operations.
///
///
public sealed class PostgresEventStreamStore : IEventStreamStore, IDisposable
{
private readonly PostgresEventStreamStoreOptions _options;
private readonly ILogger _logger;
private readonly IEnumerable _deliveryProviders;
private readonly Timer? _cleanupTimer;
private readonly JsonSerializerOptions _jsonOptions;
private string SchemaQualifiedTable(string tableName) =>
$"\"{_options.SchemaName}\".\"{tableName}\"";
public PostgresEventStreamStore(
IOptions options,
IEnumerable deliveryProviders,
ILogger logger)
{
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_deliveryProviders = deliveryProviders ?? Enumerable.Empty();
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_jsonOptions = new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
WriteIndented = false
};
// Auto-migrate if enabled
if (_options.AutoMigrate)
{
InitializeDatabaseAsync().GetAwaiter().GetResult();
}
// Start cleanup timer for expired in-flight events (every 30 seconds)
_cleanupTimer = new Timer(
async _ => await CleanupExpiredInFlightEventsAsync(),
null,
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(30));
}
// ========================================================================
// DATABASE INITIALIZATION
// ========================================================================
private async Task InitializeDatabaseAsync()
{
try
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync();
// Check if schema exists
var schemaExists = await CheckSchemaExistsAsync(connection);
if (!schemaExists)
{
_logger.LogInformation(
"Schema {SchemaName} does not exist. Creating database schema...",
_options.SchemaName);
// Read and execute migration script
var migrationScript = await System.IO.File.ReadAllTextAsync(
System.IO.Path.Combine(
AppContext.BaseDirectory,
"Migrations",
"001_InitialSchema.sql"));
await using var command = new NpgsqlCommand(migrationScript, connection);
command.CommandTimeout = 120; // Longer timeout for schema creation
await command.ExecuteNonQueryAsync();
_logger.LogInformation("Database schema created successfully");
}
else
{
_logger.LogDebug("Schema {SchemaName} already exists", _options.SchemaName);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to initialize database schema");
throw;
}
}
private async Task CheckSchemaExistsAsync(NpgsqlConnection connection)
{
var sql = "SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = @schemaName)";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("schemaName", _options.SchemaName);
var result = await command.ExecuteScalarAsync();
return result is bool exists && exists;
}
// ========================================================================
// PERSISTENT STREAM OPERATIONS (Event Sourcing)
// ========================================================================
///
public async Task AppendAsync(
string streamName,
ICorrelatedEvent @event,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
if (@event == null)
throw new ArgumentNullException(nameof(@event));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
// Get next offset for this stream
var offset = await GetNextOffsetAsync(connection, streamName, cancellationToken);
// Serialize event data
var eventData = JsonSerializer.Serialize(@event, @event.GetType(), _jsonOptions);
// Insert event
var sql = $@"
INSERT INTO {SchemaQualifiedTable(_options.EventsTableName)}
(stream_name, offset, event_id, event_type, correlation_id, event_data, occurred_at, stored_at)
VALUES (@streamName, @offset, @eventId, @eventType, @correlationId, @eventData::jsonb, @occurredAt, NOW())";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("offset", offset);
command.Parameters.AddWithValue("eventId", @event.EventId);
command.Parameters.AddWithValue("eventType", @event.GetType().Name);
command.Parameters.AddWithValue("correlationId", @event.CorrelationId ?? string.Empty);
command.Parameters.AddWithValue("eventData", eventData);
command.Parameters.AddWithValue("occurredAt", DateTimeOffset.UtcNow);
try
{
await command.ExecuteNonQueryAsync(cancellationToken);
_logger.LogDebug(
"Appended event {EventId} to stream {StreamName} at offset {Offset}",
@event.EventId,
streamName,
offset);
// Notify delivery providers
await NotifyDeliveryProvidersAsync(streamName, @event, cancellationToken);
return offset;
}
catch (PostgresException ex) when (ex.SqlState == "23505") // Unique violation
{
_logger.LogWarning(
ex,
"Duplicate event {EventId} detected for stream {StreamName}",
@event.EventId,
streamName);
throw new InvalidOperationException($"Event with ID {@event.EventId} already exists in stream {streamName}", ex);
}
}
///
public async Task> ReadStreamAsync(
string streamName,
long fromOffset,
int maxCount,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
if (fromOffset < 0)
throw new ArgumentException("Offset cannot be negative.", nameof(fromOffset));
if (maxCount <= 0)
throw new ArgumentException("Max count must be positive.", nameof(maxCount));
// Limit max count to configured batch size
maxCount = Math.Min(maxCount, _options.ReadBatchSize);
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT event_id, event_type, correlation_id, event_data, occurred_at
FROM {SchemaQualifiedTable(_options.EventsTableName)}
WHERE stream_name = @streamName
AND offset >= @fromOffset
ORDER BY offset ASC
LIMIT @maxCount";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("fromOffset", fromOffset);
command.Parameters.AddWithValue("maxCount", maxCount);
var events = new List();
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
while (await reader.ReadAsync(cancellationToken))
{
var eventId = reader.GetString(0);
var eventType = reader.GetString(1);
var correlationId = reader.GetString(2);
var eventDataJson = reader.GetString(3);
var occurredAt = reader.GetFieldValue(4);
// Deserialize to concrete type using stored event type
var type = Type.GetType(eventType);
if (type == null)
{
_logger.LogWarning(
"Could not resolve event type {EventType} for event {EventId} in stream {StreamName}",
eventType,
eventId,
streamName);
continue;
}
var eventObject = JsonSerializer.Deserialize(eventDataJson, type, _jsonOptions) as ICorrelatedEvent;
if (eventObject != null)
{
events.Add(eventObject);
}
}
_logger.LogDebug(
"Read {Count} events from stream {StreamName} starting at offset {FromOffset}",
events.Count,
streamName,
fromOffset);
return events;
}
///
public async Task GetStreamLengthAsync(
string streamName,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT COUNT(*)
FROM {SchemaQualifiedTable(_options.EventsTableName)}
WHERE stream_name = @streamName";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
var result = await command.ExecuteScalarAsync(cancellationToken);
return result != null ? Convert.ToInt64(result) : 0L;
}
///
public async Task GetStreamMetadataAsync(
string streamName,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT
stream_name,
length,
oldest_event_offset,
newest_event_offset,
oldest_event_timestamp,
newest_event_timestamp
FROM {SchemaQualifiedTable("stream_metadata")}
WHERE stream_name = @streamName";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
if (await reader.ReadAsync(cancellationToken))
{
var length = reader.GetInt64(1);
var oldestOffset = reader.GetInt64(2);
var newestOffset = reader.GetInt64(3);
var oldestTimestamp = reader.IsDBNull(4) ? (DateTimeOffset?)null : reader.GetFieldValue(4);
var newestTimestamp = reader.IsDBNull(5) ? (DateTimeOffset?)null : reader.GetFieldValue(5);
return new StreamMetadata
{
StreamName = streamName,
Length = length,
OldestEventOffset = oldestOffset,
OldestEventTimestamp = oldestTimestamp,
NewestEventTimestamp = newestTimestamp,
RetentionPolicy = null,
DeletedEventCount = 0
};
}
// Stream doesn't exist
return new StreamMetadata
{
StreamName = streamName,
Length = 0,
OldestEventOffset = 0,
OldestEventTimestamp = null,
NewestEventTimestamp = null,
RetentionPolicy = null,
DeletedEventCount = 0
};
}
// ========================================================================
// EPHEMERAL STREAM OPERATIONS (Message Queue)
// ========================================================================
///
public async Task EnqueueAsync(
string streamName,
ICorrelatedEvent @event,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
if (@event == null)
throw new ArgumentNullException(nameof(@event));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var eventData = JsonSerializer.Serialize(@event, @event.GetType(), _jsonOptions);
var sql = $@"
INSERT INTO {SchemaQualifiedTable(_options.QueueEventsTableName)}
(stream_name, event_id, event_type, correlation_id, event_data, occurred_at, enqueued_at)
VALUES (@streamName, @eventId, @eventType, @correlationId, @eventData::jsonb, @occurredAt, NOW())";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("eventId", @event.EventId);
command.Parameters.AddWithValue("eventType", @event.GetType().Name);
command.Parameters.AddWithValue("correlationId", @event.CorrelationId ?? string.Empty);
command.Parameters.AddWithValue("eventData", eventData);
command.Parameters.AddWithValue("occurredAt", DateTimeOffset.UtcNow);
await command.ExecuteNonQueryAsync(cancellationToken);
_logger.LogDebug("Enqueued event {EventId} to stream {StreamName}", @event.EventId, streamName);
// Notify delivery providers
await NotifyDeliveryProvidersAsync(streamName, @event, cancellationToken);
}
///
public async Task EnqueueBatchAsync(
string streamName,
IEnumerable events,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
if (events == null)
throw new ArgumentNullException(nameof(events));
var eventList = events.Where(e => e != null).ToList();
if (eventList.Count == 0)
return;
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
try
{
foreach (var @event in eventList)
{
var eventData = JsonSerializer.Serialize(@event, @event.GetType(), _jsonOptions);
var sql = $@"
INSERT INTO {SchemaQualifiedTable(_options.QueueEventsTableName)}
(stream_name, event_id, event_type, correlation_id, event_data, occurred_at, enqueued_at)
VALUES (@streamName, @eventId, @eventType, @correlationId, @eventData::jsonb, @occurredAt, NOW())";
await using var command = new NpgsqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("eventId", @event.EventId);
command.Parameters.AddWithValue("eventType", @event.GetType().Name);
command.Parameters.AddWithValue("correlationId", @event.CorrelationId ?? string.Empty);
command.Parameters.AddWithValue("eventData", eventData);
command.Parameters.AddWithValue("occurredAt", DateTimeOffset.UtcNow);
await command.ExecuteNonQueryAsync(cancellationToken);
}
await transaction.CommitAsync(cancellationToken);
_logger.LogDebug("Enqueued {Count} events to stream {StreamName}", eventList.Count, streamName);
// Notify delivery providers
foreach (var @event in eventList)
{
await NotifyDeliveryProvidersAsync(streamName, @event, cancellationToken);
}
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
}
///
public async Task DequeueAsync(
string streamName,
string consumerId,
TimeSpan visibilityTimeout,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
if (string.IsNullOrWhiteSpace(consumerId))
throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
if (visibilityTimeout <= TimeSpan.Zero)
throw new ArgumentException("Visibility timeout must be positive.", nameof(visibilityTimeout));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
await using var transaction = await connection.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
try
{
// Find the oldest available event (not in-flight)
var selectSql = $@"
SELECT q.id, q.event_id, q.event_type, q.correlation_id, q.event_data, q.occurred_at, q.delivery_count
FROM {SchemaQualifiedTable(_options.QueueEventsTableName)} q
LEFT JOIN {SchemaQualifiedTable("in_flight_events")} inf ON q.event_id = inf.event_id
WHERE q.stream_name = @streamName
AND inf.event_id IS NULL
ORDER BY q.enqueued_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED";
await using var selectCommand = new NpgsqlCommand(selectSql, connection, transaction);
selectCommand.Parameters.AddWithValue("streamName", streamName);
await using var reader = await selectCommand.ExecuteReaderAsync(cancellationToken);
if (!await reader.ReadAsync(cancellationToken))
{
// No events available
await transaction.CommitAsync(cancellationToken);
return null;
}
var queueEventId = reader.GetInt64(0);
var eventId = reader.GetString(1);
var eventType = reader.GetString(2);
var correlationId = reader.GetString(3);
var eventDataJson = reader.GetString(4);
var occurredAt = reader.GetFieldValue(5);
var deliveryCount = reader.GetInt32(6);
await reader.CloseAsync();
// Mark as in-flight
var visibleAfter = DateTimeOffset.UtcNow.Add(visibilityTimeout);
var insertInFlightSql = $@"
INSERT INTO {SchemaQualifiedTable("in_flight_events")}
(event_id, stream_name, consumer_id, visible_after, delivery_count, queue_event_id)
VALUES (@eventId, @streamName, @consumerId, @visibleAfter, @deliveryCount, @queueEventId)";
await using var insertCommand = new NpgsqlCommand(insertInFlightSql, connection, transaction);
insertCommand.Parameters.AddWithValue("eventId", eventId);
insertCommand.Parameters.AddWithValue("streamName", streamName);
insertCommand.Parameters.AddWithValue("consumerId", consumerId);
insertCommand.Parameters.AddWithValue("visibleAfter", visibleAfter);
insertCommand.Parameters.AddWithValue("deliveryCount", deliveryCount + 1);
insertCommand.Parameters.AddWithValue("queueEventId", queueEventId);
await insertCommand.ExecuteNonQueryAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
// Deserialize to concrete type using stored event type
var type = Type.GetType(eventType);
if (type == null)
{
_logger.LogWarning(
"Could not resolve event type {EventType} for event {EventId} in stream {StreamName}",
eventType,
eventId,
streamName);
return null;
}
var eventObject = JsonSerializer.Deserialize(eventDataJson, type, _jsonOptions) as ICorrelatedEvent;
if (eventObject != null)
{
_logger.LogDebug(
"Dequeued event {EventId} from stream {StreamName} for consumer {ConsumerId}",
eventId,
streamName,
consumerId);
return eventObject;
}
return null;
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
}
///
public async Task AcknowledgeAsync(
string streamName,
string eventId,
string consumerId,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
if (string.IsNullOrWhiteSpace(eventId))
throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId));
if (string.IsNullOrWhiteSpace(consumerId))
throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
try
{
// Get queue event ID from in-flight
var selectSql = $@"
SELECT queue_event_id
FROM {SchemaQualifiedTable("in_flight_events")}
WHERE event_id = @eventId AND consumer_id = @consumerId";
await using var selectCommand = new NpgsqlCommand(selectSql, connection, transaction);
selectCommand.Parameters.AddWithValue("eventId", eventId);
selectCommand.Parameters.AddWithValue("consumerId", consumerId);
var queueEventId = await selectCommand.ExecuteScalarAsync(cancellationToken);
if (queueEventId == null)
{
await transaction.CommitAsync(cancellationToken);
return false; // Event not found or wrong consumer
}
// Delete from in-flight
var deleteInFlightSql = $@"
DELETE FROM {SchemaQualifiedTable("in_flight_events")}
WHERE event_id = @eventId AND consumer_id = @consumerId";
await using var deleteInFlightCommand = new NpgsqlCommand(deleteInFlightSql, connection, transaction);
deleteInFlightCommand.Parameters.AddWithValue("eventId", eventId);
deleteInFlightCommand.Parameters.AddWithValue("consumerId", consumerId);
await deleteInFlightCommand.ExecuteNonQueryAsync(cancellationToken);
// Delete from queue (permanent deletion for ephemeral streams)
var deleteQueueSql = $@"
DELETE FROM {SchemaQualifiedTable(_options.QueueEventsTableName)}
WHERE id = @queueEventId";
await using var deleteQueueCommand = new NpgsqlCommand(deleteQueueSql, connection, transaction);
deleteQueueCommand.Parameters.AddWithValue("queueEventId", queueEventId);
await deleteQueueCommand.ExecuteNonQueryAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
_logger.LogDebug("Acknowledged event {EventId} for consumer {ConsumerId}", eventId, consumerId);
return true;
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
}
///
public async Task NackAsync(
string streamName,
string eventId,
string consumerId,
bool requeue = true,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
if (string.IsNullOrWhiteSpace(eventId))
throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId));
if (string.IsNullOrWhiteSpace(consumerId))
throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
await using var transaction = await connection.BeginTransactionAsync(cancellationToken);
try
{
// Get event details from in-flight
var selectSql = $@"
SELECT queue_event_id, delivery_count
FROM {SchemaQualifiedTable("in_flight_events")}
WHERE event_id = @eventId AND consumer_id = @consumerId";
await using var selectCommand = new NpgsqlCommand(selectSql, connection, transaction);
selectCommand.Parameters.AddWithValue("eventId", eventId);
selectCommand.Parameters.AddWithValue("consumerId", consumerId);
await using var reader = await selectCommand.ExecuteReaderAsync(cancellationToken);
if (!await reader.ReadAsync(cancellationToken))
{
await transaction.CommitAsync(cancellationToken);
return false;
}
var queueEventId = reader.GetInt64(0);
var deliveryCount = reader.GetInt32(1);
await reader.CloseAsync();
// Delete from in-flight
var deleteInFlightSql = $@"
DELETE FROM {SchemaQualifiedTable("in_flight_events")}
WHERE event_id = @eventId AND consumer_id = @consumerId";
await using var deleteInFlightCommand = new NpgsqlCommand(deleteInFlightSql, connection, transaction);
deleteInFlightCommand.Parameters.AddWithValue("eventId", eventId);
deleteInFlightCommand.Parameters.AddWithValue("consumerId", consumerId);
await deleteInFlightCommand.ExecuteNonQueryAsync(cancellationToken);
if (!requeue)
{
// Move to dead letter queue
var moveToDlqSql = $@"
INSERT INTO {SchemaQualifiedTable("dead_letter_queue")}
(stream_name, event_id, event_type, correlation_id, event_data,
original_enqueued_at, delivery_attempts, last_consumer_id, occurred_at)
SELECT stream_name, event_id, event_type, correlation_id, event_data,
enqueued_at, @deliveryAttempts, @consumerId, occurred_at
FROM {SchemaQualifiedTable(_options.QueueEventsTableName)}
WHERE id = @queueEventId";
await using var moveToDlqCommand = new NpgsqlCommand(moveToDlqSql, connection, transaction);
moveToDlqCommand.Parameters.AddWithValue("deliveryAttempts", deliveryCount);
moveToDlqCommand.Parameters.AddWithValue("consumerId", consumerId);
moveToDlqCommand.Parameters.AddWithValue("queueEventId", queueEventId);
await moveToDlqCommand.ExecuteNonQueryAsync(cancellationToken);
// Delete from queue
var deleteQueueSql = $@"
DELETE FROM {SchemaQualifiedTable(_options.QueueEventsTableName)}
WHERE id = @queueEventId";
await using var deleteQueueCommand = new NpgsqlCommand(deleteQueueSql, connection, transaction);
deleteQueueCommand.Parameters.AddWithValue("queueEventId", queueEventId);
await deleteQueueCommand.ExecuteNonQueryAsync(cancellationToken);
_logger.LogWarning(
"Moved event {EventId} to dead letter queue after {DeliveryAttempts} attempts",
eventId,
deliveryCount);
}
// If requeue=true, event is automatically available again (we just removed from in_flight)
await transaction.CommitAsync(cancellationToken);
_logger.LogDebug(
"NACKed event {EventId} for consumer {ConsumerId}, requeue={Requeue}",
eventId,
consumerId,
requeue);
return true;
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
}
///
public async Task GetPendingCountAsync(
string streamName,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(streamName))
throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName));
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT COUNT(*)
FROM {SchemaQualifiedTable(_options.QueueEventsTableName)} q
LEFT JOIN {SchemaQualifiedTable("in_flight_events")} inf ON q.event_id = inf.event_id
WHERE q.stream_name = @streamName
AND inf.event_id IS NULL";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
var result = await command.ExecuteScalarAsync(cancellationToken);
return result != null ? Convert.ToInt32(result) : 0;
}
// ========================================================================
// HELPER METHODS
// ========================================================================
private async Task GetNextOffsetAsync(
NpgsqlConnection connection,
string streamName,
CancellationToken cancellationToken)
{
var sql = $"SELECT {SchemaQualifiedTable("get_next_offset")}(@streamName)";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
var result = await command.ExecuteScalarAsync(cancellationToken);
return result != null ? Convert.ToInt64(result) : 0L;
}
private async Task NotifyDeliveryProvidersAsync(
string streamName,
ICorrelatedEvent @event,
CancellationToken cancellationToken)
{
foreach (var provider in _deliveryProviders)
{
try
{
await provider.NotifyEventAvailableAsync(streamName, @event, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(
ex,
"Delivery provider {ProviderName} failed to process event notification for stream {StreamName}, event {EventId}",
provider.ProviderName,
streamName,
@event.EventId);
}
}
}
private async Task CleanupExpiredInFlightEventsAsync()
{
try
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync();
var sql = $"SELECT {SchemaQualifiedTable("cleanup_expired_in_flight")}()";
await using var command = new NpgsqlCommand(sql, connection);
var result = await command.ExecuteScalarAsync();
if (result is int requeuedCount && requeuedCount > 0)
{
_logger.LogInformation("Requeued {Count} expired in-flight events", requeuedCount);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to cleanup expired in-flight events");
}
}
// ========================================================================
// CONSUMER OFFSET TRACKING - Phase 6 (Monitoring & Health Checks)
// ========================================================================
public async Task GetConsumerOffsetAsync(
string streamName,
string consumerId,
CancellationToken cancellationToken = default)
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT offset
FROM {SchemaQualifiedTable("consumer_offsets")}
WHERE stream_name = @streamName AND consumer_id = @consumerId";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("consumerId", consumerId);
var result = await command.ExecuteScalarAsync(cancellationToken);
return result != null ? Convert.ToInt64(result) : 0L;
}
public async Task GetConsumerLastUpdateTimeAsync(
string streamName,
string consumerId,
CancellationToken cancellationToken = default)
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
SELECT last_updated
FROM {SchemaQualifiedTable("consumer_offsets")}
WHERE stream_name = @streamName AND consumer_id = @consumerId";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("consumerId", consumerId);
var result = await command.ExecuteScalarAsync(cancellationToken);
return result != null && result != DBNull.Value
? (DateTimeOffset)result
: DateTimeOffset.MinValue;
}
public async Task UpdateConsumerOffsetAsync(
string streamName,
string consumerId,
long newOffset,
CancellationToken cancellationToken = default)
{
await using var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync(cancellationToken);
var sql = $@"
INSERT INTO {SchemaQualifiedTable("consumer_offsets")}
(stream_name, consumer_id, offset, last_updated)
VALUES (@streamName, @consumerId, @offset, @lastUpdated)
ON CONFLICT (stream_name, consumer_id)
DO UPDATE SET
offset = @offset,
last_updated = @lastUpdated";
await using var command = new NpgsqlCommand(sql, connection);
command.Parameters.AddWithValue("streamName", streamName);
command.Parameters.AddWithValue("consumerId", consumerId);
command.Parameters.AddWithValue("offset", newOffset);
command.Parameters.AddWithValue("lastUpdated", DateTimeOffset.UtcNow);
await command.ExecuteNonQueryAsync(cancellationToken);
_logger.LogInformation(
"Consumer offset updated: Stream={StreamName}, Consumer={ConsumerId}, NewOffset={NewOffset}",
streamName, consumerId, newOffset);
}
public void Dispose()
{
_cleanupTimer?.Dispose();
}
}