using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Svrnty.CQRS.Events.Abstractions.Subscriptions; namespace Svrnty.CQRS.Events.Subscriptions; /// /// Default implementation of subscription lifecycle management. /// public sealed class SubscriptionManager : ISubscriptionManager { private readonly IPersistentSubscriptionStore _store; private readonly ILogger _logger; public SubscriptionManager( IPersistentSubscriptionStore store, ILogger logger) { _store = store ?? throw new ArgumentNullException(nameof(store)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public async Task CreateSubscriptionAsync( string subscriberId, string correlationId, HashSet? eventTypes = null, HashSet? terminalEventTypes = null, DeliveryMode deliveryMode = DeliveryMode.Immediate, DateTimeOffset? expiresAt = null, string? dataSourceId = null, CancellationToken cancellationToken = default) { _logger.LogInformation( "Creating subscription for subscriber {SubscriberId} with correlation {CorrelationId}", subscriberId, correlationId); var subscription = new PersistentSubscription { Id = Guid.NewGuid().ToString(), SubscriberId = subscriberId, CorrelationId = correlationId, EventTypes = eventTypes ?? new HashSet(), TerminalEventTypes = terminalEventTypes ?? new HashSet(), DeliveryMode = deliveryMode, CreatedAt = DateTimeOffset.UtcNow, ExpiresAt = expiresAt, DataSourceId = dataSourceId }; await _store.CreateAsync(subscription, cancellationToken); _logger.LogInformation( "Subscription {SubscriptionId} created successfully", subscription.Id); return subscription; } public async Task GetSubscriptionAsync( string subscriptionId, CancellationToken cancellationToken = default) { return await _store.GetByIdAsync(subscriptionId, cancellationToken); } public async Task> GetSubscriberSubscriptionsAsync( string subscriberId, CancellationToken cancellationToken = default) { return await _store.GetBySubscriberIdAsync(subscriberId, cancellationToken); } public async Task> GetActiveSubscriptionsByCorrelationAsync( string correlationId, CancellationToken cancellationToken = default) { var subscriptions = await _store.GetByCorrelationIdAsync(correlationId, cancellationToken); return subscriptions.Where(s => s.CanReceiveEvents).ToList(); } public async Task MarkEventDeliveredAsync( string subscriptionId, long sequence, CancellationToken cancellationToken = default) { var subscription = await _store.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) { _logger.LogWarning( "Cannot mark event delivered: subscription {SubscriptionId} not found", subscriptionId); return; } subscription.MarkDelivered(sequence); await _store.UpdateAsync(subscription, cancellationToken); _logger.LogDebug( "Subscription {SubscriptionId} marked as delivered up to sequence {Sequence}", subscriptionId, sequence); } public async Task CompleteSubscriptionAsync( string subscriptionId, CancellationToken cancellationToken = default) { var subscription = await _store.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) { _logger.LogWarning( "Cannot complete subscription: {SubscriptionId} not found", subscriptionId); return; } subscription.Complete(); await _store.UpdateAsync(subscription, cancellationToken); _logger.LogInformation( "Subscription {SubscriptionId} completed", subscriptionId); } public async Task CancelSubscriptionAsync( string subscriptionId, CancellationToken cancellationToken = default) { var subscription = await _store.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) { _logger.LogWarning( "Cannot cancel subscription: {SubscriptionId} not found", subscriptionId); return; } subscription.Cancel(); await _store.UpdateAsync(subscription, cancellationToken); _logger.LogInformation( "Subscription {SubscriptionId} cancelled", subscriptionId); } public async Task PauseSubscriptionAsync( string subscriptionId, CancellationToken cancellationToken = default) { var subscription = await _store.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) { _logger.LogWarning( "Cannot pause subscription: {SubscriptionId} not found", subscriptionId); return; } subscription.Pause(); await _store.UpdateAsync(subscription, cancellationToken); _logger.LogInformation( "Subscription {SubscriptionId} paused", subscriptionId); } public async Task ResumeSubscriptionAsync( string subscriptionId, CancellationToken cancellationToken = default) { var subscription = await _store.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) { _logger.LogWarning( "Cannot resume subscription: {SubscriptionId} not found", subscriptionId); return; } subscription.Resume(); await _store.UpdateAsync(subscription, cancellationToken); _logger.LogInformation( "Subscription {SubscriptionId} resumed", subscriptionId); } public async Task AttachConnectionAsync( string subscriptionId, string connectionId, CancellationToken cancellationToken = default) { var subscription = await _store.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) { _logger.LogWarning( "Cannot attach connection: subscription {SubscriptionId} not found", subscriptionId); return; } subscription.ConnectionId = connectionId; await _store.UpdateAsync(subscription, cancellationToken); _logger.LogDebug( "Connection {ConnectionId} attached to subscription {SubscriptionId}", connectionId, subscriptionId); } public async Task DetachConnectionAsync( string subscriptionId, CancellationToken cancellationToken = default) { var subscription = await _store.GetByIdAsync(subscriptionId, cancellationToken); if (subscription == null) { _logger.LogWarning( "Cannot detach connection: subscription {SubscriptionId} not found", subscriptionId); return; } var oldConnectionId = subscription.ConnectionId; subscription.ConnectionId = null; await _store.UpdateAsync(subscription, cancellationToken); _logger.LogDebug( "Connection {ConnectionId} detached from subscription {SubscriptionId}", oldConnectionId, subscriptionId); } public async Task CleanupExpiredSubscriptionsAsync( CancellationToken cancellationToken = default) { var expiredSubscriptions = await _store.GetExpiredSubscriptionsAsync(cancellationToken); _logger.LogInformation( "Found {Count} expired subscriptions to clean up", expiredSubscriptions.Count); foreach (var subscription in expiredSubscriptions) { subscription.Expire(); await _store.UpdateAsync(subscription, cancellationToken); _logger.LogDebug( "Subscription {SubscriptionId} marked as expired", subscription.Id); } } }