# Phase 7: Advanced Features - Implementation Summary

**Status**: ✅ **COMPLETED**

Phase 7 adds three advanced features to the Svrnty.CQRS event streaming framework: Event Sourcing Projections, SignalR Integration, and Saga Orchestration.

---

## 📊 Phase 7.1: Event Sourcing Projections

### Overview

Build materialized read models from event streams using the catch-up subscription pattern with checkpointing and automatic recovery.

### Key Components

#### Abstractions (`Svrnty.CQRS.Events.Abstractions/Projections/`)

- **`IProjection`** - Typed projection interface for specific event types
- **`IDynamicProjection`** - Dynamic projection handling multiple event types via pattern matching
- **`IResettableProjection`** - Optional interface for projections that support rebuilding
- **`IProjectionCheckpointStore`** - Persistent checkpoint storage interface
- **`IProjectionEngine`** - Core projection execution engine interface
- **`IProjectionRegistry`** - Thread-safe projection definition registry
- **`ProjectionOptions`** - Configuration: batch size, retry policy, polling interval

#### Core Implementation (`Svrnty.CQRS.Events/Projections/`)

- **`ProjectionEngine`** (~300 lines)
  - Catch-up subscription pattern
  - Exponential backoff retry: `delay = baseDelay × 2^attempt`
  - Batch processing with configurable size
  - Per-event or per-batch checkpointing
  - Continuous polling for new events
- **`ProjectionRegistry`** - Thread-safe ConcurrentDictionary-based registry
- **`InMemoryProjectionCheckpointStore`** - Development storage
- **`ProjectionHostedService`** - Auto-start background service

#### PostgreSQL Storage (`Svrnty.CQRS.Events.PostgreSQL/`)

- **Migration**: `007_ProjectionCheckpoints.sql`
  - Composite primary key: `(projection_name, stream_name)`
  - Tracks: last offset, events processed, error state
- **`PostgresProjectionCheckpointStore`**
  - UPSERT-based atomic updates: `INSERT ... ON CONFLICT ... DO UPDATE`
  - Thread-safe for concurrent projection instances

#### Sample Implementation (`Svrnty.Sample/Projections/`)

- **`UserStatistics.cs`** - Read model tracking user additions/removals
- **`UserStatisticsProjection.cs`** - Dynamic projection processing UserWorkflow events
- **Endpoint**: `GET /api/projections/user-statistics`

### Configuration Example

```csharp
services.AddProjections(useInMemoryCheckpoints: !usePostgreSQL);

if (usePostgreSQL)
{
    services.AddPostgresProjectionCheckpointStore();
}

services.AddDynamicProjection(
    projectionName: "user-statistics",
    streamName: "UserWorkflow",
    configure: options =>
    {
        options.BatchSize = 50;
        options.AutoStart = true;
        options.MaxRetries = 3;
        options.CheckpointPerEvent = false;
        options.PollingInterval = TimeSpan.FromSeconds(1);
    });
```

### Key Features

- ✅ Idempotent event processing
- ✅ Automatic checkpoint recovery on restart
- ✅ Exponential backoff retry on failures
- ✅ Projection rebuilding support
- ✅ Both typed and dynamic projections
- ✅ In-memory (dev) and PostgreSQL (prod) storage

---

## 🔄 Phase 7.2: SignalR Integration

### Overview

Real-time event streaming to browser clients via WebSockets using ASP.NET Core SignalR.

### Key Components

#### Package: `Svrnty.CQRS.Events.SignalR`

- **`EventStreamHub.cs`** (~220 lines)
  - Per-connection subscription tracking with `ConcurrentDictionary`
  - Independent Task per subscription with CancellationToken
  - Automatic cleanup on disconnect
  - Batch reading with configurable offset

#### Hub Methods

```csharp
// Client-callable methods
Task SubscribeToStream(string streamName, long? startFromOffset = null)
Task UnsubscribeFromStream(string streamName)

// Server-to-client callbacks
await Clients.Caller.SendAsync("SubscriptionConfirmed", streamName);
await Clients.Client(connectionId).SendAsync("EventReceived", streamName, eventData);
await Clients.Caller.SendAsync("Error", errorMessage);
```

### Registration Example

```csharp
// Server-side
services.AddEventStreamHub();
app.MapEventStreamHub("/hubs/events");

// Client-side (JavaScript)
const connection = new signalR.HubConnectionBuilder()
    .withUrl("/hubs/events")
    .build();

connection.on("EventReceived", (streamName, event) => {
    console.log("Received event:", event);
});

await connection.start();
await connection.invoke("SubscribeToStream", "UserWorkflow", 0);
```

### Key Features

- ✅ WebSocket-based real-time streaming
- ✅ Multiple concurrent subscriptions per connection
- ✅ Start from specific offset (catch-up + real-time)
- ✅ Automatic connection cleanup
- ✅ Thread-safe subscription management

---

## 🔀 Phase 7.3: Saga Orchestration

### Overview

Long-running business processes using the compensation pattern (not two-phase commit). Steps execute sequentially; on failure, completed steps compensate in reverse order.

### Key Components

#### Abstractions (`Svrnty.CQRS.Events.Abstractions/Sagas/`)

**Core Interfaces:**

```csharp
public interface ISaga
{
    string SagaId { get; }
    string CorrelationId { get; }
    string SagaName { get; }
}

public interface ISagaStep
{
    string StepName { get; }
    Task ExecuteAsync(ISagaContext context, CancellationToken cancellationToken);
    Task CompensateAsync(ISagaContext context, CancellationToken cancellationToken);
}

public interface ISagaContext
{
    ISaga Saga { get; }
    SagaState State { get; }
    ISagaData Data { get; }
    T? Get<T>(string key);
    void Set<T>(string key, T value);
}
```

**State Machine:**

```
NotStarted → Running → Completed
                ↓
          Compensating → Compensated
                ↓
             Failed
```

**`ISagaOrchestrator`** - Lifecycle management:

- `StartSagaAsync()` - Initialize and execute
- `ResumeSagaAsync()` - Resume paused saga
- `CancelSagaAsync()` - Trigger compensation
- `GetStatusAsync()` - Query saga state

**`ISagaStateStore`** - Persistent state storage:

- `SaveStateAsync()` - UPSERT saga state
- `LoadStateAsync()` - Restore saga state
- `GetByCorrelationIdAsync()` - Multi-saga workflows
- `GetByStateAsync()` - Query by state

#### Core Implementation (`Svrnty.CQRS.Events/Sagas/`)

**`SagaOrchestrator.cs`** - Core execution engine:

- Fire-and-forget execution pattern
- Sequential step execution with checkpointing
- Reverse-order compensation on failure
- Saga instance reconstruction from snapshots
- Uses ActivatorUtilities for DI-enabled saga construction

**`SagaRegistry.cs`** - Thread-safe definition storage:

- ConcurrentDictionary-based
- Type-to-definition mapping
- Name-to-definition mapping
- Name-to-type mapping

**`SagaData.cs`** - In-memory data storage:

- Dictionary-based key-value store
- Type conversion support via `Convert.ChangeType`
- Serializable for persistence

**`InMemorySagaStateStore.cs`** - Development storage:

- ConcurrentDictionary for state
- Correlation ID indexing
- State-based queries

#### PostgreSQL Storage (`Svrnty.CQRS.Events.PostgreSQL/`)

**Migration**: `008_SagaState.sql`

```sql
CREATE TABLE saga_states (
    saga_id TEXT PRIMARY KEY,
    correlation_id TEXT NOT NULL,
    saga_name TEXT NOT NULL,
    state INT NOT NULL,
    current_step INT NOT NULL,
    total_steps INT NOT NULL,
    completed_steps JSONB NOT NULL,
    data JSONB NOT NULL,
    ...
);

CREATE INDEX idx_saga_states_correlation_id ON saga_states (correlation_id);
CREATE INDEX idx_saga_states_state ON saga_states (state);
```

**`PostgresSagaStateStore.cs`**:

- JSONB storage for steps and data
- UPSERT for atomic state updates
- Indexed queries by correlation ID and state

#### Sample Implementation (`Svrnty.Sample/Sagas/`)

**`OrderFulfillmentSaga`** - 3-step workflow:

1. **Reserve Inventory**
   - Execute: Reserve items in inventory system
   - Compensate: Release reservation
2. **Authorize Payment**
   - Execute: Get payment authorization from payment gateway
   - Compensate: Void authorization
   - **Failure Point**: Simulated via `FailPayment` flag for testing
3. **Ship Order**
   - Execute: Create shipment and get tracking number
   - Compensate: Cancel shipment

**Test Scenario: Payment Failure**

```
[Start] → Reserve Inventory ✅ → Authorize Payment ❌
                                      ↓
                               Compensating...
                                      ↓
                  Void Payment (skipped - never completed)
                                      ↓
                             Release Inventory ✅
                                      ↓
                                [Compensated]
```

### HTTP Endpoints

```
POST /api/sagas/order-fulfillment/start
Body: {
  "orderId": "ORD-123",
  "items": [...],
  "amount": 99.99,
  "shippingAddress": "123 Main St",
  "simulatePaymentFailure": false
}
Response: {
  "sagaId": "guid",
  "correlationId": "ORD-123"
}

GET /api/sagas/{sagaId}/status
Response: {
  "sagaId": "guid",
  "state": "Running",
  "progress": "2/3",
  "currentStep": 2,
  "totalSteps": 3,
  "data": {...}
}

POST /api/sagas/{sagaId}/cancel
Response: {
  "message": "Saga cancellation initiated"
}
```

### Registration Example

```csharp
// Infrastructure
services.AddSagaOrchestration(useInMemoryStateStore: !usePostgreSQL);

if (usePostgreSQL)
{
    services.AddPostgresSagaStateStore();
}

// Saga definition
services.AddSaga(
    sagaName: "order-fulfillment",
    configure: definition =>
    {
        definition.AddStep(
            stepName: "ReserveInventory",
            execute: OrderFulfillmentSteps.ReserveInventoryAsync,
            compensate: OrderFulfillmentSteps.CompensateReserveInventoryAsync);

        definition.AddStep(
            stepName: "AuthorizePayment",
            execute: OrderFulfillmentSteps.AuthorizePaymentAsync,
            compensate: OrderFulfillmentSteps.CompensateAuthorizePaymentAsync);

        definition.AddStep(
            stepName: "ShipOrder",
            execute: OrderFulfillmentSteps.ShipOrderAsync,
            compensate: OrderFulfillmentSteps.CompensateShipOrderAsync);
    });
```

### Key Features

- ✅ Compensation pattern (not 2PC)
- ✅ Sequential execution with checkpointing
- ✅ Reverse-order compensation
- ✅ Persistent state across restarts
- ✅ Correlation ID for multi-saga workflows
- ✅ State-based queries
- ✅ Pause/resume support
- ✅ Manual cancellation
- ✅ In-memory (dev) and PostgreSQL (prod) storage

---

## 📦 New Packages Created

### Svrnty.CQRS.Events.SignalR

- **Purpose**: Real-time event streaming to browser clients
- **Dependencies**: ASP.NET Core SignalR, Svrnty.CQRS.Events.Abstractions
- **Key Type**: `EventStreamHub`

---

## 🗄️ Database Migrations

### 007_ProjectionCheckpoints.sql

```sql
CREATE TABLE projection_checkpoints (
    projection_name TEXT NOT NULL,
    stream_name TEXT NOT NULL,
    last_processed_offset BIGINT NOT NULL DEFAULT -1,
    last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    events_processed BIGINT NOT NULL DEFAULT 0,
    last_error TEXT NULL,
    last_error_at TIMESTAMPTZ NULL,
    CONSTRAINT pk_projection_checkpoints PRIMARY KEY (projection_name, stream_name)
);
```

### 008_SagaState.sql

```sql
CREATE TABLE saga_states (
    saga_id TEXT PRIMARY KEY,
    correlation_id TEXT NOT NULL,
    saga_name TEXT NOT NULL,
    state INT NOT NULL,
    current_step INT NOT NULL,
    total_steps INT NOT NULL,
    completed_steps JSONB NOT NULL DEFAULT '[]'::jsonb,
    data JSONB NOT NULL DEFAULT '{}'::jsonb,
    started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ NULL,
    error_message TEXT NULL
);

CREATE INDEX idx_saga_states_correlation_id ON saga_states (correlation_id);
CREATE INDEX idx_saga_states_state ON saga_states (state);
```

---

## 🎯 Testing the Implementation

### Test Projection

```bash
# Start the application
cd Svrnty.Sample
dotnet run

# Query projection status
curl http://localhost:6001/api/projections/user-statistics
```

### Test SignalR (JavaScript)

```javascript
const connection = new signalR.HubConnectionBuilder()
    .withUrl("http://localhost:6001/hubs/events")
    .build();

connection.on("EventReceived", (streamName, event) => {
    console.log(`[${streamName}] ${event.EventType}:`, event.Data);
});

await connection.start();
await connection.invoke("SubscribeToStream", "UserWorkflow", 0);
```

### Test Saga - Success Path

```bash
curl -X POST http://localhost:6001/api/sagas/order-fulfillment/start \
  -H "Content-Type: application/json" \
  -d '{
    "orderId": "ORD-001",
    "items": [
      {
        "productId": "PROD-123",
        "productName": "Widget",
        "quantity": 2,
        "price": 49.99
      }
    ],
    "amount": 99.98,
    "shippingAddress": "123 Main St, City, ST 12345",
    "simulatePaymentFailure": false
  }'

# Check status
curl http://localhost:6001/api/sagas/{sagaId}/status
```

### Test Saga - Compensation Path

```bash
curl -X POST http://localhost:6001/api/sagas/order-fulfillment/start \
  -H "Content-Type: application/json" \
  -d '{
    "orderId": "ORD-002",
    "items": [...],
    "amount": 99.98,
    "shippingAddress": "123 Main St",
    "simulatePaymentFailure": true
  }'

# Console output will show:
# [SAGA] Reserving inventory for order ORD-002...
# [SAGA] Inventory reserved: {guid}
# [SAGA] Authorizing payment for order ORD-002: $99.98...
# [ERROR] Payment authorization failed: Insufficient funds
# [SAGA] COMPENSATING: Releasing inventory reservation {guid}...
# [SAGA] COMPENSATING: Inventory released
# [SAGA] Saga 'order-fulfillment' compensation completed
```

---

## 📊 Build Status

**Solution Build**: ✅ **SUCCESS**

- **Projects**: 12 (including new SignalR package)
- **Errors**: 0
- **Warnings**: 20 (package pruning + API deprecation warnings)
- **Build Time**: ~2 seconds

---

## 🎓 Key Design Patterns

### Event Sourcing Projections

- **Pattern**: Catch-up Subscription
- **Retry**: Exponential Backoff
- **Persistence**: Checkpoint-based Recovery

### SignalR Integration

- **Pattern**: Observer (Pub/Sub)
- **Lifecycle**: Per-Connection Management
- **Concurrency**: CancellationToken-based

### Saga Orchestration

- **Pattern**: Saga (Compensation-based)
- **Execution**: Sequential with Checkpointing
- **Recovery**: Reverse-order Compensation
- **Not Used**: Two-Phase Commit (2PC)

---

## 📝 Next Steps

Phase 7 is complete! The framework now includes:

1. ✅ Event Sourcing Projections for building read models
2. ✅ SignalR Integration for real-time browser notifications
3. ✅ Saga Orchestration for long-running workflows

Projections and sagas both support in-memory (development) and PostgreSQL (production) storage backends.

**Future Enhancements:**

- Projection snapshots for faster rebuilds
- Saga timeout handling
- SignalR backpressure management
- Distributed saga coordination
- Projection monitoring dashboard
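
---

## 🧩 Appendix: Reverse-Order Compensation Sketch

The compensation behaviour described in Phase 7.3 (steps run sequentially; on failure, only the steps that already completed are compensated, most recent first) can be illustrated with a minimal, self-contained sketch. `DemoStep` and `CompensationSketch` are hypothetical names invented for this example; they are not the framework's `ISagaStep` or `SagaOrchestrator`. The sketch also assumes compensation itself never fails, and it leaves out persistence, checkpointing, and the `Failed` state.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Demo: the payment step fails, so only the inventory reservation is undone.
var log = new List<string>();
var steps = new List<DemoStep>
{
    new DemoStep("ReserveInventory", () => Task.CompletedTask, () => Task.CompletedTask),
    new DemoStep("AuthorizePayment",
        () => Task.FromException(new InvalidOperationException("Insufficient funds")),
        () => Task.CompletedTask),
    new DemoStep("ShipOrder", () => Task.CompletedTask, () => Task.CompletedTask),
};

bool succeeded = await CompensationSketch.RunAsync(steps, log);
Console.WriteLine($"succeeded={succeeded}");
foreach (var entry in log)
{
    Console.WriteLine(entry);
}

// Hypothetical step type for illustration only (not the framework's ISagaStep).
public sealed record DemoStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class CompensationSketch
{
    // Runs the steps in order. If a step throws, the steps that had already
    // completed are compensated in reverse order and the method returns false.
    public static async Task<bool> RunAsync(IReadOnlyList<DemoStep> steps, List<string> log)
    {
        // A stack yields completed steps in last-in, first-out order.
        var completed = new Stack<DemoStep>();

        foreach (var step in steps)
        {
            try
            {
                await step.Execute();
                completed.Push(step);
                log.Add($"executed:{step.Name}");
            }
            catch (Exception ex)
            {
                log.Add($"failed:{step.Name}:{ex.Message}");

                // The failing step never completed, so it is not compensated.
                while (completed.Count > 0)
                {
                    var done = completed.Pop();
                    await done.Compensate();
                    log.Add($"compensated:{done.Name}");
                }

                return false;
            }
        }

        return true;
    }
}
```

Run as a single `Program.cs`, this records `executed:ReserveInventory`, `failed:AuthorizePayment:Insufficient funds`, and `compensated:ReserveInventory`, mirroring the payment-failure test scenario: the payment step is never voided because it never completed, and the shipping step is never reached.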