using System; using DeliveryModeEnum = Svrnty.CQRS.Events.Abstractions.Subscriptions.DeliveryMode; using SubscriptionStatusEnum = Svrnty.CQRS.Events.Abstractions.Subscriptions.SubscriptionStatus; using Svrnty.CQRS.Events.Abstractions.Subscriptions; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Svrnty.CQRS.Events.Abstractions; using Svrnty.CQRS.Events.Abstractions.Models; namespace Svrnty.CQRS.Events.Subscriptions; /// /// Default implementation of IEventSubscriptionService. /// public sealed class EventSubscriptionService : IEventSubscriptionService { private readonly ISubscriptionStore _subscriptionStore; public EventSubscriptionService(ISubscriptionStore subscriptionStore) { _subscriptionStore = subscriptionStore; } public async Task SubscribeAsync(SubscriptionRequest request, CancellationToken cancellationToken = default) { var subscription = new EventSubscription { SubscriptionId = Guid.NewGuid().ToString(), SubscriberId = request.SubscriberId, CorrelationId = request.CorrelationId, EventTypes = request.EventTypes, TerminalEventTypes = request.TerminalEventTypes, DeliveryMode = request.DeliveryMode, CreatedAt = DateTimeOffset.UtcNow, ExpiresAt = request.Timeout.HasValue ? DateTimeOffset.UtcNow + request.Timeout.Value : null, LastDeliveredSequence = 0, Status = SubscriptionStatus.Active }; await _subscriptionStore.CreateAsync(subscription, cancellationToken); return subscription; } public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default) { var subscription = await _subscriptionStore.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) return; subscription.Status = SubscriptionStatus.Cancelled; subscription.CompletedAt = DateTimeOffset.UtcNow; await _subscriptionStore.UpdateAsync(subscription, cancellationToken); } public async Task> GetActiveSubscriptionsAsync(string subscriberId, CancellationToken cancellationToken = default) { var subscriptions = await _subscriptionStore.GetBySubscriberIdAsync(subscriberId, cancellationToken); return subscriptions.Where(s => s.Status == SubscriptionStatus.Active && !s.IsExpired).ToList(); } public async Task CompleteSubscriptionAsync(string subscriptionId, CancellationToken cancellationToken = default) { var subscription = await _subscriptionStore.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) return; subscription.Status = SubscriptionStatus.Completed; subscription.CompletedAt = DateTimeOffset.UtcNow; await _subscriptionStore.UpdateAsync(subscription, cancellationToken); } public async Task UpdateLastDeliveredAsync(string subscriptionId, long sequence, CancellationToken cancellationToken = default) { var subscription = await _subscriptionStore.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) return; if (sequence > subscription.LastDeliveredSequence) { subscription.LastDeliveredSequence = sequence; await _subscriptionStore.UpdateAsync(subscription, cancellationToken); } } }