dotnet-cqrs/docs/event-streaming/stream-configuration/performance-config.md

12 KiB

Performance Configuration

Optimize stream performance with batching, compression, indexing, and caching.

Overview

Performance configuration tunes stream operations for throughput and latency:

  • Batch Size - Events per database query
  • Compression - Reduce storage and network I/O
  • Indexing - Speed up queries on metadata fields
  • Caching - In-memory caching for hot events

Quick Start

using Svrnty.CQRS.Events.Abstractions;

var configStore = serviceProvider.GetRequiredService<IStreamConfigurationStore>();

await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "orders",
    Performance = new PerformanceConfiguration
    {
        BatchSize = 1000,
        EnableCompression = true,
        EnableIndexing = true,
        IndexedFields = new List<string> { "userId", "tenantId" }
    }
});

Performance Properties

public class PerformanceConfiguration
{
    public int BatchSize { get; set; }                         // Events per query
    public bool EnableCompression { get; set; }                // Compress event data
    public string CompressionAlgorithm { get; set; }           // gzip, lz4, zstd
    public bool EnableIndexing { get; set; }                   // Index metadata
    public List<string> IndexedFields { get; set; }            // Fields to index
    public int CacheSize { get; set; }                         // Cache event count
    public TimeSpan CacheTtl { get; set; }                     // Cache expiration
    public bool EnableReadAhead { get; set; }                  // Prefetch batches
    public int ReadAheadBatches { get; set; }                  // Prefetch count
}

Batch Size

Control database query batch size:

// Small batches - lower latency, higher overhead
var performance = new PerformanceConfiguration
{
    BatchSize = 100  // Good for real-time processing
};

// Medium batches - balanced (default)
var performance = new PerformanceConfiguration
{
    BatchSize = 500  // Good general-purpose setting
};

// Large batches - higher throughput, more memory
var performance = new PerformanceConfiguration
{
    BatchSize = 5000  // Good for bulk processing
};

Batch Size Impact

// Small batch - many queries
var performance = new PerformanceConfiguration
{
    BatchSize = 100
};
// Process 100k events = 1000 database queries

// Large batch - fewer queries
var performance = new PerformanceConfiguration
{
    BatchSize = 10000
};
// Process 100k events = 10 database queries

Compression

Reduce storage and network I/O:

// GZIP - Best compression ratio
var performance = new PerformanceConfiguration
{
    EnableCompression = true,
    CompressionAlgorithm = "gzip"  // Slower, best ratio
};

// LZ4 - Fastest compression
var performance = new PerformanceConfiguration
{
    EnableCompression = true,
    CompressionAlgorithm = "lz4"  // Fastest, good ratio
};

// Zstandard - Balanced
var performance = new PerformanceConfiguration
{
    EnableCompression = true,
    CompressionAlgorithm = "zstd"  // Fast, excellent ratio
};

Compression Comparison

| Algorithm | Speed     | Ratio     | Best For               |
|-----------|-----------|-----------|------------------------|
| gzip      | Slow      | High      | Cold storage, archives |
| lz4       | Very Fast | Good      | Real-time streams      |
| zstd      | Fast      | Excellent | General purpose        |

When to Use Compression

// ✅ Large events - Good candidate
var performance = new PerformanceConfiguration
{
    EnableCompression = true,  // Event size > 1 KB
    CompressionAlgorithm = "lz4"
};

// ❌ Small events - Not worth it
var performance = new PerformanceConfiguration
{
    EnableCompression = false  // Event size < 100 bytes
};

Indexing

Index metadata fields for fast queries:

// Index common query fields
var performance = new PerformanceConfiguration
{
    EnableIndexing = true,
    IndexedFields = new List<string>
    {
        "userId",      // Filter by user
        "tenantId",    // Multi-tenant isolation
        "orderId",     // Lookup by order
        "eventType"    // Filter by event type
    }
};

Index Selection

// ✅ Good - Frequently queried fields
var performance = new PerformanceConfiguration
{
    EnableIndexing = true,
    IndexedFields = new List<string> { "userId", "tenantId" }
};

// ❌ Bad - Too many indexes (slows writes)
var performance = new PerformanceConfiguration
{
    EnableIndexing = true,
    IndexedFields = new List<string>
    {
        "userId", "tenantId", "orderId", "productId",
        "categoryId", "regionId", "statusCode", "...20 more fields"
    }
};

Index Impact

// Without index
SELECT * FROM events
WHERE stream_name = 'orders'
  AND metadata->>'userId' = '12345'
  AND timestamp > NOW() - INTERVAL '7 days';
// Full table scan: 10 seconds for 10M events

// With index
CREATE INDEX idx_events_user_id ON events ((metadata->>'userId'));
// Index scan: 50ms for same query

Caching

Cache hot events in memory:

// Cache last 10,000 events
var performance = new PerformanceConfiguration
{
    CacheSize = 10000,
    CacheTtl = TimeSpan.FromMinutes(5)
};

// Cache last 100,000 events (high memory)
var performance = new PerformanceConfiguration
{
    CacheSize = 100000,
    CacheTtl = TimeSpan.FromMinutes(10)
};

// Disable caching
var performance = new PerformanceConfiguration
{
    CacheSize = 0
};

Cache Effectiveness

// ✅ Good - Read-heavy workload
var performance = new PerformanceConfiguration
{
    CacheSize = 50000  // Cache hot events
};
// Repeated reads from cache: 1ms vs 10ms from DB

// ❌ Not useful - Write-heavy workload
var performance = new PerformanceConfiguration
{
    CacheSize = 0  // No caching for write-heavy streams
};

Read-Ahead

Prefetch batches for sequential reads:

// Enable read-ahead for sequential processing
var performance = new PerformanceConfiguration
{
    BatchSize = 1000,
    EnableReadAhead = true,
    ReadAheadBatches = 2  // Prefetch 2 batches ahead
};

// Process events
await foreach (var @event in eventStore.ReadStreamAsync("orders"))
{
    // Batches prefetched in background
    await ProcessEventAsync(@event);
}

Domain-Specific Examples

High-Volume Orders

// Optimize for throughput
var orderPerformance = new PerformanceConfiguration
{
    BatchSize = 5000,                    // Large batches
    EnableCompression = true,
    CompressionAlgorithm = "lz4",        // Fast compression
    EnableIndexing = true,
    IndexedFields = new List<string> { "userId", "orderId", "tenantId" },
    CacheSize = 100000,                  // Cache last 100k
    CacheTtl = TimeSpan.FromMinutes(10),
    EnableReadAhead = true,
    ReadAheadBatches = 3                 // Aggressive prefetch
};

await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "orders",
    Performance = orderPerformance,
    Tags = new List<string> { "high-volume", "production" }
});

Real-Time Analytics

// Optimize for low latency
var analyticsPerformance = new PerformanceConfiguration
{
    BatchSize = 100,                     // Small batches, low latency
    EnableCompression = false,           // Skip compression overhead
    EnableIndexing = true,
    IndexedFields = new List<string> { "userId", "eventType" },
    CacheSize = 10000,                   // Moderate cache
    CacheTtl = TimeSpan.FromMinutes(1),
    EnableReadAhead = false              // No prefetch needed
};

await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "real-time-analytics",
    Performance = analyticsPerformance
});

Audit Logs

// Optimize for storage
var auditPerformance = new PerformanceConfiguration
{
    BatchSize = 1000,
    EnableCompression = true,
    CompressionAlgorithm = "gzip",       // Maximum compression
    EnableIndexing = true,
    IndexedFields = new List<string> { "userId", "action", "resourceId" },
    CacheSize = 0,                       // No caching (rarely re-read)
    EnableReadAhead = false
};

await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "audit-logs",
    Performance = auditPerformance,
    Tags = new List<string> { "compliance", "audit" }
});

Session Events

// Optimize for memory
var sessionPerformance = new PerformanceConfiguration
{
    BatchSize = 500,
    EnableCompression = true,
    CompressionAlgorithm = "lz4",
    EnableIndexing = true,
    IndexedFields = new List<string> { "sessionId", "userId" },
    CacheSize = 50000,                   // Large cache (hot data)
    CacheTtl = TimeSpan.FromMinutes(30),
    EnableReadAhead = true,
    ReadAheadBatches = 2
};

await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "user-sessions",
    Performance = sessionPerformance
});

Performance Tuning

Measuring Performance

// Benchmark different configurations
var stopwatch = Stopwatch.StartNew();

await foreach (var @event in eventStore.ReadStreamAsync("orders"))
{
    await ProcessEventAsync(@event);
}

stopwatch.Stop();

_logger.LogInformation(
    "Processed stream in {Duration}ms with batch size {BatchSize}",
    stopwatch.ElapsedMilliseconds,
    batchSize);

A/B Testing Configurations

// Test batch size impact
var configs = new[]
{
    new { BatchSize = 100, Name = "Small" },
    new { BatchSize = 500, Name = "Medium" },
    new { BatchSize = 2000, Name = "Large" }
};

foreach (var config in configs)
{
    var performance = new PerformanceConfiguration
    {
        BatchSize = config.BatchSize
    };

    await configStore.SetConfigurationAsync(new StreamConfiguration
    {
        StreamName = "test-stream",
        Performance = performance
    });

    var duration = await BenchmarkStreamProcessingAsync("test-stream");

    _logger.LogInformation(
        "{Name} batch ({Size}): {Duration}ms",
        config.Name,
        config.BatchSize,
        duration);
}

Monitoring Performance

// Track performance metrics
var metrics = new
{
    StreamName = "orders",
    BatchSize = config.Performance.BatchSize,
    CompressionEnabled = config.Performance.EnableCompression,
    CacheHitRate = await GetCacheHitRateAsync("orders"),
    AvgQueryTime = await GetAvgQueryTimeAsync("orders"),
    EventsPerSecond = await GetThroughputAsync("orders")
};

_logger.LogInformation(
    "Stream {Stream}: {EventsPerSec} events/sec, {CacheHitRate:F1}% cache hits, {AvgQueryTime}ms avg query",
    metrics.StreamName,
    metrics.EventsPerSecond,
    metrics.CacheHitRate,
    metrics.AvgQueryTime);

Best Practices

DO

  • Start with default settings and tune based on metrics
  • Use larger batches for bulk processing
  • Enable compression for large events (> 1 KB)
  • Index fields used in queries
  • Cache hot streams with frequent reads
  • Enable read-ahead for sequential processing
  • Benchmark configuration changes
  • Monitor cache hit rates
  • Use LZ4 for real-time streams
  • Use GZIP for archives

DON'T

  • Don't use very large batches (> 10000) - high memory usage
  • Don't compress small events (< 100 bytes)
  • Don't over-index (slows writes)
  • Don't cache cold streams
  • Don't enable read-ahead for random access
  • Don't forget to monitor performance
  • Don't use same config for all streams
  • Don't optimize prematurely

See Also