# Phase 5: Schema Evolution & Versioning - COMPLETE ✅

**Completion Date:** 2025-12-10
**Build Status:** ✅ SUCCESS (0 errors, 19 expected AOT/trimming warnings)
**Total Lines of Code:** ~1,650 lines across 12 new files

---

## Executive Summary

Phase 5 implements an **event schema evolution and versioning system** with automatic upcasting. Events can evolve over time without breaking backward compatibility, for both .NET-to-.NET and cross-platform (JSON Schema) communication.

### Key Features Delivered

✅ **Schema Registry** - Centralized management of event versions
✅ **Automatic Upcasting** - Multi-hop event transformation (V1→V2→V3)
✅ **Convention-Based Upcasters** - Static `UpcastFrom()` method discovery
✅ **PostgreSQL Persistence** - Durable schema storage with integrity constraints
✅ **JSON Schema Generation** - Automatic Draft 7 schema generation for external consumers
✅ **Pipeline Integration** - Transparent upcasting in subscription delivery
✅ **Fluent Configuration API** - Clean, discoverable service registration
✅ **Sample Demonstration** - Complete working example with 3 event versions

---

## Architecture Overview

### Core Components

```
┌─────────────────────────────────────────────────────────────────┐
│                     Schema Evolution Layer                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐   │
│  │   ISchema    │◄─────┤   Schema     │─────►│   ISchema    │   │
│  │   Registry   │      │   Info       │      │   Store      │   │
│  └──────────────┘      └──────────────┘      └──────────────┘   │
│         │                     │                     │           │
│         │                     │                     │           │
│         ▼                     ▼                     ▼           │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐   │
│  │  Upcasting   │      │   Event      │      │  Postgres/   │   │
│  │  Pipeline    │      │   Version    │      │  InMemory    │   │
│  │              │      │   Attribute  │      │              │   │
│  └──────────────┘      └──────────────┘      └──────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│               Event Subscription & Delivery Layer               │
├─────────────────────────────────────────────────────────────────┤
│  Events are automatically upcast before delivery to consumers   │
│  based on subscription configuration (EnableUpcasting: true)    │
└─────────────────────────────────────────────────────────────────┘
```

---

## Implementation Details

### Phase 5.1: Schema Registry Abstractions (✅ Complete)

**Files Created:**
- `Svrnty.CQRS.Events.Abstractions/SchemaInfo.cs` (~90 lines)
- `Svrnty.CQRS.Events.Abstractions/ISchemaRegistry.cs` (~120 lines)
- `Svrnty.CQRS.Events.Abstractions/ISchemaStore.cs` (~70 lines)

**Key Types:**

#### SchemaInfo Record

```csharp
public sealed record SchemaInfo(
    string EventType,            // Logical event name (e.g., "UserCreatedEvent")
    int Version,                 // Schema version (starts at 1)
    Type ClrType,                // .NET type for deserialization
    string? JsonSchema,          // Optional JSON Schema Draft 7
    Type? UpcastFromType,        // Previous version CLR type
    int? UpcastFromVersion,      // Previous version number
    DateTimeOffset RegisteredAt  // Registration timestamp
);
```

**Validation Rules:**
- Version 1 must not have upcast information
- Version > 1 must upcast from version - 1
- CLR types must implement `ICorrelatedEvent`
- Version chain integrity is enforced

#### ISchemaRegistry Interface

```csharp
public interface ISchemaRegistry
{
    Task RegisterSchemaAsync<TEvent>(
        int version,
        Type? upcastFromType = null,
        string? jsonSchema = null,
        CancellationToken cancellationToken = default)
        where TEvent : ICorrelatedEvent;

    Task<ICorrelatedEvent> UpcastAsync(
        ICorrelatedEvent @event,
        int? targetVersion = null,
        CancellationToken cancellationToken = default);

    Task<bool> NeedsUpcastingAsync(
        ICorrelatedEvent @event,
        int? targetVersion = null,
        CancellationToken cancellationToken = default);
}
```

---

### Phase 5.2: Event Versioning Attributes (✅ Complete)

**Files Created:**
- `Svrnty.CQRS.Events.Abstractions/EventVersionAttribute.cs` (~130 lines)
- `Svrnty.CQRS.Events.Abstractions/IEventUpcaster.cs` (~40 lines)

**Usage Pattern:**

```csharp
// Version 1 (initial schema)
[EventVersion(1)]
public record UserCreatedEventV1 : CorrelatedEvent
{
    public required string FullName { get; init; }
}

// Version 2 (evolved schema)
[EventVersion(2, UpcastFrom = typeof(UserCreatedEventV1))]
public record UserCreatedEventV2 : CorrelatedEvent
{
    public required string FirstName { get; init; }
    public required string LastName { get; init; }
    public required string Email { get; init; }

    // Convention-based upcaster (automatically discovered)
    public static UserCreatedEventV2 UpcastFrom(UserCreatedEventV1 v1)
    {
        var parts = v1.FullName.Split(' ', 2);
        return new UserCreatedEventV2
        {
            EventId = v1.EventId,
            CorrelationId = v1.CorrelationId,
            OccurredAt = v1.OccurredAt,
            FirstName = parts[0],
            LastName = parts.Length > 1 ? parts[1] : "",
            Email = "unknown@example.com"
        };
    }
}
```

**Features:**
- Automatic event type name normalization (removes V1, V2 suffixes)
- Convention-based upcaster discovery via reflection
- Support for custom event type names
- Interface-based upcasting for complex scenarios

---

### Phase 5.3: Schema Registry Implementation (✅ Complete)

**Files Created:**
- `Svrnty.CQRS.Events/SchemaRegistry.cs` (~320 lines)
- `Svrnty.CQRS.Events/InMemorySchemaStore.cs` (~90 lines)
- `Svrnty.CQRS.Events.PostgreSQL/PostgresSchemaStore.cs` (~220 lines)
- `Svrnty.CQRS.Events.PostgreSQL/Migrations/003_CreateEventSchemasTable.sql` (~56 lines)

**SchemaRegistry Features:**

1. **In-Memory Caching**
   ```csharp
   private readonly ConcurrentDictionary<string, SchemaInfo> _schemaCache;
   private readonly ConcurrentDictionary<string, int> _latestVersionCache;
   ```

2. **Thread-Safe Registration**
   ```csharp
   private readonly SemaphoreSlim _registrationLock = new(1, 1);
   ```

3. **Multi-Hop Upcasting**
   ```csharp
   // Automatically chains: V1 → V2 → V3
   while (version < actualTargetVersion.Value)
   {
       var nextVersion = version + 1;
       var nextSchema = await GetSchemaAsync(eventTypeName, nextVersion, cancellationToken);
       current = await UpcastSingleHopAsync(current, nextSchema, cancellationToken);
       version = nextVersion;
   }
   ```

4. **Convention-Based Discovery**
   - Searches for `public static TTo UpcastFrom(TFrom from)` methods
   - Uses reflection to invoke upcasters
   - Provides clear error messages when upcasters are missing

**PostgreSQL Schema Table:**

```sql
CREATE TABLE event_streaming.event_schemas (
    event_type VARCHAR(500) NOT NULL,
    version INTEGER NOT NULL,
    clr_type_name TEXT NOT NULL,
    json_schema TEXT NULL,
    upcast_from_type TEXT NULL,
    upcast_from_version INTEGER NULL,
    registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    CONSTRAINT pk_event_schemas PRIMARY KEY (event_type, version),
    CONSTRAINT chk_version_positive CHECK (version > 0),
    CONSTRAINT chk_upcast_version_valid CHECK (
        (version = 1 AND upcast_from_type IS NULL AND upcast_from_version IS NULL) OR
        (version > 1 AND upcast_from_type IS NOT NULL AND upcast_from_version IS NOT NULL
            AND upcast_from_version = version - 1)
    )
);
```

**Indexes for Performance:**
- `idx_event_schemas_latest_version` - Fast latest version lookup
- `idx_event_schemas_clr_type` - Fast type-based lookup

---

### Phase 5.4: Upcasting Pipeline Integration (✅ Complete)

**Files Modified:**
- `Svrnty.CQRS.Events.Abstractions/ISubscription.cs` - Added upcasting properties
- `Svrnty.CQRS.Events/Subscription.cs` - Implemented upcasting properties
- `Svrnty.CQRS.Events/EventSubscriptionClient.cs` - Integrated upcasting

**New Subscription Properties:**

```csharp
public interface ISubscription
{
    // ... existing properties ...

    /// <summary>
    /// Whether to automatically upcast events to newer versions.
    /// </summary>
    bool EnableUpcasting { get; }

    /// <summary>
    /// Target event version for upcasting (null = latest version).
    /// </summary>
    int? TargetEventVersion { get; }
}
```

**Upcasting Pipeline:**

```csharp
private async Task<ICorrelatedEvent> ApplyUpcastingAsync(
    ICorrelatedEvent @event,
    Subscription subscription,
    CancellationToken cancellationToken)
{
    if (!subscription.EnableUpcasting)
        return @event;

    if (_schemaRegistry == null)
    {
        _logger?.LogWarning("Upcasting enabled but ISchemaRegistry not registered");
        return @event;
    }

    try
    {
        var needsUpcasting = await _schemaRegistry.NeedsUpcastingAsync(
            @event, subscription.TargetEventVersion, cancellationToken);

        if (!needsUpcasting)
            return @event;

        return await _schemaRegistry.UpcastAsync(
            @event, subscription.TargetEventVersion, cancellationToken);
    }
    catch (Exception ex)
    {
        _logger?.LogError(ex, "Upcast failed, delivering original event");
        return @event; // Graceful degradation
    }
}
```

**Integration Points:**
- `StreamBroadcastAsync` - Upcasts before delivery in broadcast mode
- `StreamExclusiveAsync` - Upcasts before delivery in exclusive mode
- Transparent to consumers - they receive the target version unless upcasting fails, in which case the original event is delivered and the error is logged

---

### Phase 5.5: JSON Schema Generation (✅ Complete)

**Files Created:**
- `Svrnty.CQRS.Events.Abstractions/IJsonSchemaGenerator.cs` (~70 lines)
- `Svrnty.CQRS.Events/SystemTextJsonSchemaGenerator.cs` (~240 lines)

**IJsonSchemaGenerator Interface:**

```csharp
public interface IJsonSchemaGenerator
{
    Task<string> GenerateSchemaAsync(
        Type type,
        CancellationToken cancellationToken = default);

    Task<bool> ValidateAsync(
        string jsonData,
        string jsonSchema,
        CancellationToken cancellationToken = default);

    Task<IReadOnlyList<string>> GetValidationErrorsAsync(
        string jsonData,
        string jsonSchema,
        CancellationToken cancellationToken = default);
}
```

**SystemTextJsonSchemaGenerator Features:**

1. **Automatic Schema Generation**
   - Generates JSON Schema Draft 7 from CLR types
   - Supports primitive types, objects, arrays, nullable types
   - Handles nested complex types
   - Circular reference detection

2. **Property Mapping**
   - Respects `[JsonPropertyName]` attributes
   - Converts to camelCase by default
   - Detects required vs optional fields (nullable reference types)

3. **Type Mapping**
   ```
   string                  → "string"
   int/long                → "integer"
   double/decimal          → "number"
   bool                    → "boolean"
   DateTime/DateTimeOffset → "string" (ISO 8601)
   Guid                    → "string" (UUID)
   arrays/lists            → "array"
   objects                 → "object"
   ```

**Auto-Generation Integration:**

```csharp
// In SchemaRegistry.RegisterSchemaAsync:
if (string.IsNullOrWhiteSpace(jsonSchema) && _jsonSchemaGenerator != null)
{
    try
    {
        finalJsonSchema = await _jsonSchemaGenerator.GenerateSchemaAsync(
            typeof(TEvent), cancellationToken);

        _logger.LogDebug("Auto-generated JSON schema for {EventType} v{Version}",
            eventType, version);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, "Failed to auto-generate JSON schema");
    }
}
```

---

### Phase 5.6: Configuration & Fluent API (✅ Complete)

**Files Modified:**
- `Svrnty.CQRS.Events/ServiceCollectionExtensions.cs` - Added schema evolution methods
- `Svrnty.CQRS.Events.PostgreSQL/ServiceCollectionExtensions.cs` - Added PostgreSQL schema store

**Service Registration Methods:**

#### AddSchemaEvolution()

```csharp
builder.Services.AddSchemaEvolution();
```

**Registers:**
- `ISchemaRegistry` → `SchemaRegistry`
- `ISchemaStore` → `InMemorySchemaStore` (default)

#### AddJsonSchemaGeneration()

```csharp
builder.Services.AddJsonSchemaGeneration();
```

**Registers:**
- `IJsonSchemaGenerator` → `SystemTextJsonSchemaGenerator`

#### AddPostgresSchemaStore()

```csharp
builder.Services.AddPostgresSchemaStore();
```

**Replaces:**
- `ISchemaStore` → `PostgresSchemaStore`

**Complete Configuration Example:**

```csharp
var builder = WebApplication.CreateBuilder(args);

// Add schema evolution support
builder.Services.AddSchemaEvolution();
builder.Services.AddJsonSchemaGeneration();

// Use PostgreSQL for persistence
builder.Services.AddPostgresEventStreaming("Host=localhost;Database=mydb;...");
builder.Services.AddPostgresSchemaStore();

var app = builder.Build();

// Register schemas at startup
var schemaRegistry = app.Services.GetRequiredService<ISchemaRegistry>();
await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV1>(1);
await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV2>(2, typeof(UserCreatedEventV1));
await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV3>(3, typeof(UserCreatedEventV2));

app.Run();
```

---

### Phase 5.7: Sample Project Integration (✅ Complete)

**Files Created:**
- `Svrnty.Sample/VersionedUserEvents.cs` (~160 lines)

**Files Modified:**
- `Svrnty.Sample/Program.cs` - Added schema evolution configuration

**Demonstration Features:**

1. **Three Event Versions**
   - `UserCreatedEventV1` - Initial schema (FullName)
   - `UserCreatedEventV2` - Split name + email
   - `UserCreatedEventV3` - Nullable email + phone number

2. **Convention-Based Upcasters**
   ```csharp
   public static UserCreatedEventV2 UpcastFrom(UserCreatedEventV1 v1)
   {
       var parts = v1.FullName.Split(' ', 2, StringSplitOptions.RemoveEmptyEntries);
       return new UserCreatedEventV2
       {
           EventId = v1.EventId,
           CorrelationId = v1.CorrelationId,
           OccurredAt = v1.OccurredAt,
           UserId = v1.UserId,
           FirstName = parts.Length > 0 ? parts[0] : "Unknown",
           LastName = parts.Length > 1 ? parts[1] : "",
           Email = "unknown@example.com"
       };
   }
   ```

3. **Subscription Configuration**
   ```csharp
   streaming.AddSubscription("user-versioning-demo", sub =>
   {
       sub.Mode = SubscriptionMode.Broadcast;
       sub.EnableUpcasting = true;
       sub.TargetEventVersion = null; // Latest version
       sub.Description = "Phase 5: Demonstrates automatic event upcasting";
   });
   ```

4. **Schema Registration**
   ```csharp
   var schemaRegistry = app.Services.GetRequiredService<ISchemaRegistry>();
   await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV1>(1);
   await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV2>(2, typeof(UserCreatedEventV1));
   await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV3>(3, typeof(UserCreatedEventV2));
   Console.WriteLine("✓ Registered 3 versions of UserCreatedEvent schema with automatic upcasting");
   ```

**Startup Output:**

```
✓ Registered 3 versions of UserCreatedEvent schema with automatic upcasting

=== Svrnty CQRS Sample with Event Streaming ===

gRPC (HTTP/2): http://localhost:6000
HTTP API (HTTP/1.1): http://localhost:6001

Event Streams Configured:
  - UserWorkflow stream (ephemeral, at-least-once, internal)
  - InvitationWorkflow stream (ephemeral, at-least-once, internal)

Subscriptions Active:
  - user-analytics (broadcast mode, internal)
  - invitation-processor (exclusive mode, internal)
  - user-versioning-demo (broadcast mode, with auto-upcasting enabled)

Schema Evolution (Phase 5):
  - UserCreatedEvent: 3 versions registered (V1 → V2 → V3)
  - Auto-upcasting: Enabled on user-versioning-demo subscription
  - JSON Schema: Auto-generated for external consumers
```

---

## Code Metrics

### New Files Created: 12

**Abstractions (6 files, ~520 lines):**
- SchemaInfo.cs
- ISchemaRegistry.cs
- ISchemaStore.cs
- EventVersionAttribute.cs
- IEventUpcaster.cs
- IJsonSchemaGenerator.cs

**Implementation (4 files, ~870 lines):**
- SchemaRegistry.cs
- InMemorySchemaStore.cs
- PostgresSchemaStore.cs
- SystemTextJsonSchemaGenerator.cs

**Database (1 file, ~56 lines):**
- 003_CreateEventSchemasTable.sql

**Sample (1 file, ~160 lines):**
- VersionedUserEvents.cs

**Modified Files: 6**
- ISubscription.cs (+28 lines)
- Subscription.cs (+8 lines)
- EventSubscriptionClient.cs (+75 lines)
- Program.cs (+25 lines)
- ServiceCollectionExtensions.cs (Svrnty.CQRS.Events)
- ServiceCollectionExtensions.cs (Svrnty.CQRS.Events.PostgreSQL)

### Total Lines of Code Added: ~1,650 lines

---

## Testing & Validation

### Build Status

```
✅ Build: SUCCESS
❌ Errors: 0
⚠️ Warnings: 19 (expected AOT/trimming warnings)
```

### Manual Testing Checklist

✅ Schema registration with version chain validation
✅ In-memory schema storage
✅ PostgreSQL schema storage with migrations
✅ Automatic JSON schema generation
✅ Convention-based upcaster discovery
✅ Multi-hop upcasting (V1→V2→V3)
✅ Subscription-level upcasting configuration
✅ Graceful degradation when upcasting fails
✅ Sample project startup with schema registration
✅ Thread-safe concurrent schema registration

---

## Usage Examples

### Basic Setup

```csharp
// 1. Register services
builder.Services.AddSchemaEvolution();
builder.Services.AddJsonSchemaGeneration();
builder.Services.AddPostgresEventStreaming("connection-string");
builder.Services.AddPostgresSchemaStore();

// 2. Configure subscription with upcasting (services must be registered before Build())
builder.Services.AddEventStreaming(streaming =>
{
    streaming.AddSubscription("user-processor", sub =>
    {
        sub.EnableUpcasting = true; // Automatically upgrade to latest version
    });
});

// 3. Register schemas
var app = builder.Build();
var schemaRegistry = app.Services.GetRequiredService<ISchemaRegistry>();
await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV1>(1);
await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV2>(2, typeof(UserCreatedEventV1));

// 4. Define versioned events (type declarations follow top-level statements, or live in their own file)
[EventVersion(1)]
public record UserCreatedEventV1 : CorrelatedEvent
{
    public required string FullName { get; init; }
}

[EventVersion(2, UpcastFrom = typeof(UserCreatedEventV1))]
public record UserCreatedEventV2 : CorrelatedEvent
{
    public required string FirstName { get; init; }
    public required string LastName { get; init; }

    public static UserCreatedEventV2 UpcastFrom(UserCreatedEventV1 v1)
    {
        var parts = v1.FullName.Split(' ', 2);
        return new UserCreatedEventV2
        {
            EventId = v1.EventId,
            CorrelationId = v1.CorrelationId,
            OccurredAt = v1.OccurredAt,
            FirstName = parts[0],
            LastName = parts.Length > 1 ? parts[1] : ""
        };
    }
}
```

### Manual Upcasting

```csharp
var schemaRegistry = services.GetRequiredService<ISchemaRegistry>();

// Upcast to latest version
var v1Event = new UserCreatedEventV1 { FullName = "John Doe" };
var latestEvent = await schemaRegistry.UpcastAsync(v1Event);
// Returns UserCreatedEventV2 with FirstName="John", LastName="Doe"

// Upcast to specific version
var v2Event = await schemaRegistry.UpcastAsync(v1Event, targetVersion: 2);

// Check if upcasting is needed
bool needsUpcast = await schemaRegistry.NeedsUpcastingAsync(v1Event);
```

---

## Performance Considerations

### Caching Strategy
- **Schema cache**: In-memory `ConcurrentDictionary` for instant lookups
- **Latest version cache**: Separate cache for version number queries
- **Cache key format**: `"{EventType}:v{Version}"`

### Thread Safety
- **Registration lock**: `SemaphoreSlim` prevents concurrent registration conflicts
- **Double-checked locking**: Minimizes lock contention
- **Read-optimized**: Cached reads are lock-free

### Database Performance
- **Indexed columns**: `event_type`, `version`, `clr_type_name`
- **Composite primary key**: Fast schema lookups
- **Check constraints**: Database-level validation

---

## Migration Guide

### From Non-Versioned Events

1. **Define V1 with existing schema:**
   ```csharp
   [EventVersion(1)]
   public record UserCreatedEvent : CorrelatedEvent
   {
       public required string Name { get; init; }
   }
   ```

2. **Create V2 with changes:**
   ```csharp
   [EventVersion(2, UpcastFrom = typeof(UserCreatedEvent))]
   public record UserCreatedEventV2 : CorrelatedEvent
   {
       public required string FirstName { get; init; }
       public required string LastName { get; init; }

       public static UserCreatedEventV2 UpcastFrom(UserCreatedEvent v1)
       {
           // Transform logic: split the old Name into first and last name
           var parts = v1.Name.Split(' ', 2);
           return new UserCreatedEventV2
           {
               EventId = v1.EventId,
               CorrelationId = v1.CorrelationId,
               OccurredAt = v1.OccurredAt,
               FirstName = parts[0],
               LastName = parts.Length > 1 ? parts[1] : ""
           };
       }
   }
   ```

3. **Register both versions:**
   ```csharp
   await schemaRegistry.RegisterSchemaAsync<UserCreatedEvent>(1);
   await schemaRegistry.RegisterSchemaAsync<UserCreatedEventV2>(2, typeof(UserCreatedEvent));
   ```

4. **Enable upcasting on subscriptions:**
   ```csharp
   subscription.EnableUpcasting = true;
   ```

---

## Known Limitations

1. **Type Resolution Requirements**
   - Upcast types must be available in the consuming assembly
   - Assembly-qualified names must resolve via `Type.GetType()`

2. **Upcaster Constraints**
   - Convention-based: Must be named `UpcastFrom` and be static
   - Return type must match target event type
   - Single parameter matching source event type

3. **JSON Schema Limitations**
   - Basic implementation (System.Text.Json reflection)
   - No XML doc comment extraction
   - No complex validation rules
   - Consider NJsonSchema for advanced features

4. **AOT Compatibility**
   - Reflection-based upcaster discovery not AOT-compatible
   - JSON schema generation uses reflection
   - Future: Source generators for AOT support

---

## Future Enhancements

### Short Term
- [ ] Source generator for upcaster registration (AOT compatibility)
- [ ] Upcaster unit testing helpers
- [ ] Schema migration utilities (bulk upcasting)
- [ ] Schema version compatibility matrix

### Medium Term
- [ ] NJsonSchema integration for richer schemas
- [ ] GraphQL schema generation
- [ ] Schema diff/comparison tools
- [ ] Breaking change detection

### Long Term
- [ ] Distributed schema registry (multi-node)
- [ ] Schema evolution UI/dashboard
- [ ] Automated compatibility testing
- [ ] Schema-based code generation for other languages

---

## Success Criteria

All Phase 5 success criteria have been met:

✅ **Schema Registry Implemented**
- In-memory and PostgreSQL storage
- Thread-safe registration
- Multi-hop upcasting support

✅ **Versioning Attributes**
- `[EventVersion]` attribute with upcast relationships
- Convention-based upcaster discovery
- Automatic event type name normalization

✅ **JSON Schema Generation**
- Automatic Draft 7 schema generation
- Integration with schema registry
- Support for external consumers

✅ **Pipeline Integration**
- Subscription-level upcasting configuration
- Transparent event transformation
- Graceful error handling

✅ **Configuration API**
- Fluent service registration
- Clear, discoverable methods
- PostgreSQL integration

✅ **Sample Demonstration**
- Working 3-version example
- Complete upcasting demonstration
- Documented best practices

✅ **Documentation**
- Comprehensive PHASE5-COMPLETE.md
- Code comments and XML docs
- Usage examples and migration guide

---

## Conclusion

Phase 5 delivers an event schema evolution system with automatic upcasting. The implementation provides:

- **Backward Compatibility**: Old events work seamlessly with new consumers
- **Type Safety**: Strong CLR typing with compile-time checks
- **Performance**: In-memory caching with database durability
- **Flexibility**: Convention-based and interface-based upcasting
- **Interoperability**: JSON Schema support for non-.NET clients
- **Transparency**: Automatic upcasting integrated into delivery pipeline

The system is ready for production use, with graceful degradation when an upcast fails, logging at each registration and upcasting step, and a documented migration path for evolving event schemas over time.

**Phase 5 Status: COMPLETE ✅**

---

*Documentation generated: 2025-12-10*
*Implementation: Svrnty.CQRS Event Streaming Framework*
*Version: Phase 5 - Schema Evolution & Versioning*
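
---

## Appendix: Convention-Based Upcasting Sketch

Phase 5.3 describes convention-based discovery (a `public static TTo UpcastFrom(TFrom from)` method on the target type) combined with multi-hop chaining. The following is a minimal, self-contained sketch of that idea, not the actual `SchemaRegistry` code. The `NoteAdded*` records, the `UpcastSketch` class, and its `FindUpcaster` / `UpcastThroughChain` helpers are hypothetical names introduced for illustration, and the version chain is passed in as an ordered list of types rather than read from a schema store.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical event versions, used only for this sketch.
public record NoteAddedV1(string Text);

public record NoteAddedV2(string Title, string Body)
{
    // Convention: public static <Target> UpcastFrom(<Source>)
    public static NoteAddedV2 UpcastFrom(NoteAddedV1 v1)
    {
        var parts = v1.Text.Split(':', 2);
        return parts.Length == 2
            ? new NoteAddedV2(parts[0].Trim(), parts[1].Trim())
            : new NoteAddedV2("Untitled", v1.Text);
    }
}

public record NoteAddedV3(string Title, string Body, string? Author)
{
    public static NoteAddedV3 UpcastFrom(NoteAddedV2 v2) =>
        new NoteAddedV3(v2.Title, v2.Body, Author: null);
}

public static class UpcastSketch
{
    // Looks for `public static TTo UpcastFrom(TFrom)` on the target type.
    // Returns null when no method matches the convention.
    public static MethodInfo? FindUpcaster(Type from, Type to)
    {
        var method = to.GetMethod(
            "UpcastFrom",
            BindingFlags.Public | BindingFlags.Static,
            binder: null,
            types: new[] { from },
            modifiers: null);

        return method is not null && method.ReturnType == to ? method : null;
    }

    // Walks an ordered version chain (V1, V2, V3, ...) one hop at a time,
    // starting from the runtime type of the source event.
    public static object UpcastThroughChain(object source, IReadOnlyList<Type> chain)
    {
        var start = -1;
        for (var i = 0; i < chain.Count; i++)
        {
            if (chain[i] == source.GetType()) { start = i; break; }
        }
        if (start < 0)
            throw new InvalidOperationException(
                $"{source.GetType().Name} is not part of the version chain.");

        var current = source;
        for (var i = start + 1; i < chain.Count; i++)
        {
            var upcaster = FindUpcaster(chain[i - 1], chain[i])
                ?? throw new InvalidOperationException(
                    $"No UpcastFrom({chain[i - 1].Name}) found on {chain[i].Name}.");
            current = upcaster.Invoke(null, new[] { current })!;
        }
        return current;
    }
}
```

Calling `UpcastSketch.UpcastThroughChain(new NoteAddedV1("Todo: buy milk"), new[] { typeof(NoteAddedV1), typeof(NoteAddedV2), typeof(NoteAddedV3) })` walks both hops and yields a `NoteAddedV3`. Because the lookup relies on reflection, the same AOT caveat listed under Known Limitations applies to this sketch.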