403 lines
14 KiB
C#
403 lines
14 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.AspNetCore.SignalR;
|
|
using Microsoft.Extensions.Logging;
|
|
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
|
|
|
|
namespace Svrnty.CQRS.Events.SignalR;
|
|
|
|
/// <summary>
|
|
/// SignalR hub for persistent, correlation-based event subscriptions.
|
|
/// Supports offline event storage and catch-up on reconnect.
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>
|
|
/// <strong>Client Methods:</strong>
|
|
/// - CreateSubscription(request): Create a new persistent subscription
|
|
/// - CancelSubscription(subscriptionId): Cancel an existing subscription
|
|
/// - CatchUp(subscriptionId): Request missed events
|
|
/// - AttachSubscription(subscriptionId): Attach to an existing subscription on reconnect
|
|
/// - DetachSubscription(subscriptionId): Temporarily detach from subscription
|
|
/// </para>
|
|
/// <para>
|
|
/// <strong>Server Methods (pushed to clients):</strong>
|
|
/// - SubscriptionCreated(subscription): Subscription created successfully
|
|
/// - EventReceived(subscriptionId, event): New event delivered
|
|
/// - SubscriptionCompleted(subscriptionId): Terminal event received
|
|
/// - CatchUpComplete(subscriptionId, count): Catch-up finished
|
|
/// - Error(message): Error occurred
|
|
/// </para>
|
|
/// </remarks>
|
|
public sealed class PersistentSubscriptionHub : Hub
|
|
{
|
|
private readonly ISubscriptionManager _subscriptionManager;
|
|
private readonly IPersistentSubscriptionDeliveryService _deliveryService;
|
|
private readonly ILogger<PersistentSubscriptionHub> _logger;
|
|
|
|
public PersistentSubscriptionHub(
|
|
ISubscriptionManager subscriptionManager,
|
|
IPersistentSubscriptionDeliveryService deliveryService,
|
|
ILogger<PersistentSubscriptionHub> logger)
|
|
{
|
|
_subscriptionManager = subscriptionManager ?? throw new ArgumentNullException(nameof(subscriptionManager));
|
|
_deliveryService = deliveryService ?? throw new ArgumentNullException(nameof(deliveryService));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
/// <summary>
|
|
/// Create a new persistent subscription.
|
|
/// </summary>
|
|
public async Task CreateSubscription(CreateSubscriptionRequest request)
|
|
{
|
|
if (request == null)
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "Request cannot be null");
|
|
return;
|
|
}
|
|
|
|
if (string.IsNullOrWhiteSpace(request.SubscriberId))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "SubscriberId is required");
|
|
return;
|
|
}
|
|
|
|
if (string.IsNullOrWhiteSpace(request.CorrelationId))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "CorrelationId is required");
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
_logger.LogInformation(
|
|
"Creating subscription for subscriber {SubscriberId} with correlation {CorrelationId}",
|
|
request.SubscriberId,
|
|
request.CorrelationId);
|
|
|
|
var subscription = await _subscriptionManager.CreateSubscriptionAsync(
|
|
subscriberId: request.SubscriberId,
|
|
correlationId: request.CorrelationId,
|
|
eventTypes: request.EventTypes?.ToHashSet(),
|
|
terminalEventTypes: request.TerminalEventTypes?.ToHashSet(),
|
|
deliveryMode: request.DeliveryMode,
|
|
expiresAt: request.ExpiresAt,
|
|
dataSourceId: request.DataSourceId);
|
|
|
|
// Attach connection to subscription
|
|
await _subscriptionManager.AttachConnectionAsync(subscription.Id, Context.ConnectionId);
|
|
|
|
// Send confirmation to client
|
|
await Clients.Caller.SendAsync("SubscriptionCreated", new
|
|
{
|
|
subscription.Id,
|
|
subscription.SubscriberId,
|
|
subscription.CorrelationId,
|
|
subscription.EventTypes,
|
|
subscription.TerminalEventTypes,
|
|
subscription.DeliveryMode,
|
|
subscription.CreatedAt,
|
|
subscription.ExpiresAt
|
|
});
|
|
|
|
_logger.LogInformation(
|
|
"Subscription {SubscriptionId} created and attached to connection {ConnectionId}",
|
|
subscription.Id,
|
|
Context.ConnectionId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error creating subscription for {SubscriberId}", request.SubscriberId);
|
|
await Clients.Caller.SendAsync("Error", $"Failed to create subscription: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Attach to an existing subscription (e.g., on reconnect).
|
|
/// </summary>
|
|
public async Task AttachSubscription(string subscriptionId)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(subscriptionId))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "SubscriptionId is required");
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
var subscription = await _subscriptionManager.GetSubscriptionAsync(subscriptionId);
|
|
if (subscription == null)
|
|
{
|
|
await Clients.Caller.SendAsync("Error", $"Subscription {subscriptionId} not found");
|
|
return;
|
|
}
|
|
|
|
// Attach connection
|
|
await _subscriptionManager.AttachConnectionAsync(subscriptionId, Context.ConnectionId);
|
|
|
|
await Clients.Caller.SendAsync("SubscriptionAttached", new
|
|
{
|
|
subscription.Id,
|
|
subscription.Status,
|
|
subscription.LastDeliveredSequence
|
|
});
|
|
|
|
_logger.LogInformation(
|
|
"Subscription {SubscriptionId} attached to connection {ConnectionId}",
|
|
subscriptionId,
|
|
Context.ConnectionId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error attaching subscription {SubscriptionId}", subscriptionId);
|
|
await Clients.Caller.SendAsync("Error", $"Failed to attach subscription: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Detach from a subscription without cancelling it.
|
|
/// </summary>
|
|
public async Task DetachSubscription(string subscriptionId)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(subscriptionId))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "SubscriptionId is required");
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
await _subscriptionManager.DetachConnectionAsync(subscriptionId);
|
|
|
|
await Clients.Caller.SendAsync("SubscriptionDetached", subscriptionId);
|
|
|
|
_logger.LogInformation(
|
|
"Subscription {SubscriptionId} detached from connection {ConnectionId}",
|
|
subscriptionId,
|
|
Context.ConnectionId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error detaching subscription {SubscriptionId}", subscriptionId);
|
|
await Clients.Caller.SendAsync("Error", $"Failed to detach subscription: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Cancel a subscription permanently.
|
|
/// </summary>
|
|
public async Task CancelSubscription(string subscriptionId)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(subscriptionId))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "SubscriptionId is required");
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
await _subscriptionManager.CancelSubscriptionAsync(subscriptionId);
|
|
|
|
await Clients.Caller.SendAsync("SubscriptionCancelled", subscriptionId);
|
|
|
|
_logger.LogInformation(
|
|
"Subscription {SubscriptionId} cancelled by connection {ConnectionId}",
|
|
subscriptionId,
|
|
Context.ConnectionId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error cancelling subscription {SubscriptionId}", subscriptionId);
|
|
await Clients.Caller.SendAsync("Error", $"Failed to cancel subscription: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Request catch-up for missed events.
|
|
/// </summary>
|
|
public async Task CatchUp(string subscriptionId)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(subscriptionId))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "SubscriptionId is required");
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
_logger.LogInformation(
|
|
"Starting catch-up for subscription {SubscriptionId}",
|
|
subscriptionId);
|
|
|
|
// Get pending events
|
|
var pendingEvents = await _deliveryService.GetPendingEventsAsync(subscriptionId, limit: 100);
|
|
|
|
if (pendingEvents.Count == 0)
|
|
{
|
|
await Clients.Caller.SendAsync("CatchUpComplete", subscriptionId, 0);
|
|
return;
|
|
}
|
|
|
|
// Send events to client
|
|
foreach (var eventData in pendingEvents)
|
|
{
|
|
await Clients.Caller.SendAsync("EventReceived", subscriptionId, new
|
|
{
|
|
eventData.EventId,
|
|
eventData.CorrelationId,
|
|
EventType = eventData.GetType().Name,
|
|
eventData.OccurredAt,
|
|
Data = eventData
|
|
});
|
|
}
|
|
|
|
// Perform catch-up (updates LastDeliveredSequence)
|
|
var count = await _deliveryService.CatchUpSubscriptionAsync(subscriptionId);
|
|
|
|
await Clients.Caller.SendAsync("CatchUpComplete", subscriptionId, count);
|
|
|
|
_logger.LogInformation(
|
|
"Catch-up complete for subscription {SubscriptionId}: {Count} events",
|
|
subscriptionId,
|
|
count);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error during catch-up for subscription {SubscriptionId}", subscriptionId);
|
|
await Clients.Caller.SendAsync("Error", $"Failed to catch up: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Pause a subscription (stop event delivery).
|
|
/// </summary>
|
|
public async Task PauseSubscription(string subscriptionId)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(subscriptionId))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "SubscriptionId is required");
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
await _subscriptionManager.PauseSubscriptionAsync(subscriptionId);
|
|
await Clients.Caller.SendAsync("SubscriptionPaused", subscriptionId);
|
|
|
|
_logger.LogInformation("Subscription {SubscriptionId} paused", subscriptionId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error pausing subscription {SubscriptionId}", subscriptionId);
|
|
await Clients.Caller.SendAsync("Error", $"Failed to pause subscription: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Resume a paused subscription.
|
|
/// </summary>
|
|
public async Task ResumeSubscription(string subscriptionId)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(subscriptionId))
|
|
{
|
|
await Clients.Caller.SendAsync("Error", "SubscriptionId is required");
|
|
return;
|
|
}
|
|
|
|
try
|
|
{
|
|
await _subscriptionManager.ResumeSubscriptionAsync(subscriptionId);
|
|
await Clients.Caller.SendAsync("SubscriptionResumed", subscriptionId);
|
|
|
|
_logger.LogInformation("Subscription {SubscriptionId} resumed", subscriptionId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error resuming subscription {SubscriptionId}", subscriptionId);
|
|
await Clients.Caller.SendAsync("Error", $"Failed to resume subscription: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Get all subscriptions for the current user.
|
|
/// </summary>
|
|
public async Task<object[]> GetMySubscriptions(string subscriberId)
|
|
{
|
|
if (string.IsNullOrWhiteSpace(subscriberId))
|
|
{
|
|
return Array.Empty<object>();
|
|
}
|
|
|
|
try
|
|
{
|
|
var subscriptions = await _subscriptionManager.GetSubscriberSubscriptionsAsync(subscriberId);
|
|
return subscriptions.Select(s => new
|
|
{
|
|
s.Id,
|
|
s.CorrelationId,
|
|
s.EventTypes,
|
|
s.TerminalEventTypes,
|
|
s.DeliveryMode,
|
|
s.Status,
|
|
s.CreatedAt,
|
|
s.ExpiresAt,
|
|
s.LastDeliveredSequence
|
|
}).ToArray<object>();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error getting subscriptions for {SubscriberId}", subscriberId);
|
|
return Array.Empty<object>();
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Called when a client disconnects.
|
|
/// </summary>
|
|
public override async Task OnDisconnectedAsync(Exception? exception)
|
|
{
|
|
var connectionId = Context.ConnectionId;
|
|
|
|
_logger.LogInformation(
|
|
"Client {ConnectionId} disconnected. Detaching subscriptions.",
|
|
connectionId);
|
|
|
|
try
|
|
{
|
|
// Get all subscriptions for this connection
|
|
var store = Context.GetHttpContext()?.RequestServices
|
|
.GetService(typeof(IPersistentSubscriptionStore)) as IPersistentSubscriptionStore;
|
|
|
|
if (store != null)
|
|
{
|
|
var subscriptions = await store.GetByConnectionIdAsync(connectionId);
|
|
foreach (var subscription in subscriptions)
|
|
{
|
|
await _subscriptionManager.DetachConnectionAsync(subscription.Id);
|
|
}
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error detaching subscriptions on disconnect for {ConnectionId}", connectionId);
|
|
}
|
|
|
|
await base.OnDisconnectedAsync(exception);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Request to create a persistent subscription.
|
|
/// </summary>
|
|
public sealed class CreateSubscriptionRequest
|
|
{
|
|
public required string SubscriberId { get; init; }
|
|
public required string CorrelationId { get; init; }
|
|
public List<string>? EventTypes { get; init; }
|
|
public List<string>? TerminalEventTypes { get; init; }
|
|
public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate;
|
|
public DateTimeOffset? ExpiresAt { get; init; }
|
|
public string? DataSourceId { get; init; }
|
|
}
|