using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;

namespace Svrnty.CQRS.Events.SignalR;

/// <summary>
/// SignalR hub for persistent, correlation-based event subscriptions.
/// Supports offline event storage and catch-up on reconnect.
/// </summary>
/// <remarks>
/// <para>
/// Client Methods:
/// - CreateSubscription(request): Create a new persistent subscription
/// - CancelSubscription(subscriptionId): Cancel an existing subscription
/// - CatchUp(subscriptionId): Request missed events
/// - AttachSubscription(subscriptionId): Attach to an existing subscription on reconnect
/// - DetachSubscription(subscriptionId): Temporarily detach from subscription
/// - PauseSubscription(subscriptionId): Pause event delivery
/// - ResumeSubscription(subscriptionId): Resume a paused subscription
/// - GetMySubscriptions(subscriberId): List the subscriptions of a subscriber
/// </para>
/// <para>
/// Server Methods (pushed to clients):
/// - SubscriptionCreated(subscription): Subscription created successfully
/// - SubscriptionAttached(subscription): Connection attached to an existing subscription
/// - SubscriptionDetached(subscriptionId): Connection detached from the subscription
/// - SubscriptionCancelled(subscriptionId): Subscription cancelled
/// - SubscriptionPaused(subscriptionId): Subscription paused
/// - SubscriptionResumed(subscriptionId): Subscription resumed
/// - EventReceived(subscriptionId, event): New event delivered
/// - SubscriptionCompleted(subscriptionId): Terminal event received
/// - CatchUpComplete(subscriptionId, count): Catch-up finished
/// - Error(message): Error occurred
/// </para>
/// </remarks>
public sealed class PersistentSubscriptionHub : Hub
{
    /// <summary>
    /// Maximum number of pending events fetched and pushed by a single <see cref="CatchUp"/> call.
    /// </summary>
    private const int CatchUpBatchLimit = 100;

    private readonly ISubscriptionManager _subscriptionManager;
    private readonly IPersistentSubscriptionDeliveryService _deliveryService;

    // NOTE(review): the logger's type argument was reconstructed as the hub type; confirm.
    private readonly ILogger<PersistentSubscriptionHub> _logger;

    /// <summary>
    /// Creates the hub.
    /// </summary>
    /// <param name="subscriptionManager">Manages subscription lifecycle and connection attachment.</param>
    /// <param name="deliveryService">Provides pending events and performs catch-up.</param>
    /// <param name="logger">Logger for hub activity.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
    public PersistentSubscriptionHub(
        ISubscriptionManager subscriptionManager,
        IPersistentSubscriptionDeliveryService deliveryService,
        ILogger<PersistentSubscriptionHub> logger)
    {
        _subscriptionManager = subscriptionManager ?? throw new ArgumentNullException(nameof(subscriptionManager));
        _deliveryService = deliveryService ?? throw new ArgumentNullException(nameof(deliveryService));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Create a new persistent subscription and attach the calling connection to it.
    /// </summary>
    /// <param name="request">Subscription parameters; SubscriberId and CorrelationId are mandatory.</param>
    /// <remarks>
    /// Validation and runtime failures are reported to the caller through the "Error" client
    /// method instead of being thrown. On success the caller receives "SubscriptionCreated".
    /// </remarks>
    public async Task CreateSubscription(CreateSubscriptionRequest request)
    {
        if (request is null)
        {
            await SendErrorAsync("Request cannot be null");
            return;
        }

        if (string.IsNullOrWhiteSpace(request.SubscriberId))
        {
            await SendErrorAsync("SubscriberId is required");
            return;
        }

        if (string.IsNullOrWhiteSpace(request.CorrelationId))
        {
            await SendErrorAsync("CorrelationId is required");
            return;
        }

        try
        {
            _logger.LogInformation(
                "Creating subscription for subscriber {SubscriberId} with correlation {CorrelationId}",
                request.SubscriberId,
                request.CorrelationId);

            var subscription = await _subscriptionManager.CreateSubscriptionAsync(
                subscriberId: request.SubscriberId,
                correlationId: request.CorrelationId,
                eventTypes: request.EventTypes?.ToHashSet(),
                terminalEventTypes: request.TerminalEventTypes?.ToHashSet(),
                deliveryMode: request.DeliveryMode,
                expiresAt: request.ExpiresAt,
                dataSourceId: request.DataSourceId);

            // Attach connection to subscription
            await _subscriptionManager.AttachConnectionAsync(subscription.Id, Context.ConnectionId);

            // Send confirmation to client
            await Clients.Caller.SendAsync("SubscriptionCreated", new
            {
                subscription.Id,
                subscription.SubscriberId,
                subscription.CorrelationId,
                subscription.EventTypes,
                subscription.TerminalEventTypes,
                subscription.DeliveryMode,
                subscription.CreatedAt,
                subscription.ExpiresAt
            });

            _logger.LogInformation(
                "Subscription {SubscriptionId} created and attached to connection {ConnectionId}",
                subscription.Id,
                Context.ConnectionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error creating subscription for {SubscriberId}", request.SubscriberId);
            await SendErrorAsync($"Failed to create subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Attach to an existing subscription (e.g., on reconnect).
    /// </summary>
    /// <param name="subscriptionId">Identifier of the subscription to attach the calling connection to.</param>
    /// <remarks>
    /// Sends "SubscriptionAttached" with the subscription id, status and last delivered
    /// sequence on success; sends "Error" when the id is blank, unknown, or attaching fails.
    /// </remarks>
    public async Task AttachSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            var subscription = await _subscriptionManager.GetSubscriptionAsync(subscriptionId);
            if (subscription == null)
            {
                await SendErrorAsync($"Subscription {subscriptionId} not found");
                return;
            }

            // Attach connection
            await _subscriptionManager.AttachConnectionAsync(subscriptionId, Context.ConnectionId);

            await Clients.Caller.SendAsync("SubscriptionAttached", new
            {
                subscription.Id,
                subscription.Status,
                subscription.LastDeliveredSequence
            });

            _logger.LogInformation(
                "Subscription {SubscriptionId} attached to connection {ConnectionId}",
                subscriptionId,
                Context.ConnectionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error attaching subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to attach subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Detach from a subscription without cancelling it.
    /// </summary>
    /// <param name="subscriptionId">Identifier of the subscription to detach.</param>
    /// <remarks>Sends "SubscriptionDetached" on success, "Error" otherwise.</remarks>
    public async Task DetachSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            await _subscriptionManager.DetachConnectionAsync(subscriptionId);
            await Clients.Caller.SendAsync("SubscriptionDetached", subscriptionId);

            _logger.LogInformation(
                "Subscription {SubscriptionId} detached from connection {ConnectionId}",
                subscriptionId,
                Context.ConnectionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error detaching subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to detach subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Cancel a subscription permanently.
    /// </summary>
    /// <param name="subscriptionId">Identifier of the subscription to cancel.</param>
    /// <remarks>Sends "SubscriptionCancelled" on success, "Error" otherwise.</remarks>
    public async Task CancelSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            await _subscriptionManager.CancelSubscriptionAsync(subscriptionId);
            await Clients.Caller.SendAsync("SubscriptionCancelled", subscriptionId);

            _logger.LogInformation(
                "Subscription {SubscriptionId} cancelled by connection {ConnectionId}",
                subscriptionId,
                Context.ConnectionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error cancelling subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to cancel subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Request catch-up for missed events.
    /// </summary>
    /// <param name="subscriptionId">Identifier of the subscription to catch up.</param>
    /// <remarks>
    /// Pushes up to <see cref="CatchUpBatchLimit"/> pending events to the caller as
    /// "EventReceived", then runs the delivery service catch-up and reports its count via
    /// "CatchUpComplete". When nothing is pending, "CatchUpComplete" is sent with a count of 0
    /// and the delivery service catch-up is not invoked.
    /// </remarks>
    public async Task CatchUp(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            _logger.LogInformation(
                "Starting catch-up for subscription {SubscriptionId}",
                subscriptionId);

            // Get pending events
            var pendingEvents = await _deliveryService.GetPendingEventsAsync(subscriptionId, limit: CatchUpBatchLimit);

            if (pendingEvents.Count == 0)
            {
                await Clients.Caller.SendAsync("CatchUpComplete", subscriptionId, 0);
                return;
            }

            // Send events to client
            foreach (var eventData in pendingEvents)
            {
                await Clients.Caller.SendAsync("EventReceived", subscriptionId, new
                {
                    eventData.EventId,
                    eventData.CorrelationId,
                    EventType = eventData.GetType().Name,
                    eventData.OccurredAt,
                    Data = eventData
                });
            }

            // Perform catch-up (updates LastDeliveredSequence)
            var count = await _deliveryService.CatchUpSubscriptionAsync(subscriptionId);

            await Clients.Caller.SendAsync("CatchUpComplete", subscriptionId, count);

            _logger.LogInformation(
                "Catch-up complete for subscription {SubscriptionId}: {Count} events",
                subscriptionId,
                count);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error during catch-up for subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to catch up: {ex.Message}");
        }
    }

    /// <summary>
    /// Pause a subscription (stop event delivery).
    /// </summary>
    /// <param name="subscriptionId">Identifier of the subscription to pause.</param>
    /// <remarks>Sends "SubscriptionPaused" on success, "Error" otherwise.</remarks>
    public async Task PauseSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            await _subscriptionManager.PauseSubscriptionAsync(subscriptionId);
            await Clients.Caller.SendAsync("SubscriptionPaused", subscriptionId);

            _logger.LogInformation("Subscription {SubscriptionId} paused", subscriptionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error pausing subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to pause subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Resume a paused subscription.
    /// </summary>
    /// <param name="subscriptionId">Identifier of the subscription to resume.</param>
    /// <remarks>Sends "SubscriptionResumed" on success, "Error" otherwise.</remarks>
    public async Task ResumeSubscription(string subscriptionId)
    {
        if (!await EnsureSubscriptionIdAsync(subscriptionId))
        {
            return;
        }

        try
        {
            await _subscriptionManager.ResumeSubscriptionAsync(subscriptionId);
            await Clients.Caller.SendAsync("SubscriptionResumed", subscriptionId);

            _logger.LogInformation("Subscription {SubscriptionId} resumed", subscriptionId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error resuming subscription {SubscriptionId}", subscriptionId);
            await SendErrorAsync($"Failed to resume subscription: {ex.Message}");
        }
    }

    /// <summary>
    /// Get all subscriptions for the given subscriber.
    /// </summary>
    /// <param name="subscriberId">Subscriber whose subscriptions are listed.</param>
    /// <returns>
    /// A projection of each subscription, or an empty array when <paramref name="subscriberId"/>
    /// is blank or the lookup fails (the failure is logged, not surfaced to the caller).
    /// </returns>
    /// <remarks>
    /// NOTE(review): the id is supplied by the caller and is not checked against the
    /// connection's identity in this method — confirm authorization is enforced elsewhere.
    /// NOTE(review): the return type argument was reconstructed as object[]; confirm.
    /// </remarks>
    public async Task<object[]> GetMySubscriptions(string subscriberId)
    {
        if (string.IsNullOrWhiteSpace(subscriberId))
        {
            return Array.Empty<object>();
        }

        try
        {
            var subscriptions = await _subscriptionManager.GetSubscriberSubscriptionsAsync(subscriberId);

            return subscriptions.Select(s => new
            {
                s.Id,
                s.CorrelationId,
                s.EventTypes,
                s.TerminalEventTypes,
                s.DeliveryMode,
                s.Status,
                s.CreatedAt,
                s.ExpiresAt,
                s.LastDeliveredSequence
            }).ToArray();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error getting subscriptions for {SubscriberId}", subscriberId);
            return Array.Empty<object>();
        }
    }

    /// <summary>
    /// Called when a client disconnects. Detaches every subscription bound to the connection.
    /// </summary>
    /// <param name="exception">The error that closed the connection, if any.</param>
    /// <remarks>
    /// Cleanup is best-effort: failures are logged and never prevent the base implementation
    /// from running. Each subscription is detached independently so that one failure does not
    /// leave the remaining subscriptions attached to a dead connection.
    /// </remarks>
    public override async Task OnDisconnectedAsync(Exception? exception)
    {
        var connectionId = Context.ConnectionId;

        _logger.LogInformation(
            "Client {ConnectionId} disconnected. Detaching subscriptions.",
            connectionId);

        try
        {
            // Get all subscriptions for this connection
            var store = Context.GetHttpContext()?.RequestServices
                .GetService(typeof(IPersistentSubscriptionStore)) as IPersistentSubscriptionStore;

            if (store is null)
            {
                // Without the store the connection's subscriptions cannot be enumerated.
                _logger.LogWarning(
                    "Subscription store unavailable; cannot detach subscriptions for {ConnectionId}",
                    connectionId);
            }
            else
            {
                var subscriptions = await store.GetByConnectionIdAsync(connectionId);

                foreach (var subscription in subscriptions)
                {
                    try
                    {
                        await _subscriptionManager.DetachConnectionAsync(subscription.Id);
                    }
                    catch (Exception ex)
                    {
                        // Keep going: the other subscriptions still need to be detached.
                        _logger.LogError(
                            ex,
                            "Error detaching subscription {SubscriptionId} on disconnect for {ConnectionId}",
                            subscription.Id,
                            connectionId);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error detaching subscriptions on disconnect for {ConnectionId}", connectionId);
        }

        await base.OnDisconnectedAsync(exception);
    }

    /// <summary>
    /// Validates a subscription id supplied by the client, reporting "SubscriptionId is required"
    /// to the caller when it is null, empty or whitespace.
    /// </summary>
    /// <returns><c>true</c> when the id is usable; otherwise <c>false</c>.</returns>
    private async Task<bool> EnsureSubscriptionIdAsync(string subscriptionId)
    {
        if (!string.IsNullOrWhiteSpace(subscriptionId))
        {
            return true;
        }

        await SendErrorAsync("SubscriptionId is required");
        return false;
    }

    /// <summary>
    /// Sends an "Error" message to the calling client.
    /// </summary>
    private Task SendErrorAsync(string message) => Clients.Caller.SendAsync("Error", message);
}

/// <summary>
/// Request to create a persistent subscription.
/// </summary>
public sealed class CreateSubscriptionRequest
{
    /// <summary>Identifier of the subscriber; must not be blank.</summary>
    public required string SubscriberId { get; init; }

    /// <summary>Correlation identifier the subscription is bound to; must not be blank.</summary>
    public required string CorrelationId { get; init; }

    // NOTE(review): element type reconstructed as string for both lists; confirm.

    /// <summary>Optional event types, forwarded to the subscription manager as a set.</summary>
    public List<string>? EventTypes { get; init; }

    /// <summary>Optional terminal event types, forwarded to the subscription manager as a set.</summary>
    public List<string>? TerminalEventTypes { get; init; }

    /// <summary>Delivery mode; defaults to <see cref="DeliveryMode.Immediate"/>.</summary>
    public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate;

    /// <summary>Optional expiry passed through to the subscription manager.</summary>
    public DateTimeOffset? ExpiresAt { get; init; }

    /// <summary>Optional data source identifier passed through to the subscription manager.</summary>
    public string? DataSourceId { get; init; }
}