# Phase 7: Advanced Features - Implementation Summary

**Status**: ✅ **COMPLETED**

Phase 7 adds three advanced features to the Svrnty.CQRS event streaming framework: Event Sourcing Projections, SignalR Integration, and Saga Orchestration.

---
## 📊 Phase 7.1: Event Sourcing Projections
### Overview
Build materialized read models from event streams using the catch-up subscription pattern with checkpointing and automatic recovery.
### Key Components
#### Abstractions (`Svrnty.CQRS.Events.Abstractions/Projections/`)
- **`IProjection<TEvent>`** - Typed projection interface for specific event types
- **`IDynamicProjection`** - Dynamic projection handling multiple event types via pattern matching
- **`IResettableProjection`** - Optional interface for projections that support rebuilding
- **`IProjectionCheckpointStore`** - Persistent checkpoint storage interface
- **`IProjectionEngine`** - Core projection execution engine interface
- **`IProjectionRegistry`** - Thread-safe projection definition registry
- **`ProjectionOptions`** - Configuration: batch size, retry policy, polling interval
#### Core Implementation (`Svrnty.CQRS.Events/Projections/`)
- **`ProjectionEngine`** (~300 lines)
  - Catch-up subscription pattern
  - Exponential backoff retry: `delay = baseDelay × 2^attempt`
  - Batch processing with configurable size
  - Per-event or per-batch checkpointing
  - Continuous polling for new events
- **`ProjectionRegistry`** - Thread-safe ConcurrentDictionary-based registry
- **`InMemoryProjectionCheckpointStore`** - Development storage
- **`ProjectionHostedService`** - Auto-start background service
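The retry formula `delay = baseDelay × 2^attempt` listed above can be sketched as a small helper. This is an illustration only: `RetryBackoff`, `DelayFor`, and the optional `maxDelay` cap are hypothetical names and are not taken from the `ProjectionEngine` source, which this summary does not reproduce.

```csharp
using System;

// Hypothetical helper, not part of Svrnty.CQRS: computes the wait before a retry.
public static class RetryBackoff
{
    // attempt is zero-based: attempt 0 waits baseDelay, attempt 1 waits 2x, attempt 2 waits 4x.
    // maxDelay is an assumed safety cap; the summary does not say whether the engine has one.
    public static TimeSpan DelayFor(TimeSpan baseDelay, int attempt, TimeSpan? maxDelay = null)
    {
        if (attempt < 0) throw new ArgumentOutOfRangeException(nameof(attempt));

        // Clamp the exponent so the multiplier stays bounded for typical base delays.
        var factor = Math.Pow(2, Math.Min(attempt, 30));
        var delay = TimeSpan.FromMilliseconds(baseDelay.TotalMilliseconds * factor);

        return maxDelay.HasValue && delay > maxDelay.Value ? maxDelay.Value : delay;
    }
}
```

With a 100 ms base delay and `MaxRetries = 3`, the waits under this sketch would be 100 ms, 200 ms, and 400 ms.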
#### PostgreSQL Storage (`Svrnty.CQRS.Events.PostgreSQL/`)
- **Migration**: `007_ProjectionCheckpoints.sql`
  - Composite primary key: `(projection_name, stream_name)`
  - Tracks: last offset, events processed, error state
- **`PostgresProjectionCheckpointStore`**
  - UPSERT-based atomic updates: `INSERT ... ON CONFLICT ... DO UPDATE`
  - Thread-safe for concurrent projection instances
#### Sample Implementation (`Svrnty.Sample/Projections/`)
- **`UserStatistics.cs`** - Read model tracking user additions/removals
- **`UserStatisticsProjection.cs`** - Dynamic projection processing UserWorkflow events
- **Endpoint**: `GET /api/projections/user-statistics`
### Configuration Example
```csharp
services.AddProjections(useInMemoryCheckpoints: !usePostgreSQL);

if (usePostgreSQL)
{
    services.AddPostgresProjectionCheckpointStore();
}

services.AddDynamicProjection<UserStatisticsProjection>(
    projectionName: "user-statistics",
    streamName: "UserWorkflow",
    configure: options =>
    {
        options.BatchSize = 50;
        options.AutoStart = true;
        options.MaxRetries = 3;
        options.CheckpointPerEvent = false;
        options.PollingInterval = TimeSpan.FromSeconds(1);
    });
```
### Key Features
- ✅ Idempotent event processing
- ✅ Automatic checkpoint recovery on restart
- ✅ Exponential backoff retry on failures
- ✅ Projection rebuilding support
- ✅ Both typed and dynamic projections
- ✅ In-memory (dev) and PostgreSQL (prod) storage
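Checkpoint recovery and per-batch checkpointing can be illustrated with an in-memory sketch. The types below (`CheckpointStore`, `CatchUp`) are hypothetical stand-ins, not the framework's `IProjectionCheckpointStore` or `ProjectionEngine`; the stream is modeled as a plain list whose index is the offset, and `-1` means "nothing processed yet", mirroring the `DEFAULT -1` in the checkpoint migration. Because the checkpoint is saved only after a whole batch, a crash mid-batch would replay the start of that batch on restart, which is why idempotent handlers matter.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a checkpoint store.
public sealed class CheckpointStore
{
    private readonly Dictionary<(string Projection, string Stream), long> _offsets = new();

    // Returns -1 when no checkpoint exists for the (projection, stream) pair.
    public long Get(string projection, string stream) =>
        _offsets.TryGetValue((projection, stream), out var offset) ? offset : -1;

    public void Save(string projection, string stream, long offset) =>
        _offsets[(projection, stream)] = offset;
}

public static class CatchUp
{
    // Applies every event after the stored checkpoint and saves the checkpoint once per batch.
    // Returns the number of events applied during this run.
    public static int Run(
        IReadOnlyList<string> stream, CheckpointStore store,
        string projection, string streamName, int batchSize, Action<string> apply)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var applied = 0;
        var next = store.Get(projection, streamName) + 1;

        while (next < stream.Count)
        {
            var end = Math.Min(next + batchSize, stream.Count);
            for (var offset = next; offset < end; offset++)
            {
                apply(stream[(int)offset]);
                applied++;
            }

            // Per-batch checkpoint: records the offset of the last event in the batch.
            store.Save(projection, streamName, end - 1);
            next = end;
        }

        return applied;
    }
}
```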
---
## 🔄 Phase 7.2: SignalR Integration
### Overview
Real-time event streaming to browser clients via WebSockets using ASP.NET Core SignalR.
### Key Components
#### Package: `Svrnty.CQRS.Events.SignalR`
- **`EventStreamHub.cs`** (~220 lines)
  - Per-connection subscription tracking with `ConcurrentDictionary<connectionId, subscriptions>`
  - Independent Task per subscription with CancellationToken
  - Automatic cleanup on disconnect
  - Batch reading with configurable offset
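The per-connection bookkeeping described above might look roughly like the following. This is a sketch under assumptions: `SubscriptionTracker` and its method names are hypothetical, the actual `EventStreamHub` code is not shown in this summary, and the sketch leaves out SignalR itself so that it only demonstrates the dictionary and cancellation handling.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical tracker illustrating the bookkeeping only.
public sealed class SubscriptionTracker
{
    // connectionId -> (streamName -> cancellation source for that subscription's task)
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, CancellationTokenSource>> _byConnection = new();

    // Returns a token for the new subscription, or null if the connection already follows the stream.
    public CancellationToken? Subscribe(string connectionId, string streamName)
    {
        var subscriptions = _byConnection.GetOrAdd(
            connectionId, _ => new ConcurrentDictionary<string, CancellationTokenSource>());

        var cts = new CancellationTokenSource();
        if (subscriptions.TryAdd(streamName, cts)) return cts.Token;

        cts.Dispose();
        return null;
    }

    // Cancels one subscription; returns false if it did not exist.
    public bool Unsubscribe(string connectionId, string streamName)
    {
        if (_byConnection.TryGetValue(connectionId, out var subscriptions) &&
            subscriptions.TryRemove(streamName, out var cts))
        {
            cts.Cancel();
            cts.Dispose();
            return true;
        }
        return false;
    }

    // Intended for a disconnect handler: cancels every subscription owned by the connection
    // and returns how many were cancelled.
    public int RemoveConnection(string connectionId)
    {
        if (!_byConnection.TryRemove(connectionId, out var subscriptions)) return 0;

        var cancelled = 0;
        foreach (var cts in subscriptions.Values)
        {
            cts.Cancel();
            cts.Dispose();
            cancelled++;
        }
        return cancelled;
    }
}
```

In a hub, the token returned by `Subscribe` would be handed to the background task that reads the stream, so cancelling it stops that task. One limitation of this sketch: a `Subscribe` call racing with `RemoveConnection` for the same connection could leave a subscription behind, so a real implementation would need to guard against that.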
#### Hub Methods
```csharp
// Client-callable methods
Task SubscribeToStream(string streamName, long? startFromOffset = null)
Task UnsubscribeFromStream(string streamName)
// Server-to-client callbacks
await Clients.Caller.SendAsync("SubscriptionConfirmed", streamName);
await Clients.Client(connectionId).SendAsync("EventReceived", streamName, eventData);
await Clients.Caller.SendAsync("Error", errorMessage);
```
### Registration Example
```csharp
// Server-side
services.AddEventStreamHub();
app.MapEventStreamHub("/hubs/events");
```

```javascript
// Client-side (JavaScript)
const connection = new signalR.HubConnectionBuilder()
    .withUrl("/hubs/events")
    .build();

connection.on("EventReceived", (streamName, event) => {
    console.log("Received event:", event);
});

await connection.start();
await connection.invoke("SubscribeToStream", "UserWorkflow", 0);
```
### Key Features
- ✅ WebSocket-based real-time streaming
- ✅ Multiple concurrent subscriptions per connection
- ✅ Start from specific offset (catch-up + real-time)
- ✅ Automatic connection cleanup
- ✅ Thread-safe subscription management
---
## 🔀 Phase 7.3: Saga Orchestration
### Overview
Long-running business processes with compensation pattern (not two-phase commit). Steps execute sequentially; on failure, completed steps compensate in reverse order.
### Key Components
#### Abstractions (`Svrnty.CQRS.Events.Abstractions/Sagas/`)
**Core Interfaces:**
```csharp
public interface ISaga
{
    string SagaId { get; }
    string CorrelationId { get; }
    string SagaName { get; }
}

public interface ISagaStep
{
    string StepName { get; }
    Task ExecuteAsync(ISagaContext context, CancellationToken cancellationToken);
    Task CompensateAsync(ISagaContext context, CancellationToken cancellationToken);
}

public interface ISagaContext
{
    ISaga Saga { get; }
    SagaState State { get; }
    ISagaData Data { get; }
    T? Get<T>(string key);
    void Set<T>(string key, T value);
}
```
**State Machine:**
```
NotStarted → Running → Completed
Compensating → Compensated
Failed
```
**`ISagaOrchestrator`** - Lifecycle management:
- `StartSagaAsync<TSaga>()` - Initialize and execute
- `ResumeSagaAsync()` - Resume paused saga
- `CancelSagaAsync()` - Trigger compensation
- `GetStatusAsync()` - Query saga state

**`ISagaStateStore`** - Persistent state storage:
- `SaveStateAsync()` - UPSERT saga state
- `LoadStateAsync()` - Restore saga state
- `GetByCorrelationIdAsync()` - Multi-saga workflows
- `GetByStateAsync()` - Query by state
#### Core Implementation (`Svrnty.CQRS.Events/Sagas/`)
**`SagaOrchestrator.cs`** - Core execution engine:
- Fire-and-forget execution pattern
- Sequential step execution with checkpointing
- Reverse-order compensation on failure
- Saga instance reconstruction from snapshots
- Uses ActivatorUtilities for DI-enabled saga construction

**`SagaRegistry.cs`** - Thread-safe definition storage:
- ConcurrentDictionary-based
- Type-to-definition mapping
- Name-to-definition mapping
- Name-to-type mapping

**`SagaData.cs`** - In-memory data storage:
- Dictionary-based key-value store
- Type conversion support via `Convert.ChangeType`
- Serializable for persistence

**`InMemorySagaStateStore.cs`** - Development storage:
- ConcurrentDictionary for state
- Correlation ID indexing
- State-based queries
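The `Convert.ChangeType` support mentioned for `SagaData.cs` is useful when a value comes back from persistence as a different but convertible type, for example a number stored as `long` and read as `int`. A minimal sketch of that idea, assuming a dictionary-backed store; `SketchSagaData` is a hypothetical name and the real `SagaData` implementation may differ:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical sketch of a key-value store with type conversion on read.
public sealed class SketchSagaData
{
    private readonly Dictionary<string, object?> _values = new();

    public void Set<T>(string key, T value) => _values[key] = value;

    // Returns default(T) when the key is missing or holds null. A stored value of a
    // different type is converted with Convert.ChangeType, which only works for
    // IConvertible values (numbers, strings, bool, DateTime and similar).
    public T? Get<T>(string key)
    {
        if (!_values.TryGetValue(key, out var raw) || raw is null) return default;
        if (raw is T typed) return typed;

        // Convert.ChangeType cannot target Nullable<T> directly, so unwrap it first.
        var target = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T);
        return (T)Convert.ChangeType(raw, target, CultureInfo.InvariantCulture);
    }
}
```

For instance, after `data.Set("quantity", 2L)`, a call to `data.Get<int>("quantity")` converts the stored `long` to `int`. Values that are not convertible to the requested type cause `Convert.ChangeType` to throw, which this sketch does not handle.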
#### PostgreSQL Storage (`Svrnty.CQRS.Events.PostgreSQL/`)
**Migration**: `008_SagaState.sql`
```sql
CREATE TABLE saga_states (
    saga_id TEXT PRIMARY KEY,
    correlation_id TEXT NOT NULL,
    saga_name TEXT NOT NULL,
    state INT NOT NULL,
    current_step INT NOT NULL,
    total_steps INT NOT NULL,
    completed_steps JSONB NOT NULL,
    data JSONB NOT NULL,
    ...
);

CREATE INDEX idx_saga_states_correlation_id ON saga_states (correlation_id);
CREATE INDEX idx_saga_states_state ON saga_states (state);
```
**`PostgresSagaStateStore.cs`**:
- JSONB storage for steps and data
- UPSERT for atomic state updates
- Indexed queries by correlation ID and state
#### Sample Implementation (`Svrnty.Sample/Sagas/`)
**`OrderFulfillmentSaga`** - 3-step workflow:
1. **Reserve Inventory**
   - Execute: Reserve items in inventory system
   - Compensate: Release reservation
2. **Authorize Payment**
   - Execute: Get payment authorization from payment gateway
   - Compensate: Void authorization
   - **Failure Point**: Simulated via `FailPayment` flag for testing
3. **Ship Order**
   - Execute: Create shipment and get tracking number
   - Compensate: Cancel shipment

**Test Scenario: Payment Failure**
```
[Start] → Reserve Inventory ✅ → Authorize Payment ❌
Compensating...
Void Payment (skipped - never completed)
Release Inventory ✅
[Compensated]
```
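The payment-failure scenario above follows from one rule: only steps that completed are compensated, newest first. A simplified sketch of that control flow is shown here. `SketchStep` and `SagaRunner` are hypothetical; the framework's `ISagaStep` also receives an `ISagaContext`, and the real `SagaOrchestrator` additionally persists state between steps, which this sketch omits.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified step: a name plus execute and compensate delegates.
public sealed record SketchStep(
    string Name,
    Func<CancellationToken, Task> Execute,
    Func<CancellationToken, Task> Compensate);

public static class SagaRunner
{
    // Runs steps in order. On the first failure, compensates the steps that completed,
    // newest first, and returns false. The failing step itself is not compensated.
    public static async Task<bool> RunAsync(IReadOnlyList<SketchStep> steps, CancellationToken ct = default)
    {
        var completed = new Stack<SketchStep>();

        foreach (var step in steps)
        {
            try
            {
                await step.Execute(ct);
                completed.Push(step);
            }
            catch (Exception)
            {
                while (completed.Count > 0)
                {
                    // Assumption: compensation ignores ct so a cancelled saga still unwinds.
                    await completed.Pop().Compensate(CancellationToken.None);
                }
                return false;
            }
        }

        return true;
    }
}
```

Using a stack makes the reverse order fall out naturally. A production orchestrator would also need a policy for compensation steps that themselves fail, which this sketch simply lets propagate.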
### HTTP Endpoints
```
POST /api/sagas/order-fulfillment/start
Body: {
  "orderId": "ORD-123",
  "items": [...],
  "amount": 99.99,
  "shippingAddress": "123 Main St",
  "simulatePaymentFailure": false
}
Response: { "sagaId": "guid", "correlationId": "ORD-123" }

GET /api/sagas/{sagaId}/status
Response: {
  "sagaId": "guid",
  "state": "Running",
  "progress": "2/3",
  "currentStep": 2,
  "totalSteps": 3,
  "data": {...}
}

POST /api/sagas/{sagaId}/cancel
Response: { "message": "Saga cancellation initiated" }
```
### Registration Example
```csharp
// Infrastructure
services.AddSagaOrchestration(useInMemoryStateStore: !usePostgreSQL);

if (usePostgreSQL)
{
    services.AddPostgresSagaStateStore();
}

// Saga definition
services.AddSaga<OrderFulfillmentSaga>(
    sagaName: "order-fulfillment",
    configure: definition =>
    {
        definition.AddStep(
            stepName: "ReserveInventory",
            execute: OrderFulfillmentSteps.ReserveInventoryAsync,
            compensate: OrderFulfillmentSteps.CompensateReserveInventoryAsync);

        definition.AddStep(
            stepName: "AuthorizePayment",
            execute: OrderFulfillmentSteps.AuthorizePaymentAsync,
            compensate: OrderFulfillmentSteps.CompensateAuthorizePaymentAsync);

        definition.AddStep(
            stepName: "ShipOrder",
            execute: OrderFulfillmentSteps.ShipOrderAsync,
            compensate: OrderFulfillmentSteps.CompensateShipOrderAsync);
    });
```
### Key Features
- ✅ Compensation pattern (not 2PC)
- ✅ Sequential execution with checkpointing
- ✅ Reverse-order compensation
- ✅ Persistent state across restarts
- ✅ Correlation ID for multi-saga workflows
- ✅ State-based queries
- ✅ Pause/resume support
- ✅ Manual cancellation
- ✅ In-memory (dev) and PostgreSQL (prod) storage
---
## 📦 New Packages Created
### Svrnty.CQRS.Events.SignalR
- **Purpose**: Real-time event streaming to browser clients
- **Dependencies**: ASP.NET Core SignalR, Svrnty.CQRS.Events.Abstractions
- **Key Type**: `EventStreamHub`
---
## 🗄️ Database Migrations
### 007_ProjectionCheckpoints.sql
```sql
CREATE TABLE projection_checkpoints (
    projection_name TEXT NOT NULL,
    stream_name TEXT NOT NULL,
    last_processed_offset BIGINT NOT NULL DEFAULT -1,
    last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    events_processed BIGINT NOT NULL DEFAULT 0,
    last_error TEXT NULL,
    last_error_at TIMESTAMPTZ NULL,
    CONSTRAINT pk_projection_checkpoints PRIMARY KEY (projection_name, stream_name)
);
```
### 008_SagaState.sql
```sql
CREATE TABLE saga_states (
    saga_id TEXT PRIMARY KEY,
    correlation_id TEXT NOT NULL,
    saga_name TEXT NOT NULL,
    state INT NOT NULL,
    current_step INT NOT NULL,
    total_steps INT NOT NULL,
    completed_steps JSONB NOT NULL DEFAULT '[]'::jsonb,
    data JSONB NOT NULL DEFAULT '{}'::jsonb,
    started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ NULL,
    error_message TEXT NULL
);

CREATE INDEX idx_saga_states_correlation_id ON saga_states (correlation_id);
CREATE INDEX idx_saga_states_state ON saga_states (state);
```
---
## 🎯 Testing the Implementation
### Test Projection
```bash
# Start the application
cd Svrnty.Sample
dotnet run
# Query projection status
curl http://localhost:6001/api/projections/user-statistics
```
### Test SignalR (JavaScript)
```javascript
const connection = new signalR.HubConnectionBuilder()
    .withUrl("http://localhost:6001/hubs/events")
    .build();

connection.on("EventReceived", (streamName, event) => {
    console.log(`[${streamName}] ${event.EventType}:`, event.Data);
});

await connection.start();
await connection.invoke("SubscribeToStream", "UserWorkflow", 0);
```
### Test Saga - Success Path
```bash
curl -X POST http://localhost:6001/api/sagas/order-fulfillment/start \
  -H "Content-Type: application/json" \
  -d '{
    "orderId": "ORD-001",
    "items": [
      {
        "productId": "PROD-123",
        "productName": "Widget",
        "quantity": 2,
        "price": 49.99
      }
    ],
    "amount": 99.98,
    "shippingAddress": "123 Main St, City, ST 12345",
    "simulatePaymentFailure": false
  }'

# Check status
curl http://localhost:6001/api/sagas/{sagaId}/status
```
### Test Saga - Compensation Path
```bash
curl -X POST http://localhost:6001/api/sagas/order-fulfillment/start \
  -H "Content-Type: application/json" \
  -d '{
    "orderId": "ORD-002",
    "items": [...],
    "amount": 99.98,
    "shippingAddress": "123 Main St",
    "simulatePaymentFailure": true
  }'

# Console output will show:
# [SAGA] Reserving inventory for order ORD-002...
# [SAGA] Inventory reserved: {guid}
# [SAGA] Authorizing payment for order ORD-002: $99.98...
# [ERROR] Payment authorization failed: Insufficient funds
# [SAGA] COMPENSATING: Releasing inventory reservation {guid}...
# [SAGA] COMPENSATING: Inventory released
# [SAGA] Saga 'order-fulfillment' compensation completed
```
---
## 📊 Build Status
**Solution Build**: ✅ **SUCCESS**
- **Projects**: 12 (including new SignalR package)
- **Errors**: 0
- **Warnings**: 20 (package pruning + API deprecation warnings)
- **Build Time**: ~2 seconds
---
## 🎓 Key Design Patterns
### Event Sourcing Projections
- **Pattern**: Catch-up Subscription
- **Retry**: Exponential Backoff
- **Persistence**: Checkpoint-based Recovery
### SignalR Integration
- **Pattern**: Observer (Pub/Sub)
- **Lifecycle**: Per-Connection Management
- **Concurrency**: CancellationToken-based
### Saga Orchestration
- **Pattern**: Saga (Compensation-based)
- **Execution**: Sequential with Checkpointing
- **Recovery**: Reverse-order Compensation
- **Not Used**: Two-Phase Commit (2PC)
---
## 📝 Next Steps
Phase 7 is complete. The framework now includes:

1. ✅ Event Sourcing Projections for building read models
2. ✅ SignalR Integration for real-time browser notifications
3. ✅ Saga Orchestration for long-running workflows

Projections and sagas both support in-memory (development) and PostgreSQL (production) storage backends.

**Future Enhancements:**
- Projection snapshots for faster rebuilds
- Saga timeout handling
- SignalR backpressure management
- Distributed saga coordination
- Projection monitoring dashboard