using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Svrnty.CQRS.Sagas.Abstractions;
using Svrnty.CQRS.Sagas.Abstractions.Persistence;

namespace Svrnty.CQRS.Sagas.Persistence;

/// <summary>
/// In-memory saga state store for development and testing.
/// </summary>
/// <remarks>
/// States are kept in a <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by saga ID.
/// The store keeps the very instance it is given (no copy is made), so later mutations
/// of that object by the caller are visible through the store. None of the methods
/// observe the supplied <see cref="CancellationToken"/>; every operation completes
/// synchronously and returns an already-completed task.
/// </remarks>
public class InMemorySagaStateStore : ISagaStateStore
{
    // Saga ID -> state. The only storage this class writes to.
    private readonly ConcurrentDictionary<Guid, SagaState> _states = new();

    /// <inheritdoc />
    /// <remarks>
    /// Fails instead of overwriting when a saga with the same ID is already stored.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is <c>null</c>.</exception>
    /// <exception cref="InvalidOperationException">A saga with the same ID already exists.</exception>
    public Task<SagaState> CreateAsync(SagaState state, CancellationToken cancellationToken = default)
    {
        // Reject null up front so the caller gets a named-argument error rather than
        // a NullReferenceException from the property access below.
        ArgumentNullException.ThrowIfNull(state);

        if (!_states.TryAdd(state.SagaId, state))
        {
            throw new InvalidOperationException($"Saga with ID {state.SagaId} already exists.");
        }

        return Task.FromResult(state);
    }

    /// <inheritdoc />
    /// <remarks>
    /// The task result is <c>null</c> when no saga with <paramref name="sagaId"/> is stored.
    /// </remarks>
    public Task<SagaState?> GetByIdAsync(Guid sagaId, CancellationToken cancellationToken = default)
    {
        // On a miss TryGetValue leaves the out value at its default (null), which is returned as-is.
        _states.TryGetValue(sagaId, out var state);
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Performs a linear scan over all stored states and returns the first match,
    /// or <c>null</c> when none has the given correlation ID.
    /// </remarks>
    public Task<SagaState?> GetByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken = default)
    {
        var state = _states.Values.FirstOrDefault(s => s.CorrelationId == correlationId);
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Stamps <c>UpdatedAt</c> on the passed instance with the current UTC time and then
    /// stores it under its saga ID. The write is an upsert: a state whose ID is not yet
    /// present is added rather than rejected.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is <c>null</c>.</exception>
    public Task<SagaState> UpdateAsync(SagaState state, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(state);

        state.UpdatedAt = DateTimeOffset.UtcNow;
        _states[state.SagaId] = state;
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    /// <remarks>
    /// "Pending" here means a status of <see cref="SagaStatus.InProgress"/> or
    /// <see cref="SagaStatus.Compensating"/>. The returned list is a fresh copy.
    /// </remarks>
    public Task<IReadOnlyList<SagaState>> GetPendingSagasAsync(CancellationToken cancellationToken = default)
    {
        var pending = _states.Values
            .Where(s => s.Status == SagaStatus.InProgress || s.Status == SagaStatus.Compensating)
            .ToList();

        return Task.FromResult<IReadOnlyList<SagaState>>(pending);
    }

    /// <inheritdoc />
    /// <remarks>
    /// Returns a fresh list of every stored state whose status equals <paramref name="status"/>;
    /// the list is empty when there is no match.
    /// </remarks>
    public Task<IReadOnlyList<SagaState>> GetSagasByStatusAsync(SagaStatus status, CancellationToken cancellationToken = default)
    {
        var sagas = _states.Values
            .Where(s => s.Status == status)
            .ToList();

        return Task.FromResult<IReadOnlyList<SagaState>>(sagas);
    }
}