Compare commits
1 Commits
feature/wh
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b9e2cbdbe |
122
SAGAS_ROADMAP.md
Normal file
122
SAGAS_ROADMAP.md
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
# Saga Orchestration Roadmap
|
||||||
|
|
||||||
|
## Completed (Phase 1)
|
||||||
|
|
||||||
|
- [x] `Svrnty.CQRS.Sagas.Abstractions` - Core interfaces and contracts
|
||||||
|
- [x] `Svrnty.CQRS.Sagas` - Orchestration engine with fluent builder API
|
||||||
|
- [x] `Svrnty.CQRS.Sagas.RabbitMQ` - RabbitMQ message transport
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Phase 1d: Testing & Sample
|
||||||
|
|
||||||
|
### Unit Tests
|
||||||
|
- [ ] `SagaBuilder` step configuration tests
|
||||||
|
- [ ] `SagaOrchestrator` execution flow tests
|
||||||
|
- [ ] `SagaOrchestrator` compensation flow tests
|
||||||
|
- [ ] `InMemorySagaStateStore` persistence tests
|
||||||
|
- [ ] `RabbitMqSagaMessageBus` serialization tests
|
||||||
|
|
||||||
|
### Integration Tests
|
||||||
|
- [ ] End-to-end saga execution with RabbitMQ
|
||||||
|
- [ ] Multi-step saga with compensation scenario
|
||||||
|
- [ ] Concurrent saga execution tests
|
||||||
|
- [ ] Connection recovery tests
|
||||||
|
|
||||||
|
### Sample Implementation
|
||||||
|
- [ ] `OrderProcessingSaga` example in WarehouseManagement
|
||||||
|
- ReserveInventory step
|
||||||
|
- ProcessPayment step
|
||||||
|
- CreateShipment step
|
||||||
|
- Full compensation flow
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Phase 2: Persistence
|
||||||
|
|
||||||
|
### Svrnty.CQRS.Sagas.EntityFramework
|
||||||
|
- [ ] `EfCoreSagaStateStore` implementation
|
||||||
|
- [ ] `SagaState` entity configuration
|
||||||
|
- [ ] Migration support
|
||||||
|
- [ ] PostgreSQL/SQL Server compatibility
|
||||||
|
- [ ] Optimistic concurrency handling
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
```csharp
|
||||||
|
cqrs.AddSagas()
|
||||||
|
.UseEntityFramework<AppDbContext>();
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Phase 3: Reliability
|
||||||
|
|
||||||
|
### Saga Timeout Service
|
||||||
|
- [ ] `SagaTimeoutHostedService` - background service for stalled sagas
|
||||||
|
- [ ] Configurable timeout per saga type
|
||||||
|
- [ ] Automatic compensation trigger on timeout
|
||||||
|
- [ ] Dead letter handling for failed compensations
|
||||||
|
|
||||||
|
### Retry Policies
|
||||||
|
- [ ] Exponential backoff support
|
||||||
|
- [ ] Circuit breaker integration
|
||||||
|
- [ ] Polly integration option
|
||||||
|
|
||||||
|
### Idempotency
|
||||||
|
- [ ] Message deduplication
|
||||||
|
- [ ] Idempotent step execution
|
||||||
|
- [ ] Inbox/Outbox pattern support
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Phase 4: Observability
|
||||||
|
|
||||||
|
### OpenTelemetry Integration
|
||||||
|
- [ ] Distributed tracing for saga execution
|
||||||
|
- [ ] Span per saga step
|
||||||
|
- [ ] Correlation ID propagation
|
||||||
|
- [ ] Metrics (saga duration, success/failure rates)
|
||||||
|
|
||||||
|
### Saga Dashboard (Optional)
|
||||||
|
- [ ] Web UI for saga monitoring
|
||||||
|
- [ ] Real-time saga status
|
||||||
|
- [ ] Manual compensation trigger
|
||||||
|
- [ ] Saga history and audit log
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Phase 5: Flutter Integration
|
||||||
|
|
||||||
|
### gRPC Streaming for Saga Status
|
||||||
|
- [ ] `ISagaStatusStream` service
|
||||||
|
- [ ] Real-time saga progress updates
|
||||||
|
- [ ] Step completion notifications
|
||||||
|
- [ ] Error/compensation notifications
|
||||||
|
|
||||||
|
### Flutter Client
|
||||||
|
- [ ] Dart client for saga status streaming
|
||||||
|
- [ ] Saga progress widget components
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Phase 6: Alternative Transports
|
||||||
|
|
||||||
|
### Svrnty.CQRS.Sagas.AzureServiceBus
|
||||||
|
- [ ] Azure Service Bus message transport
|
||||||
|
- [ ] Topic/Subscription topology
|
||||||
|
- [ ] Dead letter queue handling
|
||||||
|
|
||||||
|
### Svrnty.CQRS.Sagas.Kafka
|
||||||
|
- [ ] Kafka message transport
|
||||||
|
- [ ] Consumer group management
|
||||||
|
- [ ] Partition key strategies
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Future Considerations
|
||||||
|
|
||||||
|
- **Event Sourcing**: Saga state as event stream
|
||||||
|
- **Saga Versioning**: Handle saga definition changes gracefully
|
||||||
|
- **Saga Composition**: Nested/child sagas
|
||||||
|
- **Saga Scheduling**: Delayed saga start
|
||||||
|
- **Multi-tenancy**: Tenant-aware saga execution
|
||||||
@ -0,0 +1,14 @@
|
|||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.DynamicQuery.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Marker interface for custom queryable providers that project entities to DTOs.
|
||||||
|
/// Extends <see cref="IQueryableProvider{TSource}"/> for semantic clarity in registration.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TSource">The DTO/Item type returned by the queryable.</typeparam>
|
||||||
|
public interface IQueryableProviderOverride<TSource> : IQueryableProvider<TSource>
|
||||||
|
{
|
||||||
|
}
|
||||||
@ -0,0 +1,26 @@
|
|||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||||
|
using PoweredSoft.Data.Core;
|
||||||
|
using PoweredSoft.Data.EntityFrameworkCore;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.DynamicQuery.EntityFramework;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Extensions for configuring DynamicQuery with Entity Framework Core.
|
||||||
|
/// </summary>
|
||||||
|
public static class DynamicQueryServicesBuilderExtensions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Uses Entity Framework Core for async queryable operations.
|
||||||
|
/// This replaces the default in-memory implementation with EF Core's async support.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="builder">The DynamicQuery services builder.</param>
|
||||||
|
/// <returns>The builder for chaining.</returns>
|
||||||
|
public static DynamicQueryServicesBuilder UseEntityFramework(this DynamicQueryServicesBuilder builder)
|
||||||
|
{
|
||||||
|
// Remove in-memory implementation and add EF Core implementation
|
||||||
|
builder.Services.RemoveAll<IAsyncQueryableService>();
|
||||||
|
builder.Services.AddPoweredSoftEntityFrameworkCoreDataServices();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,36 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
|
<IsAotCompatible>false</IsAotCompatible>
|
||||||
|
<LangVersion>14</LangVersion>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
|
||||||
|
<Company>Svrnty</Company>
|
||||||
|
<Authors>David Lebee, Mathias Beaulieu-Duncan</Authors>
|
||||||
|
<PackageIcon>icon.png</PackageIcon>
|
||||||
|
<PackageReadmeFile>README.md</PackageReadmeFile>
|
||||||
|
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
|
||||||
|
<RepositoryType>git</RepositoryType>
|
||||||
|
<PublishRepositoryUrl>true</PublishRepositoryUrl>
|
||||||
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
|
|
||||||
|
<DebugType>portable</DebugType>
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<IncludeSymbols>true</IncludeSymbols>
|
||||||
|
<IncludeSource>true</IncludeSource>
|
||||||
|
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="PoweredSoft.Data.EntityFrameworkCore" Version="3.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\Svrnty.CQRS.DynamicQuery\Svrnty.CQRS.DynamicQuery.csproj" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
||||||
19
Svrnty.CQRS.DynamicQuery/DynamicQueryServicesBuilder.cs
Normal file
19
Svrnty.CQRS.DynamicQuery/DynamicQueryServicesBuilder.cs
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.DynamicQuery;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builder for configuring DynamicQuery services.
|
||||||
|
/// </summary>
|
||||||
|
public class DynamicQueryServicesBuilder
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// The service collection being configured.
|
||||||
|
/// </summary>
|
||||||
|
public IServiceCollection Services { get; }
|
||||||
|
|
||||||
|
internal DynamicQueryServicesBuilder(IServiceCollection services)
|
||||||
|
{
|
||||||
|
Services = services;
|
||||||
|
}
|
||||||
|
}
|
||||||
78
Svrnty.CQRS.DynamicQuery/InMemoryAsyncQueryableService.cs
Normal file
78
Svrnty.CQRS.DynamicQuery/InMemoryAsyncQueryableService.cs
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Linq.Expressions;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using PoweredSoft.Data.Core;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.DynamicQuery;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// In-memory implementation of IAsyncQueryableService.
|
||||||
|
/// For EF Core projects, use AddDynamicQueryServices().UseEntityFramework() instead.
|
||||||
|
/// </summary>
|
||||||
|
public class InMemoryAsyncQueryableService : IAsyncQueryableService
|
||||||
|
{
|
||||||
|
public IEnumerable<IAsyncQueryableHandlerService> Handlers { get; } = Array.Empty<IAsyncQueryableHandlerService>();
|
||||||
|
|
||||||
|
public Task<List<TSource>> ToListAsync<TSource>(IQueryable<TSource> queryable, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.ToList());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<TSource?> FirstOrDefaultAsync<TSource>(IQueryable<TSource> queryable, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.FirstOrDefault());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<TSource?> FirstOrDefaultAsync<TSource>(IQueryable<TSource> queryable, Expression<Func<TSource, bool>> predicate, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.FirstOrDefault(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<TSource?> LastOrDefaultAsync<TSource>(IQueryable<TSource> queryable, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.LastOrDefault());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<TSource?> LastOrDefaultAsync<TSource>(IQueryable<TSource> queryable, Expression<Func<TSource, bool>> predicate, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.LastOrDefault(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<bool> AnyAsync<TSource>(IQueryable<TSource> queryable, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.Any());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<bool> AnyAsync<TSource>(IQueryable<TSource> queryable, Expression<Func<TSource, bool>> predicate, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.Any(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<bool> AllAsync<TSource>(IQueryable<TSource> queryable, Expression<Func<TSource, bool>> predicate, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.All(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<int> CountAsync<TSource>(IQueryable<TSource> queryable, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.Count());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<long> LongCountAsync<TSource>(IQueryable<TSource> queryable, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.LongCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<TSource?> SingleOrDefaultAsync<TSource>(IQueryable<TSource> queryable, Expression<Func<TSource, bool>> predicate, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.FromResult(queryable.SingleOrDefault(predicate));
|
||||||
|
}
|
||||||
|
|
||||||
|
public IAsyncQueryableHandlerService? GetAsyncQueryableHandler<TSource>(IQueryable<TSource> queryable)
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,16 +1,31 @@
|
|||||||
using System.Diagnostics.CodeAnalysis;
|
using System.Diagnostics.CodeAnalysis;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
using Microsoft.Extensions.DependencyInjection.Extensions;
|
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||||
|
using PoweredSoft.Data.Core;
|
||||||
|
using PoweredSoft.DynamicQuery;
|
||||||
|
using PoweredSoft.DynamicQuery.Core;
|
||||||
using Svrnty.CQRS.Abstractions;
|
using Svrnty.CQRS.Abstractions;
|
||||||
using Svrnty.CQRS.Abstractions.Discovery;
|
using Svrnty.CQRS.Abstractions.Discovery;
|
||||||
using Svrnty.CQRS.DynamicQuery.Abstractions;
|
using Svrnty.CQRS.DynamicQuery.Abstractions;
|
||||||
using Svrnty.CQRS.DynamicQuery.Discover;
|
using Svrnty.CQRS.DynamicQuery.Discover;
|
||||||
using PoweredSoft.DynamicQuery.Core;
|
|
||||||
|
|
||||||
namespace Svrnty.CQRS.DynamicQuery;
|
namespace Svrnty.CQRS.DynamicQuery;
|
||||||
|
|
||||||
public static class ServiceCollectionExtensions
|
public static class ServiceCollectionExtensions
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Registers core DynamicQuery services with in-memory async queryable.
|
||||||
|
/// For EF Core projects, chain with .UseEntityFramework().
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="services">The service collection.</param>
|
||||||
|
/// <returns>A builder for further configuration.</returns>
|
||||||
|
public static DynamicQueryServicesBuilder AddDynamicQueryServices(this IServiceCollection services)
|
||||||
|
{
|
||||||
|
services.TryAddTransient<IAsyncQueryableService, InMemoryAsyncQueryableService>();
|
||||||
|
services.TryAddTransient<IQueryHandlerAsync, QueryHandlerAsync>();
|
||||||
|
return new DynamicQueryServicesBuilder(services);
|
||||||
|
}
|
||||||
|
|
||||||
public static IServiceCollection AddDynamicQuery<TSourceAndDestination>(this IServiceCollection services, string name = null)
|
public static IServiceCollection AddDynamicQuery<TSourceAndDestination>(this IServiceCollection services, string name = null)
|
||||||
where TSourceAndDestination : class
|
where TSourceAndDestination : class
|
||||||
=> AddDynamicQuery<TSourceAndDestination, TSourceAndDestination>(services, name: name);
|
=> AddDynamicQuery<TSourceAndDestination, TSourceAndDestination>(services, name: name);
|
||||||
@ -55,6 +70,22 @@ public static class ServiceCollectionExtensions
|
|||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Registers a custom queryable provider override for the specified source type.
|
||||||
|
/// Use this for DTOs that require custom projection from entities.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TSource">The DTO/Item type returned by the queryable.</typeparam>
|
||||||
|
/// <typeparam name="TProvider">The provider implementation type.</typeparam>
|
||||||
|
/// <param name="services">The service collection.</param>
|
||||||
|
/// <returns>The service collection for chaining.</returns>
|
||||||
|
public static IServiceCollection AddQueryableProviderOverride<TSource, [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TProvider>(this IServiceCollection services)
|
||||||
|
where TSource : class
|
||||||
|
where TProvider : class, IQueryableProviderOverride<TSource>
|
||||||
|
{
|
||||||
|
services.AddTransient<IQueryableProvider<TSource>, TProvider>();
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
|
||||||
public static IServiceCollection AddDynamicQueryWithParams<TSourceAndDestination, TParams>(this IServiceCollection services, string name = null)
|
public static IServiceCollection AddDynamicQueryWithParams<TSourceAndDestination, TParams>(this IServiceCollection services, string name = null)
|
||||||
where TSourceAndDestination : class
|
where TSourceAndDestination : class
|
||||||
where TParams : class
|
where TParams : class
|
||||||
|
|||||||
17
Svrnty.CQRS.Events.Abstractions/IDomainEvent.cs
Normal file
17
Svrnty.CQRS.Events.Abstractions/IDomainEvent.cs
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
namespace Svrnty.CQRS.Events.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Marker interface for domain events.
|
||||||
|
/// </summary>
|
||||||
|
public interface IDomainEvent
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Unique identifier for this event instance.
|
||||||
|
/// </summary>
|
||||||
|
Guid EventId { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Timestamp when the event occurred.
|
||||||
|
/// </summary>
|
||||||
|
DateTime OccurredAt { get; }
|
||||||
|
}
|
||||||
16
Svrnty.CQRS.Events.Abstractions/IDomainEventPublisher.cs
Normal file
16
Svrnty.CQRS.Events.Abstractions/IDomainEventPublisher.cs
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
namespace Svrnty.CQRS.Events.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Interface for publishing domain events to external systems.
|
||||||
|
/// </summary>
|
||||||
|
public interface IDomainEventPublisher
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes a domain event.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TEvent">The type of event to publish.</typeparam>
|
||||||
|
/// <param name="event">The event to publish.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
|
||||||
|
where TEvent : IDomainEvent;
|
||||||
|
}
|
||||||
@ -0,0 +1,29 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
|
<IsAotCompatible>true</IsAotCompatible>
|
||||||
|
<LangVersion>14</LangVersion>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
<ImplicitUsings>enable</ImplicitUsings>
|
||||||
|
|
||||||
|
<Company>Svrnty</Company>
|
||||||
|
<Authors>David Lebee, Mathias Beaulieu-Duncan</Authors>
|
||||||
|
<PackageIcon>icon.png</PackageIcon>
|
||||||
|
<PackageReadmeFile>README.md</PackageReadmeFile>
|
||||||
|
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
|
||||||
|
<RepositoryType>git</RepositoryType>
|
||||||
|
<PublishRepositoryUrl>true</PublishRepositoryUrl>
|
||||||
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
|
|
||||||
|
<DebugType>portable</DebugType>
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<IncludeSymbols>true</IncludeSymbols>
|
||||||
|
<IncludeSource>true</IncludeSource>
|
||||||
|
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
||||||
163
Svrnty.CQRS.Events.RabbitMQ/RabbitMqDomainEventPublisher.cs
Normal file
163
Svrnty.CQRS.Events.RabbitMQ/RabbitMqDomainEventPublisher.cs
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
using System.Text;
|
||||||
|
using System.Text.Json;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using RabbitMQ.Client;
|
||||||
|
using Svrnty.CQRS.Events.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Events.RabbitMQ;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ implementation of the domain event publisher.
|
||||||
|
/// </summary>
|
||||||
|
public class RabbitMqDomainEventPublisher : IDomainEventPublisher, IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly RabbitMqEventOptions _options;
|
||||||
|
private readonly ILogger<RabbitMqDomainEventPublisher> _logger;
|
||||||
|
private IConnection? _connection;
|
||||||
|
private IChannel? _channel;
|
||||||
|
private readonly SemaphoreSlim _connectionLock = new(1, 1);
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new RabbitMQ domain event publisher.
|
||||||
|
/// </summary>
|
||||||
|
public RabbitMqDomainEventPublisher(
|
||||||
|
IOptions<RabbitMqEventOptions> options,
|
||||||
|
ILogger<RabbitMqDomainEventPublisher> logger)
|
||||||
|
{
|
||||||
|
_options = options.Value;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
|
||||||
|
where TEvent : IDomainEvent
|
||||||
|
{
|
||||||
|
await EnsureConnectionAsync(cancellationToken);
|
||||||
|
|
||||||
|
var eventTypeName = typeof(TEvent).Name;
|
||||||
|
var routingKey = GetRoutingKey(eventTypeName);
|
||||||
|
var body = JsonSerializer.SerializeToUtf8Bytes(@event);
|
||||||
|
|
||||||
|
var properties = new BasicProperties
|
||||||
|
{
|
||||||
|
MessageId = @event.EventId.ToString(),
|
||||||
|
ContentType = "application/json",
|
||||||
|
DeliveryMode = _options.Durable ? DeliveryModes.Persistent : DeliveryModes.Transient,
|
||||||
|
Timestamp = new AmqpTimestamp(new DateTimeOffset(@event.OccurredAt).ToUnixTimeSeconds()),
|
||||||
|
Headers = new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
["event-type"] = eventTypeName,
|
||||||
|
["event-id"] = @event.EventId.ToString()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
await _channel!.BasicPublishAsync(
|
||||||
|
exchange: _options.Exchange,
|
||||||
|
routingKey: routingKey,
|
||||||
|
mandatory: false,
|
||||||
|
basicProperties: properties,
|
||||||
|
body: body,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Published domain event {EventType} with ID {EventId} to routing key {RoutingKey}",
|
||||||
|
eventTypeName, @event.EventId, routingKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string GetRoutingKey(string eventTypeName)
|
||||||
|
{
|
||||||
|
// Convert PascalCase to dot-notation, e.g., "InventoryMovementEvent" -> "events.inventory.movement"
|
||||||
|
var name = eventTypeName.Replace("Event", "");
|
||||||
|
var words = new List<string>();
|
||||||
|
var currentWord = new StringBuilder();
|
||||||
|
|
||||||
|
foreach (var c in name)
|
||||||
|
{
|
||||||
|
if (char.IsUpper(c) && currentWord.Length > 0)
|
||||||
|
{
|
||||||
|
words.Add(currentWord.ToString().ToLowerInvariant());
|
||||||
|
currentWord.Clear();
|
||||||
|
}
|
||||||
|
currentWord.Append(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (currentWord.Length > 0)
|
||||||
|
{
|
||||||
|
words.Add(currentWord.ToString().ToLowerInvariant());
|
||||||
|
}
|
||||||
|
|
||||||
|
return "events." + string.Join(".", words);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task EnsureConnectionAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (_connection?.IsOpen == true && _channel?.IsOpen == true)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await _connectionLock.WaitAsync(cancellationToken);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (_connection?.IsOpen == true && _channel?.IsOpen == true)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var factory = new ConnectionFactory
|
||||||
|
{
|
||||||
|
HostName = _options.HostName,
|
||||||
|
Port = _options.Port,
|
||||||
|
UserName = _options.UserName,
|
||||||
|
Password = _options.Password,
|
||||||
|
VirtualHost = _options.VirtualHost
|
||||||
|
};
|
||||||
|
|
||||||
|
_connection = await factory.CreateConnectionAsync(cancellationToken);
|
||||||
|
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
// Declare topic exchange for domain events
|
||||||
|
await _channel.ExchangeDeclareAsync(
|
||||||
|
exchange: _options.Exchange,
|
||||||
|
type: ExchangeType.Topic,
|
||||||
|
durable: _options.Durable,
|
||||||
|
autoDelete: false,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Connected to RabbitMQ at {Host}:{Port}, exchange: {Exchange}",
|
||||||
|
_options.HostName, _options.Port, _options.Exchange);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_connectionLock.Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_disposed = true;
|
||||||
|
|
||||||
|
if (_channel?.IsOpen == true)
|
||||||
|
{
|
||||||
|
await _channel.CloseAsync();
|
||||||
|
}
|
||||||
|
_channel?.Dispose();
|
||||||
|
|
||||||
|
if (_connection?.IsOpen == true)
|
||||||
|
{
|
||||||
|
await _connection.CloseAsync();
|
||||||
|
}
|
||||||
|
_connection?.Dispose();
|
||||||
|
|
||||||
|
_connectionLock.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
42
Svrnty.CQRS.Events.RabbitMQ/RabbitMqEventOptions.cs
Normal file
42
Svrnty.CQRS.Events.RabbitMQ/RabbitMqEventOptions.cs
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
namespace Svrnty.CQRS.Events.RabbitMQ;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Configuration options for RabbitMQ domain event publishing.
|
||||||
|
/// </summary>
|
||||||
|
public class RabbitMqEventOptions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ host name. Default: localhost
|
||||||
|
/// </summary>
|
||||||
|
public string HostName { get; set; } = "localhost";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ port. Default: 5672
|
||||||
|
/// </summary>
|
||||||
|
public int Port { get; set; } = 5672;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ username. Default: guest
|
||||||
|
/// </summary>
|
||||||
|
public string UserName { get; set; } = "guest";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ password. Default: guest
|
||||||
|
/// </summary>
|
||||||
|
public string Password { get; set; } = "guest";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ virtual host. Default: /
|
||||||
|
/// </summary>
|
||||||
|
public string VirtualHost { get; set; } = "/";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Exchange name for domain events. Default: domain.events
|
||||||
|
/// </summary>
|
||||||
|
public string Exchange { get; set; } = "domain.events";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether to use durable exchanges. Default: true
|
||||||
|
/// </summary>
|
||||||
|
public bool Durable { get; set; } = true;
|
||||||
|
}
|
||||||
30
Svrnty.CQRS.Events.RabbitMQ/ServiceCollectionExtensions.cs
Normal file
30
Svrnty.CQRS.Events.RabbitMQ/ServiceCollectionExtensions.cs
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Svrnty.CQRS.Events.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Events.RabbitMQ;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Extension methods for registering RabbitMQ domain event publishing.
|
||||||
|
/// </summary>
|
||||||
|
public static class ServiceCollectionExtensions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Adds RabbitMQ domain event publishing to the service collection.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="services">The service collection.</param>
|
||||||
|
/// <param name="configure">Optional configuration action for RabbitMQ options.</param>
|
||||||
|
/// <returns>The service collection for chaining.</returns>
|
||||||
|
public static IServiceCollection AddRabbitMqDomainEvents(
|
||||||
|
this IServiceCollection services,
|
||||||
|
Action<RabbitMqEventOptions>? configure = null)
|
||||||
|
{
|
||||||
|
if (configure != null)
|
||||||
|
{
|
||||||
|
services.Configure(configure);
|
||||||
|
}
|
||||||
|
|
||||||
|
services.AddSingleton<IDomainEventPublisher, RabbitMqDomainEventPublisher>();
|
||||||
|
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,40 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
|
<IsAotCompatible>false</IsAotCompatible>
|
||||||
|
<LangVersion>14</LangVersion>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
<ImplicitUsings>enable</ImplicitUsings>
|
||||||
|
|
||||||
|
<Company>Svrnty</Company>
|
||||||
|
<Authors>David Lebee, Mathias Beaulieu-Duncan</Authors>
|
||||||
|
<PackageIcon>icon.png</PackageIcon>
|
||||||
|
<PackageReadmeFile>README.md</PackageReadmeFile>
|
||||||
|
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
|
||||||
|
<RepositoryType>git</RepositoryType>
|
||||||
|
<PublishRepositoryUrl>true</PublishRepositoryUrl>
|
||||||
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
|
|
||||||
|
<DebugType>portable</DebugType>
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<IncludeSymbols>true</IncludeSymbols>
|
||||||
|
<IncludeSource>true</IncludeSource>
|
||||||
|
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\Svrnty.CQRS.Events.Abstractions\Svrnty.CQRS.Events.Abstractions.csproj" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
||||||
File diff suppressed because it is too large
Load Diff
@ -35,6 +35,18 @@ namespace Svrnty.CQRS.Grpc.Generators.Models
|
|||||||
public string FullyQualifiedType { get; set; }
|
public string FullyQualifiedType { get; set; }
|
||||||
public string ProtoType { get; set; }
|
public string ProtoType { get; set; }
|
||||||
public int FieldNumber { get; set; }
|
public int FieldNumber { get; set; }
|
||||||
|
public bool IsComplexType { get; set; }
|
||||||
|
public List<PropertyInfo> NestedProperties { get; set; }
|
||||||
|
|
||||||
|
// Type conversion metadata
|
||||||
|
public bool IsEnum { get; set; }
|
||||||
|
public bool IsList { get; set; }
|
||||||
|
public bool IsNullable { get; set; }
|
||||||
|
public bool IsDecimal { get; set; }
|
||||||
|
public bool IsDateTime { get; set; }
|
||||||
|
public string? ElementType { get; set; }
|
||||||
|
public bool IsElementComplexType { get; set; }
|
||||||
|
public List<PropertyInfo>? ElementNestedProperties { get; set; }
|
||||||
|
|
||||||
public PropertyInfo()
|
public PropertyInfo()
|
||||||
{
|
{
|
||||||
@ -42,6 +54,14 @@ namespace Svrnty.CQRS.Grpc.Generators.Models
|
|||||||
Type = string.Empty;
|
Type = string.Empty;
|
||||||
FullyQualifiedType = string.Empty;
|
FullyQualifiedType = string.Empty;
|
||||||
ProtoType = string.Empty;
|
ProtoType = string.Empty;
|
||||||
|
IsComplexType = false;
|
||||||
|
NestedProperties = new List<PropertyInfo>();
|
||||||
|
IsEnum = false;
|
||||||
|
IsList = false;
|
||||||
|
IsNullable = false;
|
||||||
|
IsDecimal = false;
|
||||||
|
IsDateTime = false;
|
||||||
|
IsElementComplexType = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
50
Svrnty.CQRS.Grpc.Generators/Models/NotificationInfo.cs
Normal file
50
Svrnty.CQRS.Grpc.Generators/Models/NotificationInfo.cs
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Grpc.Generators.Models
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Represents a discovered streaming notification type for proto/gRPC generation.
|
||||||
|
/// </summary>
|
||||||
|
public class NotificationInfo
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// The notification type name (e.g., "InventoryChangeNotification").
|
||||||
|
/// </summary>
|
||||||
|
public string Name { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The fully qualified type name including namespace.
|
||||||
|
/// </summary>
|
||||||
|
public string FullyQualifiedName { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The namespace of the notification type.
|
||||||
|
/// </summary>
|
||||||
|
public string Namespace { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The property name used as the subscription key (from [StreamingNotification] attribute).
|
||||||
|
/// </summary>
|
||||||
|
public string SubscriptionKeyProperty { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The subscription key property info.
|
||||||
|
/// </summary>
|
||||||
|
public PropertyInfo SubscriptionKeyInfo { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// All properties of the notification type.
|
||||||
|
/// </summary>
|
||||||
|
public List<PropertyInfo> Properties { get; set; }
|
||||||
|
|
||||||
|
public NotificationInfo()
|
||||||
|
{
|
||||||
|
Name = string.Empty;
|
||||||
|
FullyQualifiedName = string.Empty;
|
||||||
|
Namespace = string.Empty;
|
||||||
|
SubscriptionKeyProperty = string.Empty;
|
||||||
|
SubscriptionKeyInfo = new PropertyInfo();
|
||||||
|
Properties = new List<PropertyInfo>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -2,29 +2,90 @@ using System.Collections.Generic;
|
|||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Text;
|
using System.Text;
|
||||||
using Microsoft.CodeAnalysis;
|
using Microsoft.CodeAnalysis;
|
||||||
|
using Svrnty.CQRS.Grpc.Generators.Models;
|
||||||
|
|
||||||
namespace Svrnty.CQRS.Grpc.Generators;
|
namespace Svrnty.CQRS.Grpc.Generators;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Generates Protocol Buffer (.proto) files from C# Command and Query types
|
/// Generates Protocol Buffer (.proto) files from C# Command, Query, and Notification types
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal class ProtoFileGenerator
|
internal class ProtoFileGenerator
|
||||||
{
|
{
|
||||||
private readonly Compilation _compilation;
|
private readonly Compilation _compilation;
|
||||||
private readonly HashSet<string> _requiredImports = new HashSet<string>();
|
private readonly HashSet<string> _requiredImports = new HashSet<string>();
|
||||||
private readonly HashSet<string> _generatedMessages = new HashSet<string>();
|
private readonly HashSet<string> _generatedMessages = new HashSet<string>();
|
||||||
|
private readonly HashSet<string> _generatedEnums = new HashSet<string>();
|
||||||
|
private readonly List<INamedTypeSymbol> _pendingEnums = new List<INamedTypeSymbol>();
|
||||||
private readonly StringBuilder _messagesBuilder = new StringBuilder();
|
private readonly StringBuilder _messagesBuilder = new StringBuilder();
|
||||||
|
private readonly StringBuilder _enumsBuilder = new StringBuilder();
|
||||||
|
private List<INamedTypeSymbol>? _allTypesCache;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the discovered notifications after Generate() is called.
|
||||||
|
/// </summary>
|
||||||
|
public List<NotificationInfo> DiscoveredNotifications { get; private set; } = new List<NotificationInfo>();
|
||||||
|
|
||||||
public ProtoFileGenerator(Compilation compilation)
|
public ProtoFileGenerator(Compilation compilation)
|
||||||
{
|
{
|
||||||
_compilation = compilation;
|
_compilation = compilation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets all types from the compilation and all referenced assemblies
|
||||||
|
/// </summary>
|
||||||
|
private IEnumerable<INamedTypeSymbol> GetAllTypes()
|
||||||
|
{
|
||||||
|
if (_allTypesCache != null)
|
||||||
|
return _allTypesCache;
|
||||||
|
|
||||||
|
_allTypesCache = new List<INamedTypeSymbol>();
|
||||||
|
|
||||||
|
// Get types from the current assembly
|
||||||
|
CollectTypesFromNamespace(_compilation.Assembly.GlobalNamespace, _allTypesCache);
|
||||||
|
|
||||||
|
// Get types from all referenced assemblies
|
||||||
|
foreach (var reference in _compilation.References)
|
||||||
|
{
|
||||||
|
var assemblySymbol = _compilation.GetAssemblyOrModuleSymbol(reference) as IAssemblySymbol;
|
||||||
|
if (assemblySymbol != null)
|
||||||
|
{
|
||||||
|
CollectTypesFromNamespace(assemblySymbol.GlobalNamespace, _allTypesCache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return _allTypesCache;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void CollectTypesFromNamespace(INamespaceSymbol ns, List<INamedTypeSymbol> types)
|
||||||
|
{
|
||||||
|
foreach (var type in ns.GetTypeMembers())
|
||||||
|
{
|
||||||
|
types.Add(type);
|
||||||
|
CollectNestedTypes(type, types);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var nestedNs in ns.GetNamespaceMembers())
|
||||||
|
{
|
||||||
|
CollectTypesFromNamespace(nestedNs, types);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void CollectNestedTypes(INamedTypeSymbol type, List<INamedTypeSymbol> types)
|
||||||
|
{
|
||||||
|
foreach (var nestedType in type.GetTypeMembers())
|
||||||
|
{
|
||||||
|
types.Add(nestedType);
|
||||||
|
CollectNestedTypes(nestedType, types);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public string Generate(string packageName, string csharpNamespace)
|
public string Generate(string packageName, string csharpNamespace)
|
||||||
{
|
{
|
||||||
var commands = DiscoverCommands();
|
var commands = DiscoverCommands();
|
||||||
var queries = DiscoverQueries();
|
var queries = DiscoverQueries();
|
||||||
var dynamicQueries = DiscoverDynamicQueries();
|
var dynamicQueries = DiscoverDynamicQueries();
|
||||||
|
var notifications = DiscoverNotifications();
|
||||||
|
DiscoveredNotifications = notifications;
|
||||||
|
|
||||||
var sb = new StringBuilder();
|
var sb = new StringBuilder();
|
||||||
|
|
||||||
@ -98,6 +159,24 @@ internal class ProtoFileGenerator
|
|||||||
sb.AppendLine();
|
sb.AppendLine();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Notification Service (server streaming)
|
||||||
|
if (notifications.Any())
|
||||||
|
{
|
||||||
|
sb.AppendLine("// NotificationService for real-time streaming notifications");
|
||||||
|
sb.AppendLine("service NotificationService {");
|
||||||
|
foreach (var notification in notifications)
|
||||||
|
{
|
||||||
|
var methodName = $"SubscribeTo{notification.Name}";
|
||||||
|
var requestType = $"SubscribeTo{notification.Name}Request";
|
||||||
|
|
||||||
|
sb.AppendLine($" // Subscribe to {notification.Name} notifications");
|
||||||
|
sb.AppendLine($" rpc {methodName} ({requestType}) returns (stream {notification.Name});");
|
||||||
|
sb.AppendLine();
|
||||||
|
}
|
||||||
|
sb.AppendLine("}");
|
||||||
|
sb.AppendLine();
|
||||||
|
}
|
||||||
|
|
||||||
// Generate messages for commands
|
// Generate messages for commands
|
||||||
foreach (var command in commands)
|
foreach (var command in commands)
|
||||||
{
|
{
|
||||||
@ -118,7 +197,17 @@ internal class ProtoFileGenerator
|
|||||||
GenerateDynamicQueryMessages(dq);
|
GenerateDynamicQueryMessages(dq);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append all generated messages
|
// Generate messages for notifications
|
||||||
|
foreach (var notification in notifications)
|
||||||
|
{
|
||||||
|
GenerateNotificationMessages(notification);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate any pending enum definitions
|
||||||
|
GeneratePendingEnums();
|
||||||
|
|
||||||
|
// Append all generated enums first, then messages
|
||||||
|
sb.Append(_enumsBuilder);
|
||||||
sb.Append(_messagesBuilder);
|
sb.Append(_messagesBuilder);
|
||||||
|
|
||||||
// Insert imports if any were needed
|
// Insert imports if any were needed
|
||||||
@ -138,24 +227,78 @@ internal class ProtoFileGenerator
|
|||||||
|
|
||||||
private List<INamedTypeSymbol> DiscoverCommands()
|
private List<INamedTypeSymbol> DiscoverCommands()
|
||||||
{
|
{
|
||||||
return _compilation.GetSymbolsWithName(
|
// First, find all command handlers to know which commands are actually registered
|
||||||
name => name.EndsWith("Command"),
|
var commandHandlerInterface = _compilation.GetTypeByMetadataName("Svrnty.CQRS.Abstractions.ICommandHandler`1");
|
||||||
SymbolFilter.Type)
|
var commandHandlerWithResultInterface = _compilation.GetTypeByMetadataName("Svrnty.CQRS.Abstractions.ICommandHandler`2");
|
||||||
.OfType<INamedTypeSymbol>()
|
|
||||||
.Where(t => !HasGrpcIgnoreAttribute(t))
|
if (commandHandlerInterface == null && commandHandlerWithResultInterface == null)
|
||||||
.Where(t => t.TypeKind == TypeKind.Class || t.TypeKind == TypeKind.Struct)
|
return new List<INamedTypeSymbol>();
|
||||||
.ToList();
|
|
||||||
|
var registeredCommands = new HashSet<INamedTypeSymbol>(SymbolEqualityComparer.Default);
|
||||||
|
|
||||||
|
foreach (var type in GetAllTypes())
|
||||||
|
{
|
||||||
|
if (type.IsAbstract || type.IsStatic)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
foreach (var iface in type.AllInterfaces)
|
||||||
|
{
|
||||||
|
if (iface.IsGenericType)
|
||||||
|
{
|
||||||
|
if ((commandHandlerInterface != null && SymbolEqualityComparer.Default.Equals(iface.OriginalDefinition, commandHandlerInterface)) ||
|
||||||
|
(commandHandlerWithResultInterface != null && SymbolEqualityComparer.Default.Equals(iface.OriginalDefinition, commandHandlerWithResultInterface)))
|
||||||
|
{
|
||||||
|
var commandType = iface.TypeArguments[0] as INamedTypeSymbol;
|
||||||
|
if (commandType != null && !HasGrpcIgnoreAttribute(commandType))
|
||||||
|
{
|
||||||
|
registeredCommands.Add(commandType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return registeredCommands.ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<INamedTypeSymbol> DiscoverQueries()
|
private List<INamedTypeSymbol> DiscoverQueries()
|
||||||
{
|
{
|
||||||
return _compilation.GetSymbolsWithName(
|
// First, find all query handlers to know which queries are actually registered
|
||||||
name => name.EndsWith("Query"),
|
var queryHandlerInterface = _compilation.GetTypeByMetadataName("Svrnty.CQRS.Abstractions.IQueryHandler`2");
|
||||||
SymbolFilter.Type)
|
var dynamicQueryInterface2 = _compilation.GetTypeByMetadataName("Svrnty.CQRS.DynamicQuery.Abstractions.IDynamicQuery`2");
|
||||||
.OfType<INamedTypeSymbol>()
|
var dynamicQueryInterface3 = _compilation.GetTypeByMetadataName("Svrnty.CQRS.DynamicQuery.Abstractions.IDynamicQuery`3");
|
||||||
.Where(t => !HasGrpcIgnoreAttribute(t))
|
|
||||||
.Where(t => t.TypeKind == TypeKind.Class || t.TypeKind == TypeKind.Struct)
|
if (queryHandlerInterface == null)
|
||||||
.ToList();
|
return new List<INamedTypeSymbol>();
|
||||||
|
|
||||||
|
var registeredQueries = new HashSet<INamedTypeSymbol>(SymbolEqualityComparer.Default);
|
||||||
|
|
||||||
|
foreach (var type in GetAllTypes())
|
||||||
|
{
|
||||||
|
if (type.IsAbstract || type.IsStatic)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
foreach (var iface in type.AllInterfaces)
|
||||||
|
{
|
||||||
|
if (iface.IsGenericType && SymbolEqualityComparer.Default.Equals(iface.OriginalDefinition, queryHandlerInterface))
|
||||||
|
{
|
||||||
|
var queryType = iface.TypeArguments[0] as INamedTypeSymbol;
|
||||||
|
if (queryType != null && !HasGrpcIgnoreAttribute(queryType))
|
||||||
|
{
|
||||||
|
// Skip dynamic queries - they're handled separately
|
||||||
|
if (queryType.IsGenericType &&
|
||||||
|
((dynamicQueryInterface2 != null && SymbolEqualityComparer.Default.Equals(queryType.OriginalDefinition, dynamicQueryInterface2)) ||
|
||||||
|
(dynamicQueryInterface3 != null && SymbolEqualityComparer.Default.Equals(queryType.OriginalDefinition, dynamicQueryInterface3))))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
registeredQueries.Add(queryType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return registeredQueries.ToList();
|
||||||
}
|
}
|
||||||
|
|
||||||
private bool HasGrpcIgnoreAttribute(INamedTypeSymbol type)
|
private bool HasGrpcIgnoreAttribute(INamedTypeSymbol type)
|
||||||
@ -180,6 +323,9 @@ internal class ProtoFileGenerator
|
|||||||
.Where(p => p.DeclaredAccessibility == Accessibility.Public)
|
.Where(p => p.DeclaredAccessibility == Accessibility.Public)
|
||||||
.ToList();
|
.ToList();
|
||||||
|
|
||||||
|
// Collect nested complex types to generate after closing this message
|
||||||
|
var nestedComplexTypes = new List<INamedTypeSymbol>();
|
||||||
|
|
||||||
int fieldNumber = 1;
|
int fieldNumber = 1;
|
||||||
foreach (var prop in properties)
|
foreach (var prop in properties)
|
||||||
{
|
{
|
||||||
@ -199,10 +345,19 @@ internal class ProtoFileGenerator
|
|||||||
var fieldName = ProtoFileTypeMapper.ToSnakeCase(prop.Name);
|
var fieldName = ProtoFileTypeMapper.ToSnakeCase(prop.Name);
|
||||||
_messagesBuilder.AppendLine($" {protoType} {fieldName} = {fieldNumber};");
|
_messagesBuilder.AppendLine($" {protoType} {fieldName} = {fieldNumber};");
|
||||||
|
|
||||||
// If this is a complex type, generate its message too
|
// Track enums for later generation
|
||||||
if (IsComplexType(prop.Type))
|
var enumType = ProtoFileTypeMapper.GetEnumType(prop.Type);
|
||||||
|
if (enumType != null)
|
||||||
{
|
{
|
||||||
GenerateComplexTypeMessage(prop.Type as INamedTypeSymbol);
|
TrackEnumType(enumType);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect complex types to generate after this message is closed
|
||||||
|
// Use GetElementOrUnderlyingType to extract element type from collections
|
||||||
|
var underlyingType = ProtoFileTypeMapper.GetElementOrUnderlyingType(prop.Type);
|
||||||
|
if (IsComplexType(underlyingType) && underlyingType is INamedTypeSymbol namedType)
|
||||||
|
{
|
||||||
|
nestedComplexTypes.Add(namedType);
|
||||||
}
|
}
|
||||||
|
|
||||||
fieldNumber++;
|
fieldNumber++;
|
||||||
@ -210,6 +365,12 @@ internal class ProtoFileGenerator
|
|||||||
|
|
||||||
_messagesBuilder.AppendLine("}");
|
_messagesBuilder.AppendLine("}");
|
||||||
_messagesBuilder.AppendLine();
|
_messagesBuilder.AppendLine();
|
||||||
|
|
||||||
|
// Now generate nested complex type messages
|
||||||
|
foreach (var nestedType in nestedComplexTypes)
|
||||||
|
{
|
||||||
|
GenerateComplexTypeMessage(nestedType);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void GenerateResponseMessage(INamedTypeSymbol type)
|
private void GenerateResponseMessage(INamedTypeSymbol type)
|
||||||
@ -267,6 +428,9 @@ internal class ProtoFileGenerator
|
|||||||
.Where(p => p.DeclaredAccessibility == Accessibility.Public)
|
.Where(p => p.DeclaredAccessibility == Accessibility.Public)
|
||||||
.ToList();
|
.ToList();
|
||||||
|
|
||||||
|
// Collect nested complex types to generate after closing this message
|
||||||
|
var nestedComplexTypes = new List<INamedTypeSymbol>();
|
||||||
|
|
||||||
int fieldNumber = 1;
|
int fieldNumber = 1;
|
||||||
foreach (var prop in properties)
|
foreach (var prop in properties)
|
||||||
{
|
{
|
||||||
@ -285,10 +449,19 @@ internal class ProtoFileGenerator
|
|||||||
var fieldName = ProtoFileTypeMapper.ToSnakeCase(prop.Name);
|
var fieldName = ProtoFileTypeMapper.ToSnakeCase(prop.Name);
|
||||||
_messagesBuilder.AppendLine($" {protoType} {fieldName} = {fieldNumber};");
|
_messagesBuilder.AppendLine($" {protoType} {fieldName} = {fieldNumber};");
|
||||||
|
|
||||||
// Recursively generate nested complex types
|
// Track enums for later generation
|
||||||
if (IsComplexType(prop.Type))
|
var enumType = ProtoFileTypeMapper.GetEnumType(prop.Type);
|
||||||
|
if (enumType != null)
|
||||||
{
|
{
|
||||||
GenerateComplexTypeMessage(prop.Type as INamedTypeSymbol);
|
TrackEnumType(enumType);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect complex types to generate after this message is closed
|
||||||
|
// Use GetElementOrUnderlyingType to extract element type from collections
|
||||||
|
var underlyingType = ProtoFileTypeMapper.GetElementOrUnderlyingType(prop.Type);
|
||||||
|
if (IsComplexType(underlyingType) && underlyingType is INamedTypeSymbol namedType)
|
||||||
|
{
|
||||||
|
nestedComplexTypes.Add(namedType);
|
||||||
}
|
}
|
||||||
|
|
||||||
fieldNumber++;
|
fieldNumber++;
|
||||||
@ -296,6 +469,12 @@ internal class ProtoFileGenerator
|
|||||||
|
|
||||||
_messagesBuilder.AppendLine("}");
|
_messagesBuilder.AppendLine("}");
|
||||||
_messagesBuilder.AppendLine();
|
_messagesBuilder.AppendLine();
|
||||||
|
|
||||||
|
// Now generate nested complex type messages
|
||||||
|
foreach (var nestedType in nestedComplexTypes)
|
||||||
|
{
|
||||||
|
GenerateComplexTypeMessage(nestedType);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ITypeSymbol? GetResultType(INamedTypeSymbol commandOrQueryType)
|
private ITypeSymbol? GetResultType(INamedTypeSymbol commandOrQueryType)
|
||||||
@ -305,11 +484,8 @@ internal class ProtoFileGenerator
|
|||||||
? "ICommandHandler"
|
? "ICommandHandler"
|
||||||
: "IQueryHandler";
|
: "IQueryHandler";
|
||||||
|
|
||||||
// Find all types in the compilation
|
// Find all types in the compilation and referenced assemblies
|
||||||
var allTypes = _compilation.GetSymbolsWithName(_ => true, SymbolFilter.Type)
|
foreach (var type in GetAllTypes())
|
||||||
.OfType<INamedTypeSymbol>();
|
|
||||||
|
|
||||||
foreach (var type in allTypes)
|
|
||||||
{
|
{
|
||||||
// Check if this type implements the handler interface
|
// Check if this type implements the handler interface
|
||||||
foreach (var @interface in type.AllInterfaces)
|
foreach (var @interface in type.AllInterfaces)
|
||||||
@ -372,10 +548,8 @@ internal class ProtoFileGenerator
|
|||||||
return new List<INamedTypeSymbol>();
|
return new List<INamedTypeSymbol>();
|
||||||
|
|
||||||
var dynamicQueryTypes = new List<INamedTypeSymbol>();
|
var dynamicQueryTypes = new List<INamedTypeSymbol>();
|
||||||
var allTypes = _compilation.GetSymbolsWithName(_ => true, SymbolFilter.Type)
|
|
||||||
.OfType<INamedTypeSymbol>();
|
|
||||||
|
|
||||||
foreach (var type in allTypes)
|
foreach (var type in GetAllTypes())
|
||||||
{
|
{
|
||||||
if (type.IsAbstract || type.IsStatic)
|
if (type.IsAbstract || type.IsStatic)
|
||||||
continue;
|
continue;
|
||||||
@ -471,4 +645,205 @@ internal class ProtoFileGenerator
|
|||||||
return word + "es";
|
return word + "es";
|
||||||
return word + "s";
|
return word + "s";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Tracks an enum type for later generation
|
||||||
|
/// </summary>
|
||||||
|
private void TrackEnumType(INamedTypeSymbol enumType)
|
||||||
|
{
|
||||||
|
if (!_generatedEnums.Contains(enumType.Name) && !_pendingEnums.Any(e => e.Name == enumType.Name))
|
||||||
|
{
|
||||||
|
_pendingEnums.Add(enumType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Generates all pending enum definitions
|
||||||
|
/// </summary>
|
||||||
|
private void GeneratePendingEnums()
|
||||||
|
{
|
||||||
|
foreach (var enumType in _pendingEnums)
|
||||||
|
{
|
||||||
|
if (_generatedEnums.Contains(enumType.Name))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
_generatedEnums.Add(enumType.Name);
|
||||||
|
|
||||||
|
_enumsBuilder.AppendLine($"// {enumType.Name} enum");
|
||||||
|
_enumsBuilder.AppendLine($"enum {enumType.Name} {{");
|
||||||
|
|
||||||
|
// Get all enum members
|
||||||
|
var members = enumType.GetMembers()
|
||||||
|
.OfType<IFieldSymbol>()
|
||||||
|
.Where(f => f.HasConstantValue)
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
foreach (var member in members)
|
||||||
|
{
|
||||||
|
var protoFieldName = $"{ProtoFileTypeMapper.ToSnakeCase(enumType.Name).ToUpperInvariant()}_{ProtoFileTypeMapper.ToSnakeCase(member.Name).ToUpperInvariant()}";
|
||||||
|
var value = member.ConstantValue;
|
||||||
|
_enumsBuilder.AppendLine($" {protoFieldName} = {value};");
|
||||||
|
}
|
||||||
|
|
||||||
|
_enumsBuilder.AppendLine("}");
|
||||||
|
_enumsBuilder.AppendLine();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Discovers types marked with [StreamingNotification] attribute
|
||||||
|
/// </summary>
|
||||||
|
private List<NotificationInfo> DiscoverNotifications()
|
||||||
|
{
|
||||||
|
var streamingNotificationAttribute = _compilation.GetTypeByMetadataName(
|
||||||
|
"Svrnty.CQRS.Notifications.Abstractions.StreamingNotificationAttribute");
|
||||||
|
|
||||||
|
if (streamingNotificationAttribute == null)
|
||||||
|
return new List<NotificationInfo>();
|
||||||
|
|
||||||
|
var notifications = new List<NotificationInfo>();
|
||||||
|
|
||||||
|
foreach (var type in GetAllTypes())
|
||||||
|
{
|
||||||
|
if (type.IsAbstract || type.IsStatic)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
var attr = type.GetAttributes()
|
||||||
|
.FirstOrDefault(a => SymbolEqualityComparer.Default.Equals(
|
||||||
|
a.AttributeClass, streamingNotificationAttribute));
|
||||||
|
|
||||||
|
if (attr == null)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// Extract SubscriptionKey from attribute
|
||||||
|
var subscriptionKeyArg = attr.NamedArguments
|
||||||
|
.FirstOrDefault(a => a.Key == "SubscriptionKey");
|
||||||
|
var subscriptionKeyProp = subscriptionKeyArg.Value.Value as string;
|
||||||
|
|
||||||
|
if (string.IsNullOrEmpty(subscriptionKeyProp))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// Get all properties of the notification type
|
||||||
|
var properties = ExtractNotificationProperties(type);
|
||||||
|
|
||||||
|
// Find the subscription key property info
|
||||||
|
var keyPropInfo = properties.FirstOrDefault(p => p.Name == subscriptionKeyProp);
|
||||||
|
if (keyPropInfo == null)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
notifications.Add(new NotificationInfo
|
||||||
|
{
|
||||||
|
Name = type.Name,
|
||||||
|
FullyQualifiedName = type.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)
|
||||||
|
.Replace("global::", ""),
|
||||||
|
Namespace = type.ContainingNamespace?.ToDisplayString() ?? "",
|
||||||
|
SubscriptionKeyProperty = subscriptionKeyProp,
|
||||||
|
SubscriptionKeyInfo = keyPropInfo,
|
||||||
|
Properties = properties
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return notifications;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Extracts property information from a notification type
|
||||||
|
/// </summary>
|
||||||
|
private List<Models.PropertyInfo> ExtractNotificationProperties(INamedTypeSymbol type)
|
||||||
|
{
|
||||||
|
var properties = new List<Models.PropertyInfo>();
|
||||||
|
int fieldNumber = 1;
|
||||||
|
|
||||||
|
foreach (var prop in type.GetMembers().OfType<IPropertySymbol>()
|
||||||
|
.Where(p => p.DeclaredAccessibility == Accessibility.Public))
|
||||||
|
{
|
||||||
|
if (ProtoFileTypeMapper.IsUnsupportedType(prop.Type))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
var protoType = ProtoFileTypeMapper.MapType(prop.Type, out _, out _);
|
||||||
|
var enumType = ProtoFileTypeMapper.GetEnumType(prop.Type);
|
||||||
|
|
||||||
|
properties.Add(new Models.PropertyInfo
|
||||||
|
{
|
||||||
|
Name = prop.Name,
|
||||||
|
Type = prop.Type.Name,
|
||||||
|
FullyQualifiedType = prop.Type.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)
|
||||||
|
.Replace("global::", ""),
|
||||||
|
ProtoType = protoType,
|
||||||
|
FieldNumber = fieldNumber++,
|
||||||
|
IsEnum = enumType != null,
|
||||||
|
IsDecimal = prop.Type.SpecialType == SpecialType.System_Decimal ||
|
||||||
|
prop.Type.ToDisplayString().Contains("decimal"),
|
||||||
|
IsDateTime = prop.Type.ToDisplayString().Contains("DateTime"),
|
||||||
|
IsNullable = prop.Type.NullableAnnotation == NullableAnnotation.Annotated ||
|
||||||
|
(prop.Type is INamedTypeSymbol namedType &&
|
||||||
|
namedType.OriginalDefinition.SpecialType == SpecialType.System_Nullable_T)
|
||||||
|
});
|
||||||
|
|
||||||
|
if (enumType != null)
|
||||||
|
{
|
||||||
|
TrackEnumType(enumType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Generates proto messages for a notification type
|
||||||
|
/// </summary>
|
||||||
|
private void GenerateNotificationMessages(NotificationInfo notification)
|
||||||
|
{
|
||||||
|
// Generate subscription request message (contains only the subscription key)
|
||||||
|
var requestMessageName = $"SubscribeTo{notification.Name}Request";
|
||||||
|
if (!_generatedMessages.Contains(requestMessageName))
|
||||||
|
{
|
||||||
|
_generatedMessages.Add(requestMessageName);
|
||||||
|
|
||||||
|
_messagesBuilder.AppendLine($"// Subscription request for {notification.Name}");
|
||||||
|
_messagesBuilder.AppendLine($"message {requestMessageName} {{");
|
||||||
|
_messagesBuilder.AppendLine($" {notification.SubscriptionKeyInfo.ProtoType} {ProtoFileTypeMapper.ToSnakeCase(notification.SubscriptionKeyProperty)} = 1;");
|
||||||
|
_messagesBuilder.AppendLine("}");
|
||||||
|
_messagesBuilder.AppendLine();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate the notification message itself
|
||||||
|
if (!_generatedMessages.Contains(notification.Name))
|
||||||
|
{
|
||||||
|
_generatedMessages.Add(notification.Name);
|
||||||
|
|
||||||
|
_messagesBuilder.AppendLine($"// {notification.Name} streaming notification");
|
||||||
|
_messagesBuilder.AppendLine($"message {notification.Name} {{");
|
||||||
|
|
||||||
|
foreach (var prop in notification.Properties)
|
||||||
|
{
|
||||||
|
var protoType = ProtoFileTypeMapper.MapType(
|
||||||
|
_compilation.GetTypeByMetadataName(prop.FullyQualifiedType) ??
|
||||||
|
GetTypeFromName(prop.FullyQualifiedType),
|
||||||
|
out var needsImport, out var importPath);
|
||||||
|
|
||||||
|
if (needsImport && importPath != null)
|
||||||
|
{
|
||||||
|
_requiredImports.Add(importPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
var fieldName = ProtoFileTypeMapper.ToSnakeCase(prop.Name);
|
||||||
|
_messagesBuilder.AppendLine($" {prop.ProtoType} {fieldName} = {prop.FieldNumber};");
|
||||||
|
}
|
||||||
|
|
||||||
|
_messagesBuilder.AppendLine("}");
|
||||||
|
_messagesBuilder.AppendLine();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets a type symbol from a type name by searching all types
|
||||||
|
/// </summary>
|
||||||
|
private ITypeSymbol? GetTypeFromName(string fullTypeName)
|
||||||
|
{
|
||||||
|
// Try to find the type in all types
|
||||||
|
return GetAllTypes().FirstOrDefault(t =>
|
||||||
|
t.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat).Replace("global::", "") == fullTypeName ||
|
||||||
|
t.ToDisplayString() == fullTypeName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,24 +20,25 @@ public class ProtoFileSourceGenerator : IIncrementalGenerator
|
|||||||
// Generate a placeholder - the actual proto will be generated in the source output
|
// Generate a placeholder - the actual proto will be generated in the source output
|
||||||
});
|
});
|
||||||
|
|
||||||
// Collect all command and query types
|
// Collect type declarations to trigger generation
|
||||||
var commandsAndQueries = context.SyntaxProvider
|
// We use any type declaration as a trigger since ProtoFileGenerator scans all assemblies
|
||||||
|
var typeDeclarations = context.SyntaxProvider
|
||||||
.CreateSyntaxProvider(
|
.CreateSyntaxProvider(
|
||||||
predicate: static (s, _) => IsCommandOrQuery(s),
|
predicate: static (s, _) => s is TypeDeclarationSyntax,
|
||||||
transform: static (ctx, _) => GetTypeSymbol(ctx))
|
transform: static (ctx, _) => GetTypeSymbol(ctx))
|
||||||
.Where(static m => m is not null)
|
.Where(static m => m is not null)
|
||||||
.Collect();
|
.Collect();
|
||||||
|
|
||||||
// Combine with compilation to have access to it
|
// Combine with compilation to have access to it
|
||||||
var compilationAndTypes = context.CompilationProvider.Combine(commandsAndQueries);
|
var compilationAndTypes = context.CompilationProvider.Combine(typeDeclarations);
|
||||||
|
|
||||||
// Generate proto file when commands/queries change
|
// Generate proto file when commands/queries change
|
||||||
context.RegisterSourceOutput(compilationAndTypes, (spc, source) =>
|
context.RegisterSourceOutput(compilationAndTypes, (spc, source) =>
|
||||||
{
|
{
|
||||||
var (compilation, types) = source;
|
var (compilation, types) = source;
|
||||||
|
|
||||||
if (types.IsDefaultOrEmpty)
|
// Note: We no longer bail out early since ProtoFileGenerator now scans all referenced assemblies
|
||||||
return;
|
// The types from source are just a trigger - the generator will find types from all assemblies
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -102,15 +103,6 @@ public class ProtoFileSourceGenerator : IIncrementalGenerator
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static bool IsCommandOrQuery(SyntaxNode node)
|
|
||||||
{
|
|
||||||
if (node is not TypeDeclarationSyntax typeDecl)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
var name = typeDecl.Identifier.Text;
|
|
||||||
return name.EndsWith("Command") || name.EndsWith("Query");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static INamedTypeSymbol? GetTypeSymbol(GeneratorSyntaxContext context)
|
private static INamedTypeSymbol? GetTypeSymbol(GeneratorSyntaxContext context)
|
||||||
{
|
{
|
||||||
var typeDecl = (TypeDeclarationSyntax)context.Node;
|
var typeDecl = (TypeDeclarationSyntax)context.Node;
|
||||||
|
|||||||
@ -17,11 +17,8 @@ internal static class ProtoFileTypeMapper
|
|||||||
var fullTypeName = typeSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
|
var fullTypeName = typeSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat);
|
||||||
var typeName = typeSymbol.Name;
|
var typeName = typeSymbol.Name;
|
||||||
|
|
||||||
// Nullable types - unwrap
|
// Note: NullableAnnotation.Annotated is for reference type nullability (List<T>?, string?, etc.)
|
||||||
if (typeSymbol.NullableAnnotation == NullableAnnotation.Annotated && typeSymbol is INamedTypeSymbol namedType && namedType.TypeArguments.Length > 0)
|
// We don't unwrap these - just use the underlying type. Nullable<T> value types are handled later.
|
||||||
{
|
|
||||||
return MapType(namedType.TypeArguments[0], out needsImport, out importPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Basic types
|
// Basic types
|
||||||
switch (typeName)
|
switch (typeName)
|
||||||
@ -75,17 +72,31 @@ internal static class ProtoFileTypeMapper
|
|||||||
return "string";
|
return "string";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fullTypeName.Contains("System.Decimal"))
|
if (fullTypeName.Contains("System.Decimal") || typeName == "Decimal" || fullTypeName == "decimal")
|
||||||
{
|
{
|
||||||
// Decimal serialized as string (no native decimal in proto)
|
// Decimal serialized as string (no native decimal in proto)
|
||||||
return "string";
|
return "string";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle Nullable<T> value types (e.g., int?, decimal?, enum?)
|
||||||
|
if (typeSymbol is INamedTypeSymbol nullableType &&
|
||||||
|
nullableType.OriginalDefinition.SpecialType == SpecialType.System_Nullable_T &&
|
||||||
|
nullableType.TypeArguments.Length == 1)
|
||||||
|
{
|
||||||
|
// Unwrap the nullable and map the inner type
|
||||||
|
return MapType(nullableType.TypeArguments[0], out needsImport, out importPath);
|
||||||
|
}
|
||||||
|
|
||||||
// Collections
|
// Collections
|
||||||
if (typeSymbol is INamedTypeSymbol collectionType)
|
if (typeSymbol is INamedTypeSymbol collectionType)
|
||||||
{
|
{
|
||||||
// List, IEnumerable, Array, etc.
|
// List, IEnumerable, Array, ICollection etc. (but not Nullable<T>)
|
||||||
if (collectionType.TypeArguments.Length == 1)
|
var typeName2 = collectionType.Name;
|
||||||
|
if (collectionType.TypeArguments.Length == 1 &&
|
||||||
|
(typeName2.Contains("List") || typeName2.Contains("Collection") ||
|
||||||
|
typeName2.Contains("Enumerable") || typeName2.Contains("Array") ||
|
||||||
|
typeName2.Contains("Set") || typeName2.Contains("IList") ||
|
||||||
|
typeName2.Contains("ICollection") || typeName2.Contains("IEnumerable")))
|
||||||
{
|
{
|
||||||
var elementType = collectionType.TypeArguments[0];
|
var elementType = collectionType.TypeArguments[0];
|
||||||
var protoElementType = MapType(elementType, out needsImport, out importPath);
|
var protoElementType = MapType(elementType, out needsImport, out importPath);
|
||||||
@ -188,4 +199,56 @@ internal static class ProtoFileTypeMapper
|
|||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the element type from a collection type, or returns the type itself if not a collection.
|
||||||
|
/// Also unwraps Nullable types.
|
||||||
|
/// </summary>
|
||||||
|
public static ITypeSymbol GetElementOrUnderlyingType(ITypeSymbol typeSymbol)
|
||||||
|
{
|
||||||
|
// Unwrap Nullable<T>
|
||||||
|
if (typeSymbol is INamedTypeSymbol nullableType &&
|
||||||
|
nullableType.OriginalDefinition.SpecialType == SpecialType.System_Nullable_T &&
|
||||||
|
nullableType.TypeArguments.Length == 1)
|
||||||
|
{
|
||||||
|
return GetElementOrUnderlyingType(nullableType.TypeArguments[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract element type from collections
|
||||||
|
if (typeSymbol is INamedTypeSymbol collectionType && collectionType.TypeArguments.Length == 1)
|
||||||
|
{
|
||||||
|
var typeName = collectionType.Name;
|
||||||
|
if (typeName.Contains("List") || typeName.Contains("Collection") ||
|
||||||
|
typeName.Contains("Enumerable") || typeName.Contains("Array") ||
|
||||||
|
typeName.Contains("Set") || typeName.Contains("IList") ||
|
||||||
|
typeName.Contains("ICollection") || typeName.Contains("IEnumerable"))
|
||||||
|
{
|
||||||
|
return GetElementOrUnderlyingType(collectionType.TypeArguments[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return typeSymbol;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Checks if the type is an enum (including nullable enums)
|
||||||
|
/// </summary>
|
||||||
|
public static bool IsEnumType(ITypeSymbol typeSymbol)
|
||||||
|
{
|
||||||
|
var underlying = GetElementOrUnderlyingType(typeSymbol);
|
||||||
|
return underlying.TypeKind == TypeKind.Enum;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the enum type symbol if this is an enum or nullable enum, otherwise null
|
||||||
|
/// </summary>
|
||||||
|
public static INamedTypeSymbol? GetEnumType(ITypeSymbol typeSymbol)
|
||||||
|
{
|
||||||
|
var underlying = GetElementOrUnderlyingType(typeSymbol);
|
||||||
|
if (underlying.TypeKind == TypeKind.Enum && underlying is INamedTypeSymbol enumType)
|
||||||
|
{
|
||||||
|
return enumType;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -62,7 +62,27 @@ public class WriteProtoFileTask : Task
|
|||||||
Log.LogWarning(
|
Log.LogWarning(
|
||||||
$"Generated proto file not found at {generatedFilePath}. " +
|
$"Generated proto file not found at {generatedFilePath}. " +
|
||||||
"The proto file may not have been generated yet. This is normal on first build.");
|
"The proto file may not have been generated yet. This is normal on first build.");
|
||||||
return true; // Don't fail the build, just skip
|
|
||||||
|
// Write a minimal placeholder proto file so Grpc.Tools doesn't fail
|
||||||
|
// The real content will be generated on the next build
|
||||||
|
var placeholderProto = @"syntax = ""proto3"";
|
||||||
|
|
||||||
|
option csharp_namespace = ""Generated.Grpc"";
|
||||||
|
|
||||||
|
package cqrs;
|
||||||
|
|
||||||
|
// Placeholder proto file - will be regenerated on next build
|
||||||
|
";
|
||||||
|
var placeholderOutputPath = Path.Combine(ProjectDirectory, OutputDirectory);
|
||||||
|
Directory.CreateDirectory(placeholderOutputPath);
|
||||||
|
var placeholderProtoFilePath = Path.Combine(placeholderOutputPath, ProtoFileName);
|
||||||
|
File.WriteAllText(placeholderProtoFilePath, placeholderProto);
|
||||||
|
|
||||||
|
Log.LogMessage(MessageImportance.High,
|
||||||
|
$"Svrnty.CQRS.Grpc: Wrote placeholder proto file at {placeholderProtoFilePath}. " +
|
||||||
|
"Run build again to generate the actual proto content.");
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the generated C# file
|
// Read the generated C# file
|
||||||
|
|||||||
@ -27,7 +27,7 @@
|
|||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Grpc.AspNetCore" Version="2.68.0" />
|
<PackageReference Include="Grpc.AspNetCore" Version="2.71.0" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|||||||
@ -0,0 +1,18 @@
|
|||||||
|
namespace Svrnty.CQRS.Notifications.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes notifications to all subscribed gRPC clients.
|
||||||
|
/// </summary>
|
||||||
|
public interface INotificationPublisher
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Publish a notification to all subscribers matching the subscription key.
|
||||||
|
/// The subscription key is extracted from the notification based on the
|
||||||
|
/// <see cref="StreamingNotificationAttribute.SubscriptionKey"/> property.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TNotification">The notification type marked with <see cref="StreamingNotificationAttribute"/>.</typeparam>
|
||||||
|
/// <param name="notification">The notification to publish.</param>
|
||||||
|
/// <param name="ct">Cancellation token.</param>
|
||||||
|
Task PublishAsync<TNotification>(TNotification notification, CancellationToken ct = default)
|
||||||
|
where TNotification : class;
|
||||||
|
}
|
||||||
@ -0,0 +1,15 @@
|
|||||||
|
namespace Svrnty.CQRS.Notifications.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Marks a record as a streaming notification that can be subscribed to via gRPC.
|
||||||
|
/// The framework will auto-generate proto definitions and service implementations.
|
||||||
|
/// </summary>
|
||||||
|
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
|
||||||
|
public sealed class StreamingNotificationAttribute : Attribute
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// The property name used as the subscription key.
|
||||||
|
/// Subscribers filter notifications by this value.
|
||||||
|
/// </summary>
|
||||||
|
public required string SubscriptionKey { get; set; }
|
||||||
|
}
|
||||||
@ -0,0 +1,29 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
|
<IsAotCompatible>true</IsAotCompatible>
|
||||||
|
<LangVersion>14</LangVersion>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
<ImplicitUsings>enable</ImplicitUsings>
|
||||||
|
|
||||||
|
<Company>Svrnty</Company>
|
||||||
|
<Authors>David Lebee, Mathias Beaulieu-Duncan</Authors>
|
||||||
|
<PackageIcon>icon.png</PackageIcon>
|
||||||
|
<PackageReadmeFile>README.md</PackageReadmeFile>
|
||||||
|
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
|
||||||
|
<RepositoryType>git</RepositoryType>
|
||||||
|
<PublishRepositoryUrl>true</PublishRepositoryUrl>
|
||||||
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
|
|
||||||
|
<DebugType>portable</DebugType>
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<IncludeSymbols>true</IncludeSymbols>
|
||||||
|
<IncludeSource>true</IncludeSource>
|
||||||
|
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
||||||
76
Svrnty.CQRS.Notifications.Grpc/NotificationPublisher.cs
Normal file
76
Svrnty.CQRS.Notifications.Grpc/NotificationPublisher.cs
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Reflection;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Svrnty.CQRS.Notifications.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Notifications.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes notifications to subscribed gRPC clients.
|
||||||
|
/// </summary>
|
||||||
|
public class NotificationPublisher : INotificationPublisher
|
||||||
|
{
|
||||||
|
private readonly NotificationSubscriptionManager _manager;
|
||||||
|
private readonly ILogger<NotificationPublisher> _logger;
|
||||||
|
|
||||||
|
// Cache subscription key property info per notification type
|
||||||
|
private static readonly ConcurrentDictionary<Type, SubscriptionKeyInfo> _keyCache = new();
|
||||||
|
|
||||||
|
public NotificationPublisher(
|
||||||
|
NotificationSubscriptionManager manager,
|
||||||
|
ILogger<NotificationPublisher> logger)
|
||||||
|
{
|
||||||
|
_manager = manager;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task PublishAsync<TNotification>(TNotification notification, CancellationToken ct = default)
|
||||||
|
where TNotification : class
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(notification);
|
||||||
|
|
||||||
|
var keyInfo = GetSubscriptionKeyInfo(typeof(TNotification));
|
||||||
|
var subscriptionKey = keyInfo.Property.GetValue(notification);
|
||||||
|
|
||||||
|
if (subscriptionKey == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(
|
||||||
|
"Subscription key {PropertyName} is null on {NotificationType}, skipping notification",
|
||||||
|
keyInfo.PropertyName, typeof(TNotification).Name);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Publishing {NotificationType} with subscription key {PropertyName}={KeyValue}",
|
||||||
|
typeof(TNotification).Name, keyInfo.PropertyName, subscriptionKey);
|
||||||
|
|
||||||
|
await _manager.NotifyAsync(notification, subscriptionKey, ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static SubscriptionKeyInfo GetSubscriptionKeyInfo(Type type)
|
||||||
|
{
|
||||||
|
return _keyCache.GetOrAdd(type, t =>
|
||||||
|
{
|
||||||
|
var attr = t.GetCustomAttribute<StreamingNotificationAttribute>();
|
||||||
|
if (attr == null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Type {t.Name} is not marked with [{nameof(StreamingNotificationAttribute)}]. " +
|
||||||
|
$"Add the attribute with a SubscriptionKey to enable streaming notifications.");
|
||||||
|
}
|
||||||
|
|
||||||
|
var property = t.GetProperty(attr.SubscriptionKey);
|
||||||
|
if (property == null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Property '{attr.SubscriptionKey}' specified in [{nameof(StreamingNotificationAttribute)}] " +
|
||||||
|
$"was not found on type {t.Name}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new SubscriptionKeyInfo(attr.SubscriptionKey, property);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record SubscriptionKeyInfo(string PropertyName, PropertyInfo Property);
|
||||||
|
}
|
||||||
@ -0,0 +1,164 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using Grpc.Core;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Notifications.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Manages gRPC stream subscriptions for notifications.
|
||||||
|
/// Thread-safe singleton that tracks subscriptions and routes notifications to subscribers.
|
||||||
|
/// </summary>
|
||||||
|
public class NotificationSubscriptionManager
|
||||||
|
{
|
||||||
|
private readonly ConcurrentDictionary<(string TypeName, string Key), ConcurrentBag<object>> _subscriptions = new();
|
||||||
|
private readonly ILogger<NotificationSubscriptionManager> _logger;
|
||||||
|
|
||||||
|
public NotificationSubscriptionManager(ILogger<NotificationSubscriptionManager> logger)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscribe to notifications of a specific domain type with a mapper to convert to proto format.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TDomain">The domain notification type.</typeparam>
|
||||||
|
/// <typeparam name="TProto">The proto message type.</typeparam>
|
||||||
|
/// <param name="subscriptionKey">The subscription key value (e.g., inventory ID).</param>
|
||||||
|
/// <param name="stream">The gRPC server stream writer.</param>
|
||||||
|
/// <param name="mapper">Function to map domain notification to proto message.</param>
|
||||||
|
/// <returns>A disposable that removes the subscription when disposed.</returns>
|
||||||
|
public IDisposable Subscribe<TDomain, TProto>(
|
||||||
|
object subscriptionKey,
|
||||||
|
IServerStreamWriter<TProto> stream,
|
||||||
|
Func<TDomain, TProto> mapper) where TDomain : class
|
||||||
|
{
|
||||||
|
var key = (typeof(TDomain).FullName!, subscriptionKey.ToString()!);
|
||||||
|
var subscriber = new Subscriber<TDomain, TProto>(stream, mapper);
|
||||||
|
var bag = _subscriptions.GetOrAdd(key, _ => new ConcurrentBag<object>());
|
||||||
|
bag.Add(subscriber);
|
||||||
|
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Client subscribed to {NotificationType} with key {SubscriptionKey}. Total subscribers: {Count}",
|
||||||
|
typeof(TDomain).Name, subscriptionKey, bag.Count);
|
||||||
|
|
||||||
|
return new SubscriptionHandle(() => Remove(key, subscriber));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Notify all subscribers of a specific notification type and subscription key.
|
||||||
|
/// </summary>
|
||||||
|
internal async Task NotifyAsync<TDomain>(TDomain notification, object subscriptionKey, CancellationToken ct) where TDomain : class
|
||||||
|
{
|
||||||
|
var key = (typeof(TDomain).FullName!, subscriptionKey.ToString()!);
|
||||||
|
|
||||||
|
if (!_subscriptions.TryGetValue(key, out var subscribers))
|
||||||
|
{
|
||||||
|
_logger.LogDebug(
|
||||||
|
"No subscribers for {NotificationType} with key {SubscriptionKey}",
|
||||||
|
typeof(TDomain).Name, subscriptionKey);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var deadSubscribers = new List<object>();
|
||||||
|
|
||||||
|
foreach (var sub in subscribers)
|
||||||
|
{
|
||||||
|
if (sub is INotifiable<TDomain> notifiable)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await notifiable.NotifyAsync(notification, ct);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex,
|
||||||
|
"Failed to notify subscriber for {NotificationType}, marking for removal",
|
||||||
|
typeof(TDomain).Name);
|
||||||
|
deadSubscribers.Add(sub);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up dead subscribers
|
||||||
|
foreach (var dead in deadSubscribers)
|
||||||
|
{
|
||||||
|
Remove(key, dead);
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Notified {Count} subscribers for {NotificationType} with key {SubscriptionKey}",
|
||||||
|
subscribers.Count - deadSubscribers.Count, typeof(TDomain).Name, subscriptionKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void Remove((string TypeName, string Key) key, object subscriber)
|
||||||
|
{
|
||||||
|
if (_subscriptions.TryGetValue(key, out var bag))
|
||||||
|
{
|
||||||
|
// ConcurrentBag doesn't support removal, so we rebuild
|
||||||
|
var remaining = bag.Where(s => !ReferenceEquals(s, subscriber)).ToList();
|
||||||
|
if (remaining.Count == 0)
|
||||||
|
{
|
||||||
|
_subscriptions.TryRemove(key, out _);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
var newBag = new ConcurrentBag<object>(remaining);
|
||||||
|
_subscriptions.TryUpdate(key, newBag, bag);
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Client unsubscribed from {NotificationType} with key {SubscriptionKey}",
|
||||||
|
key.TypeName.Split('.').Last(), key.Key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Internal interface for type-erased notification delivery.
|
||||||
|
/// </summary>
|
||||||
|
internal interface INotifiable<in TDomain>
|
||||||
|
{
|
||||||
|
Task NotifyAsync(TDomain notification, CancellationToken ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Wraps a gRPC stream writer with a domain→proto mapper.
|
||||||
|
/// </summary>
|
||||||
|
internal sealed class Subscriber<TDomain, TProto> : INotifiable<TDomain>
|
||||||
|
{
|
||||||
|
private readonly IServerStreamWriter<TProto> _stream;
|
||||||
|
private readonly Func<TDomain, TProto> _mapper;
|
||||||
|
|
||||||
|
public Subscriber(IServerStreamWriter<TProto> stream, Func<TDomain, TProto> mapper)
|
||||||
|
{
|
||||||
|
_stream = stream;
|
||||||
|
_mapper = mapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task NotifyAsync(TDomain notification, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var proto = _mapper(notification);
|
||||||
|
await _stream.WriteAsync(proto, ct);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handle that removes a subscription when disposed.
|
||||||
|
/// </summary>
|
||||||
|
internal sealed class SubscriptionHandle : IDisposable
|
||||||
|
{
|
||||||
|
private readonly Action _onDispose;
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
public SubscriptionHandle(Action onDispose)
|
||||||
|
{
|
||||||
|
_onDispose = onDispose;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (_disposed) return;
|
||||||
|
_disposed = true;
|
||||||
|
_onDispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,26 @@
|
|||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Svrnty.CQRS.Notifications.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Notifications.Grpc;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Extension methods for registering streaming notification services.
|
||||||
|
/// </summary>
|
||||||
|
public static class ServiceCollectionExtensions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Adds gRPC streaming notification services to the service collection.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="services">The service collection.</param>
|
||||||
|
/// <returns>The service collection for chaining.</returns>
|
||||||
|
public static IServiceCollection AddStreamingNotifications(this IServiceCollection services)
|
||||||
|
{
|
||||||
|
// Subscription manager is singleton - shared state for all subscriptions
|
||||||
|
services.AddSingleton<NotificationSubscriptionManager>();
|
||||||
|
|
||||||
|
// Publisher can be singleton since it only depends on the manager
|
||||||
|
services.AddSingleton<INotificationPublisher, NotificationPublisher>();
|
||||||
|
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,39 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
|
<IsAotCompatible>false</IsAotCompatible>
|
||||||
|
<LangVersion>14</LangVersion>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
<ImplicitUsings>enable</ImplicitUsings>
|
||||||
|
|
||||||
|
<Company>Svrnty</Company>
|
||||||
|
<Authors>Mathias Beaulieu-Duncan</Authors>
|
||||||
|
<PackageIcon>icon.png</PackageIcon>
|
||||||
|
<PackageReadmeFile>README.md</PackageReadmeFile>
|
||||||
|
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
|
||||||
|
<RepositoryType>git</RepositoryType>
|
||||||
|
<PublishRepositoryUrl>true</PublishRepositoryUrl>
|
||||||
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
|
|
||||||
|
<DebugType>portable</DebugType>
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<IncludeSymbols>true</IncludeSymbols>
|
||||||
|
<IncludeSource>true</IncludeSource>
|
||||||
|
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Grpc.AspNetCore" Version="2.71.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="10.0.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\Svrnty.CQRS.Notifications.Abstractions\Svrnty.CQRS.Notifications.Abstractions.csproj" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
||||||
14
Svrnty.CQRS.Sagas.Abstractions/ISaga.cs
Normal file
14
Svrnty.CQRS.Sagas.Abstractions/ISaga.cs
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines a saga with its steps and compensation logic.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga's data/context type.</typeparam>
|
||||||
|
public interface ISaga<TData> where TData : class, ISagaData, new()
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Configures the saga steps using the fluent builder.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="builder">The saga builder for defining steps.</param>
|
||||||
|
void Configure(ISagaBuilder<TData> builder);
|
||||||
|
}
|
||||||
173
Svrnty.CQRS.Sagas.Abstractions/ISagaBuilder.cs
Normal file
173
Svrnty.CQRS.Sagas.Abstractions/ISagaBuilder.cs
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fluent builder for defining saga steps.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
public interface ISagaBuilder<TData> where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Adds a local step that executes synchronously within the orchestrator process.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="name">Unique name for this step.</param>
|
||||||
|
/// <returns>Builder for configuring the step.</returns>
|
||||||
|
ISagaStepBuilder<TData> Step(string name);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Adds a step that sends a command to a remote service via messaging.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TCommand">The command type to send.</typeparam>
|
||||||
|
/// <param name="name">Unique name for this step.</param>
|
||||||
|
/// <returns>Builder for configuring the remote step.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand> SendCommand<TCommand>(string name) where TCommand : class;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Adds a step that sends a command and expects a specific result.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TCommand">The command type to send.</typeparam>
|
||||||
|
/// <typeparam name="TResult">The expected result type.</typeparam>
|
||||||
|
/// <param name="name">Unique name for this step.</param>
|
||||||
|
/// <returns>Builder for configuring the remote step.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand, TResult> SendCommand<TCommand, TResult>(string name) where TCommand : class;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builder for configuring a local saga step.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
public interface ISagaStepBuilder<TData> where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Defines the execution action for this step.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="action">The action to execute.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaStepBuilder<TData> Execute(Func<TData, ISagaContext, CancellationToken, Task> action);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines the compensation action for this step.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="action">The compensation action to execute on rollback.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaStepBuilder<TData> Compensate(Func<TData, ISagaContext, CancellationToken, Task> action);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Completes this step definition and returns to the saga builder.
|
||||||
|
/// </summary>
|
||||||
|
/// <returns>The saga builder for adding more steps.</returns>
|
||||||
|
ISagaBuilder<TData> Then();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builder for configuring a remote command saga step (no result).
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <typeparam name="TCommand">The command type to send.</typeparam>
|
||||||
|
public interface ISagaRemoteStepBuilder<TData, TCommand>
|
||||||
|
where TData : class, ISagaData
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Defines how to build the command from saga data.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="commandBuilder">Function to create the command.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand> WithCommand(Func<TData, ISagaContext, TCommand> commandBuilder);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines what to do when the command completes successfully.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="handler">Handler for the response.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand> OnResponse(Func<TData, ISagaContext, CancellationToken, Task> handler);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines the compensation command to send on rollback.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TCompensationCommand">The compensation command type.</typeparam>
|
||||||
|
/// <param name="compensationBuilder">Function to create the compensation command.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand> Compensate<TCompensationCommand>(
|
||||||
|
Func<TData, ISagaContext, TCompensationCommand> compensationBuilder) where TCompensationCommand : class;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets a timeout for this step.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="timeout">The timeout duration.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand> WithTimeout(TimeSpan timeout);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Configures retry behavior for this step.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="maxRetries">Maximum number of retries.</param>
|
||||||
|
/// <param name="delay">Delay between retries.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand> WithRetry(int maxRetries, TimeSpan delay);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Completes this step definition and returns to the saga builder.
|
||||||
|
/// </summary>
|
||||||
|
/// <returns>The saga builder for adding more steps.</returns>
|
||||||
|
ISagaBuilder<TData> Then();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builder for configuring a remote command saga step with result.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <typeparam name="TCommand">The command type to send.</typeparam>
|
||||||
|
/// <typeparam name="TResult">The expected result type.</typeparam>
|
||||||
|
public interface ISagaRemoteStepBuilder<TData, TCommand, TResult>
|
||||||
|
where TData : class, ISagaData
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Defines how to build the command from saga data.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="commandBuilder">Function to create the command.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand, TResult> WithCommand(Func<TData, ISagaContext, TCommand> commandBuilder);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines what to do when the command completes successfully with a result.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="handler">Handler for the response with result.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand, TResult> OnResponse(
|
||||||
|
Func<TData, ISagaContext, TResult, CancellationToken, Task> handler);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Defines the compensation command to send on rollback.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TCompensationCommand">The compensation command type.</typeparam>
|
||||||
|
/// <param name="compensationBuilder">Function to create the compensation command.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand, TResult> Compensate<TCompensationCommand>(
|
||||||
|
Func<TData, ISagaContext, TCompensationCommand> compensationBuilder) where TCompensationCommand : class;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets a timeout for this step.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="timeout">The timeout duration.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand, TResult> WithTimeout(TimeSpan timeout);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Configures retry behavior for this step.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="maxRetries">Maximum number of retries.</param>
|
||||||
|
/// <param name="delay">Delay between retries.</param>
|
||||||
|
/// <returns>This builder for chaining.</returns>
|
||||||
|
ISagaRemoteStepBuilder<TData, TCommand, TResult> WithRetry(int maxRetries, TimeSpan delay);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Completes this step definition and returns to the saga builder.
|
||||||
|
/// </summary>
|
||||||
|
/// <returns>The saga builder for adding more steps.</returns>
|
||||||
|
ISagaBuilder<TData> Then();
|
||||||
|
}
|
||||||
55
Svrnty.CQRS.Sagas.Abstractions/ISagaContext.cs
Normal file
55
Svrnty.CQRS.Sagas.Abstractions/ISagaContext.cs
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Provides context information during saga step execution.
|
||||||
|
/// </summary>
|
||||||
|
public interface ISagaContext
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Unique identifier for this saga instance.
|
||||||
|
/// </summary>
|
||||||
|
Guid SagaId { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Correlation ID for tracing across services.
|
||||||
|
/// </summary>
|
||||||
|
Guid CorrelationId { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The fully qualified type name of the saga.
|
||||||
|
/// </summary>
|
||||||
|
string SagaType { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Index of the current step being executed.
|
||||||
|
/// </summary>
|
||||||
|
int CurrentStepIndex { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Name of the current step being executed.
|
||||||
|
/// </summary>
|
||||||
|
string CurrentStepName { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Results from completed steps, keyed by step name.
|
||||||
|
/// </summary>
|
||||||
|
IReadOnlyDictionary<string, object?> StepResults { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets a result from a previous step.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">The expected result type.</typeparam>
|
||||||
|
/// <param name="stepName">The name of the step.</param>
|
||||||
|
/// <returns>The result, or default if not found.</returns>
|
||||||
|
T? GetStepResult<T>(string stepName);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sets a result for the current step (available to subsequent steps).
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="T">The result type.</typeparam>
|
||||||
|
/// <param name="result">The result value.</param>
|
||||||
|
void SetStepResult<T>(T result);
|
||||||
|
}
|
||||||
14
Svrnty.CQRS.Sagas.Abstractions/ISagaData.cs
Normal file
14
Svrnty.CQRS.Sagas.Abstractions/ISagaData.cs
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Marker interface for saga data. All saga data classes must implement this interface.
|
||||||
|
/// </summary>
|
||||||
|
public interface ISagaData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Correlation ID for tracing the saga across services.
|
||||||
|
/// </summary>
|
||||||
|
Guid CorrelationId { get; set; }
|
||||||
|
}
|
||||||
52
Svrnty.CQRS.Sagas.Abstractions/ISagaOrchestrator.cs
Normal file
52
Svrnty.CQRS.Sagas.Abstractions/ISagaOrchestrator.cs
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Orchestrates saga execution.
|
||||||
|
/// </summary>
|
||||||
|
public interface ISagaOrchestrator
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Starts a new saga instance with a generated correlation ID.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TSaga">The saga type.</typeparam>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <param name="initialData">The initial saga data.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The saga state.</returns>
|
||||||
|
Task<SagaState> StartAsync<TSaga, TData>(TData initialData, CancellationToken cancellationToken = default)
|
||||||
|
where TSaga : ISaga<TData>
|
||||||
|
where TData : class, ISagaData, new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Starts a new saga instance with a specific correlation ID.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TSaga">The saga type.</typeparam>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <param name="initialData">The initial saga data.</param>
|
||||||
|
/// <param name="correlationId">The correlation ID for tracing.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The saga state.</returns>
|
||||||
|
Task<SagaState> StartAsync<TSaga, TData>(TData initialData, Guid correlationId, CancellationToken cancellationToken = default)
|
||||||
|
where TSaga : ISaga<TData>
|
||||||
|
where TData : class, ISagaData, new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the current state of a saga by its ID.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="sagaId">The saga instance ID.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The saga state, or null if not found.</returns>
|
||||||
|
Task<SagaState?> GetStateAsync(Guid sagaId, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the current state of a saga by its correlation ID.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="correlationId">The correlation ID.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The saga state, or null if not found.</returns>
|
||||||
|
Task<SagaState?> GetStateByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken = default);
|
||||||
|
}
|
||||||
44
Svrnty.CQRS.Sagas.Abstractions/Messaging/ISagaMessageBus.cs
Normal file
44
Svrnty.CQRS.Sagas.Abstractions/Messaging/ISagaMessageBus.cs
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions.Messaging;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Abstraction for saga messaging transport.
|
||||||
|
/// </summary>
|
||||||
|
public interface ISagaMessageBus
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes a saga command message to the message bus.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message">The message to publish.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
Task PublishAsync(SagaMessage message, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes a saga step response to the message bus.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="response">The response to publish.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
Task PublishResponseAsync(SagaStepResponse response, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscribes to saga messages for a specific command type.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TCommand">The command type to subscribe to.</typeparam>
|
||||||
|
/// <param name="handler">Handler that processes the message and returns a response.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
Task SubscribeAsync<TCommand>(
|
||||||
|
Func<SagaMessage, TCommand, CancellationToken, Task<SagaStepResponse>> handler,
|
||||||
|
CancellationToken cancellationToken = default) where TCommand : class;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Subscribes to saga step responses.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="handler">Handler that processes responses.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
Task SubscribeToResponsesAsync(
|
||||||
|
Func<SagaStepResponse, CancellationToken, Task> handler,
|
||||||
|
CancellationToken cancellationToken = default);
|
||||||
|
}
|
||||||
55
Svrnty.CQRS.Sagas.Abstractions/Messaging/SagaMessage.cs
Normal file
55
Svrnty.CQRS.Sagas.Abstractions/Messaging/SagaMessage.cs
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions.Messaging;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Message envelope for saga commands sent to remote services.
|
||||||
|
/// </summary>
|
||||||
|
public record SagaMessage
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Unique identifier for this message.
|
||||||
|
/// </summary>
|
||||||
|
public Guid MessageId { get; init; } = Guid.NewGuid();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The saga instance ID.
|
||||||
|
/// </summary>
|
||||||
|
public Guid SagaId { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Correlation ID for tracing across services.
|
||||||
|
/// </summary>
|
||||||
|
public Guid CorrelationId { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Name of the saga step that sent this message.
|
||||||
|
/// </summary>
|
||||||
|
public string StepName { get; init; } = string.Empty;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fully qualified type name of the command.
|
||||||
|
/// </summary>
|
||||||
|
public string CommandType { get; init; } = string.Empty;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Serialized command payload (JSON).
|
||||||
|
/// </summary>
|
||||||
|
public string? Payload { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// When the message was created.
|
||||||
|
/// </summary>
|
||||||
|
public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Additional headers for the message.
|
||||||
|
/// </summary>
|
||||||
|
public Dictionary<string, string> Headers { get; init; } = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether this is a compensation command.
|
||||||
|
/// </summary>
|
||||||
|
public bool IsCompensation { get; init; }
|
||||||
|
}
|
||||||
59
Svrnty.CQRS.Sagas.Abstractions/Messaging/SagaStepResponse.cs
Normal file
59
Svrnty.CQRS.Sagas.Abstractions/Messaging/SagaStepResponse.cs
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions.Messaging;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Response message from a saga step execution.
|
||||||
|
/// </summary>
|
||||||
|
public record SagaStepResponse
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Unique identifier for this response.
|
||||||
|
/// </summary>
|
||||||
|
public Guid MessageId { get; init; } = Guid.NewGuid();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The saga instance ID.
|
||||||
|
/// </summary>
|
||||||
|
public Guid SagaId { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Correlation ID for tracing across services.
|
||||||
|
/// </summary>
|
||||||
|
public Guid CorrelationId { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Name of the saga step that this response is for.
|
||||||
|
/// </summary>
|
||||||
|
public string StepName { get; init; } = string.Empty;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether the step executed successfully.
|
||||||
|
/// </summary>
|
||||||
|
public bool Success { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fully qualified type name of the result (if any).
|
||||||
|
/// </summary>
|
||||||
|
public string? ResultType { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Serialized result payload (JSON).
|
||||||
|
/// </summary>
|
||||||
|
public string? ResultPayload { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Error message if the step failed.
|
||||||
|
/// </summary>
|
||||||
|
public string? ErrorMessage { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Stack trace if the step failed (for debugging).
|
||||||
|
/// </summary>
|
||||||
|
public string? StackTrace { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// When the response was created.
|
||||||
|
/// </summary>
|
||||||
|
public DateTimeOffset Timestamp { get; init; } = DateTimeOffset.UtcNow;
|
||||||
|
}
|
||||||
@ -0,0 +1,59 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions.Persistence;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Abstraction for saga state persistence.
|
||||||
|
/// </summary>
|
||||||
|
public interface ISagaStateStore
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new saga state entry.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="state">The saga state to create.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The created saga state.</returns>
|
||||||
|
Task<SagaState> CreateAsync(SagaState state, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets a saga state by its ID.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="sagaId">The saga instance ID.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The saga state, or null if not found.</returns>
|
||||||
|
Task<SagaState?> GetByIdAsync(Guid sagaId, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets a saga state by its correlation ID.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="correlationId">The correlation ID.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The saga state, or null if not found.</returns>
|
||||||
|
Task<SagaState?> GetByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Updates an existing saga state.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="state">The saga state to update.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>The updated saga state.</returns>
|
||||||
|
Task<SagaState> UpdateAsync(SagaState state, CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets all pending (in progress) sagas.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>List of pending saga states.</returns>
|
||||||
|
Task<IReadOnlyList<SagaState>> GetPendingSagasAsync(CancellationToken cancellationToken = default);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets all sagas with a specific status.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="status">The status to filter by.</param>
|
||||||
|
/// <param name="cancellationToken">Cancellation token.</param>
|
||||||
|
/// <returns>List of saga states with the specified status.</returns>
|
||||||
|
Task<IReadOnlyList<SagaState>> GetSagasByStatusAsync(SagaStatus status, CancellationToken cancellationToken = default);
|
||||||
|
}
|
||||||
85
Svrnty.CQRS.Sagas.Abstractions/SagaState.cs
Normal file
85
Svrnty.CQRS.Sagas.Abstractions/SagaState.cs
Normal file
@ -0,0 +1,85 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Represents the persistent state of a saga instance.
|
||||||
|
/// </summary>
|
||||||
|
public class SagaState
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Unique identifier for this saga instance.
|
||||||
|
/// </summary>
|
||||||
|
public Guid SagaId { get; set; } = Guid.NewGuid();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The fully qualified type name of the saga.
|
||||||
|
/// </summary>
|
||||||
|
public string SagaType { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Correlation ID for tracing across services.
|
||||||
|
/// </summary>
|
||||||
|
public Guid CorrelationId { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Current execution status.
|
||||||
|
/// </summary>
|
||||||
|
public SagaStatus Status { get; set; } = SagaStatus.NotStarted;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Index of the current step being executed.
|
||||||
|
/// </summary>
|
||||||
|
public int CurrentStepIndex { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Name of the current step being executed.
|
||||||
|
/// </summary>
|
||||||
|
public string? CurrentStepName { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Results from completed steps, keyed by step name.
|
||||||
|
/// </summary>
|
||||||
|
public Dictionary<string, object?> StepResults { get; set; } = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Names of steps that have been completed.
|
||||||
|
/// </summary>
|
||||||
|
public List<string> CompletedSteps { get; set; } = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Errors that occurred during saga execution.
|
||||||
|
/// </summary>
|
||||||
|
public List<SagaStepError> Errors { get; set; } = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Serialized saga data (JSON).
|
||||||
|
/// </summary>
|
||||||
|
public string? SerializedData { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// When the saga was created.
|
||||||
|
/// </summary>
|
||||||
|
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// When the saga was last updated.
|
||||||
|
/// </summary>
|
||||||
|
public DateTimeOffset? UpdatedAt { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// When the saga completed (successfully or compensated).
|
||||||
|
/// </summary>
|
||||||
|
public DateTimeOffset? CompletedAt { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Represents an error that occurred during saga step execution.
|
||||||
|
/// </summary>
|
||||||
|
public record SagaStepError(
|
||||||
|
string StepName,
|
||||||
|
string ErrorMessage,
|
||||||
|
string? StackTrace,
|
||||||
|
DateTimeOffset OccurredAt
|
||||||
|
);
|
||||||
37
Svrnty.CQRS.Sagas.Abstractions/SagaStatus.cs
Normal file
37
Svrnty.CQRS.Sagas.Abstractions/SagaStatus.cs
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
namespace Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Represents the execution state of a saga.
|
||||||
|
/// </summary>
|
||||||
|
public enum SagaStatus
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Saga has not started execution.
|
||||||
|
/// </summary>
|
||||||
|
NotStarted,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Saga is currently executing steps.
|
||||||
|
/// </summary>
|
||||||
|
InProgress,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Saga completed successfully.
|
||||||
|
/// </summary>
|
||||||
|
Completed,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Saga failed and compensation has not been triggered.
|
||||||
|
/// </summary>
|
||||||
|
Failed,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Saga is currently executing compensation steps.
|
||||||
|
/// </summary>
|
||||||
|
Compensating,
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Saga has been compensated (rolled back) successfully.
|
||||||
|
/// </summary>
|
||||||
|
Compensated
|
||||||
|
}
|
||||||
@ -0,0 +1,28 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
|
<IsAotCompatible>true</IsAotCompatible>
|
||||||
|
<LangVersion>14</LangVersion>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
|
||||||
|
<Company>Svrnty</Company>
|
||||||
|
<Authors>David Lebee, Mathias Beaulieu-Duncan</Authors>
|
||||||
|
<PackageIcon>icon.png</PackageIcon>
|
||||||
|
<PackageReadmeFile>README.md</PackageReadmeFile>
|
||||||
|
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
|
||||||
|
<RepositoryType>git</RepositoryType>
|
||||||
|
<PublishRepositoryUrl>true</PublishRepositoryUrl>
|
||||||
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
|
|
||||||
|
<DebugType>portable</DebugType>
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<IncludeSymbols>true</IncludeSymbols>
|
||||||
|
<IncludeSource>true</IncludeSource>
|
||||||
|
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
||||||
60
Svrnty.CQRS.Sagas.RabbitMQ/CqrsBuilderExtensions.cs
Normal file
60
Svrnty.CQRS.Sagas.RabbitMQ/CqrsBuilderExtensions.cs
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
using System;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||||
|
using Svrnty.CQRS.Configuration;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions.Messaging;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.RabbitMQ;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Extensions for adding RabbitMQ saga transport to the CQRS pipeline.
|
||||||
|
/// </summary>
|
||||||
|
public static class CqrsBuilderExtensions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Uses RabbitMQ as the message transport for sagas.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="builder">The CQRS builder.</param>
|
||||||
|
/// <param name="configure">Configuration action for RabbitMQ options.</param>
|
||||||
|
/// <returns>The CQRS builder for chaining.</returns>
|
||||||
|
public static CqrsBuilder UseRabbitMq(this CqrsBuilder builder, Action<RabbitMqSagaOptions> configure)
|
||||||
|
{
|
||||||
|
var options = new RabbitMqSagaOptions();
|
||||||
|
configure(options);
|
||||||
|
|
||||||
|
builder.Services.Configure<RabbitMqSagaOptions>(opt =>
|
||||||
|
{
|
||||||
|
opt.HostName = options.HostName;
|
||||||
|
opt.Port = options.Port;
|
||||||
|
opt.UserName = options.UserName;
|
||||||
|
opt.Password = options.Password;
|
||||||
|
opt.VirtualHost = options.VirtualHost;
|
||||||
|
opt.CommandExchange = options.CommandExchange;
|
||||||
|
opt.ResponseExchange = options.ResponseExchange;
|
||||||
|
opt.QueuePrefix = options.QueuePrefix;
|
||||||
|
opt.DurableQueues = options.DurableQueues;
|
||||||
|
opt.PrefetchCount = options.PrefetchCount;
|
||||||
|
opt.ConnectionRetryDelay = options.ConnectionRetryDelay;
|
||||||
|
opt.MaxConnectionRetries = options.MaxConnectionRetries;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Replace the default message bus with RabbitMQ implementation
|
||||||
|
builder.Services.RemoveAll<ISagaMessageBus>();
|
||||||
|
builder.Services.AddSingleton<ISagaMessageBus, RabbitMqSagaMessageBus>();
|
||||||
|
|
||||||
|
// Add hosted service for connection management
|
||||||
|
builder.Services.AddHostedService<RabbitMqSagaHostedService>();
|
||||||
|
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Uses RabbitMQ as the message transport for sagas with default options.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="builder">The CQRS builder.</param>
|
||||||
|
/// <returns>The CQRS builder for chaining.</returns>
|
||||||
|
public static CqrsBuilder UseRabbitMq(this CqrsBuilder builder)
|
||||||
|
{
|
||||||
|
return builder.UseRabbitMq(_ => { });
|
||||||
|
}
|
||||||
|
}
|
||||||
88
Svrnty.CQRS.Sagas.RabbitMQ/RabbitMqSagaHostedService.cs
Normal file
88
Svrnty.CQRS.Sagas.RabbitMQ/RabbitMqSagaHostedService.cs
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions.Messaging;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.RabbitMQ;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Hosted service that manages RabbitMQ saga connections and subscriptions.
|
||||||
|
/// </summary>
|
||||||
|
public class RabbitMqSagaHostedService : BackgroundService
|
||||||
|
{
|
||||||
|
private readonly IServiceProvider _serviceProvider;
|
||||||
|
private readonly ISagaMessageBus _messageBus;
|
||||||
|
private readonly ILogger<RabbitMqSagaHostedService> _logger;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new RabbitMQ saga hosted service.
|
||||||
|
/// </summary>
|
||||||
|
public RabbitMqSagaHostedService(
|
||||||
|
IServiceProvider serviceProvider,
|
||||||
|
ISagaMessageBus messageBus,
|
||||||
|
ILogger<RabbitMqSagaHostedService> logger)
|
||||||
|
{
|
||||||
|
_serviceProvider = serviceProvider;
|
||||||
|
_messageBus = messageBus;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("Starting RabbitMQ saga hosted service");
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Subscribe to saga responses so the orchestrator can process them
|
||||||
|
await _messageBus.SubscribeToResponsesAsync(
|
||||||
|
async (response, ct) =>
|
||||||
|
{
|
||||||
|
using var scope = _serviceProvider.CreateScope();
|
||||||
|
var orchestrator = scope.ServiceProvider.GetRequiredService<ISagaOrchestrator>();
|
||||||
|
|
||||||
|
// The orchestrator needs to handle responses
|
||||||
|
// This is a simplified approach - in production you'd want to handle this more robustly
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Received response for saga {SagaId}, step {StepName}, success: {Success}",
|
||||||
|
response.SagaId, response.StepName, response.Success);
|
||||||
|
|
||||||
|
// For now, we just log the response
|
||||||
|
// The orchestrator's HandleResponseAsync method would be called here
|
||||||
|
// but it requires knowing the saga data type, which we don't have in this context
|
||||||
|
},
|
||||||
|
stoppingToken);
|
||||||
|
|
||||||
|
_logger.LogInformation("RabbitMQ saga hosted service started successfully");
|
||||||
|
|
||||||
|
// Keep the service running
|
||||||
|
await Task.Delay(Timeout.Infinite, stoppingToken);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("RabbitMQ saga hosted service is stopping");
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error in RabbitMQ saga hosted service");
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("Stopping RabbitMQ saga hosted service");
|
||||||
|
|
||||||
|
if (_messageBus is IAsyncDisposable disposable)
|
||||||
|
{
|
||||||
|
await disposable.DisposeAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
await base.StopAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
335
Svrnty.CQRS.Sagas.RabbitMQ/RabbitMqSagaMessageBus.cs
Normal file
335
Svrnty.CQRS.Sagas.RabbitMQ/RabbitMqSagaMessageBus.cs
Normal file
@ -0,0 +1,335 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Text;
|
||||||
|
using System.Text.Json;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using RabbitMQ.Client;
|
||||||
|
using RabbitMQ.Client.Events;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions.Messaging;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.RabbitMQ;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ implementation of the saga message bus.
|
||||||
|
/// </summary>
|
||||||
|
public class RabbitMqSagaMessageBus : ISagaMessageBus, IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly RabbitMqSagaOptions _options;
|
||||||
|
private readonly ILogger<RabbitMqSagaMessageBus> _logger;
|
||||||
|
private IConnection? _connection;
|
||||||
|
private IChannel? _publishChannel;
|
||||||
|
private readonly ConcurrentDictionary<string, IChannel> _subscriptionChannels = new();
|
||||||
|
private readonly SemaphoreSlim _connectionLock = new(1, 1);
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new RabbitMQ saga message bus.
|
||||||
|
/// </summary>
|
||||||
|
public RabbitMqSagaMessageBus(
|
||||||
|
IOptions<RabbitMqSagaOptions> options,
|
||||||
|
ILogger<RabbitMqSagaMessageBus> logger)
|
||||||
|
{
|
||||||
|
_options = options.Value;
|
||||||
|
_logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task PublishAsync(SagaMessage message, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
await EnsureConnectionAsync(cancellationToken);
|
||||||
|
|
||||||
|
var routingKey = $"saga.command.{message.CommandType}";
|
||||||
|
var body = JsonSerializer.SerializeToUtf8Bytes(message);
|
||||||
|
|
||||||
|
var properties = new BasicProperties
|
||||||
|
{
|
||||||
|
MessageId = message.MessageId.ToString(),
|
||||||
|
CorrelationId = message.CorrelationId.ToString(),
|
||||||
|
ContentType = "application/json",
|
||||||
|
DeliveryMode = _options.DurableQueues ? DeliveryModes.Persistent : DeliveryModes.Transient,
|
||||||
|
Timestamp = new AmqpTimestamp(message.Timestamp.ToUnixTimeSeconds()),
|
||||||
|
Headers = new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
["saga-id"] = message.SagaId.ToString(),
|
||||||
|
["step-name"] = message.StepName,
|
||||||
|
["is-compensation"] = message.IsCompensation.ToString()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
await _publishChannel!.BasicPublishAsync(
|
||||||
|
exchange: _options.CommandExchange,
|
||||||
|
routingKey: routingKey,
|
||||||
|
mandatory: false,
|
||||||
|
basicProperties: properties,
|
||||||
|
body: body,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Published saga command {CommandType} for saga {SagaId}, step {StepName}",
|
||||||
|
message.CommandType, message.SagaId, message.StepName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task PublishResponseAsync(SagaStepResponse response, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
await EnsureConnectionAsync(cancellationToken);
|
||||||
|
|
||||||
|
var routingKey = $"saga.response.{response.SagaId}";
|
||||||
|
var body = JsonSerializer.SerializeToUtf8Bytes(response);
|
||||||
|
|
||||||
|
var properties = new BasicProperties
|
||||||
|
{
|
||||||
|
MessageId = response.MessageId.ToString(),
|
||||||
|
CorrelationId = response.CorrelationId.ToString(),
|
||||||
|
ContentType = "application/json",
|
||||||
|
DeliveryMode = _options.DurableQueues ? DeliveryModes.Persistent : DeliveryModes.Transient,
|
||||||
|
Timestamp = new AmqpTimestamp(response.Timestamp.ToUnixTimeSeconds()),
|
||||||
|
Headers = new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
["saga-id"] = response.SagaId.ToString(),
|
||||||
|
["step-name"] = response.StepName,
|
||||||
|
["success"] = response.Success.ToString()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
await _publishChannel!.BasicPublishAsync(
|
||||||
|
exchange: _options.ResponseExchange,
|
||||||
|
routingKey: routingKey,
|
||||||
|
mandatory: false,
|
||||||
|
basicProperties: properties,
|
||||||
|
body: body,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Published saga response for saga {SagaId}, step {StepName}, success: {Success}",
|
||||||
|
response.SagaId, response.StepName, response.Success);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task SubscribeAsync<TCommand>(
|
||||||
|
Func<SagaMessage, TCommand, CancellationToken, Task<SagaStepResponse>> handler,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
await EnsureConnectionAsync(cancellationToken);
|
||||||
|
|
||||||
|
var commandTypeName = typeof(TCommand).FullName!;
|
||||||
|
var queueName = $"{_options.QueuePrefix}.{SanitizeQueueName(commandTypeName)}";
|
||||||
|
var routingKey = $"saga.command.{commandTypeName}";
|
||||||
|
|
||||||
|
var channel = await _connection!.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||||
|
_subscriptionChannels[commandTypeName] = channel;
|
||||||
|
|
||||||
|
// Declare queue
|
||||||
|
await channel.QueueDeclareAsync(
|
||||||
|
queue: queueName,
|
||||||
|
durable: _options.DurableQueues,
|
||||||
|
exclusive: false,
|
||||||
|
autoDelete: false,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
// Bind to command exchange
|
||||||
|
await channel.QueueBindAsync(
|
||||||
|
queue: queueName,
|
||||||
|
exchange: _options.CommandExchange,
|
||||||
|
routingKey: routingKey,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: _options.PrefetchCount, global: false, cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||||
|
consumer.ReceivedAsync += async (sender, ea) =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var messageJson = Encoding.UTF8.GetString(ea.Body.ToArray());
|
||||||
|
var message = JsonSerializer.Deserialize<SagaMessage>(messageJson);
|
||||||
|
|
||||||
|
if (message == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Received null saga message");
|
||||||
|
await channel.BasicNackAsync(ea.DeliveryTag, false, false, cancellationToken);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var command = JsonSerializer.Deserialize<TCommand>(message.Payload!);
|
||||||
|
if (command == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Failed to deserialize command {CommandType}", commandTypeName);
|
||||||
|
await channel.BasicNackAsync(ea.DeliveryTag, false, false, cancellationToken);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var response = await handler(message, command, cancellationToken);
|
||||||
|
await PublishResponseAsync(response, cancellationToken);
|
||||||
|
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error processing saga command {CommandType}", commandTypeName);
|
||||||
|
await channel.BasicNackAsync(ea.DeliveryTag, false, true, cancellationToken);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
await channel.BasicConsumeAsync(queueName, false, consumer, cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Subscribed to saga commands of type {CommandType} on queue {QueueName}",
|
||||||
|
commandTypeName, queueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task SubscribeToResponsesAsync(
|
||||||
|
Func<SagaStepResponse, CancellationToken, Task> handler,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
await EnsureConnectionAsync(cancellationToken);
|
||||||
|
|
||||||
|
var queueName = $"{_options.QueuePrefix}.responses";
|
||||||
|
var routingKey = "saga.response.#";
|
||||||
|
|
||||||
|
var channel = await _connection!.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||||
|
_subscriptionChannels["responses"] = channel;
|
||||||
|
|
||||||
|
// Declare queue
|
||||||
|
await channel.QueueDeclareAsync(
|
||||||
|
queue: queueName,
|
||||||
|
durable: _options.DurableQueues,
|
||||||
|
exclusive: false,
|
||||||
|
autoDelete: false,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
// Bind to response exchange
|
||||||
|
await channel.QueueBindAsync(
|
||||||
|
queue: queueName,
|
||||||
|
exchange: _options.ResponseExchange,
|
||||||
|
routingKey: routingKey,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: _options.PrefetchCount, global: false, cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||||
|
consumer.ReceivedAsync += async (sender, ea) =>
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var responseJson = Encoding.UTF8.GetString(ea.Body.ToArray());
|
||||||
|
var response = JsonSerializer.Deserialize<SagaStepResponse>(responseJson);
|
||||||
|
|
||||||
|
if (response == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Received null saga response");
|
||||||
|
await channel.BasicNackAsync(ea.DeliveryTag, false, false, cancellationToken);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await handler(response, cancellationToken);
|
||||||
|
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error processing saga response");
|
||||||
|
await channel.BasicNackAsync(ea.DeliveryTag, false, true, cancellationToken);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
await channel.BasicConsumeAsync(queueName, false, consumer, cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation("Subscribed to saga responses on queue {QueueName}", queueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task EnsureConnectionAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
if (_connection?.IsOpen == true && _publishChannel?.IsOpen == true)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await _connectionLock.WaitAsync(cancellationToken);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (_connection?.IsOpen == true && _publishChannel?.IsOpen == true)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var factory = new ConnectionFactory
|
||||||
|
{
|
||||||
|
HostName = _options.HostName,
|
||||||
|
Port = _options.Port,
|
||||||
|
UserName = _options.UserName,
|
||||||
|
Password = _options.Password,
|
||||||
|
VirtualHost = _options.VirtualHost
|
||||||
|
};
|
||||||
|
|
||||||
|
_connection = await factory.CreateConnectionAsync(cancellationToken);
|
||||||
|
_publishChannel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
// Declare exchanges
|
||||||
|
await _publishChannel.ExchangeDeclareAsync(
|
||||||
|
exchange: _options.CommandExchange,
|
||||||
|
type: ExchangeType.Topic,
|
||||||
|
durable: _options.DurableQueues,
|
||||||
|
autoDelete: false,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
await _publishChannel.ExchangeDeclareAsync(
|
||||||
|
exchange: _options.ResponseExchange,
|
||||||
|
type: ExchangeType.Topic,
|
||||||
|
durable: _options.DurableQueues,
|
||||||
|
autoDelete: false,
|
||||||
|
cancellationToken: cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Connected to RabbitMQ at {Host}:{Port}",
|
||||||
|
_options.HostName, _options.Port);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_connectionLock.Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static string SanitizeQueueName(string name)
|
||||||
|
{
|
||||||
|
return name.Replace(".", "-").Replace("+", "-").ToLowerInvariant();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_disposed = true;
|
||||||
|
|
||||||
|
foreach (var channel in _subscriptionChannels.Values)
|
||||||
|
{
|
||||||
|
if (channel.IsOpen)
|
||||||
|
{
|
||||||
|
await channel.CloseAsync();
|
||||||
|
}
|
||||||
|
channel.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_publishChannel?.IsOpen == true)
|
||||||
|
{
|
||||||
|
await _publishChannel.CloseAsync();
|
||||||
|
}
|
||||||
|
_publishChannel?.Dispose();
|
||||||
|
|
||||||
|
if (_connection?.IsOpen == true)
|
||||||
|
{
|
||||||
|
await _connection.CloseAsync();
|
||||||
|
}
|
||||||
|
_connection?.Dispose();
|
||||||
|
|
||||||
|
_connectionLock.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
69
Svrnty.CQRS.Sagas.RabbitMQ/RabbitMqSagaOptions.cs
Normal file
69
Svrnty.CQRS.Sagas.RabbitMQ/RabbitMqSagaOptions.cs
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.RabbitMQ;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Configuration options for RabbitMQ saga transport.
|
||||||
|
/// </summary>
|
||||||
|
public class RabbitMqSagaOptions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ host name (default: localhost).
|
||||||
|
/// </summary>
|
||||||
|
public string HostName { get; set; } = "localhost";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ port (default: 5672).
|
||||||
|
/// </summary>
|
||||||
|
public int Port { get; set; } = 5672;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ user name (default: guest).
|
||||||
|
/// </summary>
|
||||||
|
public string UserName { get; set; } = "guest";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ password (default: guest).
|
||||||
|
/// </summary>
|
||||||
|
public string Password { get; set; } = "guest";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// RabbitMQ virtual host (default: /).
|
||||||
|
/// </summary>
|
||||||
|
public string VirtualHost { get; set; } = "/";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Exchange name for saga commands (default: svrnty.sagas.commands).
|
||||||
|
/// </summary>
|
||||||
|
public string CommandExchange { get; set; } = "svrnty.sagas.commands";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Exchange name for saga responses (default: svrnty.sagas.responses).
|
||||||
|
/// </summary>
|
||||||
|
public string ResponseExchange { get; set; } = "svrnty.sagas.responses";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Queue name prefix for this service (default: saga-service).
|
||||||
|
/// </summary>
|
||||||
|
public string QueuePrefix { get; set; } = "saga-service";
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether to use durable queues (default: true).
|
||||||
|
/// </summary>
|
||||||
|
public bool DurableQueues { get; set; } = true;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Prefetch count for consumers (default: 10).
|
||||||
|
/// </summary>
|
||||||
|
public ushort PrefetchCount { get; set; } = 10;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Connection retry delay (default: 5 seconds).
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan ConnectionRetryDelay { get; set; } = TimeSpan.FromSeconds(5);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Maximum connection retry attempts (default: 10).
|
||||||
|
/// </summary>
|
||||||
|
public int MaxConnectionRetries { get; set; } = 10;
|
||||||
|
}
|
||||||
38
Svrnty.CQRS.Sagas.RabbitMQ/Svrnty.CQRS.Sagas.RabbitMQ.csproj
Normal file
38
Svrnty.CQRS.Sagas.RabbitMQ/Svrnty.CQRS.Sagas.RabbitMQ.csproj
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
|
<IsAotCompatible>false</IsAotCompatible>
|
||||||
|
<LangVersion>14</LangVersion>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
|
||||||
|
<Company>Svrnty</Company>
|
||||||
|
<Authors>David Lebee, Mathias Beaulieu-Duncan</Authors>
|
||||||
|
<PackageIcon>icon.png</PackageIcon>
|
||||||
|
<PackageReadmeFile>README.md</PackageReadmeFile>
|
||||||
|
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
|
||||||
|
<RepositoryType>git</RepositoryType>
|
||||||
|
<PublishRepositoryUrl>true</PublishRepositoryUrl>
|
||||||
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
|
|
||||||
|
<DebugType>portable</DebugType>
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<IncludeSymbols>true</IncludeSymbols>
|
||||||
|
<IncludeSource>true</IncludeSource>
|
||||||
|
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\Svrnty.CQRS.Sagas\Svrnty.CQRS.Sagas.csproj" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
||||||
54
Svrnty.CQRS.Sagas/Builders/LocalSagaStepBuilder.cs
Normal file
54
Svrnty.CQRS.Sagas/Builders/LocalSagaStepBuilder.cs
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Builders;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builder for configuring local saga steps.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
public class LocalSagaStepBuilder<TData> : ISagaStepBuilder<TData>
|
||||||
|
where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
private readonly SagaBuilder<TData> _parent;
|
||||||
|
private readonly LocalSagaStepDefinition<TData> _definition;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new local step builder.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="parent">The parent saga builder.</param>
|
||||||
|
/// <param name="name">The step name.</param>
|
||||||
|
/// <param name="order">The step order.</param>
|
||||||
|
public LocalSagaStepBuilder(SagaBuilder<TData> parent, string name, int order)
|
||||||
|
{
|
||||||
|
_parent = parent;
|
||||||
|
_definition = new LocalSagaStepDefinition<TData>
|
||||||
|
{
|
||||||
|
Name = name,
|
||||||
|
Order = order
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaStepBuilder<TData> Execute(Func<TData, ISagaContext, CancellationToken, Task> action)
|
||||||
|
{
|
||||||
|
_definition.ExecuteAction = action;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaStepBuilder<TData> Compensate(Func<TData, ISagaContext, CancellationToken, Task> action)
|
||||||
|
{
|
||||||
|
_definition.CompensateAction = action;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaBuilder<TData> Then()
|
||||||
|
{
|
||||||
|
_parent.AddStep(_definition);
|
||||||
|
return _parent;
|
||||||
|
}
|
||||||
|
}
|
||||||
158
Svrnty.CQRS.Sagas/Builders/RemoteSagaStepBuilder.cs
Normal file
158
Svrnty.CQRS.Sagas/Builders/RemoteSagaStepBuilder.cs
Normal file
@ -0,0 +1,158 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Builders;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builder for configuring remote saga steps (without result).
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <typeparam name="TCommand">The command type.</typeparam>
|
||||||
|
public class RemoteSagaStepBuilder<TData, TCommand> : ISagaRemoteStepBuilder<TData, TCommand>
|
||||||
|
where TData : class, ISagaData
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
private readonly SagaBuilder<TData> _parent;
|
||||||
|
private readonly RemoteSagaStepDefinition<TData, TCommand> _definition;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new remote step builder.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="parent">The parent saga builder.</param>
|
||||||
|
/// <param name="name">The step name.</param>
|
||||||
|
/// <param name="order">The step order.</param>
|
||||||
|
public RemoteSagaStepBuilder(SagaBuilder<TData> parent, string name, int order)
|
||||||
|
{
|
||||||
|
_parent = parent;
|
||||||
|
_definition = new RemoteSagaStepDefinition<TData, TCommand>
|
||||||
|
{
|
||||||
|
Name = name,
|
||||||
|
Order = order
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand> WithCommand(Func<TData, ISagaContext, TCommand> commandBuilder)
|
||||||
|
{
|
||||||
|
_definition.CommandBuilder = commandBuilder;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand> OnResponse(Func<TData, ISagaContext, CancellationToken, Task> handler)
|
||||||
|
{
|
||||||
|
_definition.ResponseHandler = handler;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand> Compensate<TCompensationCommand>(
|
||||||
|
Func<TData, ISagaContext, TCompensationCommand> compensationBuilder)
|
||||||
|
where TCompensationCommand : class
|
||||||
|
{
|
||||||
|
_definition.CompensationCommandType = typeof(TCompensationCommand);
|
||||||
|
_definition.CompensationBuilder = (data, ctx) => compensationBuilder(data, ctx);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand> WithTimeout(TimeSpan timeout)
|
||||||
|
{
|
||||||
|
_definition.Timeout = timeout;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand> WithRetry(int maxRetries, TimeSpan delay)
|
||||||
|
{
|
||||||
|
_definition.MaxRetries = maxRetries;
|
||||||
|
_definition.RetryDelay = delay;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaBuilder<TData> Then()
|
||||||
|
{
|
||||||
|
_parent.AddStep(_definition);
|
||||||
|
return _parent;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Builder for configuring remote saga steps with result.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <typeparam name="TCommand">The command type.</typeparam>
|
||||||
|
/// <typeparam name="TResult">The result type.</typeparam>
|
||||||
|
public class RemoteSagaStepBuilderWithResult<TData, TCommand, TResult> : ISagaRemoteStepBuilder<TData, TCommand, TResult>
|
||||||
|
where TData : class, ISagaData
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
private readonly SagaBuilder<TData> _parent;
|
||||||
|
private readonly RemoteSagaStepDefinition<TData, TCommand, TResult> _definition;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new remote step builder with result.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="parent">The parent saga builder.</param>
|
||||||
|
/// <param name="name">The step name.</param>
|
||||||
|
/// <param name="order">The step order.</param>
|
||||||
|
public RemoteSagaStepBuilderWithResult(SagaBuilder<TData> parent, string name, int order)
|
||||||
|
{
|
||||||
|
_parent = parent;
|
||||||
|
_definition = new RemoteSagaStepDefinition<TData, TCommand, TResult>
|
||||||
|
{
|
||||||
|
Name = name,
|
||||||
|
Order = order
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand, TResult> WithCommand(Func<TData, ISagaContext, TCommand> commandBuilder)
|
||||||
|
{
|
||||||
|
_definition.CommandBuilder = commandBuilder;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand, TResult> OnResponse(
|
||||||
|
Func<TData, ISagaContext, TResult, CancellationToken, Task> handler)
|
||||||
|
{
|
||||||
|
_definition.ResponseHandler = handler;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand, TResult> Compensate<TCompensationCommand>(
|
||||||
|
Func<TData, ISagaContext, TCompensationCommand> compensationBuilder)
|
||||||
|
where TCompensationCommand : class
|
||||||
|
{
|
||||||
|
_definition.CompensationCommandType = typeof(TCompensationCommand);
|
||||||
|
_definition.CompensationBuilder = (data, ctx) => compensationBuilder(data, ctx);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand, TResult> WithTimeout(TimeSpan timeout)
|
||||||
|
{
|
||||||
|
_definition.Timeout = timeout;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand, TResult> WithRetry(int maxRetries, TimeSpan delay)
|
||||||
|
{
|
||||||
|
_definition.MaxRetries = maxRetries;
|
||||||
|
_definition.RetryDelay = delay;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaBuilder<TData> Then()
|
||||||
|
{
|
||||||
|
_parent.AddStep(_definition);
|
||||||
|
return _parent;
|
||||||
|
}
|
||||||
|
}
|
||||||
49
Svrnty.CQRS.Sagas/Builders/SagaBuilder.cs
Normal file
49
Svrnty.CQRS.Sagas/Builders/SagaBuilder.cs
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Builders;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Implementation of the saga builder for defining saga steps.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
public class SagaBuilder<TData> : ISagaBuilder<TData>
|
||||||
|
where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
private readonly List<SagaStepDefinition> _steps = new();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gets the defined steps.
|
||||||
|
/// </summary>
|
||||||
|
public IReadOnlyList<SagaStepDefinition> Steps => _steps.AsReadOnly();
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaStepBuilder<TData> Step(string name)
|
||||||
|
{
|
||||||
|
return new LocalSagaStepBuilder<TData>(this, name, _steps.Count);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand> SendCommand<TCommand>(string name)
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
return new RemoteSagaStepBuilder<TData, TCommand>(this, name, _steps.Count);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public ISagaRemoteStepBuilder<TData, TCommand, TResult> SendCommand<TCommand, TResult>(string name)
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
return new RemoteSagaStepBuilderWithResult<TData, TCommand, TResult>(this, name, _steps.Count);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Adds a step definition to the builder.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="step">The step definition to add.</param>
|
||||||
|
internal void AddStep(SagaStepDefinition step)
|
||||||
|
{
|
||||||
|
_steps.Add(step);
|
||||||
|
}
|
||||||
|
}
|
||||||
149
Svrnty.CQRS.Sagas/Builders/SagaStepDefinition.cs
Normal file
149
Svrnty.CQRS.Sagas/Builders/SagaStepDefinition.cs
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Builders;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Base class for saga step definitions.
|
||||||
|
/// </summary>
|
||||||
|
public abstract class SagaStepDefinition
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Unique name for this step.
|
||||||
|
/// </summary>
|
||||||
|
public string Name { get; set; } = string.Empty;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Order of the step in the saga.
|
||||||
|
/// </summary>
|
||||||
|
public int Order { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether this step has a compensation action.
|
||||||
|
/// </summary>
|
||||||
|
public abstract bool HasCompensation { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether this step is a remote step (sends a command).
|
||||||
|
/// </summary>
|
||||||
|
public abstract bool IsRemote { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Timeout for this step.
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan? Timeout { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Maximum number of retries.
|
||||||
|
/// </summary>
|
||||||
|
public int MaxRetries { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Delay between retries.
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Definition for a local saga step.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
public class LocalSagaStepDefinition<TData> : SagaStepDefinition
|
||||||
|
where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// The execution action.
|
||||||
|
/// </summary>
|
||||||
|
public Func<TData, ISagaContext, CancellationToken, Task>? ExecuteAction { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The compensation action.
|
||||||
|
/// </summary>
|
||||||
|
public Func<TData, ISagaContext, CancellationToken, Task>? CompensateAction { get; set; }
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override bool HasCompensation => CompensateAction != null;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override bool IsRemote => false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Definition for a remote saga step.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <typeparam name="TCommand">The command type.</typeparam>
|
||||||
|
public class RemoteSagaStepDefinition<TData, TCommand> : SagaStepDefinition
|
||||||
|
where TData : class, ISagaData
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Function to build the command.
|
||||||
|
/// </summary>
|
||||||
|
public Func<TData, ISagaContext, TCommand>? CommandBuilder { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handler for successful response.
|
||||||
|
/// </summary>
|
||||||
|
public Func<TData, ISagaContext, CancellationToken, Task>? ResponseHandler { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Type of the compensation command.
|
||||||
|
/// </summary>
|
||||||
|
public Type? CompensationCommandType { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Function to build the compensation command.
|
||||||
|
/// </summary>
|
||||||
|
public Func<TData, ISagaContext, object>? CompensationBuilder { get; set; }
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override bool HasCompensation => CompensationBuilder != null;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override bool IsRemote => true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Definition for a remote saga step with result.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <typeparam name="TCommand">The command type.</typeparam>
|
||||||
|
/// <typeparam name="TResult">The result type.</typeparam>
|
||||||
|
public class RemoteSagaStepDefinition<TData, TCommand, TResult> : SagaStepDefinition
|
||||||
|
where TData : class, ISagaData
|
||||||
|
where TCommand : class
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Function to build the command.
|
||||||
|
/// </summary>
|
||||||
|
public Func<TData, ISagaContext, TCommand>? CommandBuilder { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handler for successful response with result.
|
||||||
|
/// </summary>
|
||||||
|
public Func<TData, ISagaContext, TResult, CancellationToken, Task>? ResponseHandler { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Type of the compensation command.
|
||||||
|
/// </summary>
|
||||||
|
public Type? CompensationCommandType { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Function to build the compensation command.
|
||||||
|
/// </summary>
|
||||||
|
public Func<TData, ISagaContext, object>? CompensationBuilder { get; set; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// The expected result type.
|
||||||
|
/// </summary>
|
||||||
|
public Type ResultType => typeof(TResult);
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override bool HasCompensation => CompensationBuilder != null;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public override bool IsRemote => true;
|
||||||
|
}
|
||||||
39
Svrnty.CQRS.Sagas/Configuration/SagaOptions.cs
Normal file
39
Svrnty.CQRS.Sagas/Configuration/SagaOptions.cs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Configuration;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Configuration options for saga orchestration.
|
||||||
|
/// </summary>
|
||||||
|
public class SagaOptions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Default timeout for saga steps (default: 30 seconds).
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan DefaultStepTimeout { get; set; } = TimeSpan.FromSeconds(30);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Default number of retries for failed steps (default: 3).
|
||||||
|
/// </summary>
|
||||||
|
public int DefaultMaxRetries { get; set; } = 3;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Default delay between retries (default: 1 second).
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan DefaultRetryDelay { get; set; } = TimeSpan.FromSeconds(1);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Whether to automatically compensate on failure (default: true).
|
||||||
|
/// </summary>
|
||||||
|
public bool AutoCompensateOnFailure { get; set; } = true;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Interval for checking pending/stalled sagas (default: 1 minute).
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan StalledSagaCheckInterval { get; set; } = TimeSpan.FromMinutes(1);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Time after which a saga step is considered stalled (default: 5 minutes).
|
||||||
|
/// </summary>
|
||||||
|
public TimeSpan StepStalledTimeout { get; set; } = TimeSpan.FromMinutes(5);
|
||||||
|
}
|
||||||
82
Svrnty.CQRS.Sagas/CqrsBuilderExtensions.cs
Normal file
82
Svrnty.CQRS.Sagas/CqrsBuilderExtensions.cs
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
using System;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.DependencyInjection.Extensions;
|
||||||
|
using Svrnty.CQRS.Configuration;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions.Persistence;
|
||||||
|
using Svrnty.CQRS.Sagas.Configuration;
|
||||||
|
using Svrnty.CQRS.Sagas.Persistence;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Extensions for adding saga support to the CQRS pipeline.
|
||||||
|
/// </summary>
|
||||||
|
public static class CqrsBuilderExtensions
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Adds saga orchestration support to the CQRS pipeline.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="builder">The CQRS builder.</param>
|
||||||
|
/// <param name="configure">Optional configuration action.</param>
|
||||||
|
/// <returns>The CQRS builder for chaining.</returns>
|
||||||
|
public static CqrsBuilder AddSagas(this CqrsBuilder builder, Action<SagaOptions>? configure = null)
|
||||||
|
{
|
||||||
|
var options = new SagaOptions();
|
||||||
|
configure?.Invoke(options);
|
||||||
|
|
||||||
|
builder.Services.Configure<SagaOptions>(opt =>
|
||||||
|
{
|
||||||
|
opt.DefaultStepTimeout = options.DefaultStepTimeout;
|
||||||
|
opt.DefaultMaxRetries = options.DefaultMaxRetries;
|
||||||
|
opt.DefaultRetryDelay = options.DefaultRetryDelay;
|
||||||
|
opt.AutoCompensateOnFailure = options.AutoCompensateOnFailure;
|
||||||
|
opt.StalledSagaCheckInterval = options.StalledSagaCheckInterval;
|
||||||
|
opt.StepStalledTimeout = options.StepStalledTimeout;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Store configuration
|
||||||
|
builder.Configuration.SetConfiguration(options);
|
||||||
|
|
||||||
|
// Register core saga services
|
||||||
|
builder.Services.TryAddSingleton<ISagaOrchestrator, SagaOrchestrator>();
|
||||||
|
|
||||||
|
// Register default in-memory state store if not already registered
|
||||||
|
builder.Services.TryAddSingleton<ISagaStateStore, InMemorySagaStateStore>();
|
||||||
|
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Registers a saga type with the CQRS pipeline.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TSaga">The saga type.</typeparam>
|
||||||
|
/// <typeparam name="TData">The saga data type.</typeparam>
|
||||||
|
/// <param name="builder">The CQRS builder.</param>
|
||||||
|
/// <returns>The CQRS builder for chaining.</returns>
|
||||||
|
public static CqrsBuilder AddSaga<TSaga, TData>(this CqrsBuilder builder)
|
||||||
|
where TSaga : class, ISaga<TData>
|
||||||
|
where TData : class, ISagaData, new()
|
||||||
|
{
|
||||||
|
builder.Services.AddTransient<TSaga>();
|
||||||
|
builder.Services.AddTransient<ISaga<TData>, TSaga>();
|
||||||
|
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Uses a custom saga state store implementation.
|
||||||
|
/// </summary>
|
||||||
|
/// <typeparam name="TStore">The state store implementation type.</typeparam>
|
||||||
|
/// <param name="builder">The CQRS builder.</param>
|
||||||
|
/// <returns>The CQRS builder for chaining.</returns>
|
||||||
|
public static CqrsBuilder UseSagaStateStore<TStore>(this CqrsBuilder builder)
|
||||||
|
where TStore : class, ISagaStateStore
|
||||||
|
{
|
||||||
|
// Remove existing registration
|
||||||
|
var descriptor = new ServiceDescriptor(typeof(ISagaStateStore), typeof(TStore), ServiceLifetime.Singleton);
|
||||||
|
builder.Services.Replace(descriptor);
|
||||||
|
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
||||||
68
Svrnty.CQRS.Sagas/Persistence/InMemorySagaStateStore.cs
Normal file
68
Svrnty.CQRS.Sagas/Persistence/InMemorySagaStateStore.cs
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions.Persistence;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas.Persistence;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// In-memory saga state store for development and testing.
|
||||||
|
/// </summary>
|
||||||
|
public class InMemorySagaStateStore : ISagaStateStore
|
||||||
|
{
|
||||||
|
private readonly ConcurrentDictionary<Guid, SagaState> _states = new();
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<SagaState> CreateAsync(SagaState state, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (!_states.TryAdd(state.SagaId, state))
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException($"Saga with ID {state.SagaId} already exists.");
|
||||||
|
}
|
||||||
|
return Task.FromResult(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<SagaState?> GetByIdAsync(Guid sagaId, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
_states.TryGetValue(sagaId, out var state);
|
||||||
|
return Task.FromResult(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<SagaState?> GetByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var state = _states.Values.FirstOrDefault(s => s.CorrelationId == correlationId);
|
||||||
|
return Task.FromResult(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<SagaState> UpdateAsync(SagaState state, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
state.UpdatedAt = DateTimeOffset.UtcNow;
|
||||||
|
_states[state.SagaId] = state;
|
||||||
|
return Task.FromResult(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<IReadOnlyList<SagaState>> GetPendingSagasAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var pending = _states.Values
|
||||||
|
.Where(s => s.Status == SagaStatus.InProgress || s.Status == SagaStatus.Compensating)
|
||||||
|
.ToList();
|
||||||
|
return Task.FromResult<IReadOnlyList<SagaState>>(pending);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<IReadOnlyList<SagaState>> GetSagasByStatusAsync(SagaStatus status, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
var sagas = _states.Values
|
||||||
|
.Where(s => s.Status == status)
|
||||||
|
.ToList();
|
||||||
|
return Task.FromResult<IReadOnlyList<SagaState>>(sagas);
|
||||||
|
}
|
||||||
|
}
|
||||||
56
Svrnty.CQRS.Sagas/SagaContext.cs
Normal file
56
Svrnty.CQRS.Sagas/SagaContext.cs
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Implementation of saga context providing runtime information during step execution.
|
||||||
|
/// </summary>
|
||||||
|
public class SagaContext : ISagaContext
|
||||||
|
{
|
||||||
|
private readonly SagaState _state;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new saga context from a saga state.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="state">The saga state.</param>
|
||||||
|
public SagaContext(SagaState state)
|
||||||
|
{
|
||||||
|
_state = state ?? throw new ArgumentNullException(nameof(state));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Guid SagaId => _state.SagaId;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Guid CorrelationId => _state.CorrelationId;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public string SagaType => _state.SagaType;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public int CurrentStepIndex => _state.CurrentStepIndex;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public string CurrentStepName => _state.CurrentStepName ?? string.Empty;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public IReadOnlyDictionary<string, object?> StepResults => _state.StepResults;
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public T? GetStepResult<T>(string stepName)
|
||||||
|
{
|
||||||
|
if (_state.StepResults.TryGetValue(stepName, out var value) && value is T result)
|
||||||
|
{
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
return default;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public void SetStepResult<T>(T result)
|
||||||
|
{
|
||||||
|
_state.StepResults[CurrentStepName] = result;
|
||||||
|
}
|
||||||
|
}
|
||||||
429
Svrnty.CQRS.Sagas/SagaOrchestrator.cs
Normal file
429
Svrnty.CQRS.Sagas/SagaOrchestrator.cs
Normal file
@ -0,0 +1,429 @@
|
|||||||
|
using System;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Text.Json;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions.Messaging;
|
||||||
|
using Svrnty.CQRS.Sagas.Abstractions.Persistence;
|
||||||
|
using Svrnty.CQRS.Sagas.Builders;
|
||||||
|
using Svrnty.CQRS.Sagas.Configuration;
|
||||||
|
|
||||||
|
namespace Svrnty.CQRS.Sagas;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Implementation of saga orchestration.
|
||||||
|
/// </summary>
|
||||||
|
public class SagaOrchestrator : ISagaOrchestrator
|
||||||
|
{
|
||||||
|
private readonly IServiceProvider _serviceProvider;
|
||||||
|
private readonly ISagaStateStore _stateStore;
|
||||||
|
private readonly ISagaMessageBus? _messageBus;
|
||||||
|
private readonly ILogger<SagaOrchestrator> _logger;
|
||||||
|
private readonly SagaOptions _options;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a new saga orchestrator.
|
||||||
|
/// </summary>
|
||||||
|
public SagaOrchestrator(
|
||||||
|
IServiceProvider serviceProvider,
|
||||||
|
ISagaStateStore stateStore,
|
||||||
|
IOptions<SagaOptions> options,
|
||||||
|
ILogger<SagaOrchestrator> logger,
|
||||||
|
ISagaMessageBus? messageBus = null)
|
||||||
|
{
|
||||||
|
_serviceProvider = serviceProvider;
|
||||||
|
_stateStore = stateStore;
|
||||||
|
_messageBus = messageBus;
|
||||||
|
_logger = logger;
|
||||||
|
_options = options.Value;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<SagaState> StartAsync<TSaga, TData>(TData initialData, CancellationToken cancellationToken = default)
|
||||||
|
where TSaga : ISaga<TData>
|
||||||
|
where TData : class, ISagaData, new()
|
||||||
|
{
|
||||||
|
return StartAsync<TSaga, TData>(initialData, Guid.NewGuid(), cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public async Task<SagaState> StartAsync<TSaga, TData>(
|
||||||
|
TData initialData,
|
||||||
|
Guid correlationId,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
where TSaga : ISaga<TData>
|
||||||
|
where TData : class, ISagaData, new()
|
||||||
|
{
|
||||||
|
initialData.CorrelationId = correlationId;
|
||||||
|
|
||||||
|
// Get the saga instance and configure it
|
||||||
|
var saga = _serviceProvider.GetRequiredService<TSaga>();
|
||||||
|
var builder = new SagaBuilder<TData>();
|
||||||
|
saga.Configure(builder);
|
||||||
|
|
||||||
|
var steps = builder.Steps;
|
||||||
|
if (steps.Count == 0)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException($"Saga {typeof(TSaga).Name} has no steps configured.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create initial state
|
||||||
|
var state = new SagaState
|
||||||
|
{
|
||||||
|
SagaType = typeof(TSaga).FullName!,
|
||||||
|
CorrelationId = correlationId,
|
||||||
|
Status = SagaStatus.InProgress,
|
||||||
|
CurrentStepIndex = 0,
|
||||||
|
CurrentStepName = steps[0].Name,
|
||||||
|
SerializedData = JsonSerializer.Serialize(initialData)
|
||||||
|
};
|
||||||
|
|
||||||
|
state = await _stateStore.CreateAsync(state, cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Started saga {SagaType} with ID {SagaId} and CorrelationId {CorrelationId}",
|
||||||
|
state.SagaType, state.SagaId, state.CorrelationId);
|
||||||
|
|
||||||
|
// Execute the first step
|
||||||
|
await ExecuteNextStepAsync<TData>(state, steps, initialData, cancellationToken);
|
||||||
|
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<SagaState?> GetStateAsync(Guid sagaId, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return _stateStore.GetByIdAsync(sagaId, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
public Task<SagaState?> GetStateByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return _stateStore.GetByCorrelationIdAsync(correlationId, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Handles a response from a remote step.
|
||||||
|
/// </summary>
|
||||||
|
public async Task HandleResponseAsync<TData>(
|
||||||
|
SagaStepResponse response,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
where TData : class, ISagaData, new()
|
||||||
|
{
|
||||||
|
var state = await _stateStore.GetByIdAsync(response.SagaId, cancellationToken);
|
||||||
|
if (state == null)
|
||||||
|
{
|
||||||
|
_logger.LogWarning("Received response for unknown saga {SagaId}", response.SagaId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var data = JsonSerializer.Deserialize<TData>(state.SerializedData!);
|
||||||
|
if (data == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("Failed to deserialize saga data for {SagaId}", response.SagaId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the saga definition
|
||||||
|
var sagaType = Type.GetType(state.SagaType);
|
||||||
|
if (sagaType == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("Unknown saga type {SagaType}", state.SagaType);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var saga = _serviceProvider.GetService(sagaType) as ISaga<TData>;
|
||||||
|
if (saga == null)
|
||||||
|
{
|
||||||
|
_logger.LogError("Could not resolve saga {SagaType}", state.SagaType);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var builder = new SagaBuilder<TData>();
|
||||||
|
saga.Configure(builder);
|
||||||
|
var steps = builder.Steps;
|
||||||
|
|
||||||
|
if (response.Success)
|
||||||
|
{
|
||||||
|
_logger.LogInformation(
|
||||||
|
"Step {StepName} completed successfully for saga {SagaId}",
|
||||||
|
response.StepName, response.SagaId);
|
||||||
|
|
||||||
|
state.CompletedSteps.Add(response.StepName);
|
||||||
|
state.CurrentStepIndex++;
|
||||||
|
|
||||||
|
if (state.CurrentStepIndex >= steps.Count)
|
||||||
|
{
|
||||||
|
// Saga completed
|
||||||
|
state.Status = SagaStatus.Completed;
|
||||||
|
state.CompletedAt = DateTimeOffset.UtcNow;
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation("Saga {SagaId} completed successfully", state.SagaId);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Move to next step
|
||||||
|
state.CurrentStepName = steps[state.CurrentStepIndex].Name;
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
await ExecuteNextStepAsync(state, steps, data, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_logger.LogError(
|
||||||
|
"Step {StepName} failed for saga {SagaId}: {Error}",
|
||||||
|
response.StepName, response.SagaId, response.ErrorMessage);
|
||||||
|
|
||||||
|
state.Errors.Add(new SagaStepError(
|
||||||
|
response.StepName,
|
||||||
|
response.ErrorMessage ?? "Unknown error",
|
||||||
|
response.StackTrace,
|
||||||
|
DateTimeOffset.UtcNow));
|
||||||
|
|
||||||
|
if (_options.AutoCompensateOnFailure)
|
||||||
|
{
|
||||||
|
await StartCompensationAsync(state, steps, data, cancellationToken);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state.Status = SagaStatus.Failed;
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ExecuteNextStepAsync<TData>(
|
||||||
|
SagaState state,
|
||||||
|
System.Collections.Generic.IReadOnlyList<SagaStepDefinition> steps,
|
||||||
|
TData data,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
if (state.CurrentStepIndex >= steps.Count)
|
||||||
|
{
|
||||||
|
state.Status = SagaStatus.Completed;
|
||||||
|
state.CompletedAt = DateTimeOffset.UtcNow;
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var step = steps[state.CurrentStepIndex];
|
||||||
|
var context = new SagaContext(state);
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Executing step {StepName} ({StepIndex}/{TotalSteps}) for saga {SagaId}",
|
||||||
|
step.Name, state.CurrentStepIndex + 1, steps.Count, state.SagaId);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (step.IsRemote)
|
||||||
|
{
|
||||||
|
await ExecuteRemoteStepAsync(state, step, data, context, cancellationToken);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
await ExecuteLocalStepAsync(state, step, data, context, steps, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error executing step {StepName} for saga {SagaId}", step.Name, state.SagaId);
|
||||||
|
|
||||||
|
state.Errors.Add(new SagaStepError(
|
||||||
|
step.Name,
|
||||||
|
ex.Message,
|
||||||
|
ex.StackTrace,
|
||||||
|
DateTimeOffset.UtcNow));
|
||||||
|
|
||||||
|
if (_options.AutoCompensateOnFailure)
|
||||||
|
{
|
||||||
|
await StartCompensationAsync(state, steps, data, cancellationToken);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
state.Status = SagaStatus.Failed;
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ExecuteLocalStepAsync<TData>(
|
||||||
|
SagaState state,
|
||||||
|
SagaStepDefinition step,
|
||||||
|
TData data,
|
||||||
|
SagaContext context,
|
||||||
|
System.Collections.Generic.IReadOnlyList<SagaStepDefinition> steps,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
if (step is LocalSagaStepDefinition<TData> localStep && localStep.ExecuteAction != null)
|
||||||
|
{
|
||||||
|
await localStep.ExecuteAction(data, context, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Local step completed, update state and continue
|
||||||
|
state.CompletedSteps.Add(step.Name);
|
||||||
|
state.SerializedData = JsonSerializer.Serialize(data);
|
||||||
|
state.CurrentStepIndex++;
|
||||||
|
|
||||||
|
if (state.CurrentStepIndex < steps.Count)
|
||||||
|
{
|
||||||
|
state.CurrentStepName = steps[state.CurrentStepIndex].Name;
|
||||||
|
}
|
||||||
|
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
|
||||||
|
// Continue to next step
|
||||||
|
await ExecuteNextStepAsync(state, steps, data, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ExecuteRemoteStepAsync<TData>(
|
||||||
|
SagaState state,
|
||||||
|
SagaStepDefinition step,
|
||||||
|
TData data,
|
||||||
|
SagaContext context,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
if (_messageBus == null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
"Remote saga steps require a message bus. Configure RabbitMQ or another transport.");
|
||||||
|
}
|
||||||
|
|
||||||
|
object? command = null;
|
||||||
|
string commandType;
|
||||||
|
|
||||||
|
// Get the command from the step definition
|
||||||
|
var stepType = step.GetType();
|
||||||
|
var commandBuilderProp = stepType.GetProperty("CommandBuilder");
|
||||||
|
if (commandBuilderProp?.GetValue(step) is Delegate commandBuilder)
|
||||||
|
{
|
||||||
|
command = commandBuilder.DynamicInvoke(data, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (command == null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException($"Step {step.Name} did not produce a command.");
|
||||||
|
}
|
||||||
|
|
||||||
|
commandType = command.GetType().FullName!;
|
||||||
|
|
||||||
|
var message = new SagaMessage
|
||||||
|
{
|
||||||
|
SagaId = state.SagaId,
|
||||||
|
CorrelationId = state.CorrelationId,
|
||||||
|
StepName = step.Name,
|
||||||
|
CommandType = commandType,
|
||||||
|
Payload = JsonSerializer.Serialize(command, command.GetType())
|
||||||
|
};
|
||||||
|
|
||||||
|
await _messageBus.PublishAsync(message, cancellationToken);
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Published command {CommandType} for step {StepName} of saga {SagaId}",
|
||||||
|
commandType, step.Name, state.SagaId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task StartCompensationAsync<TData>(
|
||||||
|
SagaState state,
|
||||||
|
System.Collections.Generic.IReadOnlyList<SagaStepDefinition> steps,
|
||||||
|
TData data,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
_logger.LogInformation("Starting compensation for saga {SagaId}", state.SagaId);
|
||||||
|
|
||||||
|
state.Status = SagaStatus.Compensating;
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
|
||||||
|
// Execute compensation in reverse order
|
||||||
|
var context = new SagaContext(state);
|
||||||
|
var completedSteps = state.CompletedSteps.ToList();
|
||||||
|
|
||||||
|
for (var i = completedSteps.Count - 1; i >= 0; i--)
|
||||||
|
{
|
||||||
|
var stepName = completedSteps[i];
|
||||||
|
var step = steps.FirstOrDefault(s => s.Name == stepName);
|
||||||
|
|
||||||
|
if (step == null || !step.HasCompensation)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogDebug("Compensating step {StepName} for saga {SagaId}", stepName, state.SagaId);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (step.IsRemote)
|
||||||
|
{
|
||||||
|
await ExecuteRemoteCompensationAsync(state, step, data, context, cancellationToken);
|
||||||
|
}
|
||||||
|
else if (step is LocalSagaStepDefinition<TData> localStep && localStep.CompensateAction != null)
|
||||||
|
{
|
||||||
|
await localStep.CompensateAction(data, context, cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error during compensation of step {StepName} for saga {SagaId}",
|
||||||
|
stepName, state.SagaId);
|
||||||
|
// Continue with other compensations even if one fails
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
state.Status = SagaStatus.Compensated;
|
||||||
|
state.CompletedAt = DateTimeOffset.UtcNow;
|
||||||
|
await _stateStore.UpdateAsync(state, cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogInformation("Saga {SagaId} compensation completed", state.SagaId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ExecuteRemoteCompensationAsync<TData>(
|
||||||
|
SagaState state,
|
||||||
|
SagaStepDefinition step,
|
||||||
|
TData data,
|
||||||
|
SagaContext context,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
where TData : class, ISagaData
|
||||||
|
{
|
||||||
|
if (_messageBus == null)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var stepType = step.GetType();
|
||||||
|
var compensationBuilderProp = stepType.GetProperty("CompensationBuilder");
|
||||||
|
var compensationTypeProp = stepType.GetProperty("CompensationCommandType");
|
||||||
|
|
||||||
|
if (compensationBuilderProp?.GetValue(step) is Delegate compensationBuilder &&
|
||||||
|
compensationTypeProp?.GetValue(step) is Type compensationType)
|
||||||
|
{
|
||||||
|
var compensationCommand = compensationBuilder.DynamicInvoke(data, context);
|
||||||
|
if (compensationCommand != null)
|
||||||
|
{
|
||||||
|
var message = new SagaMessage
|
||||||
|
{
|
||||||
|
SagaId = state.SagaId,
|
||||||
|
CorrelationId = state.CorrelationId,
|
||||||
|
StepName = step.Name,
|
||||||
|
CommandType = compensationType.FullName!,
|
||||||
|
Payload = JsonSerializer.Serialize(compensationCommand, compensationType),
|
||||||
|
IsCompensation = true
|
||||||
|
};
|
||||||
|
|
||||||
|
await _messageBus.PublishAsync(message, cancellationToken);
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"Published compensation command {CommandType} for step {StepName} of saga {SagaId}",
|
||||||
|
compensationType.Name, step.Name, state.SagaId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
38
Svrnty.CQRS.Sagas/Svrnty.CQRS.Sagas.csproj
Normal file
38
Svrnty.CQRS.Sagas/Svrnty.CQRS.Sagas.csproj
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
|
<PropertyGroup>
|
||||||
|
<TargetFramework>net10.0</TargetFramework>
|
||||||
|
<IsAotCompatible>true</IsAotCompatible>
|
||||||
|
<LangVersion>14</LangVersion>
|
||||||
|
<Nullable>enable</Nullable>
|
||||||
|
|
||||||
|
<Company>Svrnty</Company>
|
||||||
|
<Authors>David Lebee, Mathias Beaulieu-Duncan</Authors>
|
||||||
|
<PackageIcon>icon.png</PackageIcon>
|
||||||
|
<PackageReadmeFile>README.md</PackageReadmeFile>
|
||||||
|
<RepositoryUrl>https://git.openharbor.io/svrnty/dotnet-cqrs</RepositoryUrl>
|
||||||
|
<RepositoryType>git</RepositoryType>
|
||||||
|
<PublishRepositoryUrl>true</PublishRepositoryUrl>
|
||||||
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
|
|
||||||
|
<DebugType>portable</DebugType>
|
||||||
|
<DebugSymbols>true</DebugSymbols>
|
||||||
|
<IncludeSymbols>true</IncludeSymbols>
|
||||||
|
<IncludeSource>true</IncludeSource>
|
||||||
|
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
|
||||||
|
</PropertyGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<None Include="..\icon.png" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
<None Include="..\README.md" Pack="true" PackagePath="" CopyToOutputDirectory="Always" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<ProjectReference Include="..\Svrnty.CQRS\Svrnty.CQRS.csproj" />
|
||||||
|
<ProjectReference Include="..\Svrnty.CQRS.Sagas.Abstractions\Svrnty.CQRS.Sagas.Abstractions.csproj" />
|
||||||
|
</ItemGroup>
|
||||||
|
|
||||||
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.0" />
|
||||||
|
<PackageReference Include="Microsoft.Extensions.Options" Version="10.0.0" />
|
||||||
|
</ItemGroup>
|
||||||
|
</Project>
|
||||||
@ -31,6 +31,18 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.Sample", "Svrnty.Sam
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.DynamicQuery.MinimalApi", "Svrnty.CQRS.DynamicQuery.MinimalApi\Svrnty.CQRS.DynamicQuery.MinimalApi.csproj", "{1D0E3388-5E4B-4C0E-B826-ACF256FF7C84}"
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.DynamicQuery.MinimalApi", "Svrnty.CQRS.DynamicQuery.MinimalApi\Svrnty.CQRS.DynamicQuery.MinimalApi.csproj", "{1D0E3388-5E4B-4C0E-B826-ACF256FF7C84}"
|
||||||
EndProject
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Sagas.Abstractions", "Svrnty.CQRS.Sagas.Abstractions\Svrnty.CQRS.Sagas.Abstractions.csproj", "{13B6608A-596B-495B-9C08-F9B3F0D1915A}"
|
||||||
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Sagas", "Svrnty.CQRS.Sagas\Svrnty.CQRS.Sagas.csproj", "{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}"
|
||||||
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Sagas.RabbitMQ", "Svrnty.CQRS.Sagas.RabbitMQ\Svrnty.CQRS.Sagas.RabbitMQ.csproj", "{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}"
|
||||||
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.DynamicQuery.EntityFramework", "Svrnty.CQRS.DynamicQuery.EntityFramework\Svrnty.CQRS.DynamicQuery.EntityFramework.csproj", "{25456A0B-69AF-4251-B34D-2A3873CD8D80}"
|
||||||
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.Abstractions", "Svrnty.CQRS.Events.Abstractions\Svrnty.CQRS.Events.Abstractions.csproj", "{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}"
|
||||||
|
EndProject
|
||||||
|
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Svrnty.CQRS.Events.RabbitMQ", "Svrnty.CQRS.Events.RabbitMQ\Svrnty.CQRS.Events.RabbitMQ.csproj", "{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}"
|
||||||
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
@ -173,6 +185,78 @@ Global
|
|||||||
{1D0E3388-5E4B-4C0E-B826-ACF256FF7C84}.Release|x64.Build.0 = Release|Any CPU
|
{1D0E3388-5E4B-4C0E-B826-ACF256FF7C84}.Release|x64.Build.0 = Release|Any CPU
|
||||||
{1D0E3388-5E4B-4C0E-B826-ACF256FF7C84}.Release|x86.ActiveCfg = Release|Any CPU
|
{1D0E3388-5E4B-4C0E-B826-ACF256FF7C84}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
{1D0E3388-5E4B-4C0E-B826-ACF256FF7C84}.Release|x86.Build.0 = Release|Any CPU
|
{1D0E3388-5E4B-4C0E-B826-ACF256FF7C84}.Release|x86.Build.0 = Release|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Debug|x64.Build.0 = Debug|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Debug|x86.Build.0 = Debug|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Release|x64.ActiveCfg = Release|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Release|x64.Build.0 = Release|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
|
{13B6608A-596B-495B-9C08-F9B3F0D1915A}.Release|x86.Build.0 = Release|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Debug|x64.Build.0 = Debug|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Debug|x86.Build.0 = Debug|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Release|x64.ActiveCfg = Release|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Release|x64.Build.0 = Release|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
|
{8EC9D12F-C8CD-4187-A1ED-47365D1C6B61}.Release|x86.Build.0 = Release|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Debug|x64.Build.0 = Debug|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Debug|x86.Build.0 = Debug|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Release|x64.ActiveCfg = Release|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Release|x64.Build.0 = Release|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
|
{2EA39D64-B4A8-4A74-A2E6-D8A8E8312B68}.Release|x86.Build.0 = Release|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Debug|x64.Build.0 = Debug|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Debug|x86.Build.0 = Debug|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Release|x64.ActiveCfg = Release|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Release|x64.Build.0 = Release|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
|
{25456A0B-69AF-4251-B34D-2A3873CD8D80}.Release|x86.Build.0 = Release|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Debug|x64.Build.0 = Debug|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Debug|x86.Build.0 = Debug|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Release|x64.ActiveCfg = Release|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Release|x64.Build.0 = Release|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
|
{7905A4BB-2462-4FFF-9A29-3E4769D20FFC}.Release|x86.Build.0 = Release|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Debug|x64.Build.0 = Debug|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Debug|x86.Build.0 = Debug|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x64.ActiveCfg = Release|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x64.Build.0 = Release|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
|
{3C7412EF-13C2-41F3-9D4C-D2BEC4843C8C}.Release|x86.Build.0 = Release|Any CPU
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user