# Bidirectional Communication Design for CQRS Framework

This document outlines the design for adding persistent, selective event subscriptions to a CQRS framework with a Flutter frontend and .NET backend.

## Table of Contents

- [Overview](#overview)
- [Core Concepts](#core-concepts)
- [Frontend Architecture (Flutter)](#frontend-architecture-flutter)
  - [DataSource](#datasource)
  - [Event Subscription Config](#event-subscription-config)
  - [Event Connection](#event-connection)
- [Usage Examples](#usage-examples)
- [Backend Architecture (.NET)](#backend-architecture-net)
  - [Protocol Messages](#protocol-messages)
  - [Event Filtering and Delivery](#event-filtering-and-delivery)
  - [Persistent Subscription Storage](#persistent-subscription-storage)
- [Flow Diagrams](#flow-diagrams)
- [Considerations](#considerations)

---

## Overview

The goal is to extend a CQRS framework to support:

1. **Command-correlated event subscriptions**: When executing a command, optionally subscribe to related events
2. **Selective subscriptions**: Choose which specific events to receive (not all-or-nothing)
3. **Persistent delivery**: Events are stored and delivered even if the user is offline
4. **Catch-up on reconnect**: Missed events are delivered when the user comes back online
5. **Per-usage flexibility**: The same DataSource can be used with or without events depending on context

```
┌─────────────────────────────────────────────────────────────────┐
│                           DataSource                            │
│                                                                 │
│  ┌─────────────┐   ┌─────────────┐   ┌───────────────────┐      │
│  │    Query    │   │  Commands   │   │ Event Connection  │      │
│  │             │   │             │   │                   │      │
│  │ UsersQuery  │   │ InviteUser  │   │ subscribe(...)    │      │
│  │             │   │ RemoveUser  │   │ unsubscribe(...)  │      │
│  └─────────────┘   └─────────────┘   └───────────────────┘      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

---

## Core Concepts

| Concept | Description |
|---------|-------------|
| **DataSource** | Combines a query, related commands, and optional event connection |
| **EventConnection** | WebSocket connection that handles subscriptions and event delivery |
| **Correlation ID** | Links a command to its resulting events |
| **Persistent Subscription** | Server-stored subscription that survives disconnection |
| **Terminal Event** | An event that completes/closes a subscription (e.g., `InvitationAccepted`) |
| **Catch-up** | Delivering missed events when a client reconnects |

---

## Frontend Architecture (Flutter)

### DataSource

The DataSource is the central abstraction combining query data, commands, and event subscriptions.

```dart
// Note: `id`, `_commandSender`, and `_authService` are referenced below but
// not defined in this excerpt; they are expected to be supplied elsewhere
// (for example by the concrete subclass or via dependency injection).
abstract class DataSource<TQuery, TData> {
  /// The query this datasource executes
  final TQuery query;

  /// Stream of query results
  Stream<TData> get dataStream;

  /// Current cached data
  TData? get currentData;

  /// Event connection (lazy - only created when needed)
  EventConnection? _eventConnection;

  /// Getter for event connection (for external listeners)
  EventConnection? get eventConnection => _eventConnection;

  /// Commands attached to this datasource
  final Map<Type, CommandConfig> _commands = {};

  DataSource(this.query);

  /// Register a command this datasource can execute
  void registerCommand<TCommand>(
    CommandConfig config,
  ) {
    _commands[TCommand] = config;
  }

  /// Execute a command with optional event subscription.
  /// The result is left untyped here; it must expose a `correlationId`.
  Future execute<TCommand>(
    TCommand command, {
    EventSubscriptionConfig? subscribeToEvents,
  }) async {
    final result = await _commandSender.send(command);

    // If caller wants events, set up subscription
    if (subscribeToEvents != null && subscribeToEvents.eventTypes.isNotEmpty) {
      await _ensureEventConnection();
      await _eventConnection!.subscribe(
        correlationId: result.correlationId,
        config: subscribeToEvents,
      );
    }

    return result;
  }

  /// Explicitly open event connection (triggers catch-up)
  Future<EventConnection> openEventConnection() async {
    _eventConnection ??= EventConnection(
      dataSourceId: id,
      userId: _authService.currentUserId,
    );
    await _eventConnection!.connect();
    return _eventConnection!;
  }

  /// Ensure connection exists
  Future<void> _ensureEventConnection() async {
    if (_eventConnection == null || !_eventConnection!.isConnected) {
      await openEventConnection();
    }
  }

  /// Cleanup resources
  Future<void> dispose() async {
    await _eventConnection?.disconnect();
  }
}
```

### Event Subscription Config

Fine-grained control over which events to subscribe to for a given command execution.

```dart
/// Configuration for event subscriptions
class EventSubscriptionConfig {
  /// Which event types to receive
  final Set<Type> eventTypes;

  /// Auto-unsubscribe after this duration (null = no timeout)
  final Duration? timeout;

  /// Should the subscription survive app restart?
  final bool persistent;

  /// How should events be delivered?
  final EventDeliveryMode deliveryMode;

  const EventSubscriptionConfig({
    required this.eventTypes,
    this.timeout,
    this.persistent = true,
    this.deliveryMode = EventDeliveryMode.immediate,
  });

  /// Subscribe to all provided event types
  factory EventSubscriptionConfig.all(List<Type> types) {
    return EventSubscriptionConfig(
      eventTypes: types.toSet(),
      persistent: true,
    );
  }

  /// Subscribe to specific event types only
  factory EventSubscriptionConfig.only(List<Type> types) {
    return EventSubscriptionConfig(
      eventTypes: types.toSet(),
      persistent: true,
    );
  }

  /// No subscription (explicit opt-out)
  factory EventSubscriptionConfig.none() {
    return EventSubscriptionConfig(
      eventTypes: {},
      persistent: false,
    );
  }

  /// Transient subscription (not persisted, immediate delivery)
  factory EventSubscriptionConfig.transient(List<Type> types) {
    return EventSubscriptionConfig(
      eventTypes: types.toSet(),
      persistent: false,
      deliveryMode: EventDeliveryMode.immediate,
    );
  }
}

/// How events should be delivered to the client
enum EventDeliveryMode {
  /// Push immediately when event occurs
  immediate,

  /// Batch events and deliver periodically
  batched,

  /// Only deliver on reconnect (save bandwidth for background updates)
  onReconnect,
}
```

### Event Connection

Manages the WebSocket connection, subscriptions, and event routing.

```dart
import 'dart:async';
import 'dart:convert';

import 'package:uuid/uuid.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

/// Manages WebSocket connection and event subscriptions.
/// `_authService` is referenced below but not defined in this excerpt.
class EventConnection {
  final String dataSourceId;
  final String userId;

  WebSocketChannel? _channel;
  final _eventController = StreamController<CorrelatedEvent>.broadcast();
  final Map<String, EventSubscription> _activeSubscriptions = {};
  bool _isConnected = false;

  bool get isConnected => _isConnected;

  /// Stream of all events
  Stream<CorrelatedEvent> get events => _eventController.stream;

  /// Typed event stream - filters to specific event type
  Stream<T> on<T extends CorrelatedEvent>() {
    // Stream has no built-in whereType; filter and cast instead.
    return events.where((e) => e is T).cast<T>();
  }

  /// Events for a specific correlation
  Stream<CorrelatedEvent> forCorrelation(String correlationId) {
    return events.where((e) => e.correlationId == correlationId);
  }

  EventConnection({
    required this.dataSourceId,
    required this.userId,
  });

  /// Connect to the event server
  Future<void> connect() async {
    if (_isConnected) return;

    _channel = WebSocketChannel.connect(
      Uri.parse('wss://api.example.com/events'),
    );

    // Authenticate the connection
    _send({
      'type': 'auth',
      'token': await _authService.getToken(),
      'dataSourceId': dataSourceId,
    });

    // Request catch-up for any existing persistent subscriptions
    _send({
      'type': 'catch_up',
      'subscriptionIds': _activeSubscriptions.keys.toList(),
    });

    // Listen for incoming messages
    _channel!.stream.listen(
      _handleMessage,
      onDone: _handleDisconnect,
      onError: _handleError,
    );

    _isConnected = true;
  }

  /// Subscribe to events for a correlation ID
  Future<EventSubscription> subscribe({
    required String correlationId,
    required EventSubscriptionConfig config,
  }) async {
    final subscription = EventSubscription(
      id: const Uuid().v4(),
      correlationId: correlationId,
      config: config,
    );

    _activeSubscriptions[subscription.id] = subscription;

    // Notify backend about this subscription
    _send({
      'type': 'subscribe',
      'subscriptionId': subscription.id,
      'correlationId': correlationId,
      'eventTypes': config.eventTypes.map((t) => t.toString()).toList(),
      'persistent': config.persistent,
      'timeout': config.timeout?.inSeconds,
      'deliveryMode': config.deliveryMode.name,
    });

    return subscription;
  }

  /// Unsubscribe from a subscription
  Future<void> unsubscribe(String subscriptionId) async {
    _activeSubscriptions.remove(subscriptionId);
    _send({
      'type': 'unsubscribe',
      'subscriptionId': subscriptionId,
    });
  }

  /// Disconnect from the server
  Future<void> disconnect() async {
    _isConnected = false;
    await _channel?.sink.close();
    _channel = null;
  }

  void _send(Map<String, dynamic> message) {
    _channel?.sink.add(jsonEncode(message));
  }

  void _handleMessage(dynamic message) {
    final data = jsonDecode(message as String) as Map<String, dynamic>;

    switch (data['type']) {
      case 'event':
        _handleEventMessage(data);
        break;
      case 'subscription_completed':
        _handleSubscriptionCompleted(data);
        break;
      case 'error':
        _handleErrorMessage(data);
        break;
    }
  }

  void _handleEventMessage(Map<String, dynamic> data) {
    final event = _deserializeEvent(data);
    _eventController.add(event);

    // Notify specific subscription handler if registered
    final subscription = _activeSubscriptions[data['subscriptionId']];
    subscription?.onEvent?.call(event);
  }

  void _handleSubscriptionCompleted(Map<String, dynamic> data) {
    final subscriptionId = data['subscriptionId'] as String;
    final subscription = _activeSubscriptions.remove(subscriptionId);
    subscription?.onCompleted?.call(data['reason'] as String);
  }

  void _handleDisconnect() {
    _isConnected = false;
    // Implement reconnection logic here
  }

  void _handleError(Object error) {
    // Handle connection errors
  }

  void _handleErrorMessage(Map<String, dynamic> data) {
    // Handle protocol-level errors
  }

  CorrelatedEvent _deserializeEvent(Map<String, dynamic> data) {
    // Deserialize based on eventType
    // Implementation depends on your serialization strategy
    throw UnimplementedError();
  }
}

/// Represents an active subscription
class EventSubscription {
  final String id;
  final String correlationId;
  final EventSubscriptionConfig config;

  /// Callback when an event is received
  void Function(CorrelatedEvent)? onEvent;

  /// Callback when subscription is completed (terminal event or timeout)
  void Function(String reason)? onCompleted;

  EventSubscription({
    required this.id,
    required this.correlationId,
    required this.config,
  });
}
```

### Event Base Classes

```dart
/// Base class for all events that can be correlated to a command
abstract class CorrelatedEvent {
  final String correlationId;
  final String eventId;
  final DateTime occurredAt;

  CorrelatedEvent({
    required this.correlationId,
    required this.eventId,
    required this.occurredAt,
  });
}

/// Example: User invitation events
abstract class UserInvitationEvent extends CorrelatedEvent {
  UserInvitationEvent({
    required super.correlationId,
    required super.eventId,
    required super.occurredAt,
  });
}

class UserInvitationSentEvent extends UserInvitationEvent {
  final String email;

  UserInvitationSentEvent({
    required this.email,
    required super.correlationId,
    required super.eventId,
    required super.occurredAt,
  });
}

class UserInvitationAcceptedEvent extends UserInvitationEvent {
  final String userId;
  final String email;

  UserInvitationAcceptedEvent({
    required this.userId,
    required this.email,
    required super.correlationId,
    required super.eventId,
    required super.occurredAt,
  });
}

class UserInvitationDeclinedEvent extends UserInvitationEvent {
  final String email;
  final String? reason;

  UserInvitationDeclinedEvent({
    required this.email,
    this.reason,
    required super.correlationId,
    required super.eventId,
    required super.occurredAt,
  });
}
```

---

## Usage Examples

### Example 1: Admin Dashboard (wants all events)

```dart
class AdminDashboardPage extends StatefulWidget {
  @override
  State<AdminDashboardPage> createState() => _AdminDashboardPageState();
}

class _AdminDashboardPageState extends State<AdminDashboardPage> {
  late final UsersDataSource _usersDataSource;

  @override
  void initState() {
    super.initState();
    _usersDataSource = UsersDataSource(UsersQuery());

    // Open event connection - triggers catch-up for pending events
    _usersDataSource.openEventConnection();

    // Listen to all invitation events for toast notifications
    _usersDataSource.eventConnection
        ?.on<UserInvitationEvent>()
        .listen(_showEventToast);
  }

  void _showEventToast(UserInvitationEvent event) {
    final message = switch (event) {
      UserInvitationSentEvent e => 'Invitation sent to ${e.email}',
      UserInvitationAcceptedEvent e => '${e.email} has joined!',
      UserInvitationDeclinedEvent e => '${e.email} declined the invitation',
      _ => 'Unknown event',
    };

    ScaffoldMessenger.of(context).showSnackBar(
      SnackBar(content: Text(message)),
    );

    // Refresh list if someone joined
    if (event is UserInvitationAcceptedEvent) {
      _usersDataSource.refresh();
    }
  }

  Future<void> _inviteUser(String email) async {
    await _usersDataSource.inviteUser(
      email,
      // Subscribe to ALL invitation events
      subscribeToEvents: {
        UserInvitationSentEvent,
        UserInvitationAcceptedEvent,
        UserInvitationDeclinedEvent,
      },
    );
  }

  @override
  void dispose() {
    _usersDataSource.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<List<User>>(
      stream: _usersDataSource.dataStream,
      builder: (context, snapshot) {
        // Build UI...
      },
    );
  }
}
```

### Example 2: Simple User List (no events needed)

```dart
class UserListPage extends StatefulWidget {
  @override
  State<UserListPage> createState() => _UserListPageState();
}

class _UserListPageState extends State<UserListPage> {
  late final UsersDataSource _usersDataSource;

  @override
  void initState() {
    super.initState();
    _usersDataSource = UsersDataSource(UsersQuery());
    // NOT opening event connection - no real-time updates needed
  }

  @override
  Widget build(BuildContext context) {
    return StreamBuilder<List<User>>(
      stream: _usersDataSource.dataStream,
      builder: (context, snapshot) {
        if (!snapshot.hasData) {
          return const CircularProgressIndicator();
        }

        return ListView.builder(
          itemCount: snapshot.data!.length,
          itemBuilder: (context, index) {
            final user = snapshot.data![index];
            return ListTile(title: Text(user.name));
          },
        );
      },
    );
  }

  @override
  void dispose() {
    _usersDataSource.dispose();
    super.dispose();
  }
}
```

### Example 3: Invite Dialog (only wants sent confirmation)

```dart
class InviteUserDialog extends StatefulWidget {
  final UsersDataSource dataSource;

  const InviteUserDialog({required this.dataSource});

  @override
  State<InviteUserDialog> createState() => _InviteUserDialogState();
}

class _InviteUserDialogState extends State<InviteUserDialog> {
  final _emailController = TextEditingController();
  bool _isSending = false;

  Future<void> _sendInvite() async {
    setState(() => _isSending = true);

    try {
      await widget.dataSource.inviteUser(
        _emailController.text,
        // Only care about the "sent" confirmation
        subscribeToEvents: {UserInvitationSentEvent},
        onEvent: (event) {
          // The event may arrive after the dialog is gone, so check mounted
          if (event is UserInvitationSentEvent && mounted) {
            Navigator.of(context).pop();
            ScaffoldMessenger.of(context).showSnackBar(
              const SnackBar(content: Text('Invitation sent!')),
            );
          }
        },
      );
    } finally {
      // Avoid calling setState on a disposed State
      if (mounted) {
        setState(() => _isSending = false);
      }
    }
  }

  @override
  Widget build(BuildContext context) {
    return AlertDialog(
      title: const Text('Invite User'),
      content: TextField(
        controller: _emailController,
        decoration: const InputDecoration(labelText: 'Email'),
      ),
      actions: [
        TextButton(
          onPressed: () => Navigator.of(context).pop(),
          child: const Text('Cancel'),
        ),
        ElevatedButton(
          onPressed: _isSending ? null : _sendInvite,
          child: _isSending
              ? const CircularProgressIndicator()
              : const Text('Send'),
        ),
      ],
    );
  }
}
```

### Example 4: Concrete UsersDataSource Implementation

```dart
class UsersDataSource extends DataSource<UsersQuery, List<User>> {
  UsersDataSource(UsersQuery query) : super(query);

  /// Invite a user with optional event subscription
  Future<InviteResult> inviteUser(
    String email, {
    Set<Type>? subscribeToEvents,
    void Function(UserInvitationEvent)? onEvent,
  }) async {
    // Build subscription config if events requested
    EventSubscriptionConfig? config;
    if (subscribeToEvents != null && subscribeToEvents.isNotEmpty) {
      config = EventSubscriptionConfig(
        eventTypes: subscribeToEvents,
        persistent: true,
      );
    }

    // Execute the command
    final result = await execute(
      InviteUserCommand(email: email),
      subscribeToEvents: config,
    );

    // Wire up event handler if provided.
    // Note: this listener is never cancelled in this sketch.
    if (config != null && onEvent != null) {
      eventConnection?.events
          .where((e) => e.correlationId == result.correlationId)
          .where((e) => e is UserInvitationEvent)
          .cast<UserInvitationEvent>()
          .listen(onEvent);
    }

    return result;
  }

  /// Remove a user
  Future<void> removeUser(String userId) async {
    await execute(
      RemoveUserCommand(userId: userId),
      // No events for this command
    );
  }
}
```

---

## Backend Architecture (.NET)

### Protocol Messages

#### Client to Server

**Subscribe**

```json
{
  "type": "subscribe",
  "subscriptionId": "sub-123-abc",
  "correlationId": "invite-456-def",
  "eventTypes": ["UserInvitationSentEvent", "UserInvitationAcceptedEvent"],
  "persistent": true,
  "timeout": null,
  "deliveryMode": "immediate"
}
```

**Unsubscribe**

```json
{
  "type": "unsubscribe",
  "subscriptionId": "sub-123-abc"
}
```

**Catch-up Request**

```json
{
  "type": "catch_up",
  "subscriptionIds": ["sub-123-abc", "sub-789-xyz"]
}
```

**Authentication**

```json
{
  "type": "auth",
  "token": "jwt-token-here",
  "dataSourceId": "users-datasource"
}
```

#### Server to Client

**Event Delivery**

```json
{
  "type": "event",
  "subscriptionId": "sub-123-abc",
  "correlationId": "invite-456-def",
  "eventType": "UserInvitationAcceptedEvent",
  "eventId": "evt-001",
  "sequence": 42,
  "payload": {
    "userId": "user-789",
    "email": "john@example.com"
  },
  "occurredAt": "2025-01-15T10:30:00Z"
}
```

**Subscription Completed**

```json
{
  "type": "subscription_completed",
  "subscriptionId": "sub-123-abc",
  "reason": "terminal_event",
  "terminalEvent": "UserInvitationAcceptedEvent"
}
```

**Error**

```json
{
  "type": "error",
  "code": "subscription_not_found",
  "message": "Subscription sub-123-abc not found",
  "subscriptionId": "sub-123-abc"
}
```

### Event Filtering and Delivery

```csharp
public class EventDeliveryService
{
    // Constructor injection of these dependencies is omitted for brevity.
    private readonly ISubscriptionStore _subscriptionStore;
    private readonly IConnectionTracker _connectionTracker;
    private readonly IClientNotifier _notifier;
    private readonly IEventStore _eventStore;

    public async Task DeliverEventAsync(ICorrelatedEvent @event)
    {
        // Store the event with sequence number
        var sequence = await _eventStore.AppendAsync(@event);

        // Find all subscriptions interested in this correlation
        var subscriptions = await _subscriptionStore
            .FindByCorrelationIdAsync(@event.CorrelationId);

        foreach (var subscription in subscriptions)
        {
            await DeliverToSubscriptionAsync(subscription, @event, sequence);
        }
    }

    private async Task DeliverToSubscriptionAsync(
        PersistentSubscription subscription,
        ICorrelatedEvent @event,
        long sequence)
    {
        // FILTER: Only deliver if subscriber requested this event type
        var eventTypeName = @event.GetType().Name;
        if (!subscription.EventTypes.Contains(eventTypeName))
        {
            // Subscriber didn't ask for this type - skip
            return;
        }

        // Check delivery mode
        if (subscription.DeliveryMode == DeliveryMode.OnReconnect)
        {
            // Don't deliver now, will be caught up on reconnect
            return;
        }

        // Check if subscriber is online
        var connection = _connectionTracker.GetConnection(subscription.SubscriberId);
        if (connection != null)
        {
            // Deliver immediately
            await _notifier.SendAsync(connection.Id, new EventMessage
            {
                Type = "event",
                SubscriptionId = subscription.Id.ToString(),
                CorrelationId = @event.CorrelationId,
                EventType = eventTypeName,
                EventId = @event.EventId.ToString(),
                Sequence = sequence,
                Payload = @event,
                OccurredAt = @event.OccurredAt,
            });

            subscription.MarkDelivered(sequence);
        }
        // If offline, event stays in store for catch-up

        // Check if this is a terminal event. Complete the subscription only
        // once the event has actually been delivered: catch-up skips
        // subscriptions that are no longer Active, so completing while the
        // subscriber is offline would mean the terminal event never arrives.
        if (connection != null
            && subscription.TerminalEventTypes.Contains(eventTypeName))
        {
            subscription.Complete();

            await _notifier.SendAsync(connection.Id, new SubscriptionCompletedMessage
            {
                Type = "subscription_completed",
                SubscriptionId = subscription.Id.ToString(),
                Reason = "terminal_event",
                TerminalEvent = eventTypeName,
            });
        }

        await _subscriptionStore.UpdateAsync(subscription);
    }

    public async Task HandleCatchUpRequestAsync(
        string connectionId,
        Guid userId,
        IEnumerable<string> subscriptionIds)
    {
        // Get all active subscriptions for this user.
        // Note: subscriptionIds is currently unused; catch-up covers every
        // active subscription stored for the user.
        var subscriptions = await _subscriptionStore.GetByUserIdAsync(userId);

        foreach (var subscription in subscriptions)
        {
            if (subscription.Status != SubscriptionStatus.Active)
                continue;

            // Get events after the last delivered sequence
            var missedEvents = await _eventStore.GetEventsAsync(
                correlationId: subscription.CorrelationId,
                afterSequence: subscription.LastDeliveredSequence,
                eventTypes: subscription.EventTypes
            );

            foreach (var evt in missedEvents.OrderBy(e => e.Sequence))
            {
                await _notifier.SendAsync(connectionId, new EventMessage
                {
                    Type = "event",
                    SubscriptionId = subscription.Id.ToString(),
                    CorrelationId = evt.CorrelationId,
                    EventType = evt.GetType().Name,
                    EventId = evt.EventId.ToString(),
                    Sequence = evt.Sequence,
                    Payload = evt,
                    OccurredAt = evt.OccurredAt,
                });

                subscription.MarkDelivered(evt.Sequence);

                // Check for terminal event
                if (subscription.TerminalEventTypes.Contains(evt.GetType().Name))
                {
                    subscription.Complete();

                    await _notifier.SendAsync(connectionId, new SubscriptionCompletedMessage
                    {
                        Type = "subscription_completed",
                        SubscriptionId = subscription.Id.ToString(),
                        Reason = "terminal_event",
                        TerminalEvent = evt.GetType().Name,
                    });

                    break;
                }
            }

            await _subscriptionStore.UpdateAsync(subscription);
        }
    }
}
```

### Persistent Subscription Storage

```csharp
public class PersistentSubscription
{
    public Guid Id { get; init; }
    public Guid SubscriberId { get; init; }
    public string CorrelationId { get; init; } = string.Empty;

    /// <summary>
    /// Event types the subscriber wants to receive
    /// </summary>
    public HashSet<string> EventTypes { get; init; } = new();

    /// <summary>
    /// Events that complete/close the subscription
    /// </summary>
    public HashSet<string> TerminalEventTypes { get; init; } = new();

    public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate;
    public DateTimeOffset CreatedAt { get; init; }
    public DateTimeOffset? ExpiresAt { get; init; }
    public DateTimeOffset? CompletedAt { get; private set; }

    /// <summary>
    /// Last successfully delivered event sequence (for catch-up)
    /// </summary>
    public long LastDeliveredSequence { get; private set; }

    public SubscriptionStatus Status { get; private set; } = SubscriptionStatus.Active;

    public void MarkDelivered(long sequence)
    {
        if (sequence > LastDeliveredSequence)
        {
            LastDeliveredSequence = sequence;
        }
    }

    public void Complete()
    {
        Status = SubscriptionStatus.Completed;
        CompletedAt = DateTimeOffset.UtcNow;
    }

    public void Cancel()
    {
        Status = SubscriptionStatus.Cancelled;
        CompletedAt = DateTimeOffset.UtcNow;
    }

    public bool IsExpired =>
        ExpiresAt.HasValue && DateTimeOffset.UtcNow > ExpiresAt.Value;
}

public enum SubscriptionStatus
{
    Active,
    Completed,  // Terminal event received
    Expired,    // TTL reached
    Cancelled   // User cancelled
}

public enum DeliveryMode
{
    Immediate,
    Batched,
    OnReconnect
}

public interface ISubscriptionStore
{
    Task CreateAsync(PersistentSubscription subscription);
    Task<PersistentSubscription?> GetByIdAsync(Guid id);
    Task<IEnumerable<PersistentSubscription>> GetByUserIdAsync(Guid userId);
    Task<IEnumerable<PersistentSubscription>> FindByCorrelationIdAsync(string correlationId);
    Task UpdateAsync(PersistentSubscription subscription);
    Task DeleteAsync(Guid id);
}
```

### Command Handler Integration

```csharp
/// <summary>
/// Pipeline behavior that creates subscriptions for commands that request them
/// </summary>
public class CommandSubscriptionBehavior<TCommand, TResult>
    : IPipelineBehavior<TCommand, TResult>
    where TCommand : ICommand
{
    // Constructor injection of these dependencies is omitted for brevity.
    private readonly ISubscriptionStore _subscriptions;
    private readonly ICurrentUserService _currentUser;

    public async Task<TResult> Handle(
        TCommand command,
        RequestHandlerDelegate<TResult> next,
        CancellationToken cancellationToken)
    {
        // Execute the command first
        var result = await next();

        // Check if this command has subscription metadata
        if (command is ICommandWithSubscription subCommand
            && subCommand.SubscriptionConfig != null
            && result is ICorrelatedResult correlatedResult)
        {
            var config = subCommand.SubscriptionConfig;

            await _subscriptions.CreateAsync(new PersistentSubscription
            {
                Id = Guid.NewGuid(),
                SubscriberId = _currentUser.UserId,
                CorrelationId = correlatedResult.CorrelationId,
                EventTypes = config.EventTypes.ToHashSet(),
                TerminalEventTypes = config.TerminalEventTypes.ToHashSet(),
                DeliveryMode = config.DeliveryMode,
                CreatedAt = DateTimeOffset.UtcNow,
                ExpiresAt = config.Timeout.HasValue
                    ? DateTimeOffset.UtcNow + config.Timeout.Value
                    : null,
            });
        }

        return result;
    }
}
```

---

## Flow Diagrams

### Command Execution with Subscription

```
┌────────┐     ┌─────────────┐     ┌──────────────┐    ┌──────────────┐
│ Client │     │ DataSource  │     │   Backend    │    │ Subscription │
│        │     │             │     │              │    │    Store     │
└───┬────┘     └──────┬──────┘     └──────┬───────┘    └──────┬───────┘
    │                 │                   │                   │
    │ inviteUser()    │                   │                   │
    │ + eventTypes    │                   │                   │
    │────────────────>│                   │                   │
    │                 │                   │                   │
    │                 │ POST /command     │                   │
    │                 │ {InviteUser}      │                   │
    │                 │──────────────────>│                   │
    │                 │                   │                   │
    │                 │                   │ Create            │
    │                 │                   │ Subscription      │
    │                 │                   │──────────────────>│
    │                 │                   │                   │
    │                 │ {correlationId}   │                   │
    │                 │<──────────────────│                   │
    │                 │                   │                   │
    │                 │ WS: subscribe     │                   │
    │                 │ {correlationId,   │                   │
    │                 │  eventTypes}      │                   │
    │                 │──────────────────>│                   │
    │                 │                   │                   │
    │ InviteResult    │                   │                   │
    │<────────────────│                   │                   │
    │                 │                   │                   │
```

### Event Delivery (Online)

```
┌──────────┐     ┌──────────────┐    ┌──────────────┐   ┌────────┐
│  Domain  │     │    Event     │    │ Subscription │   │ Client │
│  Event   │     │  Dispatcher  │    │    Store     │   │        │
└────┬─────┘     └──────┬───────┘    └──────┬───────┘   └───┬────┘
     │                  │                   │               │
     │ UserInvitation   │                   │               │
     │ AcceptedEvent    │                   │               │
     │─────────────────>│                   │               │
     │                  │                   │               │
     │                  │ Find subscriptions│               │
     │                  │ by correlationId  │               │
     │                  │──────────────────>│               │
     │                  │                   │               │
     │                  │ [sub-123]         │               │
     │                  │<──────────────────│               │
     │                  │                   │               │
     │                  │ Check: eventType  │               │
     │                  │ in sub.eventTypes?│               │
     │                  │                   │               │
     │                  │ WS: event         │               │
     │                  │──────────────────────────────────>│
     │                  │                   │               │
     │                  │ (terminal event)  │               │
     │                  │ WS: subscription  │               │
     │                  │ _completed        │               │
     │                  │──────────────────────────────────>│
     │                  │                   │               │
```

### Catch-up on Reconnect

```
┌────────┐     ┌──────────────┐    ┌──────────────┐    ┌───────────┐
│ Client │     │   Backend    │    │ Subscription │    │   Event   │
│        │     │              │    │    Store     │    │   Store   │
└───┬────┘     └──────┬───────┘    └──────┬───────┘    └─────┬─────┘
    │                 │                   │                  │
    │ WS: connect     │                   │                  │
    │ + auth          │                   │                  │
    │────────────────>│                   │                  │
    │                 │                   │                  │
    │ WS: catch_up    │                   │                  │
    │────────────────>│                   │                  │
    │                 │                   │                  │
    │                 │ Get active subs   │                  │
    │                 │ for userId        │                  │
    │                 │──────────────────>│                  │
    │                 │                   │                  │
    │                 │ [sub-123,         │                  │
    │                 │  sub-456]         │                  │
    │                 │<──────────────────│                  │
    │                 │                   │                  │
    │                 │ For each sub:     │                  │
    │                 │ Get events after  │                  │
    │                 │ lastDeliveredSeq  │                  │
    │                 │─────────────────────────────────────>│
    │                 │                   │                  │
    │                 │ [evt-1, evt-2]    │                  │
    │                 │<─────────────────────────────────────│
    │                 │                   │                  │
    │ WS: event       │                   │                  │
    │ (evt-1)         │                   │                  │
    │<────────────────│                   │                  │
    │                 │                   │                  │
    │ WS: event       │                   │                  │
    │ (evt-2)         │                   │                  │
    │<────────────────│                   │                  │
    │                 │                   │                  │
```

---

## Considerations

### Cleanup and Maintenance

- Run a background job to expire old subscriptions
- Archive or delete events older than a retention period
- Clean up completed/cancelled subscriptions after a grace period

### Scaling

- For multiple server instances, use a distributed connection tracker (Redis)
- Consider using SignalR with Azure SignalR Service or Redis backplane
- Event store should support efficient queries by correlationId + sequence

### Ordering Guarantees

- Events within a correlation are ordered by sequence number
- Catch-up delivery must respect sequence ordering
- Consider idempotency keys for duplicate detection on the client

### Client-Side Deduplication

```dart
class EventConnection {
  final Set<String> _processedEventIds = {};

  void _handleEventMessage(Map<String, dynamic> data) {
    final eventId = data['eventId'] as String;

    // Skip if already processed (duplicate delivery)
    if (_processedEventIds.contains(eventId)) {
      return;
    }

    _processedEventIds.add(eventId);

    // Process the event...
  }
}
```

### Error Handling

- Handle WebSocket disconnections with exponential backoff reconnection
- Queue outgoing messages during disconnection
- Validate subscription requests on the backend (authorization, rate limiting)

### Security

- Validate that users can only subscribe to correlations they own
- Authenticate WebSocket connections with JWT tokens
- Rate limit subscription creation to prevent abuse
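
As an illustration of the reconnection advice under Error Handling, the following is a minimal sketch of capped exponential backoff with jitter. The names `backoffDelay` and `reconnectWithBackoff`, the one-second base, the 30-second cap, and the attempt limit are all assumptions made for this example and are not part of the design above. It also assumes the supplied `connect` callback throws when the connection attempt fails.

```dart
import 'dart:async';
import 'dart:math';

/// Delay before reconnect attempt [attempt] (0-based), using capped
/// exponential backoff with full jitter. Hypothetical helper: the default
/// base and cap are assumptions, not values taken from the design.
Duration backoffDelay(
  int attempt, {
  Duration base = const Duration(seconds: 1),
  Duration cap = const Duration(seconds: 30),
  Random? random,
}) {
  // Clamp the exponent so the shift below cannot overflow.
  final exponent = min(max(attempt, 0), 20);
  final uncappedMs = base.inMilliseconds * (1 << exponent);
  final ceilingMs = min(uncappedMs, cap.inMilliseconds);
  // Full jitter: pick uniformly from [0, ceilingMs] so that many clients
  // dropped at the same moment do not all retry at once.
  return Duration(milliseconds: (random ?? Random()).nextInt(ceilingMs + 1));
}

/// Calls [connect] until it succeeds or [maxAttempts] is exhausted,
/// waiting for a backoff delay between failed attempts.
Future<void> reconnectWithBackoff(
  Future<void> Function() connect, {
  int maxAttempts = 8,
  Duration base = const Duration(seconds: 1),
}) async {
  for (var attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      await connect();
      return;
    } catch (_) {
      // Wait only if another attempt will follow.
      if (attempt + 1 < maxAttempts) {
        await Future<void>.delayed(backoffDelay(attempt, base: base));
      }
    }
  }
  throw StateError('Could not reconnect after $maxAttempts attempts');
}

Future<void> main() async {
  // Simulated connection that fails twice before succeeding.
  var calls = 0;
  await reconnectWithBackoff(
    () async {
      calls++;
      if (calls < 3) throw Exception('server unavailable');
    },
    base: const Duration(milliseconds: 10),
  );
  print('connected after $calls attempts');
}
```

One way to use this would be to call `reconnectWithBackoff(connect)` from `_handleDisconnect` in `EventConnection`. Because `connect()` sends a `catch_up` message on every successful connection, missed events would then be replayed without extra client logic.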