using System;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.SignalR;
/// <summary>
/// SignalR hub for real-time event streaming to browser clients.
/// </summary>
/// <remarks>
/// <para>
/// Clients can subscribe to specific event streams and receive events in real-time.
/// Supports both typed and untyped subscriptions.
/// </para>
/// <para>
/// Client Methods (invoked by clients on the hub):
/// - SubscribeToStream(streamName, startFromOffset): Subscribe to a stream
/// - UnsubscribeFromStream(streamName): Unsubscribe from a stream
/// - GetSubscriptions(): Get list of active subscriptions
/// </para>
/// <para>
/// Server Methods (pushed to clients):
/// - EventReceived(streamName, event): New event on subscribed stream
/// - SubscriptionConfirmed(streamName): Subscription successful
/// - SubscriptionRemoved(streamName): Unsubscription successful
/// - Error(message): A request was rejected or a stream failed
/// </para>
/// </remarks>
public sealed class EventStreamHub : Hub
{
    /// <summary>
    /// Offset handed to the store when the caller does not supply one.
    /// NOTE(review): presumably the store treats -1 as "latest" - confirm against IEventStreamStore.
    /// </summary>
    private const long DefaultStartOffset = -1;

    /// <summary>Maximum number of events requested from the store per read.</summary>
    private const int ReadBatchSize = 10;

    /// <summary>Pause between reads when the previous read returned no events.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(1);

    private readonly IEventStreamStore _eventStreamStore;
    private readonly ILogger<EventStreamHub> _logger;

    // connection id -> (stream name -> cancellation source of the streaming loop).
    // Static so the map outlives individual hub instances (SignalR hubs are transient).
    private static readonly ConcurrentDictionary<string, ConcurrentDictionary<string, CancellationTokenSource>>
        _subscriptions = new();

    /// <summary>
    /// Creates the hub.
    /// </summary>
    /// <param name="eventStreamStore">Store the streaming loop reads events from.</param>
    /// <param name="logger">Logger used for subscription and streaming diagnostics.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public EventStreamHub(
        IEventStreamStore eventStreamStore,
        ILogger<EventStreamHub> logger)
    {
        _eventStreamStore = eventStreamStore ?? throw new ArgumentNullException(nameof(eventStreamStore));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Subscribe to an event stream.
    /// </summary>
    /// <remarks>
    /// Sends <c>SubscriptionConfirmed</c> to the caller and then starts a background loop that pushes
    /// <c>EventReceived</c> messages. Sends <c>Error</c> instead when the name is blank or the
    /// connection is already subscribed to the stream.
    /// </remarks>
    /// <param name="streamName">Name of the stream to subscribe to.</param>
    /// <param name="startFromOffset">Optional offset to start reading from (default: latest).</param>
    public async Task SubscribeToStream(string streamName, long? startFromOffset = null)
    {
        if (string.IsNullOrWhiteSpace(streamName))
        {
            await Clients.Caller.SendAsync("Error", "Stream name cannot be empty");
            return;
        }

        var connectionId = Context.ConnectionId;
        var startOffset = startFromOffset ?? DefaultStartOffset;

        _logger.LogInformation(
            "Client {ConnectionId} subscribing to stream {StreamName} from offset {Offset}",
            connectionId, streamName, startOffset);

        var connectionSubs = _subscriptions.GetOrAdd(
            connectionId,
            _ => new ConcurrentDictionary<string, CancellationTokenSource>());

        // Read the token before the source becomes visible to other callers: Token throws once
        // the source has been disposed, which an unsubscribe/disconnect may do at any time.
        var cts = new CancellationTokenSource();
        var cancellationToken = cts.Token;

        // TryAdd is the atomic "already subscribed?" check; a separate ContainsKey would let two
        // concurrent calls both pass and start duplicate streams.
        if (!connectionSubs.TryAdd(streamName, cts))
        {
            cts.Dispose();
            await Clients.Caller.SendAsync("Error", $"Already subscribed to stream '{streamName}'");
            return;
        }

        // Capture the proxy while this hub instance is alive. The streaming loop outlives the
        // invocation, and Hub.Clients throws ObjectDisposedException after the hub is disposed.
        // NOTE(review): injecting IHubContext<EventStreamHub> is the documented way to send from
        // outside a hub invocation; the proxy is captured here to keep the constructor unchanged.
        IClientProxy client = Clients.Client(connectionId);

        await Clients.Caller.SendAsync("SubscriptionConfirmed", streamName);

        // Fire-and-forget: the loop handles its own exceptions and cleans up after itself.
        // NOTE(review): the loop keeps using _eventStreamStore after this invocation ends; confirm
        // the store's DI lifetime allows that.
        _ = Task.Run(() => StreamEventsToClientAsync(
            client, connectionId, streamName, startOffset, cts, cancellationToken));
    }

    /// <summary>
    /// Unsubscribe from an event stream.
    /// </summary>
    /// <remarks>
    /// Cancels the streaming loop and sends <c>SubscriptionRemoved</c>; sends <c>Error</c> when the
    /// name is blank or the connection is not subscribed to the stream.
    /// </remarks>
    /// <param name="streamName">Name of the stream to unsubscribe from.</param>
    public async Task UnsubscribeFromStream(string streamName)
    {
        // Same validation as SubscribeToStream; a null key would otherwise make TryRemove throw.
        if (string.IsNullOrWhiteSpace(streamName))
        {
            await Clients.Caller.SendAsync("Error", "Stream name cannot be empty");
            return;
        }

        var connectionId = Context.ConnectionId;

        _logger.LogInformation(
            "Client {ConnectionId} unsubscribing from stream {StreamName}",
            connectionId, streamName);

        if (_subscriptions.TryGetValue(connectionId, out var connectionSubs) &&
            connectionSubs.TryRemove(streamName, out var cts))
        {
            cts.Cancel();
            cts.Dispose();
            await Clients.Caller.SendAsync("SubscriptionRemoved", streamName);
            return;
        }

        await Clients.Caller.SendAsync("Error", $"Not subscribed to stream '{streamName}'");
    }

    /// <summary>
    /// Get list of active subscriptions for this connection.
    /// </summary>
    /// <returns>The subscribed stream names, or an empty array when there are none.</returns>
    public Task<string[]> GetSubscriptions()
    {
        var names = _subscriptions.TryGetValue(Context.ConnectionId, out var connectionSubs)
            ? connectionSubs.Keys.ToArray()
            : Array.Empty<string>();

        return Task.FromResult(names);
    }

    /// <summary>
    /// Called when a client disconnects. Cancels and disposes every subscription of the connection.
    /// </summary>
    /// <param name="exception">The error that caused the disconnect, if any.</param>
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        var connectionId = Context.ConnectionId;

        _logger.LogInformation(
            "Client {ConnectionId} disconnected. Cleaning up subscriptions.",
            connectionId);

        if (_subscriptions.TryRemove(connectionId, out var connectionSubs))
        {
            foreach (var cts in connectionSubs.Values)
            {
                cts.Cancel();
                cts.Dispose();
            }
        }

        await base.OnDisconnectedAsync(exception);
    }

    /// <summary>
    /// Polls the store and pushes each event to the client until the subscription is cancelled
    /// or a read/send fails.
    /// </summary>
    /// <param name="client">Proxy for the subscribed connection, captured during the hub invocation.</param>
    /// <param name="connectionId">Connection id, used for logging and cleanup.</param>
    /// <param name="streamName">Stream being read.</param>
    /// <param name="startOffset">Offset passed to the first read.</param>
    /// <param name="subscriptionCts">Source registered for this subscription; released on exit.</param>
    /// <param name="cancellationToken">Token of <paramref name="subscriptionCts"/>.</param>
    private async Task StreamEventsToClientAsync(
        IClientProxy client,
        string connectionId,
        string streamName,
        long startOffset,
        CancellationTokenSource subscriptionCts,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Starting event stream for client {ConnectionId} on stream {StreamName} from offset {Offset}",
            connectionId, streamName, startOffset);

        var currentOffset = startOffset;

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var events = await _eventStreamStore.ReadStreamAsync(
                    streamName,
                    currentOffset,
                    ReadBatchSize,
                    cancellationToken);

                if (events.Count == 0)
                {
                    // No new events, wait before polling again.
                    await Task.Delay(PollInterval, cancellationToken);
                    continue;
                }

                foreach (var evt in events)
                {
                    await client.SendAsync(
                        "EventReceived",
                        streamName,
                        new
                        {
                            EventId = evt.EventId,
                            CorrelationId = evt.CorrelationId,
                            OccurredAt = evt.OccurredAt,
                            EventType = evt.GetType().Name,
                            Data = evt
                        },
                        cancellationToken);

                    // NOTE(review): advances by one per event sent. This assumes contiguous offsets
                    // and, when starting from -1, that the next read at 0 is what the store
                    // expects - confirm against IEventStreamStore.ReadStreamAsync.
                    currentOffset++;
                }

                _logger.LogDebug(
                    "Sent {Count} events to client {ConnectionId} on stream {StreamName}",
                    events.Count, connectionId, streamName);
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation(
                "Event stream cancelled for client {ConnectionId} on stream {StreamName}",
                connectionId, streamName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Error streaming events to client {ConnectionId} on stream {StreamName}",
                connectionId, streamName);

            try
            {
                await client.SendAsync(
                    "Error",
                    $"Error streaming from '{streamName}': {ex.Message}",
                    cancellationToken);
            }
            catch (Exception sendEx)
            {
                // Best effort: the client may have disconnected. Logged rather than swallowed.
                _logger.LogDebug(sendEx,
                    "Could not report stream error to client {ConnectionId} on stream {StreamName}",
                    connectionId, streamName);
            }
        }
        finally
        {
            ReleaseSubscription(connectionId, streamName, subscriptionCts);
        }
    }

    /// <summary>
    /// Removes and disposes the subscription if it is still the one registered for the stream.
    /// Does nothing when an unsubscribe or disconnect already removed it, or when the client has
    /// since re-subscribed with a different source.
    /// </summary>
    private static void ReleaseSubscription(
        string connectionId,
        string streamName,
        CancellationTokenSource subscriptionCts)
    {
        var entry = new System.Collections.Generic.KeyValuePair<string, CancellationTokenSource>(
            streamName, subscriptionCts);

        // Key+value removal: whoever removes the entry owns the Dispose, so it happens exactly once.
        if (_subscriptions.TryGetValue(connectionId, out var connectionSubs) &&
            connectionSubs.TryRemove(entry))
        {
            subscriptionCts.Dispose();
        }
    }
}