// dotnet-cqrs/Svrnty.CQRS.Events.SignalR/PersistentSubscriptionHub.cs
//
// 403 lines
// 14 KiB
// C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
namespace Svrnty.CQRS.Events.SignalR;
/// <summary>
/// SignalR hub for persistent, correlation-based event subscriptions.
/// Supports offline event storage and catch-up on reconnect.
/// </summary>
/// <remarks>
/// <para>
/// <strong>Client Methods (invoked by clients):</strong>
/// - CreateSubscription(request): Create a new persistent subscription and attach the calling connection
/// - AttachSubscription(subscriptionId): Attach to an existing subscription on reconnect
/// - DetachSubscription(subscriptionId): Temporarily detach from subscription
/// - CancelSubscription(subscriptionId): Cancel an existing subscription
/// - CatchUp(subscriptionId): Request missed events
/// - PauseSubscription(subscriptionId): Pause a subscription
/// - ResumeSubscription(subscriptionId): Resume a paused subscription
/// - GetMySubscriptions(subscriberId): List the subscriptions of a subscriber
/// </para>
/// <para>
/// <strong>Server Methods (pushed to clients):</strong>
/// - SubscriptionCreated(subscription): Subscription created successfully
/// - SubscriptionAttached(subscription): Connection attached to an existing subscription
/// - SubscriptionDetached(subscriptionId): Connection detached
/// - SubscriptionCancelled(subscriptionId): Subscription cancelled
/// - SubscriptionPaused(subscriptionId): Subscription paused
/// - SubscriptionResumed(subscriptionId): Subscription resumed
/// - EventReceived(subscriptionId, event): New event delivered
/// - SubscriptionCompleted(subscriptionId): Terminal event received (not sent from this class itself)
/// - CatchUpComplete(subscriptionId, count): Catch-up finished
/// - Error(message): Error occurred
/// </para>
/// <para>
/// NOTE(review): none of the methods verify that the calling connection owns the
/// subscription id it passes, and failure messages echo <c>ex.Message</c> back to the
/// client. Both are kept as-is for compatibility; confirm they are acceptable.
/// </para>
/// </remarks>
public sealed class PersistentSubscriptionHub : Hub
{
    /// <summary>Maximum number of pending events fetched by a single <see cref="CatchUp"/> call.</summary>
    private const int CatchUpBatchLimit = 100;

    private readonly ISubscriptionManager _subscriptionManager;
    private readonly IPersistentSubscriptionDeliveryService _deliveryService;
    private readonly ILogger<PersistentSubscriptionHub> _logger;

    /// <summary>
    /// Creates the hub.
    /// </summary>
    /// <param name="subscriptionManager">Manages subscription lifecycle and connection attachment.</param>
    /// <param name="deliveryService">Provides pending events and performs catch-up.</param>
    /// <param name="logger">Logger for hub activity.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public PersistentSubscriptionHub(
        ISubscriptionManager subscriptionManager,
        IPersistentSubscriptionDeliveryService deliveryService,
        ILogger<PersistentSubscriptionHub> logger)
    {
        _subscriptionManager = subscriptionManager ?? throw new ArgumentNullException(nameof(subscriptionManager));
        _deliveryService = deliveryService ?? throw new ArgumentNullException(nameof(deliveryService));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Create a new persistent subscription and attach the calling connection to it.
    /// </summary>
    /// <param name="request">Subscription parameters; <c>SubscriberId</c> and <c>CorrelationId</c> must be non-blank.</param>
    /// <remarks>
    /// Replies to the caller with <c>SubscriptionCreated</c> on success, or <c>Error</c>
    /// when validation fails or the manager throws. Exceptions are logged, not rethrown.
    /// </remarks>
    public async Task CreateSubscription(CreateSubscriptionRequest request)
    {
        if (request is null)
        {
            await SendErrorAsync("Request cannot be null");
            return;
        }

        if (string.IsNullOrWhiteSpace(request.SubscriberId))
        {
            await SendErrorAsync("SubscriberId is required");
            return;
        }

        if (string.IsNullOrWhiteSpace(request.CorrelationId))
        {
            await SendErrorAsync("CorrelationId is required");
            return;
        }

        try
        {
            _logger.LogInformation(
                "Creating subscription for subscriber {SubscriberId} with correlation {CorrelationId}",
                request.SubscriberId,
                request.CorrelationId);

            var subscription = await _subscriptionManager.CreateSubscriptionAsync(
                subscriberId: request.SubscriberId,
                correlationId: request.CorrelationId,
                eventTypes: request.EventTypes?.ToHashSet(),
                terminalEventTypes: request.TerminalEventTypes?.ToHashSet(),
                deliveryMode: request.DeliveryMode,
                expiresAt: request.ExpiresAt,
                dataSourceId: request.DataSourceId);

            // Attach connection to subscription.
            // NOTE(review): if this throws, the subscription has already been created but the
            // caller is told creation failed — confirm whether a compensating cancel is wanted.
            await _subscriptionManager.AttachConnectionAsync(subscription.Id, Context.ConnectionId);

            // Send confirmation to client
            await Clients.Caller.SendAsync("SubscriptionCreated", new
            {
                subscription.Id,
                subscription.SubscriberId,
                subscription.CorrelationId,
                subscription.EventTypes,
                subscription.TerminalEventTypes,
                subscription.DeliveryMode,
                subscription.CreatedAt,
                subscription.ExpiresAt
            });

            _logger.LogInformation(
                "Subscription {SubscriptionId} created and attached to connection {ConnectionId}",
                subscription.Id,
                Context.ConnectionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error creating subscription for {SubscriberId}", request.SubscriberId);
            await SendErrorAsync($"Failed to create subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Attach to an existing subscription (e.g., on reconnect).
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to attach the calling connection to.</param>
    /// <remarks>
    /// Replies with <c>SubscriptionAttached</c> (id, status, last delivered sequence) on success,
    /// or <c>Error</c> when the id is blank, the subscription is not found, or the manager throws.
    /// </remarks>
    public async Task AttachSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            var subscription = await _subscriptionManager.GetSubscriptionAsync(subscriptionId);
            if (subscription == null)
            {
                await SendErrorAsync($"Subscription {subscriptionId} not found");
                return;
            }

            // Attach connection
            await _subscriptionManager.AttachConnectionAsync(subscriptionId, Context.ConnectionId);

            // The payload is built from the instance fetched before attaching.
            await Clients.Caller.SendAsync("SubscriptionAttached", new
            {
                subscription.Id,
                subscription.Status,
                subscription.LastDeliveredSequence
            });

            _logger.LogInformation(
                "Subscription {SubscriptionId} attached to connection {ConnectionId}",
                subscriptionId,
                Context.ConnectionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error attaching subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to attach subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Detach from a subscription without cancelling it.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to detach.</param>
    /// <remarks>
    /// Replies with <c>SubscriptionDetached</c> on success or <c>Error</c> on failure.
    /// </remarks>
    public async Task DetachSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            await _subscriptionManager.DetachConnectionAsync(subscriptionId);
            await Clients.Caller.SendAsync("SubscriptionDetached", subscriptionId);

            _logger.LogInformation(
                "Subscription {SubscriptionId} detached from connection {ConnectionId}",
                subscriptionId,
                Context.ConnectionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error detaching subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to detach subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Cancel a subscription permanently.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to cancel.</param>
    /// <remarks>
    /// Replies with <c>SubscriptionCancelled</c> on success or <c>Error</c> on failure.
    /// </remarks>
    public async Task CancelSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            await _subscriptionManager.CancelSubscriptionAsync(subscriptionId);
            await Clients.Caller.SendAsync("SubscriptionCancelled", subscriptionId);

            _logger.LogInformation(
                "Subscription {SubscriptionId} cancelled by connection {ConnectionId}",
                subscriptionId,
                Context.ConnectionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error cancelling subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to cancel subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Request catch-up for missed events.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to catch up.</param>
    /// <remarks>
    /// Fetches up to <see cref="CatchUpBatchLimit"/> pending events, pushes each to the caller as
    /// <c>EventReceived</c>, then invokes the delivery service's catch-up and replies with
    /// <c>CatchUpComplete</c> carrying the count that call returned (0 when nothing was pending).
    /// </remarks>
    public async Task CatchUp(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            _logger.LogInformation(
                "Starting catch-up for subscription {SubscriptionId}",
                subscriptionId);

            // Get pending events
            var pendingEvents = await _deliveryService.GetPendingEventsAsync(subscriptionId, limit: CatchUpBatchLimit);
            if (pendingEvents.Count == 0)
            {
                await Clients.Caller.SendAsync("CatchUpComplete", subscriptionId, 0);

                // Log completion here too, so every "Starting catch-up" entry has a matching end.
                _logger.LogInformation(
                    "Catch-up complete for subscription {SubscriptionId}: {Count} events",
                    subscriptionId,
                    0);
                return;
            }

            // Send events to client
            foreach (var eventData in pendingEvents)
            {
                await Clients.Caller.SendAsync("EventReceived", subscriptionId, new
                {
                    eventData.EventId,
                    eventData.CorrelationId,
                    EventType = eventData.GetType().Name,
                    eventData.OccurredAt,
                    Data = eventData
                });
            }

            // Perform catch-up (updates LastDeliveredSequence)
            // NOTE(review): the count reported to the client comes from this call, not from the
            // number of events pushed above (which is capped at CatchUpBatchLimit) — confirm the
            // two always agree.
            var count = await _deliveryService.CatchUpSubscriptionAsync(subscriptionId);
            await Clients.Caller.SendAsync("CatchUpComplete", subscriptionId, count);

            _logger.LogInformation(
                "Catch-up complete for subscription {SubscriptionId}: {Count} events",
                subscriptionId,
                count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during catch-up for subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to catch up: {ex.Message}");
        }
    }

    /// <summary>
    /// Pause a subscription (stop event delivery).
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to pause.</param>
    /// <remarks>
    /// Replies with <c>SubscriptionPaused</c> on success or <c>Error</c> on failure.
    /// </remarks>
    public async Task PauseSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            await _subscriptionManager.PauseSubscriptionAsync(subscriptionId);
            await Clients.Caller.SendAsync("SubscriptionPaused", subscriptionId);

            _logger.LogInformation("Subscription {SubscriptionId} paused", subscriptionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error pausing subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to pause subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Resume a paused subscription.
    /// </summary>
    /// <param name="subscriptionId">Id of the subscription to resume.</param>
    /// <remarks>
    /// Replies with <c>SubscriptionResumed</c> on success or <c>Error</c> on failure.
    /// </remarks>
    public async Task ResumeSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            await _subscriptionManager.ResumeSubscriptionAsync(subscriptionId);
            await Clients.Caller.SendAsync("SubscriptionResumed", subscriptionId);

            _logger.LogInformation("Subscription {SubscriptionId} resumed", subscriptionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error resuming subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to resume subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Get all subscriptions for the given subscriber.
    /// </summary>
    /// <param name="subscriberId">Subscriber whose subscriptions are listed. Supplied by the caller.</param>
    /// <returns>
    /// One summary object per subscription. An empty array is returned when
    /// <paramref name="subscriberId"/> is blank or when the lookup throws (the failure is logged).
    /// </returns>
    /// <remarks>
    /// NOTE(review): the id is taken from the client as-is; nothing here ties it to the
    /// authenticated user despite the method name — confirm this is intended.
    /// </remarks>
    public async Task<object[]> GetMySubscriptions(string subscriberId)
    {
        if (string.IsNullOrWhiteSpace(subscriberId))
        {
            return Array.Empty<object>();
        }

        try
        {
            var subscriptions = await _subscriptionManager.GetSubscriberSubscriptionsAsync(subscriberId);
            return subscriptions.Select(s => new
            {
                s.Id,
                s.CorrelationId,
                s.EventTypes,
                s.TerminalEventTypes,
                s.DeliveryMode,
                s.Status,
                s.CreatedAt,
                s.ExpiresAt,
                s.LastDeliveredSequence
            }).ToArray<object>();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error getting subscriptions for {SubscriberId}", subscriberId);
            return Array.Empty<object>();
        }
    }

    /// <summary>
    /// Called when a client disconnects. Detaches every subscription the store reports
    /// for the closing connection.
    /// </summary>
    /// <param name="exception">The error that caused the disconnect, if any.</param>
    /// <remarks>
    /// Cleanup is best-effort: failures are logged and never propagate, and a failure to
    /// detach one subscription does not prevent the remaining ones from being detached.
    /// </remarks>
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        var connectionId = Context.ConnectionId;

        _logger.LogInformation(
            exception,
            "Client {ConnectionId} disconnected. Detaching subscriptions.",
            connectionId);

        try
        {
            // The store is resolved from the connection's request services rather than
            // injected; it is unavailable when there is no HttpContext or no registration.
            var store = Context.GetHttpContext()?.RequestServices
                .GetService(typeof(IPersistentSubscriptionStore)) as IPersistentSubscriptionStore;

            if (store is null)
            {
                _logger.LogWarning(
                    "Subscription store unavailable on disconnect for {ConnectionId}; subscriptions were not detached",
                    connectionId);
            }
            else
            {
                // Get all subscriptions for this connection
                var subscriptions = await store.GetByConnectionIdAsync(connectionId);
                foreach (var subscription in subscriptions)
                {
                    try
                    {
                        await _subscriptionManager.DetachConnectionAsync(subscription.Id);
                    }
                    catch (Exception ex)
                    {
                        // Isolate the failure so the remaining subscriptions are still detached.
                        _logger.LogError(
                            ex,
                            "Error detaching subscription {SubscriptionId} on disconnect for {ConnectionId}",
                            subscription.Id,
                            connectionId);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error detaching subscriptions on disconnect for {ConnectionId}", connectionId);
        }

        await base.OnDisconnectedAsync(exception);
    }

    /// <summary>Sends an <c>Error</c> message to the calling client.</summary>
    private Task SendErrorAsync(string message) =>
        Clients.Caller.SendAsync("Error", message);

    /// <summary>
    /// Validates a subscription id argument; on a blank id, notifies the caller and returns <c>false</c>.
    /// </summary>
    private async Task<bool> EnsureSubscriptionIdAsync(string subscriptionId)
    {
        if (!string.IsNullOrWhiteSpace(subscriptionId))
        {
            return true;
        }

        await SendErrorAsync("SubscriptionId is required");
        return false;
    }
}
/// <summary>
/// Request to create a persistent subscription.
/// </summary>
/// <remarks>
/// Argument of the hub's <c>CreateSubscription</c> method, which forwards these values to
/// the subscription manager's <c>CreateSubscriptionAsync</c> call.
/// </remarks>
public sealed class CreateSubscriptionRequest
{
/// <summary>
/// Identifier of the subscriber. The hub rejects the request with an <c>Error</c>
/// message when this is null, empty, or whitespace.
/// </summary>
public required string SubscriberId { get; init; }
/// <summary>
/// Correlation identifier the subscription is bound to. The hub rejects the request
/// with an <c>Error</c> message when this is null, empty, or whitespace.
/// </summary>
public required string CorrelationId { get; init; }
/// <summary>
/// Optional event type names. The hub converts the list to a hash set before forwarding
/// it (so duplicates collapse); <c>null</c> is forwarded as <c>null</c>.
/// </summary>
public List<string>? EventTypes { get; init; }
/// <summary>
/// Optional terminal event type names. Converted to a hash set by the hub, like
/// <see cref="EventTypes"/>; <c>null</c> is forwarded as <c>null</c>.
/// </summary>
public List<string>? TerminalEventTypes { get; init; }
/// <summary>
/// Delivery mode for the subscription. Defaults to <see cref="DeliveryMode.Immediate"/>.
/// </summary>
public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate;
/// <summary>
/// Optional expiry, forwarded unchanged as <c>expiresAt</c>; <c>null</c> when not supplied.
/// </summary>
public DateTimeOffset? ExpiresAt { get; init; }
/// <summary>
/// Optional value forwarded unchanged as <c>dataSourceId</c>.
/// NOTE(review): its meaning is not visible in this file — confirm against the subscription manager.
/// </summary>
public string? DataSourceId { get; init; }
}