using System; using Svrnty.CQRS.Events.Abstractions.EventStore; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Svrnty.CQRS.Events.Abstractions; using Svrnty.CQRS.Events.ConsumerGroups.Abstractions; namespace Svrnty.CQRS.Events.ConsumerGroups.PostgreSQL; /// /// PostgreSQL-based implementation of IConsumerGroupReader. /// Provides high-level consumer group functionality with automatic offset management, /// heartbeating, and error handling. /// public class PostgresConsumerGroupReader : IConsumerGroupReader { private readonly IEventStreamStore _streamStore; private readonly IConsumerOffsetStore _offsetStore; private readonly ILogger _logger; public PostgresConsumerGroupReader( IEventStreamStore streamStore, IConsumerOffsetStore offsetStore, ILogger logger) { _streamStore = streamStore ?? throw new ArgumentNullException(nameof(streamStore)); _offsetStore = offsetStore ?? throw new ArgumentNullException(nameof(offsetStore)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } /// public async IAsyncEnumerable ConsumeAsync( string streamName, string groupId, string consumerId, ConsumerGroupOptions options, [EnumeratorCancellation] CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName)); if (string.IsNullOrWhiteSpace(groupId)) throw new ArgumentException("Group ID cannot be null or whitespace", nameof(groupId)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace", nameof(consumerId)); if (options == null) throw new ArgumentNullException(nameof(options)); options.Validate(); // Register consumer await _offsetStore.RegisterConsumerAsync(groupId, consumerId, cancellationToken); _logger.LogInformation( "Consumer {ConsumerId} registered in group {GroupId} for stream {StreamName}", consumerId, groupId, streamName); // Set up heartbeat timer using var heartbeatTimer = new PeriodicTimer(options.HeartbeatInterval); var heartbeatTask = HeartbeatLoopAsync(groupId, consumerId, heartbeatTimer, cancellationToken); // Set up periodic commit timer (if using periodic strategy) PeriodicTimer? periodicCommitTimer = null; Task? periodicCommitTask = null; long lastPeriodicCommitOffset = -1; if (options.CommitStrategy == OffsetCommitStrategy.Periodic) { periodicCommitTimer = new PeriodicTimer(options.PeriodicCommitInterval); periodicCommitTask = PeriodicCommitLoopAsync( streamName, groupId, consumerId, periodicCommitTimer, () => lastPeriodicCommitOffset, cancellationToken); } try { // Determine starting offset var startOffset = await GetStartingOffsetAsync( streamName, groupId, options.StartFromBeginning, cancellationToken); _logger.LogDebug( "Consumer {ConsumerId} starting from offset {Offset} on stream {StreamName}", consumerId, startOffset, streamName); long currentOffset = startOffset; var batchEventCount = 0; var isFirstBatch = true; while (!cancellationToken.IsCancellationRequested) { // Read batch of events var events = await _streamStore.ReadStreamAsync( streamName, currentOffset, options.BatchSize, cancellationToken); if (events.Count == 0) { // No more events, wait before polling again if (!isFirstBatch) { _logger.LogDebug( "Consumer {ConsumerId} caught up on stream {StreamName}, polling in {Interval}", consumerId, streamName, options.PollingInterval); } await Task.Delay(options.PollingInterval, cancellationToken); continue; } isFirstBatch = false; // Process events in batch // Each event corresponds to sequential offsets starting from currentOffset for (int i = 0; i < events.Count; i++) { var evt = events[i]; var eventOffset = currentOffset + i; batchEventCount++; yield return evt; // Commit after each event if strategy is AfterEach if (options.CommitStrategy == OffsetCommitStrategy.AfterEach) { await CommitOffsetAsync(streamName, groupId, consumerId, eventOffset, cancellationToken); _logger.LogTrace( "Committed offset {Offset} for consumer {ConsumerId} (AfterEach)", eventOffset, consumerId); } else if (options.CommitStrategy == OffsetCommitStrategy.Periodic) { // Update the last offset for periodic commits lastPeriodicCommitOffset = eventOffset; } } // Commit after batch if strategy is AfterBatch if (options.CommitStrategy == OffsetCommitStrategy.AfterBatch && events.Count > 0) { var lastOffsetInBatch = currentOffset + events.Count - 1; await CommitOffsetAsync(streamName, groupId, consumerId, lastOffsetInBatch, cancellationToken); _logger.LogDebug( "Committed offset {Offset} for consumer {ConsumerId} after batch of {Count} events", lastOffsetInBatch, consumerId, batchEventCount); } // Advance current offset past the events we just read currentOffset += events.Count; batchEventCount = 0; } } finally { // Clean up timers periodicCommitTimer?.Dispose(); // Wait for background tasks to complete if (periodicCommitTask != null) { try { await periodicCommitTask; } catch (OperationCanceledException) { // Expected on cancellation } } try { await heartbeatTask; } catch (OperationCanceledException) { // Expected on cancellation } // Unregister consumer try { await _offsetStore.UnregisterConsumerAsync(groupId, consumerId, CancellationToken.None); _logger.LogInformation( "Consumer {ConsumerId} unregistered from group {GroupId}", consumerId, groupId); } catch (Exception ex) { _logger.LogError(ex, "Failed to unregister consumer {ConsumerId} from group {GroupId}", consumerId, groupId); } } } /// public async Task CommitOffsetAsync( string streamName, string groupId, string consumerId, long offset, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName)); if (string.IsNullOrWhiteSpace(groupId)) throw new ArgumentException("Group ID cannot be null or whitespace", nameof(groupId)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace", nameof(consumerId)); await _offsetStore.CommitOffsetAsync(groupId, consumerId, streamName, offset, cancellationToken); } /// public async Task GetLastCommittedOffsetAsync( string streamName, string groupId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace", nameof(streamName)); if (string.IsNullOrWhiteSpace(groupId)) throw new ArgumentException("Group ID cannot be null or whitespace", nameof(groupId)); return await _offsetStore.GetCommittedOffsetAsync(groupId, streamName, cancellationToken); } private async Task GetStartingOffsetAsync( string streamName, string groupId, bool startFromBeginning, CancellationToken cancellationToken) { var committedOffset = await _offsetStore.GetCommittedOffsetAsync(groupId, streamName, cancellationToken); if (committedOffset.HasValue) { // Continue from last committed offset + 1 return committedOffset.Value + 1; } // No committed offset, start from beginning or end return startFromBeginning ? 0 : long.MaxValue; } private async Task HeartbeatLoopAsync( string groupId, string consumerId, PeriodicTimer timer, CancellationToken cancellationToken) { try { while (await timer.WaitForNextTickAsync(cancellationToken)) { try { await _offsetStore.RegisterConsumerAsync(groupId, consumerId, cancellationToken); _logger.LogTrace( "Sent heartbeat for consumer {ConsumerId} in group {GroupId}", consumerId, groupId); } catch (Exception ex) { _logger.LogWarning(ex, "Failed to send heartbeat for consumer {ConsumerId} in group {GroupId}", consumerId, groupId); } } } catch (OperationCanceledException) { // Expected on cancellation _logger.LogDebug( "Heartbeat loop cancelled for consumer {ConsumerId} in group {GroupId}", consumerId, groupId); } } private async Task PeriodicCommitLoopAsync( string streamName, string groupId, string consumerId, PeriodicTimer timer, Func getLastOffset, CancellationToken cancellationToken) { long lastCommittedOffset = -1; try { while (await timer.WaitForNextTickAsync(cancellationToken)) { try { var currentOffset = getLastOffset(); // Only commit if offset has advanced if (currentOffset >= 0 && currentOffset != lastCommittedOffset) { await _offsetStore.CommitOffsetAsync( groupId, consumerId, streamName, currentOffset, cancellationToken); lastCommittedOffset = currentOffset; _logger.LogDebug( "Periodic commit: offset {Offset} for consumer {ConsumerId}", currentOffset, consumerId); } } catch (Exception ex) { _logger.LogWarning(ex, "Failed to commit offset periodically for consumer {ConsumerId} in group {GroupId}", consumerId, groupId); } } } catch (OperationCanceledException) { // Expected on cancellation _logger.LogDebug( "Periodic commit loop cancelled for consumer {ConsumerId} in group {GroupId}", consumerId, groupId); } } }