- 6 phases covering workflow abstraction through production monitoring - Detailed task breakdowns with checkboxes for tracking - Progressive complexity: simple by default, powerful when needed - Support for ephemeral & persistent streams - Cross-service communication via RabbitMQ - Schema evolution with automatic upcasting - Exactly-once delivery and read receipts - ~10+ weeks timeline with clear success criteria
1219 lines
38 KiB
Markdown
1219 lines
38 KiB
Markdown
# Bidirectional Communication Design for CQRS Framework
|
|
|
|
This document outlines the design for adding persistent, selective event subscriptions to a CQRS framework with a Flutter frontend and .NET backend.
|
|
|
|
## Table of Contents
|
|
|
|
- [Overview](#overview)
|
|
- [Core Concepts](#core-concepts)
|
|
- [Frontend Architecture (Flutter)](#frontend-architecture-flutter)
|
|
- [DataSource](#datasource)
|
|
- [Event Subscription Config](#event-subscription-config)
|
|
- [Event Connection](#event-connection)
|
|
- [Usage Examples](#usage-examples)
|
|
- [Backend Architecture (.NET)](#backend-architecture-net)
|
|
- [Protocol Messages](#protocol-messages)
|
|
- [Event Filtering and Delivery](#event-filtering-and-delivery)
|
|
- [Persistent Subscription Storage](#persistent-subscription-storage)
|
|
- [Flow Diagrams](#flow-diagrams)
|
|
- [Considerations](#considerations)
|
|
|
|
---
|
|
|
|
## Overview
|
|
|
|
The goal is to extend a CQRS framework to support:
|
|
|
|
1. **Command-correlated event subscriptions**: When executing a command, optionally subscribe to related events
|
|
2. **Selective subscriptions**: Choose which specific events to receive (not all-or-nothing)
|
|
3. **Persistent delivery**: Events are stored and delivered even if the user is offline
|
|
4. **Catch-up on reconnect**: Missed events are delivered when the user comes back online
|
|
5. **Per-usage flexibility**: Same DataSource can be used with or without events depending on context
|
|
|
|
```
|
|
┌─────────────────────────────────────────────────────────────────┐
|
|
│ DataSource │
|
|
│ │
|
|
│ ┌─────────────┐ ┌─────────────┐ ┌───────────────────┐ │
|
|
│ │ Query │ │ Commands │ │ Event Connection │ │
|
|
│ │ │ │ │ │ │ │
|
|
│ │ UsersQuery │ │ InviteUser │ │ subscribe(...) │ │
|
|
│ │ │ │ RemoveUser │ │ unsubscribe(...) │ │
|
|
│ └─────────────┘ └─────────────┘ └───────────────────┘ │
|
|
│ │
|
|
└─────────────────────────────────────────────────────────────────┘
|
|
```
|
|
|
|
---
|
|
|
|
## Core Concepts
|
|
|
|
| Concept | Description |
|
|
|---------|-------------|
|
|
| **DataSource** | Combines a query, related commands, and optional event connection |
|
|
| **EventConnection** | WebSocket connection that handles subscriptions and event delivery |
|
|
| **Correlation ID** | Links a command to its resulting events |
|
|
| **Persistent Subscription** | Server-stored subscription that survives disconnection |
|
|
| **Terminal Event** | An event that completes/closes a subscription (e.g., `InvitationAccepted`) |
|
|
| **Catch-up** | Delivering missed events when a client reconnects |
|
|
|
|
---
|
|
|
|
## Frontend Architecture (Flutter)
|
|
|
|
### DataSource
|
|
|
|
The DataSource is the central abstraction combining query data, commands, and event subscriptions.
|
|
|
|
```dart
|
|
abstract class DataSource<TQuery, TData> {
|
|
/// The query this datasource executes
|
|
final TQuery query;
|
|
|
|
/// Stream of query results
|
|
Stream<TData> get dataStream;
|
|
|
|
/// Current cached data
|
|
TData? get currentData;
|
|
|
|
/// Event connection (lazy - only created when needed)
|
|
EventConnection? _eventConnection;
|
|
|
|
/// Getter for event connection (for external listeners)
|
|
EventConnection? get eventConnection => _eventConnection;
|
|
|
|
/// Commands attached to this datasource
|
|
final Map<Type, CommandConfig> _commands = {};
|
|
|
|
DataSource(this.query);
|
|
|
|
/// Register a command this datasource can execute
|
|
void registerCommand<TCommand extends Command>(
|
|
CommandConfig<TCommand> config,
|
|
) {
|
|
_commands[TCommand] = config;
|
|
}
|
|
|
|
/// Execute a command with optional event subscription
|
|
Future<TResult> execute<TCommand extends Command, TResult>(
|
|
TCommand command, {
|
|
EventSubscriptionConfig? subscribeToEvents,
|
|
}) async {
|
|
final result = await _commandSender.send<TResult>(command);
|
|
|
|
// If caller wants events, set up subscription
|
|
if (subscribeToEvents != null && subscribeToEvents.eventTypes.isNotEmpty) {
|
|
await _ensureEventConnection();
|
|
await _eventConnection!.subscribe(
|
|
correlationId: result.correlationId,
|
|
config: subscribeToEvents,
|
|
);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/// Explicitly open event connection (triggers catch-up)
|
|
Future<EventConnection> openEventConnection() async {
|
|
_eventConnection ??= EventConnection(
|
|
dataSourceId: id,
|
|
userId: _authService.currentUserId,
|
|
);
|
|
await _eventConnection!.connect();
|
|
return _eventConnection!;
|
|
}
|
|
|
|
/// Ensure connection exists
|
|
Future<void> _ensureEventConnection() async {
|
|
if (_eventConnection == null || !_eventConnection!.isConnected) {
|
|
await openEventConnection();
|
|
}
|
|
}
|
|
|
|
/// Cleanup resources
|
|
Future<void> dispose() async {
|
|
await _eventConnection?.disconnect();
|
|
}
|
|
}
|
|
```
|
|
|
|
### Event Subscription Config
|
|
|
|
Fine-grained control over what events to subscribe to for a given command execution.
|
|
|
|
```dart
|
|
/// Configuration for event subscriptions
|
|
class EventSubscriptionConfig {
|
|
/// Which event types to receive
|
|
final Set<Type> eventTypes;
|
|
|
|
/// Auto-unsubscribe after this duration (null = no timeout)
|
|
final Duration? timeout;
|
|
|
|
/// Should the subscription survive app restart?
|
|
final bool persistent;
|
|
|
|
/// How should events be delivered?
|
|
final EventDeliveryMode deliveryMode;
|
|
|
|
const EventSubscriptionConfig({
|
|
required this.eventTypes,
|
|
this.timeout,
|
|
this.persistent = true,
|
|
this.deliveryMode = EventDeliveryMode.immediate,
|
|
});
|
|
|
|
/// Subscribe to all provided event types
|
|
factory EventSubscriptionConfig.all(List<Type> types) {
|
|
return EventSubscriptionConfig(
|
|
eventTypes: types.toSet(),
|
|
persistent: true,
|
|
);
|
|
}
|
|
|
|
/// Subscribe to specific event types only
|
|
factory EventSubscriptionConfig.only(List<Type> types) {
|
|
return EventSubscriptionConfig(
|
|
eventTypes: types.toSet(),
|
|
persistent: true,
|
|
);
|
|
}
|
|
|
|
/// No subscription (explicit opt-out)
|
|
factory EventSubscriptionConfig.none() {
|
|
return EventSubscriptionConfig(
|
|
eventTypes: {},
|
|
persistent: false,
|
|
);
|
|
}
|
|
|
|
/// Transient subscription (not persisted, immediate delivery)
|
|
factory EventSubscriptionConfig.transient(List<Type> types) {
|
|
return EventSubscriptionConfig(
|
|
eventTypes: types.toSet(),
|
|
persistent: false,
|
|
deliveryMode: EventDeliveryMode.immediate,
|
|
);
|
|
}
|
|
}
|
|
|
|
/// How events should be delivered to the client
|
|
enum EventDeliveryMode {
|
|
/// Push immediately when event occurs
|
|
immediate,
|
|
|
|
/// Batch events and deliver periodically
|
|
batched,
|
|
|
|
/// Only deliver on reconnect (save bandwidth for background updates)
|
|
onReconnect,
|
|
}
|
|
```
|
|
|
|
### Event Connection
|
|
|
|
Manages the WebSocket connection, subscriptions, and event routing.
|
|
|
|
```dart
|
|
/// Manages WebSocket connection and event subscriptions
|
|
class EventConnection {
|
|
final String dataSourceId;
|
|
final String userId;
|
|
|
|
WebSocketChannel? _channel;
|
|
final _eventController = StreamController<CorrelatedEvent>.broadcast();
|
|
final Map<String, EventSubscription> _activeSubscriptions = {};
|
|
|
|
bool _isConnected = false;
|
|
bool get isConnected => _isConnected;
|
|
|
|
/// Stream of all events
|
|
Stream<CorrelatedEvent> get events => _eventController.stream;
|
|
|
|
/// Typed event stream - filters to specific event type
|
|
Stream<T> on<T extends CorrelatedEvent>() {
|
|
return events.whereType<T>();
|
|
}
|
|
|
|
/// Events for a specific correlation
|
|
Stream<CorrelatedEvent> forCorrelation(String correlationId) {
|
|
return events.where((e) => e.correlationId == correlationId);
|
|
}
|
|
|
|
EventConnection({
|
|
required this.dataSourceId,
|
|
required this.userId,
|
|
});
|
|
|
|
/// Connect to the event server
|
|
Future<void> connect() async {
|
|
if (_isConnected) return;
|
|
|
|
_channel = WebSocketChannel.connect(
|
|
Uri.parse('wss://api.example.com/events'),
|
|
);
|
|
|
|
// Authenticate the connection
|
|
_send({
|
|
'type': 'auth',
|
|
'token': await _authService.getToken(),
|
|
'dataSourceId': dataSourceId,
|
|
});
|
|
|
|
// Request catch-up for any existing persistent subscriptions
|
|
_send({
|
|
'type': 'catch_up',
|
|
'subscriptionIds': _activeSubscriptions.keys.toList(),
|
|
});
|
|
|
|
// Listen for incoming messages
|
|
_channel!.stream.listen(
|
|
_handleMessage,
|
|
onDone: _handleDisconnect,
|
|
onError: _handleError,
|
|
);
|
|
|
|
_isConnected = true;
|
|
}
|
|
|
|
/// Subscribe to events for a correlation ID
|
|
Future<EventSubscription> subscribe({
|
|
required String correlationId,
|
|
required EventSubscriptionConfig config,
|
|
}) async {
|
|
final subscription = EventSubscription(
|
|
id: const Uuid().v4(),
|
|
correlationId: correlationId,
|
|
config: config,
|
|
);
|
|
|
|
_activeSubscriptions[subscription.id] = subscription;
|
|
|
|
// Notify backend about this subscription
|
|
_send({
|
|
'type': 'subscribe',
|
|
'subscriptionId': subscription.id,
|
|
'correlationId': correlationId,
|
|
'eventTypes': config.eventTypes.map((t) => t.toString()).toList(),
|
|
'persistent': config.persistent,
|
|
'timeout': config.timeout?.inSeconds,
|
|
'deliveryMode': config.deliveryMode.name,
|
|
});
|
|
|
|
return subscription;
|
|
}
|
|
|
|
/// Unsubscribe from a subscription
|
|
Future<void> unsubscribe(String subscriptionId) async {
|
|
_activeSubscriptions.remove(subscriptionId);
|
|
|
|
_send({
|
|
'type': 'unsubscribe',
|
|
'subscriptionId': subscriptionId,
|
|
});
|
|
}
|
|
|
|
/// Disconnect from the server
|
|
Future<void> disconnect() async {
|
|
_isConnected = false;
|
|
await _channel?.sink.close();
|
|
_channel = null;
|
|
}
|
|
|
|
void _send(Map<String, dynamic> message) {
|
|
_channel?.sink.add(jsonEncode(message));
|
|
}
|
|
|
|
void _handleMessage(dynamic message) {
|
|
final data = jsonDecode(message as String) as Map<String, dynamic>;
|
|
|
|
switch (data['type']) {
|
|
case 'event':
|
|
_handleEventMessage(data);
|
|
break;
|
|
|
|
case 'subscription_completed':
|
|
_handleSubscriptionCompleted(data);
|
|
break;
|
|
|
|
case 'error':
|
|
_handleErrorMessage(data);
|
|
break;
|
|
}
|
|
}
|
|
|
|
void _handleEventMessage(Map<String, dynamic> data) {
|
|
final event = _deserializeEvent(data);
|
|
_eventController.add(event);
|
|
|
|
// Notify specific subscription handler if registered
|
|
final subscription = _activeSubscriptions[data['subscriptionId']];
|
|
subscription?.onEvent?.call(event);
|
|
}
|
|
|
|
void _handleSubscriptionCompleted(Map<String, dynamic> data) {
|
|
final subscriptionId = data['subscriptionId'] as String;
|
|
final subscription = _activeSubscriptions.remove(subscriptionId);
|
|
subscription?.onCompleted?.call(data['reason'] as String);
|
|
}
|
|
|
|
void _handleDisconnect() {
|
|
_isConnected = false;
|
|
// Implement reconnection logic here
|
|
}
|
|
|
|
void _handleError(Object error) {
|
|
// Handle connection errors
|
|
}
|
|
|
|
void _handleErrorMessage(Map<String, dynamic> data) {
|
|
// Handle protocol-level errors
|
|
}
|
|
|
|
CorrelatedEvent _deserializeEvent(Map<String, dynamic> data) {
|
|
// Deserialize based on eventType
|
|
// Implementation depends on your serialization strategy
|
|
throw UnimplementedError();
|
|
}
|
|
}
|
|
|
|
/// Represents an active subscription
|
|
class EventSubscription {
|
|
final String id;
|
|
final String correlationId;
|
|
final EventSubscriptionConfig config;
|
|
|
|
/// Callback when an event is received
|
|
void Function(CorrelatedEvent)? onEvent;
|
|
|
|
/// Callback when subscription is completed (terminal event or timeout)
|
|
void Function(String reason)? onCompleted;
|
|
|
|
EventSubscription({
|
|
required this.id,
|
|
required this.correlationId,
|
|
required this.config,
|
|
});
|
|
}
|
|
```
|
|
|
|
### Event Base Classes
|
|
|
|
```dart
|
|
/// Base class for all events that can be correlated to a command
|
|
abstract class CorrelatedEvent {
|
|
final String correlationId;
|
|
final String eventId;
|
|
final DateTime occurredAt;
|
|
|
|
CorrelatedEvent({
|
|
required this.correlationId,
|
|
required this.eventId,
|
|
required this.occurredAt,
|
|
});
|
|
}
|
|
|
|
/// Example: User invitation events
|
|
abstract class UserInvitationEvent extends CorrelatedEvent {
|
|
UserInvitationEvent({
|
|
required super.correlationId,
|
|
required super.eventId,
|
|
required super.occurredAt,
|
|
});
|
|
}
|
|
|
|
class UserInvitationSentEvent extends UserInvitationEvent {
|
|
final String email;
|
|
|
|
UserInvitationSentEvent({
|
|
required this.email,
|
|
required super.correlationId,
|
|
required super.eventId,
|
|
required super.occurredAt,
|
|
});
|
|
}
|
|
|
|
class UserInvitationAcceptedEvent extends UserInvitationEvent {
|
|
final String userId;
|
|
final String email;
|
|
|
|
UserInvitationAcceptedEvent({
|
|
required this.userId,
|
|
required this.email,
|
|
required super.correlationId,
|
|
required super.eventId,
|
|
required super.occurredAt,
|
|
});
|
|
}
|
|
|
|
class UserInvitationDeclinedEvent extends UserInvitationEvent {
|
|
final String email;
|
|
final String? reason;
|
|
|
|
UserInvitationDeclinedEvent({
|
|
required this.email,
|
|
this.reason,
|
|
required super.correlationId,
|
|
required super.eventId,
|
|
required super.occurredAt,
|
|
});
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## Usage Examples
|
|
|
|
### Example 1: Admin Dashboard (wants all events)
|
|
|
|
```dart
|
|
class AdminDashboardPage extends StatefulWidget {
|
|
@override
|
|
State<AdminDashboardPage> createState() => _AdminDashboardPageState();
|
|
}
|
|
|
|
class _AdminDashboardPageState extends State<AdminDashboardPage> {
|
|
late final UsersDataSource _usersDataSource;
|
|
|
|
@override
|
|
void initState() {
|
|
super.initState();
|
|
_usersDataSource = UsersDataSource(UsersQuery());
|
|
|
|
// Open event connection - triggers catch-up for pending events
|
|
_usersDataSource.openEventConnection();
|
|
|
|
// Listen to all invitation events for toast notifications
|
|
_usersDataSource.eventConnection?.on<UserInvitationEvent>().listen(_showEventToast);
|
|
}
|
|
|
|
void _showEventToast(UserInvitationEvent event) {
|
|
final message = switch (event) {
|
|
UserInvitationSentEvent e => 'Invitation sent to ${e.email}',
|
|
UserInvitationAcceptedEvent e => '${e.email} has joined!',
|
|
UserInvitationDeclinedEvent e => '${e.email} declined the invitation',
|
|
_ => 'Unknown event',
|
|
};
|
|
|
|
ScaffoldMessenger.of(context).showSnackBar(
|
|
SnackBar(content: Text(message)),
|
|
);
|
|
|
|
// Refresh list if someone joined
|
|
if (event is UserInvitationAcceptedEvent) {
|
|
_usersDataSource.refresh();
|
|
}
|
|
}
|
|
|
|
Future<void> _inviteUser(String email) async {
|
|
await _usersDataSource.inviteUser(
|
|
email,
|
|
// Subscribe to ALL invitation events
|
|
subscribeToEvents: {
|
|
UserInvitationSentEvent,
|
|
UserInvitationAcceptedEvent,
|
|
UserInvitationDeclinedEvent,
|
|
},
|
|
);
|
|
}
|
|
|
|
@override
|
|
void dispose() {
|
|
_usersDataSource.dispose();
|
|
super.dispose();
|
|
}
|
|
|
|
@override
|
|
Widget build(BuildContext context) {
|
|
return StreamBuilder<List<User>>(
|
|
stream: _usersDataSource.dataStream,
|
|
builder: (context, snapshot) {
|
|
// Build UI...
|
|
},
|
|
);
|
|
}
|
|
}
|
|
```
|
|
|
|
### Example 2: Simple User List (no events needed)
|
|
|
|
```dart
|
|
class UserListPage extends StatefulWidget {
|
|
@override
|
|
State<UserListPage> createState() => _UserListPageState();
|
|
}
|
|
|
|
class _UserListPageState extends State<UserListPage> {
|
|
late final UsersDataSource _usersDataSource;
|
|
|
|
@override
|
|
void initState() {
|
|
super.initState();
|
|
_usersDataSource = UsersDataSource(UsersQuery());
|
|
// NOT opening event connection - no real-time updates needed
|
|
}
|
|
|
|
@override
|
|
Widget build(BuildContext context) {
|
|
return StreamBuilder<List<User>>(
|
|
stream: _usersDataSource.dataStream,
|
|
builder: (context, snapshot) {
|
|
if (!snapshot.hasData) {
|
|
return const CircularProgressIndicator();
|
|
}
|
|
|
|
return ListView.builder(
|
|
itemCount: snapshot.data!.length,
|
|
itemBuilder: (context, index) {
|
|
final user = snapshot.data![index];
|
|
return ListTile(title: Text(user.name));
|
|
},
|
|
);
|
|
},
|
|
);
|
|
}
|
|
|
|
@override
|
|
void dispose() {
|
|
_usersDataSource.dispose();
|
|
super.dispose();
|
|
}
|
|
}
|
|
```
|
|
|
|
### Example 3: Invite Dialog (only wants sent confirmation)
|
|
|
|
```dart
|
|
class InviteUserDialog extends StatefulWidget {
|
|
final UsersDataSource dataSource;
|
|
|
|
const InviteUserDialog({required this.dataSource});
|
|
|
|
@override
|
|
State<InviteUserDialog> createState() => _InviteUserDialogState();
|
|
}
|
|
|
|
class _InviteUserDialogState extends State<InviteUserDialog> {
|
|
final _emailController = TextEditingController();
|
|
bool _isSending = false;
|
|
|
|
Future<void> _sendInvite() async {
|
|
setState(() => _isSending = true);
|
|
|
|
try {
|
|
await widget.dataSource.inviteUser(
|
|
_emailController.text,
|
|
// Only care about the "sent" confirmation
|
|
subscribeToEvents: {UserInvitationSentEvent},
|
|
onEvent: (event) {
|
|
if (event is UserInvitationSentEvent) {
|
|
Navigator.of(context).pop();
|
|
ScaffoldMessenger.of(context).showSnackBar(
|
|
const SnackBar(content: Text('Invitation sent!')),
|
|
);
|
|
}
|
|
},
|
|
);
|
|
} finally {
|
|
setState(() => _isSending = false);
|
|
}
|
|
}
|
|
|
|
@override
|
|
Widget build(BuildContext context) {
|
|
return AlertDialog(
|
|
title: const Text('Invite User'),
|
|
content: TextField(
|
|
controller: _emailController,
|
|
decoration: const InputDecoration(labelText: 'Email'),
|
|
),
|
|
actions: [
|
|
TextButton(
|
|
onPressed: () => Navigator.of(context).pop(),
|
|
child: const Text('Cancel'),
|
|
),
|
|
ElevatedButton(
|
|
onPressed: _isSending ? null : _sendInvite,
|
|
child: _isSending
|
|
? const CircularProgressIndicator()
|
|
: const Text('Send'),
|
|
),
|
|
],
|
|
);
|
|
}
|
|
}
|
|
```
|
|
|
|
### Example 4: Concrete UsersDataSource Implementation
|
|
|
|
```dart
|
|
class UsersDataSource extends DataSource<UsersQuery, List<User>> {
|
|
UsersDataSource(UsersQuery query) : super(query);
|
|
|
|
/// Invite a user with optional event subscription
|
|
Future<InviteResult> inviteUser(
|
|
String email, {
|
|
Set<Type>? subscribeToEvents,
|
|
void Function(UserInvitationEvent)? onEvent,
|
|
}) async {
|
|
// Build subscription config if events requested
|
|
EventSubscriptionConfig? config;
|
|
if (subscribeToEvents != null && subscribeToEvents.isNotEmpty) {
|
|
config = EventSubscriptionConfig(
|
|
eventTypes: subscribeToEvents,
|
|
persistent: true,
|
|
);
|
|
}
|
|
|
|
// Execute the command
|
|
final result = await execute<InviteUserCommand, InviteResult>(
|
|
InviteUserCommand(email: email),
|
|
subscribeToEvents: config,
|
|
);
|
|
|
|
// Wire up event handler if provided
|
|
if (config != null && onEvent != null) {
|
|
eventConnection?.events
|
|
.where((e) => e.correlationId == result.correlationId)
|
|
.whereType<UserInvitationEvent>()
|
|
.listen(onEvent);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/// Remove a user
|
|
Future<void> removeUser(String userId) async {
|
|
await execute<RemoveUserCommand, void>(
|
|
RemoveUserCommand(userId: userId),
|
|
// No events for this command
|
|
);
|
|
}
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## Backend Architecture (.NET)
|
|
|
|
### Protocol Messages
|
|
|
|
#### Client to Server
|
|
|
|
**Subscribe**
|
|
```json
|
|
{
|
|
"type": "subscribe",
|
|
"subscriptionId": "sub-123-abc",
|
|
"correlationId": "invite-456-def",
|
|
"eventTypes": ["UserInvitationSentEvent", "UserInvitationAcceptedEvent"],
|
|
"persistent": true,
|
|
"timeout": null,
|
|
"deliveryMode": "immediate"
|
|
}
|
|
```
|
|
|
|
**Unsubscribe**
|
|
```json
|
|
{
|
|
"type": "unsubscribe",
|
|
"subscriptionId": "sub-123-abc"
|
|
}
|
|
```
|
|
|
|
**Catch-up Request**
|
|
```json
|
|
{
|
|
"type": "catch_up",
|
|
"subscriptionIds": ["sub-123-abc", "sub-789-xyz"]
|
|
}
|
|
```
|
|
|
|
**Authentication**
|
|
```json
|
|
{
|
|
"type": "auth",
|
|
"token": "jwt-token-here",
|
|
"dataSourceId": "users-datasource"
|
|
}
|
|
```
|
|
|
|
#### Server to Client
|
|
|
|
**Event Delivery**
|
|
```json
|
|
{
|
|
"type": "event",
|
|
"subscriptionId": "sub-123-abc",
|
|
"correlationId": "invite-456-def",
|
|
"eventType": "UserInvitationAcceptedEvent",
|
|
"eventId": "evt-001",
|
|
"sequence": 42,
|
|
"payload": {
|
|
"userId": "user-789",
|
|
"email": "john@example.com"
|
|
},
|
|
"occurredAt": "2025-01-15T10:30:00Z"
|
|
}
|
|
```
|
|
|
|
**Subscription Completed**
|
|
```json
|
|
{
|
|
"type": "subscription_completed",
|
|
"subscriptionId": "sub-123-abc",
|
|
"reason": "terminal_event",
|
|
"terminalEvent": "UserInvitationAcceptedEvent"
|
|
}
|
|
```
|
|
|
|
**Error**
|
|
```json
|
|
{
|
|
"type": "error",
|
|
"code": "subscription_not_found",
|
|
"message": "Subscription sub-123-abc not found",
|
|
"subscriptionId": "sub-123-abc"
|
|
}
|
|
```
|
|
|
|
### Event Filtering and Delivery
|
|
|
|
```csharp
|
|
public class EventDeliveryService
|
|
{
|
|
private readonly ISubscriptionStore _subscriptionStore;
|
|
private readonly IConnectionTracker _connectionTracker;
|
|
private readonly IClientNotifier _notifier;
|
|
private readonly IEventStore _eventStore;
|
|
|
|
public async Task DeliverEventAsync(ICorrelatedEvent @event)
|
|
{
|
|
// Store the event with sequence number
|
|
var sequence = await _eventStore.AppendAsync(@event);
|
|
|
|
// Find all subscriptions interested in this correlation
|
|
var subscriptions = await _subscriptionStore
|
|
.FindByCorrelationIdAsync(@event.CorrelationId);
|
|
|
|
foreach (var subscription in subscriptions)
|
|
{
|
|
await DeliverToSubscriptionAsync(subscription, @event, sequence);
|
|
}
|
|
}
|
|
|
|
private async Task DeliverToSubscriptionAsync(
|
|
PersistentSubscription subscription,
|
|
ICorrelatedEvent @event,
|
|
long sequence)
|
|
{
|
|
// FILTER: Only deliver if subscriber requested this event type
|
|
var eventTypeName = @event.GetType().Name;
|
|
if (!subscription.EventTypes.Contains(eventTypeName))
|
|
{
|
|
// Subscriber didn't ask for this type - skip
|
|
return;
|
|
}
|
|
|
|
// Check delivery mode
|
|
if (subscription.DeliveryMode == DeliveryMode.OnReconnect)
|
|
{
|
|
// Don't deliver now, will be caught up on reconnect
|
|
return;
|
|
}
|
|
|
|
// Check if subscriber is online
|
|
var connection = _connectionTracker.GetConnection(subscription.SubscriberId);
|
|
|
|
if (connection != null)
|
|
{
|
|
// Deliver immediately
|
|
await _notifier.SendAsync(connection.Id, new EventMessage
|
|
{
|
|
Type = "event",
|
|
SubscriptionId = subscription.Id.ToString(),
|
|
CorrelationId = @event.CorrelationId,
|
|
EventType = eventTypeName,
|
|
EventId = @event.EventId.ToString(),
|
|
Sequence = sequence,
|
|
Payload = @event,
|
|
OccurredAt = @event.OccurredAt,
|
|
});
|
|
|
|
subscription.MarkDelivered(sequence);
|
|
}
|
|
// If offline, event stays in store for catch-up
|
|
|
|
// Check if this is a terminal event
|
|
if (subscription.TerminalEventTypes.Contains(eventTypeName))
|
|
{
|
|
subscription.Complete();
|
|
|
|
if (connection != null)
|
|
{
|
|
await _notifier.SendAsync(connection.Id, new SubscriptionCompletedMessage
|
|
{
|
|
Type = "subscription_completed",
|
|
SubscriptionId = subscription.Id.ToString(),
|
|
Reason = "terminal_event",
|
|
TerminalEvent = eventTypeName,
|
|
});
|
|
}
|
|
}
|
|
|
|
await _subscriptionStore.UpdateAsync(subscription);
|
|
}
|
|
|
|
public async Task HandleCatchUpRequestAsync(
|
|
string connectionId,
|
|
string userId,
|
|
IEnumerable<string> subscriptionIds)
|
|
{
|
|
// Get all active subscriptions for this user
|
|
var subscriptions = await _subscriptionStore.GetByUserIdAsync(userId);
|
|
|
|
foreach (var subscription in subscriptions)
|
|
{
|
|
if (subscription.Status != SubscriptionStatus.Active)
|
|
continue;
|
|
|
|
// Get events after the last delivered sequence
|
|
var missedEvents = await _eventStore.GetEventsAsync(
|
|
correlationId: subscription.CorrelationId,
|
|
afterSequence: subscription.LastDeliveredSequence,
|
|
eventTypes: subscription.EventTypes
|
|
);
|
|
|
|
foreach (var evt in missedEvents.OrderBy(e => e.Sequence))
|
|
{
|
|
await _notifier.SendAsync(connectionId, new EventMessage
|
|
{
|
|
Type = "event",
|
|
SubscriptionId = subscription.Id.ToString(),
|
|
CorrelationId = evt.CorrelationId,
|
|
EventType = evt.GetType().Name,
|
|
EventId = evt.EventId.ToString(),
|
|
Sequence = evt.Sequence,
|
|
Payload = evt,
|
|
OccurredAt = evt.OccurredAt,
|
|
});
|
|
|
|
subscription.MarkDelivered(evt.Sequence);
|
|
|
|
// Check for terminal event
|
|
if (subscription.TerminalEventTypes.Contains(evt.GetType().Name))
|
|
{
|
|
subscription.Complete();
|
|
|
|
await _notifier.SendAsync(connectionId, new SubscriptionCompletedMessage
|
|
{
|
|
Type = "subscription_completed",
|
|
SubscriptionId = subscription.Id.ToString(),
|
|
Reason = "terminal_event",
|
|
TerminalEvent = evt.GetType().Name,
|
|
});
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
await _subscriptionStore.UpdateAsync(subscription);
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
### Persistent Subscription Storage
|
|
|
|
```csharp
|
|
public class PersistentSubscription
|
|
{
|
|
public Guid Id { get; init; }
|
|
public Guid SubscriberId { get; init; }
|
|
public string CorrelationId { get; init; } = string.Empty;
|
|
|
|
/// <summary>
|
|
/// Event types the subscriber wants to receive
|
|
/// </summary>
|
|
public HashSet<string> EventTypes { get; init; } = new();
|
|
|
|
/// <summary>
|
|
/// Events that complete/close the subscription
|
|
/// </summary>
|
|
public HashSet<string> TerminalEventTypes { get; init; } = new();
|
|
|
|
public DeliveryMode DeliveryMode { get; init; } = DeliveryMode.Immediate;
|
|
|
|
public DateTimeOffset CreatedAt { get; init; }
|
|
public DateTimeOffset? ExpiresAt { get; init; }
|
|
public DateTimeOffset? CompletedAt { get; private set; }
|
|
|
|
/// <summary>
|
|
/// Last successfully delivered event sequence (for catch-up)
|
|
/// </summary>
|
|
public long LastDeliveredSequence { get; private set; }
|
|
|
|
public SubscriptionStatus Status { get; private set; } = SubscriptionStatus.Active;
|
|
|
|
public void MarkDelivered(long sequence)
|
|
{
|
|
if (sequence > LastDeliveredSequence)
|
|
{
|
|
LastDeliveredSequence = sequence;
|
|
}
|
|
}
|
|
|
|
public void Complete()
|
|
{
|
|
Status = SubscriptionStatus.Completed;
|
|
CompletedAt = DateTimeOffset.UtcNow;
|
|
}
|
|
|
|
public void Cancel()
|
|
{
|
|
Status = SubscriptionStatus.Cancelled;
|
|
CompletedAt = DateTimeOffset.UtcNow;
|
|
}
|
|
|
|
public bool IsExpired => ExpiresAt.HasValue && DateTimeOffset.UtcNow > ExpiresAt.Value;
|
|
}
|
|
|
|
public enum SubscriptionStatus
|
|
{
|
|
Active,
|
|
Completed, // Terminal event received
|
|
Expired, // TTL reached
|
|
Cancelled // User cancelled
|
|
}
|
|
|
|
public enum DeliveryMode
|
|
{
|
|
Immediate,
|
|
Batched,
|
|
OnReconnect
|
|
}
|
|
|
|
public interface ISubscriptionStore
|
|
{
|
|
Task<PersistentSubscription> CreateAsync(PersistentSubscription subscription);
|
|
Task<PersistentSubscription?> GetByIdAsync(Guid id);
|
|
Task<IReadOnlyList<PersistentSubscription>> GetByUserIdAsync(Guid userId);
|
|
Task<IReadOnlyList<PersistentSubscription>> FindByCorrelationIdAsync(string correlationId);
|
|
Task UpdateAsync(PersistentSubscription subscription);
|
|
Task DeleteAsync(Guid id);
|
|
}
|
|
```
|
|
|
|
### Command Handler Integration
|
|
|
|
```csharp
|
|
/// <summary>
|
|
/// Pipeline behavior that creates subscriptions for commands that request them
|
|
/// </summary>
|
|
public class CommandSubscriptionBehavior<TCommand, TResult>
|
|
: IPipelineBehavior<TCommand, TResult>
|
|
where TCommand : ICommand<TResult>
|
|
{
|
|
private readonly ISubscriptionStore _subscriptions;
|
|
private readonly ICurrentUserService _currentUser;
|
|
|
|
public async Task<TResult> Handle(
|
|
TCommand command,
|
|
RequestHandlerDelegate<TResult> next,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
// Execute the command first
|
|
var result = await next();
|
|
|
|
// Check if this command has subscription metadata
|
|
if (command is ICommandWithSubscription<TResult> subCommand
|
|
&& subCommand.SubscriptionConfig != null
|
|
&& result is ICorrelatedResult correlatedResult)
|
|
{
|
|
var config = subCommand.SubscriptionConfig;
|
|
|
|
await _subscriptions.CreateAsync(new PersistentSubscription
|
|
{
|
|
Id = Guid.NewGuid(),
|
|
SubscriberId = _currentUser.UserId,
|
|
CorrelationId = correlatedResult.CorrelationId,
|
|
EventTypes = config.EventTypes.ToHashSet(),
|
|
TerminalEventTypes = config.TerminalEventTypes.ToHashSet(),
|
|
DeliveryMode = config.DeliveryMode,
|
|
CreatedAt = DateTimeOffset.UtcNow,
|
|
ExpiresAt = config.Timeout.HasValue
|
|
? DateTimeOffset.UtcNow + config.Timeout.Value
|
|
: null,
|
|
});
|
|
}
|
|
|
|
return result;
|
|
}
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## Flow Diagrams
|
|
|
|
### Command Execution with Subscription
|
|
|
|
```
|
|
┌────────┐ ┌─────────────┐ ┌──────────────┐ ┌────────────┐
|
|
│ Client │ │ DataSource │ │ Backend │ │ Subscription│
|
|
│ │ │ │ │ │ │ Store │
|
|
└───┬────┘ └──────┬──────┘ └──────┬───────┘ └─────┬──────┘
|
|
│ │ │ │
|
|
│ inviteUser() │ │ │
|
|
│ + eventTypes │ │ │
|
|
│────────────────>│ │ │
|
|
│ │ │ │
|
|
│ │ POST /command │ │
|
|
│ │ {InviteUser} │ │
|
|
│ │──────────────────>│ │
|
|
│ │ │ │
|
|
│ │ │ Create │
|
|
│ │ │ Subscription │
|
|
│ │ │──────────────────>│
|
|
│ │ │ │
|
|
│ │ {correlationId} │ │
|
|
│ │<──────────────────│ │
|
|
│ │ │ │
|
|
│ │ WS: subscribe │ │
|
|
│ │ {correlationId, │ │
|
|
│ │ eventTypes} │ │
|
|
│ │──────────────────>│ │
|
|
│ │ │ │
|
|
│ InviteResult │ │ │
|
|
│<────────────────│ │ │
|
|
│ │ │ │
|
|
```
|
|
|
|
### Event Delivery (Online)
|
|
|
|
```
|
|
┌──────────┐ ┌──────────────┐ ┌────────────┐ ┌────────┐
|
|
│ Domain │ │ Event │ │ Subscription│ │ Client │
|
|
│ Event │ │ Dispatcher │ │ Store │ │ │
|
|
└────┬─────┘ └──────┬───────┘ └─────┬──────┘ └───┬────┘
|
|
│ │ │ │
|
|
│ UserInvitation │ │ │
|
|
│ AcceptedEvent │ │ │
|
|
│─────────────────>│ │ │
|
|
│ │ │ │
|
|
│ │ Find subscriptions│ │
|
|
│ │ by correlationId │ │
|
|
│ │──────────────────>│ │
|
|
│ │ │ │
|
|
│ │ [sub-123] │ │
|
|
│ │<──────────────────│ │
|
|
│ │ │ │
|
|
│ │ Check: eventType │ │
|
|
│ │ in sub.eventTypes?│ │
|
|
│ │ │ │
|
|
│ │ WS: event │ │
|
|
│ │──────────────────────────────────>│
|
|
│ │ │ │
|
|
│ │ (terminal event) │ │
|
|
│ │ WS: subscription │ │
|
|
│ │ _completed │ │
|
|
│ │──────────────────────────────────>│
|
|
│ │ │ │
|
|
```
|
|
|
|
### Catch-up on Reconnect
|
|
|
|
```
|
|
┌────────┐ ┌──────────────┐ ┌────────────┐ ┌───────────┐
|
|
│ Client │ │ Backend │ │ Subscription│ │ Event │
|
|
│ │ │ │ │ Store │ │ Store │
|
|
└───┬────┘ └──────┬───────┘ └─────┬──────┘ └─────┬─────┘
|
|
│ │ │ │
|
|
│ WS: connect │ │ │
|
|
│ + auth │ │ │
|
|
│────────────────>│ │ │
|
|
│ │ │ │
|
|
│ WS: catch_up │ │ │
|
|
│────────────────>│ │ │
|
|
│ │ │ │
|
|
│ │ Get active subs │ │
|
|
│ │ for userId │ │
|
|
│ │──────────────────>│ │
|
|
│ │ │ │
|
|
│ │ [sub-123, │ │
|
|
│ │ sub-456] │ │
|
|
│ │<──────────────────│ │
|
|
│ │ │ │
|
|
│ │ For each sub: │ │
|
|
│ │ Get events after │ │
|
|
│ │ lastDeliveredSeq │ │
|
|
│ │─────────────────────────────────────>│
|
|
│ │ │ │
|
|
│ │ [evt-1, evt-2] │ │
|
|
│ │<─────────────────────────────────────│
|
|
│ │ │ │
|
|
│ WS: event │ │ │
|
|
│ (evt-1) │ │ │
|
|
│<────────────────│ │ │
|
|
│ │ │ │
|
|
│ WS: event │ │ │
|
|
│ (evt-2) │ │ │
|
|
│<────────────────│ │ │
|
|
│ │ │ │
|
|
```
|
|
|
|
---
|
|
|
|
## Considerations
|
|
|
|
### Cleanup and Maintenance
|
|
|
|
- Run a background job to expire old subscriptions
|
|
- Archive or delete events older than a retention period
|
|
- Clean up completed/cancelled subscriptions after a grace period
|
|
|
|
### Scaling
|
|
|
|
- For multiple server instances, use a distributed connection tracker (Redis)
|
|
- Consider using SignalR with Azure SignalR Service or Redis backplane
|
|
- Event store should support efficient queries by correlationId + sequence
|
|
|
|
### Ordering Guarantees
|
|
|
|
- Events within a correlation are ordered by sequence number
|
|
- Catch-up delivery must respect sequence ordering
|
|
- Consider idempotency keys for duplicate detection on the client
|
|
|
|
### Client-Side Deduplication
|
|
|
|
```dart
|
|
class EventConnection {
|
|
final Set<String> _processedEventIds = {};
|
|
|
|
void _handleEventMessage(Map<String, dynamic> data) {
|
|
final eventId = data['eventId'] as String;
|
|
|
|
// Skip if already processed (duplicate delivery)
|
|
if (_processedEventIds.contains(eventId)) {
|
|
return;
|
|
}
|
|
|
|
_processedEventIds.add(eventId);
|
|
|
|
// Process the event...
|
|
}
|
|
}
|
|
```
|
|
|
|
### Error Handling
|
|
|
|
- Handle WebSocket disconnections with exponential backoff reconnection
|
|
- Queue outgoing messages during disconnection
|
|
- Validate subscription requests on the backend (authorization, rate limiting)
|
|
|
|
### Security
|
|
|
|
- Validate that users can only subscribe to correlations they own
|
|
- Authenticate WebSocket connections with JWT tokens
|
|
- Rate limit subscription creation to prevent abuse
|