using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
namespace Svrnty.CQRS.Events.SignalR;
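/// <summary>
/// SignalR hub for persistent, correlation-based event subscriptions.
/// Supports offline event storage and catch-up on reconnect.
/// </summary>
/// <remarks>
/// <para>
/// Client Methods:
/// - CreateSubscription(request): Create a new persistent subscription
/// - CancelSubscription(subscriptionId): Cancel an existing subscription
/// - CatchUp(subscriptionId): Request missed events
/// - AttachSubscription(subscriptionId): Attach to an existing subscription on reconnect
/// - DetachSubscription(subscriptionId): Temporarily detach from subscription
/// - PauseSubscription(subscriptionId): Pause event delivery for a subscription
/// - ResumeSubscription(subscriptionId): Resume a paused subscription
/// </para>
/// <para>
/// Server Methods (pushed to clients):
/// - SubscriptionCreated(subscription): Subscription created successfully
/// - SubscriptionAttached(subscription): Connection attached to an existing subscription
/// - SubscriptionDetached(subscriptionId): Connection detached from the subscription
/// - SubscriptionCancelled(subscriptionId): Subscription cancelled
/// - SubscriptionPaused(subscriptionId): Subscription paused
/// - SubscriptionResumed(subscriptionId): Subscription resumed
/// - EventReceived(subscriptionId, event): New event delivered
/// - SubscriptionCompleted(subscriptionId): Terminal event received
/// - CatchUpComplete(subscriptionId, count): Catch-up finished
/// - Error(message): Error occurred
/// </para>
/// </remarks>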
public sealed class PersistentSubscriptionHub : Hub
{
// Manages subscription lifecycle: create, attach/detach, cancel, pause, resume.
private readonly ISubscriptionManager _subscriptionManager;
// Supplies pending events and performs catch-up for a subscription.
private readonly IPersistentSubscriptionDeliveryService _deliveryService;
// Structured logger used by every hub method.
private readonly ILogger _logger;

/// <summary>
/// Initializes the hub with its required collaborators.
/// </summary>
/// <param name="subscriptionManager">Subscription lifecycle manager; must not be null.</param>
/// <param name="deliveryService">Delivery service used for catch-up; must not be null.</param>
/// <param name="logger">Logger for diagnostics; must not be null.</param>
/// <exception cref="ArgumentNullException">Thrown when any argument is null.</exception>
public PersistentSubscriptionHub(
    ISubscriptionManager subscriptionManager,
    IPersistentSubscriptionDeliveryService deliveryService,
    ILogger logger)
{
    // Guards run in parameter order so the first missing dependency is the one reported.
    if (subscriptionManager is null)
    {
        throw new ArgumentNullException(nameof(subscriptionManager));
    }

    if (deliveryService is null)
    {
        throw new ArgumentNullException(nameof(deliveryService));
    }

    if (logger is null)
    {
        throw new ArgumentNullException(nameof(logger));
    }

    _subscriptionManager = subscriptionManager;
    _deliveryService = deliveryService;
    _logger = logger;
}
/// <summary>
/// Creates a persistent subscription from <paramref name="request"/>, attaches the calling
/// connection to it, and confirms to the caller with a "SubscriptionCreated" message.
/// </summary>
/// <remarks>
/// Validation problems and exceptions are reported to the caller through the "Error" client
/// method; exceptions are logged and not rethrown.
/// </remarks>
/// <param name="request">
/// Subscription details; must be non-null with a non-blank SubscriberId and CorrelationId.
/// </param>
public async Task CreateSubscription(CreateSubscriptionRequest request)
{
    // Pick the first failing rule, checked in the order: request, SubscriberId, CorrelationId.
    var validationError = request switch
    {
        null => "Request cannot be null",
        _ when string.IsNullOrWhiteSpace(request.SubscriberId) => "SubscriberId is required",
        _ when string.IsNullOrWhiteSpace(request.CorrelationId) => "CorrelationId is required",
        _ => null,
    };

    if (validationError is not null)
    {
        await Clients.Caller.SendAsync("Error", validationError);
        return;
    }

    try
    {
        _logger.LogInformation(
            "Creating subscription for subscriber {SubscriberId} with correlation {CorrelationId}",
            request.SubscriberId,
            request.CorrelationId);

        var created = await _subscriptionManager.CreateSubscriptionAsync(
            subscriberId: request.SubscriberId,
            correlationId: request.CorrelationId,
            eventTypes: request.EventTypes?.ToHashSet(),
            terminalEventTypes: request.TerminalEventTypes?.ToHashSet(),
            deliveryMode: request.DeliveryMode,
            expiresAt: request.ExpiresAt,
            dataSourceId: request.DataSourceId);

        // Bind the caller's connection to the freshly created subscription.
        await _subscriptionManager.AttachConnectionAsync(created.Id, Context.ConnectionId);

        // Confirmation payload: property names are inferred from the projected members.
        var confirmation = new
        {
            created.Id,
            created.SubscriberId,
            created.CorrelationId,
            created.EventTypes,
            created.TerminalEventTypes,
            created.DeliveryMode,
            created.CreatedAt,
            created.ExpiresAt
        };
        await Clients.Caller.SendAsync("SubscriptionCreated", confirmation);

        _logger.LogInformation(
            "Subscription {SubscriptionId} created and attached to connection {ConnectionId}",
            created.Id,
            Context.ConnectionId);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error creating subscription for {SubscriberId}", request.SubscriberId);

        // NOTE(review): the exception message is forwarded verbatim to the client —
        // confirm it cannot expose internal details.
        await Clients.Caller.SendAsync("Error", $"Failed to create subscription: {failure.Message}");
    }
}
/// <summary>
/// Attaches the calling connection to an existing subscription (e.g., on reconnect) and
/// replies with a "SubscriptionAttached" message carrying the subscription's Id, Status and
/// LastDeliveredSequence.
/// </summary>
/// <remarks>
/// A blank id, an unknown subscription, or any exception is reported through the "Error"
/// client method; exceptions are logged and not rethrown.
/// </remarks>
/// <param name="subscriptionId">Identifier of the subscription to attach to; must not be blank.</param>
public async Task AttachSubscription(string subscriptionId)
{
    if (string.IsNullOrWhiteSpace(subscriptionId))
    {
        await Clients.Caller.SendAsync("Error", "SubscriptionId is required");
        return;
    }

    try
    {
        // Look the subscription up first so an unknown id yields a "not found" error.
        var existing = await _subscriptionManager.GetSubscriptionAsync(subscriptionId);
        if (existing is null)
        {
            var notFound = $"Subscription {subscriptionId} not found";
            await Clients.Caller.SendAsync("Error", notFound);
            return;
        }

        await _subscriptionManager.AttachConnectionAsync(subscriptionId, Context.ConnectionId);

        // Payload is built from the subscription instance fetched before attaching.
        var attachedInfo = new
        {
            existing.Id,
            existing.Status,
            existing.LastDeliveredSequence
        };
        await Clients.Caller.SendAsync("SubscriptionAttached", attachedInfo);

        _logger.LogInformation(
            "Subscription {SubscriptionId} attached to connection {ConnectionId}",
            subscriptionId,
            Context.ConnectionId);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error attaching subscription {SubscriptionId}", subscriptionId);
        await Clients.Caller.SendAsync("Error", $"Failed to attach subscription: {failure.Message}");
    }
}
/// <summary>
/// Detaches the connection from a subscription without cancelling the subscription.
/// </summary>
/// <remarks>
/// On success the caller receives "SubscriptionDetached" with the subscription id. A blank id
/// or any exception is reported through the "Error" client method; exceptions are logged and
/// not rethrown.
/// </remarks>
/// <param name="subscriptionId">Identifier of the subscription to detach from; must not be blank.</param>
public async Task DetachSubscription(string subscriptionId)
{
    const string missingIdMessage = "SubscriptionId is required";

    if (string.IsNullOrWhiteSpace(subscriptionId))
    {
        await Clients.Caller.SendAsync("Error", missingIdMessage);
        return;
    }

    try
    {
        // NOTE(review): only the subscription id is passed; this method does not check that
        // the caller's connection is the one currently attached — confirm that is intended.
        await _subscriptionManager.DetachConnectionAsync(subscriptionId);

        await Clients.Caller.SendAsync("SubscriptionDetached", subscriptionId);

        _logger.LogInformation(
            "Subscription {SubscriptionId} detached from connection {ConnectionId}",
            subscriptionId,
            Context.ConnectionId);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error detaching subscription {SubscriptionId}", subscriptionId);
        await Clients.Caller.SendAsync("Error", $"Failed to detach subscription: {failure.Message}");
    }
}
/// <summary>
/// Cancels a subscription permanently.
/// </summary>
/// <remarks>
/// On success the caller receives "SubscriptionCancelled" with the subscription id. A blank id
/// or any exception is reported through the "Error" client method; exceptions are logged and
/// not rethrown.
/// </remarks>
/// <param name="subscriptionId">Identifier of the subscription to cancel; must not be blank.</param>
public async Task CancelSubscription(string subscriptionId)
{
    const string missingIdMessage = "SubscriptionId is required";

    if (string.IsNullOrWhiteSpace(subscriptionId))
    {
        await Clients.Caller.SendAsync("Error", missingIdMessage);
        return;
    }

    try
    {
        await _subscriptionManager.CancelSubscriptionAsync(subscriptionId);

        // Acknowledge to the caller only after the manager call has completed.
        await Clients.Caller.SendAsync("SubscriptionCancelled", subscriptionId);

        _logger.LogInformation(
            "Subscription {SubscriptionId} cancelled by connection {ConnectionId}",
            subscriptionId,
            Context.ConnectionId);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error cancelling subscription {SubscriptionId}", subscriptionId);
        await Clients.Caller.SendAsync("Error", $"Failed to cancel subscription: {failure.Message}");
    }
}
/// <summary>
/// Pushes pending (missed) events for a subscription to the caller, then runs the delivery
/// service's catch-up and reports the resulting count with "CatchUpComplete".
/// </summary>
/// <remarks>
/// When there are no pending events the caller receives "CatchUpComplete" with a count of 0
/// and the catch-up call is skipped. A blank id or any exception is reported through the
/// "Error" client method; exceptions are logged and not rethrown.
/// </remarks>
/// <param name="subscriptionId">Identifier of the subscription to catch up; must not be blank.</param>
public async Task CatchUp(string subscriptionId)
{
    // Maximum number of pending events requested from the delivery service per call.
    const int pendingBatchLimit = 100;

    if (string.IsNullOrWhiteSpace(subscriptionId))
    {
        await Clients.Caller.SendAsync("Error", "SubscriptionId is required");
        return;
    }

    try
    {
        _logger.LogInformation(
            "Starting catch-up for subscription {SubscriptionId}",
            subscriptionId);

        var missed = await _deliveryService.GetPendingEventsAsync(subscriptionId, limit: pendingBatchLimit);
        if (missed.Count == 0)
        {
            // Nothing to replay: report an empty catch-up and stop.
            await Clients.Caller.SendAsync("CatchUpComplete", subscriptionId, 0);
            return;
        }

        // Replay each pending event to the caller, one message per event, in list order.
        foreach (var pendingEvent in missed)
        {
            var envelope = new
            {
                pendingEvent.EventId,
                pendingEvent.CorrelationId,
                // Runtime type name of the event object, not a declared property.
                EventType = pendingEvent.GetType().Name,
                pendingEvent.OccurredAt,
                Data = pendingEvent
            };
            await Clients.Caller.SendAsync("EventReceived", subscriptionId, envelope);
        }

        // Per the original author's note, this call updates LastDeliveredSequence.
        // NOTE(review): at most 100 events are pushed above, while the reported count comes
        // from CatchUpSubscriptionAsync — confirm the two agree when more events are pending.
        var deliveredCount = await _deliveryService.CatchUpSubscriptionAsync(subscriptionId);
        await Clients.Caller.SendAsync("CatchUpComplete", subscriptionId, deliveredCount);

        _logger.LogInformation(
            "Catch-up complete for subscription {SubscriptionId}: {Count} events",
            subscriptionId,
            deliveredCount);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error during catch-up for subscription {SubscriptionId}", subscriptionId);
        await Clients.Caller.SendAsync("Error", $"Failed to catch up: {failure.Message}");
    }
}
/// <summary>
/// Pauses a subscription (stops event delivery).
/// </summary>
/// <remarks>
/// On success the caller receives "SubscriptionPaused" with the subscription id. A blank id or
/// any exception is reported through the "Error" client method; exceptions are logged and not
/// rethrown.
/// </remarks>
/// <param name="subscriptionId">Identifier of the subscription to pause; must not be blank.</param>
public async Task PauseSubscription(string subscriptionId)
{
    const string missingIdMessage = "SubscriptionId is required";

    if (string.IsNullOrWhiteSpace(subscriptionId))
    {
        await Clients.Caller.SendAsync("Error", missingIdMessage);
        return;
    }

    try
    {
        await _subscriptionManager.PauseSubscriptionAsync(subscriptionId);

        // Acknowledge to the caller only after the manager call has completed.
        await Clients.Caller.SendAsync("SubscriptionPaused", subscriptionId);

        _logger.LogInformation("Subscription {SubscriptionId} paused", subscriptionId);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error pausing subscription {SubscriptionId}", subscriptionId);
        await Clients.Caller.SendAsync("Error", $"Failed to pause subscription: {failure.Message}");
    }
}
/// <summary>
/// Resumes a paused subscription.
/// </summary>
/// <remarks>
/// On success the caller receives "SubscriptionResumed" with the subscription id. A blank id or
/// any exception is reported through the "Error" client method; exceptions are logged and not
/// rethrown.
/// </remarks>
/// <param name="subscriptionId">Identifier of the subscription to resume; must not be blank.</param>
public async Task ResumeSubscription(string subscriptionId)
{
    const string missingIdMessage = "SubscriptionId is required";

    if (string.IsNullOrWhiteSpace(subscriptionId))
    {
        await Clients.Caller.SendAsync("Error", missingIdMessage);
        return;
    }

    try
    {
        await _subscriptionManager.ResumeSubscriptionAsync(subscriptionId);

        // Acknowledge to the caller only after the manager call has completed.
        await Clients.Caller.SendAsync("SubscriptionResumed", subscriptionId);

        _logger.LogInformation("Subscription {SubscriptionId} resumed", subscriptionId);
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error resuming subscription {SubscriptionId}", subscriptionId);
        await Clients.Caller.SendAsync("Error", $"Failed to resume subscription: {failure.Message}");
    }
}
///
/// Get all subscriptions for the current user.
///
public async Task