using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Events.Abstractions.Projections;

namespace Svrnty.CQRS.Events.Projections;

/// <summary>
/// Background service that automatically starts projections with AutoStart=true.
/// </summary>
/// <remarks>
/// Every auto-start projection returned by the registry is run on its own task through
/// the projection engine. A failure in one projection is logged and swallowed inside
/// that projection's task, so it does not fault the tasks of the other projections.
/// </remarks>
public sealed class ProjectionHostedService : BackgroundService
{
    private readonly IProjectionRegistry _registry;
    private readonly IProjectionEngine _engine;
    private readonly ILogger<ProjectionHostedService> _logger;

    // One entry per started projection; awaited together in ExecuteAsync.
    private readonly List<Task> _runningProjections = new();

    /// <summary>
    /// Creates the hosted service.
    /// </summary>
    /// <param name="registry">Source of the projection definitions to inspect for auto-start.</param>
    /// <param name="engine">Engine used to run each auto-start projection.</param>
    /// <param name="logger">Logger for lifecycle and failure messages.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
    public ProjectionHostedService(
        IProjectionRegistry registry,
        IProjectionEngine engine,
        ILogger<ProjectionHostedService> logger)
    {
        _registry = registry ?? throw new ArgumentNullException(nameof(registry));
        _engine = engine ?? throw new ArgumentNullException(nameof(engine));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Starts every projection whose options have <c>AutoStart</c> set and waits until all of
    /// them have finished or been cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting down; forwarded to each projection run.</param>
    /// <returns>A task that completes once all started projections have ended, or immediately when there are none.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Projection hosted service starting...");

        // Get all projections with AutoStart=true
        var autoStartProjections = _registry.GetAllProjections()
            .Where(p => p.Options.AutoStart)
            .ToList();

        if (autoStartProjections.Count == 0)
        {
            _logger.LogInformation("No projections configured for auto-start");
            return;
        }

        _logger.LogInformation(
            "Starting {Count} auto-start projections: {ProjectionNames}",
            autoStartProjections.Count,
            string.Join(", ", autoStartProjections.Select(p => p.ProjectionName)));

        // Start each projection in its own task
        foreach (var definition in autoStartProjections)
        {
            // The stopping token is deliberately NOT passed to Task.Run: if it were already
            // cancelled before the work item started, the returned task would be Canceled,
            // Task.WhenAll below would throw, and the final "stopped" log would be skipped.
            // Cancellation is still honoured because the token is forwarded to RunAsync.
            var projectionTask = Task.Run(async () =>
            {
                try
                {
                    await _engine.RunAsync(
                        definition.ProjectionName,
                        definition.StreamName,
                        stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Expected when stopping
                }
                catch (Exception ex)
                {
                    // Also reached for cancellations that were NOT caused by host shutdown
                    // (the filter above lets them through), so they are not lost silently.
                    _logger.LogError(ex,
                        "Projection failed: {ProjectionName} on stream {StreamName}",
                        definition.ProjectionName,
                        definition.StreamName);
                }
            });

            _runningProjections.Add(projectionTask);
        }

        // Wait for all projections to complete (or be cancelled)
        await Task.WhenAll(_runningProjections);

        _logger.LogInformation("Projection hosted service stopped");
    }

    /// <summary>
    /// Logs that the service is stopping and then delegates to the base implementation.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the base <c>StopAsync</c>.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping projection hosted service...");
        await base.StopAsync(cancellationToken);
    }
}