Phase 8: Bidirectional Communication & Persistent Subscriptions - Implementation Summary

Status: 🚧 IN PROGRESS - Core implementation complete, naming conflicts need resolution

Phase 8 implements persistent, correlation-based event subscriptions that survive client disconnection and support selective event filtering with catch-up delivery.


Overview

Phase 8 extends Phase 7.2's basic SignalR streaming with a comprehensive persistent subscription system based on the design in bidirectional-communication-design.md.

Key Differences from Phase 7.2

Phase 7.2 (Basic SignalR):

  • Stream-based subscriptions (subscribe to entire stream)
  • Client must stay connected to receive events
  • Offline = missed events
  • All-or-nothing event delivery

Phase 8 (Persistent Subscriptions):

  • Correlation-based subscriptions (subscribe to specific command executions)
  • Subscriptions persist across disconnections
  • Catch-up mechanism delivers missed events
  • Selective event filtering (choose which event types to receive)
  • Terminal events auto-complete subscriptions
  • Multiple delivery modes

📋 Phase 8.1: Subscription Abstractions

Files Created

Svrnty.CQRS.Events.Abstractions/Subscriptions/SubscriptionTypes.cs

public enum SubscriptionStatus
{
    Active, Completed, Expired, Cancelled, Paused
}

public enum DeliveryMode
{
    Immediate,   // Push immediately
    Batched,     // Batch and deliver periodically
    OnReconnect  // Only deliver when client reconnects
}

Svrnty.CQRS.Events.Abstractions/Subscriptions/PersistentSubscription.cs (173 lines)

Domain model with:

  • Properties: Id, SubscriberId, CorrelationId, EventTypes (filter), TerminalEventTypes
  • Tracking: LastDeliveredSequence, CreatedAt, ExpiresAt, CompletedAt
  • Lifecycle: Complete(), Cancel(), Expire(), Pause(), Resume()
  • Filtering: ShouldDeliverEventType(), IsTerminalEvent(), CanReceiveEvents
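
The filtering and lifecycle members can be pictured with a minimal sketch. Only the member names come from the list above; the bodies, the "empty filter means all event types" rule, the MarkDelivered helper and the SketchStatus enum are assumptions made for illustration, not the contents of PersistentSubscription.cs.

```csharp
using System;
using System.Collections.Generic;

public enum SketchStatus { Active, Completed, Expired, Cancelled, Paused }

// Hypothetical, simplified stand-in for PersistentSubscription.
public sealed class SubscriptionSketch
{
    private readonly HashSet<string> _eventTypes;
    private readonly HashSet<string> _terminalEventTypes;

    public SubscriptionSketch(IEnumerable<string> eventTypes, IEnumerable<string> terminalEventTypes)
    {
        _eventTypes = new HashSet<string>(eventTypes);
        _terminalEventTypes = new HashSet<string>(terminalEventTypes);
    }

    public SketchStatus Status { get; private set; } = SketchStatus.Active;
    public long LastDeliveredSequence { get; private set; } = -1;
    public DateTimeOffset? CompletedAt { get; private set; }

    // Assumption: only Active subscriptions receive events.
    public bool CanReceiveEvents => Status == SketchStatus.Active;

    // Assumption: an empty filter means "deliver every event type".
    public bool ShouldDeliverEventType(string eventType) =>
        _eventTypes.Count == 0 || _eventTypes.Contains(eventType);

    public bool IsTerminalEvent(string eventType) => _terminalEventTypes.Contains(eventType);

    // Hypothetical helper: the sequence never moves backwards.
    public void MarkDelivered(long sequence) =>
        LastDeliveredSequence = Math.Max(LastDeliveredSequence, sequence);

    public void Complete()
    {
        Status = SketchStatus.Completed;
        CompletedAt = DateTimeOffset.UtcNow;
    }

    public void Pause() { if (Status == SketchStatus.Active) Status = SketchStatus.Paused; }
    public void Resume() { if (Status == SketchStatus.Paused) Status = SketchStatus.Active; }
}
```

Private setters keep state transitions inside the model, which is consistent with the reflection-based hydration mentioned for the PostgreSQL store.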

Svrnty.CQRS.Events.Abstractions/Subscriptions/ISubscriptionStore.cs

Persistence interface:

  • CreateAsync(), GetByIdAsync(), GetBySubscriberIdAsync()
  • GetByCorrelationIdAsync(), GetByStatusAsync(), GetByConnectionIdAsync()
  • UpdateAsync(), DeleteAsync(), GetExpiredSubscriptionsAsync()

Svrnty.CQRS.Events.Abstractions/Subscriptions/ISubscriptionManager.cs

Lifecycle management:

  • CreateSubscriptionAsync() - Create with event filters and terminal events
  • MarkEventDeliveredAsync() - Track delivery progress
  • CompleteSubscriptionAsync(), CancelSubscriptionAsync()
  • PauseSubscriptionAsync(), ResumeSubscriptionAsync()
  • AttachConnectionAsync(), DetachConnectionAsync()
  • CleanupExpiredSubscriptionsAsync()

Svrnty.CQRS.Events.Abstractions/Subscriptions/IEventDeliveryService.cs

Event routing:

  • DeliverEventAsync() - Deliver to all matching subscriptions
  • CatchUpSubscriptionAsync() - Deliver missed events
  • GetPendingEventsAsync() - Query undelivered events

🔧 Phase 8.2: Subscription Manager

Files Created

Svrnty.CQRS.Events/Subscriptions/SubscriptionManager.cs (234 lines)

Default implementation:

  • Creates subscriptions with GUID IDs
  • Tracks delivery progress via LastDeliveredSequence
  • Implements full lifecycle (create, pause, resume, cancel, complete)
  • Connection management (attach/detach)
  • Automatic expiration cleanup

Svrnty.CQRS.Events/Subscriptions/InMemorySubscriptionStore.cs

Development storage using ConcurrentDictionary:

  • Thread-safe in-memory storage
  • Query by correlation ID, subscriber ID, status, connection ID
  • Expiration detection via DateTimeOffset comparison
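
A minimal sketch of the ConcurrentDictionary approach, assuming a simplified StoredSubscription record in place of PersistentSubscription. The synchronous method names here are hypothetical; the real store exposes the async members listed under ISubscriptionStore.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record standing in for PersistentSubscription.
public sealed record StoredSubscription(string Id, string CorrelationId, DateTimeOffset? ExpiresAt);

public sealed class InMemoryStoreSketch
{
    private readonly ConcurrentDictionary<string, StoredSubscription> _items =
        new ConcurrentDictionary<string, StoredSubscription>();

    // TryAdd is atomic, so two concurrent creates with the same ID cannot both succeed.
    public bool TryCreate(StoredSubscription subscription) =>
        _items.TryAdd(subscription.Id, subscription);

    public IReadOnlyList<StoredSubscription> GetByCorrelationId(string correlationId) =>
        _items.Values.Where(s => s.CorrelationId == correlationId).ToList();

    // A subscription without ExpiresAt never expires.
    public IReadOnlyList<StoredSubscription> GetExpired(DateTimeOffset now) =>
        _items.Values.Where(s => s.ExpiresAt.HasValue && s.ExpiresAt.Value <= now).ToList();
}
```

Queries scan every value, which is acceptable for development but is the reason the PostgreSQL store indexes correlation_id.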

📨 Phase 8.3: Event Delivery Service

Files Created

Svrnty.CQRS.Events/Subscriptions/EventDeliveryService.cs (194 lines)

Core delivery logic:

  • Matches events to subscriptions by correlation ID
  • Filters events by event type name
  • Respects delivery modes (Immediate, Batched, OnReconnect)
  • Detects and processes terminal events
  • Catch-up logic for missed events
  • Integration with IEventStreamStore.ReadStreamAsync()

Key Method:

public async Task<int> DeliverEventAsync(
    string correlationId,
    ICorrelatedEvent @event,
    CancellationToken cancellationToken)
{
    var deliveredCount = 0;
    // 1. Get all active subscriptions for the correlation ID
    // 2. Skip subscriptions whose event type filter excludes this event
    // 3. Check the subscription's delivery mode
    // 4. Deliver, increment deliveredCount, and complete the subscription on a terminal event
    return deliveredCount;
}
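
The outline above can be made concrete with a synchronous, self-contained sketch. The Sub and Mode types, the push callback, and the rule that only Immediate subscriptions are pushed right away are assumptions for illustration; the actual EventDeliveryService works against ISubscriptionStore and ICorrelatedEvent.

```csharp
using System;
using System.Collections.Generic;

public enum Mode { Immediate, Batched, OnReconnect }

// Hypothetical minimal subscription used only by this sketch.
public sealed class Sub
{
    public string CorrelationId = "";
    public HashSet<string> EventTypes = new HashSet<string>();
    public HashSet<string> TerminalEventTypes = new HashSet<string>();
    public Mode Mode = Mode.Immediate;
    public bool Completed;
}

public static class DeliverySketch
{
    // Returns how many subscriptions the event was delivered to.
    public static int Deliver(
        IEnumerable<Sub> subs, string correlationId, string eventType, Action<Sub, string> push)
    {
        var delivered = 0;
        foreach (var sub in subs)
        {
            // Match by correlation ID and skip finished subscriptions.
            if (sub.Completed || sub.CorrelationId != correlationId) continue;

            // An empty filter is treated as "all event types" (assumption).
            if (sub.EventTypes.Count > 0 && !sub.EventTypes.Contains(eventType)) continue;

            // Assumption: Batched and OnReconnect subscriptions wait for a later batch or catch-up.
            if (sub.Mode != Mode.Immediate) continue;

            push(sub, eventType);
            delivered++;

            // A terminal event completes the subscription after it has been delivered.
            if (sub.TerminalEventTypes.Contains(eventType)) sub.Completed = true;
        }
        return delivered;
    }
}
```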

⏱️ Phase 8.4: Catch-up Mechanism

Integrated into EventDeliveryService.CatchUpSubscriptionAsync():

  • Reads events from stream starting at LastDeliveredSequence + 1
  • Filters by event type preferences
  • Stops at terminal events
  • Updates sequence tracking
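
The four steps above can be sketched over an in-memory list, assuming the stream offset doubles as the sequence number (the workaround described under Known Issues). The CatchUp helper and its tuple return shape are hypothetical, and advancing the sequence past filtered-out events is a design assumption that avoids re-reading them on the next catch-up.

```csharp
using System;
using System.Collections.Generic;

public static class CatchUpSketch
{
    // stream[i] is the event type name at offset i (assumption: offset == sequence).
    // Returns the delivered offsets and the new LastDeliveredSequence.
    public static (List<long> Delivered, long LastSequence) CatchUp(
        IReadOnlyList<string> stream,
        long lastDeliveredSequence,
        ISet<string> eventTypes,
        ISet<string> terminalEventTypes)
    {
        var delivered = new List<long>();
        var last = lastDeliveredSequence;

        // Start right after the last delivered event.
        for (var offset = lastDeliveredSequence + 1; offset < stream.Count; offset++)
        {
            var type = stream[(int)offset];
            last = offset; // advance even when the event is filtered out

            if (eventTypes.Count > 0 && !eventTypes.Contains(type)) continue;

            delivered.Add(offset);

            // Nothing after a terminal event is delivered.
            if (terminalEventTypes.Contains(type)) break;
        }
        return (delivered, last);
    }
}
```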

🗄️ Phase 8.5: PostgreSQL Storage

Files Created

Svrnty.CQRS.Events.PostgreSQL/Migrations/009_PersistentSubscriptions.sql

CREATE TABLE persistent_subscriptions (
    id TEXT PRIMARY KEY,
    subscriber_id TEXT NOT NULL,
    correlation_id TEXT NOT NULL,
    event_types JSONB NOT NULL DEFAULT '[]'::jsonb,
    terminal_event_types JSONB NOT NULL DEFAULT '[]'::jsonb,
    delivery_mode INT NOT NULL DEFAULT 0,
    last_delivered_sequence BIGINT NOT NULL DEFAULT -1,
    status INT NOT NULL DEFAULT 0,
    connection_id TEXT NULL,
    ...
);

-- Indexes for hot paths
CREATE INDEX idx_persistent_subscriptions_correlation_id ON ...;
CREATE INDEX idx_persistent_subscriptions_correlation_active ON ... WHERE status = 0;

Svrnty.CQRS.Events.PostgreSQL/Subscriptions/PostgresSubscriptionStore.cs (330 lines)

Production storage:

  • JSONB for event type arrays
  • Indexed queries by correlation ID (hot path)
  • Reflection-based property setting for private setters
  • UPSERT pattern for updates
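
Reflection-based hydration of private setters might look like the sketch below. The Row class and the SetProperty helper are hypothetical; only the general technique (looking up the non-public set accessor and invoking it) is taken from the bullet above.

```csharp
using System;
using System.Reflection;

// Hypothetical model with a private setter, standing in for PersistentSubscription.
public sealed class Row
{
    public long LastDeliveredSequence { get; private set; } = -1;
}

public static class Hydration
{
    // Sets a property even when its setter is private, as a store might do when loading rows.
    public static void SetProperty(object target, string name, object value)
    {
        const BindingFlags flags = BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic;

        var property = target.GetType().GetProperty(name, flags)
            ?? throw new ArgumentException($"No property '{name}' on {target.GetType().Name}.");

        // nonPublic: true also returns private and internal set accessors.
        var setter = property.GetSetMethod(nonPublic: true)
            ?? throw new InvalidOperationException($"Property '{name}' has no setter.");

        setter.Invoke(target, new[] { value });
    }
}
```

This keeps the domain model's setters private at the cost of string-based property names, which are not checked at compile time.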

Service Registration

services.AddPostgresSubscriptionStore();

🔄 Phase 8.6: Enhanced SignalR Hub

Files Created

Svrnty.CQRS.Events.SignalR/PersistentSubscriptionHub.cs (370 lines)

WebSocket protocol implementation:

Client Methods:

  • CreateSubscription(request) - Create persistent subscription
  • AttachSubscription(subscriptionId) - Reconnect to existing subscription
  • DetachSubscription(subscriptionId) - Temporarily disconnect
  • CancelSubscription(subscriptionId) - Permanently cancel
  • CatchUp(subscriptionId) - Request missed events
  • PauseSubscription(subscriptionId), ResumeSubscription(subscriptionId)
  • GetMySubscriptions(subscriberId) - Query user's subscriptions

Server Events (pushed to clients):

  • SubscriptionCreated - Confirmation with subscription ID
  • EventReceived - New event delivered
  • SubscriptionCompleted - Terminal event received
  • CatchUpComplete - Catch-up finished
  • Error - Error occurred

Request Model:

public class CreateSubscriptionRequest
{
    public required string SubscriberId { get; init; }
    public required string CorrelationId { get; init; }
    public List<string>? EventTypes { get; init; }
    public List<string>? TerminalEventTypes { get; init; }
    public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate;
    public DateTimeOffset? ExpiresAt { get; init; }
    public string? DataSourceId { get; init; }
}

Updated Svrnty.CQRS.Events.SignalR/ServiceCollectionExtensions.cs

Added extension methods:

services.AddPersistentSubscriptionHub();
app.MapPersistentSubscriptionHub("/hubs/subscriptions");

⚙️ Phase 8.7: Command Integration

Files Created

Svrnty.CQRS.Events/Subscriptions/SubscriptionDeliveryHostedService.cs (154 lines)

Background service for automatic event delivery:

  • Polls every 500ms for new events
  • Groups subscriptions by correlation ID
  • Reads new events from streams
  • Filters by event type
  • Detects terminal events
  • Cleans up expired subscriptions

Processing Flow:

1. Get all Active subscriptions
2. Group by CorrelationId
3. For each correlation:
   a. Find min LastDeliveredSequence
   b. Read new events from stream
   c. For each subscription:
      - Filter by EventTypes
      - Check DeliveryMode
      - Mark as delivered
      - Check for TerminalEvent → Complete
4. Cleanup expired subscriptions
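
Steps 2 and 3a can be sketched with LINQ. Reading from the minimum LastDeliveredSequence plus one lets a single stream read cover every subscription in the group; each subscription then skips events at or below its own sequence. The ActiveSub record and ReadOffsets helper are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical projection of an active subscription.
public sealed record ActiveSub(string Id, string CorrelationId, long LastDeliveredSequence);

public static class PollPlanSketch
{
    // For each correlation ID, compute the offset from which one read serves all its subscriptions.
    public static Dictionary<string, long> ReadOffsets(IEnumerable<ActiveSub> subs) =>
        subs.GroupBy(s => s.CorrelationId)
            .ToDictionary(g => g.Key, g => g.Min(s => s.LastDeliveredSequence) + 1);
}
```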

Svrnty.CQRS.Events/Subscriptions/SubscriptionEventPublisherDecorator.cs

Decorator pattern for IEventPublisher:

  • Wraps event publishing
  • Triggers background delivery (fire-and-forget)
  • Non-blocking design
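
A minimal sketch of a fire-and-forget decorator. The IPublisherSketch interface is a hypothetical stand-in, since the real IEventPublisher signature is not shown in this summary, and the deliver delegate stands in for whatever triggers subscription delivery.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for IEventPublisher.
public interface IPublisherSketch
{
    Task PublishAsync(object @event, CancellationToken ct = default);
}

// Trivial inner publisher used to exercise the decorator.
public sealed class NoOpPublisher : IPublisherSketch
{
    public Task PublishAsync(object @event, CancellationToken ct = default) => Task.CompletedTask;
}

public sealed class DeliveryTriggeringPublisher : IPublisherSketch
{
    private readonly IPublisherSketch _inner;
    private readonly Func<object, Task> _deliver;

    public DeliveryTriggeringPublisher(IPublisherSketch inner, Func<object, Task> deliver)
    {
        _inner = inner;
        _deliver = deliver;
    }

    public async Task PublishAsync(object @event, CancellationToken ct = default)
    {
        // Publishing itself is awaited so the caller still sees publish failures.
        await _inner.PublishAsync(@event, ct);

        // Fire-and-forget: delivery runs on the thread pool and never blocks the caller.
        // Exceptions are caught here because nobody awaits the task.
        _ = Task.Run(async () =>
        {
            try { await _deliver(@event); }
            catch (Exception ex) { Console.Error.WriteLine($"Delivery failed: {ex.Message}"); }
        });
    }
}
```

Because the delivery task is not awaited, a delivery failure cannot fail the command; the polling hosted service remains the safety net for events that were not delivered this way.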

Service Registration

services.AddPersistentSubscriptions(
    useInMemoryStore: !usePostgreSQL,
    enableBackgroundDelivery: true);

🎯 Phase 8.8: Sample Implementation

Files Created

Svrnty.Sample/Invitations/InvitationEvents.cs

Event definitions:

  • InvitationSentEvent
  • InvitationAcceptedEvent (Terminal)
  • InvitationDeclinedEvent (Terminal)
  • InvitationReminderSentEvent

Svrnty.Sample/Invitations/InvitationCommands.cs

Commands:

  • SendInvitationCommand → Returns SendInvitationResult with SubscriptionId
  • AcceptInvitationCommand, DeclineInvitationCommand
  • SendInvitationReminderCommand

Svrnty.Sample/Invitations/InvitationCommandHandlers.cs (220 lines)

Handlers demonstrating integration:

SendInvitationCommandHandler:

1. Generate invitationId and correlationId = $"invitation-{invitationId}"
2. Publish InvitationSentEvent with correlation
3. Optionally create PersistentSubscription:
   - EventTypes: [InvitationAccepted, InvitationDeclined, InvitationReminder]
   - TerminalEventTypes: [InvitationAccepted, InvitationDeclined]
   - Delivery: Immediate
   - Expires: 30 days
4. Return {InvitationId, CorrelationId, SubscriptionId}

Svrnty.Sample/Invitations/InvitationEndpoints.cs

HTTP API:

POST /api/invitations/send
POST /api/invitations/{id}/accept
POST /api/invitations/{id}/decline
POST /api/invitations/{id}/reminder
GET  /api/invitations/subscriptions/{subscriptionId}
POST /api/invitations/subscriptions/{subscriptionId}/cancel
GET  /api/invitations/subscriptions/{subscriptionId}/pending

Program.cs Integration

Added:

// Services
builder.Services.AddSignalR();
builder.Services.AddPersistentSubscriptions(useInMemoryStore: !usePostgreSQL);
if (usePostgreSQL) {
    builder.Services.AddPostgresSubscriptionStore();
}
builder.Services.AddPersistentSubscriptionHub();

// Command handlers
builder.Services.AddCommand<SendInvitationCommand, SendInvitationResult, SendInvitationCommandHandler>();
builder.Services.AddCommand<AcceptInvitationCommand, AcceptInvitationCommandHandler>();
...

// Endpoints
app.MapPersistentSubscriptionHub("/hubs/subscriptions");
app.MapInvitationEndpoints();

🚧 Known Issues

1. Naming Conflicts (Blocking Compilation)

Two Phase 8 interfaces share their names with interfaces from earlier phases, which produces ambiguous type references:

Conflicts:

  • IEventDeliveryService exists in both:

    • Svrnty.CQRS.Events.Abstractions (from earlier phase)
    • Svrnty.CQRS.Events.Abstractions.Subscriptions (Phase 8)
  • ISubscriptionStore exists in both:

    • Svrnty.CQRS.Events.Abstractions (from earlier phase)
    • Svrnty.CQRS.Events.Abstractions.Subscriptions (Phase 8)

Resolution Options:

  1. Rename Phase 8 interfaces (Recommended):

    • IEventDeliveryService → ISubscriptionEventDeliveryService
    • ISubscriptionStore → IPersistentSubscriptionStore
  2. Use namespace aliases in implementation files:

    using SubscriptionDelivery = Svrnty.CQRS.Events.Abstractions.Subscriptions.IEventDeliveryService;
    
  3. Consolidate interfaces if they serve similar purposes

2. EventData vs ICorrelatedEvent

The implementation uses ICorrelatedEvent from the existing event system, which does not expose sequence numbers. Subscriptions track progress via LastDeliveredSequence, so that value still has to be mapped to the stream offsets used by IEventStreamStore.ReadStreamAsync().

Current Workaround:

  • Using stream offset as implicit sequence
  • LastDeliveredSequence maps to fromOffset parameter

Better Approach:

  • Wrap ICorrelatedEvent with metadata (offset, sequence)
  • Or extend event store to return enriched event data
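
The first option could be sketched as a small envelope type. EventEnvelope, OffsetMapping and both helper methods are hypothetical; the sketch assumes events are returned in stream order starting at fromOffset.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical envelope pairing an event with its stream offset.
public sealed record EventEnvelope<TEvent>(long Offset, TEvent Event);

public static class OffsetMapping
{
    // A LastDeliveredSequence of -1 means "nothing delivered", so the first read starts at offset 0.
    public static long ToFromOffset(long lastDeliveredSequence) => lastDeliveredSequence + 1;

    // Attaches consecutive offsets to events read from the stream, starting at fromOffset.
    public static IEnumerable<EventEnvelope<TEvent>> Wrap<TEvent>(IEnumerable<TEvent> events, long fromOffset) =>
        events.Select((e, i) => new EventEnvelope<TEvent>(fromOffset + i, e));
}
```

With an explicit Offset on each delivered event, the value stored in LastDeliveredSequence no longer depends on counting events inside the delivery loop.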

3. Event Type Name Resolution

Event types are currently identified by @event.GetType().Name, which assumes:

  • Event types are uniquely named
  • No namespace collisions
  • No assembly versioning issues

Better Approach:

  • Use fully qualified type names
  • Or event type registry with string keys
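
A registry with string keys might look like the following sketch. EventTypeRegistry and its members are hypothetical, and the fallback to the full type name combines both suggestions above; neither exists in the current code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry mapping stable string keys to event types.
public sealed class EventTypeRegistry
{
    private readonly Dictionary<string, Type> _byKey = new Dictionary<string, Type>();
    private readonly Dictionary<Type, string> _byType = new Dictionary<Type, string>();

    public void Register<TEvent>(string key)
    {
        // Reject a key that already points at a different type.
        if (_byKey.TryGetValue(key, out var existing) && existing != typeof(TEvent))
            throw new InvalidOperationException(
                $"Key '{key}' is already registered for {existing.FullName}.");

        _byKey[key] = typeof(TEvent);
        _byType[typeof(TEvent)] = key;
    }

    // Falls back to the fully qualified type name when nothing is registered.
    public string GetKey(object @event)
    {
        var type = @event.GetType();
        return _byType.TryGetValue(type, out var key) ? key : type.FullName ?? type.Name;
    }
}
```

Registered keys stay stable when a class is renamed or moved to another namespace, which matters because the keys are persisted in the event_types and terminal_event_types columns.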

📦 Package Structure

Svrnty.CQRS.Events.Abstractions/
  └── Subscriptions/
      ├── PersistentSubscription.cs (domain model)
      ├── SubscriptionTypes.cs (enums)
      ├── ISubscriptionStore.cs
      ├── ISubscriptionManager.cs
      └── IEventDeliveryService.cs

Svrnty.CQRS.Events/
  └── Subscriptions/
      ├── SubscriptionManager.cs
      ├── InMemorySubscriptionStore.cs
      ├── EventDeliveryService.cs
      ├── SubscriptionDeliveryHostedService.cs
      ├── SubscriptionEventPublisherDecorator.cs
      └── ServiceCollectionExtensions.cs

Svrnty.CQRS.Events.PostgreSQL/
  ├── Migrations/
  │   └── 009_PersistentSubscriptions.sql
  └── Subscriptions/
      ├── PostgresSubscriptionStore.cs
      └── ServiceCollectionExtensions.cs

Svrnty.CQRS.Events.SignalR/
  ├── PersistentSubscriptionHub.cs
  └── ServiceCollectionExtensions.cs (updated)

Svrnty.Sample/
  └── Invitations/
      ├── InvitationEvents.cs
      ├── InvitationCommands.cs
      ├── InvitationCommandHandlers.cs
      └── InvitationEndpoints.cs

🎓 Key Design Patterns

1. Persistent Subscription Pattern

  • Subscriptions survive disconnections
  • Sequence-based catch-up
  • Terminal event completion

2. Correlation-Based Filtering

  • Events grouped by correlation ID (command execution)
  • Selective event type delivery
  • Terminal events auto-complete

3. Multiple Delivery Modes

  • Immediate: Push as events occur
  • Batched: Periodic batch delivery
  • OnReconnect: Hold events and deliver them only when the client reconnects

4. Background Processing

  • Hosted service polls for new events
  • Automatic delivery to active subscriptions
  • Automatic expiration cleanup

5. Repository + Manager Pattern

  • ISubscriptionStore = data access
  • ISubscriptionManager = business logic + lifecycle

📝 Next Steps to Complete Phase 8

  1. Resolve Naming Conflicts (HIGH PRIORITY):

    • Rename interfaces to avoid ambiguity
    • Update all references
    • Ensure clean compilation
  2. Fix Event Sequence Tracking:

    • Map stream offsets to subscription sequences
    • Ensure accurate catch-up logic
  3. Complete Integration Testing:

    • Test invitation workflow end-to-end
    • Verify terminal event completion
    • Test catch-up after disconnect
  4. Implement Batched Delivery Mode:

    • Batched mode is currently a placeholder
    • Add batch aggregation logic
    • Add batch delivery timer
  5. Add SignalR Push Notifications:

    • Delivery currently happens only in the background service
    • Events still need to be pushed via SignalR while the client is connected
    • Integrate IHubContext for server-initiated pushes
  6. Testing Scenarios:

    # 1. Send invitation
    curl -X POST http://localhost:6001/api/invitations/send \
      -H "Content-Type: application/json" \
      -d '{
        "inviterUserId": "user1",
        "inviteeEmail": "user2@example.com",
        "message": "Join our team!",
        "createSubscription": true
      }'
    # Returns: {invitationId, correlationId, subscriptionId}
    
    # 2. Accept invitation (triggers terminal event)
    curl -X POST http://localhost:6001/api/invitations/{id}/accept \
      -H "Content-Type: application/json" \
      -d '{"acceptedByUserId": "user2"}'
    
    # 3. Check subscription status (should be Completed)
    curl http://localhost:6001/api/invitations/subscriptions/{subscriptionId}
    

🎯 Phase 8 Summary

Created: 15+ new files, 2000+ lines of code

Core Capabilities:

  • Persistent subscriptions with correlation filtering
  • Selective event type delivery
  • Terminal event auto-completion
  • Catch-up mechanism for missed events
  • Multiple delivery modes
  • PostgreSQL persistent storage
  • SignalR WebSocket protocol
  • Background delivery service
  • Sample invitation workflow
  • ⚠️ Naming conflicts need resolution
  • ⚠️ SignalR push integration incomplete

Architecture:

  • Clean separation: Abstractions → Implementation → Storage → Transport
  • Supports in-memory (dev) and PostgreSQL (prod)
  • Background hosted service for automatic delivery
  • SignalR for real-time client communication
  • Event-driven with terminal event support

Future Enhancements:

  • Subscription groups (multiple subscribers per subscription)
  • Subscription templates (pre-configured filters)
  • Delivery guarantees (at-least-once, exactly-once)
  • Dead letter queue for failed deliveries
  • Subscription analytics and monitoring
  • GraphQL subscription integration