57 lines
2.1 KiB
C#
57 lines
2.1 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.Abstractions.Delivery;
|
|
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
|
|
using Svrnty.CQRS.Events.Abstractions.EventStore;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
using Svrnty.CQRS.Events.Abstractions.Models;
|
|
|
|
namespace Svrnty.CQRS.Events.Delivery;
|
|
|
|
/// <summary>
|
|
/// Default implementation of IEventDeliveryService.
|
|
/// Handles event filtering and subscription completion logic.
|
|
/// Actual delivery to clients is handled by the transport layer (gRPC/SignalR).
|
|
/// </summary>
|
|
public sealed class EventDeliveryService : IEventDeliveryService
|
|
{
|
|
private readonly ISubscriptionStore _subscriptionStore;
|
|
|
|
public EventDeliveryService(ISubscriptionStore subscriptionStore)
|
|
{
|
|
_subscriptionStore = subscriptionStore;
|
|
}
|
|
|
|
public async Task DeliverEventAsync(ICorrelatedEvent @event, long sequence, CancellationToken cancellationToken = default)
|
|
{
|
|
// Find all subscriptions interested in this correlation
|
|
var subscriptions = await _subscriptionStore.FindByCorrelationIdAsync(@event.CorrelationId, cancellationToken);
|
|
|
|
var eventTypeName = @event.GetType().Name;
|
|
|
|
foreach (var subscription in subscriptions)
|
|
{
|
|
// Skip if subscription is not active or expired
|
|
if (subscription.Status != SubscriptionStatus.Active || subscription.IsExpired)
|
|
continue;
|
|
|
|
// Filter: Only process if subscriber requested this event type
|
|
if (!subscription.ShouldReceive(eventTypeName))
|
|
continue;
|
|
|
|
// Check if this is a terminal event
|
|
if (subscription.IsTerminalEvent(eventTypeName))
|
|
{
|
|
subscription.Status = SubscriptionStatus.Completed;
|
|
subscription.CompletedAt = DateTimeOffset.UtcNow;
|
|
await _subscriptionStore.UpdateAsync(subscription, cancellationToken);
|
|
}
|
|
|
|
// Note: Actual delivery to the client happens in the gRPC stream handler
|
|
// The handler will query for events where sequence > LastDeliveredSequence
|
|
}
|
|
}
|
|
}
|