14 KiB
Phase 7: Advanced Features - Implementation Summary
Status: ✅ COMPLETED
Phase 7 adds three advanced features to the Svrnty.CQRS event streaming framework: Event Sourcing Projections, SignalR Integration, and Saga Orchestration.
📊 Phase 7.1: Event Sourcing Projections
Overview
Build materialized read models from event streams using the catch-up subscription pattern with checkpointing and automatic recovery.
Key Components
Abstractions (Svrnty.CQRS.Events.Abstractions/Projections/)
IProjection<TEvent>- Typed projection interface for specific event typesIDynamicProjection- Dynamic projection handling multiple event types via pattern matchingIResettableProjection- Optional interface for projections that support rebuildingIProjectionCheckpointStore- Persistent checkpoint storage interfaceIProjectionEngine- Core projection execution engine interfaceIProjectionRegistry- Thread-safe projection definition registryProjectionOptions- Configuration: batch size, retry policy, polling interval
Core Implementation (Svrnty.CQRS.Events/Projections/)
ProjectionEngine(~300 lines)- Catch-up subscription pattern
- Exponential backoff retry:
delay = baseDelay × 2^attempt - Batch processing with configurable size
- Per-event or per-batch checkpointing
- Continuous polling for new events
ProjectionRegistry- Thread-safe ConcurrentDictionary-based registryInMemoryProjectionCheckpointStore- Development storageProjectionHostedService- Auto-start background service
PostgreSQL Storage (Svrnty.CQRS.Events.PostgreSQL/)
- Migration:
007_ProjectionCheckpoints.sql- Composite primary key:
(projection_name, stream_name) - Tracks: last offset, events processed, error state
- Composite primary key:
PostgresProjectionCheckpointStore- UPSERT-based atomic updates:
INSERT ... ON CONFLICT ... DO UPDATE - Thread-safe for concurrent projection instances
- UPSERT-based atomic updates:
Sample Implementation (Svrnty.Sample/Projections/)
UserStatistics.cs- Read model tracking user additions/removalsUserStatisticsProjection.cs- Dynamic projection processing UserWorkflow events- Endpoint:
GET /api/projections/user-statistics
Configuration Example
services.AddProjections(useInMemoryCheckpoints: !usePostgreSQL);
if (usePostgreSQL)
{
services.AddPostgresProjectionCheckpointStore();
}
services.AddDynamicProjection<UserStatisticsProjection>(
projectionName: "user-statistics",
streamName: "UserWorkflow",
configure: options =>
{
options.BatchSize = 50;
options.AutoStart = true;
options.MaxRetries = 3;
options.CheckpointPerEvent = false;
options.PollingInterval = TimeSpan.FromSeconds(1);
});
Key Features
- ✅ Idempotent event processing
- ✅ Automatic checkpoint recovery on restart
- ✅ Exponential backoff retry on failures
- ✅ Projection rebuilding support
- ✅ Both typed and dynamic projections
- ✅ In-memory (dev) and PostgreSQL (prod) storage
🔄 Phase 7.2: SignalR Integration
Overview
Real-time event streaming to browser clients via WebSockets using ASP.NET Core SignalR.
Key Components
Package: Svrnty.CQRS.Events.SignalR
EventStreamHub.cs(~220 lines)- Per-connection subscription tracking with
ConcurrentDictionary<connectionId, subscriptions> - Independent Task per subscription with CancellationToken
- Automatic cleanup on disconnect
- Batch reading with configurable offset
- Per-connection subscription tracking with
Hub Methods
// Client-callable methods
Task SubscribeToStream(string streamName, long? startFromOffset = null)
Task UnsubscribeFromStream(string streamName)
// Server-to-client callbacks
await Clients.Caller.SendAsync("SubscriptionConfirmed", streamName);
await Clients.Client(connectionId).SendAsync("EventReceived", streamName, eventData);
await Clients.Caller.SendAsync("Error", errorMessage);
Registration Example
// Server-side
services.AddEventStreamHub();
app.MapEventStreamHub("/hubs/events");
// Client-side (JavaScript)
const connection = new signalR.HubConnectionBuilder()
.withUrl("/hubs/events")
.build();
connection.on("EventReceived", (streamName, event) => {
console.log("Received event:", event);
});
await connection.start();
await connection.invoke("SubscribeToStream", "UserWorkflow", 0);
Key Features
- ✅ WebSocket-based real-time streaming
- ✅ Multiple concurrent subscriptions per connection
- ✅ Start from specific offset (catch-up + real-time)
- ✅ Automatic connection cleanup
- ✅ Thread-safe subscription management
🔀 Phase 7.3: Saga Orchestration
Overview
Long-running business processes with compensation pattern (not two-phase commit). Steps execute sequentially; on failure, completed steps compensate in reverse order.
Key Components
Abstractions (Svrnty.CQRS.Events.Abstractions/Sagas/)
Core Interfaces:
public interface ISaga
{
string SagaId { get; }
string CorrelationId { get; }
string SagaName { get; }
}
public interface ISagaStep
{
string StepName { get; }
Task ExecuteAsync(ISagaContext context, CancellationToken cancellationToken);
Task CompensateAsync(ISagaContext context, CancellationToken cancellationToken);
}
public interface ISagaContext
{
ISaga Saga { get; }
SagaState State { get; }
ISagaData Data { get; }
T? Get<T>(string key);
void Set<T>(string key, T value);
}
State Machine:
NotStarted → Running → Completed
↓
Compensating → Compensated
↓
Failed
ISagaOrchestrator - Lifecycle management:
StartSagaAsync<TSaga>()- Initialize and executeResumeSagaAsync()- Resume paused sagaCancelSagaAsync()- Trigger compensationGetStatusAsync()- Query saga state
ISagaStateStore - Persistent state storage:
SaveStateAsync()- UPSERT saga stateLoadStateAsync()- Restore saga stateGetByCorrelationIdAsync()- Multi-saga workflowsGetByStateAsync()- Query by state
Core Implementation (Svrnty.CQRS.Events/Sagas/)
SagaOrchestrator.cs - Core execution engine:
- Fire-and-forget execution pattern
- Sequential step execution with checkpointing
- Reverse-order compensation on failure
- Saga instance reconstruction from snapshots
- Uses ActivatorUtilities for DI-enabled saga construction
SagaRegistry.cs - Thread-safe definition storage:
- ConcurrentDictionary-based
- Type-to-definition mapping
- Name-to-definition mapping
- Name-to-type mapping
SagaData.cs - In-memory data storage:
- Dictionary-based key-value store
- Type conversion support via
Convert.ChangeType - Serializable for persistence
InMemorySagaStateStore.cs - Development storage:
- ConcurrentDictionary for state
- Correlation ID indexing
- State-based queries
PostgreSQL Storage (Svrnty.CQRS.Events.PostgreSQL/)
Migration: 008_SagaState.sql
CREATE TABLE saga_states (
saga_id TEXT PRIMARY KEY,
correlation_id TEXT NOT NULL,
saga_name TEXT NOT NULL,
state INT NOT NULL,
current_step INT NOT NULL,
total_steps INT NOT NULL,
completed_steps JSONB NOT NULL,
data JSONB NOT NULL,
...
);
CREATE INDEX idx_saga_states_correlation_id ON saga_states (correlation_id);
CREATE INDEX idx_saga_states_state ON saga_states (state);
PostgresSagaStateStore.cs:
- JSONB storage for steps and data
- UPSERT for atomic state updates
- Indexed queries by correlation ID and state
Sample Implementation (Svrnty.Sample/Sagas/)
OrderFulfillmentSaga - 3-step workflow:
-
Reserve Inventory
- Execute: Reserve items in inventory system
- Compensate: Release reservation
-
Authorize Payment
- Execute: Get payment authorization from payment gateway
- Compensate: Void authorization
- Failure Point: Simulated via
FailPaymentflag for testing
-
Ship Order
- Execute: Create shipment and get tracking number
- Compensate: Cancel shipment
Test Scenario: Payment Failure
[Start] → Reserve Inventory ✅ → Authorize Payment ❌
↓
Compensating...
↓
Void Payment (skipped - never completed)
↓
Release Inventory ✅
↓
[Compensated]
HTTP Endpoints
POST /api/sagas/order-fulfillment/start
Body: {
"orderId": "ORD-123",
"items": [...],
"amount": 99.99,
"shippingAddress": "123 Main St",
"simulatePaymentFailure": false
}
Response: { "sagaId": "guid", "correlationId": "ORD-123" }
GET /api/sagas/{sagaId}/status
Response: {
"sagaId": "guid",
"state": "Running",
"progress": "2/3",
"currentStep": 2,
"totalSteps": 3,
"data": {...}
}
POST /api/sagas/{sagaId}/cancel
Response: { "message": "Saga cancellation initiated" }
Registration Example
// Infrastructure
services.AddSagaOrchestration(useInMemoryStateStore: !usePostgreSQL);
if (usePostgreSQL)
{
services.AddPostgresSagaStateStore();
}
// Saga definition
services.AddSaga<OrderFulfillmentSaga>(
sagaName: "order-fulfillment",
configure: definition =>
{
definition.AddStep(
stepName: "ReserveInventory",
execute: OrderFulfillmentSteps.ReserveInventoryAsync,
compensate: OrderFulfillmentSteps.CompensateReserveInventoryAsync);
definition.AddStep(
stepName: "AuthorizePayment",
execute: OrderFulfillmentSteps.AuthorizePaymentAsync,
compensate: OrderFulfillmentSteps.CompensateAuthorizePaymentAsync);
definition.AddStep(
stepName: "ShipOrder",
execute: OrderFulfillmentSteps.ShipOrderAsync,
compensate: OrderFulfillmentSteps.CompensateShipOrderAsync);
});
Key Features
- ✅ Compensation pattern (not 2PC)
- ✅ Sequential execution with checkpointing
- ✅ Reverse-order compensation
- ✅ Persistent state across restarts
- ✅ Correlation ID for multi-saga workflows
- ✅ State-based queries
- ✅ Pause/resume support
- ✅ Manual cancellation
- ✅ In-memory (dev) and PostgreSQL (prod) storage
📦 New Packages Created
Svrnty.CQRS.Events.SignalR
- Purpose: Real-time event streaming to browser clients
- Dependencies: ASP.NET Core SignalR, Svrnty.CQRS.Events.Abstractions
- Key Type:
EventStreamHub
🗄️ Database Migrations
007_ProjectionCheckpoints.sql
CREATE TABLE projection_checkpoints (
projection_name TEXT NOT NULL,
stream_name TEXT NOT NULL,
last_processed_offset BIGINT NOT NULL DEFAULT -1,
last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
events_processed BIGINT NOT NULL DEFAULT 0,
last_error TEXT NULL,
last_error_at TIMESTAMPTZ NULL,
CONSTRAINT pk_projection_checkpoints PRIMARY KEY (projection_name, stream_name)
);
008_SagaState.sql
CREATE TABLE saga_states (
saga_id TEXT PRIMARY KEY,
correlation_id TEXT NOT NULL,
saga_name TEXT NOT NULL,
state INT NOT NULL,
current_step INT NOT NULL,
total_steps INT NOT NULL,
completed_steps JSONB NOT NULL DEFAULT '[]'::jsonb,
data JSONB NOT NULL DEFAULT '{}'::jsonb,
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ NULL,
error_message TEXT NULL
);
CREATE INDEX idx_saga_states_correlation_id ON saga_states (correlation_id);
CREATE INDEX idx_saga_states_state ON saga_states (state);
🎯 Testing the Implementation
Test Projection
# Start the application
cd Svrnty.Sample
dotnet run
# Query projection status
curl http://localhost:6001/api/projections/user-statistics
Test SignalR (JavaScript)
const connection = new signalR.HubConnectionBuilder()
.withUrl("http://localhost:6001/hubs/events")
.build();
connection.on("EventReceived", (streamName, event) => {
console.log(`[${streamName}] ${event.EventType}:`, event.Data);
});
await connection.start();
await connection.invoke("SubscribeToStream", "UserWorkflow", 0);
Test Saga - Success Path
curl -X POST http://localhost:6001/api/sagas/order-fulfillment/start \
-H "Content-Type: application/json" \
-d '{
"orderId": "ORD-001",
"items": [
{
"productId": "PROD-123",
"productName": "Widget",
"quantity": 2,
"price": 49.99
}
],
"amount": 99.98,
"shippingAddress": "123 Main St, City, ST 12345",
"simulatePaymentFailure": false
}'
# Check status
curl http://localhost:6001/api/sagas/{sagaId}/status
Test Saga - Compensation Path
curl -X POST http://localhost:6001/api/sagas/order-fulfillment/start \
-H "Content-Type: application/json" \
-d '{
"orderId": "ORD-002",
"items": [...],
"amount": 99.98,
"shippingAddress": "123 Main St",
"simulatePaymentFailure": true
}'
# Console output will show:
# [SAGA] Reserving inventory for order ORD-002...
# [SAGA] Inventory reserved: {guid}
# [SAGA] Authorizing payment for order ORD-002: $99.98...
# [ERROR] Payment authorization failed: Insufficient funds
# [SAGA] COMPENSATING: Releasing inventory reservation {guid}...
# [SAGA] COMPENSATING: Inventory released
# [SAGA] Saga 'order-fulfillment' compensation completed
📊 Build Status
Solution Build: ✅ SUCCESS
- Projects: 12 (including new SignalR package)
- Errors: 0
- Warnings: 20 (package pruning + API deprecation warnings)
- Build Time: ~2 seconds
🎓 Key Design Patterns
Event Sourcing Projections
- Pattern: Catch-up Subscription
- Retry: Exponential Backoff
- Persistence: Checkpoint-based Recovery
SignalR Integration
- Pattern: Observer (Pub/Sub)
- Lifecycle: Per-Connection Management
- Concurrency: CancellationToken-based
Saga Orchestration
- Pattern: Saga (Compensation-based)
- Execution: Sequential with Checkpointing
- Recovery: Reverse-order Compensation
- Not Used: Two-Phase Commit (2PC)
📝 Next Steps
Phase 7 is complete! The framework now includes:
- ✅ Event Sourcing Projections for building read models
- ✅ SignalR Integration for real-time browser notifications
- ✅ Saga Orchestration for long-running workflows
All implementations support both in-memory (development) and PostgreSQL (production) storage backends.
Future Enhancements:
- Projection snapshots for faster rebuilds
- Saga timeout handling
- SignalR backpressure management
- Distributed saga coordination
- Projection monitoring dashboard