dotnet-cqrs/Svrnty.CQRS.Events.PostgreSQL/Stores/PostgresSagaStateStore.cs

237 lines
9.9 KiB
C#

using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions.Sagas;
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
/// <summary>
/// PostgreSQL implementation of saga state store.
/// </summary>
/// <remarks>
/// Each operation opens its own <see cref="NpgsqlConnection"/> from the configured connection
/// string and disposes it when the operation completes. Rows live in the <c>saga_states</c>
/// table of the configured schema; <c>completed_steps</c> and <c>data</c> are sent as
/// <c>jsonb</c> parameters containing camelCase JSON.
/// </remarks>
public sealed class PostgresSagaStateStore : ISagaStateStore
{
    /// <summary>Unqualified name of the table that holds saga snapshots.</summary>
    private const string SagaStatesTable = "saga_states";

    /// <summary>
    /// Column list shared by the INSERT and every SELECT so the statements cannot drift apart.
    /// <see cref="ReadSnapshot"/> looks the columns up by name, so order only matters for the INSERT.
    /// </summary>
    private const string SnapshotColumns =
        "saga_id, correlation_id, saga_name, state, current_step, total_steps, " +
        "completed_steps, started_at, last_updated, completed_at, error_message, data";

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresSagaStateStore> _logger;

    private static readonly JsonSerializerOptions _jsonOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        WriteIndented = false
    };

    /// <summary>
    /// Creates the store.
    /// </summary>
    /// <param name="options">Options providing the connection string and schema name.</param>
    /// <param name="logger">Logger used for debug-level diagnostics.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public PostgresSagaStateStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresSagaStateStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Returns <paramref name="tableName"/> qualified with the configured schema, both quoted
    /// as PostgreSQL delimited identifiers.
    /// </summary>
    private string SchemaQualified(string tableName)
    {
        return $"{QuoteIdentifier(_options.SchemaName)}.{QuoteIdentifier(tableName)}";
    }

    /// <summary>
    /// Wraps an identifier in double quotes, doubling any embedded double quote so the
    /// identifier cannot terminate the quoting early and alter the statement.
    /// </summary>
    private static string QuoteIdentifier(string? identifier)
    {
        // A null identifier is rendered as an empty quoted identifier (same text the previous
        // interpolation produced); the server rejects it rather than this method throwing.
        var escaped = (identifier ?? string.Empty).Replace("\"", "\"\"");
        return "\"" + escaped + "\"";
    }

    /// <summary>
    /// Converts an optional timestamp into a parameter value: UTC-normalized when present,
    /// <see cref="DBNull.Value"/> when absent.
    /// </summary>
    /// <remarks>
    /// Npgsql 6.0 and later only accept <see cref="DateTimeOffset"/> values with a zero offset
    /// for <c>timestamp with time zone</c> parameters. Normalizing keeps the same instant, so
    /// the stored value is unchanged. NOTE(review): assumes the timestamp columns are
    /// <c>timestamptz</c> - the table definition is not visible here.
    /// </remarks>
    private static object ToDbTimestamp(DateTimeOffset? value)
    {
        return value.HasValue ? (object)value.Value.ToUniversalTime() : DBNull.Value;
    }

    /// <summary>
    /// Builds the SELECT statement (without trailing ORDER BY or terminator) for the given predicate.
    /// </summary>
    /// <param name="predicate">Trusted SQL fragment placed after WHERE; must use parameters for values.</param>
    private string BuildSelectSql(string predicate)
    {
        return $@"
            SELECT {SnapshotColumns}
            FROM {SchemaQualified(SagaStatesTable)}
            WHERE {predicate}";
    }

    /// <inheritdoc />
    /// <remarks>
    /// Performs an upsert keyed on <c>saga_id</c>. On conflict every column except
    /// <c>saga_id</c> and <c>started_at</c> is overwritten with the new values.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="state"/> is null.</exception>
    public async Task SaveStateAsync(SagaStateSnapshot state, CancellationToken cancellationToken = default)
    {
        if (state == null)
        {
            throw new ArgumentNullException(nameof(state));
        }

        var sql = $@"
            INSERT INTO {SchemaQualified(SagaStatesTable)}
                ({SnapshotColumns})
            VALUES
                (@saga_id, @correlation_id, @saga_name, @state, @current_step, @total_steps,
                 @completed_steps, @started_at, @last_updated, @completed_at, @error_message, @data)
            ON CONFLICT (saga_id)
            DO UPDATE SET
                correlation_id = EXCLUDED.correlation_id,
                saga_name = EXCLUDED.saga_name,
                state = EXCLUDED.state,
                current_step = EXCLUDED.current_step,
                total_steps = EXCLUDED.total_steps,
                completed_steps = EXCLUDED.completed_steps,
                last_updated = EXCLUDED.last_updated,
                completed_at = EXCLUDED.completed_at,
                error_message = EXCLUDED.error_message,
                data = EXCLUDED.data;";

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("@saga_id", state.SagaId);
        command.Parameters.AddWithValue("@correlation_id", state.CorrelationId);
        command.Parameters.AddWithValue("@saga_name", state.SagaName);
        // The enum is persisted as its underlying integer value.
        command.Parameters.AddWithValue("@state", (int)state.State);
        command.Parameters.AddWithValue("@current_step", state.CurrentStep);
        command.Parameters.AddWithValue("@total_steps", state.TotalSteps);
        command.Parameters.AddWithValue("@completed_steps", NpgsqlTypes.NpgsqlDbType.Jsonb,
            JsonSerializer.Serialize(state.CompletedSteps, _jsonOptions));
        command.Parameters.AddWithValue("@started_at", ToDbTimestamp(state.StartedAt));
        command.Parameters.AddWithValue("@last_updated", ToDbTimestamp(state.LastUpdated));
        command.Parameters.AddWithValue("@completed_at", ToDbTimestamp(state.CompletedAt));
        command.Parameters.AddWithValue("@error_message", (object?)state.ErrorMessage ?? DBNull.Value);
        command.Parameters.AddWithValue("@data", NpgsqlTypes.NpgsqlDbType.Jsonb,
            JsonSerializer.Serialize(state.Data, _jsonOptions));

        await command.ExecuteNonQueryAsync(cancellationToken);

        _logger.LogDebug("Saved saga state for '{SagaName}' (ID: {SagaId}, State: {State})",
            state.SagaName, state.SagaId, state.State);
    }

    /// <inheritdoc />
    /// <returns>The stored snapshot, or <c>null</c> when no row has the given saga ID.</returns>
    /// <exception cref="ArgumentException"><paramref name="sagaId"/> is null, empty or whitespace.</exception>
    public async Task<SagaStateSnapshot?> LoadStateAsync(string sagaId, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(sagaId))
        {
            throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId));
        }

        var sql = BuildSelectSql("saga_id = @saga_id") + ";";

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("@saga_id", sagaId);

        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        if (!await reader.ReadAsync(cancellationToken))
        {
            return null;
        }

        return ReadSnapshot(reader);
    }

    /// <inheritdoc />
    /// <returns>Matching snapshots ordered by <c>started_at</c> descending; empty when none match.</returns>
    /// <exception cref="ArgumentException"><paramref name="correlationId"/> is null, empty or whitespace.</exception>
    public Task<List<SagaStateSnapshot>> GetByCorrelationIdAsync(
        string correlationId,
        CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(correlationId))
        {
            throw new ArgumentException("Correlation ID cannot be null or empty", nameof(correlationId));
        }

        return QueryListAsync(
            "correlation_id = @correlation_id", "@correlation_id", correlationId, cancellationToken);
    }

    /// <inheritdoc />
    /// <returns>Matching snapshots ordered by <c>started_at</c> descending; empty when none match.</returns>
    public Task<List<SagaStateSnapshot>> GetByStateAsync(
        SagaState state,
        CancellationToken cancellationToken = default)
    {
        // Compared against the integer representation written by SaveStateAsync.
        return QueryListAsync("state = @state", "@state", (int)state, cancellationToken);
    }

    /// <inheritdoc />
    /// <remarks>Deleting an ID that has no row is not an error; it is only logged differently.</remarks>
    /// <exception cref="ArgumentException"><paramref name="sagaId"/> is null, empty or whitespace.</exception>
    public async Task DeleteStateAsync(string sagaId, CancellationToken cancellationToken = default)
    {
        if (string.IsNullOrWhiteSpace(sagaId))
        {
            throw new ArgumentException("Saga ID cannot be null or empty", nameof(sagaId));
        }

        var sql = $@"
            DELETE FROM {SchemaQualified(SagaStatesTable)}
            WHERE saga_id = @saga_id;";

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue("@saga_id", sagaId);

        var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);

        // Only claim a deletion when a row was actually removed.
        if (rowsAffected > 0)
        {
            _logger.LogDebug("Deleted saga state for ID: {SagaId}", sagaId);
        }
        else
        {
            _logger.LogDebug("No saga state found to delete for ID: {SagaId}", sagaId);
        }
    }

    /// <summary>
    /// Runs a single-parameter SELECT and materializes every row, newest <c>started_at</c> first.
    /// </summary>
    /// <param name="predicate">Trusted SQL fragment placed after WHERE.</param>
    /// <param name="parameterName">Name of the parameter referenced by <paramref name="predicate"/>.</param>
    /// <param name="parameterValue">Value bound to the parameter.</param>
    /// <param name="cancellationToken">Token observed while opening, executing and reading.</param>
    private async Task<List<SagaStateSnapshot>> QueryListAsync(
        string predicate,
        string parameterName,
        object parameterValue,
        CancellationToken cancellationToken)
    {
        var sql = BuildSelectSql(predicate) + @"
            ORDER BY started_at DESC;";

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var command = new NpgsqlCommand(sql, connection);
        command.Parameters.AddWithValue(parameterName, parameterValue);

        var results = new List<SagaStateSnapshot>();
        await using var reader = await command.ExecuteReaderAsync(cancellationToken);
        while (await reader.ReadAsync(cancellationToken))
        {
            results.Add(ReadSnapshot(reader));
        }

        return results;
    }

    /// <summary>
    /// Maps the reader's current row to a <see cref="SagaStateSnapshot"/>.
    /// </summary>
    /// <remarks>
    /// The JSON columns fall back to empty collections when the column is SQL NULL or holds
    /// JSON <c>null</c>. <c>completed_at</c> and <c>error_message</c> map to <c>null</c> when
    /// the column is NULL.
    /// </remarks>
    private static SagaStateSnapshot ReadSnapshot(NpgsqlDataReader reader)
    {
        var completedStepsOrdinal = reader.GetOrdinal("completed_steps");
        var completedSteps = reader.IsDBNull(completedStepsOrdinal)
            ? null
            : JsonSerializer.Deserialize<List<string>>(reader.GetString(completedStepsOrdinal), _jsonOptions);

        var dataOrdinal = reader.GetOrdinal("data");
        var data = reader.IsDBNull(dataOrdinal)
            ? null
            : JsonSerializer.Deserialize<Dictionary<string, object>>(reader.GetString(dataOrdinal), _jsonOptions);

        var completedAtOrdinal = reader.GetOrdinal("completed_at");
        var errorMessageOrdinal = reader.GetOrdinal("error_message");

        return new SagaStateSnapshot
        {
            SagaId = reader.GetString(reader.GetOrdinal("saga_id")),
            CorrelationId = reader.GetString(reader.GetOrdinal("correlation_id")),
            SagaName = reader.GetString(reader.GetOrdinal("saga_name")),
            State = (SagaState)reader.GetInt32(reader.GetOrdinal("state")),
            CurrentStep = reader.GetInt32(reader.GetOrdinal("current_step")),
            TotalSteps = reader.GetInt32(reader.GetOrdinal("total_steps")),
            CompletedSteps = completedSteps ?? new List<string>(),
            StartedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("started_at")),
            LastUpdated = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("last_updated")),
            CompletedAt = reader.IsDBNull(completedAtOrdinal)
                ? null
                : reader.GetFieldValue<DateTimeOffset>(completedAtOrdinal),
            ErrorMessage = reader.IsDBNull(errorMessageOrdinal)
                ? null
                : reader.GetString(errorMessageOrdinal),
            Data = data ?? new Dictionary<string, object>()
        };
    }
}