// Source: dotnet-cqrs/Svrnty.CQRS.Events.PostgreSQL/Subscriptions/PostgresSubscriptionStore.cs
//
// 340 lines
// 14 KiB
// C#

using System;
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Svrnty.CQRS.Events.Abstractions.Subscriptions;
namespace Svrnty.CQRS.Events.PostgreSQL.Subscriptions;
/// <summary>
/// PostgreSQL implementation of <see cref="IPersistentSubscriptionStore"/> backed by the
/// <c>persistent_subscriptions</c> table.
/// </summary>
/// <remarks>
/// Every operation opens its own <see cref="NpgsqlConnection"/> from the configured connection
/// string and disposes it when the operation completes. Event type sets are serialized to JSON
/// and cast to <c>jsonb</c> on write; enum values are stored as integers.
/// </remarks>
public sealed class PostgresSubscriptionStore : IPersistentSubscriptionStore
{
    /// <summary>
    /// Shared SELECT/FROM prefix for every read query. The column order must stay in sync with
    /// <see cref="Ordinal"/>, which <see cref="MapSubscription"/> uses to read each row.
    /// </summary>
    private const string SelectSubscriptionsSql = """
        SELECT id, subscriber_id, correlation_id, event_types, terminal_event_types,
               delivery_mode, created_at, expires_at, completed_at, last_delivered_sequence,
               status, connection_id, data_source_id
        FROM persistent_subscriptions
        """;

    // Reflection handles are resolved once instead of once per mapped row.
    private static readonly System.Reflection.PropertyInfo CompletedAtProperty =
        GetSubscriptionProperty(nameof(PersistentSubscription.CompletedAt));

    private static readonly System.Reflection.PropertyInfo LastDeliveredSequenceProperty =
        GetSubscriptionProperty(nameof(PersistentSubscription.LastDeliveredSequence));

    private static readonly System.Reflection.PropertyInfo StatusProperty =
        GetSubscriptionProperty(nameof(PersistentSubscription.Status));

    private readonly PostgresEventStreamStoreOptions _options;
    private readonly ILogger<PostgresSubscriptionStore> _logger;

    /// <summary>
    /// Creates a store that connects using <see cref="PostgresEventStreamStoreOptions.ConnectionString"/>.
    /// </summary>
    /// <param name="options">Options wrapper; its <c>Value</c> must not be null.</param>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> (or its value) or <paramref name="logger"/> is null.
    /// </exception>
    public PostgresSubscriptionStore(
        IOptions<PostgresEventStreamStoreOptions> options,
        ILogger<PostgresSubscriptionStore> logger)
    {
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Inserts <paramref name="subscription"/> as a new row.
    /// </summary>
    /// <param name="subscription">The subscription to persist.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The same <paramref name="subscription"/> instance that was passed in.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public async Task<PersistentSubscription> CreateAsync(
        PersistentSubscription subscription,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(subscription);

        // completed_at is intentionally not part of the insert; it is only written by UpdateAsync.
        const string sql = """
            INSERT INTO persistent_subscriptions (
                id, subscriber_id, correlation_id, event_types, terminal_event_types,
                delivery_mode, created_at, expires_at, last_delivered_sequence, status,
                connection_id, data_source_id
            ) VALUES (
                @Id, @SubscriberId, @CorrelationId, @EventTypes::jsonb, @TerminalEventTypes::jsonb,
                @DeliveryMode, @CreatedAt, @ExpiresAt, @LastDeliveredSequence, @Status,
                @ConnectionId, @DataSourceId
            )
            """;

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var cmd = new NpgsqlCommand(sql, connection);
        AddCommonParameters(cmd, subscription);
        cmd.Parameters.AddWithValue("CreatedAt", ToDbTimestamp(subscription.CreatedAt));

        await cmd.ExecuteNonQueryAsync(cancellationToken);
        return subscription;
    }

    /// <summary>
    /// Loads the subscription whose <c>id</c> column equals <paramref name="id"/>.
    /// </summary>
    /// <param name="id">The subscription identifier.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The mapped subscription, or <c>null</c> when no row matches.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> is null.</exception>
    public async Task<PersistentSubscription?> GetByIdAsync(
        string id,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(id);

        const string sql = SelectSubscriptionsSql + " WHERE id = @Id";

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var cmd = new NpgsqlCommand(sql, connection);
        cmd.Parameters.AddWithValue("Id", id);

        await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

        // Only the first row is consumed; any further rows are ignored.
        return await reader.ReadAsync(cancellationToken)
            ? MapSubscription(reader)
            : null;
    }

    /// <summary>
    /// Returns all subscriptions for <paramref name="subscriberId"/>, newest first.
    /// </summary>
    /// <param name="subscriberId">Value matched against the <c>subscriber_id</c> column.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions ordered by <c>created_at</c> descending; empty when none match.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscriberId"/> is null.</exception>
    public Task<IReadOnlyList<PersistentSubscription>> GetBySubscriberIdAsync(
        string subscriberId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(subscriberId);

        const string sql = SelectSubscriptionsSql
            + " WHERE subscriber_id = @SubscriberId ORDER BY created_at DESC";

        return QueryAsync(sql, cancellationToken, ("SubscriberId", subscriberId));
    }

    /// <summary>
    /// Returns all subscriptions for <paramref name="correlationId"/>, oldest first.
    /// </summary>
    /// <param name="correlationId">Value matched against the <c>correlation_id</c> column.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions ordered by <c>created_at</c> ascending; empty when none match.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="correlationId"/> is null.</exception>
    public Task<IReadOnlyList<PersistentSubscription>> GetByCorrelationIdAsync(
        string correlationId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(correlationId);

        const string sql = SelectSubscriptionsSql
            + " WHERE correlation_id = @CorrelationId ORDER BY created_at ASC";

        return QueryAsync(sql, cancellationToken, ("CorrelationId", correlationId));
    }

    /// <summary>
    /// Returns all subscriptions currently stored with <paramref name="status"/>, newest first.
    /// </summary>
    /// <param name="status">Status to filter by; compared using its integer value.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions ordered by <c>created_at</c> descending; empty when none match.</returns>
    public Task<IReadOnlyList<PersistentSubscription>> GetByStatusAsync(
        SubscriptionStatus status,
        CancellationToken cancellationToken = default)
    {
        const string sql = SelectSubscriptionsSql
            + " WHERE status = @Status ORDER BY created_at DESC";

        return QueryAsync(sql, cancellationToken, ("Status", (int)status));
    }

    /// <summary>
    /// Returns all subscriptions bound to <paramref name="connectionId"/>, oldest first.
    /// </summary>
    /// <param name="connectionId">Value matched against the <c>connection_id</c> column.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions ordered by <c>created_at</c> ascending; empty when none match.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="connectionId"/> is null.</exception>
    public Task<IReadOnlyList<PersistentSubscription>> GetByConnectionIdAsync(
        string connectionId,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(connectionId);

        const string sql = SelectSubscriptionsSql
            + " WHERE connection_id = @ConnectionId ORDER BY created_at ASC";

        return QueryAsync(sql, cancellationToken, ("ConnectionId", connectionId));
    }

    /// <summary>
    /// Overwrites every mutable column of the row identified by <c>subscription.Id</c>.
    /// <c>created_at</c> is never modified.
    /// </summary>
    /// <param name="subscription">The subscription state to write.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <remarks>
    /// When no row matches the id nothing is written; a warning is logged and the call
    /// still completes successfully.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="subscription"/> is null.</exception>
    public async Task UpdateAsync(
        PersistentSubscription subscription,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(subscription);

        const string sql = """
            UPDATE persistent_subscriptions
            SET subscriber_id = @SubscriberId,
                correlation_id = @CorrelationId,
                event_types = @EventTypes::jsonb,
                terminal_event_types = @TerminalEventTypes::jsonb,
                delivery_mode = @DeliveryMode,
                expires_at = @ExpiresAt,
                completed_at = @CompletedAt,
                last_delivered_sequence = @LastDeliveredSequence,
                status = @Status,
                connection_id = @ConnectionId,
                data_source_id = @DataSourceId
            WHERE id = @Id
            """;

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var cmd = new NpgsqlCommand(sql, connection);
        AddCommonParameters(cmd, subscription);
        cmd.Parameters.AddWithValue("CompletedAt", ToDbTimestamp(subscription.CompletedAt));

        var affectedRows = await cmd.ExecuteNonQueryAsync(cancellationToken);
        if (affectedRows == 0)
        {
            // Kept non-throwing to preserve the existing contract, but surfaced for diagnostics.
            _logger.LogWarning(
                "Update affected no rows: persistent subscription {SubscriptionId} was not found",
                subscription.Id);
        }
    }

    /// <summary>
    /// Deletes the row whose <c>id</c> column equals <paramref name="id"/>.
    /// Completes successfully even when no such row exists.
    /// </summary>
    /// <param name="id">The subscription identifier.</param>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <exception cref="ArgumentNullException"><paramref name="id"/> is null.</exception>
    public async Task DeleteAsync(
        string id,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(id);

        const string sql = "DELETE FROM persistent_subscriptions WHERE id = @Id";

        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var cmd = new NpgsqlCommand(sql, connection);
        cmd.Parameters.AddWithValue("Id", id);

        var affectedRows = await cmd.ExecuteNonQueryAsync(cancellationToken);
        if (affectedRows == 0)
        {
            _logger.LogDebug(
                "Delete affected no rows: persistent subscription {SubscriptionId} was not found",
                id);
        }
    }

    /// <summary>
    /// Returns subscriptions whose <c>expires_at</c> is set and earlier than the database
    /// server's <c>NOW()</c>, and whose stored status is <c>0</c>, soonest-expired first.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the database calls.</param>
    /// <returns>The matching subscriptions ordered by <c>expires_at</c> ascending; empty when none match.</returns>
    public Task<IReadOnlyList<PersistentSubscription>> GetExpiredSubscriptionsAsync(
        CancellationToken cancellationToken = default)
    {
        // NOTE(review): status = 0 is presumably the "active" member of SubscriptionStatus;
        // confirm against the enum and consider binding it as a parameter instead of a literal.
        const string sql = SelectSubscriptionsSql
            + " WHERE expires_at IS NOT NULL AND expires_at < NOW() AND status = 0"
            + " ORDER BY expires_at ASC";

        return QueryAsync(sql, cancellationToken);
    }

    /// <summary>
    /// Runs a SELECT built on <see cref="SelectSubscriptionsSql"/> and maps every returned row.
    /// </summary>
    private async Task<IReadOnlyList<PersistentSubscription>> QueryAsync(
        string sql,
        CancellationToken cancellationToken,
        params (string Name, object Value)[] parameters)
    {
        await using var connection = new NpgsqlConnection(_options.ConnectionString);
        await connection.OpenAsync(cancellationToken);

        await using var cmd = new NpgsqlCommand(sql, connection);
        foreach (var (name, value) in parameters)
        {
            cmd.Parameters.AddWithValue(name, value);
        }

        await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);

        var subscriptions = new List<PersistentSubscription>();
        while (await reader.ReadAsync(cancellationToken))
        {
            subscriptions.Add(MapSubscription(reader));
        }

        return subscriptions;
    }

    /// <summary>
    /// Binds the parameters shared by the INSERT and UPDATE statements.
    /// </summary>
    private static void AddCommonParameters(NpgsqlCommand cmd, PersistentSubscription subscription)
    {
        var parameters = cmd.Parameters;
        parameters.AddWithValue("Id", subscription.Id);
        parameters.AddWithValue("SubscriberId", subscription.SubscriberId);
        parameters.AddWithValue("CorrelationId", subscription.CorrelationId);
        // Sent as text; the SQL casts these two to jsonb.
        parameters.AddWithValue("EventTypes", JsonSerializer.Serialize(subscription.EventTypes));
        parameters.AddWithValue("TerminalEventTypes", JsonSerializer.Serialize(subscription.TerminalEventTypes));
        parameters.AddWithValue("DeliveryMode", (int)subscription.DeliveryMode);
        parameters.AddWithValue("ExpiresAt", ToDbTimestamp(subscription.ExpiresAt));
        parameters.AddWithValue("LastDeliveredSequence", subscription.LastDeliveredSequence);
        parameters.AddWithValue("Status", (int)subscription.Status);
        parameters.AddWithValue("ConnectionId", subscription.ConnectionId ?? (object)DBNull.Value);
        parameters.AddWithValue("DataSourceId", subscription.DataSourceId ?? (object)DBNull.Value);
    }

    /// <summary>
    /// Converts an optional timestamp into a parameter value: <see cref="DBNull.Value"/> for
    /// null, otherwise the same instant expressed with a zero (UTC) offset.
    /// </summary>
    /// <remarks>
    /// Npgsql 6+ refuses to write a <see cref="DateTimeOffset"/> with a non-zero offset to
    /// <c>timestamp with time zone</c>; normalising keeps the instant and avoids that failure.
    /// </remarks>
    private static object ToDbTimestamp(DateTimeOffset? value) =>
        value is { } timestamp
            ? (object)timestamp.ToUniversalTime()
            : DBNull.Value;

    /// <summary>
    /// Resolves a public property of <see cref="PersistentSubscription"/> by name, failing
    /// with a descriptive error instead of a null reference when it cannot be found.
    /// </summary>
    private static System.Reflection.PropertyInfo GetSubscriptionProperty(string propertyName) =>
        typeof(PersistentSubscription).GetProperty(propertyName)
        ?? throw new InvalidOperationException(
            $"Property '{propertyName}' was not found on {nameof(PersistentSubscription)}.");

    /// <summary>
    /// Materialises the reader's current row (in <see cref="SelectSubscriptionsSql"/> column
    /// order) into a <see cref="PersistentSubscription"/>.
    /// </summary>
    private static PersistentSubscription MapSubscription(NpgsqlDataReader reader)
    {
        var eventTypesJson = reader.GetString(Ordinal.EventTypes);
        var terminalEventTypesJson = reader.GetString(Ordinal.TerminalEventTypes);

        var subscription = new PersistentSubscription
        {
            Id = reader.GetString(Ordinal.Id),
            SubscriberId = reader.GetString(Ordinal.SubscriberId),
            CorrelationId = reader.GetString(Ordinal.CorrelationId),
            EventTypes = JsonSerializer.Deserialize<HashSet<string>>(eventTypesJson) ?? new HashSet<string>(),
            TerminalEventTypes = JsonSerializer.Deserialize<HashSet<string>>(terminalEventTypesJson) ?? new HashSet<string>(),
            DeliveryMode = (DeliveryMode)reader.GetInt32(Ordinal.DeliveryMode),
            CreatedAt = reader.GetFieldValue<DateTimeOffset>(Ordinal.CreatedAt),
            ExpiresAt = reader.IsDBNull(Ordinal.ExpiresAt)
                ? null
                : reader.GetFieldValue<DateTimeOffset>(Ordinal.ExpiresAt),
            ConnectionId = reader.IsDBNull(Ordinal.ConnectionId) ? null : reader.GetString(Ordinal.ConnectionId),
            DataSourceId = reader.IsDBNull(Ordinal.DataSourceId) ? null : reader.GetString(Ordinal.DataSourceId)
        };

        // These three are not assigned in the initializer above; they are written through
        // reflection so the setters are reachable even if they are not public.
        DateTimeOffset? completedAt = reader.IsDBNull(Ordinal.CompletedAt)
            ? null
            : reader.GetFieldValue<DateTimeOffset>(Ordinal.CompletedAt);
        var lastDeliveredSequence = reader.GetInt64(Ordinal.LastDeliveredSequence);
        var status = (SubscriptionStatus)reader.GetInt32(Ordinal.Status);

        CompletedAtProperty.SetValue(subscription, completedAt);
        LastDeliveredSequenceProperty.SetValue(subscription, lastDeliveredSequence);
        StatusProperty.SetValue(subscription, status);

        return subscription;
    }

    /// <summary>
    /// Zero-based column positions of <see cref="SelectSubscriptionsSql"/>.
    /// </summary>
    private static class Ordinal
    {
        public const int Id = 0;
        public const int SubscriberId = 1;
        public const int CorrelationId = 2;
        public const int EventTypes = 3;
        public const int TerminalEventTypes = 4;
        public const int DeliveryMode = 5;
        public const int CreatedAt = 6;
        public const int ExpiresAt = 7;
        public const int CompletedAt = 8;
        public const int LastDeliveredSequence = 9;
        public const int Status = 10;
        public const int ConnectionId = 11;
        public const int DataSourceId = 12;
    }
}