# Phase 8: Bidirectional Communication & Persistent Subscriptions - Implementation Summary **Status**: 🚧 **IN PROGRESS** - Core implementation complete, naming conflicts need resolution Phase 8 implements persistent, correlation-based event subscriptions that survive client disconnection and support selective event filtering with catch-up delivery. --- ## Overview Phase 8 extends Phase 7.2's basic SignalR streaming with a comprehensive persistent subscription system based on the design in `bidirectional-communication-design.md`. ### Key Differences from Phase 7.2 **Phase 7.2 (Basic SignalR):** - Stream-based subscriptions (subscribe to entire stream) - Client must stay connected to receive events - Offline = missed events - All-or-nothing event delivery **Phase 8 (Persistent Subscriptions):** - Correlation-based subscriptions (subscribe to specific command executions) - Subscriptions persist across disconnections - Catch-up mechanism delivers missed events - Selective event filtering (choose which event types to receive) - Terminal events auto-complete subscriptions - Multiple delivery modes --- ## 📋 Phase 8.1: Subscription Abstractions ### Files Created #### `Svrnty.CQRS.Events.Abstractions/Subscriptions/SubscriptionTypes.cs` ```csharp public enum SubscriptionStatus { Active, Completed, Expired, Cancelled, Paused } public enum DeliveryMode { Immediate, // Push immediately Batched, // Batch and deliver periodically OnReconnect // Only deliver when client reconnects } ``` #### `Svrnty.CQRS.Events.Abstractions/Subscriptions/PersistentSubscription.cs` (173 lines) Domain model with: - **Properties**: Id, SubscriberId, CorrelationId, EventTypes (filter), TerminalEventTypes - **Tracking**: LastDeliveredSequence, CreatedAt, ExpiresAt, CompletedAt - **Lifecycle**: `Complete()`, `Cancel()`, `Expire()`, `Pause()`, `Resume()` - **Filtering**: `ShouldDeliverEventType()`, `IsTerminalEvent()`, `CanReceiveEvents` #### `Svrnty.CQRS.Events.Abstractions/Subscriptions/ISubscriptionStore.cs` Persistence interface: - `CreateAsync()`, `GetByIdAsync()`, `GetBySubscriberIdAsync()` - `GetByCorrelationIdAsync()`, `GetByStatusAsync()`, `GetByConnectionIdAsync()` - `UpdateAsync()`, `DeleteAsync()`, `GetExpiredSubscriptionsAsync()` #### `Svrnty.CQRS.Events.Abstractions/Subscriptions/ISubscriptionManager.cs` Lifecycle management: - `CreateSubscriptionAsync()` - Create with event filters and terminal events - `MarkEventDeliveredAsync()` - Track delivery progress - `CompleteSubscriptionAsync()`, `CancelSubscriptionAsync()` - `PauseSubscriptionAsync()`, `ResumeSubscriptionAsync()` - `AttachConnectionAsync()`, `DetachConnectionAsync()` - `CleanupExpiredSubscriptionsAsync()` #### `Svrnty.CQRS.Events.Abstractions/Subscriptions/IEventDeliveryService.cs` Event routing: - `DeliverEventAsync()` - Deliver to all matching subscriptions - `CatchUpSubscriptionAsync()` - Deliver missed events - `GetPendingEventsAsync()` - Query undelivered events --- ## 🔧 Phase 8.2: Subscription Manager ### Files Created #### `Svrnty.CQRS.Events/Subscriptions/SubscriptionManager.cs` (234 lines) Default implementation: - Creates subscriptions with GUID IDs - Tracks delivery progress via LastDeliveredSequence - Implements full lifecycle (create, pause, resume, cancel, complete) - Connection management (attach/detach) - Automatic expiration cleanup #### `Svrnty.CQRS.Events/Subscriptions/InMemorySubscriptionStore.cs` Development storage using `ConcurrentDictionary`: - Thread-safe in-memory storage - Query by correlation ID, subscriber ID, status, connection ID - Expiration detection via `DateTimeOffset` comparison --- ## 📨 Phase 8.3: Event Delivery Service ### Files Created #### `Svrnty.CQRS.Events/Subscriptions/EventDeliveryService.cs` (194 lines) Core delivery logic: - Matches events to subscriptions by correlation ID - Filters events by event type name - Respects delivery modes (Immediate, Batched, OnReconnect) - Detects and processes terminal events - Catch-up logic for missed events - Integration with `IEventStreamStore.ReadStreamAsync()` **Key Method**: ```csharp public async Task DeliverEventAsync( string correlationId, ICorrelatedEvent @event, CancellationToken cancellationToken) { // Get all active subscriptions for correlation // Filter by event type // Check delivery mode // Detect terminal events → Complete subscription return deliveredCount; } ``` --- ## ⏱️ Phase 8.4: Catch-up Mechanism Integrated into `EventDeliveryService.CatchUpSubscriptionAsync()`: - Reads events from stream starting at `LastDeliveredSequence + 1` - Filters by event type preferences - Stops at terminal events - Updates sequence tracking --- ## 🗄️ Phase 8.5: PostgreSQL Storage ### Files Created #### `Svrnty.CQRS.Events.PostgreSQL/Migrations/009_PersistentSubscriptions.sql` ```sql CREATE TABLE persistent_subscriptions ( id TEXT PRIMARY KEY, subscriber_id TEXT NOT NULL, correlation_id TEXT NOT NULL, event_types JSONB NOT NULL DEFAULT '[]'::jsonb, terminal_event_types JSONB NOT NULL DEFAULT '[]'::jsonb, delivery_mode INT NOT NULL DEFAULT 0, last_delivered_sequence BIGINT NOT NULL DEFAULT -1, status INT NOT NULL DEFAULT 0, connection_id TEXT NULL, ... ); -- Indexes for hot paths CREATE INDEX idx_persistent_subscriptions_correlation_id ON ...; CREATE INDEX idx_persistent_subscriptions_correlation_active ON ... WHERE status = 0; ``` #### `Svrnty.CQRS.Events.PostgreSQL/Subscriptions/PostgresSubscriptionStore.cs` (330 lines) Production storage: - JSONB for event type arrays - Indexed queries by correlation ID (hot path) - Reflection-based property setting for private setters - UPSERT pattern for updates #### Service Registration ```csharp services.AddPostgresSubscriptionStore(); ``` --- ## 🔄 Phase 8.6: Enhanced SignalR Hub ### Files Created #### `Svrnty.CQRS.Events.SignalR/PersistentSubscriptionHub.cs` (370 lines) WebSocket protocol implementation: **Client Methods**: - `CreateSubscription(request)` - Create persistent subscription - `AttachSubscription(subscriptionId)` - Reconnect to existing subscription - `DetachSubscription(subscriptionId)` - Temporarily disconnect - `CancelSubscription(subscriptionId)` - Permanently cancel - `CatchUp(subscriptionId)` - Request missed events - `PauseSubscription(subscriptionId)`, `ResumeSubscription(subscriptionId)` - `GetMySubscriptions(subscriberId)` - Query user's subscriptions **Server Events** (pushed to clients): - `SubscriptionCreated` - Confirmation with subscription ID - `EventReceived` - New event delivered - `SubscriptionCompleted` - Terminal event received - `CatchUpComplete` - Catch-up finished - `Error` - Error occurred **Request Model**: ```csharp public class CreateSubscriptionRequest { public required string SubscriberId { get; init; } public required string CorrelationId { get; init; } public List? EventTypes { get; init; } public List? TerminalEventTypes { get; init; } public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate; public DateTimeOffset? ExpiresAt { get; init; } public string? DataSourceId { get; init; } } ``` #### Updated `Svrnty.CQRS.Events.SignalR/ServiceCollectionExtensions.cs` Added extension methods: ```csharp services.AddPersistentSubscriptionHub(); app.MapPersistentSubscriptionHub("/hubs/subscriptions"); ``` --- ## ⚙️ Phase 8.7: Command Integration ### Files Created #### `Svrnty.CQRS.Events/Subscriptions/SubscriptionDeliveryHostedService.cs` (154 lines) Background service for automatic event delivery: - Polls every 500ms for new events - Groups subscriptions by correlation ID - Reads new events from streams - Filters by event type - Detects terminal events - Cleans up expired subscriptions **Processing Flow**: ``` 1. Get all Active subscriptions 2. Group by CorrelationId 3. For each correlation: a. Find min LastDeliveredSequence b. Read new events from stream c. For each subscription: - Filter by EventTypes - Check DeliveryMode - Mark as delivered - Check for TerminalEvent → Complete 4. Cleanup expired subscriptions ``` #### `Svrnty.CQRS.Events/Subscriptions/SubscriptionEventPublisherDecorator.cs` Decorator pattern for `IEventPublisher`: - Wraps event publishing - Triggers background delivery (fire-and-forget) - Non-blocking design #### Service Registration ```csharp services.AddPersistentSubscriptions( useInMemoryStore: !usePostgreSQL, enableBackgroundDelivery: true); ``` --- ## 🎯 Phase 8.8: Sample Implementation ### Files Created #### `Svrnty.Sample/Invitations/InvitationEvents.cs` Event definitions: - `InvitationSentEvent` - `InvitationAcceptedEvent` (Terminal) - `InvitationDeclinedEvent` (Terminal) - `InvitationReminderSentEvent` #### `Svrnty.Sample/Invitations/InvitationCommands.cs` Commands: - `SendInvitationCommand` → Returns `SendInvitationResult` with SubscriptionId - `AcceptInvitationCommand`, `DeclineInvitationCommand` - `SendInvitationReminderCommand` #### `Svrnty.Sample/Invitations/InvitationCommandHandlers.cs` (220 lines) Handlers demonstrating integration: **SendInvitationCommandHandler**: ```csharp 1. Generate invitationId and correlationId = $"invitation-{invitationId}" 2. Publish InvitationSentEvent with correlation 3. Optionally create PersistentSubscription: - EventTypes: [InvitationAccepted, InvitationDeclined, InvitationReminder] - TerminalEventTypes: [InvitationAccepted, InvitationDeclined] - Delivery: Immediate - Expires: 30 days 4. Return {InvitationId, CorrelationId, SubscriptionId} ``` #### `Svrnty.Sample/Invitations/InvitationEndpoints.cs` HTTP API: ``` POST /api/invitations/send POST /api/invitations/{id}/accept POST /api/invitations/{id}/decline POST /api/invitations/{id}/reminder GET /api/invitations/subscriptions/{subscriptionId} POST /api/invitations/subscriptions/{subscriptionId}/cancel GET /api/invitations/subscriptions/{subscriptionId}/pending ``` #### `Program.cs` Integration Added: ```csharp // Services builder.Services.AddSignalR(); builder.Services.AddPersistentSubscriptions(useInMemoryStore: !usePostgreSQL); if (usePostgreSQL) { builder.Services.AddPostgresSubscriptionStore(); } builder.Services.AddPersistentSubscriptionHub(); // Command handlers builder.Services.AddCommand(); builder.Services.AddCommand(); ... // Endpoints app.MapPersistentSubscriptionHub("/hubs/subscriptions"); app.MapInvitationEndpoints(); ``` --- ## 🚧 Known Issues ### 1. Naming Conflicts (Blocking Compilation) There are ambiguous type references with existing interfaces from earlier phases: **Conflicts:** - `IEventDeliveryService` exists in both: - `Svrnty.CQRS.Events.Abstractions` (from earlier phase) - `Svrnty.CQRS.Events.Abstractions.Subscriptions` (Phase 8) - `ISubscriptionStore` exists in both: - `Svrnty.CQRS.Events.Abstractions` (from earlier phase) - `Svrnty.CQRS.Events.Abstractions.Subscriptions` (Phase 8) **Resolution Options:** 1. **Rename Phase 8 interfaces** (Recommended): - `IEventDeliveryService` → `ISubscriptionEventDeliveryService` - `ISubscriptionStore` → `IPersistentSubscriptionStore` 2. **Use namespace aliases** in implementation files: ```csharp using SubscriptionDelivery = Svrnty.CQRS.Events.Abstractions.Subscriptions.IEventDeliveryService; ``` 3. **Consolidate interfaces** if they serve similar purposes ### 2. EventData vs ICorrelatedEvent The implementation uses `ICorrelatedEvent` from the existing event system, but doesn't have access to sequence numbers directly. The current design tracks sequences via `LastDeliveredSequence` on subscriptions, but this needs to be mapped to stream offsets from `IEventStreamStore.ReadStreamAsync()`. **Current Workaround**: - Using stream offset as implicit sequence - `LastDeliveredSequence` maps to `fromOffset` parameter **Better Approach**: - Wrap `ICorrelatedEvent` with metadata (offset, sequence) - Or extend event store to return enriched event data ### 3. Event Type Name Resolution Currently using `@event.GetType().Name` which assumes: - Event types are uniquely named - No namespace collisions - No assembly versioning issues **Better Approach**: - Use fully qualified type names - Or event type registry with string keys --- ## 📦 Package Structure ``` Svrnty.CQRS.Events.Abstractions/ └── Subscriptions/ ├── PersistentSubscription.cs (domain model) ├── SubscriptionTypes.cs (enums) ├── ISubscriptionStore.cs ├── ISubscriptionManager.cs └── IEventDeliveryService.cs Svrnty.CQRS.Events/ └── Subscriptions/ ├── SubscriptionManager.cs ├── InMemorySubscriptionStore.cs ├── EventDeliveryService.cs ├── SubscriptionDeliveryHostedService.cs ├── SubscriptionEventPublisherDecorator.cs └── ServiceCollectionExtensions.cs Svrnty.CQRS.Events.PostgreSQL/ ├── Migrations/ │ └── 009_PersistentSubscriptions.sql └── Subscriptions/ ├── PostgresSubscriptionStore.cs └── ServiceCollectionExtensions.cs Svrnty.CQRS.Events.SignalR/ ├── PersistentSubscriptionHub.cs └── ServiceCollectionExtensions.cs (updated) Svrnty.Sample/ └── Invitations/ ├── InvitationEvents.cs ├── InvitationCommands.cs ├── InvitationCommandHandlers.cs └── InvitationEndpoints.cs ``` --- ## 🎓 Key Design Patterns ### 1. Persistent Subscription Pattern - Subscriptions survive disconnections - Sequence-based catch-up - Terminal event completion ### 2. Correlation-Based Filtering - Events grouped by correlation ID (command execution) - Selective event type delivery - Terminal events auto-complete ### 3. Multiple Delivery Modes - **Immediate**: Push as events occur - **Batched**: Periodic batch delivery - **OnReconnect**: Only deliver on client request ### 4. Background Processing - Hosted service polls for new events - Automatic delivery to active subscriptions - Automatic expiration cleanup ### 5. Repository + Manager Pattern - `ISubscriptionStore` = data access - `ISubscriptionManager` = business logic + lifecycle --- ## 📝 Next Steps to Complete Phase 8 1. **Resolve Naming Conflicts** (HIGH PRIORITY): - Rename interfaces to avoid ambiguity - Update all references - Ensure clean compilation 2. **Fix Event Sequence Tracking**: - Map stream offsets to subscription sequences - Ensure accurate catch-up logic 3. **Complete Integration Testing**: - Test invitation workflow end-to-end - Verify terminal event completion - Test catch-up after disconnect 4. **Implement Batched Delivery Mode**: - Currently Batched mode is placeholder - Add batch aggregation logic - Add batch delivery timer 5. **Add SignalR Push Notifications**: - Currently delivery happens in background - Need to push events via SignalR when client is connected - Integrate `IHubContext` for server-initiated pushes 6. **Testing Scenarios**: ```bash # 1. Send invitation curl -X POST http://localhost:6001/api/invitations/send \ -H "Content-Type: application/json" \ -d '{ "inviterUserId": "user1", "inviteeEmail": "user2@example.com", "message": "Join our team!", "createSubscription": true }' # Returns: {invitationId, correlationId, subscriptionId} # 2. Accept invitation (triggers terminal event) curl -X POST http://localhost:6001/api/invitations/{id}/accept \ -H "Content-Type: application/json" \ -d '{"acceptedByUserId": "user2"}' # 3. Check subscription status (should be Completed) curl http://localhost:6001/api/invitations/subscriptions/{subscriptionId} ``` --- ## 🎯 Phase 8 Summary **Created**: 15+ new files, 2000+ lines of code **Core Capabilities**: - ✅ Persistent subscriptions with correlation filtering - ✅ Selective event type delivery - ✅ Terminal event auto-completion - ✅ Catch-up mechanism for missed events - ✅ Multiple delivery modes - ✅ PostgreSQL persistent storage - ✅ SignalR WebSocket protocol - ✅ Background delivery service - ✅ Sample invitation workflow - ⚠️ Naming conflicts need resolution - ⚠️ SignalR push integration incomplete **Architecture**: - Clean separation: Abstractions → Implementation → Storage → Transport - Supports in-memory (dev) and PostgreSQL (prod) - Background hosted service for automatic delivery - SignalR for real-time client communication - Event-driven with terminal event support **Future Enhancements**: - Subscription groups (multiple subscribers per subscription) - Subscription templates (pre-configured filters) - Delivery guarantees (at-least-once, exactly-once) - Dead letter queue for failed deliveries - Subscription analytics and monitoring - GraphQL subscription integration