using Microsoft.Extensions.Hosting; using Svrnty.Sample.Events; using Svrnty.CQRS.Events.Abstractions.Delivery; using Svrnty.CQRS.Events.Abstractions.Models; using Svrnty.CQRS.Events.Abstractions.EventStore; using Microsoft.Extensions.Logging; using Svrnty.CQRS.Events.Abstractions; using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Svrnty.Sample.Workflows; namespace Svrnty.Sample.BackgroundServices; /// /// Background service that demonstrates consuming events from RabbitMQ (cross-service communication). /// /// /// /// Phase 4: Cross-Service Event Streaming: /// This service shows how to consume events from RabbitMQ using . /// It subscribes to external event streams published by other services. /// /// /// Difference from EventConsumerBackgroundService: /// - EventConsumerBackgroundService: Consumes events from internal event store (same process/service) /// - RabbitMQEventConsumerBackgroundService: Consumes events from RabbitMQ (cross-service) /// /// /// Usage Pattern: /// External event delivery allows multiple independent services to communicate via events. /// This enables microservices architecture where services can react to events from other services. /// /// public class RabbitMQEventConsumerBackgroundService : BackgroundService { private readonly IExternalEventDeliveryProvider _rabbitMq; private readonly ILogger _logger; public RabbitMQEventConsumerBackgroundService( IExternalEventDeliveryProvider rabbitMq, ILogger logger) { _rabbitMq = rabbitMq; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("RabbitMQ event consumer starting..."); // Wait a bit for the application to fully start await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken); _logger.LogInformation("Subscribing to 'UserWorkflow' stream from RabbitMQ..."); var consumerId = $"rabbitmq-consumer-{Guid.NewGuid():N}"; try { // Subscribe to external events from RabbitMQ // This demonstrates cross-service communication where events published // by one service (e.g., UserService) can be consumed by another (e.g., EmailService) // Stream name matches the workflow type name (UserWorkflow) await _rabbitMq.SubscribeExternalAsync( streamName: "UserWorkflow", subscriptionId: "email-service", consumerId: consumerId, eventHandler: ProcessEventAsync, cancellationToken: stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { _logger.LogInformation("RabbitMQ event consumer stopping gracefully..."); } catch (Exception ex) { _logger.LogError(ex, "Error in RabbitMQ event consumer"); } } private Task ProcessEventAsync( ICorrelatedEvent @event, IDictionary metadata, CancellationToken cancellationToken) { // Log the event details _logger.LogInformation( "[RABBITMQ] Received external event: {EventType} (EventId: {EventId}, CorrelationId: {CorrelationId})", @event.GetType().Name, @event.EventId, @event.CorrelationId); // Log metadata foreach (var (key, value) in metadata) { _logger.LogDebug("[RABBITMQ] Metadata: {Key} = {Value}", key, value); } // Type-specific processing for cross-service communication switch (@event) { case UserAddedEvent userAdded: _logger.LogInformation( "[RABBITMQ] Sending welcome email to {Email} (UserId: {UserId})", userAdded.Email, userAdded.UserId); // In a real application: // - Send welcome email via email service // - Create user profile in email marketing system // - Subscribe to mailing lists break; case UserRemovedEvent userRemoved: _logger.LogInformation( "[RABBITMQ] Processing user removal for UserId: {UserId}", userRemoved.UserId); // In a real application: // - Clean up user data in external systems // - Unsubscribe from mailing lists // - Archive user communications break; case UserInvitedEvent userInvited: _logger.LogInformation( "[RABBITMQ] Sending invitation email to {Email} (InvitationId: {InvitationId})", userInvited.Email, userInvited.InvitationId); // In a real application: // - Send invitation email with link // - Track invitation in email system break; case UserInviteAcceptedEvent inviteAccepted: _logger.LogInformation( "[RABBITMQ] User {Name} accepted invitation {InvitationId}", inviteAccepted.Name, inviteAccepted.InvitationId); // In a real application: // - Send confirmation email // - Update CRM system // - Trigger onboarding workflows break; case UserInviteDeclinedEvent inviteDeclined: _logger.LogInformation( "[RABBITMQ] Invitation {InvitationId} was declined", inviteDeclined.InvitationId); // In a real application: // - Update invitation tracking // - Send feedback survey break; default: _logger.LogInformation( "[RABBITMQ] Received unknown event type: {EventType}", @event.GetType().Name); break; } return Task.CompletedTask; } }