using System; using Svrnty.CQRS.Events.Abstractions.EventStore; using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.SignalR; /// /// SignalR hub for real-time event streaming to browser clients. /// /// /// /// Clients can subscribe to specific event streams and receive events in real-time. /// Supports both typed and untyped subscriptions. /// /// /// Client Methods: /// - SubscribeToStream(streamName): Subscribe to a stream /// - UnsubscribeFromStream(streamName): Unsubscribe from a stream /// - GetSubscriptions(): Get list of active subscriptions /// /// /// Server Methods (pushed to clients): /// - EventReceived(streamName, event): New event on subscribed stream /// - SubscriptionConfirmed(streamName): Subscription successful /// - SubscriptionRemoved(streamName): Unsubscription successful /// /// public sealed class EventStreamHub : Hub { private readonly IEventStreamStore _eventStreamStore; private readonly ILogger _logger; // Track subscriptions per connection private static readonly ConcurrentDictionary> _subscriptions = new(); public EventStreamHub( IEventStreamStore eventStreamStore, ILogger logger) { _eventStreamStore = eventStreamStore ?? throw new ArgumentNullException(nameof(eventStreamStore)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } /// /// Subscribe to an event stream. /// /// Name of the stream to subscribe to. /// Optional offset to start reading from (default: latest). public async Task SubscribeToStream(string streamName, long? startFromOffset = null) { if (string.IsNullOrWhiteSpace(streamName)) { await Clients.Caller.SendAsync("Error", "Stream name cannot be empty"); return; } var connectionId = Context.ConnectionId; _logger.LogInformation( "Client {ConnectionId} subscribing to stream {StreamName} from offset {Offset}", connectionId, streamName, startFromOffset ?? -1); // Get or create connection subscriptions var connectionSubs = _subscriptions.GetOrAdd(connectionId, _ => new ConcurrentDictionary()); // Check if already subscribed if (connectionSubs.ContainsKey(streamName)) { await Clients.Caller.SendAsync("Error", $"Already subscribed to stream '{streamName}'"); return; } // Create cancellation token for this subscription var cts = new CancellationTokenSource(); connectionSubs.TryAdd(streamName, cts); // Confirm subscription await Clients.Caller.SendAsync("SubscriptionConfirmed", streamName); // Start streaming events to client _ = Task.Run(async () => await StreamEventsToClientAsync(connectionId, streamName, startFromOffset ?? -1, cts.Token)); } /// /// Unsubscribe from an event stream. /// /// Name of the stream to unsubscribe from. public async Task UnsubscribeFromStream(string streamName) { var connectionId = Context.ConnectionId; _logger.LogInformation( "Client {ConnectionId} unsubscribing from stream {StreamName}", connectionId, streamName); if (_subscriptions.TryGetValue(connectionId, out var connectionSubs) && connectionSubs.TryRemove(streamName, out var cts)) { cts.Cancel(); cts.Dispose(); await Clients.Caller.SendAsync("SubscriptionRemoved", streamName); } else { await Clients.Caller.SendAsync("Error", $"Not subscribed to stream '{streamName}'"); } } /// /// Get list of active subscriptions for this connection. /// public Task GetSubscriptions() { var connectionId = Context.ConnectionId; if (_subscriptions.TryGetValue(connectionId, out var connectionSubs)) { return Task.FromResult(connectionSubs.Keys.ToArray()); } return Task.FromResult(Array.Empty()); } /// /// Called when a client disconnects. /// public override async Task OnDisconnectedAsync(Exception? exception) { var connectionId = Context.ConnectionId; _logger.LogInformation( "Client {ConnectionId} disconnected. Cleaning up subscriptions.", connectionId); if (_subscriptions.TryRemove(connectionId, out var connectionSubs)) { foreach (var cts in connectionSubs.Values) { cts.Cancel(); cts.Dispose(); } } await base.OnDisconnectedAsync(exception); } private async Task StreamEventsToClientAsync( string connectionId, string streamName, long startOffset, CancellationToken cancellationToken) { _logger.LogInformation( "Starting event stream for client {ConnectionId} on stream {StreamName} from offset {Offset}", connectionId, streamName, startOffset); long currentOffset = startOffset; try { while (!cancellationToken.IsCancellationRequested) { // Read batch of events var events = await _eventStreamStore.ReadStreamAsync( streamName, currentOffset, 10, // batchSize cancellationToken); if (events.Count > 0) { // Send events to client foreach (var @event in events) { await Clients.Client(connectionId).SendAsync( "EventReceived", streamName, new { EventId = @event.EventId, CorrelationId = @event.CorrelationId, OccurredAt = @event.OccurredAt, EventType = @event.GetType().Name, Data = @event }, cancellationToken); currentOffset++; } _logger.LogDebug( "Sent {Count} events to client {ConnectionId} on stream {StreamName}", events.Count, connectionId, streamName); } else { // No new events, wait before polling again await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); } } } catch (OperationCanceledException) { _logger.LogInformation( "Event stream cancelled for client {ConnectionId} on stream {StreamName}", connectionId, streamName); } catch (Exception ex) { _logger.LogError(ex, "Error streaming events to client {ConnectionId} on stream {StreamName}", connectionId, streamName); try { await Clients.Client(connectionId).SendAsync( "Error", $"Error streaming from '{streamName}': {ex.Message}", cancellationToken); } catch { // Client may have disconnected } } } }