using System;
using DeliveryModeEnum = Svrnty.CQRS.Events.Abstractions.Subscriptions.DeliveryMode;
using SubscriptionStatusEnum = Svrnty.CQRS.Events.Abstractions.Subscriptions.SubscriptionStatus;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Models;
namespace Svrnty.CQRS.Events.Subscriptions;
/// <summary>
/// Default implementation of <see cref="IEventSubscriptionService"/>.
/// All reads and writes of subscriptions go through the injected <see cref="ISubscriptionStore"/>.
/// </summary>
public sealed class EventSubscriptionService : IEventSubscriptionService
{
    private readonly ISubscriptionStore _subscriptionStore;

    /// <summary>
    /// Creates the service on top of the given subscription store.
    /// </summary>
    /// <param name="subscriptionStore">Store used to create, load and update subscriptions.</param>
    /// <exception cref="ArgumentNullException"><paramref name="subscriptionStore"/> is null.</exception>
    public EventSubscriptionService(ISubscriptionStore subscriptionStore)
    {
        // Fail at construction time rather than with a NullReferenceException on first use.
        _subscriptionStore = subscriptionStore ?? throw new ArgumentNullException(nameof(subscriptionStore));
    }

    /// <summary>
    /// Builds a new active subscription from <paramref name="request"/>, hands it to the store,
    /// and returns it.
    /// </summary>
    /// <param name="request">Subscriber, correlation, event type, delivery mode and optional timeout settings.</param>
    /// <param name="cancellationToken">Token forwarded to the store call.</param>
    /// <returns>The subscription that was passed to the store, carrying a freshly generated id.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    public async Task<EventSubscription> SubscribeAsync(SubscriptionRequest request, CancellationToken cancellationToken = default)
    {
        if (request is null)
        {
            throw new ArgumentNullException(nameof(request));
        }

        // Read the clock once so CreatedAt and ExpiresAt are derived from the same instant
        // (ExpiresAt - CreatedAt is then exactly the requested timeout).
        var now = DateTimeOffset.UtcNow;

        var subscription = new EventSubscription
        {
            SubscriptionId = Guid.NewGuid().ToString(),
            SubscriberId = request.SubscriberId,
            CorrelationId = request.CorrelationId,
            EventTypes = request.EventTypes,
            TerminalEventTypes = request.TerminalEventTypes,
            DeliveryMode = request.DeliveryMode,
            CreatedAt = now,
            // No timeout on the request means no expiry is set.
            ExpiresAt = request.Timeout.HasValue ? now + request.Timeout.Value : null,
            LastDeliveredSequence = 0,
            Status = SubscriptionStatus.Active
        };

        await _subscriptionStore.CreateAsync(subscription, cancellationToken);
        return subscription;
    }

    /// <summary>
    /// Marks the subscription as <see cref="SubscriptionStatus.Cancelled"/> and stamps its completion time.
    /// Does nothing when the store returns no subscription for the id.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to cancel.</param>
    /// <param name="cancellationToken">Token forwarded to the store calls.</param>
    public async Task UnsubscribeAsync(string subscriptionId, CancellationToken cancellationToken = default)
    {
        await CloseSubscriptionAsync(subscriptionId, SubscriptionStatus.Cancelled, cancellationToken);
    }

    /// <summary>
    /// Loads the subscriber's subscriptions from the store and keeps only those whose status is
    /// <see cref="SubscriptionStatus.Active"/> and whose <c>IsExpired</c> flag is false.
    /// </summary>
    /// <param name="subscriberId">Subscriber whose subscriptions are queried.</param>
    /// <param name="cancellationToken">Token forwarded to the store call.</param>
    /// <returns>A new list containing the matching subscriptions; empty when none match.</returns>
    public async Task<List<EventSubscription>> GetActiveSubscriptionsAsync(string subscriberId, CancellationToken cancellationToken = default)
    {
        var subscriptions = await _subscriptionStore.GetBySubscriberIdAsync(subscriberId, cancellationToken);

        return subscriptions
            .Where(s => s.Status == SubscriptionStatus.Active && !s.IsExpired)
            .ToList();
    }

    /// <summary>
    /// Marks the subscription as <see cref="SubscriptionStatus.Completed"/> and stamps its completion time.
    /// Does nothing when the store returns no subscription for the id.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to complete.</param>
    /// <param name="cancellationToken">Token forwarded to the store calls.</param>
    public async Task CompleteSubscriptionAsync(string subscriptionId, CancellationToken cancellationToken = default)
    {
        await CloseSubscriptionAsync(subscriptionId, SubscriptionStatus.Completed, cancellationToken);
    }

    /// <summary>
    /// Records <paramref name="sequence"/> as the last delivered sequence of the subscription.
    /// The stored value only moves forward: a sequence that is not greater than the current one
    /// is ignored and no store update is issued. Does nothing when the subscription is not found.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to update.</param>
    /// <param name="sequence">Sequence number of the most recently delivered event.</param>
    /// <param name="cancellationToken">Token forwarded to the store calls.</param>
    public async Task UpdateLastDeliveredAsync(string subscriptionId, long sequence, CancellationToken cancellationToken = default)
    {
        var subscription = await _subscriptionStore.GetByIdAsync(subscriptionId, cancellationToken);
        if (subscription is null)
        {
            return;
        }

        // Skip stale or duplicate acknowledgements so the stored value never goes backwards.
        if (sequence <= subscription.LastDeliveredSequence)
        {
            return;
        }

        subscription.LastDeliveredSequence = sequence;
        await _subscriptionStore.UpdateAsync(subscription, cancellationToken);
    }

    // Shared tail of UnsubscribeAsync / CompleteSubscriptionAsync: load the subscription, set the
    // given status plus CompletedAt = now (UTC), and persist; unknown ids are silently ignored.
    private async Task CloseSubscriptionAsync(string subscriptionId, SubscriptionStatus finalStatus, CancellationToken cancellationToken)
    {
        var subscription = await _subscriptionStore.GetByIdAsync(subscriptionId, cancellationToken);
        if (subscription is null)
        {
            return;
        }

        subscription.Status = finalStatus;
        subscription.CompletedAt = DateTimeOffset.UtcNow;
        await _subscriptionStore.UpdateAsync(subscription, cancellationToken);
    }
}