400 lines
22 KiB
C#
400 lines
22 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.PostgreSQL.Configuration;
|
|
using Svrnty.CQRS.Events.Abstractions.Streaming;
|
|
using Svrnty.CQRS.Events.Abstractions.Configuration;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text.Json;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using Npgsql;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
namespace Svrnty.CQRS.Events.PostgreSQL.Stores;
|
|
|
|
/// <summary>
|
|
/// PostgreSQL implementation of stream configuration store.
|
|
/// </summary>
|
|
public class PostgresStreamConfigurationStore : IStreamConfigurationStore
|
|
{
|
|
private readonly PostgresEventStreamStoreOptions _options;
|
|
private readonly ILogger<PostgresStreamConfigurationStore> _logger;
|
|
|
|
public PostgresStreamConfigurationStore(
|
|
IOptions<PostgresEventStreamStoreOptions> options,
|
|
ILogger<PostgresStreamConfigurationStore> logger)
|
|
{
|
|
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
|
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
|
}
|
|
|
|
public async Task<StreamConfiguration?> GetConfigurationAsync(
|
|
string streamName,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
const string sql = @"
|
|
SELECT * FROM event_streaming.stream_configurations
|
|
WHERE stream_name = @StreamName";
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("@StreamName", streamName);
|
|
|
|
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
|
|
|
if (await reader.ReadAsync(cancellationToken))
|
|
{
|
|
return MapToStreamConfiguration(reader);
|
|
}
|
|
|
|
_logger.LogDebug("No configuration found for stream {StreamName}", streamName);
|
|
return null;
|
|
}
|
|
|
|
public async Task<IReadOnlyList<StreamConfiguration>> GetAllConfigurationsAsync(
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
const string sql = "SELECT * FROM event_streaming.stream_configurations ORDER BY stream_name";
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
|
|
|
var configurations = new List<StreamConfiguration>();
|
|
while (await reader.ReadAsync(cancellationToken))
|
|
{
|
|
configurations.Add(MapToStreamConfiguration(reader));
|
|
}
|
|
|
|
_logger.LogDebug("Retrieved {Count} stream configurations", configurations.Count);
|
|
return configurations;
|
|
}
|
|
|
|
public async Task SetConfigurationAsync(
|
|
StreamConfiguration configuration,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
if (configuration == null)
|
|
throw new ArgumentNullException(nameof(configuration));
|
|
|
|
configuration.Validate();
|
|
|
|
const string sql = @"
|
|
INSERT INTO event_streaming.stream_configurations (
|
|
stream_name, description, tags,
|
|
retention_max_age_seconds, retention_max_size_bytes, retention_max_event_count,
|
|
retention_enable_partitioning, retention_partition_interval_seconds,
|
|
dlq_enabled, dlq_stream_name, dlq_max_delivery_attempts,
|
|
dlq_retry_delay_seconds, dlq_store_original_event, dlq_store_error_details,
|
|
lifecycle_auto_create, lifecycle_auto_archive, lifecycle_archive_after_seconds,
|
|
lifecycle_archive_location, lifecycle_auto_delete, lifecycle_delete_after_seconds,
|
|
performance_batch_size, performance_enable_compression, performance_compression_algorithm,
|
|
performance_enable_indexing, performance_indexed_fields, performance_cache_size,
|
|
access_public_read, access_public_write, access_allowed_readers, access_allowed_writers,
|
|
access_max_consumer_groups, access_max_events_per_second,
|
|
created_at, updated_at, created_by, updated_by
|
|
)
|
|
VALUES (
|
|
@StreamName, @Description, @Tags::jsonb,
|
|
@RetentionMaxAge, @RetentionMaxSize, @RetentionMaxCount,
|
|
@RetentionPartitioning, @RetentionPartitionInterval,
|
|
@DlqEnabled, @DlqStreamName, @DlqMaxAttempts,
|
|
@DlqRetryDelay, @DlqStoreOriginal, @DlqStoreError,
|
|
@LifecycleAutoCreate, @LifecycleAutoArchive, @LifecycleArchiveAfter,
|
|
@LifecycleArchiveLocation, @LifecycleAutoDelete, @LifecycleDeleteAfter,
|
|
@PerfBatchSize, @PerfCompression, @PerfCompressionAlgorithm,
|
|
@PerfIndexing, @PerfIndexedFields::jsonb, @PerfCacheSize,
|
|
@AccessPublicRead, @AccessPublicWrite, @AccessReaders::jsonb, @AccessWriters::jsonb,
|
|
@AccessMaxConsumerGroups, @AccessMaxEventsPerSecond,
|
|
@CreatedAt, @UpdatedAt, @CreatedBy, @UpdatedBy
|
|
)
|
|
ON CONFLICT (stream_name) DO UPDATE SET
|
|
description = EXCLUDED.description,
|
|
tags = EXCLUDED.tags,
|
|
retention_max_age_seconds = EXCLUDED.retention_max_age_seconds,
|
|
retention_max_size_bytes = EXCLUDED.retention_max_size_bytes,
|
|
retention_max_event_count = EXCLUDED.retention_max_event_count,
|
|
retention_enable_partitioning = EXCLUDED.retention_enable_partitioning,
|
|
retention_partition_interval_seconds = EXCLUDED.retention_partition_interval_seconds,
|
|
dlq_enabled = EXCLUDED.dlq_enabled,
|
|
dlq_stream_name = EXCLUDED.dlq_stream_name,
|
|
dlq_max_delivery_attempts = EXCLUDED.dlq_max_delivery_attempts,
|
|
dlq_retry_delay_seconds = EXCLUDED.dlq_retry_delay_seconds,
|
|
dlq_store_original_event = EXCLUDED.dlq_store_original_event,
|
|
dlq_store_error_details = EXCLUDED.dlq_store_error_details,
|
|
lifecycle_auto_create = EXCLUDED.lifecycle_auto_create,
|
|
lifecycle_auto_archive = EXCLUDED.lifecycle_auto_archive,
|
|
lifecycle_archive_after_seconds = EXCLUDED.lifecycle_archive_after_seconds,
|
|
lifecycle_archive_location = EXCLUDED.lifecycle_archive_location,
|
|
lifecycle_auto_delete = EXCLUDED.lifecycle_auto_delete,
|
|
lifecycle_delete_after_seconds = EXCLUDED.lifecycle_delete_after_seconds,
|
|
performance_batch_size = EXCLUDED.performance_batch_size,
|
|
performance_enable_compression = EXCLUDED.performance_enable_compression,
|
|
performance_compression_algorithm = EXCLUDED.performance_compression_algorithm,
|
|
performance_enable_indexing = EXCLUDED.performance_enable_indexing,
|
|
performance_indexed_fields = EXCLUDED.performance_indexed_fields,
|
|
performance_cache_size = EXCLUDED.performance_cache_size,
|
|
access_public_read = EXCLUDED.access_public_read,
|
|
access_public_write = EXCLUDED.access_public_write,
|
|
access_allowed_readers = EXCLUDED.access_allowed_readers,
|
|
access_allowed_writers = EXCLUDED.access_allowed_writers,
|
|
access_max_consumer_groups = EXCLUDED.access_max_consumer_groups,
|
|
access_max_events_per_second = EXCLUDED.access_max_events_per_second,
|
|
updated_at = EXCLUDED.updated_at,
|
|
updated_by = EXCLUDED.updated_by";
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
|
|
// Basic fields
|
|
command.Parameters.AddWithValue("@StreamName", configuration.StreamName);
|
|
command.Parameters.AddWithValue("@Description", (object?)configuration.Description ?? DBNull.Value);
|
|
command.Parameters.AddWithValue("@Tags", configuration.Tags != null
|
|
? JsonSerializer.Serialize(configuration.Tags)
|
|
: DBNull.Value);
|
|
|
|
// Retention
|
|
var retention = configuration.Retention;
|
|
command.Parameters.AddWithValue("@RetentionMaxAge", retention?.MaxAge?.TotalSeconds ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@RetentionMaxSize", retention?.MaxSizeBytes ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@RetentionMaxCount", retention?.MaxEventCount ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@RetentionPartitioning", retention?.EnablePartitioning ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@RetentionPartitionInterval", retention?.PartitionInterval?.TotalSeconds ?? (object)DBNull.Value);
|
|
|
|
// DLQ
|
|
var dlq = configuration.DeadLetterQueue;
|
|
command.Parameters.AddWithValue("@DlqEnabled", dlq?.Enabled ?? false);
|
|
command.Parameters.AddWithValue("@DlqStreamName", (object?)dlq?.DeadLetterStreamName ?? DBNull.Value);
|
|
command.Parameters.AddWithValue("@DlqMaxAttempts", dlq?.MaxDeliveryAttempts ?? 3);
|
|
command.Parameters.AddWithValue("@DlqRetryDelay", dlq?.RetryDelay?.TotalSeconds ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@DlqStoreOriginal", dlq?.StoreOriginalEvent ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@DlqStoreError", dlq?.StoreErrorDetails ?? (object)DBNull.Value);
|
|
|
|
// Lifecycle
|
|
var lifecycle = configuration.Lifecycle;
|
|
command.Parameters.AddWithValue("@LifecycleAutoCreate", lifecycle?.AutoCreate ?? true);
|
|
command.Parameters.AddWithValue("@LifecycleAutoArchive", lifecycle?.AutoArchive ?? false);
|
|
command.Parameters.AddWithValue("@LifecycleArchiveAfter", lifecycle?.ArchiveAfter?.TotalSeconds ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@LifecycleArchiveLocation", (object?)lifecycle?.ArchiveLocation ?? DBNull.Value);
|
|
command.Parameters.AddWithValue("@LifecycleAutoDelete", lifecycle?.AutoDelete ?? false);
|
|
command.Parameters.AddWithValue("@LifecycleDeleteAfter", lifecycle?.DeleteAfter?.TotalSeconds ?? (object)DBNull.Value);
|
|
|
|
// Performance
|
|
var perf = configuration.Performance;
|
|
command.Parameters.AddWithValue("@PerfBatchSize", perf?.BatchSize ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@PerfCompression", perf?.EnableCompression ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@PerfCompressionAlgorithm", (object?)perf?.CompressionAlgorithm ?? DBNull.Value);
|
|
command.Parameters.AddWithValue("@PerfIndexing", perf?.EnableIndexing ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@PerfIndexedFields", perf?.IndexedFields != null
|
|
? JsonSerializer.Serialize(perf.IndexedFields)
|
|
: DBNull.Value);
|
|
command.Parameters.AddWithValue("@PerfCacheSize", perf?.CacheSize ?? (object)DBNull.Value);
|
|
|
|
// Access Control
|
|
var access = configuration.AccessControl;
|
|
command.Parameters.AddWithValue("@AccessPublicRead", access?.PublicRead ?? false);
|
|
command.Parameters.AddWithValue("@AccessPublicWrite", access?.PublicWrite ?? false);
|
|
command.Parameters.AddWithValue("@AccessReaders", access?.AllowedReaders != null
|
|
? JsonSerializer.Serialize(access.AllowedReaders)
|
|
: DBNull.Value);
|
|
command.Parameters.AddWithValue("@AccessWriters", access?.AllowedWriters != null
|
|
? JsonSerializer.Serialize(access.AllowedWriters)
|
|
: DBNull.Value);
|
|
command.Parameters.AddWithValue("@AccessMaxConsumerGroups", access?.MaxConsumerGroups ?? (object)DBNull.Value);
|
|
command.Parameters.AddWithValue("@AccessMaxEventsPerSecond", access?.MaxEventsPerSecond ?? (object)DBNull.Value);
|
|
|
|
// Metadata
|
|
command.Parameters.AddWithValue("@CreatedAt", configuration.CreatedAt != default ? configuration.CreatedAt : DateTimeOffset.UtcNow);
|
|
command.Parameters.AddWithValue("@UpdatedAt", configuration.UpdatedAt ?? DateTimeOffset.UtcNow);
|
|
command.Parameters.AddWithValue("@CreatedBy", (object?)configuration.CreatedBy ?? DBNull.Value);
|
|
command.Parameters.AddWithValue("@UpdatedBy", (object?)configuration.UpdatedBy ?? DBNull.Value);
|
|
|
|
await command.ExecuteNonQueryAsync(cancellationToken);
|
|
|
|
_logger.LogInformation("Set configuration for stream {StreamName}", configuration.StreamName);
|
|
}
|
|
|
|
public async Task DeleteConfigurationAsync(
|
|
string streamName,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
const string sql = @"
|
|
DELETE FROM event_streaming.stream_configurations
|
|
WHERE stream_name = @StreamName";
|
|
|
|
await using var connection = new NpgsqlConnection(_options.ConnectionString);
|
|
await connection.OpenAsync(cancellationToken);
|
|
|
|
await using var command = new NpgsqlCommand(sql, connection);
|
|
command.Parameters.AddWithValue("@StreamName", streamName);
|
|
|
|
var rowsAffected = await command.ExecuteNonQueryAsync(cancellationToken);
|
|
|
|
if (rowsAffected > 0)
|
|
{
|
|
_logger.LogInformation("Deleted configuration for stream {StreamName}", streamName);
|
|
}
|
|
else
|
|
{
|
|
_logger.LogDebug("No configuration found to delete for stream {StreamName}", streamName);
|
|
}
|
|
}
|
|
|
|
public async Task<IReadOnlyList<StreamConfiguration>> FindConfigurationsAsync(
|
|
Func<StreamConfiguration, bool> predicate,
|
|
CancellationToken cancellationToken = default)
|
|
{
|
|
var allConfigurations = await GetAllConfigurationsAsync(cancellationToken);
|
|
return allConfigurations.Where(predicate).ToList();
|
|
}
|
|
|
|
private static StreamConfiguration MapToStreamConfiguration(NpgsqlDataReader reader)
|
|
{
|
|
var config = new StreamConfiguration
|
|
{
|
|
StreamName = reader.GetString(reader.GetOrdinal("stream_name")),
|
|
Description = reader.IsDBNull(reader.GetOrdinal("description"))
|
|
? null
|
|
: reader.GetString(reader.GetOrdinal("description")),
|
|
Tags = reader.IsDBNull(reader.GetOrdinal("tags"))
|
|
? null
|
|
: JsonSerializer.Deserialize<Dictionary<string, string>>(
|
|
reader.GetString(reader.GetOrdinal("tags"))),
|
|
CreatedAt = reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("created_at")),
|
|
UpdatedAt = reader.IsDBNull(reader.GetOrdinal("updated_at"))
|
|
? null
|
|
: reader.GetFieldValue<DateTimeOffset>(reader.GetOrdinal("updated_at")),
|
|
CreatedBy = reader.IsDBNull(reader.GetOrdinal("created_by"))
|
|
? null
|
|
: reader.GetString(reader.GetOrdinal("created_by")),
|
|
UpdatedBy = reader.IsDBNull(reader.GetOrdinal("updated_by"))
|
|
? null
|
|
: reader.GetString(reader.GetOrdinal("updated_by"))
|
|
};
|
|
|
|
// Map retention configuration
|
|
if (!reader.IsDBNull(reader.GetOrdinal("retention_max_age_seconds")) ||
|
|
!reader.IsDBNull(reader.GetOrdinal("retention_max_size_bytes")) ||
|
|
!reader.IsDBNull(reader.GetOrdinal("retention_max_event_count")))
|
|
{
|
|
config.Retention = new RetentionConfiguration
|
|
{
|
|
MaxAge = reader.IsDBNull(reader.GetOrdinal("retention_max_age_seconds"))
|
|
? null
|
|
: TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("retention_max_age_seconds"))),
|
|
MaxSizeBytes = reader.IsDBNull(reader.GetOrdinal("retention_max_size_bytes"))
|
|
? null
|
|
: reader.GetInt64(reader.GetOrdinal("retention_max_size_bytes")),
|
|
MaxEventCount = reader.IsDBNull(reader.GetOrdinal("retention_max_event_count"))
|
|
? null
|
|
: reader.GetInt64(reader.GetOrdinal("retention_max_event_count")),
|
|
EnablePartitioning = reader.IsDBNull(reader.GetOrdinal("retention_enable_partitioning"))
|
|
? null
|
|
: reader.GetBoolean(reader.GetOrdinal("retention_enable_partitioning")),
|
|
PartitionInterval = reader.IsDBNull(reader.GetOrdinal("retention_partition_interval_seconds"))
|
|
? null
|
|
: TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("retention_partition_interval_seconds")))
|
|
};
|
|
}
|
|
|
|
// Map DLQ configuration
|
|
var dlqEnabled = !reader.IsDBNull(reader.GetOrdinal("dlq_enabled")) && reader.GetBoolean(reader.GetOrdinal("dlq_enabled"));
|
|
if (dlqEnabled)
|
|
{
|
|
config.DeadLetterQueue = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
DeadLetterStreamName = reader.IsDBNull(reader.GetOrdinal("dlq_stream_name"))
|
|
? null
|
|
: reader.GetString(reader.GetOrdinal("dlq_stream_name")),
|
|
MaxDeliveryAttempts = reader.GetInt32(reader.GetOrdinal("dlq_max_delivery_attempts")),
|
|
RetryDelay = reader.IsDBNull(reader.GetOrdinal("dlq_retry_delay_seconds"))
|
|
? null
|
|
: TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("dlq_retry_delay_seconds"))),
|
|
StoreOriginalEvent = reader.IsDBNull(reader.GetOrdinal("dlq_store_original_event"))
|
|
? null
|
|
: reader.GetBoolean(reader.GetOrdinal("dlq_store_original_event")),
|
|
StoreErrorDetails = reader.IsDBNull(reader.GetOrdinal("dlq_store_error_details"))
|
|
? null
|
|
: reader.GetBoolean(reader.GetOrdinal("dlq_store_error_details"))
|
|
};
|
|
}
|
|
|
|
// Map lifecycle configuration (always create since fields have defaults)
|
|
config.Lifecycle = new LifecycleConfiguration
|
|
{
|
|
AutoCreate = !reader.IsDBNull(reader.GetOrdinal("lifecycle_auto_create")) && reader.GetBoolean(reader.GetOrdinal("lifecycle_auto_create")),
|
|
AutoArchive = !reader.IsDBNull(reader.GetOrdinal("lifecycle_auto_archive")) && reader.GetBoolean(reader.GetOrdinal("lifecycle_auto_archive")),
|
|
ArchiveAfter = reader.IsDBNull(reader.GetOrdinal("lifecycle_archive_after_seconds"))
|
|
? null
|
|
: TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("lifecycle_archive_after_seconds"))),
|
|
ArchiveLocation = reader.IsDBNull(reader.GetOrdinal("lifecycle_archive_location"))
|
|
? null
|
|
: reader.GetString(reader.GetOrdinal("lifecycle_archive_location")),
|
|
AutoDelete = !reader.IsDBNull(reader.GetOrdinal("lifecycle_auto_delete")) && reader.GetBoolean(reader.GetOrdinal("lifecycle_auto_delete")),
|
|
DeleteAfter = reader.IsDBNull(reader.GetOrdinal("lifecycle_delete_after_seconds"))
|
|
? null
|
|
: TimeSpan.FromSeconds(reader.GetInt64(reader.GetOrdinal("lifecycle_delete_after_seconds")))
|
|
};
|
|
|
|
// Map performance configuration
|
|
if (!reader.IsDBNull(reader.GetOrdinal("performance_batch_size")) ||
|
|
!reader.IsDBNull(reader.GetOrdinal("performance_enable_compression")))
|
|
{
|
|
config.Performance = new PerformanceConfiguration
|
|
{
|
|
BatchSize = reader.IsDBNull(reader.GetOrdinal("performance_batch_size"))
|
|
? null
|
|
: reader.GetInt32(reader.GetOrdinal("performance_batch_size")),
|
|
EnableCompression = reader.IsDBNull(reader.GetOrdinal("performance_enable_compression"))
|
|
? null
|
|
: reader.GetBoolean(reader.GetOrdinal("performance_enable_compression")),
|
|
CompressionAlgorithm = reader.IsDBNull(reader.GetOrdinal("performance_compression_algorithm"))
|
|
? null
|
|
: reader.GetString(reader.GetOrdinal("performance_compression_algorithm")),
|
|
EnableIndexing = reader.IsDBNull(reader.GetOrdinal("performance_enable_indexing"))
|
|
? null
|
|
: reader.GetBoolean(reader.GetOrdinal("performance_enable_indexing")),
|
|
IndexedFields = reader.IsDBNull(reader.GetOrdinal("performance_indexed_fields"))
|
|
? null
|
|
: JsonSerializer.Deserialize<List<string>>(
|
|
reader.GetString(reader.GetOrdinal("performance_indexed_fields"))),
|
|
CacheSize = reader.IsDBNull(reader.GetOrdinal("performance_cache_size"))
|
|
? null
|
|
: reader.GetInt32(reader.GetOrdinal("performance_cache_size"))
|
|
};
|
|
}
|
|
|
|
// Map access control configuration (always create since fields have defaults)
|
|
config.AccessControl = new AccessControlConfiguration
|
|
{
|
|
PublicRead = !reader.IsDBNull(reader.GetOrdinal("access_public_read")) && reader.GetBoolean(reader.GetOrdinal("access_public_read")),
|
|
PublicWrite = !reader.IsDBNull(reader.GetOrdinal("access_public_write")) && reader.GetBoolean(reader.GetOrdinal("access_public_write")),
|
|
AllowedReaders = reader.IsDBNull(reader.GetOrdinal("access_allowed_readers"))
|
|
? null
|
|
: JsonSerializer.Deserialize<List<string>>(
|
|
reader.GetString(reader.GetOrdinal("access_allowed_readers"))),
|
|
AllowedWriters = reader.IsDBNull(reader.GetOrdinal("access_allowed_writers"))
|
|
? null
|
|
: JsonSerializer.Deserialize<List<string>>(
|
|
reader.GetString(reader.GetOrdinal("access_allowed_writers"))),
|
|
MaxConsumerGroups = reader.IsDBNull(reader.GetOrdinal("access_max_consumer_groups"))
|
|
? null
|
|
: reader.GetInt32(reader.GetOrdinal("access_max_consumer_groups")),
|
|
MaxEventsPerSecond = reader.IsDBNull(reader.GetOrdinal("access_max_events_per_second"))
|
|
? null
|
|
: reader.GetInt64(reader.GetOrdinal("access_max_events_per_second"))
|
|
};
|
|
|
|
return config;
|
|
}
|
|
}
|