using System; using Svrnty.CQRS.Events.Abstractions.Delivery; using Svrnty.CQRS.Events.Abstractions.Subscriptions; using Svrnty.CQRS.Events.Abstractions.EventStore; using System.Linq; using System.Threading; using System.Threading.Tasks; using Svrnty.CQRS.Events.Abstractions; using Svrnty.CQRS.Events.Abstractions.Models; namespace Svrnty.CQRS.Events.Delivery; /// /// Default implementation of IEventDeliveryService. /// Handles event filtering and subscription completion logic. /// Actual delivery to clients is handled by the transport layer (gRPC/SignalR). /// public sealed class EventDeliveryService : IEventDeliveryService { private readonly ISubscriptionStore _subscriptionStore; public EventDeliveryService(ISubscriptionStore subscriptionStore) { _subscriptionStore = subscriptionStore; } public async Task DeliverEventAsync(ICorrelatedEvent @event, long sequence, CancellationToken cancellationToken = default) { // Find all subscriptions interested in this correlation var subscriptions = await _subscriptionStore.FindByCorrelationIdAsync(@event.CorrelationId, cancellationToken); var eventTypeName = @event.GetType().Name; foreach (var subscription in subscriptions) { // Skip if subscription is not active or expired if (subscription.Status != SubscriptionStatus.Active || subscription.IsExpired) continue; // Filter: Only process if subscriber requested this event type if (!subscription.ShouldReceive(eventTypeName)) continue; // Check if this is a terminal event if (subscription.IsTerminalEvent(eventTypeName)) { subscription.Status = SubscriptionStatus.Completed; subscription.CompletedAt = DateTimeOffset.UtcNow; await _subscriptionStore.UpdateAsync(subscription, cancellationToken); } // Note: Actual delivery to the client happens in the gRPC stream handler // The handler will query for events where sequence > LastDeliveredSequence } } }