69 lines
2.3 KiB
C#
69 lines
2.3 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Svrnty.CQRS.Sagas.Abstractions;
|
|
using Svrnty.CQRS.Sagas.Abstractions.Persistence;
|
|
|
|
namespace Svrnty.CQRS.Sagas.Persistence;
|
|
|
|
/// <summary>
/// In-memory <see cref="ISagaStateStore"/> for development and testing, backed by a
/// <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by saga ID.
/// </summary>
/// <remarks>
/// The store keeps the exact <see cref="SagaState"/> instances it is given (no copies are made),
/// so later mutations of a stored instance are visible to subsequent reads. Every operation
/// completes synchronously and the <see cref="CancellationToken"/> arguments are not observed.
/// </remarks>
public class InMemorySagaStateStore : ISagaStateStore
{
    private readonly ConcurrentDictionary<Guid, SagaState> _states = new();

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is <see langword="null"/>.</exception>
    /// <exception cref="InvalidOperationException">
    /// A saga with the same ID is already stored. Thrown synchronously, before a task is returned.
    /// </exception>
    public Task<SagaState> CreateAsync(SagaState state, CancellationToken cancellationToken = default)
    {
        // Fail fast with a descriptive argument error instead of a NullReferenceException below.
        ArgumentNullException.ThrowIfNull(state);

        // TryAdd is atomic: of several concurrent creators using the same ID, only one succeeds.
        if (!_states.TryAdd(state.SagaId, state))
        {
            throw new InvalidOperationException($"Saga with ID {state.SagaId} already exists.");
        }

        return Task.FromResult(state);
    }

    /// <inheritdoc />
    public Task<SagaState?> GetByIdAsync(Guid sagaId, CancellationToken cancellationToken = default)
    {
        // On a miss TryGetValue leaves 'state' as null, which is returned as the "not found" result.
        _states.TryGetValue(sagaId, out var state);
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    public Task<SagaState?> GetByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken = default)
    {
        // Linear scan over the stored values; yields the first match encountered, or null if none.
        var state = _states.Values.FirstOrDefault(s => s.CorrelationId == correlationId);
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Stamps <paramref name="state"/> with the current UTC time and stores it under its saga ID.
    /// An existing entry is replaced; if no entry exists yet, one is added.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is <see langword="null"/>.</exception>
    public Task<SagaState> UpdateAsync(SagaState state, CancellationToken cancellationToken = default)
    {
        // Validate before touching the instance so a null argument never reaches the property setter.
        ArgumentNullException.ThrowIfNull(state);

        state.UpdatedAt = DateTimeOffset.UtcNow;
        _states[state.SagaId] = state;
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    public Task<IReadOnlyList<SagaState>> GetPendingSagasAsync(CancellationToken cancellationToken = default)
    {
        // "Pending" here means in progress or compensating.
        var pending = _states.Values
            .Where(s => s.Status == SagaStatus.InProgress || s.Status == SagaStatus.Compensating)
            .ToList();
        return Task.FromResult<IReadOnlyList<SagaState>>(pending);
    }

    /// <inheritdoc />
    public Task<IReadOnlyList<SagaState>> GetSagasByStatusAsync(SagaStatus status, CancellationToken cancellationToken = default)
    {
        // Materialize the matches into a list so the result is independent of later store changes.
        var sagas = _states.Values
            .Where(s => s.Status == status)
            .ToList();
        return Task.FromResult<IReadOnlyList<SagaState>>(sagas);
    }
}
|