using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Svrnty.CQRS.Events.Abstractions.Sagas;
namespace Svrnty.CQRS.Events.Sagas;
/// <summary>
/// In-memory implementation of saga state store.
/// Suitable for development and testing. Not for production use.
/// </summary>
/// <remarks>
/// Snapshots are held in a <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by saga ID, so
/// <see cref="LoadStateAsync"/> and <see cref="GetByStateAsync"/> read without taking a lock.
/// The correlation-ID index stores plain <see cref="List{T}"/> instances, which are not safe for
/// concurrent mutation; every access to the index, and every write that has to keep the index
/// consistent with the stored snapshots, is therefore serialized on a private gate.
/// All methods complete synchronously and never observe the supplied cancellation token.
/// </remarks>
public sealed class InMemorySagaStateStore : ISagaStateStore
{
    // Latest snapshot per saga ID.
    private readonly ConcurrentDictionary<string, SagaStateSnapshot> _statesBySagaId = new();

    // Saga IDs grouped by correlation ID, in first-save order. Guarded by _gate.
    private readonly Dictionary<string, List<string>> _sagaIdsByCorrelationId = new();

    // Serializes writers and index readers so the two collections cannot drift apart.
    private readonly object _gate = new();

    /// <summary>
    /// Stores <paramref name="state"/> as the current snapshot for its saga ID, replacing any
    /// earlier snapshot, and records the saga under the snapshot's correlation ID.
    /// </summary>
    /// <param name="state">Snapshot to store.</param>
    /// <param name="cancellationToken">Not used; the operation completes synchronously.</param>
    /// <returns>An already-completed task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is null.</exception>
    public Task SaveStateAsync(SagaStateSnapshot state, CancellationToken cancellationToken = default)
    {
        if (state is null)
        {
            throw new ArgumentNullException(nameof(state));
        }

        lock (_gate)
        {
            // Both key lookups run before any write: if either key is null the dictionary
            // rejects it with ArgumentNullException and the store is left untouched.
            _statesBySagaId.TryGetValue(state.SagaId, out var previous);
            _sagaIdsByCorrelationId.TryGetValue(state.CorrelationId, out var sagaIds);

            _statesBySagaId[state.SagaId] = state;

            // A saga re-saved under a different correlation ID must leave the old group,
            // otherwise the old index entry would never be cleaned up.
            if (previous is not null
                && !string.Equals(previous.CorrelationId, state.CorrelationId, StringComparison.Ordinal))
            {
                RemoveFromIndex(previous.CorrelationId, state.SagaId);
            }

            if (sagaIds is null)
            {
                sagaIds = new List<string>();
                _sagaIdsByCorrelationId[state.CorrelationId] = sagaIds;
            }

            if (!sagaIds.Contains(state.SagaId))
            {
                sagaIds.Add(state.SagaId);
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Looks up the current snapshot for a saga.
    /// </summary>
    /// <param name="sagaId">ID of the saga to load.</param>
    /// <param name="cancellationToken">Not used; the operation completes synchronously.</param>
    /// <returns>A completed task holding the snapshot, or null when no snapshot is stored for the ID.</returns>
    /// <exception cref="ArgumentException"><paramref name="sagaId"/> is null, empty or whitespace.</exception>
    public Task<SagaStateSnapshot?> LoadStateAsync(string sagaId, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(sagaId))
        {
            throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId));
        }

        _statesBySagaId.TryGetValue(sagaId, out var state);
        return Task.FromResult<SagaStateSnapshot?>(state);
    }

    /// <summary>
    /// Returns the current snapshots of every saga recorded under a correlation ID.
    /// </summary>
    /// <param name="correlationId">Correlation ID to search for.</param>
    /// <param name="cancellationToken">Not used; the operation completes synchronously.</param>
    /// <returns>
    /// A completed task holding a new list of snapshots, ordered by when each saga was first saved
    /// under the correlation ID; the list is empty when nothing matches.
    /// </returns>
    /// <exception cref="ArgumentException"><paramref name="correlationId"/> is null, empty or whitespace.</exception>
    public Task<List<SagaStateSnapshot>> GetByCorrelationIdAsync(
        string correlationId,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(correlationId))
        {
            throw new ArgumentException("Correlation ID cannot be null or empty", nameof(correlationId));
        }

        var states = new List<SagaStateSnapshot>();

        // The ID list is walked under the gate so a concurrent save or delete cannot
        // modify it mid-enumeration.
        lock (_gate)
        {
            if (_sagaIdsByCorrelationId.TryGetValue(correlationId, out var sagaIds))
            {
                foreach (var sagaId in sagaIds)
                {
                    if (_statesBySagaId.TryGetValue(sagaId, out var state))
                    {
                        states.Add(state);
                    }
                }
            }
        }

        return Task.FromResult(states);
    }

    /// <summary>
    /// Returns the current snapshots whose <c>State</c> equals <paramref name="state"/>.
    /// </summary>
    /// <param name="state">State value to match.</param>
    /// <param name="cancellationToken">Not used; the operation completes synchronously.</param>
    /// <returns>A completed task holding a new list of matching snapshots (possibly empty).</returns>
    public Task<List<SagaStateSnapshot>> GetByStateAsync(
        SagaState state,
        CancellationToken cancellationToken = default)
    {
        var states = _statesBySagaId.Values
            .Where(s => s.State == state)
            .ToList();

        return Task.FromResult(states);
    }

    /// <summary>
    /// Removes the snapshot for a saga, together with its correlation-index entry.
    /// Does nothing when no snapshot is stored for the ID.
    /// </summary>
    /// <param name="sagaId">ID of the saga to delete.</param>
    /// <param name="cancellationToken">Not used; the operation completes synchronously.</param>
    /// <returns>An already-completed task.</returns>
    /// <exception cref="ArgumentException"><paramref name="sagaId"/> is null, empty or whitespace.</exception>
    public Task DeleteStateAsync(string sagaId, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(sagaId))
        {
            throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId));
        }

        lock (_gate)
        {
            if (_statesBySagaId.TryRemove(sagaId, out var removed))
            {
                RemoveFromIndex(removed.CorrelationId, sagaId);
            }
        }

        return Task.CompletedTask;
    }

    // Drops sagaId from the group for correlationId and discards the group once it is empty.
    // Callers must hold _gate.
    private void RemoveFromIndex(string correlationId, string sagaId)
    {
        if (!_sagaIdsByCorrelationId.TryGetValue(correlationId, out var sagaIds))
        {
            return;
        }

        sagaIds.Remove(sagaId);

        if (sagaIds.Count == 0)
        {
            _sagaIdsByCorrelationId.Remove(correlationId);
        }
    }
}