using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
namespace Svrnty.CQRS.Events.Subscriptions;
/// <summary>
/// In-memory implementation of the persistent subscription store, intended for development and testing.
/// </summary>
public sealed class InMemorySubscriptionStore : IPersistentSubscriptionStore
{
    // Subscriptions keyed by their Id. Nothing is persisted; the data lives only
    // as long as this instance does.
    private readonly ConcurrentDictionary<string, PersistentSubscription> _subscriptions = new();

    /// <summary>
    /// Adds a new subscription to the store.
    /// </summary>
    /// <param name="subscription">The subscription to add; its <c>Id</c> is used as the key.</param>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task holding the same <paramref name="subscription"/> instance.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">A subscription with the same ID is already stored.</exception>
    public Task<PersistentSubscription> CreateAsync(
        PersistentSubscription subscription,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(subscription);

        if (!_subscriptions.TryAdd(subscription.Id, subscription))
        {
            throw new InvalidOperationException($"Subscription with ID {subscription.Id} already exists");
        }

        return Task.FromResult(subscription);
    }

    /// <summary>
    /// Looks up a subscription by its ID.
    /// </summary>
    /// <param name="id">The subscription ID.</param>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task holding the subscription, or <c>null</c> when no entry has that ID.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> is <c>null</c>.</exception>
    public Task<PersistentSubscription?> GetByIdAsync(
        string id,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(id);

        // On a miss the out variable stays null, which is the "not found" result.
        _subscriptions.TryGetValue(id, out var subscription);
        return Task.FromResult(subscription);
    }

    /// <summary>
    /// Returns every stored subscription whose <c>SubscriberId</c> equals <paramref name="subscriberId"/>.
    /// </summary>
    /// <param name="subscriberId">The subscriber ID to match.</param>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task holding a newly created list of matches (empty when there are none).</returns>
    public Task<IReadOnlyList<PersistentSubscription>> GetBySubscriberIdAsync(
        string subscriberId,
        CancellationToken cancellationToken = default)
        => FindAsync(s => s.SubscriberId == subscriberId);

    /// <summary>
    /// Returns every stored subscription whose <c>CorrelationId</c> equals <paramref name="correlationId"/>.
    /// </summary>
    /// <param name="correlationId">The correlation ID to match.</param>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task holding a newly created list of matches (empty when there are none).</returns>
    public Task<IReadOnlyList<PersistentSubscription>> GetByCorrelationIdAsync(
        string correlationId,
        CancellationToken cancellationToken = default)
        => FindAsync(s => s.CorrelationId == correlationId);

    /// <summary>
    /// Returns every stored subscription whose <c>Status</c> equals <paramref name="status"/>.
    /// </summary>
    /// <param name="status">The status to match.</param>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task holding a newly created list of matches (empty when there are none).</returns>
    public Task<IReadOnlyList<PersistentSubscription>> GetByStatusAsync(
        SubscriptionStatus status,
        CancellationToken cancellationToken = default)
        => FindAsync(s => s.Status == status);

    /// <summary>
    /// Returns every stored subscription whose <c>ConnectionId</c> equals <paramref name="connectionId"/>.
    /// </summary>
    /// <param name="connectionId">The connection ID to match.</param>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task holding a newly created list of matches (empty when there are none).</returns>
    public Task<IReadOnlyList<PersistentSubscription>> GetByConnectionIdAsync(
        string connectionId,
        CancellationToken cancellationToken = default)
        => FindAsync(s => s.ConnectionId == connectionId);

    /// <summary>
    /// Stores <paramref name="subscription"/> under its <c>Id</c>, replacing any existing entry.
    /// </summary>
    /// <param name="subscription">The subscription to store.</param>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is <c>null</c>.</exception>
    public Task UpdateAsync(
        PersistentSubscription subscription,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(subscription);

        // The indexer is an upsert: an ID that is not yet stored gets added rather than rejected.
        _subscriptions[subscription.Id] = subscription;
        return Task.CompletedTask;
    }

    /// <summary>
    /// Removes the subscription with the given ID. Does nothing when the ID is not stored.
    /// </summary>
    /// <param name="id">The subscription ID.</param>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> is <c>null</c>.</exception>
    public Task DeleteAsync(
        string id,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(id);

        _subscriptions.TryRemove(id, out _);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Returns the subscriptions that are still <see cref="SubscriptionStatus.Active"/> although their
    /// <c>ExpiresAt</c> is earlier than the current UTC time. Subscriptions without an
    /// <c>ExpiresAt</c> value are never returned.
    /// </summary>
    /// <param name="cancellationToken">Accepted for interface compatibility; not observed.</param>
    /// <returns>A completed task holding a newly created list of matches (empty when there are none).</returns>
    public Task<IReadOnlyList<PersistentSubscription>> GetExpiredSubscriptionsAsync(
        CancellationToken cancellationToken = default)
    {
        // Read the clock once so every subscription is compared against the same instant.
        var now = DateTimeOffset.UtcNow;

        return FindAsync(s =>
            s.ExpiresAt.HasValue
            && s.ExpiresAt.Value < now
            && s.Status == SubscriptionStatus.Active);
    }

    // Shared implementation of the query methods: filters the stored subscriptions
    // and copies the matches into a new list wrapped in a completed task.
    // NOTE(review): the IReadOnlyList<PersistentSubscription> result type is assumed;
    // confirm it matches the declarations on IPersistentSubscriptionStore.
    private Task<IReadOnlyList<PersistentSubscription>> FindAsync(
        Func<PersistentSubscription, bool> predicate)
    {
        IReadOnlyList<PersistentSubscription> matches = _subscriptions.Values
            .Where(predicate)
            .ToList();

        return Task.FromResult(matches);
    }
}