using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Reflection;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Delivery;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using Svrnty.CQRS.Events.Abstractions.Models;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using Svrnty.CQRS.Events.PostgreSQL.Stores;

namespace Svrnty.CQRS.Events.PostgreSQL.Stores;

/// <summary>
/// PostgreSQL-based implementation of <see cref="IEventStreamStore"/> supporting both
/// persistent (event sourcing) and ephemeral (message queue) stream types.
/// </summary>
/// <remarks>
/// <para>
/// <strong>Persistent Streams:</strong>
/// Events are stored in an append-only log with sequential offsets.
/// Supports event replay, consumer offset tracking, and retention policies.
/// </para>
/// <para>
/// <strong>Ephemeral Streams:</strong>
/// Events are stored temporarily with visibility timeout semantics.
/// Events are permanently deleted after acknowledgment.
/// </para>
/// <para>
/// <strong>Concurrency:</strong>
/// Uses optimistic concurrency control for persistent streams.
/// Thread-safe for concurrent read and write operations.
/// </para>
/// </remarks>
public sealed class PostgresEventStreamStore : IEventStreamStore, IDisposable
{
    // NOTE(review): the generic type arguments in this file (IOptions<>, IEnumerable<>,
    // ILogger<>, Task<>, GetFieldValue<>) were reconstructed from usage because the
    // reviewed copy had them stripped. In particular IEventDeliveryProvider and the
    // Task<List<ICorrelatedEvent>> return type of ReadStreamAsync are inferred - confirm
    // them against the abstractions assembly before merging.

    /// <summary>SQLSTATE that PostgreSQL reports for a unique-constraint violation.</summary>
    private const string UniqueViolationSqlState = "23505";

    private const string InFlightEventsTableName = "in_flight_events";
    private const string DeadLetterQueueTableName = "dead_letter_queue";
    private const string ConsumerOffsetsTableName = "consumer_offsets";
    private const string StreamMetadataViewName = "stream_metadata";

    /// <summary>Delay before the first, and between subsequent, in-flight cleanup runs.</summary>
    private static readonly TimeSpan CleanupInterval = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Stored event type name -> resolved CLR type. Only successful lookups are cached so
    /// that a type from an assembly loaded later can still be found on a subsequent call.
    /// </summary>
    private static readonly ConcurrentDictionary<string, Type> ResolvedEventTypes =
        new(StringComparer.Ordinal);

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger _logger;
    private readonly IEnumerable<IEventDeliveryProvider> _deliveryProviders;
    private readonly Timer? _cleanupTimer;
    private readonly JsonSerializerOptions _jsonOptions;

    /// <summary>
    /// Creates the store, optionally runs the schema migration, and starts the periodic
    /// cleanup of expired in-flight events.
    /// </summary>
    /// <param name="options">Store options (connection string, schema and table names, batch size).</param>
    /// <param name="deliveryProviders">Providers notified after each stored event; <c>null</c> is treated as empty.</param>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public PostgresEventStreamStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        IEnumerable<IEventDeliveryProvider> deliveryProviders,
        ILogger<PostgresEventStreamStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _deliveryProviders = deliveryProviders ?? Enumerable.Empty<IEventDeliveryProvider>();
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        _jsonOptions = new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true,
            WriteIndented = false
        };

        if (_options.AutoMigrate)
        {
            // A constructor cannot await, so the migration is run synchronously here.
            InitializeDatabaseAsync().GetAwaiter().GetResult();
        }

        // The cleanup method catches and logs every exception itself, so the returned task
        // can be discarded; this avoids an async-void timer callback.
        _cleanupTimer = new Timer(
            state => { _ = CleanupExpiredInFlightEventsAsync(); },
            null,
            CleanupInterval,
            CleanupInterval);
    }

    // ========================================================================
    // DATABASE INITIALIZATION
    // ========================================================================

    /// <summary>
    /// Creates the database schema by executing <c>Migrations/001_InitialSchema.sql</c>
    /// (resolved relative to <see cref="AppContext.BaseDirectory"/>) when the configured
    /// schema does not exist yet. Failures are logged and rethrown.
    /// </summary>
    private async Task InitializeDatabaseAsync()
    {
        try
        {
            await using var connection = await OpenConnectionAsync(CancellationToken.None);

            if (await CheckSchemaExistsAsync(connection))
            {
                _logger.LogDebug("Schema {SchemaName} already exists", _options.SchemaName);
                return;
            }

            _logger.LogInformation(
                "Schema {SchemaName} does not exist. Creating database schema...",
                _options.SchemaName);

            var migrationScript = await System.IO.File.ReadAllTextAsync(
                System.IO.Path.Combine(
                    AppContext.BaseDirectory,
                    "Migrations",
                    "001_InitialSchema.sql"));

            await using var command = new NpgsqlCommand(migrationScript, connection);
            command.CommandTimeout = 120; // Longer timeout for schema creation
            await command.ExecuteNonQueryAsync();

            _logger.LogInformation("Database schema created successfully");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to initialize database schema");
            throw;
        }
    }

    /// <summary>Returns <c>true</c> when the configured schema is listed in <c>information_schema.schemata</c>.</summary>
    private async Task<bool> CheckSchemaExistsAsync(NpgsqlConnection connection)
    {
        const string sql =
            "SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = @schemaName)";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("schemaName", _options.SchemaName);

        return await command.ExecuteScalarAsync() is true;
    }

    // ========================================================================
    // PERSISTENT STREAM OPERATIONS (Event Sourcing)
    // ========================================================================

    /// <inheritdoc />
    /// <remarks>
    /// The offset is obtained from the <c>get_next_offset</c> database function and the
    /// event is then inserted with that offset. A unique violation (SQLSTATE 23505) is
    /// surfaced as <see cref="InvalidOperationException"/>.
    /// </remarks>
    public async Task<long> AppendAsync(
        string streamName,
        ICorrelatedEvent @event,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        if (@event == null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var offset = await GetNextOffsetAsync(connection, streamName, cancellationToken);
        var eventData = JsonSerializer.Serialize(@event, @event.GetType(), _jsonOptions);

        // "offset" is a reserved word in PostgreSQL and must be quoted when used as a column name.
        var sql = $@"
            INSERT INTO {SchemaQualifiedTable(_options.EventsTableName)}
                (stream_name, ""offset"", event_id, event_type, correlation_id, event_data, occurred_at, stored_at)
            VALUES
                (@streamName, @offset, @eventId, @eventType, @correlationId, @eventData::jsonb, @occurredAt, NOW())";

        await using var command = new NpgsqlCommand(sql, connection);
        AddEventParameters(command, streamName, @event, eventData);
        command.Parameters.AddWithValue("offset", offset);

        try
        {
            await command.ExecuteNonQueryAsync(cancellationToken);
        }
        catch (PostgresException ex) when (ex.SqlState == UniqueViolationSqlState)
        {
            _logger.LogWarning(
                ex,
                "Duplicate event {EventId} detected for stream {StreamName}",
                @event.EventId,
                streamName);

            throw new InvalidOperationException(
                $"Event with ID {@event.EventId} already exists in stream {streamName}", ex);
        }

        _logger.LogDebug(
            "Appended event {EventId} to stream {StreamName} at offset {Offset}",
            @event.EventId,
            streamName,
            offset);

        await NotifyDeliveryProvidersAsync(streamName, @event, cancellationToken);
        return offset;
    }

    /// <inheritdoc />
    /// <remarks>
    /// <paramref name="maxCount"/> is capped at the configured read batch size. Rows whose
    /// stored event type cannot be resolved to a CLR type are logged and skipped.
    /// </remarks>
    public async Task<List<ICorrelatedEvent>> ReadStreamAsync(
        string streamName,
        long fromOffset,
        int maxCount,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        if (fromOffset < 0)
        {
            throw new ArgumentException("Offset cannot be negative.", nameof(fromOffset));
        }

        if (maxCount <= 0)
        {
            throw new ArgumentException("Max count must be positive.", nameof(maxCount));
        }

        maxCount = Math.Min(maxCount, _options.ReadBatchSize);

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT event_id, event_type, event_data
            FROM {SchemaQualifiedTable(_options.EventsTableName)}
            WHERE stream_name = @streamName
              AND ""offset"" >= @fromOffset
            ORDER BY ""offset"" ASC
            LIMIT @maxCount";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);
        command.Parameters.AddWithValue("fromOffset", fromOffset);
        command.Parameters.AddWithValue("maxCount", maxCount);

        var events = new List<ICorrelatedEvent>();

        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        while (await reader.ReadAsync(cancellationToken))
        {
            var eventId = reader.GetString(0);
            var eventType = reader.GetString(1);
            var eventDataJson = reader.GetString(2);

            var eventObject = DeserializeEvent(eventType, eventId, eventDataJson, streamName);
            if (eventObject != null)
            {
                events.Add(eventObject);
            }
        }

        _logger.LogDebug(
            "Read {Count} events from stream {StreamName} starting at offset {FromOffset}",
            events.Count,
            streamName,
            fromOffset);

        return events;
    }

    /// <inheritdoc />
    public async Task<long> GetStreamLengthAsync(
        string streamName,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT COUNT(*)
            FROM {SchemaQualifiedTable(_options.EventsTableName)}
            WHERE stream_name = @streamName";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);

        return ToInt64OrZero(await command.ExecuteScalarAsync(cancellationToken));
    }

    /// <inheritdoc />
    /// <remarks>
    /// Reads from the <c>stream_metadata</c> relation. When no row exists for the stream,
    /// metadata with zero length and no timestamps is returned.
    /// </remarks>
    public async Task<StreamMetadata> GetStreamMetadataAsync(
        string streamName,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT length, oldest_event_offset, oldest_event_timestamp, newest_event_timestamp
            FROM {SchemaQualifiedTable(StreamMetadataViewName)}
            WHERE stream_name = @streamName";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);

        long length = 0;
        long oldestOffset = 0;
        DateTimeOffset? oldestTimestamp = null;
        DateTimeOffset? newestTimestamp = null;

        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        if (await reader.ReadAsync(cancellationToken))
        {
            length = reader.IsDBNull(0) ? 0L : reader.GetInt64(0);
            oldestOffset = reader.IsDBNull(1) ? 0L : reader.GetInt64(1);
            oldestTimestamp = reader.IsDBNull(2) ? null : reader.GetFieldValue<DateTimeOffset>(2);
            newestTimestamp = reader.IsDBNull(3) ? null : reader.GetFieldValue<DateTimeOffset>(3);
        }

        return new StreamMetadata
        {
            StreamName = streamName,
            Length = length,
            OldestEventOffset = oldestOffset,
            OldestEventTimestamp = oldestTimestamp,
            NewestEventTimestamp = newestTimestamp,
            RetentionPolicy = null,
            DeletedEventCount = 0
        };
    }

    // ========================================================================
    // EPHEMERAL STREAM OPERATIONS (Message Queue)
    // ========================================================================

    /// <inheritdoc />
    public async Task EnqueueAsync(
        string streamName,
        ICorrelatedEvent @event,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        if (@event == null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await InsertQueueEventAsync(connection, null, streamName, @event, cancellationToken);

        _logger.LogDebug("Enqueued event {EventId} to stream {StreamName}", @event.EventId, streamName);

        await NotifyDeliveryProvidersAsync(streamName, @event, cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>
    /// <c>null</c> entries are ignored. All remaining events are inserted in a single
    /// transaction; delivery providers are notified only after the commit succeeded.
    /// </remarks>
    public async Task EnqueueBatchAsync(
        string streamName,
        IEnumerable<ICorrelatedEvent> events,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        if (events == null)
        {
            throw new ArgumentNullException(nameof(events));
        }

        var eventList = events.Where(e => e != null).ToList();
        if (eventList.Count == 0)
        {
            return;
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using (var transaction = await connection.BeginTransactionAsync(cancellationToken))
        {
            try
            {
                foreach (var queuedEvent in eventList)
                {
                    await InsertQueueEventAsync(connection, transaction, streamName, queuedEvent, cancellationToken);
                }

                await transaction.CommitAsync(cancellationToken);
            }
            catch
            {
                await TryRollbackAsync(transaction);
                throw;
            }
        }

        _logger.LogDebug("Enqueued {Count} events to stream {StreamName}", eventList.Count, streamName);

        // Deliberately outside the transaction scope: a failure here must not attempt to
        // roll back a transaction that has already been committed.
        foreach (var queuedEvent in eventList)
        {
            await NotifyDeliveryProvidersAsync(streamName, queuedEvent, cancellationToken);
        }
    }

    /// <inheritdoc />
    /// <remarks>
    /// Claims the oldest queue row that has no in-flight entry, records it as in-flight for
    /// <paramref name="consumerId"/> until now + <paramref name="visibilityTimeout"/>, and
    /// returns the deserialized event. Returns <c>null</c> when nothing is available or the
    /// claimed row cannot be turned into an <see cref="ICorrelatedEvent"/>.
    /// </remarks>
    public async Task<ICorrelatedEvent?> DequeueAsync(
        string streamName,
        string consumerId,
        TimeSpan visibilityTimeout,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        RequireText(consumerId, nameof(consumerId), "Consumer ID");
        if (visibilityTimeout <= TimeSpan.Zero)
        {
            throw new ArgumentException("Visibility timeout must be positive.", nameof(visibilityTimeout));
        }

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var claimed = await ClaimNextQueueEventAsync(
            connection, streamName, consumerId, visibilityTimeout, cancellationToken);
        if (claimed is null)
        {
            return null;
        }

        // Deserialization happens after the claim was committed, outside of any
        // rollback handling, so a JSON error is reported as-is.
        var (eventId, eventType, eventDataJson) = claimed.Value;
        var eventObject = DeserializeEvent(eventType, eventId, eventDataJson, streamName);
        if (eventObject != null)
        {
            _logger.LogDebug(
                "Dequeued event {EventId} from stream {StreamName} for consumer {ConsumerId}",
                eventId,
                streamName,
                consumerId);
        }

        return eventObject;
    }

    /// <inheritdoc />
    /// <remarks>
    /// Returns <c>false</c> when no in-flight entry exists for the given event and consumer.
    /// Otherwise the in-flight entry and the queue row are deleted in one transaction.
    /// </remarks>
    public async Task<bool> AcknowledgeAsync(
        string streamName,
        string eventId,
        string consumerId,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        RequireText(eventId, nameof(eventId), "Event ID");
        RequireText(consumerId, nameof(consumerId), "Consumer ID");

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var transaction = await connection.BeginTransactionAsync(cancellationToken);

        try
        {
            var selectSql = $@"
                SELECT queue_event_id
                FROM {SchemaQualifiedTable(InFlightEventsTableName)}
                WHERE event_id = @eventId AND consumer_id = @consumerId";

            object? queueEventId;
            await using (var selectCommand = new NpgsqlCommand(selectSql, connection, transaction))
            {
                selectCommand.Parameters.AddWithValue("eventId", eventId);
                selectCommand.Parameters.AddWithValue("consumerId", consumerId);
                queueEventId = await selectCommand.ExecuteScalarAsync(cancellationToken);
            }

            if (queueEventId == null)
            {
                // Event not found or held by a different consumer.
                await transaction.CommitAsync(cancellationToken);
                return false;
            }

            await DeleteInFlightEntryAsync(connection, transaction, eventId, consumerId, cancellationToken);

            // Permanent deletion: ephemeral streams do not keep acknowledged events.
            await DeleteQueueEventAsync(connection, transaction, queueEventId, cancellationToken);

            await transaction.CommitAsync(cancellationToken);
        }
        catch
        {
            await TryRollbackAsync(transaction);
            throw;
        }

        _logger.LogDebug("Acknowledged event {EventId} for consumer {ConsumerId}", eventId, consumerId);
        return true;
    }

    /// <inheritdoc />
    /// <remarks>
    /// Returns <c>false</c> when no in-flight entry exists for the given event and consumer.
    /// With <paramref name="requeue"/> set, only the in-flight entry is removed so the queue
    /// row becomes visible again; otherwise the row is copied to the dead letter queue and
    /// deleted from the queue.
    /// </remarks>
    public async Task<bool> NackAsync(
        string streamName,
        string eventId,
        string consumerId,
        bool requeue = true,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        RequireText(eventId, nameof(eventId), "Event ID");
        RequireText(consumerId, nameof(consumerId), "Consumer ID");

        await using var connection = await OpenConnectionAsync(cancellationToken);
        await using var transaction = await connection.BeginTransactionAsync(cancellationToken);

        try
        {
            var selectSql = $@"
                SELECT queue_event_id, delivery_count
                FROM {SchemaQualifiedTable(InFlightEventsTableName)}
                WHERE event_id = @eventId AND consumer_id = @consumerId";

            (long QueueEventId, int DeliveryCount)? inFlight = null;
            await using (var selectCommand = new NpgsqlCommand(selectSql, connection, transaction))
            {
                selectCommand.Parameters.AddWithValue("eventId", eventId);
                selectCommand.Parameters.AddWithValue("consumerId", consumerId);

                await using var reader = await selectCommand.ExecuteReaderAsync(cancellationToken);
                if (await reader.ReadAsync(cancellationToken))
                {
                    inFlight = (reader.GetInt64(0), reader.GetInt32(1));
                }
            }

            // The reader is disposed at this point; committing while it is still open
            // fails because the connection would still be busy with the SELECT.
            if (inFlight is null)
            {
                await transaction.CommitAsync(cancellationToken);
                return false;
            }

            var (queueEventId, deliveryCount) = inFlight.Value;

            await DeleteInFlightEntryAsync(connection, transaction, eventId, consumerId, cancellationToken);

            if (!requeue)
            {
                var moveToDlqSql = $@"
                    INSERT INTO {SchemaQualifiedTable(DeadLetterQueueTableName)}
                        (stream_name, event_id, event_type, correlation_id, event_data,
                         original_enqueued_at, delivery_attempts, last_consumer_id, occurred_at)
                    SELECT stream_name, event_id, event_type, correlation_id, event_data,
                           enqueued_at, @deliveryAttempts, @consumerId, occurred_at
                    FROM {SchemaQualifiedTable(_options.QueueEventsTableName)}
                    WHERE id = @queueEventId";

                await using (var moveToDlqCommand = new NpgsqlCommand(moveToDlqSql, connection, transaction))
                {
                    moveToDlqCommand.Parameters.AddWithValue("deliveryAttempts", deliveryCount);
                    moveToDlqCommand.Parameters.AddWithValue("consumerId", consumerId);
                    moveToDlqCommand.Parameters.AddWithValue("queueEventId", queueEventId);
                    await moveToDlqCommand.ExecuteNonQueryAsync(cancellationToken);
                }

                await DeleteQueueEventAsync(connection, transaction, queueEventId, cancellationToken);

                _logger.LogWarning(
                    "Moved event {EventId} to dead letter queue after {DeliveryAttempts} attempts",
                    eventId,
                    deliveryCount);
            }

            // When requeue is true the event is available again simply because its
            // in-flight entry was removed above.
            await transaction.CommitAsync(cancellationToken);
        }
        catch
        {
            await TryRollbackAsync(transaction);
            throw;
        }

        _logger.LogDebug(
            "NACKed event {EventId} for consumer {ConsumerId}, requeue={Requeue}",
            eventId,
            consumerId,
            requeue);

        return true;
    }

    /// <inheritdoc />
    public async Task<int> GetPendingCountAsync(
        string streamName,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");

        await using var connection = await OpenConnectionAsync(cancellationToken);

        // Anti-join: count queue rows that have no in-flight entry.
        var sql = $@"
            SELECT COUNT(*)
            FROM {SchemaQualifiedTable(_options.QueueEventsTableName)} q
            LEFT JOIN {SchemaQualifiedTable(InFlightEventsTableName)} inf
                ON q.event_id = inf.event_id
            WHERE q.stream_name = @streamName
              AND inf.event_id IS NULL";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);

        var result = await command.ExecuteScalarAsync(cancellationToken);
        return result is null or DBNull ? 0 : Convert.ToInt32(result);
    }

    // ========================================================================
    // CONSUMER OFFSET TRACKING - Phase 6 (Monitoring & Health Checks)
    // ========================================================================

    /// <summary>
    /// Returns the offset stored for the consumer on the stream, or <c>0</c> when no row
    /// (or a NULL offset) is stored.
    /// </summary>
    /// <param name="streamName">Name of the stream.</param>
    /// <param name="consumerId">Identifier of the consumer.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <exception cref="ArgumentException">A name argument is null or whitespace.</exception>
    public async Task<long> GetConsumerOffsetAsync(
        string streamName,
        string consumerId,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        RequireText(consumerId, nameof(consumerId), "Consumer ID");

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT ""offset""
            FROM {SchemaQualifiedTable(ConsumerOffsetsTableName)}
            WHERE stream_name = @streamName AND consumer_id = @consumerId";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);
        command.Parameters.AddWithValue("consumerId", consumerId);

        return ToInt64OrZero(await command.ExecuteScalarAsync(cancellationToken));
    }

    /// <summary>
    /// Returns when the consumer's offset was last updated, or
    /// <see cref="DateTimeOffset.MinValue"/> when nothing is stored.
    /// </summary>
    /// <param name="streamName">Name of the stream.</param>
    /// <param name="consumerId">Identifier of the consumer.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <exception cref="ArgumentException">A name argument is null or whitespace.</exception>
    public async Task<DateTimeOffset> GetConsumerLastUpdateTimeAsync(
        string streamName,
        string consumerId,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        RequireText(consumerId, nameof(consumerId), "Consumer ID");

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            SELECT last_updated
            FROM {SchemaQualifiedTable(ConsumerOffsetsTableName)}
            WHERE stream_name = @streamName AND consumer_id = @consumerId";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);
        command.Parameters.AddWithValue("consumerId", consumerId);

        var result = await command.ExecuteScalarAsync(cancellationToken);

        // ExecuteScalar hands back a boxed value whose CLR type depends on the provider's
        // type mapping; a boxed DateTime cannot be unboxed as DateTimeOffset, so both
        // shapes are handled explicitly.
        return result switch
        {
            DateTimeOffset timestamp => timestamp,
            DateTime { Kind: DateTimeKind.Unspecified } unspecified =>
                new DateTimeOffset(DateTime.SpecifyKind(unspecified, DateTimeKind.Utc)),
            DateTime dateTime => new DateTimeOffset(dateTime),
            _ => DateTimeOffset.MinValue
        };
    }

    /// <summary>
    /// Inserts or updates (upsert on <c>stream_name, consumer_id</c>) the consumer's offset
    /// and stamps it with the current UTC time.
    /// </summary>
    /// <param name="streamName">Name of the stream.</param>
    /// <param name="consumerId">Identifier of the consumer.</param>
    /// <param name="newOffset">Offset to store.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <exception cref="ArgumentException">A name argument is null or whitespace.</exception>
    public async Task UpdateConsumerOffsetAsync(
        string streamName,
        string consumerId,
        long newOffset,
        CancellationToken cancellationToken = default)
    {
        RequireText(streamName, nameof(streamName), "Stream name");
        RequireText(consumerId, nameof(consumerId), "Consumer ID");

        await using var connection = await OpenConnectionAsync(cancellationToken);

        var sql = $@"
            INSERT INTO {SchemaQualifiedTable(ConsumerOffsetsTableName)}
                (stream_name, consumer_id, ""offset"", last_updated)
            VALUES (@streamName, @consumerId, @offset, @lastUpdated)
            ON CONFLICT (stream_name, consumer_id)
            DO UPDATE SET ""offset"" = @offset, last_updated = @lastUpdated";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);
        command.Parameters.AddWithValue("consumerId", consumerId);
        command.Parameters.AddWithValue("offset", newOffset);
        command.Parameters.AddWithValue("lastUpdated", DateTimeOffset.UtcNow);

        await command.ExecuteNonQueryAsync(cancellationToken);

        _logger.LogInformation(
            "Consumer offset updated: Stream={StreamName}, Consumer={ConsumerId}, NewOffset={NewOffset}",
            streamName,
            consumerId,
            newOffset);
    }

    /// <summary>Stops the periodic in-flight cleanup timer.</summary>
    public void Dispose()
    {
        _cleanupTimer?.Dispose();
    }

    // ========================================================================
    // HELPER METHODS
    // ========================================================================

    /// <summary>Throws <see cref="ArgumentException"/> when <paramref name="value"/> is null or whitespace.</summary>
    private static void RequireText(string? value, string paramName, string displayName)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new ArgumentException($"{displayName} cannot be null or whitespace.", paramName);
        }
    }

    /// <summary>Converts a scalar query result to <see cref="long"/>, mapping <c>null</c>/<see cref="DBNull"/> to zero.</summary>
    private static long ToInt64OrZero(object? scalar) =>
        scalar is null or DBNull ? 0L : Convert.ToInt64(scalar);

    /// <summary>Wraps an identifier in double quotes, doubling any embedded double quote.</summary>
    private static string QuoteIdentifier(string identifier) =>
        "\"" + identifier.Replace("\"", "\"\"") + "\"";

    /// <summary>Returns <c>"schema"."name"</c> for an object in the configured schema.</summary>
    private string SchemaQualifiedTable(string tableName) =>
        $"{QuoteIdentifier(_options.SchemaName)}.{QuoteIdentifier(tableName)}";

    /// <summary>Creates and opens a connection; the connection is disposed if opening fails.</summary>
    private async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancellationToken)
    {
        var connection = new NpgsqlConnection(_options.ConnectionString);
        try
        {
            await connection.OpenAsync(cancellationToken);
            return connection;
        }
        catch
        {
            await connection.DisposeAsync();
            throw;
        }
    }

    /// <summary>
    /// Best-effort rollback. Uses <see cref="CancellationToken.None"/> and swallows (but logs)
    /// failures so the exception that triggered the rollback is the one callers observe.
    /// </summary>
    private async Task TryRollbackAsync(NpgsqlTransaction transaction)
    {
        try
        {
            await transaction.RollbackAsync(CancellationToken.None);
        }
        catch (Exception rollbackException)
        {
            _logger.LogWarning(rollbackException, "Failed to roll back transaction");
        }
    }

    /// <summary>Adds the parameters shared by the event-log and queue INSERT statements.</summary>
    private static void AddEventParameters(
        NpgsqlCommand command,
        string streamName,
        ICorrelatedEvent @event,
        string eventData)
    {
        command.Parameters.AddWithValue("streamName", streamName);
        command.Parameters.AddWithValue("eventId", @event.EventId);
        // The short type name is what gets stored; ResolveEventType maps it back on read.
        command.Parameters.AddWithValue("eventType", @event.GetType().Name);
        command.Parameters.AddWithValue("correlationId", @event.CorrelationId ?? string.Empty);
        command.Parameters.AddWithValue("eventData", eventData);
        command.Parameters.AddWithValue("occurredAt", DateTimeOffset.UtcNow);
    }

    /// <summary>Serializes one event and inserts it into the queue table, optionally inside a transaction.</summary>
    private async Task InsertQueueEventAsync(
        NpgsqlConnection connection,
        NpgsqlTransaction? transaction,
        string streamName,
        ICorrelatedEvent @event,
        CancellationToken cancellationToken)
    {
        var eventData = JsonSerializer.Serialize(@event, @event.GetType(), _jsonOptions);

        var sql = $@"
            INSERT INTO {SchemaQualifiedTable(_options.QueueEventsTableName)}
                (stream_name, event_id, event_type, correlation_id, event_data, occurred_at, enqueued_at)
            VALUES
                (@streamName, @eventId, @eventType, @correlationId, @eventData::jsonb, @occurredAt, NOW())";

        await using var command = new NpgsqlCommand(sql, connection, transaction);
        AddEventParameters(command, streamName, @event, eventData);
        await command.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>
    /// Selects the oldest queue row without an in-flight entry, inserts the in-flight entry
    /// for it and commits. Returns <c>null</c> when no row is available.
    /// </summary>
    private async Task<(string EventId, string EventType, string EventDataJson)?> ClaimNextQueueEventAsync(
        NpgsqlConnection connection,
        string streamName,
        string consumerId,
        TimeSpan visibilityTimeout,
        CancellationToken cancellationToken)
    {
        await using var transaction =
            await connection.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);

        try
        {
            // "OF q" restricts the row lock to the queue table: PostgreSQL rejects
            // FOR UPDATE on the nullable side of an outer join.
            var selectSql = $@"
                SELECT q.id, q.event_id, q.event_type, q.event_data, q.delivery_count
                FROM {SchemaQualifiedTable(_options.QueueEventsTableName)} q
                LEFT JOIN {SchemaQualifiedTable(InFlightEventsTableName)} inf
                    ON q.event_id = inf.event_id
                WHERE q.stream_name = @streamName
                  AND inf.event_id IS NULL
                ORDER BY q.enqueued_at ASC
                LIMIT 1
                FOR UPDATE OF q SKIP LOCKED";

            (long QueueEventId, string EventId, string EventType, string EventDataJson, int DeliveryCount)? row = null;
            await using (var selectCommand = new NpgsqlCommand(selectSql, connection, transaction))
            {
                selectCommand.Parameters.AddWithValue("streamName", streamName);

                await using var reader = await selectCommand.ExecuteReaderAsync(cancellationToken);
                if (await reader.ReadAsync(cancellationToken))
                {
                    row = (
                        reader.GetInt64(0),
                        reader.GetString(1),
                        reader.GetString(2),
                        reader.GetString(3),
                        reader.GetInt32(4));
                }
            }

            // The reader is disposed before any further command or the commit is issued.
            if (row is null)
            {
                await transaction.CommitAsync(cancellationToken);
                return null;
            }

            var candidate = row.Value;
            var visibleAfter = DateTimeOffset.UtcNow.Add(visibilityTimeout);

            var insertInFlightSql = $@"
                INSERT INTO {SchemaQualifiedTable(InFlightEventsTableName)}
                    (event_id, stream_name, consumer_id, visible_after, delivery_count, queue_event_id)
                VALUES
                    (@eventId, @streamName, @consumerId, @visibleAfter, @deliveryCount, @queueEventId)";

            await using (var insertCommand = new NpgsqlCommand(insertInFlightSql, connection, transaction))
            {
                insertCommand.Parameters.AddWithValue("eventId", candidate.EventId);
                insertCommand.Parameters.AddWithValue("streamName", streamName);
                insertCommand.Parameters.AddWithValue("consumerId", consumerId);
                insertCommand.Parameters.AddWithValue("visibleAfter", visibleAfter);
                insertCommand.Parameters.AddWithValue("deliveryCount", candidate.DeliveryCount + 1);
                insertCommand.Parameters.AddWithValue("queueEventId", candidate.QueueEventId);
                await insertCommand.ExecuteNonQueryAsync(cancellationToken);
            }

            await transaction.CommitAsync(cancellationToken);
            return (candidate.EventId, candidate.EventType, candidate.EventDataJson);
        }
        catch
        {
            await TryRollbackAsync(transaction);
            throw;
        }
    }

    /// <summary>Deletes the in-flight entry of the given event and consumer.</summary>
    private async Task DeleteInFlightEntryAsync(
        NpgsqlConnection connection,
        NpgsqlTransaction transaction,
        string eventId,
        string consumerId,
        CancellationToken cancellationToken)
    {
        var sql = $@"
            DELETE FROM {SchemaQualifiedTable(InFlightEventsTableName)}
            WHERE event_id = @eventId AND consumer_id = @consumerId";

        await using var command = new NpgsqlCommand(sql, connection, transaction);
        command.Parameters.AddWithValue("eventId", eventId);
        command.Parameters.AddWithValue("consumerId", consumerId);
        await command.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>Deletes the queue row with the given primary key value.</summary>
    private async Task DeleteQueueEventAsync(
        NpgsqlConnection connection,
        NpgsqlTransaction transaction,
        object queueEventId,
        CancellationToken cancellationToken)
    {
        var sql = $@"
            DELETE FROM {SchemaQualifiedTable(_options.QueueEventsTableName)}
            WHERE id = @queueEventId";

        await using var command = new NpgsqlCommand(sql, connection, transaction);
        command.Parameters.AddWithValue("queueEventId", queueEventId);
        await command.ExecuteNonQueryAsync(cancellationToken);
    }

    /// <summary>Calls the <c>get_next_offset</c> database function for the stream.</summary>
    private async Task<long> GetNextOffsetAsync(
        NpgsqlConnection connection,
        string streamName,
        CancellationToken cancellationToken)
    {
        var sql = $"SELECT {SchemaQualifiedTable("get_next_offset")}(@streamName)";

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("streamName", streamName);

        return ToInt64OrZero(await command.ExecuteScalarAsync(cancellationToken));
    }

    /// <summary>
    /// Resolves the stored event type and deserializes the JSON payload. Logs a warning and
    /// returns <c>null</c> when the type cannot be resolved.
    /// </summary>
    private ICorrelatedEvent? DeserializeEvent(
        string eventType,
        string eventId,
        string eventDataJson,
        string streamName)
    {
        var type = ResolveEventType(eventType);
        if (type == null)
        {
            _logger.LogWarning(
                "Could not resolve event type {EventType} for event {EventId} in stream {StreamName}",
                eventType,
                eventId,
                streamName);
            return null;
        }

        return JsonSerializer.Deserialize(eventDataJson, type, _jsonOptions) as ICorrelatedEvent;
    }

    /// <summary>
    /// Maps a stored event type name to a CLR type. <see cref="Type.GetType(string, bool)"/> is
    /// tried first (works for assembly-qualified names); because events are stored under
    /// their short type name, the loaded assemblies are searched as a fallback.
    /// </summary>
    private static Type? ResolveEventType(string eventTypeName)
    {
        if (ResolvedEventTypes.TryGetValue(eventTypeName, out var cached))
        {
            return cached;
        }

        var resolved = Type.GetType(eventTypeName, throwOnError: false);
        if (resolved == null || !typeof(ICorrelatedEvent).IsAssignableFrom(resolved))
        {
            resolved = FindEventTypeInLoadedAssemblies(eventTypeName);
        }

        if (resolved != null)
        {
            ResolvedEventTypes.TryAdd(eventTypeName, resolved);
        }

        return resolved;
    }

    /// <summary>
    /// Returns the first concrete <see cref="ICorrelatedEvent"/> type in the loaded assemblies
    /// whose short or full name equals <paramref name="eventTypeName"/>, or <c>null</c>.
    /// </summary>
    private static Type? FindEventTypeInLoadedAssemblies(string eventTypeName)
    {
        foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
        {
            if (assembly.IsDynamic)
            {
                continue;
            }

            Type?[] candidates;
            try
            {
                candidates = assembly.GetTypes();
            }
            catch (ReflectionTypeLoadException ex)
            {
                // Some types failed to load; the array still holds the ones that did
                // (failed slots are null).
                candidates = ex.Types;
            }

            foreach (var candidate in candidates)
            {
                if (candidate is { IsAbstract: false, IsInterface: false }
                    && typeof(ICorrelatedEvent).IsAssignableFrom(candidate)
                    && (string.Equals(candidate.Name, eventTypeName, StringComparison.Ordinal)
                        || string.Equals(candidate.FullName, eventTypeName, StringComparison.Ordinal)))
                {
                    return candidate;
                }
            }
        }

        return null;
    }

    /// <summary>
    /// Notifies every delivery provider about a stored event. A failing provider is logged
    /// and does not prevent the remaining providers from being notified.
    /// </summary>
    private async Task NotifyDeliveryProvidersAsync(
        string streamName,
        ICorrelatedEvent @event,
        CancellationToken cancellationToken)
    {
        foreach (var provider in _deliveryProviders)
        {
            try
            {
                await provider.NotifyEventAvailableAsync(streamName, @event, cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(
                    ex,
                    "Delivery provider {ProviderName} failed to process event notification for stream {StreamName}, event {EventId}",
                    provider.ProviderName,
                    streamName,
                    @event.EventId);
            }
        }
    }

    /// <summary>
    /// Timer callback body: calls the <c>cleanup_expired_in_flight</c> database function and
    /// logs how many events it reported. Never throws; failures are logged.
    /// </summary>
    private async Task CleanupExpiredInFlightEventsAsync()
    {
        try
        {
            await using var connection = await OpenConnectionAsync(CancellationToken.None);

            var sql = $"SELECT {SchemaQualifiedTable("cleanup_expired_in_flight")}()";
            await using var command = new NpgsqlCommand(sql, connection);

            var result = await command.ExecuteScalarAsync();

            // Accept both integer widths the function could be declared with.
            var requeuedCount = result switch
            {
                int count => count,
                long count => count,
                _ => 0L
            };

            if (requeuedCount > 0)
            {
                _logger.LogInformation("Requeued {Count} expired in-flight events", requeuedCount);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to cleanup expired in-flight events");
        }
    }
}