# Persistent Streams

Event sourcing with append-only event logs.

## Overview

Persistent streams store events as an append-only log, providing a complete history of all changes. This enables event sourcing, audit logs, and the ability to rebuild state by replaying events.

**Key Features:**

- ✅ **Append-only** - Events cannot be modified or deleted
- ✅ **Ordered** - Events stored in sequential order with offsets
- ✅ **Durable** - Events persisted to storage
- ✅ **Replayable** - Rebuild state from any point in time
- ✅ **Auditable** - Complete history of all changes
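The snippets below call `AppendAsync` and `ReadStreamAsync` on an `IEventStreamStore` and read `Offset`, `EventId`, `EventType`, `Timestamp`, and `Data` from a `StoredEvent`, but neither type is defined in this document. The following is a minimal sketch of the shapes the examples assume, inferred from how they are used; your store's actual signatures and field names may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed persisted-event shape, inferred from how the examples below use it.
public sealed record StoredEvent
{
    public long Offset { get; init; }                // Position within the stream
    public string EventId { get; init; } = "";       // Unique event identifier
    public string EventType { get; init; } = "";     // Type name used for deserialization
    public string Data { get; init; } = "";          // JSON-serialized event payload
    public DateTimeOffset Timestamp { get; init; }   // When the event was appended
}

// Assumed store abstraction, inferred from how the examples below use it.
public interface IEventStreamStore
{
    // Appends a batch of events to the end of a stream.
    Task AppendAsync(string streamName, object[] events);

    // Reads events from a stream, starting at the given offset.
    IAsyncEnumerable<StoredEvent> ReadStreamAsync(string streamName, long fromOffset);
}
```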
## Append-Only Log

### Basic Appending

```csharp
public class AccountService
{
    private readonly IEventStreamStore _eventStore;

    public AccountService(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task OpenAccountAsync(int accountId, string owner, decimal initialBalance)
    {
        var @event = new AccountOpenedEvent
        {
            AccountId = accountId,
            Owner = owner,
            InitialBalance = initialBalance,
            OpenedAt = DateTimeOffset.UtcNow
        };

        // Append to persistent stream
        await _eventStore.AppendAsync(
            streamName: $"account-{accountId}",
            events: new[] { @event });
    }

    public async Task DepositAsync(int accountId, decimal amount)
    {
        var @event = new MoneyDepositedEvent
        {
            AccountId = accountId,
            Amount = amount,
            DepositedAt = DateTimeOffset.UtcNow
        };

        await _eventStore.AppendAsync($"account-{accountId}", new[] { @event });
    }

    public async Task WithdrawAsync(int accountId, decimal amount)
    {
        var @event = new MoneyWithdrawnEvent
        {
            AccountId = accountId,
            Amount = amount,
            WithdrawnAt = DateTimeOffset.UtcNow
        };

        await _eventStore.AppendAsync($"account-{accountId}", new[] { @event });
    }
}
```

### Atomic Multi-Event Append

Each `AppendAsync` call writes its batch of events to a single stream atomically (all-or-nothing). The transfer below issues three separate appends, so atomicity applies per stream, not across the whole operation:

```csharp
public async Task TransferAsync(int fromAccountId, int toAccountId, decimal amount)
{
    var transferId = Guid.NewGuid().ToString();

    // Append to source account stream
    await _eventStore.AppendAsync($"account-{fromAccountId}", new object[]
    {
        new MoneyWithdrawnEvent
        {
            AccountId = fromAccountId,
            Amount = amount,
            TransferId = transferId,
            WithdrawnAt = DateTimeOffset.UtcNow
        }
    });

    // Append to destination account stream
    await _eventStore.AppendAsync($"account-{toAccountId}", new object[]
    {
        new MoneyDepositedEvent
        {
            AccountId = toAccountId,
            Amount = amount,
            TransferId = transferId,
            DepositedAt = DateTimeOffset.UtcNow
        }
    });

    // Append transfer completed event to transfers stream
    await _eventStore.AppendAsync("transfers", new object[]
    {
        new TransferCompletedEvent
        {
            TransferId = transferId,
            FromAccountId = fromAccountId,
            ToAccountId = toAccountId,
            Amount = amount,
            CompletedAt = DateTimeOffset.UtcNow
        }
    });
}
```

## Reading Events

### Read All Events

```csharp
public async Task<decimal> GetAccountBalanceAsync(int accountId)
{
    decimal balance = 0;

    // Read all events from the beginning
    await foreach (var storedEvent in _eventStore.ReadStreamAsync(
        streamName: $"account-{accountId}",
        fromOffset: 0))
    {
        // DeserializeEvent resolves the event type and deserializes the JSON payload
        // (see the Repository section below).
        var eventData = DeserializeEvent(storedEvent);

        // Apply event to calculate current balance
        balance = eventData switch
        {
            AccountOpenedEvent opened => opened.InitialBalance,
            MoneyDepositedEvent deposited => balance + deposited.Amount,
            MoneyWithdrawnEvent withdrawn => balance - withdrawn.Amount,
            _ => balance
        };
    }

    return balance;
}
```

### Read from Offset

Resume reading from a specific position:

```csharp
public async Task CatchUpProjectionAsync(long lastProcessedOffset)
{
    // Read only new events since the last checkpoint
    await foreach (var @event in _eventStore.ReadStreamAsync(
        streamName: "orders",
        fromOffset: lastProcessedOffset + 1))
    {
        await UpdateProjectionAsync(@event);

        // Update checkpoint
        lastProcessedOffset = @event.Offset;
        await SaveCheckpointAsync(lastProcessedOffset);
    }
}
```

### Read with Batch Size

Process events in batches for better performance:

```csharp
public async Task ProcessEventsInBatchesAsync()
{
    const int batchSize = 100;
    long currentOffset = 0;

    while (true)
    {
        var batch = new List<StoredEvent>();

        await foreach (var @event in _eventStore.ReadStreamAsync("orders", currentOffset))
        {
            batch.Add(@event);
            if (batch.Count >= batchSize)
                break;
        }

        if (batch.Count == 0)
            break; // No more events

        // Process batch
        await ProcessBatchAsync(batch);

        currentOffset = batch.Max(e => e.Offset) + 1;
    }
}
```

## Event Sourcing Pattern

### Aggregate Root

```csharp
public class Account
{
    private readonly List<object> _uncommittedEvents = new();

    public int AccountId { get; private set; }
    public string Owner { get; private set; } = string.Empty;
    public decimal Balance { get; private set; }
    public AccountStatus Status { get; private set; }

    // Factory method
    public static Account Open(int accountId, string owner, decimal initialBalance)
    {
        if (initialBalance < 0)
            throw new ArgumentException("Initial balance cannot be negative");

        var account = new Account();
        account.Apply(new AccountOpenedEvent
        {
            AccountId = accountId,
            Owner = owner,
            InitialBalance = initialBalance,
            OpenedAt = DateTimeOffset.UtcNow
        });

        return account;
    }

    // Command methods
    public void Deposit(decimal amount)
    {
        if (Status == AccountStatus.Closed)
            throw new InvalidOperationException("Cannot deposit to closed account");
        if (amount <= 0)
            throw new ArgumentException("Amount must be positive");

        Apply(new MoneyDepositedEvent
        {
            AccountId = AccountId,
            Amount = amount,
            DepositedAt = DateTimeOffset.UtcNow
        });
    }

    public void Withdraw(decimal amount)
    {
        if (Status == AccountStatus.Closed)
            throw new InvalidOperationException("Cannot withdraw from closed account");
        if (amount <= 0)
            throw new ArgumentException("Amount must be positive");
        if (Balance < amount)
            throw new InvalidOperationException("Insufficient funds");

        Apply(new MoneyWithdrawnEvent
        {
            AccountId = AccountId,
            Amount = amount,
            WithdrawnAt = DateTimeOffset.UtcNow
        });
    }

    public void Close()
    {
        if (Status == AccountStatus.Closed)
            throw new InvalidOperationException("Account already closed");
        if (Balance != 0)
            throw new InvalidOperationException("Cannot close account with non-zero balance");

        Apply(new AccountClosedEvent
        {
            AccountId = AccountId,
            ClosedAt = DateTimeOffset.UtcNow
        });
    }

    // Apply events
    private void Apply(object @event)
    {
        When(@event);
        _uncommittedEvents.Add(@event);
    }

    private void When(object @event)
    {
        switch (@event)
        {
            case AccountOpenedEvent opened:
                AccountId = opened.AccountId;
                Owner = opened.Owner;
                Balance = opened.InitialBalance;
                Status = AccountStatus.Active;
                break;

            case MoneyDepositedEvent deposited:
                Balance += deposited.Amount;
                break;

            case MoneyWithdrawnEvent withdrawn:
                Balance -= withdrawn.Amount;
                break;

            case AccountClosedEvent:
                Status = AccountStatus.Closed;
                break;
        }
    }

    // Hydrate from history
    public static Account LoadFromHistory(IEnumerable<object> history)
    {
        var account = new Account();
        foreach (var @event in history)
        {
            account.When(@event);
        }

        return account;
    }

    public IReadOnlyList<object> GetUncommittedEvents() => _uncommittedEvents.AsReadOnly();

    public void MarkEventsAsCommitted() => _uncommittedEvents.Clear();
}

public enum AccountStatus
{
    Active,
    Closed
}
```
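To make the lifecycle concrete, here is a brief usage sketch (not part of the documented API, just the `Account` aggregate defined above): command methods validate and `Apply` events, state is updated in memory, and the events remain uncommitted until a repository persists them.

```csharp
// Usage sketch: exercise the aggregate in memory, then inspect the
// uncommitted events that a repository would append to the stream.
var account = Account.Open(accountId: 123, owner: "Alice", initialBalance: 100m);
account.Deposit(50m);
account.Withdraw(25m);

Console.WriteLine(account.Balance); // 125

foreach (var @event in account.GetUncommittedEvents())
{
    // AccountOpenedEvent, MoneyDepositedEvent, MoneyWithdrawnEvent - in order
    Console.WriteLine(@event.GetType().Name);
}
```

The repository shown next appends these uncommitted events to the stream and clears them via `MarkEventsAsCommitted`.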
### Repository

```csharp
public class AccountRepository
{
    private readonly IEventStreamStore _eventStore;

    public AccountRepository(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task<Account> GetByIdAsync(int accountId)
    {
        var events = new List<object>();

        // Load all events from the stream
        await foreach (var storedEvent in _eventStore.ReadStreamAsync(
            streamName: $"account-{accountId}",
            fromOffset: 0))
        {
            var eventData = DeserializeEvent(storedEvent);
            events.Add(eventData);
        }

        if (events.Count == 0)
            throw new KeyNotFoundException($"Account {accountId} not found");

        // Rebuild aggregate from events
        return Account.LoadFromHistory(events);
    }

    public async Task SaveAsync(Account account)
    {
        var uncommittedEvents = account.GetUncommittedEvents();

        if (uncommittedEvents.Count == 0)
            return; // No changes

        // Append events to stream
        await _eventStore.AppendAsync(
            streamName: $"account-{account.AccountId}",
            events: uncommittedEvents.ToArray());

        account.MarkEventsAsCommitted();
    }

    private object DeserializeEvent(StoredEvent storedEvent)
    {
        var eventType = Type.GetType(storedEvent.EventType)
            ?? throw new InvalidOperationException($"Unknown event type: {storedEvent.EventType}");

        return JsonSerializer.Deserialize(storedEvent.Data, eventType)
            ?? throw new InvalidOperationException($"Failed to deserialize event: {storedEvent.EventType}");
    }
}
```

### Command Handler

```csharp
public class OpenAccountCommandHandler : ICommandHandler<OpenAccountCommand, int>
{
    private readonly AccountRepository _repository;

    public OpenAccountCommandHandler(AccountRepository repository)
    {
        _repository = repository;
    }

    public async Task<int> HandleAsync(OpenAccountCommand command, CancellationToken ct)
    {
        // Create new aggregate
        var account = Account.Open(
            command.AccountId,
            command.Owner,
            command.InitialBalance);

        // Save (appends events)
        await _repository.SaveAsync(account);

        return account.AccountId;
    }
}

public class DepositCommandHandler : ICommandHandler<DepositCommand>
{
    private readonly AccountRepository _repository;

    public DepositCommandHandler(AccountRepository repository)
    {
        _repository = repository;
    }

    public async Task HandleAsync(DepositCommand command, CancellationToken ct)
    {
        // Load aggregate from event stream
        var account = await _repository.GetByIdAsync(command.AccountId);

        // Execute business logic
        account.Deposit(command.Amount);

        // Save (appends new events)
        await _repository.SaveAsync(account);
    }
}
```

## Audit Log Pattern

Persistent streams provide natural audit trails:

```csharp
public class AuditService
{
    private readonly IEventStreamStore _eventStore;

    public AuditService(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task<List<AuditEntry>> GetAccountAuditLogAsync(int accountId)
    {
        var auditLog = new List<AuditEntry>();

        await foreach (var storedEvent in _eventStore.ReadStreamAsync($"account-{accountId}", 0))
        {
            auditLog.Add(new AuditEntry
            {
                Offset = storedEvent.Offset,
                EventType = storedEvent.EventType,
                Timestamp = storedEvent.Timestamp,
                EventId = storedEvent.EventId,
                Data = JsonSerializer.Deserialize<JsonElement>(storedEvent.Data)
            });
        }

        return auditLog;
    }

    public async Task<List<AuditEntry>> GetAccountAuditLogForPeriodAsync(
        int accountId,
        DateTimeOffset from,
        DateTimeOffset to)
    {
        var auditLog = new List<AuditEntry>();

        await foreach (var storedEvent in _eventStore.ReadStreamAsync($"account-{accountId}", 0))
        {
            if (storedEvent.Timestamp >= from && storedEvent.Timestamp <= to)
            {
                auditLog.Add(new AuditEntry
                {
                    Offset = storedEvent.Offset,
                    EventType = storedEvent.EventType,
                    Timestamp = storedEvent.Timestamp,
                    EventId = storedEvent.EventId,
                    Data = JsonSerializer.Deserialize<JsonElement>(storedEvent.Data)
                });
            }
        }

        return auditLog;
    }
}
```

## Stream Naming Conventions

### Per-Aggregate Streams

One stream per aggregate instance:

```csharp
// ✅ Good - One stream per account
await _eventStore.AppendAsync($"account-{accountId}", events);

// ✅ Good - One stream per order
await _eventStore.AppendAsync($"order-{orderId}", events);
```

### Category Streams

All aggregates of the same type in one stream:

```csharp
// All account events in a single stream
await _eventStore.AppendAsync("accounts", new[] { new AccountOpenedEvent { AccountId = 123, ... } });
await _eventStore.AppendAsync("accounts", new[] { new AccountOpenedEvent { AccountId = 456, ... } });

// Read a specific account by filtering
await foreach (var evt in _eventStore.ReadStreamAsync("accounts", 0))
{
    if (evt.EventType == "AccountOpenedEvent")
    {
        var opened = JsonSerializer.Deserialize<AccountOpenedEvent>(evt.Data);
        if (opened.AccountId == targetAccountId)
        {
            // Process event for specific account
        }
    }
}
```

## Best Practices

### ✅ DO

- Use one stream per aggregate instance for clean boundaries
- Include all data needed to process events
- Version events for schema evolution
- Use correlation IDs to track causation
- Implement idempotent event handlers
- Store snapshots for large streams (performance optimization) - see the sketch below
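The last DO above recommends snapshots for large streams. Here is a minimal sketch of that idea, assuming a hypothetical `ISnapshotStore`, `AccountSnapshot`, and `SnapshottingAccountReader` (illustrative names, not part of the documented API): persist the folded state together with the offset of the last event it covers, then on load replay only the events appended after that offset.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical snapshot types - illustrative only, not part of the documented API.
public sealed record AccountSnapshot(
    int AccountId, string Owner, decimal Balance, AccountStatus Status, long LastOffset);

public interface ISnapshotStore
{
    Task<AccountSnapshot?> LoadAsync(string streamName);
    Task SaveAsync(string streamName, AccountSnapshot snapshot);
}

public class SnapshottingAccountReader
{
    private const int SnapshotThreshold = 1000; // Re-snapshot once this many events were replayed

    private readonly IEventStreamStore _eventStore;
    private readonly ISnapshotStore _snapshotStore;

    public SnapshottingAccountReader(IEventStreamStore eventStore, ISnapshotStore snapshotStore)
    {
        _eventStore = eventStore;
        _snapshotStore = snapshotStore;
    }

    public async Task<AccountSnapshot> GetCurrentStateAsync(int accountId)
    {
        var streamName = $"account-{accountId}";

        // Start from the latest snapshot, or from an empty state covering no events (offset -1).
        var state = await _snapshotStore.LoadAsync(streamName)
                    ?? new AccountSnapshot(accountId, string.Empty, 0m, AccountStatus.Active, -1);

        var replayed = 0;

        // Replay only the events appended after the snapshot was taken.
        await foreach (var storedEvent in _eventStore.ReadStreamAsync(streamName, state.LastOffset + 1))
        {
            state = Apply(state, storedEvent);
            replayed++;
        }

        // Persist a fresh snapshot when the replayed tail has grown large.
        if (replayed >= SnapshotThreshold)
            await _snapshotStore.SaveAsync(streamName, state);

        return state;
    }

    private static AccountSnapshot Apply(AccountSnapshot state, StoredEvent storedEvent)
    {
        var eventType = Type.GetType(storedEvent.EventType);
        var @event = eventType is null ? null : JsonSerializer.Deserialize(storedEvent.Data, eventType);

        // Fold the event into the snapshot state and record the offset it covers.
        return @event switch
        {
            AccountOpenedEvent opened => state with
            {
                Owner = opened.Owner,
                Balance = opened.InitialBalance,
                Status = AccountStatus.Active,
                LastOffset = storedEvent.Offset
            },
            MoneyDepositedEvent deposited => state with
            {
                Balance = state.Balance + deposited.Amount,
                LastOffset = storedEvent.Offset
            },
            MoneyWithdrawnEvent withdrawn => state with
            {
                Balance = state.Balance - withdrawn.Amount,
                LastOffset = storedEvent.Offset
            },
            AccountClosedEvent => state with
            {
                Status = AccountStatus.Closed,
                LastOffset = storedEvent.Offset
            },
            _ => state with { LastOffset = storedEvent.Offset }
        };
    }
}
```

Snapshotting is purely an optimization: the event stream remains the source of truth, and a snapshot can always be rebuilt by replaying from offset 0.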
### ❌ DON'T

- Don't modify events after appending
- Don't delete events (use compensating events instead)
- Don't store large binary data in events
- Don't skip validation in aggregate methods
- Don't expose uncommitted events outside the aggregate
- Don't load entire large streams without snapshots

## See Also

- [Getting Started](getting-started.md)
- [Event Replay](../event-replay/README.md)
- [Projections](../projections/README.md)
- [Event Sourcing Tutorial](../../tutorials/event-sourcing/README.md)