using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
namespace Svrnty.CQRS.Events.Subscriptions;
/// <summary>
/// Default implementation of subscription lifecycle management.
/// </summary>
public sealed class SubscriptionManager : ISubscriptionManager
{
    // NOTE(review): several declarations below read as if their generic type arguments
    // were lost before this file reached review (`ILogger`, `HashSet?`, `new HashSet()`,
    // `Task>`). They are kept verbatim; presumably the real arguments are dictated by
    // ISubscriptionManager / IPersistentSubscriptionStore — TODO confirm and restore.

    private readonly IPersistentSubscriptionStore _store;
    private readonly ILogger _logger;

    /// <summary>
    /// Builds a manager that reads and writes subscriptions through
    /// <paramref name="store"/> and reports progress through <paramref name="logger"/>.
    /// </summary>
    /// <param name="store">Persistence backend for subscriptions.</param>
    /// <param name="logger">Sink for the log entries written by every operation.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="store"/> or <paramref name="logger"/> is <c>null</c>.
    /// </exception>
    public SubscriptionManager(
        IPersistentSubscriptionStore store,
        ILogger logger)
    {
        if (store is null)
        {
            throw new ArgumentNullException(nameof(store));
        }

        if (logger is null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        _store = store;
        _logger = logger;
    }

    /// <summary>
    /// Creates a new subscription with a freshly generated GUID id and the current UTC
    /// time as its creation timestamp, hands it to the store, and returns it.
    /// </summary>
    /// <param name="subscriberId">Copied to the subscription's <c>SubscriberId</c>.</param>
    /// <param name="correlationId">Copied to the subscription's <c>CorrelationId</c>.</param>
    /// <param name="eventTypes">Assigned to <c>EventTypes</c>; an empty set is used when <c>null</c>.</param>
    /// <param name="terminalEventTypes">Assigned to <c>TerminalEventTypes</c>; an empty set is used when <c>null</c>.</param>
    /// <param name="deliveryMode">Copied to the subscription's <c>DeliveryMode</c>.</param>
    /// <param name="expiresAt">Copied to the subscription's <c>ExpiresAt</c>; may be <c>null</c>.</param>
    /// <param name="dataSourceId">Copied to the subscription's <c>DataSourceId</c>; may be <c>null</c>.</param>
    /// <param name="cancellationToken">Forwarded to the store call.</param>
    /// <returns>The subscription instance that was passed to the store.</returns>
    public async Task CreateSubscriptionAsync(
        string subscriberId,
        string correlationId,
        HashSet? eventTypes = null,
        HashSet? terminalEventTypes = null,
        DeliveryMode deliveryMode = DeliveryMode.Immediate,
        DateTimeOffset? expiresAt = null,
        string? dataSourceId = null,
        CancellationToken cancellationToken = default)
    {
        _logger.LogInformation(
            "Creating subscription for subscriber {SubscriberId} with correlation {CorrelationId}",
            subscriberId,
            correlationId);

        // Caller-supplied sets are used as-is (no copy); only a missing set is replaced.
        var newId = Guid.NewGuid().ToString();
        var subscribedTypes = eventTypes ?? new HashSet();
        var terminalTypes = terminalEventTypes ?? new HashSet();

        var created = new PersistentSubscription
        {
            Id = newId,
            SubscriberId = subscriberId,
            CorrelationId = correlationId,
            EventTypes = subscribedTypes,
            TerminalEventTypes = terminalTypes,
            DeliveryMode = deliveryMode,
            CreatedAt = DateTimeOffset.UtcNow,
            ExpiresAt = expiresAt,
            DataSourceId = dataSourceId
        };

        await _store.CreateAsync(created, cancellationToken);

        _logger.LogInformation("Subscription {SubscriptionId} created successfully", created.Id);
        return created;
    }

    /// <summary>
    /// Looks up a single subscription by id; a thin pass-through to the store.
    /// </summary>
    /// <param name="subscriptionId">Id handed to the store lookup.</param>
    /// <param name="cancellationToken">Forwarded to the store call.</param>
    /// <returns>
    /// Whatever the store returns for the id. The mutating methods of this class treat a
    /// <c>null</c> result from the same lookup as "not found".
    /// </returns>
    public async Task GetSubscriptionAsync(
        string subscriptionId,
        CancellationToken cancellationToken = default)
        => await _store.GetByIdAsync(subscriptionId, cancellationToken);

    /// <summary>
    /// Fetches the subscriptions the store associates with a subscriber; a thin
    /// pass-through to the store.
    /// </summary>
    /// <param name="subscriberId">Subscriber id handed to the store query.</param>
    /// <param name="cancellationToken">Forwarded to the store call.</param>
    /// <returns>The store's result, unfiltered.</returns>
    public async Task> GetSubscriberSubscriptionsAsync(
        string subscriberId,
        CancellationToken cancellationToken = default)
        => await _store.GetBySubscriberIdAsync(subscriberId, cancellationToken);

    /// <summary>
    /// Fetches the subscriptions stored for a correlation id and keeps only those whose
    /// <c>CanReceiveEvents</c> is <c>true</c>.
    /// </summary>
    /// <param name="correlationId">Correlation id handed to the store query.</param>
    /// <param name="cancellationToken">Forwarded to the store call.</param>
    /// <returns>A new list holding the filtered subscriptions.</returns>
    public async Task> GetActiveSubscriptionsByCorrelationAsync(
        string correlationId,
        CancellationToken cancellationToken = default)
    {
        var candidates = await _store.GetByCorrelationIdAsync(correlationId, cancellationToken);
        var receivable = candidates.Where(candidate => candidate.CanReceiveEvents);
        return receivable.ToList();
    }

    /// <summary>
    /// Loads the subscription, calls <c>MarkDelivered</c> with <paramref name="sequence"/>,
    /// and persists the result. Logs a warning and does nothing else when the
    /// subscription is not found.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to update.</param>
    /// <param name="sequence">Value passed to <c>MarkDelivered</c>.</param>
    /// <param name="cancellationToken">Forwarded to both store calls.</param>
    public async Task MarkEventDeliveredAsync(
        string subscriptionId,
        long sequence,
        CancellationToken cancellationToken = default)
    {
        var target = await _store.GetByIdAsync(subscriptionId, cancellationToken);
        if (target is not null)
        {
            target.MarkDelivered(sequence);
            await _store.UpdateAsync(target, cancellationToken);

            _logger.LogDebug(
                "Subscription {SubscriptionId} marked as delivered up to sequence {Sequence}",
                subscriptionId,
                sequence);
            return;
        }

        _logger.LogWarning(
            "Cannot mark event delivered: subscription {SubscriptionId} not found",
            subscriptionId);
    }

    /// <summary>
    /// Loads the subscription, calls <c>Complete</c> on it, and persists the result.
    /// Logs a warning and does nothing else when the subscription is not found.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to complete.</param>
    /// <param name="cancellationToken">Forwarded to both store calls.</param>
    public async Task CompleteSubscriptionAsync(
        string subscriptionId,
        CancellationToken cancellationToken = default)
    {
        var target = await _store.GetByIdAsync(subscriptionId, cancellationToken);
        if (target is null)
        {
            _logger.LogWarning("Cannot complete subscription: {SubscriptionId} not found", subscriptionId);
            return;
        }

        target.Complete();
        await _store.UpdateAsync(target, cancellationToken);

        _logger.LogInformation("Subscription {SubscriptionId} completed", subscriptionId);
    }

    /// <summary>
    /// Loads the subscription, calls <c>Cancel</c> on it, and persists the result.
    /// Logs a warning and does nothing else when the subscription is not found.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to cancel.</param>
    /// <param name="cancellationToken">Forwarded to both store calls.</param>
    public async Task CancelSubscriptionAsync(
        string subscriptionId,
        CancellationToken cancellationToken = default)
    {
        var target = await _store.GetByIdAsync(subscriptionId, cancellationToken);
        if (target is not null)
        {
            target.Cancel();
            await _store.UpdateAsync(target, cancellationToken);

            _logger.LogInformation("Subscription {SubscriptionId} cancelled", subscriptionId);
            return;
        }

        _logger.LogWarning("Cannot cancel subscription: {SubscriptionId} not found", subscriptionId);
    }

    /// <summary>
    /// Loads the subscription, calls <c>Pause</c> on it, and persists the result.
    /// Logs a warning and does nothing else when the subscription is not found.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to pause.</param>
    /// <param name="cancellationToken">Forwarded to both store calls.</param>
    public async Task PauseSubscriptionAsync(
        string subscriptionId,
        CancellationToken cancellationToken = default)
    {
        var target = await _store.GetByIdAsync(subscriptionId, cancellationToken);
        if (target is null)
        {
            _logger.LogWarning("Cannot pause subscription: {SubscriptionId} not found", subscriptionId);
            return;
        }

        target.Pause();
        await _store.UpdateAsync(target, cancellationToken);

        _logger.LogInformation("Subscription {SubscriptionId} paused", subscriptionId);
    }

    /// <summary>
    /// Loads the subscription, calls <c>Resume</c> on it, and persists the result.
    /// Logs a warning and does nothing else when the subscription is not found.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to resume.</param>
    /// <param name="cancellationToken">Forwarded to both store calls.</param>
    public async Task ResumeSubscriptionAsync(
        string subscriptionId,
        CancellationToken cancellationToken = default)
    {
        var target = await _store.GetByIdAsync(subscriptionId, cancellationToken);
        if (target is not null)
        {
            target.Resume();
            await _store.UpdateAsync(target, cancellationToken);

            _logger.LogInformation("Subscription {SubscriptionId} resumed", subscriptionId);
            return;
        }

        _logger.LogWarning("Cannot resume subscription: {SubscriptionId} not found", subscriptionId);
    }

    /// <summary>
    /// Loads the subscription, sets its <c>ConnectionId</c> to
    /// <paramref name="connectionId"/>, and persists the result. Logs a warning and does
    /// nothing else when the subscription is not found.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to update.</param>
    /// <param name="connectionId">Connection id to record on the subscription.</param>
    /// <param name="cancellationToken">Forwarded to both store calls.</param>
    public async Task AttachConnectionAsync(
        string subscriptionId,
        string connectionId,
        CancellationToken cancellationToken = default)
    {
        var target = await _store.GetByIdAsync(subscriptionId, cancellationToken);
        if (target is null)
        {
            _logger.LogWarning(
                "Cannot attach connection: subscription {SubscriptionId} not found",
                subscriptionId);
            return;
        }

        target.ConnectionId = connectionId;
        await _store.UpdateAsync(target, cancellationToken);

        _logger.LogDebug(
            "Connection {ConnectionId} attached to subscription {SubscriptionId}",
            connectionId,
            subscriptionId);
    }

    /// <summary>
    /// Loads the subscription, clears its <c>ConnectionId</c>, and persists the result.
    /// Logs a warning and does nothing else when the subscription is not found.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to update.</param>
    /// <param name="cancellationToken">Forwarded to both store calls.</param>
    public async Task DetachConnectionAsync(
        string subscriptionId,
        CancellationToken cancellationToken = default)
    {
        var target = await _store.GetByIdAsync(subscriptionId, cancellationToken);
        if (target is not null)
        {
            // Remember the outgoing value so the log entry can still name it after the reset.
            var previousConnectionId = target.ConnectionId;
            target.ConnectionId = null;
            await _store.UpdateAsync(target, cancellationToken);

            _logger.LogDebug(
                "Connection {ConnectionId} detached from subscription {SubscriptionId}",
                previousConnectionId,
                subscriptionId);
            return;
        }

        _logger.LogWarning(
            "Cannot detach connection: subscription {SubscriptionId} not found",
            subscriptionId);
    }

    /// <summary>
    /// Asks the store for its expired subscriptions, logs how many were returned, then
    /// calls <c>Expire</c> on each one and persists it, one subscription at a time.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to every store call.</param>
    public async Task CleanupExpiredSubscriptionsAsync(
        CancellationToken cancellationToken = default)
    {
        var overdue = await _store.GetExpiredSubscriptionsAsync(cancellationToken);

        _logger.LogInformation("Found {Count} expired subscriptions to clean up", overdue.Count);

        // Updates are awaited sequentially; an exception from one update ends the loop.
        foreach (var stale in overdue)
        {
            stale.Expire();
            await _store.UpdateAsync(stale, cancellationToken);

            _logger.LogDebug("Subscription {SubscriptionId} marked as expired", stale.Id);
        }
    }
}