diff --git a/EVENT-STREAMING-IMPLEMENTATION-PLAN.md b/EVENT-STREAMING-IMPLEMENTATION-PLAN.md new file mode 100644 index 0000000..b2362f1 --- /dev/null +++ b/EVENT-STREAMING-IMPLEMENTATION-PLAN.md @@ -0,0 +1,977 @@ +# Event Streaming Implementation Plan + +## Executive Summary + +Transform the CQRS framework into a complete enterprise event streaming platform that supports: +- **Workflows**: Business process correlation and event emission +- **Multiple Consumer Patterns**: Broadcast, exclusive, consumer groups, read receipts +- **Storage Models**: Ephemeral (message queue) and persistent (event sourcing) +- **Delivery Semantics**: At-most-once, at-least-once, exactly-once +- **Cross-Service Communication**: RabbitMQ, Kafka integration with zero developer friction +- **Schema Evolution**: Event versioning with automatic upcasting +- **Event Replay**: Time-travel queries for persistent streams + +**Design Philosophy**: Simple by default, powerful when needed. Progressive complexity. + +--- + +## Architecture Layers + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Layer 1: WORKFLOW (Business Process) │ +│ What events belong together logically? │ +│ Example: InvitationWorkflow, UserWorkflow │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Layer 2: EVENT STREAM (Organization & Retention) │ +│ How are events stored and organized? │ +│ Example: Persistent vs Ephemeral, retention policies │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Layer 3: SUBSCRIPTION (Consumer Routing) │ +│ Who wants to consume what? │ +│ Example: Broadcast, Exclusive, ConsumerGroup, ReadReceipt │ +└─────────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────────┐ +│ Layer 4: DELIVERY (Transport Mechanism) │ +│ How do events reach consumers? │ +│ Example: gRPC, RabbitMQ, Kafka │ +└─────────────────────────────────────────────────────────────┘ +``` + +--- + +## Core Enumerations + +### StreamType +- `Ephemeral`: Message queue semantics (events deleted after consumption) +- `Persistent`: Event log semantics (events retained for replay) + +### DeliverySemantics +- `AtMostOnce`: Fire and forget (fast, might lose messages) +- `AtLeastOnce`: Retry until ack (might see duplicates) +- `ExactlyOnce`: Deduplication (slower, no duplicates) + +### SubscriptionMode +- `Broadcast`: All consumers get all events (pub/sub) +- `Exclusive`: Only one consumer gets each event (queue) +- `ConsumerGroup`: Load-balanced across group members (Kafka-style) +- `ReadReceipt`: Requires explicit "user saw this" confirmation + +### StreamScope +- `Internal`: Same service only (default) +- `CrossService`: Available to external services via message broker + +--- + +## Phase 1: Core Workflow & Streaming Foundation + +**Goal**: Get basic workflow + ephemeral streaming working with in-memory storage + +**Duration**: Weeks 1-2 + +### Phase 1 Tasks + +#### 1.1 Workflow Abstraction +- [ ] Create `Workflow` abstract base class + - [ ] `Id` property (workflow instance identifier) + - [ ] `IsNew` property (started vs continued) + - [ ] `Emit()` protected method + - [ ] Internal `PendingEvents` collection +- [ ] Create `ICommandHandlerWithWorkflow` interface +- [ ] Create `ICommandHandlerWithWorkflow` interface (no result) +- [ ] Update sample: Convert `UserEvent` to `UserWorkflow : Workflow` +- [ ] Update sample: Convert `InvitationEvent` to `InvitationWorkflow : Workflow` + +#### 1.2 Stream Configuration +- [ ] Create `StreamType` enum +- [ ] Create `DeliverySemantics` enum +- [ ] Create `SubscriptionMode` enum +- [ ] Create `StreamScope` enum +- [ ] Create `IStreamConfiguration` interface +- [ ] Create `StreamConfiguration` implementation +- [ ] Create fluent configuration API: `AddEventStreaming()` + +#### 1.3 In-Memory Storage (Ephemeral) +- [ ] Create `IEventStreamStore` interface + - [ ] `EnqueueAsync()` for ephemeral streams + - [ ] `DequeueAsync()` for ephemeral streams + - [ ] `AcknowledgeAsync()` for message acknowledgment + - [ ] `NackAsync()` for requeue/dead-letter +- [ ] Create `InMemoryEventStreamStore` implementation + - [ ] Concurrent queues per stream + - [ ] Per-consumer visibility tracking + - [ ] Acknowledgment handling +- [ ] Create `ISubscriptionStore` interface + - [ ] `RegisterConsumerAsync()` + - [ ] `UnregisterConsumerAsync()` + - [ ] `GetConsumersAsync()` +- [ ] Create `InMemorySubscriptionStore` implementation + +#### 1.4 Subscription System +- [ ] Create `ISubscription` interface +- [ ] Create `Subscription` implementation +- [ ] Create `IEventSubscriptionClient` for consumers +- [ ] Create `EventSubscriptionClient` implementation +- [ ] Implement `Broadcast` mode +- [ ] Implement `Exclusive` mode +- [ ] Create subscription configuration API + +#### 1.5 Workflow Decorators +- [ ] Create `WorkflowContext` class +- [ ] Create `CommandHandlerWithWorkflowDecorator` +- [ ] Create `CommandHandlerWithWorkflowDecoratorNoResult` +- [ ] Update event emission to use workflow ID as correlation ID +- [ ] Integrate with existing `IEventEmitter` + +#### 1.6 Service Registration +- [ ] Create `AddCommandWithWorkflow()` extension +- [ ] Create `AddCommandWithWorkflow()` extension +- [ ] Create `AddCommandWithWorkflow()` extension (no result) +- [ ] Deprecate `AddCommandWithEvents` (keep for backward compatibility) +- [ ] Update `ServiceCollectionExtensions` with workflow registration + +#### 1.7 gRPC Streaming (Basic) +- [ ] Create `IEventDeliveryProvider` interface +- [ ] Create `GrpcEventDeliveryProvider` implementation +- [ ] Update gRPC service to support bidirectional streaming +- [ ] Implement consumer registration/unregistration +- [ ] Handle connection lifecycle (connect/disconnect/reconnect) + +#### 1.8 Sample Project Updates +- [ ] Refactor `UserEvents.cs` → `UserWorkflow.cs` +- [ ] Refactor `InvitationWorkflow.cs` to use new API +- [ ] Update `Program.cs` with workflow registration +- [ ] Add simple subscription consumer example +- [ ] Add gRPC streaming consumer example +- [ ] Update documentation + +#### 1.9 Testing & Validation +- [ ] Build and verify no regressions +- [ ] Test workflow start/continue semantics +- [ ] Test ephemeral stream (message queue behavior) +- [ ] Test broadcast subscription (multiple consumers) +- [ ] Test exclusive subscription (single consumer) +- [ ] Test gRPC streaming connection +- [ ] Verify existing features still work + +**Phase 1 Success Criteria:** +```csharp +✅ This should work: + +// Registration +builder.Services.AddCommandWithWorkflow(); + +// Handler +public class InviteUserCommandHandler + : ICommandHandlerWithWorkflow +{ + public async Task HandleAsync( + InviteUserCommand command, + InvitationWorkflow workflow, + CancellationToken ct) + { + workflow.Emit(new UserInvitedEvent { ... }); + return workflow.Id; + } +} + +// Consumer +await foreach (var @event in client.SubscribeAsync("my-subscription", "consumer-1", ct)) +{ + Console.WriteLine($"Received: {@event}"); +} +``` + +--- + +## Phase 2: Persistence & Event Sourcing + +**Goal**: Add persistent streams with replay capability + +**Duration**: Weeks 3-4 + +### Phase 2 Tasks + +#### 2.1 Storage Abstractions (Persistent) +- [ ] Extend `IEventStreamStore` with append-only log methods: + - [ ] `AppendAsync()` for persistent streams + - [ ] `ReadStreamAsync()` for reading event log + - [ ] `GetStreamLengthAsync()` for stream metadata +- [ ] Create `StoredEvent` record (offset, timestamp, event data) +- [ ] Create `StreamMetadata` record (length, retention, oldest event) + +#### 2.2 PostgreSQL Storage Implementation +- [ ] Create `PostgresEventStreamStore : IEventStreamStore` +- [ ] Design event log schema: + - [ ] `events` table (stream_name, offset, event_type, event_data, correlation_id, timestamp) + - [ ] Indexes for efficient queries + - [ ] Partition strategy for large streams +- [ ] Implement append operations with optimistic concurrency +- [ ] Implement read operations with offset-based pagination +- [ ] Implement queue operations for ephemeral streams + +#### 2.3 Offset Tracking +- [ ] Create `IConsumerOffsetStore` interface + - [ ] `GetOffsetAsync(subscriptionId, consumerId)` + - [ ] `SetOffsetAsync(subscriptionId, consumerId, offset)` + - [ ] `GetConsumerPositionsAsync(subscriptionId)` (for monitoring) +- [ ] Create `PostgresConsumerOffsetStore` implementation +- [ ] Design offset tracking schema: + - [ ] `consumer_offsets` table (subscription_id, consumer_id, stream_offset, last_updated) +- [ ] Integrate offset tracking with subscription client + +#### 2.4 Retention Policies +- [ ] Create `RetentionPolicy` configuration + - [ ] Time-based retention (e.g., 90 days) + - [ ] Size-based retention (e.g., 10GB max) + - [ ] Count-based retention (e.g., 1M events max) +- [ ] Create `IRetentionService` interface +- [ ] Create `RetentionService` background service +- [ ] Implement retention policy enforcement +- [ ] Add configurable cleanup intervals + +#### 2.5 Event Replay API +- [ ] Create `IEventReplayService` interface +- [ ] Create `EventReplayService` implementation +- [ ] Create `ReplayOptions` configuration: + - [ ] `StartPosition` (Beginning, Offset, Timestamp, EventId) + - [ ] `EndPosition` (Latest, Offset, Timestamp, EventId) + - [ ] `Filter` predicate + - [ ] `MaxEvents` limit +- [ ] Implement replay from persistent streams +- [ ] Add replay to new consumer (catch-up subscription) + +#### 2.6 Stream Configuration Extensions +- [ ] Extend stream configuration with: + - [ ] `Type = StreamType.Persistent` + - [ ] `Retention` policies + - [ ] `EnableReplay = true/false` +- [ ] Validate configuration (ephemeral can't have replay) +- [ ] Add stream type detection and routing + +#### 2.7 Migration & Compatibility +- [ ] Create database migration scripts +- [ ] Add backward compatibility for in-memory implementation +- [ ] Allow mixing persistent and ephemeral streams +- [ ] Support runtime switching (development vs production) + +#### 2.8 Testing +- [ ] Test persistent stream append/read +- [ ] Test offset tracking across restarts +- [ ] Test retention policy enforcement +- [ ] Test event replay from various positions +- [ ] Test catch-up subscriptions +- [ ] Stress test with large event volumes + +**Phase 2 Success Criteria:** +```csharp +✅ This should work: + +// Configure persistent stream +builder.Services.AddEventStreaming(streaming => +{ + streaming.AddStream(stream => + { + stream.Type = StreamType.Persistent; + stream.Retention = TimeSpan.FromDays(90); + stream.EnableReplay = true; + }); +}); + +// Use PostgreSQL storage +services.AddSingleton(); + +// Replay events +var replay = await replayService.ReplayStreamAsync("user-events", new ReplayOptions +{ + From = new StartPosition.Timestamp(DateTimeOffset.UtcNow.AddDays(-7)) +}, ct); + +await foreach (var @event in replay) +{ + // Process historical events +} +``` + +--- + +## Phase 3: Exactly-Once Delivery & Read Receipts + +**Goal**: Add deduplication and explicit user confirmation + +**Duration**: Week 5 + +### Phase 3 Tasks + +#### 3.1 Idempotency Store +- [ ] Create `IIdempotencyStore` interface + - [ ] `WasProcessedAsync(consumerId, eventId)` + - [ ] `MarkProcessedAsync(consumerId, eventId, processedAt)` + - [ ] `TryAcquireIdempotencyLockAsync(idempotencyKey, lockDuration)` + - [ ] `ReleaseIdempotencyLockAsync(idempotencyKey)` + - [ ] `CleanupAsync(olderThan)` +- [ ] Create `PostgresIdempotencyStore` implementation +- [ ] Design idempotency schema: + - [ ] `processed_events` table (consumer_id, event_id, processed_at) + - [ ] `idempotency_locks` table (lock_key, acquired_at, expires_at) +- [ ] Add TTL-based cleanup + +#### 3.2 Exactly-Once Middleware +- [ ] Create `ExactlyOnceDeliveryDecorator` +- [ ] Implement duplicate detection +- [ ] Implement distributed locking +- [ ] Add automatic retry on lock contention +- [ ] Integrate with subscription pipeline + +#### 3.3 Read Receipt Store +- [ ] Create `IReadReceiptStore` interface + - [ ] `MarkDeliveredAsync(subscriptionId, consumerId, eventId, deliveredAt)` + - [ ] `MarkReadAsync(subscriptionId, consumerId, eventId, readAt)` + - [ ] `GetUnreadEventsAsync(subscriptionId, consumerId)` + - [ ] `GetExpiredUnreadEventsAsync(timeout)` +- [ ] Create `PostgresReadReceiptStore` implementation +- [ ] Design read receipt schema: + - [ ] `read_receipts` table (subscription_id, consumer_id, event_id, delivered_at, read_at, status) + +#### 3.4 Read Receipt API +- [ ] Extend `IEventSubscriptionClient` with: + - [ ] `MarkAsReadAsync(eventId)` + - [ ] `MarkAllAsReadAsync()` + - [ ] `GetUnreadCountAsync()` +- [ ] Create `ReadReceiptEvent` wrapper with `.MarkAsReadAsync()` method +- [ ] Implement unread timeout handling +- [ ] Add dead letter queue for expired unread events + +#### 3.5 Configuration +- [ ] Extend stream configuration with: + - [ ] `DeliverySemantics = DeliverySemantics.ExactlyOnce` +- [ ] Extend subscription configuration with: + - [ ] `Mode = SubscriptionMode.ReadReceipt` + - [ ] `OnUnreadTimeout` duration + - [ ] `OnUnreadExpired` policy (Requeue, DeadLetter, Drop) +- [ ] Add validation for configuration combinations + +#### 3.6 Monitoring & Cleanup +- [ ] Create background service for unread timeout detection +- [ ] Add metrics for unread events per consumer +- [ ] Add health checks for lagging consumers +- [ ] Implement automatic cleanup of old processed events + +#### 3.7 Testing +- [ ] Test duplicate event detection +- [ ] Test concurrent processing with locking +- [ ] Test read receipt lifecycle (delivered → read) +- [ ] Test unread timeout handling +- [ ] Test exactly-once guarantees under failure + +**Phase 3 Success Criteria:** +```csharp +✅ This should work: + +// Exactly-once delivery +builder.Services.AddEventStreaming(streaming => +{ + streaming.AddStream(stream => + { + stream.Type = StreamType.Persistent; + stream.DeliverySemantics = DeliverySemantics.ExactlyOnce; + }); +}); + +// Read receipts +streaming.AddSubscription("admin-notifications", subscription => +{ + subscription.ToStream(); + subscription.Mode = SubscriptionMode.ReadReceipt; + subscription.OnUnreadTimeout = TimeSpan.FromHours(24); +}); + +// Consumer +await foreach (var notification in client.SubscribeAsync("admin-notifications", "admin-123", ct)) +{ + await ShowNotificationAsync(notification); + await notification.MarkAsReadAsync(); // Explicit confirmation +} +``` + +--- + +## Phase 4: Cross-Service Communication (RabbitMQ) + +**Goal**: Enable event streaming across different services via RabbitMQ with zero developer friction + +**Duration**: Weeks 6-7 + +### Phase 4 Tasks + +#### 4.1 External Delivery Abstraction +- [ ] Extend `IEventDeliveryProvider` with: + - [ ] `PublishExternalAsync(streamName, event, metadata)` + - [ ] `SubscribeExternalAsync(streamName, subscriptionId, consumerId)` +- [ ] Create `ExternalDeliveryConfiguration` +- [ ] Add provider registration API + +#### 4.2 RabbitMQ Provider +- [ ] Create `RabbitMqEventDeliveryProvider : IEventDeliveryProvider` +- [ ] Create `RabbitMqConfiguration`: + - [ ] Connection string + - [ ] Exchange prefix + - [ ] Exchange type (topic, fanout, direct) + - [ ] Routing key strategy + - [ ] Auto-declare topology +- [ ] Implement connection management (connect, reconnect, dispose) +- [ ] Implement publish operations +- [ ] Implement subscribe operations +- [ ] Add NuGet dependency: `RabbitMQ.Client` + +#### 4.3 Topology Management +- [ ] Create `IRabbitMqTopologyManager` interface +- [ ] Implement automatic exchange creation: + - [ ] Format: `{prefix}.{stream-name}` (e.g., `myapp.user-events`) + - [ ] Type: topic exchange (default) +- [ ] Implement automatic queue creation: + - [ ] Broadcast: `{prefix}.{subscription-id}.{consumer-id}` + - [ ] Exclusive: `{prefix}.{subscription-id}` + - [ ] ConsumerGroup: `{prefix}.{subscription-id}` +- [ ] Implement automatic binding creation: + - [ ] Routing keys based on event type names +- [ ] Add validation for valid names (no spaces, special chars) + +#### 4.4 Remote Stream Configuration +- [ ] Create `IRemoteStreamConfiguration` interface +- [ ] Create fluent API: `AddRemoteStream(name, config)` +- [ ] Implement remote stream subscription +- [ ] Add cross-service event routing + +#### 4.5 Message Serialization +- [ ] Create `IEventSerializer` interface +- [ ] Create `JsonEventSerializer` implementation +- [ ] Add event type metadata in message headers: + - [ ] `event-type` (CLR type name) + - [ ] `event-version` (schema version) + - [ ] `correlation-id` + - [ ] `timestamp` +- [ ] Implement deserialization with type resolution + +#### 4.6 Acknowledgment & Redelivery +- [ ] Implement manual acknowledgment (ack) +- [ ] Implement negative acknowledgment (nack) with requeue +- [ ] Add dead letter queue configuration +- [ ] Implement retry policies (exponential backoff) +- [ ] Add max retry count + +#### 4.7 Connection Resilience +- [ ] Implement automatic reconnection on failure +- [ ] Add connection health checks +- [ ] Implement circuit breaker pattern +- [ ] Add connection pool management +- [ ] Log connection events (connected, disconnected, reconnecting) + +#### 4.8 Cross-Service Sample +- [ ] Create second sample project: `Svrnty.Sample.Analytics` +- [ ] Configure Service A to publish to RabbitMQ +- [ ] Configure Service B to consume from RabbitMQ +- [ ] Demonstrate cross-service event flow +- [ ] Add docker-compose with RabbitMQ + +#### 4.9 Testing +- [ ] Test exchange/queue creation +- [ ] Test message publishing +- [ ] Test message consumption +- [ ] Test acknowledgment handling +- [ ] Test connection failure recovery +- [ ] Test dead letter queue +- [ ] Integration test across two services + +**Phase 4 Success Criteria:** +```csharp +✅ This should work: + +// Service A: Publish events externally +builder.Services.AddEventStreaming(streaming => +{ + streaming.AddStream(stream => + { + stream.Type = StreamType.Persistent; + stream.Scope = StreamScope.CrossService; + stream.ExternalDelivery.UseRabbitMq(rabbitmq => + { + rabbitmq.ConnectionString = "amqp://localhost"; + rabbitmq.ExchangeName = "user-service.events"; + }); + }); +}); + +// Service B: Consume from Service A +builder.Services.AddEventStreaming(streaming => +{ + streaming.AddRemoteStream("user-service.events", remote => + { + remote.UseRabbitMq(rabbitmq => + { + rabbitmq.ConnectionString = "amqp://localhost"; + }); + }); + + streaming.AddSubscription("analytics", subscription => + { + subscription.ToRemoteStream("user-service.events"); + subscription.Mode = SubscriptionMode.ConsumerGroup; + }); +}); + +// Zero RabbitMQ knowledge needed by developer! +``` + +--- + +## Phase 5: Schema Evolution & Versioning + +**Goal**: Support event versioning with automatic upcasting + +**Duration**: Weeks 8-9 + +### Phase 5 Tasks + +#### 5.1 Schema Registry Abstractions +- [ ] Create `ISchemaRegistry` interface + - [ ] `RegisterSchemaAsync(version, upcastFromType)` + - [ ] `GetSchemaAsync(eventType, version)` + - [ ] `GetSchemaHistoryAsync(eventType)` + - [ ] `UpcastAsync(event, targetVersion)` +- [ ] Create `SchemaInfo` record (version, CLR type, JSON schema, upcast info) +- [ ] Create `ISchemaStore` interface for persistence + +#### 5.2 Event Versioning Attributes +- [ ] Create `[EventVersion(int)]` attribute +- [ ] Create `[EventVersionAttribute]` with: + - [ ] `Version` property + - [ ] `UpcastFrom` type property +- [ ] Add compile-time validation (via analyzer if time permits) + +#### 5.3 Schema Registry Implementation +- [ ] Create `SchemaRegistry : ISchemaRegistry` +- [ ] Create `PostgresSchemaStore : ISchemaStore` +- [ ] Design schema storage: + - [ ] `event_schemas` table (event_type, version, clr_type, json_schema, upcast_from_type, registered_at) +- [ ] Implement version registration +- [ ] Implement schema lookup with caching + +#### 5.4 Upcasting Pipeline +- [ ] Create `IEventUpcaster` interface +- [ ] Create `EventUpcastingMiddleware` +- [ ] Implement automatic upcaster discovery: + - [ ] Via static method: `TTo.UpcastFrom(TFrom)` + - [ ] Via registered `IEventUpcaster` implementations +- [ ] Implement multi-hop upcasting (V1 → V2 → V3) +- [ ] Add upcasting to subscription pipeline + +#### 5.5 JSON Schema Generation +- [ ] Create `IJsonSchemaGenerator` interface +- [ ] Create `JsonSchemaGenerator` implementation +- [ ] Generate JSON Schema from CLR types +- [ ] Store schemas in registry for external consumers +- [ ] Add schema validation (optional) + +#### 5.6 Configuration +- [ ] Extend stream configuration with: + - [ ] `EnableSchemaEvolution = true/false` + - [ ] `SchemaRegistry` configuration +- [ ] Add fluent API for schema registration: + - [ ] `registry.Register(version)` + - [ ] `registry.Register(version, upcastFrom: typeof(TOldEvent))` +- [ ] Extend subscription configuration: + - [ ] `ReceiveAs()` to specify target version + +#### 5.7 Backward Compatibility +- [ ] Handle events without version attribute (default to version 1) +- [ ] Support mixed versioned/unversioned events +- [ ] Add migration path for existing events + +#### 5.8 Testing +- [ ] Test version registration +- [ ] Test single-hop upcasting (V1 → V2) +- [ ] Test multi-hop upcasting (V1 → V2 → V3) +- [ ] Test new consumers receiving old events (auto-upcast) +- [ ] Test schema storage and retrieval +- [ ] Test JSON schema generation + +**Phase 5 Success Criteria:** +```csharp +✅ This should work: + +// Event V1 +[EventVersion(1)] +public sealed record UserAddedEventV1 : UserWorkflow +{ + public required int UserId { get; init; } + public required string Name { get; init; } +} + +// Event V2 with upcaster +[EventVersion(2, UpcastFrom = typeof(UserAddedEventV1))] +public sealed record UserAddedEventV2 : UserWorkflow +{ + public required int UserId { get; init; } + public required string FirstName { get; init; } + public required string LastName { get; init; } + public required string Email { get; init; } + + public static UserAddedEventV2 UpcastFrom(UserAddedEventV1 v1) + { + var names = v1.Name.Split(' ', 2); + return new UserAddedEventV2 + { + UserId = v1.UserId, + FirstName = names[0], + LastName = names.Length > 1 ? names[1] : "", + Email = $"user{v1.UserId}@unknown.com" + }; + } +} + +// Configuration +streaming.UseSchemaRegistry(registry => +{ + registry.Register(version: 1); + registry.Register(version: 2, upcastFrom: typeof(UserAddedEventV1)); +}); + +// Consumer always receives V2 (framework auto-upcasts V1 → V2) +streaming.AddSubscription("analytics", subscription => +{ + subscription.ToStream(); + subscription.ReceiveAs(); +}); +``` + +--- + +## Phase 6: Management, Monitoring & Observability + +**Goal**: Production-ready monitoring, health checks, and management APIs + +**Duration**: Week 10+ + +### Phase 6 Tasks + +#### 6.1 Health Checks +- [ ] Create `IStreamHealthCheck` interface +- [ ] Implement stream health checks: + - [ ] Stream exists and is writable + - [ ] Consumer lag detection (offset vs stream length) + - [ ] Stalled consumer detection (no progress for N minutes) +- [ ] Integrate with ASP.NET Core health checks +- [ ] Add health check endpoints + +#### 6.2 Metrics & Telemetry +- [ ] Define key metrics: + - [ ] Events published per stream (rate) + - [ ] Events consumed per subscription (rate) + - [ ] Consumer lag (offset delta) + - [ ] Processing latency (time from publish to ack) + - [ ] Error rate +- [ ] Integrate with OpenTelemetry +- [ ] Add Prometheus endpoint +- [ ] Create Grafana dashboard templates + +#### 6.3 Management API +- [ ] Create REST API for management: + - [ ] `GET /api/streams` - List all streams + - [ ] `GET /api/streams/{name}` - Get stream details + - [ ] `GET /api/streams/{name}/subscriptions` - List subscriptions + - [ ] `GET /api/subscriptions/{id}/consumers` - List consumers + - [ ] `GET /api/subscriptions/{id}/consumers/{consumerId}/offset` - Get consumer position + - [ ] `POST /api/subscriptions/{id}/consumers/{consumerId}/reset-offset` - Reset offset + - [ ] `DELETE /api/subscriptions/{id}/consumers/{consumerId}` - Remove consumer +- [ ] Add authorization (admin only) +- [ ] Add Swagger documentation + +#### 6.4 Admin Dashboard (Optional) +- [ ] Create simple web UI for monitoring: + - [ ] Stream list with event counts + - [ ] Subscription list with consumer status + - [ ] Consumer lag visualization + - [ ] Event replay interface +- [ ] Use Blazor or simple HTML/JS + +#### 6.5 Logging +- [ ] Add structured logging with Serilog/NLog +- [ ] Log key events: + - [ ] Stream created + - [ ] Consumer registered/unregistered + - [ ] Event published + - [ ] Event consumed + - [ ] Errors and retries +- [ ] Add correlation IDs to all logs +- [ ] Add log levels (Debug, Info, Warning, Error) + +#### 6.6 Alerting (Optional) +- [ ] Define alerting rules: + - [ ] Consumer lag exceeds threshold + - [ ] Consumer stalled (no progress) + - [ ] Error rate spike + - [ ] Dead letter queue growth +- [ ] Integration with alerting systems (email, Slack, PagerDuty) + +#### 6.7 Documentation +- [ ] Update CLAUDE.md with event streaming documentation +- [ ] Create developer guide +- [ ] Create deployment guide +- [ ] Create troubleshooting guide +- [ ] Add API reference documentation +- [ ] Create architecture diagrams + +#### 6.8 Testing +- [ ] Test health check endpoints +- [ ] Test metrics collection +- [ ] Test management API +- [ ] Load testing (throughput, latency) +- [ ] Chaos testing (failure scenarios) + +**Phase 6 Success Criteria:** +```csharp +✅ Production-ready features: + +// Health checks +builder.Services.AddHealthChecks() + .AddEventStreamHealthCheck(); + +// Metrics exposed at /metrics +builder.Services.AddEventStreaming(streaming => +{ + streaming.EnableMetrics(); + streaming.EnableHealthChecks(); +}); + +// Management API available +// GET /api/streams → List all streams +// GET /api/streams/user-events/subscriptions → View subscriptions +// POST /api/subscriptions/admin-notifications/consumers/admin-123/reset-offset → Reset lag +``` + +--- + +## Optional Future Phases + +### Phase 7: Advanced Features (Post-Launch) +- [ ] Kafka provider implementation +- [ ] Azure Service Bus provider +- [ ] AWS SQS/SNS provider +- [ ] Saga orchestration support +- [ ] Event sourcing projections +- [ ] Snapshot support for aggregates +- [ ] CQRS read model synchronization +- [ ] GraphQL subscriptions integration +- [ ] SignalR integration for browser clients + +### Phase 8: Performance Optimizations +- [ ] Batch processing support +- [ ] Stream partitioning +- [ ] Parallel consumer processing +- [ ] Event compression +- [ ] Connection pooling +- [ ] Query optimization + +--- + +## Design Decisions & Rationale + +### Why Workflows Over Events? +**Decision**: Make workflows the primary abstraction, not events. + +**Rationale**: +- Workflows represent business processes (how developers think) +- Events are implementation details of workflows +- Clearer intent: "This command participates in an invitation workflow" +- Solves correlation problem elegantly (workflow ID = correlation ID) + +### Why Support Both Ephemeral & Persistent? +**Decision**: Support both message queue (ephemeral) and event sourcing (persistent) patterns. + +**Rationale**: +- Different use cases have different needs +- Ephemeral: Simple notifications, no need for history +- Persistent: Audit logs, analytics, replay capability +- Developer chooses based on requirements +- Same API for both (progressive complexity) + +### Why Exactly-Once Opt-In? +**Decision**: Make exactly-once delivery optional, default to at-least-once. + +**Rationale**: +- Exactly-once has performance cost (deduplication, locking) +- Most scenarios can handle duplicates (idempotent handlers) +- Developer opts in when critical (financial transactions) +- Simpler default behavior + +### Why Cross-Service Opt-In? +**Decision**: Streams are internal by default, external requires explicit configuration. + +**Rationale**: +- Security: Don't expose events externally by accident +- Performance: Internal delivery (gRPC) is faster +- Simplicity: Most services don't need cross-service events +- Developer explicitly chooses when needed + +### Why Schema Evolution? +**Decision**: Support event versioning from the start. + +**Rationale**: +- Events are long-lived (years in persistent streams) +- Schema changes are inevitable +- Breaking changes hurt (can't deserialize old events) +- Automatic upcasting prevents data loss +- Essential for persistent streams with replay + +--- + +## Success Metrics + +### Phase 1 +- ✅ Basic workflow registration works +- ✅ Ephemeral streams work (in-memory) +- ✅ Broadcast and exclusive subscriptions work +- ✅ gRPC streaming works +- ✅ Zero breaking changes to existing features + +### Phase 2 +- ✅ Persistent streams work (PostgreSQL) +- ✅ Event replay works from any position +- ✅ Retention policies enforced +- ✅ Consumers can resume from last offset + +### Phase 3 +- ✅ Exactly-once delivery works (no duplicates) +- ✅ Read receipts work (delivered vs read) +- ✅ Unread timeout handling works + +### Phase 4 +- ✅ Events flow from Service A to Service B via RabbitMQ +- ✅ Zero RabbitMQ code in handlers +- ✅ Automatic topology creation works +- ✅ Connection resilience works + +### Phase 5 +- ✅ Old events automatically upcast to new version +- ✅ New consumers receive latest version +- ✅ Multi-hop upcasting works (V1→V2→V3) + +### Phase 6 +- ✅ Health checks detect lagging consumers +- ✅ Metrics exposed for monitoring +- ✅ Management API works +- ✅ Documentation complete + +--- + +## Risk Mitigation + +### Risk: Breaking Existing Features +**Mitigation**: +- Keep `AddCommandWithEvents` for backward compatibility +- Run full test suite after each phase +- Feature flags for new functionality + +### Risk: Performance Issues +**Mitigation**: +- Start with in-memory (fast) +- Benchmark at each phase +- Add performance tests before Phase 6 +- Use profiling tools + +### Risk: Complexity Overload +**Mitigation**: +- Progressive disclosure (simple by default) +- Each phase is independently useful +- Clear documentation at each level +- Sample projects for each complexity level + +### Risk: Database Schema Changes +**Mitigation**: +- Use migrations from Phase 2 onward +- Backward-compatible schema changes +- Test migration paths + +### Risk: External Dependencies (RabbitMQ, etc.) +**Mitigation**: +- Make external delivery optional +- Provide in-memory fallback +- Docker Compose for development +- Clear setup documentation + +--- + +## Development Guidelines + +### Coding Standards +- Use C# 14 features (field keyword, extension members) +- Follow existing patterns in codebase +- XML documentation on public APIs +- Async/await throughout +- CancellationToken support on all async methods + +### Testing Strategy +- Unit tests for core logic +- Integration tests for storage implementations +- End-to-end tests for full scenarios +- Performance benchmarks for critical paths + +### Documentation Requirements +- XML doc comments on all public APIs +- README updates for each phase +- Sample code for new features +- Architecture diagrams + +### Code Review Checklist +- [ ] Follows existing code style +- [ ] Has XML documentation +- [ ] Has unit tests +- [ ] No breaking changes (or documented) +- [ ] Performance acceptable +- [ ] Error handling complete + +--- + +## Timeline Summary + +| Phase | Duration | Key Deliverable | +|-------|----------|----------------| +| Phase 1 | 2 weeks | Basic workflows + ephemeral streaming | +| Phase 2 | 2 weeks | Persistent streams + replay | +| Phase 3 | 1 week | Exactly-once + read receipts | +| Phase 4 | 2 weeks | RabbitMQ cross-service | +| Phase 5 | 2 weeks | Schema evolution | +| Phase 6 | 1+ week | Management & monitoring | +| **Total** | **10+ weeks** | **Production-ready event streaming platform** | + +--- + +## Next Steps + +1. **Review this plan** - Validate approach and priorities +2. **Create feature branch** - `feature/event-streaming` +3. **Start Phase 1.1** - Workflow abstraction +4. **Iterate rapidly** - Small commits, frequent builds +5. **Update this document** - Check off tasks as completed + +--- + +## Notes & Questions + +- [ ] Decision: PostgreSQL or pluggable storage from Phase 2? +- [ ] Decision: gRPC-only or add SignalR for browser support? +- [ ] Decision: Create separate NuGet packages per phase or monolithic? +- [ ] Question: Should we support Kafka in Phase 4 or separate phase? +- [ ] Question: Do we need distributed tracing (OpenTelemetry) integration? + +--- + +**Last Updated**: 2025-12-09 +**Status**: Planning Phase - Not Started +**Owner**: Mathias Beaulieu-Duncan diff --git a/bidirectional-communication-design.md b/bidirectional-communication-design.md new file mode 100644 index 0000000..3783c7a --- /dev/null +++ b/bidirectional-communication-design.md @@ -0,0 +1,1218 @@ +# Bidirectional Communication Design for CQRS Framework + +This document outlines the design for adding persistent, selective event subscriptions to a CQRS framework with a Flutter frontend and .NET backend. + +## Table of Contents + +- [Overview](#overview) +- [Core Concepts](#core-concepts) +- [Frontend Architecture (Flutter)](#frontend-architecture-flutter) + - [DataSource](#datasource) + - [Event Subscription Config](#event-subscription-config) + - [Event Connection](#event-connection) +- [Usage Examples](#usage-examples) +- [Backend Architecture (.NET)](#backend-architecture-net) + - [Protocol Messages](#protocol-messages) + - [Event Filtering and Delivery](#event-filtering-and-delivery) + - [Persistent Subscription Storage](#persistent-subscription-storage) +- [Flow Diagrams](#flow-diagrams) +- [Considerations](#considerations) + +--- + +## Overview + +The goal is to extend a CQRS framework to support: + +1. **Command-correlated event subscriptions**: When executing a command, optionally subscribe to related events +2. **Selective subscriptions**: Choose which specific events to receive (not all-or-nothing) +3. **Persistent delivery**: Events are stored and delivered even if the user is offline +4. **Catch-up on reconnect**: Missed events are delivered when the user comes back online +5. **Per-usage flexibility**: Same DataSource can be used with or without events depending on context + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ DataSource │ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌───────────────────┐ │ +│ │ Query │ │ Commands │ │ Event Connection │ │ +│ │ │ │ │ │ │ │ +│ │ UsersQuery │ │ InviteUser │ │ subscribe(...) │ │ +│ │ │ │ RemoveUser │ │ unsubscribe(...) │ │ +│ └─────────────┘ └─────────────┘ └───────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Core Concepts + +| Concept | Description | +|---------|-------------| +| **DataSource** | Combines a query, related commands, and optional event connection | +| **EventConnection** | WebSocket connection that handles subscriptions and event delivery | +| **Correlation ID** | Links a command to its resulting events | +| **Persistent Subscription** | Server-stored subscription that survives disconnection | +| **Terminal Event** | An event that completes/closes a subscription (e.g., `InvitationAccepted`) | +| **Catch-up** | Delivering missed events when a client reconnects | + +--- + +## Frontend Architecture (Flutter) + +### DataSource + +The DataSource is the central abstraction combining query data, commands, and event subscriptions. + +```dart +abstract class DataSource { + /// The query this datasource executes + final TQuery query; + + /// Stream of query results + Stream get dataStream; + + /// Current cached data + TData? get currentData; + + /// Event connection (lazy - only created when needed) + EventConnection? _eventConnection; + + /// Getter for event connection (for external listeners) + EventConnection? get eventConnection => _eventConnection; + + /// Commands attached to this datasource + final Map _commands = {}; + + DataSource(this.query); + + /// Register a command this datasource can execute + void registerCommand( + CommandConfig config, + ) { + _commands[TCommand] = config; + } + + /// Execute a command with optional event subscription + Future execute( + TCommand command, { + EventSubscriptionConfig? subscribeToEvents, + }) async { + final result = await _commandSender.send(command); + + // If caller wants events, set up subscription + if (subscribeToEvents != null && subscribeToEvents.eventTypes.isNotEmpty) { + await _ensureEventConnection(); + await _eventConnection!.subscribe( + correlationId: result.correlationId, + config: subscribeToEvents, + ); + } + + return result; + } + + /// Explicitly open event connection (triggers catch-up) + Future openEventConnection() async { + _eventConnection ??= EventConnection( + dataSourceId: id, + userId: _authService.currentUserId, + ); + await _eventConnection!.connect(); + return _eventConnection!; + } + + /// Ensure connection exists + Future _ensureEventConnection() async { + if (_eventConnection == null || !_eventConnection!.isConnected) { + await openEventConnection(); + } + } + + /// Cleanup resources + Future dispose() async { + await _eventConnection?.disconnect(); + } +} +``` + +### Event Subscription Config + +Fine-grained control over what events to subscribe to for a given command execution. + +```dart +/// Configuration for event subscriptions +class EventSubscriptionConfig { + /// Which event types to receive + final Set eventTypes; + + /// Auto-unsubscribe after this duration (null = no timeout) + final Duration? timeout; + + /// Should the subscription survive app restart? + final bool persistent; + + /// How should events be delivered? + final EventDeliveryMode deliveryMode; + + const EventSubscriptionConfig({ + required this.eventTypes, + this.timeout, + this.persistent = true, + this.deliveryMode = EventDeliveryMode.immediate, + }); + + /// Subscribe to all provided event types + factory EventSubscriptionConfig.all(List types) { + return EventSubscriptionConfig( + eventTypes: types.toSet(), + persistent: true, + ); + } + + /// Subscribe to specific event types only + factory EventSubscriptionConfig.only(List types) { + return EventSubscriptionConfig( + eventTypes: types.toSet(), + persistent: true, + ); + } + + /// No subscription (explicit opt-out) + factory EventSubscriptionConfig.none() { + return EventSubscriptionConfig( + eventTypes: {}, + persistent: false, + ); + } + + /// Transient subscription (not persisted, immediate delivery) + factory EventSubscriptionConfig.transient(List types) { + return EventSubscriptionConfig( + eventTypes: types.toSet(), + persistent: false, + deliveryMode: EventDeliveryMode.immediate, + ); + } +} + +/// How events should be delivered to the client +enum EventDeliveryMode { + /// Push immediately when event occurs + immediate, + + /// Batch events and deliver periodically + batched, + + /// Only deliver on reconnect (save bandwidth for background updates) + onReconnect, +} +``` + +### Event Connection + +Manages the WebSocket connection, subscriptions, and event routing. + +```dart +/// Manages WebSocket connection and event subscriptions +class EventConnection { + final String dataSourceId; + final String userId; + + WebSocketChannel? _channel; + final _eventController = StreamController.broadcast(); + final Map _activeSubscriptions = {}; + + bool _isConnected = false; + bool get isConnected => _isConnected; + + /// Stream of all events + Stream get events => _eventController.stream; + + /// Typed event stream - filters to specific event type + Stream on() { + return events.whereType(); + } + + /// Events for a specific correlation + Stream forCorrelation(String correlationId) { + return events.where((e) => e.correlationId == correlationId); + } + + EventConnection({ + required this.dataSourceId, + required this.userId, + }); + + /// Connect to the event server + Future connect() async { + if (_isConnected) return; + + _channel = WebSocketChannel.connect( + Uri.parse('wss://api.example.com/events'), + ); + + // Authenticate the connection + _send({ + 'type': 'auth', + 'token': await _authService.getToken(), + 'dataSourceId': dataSourceId, + }); + + // Request catch-up for any existing persistent subscriptions + _send({ + 'type': 'catch_up', + 'subscriptionIds': _activeSubscriptions.keys.toList(), + }); + + // Listen for incoming messages + _channel!.stream.listen( + _handleMessage, + onDone: _handleDisconnect, + onError: _handleError, + ); + + _isConnected = true; + } + + /// Subscribe to events for a correlation ID + Future subscribe({ + required String correlationId, + required EventSubscriptionConfig config, + }) async { + final subscription = EventSubscription( + id: const Uuid().v4(), + correlationId: correlationId, + config: config, + ); + + _activeSubscriptions[subscription.id] = subscription; + + // Notify backend about this subscription + _send({ + 'type': 'subscribe', + 'subscriptionId': subscription.id, + 'correlationId': correlationId, + 'eventTypes': config.eventTypes.map((t) => t.toString()).toList(), + 'persistent': config.persistent, + 'timeout': config.timeout?.inSeconds, + 'deliveryMode': config.deliveryMode.name, + }); + + return subscription; + } + + /// Unsubscribe from a subscription + Future unsubscribe(String subscriptionId) async { + _activeSubscriptions.remove(subscriptionId); + + _send({ + 'type': 'unsubscribe', + 'subscriptionId': subscriptionId, + }); + } + + /// Disconnect from the server + Future disconnect() async { + _isConnected = false; + await _channel?.sink.close(); + _channel = null; + } + + void _send(Map message) { + _channel?.sink.add(jsonEncode(message)); + } + + void _handleMessage(dynamic message) { + final data = jsonDecode(message as String) as Map; + + switch (data['type']) { + case 'event': + _handleEventMessage(data); + break; + + case 'subscription_completed': + _handleSubscriptionCompleted(data); + break; + + case 'error': + _handleErrorMessage(data); + break; + } + } + + void _handleEventMessage(Map data) { + final event = _deserializeEvent(data); + _eventController.add(event); + + // Notify specific subscription handler if registered + final subscription = _activeSubscriptions[data['subscriptionId']]; + subscription?.onEvent?.call(event); + } + + void _handleSubscriptionCompleted(Map data) { + final subscriptionId = data['subscriptionId'] as String; + final subscription = _activeSubscriptions.remove(subscriptionId); + subscription?.onCompleted?.call(data['reason'] as String); + } + + void _handleDisconnect() { + _isConnected = false; + // Implement reconnection logic here + } + + void _handleError(Object error) { + // Handle connection errors + } + + void _handleErrorMessage(Map data) { + // Handle protocol-level errors + } + + CorrelatedEvent _deserializeEvent(Map data) { + // Deserialize based on eventType + // Implementation depends on your serialization strategy + throw UnimplementedError(); + } +} + +/// Represents an active subscription +class EventSubscription { + final String id; + final String correlationId; + final EventSubscriptionConfig config; + + /// Callback when an event is received + void Function(CorrelatedEvent)? onEvent; + + /// Callback when subscription is completed (terminal event or timeout) + void Function(String reason)? onCompleted; + + EventSubscription({ + required this.id, + required this.correlationId, + required this.config, + }); +} +``` + +### Event Base Classes + +```dart +/// Base class for all events that can be correlated to a command +abstract class CorrelatedEvent { + final String correlationId; + final String eventId; + final DateTime occurredAt; + + CorrelatedEvent({ + required this.correlationId, + required this.eventId, + required this.occurredAt, + }); +} + +/// Example: User invitation events +abstract class UserInvitationEvent extends CorrelatedEvent { + UserInvitationEvent({ + required super.correlationId, + required super.eventId, + required super.occurredAt, + }); +} + +class UserInvitationSentEvent extends UserInvitationEvent { + final String email; + + UserInvitationSentEvent({ + required this.email, + required super.correlationId, + required super.eventId, + required super.occurredAt, + }); +} + +class UserInvitationAcceptedEvent extends UserInvitationEvent { + final String userId; + final String email; + + UserInvitationAcceptedEvent({ + required this.userId, + required this.email, + required super.correlationId, + required super.eventId, + required super.occurredAt, + }); +} + +class UserInvitationDeclinedEvent extends UserInvitationEvent { + final String email; + final String? reason; + + UserInvitationDeclinedEvent({ + required this.email, + this.reason, + required super.correlationId, + required super.eventId, + required super.occurredAt, + }); +} +``` + +--- + +## Usage Examples + +### Example 1: Admin Dashboard (wants all events) + +```dart +class AdminDashboardPage extends StatefulWidget { + @override + State createState() => _AdminDashboardPageState(); +} + +class _AdminDashboardPageState extends State { + late final UsersDataSource _usersDataSource; + + @override + void initState() { + super.initState(); + _usersDataSource = UsersDataSource(UsersQuery()); + + // Open event connection - triggers catch-up for pending events + _usersDataSource.openEventConnection(); + + // Listen to all invitation events for toast notifications + _usersDataSource.eventConnection?.on().listen(_showEventToast); + } + + void _showEventToast(UserInvitationEvent event) { + final message = switch (event) { + UserInvitationSentEvent e => 'Invitation sent to ${e.email}', + UserInvitationAcceptedEvent e => '${e.email} has joined!', + UserInvitationDeclinedEvent e => '${e.email} declined the invitation', + _ => 'Unknown event', + }; + + ScaffoldMessenger.of(context).showSnackBar( + SnackBar(content: Text(message)), + ); + + // Refresh list if someone joined + if (event is UserInvitationAcceptedEvent) { + _usersDataSource.refresh(); + } + } + + Future _inviteUser(String email) async { + await _usersDataSource.inviteUser( + email, + // Subscribe to ALL invitation events + subscribeToEvents: { + UserInvitationSentEvent, + UserInvitationAcceptedEvent, + UserInvitationDeclinedEvent, + }, + ); + } + + @override + void dispose() { + _usersDataSource.dispose(); + super.dispose(); + } + + @override + Widget build(BuildContext context) { + return StreamBuilder>( + stream: _usersDataSource.dataStream, + builder: (context, snapshot) { + // Build UI... + }, + ); + } +} +``` + +### Example 2: Simple User List (no events needed) + +```dart +class UserListPage extends StatefulWidget { + @override + State createState() => _UserListPageState(); +} + +class _UserListPageState extends State { + late final UsersDataSource _usersDataSource; + + @override + void initState() { + super.initState(); + _usersDataSource = UsersDataSource(UsersQuery()); + // NOT opening event connection - no real-time updates needed + } + + @override + Widget build(BuildContext context) { + return StreamBuilder>( + stream: _usersDataSource.dataStream, + builder: (context, snapshot) { + if (!snapshot.hasData) { + return const CircularProgressIndicator(); + } + + return ListView.builder( + itemCount: snapshot.data!.length, + itemBuilder: (context, index) { + final user = snapshot.data![index]; + return ListTile(title: Text(user.name)); + }, + ); + }, + ); + } + + @override + void dispose() { + _usersDataSource.dispose(); + super.dispose(); + } +} +``` + +### Example 3: Invite Dialog (only wants sent confirmation) + +```dart +class InviteUserDialog extends StatefulWidget { + final UsersDataSource dataSource; + + const InviteUserDialog({required this.dataSource}); + + @override + State createState() => _InviteUserDialogState(); +} + +class _InviteUserDialogState extends State { + final _emailController = TextEditingController(); + bool _isSending = false; + + Future _sendInvite() async { + setState(() => _isSending = true); + + try { + await widget.dataSource.inviteUser( + _emailController.text, + // Only care about the "sent" confirmation + subscribeToEvents: {UserInvitationSentEvent}, + onEvent: (event) { + if (event is UserInvitationSentEvent) { + Navigator.of(context).pop(); + ScaffoldMessenger.of(context).showSnackBar( + const SnackBar(content: Text('Invitation sent!')), + ); + } + }, + ); + } finally { + setState(() => _isSending = false); + } + } + + @override + Widget build(BuildContext context) { + return AlertDialog( + title: const Text('Invite User'), + content: TextField( + controller: _emailController, + decoration: const InputDecoration(labelText: 'Email'), + ), + actions: [ + TextButton( + onPressed: () => Navigator.of(context).pop(), + child: const Text('Cancel'), + ), + ElevatedButton( + onPressed: _isSending ? null : _sendInvite, + child: _isSending + ? const CircularProgressIndicator() + : const Text('Send'), + ), + ], + ); + } +} +``` + +### Example 4: Concrete UsersDataSource Implementation + +```dart +class UsersDataSource extends DataSource> { + UsersDataSource(UsersQuery query) : super(query); + + /// Invite a user with optional event subscription + Future inviteUser( + String email, { + Set? subscribeToEvents, + void Function(UserInvitationEvent)? onEvent, + }) async { + // Build subscription config if events requested + EventSubscriptionConfig? config; + if (subscribeToEvents != null && subscribeToEvents.isNotEmpty) { + config = EventSubscriptionConfig( + eventTypes: subscribeToEvents, + persistent: true, + ); + } + + // Execute the command + final result = await execute( + InviteUserCommand(email: email), + subscribeToEvents: config, + ); + + // Wire up event handler if provided + if (config != null && onEvent != null) { + eventConnection?.events + .where((e) => e.correlationId == result.correlationId) + .whereType() + .listen(onEvent); + } + + return result; + } + + /// Remove a user + Future removeUser(String userId) async { + await execute( + RemoveUserCommand(userId: userId), + // No events for this command + ); + } +} +``` + +--- + +## Backend Architecture (.NET) + +### Protocol Messages + +#### Client to Server + +**Subscribe** +```json +{ + "type": "subscribe", + "subscriptionId": "sub-123-abc", + "correlationId": "invite-456-def", + "eventTypes": ["UserInvitationSentEvent", "UserInvitationAcceptedEvent"], + "persistent": true, + "timeout": null, + "deliveryMode": "immediate" +} +``` + +**Unsubscribe** +```json +{ + "type": "unsubscribe", + "subscriptionId": "sub-123-abc" +} +``` + +**Catch-up Request** +```json +{ + "type": "catch_up", + "subscriptionIds": ["sub-123-abc", "sub-789-xyz"] +} +``` + +**Authentication** +```json +{ + "type": "auth", + "token": "jwt-token-here", + "dataSourceId": "users-datasource" +} +``` + +#### Server to Client + +**Event Delivery** +```json +{ + "type": "event", + "subscriptionId": "sub-123-abc", + "correlationId": "invite-456-def", + "eventType": "UserInvitationAcceptedEvent", + "eventId": "evt-001", + "sequence": 42, + "payload": { + "userId": "user-789", + "email": "john@example.com" + }, + "occurredAt": "2025-01-15T10:30:00Z" +} +``` + +**Subscription Completed** +```json +{ + "type": "subscription_completed", + "subscriptionId": "sub-123-abc", + "reason": "terminal_event", + "terminalEvent": "UserInvitationAcceptedEvent" +} +``` + +**Error** +```json +{ + "type": "error", + "code": "subscription_not_found", + "message": "Subscription sub-123-abc not found", + "subscriptionId": "sub-123-abc" +} +``` + +### Event Filtering and Delivery + +```csharp +public class EventDeliveryService +{ + private readonly ISubscriptionStore _subscriptionStore; + private readonly IConnectionTracker _connectionTracker; + private readonly IClientNotifier _notifier; + private readonly IEventStore _eventStore; + + public async Task DeliverEventAsync(ICorrelatedEvent @event) + { + // Store the event with sequence number + var sequence = await _eventStore.AppendAsync(@event); + + // Find all subscriptions interested in this correlation + var subscriptions = await _subscriptionStore + .FindByCorrelationIdAsync(@event.CorrelationId); + + foreach (var subscription in subscriptions) + { + await DeliverToSubscriptionAsync(subscription, @event, sequence); + } + } + + private async Task DeliverToSubscriptionAsync( + PersistentSubscription subscription, + ICorrelatedEvent @event, + long sequence) + { + // FILTER: Only deliver if subscriber requested this event type + var eventTypeName = @event.GetType().Name; + if (!subscription.EventTypes.Contains(eventTypeName)) + { + // Subscriber didn't ask for this type - skip + return; + } + + // Check delivery mode + if (subscription.DeliveryMode == DeliveryMode.OnReconnect) + { + // Don't deliver now, will be caught up on reconnect + return; + } + + // Check if subscriber is online + var connection = _connectionTracker.GetConnection(subscription.SubscriberId); + + if (connection != null) + { + // Deliver immediately + await _notifier.SendAsync(connection.Id, new EventMessage + { + Type = "event", + SubscriptionId = subscription.Id.ToString(), + CorrelationId = @event.CorrelationId, + EventType = eventTypeName, + EventId = @event.EventId.ToString(), + Sequence = sequence, + Payload = @event, + OccurredAt = @event.OccurredAt, + }); + + subscription.MarkDelivered(sequence); + } + // If offline, event stays in store for catch-up + + // Check if this is a terminal event + if (subscription.TerminalEventTypes.Contains(eventTypeName)) + { + subscription.Complete(); + + if (connection != null) + { + await _notifier.SendAsync(connection.Id, new SubscriptionCompletedMessage + { + Type = "subscription_completed", + SubscriptionId = subscription.Id.ToString(), + Reason = "terminal_event", + TerminalEvent = eventTypeName, + }); + } + } + + await _subscriptionStore.UpdateAsync(subscription); + } + + public async Task HandleCatchUpRequestAsync( + string connectionId, + string userId, + IEnumerable subscriptionIds) + { + // Get all active subscriptions for this user + var subscriptions = await _subscriptionStore.GetByUserIdAsync(userId); + + foreach (var subscription in subscriptions) + { + if (subscription.Status != SubscriptionStatus.Active) + continue; + + // Get events after the last delivered sequence + var missedEvents = await _eventStore.GetEventsAsync( + correlationId: subscription.CorrelationId, + afterSequence: subscription.LastDeliveredSequence, + eventTypes: subscription.EventTypes + ); + + foreach (var evt in missedEvents.OrderBy(e => e.Sequence)) + { + await _notifier.SendAsync(connectionId, new EventMessage + { + Type = "event", + SubscriptionId = subscription.Id.ToString(), + CorrelationId = evt.CorrelationId, + EventType = evt.GetType().Name, + EventId = evt.EventId.ToString(), + Sequence = evt.Sequence, + Payload = evt, + OccurredAt = evt.OccurredAt, + }); + + subscription.MarkDelivered(evt.Sequence); + + // Check for terminal event + if (subscription.TerminalEventTypes.Contains(evt.GetType().Name)) + { + subscription.Complete(); + + await _notifier.SendAsync(connectionId, new SubscriptionCompletedMessage + { + Type = "subscription_completed", + SubscriptionId = subscription.Id.ToString(), + Reason = "terminal_event", + TerminalEvent = evt.GetType().Name, + }); + + break; + } + } + + await _subscriptionStore.UpdateAsync(subscription); + } + } +} +``` + +### Persistent Subscription Storage + +```csharp +public class PersistentSubscription +{ + public Guid Id { get; init; } + public Guid SubscriberId { get; init; } + public string CorrelationId { get; init; } = string.Empty; + + /// + /// Event types the subscriber wants to receive + /// + public HashSet EventTypes { get; init; } = new(); + + /// + /// Events that complete/close the subscription + /// + public HashSet TerminalEventTypes { get; init; } = new(); + + public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate; + + public DateTimeOffset CreatedAt { get; init; } + public DateTimeOffset? ExpiresAt { get; init; } + public DateTimeOffset? CompletedAt { get; private set; } + + /// + /// Last successfully delivered event sequence (for catch-up) + /// + public long LastDeliveredSequence { get; private set; } + + public SubscriptionStatus Status { get; private set; } = SubscriptionStatus.Active; + + public void MarkDelivered(long sequence) + { + if (sequence > LastDeliveredSequence) + { + LastDeliveredSequence = sequence; + } + } + + public void Complete() + { + Status = SubscriptionStatus.Completed; + CompletedAt = DateTimeOffset.UtcNow; + } + + public void Cancel() + { + Status = SubscriptionStatus.Cancelled; + CompletedAt = DateTimeOffset.UtcNow; + } + + public bool IsExpired => ExpiresAt.HasValue && DateTimeOffset.UtcNow > ExpiresAt.Value; +} + +public enum SubscriptionStatus +{ + Active, + Completed, // Terminal event received + Expired, // TTL reached + Cancelled // User cancelled +} + +public enum DeliveryMode +{ + Immediate, + Batched, + OnReconnect +} + +public interface ISubscriptionStore +{ + Task CreateAsync(PersistentSubscription subscription); + Task GetByIdAsync(Guid id); + Task> GetByUserIdAsync(Guid userId); + Task> FindByCorrelationIdAsync(string correlationId); + Task UpdateAsync(PersistentSubscription subscription); + Task DeleteAsync(Guid id); +} +``` + +### Command Handler Integration + +```csharp +/// +/// Pipeline behavior that creates subscriptions for commands that request them +/// +public class CommandSubscriptionBehavior + : IPipelineBehavior + where TCommand : ICommand +{ + private readonly ISubscriptionStore _subscriptions; + private readonly ICurrentUserService _currentUser; + + public async Task Handle( + TCommand command, + RequestHandlerDelegate next, + CancellationToken cancellationToken) + { + // Execute the command first + var result = await next(); + + // Check if this command has subscription metadata + if (command is ICommandWithSubscription subCommand + && subCommand.SubscriptionConfig != null + && result is ICorrelatedResult correlatedResult) + { + var config = subCommand.SubscriptionConfig; + + await _subscriptions.CreateAsync(new PersistentSubscription + { + Id = Guid.NewGuid(), + SubscriberId = _currentUser.UserId, + CorrelationId = correlatedResult.CorrelationId, + EventTypes = config.EventTypes.ToHashSet(), + TerminalEventTypes = config.TerminalEventTypes.ToHashSet(), + DeliveryMode = config.DeliveryMode, + CreatedAt = DateTimeOffset.UtcNow, + ExpiresAt = config.Timeout.HasValue + ? DateTimeOffset.UtcNow + config.Timeout.Value + : null, + }); + } + + return result; + } +} +``` + +--- + +## Flow Diagrams + +### Command Execution with Subscription + +``` +┌────────┐ ┌─────────────┐ ┌──────────────┐ ┌────────────┐ +│ Client │ │ DataSource │ │ Backend │ │ Subscription│ +│ │ │ │ │ │ │ Store │ +└───┬────┘ └──────┬──────┘ └──────┬───────┘ └─────┬──────┘ + │ │ │ │ + │ inviteUser() │ │ │ + │ + eventTypes │ │ │ + │────────────────>│ │ │ + │ │ │ │ + │ │ POST /command │ │ + │ │ {InviteUser} │ │ + │ │──────────────────>│ │ + │ │ │ │ + │ │ │ Create │ + │ │ │ Subscription │ + │ │ │──────────────────>│ + │ │ │ │ + │ │ {correlationId} │ │ + │ │<──────────────────│ │ + │ │ │ │ + │ │ WS: subscribe │ │ + │ │ {correlationId, │ │ + │ │ eventTypes} │ │ + │ │──────────────────>│ │ + │ │ │ │ + │ InviteResult │ │ │ + │<────────────────│ │ │ + │ │ │ │ +``` + +### Event Delivery (Online) + +``` +┌──────────┐ ┌──────────────┐ ┌────────────┐ ┌────────┐ +│ Domain │ │ Event │ │ Subscription│ │ Client │ +│ Event │ │ Dispatcher │ │ Store │ │ │ +└────┬─────┘ └──────┬───────┘ └─────┬──────┘ └───┬────┘ + │ │ │ │ + │ UserInvitation │ │ │ + │ AcceptedEvent │ │ │ + │─────────────────>│ │ │ + │ │ │ │ + │ │ Find subscriptions│ │ + │ │ by correlationId │ │ + │ │──────────────────>│ │ + │ │ │ │ + │ │ [sub-123] │ │ + │ │<──────────────────│ │ + │ │ │ │ + │ │ Check: eventType │ │ + │ │ in sub.eventTypes?│ │ + │ │ │ │ + │ │ WS: event │ │ + │ │──────────────────────────────────>│ + │ │ │ │ + │ │ (terminal event) │ │ + │ │ WS: subscription │ │ + │ │ _completed │ │ + │ │──────────────────────────────────>│ + │ │ │ │ +``` + +### Catch-up on Reconnect + +``` +┌────────┐ ┌──────────────┐ ┌────────────┐ ┌───────────┐ +│ Client │ │ Backend │ │ Subscription│ │ Event │ +│ │ │ │ │ Store │ │ Store │ +└───┬────┘ └──────┬───────┘ └─────┬──────┘ └─────┬─────┘ + │ │ │ │ + │ WS: connect │ │ │ + │ + auth │ │ │ + │────────────────>│ │ │ + │ │ │ │ + │ WS: catch_up │ │ │ + │────────────────>│ │ │ + │ │ │ │ + │ │ Get active subs │ │ + │ │ for userId │ │ + │ │──────────────────>│ │ + │ │ │ │ + │ │ [sub-123, │ │ + │ │ sub-456] │ │ + │ │<──────────────────│ │ + │ │ │ │ + │ │ For each sub: │ │ + │ │ Get events after │ │ + │ │ lastDeliveredSeq │ │ + │ │─────────────────────────────────────>│ + │ │ │ │ + │ │ [evt-1, evt-2] │ │ + │ │<─────────────────────────────────────│ + │ │ │ │ + │ WS: event │ │ │ + │ (evt-1) │ │ │ + │<────────────────│ │ │ + │ │ │ │ + │ WS: event │ │ │ + │ (evt-2) │ │ │ + │<────────────────│ │ │ + │ │ │ │ +``` + +--- + +## Considerations + +### Cleanup and Maintenance + +- Run a background job to expire old subscriptions +- Archive or delete events older than a retention period +- Clean up completed/cancelled subscriptions after a grace period + +### Scaling + +- For multiple server instances, use a distributed connection tracker (Redis) +- Consider using SignalR with Azure SignalR Service or Redis backplane +- Event store should support efficient queries by correlationId + sequence + +### Ordering Guarantees + +- Events within a correlation are ordered by sequence number +- Catch-up delivery must respect sequence ordering +- Consider idempotency keys for duplicate detection on the client + +### Client-Side Deduplication + +```dart +class EventConnection { + final Set _processedEventIds = {}; + + void _handleEventMessage(Map data) { + final eventId = data['eventId'] as String; + + // Skip if already processed (duplicate delivery) + if (_processedEventIds.contains(eventId)) { + return; + } + + _processedEventIds.add(eventId); + + // Process the event... + } +} +``` + +### Error Handling + +- Handle WebSocket disconnections with exponential backoff reconnection +- Queue outgoing messages during disconnection +- Validate subscription requests on the backend (authorization, rate limiting) + +### Security + +- Validate that users can only subscribe to correlations they own +- Authenticate WebSocket connections with JWT tokens +- Rate limit subscription creation to prevent abuse