using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Svrnty.CQRS.Sagas.Abstractions;
using Svrnty.CQRS.Sagas.Abstractions.Persistence;
namespace Svrnty.CQRS.Sagas.Persistence;
/// <summary>
/// In-memory saga state store for development and testing.
/// </summary>
/// <remarks>
/// All state lives in a <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by saga ID that is
/// owned by this instance; nothing is written anywhere else. Every operation completes
/// synchronously, so the <see cref="CancellationToken"/> parameters are accepted for interface
/// compatibility but are not observed.
/// </remarks>
public class InMemorySagaStateStore : ISagaStateStore
{
    // NOTE(review): the generic type arguments below (Guid key, SagaState value, and the
    // Task<...> return types) were reconstructed from how the members are used; confirm them
    // against the ISagaStateStore declaration.
    private readonly ConcurrentDictionary<Guid, SagaState> _states = new();

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is <see langword="null"/>.</exception>
    /// <exception cref="InvalidOperationException">A saga with the same ID is already stored.</exception>
    public Task<SagaState> CreateAsync(SagaState state, CancellationToken cancellationToken = default)
    {
        if (state is null)
        {
            throw new ArgumentNullException(nameof(state));
        }

        // TryAdd is atomic: it never overwrites an existing entry, so a duplicate ID is
        // detected and reported instead of silently replacing the stored saga.
        if (!_states.TryAdd(state.SagaId, state))
        {
            throw new InvalidOperationException($"Saga with ID {state.SagaId} already exists.");
        }

        return Task.FromResult(state);
    }

    /// <inheritdoc />
    public Task<SagaState?> GetByIdAsync(Guid sagaId, CancellationToken cancellationToken = default)
    {
        // A missing key leaves 'state' at its default value, which is what gets returned.
        _states.TryGetValue(sagaId, out var state);
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    public Task<SagaState?> GetByCorrelationIdAsync(Guid correlationId, CancellationToken cancellationToken = default)
    {
        // The dictionary is keyed by saga ID only, so a correlation lookup has to scan the
        // stored values; the first match (if any) wins.
        var state = _states.Values.FirstOrDefault(s => s.CorrelationId == correlationId);
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is <see langword="null"/>.</exception>
    public Task<SagaState> UpdateAsync(SagaState state, CancellationToken cancellationToken = default)
    {
        if (state is null)
        {
            throw new ArgumentNullException(nameof(state));
        }

        // Stamp the modification time on the caller's instance before storing it.
        state.UpdatedAt = DateTimeOffset.UtcNow;

        // Indexer assignment is an upsert: the entry is replaced when the saga ID is already
        // present and inserted when it is not.
        _states[state.SagaId] = state;
        return Task.FromResult(state);
    }

    /// <inheritdoc />
    public Task<IReadOnlyList<SagaState>> GetPendingSagasAsync(CancellationToken cancellationToken = default)
    {
        // "Pending" here means the saga is either still running or currently compensating.
        var pending = _states.Values
            .Where(s => s.Status == SagaStatus.InProgress || s.Status == SagaStatus.Compensating)
            .ToList();

        return Task.FromResult<IReadOnlyList<SagaState>>(pending);
    }

    /// <inheritdoc />
    public Task<IReadOnlyList<SagaState>> GetSagasByStatusAsync(SagaStatus status, CancellationToken cancellationToken = default)
    {
        // Materialise the matches into a list so the caller gets a fixed result set.
        var sagas = _states.Values
            .Where(s => s.Status == status)
            .ToList();

        return Task.FromResult<IReadOnlyList<SagaState>>(sagas);
    }
}