using System.Collections.Concurrent; using System.Reflection; using Microsoft.Extensions.Logging; using Svrnty.CQRS.Notifications.Abstractions; namespace Svrnty.CQRS.Notifications.Grpc; /// /// Publishes notifications to subscribed gRPC clients. /// public class NotificationPublisher : INotificationPublisher { private readonly NotificationSubscriptionManager _manager; private readonly ILogger _logger; // Cache subscription key property info per notification type private static readonly ConcurrentDictionary _keyCache = new(); public NotificationPublisher( NotificationSubscriptionManager manager, ILogger logger) { _manager = manager; _logger = logger; } /// public async Task PublishAsync(TNotification notification, CancellationToken ct = default) where TNotification : class { ArgumentNullException.ThrowIfNull(notification); var keyInfo = GetSubscriptionKeyInfo(typeof(TNotification)); var subscriptionKey = keyInfo.Property.GetValue(notification); if (subscriptionKey == null) { _logger.LogWarning( "Subscription key {PropertyName} is null on {NotificationType}, skipping notification", keyInfo.PropertyName, typeof(TNotification).Name); return; } _logger.LogDebug( "Publishing {NotificationType} with subscription key {PropertyName}={KeyValue}", typeof(TNotification).Name, keyInfo.PropertyName, subscriptionKey); await _manager.NotifyAsync(notification, subscriptionKey, ct); } private static SubscriptionKeyInfo GetSubscriptionKeyInfo(Type type) { return _keyCache.GetOrAdd(type, t => { var attr = t.GetCustomAttribute(); if (attr == null) { throw new InvalidOperationException( $"Type {t.Name} is not marked with [{nameof(StreamingNotificationAttribute)}]. " + $"Add the attribute with a SubscriptionKey to enable streaming notifications."); } var property = t.GetProperty(attr.SubscriptionKey); if (property == null) { throw new InvalidOperationException( $"Property '{attr.SubscriptionKey}' specified in [{nameof(StreamingNotificationAttribute)}] " + $"was not found on type {t.Name}."); } return new SubscriptionKeyInfo(attr.SubscriptionKey, property); }); } private sealed record SubscriptionKeyInfo(string PropertyName, PropertyInfo Property); }