dotnet-cqrs/Svrnty.CQRS.Notifications.Grpc/NotificationSubscriptionManager.cs

165 lines
5.6 KiB
C#

using System.Collections.Concurrent;
using Grpc.Core;
using Microsoft.Extensions.Logging;
namespace Svrnty.CQRS.Notifications.Grpc;
/// <summary>
/// Manages gRPC stream subscriptions for notifications.
/// Thread-safe singleton that tracks subscriptions and routes notifications to subscribers.
/// </summary>
/// <remarks>
/// Subscribers are grouped by the pair (full name of the domain notification type, string form of the
/// subscription key). Each group is stored as an array that is never mutated once it has been published
/// in the dictionary: adding or removing a subscriber builds a new array and swaps it in atomically,
/// retrying when another thread changed the group in the meantime. This guarantees that a concurrent
/// subscribe and unsubscribe can never silently drop a subscriber.
/// </remarks>
public class NotificationSubscriptionManager
{
    private readonly ConcurrentDictionary<(string TypeName, string Key), object[]> _subscriptions = new();
    private readonly ILogger<NotificationSubscriptionManager> _logger;

    /// <summary>
    /// Creates a manager that reports subscription activity to the given logger.
    /// </summary>
    /// <param name="logger">Logger used for subscribe, unsubscribe and delivery diagnostics.</param>
    public NotificationSubscriptionManager(ILogger<NotificationSubscriptionManager> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Subscribe to notifications of a specific domain type with a mapper to convert to proto format.
    /// </summary>
    /// <typeparam name="TDomain">The domain notification type.</typeparam>
    /// <typeparam name="TProto">The proto message type.</typeparam>
    /// <param name="subscriptionKey">The subscription key value (e.g., inventory ID). Its string form is used for matching.</param>
    /// <param name="stream">The gRPC server stream writer.</param>
    /// <param name="mapper">Function to map domain notification to proto message.</param>
    /// <returns>A disposable that removes the subscription when disposed.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="subscriptionKey"/>, <paramref name="stream"/> or <paramref name="mapper"/> is null.
    /// </exception>
    public IDisposable Subscribe<TDomain, TProto>(
        object subscriptionKey,
        IServerStreamWriter<TProto> stream,
        Func<TDomain, TProto> mapper) where TDomain : class
    {
        // Fail fast: a null stream or mapper would otherwise only surface on the first delivery,
        // where it would be mistaken for a dead subscriber and silently dropped.
        ArgumentNullException.ThrowIfNull(stream);
        ArgumentNullException.ThrowIfNull(mapper);

        var key = BuildKey<TDomain>(subscriptionKey);
        var notificationTypeName = typeof(TDomain).Name;
        var subscriber = new Subscriber<TDomain, TProto>(stream, mapper);

        // AddOrUpdate retries internally until the new array is installed against the exact array
        // it was derived from, so a concurrent removal cannot discard this subscriber.
        var current = _subscriptions.AddOrUpdate(
            key,
            _ => new object[] { subscriber },
            (_, existing) => Append(existing, subscriber));

        _logger.LogInformation(
            "Client subscribed to {NotificationType} with key {SubscriptionKey}. Total subscribers: {Count}",
            notificationTypeName, subscriptionKey, current.Length);

        return new SubscriptionHandle(() => Remove(key, subscriber, notificationTypeName));
    }

    /// <summary>
    /// Notify all subscribers of a specific notification type and subscription key.
    /// </summary>
    /// <remarks>
    /// Subscribers are notified one after another. A subscriber whose delivery throws is logged and
    /// removed. If <paramref name="ct"/> is cancelled during delivery, the loop stops and the
    /// remaining subscribers are left registered: cancellation by the caller says nothing about the
    /// health of a subscriber's stream. The method does not propagate delivery or cancellation
    /// exceptions to the caller.
    /// </remarks>
    /// <typeparam name="TDomain">The domain notification type.</typeparam>
    /// <param name="notification">The notification to deliver.</param>
    /// <param name="subscriptionKey">The subscription key whose subscribers should be notified.</param>
    /// <param name="ct">Token used to cancel the delivery.</param>
    internal async Task NotifyAsync<TDomain>(TDomain notification, object subscriptionKey, CancellationToken ct) where TDomain : class
    {
        var key = BuildKey<TDomain>(subscriptionKey);
        var notificationTypeName = typeof(TDomain).Name;

        if (!_subscriptions.TryGetValue(key, out var subscribers))
        {
            _logger.LogDebug(
                "No subscribers for {NotificationType} with key {SubscriptionKey}",
                notificationTypeName, subscriptionKey);
            return;
        }

        var delivered = 0;
        List<object>? deadSubscribers = null;

        // `subscribers` is an immutable snapshot, so concurrent subscribe/unsubscribe calls
        // cannot disturb this enumeration.
        foreach (var sub in subscribers)
        {
            if (sub is not INotifiable<TDomain> notifiable)
            {
                continue;
            }

            try
            {
                await notifiable.NotifyAsync(notification, ct);
                delivered++;
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // The caller gave up; do not treat this (or any remaining) subscriber as dead.
                _logger.LogDebug(
                    "Notification of {NotificationType} with key {SubscriptionKey} was cancelled before all subscribers were notified",
                    notificationTypeName, subscriptionKey);
                break;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex,
                    "Failed to notify subscriber for {NotificationType}, marking for removal",
                    notificationTypeName);
                (deadSubscribers ??= new List<object>()).Add(sub);
            }
        }

        // Clean up dead subscribers
        if (deadSubscribers is not null)
        {
            foreach (var dead in deadSubscribers)
            {
                Remove(key, dead, notificationTypeName);
            }
        }

        _logger.LogDebug(
            "Notified {Count} subscribers for {NotificationType} with key {SubscriptionKey}",
            delivered, notificationTypeName, subscriptionKey);
    }

    /// <summary>
    /// Removes <paramref name="subscriber"/> (matched by reference) from the group stored under
    /// <paramref name="key"/>, deleting the group when it becomes empty. Does nothing, and logs
    /// nothing, when the subscriber is no longer registered.
    /// </summary>
    private void Remove((string TypeName, string Key) key, object subscriber, string notificationTypeName)
    {
        // Compare-and-swap loop: every attempt is validated against the exact array it was computed
        // from, so a change made concurrently by another thread causes a retry instead of being lost.
        while (_subscriptions.TryGetValue(key, out var current))
        {
            var index = IndexOfReference(current, subscriber);
            if (index < 0)
            {
                // Already removed (e.g. dead-subscriber cleanup ran before the handle was disposed).
                return;
            }

            var swapped = current.Length == 1
                // Removes the entry only if it still holds `current`; a group that gained a new
                // subscriber in the meantime is left alone and the loop retries.
                ? _subscriptions.TryRemove(new KeyValuePair<(string TypeName, string Key), object[]>(key, current))
                : _subscriptions.TryUpdate(key, Without(current, index), current);

            if (swapped)
            {
                _logger.LogInformation(
                    "Client unsubscribed from {NotificationType} with key {SubscriptionKey}",
                    notificationTypeName, key.Key);
                return;
            }
        }
    }

    /// <summary>
    /// Builds the dictionary key for a domain type and subscription key value.
    /// </summary>
    private static (string TypeName, string Key) BuildKey<TDomain>(object subscriptionKey)
    {
        ArgumentNullException.ThrowIfNull(subscriptionKey);
        return (typeof(TDomain).FullName!, subscriptionKey.ToString() ?? string.Empty);
    }

    /// <summary>
    /// Returns a new array containing <paramref name="existing"/> followed by <paramref name="subscriber"/>.
    /// </summary>
    private static object[] Append(object[] existing, object subscriber)
    {
        var grown = new object[existing.Length + 1];
        Array.Copy(existing, grown, existing.Length);
        grown[existing.Length] = subscriber;
        return grown;
    }

    /// <summary>
    /// Returns a new array equal to <paramref name="source"/> with the element at <paramref name="index"/> removed.
    /// </summary>
    private static object[] Without(object[] source, int index)
    {
        var shrunk = new object[source.Length - 1];
        Array.Copy(source, 0, shrunk, 0, index);
        Array.Copy(source, index + 1, shrunk, index, source.Length - index - 1);
        return shrunk;
    }

    /// <summary>
    /// Returns the position of <paramref name="target"/> in <paramref name="items"/> using reference
    /// equality, or -1 when it is not present.
    /// </summary>
    private static int IndexOfReference(object[] items, object target)
    {
        for (var i = 0; i < items.Length; i++)
        {
            if (ReferenceEquals(items[i], target))
            {
                return i;
            }
        }

        return -1;
    }
}
/// <summary>
/// Internal interface for type-erased notification delivery.
/// It exposes only the domain notification type, so a caller can deliver a notification
/// without knowing which proto message type the implementation writes.
/// </summary>
/// <typeparam name="TDomain">
/// The domain notification type accepted by <see cref="NotifyAsync"/>. Declared contravariant (<c>in</c>).
/// </typeparam>
internal interface INotifiable<in TDomain>
{
/// <summary>
/// Delivers a single notification to the target represented by this instance.
/// </summary>
/// <param name="notification">The domain notification to deliver.</param>
/// <param name="ct">Token passed through to the delivery operation for cancellation.</param>
/// <returns>A task representing the delivery.</returns>
Task NotifyAsync(TDomain notification, CancellationToken ct);
}
/// <summary>
/// Wraps a gRPC stream writer with a domain→proto mapper.
/// </summary>
/// <remarks>
/// A gRPC stream writer allows only one pending write at a time. Because notifications for the same
/// subscriber may be published concurrently, writes are serialized through a per-subscriber gate so
/// that an overlapping write waits its turn instead of failing.
/// </remarks>
/// <typeparam name="TDomain">The domain notification type accepted by this subscriber.</typeparam>
/// <typeparam name="TProto">The proto message type written to the stream.</typeparam>
internal sealed class Subscriber<TDomain, TProto> : INotifiable<TDomain>
{
    private readonly IServerStreamWriter<TProto> _stream;
    private readonly Func<TDomain, TProto> _mapper;

    // Serializes writes to _stream. Intentionally not disposed: only WaitAsync/Release are used,
    // so no wait handle is ever allocated and there is nothing to release.
    private readonly SemaphoreSlim _writeGate = new(1, 1);

    /// <summary>
    /// Creates a subscriber that maps domain notifications and writes them to <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">The gRPC server stream writer to write mapped messages to.</param>
    /// <param name="mapper">Function converting a domain notification into a proto message.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="stream"/> or <paramref name="mapper"/> is null.
    /// </exception>
    public Subscriber(IServerStreamWriter<TProto> stream, Func<TDomain, TProto> mapper)
    {
        ArgumentNullException.ThrowIfNull(stream);
        ArgumentNullException.ThrowIfNull(mapper);

        _stream = stream;
        _mapper = mapper;
    }

    /// <summary>
    /// Maps <paramref name="notification"/> to its proto form and writes it to the stream.
    /// </summary>
    /// <param name="notification">The domain notification to deliver.</param>
    /// <param name="ct">Token that cancels waiting for the stream and the write itself.</param>
    /// <returns>A task that completes when the message has been written.</returns>
    public async Task NotifyAsync(TDomain notification, CancellationToken ct)
    {
        // Map outside the gate so a slow mapper does not hold up other writers.
        var proto = _mapper(notification);

        await _writeGate.WaitAsync(ct);
        try
        {
            await _stream.WriteAsync(proto, ct);
        }
        finally
        {
            _writeGate.Release();
        }
    }
}
/// <summary>
/// Disposable token for a subscription: the first call to <see cref="Dispose"/> runs the
/// supplied callback, and every later call is a no-op.
/// </summary>
internal sealed class SubscriptionHandle : IDisposable
{
    private readonly Action _release;
    private bool _released;

    /// <summary>
    /// Creates a handle that invokes <paramref name="onDispose"/> when it is disposed.
    /// </summary>
    /// <param name="onDispose">Callback that removes the subscription.</param>
    public SubscriptionHandle(Action onDispose) => _release = onDispose;

    /// <summary>
    /// Runs the removal callback unless it has already been run by an earlier call.
    /// </summary>
    public void Dispose()
    {
        if (!_released)
        {
            // Flag first so the callback is not re-entered even if it throws.
            _released = true;
            _release();
        }
    }
}