# Phase 8: Bidirectional Communication & Persistent Subscriptions - Implementation Summary

**Status**: 🚧 **IN PROGRESS** - Core implementation complete, naming conflicts need resolution

Phase 8 implements persistent, correlation-based event subscriptions that survive client disconnection and support selective event filtering with catch-up delivery.

---

## Overview

Phase 8 extends Phase 7.2's basic SignalR streaming with a comprehensive persistent subscription system based on the design in `bidirectional-communication-design.md`.

### Key Differences from Phase 7.2

**Phase 7.2 (Basic SignalR):**
- Stream-based subscriptions (subscribe to entire stream)
- Client must stay connected to receive events
- Offline = missed events
- All-or-nothing event delivery

**Phase 8 (Persistent Subscriptions):**
- Correlation-based subscriptions (subscribe to specific command executions)
- Subscriptions persist across disconnections
- Catch-up mechanism delivers missed events
- Selective event filtering (choose which event types to receive)
- Terminal events auto-complete subscriptions
- Multiple delivery modes

---

## 📋 Phase 8.1: Subscription Abstractions

### Files Created

#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/SubscriptionTypes.cs`
```csharp
public enum SubscriptionStatus
{
    Active, Completed, Expired, Cancelled, Paused
}

public enum DeliveryMode
{
    Immediate,   // Push immediately
    Batched,     // Batch and deliver periodically
    OnReconnect  // Only deliver when client reconnects
}
```

#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/PersistentSubscription.cs` (173 lines)
Domain model with:
- **Properties**: Id, SubscriberId, CorrelationId, EventTypes (filter), TerminalEventTypes
- **Tracking**: LastDeliveredSequence, CreatedAt, ExpiresAt, CompletedAt
- **Lifecycle**: `Complete()`, `Cancel()`, `Expire()`, `Pause()`, `Resume()`
- **Filtering**: `ShouldDeliverEventType()`, `IsTerminalEvent()`, `CanReceiveEvents`

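The filtering members listed above could be sketched roughly as follows. This is a minimal illustration, not the actual `PersistentSubscription` source: the names `SubscriptionFilterSketch` and `SketchStatus`, the constructor shape, and the rule that an empty `EventTypes` filter matches every event type are all assumptions made for the example.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-in for SubscriptionStatus, kept separate to avoid clashes.
public enum SketchStatus { Active, Completed, Expired, Cancelled, Paused }

// Hypothetical, trimmed-down stand-in for PersistentSubscription.
public sealed class SubscriptionFilterSketch
{
    private readonly HashSet<string> _eventTypes;
    private readonly HashSet<string> _terminalEventTypes;

    public SubscriptionFilterSketch(
        IEnumerable<string>? eventTypes,
        IEnumerable<string>? terminalEventTypes,
        DateTimeOffset? expiresAt = null)
    {
        _eventTypes = new HashSet<string>(
            eventTypes ?? Array.Empty<string>(), StringComparer.Ordinal);
        _terminalEventTypes = new HashSet<string>(
            terminalEventTypes ?? Array.Empty<string>(), StringComparer.Ordinal);
        ExpiresAt = expiresAt;
    }

    public SketchStatus Status { get; private set; } = SketchStatus.Active;
    public DateTimeOffset? ExpiresAt { get; }

    // Active and not past its expiry time.
    public bool CanReceiveEvents =>
        Status == SketchStatus.Active &&
        (ExpiresAt is null || ExpiresAt.Value > DateTimeOffset.UtcNow);

    // Assumption: an empty filter means "deliver every event type".
    public bool ShouldDeliverEventType(string eventTypeName) =>
        _eventTypes.Count == 0 || _eventTypes.Contains(eventTypeName);

    public bool IsTerminalEvent(string eventTypeName) =>
        _terminalEventTypes.Contains(eventTypeName);

    public void Complete() => Status = SketchStatus.Completed;
}
```

A delivery loop built on this sketch would check `CanReceiveEvents` first, then `ShouldDeliverEventType()`, and call `Complete()` once `IsTerminalEvent()` returns true.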
#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/ISubscriptionStore.cs`
Persistence interface:
- `CreateAsync()`, `GetByIdAsync()`, `GetBySubscriberIdAsync()`
- `GetByCorrelationIdAsync()`, `GetByStatusAsync()`, `GetByConnectionIdAsync()`
- `UpdateAsync()`, `DeleteAsync()`, `GetExpiredSubscriptionsAsync()`

#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/ISubscriptionManager.cs`
Lifecycle management:
- `CreateSubscriptionAsync()` - Create with event filters and terminal events
- `MarkEventDeliveredAsync()` - Track delivery progress
- `CompleteSubscriptionAsync()`, `CancelSubscriptionAsync()`
- `PauseSubscriptionAsync()`, `ResumeSubscriptionAsync()`
- `AttachConnectionAsync()`, `DetachConnectionAsync()`
- `CleanupExpiredSubscriptionsAsync()`

#### `Svrnty.CQRS.Events.Abstractions/Subscriptions/IEventDeliveryService.cs`
Event routing:
- `DeliverEventAsync()` - Deliver to all matching subscriptions
- `CatchUpSubscriptionAsync()` - Deliver missed events
- `GetPendingEventsAsync()` - Query undelivered events

---

## 🔧 Phase 8.2: Subscription Manager

### Files Created

#### `Svrnty.CQRS.Events/Subscriptions/SubscriptionManager.cs` (234 lines)
Default implementation:
- Creates subscriptions with GUID IDs
- Tracks delivery progress via LastDeliveredSequence
- Implements full lifecycle (create, pause, resume, cancel, complete)
- Connection management (attach/detach)
- Automatic expiration cleanup

#### `Svrnty.CQRS.Events/Subscriptions/InMemorySubscriptionStore.cs`
Development storage using `ConcurrentDictionary`:
- Thread-safe in-memory storage
- Query by correlation ID, subscriber ID, status, connection ID
- Expiration detection via `DateTimeOffset` comparison

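A `ConcurrentDictionary`-backed store of the kind described above could look roughly like this. The sketch is synchronous for brevity (the real `ISubscriptionStore` methods are async), and `StoredSubscription`, `InMemoryStoreSketch`, and their members are hypothetical names, not the actual implementation.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record standing in for PersistentSubscription.
public sealed record StoredSubscription(
    string Id, string CorrelationId, DateTimeOffset? ExpiresAt);

// Hypothetical in-memory store; thread safety comes from ConcurrentDictionary.
public sealed class InMemoryStoreSketch
{
    private readonly ConcurrentDictionary<string, StoredSubscription> _items = new();

    // Returns false when a subscription with the same Id already exists.
    public bool TryCreate(StoredSubscription subscription) =>
        _items.TryAdd(subscription.Id, subscription);

    // Values returns a snapshot, so enumeration is safe under concurrent writes.
    public IReadOnlyList<StoredSubscription> GetByCorrelationId(string correlationId) =>
        _items.Values.Where(s => s.CorrelationId == correlationId).ToList();

    // Expiration detection via DateTimeOffset comparison against a supplied clock value.
    public IReadOnlyList<StoredSubscription> GetExpired(DateTimeOffset now) =>
        _items.Values
            .Where(s => s.ExpiresAt.HasValue && s.ExpiresAt.Value <= now)
            .ToList();
}
```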

---

## 📨 Phase 8.3: Event Delivery Service

### Files Created

#### `Svrnty.CQRS.Events/Subscriptions/EventDeliveryService.cs` (194 lines)
Core delivery logic:
- Matches events to subscriptions by correlation ID
- Filters events by event type name
- Respects delivery modes (Immediate, Batched, OnReconnect)
- Detects and processes terminal events
- Catch-up logic for missed events
- Integration with `IEventStreamStore.ReadStreamAsync()`

**Key Method**:
```csharp
public async Task<int> DeliverEventAsync(
    string correlationId,
    ICorrelatedEvent @event,
    CancellationToken cancellationToken)
{
    // Get all active subscriptions for correlation
    // Filter by event type
    // Check delivery mode
    // Detect terminal events → Complete subscription
    return deliveredCount;
}
```

---

## ⏱️ Phase 8.4: Catch-up Mechanism

Integrated into `EventDeliveryService.CatchUpSubscriptionAsync()`:
- Reads events from stream starting at `LastDeliveredSequence + 1`
- Filters by event type preferences
- Stops at terminal events
- Updates sequence tracking

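The four catch-up steps above can be sketched as a pure function over an ordered event list. This is an illustration under stated assumptions, not the body of `CatchUpSubscriptionAsync()`: `SequencedEvent` and `CatchUpSketch` are hypothetical, an empty filter is assumed to match every type, and the sequence is assumed to advance past filtered-out events so they are not re-read on the next catch-up.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical event shape: a sequence number plus the event type name.
public sealed record SequencedEvent(long Sequence, string TypeName);

public static class CatchUpSketch
{
    public static (List<SequencedEvent> Delivered, long LastSequence, bool Completed) CatchUp(
        IEnumerable<SequencedEvent> stream,   // assumed ordered by Sequence
        long lastDeliveredSequence,
        ISet<string> eventTypes,
        ISet<string> terminalEventTypes)
    {
        var delivered = new List<SequencedEvent>();
        var last = lastDeliveredSequence;

        foreach (var e in stream)
        {
            // Start reading at LastDeliveredSequence + 1.
            if (e.Sequence <= lastDeliveredSequence) continue;

            // Assumption: tracking advances even when the event is filtered out.
            last = e.Sequence;

            // Filter by event type preferences (empty filter = everything).
            if (eventTypes.Count == 0 || eventTypes.Contains(e.TypeName))
                delivered.Add(e);

            // Stop at the first terminal event.
            if (terminalEventTypes.Contains(e.TypeName))
                return (delivered, last, true);
        }

        return (delivered, last, false);
    }
}
```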

---

## 🗄️ Phase 8.5: PostgreSQL Storage

### Files Created

#### `Svrnty.CQRS.Events.PostgreSQL/Migrations/009_PersistentSubscriptions.sql`
```sql
CREATE TABLE persistent_subscriptions (
    id TEXT PRIMARY KEY,
    subscriber_id TEXT NOT NULL,
    correlation_id TEXT NOT NULL,
    event_types JSONB NOT NULL DEFAULT '[]'::jsonb,
    terminal_event_types JSONB NOT NULL DEFAULT '[]'::jsonb,
    delivery_mode INT NOT NULL DEFAULT 0,
    last_delivered_sequence BIGINT NOT NULL DEFAULT -1,
    status INT NOT NULL DEFAULT 0,
    connection_id TEXT NULL,
    ...
);

-- Indexes for hot paths
CREATE INDEX idx_persistent_subscriptions_correlation_id ON ...;
CREATE INDEX idx_persistent_subscriptions_correlation_active ON ... WHERE status = 0;
```

#### `Svrnty.CQRS.Events.PostgreSQL/Subscriptions/PostgresSubscriptionStore.cs` (330 lines)
Production storage:
- JSONB for event type arrays
- Indexed queries by correlation ID (hot path)
- Reflection-based property setting for private setters
- UPSERT pattern for updates

#### Service Registration
```csharp
services.AddPostgresSubscriptionStore();
```

---

## 🔄 Phase 8.6: Enhanced SignalR Hub

### Files Created

#### `Svrnty.CQRS.Events.SignalR/PersistentSubscriptionHub.cs` (370 lines)
WebSocket protocol implementation:

**Client Methods**:
- `CreateSubscription(request)` - Create persistent subscription
- `AttachSubscription(subscriptionId)` - Reconnect to existing subscription
- `DetachSubscription(subscriptionId)` - Temporarily disconnect
- `CancelSubscription(subscriptionId)` - Permanently cancel
- `CatchUp(subscriptionId)` - Request missed events
- `PauseSubscription(subscriptionId)`, `ResumeSubscription(subscriptionId)`
- `GetMySubscriptions(subscriberId)` - Query user's subscriptions

**Server Events** (pushed to clients):
- `SubscriptionCreated` - Confirmation with subscription ID
- `EventReceived` - New event delivered
- `SubscriptionCompleted` - Terminal event received
- `CatchUpComplete` - Catch-up finished
- `Error` - Error occurred

**Request Model**:
```csharp
public class CreateSubscriptionRequest
{
    public required string SubscriberId { get; init; }
    public required string CorrelationId { get; init; }
    public List<string>? EventTypes { get; init; }
    public List<string>? TerminalEventTypes { get; init; }
    public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate;
    public DateTimeOffset? ExpiresAt { get; init; }
    public string? DataSourceId { get; init; }
}
```

#### Updated `Svrnty.CQRS.Events.SignalR/ServiceCollectionExtensions.cs`
Added extension methods:
```csharp
services.AddPersistentSubscriptionHub();
app.MapPersistentSubscriptionHub("/hubs/subscriptions");
```

---

## ⚙️ Phase 8.7: Command Integration

### Files Created

#### `Svrnty.CQRS.Events/Subscriptions/SubscriptionDeliveryHostedService.cs` (154 lines)
Background service for automatic event delivery:
- Polls every 500ms for new events
- Groups subscriptions by correlation ID
- Reads new events from streams
- Filters by event type
- Detects terminal events
- Cleans up expired subscriptions

**Processing Flow**:
```
1. Get all Active subscriptions
2. Group by CorrelationId
3. For each correlation:
   a. Find min LastDeliveredSequence
   b. Read new events from stream
   c. For each subscription:
      - Filter by EventTypes
      - Check DeliveryMode
      - Mark as delivered
      - Check for TerminalEvent → Complete
4. Cleanup expired subscriptions
```

#### `Svrnty.CQRS.Events/Subscriptions/SubscriptionEventPublisherDecorator.cs`
Decorator pattern for `IEventPublisher`:
- Wraps event publishing
- Triggers background delivery (fire-and-forget)
- Non-blocking design

#### Service Registration
```csharp
services.AddPersistentSubscriptions(
    useInMemoryStore: !usePostgreSQL,
    enableBackgroundDelivery: true);
```

---

## 🎯 Phase 8.8: Sample Implementation

### Files Created

#### `Svrnty.Sample/Invitations/InvitationEvents.cs`
Event definitions:
- `InvitationSentEvent`
- `InvitationAcceptedEvent` (Terminal)
- `InvitationDeclinedEvent` (Terminal)
- `InvitationReminderSentEvent`

#### `Svrnty.Sample/Invitations/InvitationCommands.cs`
Commands:
- `SendInvitationCommand` → Returns `SendInvitationResult` with SubscriptionId
- `AcceptInvitationCommand`, `DeclineInvitationCommand`
- `SendInvitationReminderCommand`

#### `Svrnty.Sample/Invitations/InvitationCommandHandlers.cs` (220 lines)
Handlers demonstrating integration:

**SendInvitationCommandHandler**:
```
1. Generate invitationId and correlationId = $"invitation-{invitationId}"
2. Publish InvitationSentEvent with correlation
3. Optionally create PersistentSubscription:
   - EventTypes: [InvitationAccepted, InvitationDeclined, InvitationReminder]
   - TerminalEventTypes: [InvitationAccepted, InvitationDeclined]
   - Delivery: Immediate
   - Expires: 30 days
4. Return {InvitationId, CorrelationId, SubscriptionId}
```

#### `Svrnty.Sample/Invitations/InvitationEndpoints.cs`
HTTP API:
```
POST /api/invitations/send
POST /api/invitations/{id}/accept
POST /api/invitations/{id}/decline
POST /api/invitations/{id}/reminder
GET /api/invitations/subscriptions/{subscriptionId}
POST /api/invitations/subscriptions/{subscriptionId}/cancel
GET /api/invitations/subscriptions/{subscriptionId}/pending
```

#### `Program.cs` Integration
Added:
```csharp
// Services
builder.Services.AddSignalR();
builder.Services.AddPersistentSubscriptions(useInMemoryStore: !usePostgreSQL);
if (usePostgreSQL) {
    builder.Services.AddPostgresSubscriptionStore();
}
builder.Services.AddPersistentSubscriptionHub();

// Command handlers
builder.Services.AddCommand<SendInvitationCommand, SendInvitationResult, SendInvitationCommandHandler>();
builder.Services.AddCommand<AcceptInvitationCommand, AcceptInvitationCommandHandler>();
...

// Endpoints
app.MapPersistentSubscriptionHub("/hubs/subscriptions");
app.MapInvitationEndpoints();
```

---

## 🚧 Known Issues

### 1. Naming Conflicts (Blocking Compilation)

Two Phase 8 interfaces share their names with interfaces from earlier phases, which produces ambiguous type references:

**Conflicts:**
- `IEventDeliveryService` exists in both:
  - `Svrnty.CQRS.Events.Abstractions` (from earlier phase)
  - `Svrnty.CQRS.Events.Abstractions.Subscriptions` (Phase 8)

- `ISubscriptionStore` exists in both:
  - `Svrnty.CQRS.Events.Abstractions` (from earlier phase)
  - `Svrnty.CQRS.Events.Abstractions.Subscriptions` (Phase 8)

**Resolution Options:**
1. **Rename Phase 8 interfaces** (Recommended):
   - `IEventDeliveryService` → `ISubscriptionEventDeliveryService`
   - `ISubscriptionStore` → `IPersistentSubscriptionStore`

2. **Use namespace aliases** in implementation files:
   ```csharp
   using SubscriptionDelivery = Svrnty.CQRS.Events.Abstractions.Subscriptions.IEventDeliveryService;
   ```

3. **Consolidate interfaces** if they serve similar purposes

### 2. EventData vs ICorrelatedEvent

The implementation uses `ICorrelatedEvent` from the existing event system, which does not expose sequence numbers directly. The current design tracks progress via `LastDeliveredSequence` on each subscription, but that value still needs to be mapped to the stream offsets used by `IEventStreamStore.ReadStreamAsync()`.

**Current Workaround**:
- Using stream offset as implicit sequence
- `LastDeliveredSequence` maps to `fromOffset` parameter

**Better Approach**:
- Wrap `ICorrelatedEvent` with metadata (offset, sequence)
- Or extend event store to return enriched event data

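The first "Better Approach" option could take the shape of a small envelope type that pairs each event with its stream offset. This is only a sketch: `EventEnvelope<TEvent>` and `EventEnvelopeSketch.Wrap` are hypothetical names, and the code assumes the store returns events in order and contiguously from `fromOffset`, which the source does not confirm.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical envelope carrying the stream offset alongside the event.
public sealed record EventEnvelope<TEvent>(long Offset, TEvent Event);

public static class EventEnvelopeSketch
{
    // Pairs each event with its offset, assuming ordered, gap-free reads
    // that begin at fromOffset.
    public static List<EventEnvelope<TEvent>> Wrap<TEvent>(
        IEnumerable<TEvent> events, long fromOffset)
    {
        var result = new List<EventEnvelope<TEvent>>();
        var offset = fromOffset;
        foreach (var e in events)
        {
            result.Add(new EventEnvelope<TEvent>(offset, e));
            offset++;
        }
        return result;
    }
}
```

With an envelope like this, a subscription could record `envelope.Offset` as its `LastDeliveredSequence` after each delivery instead of inferring the position from the read call.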
### 3. Event Type Name Resolution

Event types are currently resolved with `@event.GetType().Name`, which assumes:
- Event types are uniquely named
- No namespace collisions
- No assembly versioning issues

**Better Approach**:
- Use fully qualified type names
- Or event type registry with string keys

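The registry option could be sketched as a two-way map between CLR types and stable string keys. `EventTypeRegistrySketch` and its members are hypothetical, and the fallback to the fully qualified type name for unregistered events is an assumption chosen for the example.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical registry mapping event CLR types to stable string keys.
public sealed class EventTypeRegistrySketch
{
    private readonly Dictionary<Type, string> _keysByType = new();
    private readonly Dictionary<string, Type> _typesByKey = new(StringComparer.Ordinal);

    public void Register<TEvent>(string key)
    {
        // Reject a key that is already bound to a different type.
        if (_typesByKey.TryGetValue(key, out var existing) && existing != typeof(TEvent))
            throw new InvalidOperationException(
                $"Key '{key}' is already registered for {existing.FullName}.");

        _typesByKey[key] = typeof(TEvent);
        _keysByType[typeof(TEvent)] = key;
    }

    // Assumption: unregistered events fall back to the fully qualified name.
    public string GetKey(object @event)
    {
        var type = @event.GetType();
        return _keysByType.TryGetValue(type, out var key)
            ? key
            : type.FullName ?? type.Name;
    }
}
```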

---

## 📦 Package Structure

```
Svrnty.CQRS.Events.Abstractions/
└── Subscriptions/
    ├── PersistentSubscription.cs (domain model)
    ├── SubscriptionTypes.cs (enums)
    ├── ISubscriptionStore.cs
    ├── ISubscriptionManager.cs
    └── IEventDeliveryService.cs

Svrnty.CQRS.Events/
└── Subscriptions/
    ├── SubscriptionManager.cs
    ├── InMemorySubscriptionStore.cs
    ├── EventDeliveryService.cs
    ├── SubscriptionDeliveryHostedService.cs
    ├── SubscriptionEventPublisherDecorator.cs
    └── ServiceCollectionExtensions.cs

Svrnty.CQRS.Events.PostgreSQL/
├── Migrations/
│   └── 009_PersistentSubscriptions.sql
└── Subscriptions/
    ├── PostgresSubscriptionStore.cs
    └── ServiceCollectionExtensions.cs

Svrnty.CQRS.Events.SignalR/
├── PersistentSubscriptionHub.cs
└── ServiceCollectionExtensions.cs (updated)

Svrnty.Sample/
└── Invitations/
    ├── InvitationEvents.cs
    ├── InvitationCommands.cs
    ├── InvitationCommandHandlers.cs
    └── InvitationEndpoints.cs
```

---

## 🎓 Key Design Patterns

### 1. Persistent Subscription Pattern
- Subscriptions survive disconnections
- Sequence-based catch-up
- Terminal event completion

### 2. Correlation-Based Filtering
- Events grouped by correlation ID (command execution)
- Selective event type delivery
- Terminal events auto-complete

### 3. Multiple Delivery Modes
- **Immediate**: Push as events occur
- **Batched**: Periodic batch delivery
- **OnReconnect**: Deliver only when the client reconnects

### 4. Background Processing
- Hosted service polls for new events
- Automatic delivery to active subscriptions
- Automatic expiration cleanup

### 5. Repository + Manager Pattern
- `ISubscriptionStore` = data access
- `ISubscriptionManager` = business logic + lifecycle

---

## 📝 Next Steps to Complete Phase 8

1. **Resolve Naming Conflicts** (HIGH PRIORITY):
   - Rename interfaces to avoid ambiguity
   - Update all references
   - Ensure clean compilation

2. **Fix Event Sequence Tracking**:
   - Map stream offsets to subscription sequences
   - Ensure accurate catch-up logic

3. **Complete Integration Testing**:
   - Test invitation workflow end-to-end
   - Verify terminal event completion
   - Test catch-up after disconnect

4. **Implement Batched Delivery Mode**:
   - Batched mode is currently a placeholder
   - Add batch aggregation logic
   - Add batch delivery timer

5. **Add SignalR Push Notifications**:
   - Delivery currently happens in the background
   - Events need to be pushed via SignalR when the client is connected
   - Integrate `IHubContext` for server-initiated pushes

6. **Testing Scenarios**:
   ```bash
   # 1. Send invitation
   curl -X POST http://localhost:6001/api/invitations/send \
     -H "Content-Type: application/json" \
     -d '{
       "inviterUserId": "user1",
       "inviteeEmail": "user2@example.com",
       "message": "Join our team!",
       "createSubscription": true
     }'
   # Returns: {invitationId, correlationId, subscriptionId}

   # 2. Accept invitation (triggers terminal event)
   curl -X POST http://localhost:6001/api/invitations/{id}/accept \
     -H "Content-Type: application/json" \
     -d '{"acceptedByUserId": "user2"}'

   # 3. Check subscription status (should be Completed)
   curl http://localhost:6001/api/invitations/subscriptions/{subscriptionId}
   ```

---

## 🎯 Phase 8 Summary

**Created**: 15+ new files, 2000+ lines of code

**Core Capabilities**:
- ✅ Persistent subscriptions with correlation filtering
- ✅ Selective event type delivery
- ✅ Terminal event auto-completion
- ✅ Catch-up mechanism for missed events
- ✅ Multiple delivery modes (Batched is currently a placeholder)
- ✅ PostgreSQL persistent storage
- ✅ SignalR WebSocket protocol
- ✅ Background delivery service
- ✅ Sample invitation workflow
- ⚠️ Naming conflicts need resolution
- ⚠️ SignalR push integration incomplete

**Architecture**:
- Clean separation: Abstractions → Implementation → Storage → Transport
- Supports in-memory (dev) and PostgreSQL (prod)
- Background hosted service for automatic delivery
- SignalR for real-time client communication
- Event-driven with terminal event support

**Future Enhancements**:
- Subscription groups (multiple subscribers per subscription)
- Subscription templates (pre-configured filters)
- Delivery guarantees (at-least-once, exactly-once)
- Dead letter queue for failed deliveries
- Subscription analytics and monitoring
- GraphQL subscription integration