dotnet-cqrs/Phase2TestProgram.cs

462 lines
16 KiB
C#

using System;
using Svrnty.CQRS.Events.Abstractions.Delivery;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Svrnty.CQRS.Events.Abstractions;
using Svrnty.CQRS.Events.Abstractions.Models;
using Svrnty.CQRS.Events.Storage;
namespace Svrnty.Phase2Testing;
/// <summary>
/// Phase 2.8: Comprehensive testing of event streaming features with InMemory provider
/// </summary>
public class Phase2TestProgram
{
private static readonly ILogger<InMemoryEventStreamStore> _logger = NullLogger<InMemoryEventStreamStore>.Instance;
private static int _testsPassed = 0;
private static int _testsFailed = 0;
/// <summary>
/// Entry point. Creates an <see cref="InMemoryEventStreamStore"/> with no delivery providers and
/// the null logger, runs the four test suites against it in order, then calls
/// <see cref="PrintSummary"/>, which prints the totals and terminates the process.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
/// <remarks>
/// Every suite is executed through <see cref="RunSuiteAsync"/>, so an exception thrown by one
/// suite is recorded as a failure and the remaining suites and the summary still run.
/// </remarks>
public static async Task Main(string[] args)
{
    // The banner and the pass/fail markers are non-ASCII. Ask the console for UTF-8 so they are
    // not rendered as '?' on consoles that default to a legacy code page.
    try
    {
        Console.OutputEncoding = System.Text.Encoding.UTF8;
    }
    catch (System.IO.IOException)
    {
        // Best effort only: keep the default encoding when it cannot be changed.
    }

    Console.WriteLine("╔═══════════════════════════════════════════════════════════╗");
    Console.WriteLine("║ Phase 2.8: Event Streaming Testing (InMemory Provider) ║");
    Console.WriteLine("╚═══════════════════════════════════════════════════════════╝");
    Console.WriteLine();

    // Create store instance
    var store = new InMemoryEventStreamStore(
        Enumerable.Empty<IEventDeliveryProvider>(),
        _logger);

    // Run all test suites; each one uses its own stream name, so a failed suite
    // does not invalidate the ones that follow.
    await RunSuiteAsync(nameof(TestPersistentStreamAppendRead), () => TestPersistentStreamAppendRead(store));
    await RunSuiteAsync(nameof(TestEventReplay), () => TestEventReplay(store));
    await RunSuiteAsync(nameof(TestStressLargeVolumes), () => TestStressLargeVolumes(store));
    await RunSuiteAsync(nameof(TestEphemeralStreams), () => TestEphemeralStreams(store));

    // Print summary
    PrintSummary();
}

/// <summary>
/// Runs one test suite and converts an unhandled exception into a recorded failure.
/// </summary>
/// <param name="suiteName">Name shown in the failure message when the suite throws.</param>
/// <param name="suite">Delegate that starts the suite.</param>
private static async Task RunSuiteAsync(string suiteName, Func<Task> suite)
{
    try
    {
        await suite();
    }
    catch (Exception ex)
    {
        // Deliberately broad: any exception escaping a suite is a test failure, not a crash.
        PrintFail($"{suiteName} aborted with {ex.GetType().Name}: {ex.Message}");
    }
}
// ========================================================================
// Phase 2.8.1: Test Persistent Stream Append/Read
// ========================================================================
/// <summary>
/// Persistent-stream suite: appends four events to a single stream and checks the returned
/// offsets, a read from offset 0, a read from offset 2, the reported stream length and the
/// stream metadata.
/// </summary>
/// <param name="store">Event stream store under test.</param>
private static async Task TestPersistentStreamAppendRead(IEventStreamStore store)
{
    const string streamName = "test-persistent-stream";

    // Routes the outcome of one check to the pass or fail reporter.
    static void Report(bool passed, string passMessage, string failMessage)
    {
        if (passed)
        {
            PrintPass(passMessage);
        }
        else
        {
            PrintFail(failMessage);
        }
    }

    PrintHeader("Phase 2.8.1: Persistent Stream Append/Read");

    // Test 1: the first append is expected to be assigned offset 0.
    PrintTest("Append single event to persistent stream");
    var firstOffset = await store.AppendAsync(streamName, CreateTestEvent("evt-001", "corr-001"));
    Report(
        firstOffset == 0,
        "Event appended at offset 0",
        $"Expected offset 0, got {firstOffset}");

    // Test 2: subsequent appends are expected to get consecutive offsets.
    PrintTest("Append multiple events sequentially");
    var secondOffset = await store.AppendAsync(streamName, CreateTestEvent("evt-002", "corr-002"));
    var thirdOffset = await store.AppendAsync(streamName, CreateTestEvent("evt-003", "corr-003"));
    var fourthOffset = await store.AppendAsync(streamName, CreateTestEvent("evt-004", "corr-004"));
    Report(
        secondOffset == 1 && thirdOffset == 2 && fourthOffset == 3,
        "Events appended with sequential offsets (1, 2, 3)",
        $"Expected offsets 1,2,3 but got {secondOffset},{thirdOffset},{fourthOffset}");

    // Test 3: reading from the start returns all four events in append order.
    PrintTest("Read stream from offset 0");
    var fullRead = await store.ReadStreamAsync(streamName, fromOffset: 0, maxCount: 100);
    Report(
        fullRead.Count == 4 && fullRead[0].EventId == "evt-001" && fullRead[3].EventId == "evt-004",
        $"Read {fullRead.Count} events successfully",
        $"Expected 4 events starting with evt-001, got {fullRead.Count} events");

    // Test 4: reading from offset 2 returns only the last two events.
    PrintTest("Read stream from offset 2");
    var tailRead = await store.ReadStreamAsync(streamName, fromOffset: 2, maxCount: 100);
    Report(
        tailRead.Count == 2 && tailRead[0].EventId == "evt-003" && tailRead[1].EventId == "evt-004",
        "Read from specific offset successful (2 events)",
        $"Expected 2 events (evt-003, evt-004), got {tailRead.Count} events");

    // Test 5: the reported length matches the number of appended events.
    PrintTest("Get stream length");
    var streamLength = await store.GetStreamLengthAsync(streamName);
    Report(
        streamLength == 4,
        $"Stream length is correct: {streamLength}",
        $"Expected length 4, got {streamLength}");

    // Test 6: metadata carries the stream name, the length and the oldest offset.
    PrintTest("Get stream metadata");
    var info = await store.GetStreamMetadataAsync(streamName);
    Report(
        info.StreamName == streamName && info.Length == 4 && info.OldestEventOffset == 0,
        "Stream metadata retrieved successfully",
        $"Metadata incorrect: StreamName={info.StreamName}, Length={info.Length}");
}
// ========================================================================
// Phase 2.8.4: Test Event Replay from Various Positions
// ========================================================================
/// <summary>
/// Replay suite: seeds a stream with ten events, then reads it back from the beginning with a
/// limit, from the middle, from near the end and in full, checking the number of events
/// returned (and, for the middle read, the first and last event ids).
/// </summary>
/// <param name="store">Event stream store under test.</param>
private static async Task TestEventReplay(IEventStreamStore store)
{
    const string replayStream = "replay-test-stream";

    PrintHeader("Phase 2.8.4: Event Replay from Various Positions");

    // Seed the stream with events replay-evt-1 .. replay-evt-10.
    PrintTest("Creating stream with 10 events for replay testing");
    foreach (var sequence in Enumerable.Range(1, 10))
    {
        var seeded = CreateTestEvent($"replay-evt-{sequence}", $"replay-corr-{sequence}");
        await store.AppendAsync(replayStream, seeded);
    }
    PrintPass("Created stream with 10 events");

    // Test 1: maxCount caps the number of events returned.
    PrintTest("Replay from beginning (offset 0, maxCount 5)");
    var head = await store.ReadStreamAsync(replayStream, fromOffset: 0, maxCount: 5);
    if (head.Count != 5)
    {
        PrintFail($"Expected 5 events, got {head.Count}");
    }
    else
    {
        PrintPass("Replay from beginning returned 5 events (limited by maxCount)");
    }

    // Test 2: offset 5 is the sixth event, so the read spans replay-evt-6 .. replay-evt-10.
    PrintTest("Replay from middle (offset 5)");
    var middle = await store.ReadStreamAsync(replayStream, fromOffset: 5, maxCount: 100);
    if (middle.Count != 5 ||
        middle[0].EventId != "replay-evt-6" ||
        middle[4].EventId != "replay-evt-10")
    {
        PrintFail($"Expected 5 events starting at replay-evt-6, got {middle.Count}");
    }
    else
    {
        PrintPass("Replay from middle successful (5 events from offset 5)");
    }

    // Test 3: only offsets 8 and 9 remain.
    PrintTest("Replay from near end (offset 8)");
    var tail = await store.ReadStreamAsync(replayStream, fromOffset: 8, maxCount: 100);
    if (tail.Count != 2)
    {
        PrintFail($"Expected 2 events, got {tail.Count}");
    }
    else
    {
        PrintPass("Replay from near end returned 2 events (offsets 8 and 9)");
    }

    // Test 4: a limit larger than the stream returns everything.
    PrintTest("Read entire stream (maxCount 100)");
    var everything = await store.ReadStreamAsync(replayStream, fromOffset: 0, maxCount: 100);
    if (everything.Count != 10)
    {
        PrintFail($"Expected 10 events, got {everything.Count}");
    }
    else
    {
        PrintPass("Read entire stream successfully (10 events)");
    }
}
// ========================================================================
// Phase 2.8.6: Stress Test with Large Event Volumes
// ========================================================================
/// <summary>
/// Stress suite: appends 1000 events to one stream, then checks the reported length, a
/// 500-event read from offset 0, a 100-event read from offset 500 and ten concurrent
/// 100-event reads from offset 0. Elapsed times are printed for the append, the large read
/// and the concurrent reads.
/// </summary>
/// <param name="store">Event stream store under test.</param>
/// <remarks>
/// The concurrent-read check inspects every result instead of only waiting for the tasks to
/// finish, so a read that returns the wrong number of events or the wrong first event is
/// reported as a failure.
/// </remarks>
private static async Task TestStressLargeVolumes(IEventStreamStore store)
{
    PrintHeader("Phase 2.8.6: Stress Test with Large Event Volumes");
    const string stressStream = "stress-test-stream";
    const int totalEvents = 1000;
    const int concurrentReaders = 10;
    const int concurrentReadSize = 100;

    // Test 1: Append 1000 events
    PrintTest($"Appending {totalEvents} events");
    var sw = Stopwatch.StartNew();
    for (int i = 1; i <= totalEvents; i++)
    {
        await store.AppendAsync(
            stressStream,
            CreateTestEvent($"stress-evt-{i}", $"stress-corr-{i}", $"{{\"index\":{i},\"data\":\"Lorem ipsum dolor sit amet\"}}"));

        // Progress marker: one dot per 100 appended events.
        if (i % 100 == 0)
        {
            Console.Write(".");
        }
    }
    sw.Stop();
    Console.WriteLine();
    PrintPass($"Appended {totalEvents} events in {sw.ElapsedMilliseconds}ms");

    // Test 2: Verify stream length
    PrintTest($"Verify stream length is {totalEvents}");
    var length = await store.GetStreamLengthAsync(stressStream);
    if (length == totalEvents)
    {
        PrintPass($"Stream length verified: {length} events");
    }
    else
    {
        PrintFail($"Expected {totalEvents} events, got {length}");
    }

    // Test 3: Read large batch from stream
    PrintTest("Reading 500 events from stream (offset 0)");
    sw.Restart();
    var events = await store.ReadStreamAsync(stressStream, fromOffset: 0, maxCount: 500);
    sw.Stop();
    if (events.Count == 500)
    {
        PrintPass($"Read 500 events in {sw.ElapsedMilliseconds}ms");
    }
    else
    {
        PrintFail($"Expected 500 events, got {events.Count}");
    }

    // Test 4: Read from middle of large stream (offset 500 holds stress-evt-501)
    PrintTest("Reading events from middle of stream (offset 500)");
    var eventsFromMiddle = await store.ReadStreamAsync(stressStream, fromOffset: 500, maxCount: 100);
    if (eventsFromMiddle.Count == 100 && eventsFromMiddle[0].EventId == "stress-evt-501")
    {
        PrintPass("Successfully read from middle of large stream");
    }
    else
    {
        PrintFail($"Expected 100 events starting at stress-evt-501, got {eventsFromMiddle.Count}");
    }

    // Test 5: Multiple concurrent reads. All reads are started before any is awaited, and the
    // typed tasks are kept so each result can be verified afterwards.
    PrintTest("Concurrent read performance (10 simultaneous reads)");
    sw.Restart();
    var readTasks = Enumerable.Range(0, concurrentReaders)
        .Select(_ => store.ReadStreamAsync(stressStream, fromOffset: 0, maxCount: concurrentReadSize))
        .ToList();
    var readResults = await Task.WhenAll(readTasks);
    sw.Stop();

    var badReads = readResults.Count(result =>
        result is null ||
        result.Count != concurrentReadSize ||
        result[0].EventId != "stress-evt-1");
    if (badReads == 0)
    {
        PrintPass($"Completed 10 concurrent reads in {sw.ElapsedMilliseconds}ms");
    }
    else
    {
        PrintFail($"Expected {concurrentReaders} reads of {concurrentReadSize} events starting at stress-evt-1, but {badReads} read(s) returned unexpected results");
    }
}
// ========================================================================
// Backward Compatibility: Ephemeral Streams
// ========================================================================
/// <summary>
/// Ephemeral-queue suite: enqueues one event, dequeues it as consumer "test-consumer" with a
/// 30-second visibility timeout, acknowledges it, and checks that no events remain pending.
/// </summary>
/// <param name="store">Event stream store under test.</param>
private static async Task TestEphemeralStreams(IEventStreamStore store)
{
    const string ephemeralStream = "ephemeral-test-queue";
    const string queuedEventId = "eph-evt-001";
    const string consumer = "test-consumer";

    PrintHeader("Backward Compatibility: Ephemeral Streams");

    // Test 1: enqueue. Nothing is returned to verify, so completing without throwing is the pass.
    PrintTest("Enqueue event to ephemeral stream");
    var queuedEvent = CreateTestEvent(queuedEventId, "eph-corr-001");
    await store.EnqueueAsync(ephemeralStream, queuedEvent);
    PrintPass("Enqueued event to ephemeral stream");

    // Test 2: the dequeued event must exist and carry the id that was enqueued.
    PrintTest("Dequeue event from ephemeral stream");
    var received = await store.DequeueAsync(
        ephemeralStream,
        consumerId: consumer,
        visibilityTimeout: TimeSpan.FromSeconds(30));
    var receivedExpected = received?.EventId == queuedEventId;
    if (!receivedExpected)
    {
        PrintFail("Failed to dequeue event or wrong event returned");
    }
    else
    {
        PrintPass("Dequeued event successfully");
    }

    // Test 3: acknowledge with the same event id and consumer id.
    PrintTest("Acknowledge dequeued event");
    var acknowledged = await store.AcknowledgeAsync(
        ephemeralStream,
        eventId: queuedEventId,
        consumerId: consumer);
    if (!acknowledged)
    {
        PrintFail("Failed to acknowledge event");
    }
    else
    {
        PrintPass("Event acknowledged successfully");
    }

    // Test 4: after the acknowledgment the pending count is expected to be zero.
    PrintTest("Verify queue is empty after acknowledgment");
    var pending = await store.GetPendingCountAsync(ephemeralStream);
    if (pending != 0)
    {
        PrintFail($"Expected 0 pending events, got {pending}");
    }
    else
    {
        PrintPass("Queue is empty after acknowledgment");
    }
}
// ========================================================================
// Helper Methods
// ========================================================================
/// <summary>
/// Builds a <see cref="TestEvent"/> stamped with the current UTC time.
/// </summary>
/// <param name="eventId">Value for the event's <c>EventId</c>.</param>
/// <param name="correlationId">Value for the event's <c>CorrelationId</c>.</param>
/// <param name="eventData">
/// Payload string; when <c>null</c>, a small JSON object embedding <paramref name="eventId"/> is used.
/// </param>
/// <returns>The new event, typed as <see cref="ICorrelatedEvent"/>.</returns>
private static ICorrelatedEvent CreateTestEvent(string eventId, string correlationId, string? eventData = null)
{
    var payload = eventData ?? ("{\"test\":\"data-" + eventId + "\"}");

    var testEvent = new TestEvent
    {
        EventId = eventId,
        CorrelationId = correlationId,
        EventData = payload,
        OccurredAt = DateTimeOffset.UtcNow,
    };
    return testEvent;
}
/// <summary>
/// Writes a blue section banner: a blank line, a rule, the message, a rule, and a blank line.
/// </summary>
/// <param name="message">Section title to display between the rules.</param>
private static void PrintHeader(string message)
{
    const string rule = "========================================";

    Console.WriteLine();
    Console.ForegroundColor = ConsoleColor.Blue;
    foreach (var line in new[] { rule, message, rule })
    {
        Console.WriteLine(line);
    }
    Console.ResetColor();
    Console.WriteLine();
}
/// <summary>
/// Announces the test about to run as a yellow "▶ Test:" line.
/// </summary>
/// <param name="message">Short description of the test.</param>
private static void PrintTest(string message)
{
    var line = "▶ Test: " + message;

    Console.ForegroundColor = ConsoleColor.Yellow;
    Console.WriteLine(line);
    Console.ResetColor();
}
/// <summary>
/// Writes a green "✓ PASS:" line and adds one to the passed-test counter.
/// </summary>
/// <param name="message">Detail text shown after the marker.</param>
private static void PrintPass(string message)
{
    var line = "✓ PASS: " + message;

    Console.ForegroundColor = ConsoleColor.Green;
    Console.WriteLine(line);
    Console.ResetColor();

    // Counted only after the line has been written.
    _testsPassed += 1;
}
/// <summary>
/// Writes a red "✗ FAIL:" line and adds one to the failed-test counter.
/// </summary>
/// <param name="message">Detail text shown after the marker.</param>
private static void PrintFail(string message)
{
    var line = "✗ FAIL: " + message;

    Console.ForegroundColor = ConsoleColor.Red;
    Console.WriteLine(line);
    Console.ResetColor();

    // Counted only after the line has been written.
    _testsFailed += 1;
}
/// <summary>
/// Prints the pass/fail totals and a final verdict, then terminates the process with exit
/// code 0 when no test failed and 1 otherwise.
/// </summary>
/// <remarks>
/// This method does not return: it ends by calling <see cref="Environment.Exit(int)"/>.
/// </remarks>
private static void PrintSummary()
{
    const string rule = "========================================";

    // Writes the given lines in one color, then restores the console colors.
    static void WriteColored(ConsoleColor color, params string[] lines)
    {
        Console.ForegroundColor = color;
        foreach (var line in lines)
        {
            Console.WriteLine(line);
        }
        Console.ResetColor();
    }

    Console.WriteLine();
    WriteColored(ConsoleColor.Blue, rule, "Test Summary", rule);
    WriteColored(ConsoleColor.Green, $"Tests Passed: {_testsPassed}");
    WriteColored(ConsoleColor.Red, $"Tests Failed: {_testsFailed}");
    WriteColored(ConsoleColor.Blue, rule);
    Console.WriteLine();

    var allPassed = _testsFailed == 0;
    WriteColored(
        allPassed ? ConsoleColor.Green : ConsoleColor.Red,
        allPassed ? "All tests passed!" : "Some tests failed!");

    Environment.Exit(allPassed ? 0 : 1);
}
/// <summary>
/// Minimal <see cref="ICorrelatedEvent"/> implementation used as the event type in these tests.
/// Sealed because it is a private helper that is not meant to be derived from.
/// </summary>
private sealed class TestEvent : ICorrelatedEvent
{
    /// <summary>Event identifier; the tests compare it to check which events were returned.</summary>
    public required string EventId { get; set; }

    /// <summary>Correlation identifier supplied when the event is created.</summary>
    public required string CorrelationId { get; set; }

    /// <summary>Payload string; defaults to an empty string.</summary>
    public string EventData { get; set; } = string.Empty;

    /// <summary>Timestamp assigned when the event is created.</summary>
    public DateTimeOffset OccurredAt { get; set; }
}
}