dotnet-cqrs/Svrnty.CQRS.Events/Delivery/EventDeliveryService.cs

57 lines
2.1 KiB
C#

using System;
using Svrnty.CQRS.Events.Abstractions.Delivery;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Models;
namespace Svrnty.CQRS.Events.Delivery;
/// <summary>
/// Default implementation of <see cref="IEventDeliveryService"/>.
/// Handles event filtering and subscription completion logic.
/// Actual delivery to clients is handled by the transport layer (gRPC/SignalR).
/// </summary>
public sealed class EventDeliveryService : IEventDeliveryService
{
    private readonly ISubscriptionStore _subscriptionStore;

    /// <summary>
    /// Creates the service on top of the given subscription store.
    /// </summary>
    /// <param name="subscriptionStore">Store used to look up subscriptions by correlation id and to persist status changes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscriptionStore"/> is <c>null</c>.</exception>
    public EventDeliveryService(ISubscriptionStore subscriptionStore)
    {
        // Fail at construction time rather than with a NullReferenceException on the first delivery.
        _subscriptionStore = subscriptionStore ?? throw new ArgumentNullException(nameof(subscriptionStore));
    }

    /// <summary>
    /// Processes an event against every subscription registered for the event's correlation id.
    /// Subscriptions that are not active, are expired, or did not request this event type are skipped.
    /// When the event is terminal for a subscription, that subscription is marked
    /// <see cref="SubscriptionStatus.Completed"/>, stamped with the current UTC time, and persisted.
    /// </summary>
    /// <param name="event">The event being delivered; its runtime type name is used for filtering.</param>
    /// <param name="sequence">Sequence number of the event. Not used by this implementation.</param>
    /// <param name="cancellationToken">Token forwarded to the subscription store calls.</param>
    /// <returns>A task that completes once all matching subscriptions have been processed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is <c>null</c>.</exception>
    public async Task DeliverEventAsync(ICorrelatedEvent @event, long sequence, CancellationToken cancellationToken = default)
    {
        if (@event is null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        // Find all subscriptions interested in this correlation
        var subscriptions = await _subscriptionStore.FindByCorrelationIdAsync(@event.CorrelationId, cancellationToken);

        // Filtering is keyed on the short (unqualified) runtime type name of the event.
        var eventTypeName = @event.GetType().Name;

        foreach (var subscription in subscriptions)
        {
            // Skip if subscription is not active or expired
            if (subscription.Status != SubscriptionStatus.Active || subscription.IsExpired)
            {
                continue;
            }

            // Filter: Only process if subscriber requested this event type
            if (!subscription.ShouldReceive(eventTypeName))
            {
                continue;
            }

            // Check if this is a terminal event
            if (subscription.IsTerminalEvent(eventTypeName))
            {
                subscription.Status = SubscriptionStatus.Completed;
                subscription.CompletedAt = DateTimeOffset.UtcNow;
                await _subscriptionStore.UpdateAsync(subscription, cancellationToken);
            }

            // Note: Actual delivery to the client happens in the gRPC stream handler
            // The handler will query for events where sequence > LastDeliveredSequence
        }
    }
}