236 lines
8.0 KiB
C#
236 lines
8.0 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.Abstractions.EventStore;
|
|
using System.Collections.Concurrent;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.AspNetCore.SignalR;
|
|
using Microsoft.Extensions.Logging;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.SignalR;
|
|
|
|
/// <summary>
|
|
/// SignalR hub for real-time event streaming to browser clients.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// Clients can subscribe to specific event streams and receive events in real-time.
|
|
/// Supports both typed and untyped subscriptions.
|
|
/// </para>
|
|
/// <para>
|
|
/// <strong>Client Methods:</strong>
|
|
/// - SubscribeToStream(streamName): Subscribe to a stream
|
|
/// - UnsubscribeFromStream(streamName): Unsubscribe from a stream
|
|
/// - GetSubscriptions(): Get list of active subscriptions
|
|
/// </para>
|
|
/// <para>
|
|
/// <strong>Server Methods (pushed to clients):</strong>
|
|
/// - EventReceived(streamName, event): New event on subscribed stream
|
|
/// - SubscriptionConfirmed(streamName): Subscription successful
|
|
/// - SubscriptionRemoved(streamName): Unsubscription successful
|
|
/// </para>
|
|
/// </remarks>
|
|
public sealed class EventStreamHub : Hub
|
|
{
|
|
private readonly IEventStreamStore _eventStreamStore;
|
|
private readonly ILogger<EventStreamHub> _logger;
|
|
|
|
// Track subscriptions per connection
|
|
private static readonly ConcurrentDictionary<string, ConcurrentDictionary<string, CancellationTokenSource>>
|
|
_subscriptions = new();
|
|
|
|
public EventStreamHub(
|
|
IEventStreamStore eventStreamStore,
|
|
ILogger<EventStreamHub> logger)
|
|
{
|
|
_eventStreamStore = eventStreamStore ?? throw new ArgumentNullException(nameof(eventStreamStore));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Subscribe to an event stream.
|
|
/// </summary>
|
|
/// <param name="streamName">Name of the stream to subscribe to.</param>
|
|
/// <param name="startFromOffset">Optional offset to start reading from (default: latest).</param>
|
|
public async Task SubscribeToStream(string streamName, long? startFromOffset = null)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(streamName))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "Stream name cannot be empty");
|
|
return;
|
|
}
|
|
|
|
var connectionId = Context.ConnectionId;
|
|
|
|
_logger.LogInformation(
|
|
"Client {ConnectionId} subscribing to stream {StreamName} from offset {Offset}",
|
|
connectionId, streamName, startFromOffset ?? -1);
|
|
|
|
// Get or create connection subscriptions
|
|
var connectionSubs = _subscriptions.GetOrAdd(connectionId, _ => new ConcurrentDictionary<string, CancellationTokenSource>());
|
|
|
|
// Check if already subscribed
|
|
if (connectionSubs.ContainsKey(streamName))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", $"Already subscribed to stream '{streamName}'");
|
|
return;
|
|
}
|
|
|
|
// Create cancellation token for this subscription
|
|
var cts = new CancellationTokenSource();
|
|
connectionSubs.TryAdd(streamName, cts);
|
|
|
|
// Confirm subscription
|
|
await Clients.Caller.SendAsync("SubscriptionConfirmed", streamName);
|
|
|
|
// Start streaming events to client
|
|
_ = Task.Run(async () => await StreamEventsToClientAsync(connectionId, streamName, startFromOffset ?? -1, cts.Token));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Unsubscribe from an event stream.
|
|
/// </summary>
|
|
/// <param name="streamName">Name of the stream to unsubscribe from.</param>
|
|
public async Task UnsubscribeFromStream(string streamName)
|
|
{
|
|
var connectionId = Context.ConnectionId;
|
|
|
|
_logger.LogInformation(
|
|
"Client {ConnectionId} unsubscribing from stream {StreamName}",
|
|
connectionId, streamName);
|
|
|
|
if (_subscriptions.TryGetValue(connectionId, out var connectionSubs) &&
|
|
connectionSubs.TryRemove(streamName, out var cts))
|
|
{
|
|
cts.Cancel();
|
|
cts.Dispose();
|
|
|
|
await Clients.Caller.SendAsync("SubscriptionRemoved", streamName);
|
|
}
|
|
else
|
|
{
|
|
await Clients.Caller.SendAsync("Error", $"Not subscribed to stream '{streamName}'");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Get list of active subscriptions for this connection.
|
|
/// </summary>
|
|
public Task<string[]> GetSubscriptions()
|
|
{
|
|
var connectionId = Context.ConnectionId;
|
|
|
|
if (_subscriptions.TryGetValue(connectionId, out var connectionSubs))
|
|
{
|
|
return Task.FromResult(connectionSubs.Keys.ToArray());
|
|
}
|
|
|
|
return Task.FromResult(Array.Empty<string>());
|
|
}
|
|
|
|
/// <summary>
|
|
/// Called when a client disconnects.
|
|
/// </summary>
|
|
public override async Task OnDisconnectedAsync(Exception? exception)
|
|
{
|
|
var connectionId = Context.ConnectionId;
|
|
|
|
_logger.LogInformation(
|
|
"Client {ConnectionId} disconnected. Cleaning up subscriptions.",
|
|
connectionId);
|
|
|
|
if (_subscriptions.TryRemove(connectionId, out var connectionSubs))
|
|
{
|
|
foreach (var cts in connectionSubs.Values)
|
|
{
|
|
cts.Cancel();
|
|
cts.Dispose();
|
|
}
|
|
}
|
|
|
|
await base.OnDisconnectedAsync(exception);
|
|
}
|
|
|
|
private async Task StreamEventsToClientAsync(
|
|
string connectionId,
|
|
string streamName,
|
|
long startOffset,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
_logger.LogInformation(
|
|
"Starting event stream for client {ConnectionId} on stream {StreamName} from offset {Offset}",
|
|
connectionId, streamName, startOffset);
|
|
|
|
long currentOffset = startOffset;
|
|
|
|
try
|
|
{
|
|
while (!cancellationToken.IsCancellationRequested)
|
|
{
|
|
// Read batch of events
|
|
var events = await _eventStreamStore.ReadStreamAsync(
|
|
streamName,
|
|
currentOffset,
|
|
10, // batchSize
|
|
cancellationToken);
|
|
|
|
if (events.Count > 0)
|
|
{
|
|
// Send events to client
|
|
foreach (var @event in events)
|
|
{
|
|
await Clients.Client(connectionId).SendAsync(
|
|
"EventReceived",
|
|
streamName,
|
|
new
|
|
{
|
|
EventId = @event.EventId,
|
|
CorrelationId = @event.CorrelationId,
|
|
OccurredAt = @event.OccurredAt,
|
|
EventType = @event.GetType().Name,
|
|
Data = @event
|
|
},
|
|
cancellationToken);
|
|
|
|
currentOffset++;
|
|
}
|
|
|
|
_logger.LogDebug(
|
|
"Sent {Count} events to client {ConnectionId} on stream {StreamName}",
|
|
events.Count, connectionId, streamName);
|
|
}
|
|
else
|
|
{
|
|
// No new events, wait before polling again
|
|
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
|
|
}
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
_logger.LogInformation(
|
|
"Event stream cancelled for client {ConnectionId} on stream {StreamName}",
|
|
connectionId, streamName);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex,
|
|
"Error streaming events to client {ConnectionId} on stream {StreamName}",
|
|
connectionId, streamName);
|
|
|
|
try
|
|
{
|
|
await Clients.Client(connectionId).SendAsync(
|
|
"Error",
|
|
$"Error streaming from '{streamName}': {ex.Message}",
|
|
cancellationToken);
|
|
}
|
|
catch
|
|
{
|
|
// Client may have disconnected
|
|
}
|
|
}
|
|
}
|
|
}
|