using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using Svrnty.CQRS.Events.Abstractions.Projections;

namespace Svrnty.CQRS.Events.Projections;

/// <summary>
/// Manages execution of event stream projections.
/// </summary>
/// <remarks>
/// The engine reads events from an <see cref="IEventStreamStore"/> in batches, dispatches each
/// event to a projection instance resolved from a fresh dependency-injection scope, and records
/// progress through an <see cref="IProjectionCheckpointStore"/>.
/// </remarks>
public sealed class ProjectionEngine : IProjectionEngine
{
    /// <summary>
    /// Name of the handler method looked up reflectively on projections that do not implement
    /// <see cref="IDynamicProjection"/>.
    /// </summary>
    private const string HandleMethodName = "HandleAsync";

    private readonly IProjectionRegistry _registry;
    private readonly IProjectionCheckpointStore _checkpointStore;
    private readonly IEventStreamStore _streamStore;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<ProjectionEngine> _logger;

    /// <summary>
    /// Creates a new projection engine.
    /// </summary>
    /// <param name="registry">Registry used to look up projection definitions by name.</param>
    /// <param name="checkpointStore">Store used to load, save and reset projection checkpoints.</param>
    /// <param name="streamStore">Store the events are read from.</param>
    /// <param name="serviceProvider">Root provider from which per-operation scopes are created.</param>
    /// <param name="logger">Logger for engine diagnostics.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public ProjectionEngine(
        IProjectionRegistry registry,
        IProjectionCheckpointStore checkpointStore,
        IEventStreamStore streamStore,
        IServiceProvider serviceProvider,
        ILogger<ProjectionEngine> logger)
    {
        _registry = registry ?? throw new ArgumentNullException(nameof(registry));
        _checkpointStore = checkpointStore ?? throw new ArgumentNullException(nameof(checkpointStore));
        _streamStore = streamStore ?? throw new ArgumentNullException(nameof(streamStore));
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Runs the named projection against its stream, resuming after the last saved checkpoint,
    /// and keeps polling for new events until cancellation is requested or an event fails.
    /// </summary>
    /// <param name="projectionName">Name the projection is registered under.</param>
    /// <param name="streamName">Stream to project; must match the stream the projection is registered for.</param>
    /// <param name="cancellationToken">Token that stops the projection loop.</param>
    /// <exception cref="InvalidOperationException">
    /// The projection is not registered, is registered for a different stream, or an event could
    /// not be processed after all retries.
    /// </exception>
    /// <exception cref="OperationCanceledException">Cancellation was observed by an awaited operation.</exception>
    public async Task RunAsync(
        string projectionName,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        var definition = GetDefinitionForStream(projectionName, streamName);

        _logger.LogInformation(
            "Starting projection: {ProjectionName} on stream {StreamName}",
            projectionName,
            streamName);

        try
        {
            await RunProjectionLoopAsync(definition, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation(
                "Projection stopped: {ProjectionName} on stream {StreamName}",
                projectionName,
                streamName);
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex,
                "Projection failed: {ProjectionName} on stream {StreamName}",
                projectionName,
                streamName);
            throw;
        }
    }

    /// <summary>
    /// Rebuilds the named projection: resets its read model (when the projection implements
    /// <see cref="IResettableProjection"/>), resets its checkpoint, then runs the projection loop.
    /// </summary>
    /// <param name="projectionName">Name the projection is registered under.</param>
    /// <param name="streamName">Stream to rebuild from; must match the stream the projection is registered for.</param>
    /// <param name="cancellationToken">Token that stops the rebuild.</param>
    /// <exception cref="InvalidOperationException">
    /// The projection is not registered, is registered for a different stream, does not allow
    /// rebuilding, or an event could not be processed after all retries.
    /// </exception>
    public async Task RebuildAsync(
        string projectionName,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        // Validate the stream as RunAsync does: the checkpoint reset below is keyed by
        // streamName while the replay reads definition.StreamName, so the two must agree.
        var definition = GetDefinitionForStream(projectionName, streamName);

        if (!definition.Options.AllowRebuild)
        {
            throw new InvalidOperationException(
                $"Projection '{projectionName}' does not allow rebuilding (AllowRebuild=false)");
        }

        _logger.LogWarning(
            "Rebuilding projection: {ProjectionName} on stream {StreamName}",
            projectionName,
            streamName);

        // Reset the projection if it implements IResettableProjection
        using (var scope = _serviceProvider.CreateScope())
        {
            var projection = scope.ServiceProvider.GetRequiredService(definition.ProjectionType);

            if (projection is IResettableProjection resettable)
            {
                _logger.LogInformation("Resetting projection read model: {ProjectionName}", projectionName);
                await resettable.ResetAsync(cancellationToken);
            }
        }

        // Reset checkpoint
        await _checkpointStore.ResetCheckpointAsync(projectionName, streamName, cancellationToken);

        _logger.LogInformation("Projection reset complete: {ProjectionName}", projectionName);

        // Replay all events
        await RunProjectionLoopAsync(definition, cancellationToken);
    }

    /// <summary>
    /// Builds a status snapshot for a projection from its stored checkpoint and the current
    /// length of the stream.
    /// </summary>
    /// <param name="projectionName">Name of the projection.</param>
    /// <param name="streamName">Name of the stream.</param>
    /// <param name="cancellationToken">Token used to cancel the store lookups.</param>
    /// <returns>
    /// A <see cref="ProjectionStatus"/>. When no checkpoint exists the state is
    /// <see cref="ProjectionState.NotStarted"/>, the last processed offset is <c>-1</c> and the
    /// processed-event count is <c>0</c>.
    /// </returns>
    public async Task<ProjectionStatus> GetStatusAsync(
        string projectionName,
        string streamName,
        CancellationToken cancellationToken = default)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync(projectionName, streamName, cancellationToken);
        var streamLength = await _streamStore.GetStreamLengthAsync(streamName, cancellationToken);

        return new ProjectionStatus
        {
            ProjectionName = projectionName,
            StreamName = streamName,
            IsRunning = false, // This is a simple implementation; full tracking would require more state
            State = checkpoint == null ? ProjectionState.NotStarted : ProjectionState.Running,
            LastProcessedOffset = checkpoint?.LastProcessedOffset ?? -1,
            StreamLength = streamLength,
            LastUpdated = checkpoint?.LastUpdated ?? DateTimeOffset.MinValue,
            EventsProcessed = checkpoint?.EventsProcessed ?? 0,
            LastError = checkpoint?.LastError,
            LastErrorAt = checkpoint?.LastErrorAt
        };
    }

    /// <summary>
    /// Looks up a projection definition and verifies that it is registered for the requested stream.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The projection is not registered or is registered for a different stream.
    /// </exception>
    private ProjectionDefinition GetDefinitionForStream(string projectionName, string streamName)
    {
        var definition = _registry.GetProjection(projectionName);

        if (definition is null)
        {
            throw new InvalidOperationException($"Projection '{projectionName}' is not registered");
        }

        if (definition.StreamName != streamName)
        {
            throw new InvalidOperationException(
                $"Projection '{projectionName}' is registered for stream '{definition.StreamName}', not '{streamName}'");
        }

        return definition;
    }

    /// <summary>
    /// Reads the stream in batches starting after the stored checkpoint, processes every event
    /// and saves checkpoints (per event or per batch, depending on the projection options).
    /// Polls again after <c>PollingInterval</c> whenever the stream has no new events.
    /// </summary>
    /// <exception cref="InvalidOperationException">An event failed after all retries.</exception>
    private async Task RunProjectionLoopAsync(
        ProjectionDefinition definition,
        CancellationToken cancellationToken)
    {
        var checkpoint = await _checkpointStore.GetCheckpointAsync(
            definition.ProjectionName,
            definition.StreamName,
            cancellationToken);

        // No checkpoint means nothing has been processed yet, so start at offset 0.
        long currentOffset = checkpoint?.LastProcessedOffset + 1 ?? 0;

        // Running total kept independently of the checkpoint object so that it stays accurate
        // between batch checkpoints (needed when a failure occurs in the middle of a batch).
        var eventsProcessed = checkpoint?.EventsProcessed ?? 0;

        _logger.LogInformation(
            "Projection starting from offset {Offset}: {ProjectionName}",
            currentOffset,
            definition.ProjectionName);

        while (!cancellationToken.IsCancellationRequested)
        {
            var events = await _streamStore.ReadStreamAsync(
                definition.StreamName,
                currentOffset,
                definition.Options.BatchSize,
                cancellationToken);

            if (events.Count == 0)
            {
                // Caught up, wait before polling again
                await Task.Delay(definition.Options.PollingInterval, cancellationToken);
                continue;
            }

            _logger.LogDebug(
                "Processing {Count} events from offset {Offset}: {ProjectionName}",
                events.Count,
                currentOffset,
                definition.ProjectionName);

            // Process batch
            foreach (var @event in events)
            {
                var success = await ProcessEventAsync(definition, @event, cancellationToken);

                if (!success)
                {
                    // Failed after retries, update checkpoint with error and stop.
                    // The offset and the count both include the events of this batch that
                    // were handled before the failing one.
                    var errorCheckpoint = new ProjectionCheckpoint
                    {
                        ProjectionName = definition.ProjectionName,
                        StreamName = definition.StreamName,
                        LastProcessedOffset = currentOffset - 1,
                        LastUpdated = DateTimeOffset.UtcNow,
                        EventsProcessed = eventsProcessed,
                        LastError = $"Failed to process event at offset {currentOffset}",
                        LastErrorAt = DateTimeOffset.UtcNow
                    };

                    await _checkpointStore.SaveCheckpointAsync(errorCheckpoint, cancellationToken);

                    throw new InvalidOperationException(
                        $"Projection '{definition.ProjectionName}' failed after max retries at offset {currentOffset}");
                }

                currentOffset++;
                eventsProcessed++;

                // Checkpoint per event if configured
                if (definition.Options.CheckpointPerEvent)
                {
                    var eventCheckpoint = new ProjectionCheckpoint
                    {
                        ProjectionName = definition.ProjectionName,
                        StreamName = definition.StreamName,
                        LastProcessedOffset = currentOffset - 1,
                        LastUpdated = DateTimeOffset.UtcNow,
                        EventsProcessed = eventsProcessed
                    };

                    await _checkpointStore.SaveCheckpointAsync(eventCheckpoint, cancellationToken);
                }
            }

            // Checkpoint after batch if not checkpointing per event
            if (!definition.Options.CheckpointPerEvent)
            {
                var batchCheckpoint = new ProjectionCheckpoint
                {
                    ProjectionName = definition.ProjectionName,
                    StreamName = definition.StreamName,
                    LastProcessedOffset = currentOffset - 1,
                    LastUpdated = DateTimeOffset.UtcNow,
                    EventsProcessed = eventsProcessed
                };

                await _checkpointStore.SaveCheckpointAsync(batchCheckpoint, cancellationToken);
            }

            _logger.LogDebug(
                "Processed batch up to offset {Offset}: {ProjectionName}",
                currentOffset - 1,
                definition.ProjectionName);
        }
    }

    /// <summary>
    /// Processes one event with a projection instance resolved from a new scope, retrying with
    /// exponential back-off (<c>BaseRetryDelay * 2^attempt</c>) up to <c>MaxRetries</c> times.
    /// </summary>
    /// <returns><c>true</c> when the event was handled; <c>false</c> when every attempt failed.</returns>
    /// <exception cref="OperationCanceledException">Cancellation was requested while processing.</exception>
    private async Task<bool> ProcessEventAsync(
        ProjectionDefinition definition,
        ICorrelatedEvent @event,
        CancellationToken cancellationToken)
    {
        using var scope = _serviceProvider.CreateScope();
        var projection = scope.ServiceProvider.GetRequiredService(definition.ProjectionType);

        for (int attempt = 0; attempt <= definition.Options.MaxRetries; attempt++)
        {
            try
            {
                await DispatchAsync(definition, projection, @event, cancellationToken);
                return true; // Success
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // A requested shutdown is not a processing failure: do not retry and do not
                // let the caller record an error checkpoint for it.
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogWarning(
                    ex,
                    "Projection failed (attempt {Attempt}/{MaxRetries}): {ProjectionName}, Event: {@Event}",
                    attempt + 1,
                    definition.Options.MaxRetries + 1,
                    definition.ProjectionName,
                    @event);

                if (attempt < definition.Options.MaxRetries)
                {
                    var delaySeconds = definition.Options.BaseRetryDelay.TotalSeconds * Math.Pow(2, attempt);
                    await Task.Delay(TimeSpan.FromSeconds(delaySeconds), cancellationToken);
                }
                else
                {
                    _logger.LogError(
                        ex,
                        "Projection failed after {MaxRetries} retries: {ProjectionName}, Event: {@Event}",
                        definition.Options.MaxRetries + 1,
                        definition.ProjectionName,
                        @event);
                    return false; // Failed after all retries
                }
            }
        }

        return false;
    }

    /// <summary>
    /// Delivers a single event to the projection: directly when it implements
    /// <see cref="IDynamicProjection"/>, otherwise through the reflected handler overload that
    /// accepts the event's runtime type. Events without a matching handler are skipped.
    /// </summary>
    private async Task DispatchAsync(
        ProjectionDefinition definition,
        object projection,
        ICorrelatedEvent @event,
        CancellationToken cancellationToken)
    {
        if (projection is IDynamicProjection dynamicProjection)
        {
            await dynamicProjection.HandleAsync(@event, cancellationToken);
            return;
        }

        var handleMethod = FindHandleMethod(definition.ProjectionType, @event.GetType());

        if (handleMethod is null)
        {
            _logger.LogDebug(
                "No {MethodName} overload accepts event type {EventType}, skipping: {ProjectionName}",
                HandleMethodName,
                @event.GetType().Name,
                definition.ProjectionName);
            return;
        }

        var task = (Task?)handleMethod.Invoke(projection, new object[] { @event, cancellationToken });

        if (task is not null)
        {
            await task;
        }
    }

    /// <summary>
    /// Finds the public handler overload taking <c>(event, CancellationToken)</c> whose event
    /// parameter accepts <paramref name="eventType"/>, preferring the most derived parameter type.
    /// </summary>
    /// <remarks>
    /// A plain name lookup throws <see cref="AmbiguousMatchException"/> as soon as the projection
    /// declares more than one overload, so the candidates are filtered by signature instead.
    /// </remarks>
    /// <returns>The matching method, or <c>null</c> when no overload accepts the event type.</returns>
    private static MethodInfo? FindHandleMethod(Type projectionType, Type eventType)
    {
        MethodInfo? best = null;
        Type? bestParameterType = null;

        // Same visibility as the default Type.GetMethod(name) lookup.
        const BindingFlags flags = BindingFlags.Public | BindingFlags.Instance | BindingFlags.Static;

        foreach (var candidate in projectionType.GetMethods(flags))
        {
            if (!string.Equals(candidate.Name, HandleMethodName, StringComparison.Ordinal))
            {
                continue;
            }

            var parameters = candidate.GetParameters();

            if (parameters.Length != 2 || parameters[1].ParameterType != typeof(CancellationToken))
            {
                continue;
            }

            var handledType = parameters[0].ParameterType;

            if (!handledType.IsAssignableFrom(eventType))
            {
                continue;
            }

            // Keep the overload with the most specific event parameter.
            if (bestParameterType is null || bestParameterType.IsAssignableFrom(handledType))
            {
                best = candidate;
                bestParameterType = handledType;
            }
        }

        return best;
    }
}