# In-Memory Storage Fast in-memory storage for development and testing. ## Overview In-memory storage provides a lightweight, zero-setup option for development, testing, and prototyping. Events are stored in memory using thread-safe collections and are lost when the application stops. **Use Cases:** - ✅ Unit testing - ✅ Local development - ✅ Prototyping - ✅ Learning the framework - ✅ CI/CD test pipelines ## Installation ```bash dotnet add package Svrnty.CQRS.Events ``` ## Configuration ### Basic Setup ```csharp using Svrnty.CQRS.Events; var builder = WebApplication.CreateBuilder(args); // Register in-memory event streaming builder.Services.AddInMemoryEventStreaming(); var app = builder.Build(); app.Run(); ``` ### Full Example ```csharp using Svrnty.CQRS.Events; var builder = WebApplication.CreateBuilder(args); // Event streaming builder.Services.AddInMemoryEventStreaming(); // Your services builder.Services.AddScoped(); builder.Services.AddHostedService(); var app = builder.Build(); app.Run(); ``` ## Usage ### Publishing Events ```csharp public class OrderService { private readonly IEventStreamStore _eventStore; public OrderService(IEventStreamStore eventStore) { _eventStore = eventStore; } public async Task PlaceOrderAsync(int orderId, string customer, decimal amount) { var @event = new OrderPlacedEvent { OrderId = orderId, CustomerName = customer, TotalAmount = amount, PlacedAt = DateTimeOffset.UtcNow }; // Append to in-memory stream await _eventStore.AppendAsync("orders", new[] { @event }); } } ``` ### Reading Events ```csharp public class OrderEventProcessor { public async Task ProcessOrdersAsync() { // Read all events from in-memory stream await foreach (var @event in _eventStore.ReadStreamAsync("orders", fromOffset: 0)) { var eventData = JsonSerializer.Deserialize( @event.Data, Type.GetType(@event.EventType)); if (eventData is OrderPlacedEvent placed) { Console.WriteLine($"Order placed: {placed.OrderId}"); } } } } ``` ## Unit Testing ### Testing with In-Memory Store ```csharp public class OrderServiceTests { private readonly ServiceProvider _serviceProvider; private readonly IEventStreamStore _eventStore; private readonly OrderService _orderService; public OrderServiceTests() { var services = new ServiceCollection(); // Use in-memory storage for tests services.AddInMemoryEventStreaming(); services.AddScoped(); _serviceProvider = services.BuildServiceProvider(); _eventStore = _serviceProvider.GetRequiredService(); _orderService = _serviceProvider.GetRequiredService(); } [Fact] public async Task PlaceOrder_PublishesEvent() { // Act await _orderService.PlaceOrderAsync( orderId: 123, customer: "John Doe", amount: 99.99m); // Assert var events = new List(); await foreach (var evt in _eventStore.ReadStreamAsync("orders", 0)) { events.Add(evt); } Assert.Single(events); Assert.Equal("OrderPlacedEvent", events[0].EventType); var orderPlaced = JsonSerializer.Deserialize(events[0].Data); Assert.Equal(123, orderPlaced.OrderId); Assert.Equal("John Doe", orderPlaced.CustomerName); Assert.Equal(99.99m, orderPlaced.TotalAmount); } [Fact] public async Task PlaceMultipleOrders_StoresInOrder() { // Act await _orderService.PlaceOrderAsync(1, "Alice", 10m); await _orderService.PlaceOrderAsync(2, "Bob", 20m); await _orderService.PlaceOrderAsync(3, "Charlie", 30m); // Assert var events = new List(); await foreach (var evt in _eventStore.ReadStreamAsync("orders", 0)) { events.Add(evt); } Assert.Equal(3, events.Count); Assert.Equal(0, events[0].Offset); Assert.Equal(1, events[1].Offset); Assert.Equal(2, events[2].Offset); } } ``` ### Testing Projections ```csharp public class OrderSummaryProjectionTests { [Fact] public async Task Projection_UpdatesReadModel() { // Arrange var services = new ServiceCollection(); services.AddInMemoryEventStreaming(); services.AddSingleton(); services.AddSingleton(); var provider = services.BuildServiceProvider(); var eventStore = provider.GetRequiredService(); var projection = provider.GetRequiredService(); var repository = provider.GetRequiredService(); // Publish events await eventStore.AppendAsync("orders", new[] { new OrderPlacedEvent { OrderId = 1, CustomerName = "Alice", TotalAmount = 100m }, new OrderPlacedEvent { OrderId = 2, CustomerName = "Bob", TotalAmount = 200m } }); // Act await projection.RunAsync(); // Assert var summaries = repository.GetAllOrderSummaries(); Assert.Equal(2, summaries.Count); Assert.Contains(summaries, s => s.OrderId == 1 && s.TotalAmount == 100m); Assert.Contains(summaries, s => s.OrderId == 2 && s.TotalAmount == 200m); } } ``` ## Integration Testing ### Testing Background Workers ```csharp public class OrderProcessingWorkerTests { [Fact] public async Task Worker_ProcessesEvents() { // Arrange var services = new ServiceCollection(); services.AddInMemoryEventStreaming(); services.AddHostedService(); var provider = services.BuildServiceProvider(); var eventStore = provider.GetRequiredService(); // Publish test events await eventStore.AppendAsync("orders", new[] { new OrderPlacedEvent { OrderId = 1 }, new OrderPlacedEvent { OrderId = 2 } }); // Act var host = provider.GetRequiredService(); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await host.StartAsync(cts.Token); await Task.Delay(1000); // Let worker process await host.StopAsync(cts.Token); // Assert // Verify worker processed events (check side effects) } } ``` ## Ephemeral Streams (Queue) ### Testing Message Queues ```csharp public class EmailQueueTests { [Fact] public async Task EnqueueDequeue_WorksCorrectly() { // Arrange var services = new ServiceCollection(); services.AddInMemoryEventStreaming(); var provider = services.BuildServiceProvider(); var eventStore = provider.GetRequiredService(); // Enqueue await eventStore.EnqueueAsync("email-queue", new SendEmailCommand { To = "test@example.com", Subject = "Test", Body = "Hello" }); // Dequeue var message = await eventStore.DequeueAsync( "email-queue", TimeSpan.FromMinutes(5)); Assert.NotNull(message); var command = JsonSerializer.Deserialize(message.Data); Assert.Equal("test@example.com", command.To); // Acknowledge await eventStore.AcknowledgeAsync("email-queue", message.MessageId); // Should be empty now var nextMessage = await eventStore.DequeueAsync("email-queue", TimeSpan.FromSeconds(1)); Assert.Null(nextMessage); } [Fact] public async Task Dequeue_WithoutAck_RedeliversMessage() { // Arrange var services = new ServiceCollection(); services.AddInMemoryEventStreaming(); var provider = services.BuildServiceProvider(); var eventStore = provider.GetRequiredService(); await eventStore.EnqueueAsync("queue", new { Data = "test" }); // Dequeue with short visibility timeout var message1 = await eventStore.DequeueAsync("queue", TimeSpan.FromMilliseconds(100)); Assert.NotNull(message1); // Don't acknowledge, wait for timeout await Task.Delay(150); // Message should be visible again var message2 = await eventStore.DequeueAsync("queue", TimeSpan.FromMinutes(1)); Assert.NotNull(message2); Assert.Equal(message1.MessageId, message2.MessageId); } } ``` ## Limitations ### No Persistence ```csharp // ❌ Data lost on application restart await _eventStore.AppendAsync("orders", events); // Stop application // Start application var count = await CountEventsAsync("orders"); // Returns 0 ``` ### No Consumer Groups ```csharp // ❌ Consumer groups not supported in-memory // Use PostgreSQL for consumer group coordination services.AddInMemoryEventStreaming(); // No consumer groups ``` ### Single Process Only ```csharp // ❌ Cannot share in-memory store across processes // Process 1: publishes events // Process 2: cannot see events from Process 1 ``` ## Performance ### Benchmarks In-memory storage is extremely fast: | Operation | Throughput | |-----------|------------| | Append (single) | ~200,000/sec | | Append (batch 100) | ~2,000,000 events/sec | | Read | ~500,000/sec | | Enqueue | ~150,000/sec | | Dequeue | ~100,000/sec | ### Memory Usage Monitor memory consumption for large streams: ```csharp // Track memory usage var before = GC.GetTotalMemory(forceFullCollection: false); // Append 100,000 events for (int i = 0; i < 100_000; i++) { await _eventStore.AppendAsync("large-stream", new[] { new TestEvent { Id = i } }); } var after = GC.GetTotalMemory(forceFullCollection: false); Console.WriteLine($"Memory used: {(after - before) / 1024 / 1024} MB"); ``` ## Best Practices ### ✅ DO - Use for unit tests - Use for local development - Clear state between tests - Monitor memory usage for large streams - Use for prototyping ### ❌ DON'T - Don't use in production - Don't expect persistence - Don't use for multi-instance scenarios - Don't use for long-term storage - Don't use for consumer group coordination ## Switching to PostgreSQL When ready for production, switch to PostgreSQL: **Before (Development):** ```csharp builder.Services.AddInMemoryEventStreaming(); ``` **After (Production):** ```csharp builder.Services.AddPostgresEventStreaming( builder.Configuration.GetConnectionString("EventStore")); builder.Services.AddPostgresConsumerGroups( builder.Configuration.GetSection("EventStreaming:ConsumerGroups")); ``` No code changes needed - same interface! ## See Also - [Storage Overview](README.md) - [PostgreSQL Storage](postgresql-storage.md) - [Getting Started](../fundamentals/getting-started.md) - [Testing Best Practices](../../best-practices/testing.md)