dotnet-cqrs/docs/event-streaming/storage/in-memory-storage.md

11 KiB

In-Memory Storage

Fast in-memory storage for development and testing.

Overview

In-memory storage provides a lightweight, zero-setup option for development, testing, and prototyping. Events are stored in memory using thread-safe collections and are lost when the application stops.

Use Cases:

  • Unit testing
  • Local development
  • Prototyping
  • Learning the framework
  • CI/CD test pipelines

Installation

dotnet add package Svrnty.CQRS.Events

Configuration

Basic Setup

using Svrnty.CQRS.Events;

var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;

// Register the in-memory event streaming services with the DI container.
services.AddInMemoryEventStreaming();

// Build the application and run it until shutdown.
builder.Build().Run();

Full Example

using Svrnty.CQRS.Events;

var builder = WebApplication.CreateBuilder(args);
var services = builder.Services;

// Event streaming (in-memory)
services.AddInMemoryEventStreaming();

// Your services: the order publisher and the hosted event processor
services.AddScoped<OrderService>();
services.AddHostedService<OrderEventProcessor>();

// Build the application and run it until shutdown.
builder.Build().Run();

Usage

Publishing Events

/// <summary>
/// Publishes order events to the "orders" stream through the injected event stream store.
/// </summary>
public class OrderService
{
    private readonly IEventStreamStore _eventStore;

    public OrderService(IEventStreamStore eventStore) => _eventStore = eventStore;

    /// <summary>
    /// Creates an <c>OrderPlacedEvent</c> stamped with the current UTC time and appends it
    /// to the "orders" stream.
    /// </summary>
    /// <param name="orderId">Value stored in the event's <c>OrderId</c>.</param>
    /// <param name="customer">Value stored in the event's <c>CustomerName</c>.</param>
    /// <param name="amount">Value stored in the event's <c>TotalAmount</c>.</param>
    public async Task PlaceOrderAsync(int orderId, string customer, decimal amount)
    {
        var orderPlaced = new OrderPlacedEvent
        {
            OrderId = orderId,
            CustomerName = customer,
            TotalAmount = amount,
            PlacedAt = DateTimeOffset.UtcNow
        };

        // AppendAsync is called with an array, so the single event is wrapped in one.
        var batch = new[] { orderPlaced };
        await _eventStore.AppendAsync("orders", batch);
    }
}

Reading Events

/// <summary>
/// Background processor that reads the "orders" stream from the beginning and reacts to
/// <c>OrderPlacedEvent</c> entries.
/// </summary>
/// <remarks>
/// Derives from <c>BackgroundService</c> (namespace <c>Microsoft.Extensions.Hosting</c>) so that it
/// can be registered with <c>AddHostedService&lt;OrderEventProcessor&gt;()</c>.
/// </remarks>
public class OrderEventProcessor : BackgroundService
{
    private readonly IEventStreamStore _eventStore;

    public OrderEventProcessor(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    /// <summary>Runs <see cref="ProcessOrdersAsync"/> when the host starts this service.</summary>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
        => ProcessOrdersAsync(stoppingToken);

    /// <summary>
    /// Reads every event in the "orders" stream starting at offset 0 and writes a line to the
    /// console for each <c>OrderPlacedEvent</c>.
    /// </summary>
    /// <param name="cancellationToken">Token used to stop enumerating the stream.</param>
    public async Task ProcessOrdersAsync(CancellationToken cancellationToken = default)
    {
        // NOTE(review): WithCancellation assumes ReadStreamAsync returns IAsyncEnumerable<T> — confirm.
        await foreach (var @event in _eventStore
            .ReadStreamAsync("orders", fromOffset: 0)
            .WithCancellation(cancellationToken))
        {
            // Type.GetType returns null when the type name cannot be resolved; passing null to
            // JsonSerializer.Deserialize would throw, so such events are skipped instead.
            var eventType = Type.GetType(@event.EventType);
            if (eventType is null)
            {
                continue;
            }

            var eventData = JsonSerializer.Deserialize(@event.Data, eventType);

            if (eventData is OrderPlacedEvent placed)
            {
                Console.WriteLine($"Order placed: {placed.OrderId}");
            }
        }
    }
}

Unit Testing

Testing with In-Memory Store

/// <summary>
/// Tests <c>OrderService</c> against the in-memory event store.
/// </summary>
/// <remarks>
/// xUnit creates a new instance of the test class for every test, so each test builds its own
/// container in the constructor; <see cref="Dispose"/> releases it when the test finishes.
/// </remarks>
public class OrderServiceTests : IDisposable
{
    private readonly ServiceProvider _serviceProvider;
    private readonly IServiceScope _scope;
    private readonly IEventStreamStore _eventStore;
    private readonly OrderService _orderService;

    public OrderServiceTests()
    {
        var services = new ServiceCollection();

        // Use in-memory storage for tests
        services.AddInMemoryEventStreaming();
        services.AddScoped<OrderService>();

        _serviceProvider = services.BuildServiceProvider();

        // OrderService is registered as scoped, so resolve it from a scope rather than
        // from the root provider.
        _scope = _serviceProvider.CreateScope();
        _eventStore = _scope.ServiceProvider.GetRequiredService<IEventStreamStore>();
        _orderService = _scope.ServiceProvider.GetRequiredService<OrderService>();
    }

    /// <summary>Disposes the scope and the container created for this test.</summary>
    public void Dispose()
    {
        _scope.Dispose();
        _serviceProvider.Dispose();
    }

    [Fact]
    public async Task PlaceOrder_PublishesEvent()
    {
        // Act
        await _orderService.PlaceOrderAsync(
            orderId: 123,
            customer: "John Doe",
            amount: 99.99m);

        // Assert
        var events = await ReadAllAsync("orders");

        var stored = Assert.Single(events);
        Assert.Equal("OrderPlacedEvent", stored.EventType);

        var orderPlaced = JsonSerializer.Deserialize<OrderPlacedEvent>(stored.Data);
        Assert.NotNull(orderPlaced);  // Deserialize returns null for a JSON "null" payload
        Assert.Equal(123, orderPlaced.OrderId);
        Assert.Equal("John Doe", orderPlaced.CustomerName);
        Assert.Equal(99.99m, orderPlaced.TotalAmount);
    }

    [Fact]
    public async Task PlaceMultipleOrders_StoresInOrder()
    {
        // Act
        await _orderService.PlaceOrderAsync(1, "Alice", 10m);
        await _orderService.PlaceOrderAsync(2, "Bob", 20m);
        await _orderService.PlaceOrderAsync(3, "Charlie", 30m);

        // Assert
        var events = await ReadAllAsync("orders");

        Assert.Equal(3, events.Count);
        Assert.Equal(0, events[0].Offset);
        Assert.Equal(1, events[1].Offset);
        Assert.Equal(2, events[2].Offset);
    }

    /// <summary>Collects every event of <paramref name="stream"/>, starting at offset 0, into a list.</summary>
    private async Task<List<StoredEvent>> ReadAllAsync(string stream)
    {
        var events = new List<StoredEvent>();
        await foreach (var evt in _eventStore.ReadStreamAsync(stream, 0))
        {
            events.Add(evt);
        }

        return events;
    }
}

Testing Projections

/// <summary>
/// Verifies that <c>OrderSummaryProjection</c> turns events appended to the "orders" stream
/// into order summaries in the read repository.
/// </summary>
public class OrderSummaryProjectionTests
{
    [Fact]
    public async Task Projection_UpdatesReadModel()
    {
        // Arrange
        var services = new ServiceCollection();
        services.AddInMemoryEventStreaming();
        services.AddSingleton<InMemoryReadRepository>();
        services.AddSingleton<OrderSummaryProjection>();

        // Dispose the container (and the singletons it created) when the test ends.
        await using var provider = services.BuildServiceProvider();
        var eventStore = provider.GetRequiredService<IEventStreamStore>();
        var projection = provider.GetRequiredService<OrderSummaryProjection>();
        var repository = provider.GetRequiredService<InMemoryReadRepository>();

        // Publish events
        await eventStore.AppendAsync("orders", new[]
        {
            new OrderPlacedEvent { OrderId = 1, CustomerName = "Alice", TotalAmount = 100m },
            new OrderPlacedEvent { OrderId = 2, CustomerName = "Bob", TotalAmount = 200m }
        });

        // Act
        await projection.RunAsync();

        // Assert
        var summaries = repository.GetAllOrderSummaries();
        Assert.Equal(2, summaries.Count);
        Assert.Contains(summaries, s => s.OrderId == 1 && s.TotalAmount == 100m);
        Assert.Contains(summaries, s => s.OrderId == 2 && s.TotalAmount == 200m);
    }
}

Integration Testing

Testing Background Workers

/// <summary>
/// Starts <c>OrderProcessingWorker</c> as a hosted service against the in-memory store and
/// lets it run briefly over pre-published events.
/// </summary>
public class OrderProcessingWorkerTests
{
    [Fact]
    public async Task Worker_ProcessesEvents()
    {
        // Arrange
        var services = new ServiceCollection();
        services.AddInMemoryEventStreaming();
        services.AddHostedService<OrderProcessingWorker>();

        // Dispose the container (and the hosted worker it owns) when the test ends.
        await using var provider = services.BuildServiceProvider();
        var eventStore = provider.GetRequiredService<IEventStreamStore>();

        // Publish test events
        await eventStore.AppendAsync("orders", new[]
        {
            new OrderPlacedEvent { OrderId = 1 },
            new OrderPlacedEvent { OrderId = 2 }
        });

        // Act
        // The worker is registered last, so it is the IHostedService resolved here.
        var host = provider.GetRequiredService<IHostedService>();
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        await host.StartAsync(cts.Token);
        await Task.Delay(1000);  // Let worker process
        await host.StopAsync(cts.Token);

        // Assert
        // Verify worker processed events (check side effects)
    }
}

Ephemeral Streams (Queue)

Testing Message Queues

/// <summary>
/// Exercises the queue operations of the in-memory store: enqueue, dequeue with a
/// visibility timeout, and acknowledge.
/// </summary>
public class EmailQueueTests
{
    [Fact]
    public async Task EnqueueDequeue_WorksCorrectly()
    {
        // Arrange
        var services = new ServiceCollection();
        services.AddInMemoryEventStreaming();
        // Dispose the container when the test ends.
        await using var provider = services.BuildServiceProvider();
        var eventStore = provider.GetRequiredService<IEventStreamStore>();

        // Enqueue
        await eventStore.EnqueueAsync("email-queue", new SendEmailCommand
        {
            To = "test@example.com",
            Subject = "Test",
            Body = "Hello"
        });

        // Dequeue
        var message = await eventStore.DequeueAsync(
            "email-queue",
            TimeSpan.FromMinutes(5));

        Assert.NotNull(message);

        var command = JsonSerializer.Deserialize<SendEmailCommand>(message.Data);
        Assert.NotNull(command);  // Deserialize returns null for a JSON "null" payload
        Assert.Equal("test@example.com", command.To);

        // Acknowledge
        await eventStore.AcknowledgeAsync("email-queue", message.MessageId);

        // Should be empty now
        var nextMessage = await eventStore.DequeueAsync("email-queue", TimeSpan.FromSeconds(1));
        Assert.Null(nextMessage);
    }

    [Fact]
    public async Task Dequeue_WithoutAck_RedeliversMessage()
    {
        // Arrange
        var services = new ServiceCollection();
        services.AddInMemoryEventStreaming();
        // Dispose the container when the test ends.
        await using var provider = services.BuildServiceProvider();
        var eventStore = provider.GetRequiredService<IEventStreamStore>();

        await eventStore.EnqueueAsync("queue", new { Data = "test" });

        // Dequeue with short visibility timeout
        var message1 = await eventStore.DequeueAsync("queue", TimeSpan.FromMilliseconds(100));
        Assert.NotNull(message1);

        // Don't acknowledge, wait for timeout.
        // This relies on real elapsed time: the 150 ms delay must outlast the 100 ms timeout.
        await Task.Delay(150);

        // Message should be visible again
        var message2 = await eventStore.DequeueAsync("queue", TimeSpan.FromMinutes(1));
        Assert.NotNull(message2);
        Assert.Equal(message1.MessageId, message2.MessageId);
    }
}

Limitations

No Persistence

// ❌ Data lost on application restart: events are held only in this process's memory
await _eventStore.AppendAsync("orders", events);
// Stop application
// Start application
// NOTE: CountEventsAsync is an illustrative helper that is not defined on this page
var count = await CountEventsAsync("orders");  // Returns 0

No Consumer Groups

// ❌ Consumer groups not supported in-memory
// Use PostgreSQL (AddPostgresConsumerGroups) for consumer group coordination
services.AddInMemoryEventStreaming();  // No consumer groups

Single Process Only

// ❌ Cannot share in-memory store across processes
// Process 1: publishes events
// Process 2: cannot see events from Process 1

Performance

Benchmarks

In-memory storage is extremely fast:

| Operation          | Throughput            |
|--------------------|-----------------------|
| Append (single)    | ~200,000/sec          |
| Append (batch 100) | ~2,000,000 events/sec |
| Read               | ~500,000/sec          |
| Enqueue            | ~150,000/sec          |
| Dequeue            | ~100,000/sec          |

Memory Usage

Monitor memory consumption for large streams:

// Track memory usage
// Baseline reading before any events are appended
long baselineBytes = GC.GetTotalMemory(forceFullCollection: false);

// Append 100,000 events, one per call
const int eventCount = 100_000;
for (var id = 0; id < eventCount; id++)
{
    var single = new[] { new TestEvent { Id = id } };
    await _eventStore.AppendAsync("large-stream", single);
}

// Report the difference in whole megabytes (integer division)
long finalBytes = GC.GetTotalMemory(forceFullCollection: false);
long usedMegabytes = (finalBytes - baselineBytes) / 1024 / 1024;
Console.WriteLine($"Memory used: {usedMegabytes} MB");

Best Practices

DO

  • Use for unit tests
  • Use for local development
  • Clear state between tests
  • Monitor memory usage for large streams
  • Use for prototyping

DON'T

  • Don't use in production
  • Don't expect persistence
  • Don't use for multi-instance scenarios
  • Don't use for long-term storage
  • Don't use for consumer group coordination

Switching to PostgreSQL

When ready for production, switch to PostgreSQL:

Before (Development):

// Development: events are kept in process memory and are lost when the application stops
builder.Services.AddInMemoryEventStreaming();

After (Production):

// Event store: connection string is read from ConnectionStrings:EventStore
var eventStoreConnection = builder.Configuration.GetConnectionString("EventStore");
builder.Services.AddPostgresEventStreaming(eventStoreConnection);

// Consumer groups: configured from the EventStreaming:ConsumerGroups section
var consumerGroupsSection = builder.Configuration.GetSection("EventStreaming:ConsumerGroups");
builder.Services.AddPostgresConsumerGroups(consumerGroupsSection);

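Only the service registration changes - application code keeps using the same IEventStreamStore interface, so no other code changes are needed!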

See Also