using System; using Svrnty.CQRS.Events.Abstractions.Schema; using Svrnty.CQRS.Events.Abstractions.Storage; using Svrnty.CQRS.Events.Schema; using Svrnty.CQRS.Events.Abstractions.Subscriptions; using Svrnty.CQRS.Events.Abstractions.EventStore; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.Subscriptions; /// /// Default implementation of . /// Provides event streaming from subscriptions to consumers. /// /// /// /// Phase 1 Implementation: /// Supports Broadcast and Exclusive modes with in-memory storage. /// ConsumerGroup and ReadReceipt modes will be fully implemented in later phases. /// /// /// Phase 5 Implementation: /// Supports automatic event upcasting when schema evolution is enabled. /// /// public class EventSubscriptionClient : IEventSubscriptionClient { private readonly IEventStreamStore _streamStore; private readonly IConsumerRegistry _consumerRegistry; private readonly IReadReceiptStore _readReceiptStore; private readonly ISchemaRegistry? _schemaRegistry; private readonly ILogger? _logger; private readonly Dictionary _subscriptions; // In-memory for Phase 1 /// /// Initializes a new instance of the class. /// public EventSubscriptionClient( IEventStreamStore streamStore, IConsumerRegistry consumerRegistry, IReadReceiptStore readReceiptStore, IEnumerable subscriptions, ISchemaRegistry? schemaRegistry = null, ILogger? logger = null) { _streamStore = streamStore ?? throw new ArgumentNullException(nameof(streamStore)); _consumerRegistry = consumerRegistry ?? throw new ArgumentNullException(nameof(consumerRegistry)); _readReceiptStore = readReceiptStore ?? throw new ArgumentNullException(nameof(readReceiptStore)); _schemaRegistry = schemaRegistry; _logger = logger; _subscriptions = new Dictionary(); // Register all subscriptions provided via DI if (subscriptions != null) { foreach (var subscription in subscriptions) { RegisterSubscription(subscription); } } } /// /// Register a subscription (for Phase 1, stored in-memory). /// public void RegisterSubscription(Subscription subscription) { if (subscription == null) throw new ArgumentNullException(nameof(subscription)); subscription.Validate(); _subscriptions[subscription.SubscriptionId] = subscription; } /// public async IAsyncEnumerable SubscribeAsync( string subscriptionId, string consumerId, [EnumeratorCancellation] CancellationToken cancellationToken = default) { await foreach (var @event in SubscribeAsync(subscriptionId, consumerId, null!, cancellationToken)) { yield return @event; } } /// public async IAsyncEnumerable SubscribeAsync( string subscriptionId, string consumerId, Dictionary metadata, [EnumeratorCancellation] CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(subscriptionId)) throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId)); // Get subscription configuration if (!_subscriptions.TryGetValue(subscriptionId, out var subscription)) { throw new InvalidOperationException($"Subscription '{subscriptionId}' not found. Register it first using RegisterSubscription()."); } if (!subscription.IsActive) { throw new InvalidOperationException($"Subscription '{subscriptionId}' is not active."); } // Register consumer await _consumerRegistry.RegisterConsumerAsync(subscriptionId, consumerId, metadata, cancellationToken); try { // Stream events based on subscription mode switch (subscription.Mode) { case SubscriptionMode.Broadcast: await foreach (var @event in StreamBroadcastAsync(subscription, consumerId, cancellationToken)) { yield return @event; } break; case SubscriptionMode.Exclusive: await foreach (var @event in StreamExclusiveAsync(subscription, consumerId, cancellationToken)) { yield return @event; } break; case SubscriptionMode.ConsumerGroup: // Phase 1: Same as Exclusive for now // Phase 3+ will implement proper consumer group partitioning await foreach (var @event in StreamExclusiveAsync(subscription, consumerId, cancellationToken)) { yield return @event; } break; case SubscriptionMode.ReadReceipt: throw new NotImplementedException( "ReadReceipt mode is not implemented in Phase 1. " + "It will be fully implemented in Phase 3 with explicit MarkAsRead support."); default: throw new NotSupportedException($"Subscription mode '{subscription.Mode}' is not supported."); } } finally { // Unregister consumer when enumeration ends await _consumerRegistry.UnregisterConsumerAsync(subscriptionId, consumerId, cancellationToken); } } // ======================================================================== // Phase 5: Schema Evolution & Upcasting // ======================================================================== /// /// Applies automatic upcasting to an event if enabled for the subscription. /// /// The event to potentially upcast. /// The subscription configuration. /// Cancellation token. /// The upcast event, or the original event if upcasting is not needed/enabled. private async Task ApplyUpcastingAsync( ICorrelatedEvent @event, Subscription subscription, CancellationToken cancellationToken) { // Skip if upcasting not enabled if (!subscription.EnableUpcasting) return @event; // Skip if schema registry not available if (_schemaRegistry == null) { _logger?.LogWarning( "Upcasting enabled for subscription {SubscriptionId} but ISchemaRegistry is not registered. " + "Event will be delivered without upcasting.", subscription.SubscriptionId); return @event; } try { // Check if upcasting is needed var needsUpcasting = await _schemaRegistry.NeedsUpcastingAsync( @event, subscription.TargetEventVersion, cancellationToken); if (!needsUpcasting) return @event; // Perform upcasting var upcastEvent = await _schemaRegistry.UpcastAsync( @event, subscription.TargetEventVersion, cancellationToken); _logger?.LogDebug( "Upcast event {EventType} from v{FromVersion} to v{ToVersion} for subscription {SubscriptionId}", @event.GetType().Name, @event.GetType().Name, upcastEvent.GetType().Name, subscription.SubscriptionId); return upcastEvent; } catch (Exception ex) { _logger?.LogError( ex, "Failed to upcast event {EventId} for subscription {SubscriptionId}. Delivering original event.", @event.EventId, subscription.SubscriptionId); // On upcasting failure, deliver original event rather than losing it return @event; } } /// /// Stream events in Broadcast mode (all consumers get all events). /// private async IAsyncEnumerable StreamBroadcastAsync( Subscription subscription, string consumerId, [EnumeratorCancellation] CancellationToken cancellationToken) { // In Broadcast mode, each consumer gets their own copy of events // We use a polling approach with a small delay between polls var pollInterval = TimeSpan.FromMilliseconds(100); while (!cancellationToken.IsCancellationRequested) { // Dequeue event from stream var @event = await _streamStore.DequeueAsync( subscription.StreamName, consumerId, subscription.VisibilityTimeout, cancellationToken); if (@event != null) { // Apply event type filter if configured if (ShouldIncludeEvent(@event, subscription)) { // Phase 5: Apply automatic upcasting if enabled var deliveryEvent = await ApplyUpcastingAsync(@event, subscription, cancellationToken); yield return deliveryEvent; // Auto-acknowledge (event consumed successfully) await _streamStore.AcknowledgeAsync( subscription.StreamName, @event.EventId, consumerId, cancellationToken); } else { // Event filtered out, acknowledge it anyway await _streamStore.AcknowledgeAsync( subscription.StreamName, @event.EventId, consumerId, cancellationToken); } } else { // No events available, wait before polling again await Task.Delay(pollInterval, cancellationToken); } // Send heartbeat await _consumerRegistry.HeartbeatAsync(subscription.SubscriptionId, consumerId, cancellationToken); } } /// /// Stream events in Exclusive mode (only one consumer gets each event). /// private async IAsyncEnumerable StreamExclusiveAsync( Subscription subscription, string consumerId, [EnumeratorCancellation] CancellationToken cancellationToken) { // In Exclusive mode, consumers compete for events // First consumer to dequeue gets the event var pollInterval = TimeSpan.FromMilliseconds(100); while (!cancellationToken.IsCancellationRequested) { // Try to dequeue an event var @event = await _streamStore.DequeueAsync( subscription.StreamName, consumerId, subscription.VisibilityTimeout, cancellationToken); if (@event != null) { // Apply event type filter if configured if (ShouldIncludeEvent(@event, subscription)) { // Phase 5: Apply automatic upcasting if enabled var deliveryEvent = await ApplyUpcastingAsync(@event, subscription, cancellationToken); yield return deliveryEvent; // Auto-acknowledge (event consumed successfully) await _streamStore.AcknowledgeAsync( subscription.StreamName, @event.EventId, consumerId, cancellationToken); } else { // Event filtered out, acknowledge it anyway await _streamStore.AcknowledgeAsync( subscription.StreamName, @event.EventId, consumerId, cancellationToken); } } else { // No events available, wait before polling again await Task.Delay(pollInterval, cancellationToken); } // Send heartbeat await _consumerRegistry.HeartbeatAsync(subscription.SubscriptionId, consumerId, cancellationToken); } } /// /// Check if an event should be included based on subscription filters. /// private static bool ShouldIncludeEvent(ICorrelatedEvent @event, Subscription subscription) { // No filter means include all events if (subscription.EventTypeFilter == null || subscription.EventTypeFilter.Count == 0) return true; // Check if event type is in the filter var eventTypeName = @event.GetType().Name; return subscription.EventTypeFilter.Contains(eventTypeName); } /// public Task AcknowledgeAsync( string subscriptionId, string eventId, string consumerId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(subscriptionId)) throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId)); if (string.IsNullOrWhiteSpace(eventId)) throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId)); if (!_subscriptions.TryGetValue(subscriptionId, out var subscription)) { throw new InvalidOperationException($"Subscription '{subscriptionId}' not found."); } return _streamStore.AcknowledgeAsync(subscription.StreamName, eventId, consumerId, cancellationToken); } /// public Task NackAsync( string subscriptionId, string eventId, string consumerId, bool requeue = true, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(subscriptionId)) throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId)); if (string.IsNullOrWhiteSpace(eventId)) throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId)); if (!_subscriptions.TryGetValue(subscriptionId, out var subscription)) { throw new InvalidOperationException($"Subscription '{subscriptionId}' not found."); } return _streamStore.NackAsync(subscription.StreamName, eventId, consumerId, requeue, cancellationToken); } /// public Task GetSubscriptionAsync( string subscriptionId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(subscriptionId)) throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId)); _subscriptions.TryGetValue(subscriptionId, out var subscription); return Task.FromResult(subscription); } /// public Task> GetActiveConsumersAsync( string subscriptionId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(subscriptionId)) throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId)); return _consumerRegistry.GetConsumerInfoAsync(subscriptionId, cancellationToken); } /// public Task UnsubscribeAsync( string subscriptionId, string consumerId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(subscriptionId)) throw new ArgumentException("Subscription ID cannot be null or whitespace.", nameof(subscriptionId)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId)); return _consumerRegistry.UnregisterConsumerAsync(subscriptionId, consumerId, cancellationToken); } /// /// Get all registered subscriptions (for debugging/monitoring). /// public IReadOnlyList GetAllSubscriptions() { return _subscriptions.Values.ToList(); } // ======================================================================== // Phase 3: Read Receipt API Implementation // ======================================================================== /// public Task RecordReadReceiptAsync( string streamName, string consumerId, string eventId, long offset, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId)); if (string.IsNullOrWhiteSpace(eventId)) throw new ArgumentException("Event ID cannot be null or whitespace.", nameof(eventId)); return _readReceiptStore.AcknowledgeEventAsync( consumerId, streamName, eventId, offset, DateTimeOffset.UtcNow, cancellationToken); } /// public Task GetLastReadOffsetAsync( string streamName, string consumerId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId)); return _readReceiptStore.GetLastAcknowledgedOffsetAsync(consumerId, streamName, cancellationToken); } /// public Task GetConsumerProgressAsync( string streamName, string consumerId, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName)); if (string.IsNullOrWhiteSpace(consumerId)) throw new ArgumentException("Consumer ID cannot be null or whitespace.", nameof(consumerId)); return _readReceiptStore.GetConsumerProgressAsync(consumerId, streamName, cancellationToken); } /// public Task> GetStreamConsumersAsync( string streamName, CancellationToken cancellationToken = default) { if (string.IsNullOrWhiteSpace(streamName)) throw new ArgumentException("Stream name cannot be null or whitespace.", nameof(streamName)); return _readReceiptStore.GetConsumersForStreamAsync(streamName, cancellationToken); } }