462 lines
16 KiB
C#
462 lines
16 KiB
C#
using System;
|
|
using Svrnty.CQRS.Events.Abstractions.Delivery;
|
|
using Svrnty.CQRS.Events.Abstractions.EventStore;
|
|
using System.Collections.Generic;
|
|
using System.Diagnostics;
|
|
using System.Linq;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
using Svrnty.CQRS.Events.Abstractions.Models;
|
|
using Svrnty.CQRS.Events.Storage;
|
|
|
|
namespace Svrnty.Phase2Testing;
|
|
|
|
/// <summary>
|
|
/// Phase 2.8: Comprehensive testing of event streaming features with InMemory provider
|
|
/// </summary>
|
|
public class Phase2TestProgram
|
|
{
|
|
private static readonly ILogger<InMemoryEventStreamStore> _logger = NullLogger<InMemoryEventStreamStore>.Instance;
|
|
private static int _testsPassed = 0;
|
|
private static int _testsFailed = 0;
|
|
|
|
public static async Task Main(string[] args)
|
|
{
|
|
Console.WriteLine("╔═══════════════════════════════════════════════════════════╗");
|
|
Console.WriteLine("║ Phase 2.8: Event Streaming Testing (InMemory Provider) ║");
|
|
Console.WriteLine("╚═══════════════════════════════════════════════════════════╝");
|
|
Console.WriteLine();
|
|
|
|
// Create store instance
|
|
var store = new InMemoryEventStreamStore(
|
|
Enumerable.Empty<IEventDeliveryProvider>(),
|
|
_logger);
|
|
|
|
// Run all test suites
|
|
await TestPersistentStreamAppendRead(store);
|
|
await TestEventReplay(store);
|
|
await TestStressLargeVolumes(store);
|
|
await TestEphemeralStreams(store);
|
|
|
|
// Print summary
|
|
PrintSummary();
|
|
}
|
|
|
|
// ========================================================================
|
|
// Phase 2.8.1: Test Persistent Stream Append/Read
|
|
// ========================================================================
|
|
|
|
private static async Task TestPersistentStreamAppendRead(IEventStreamStore store)
|
|
{
|
|
PrintHeader("Phase 2.8.1: Persistent Stream Append/Read");
|
|
|
|
const string streamName = "test-persistent-stream";
|
|
|
|
// Test 1: Append single event
|
|
PrintTest("Append single event to persistent stream");
|
|
var offset1 = await store.AppendAsync(streamName, CreateTestEvent("evt-001", "corr-001"));
|
|
if (offset1 == 0)
|
|
{
|
|
PrintPass("Event appended at offset 0");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected offset 0, got {offset1}");
|
|
}
|
|
|
|
// Test 2: Append multiple events
|
|
PrintTest("Append multiple events sequentially");
|
|
var offset2 = await store.AppendAsync(streamName, CreateTestEvent("evt-002", "corr-002"));
|
|
var offset3 = await store.AppendAsync(streamName, CreateTestEvent("evt-003", "corr-003"));
|
|
var offset4 = await store.AppendAsync(streamName, CreateTestEvent("evt-004", "corr-004"));
|
|
|
|
if (offset2 == 1 && offset3 == 2 && offset4 == 3)
|
|
{
|
|
PrintPass("Events appended with sequential offsets (1, 2, 3)");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected offsets 1,2,3 but got {offset2},{offset3},{offset4}");
|
|
}
|
|
|
|
// Test 3: Read stream from beginning
|
|
PrintTest("Read stream from offset 0");
|
|
var events = await store.ReadStreamAsync(streamName, fromOffset: 0, maxCount: 100);
|
|
|
|
if (events.Count == 4 &&
|
|
events[0].EventId == "evt-001" &&
|
|
events[3].EventId == "evt-004")
|
|
{
|
|
PrintPass($"Read {events.Count} events successfully");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 4 events starting with evt-001, got {events.Count} events");
|
|
}
|
|
|
|
// Test 4: Read stream from specific offset
|
|
PrintTest("Read stream from offset 2");
|
|
var eventsFromOffset = await store.ReadStreamAsync(streamName, fromOffset: 2, maxCount: 100);
|
|
|
|
if (eventsFromOffset.Count == 2 &&
|
|
eventsFromOffset[0].EventId == "evt-003" &&
|
|
eventsFromOffset[1].EventId == "evt-004")
|
|
{
|
|
PrintPass("Read from specific offset successful (2 events)");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 2 events (evt-003, evt-004), got {eventsFromOffset.Count} events");
|
|
}
|
|
|
|
// Test 5: Get stream length
|
|
PrintTest("Get stream length");
|
|
var length = await store.GetStreamLengthAsync(streamName);
|
|
|
|
if (length == 4)
|
|
{
|
|
PrintPass($"Stream length is correct: {length}");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected length 4, got {length}");
|
|
}
|
|
|
|
// Test 6: Get stream metadata
|
|
PrintTest("Get stream metadata");
|
|
var metadata = await store.GetStreamMetadataAsync(streamName);
|
|
|
|
if (metadata.StreamName == streamName &&
|
|
metadata.Length == 4 &&
|
|
metadata.OldestEventOffset == 0)
|
|
{
|
|
PrintPass("Stream metadata retrieved successfully");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Metadata incorrect: StreamName={metadata.StreamName}, Length={metadata.Length}");
|
|
}
|
|
}
|
|
|
|
// ========================================================================
|
|
// Phase 2.8.4: Test Event Replay from Various Positions
|
|
// ========================================================================
|
|
|
|
private static async Task TestEventReplay(IEventStreamStore store)
|
|
{
|
|
PrintHeader("Phase 2.8.4: Event Replay from Various Positions");
|
|
|
|
const string replayStream = "replay-test-stream";
|
|
|
|
// Create stream with 10 events
|
|
PrintTest("Creating stream with 10 events for replay testing");
|
|
for (int i = 1; i <= 10; i++)
|
|
{
|
|
await store.AppendAsync(replayStream, CreateTestEvent($"replay-evt-{i}", $"replay-corr-{i}"));
|
|
}
|
|
PrintPass("Created stream with 10 events");
|
|
|
|
// Test 1: Replay from beginning with limit
|
|
PrintTest("Replay from beginning (offset 0, maxCount 5)");
|
|
var eventsFromStart = await store.ReadStreamAsync(replayStream, fromOffset: 0, maxCount: 5);
|
|
|
|
if (eventsFromStart.Count == 5)
|
|
{
|
|
PrintPass($"Replay from beginning returned 5 events (limited by maxCount)");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 5 events, got {eventsFromStart.Count}");
|
|
}
|
|
|
|
// Test 2: Replay from middle
|
|
PrintTest("Replay from middle (offset 5)");
|
|
var eventsFromMiddle = await store.ReadStreamAsync(replayStream, fromOffset: 5, maxCount: 100);
|
|
|
|
if (eventsFromMiddle.Count == 5 &&
|
|
eventsFromMiddle[0].EventId == "replay-evt-6" &&
|
|
eventsFromMiddle[4].EventId == "replay-evt-10")
|
|
{
|
|
PrintPass("Replay from middle successful (5 events from offset 5)");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 5 events starting at replay-evt-6, got {eventsFromMiddle.Count}");
|
|
}
|
|
|
|
// Test 3: Replay from near end
|
|
PrintTest("Replay from near end (offset 8)");
|
|
var eventsFromEnd = await store.ReadStreamAsync(replayStream, fromOffset: 8, maxCount: 100);
|
|
|
|
if (eventsFromEnd.Count == 2)
|
|
{
|
|
PrintPass("Replay from near end returned 2 events (offsets 8 and 9)");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 2 events, got {eventsFromEnd.Count}");
|
|
}
|
|
|
|
// Test 4: Read entire stream
|
|
PrintTest("Read entire stream (maxCount 100)");
|
|
var allEvents = await store.ReadStreamAsync(replayStream, fromOffset: 0, maxCount: 100);
|
|
|
|
if (allEvents.Count == 10)
|
|
{
|
|
PrintPass($"Read entire stream successfully (10 events)");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 10 events, got {allEvents.Count}");
|
|
}
|
|
}
|
|
|
|
// ========================================================================
|
|
// Phase 2.8.6: Stress Test with Large Event Volumes
|
|
// ========================================================================
|
|
|
|
private static async Task TestStressLargeVolumes(IEventStreamStore store)
|
|
{
|
|
PrintHeader("Phase 2.8.6: Stress Test with Large Event Volumes");
|
|
|
|
const string stressStream = "stress-test-stream";
|
|
const int totalEvents = 1000;
|
|
|
|
// Test 1: Append 1000 events
|
|
PrintTest($"Appending {totalEvents} events");
|
|
var sw = Stopwatch.StartNew();
|
|
|
|
for (int i = 1; i <= totalEvents; i++)
|
|
{
|
|
await store.AppendAsync(
|
|
stressStream,
|
|
CreateTestEvent($"stress-evt-{i}", $"stress-corr-{i}", $"{{\"index\":{i},\"data\":\"Lorem ipsum dolor sit amet\"}}"));
|
|
|
|
if (i % 100 == 0)
|
|
{
|
|
Console.Write(".");
|
|
}
|
|
}
|
|
|
|
sw.Stop();
|
|
Console.WriteLine();
|
|
PrintPass($"Appended {totalEvents} events in {sw.ElapsedMilliseconds}ms");
|
|
|
|
// Test 2: Verify stream length
|
|
PrintTest($"Verify stream length is {totalEvents}");
|
|
var length = await store.GetStreamLengthAsync(stressStream);
|
|
|
|
if (length == totalEvents)
|
|
{
|
|
PrintPass($"Stream length verified: {length} events");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected {totalEvents} events, got {length}");
|
|
}
|
|
|
|
// Test 3: Read large batch from stream
|
|
PrintTest("Reading 500 events from stream (offset 0)");
|
|
sw.Restart();
|
|
var events = await store.ReadStreamAsync(stressStream, fromOffset: 0, maxCount: 500);
|
|
sw.Stop();
|
|
|
|
if (events.Count == 500)
|
|
{
|
|
PrintPass($"Read 500 events in {sw.ElapsedMilliseconds}ms");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 500 events, got {events.Count}");
|
|
}
|
|
|
|
// Test 4: Read from middle of large stream
|
|
PrintTest("Reading events from middle of stream (offset 500)");
|
|
var eventsFromMiddle = await store.ReadStreamAsync(stressStream, fromOffset: 500, maxCount: 100);
|
|
|
|
if (eventsFromMiddle.Count == 100 && eventsFromMiddle[0].EventId == "stress-evt-501")
|
|
{
|
|
PrintPass("Successfully read from middle of large stream");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 100 events starting at stress-evt-501, got {eventsFromMiddle.Count}");
|
|
}
|
|
|
|
// Test 5: Multiple concurrent reads
|
|
PrintTest("Concurrent read performance (10 simultaneous reads)");
|
|
sw.Restart();
|
|
|
|
var tasks = new List<Task>();
|
|
for (int i = 0; i < 10; i++)
|
|
{
|
|
tasks.Add(store.ReadStreamAsync(stressStream, fromOffset: 0, maxCount: 100));
|
|
}
|
|
|
|
await Task.WhenAll(tasks);
|
|
sw.Stop();
|
|
|
|
PrintPass($"Completed 10 concurrent reads in {sw.ElapsedMilliseconds}ms");
|
|
}
|
|
|
|
// ========================================================================
|
|
// Backward Compatibility: Ephemeral Streams
|
|
// ========================================================================
|
|
|
|
private static async Task TestEphemeralStreams(IEventStreamStore store)
|
|
{
|
|
PrintHeader("Backward Compatibility: Ephemeral Streams");
|
|
|
|
const string ephemeralStream = "ephemeral-test-queue";
|
|
|
|
// Test 1: Enqueue event
|
|
PrintTest("Enqueue event to ephemeral stream");
|
|
await store.EnqueueAsync(ephemeralStream, CreateTestEvent("eph-evt-001", "eph-corr-001"));
|
|
PrintPass("Enqueued event to ephemeral stream");
|
|
|
|
// Test 2: Dequeue event
|
|
PrintTest("Dequeue event from ephemeral stream");
|
|
var dequeuedEvent = await store.DequeueAsync(
|
|
ephemeralStream,
|
|
consumerId: "test-consumer",
|
|
visibilityTimeout: TimeSpan.FromSeconds(30));
|
|
|
|
if (dequeuedEvent != null && dequeuedEvent.EventId == "eph-evt-001")
|
|
{
|
|
PrintPass("Dequeued event successfully");
|
|
}
|
|
else
|
|
{
|
|
PrintFail("Failed to dequeue event or wrong event returned");
|
|
}
|
|
|
|
// Test 3: Acknowledge event
|
|
PrintTest("Acknowledge dequeued event");
|
|
var ackResult = await store.AcknowledgeAsync(
|
|
ephemeralStream,
|
|
eventId: "eph-evt-001",
|
|
consumerId: "test-consumer");
|
|
|
|
if (ackResult)
|
|
{
|
|
PrintPass("Event acknowledged successfully");
|
|
}
|
|
else
|
|
{
|
|
PrintFail("Failed to acknowledge event");
|
|
}
|
|
|
|
// Test 4: Verify queue is empty
|
|
PrintTest("Verify queue is empty after acknowledgment");
|
|
var count = await store.GetPendingCountAsync(ephemeralStream);
|
|
|
|
if (count == 0)
|
|
{
|
|
PrintPass("Queue is empty after acknowledgment");
|
|
}
|
|
else
|
|
{
|
|
PrintFail($"Expected 0 pending events, got {count}");
|
|
}
|
|
}
|
|
|
|
// ========================================================================
|
|
// Helper Methods
|
|
// ========================================================================
|
|
|
|
private static ICorrelatedEvent CreateTestEvent(string eventId, string correlationId, string? eventData = null)
|
|
{
|
|
return new TestEvent
|
|
{
|
|
EventId = eventId,
|
|
CorrelationId = correlationId,
|
|
EventData = eventData ?? $"{{\"test\":\"data-{eventId}\"}}",
|
|
OccurredAt = DateTimeOffset.UtcNow
|
|
};
|
|
}
|
|
|
|
private static void PrintHeader(string message)
|
|
{
|
|
Console.WriteLine();
|
|
Console.ForegroundColor = ConsoleColor.Blue;
|
|
Console.WriteLine("========================================");
|
|
Console.WriteLine(message);
|
|
Console.WriteLine("========================================");
|
|
Console.ResetColor();
|
|
Console.WriteLine();
|
|
}
|
|
|
|
private static void PrintTest(string message)
|
|
{
|
|
Console.ForegroundColor = ConsoleColor.Yellow;
|
|
Console.WriteLine($"▶ Test: {message}");
|
|
Console.ResetColor();
|
|
}
|
|
|
|
private static void PrintPass(string message)
|
|
{
|
|
Console.ForegroundColor = ConsoleColor.Green;
|
|
Console.WriteLine($"✓ PASS: {message}");
|
|
Console.ResetColor();
|
|
_testsPassed++;
|
|
}
|
|
|
|
private static void PrintFail(string message)
|
|
{
|
|
Console.ForegroundColor = ConsoleColor.Red;
|
|
Console.WriteLine($"✗ FAIL: {message}");
|
|
Console.ResetColor();
|
|
_testsFailed++;
|
|
}
|
|
|
|
private static void PrintSummary()
|
|
{
|
|
Console.WriteLine();
|
|
Console.ForegroundColor = ConsoleColor.Blue;
|
|
Console.WriteLine("========================================");
|
|
Console.WriteLine("Test Summary");
|
|
Console.WriteLine("========================================");
|
|
Console.ResetColor();
|
|
|
|
Console.ForegroundColor = ConsoleColor.Green;
|
|
Console.WriteLine($"Tests Passed: {_testsPassed}");
|
|
Console.ResetColor();
|
|
|
|
Console.ForegroundColor = ConsoleColor.Red;
|
|
Console.WriteLine($"Tests Failed: {_testsFailed}");
|
|
Console.ResetColor();
|
|
|
|
Console.ForegroundColor = ConsoleColor.Blue;
|
|
Console.WriteLine("========================================");
|
|
Console.ResetColor();
|
|
Console.WriteLine();
|
|
|
|
if (_testsFailed == 0)
|
|
{
|
|
Console.ForegroundColor = ConsoleColor.Green;
|
|
Console.WriteLine("All tests passed!");
|
|
Console.ResetColor();
|
|
Environment.Exit(0);
|
|
}
|
|
else
|
|
{
|
|
Console.ForegroundColor = ConsoleColor.Red;
|
|
Console.WriteLine("Some tests failed!");
|
|
Console.ResetColor();
|
|
Environment.Exit(1);
|
|
}
|
|
}
|
|
|
|
// Simple test event class
|
|
private class TestEvent : ICorrelatedEvent
|
|
{
|
|
public required string EventId { get; set; }
|
|
public required string CorrelationId { get; set; }
|
|
public string EventData { get; set; } = string.Empty;
|
|
public DateTimeOffset OccurredAt { get; set; }
|
|
}
|
|
}
|