using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Sagas.Abstractions;
using Svrnty.CQRS.Sagas.Abstractions.Messaging;

namespace Svrnty.CQRS.Sagas.RabbitMQ;

/// <summary>
/// Hosted service that manages RabbitMQ saga connections and subscriptions.
/// </summary>
/// <remarks>
/// On start it subscribes to saga responses on the message bus and then idles until the
/// host requests shutdown. Responses are currently only logged; they are not yet
/// dispatched to an orchestrator.
/// </remarks>
public class RabbitMqSagaHostedService : BackgroundService
{
    // Not used by the current response handler (which only logs); retained so that a
    // scoped orchestrator can be resolved here once response dispatching is implemented.
    private readonly IServiceProvider _serviceProvider;
    private readonly ISagaMessageBus _messageBus;

    // NOTE(review): this is the non-generic ILogger, which the default logging setup does
    // not register for injection - confirm whether ILogger<RabbitMqSagaHostedService>
    // was intended.
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a new RabbitMQ saga hosted service.
    /// </summary>
    /// <param name="serviceProvider">Root service provider, kept for resolving scoped services.</param>
    /// <param name="messageBus">Message bus used to subscribe to saga responses.</param>
    /// <param name="logger">Logger for lifecycle and response diagnostics.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
    public RabbitMqSagaHostedService(
        IServiceProvider serviceProvider,
        ISagaMessageBus messageBus,
        ILogger logger)
    {
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _messageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <inheritdoc />
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting RabbitMQ saga hosted service");

        try
        {
            // Subscribe to saga responses so the orchestrator can process them.
            await _messageBus.SubscribeToResponsesAsync(
                async (response, ct) =>
                {
                    // For now the response is only logged. Calling the orchestrator's
                    // HandleResponseAsync would require knowing the saga data type, which
                    // is not available in this context, so no scope or orchestrator is
                    // resolved per message until that dispatch exists.
                    _logger.LogDebug(
                        "Received response for saga {SagaId}, step {StepName}, success: {Success}",
                        response.SagaId,
                        response.StepName,
                        response.Success);
                },
                stoppingToken);

            _logger.LogInformation("RabbitMQ saga hosted service started successfully");

            // Keep the service alive until the host signals shutdown.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected on shutdown: the stopping token cancelled the subscribe or the delay.
            _logger.LogInformation("RabbitMQ saga hosted service is stopping");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in RabbitMQ saga hosted service");
            throw;
        }
    }

    /// <inheritdoc />
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping RabbitMQ saga hosted service");

        try
        {
            // Signal the stopping token and wait for ExecuteAsync to wind down first, so
            // the bus is not torn down underneath a subscription that is still running.
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            // Dispose in finally so the bus is released even if the base stop fails, and
            // so a failing dispose can no longer prevent the base stop from running.
            // NOTE(review): the bus is injected, so the container may dispose it again at
            // shutdown - confirm its DisposeAsync is safe to call more than once.
            if (_messageBus is IAsyncDisposable disposable)
            {
                await disposable.DisposeAsync();
            }
        }
    }
}