89 lines
3.1 KiB
C#
89 lines
3.1 KiB
C#
using System;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using Svrnty.CQRS.Sagas.Abstractions;
|
|
using Svrnty.CQRS.Sagas.Abstractions.Messaging;
|
|
|
|
namespace Svrnty.CQRS.Sagas.RabbitMQ;
|
|
|
|
/// <summary>
/// Hosted service that manages RabbitMQ saga connections and subscriptions.
/// </summary>
/// <remarks>
/// When executed, the service subscribes to saga step responses on the injected
/// <see cref="ISagaMessageBus"/> and then idles until the host requests shutdown.
/// Incoming responses are currently only logged; they are not yet dispatched to
/// the <see cref="ISagaOrchestrator"/>.
/// </remarks>
public class RabbitMqSagaHostedService : BackgroundService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ISagaMessageBus _messageBus;
    private readonly ILogger<RabbitMqSagaHostedService> _logger;

    /// <summary>
    /// Creates a new RabbitMQ saga hosted service.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to create a DI scope for each received response.</param>
    /// <param name="messageBus">Message bus on which saga responses are subscribed to.</param>
    /// <param name="logger">Logger for lifecycle and per-response diagnostics.</param>
    /// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
    public RabbitMqSagaHostedService(
        IServiceProvider serviceProvider,
        ISagaMessageBus messageBus,
        ILogger<RabbitMqSagaHostedService> logger)
    {
        // Fail fast at construction instead of with a NullReferenceException
        // later inside ExecuteAsync/StopAsync.
        _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
        _messageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Subscribes to saga responses and keeps the service alive until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token that is signalled when the host is shutting down.</param>
    /// <returns>A task that completes once shutdown has been requested.</returns>
    /// <remarks>
    /// Cancellation caused by <paramref name="stoppingToken"/> is treated as a normal stop and
    /// only logged. Any other exception is logged and rethrown.
    /// </remarks>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting RabbitMQ saga hosted service");

        try
        {
            // Subscribe to saga responses so the orchestrator can process them
            await _messageBus.SubscribeToResponsesAsync(
                async (response, ct) =>
                {
                    // A fresh scope per message, disposed when the handler returns.
                    using var scope = _serviceProvider.CreateScope();

                    // NOTE(review): the orchestrator is resolved but not used yet, so this
                    // only verifies that ISagaOrchestrator is registered. Kept as-is so the
                    // handler's behaviour is unchanged until dispatch is implemented.
                    var orchestrator = scope.ServiceProvider.GetRequiredService<ISagaOrchestrator>();

                    // The orchestrator needs to handle responses
                    // This is a simplified approach - in production you'd want to handle this more robustly
                    _logger.LogDebug(
                        "Received response for saga {SagaId}, step {StepName}, success: {Success}",
                        response.SagaId, response.StepName, response.Success);

                    // For now, we just log the response
                    // The orchestrator's HandleResponseAsync method would be called here
                    // but it requires knowing the saga data type, which we don't have in this context
                },
                stoppingToken);

            _logger.LogInformation("RabbitMQ saga hosted service started successfully");

            // Keep the service running until the host cancels the token.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected during shutdown: the infinite delay (or the subscribe call)
            // observed the stopping token.
            _logger.LogInformation("RabbitMQ saga hosted service is stopping");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error in RabbitMQ saga hosted service");
            throw;
        }
    }

    /// <summary>
    /// Stops the background execution and then releases the message bus.
    /// </summary>
    /// <param name="cancellationToken">Token indicating that the shutdown should no longer be graceful.</param>
    /// <returns>A task that completes when the service has stopped and the bus has been disposed.</returns>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping RabbitMQ saga hosted service");

        try
        {
            // Signal ExecuteAsync to stop first, so the bus is not torn down
            // while the background task is still running on top of it.
            await base.StopAsync(cancellationToken);
        }
        finally
        {
            // Dispose in finally so a failure on either side can no longer cause
            // the other step to be skipped.
            // NOTE(review): if the DI container also owns this bus instance it will be
            // disposed again at container teardown - confirm DisposeAsync is idempotent.
            if (_messageBus is IAsyncDisposable disposable)
            {
                await disposable.DisposeAsync();
            }
        }
    }
}
|