dotnet-cqrs/PHASE_8_SUMMARY.md

536 lines
17 KiB
Markdown

# Phase 8: Bidirectional Communication & Persistent Subscriptions - Implementation Summary
**Status**: 🚧 **IN PROGRESS** - Core implementation complete, naming conflicts need resolution
Phase 8 implements persistent, correlation-based event subscriptions that survive client disconnection and support selective event filtering with catch-up delivery.
---
## Overview
Phase 8 extends Phase 7.2's basic SignalR streaming with a comprehensive persistent subscription system based on the design in `bidirectional-communication-design.md`.
### Key Differences from Phase 7.2
**Phase 7.2 (Basic SignalR):**
- Stream-based subscriptions (subscribe to entire stream)
- Client must stay connected to receive events
- Offline = missed events
- All-or-nothing event delivery
**Phase 8 (Persistent Subscriptions):**
- Correlation-based subscriptions (subscribe to specific command executions)
- Subscriptions persist across disconnections
- Catch-up mechanism delivers missed events
- Selective event filtering (choose which event types to receive)
- Terminal events auto-complete subscriptions
- Multiple delivery modes
---
## 📋 Phase 8.1: Subscription Abstractions
### Files Created
#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/SubscriptionTypes.cs`
```csharp
public enum SubscriptionStatus
{
Active, Completed, Expired, Cancelled, Paused
}
public enum DeliveryMode
{
Immediate, // Push immediately
Batched, // Batch and deliver periodically
OnReconnect // Only deliver when client reconnects
}
```
#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/PersistentSubscription.cs` (173 lines)
Domain model with:
- **Properties**: Id, SubscriberId, CorrelationId, EventTypes (filter), TerminalEventTypes
- **Tracking**: LastDeliveredSequence, CreatedAt, ExpiresAt, CompletedAt
- **Lifecycle**: `Complete()`, `Cancel()`, `Expire()`, `Pause()`, `Resume()`
- **Filtering**: `ShouldDeliverEventType()`, `IsTerminalEvent()`, `CanReceiveEvents`
#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/ISubscriptionStore.cs`
Persistence interface:
- `CreateAsync()`, `GetByIdAsync()`, `GetBySubscriberIdAsync()`
- `GetByCorrelationIdAsync()`, `GetByStatusAsync()`, `GetByConnectionIdAsync()`
- `UpdateAsync()`, `DeleteAsync()`, `GetExpiredSubscriptionsAsync()`
#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/ISubscriptionManager.cs`
Lifecycle management:
- `CreateSubscriptionAsync()` - Create with event filters and terminal events
- `MarkEventDeliveredAsync()` - Track delivery progress
- `CompleteSubscriptionAsync()`, `CancelSubscriptionAsync()`
- `PauseSubscriptionAsync()`, `ResumeSubscriptionAsync()`
- `AttachConnectionAsync()`, `DetachConnectionAsync()`
- `CleanupExpiredSubscriptionsAsync()`
#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/IEventDeliveryService.cs`
Event routing:
- `DeliverEventAsync()` - Deliver to all matching subscriptions
- `CatchUpSubscriptionAsync()` - Deliver missed events
- `GetPendingEventsAsync()` - Query undelivered events
---
## 🔧 Phase 8.2: Subscription Manager
### Files Created
#### `Svrnty.CQRS.Events/Subscriptions/SubscriptionManager.cs` (234 lines)
Default implementation:
- Creates subscriptions with GUID IDs
- Tracks delivery progress via LastDeliveredSequence
- Implements full lifecycle (create, pause, resume, cancel, complete)
- Connection management (attach/detach)
- Automatic expiration cleanup
#### `Svrnty.CQRS.Events/Subscriptions/InMemorySubscriptionStore.cs`
Development storage using `ConcurrentDictionary`:
- Thread-safe in-memory storage
- Query by correlation ID, subscriber ID, status, connection ID
- Expiration detection via `DateTimeOffset` comparison
---
## 📨 Phase 8.3: Event Delivery Service
### Files Created
#### `Svrnty.CQRS.Events/Subscriptions/EventDeliveryService.cs` (194 lines)
Core delivery logic:
- Matches events to subscriptions by correlation ID
- Filters events by event type name
- Respects delivery modes (Immediate, Batched, OnReconnect)
- Detects and processes terminal events
- Catch-up logic for missed events
- Integration with `IEventStreamStore.ReadStreamAsync()`
**Key Method**:
```csharp
public async Task<int> DeliverEventAsync(
string correlationId,
ICorrelatedEvent @event,
CancellationToken cancellationToken)
{
// Get all active subscriptions for correlation
// Filter by event type
// Check delivery mode
// Detect terminal events → Complete subscription
return deliveredCount;
}
```
---
## ⏱️ Phase 8.4: Catch-up Mechanism
Integrated into `EventDeliveryService.CatchUpSubscriptionAsync()`:
- Reads events from stream starting at `LastDeliveredSequence + 1`
- Filters by event type preferences
- Stops at terminal events
- Updates sequence tracking
---
## 🗄️ Phase 8.5: PostgreSQL Storage
### Files Created
#### `Svrnty.CQRS.Events.PostgreSQL/Migrations/009_PersistentSubscriptions.sql`
```sql
CREATE TABLE persistent_subscriptions (
id TEXT PRIMARY KEY,
subscriber_id TEXT NOT NULL,
correlation_id TEXT NOT NULL,
event_types JSONB NOT NULL DEFAULT '[]'::jsonb,
terminal_event_types JSONB NOT NULL DEFAULT '[]'::jsonb,
delivery_mode INT NOT NULL DEFAULT 0,
last_delivered_sequence BIGINT NOT NULL DEFAULT -1,
status INT NOT NULL DEFAULT 0,
connection_id TEXT NULL,
...
);
-- Indexes for hot paths
CREATE INDEX idx_persistent_subscriptions_correlation_id ON ...;
CREATE INDEX idx_persistent_subscriptions_correlation_active ON ... WHERE status = 0;
```
#### `Svrnty.CQRS.Events.PostgreSQL/Subscriptions/PostgresSubscriptionStore.cs` (330 lines)
Production storage:
- JSONB for event type arrays
- Indexed queries by correlation ID (hot path)
- Reflection-based property setting for private setters
- UPSERT pattern for updates
#### Service Registration
```csharp
services.AddPostgresSubscriptionStore();
```
---
## 🔄 Phase 8.6: Enhanced SignalR Hub
### Files Created
#### `Svrnty.CQRS.Events.SignalR/PersistentSubscriptionHub.cs` (370 lines)
WebSocket protocol implementation:
**Client Methods**:
- `CreateSubscription(request)` - Create persistent subscription
- `AttachSubscription(subscriptionId)` - Reconnect to existing subscription
- `DetachSubscription(subscriptionId)` - Temporarily disconnect
- `CancelSubscription(subscriptionId)` - Permanently cancel
- `CatchUp(subscriptionId)` - Request missed events
- `PauseSubscription(subscriptionId)`, `ResumeSubscription(subscriptionId)`
- `GetMySubscriptions(subscriberId)` - Query user's subscriptions
**Server Events** (pushed to clients):
- `SubscriptionCreated` - Confirmation with subscription ID
- `EventReceived` - New event delivered
- `SubscriptionCompleted` - Terminal event received
- `CatchUpComplete` - Catch-up finished
- `Error` - Error occurred
**Request Model**:
```csharp
public class CreateSubscriptionRequest
{
public required string SubscriberId { get; init; }
public required string CorrelationId { get; init; }
public List<string>? EventTypes { get; init; }
public List<string>? TerminalEventTypes { get; init; }
public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate;
public DateTimeOffset? ExpiresAt { get; init; }
public string? DataSourceId { get; init; }
}
```
#### Updated `Svrnty.CQRS.Events.SignalR/ServiceCollectionExtensions.cs`
Added extension methods:
```csharp
services.AddPersistentSubscriptionHub();
app.MapPersistentSubscriptionHub("/hubs/subscriptions");
```
---
## ⚙️ Phase 8.7: Command Integration
### Files Created
#### `Svrnty.CQRS.Events/Subscriptions/SubscriptionDeliveryHostedService.cs` (154 lines)
Background service for automatic event delivery:
- Polls every 500ms for new events
- Groups subscriptions by correlation ID
- Reads new events from streams
- Filters by event type
- Detects terminal events
- Cleans up expired subscriptions
**Processing Flow**:
```
1. Get all Active subscriptions
2. Group by CorrelationId
3. For each correlation:
a. Find min LastDeliveredSequence
b. Read new events from stream
c. For each subscription:
- Filter by EventTypes
- Check DeliveryMode
- Mark as delivered
- Check for TerminalEvent → Complete
4. Cleanup expired subscriptions
```
#### `Svrnty.CQRS.Events/Subscriptions/SubscriptionEventPublisherDecorator.cs`
Decorator pattern for `IEventPublisher`:
- Wraps event publishing
- Triggers background delivery (fire-and-forget)
- Non-blocking design
#### Service Registration
```csharp
services.AddPersistentSubscriptions(
useInMemoryStore: !usePostgreSQL,
enableBackgroundDelivery: true);
```
---
## 🎯 Phase 8.8: Sample Implementation
### Files Created
#### `Svrnty.Sample/Invitations/InvitationEvents.cs`
Event definitions:
- `InvitationSentEvent`
- `InvitationAcceptedEvent` (Terminal)
- `InvitationDeclinedEvent` (Terminal)
- `InvitationReminderSentEvent`
#### `Svrnty.Sample/Invitations/InvitationCommands.cs`
Commands:
- `SendInvitationCommand` → Returns `SendInvitationResult` with SubscriptionId
- `AcceptInvitationCommand`, `DeclineInvitationCommand`
- `SendInvitationReminderCommand`
#### `Svrnty.Sample/Invitations/InvitationCommandHandlers.cs` (220 lines)
Handlers demonstrating integration:
**SendInvitationCommandHandler**:
```csharp
1. Generate invitationId and correlationId = $"invitation-{invitationId}"
2. Publish InvitationSentEvent with correlation
3. Optionally create PersistentSubscription:
- EventTypes: [InvitationAccepted, InvitationDeclined, InvitationReminder]
- TerminalEventTypes: [InvitationAccepted, InvitationDeclined]
- Delivery: Immediate
- Expires: 30 days
4. Return {InvitationId, CorrelationId, SubscriptionId}
```
#### `Svrnty.Sample/Invitations/InvitationEndpoints.cs`
HTTP API:
```
POST /api/invitations/send
POST /api/invitations/{id}/accept
POST /api/invitations/{id}/decline
POST /api/invitations/{id}/reminder
GET /api/invitations/subscriptions/{subscriptionId}
POST /api/invitations/subscriptions/{subscriptionId}/cancel
GET /api/invitations/subscriptions/{subscriptionId}/pending
```
#### `Program.cs` Integration
Added:
```csharp
// Services
builder.Services.AddSignalR();
builder.Services.AddPersistentSubscriptions(useInMemoryStore: !usePostgreSQL);
if (usePostgreSQL) {
builder.Services.AddPostgresSubscriptionStore();
}
builder.Services.AddPersistentSubscriptionHub();
// Command handlers
builder.Services.AddCommand<SendInvitationCommand, SendInvitationResult, SendInvitationCommandHandler>();
builder.Services.AddCommand<AcceptInvitationCommand, AcceptInvitationCommandHandler>();
...
// Endpoints
app.MapPersistentSubscriptionHub("/hubs/subscriptions");
app.MapInvitationEndpoints();
```
---
## 🚧 Known Issues
### 1. Naming Conflicts (Blocking Compilation)
There are ambiguous type references with existing interfaces from earlier phases:
**Conflicts:**
- `IEventDeliveryService` exists in both:
- `Svrnty.CQRS.Events.Abstractions` (from earlier phase)
- `Svrnty.CQRS.Events.Abstractions.Subscriptions` (Phase 8)
- `ISubscriptionStore` exists in both:
- `Svrnty.CQRS.Events.Abstractions` (from earlier phase)
- `Svrnty.CQRS.Events.Abstractions.Subscriptions` (Phase 8)
**Resolution Options:**
1. **Rename Phase 8 interfaces** (Recommended):
- `IEventDeliveryService``ISubscriptionEventDeliveryService`
- `ISubscriptionStore``IPersistentSubscriptionStore`
2. **Use namespace aliases** in implementation files:
```csharp
using SubscriptionDelivery = Svrnty.CQRS.Events.Abstractions.Subscriptions.IEventDeliveryService;
```
3. **Consolidate interfaces** if they serve similar purposes
### 2. EventData vs ICorrelatedEvent
The implementation uses `ICorrelatedEvent` from the existing event system, but doesn't have access to sequence numbers directly. The current design tracks sequences via `LastDeliveredSequence` on subscriptions, but this needs to be mapped to stream offsets from `IEventStreamStore.ReadStreamAsync()`.
**Current Workaround**:
- Using stream offset as implicit sequence
- `LastDeliveredSequence` maps to `fromOffset` parameter
**Better Approach**:
- Wrap `ICorrelatedEvent` with metadata (offset, sequence)
- Or extend event store to return enriched event data
### 3. Event Type Name Resolution
Currently using `@event.GetType().Name` which assumes:
- Event types are uniquely named
- No namespace collisions
- No assembly versioning issues
**Better Approach**:
- Use fully qualified type names
- Or event type registry with string keys
---
## 📦 Package Structure
```
Svrnty.CQRS.Events.Abstractions/
└── Subscriptions/
├── PersistentSubscription.cs (domain model)
├── SubscriptionTypes.cs (enums)
├── ISubscriptionStore.cs
├── ISubscriptionManager.cs
└── IEventDeliveryService.cs
Svrnty.CQRS.Events/
└── Subscriptions/
├── SubscriptionManager.cs
├── InMemorySubscriptionStore.cs
├── EventDeliveryService.cs
├── SubscriptionDeliveryHostedService.cs
├── SubscriptionEventPublisherDecorator.cs
└── ServiceCollectionExtensions.cs
Svrnty.CQRS.Events.PostgreSQL/
├── Migrations/
│ └── 009_PersistentSubscriptions.sql
└── Subscriptions/
├── PostgresSubscriptionStore.cs
└── ServiceCollectionExtensions.cs
Svrnty.CQRS.Events.SignalR/
├── PersistentSubscriptionHub.cs
└── ServiceCollectionExtensions.cs (updated)
Svrnty.Sample/
└── Invitations/
├── InvitationEvents.cs
├── InvitationCommands.cs
├── InvitationCommandHandlers.cs
└── InvitationEndpoints.cs
```
---
## 🎓 Key Design Patterns
### 1. Persistent Subscription Pattern
- Subscriptions survive disconnections
- Sequence-based catch-up
- Terminal event completion
### 2. Correlation-Based Filtering
- Events grouped by correlation ID (command execution)
- Selective event type delivery
- Terminal events auto-complete
### 3. Multiple Delivery Modes
- **Immediate**: Push as events occur
- **Batched**: Periodic batch delivery
- **OnReconnect**: Only deliver on client request
### 4. Background Processing
- Hosted service polls for new events
- Automatic delivery to active subscriptions
- Automatic expiration cleanup
### 5. Repository + Manager Pattern
- `ISubscriptionStore` = data access
- `ISubscriptionManager` = business logic + lifecycle
---
## 📝 Next Steps to Complete Phase 8
1. **Resolve Naming Conflicts** (HIGH PRIORITY):
- Rename interfaces to avoid ambiguity
- Update all references
- Ensure clean compilation
2. **Fix Event Sequence Tracking**:
- Map stream offsets to subscription sequences
- Ensure accurate catch-up logic
3. **Complete Integration Testing**:
- Test invitation workflow end-to-end
- Verify terminal event completion
- Test catch-up after disconnect
4. **Implement Batched Delivery Mode**:
- Currently Batched mode is placeholder
- Add batch aggregation logic
- Add batch delivery timer
5. **Add SignalR Push Notifications**:
- Currently delivery happens in background
- Need to push events via SignalR when client is connected
- Integrate `IHubContext` for server-initiated pushes
6. **Testing Scenarios**:
```bash
# 1. Send invitation
curl -X POST http://localhost:6001/api/invitations/send \
-H "Content-Type: application/json" \
-d '{
"inviterUserId": "user1",
"inviteeEmail": "user2@example.com",
"message": "Join our team!",
"createSubscription": true
}'
# Returns: {invitationId, correlationId, subscriptionId}
# 2. Accept invitation (triggers terminal event)
curl -X POST http://localhost:6001/api/invitations/{id}/accept \
-H "Content-Type: application/json" \
-d '{"acceptedByUserId": "user2"}'
# 3. Check subscription status (should be Completed)
curl http://localhost:6001/api/invitations/subscriptions/{subscriptionId}
```
---
## 🎯 Phase 8 Summary
**Created**: 15+ new files, 2000+ lines of code
**Core Capabilities**:
- ✅ Persistent subscriptions with correlation filtering
- ✅ Selective event type delivery
- ✅ Terminal event auto-completion
- ✅ Catch-up mechanism for missed events
- ✅ Multiple delivery modes
- ✅ PostgreSQL persistent storage
- ✅ SignalR WebSocket protocol
- ✅ Background delivery service
- ✅ Sample invitation workflow
- ⚠️ Naming conflicts need resolution
- ⚠️ SignalR push integration incomplete
**Architecture**:
- Clean separation: Abstractions → Implementation → Storage → Transport
- Supports in-memory (dev) and PostgreSQL (prod)
- Background hosted service for automatic delivery
- SignalR for real-time client communication
- Event-driven with terminal event support
**Future Enhancements**:
- Subscription groups (multiple subscribers per subscription)
- Subscription templates (pre-configured filters)
- Delivery guarantees (at-least-once, exactly-once)
- Dead letter queue for failed deliveries
- Subscription analytics and monitoring
- GraphQL subscription integration