using System; using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Tasks; using Svrnty.CQRS.Events.Abstractions.Projections; namespace Svrnty.CQRS.Events.Projections; /// /// In-memory implementation of projection checkpoint storage for development/testing. /// /// /// /// Warning: This is an in-memory store. All checkpoints are lost on restart. /// For production, use PostgresProjectionCheckpointStore or another persistent implementation. /// /// /// Thread-safe implementation using ConcurrentDictionary. /// /// public sealed class InMemoryProjectionCheckpointStore : IProjectionCheckpointStore { private readonly ConcurrentDictionary _checkpoints = new(); /// public Task GetCheckpointAsync( string projectionName, string streamName, CancellationToken cancellationToken = default) { var key = GetKey(projectionName, streamName); var checkpoint = _checkpoints.TryGetValue(key, out var value) ? value : null; return Task.FromResult(checkpoint); } /// public Task SaveCheckpointAsync( ProjectionCheckpoint checkpoint, CancellationToken cancellationToken = default) { if (checkpoint == null) throw new ArgumentNullException(nameof(checkpoint)); var key = GetKey(checkpoint.ProjectionName, checkpoint.StreamName); _checkpoints[key] = checkpoint; return Task.CompletedTask; } /// public Task ResetCheckpointAsync( string projectionName, string streamName, CancellationToken cancellationToken = default) { var key = GetKey(projectionName, streamName); _checkpoints.TryRemove(key, out _); return Task.CompletedTask; } /// public Task GetAllCheckpointsAsync( string projectionName, CancellationToken cancellationToken = default) { var checkpoints = _checkpoints.Values .Where(c => c.ProjectionName == projectionName) .ToArray(); return Task.FromResult(checkpoints); } private static string GetKey(string projectionName, string streamName) { return $"{projectionName}::{streamName}"; } }