// dotnet-cqrs/Svrnty.CQRS.Sagas.RabbitMQ/RabbitMqSagaHostedService.cs
//
// 89 lines
// 3.1 KiB
// C#

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Svrnty.CQRS.Sagas.Abstractions;
using Svrnty.CQRS.Sagas.Abstractions.Messaging;
namespace Svrnty.CQRS.Sagas.RabbitMQ;
/// <summary>
/// Hosted service that manages RabbitMQ saga connections and subscriptions.
/// </summary>
/// <remarks>
/// On start the service subscribes to saga step responses on the injected
/// <see cref="ISagaMessageBus"/> and then idles until the host requests shutdown.
/// The response handler is currently a stub: it resolves the orchestrator and logs
/// the response, but does not yet forward it to the orchestrator.
/// </remarks>
public class RabbitMqSagaHostedService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ISagaMessageBus _messageBus;
    private readonly ILogger<RabbitMqSagaHostedService> _logger;

    /// <summary>
    /// Creates a new RabbitMQ saga hosted service.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to create a DI scope per received response.</param>
    /// <param name="messageBus">Message bus on which saga responses are subscribed to.</param>
    /// <param name="logger">Logger for lifecycle and response diagnostics.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
    public RabbitMqSagaHostedService(
        IServiceProvider serviceProvider,
        ISagaMessageBus messageBus,
        ILogger<RabbitMqSagaHostedService> logger)
    {
        // Fail at construction time instead of with a NullReferenceException on first use.
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _messageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Subscribes to saga responses and keeps the service alive until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting the service down.</param>
    /// <returns>A task that completes when the service has stopped.</returns>
    /// <remarks>
    /// Cancellation caused by <paramref name="stoppingToken"/> is treated as a normal stop and is
    /// not rethrown. Any other exception is logged and rethrown.
    /// </remarks>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting RabbitMQ saga hosted service");

        try
        {
            // Subscribe to saga responses so the orchestrator can process them.
            await _messageBus.SubscribeToResponsesAsync(
                async (response, ct) =>
                {
                    // One DI scope per response so scoped services are not shared between messages.
                    using var scope = _serviceProvider.CreateScope();

                    // Resolved only to fail fast when no orchestrator is registered; the instance
                    // is not used yet because HandleResponseAsync requires the saga data type,
                    // which is not available in this context.
                    _ = scope.ServiceProvider.GetRequiredService<ISagaOrchestrator>();

                    // This is a simplified approach: for now the response is only logged.
                    _logger.LogDebug(
                        "Received response for saga {SagaId}, step {StepName}, success: {Success}",
                        response.SagaId, response.StepName, response.Success);
                },
                stoppingToken);

            _logger.LogInformation("RabbitMQ saga hosted service started successfully");

            // Keep the service running until shutdown is requested.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected path on shutdown: the infinite delay (or the subscription) was cancelled.
            _logger.LogInformation("RabbitMQ saga hosted service is stopping");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in RabbitMQ saga hosted service");
            throw;
        }
    }

    /// <summary>
    /// Stops the background work and then disposes the message bus when it is asynchronously disposable.
    /// </summary>
    /// <param name="cancellationToken">Token indicating that shutdown should no longer be graceful.</param>
    /// <returns>A task that completes when the service has stopped and the bus has been disposed.</returns>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping RabbitMQ saga hosted service");

        try
        {
            // Cancel and await ExecuteAsync first so the bus is not torn down
            // while the background task is still running against it.
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            // Dispose even if stopping failed, so the connection is not leaked.
            if (_messageBus is IAsyncDisposable disposable)
            {
                await disposable.DisposeAsync();
            }
        }
    }
}