563 lines
15 KiB
Markdown
563 lines
15 KiB
Markdown
# Persistent Streams
|
|
|
|
Event sourcing with append-only event logs.
|
|
|
|
## Overview
|
|
|
|
Persistent streams store events as an append-only log, providing a complete history of all changes. This enables event sourcing, audit logs, and the ability to rebuild state by replaying events.
|
|
|
|
**Key Features:**
|
|
|
|
- ✅ **Append-only** - Events cannot be modified or deleted
|
|
- ✅ **Ordered** - Events stored in sequential order with offsets
|
|
- ✅ **Durable** - Events persisted to storage
|
|
- ✅ **Replayable** - Rebuild state from any point in time
|
|
- ✅ **Auditable** - Complete history of all changes
|
|
|
|
## Append-Only Log
|
|
|
|
### Basic Appending
|
|
|
|
```csharp
|
|
/// <summary>
/// Records account activity by appending one event per operation to the
/// account's own stream, named <c>account-{accountId}</c>.
/// </summary>
/// <remarks>
/// The methods here perform no validation of their own; they build the event
/// and append it.
/// </remarks>
public class AccountService
{
    private readonly IEventStreamStore _eventStore;

    /// <summary>Creates the service on top of the given event stream store.</summary>
    /// <param name="eventStore">Store that receives the appended events.</param>
    /// <exception cref="ArgumentNullException"><paramref name="eventStore"/> is null.</exception>
    public AccountService(IEventStreamStore eventStore)
    {
        // Without this assignment the readonly field stays null and every
        // append below fails with a NullReferenceException.
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
    }

    /// <summary>Appends an <c>AccountOpenedEvent</c> to the account's stream.</summary>
    /// <param name="accountId">Identifier used to build the stream name.</param>
    /// <param name="owner">Owner recorded on the event.</param>
    /// <param name="initialBalance">Opening balance recorded on the event.</param>
    public async Task OpenAccountAsync(int accountId, string owner, decimal initialBalance)
    {
        var @event = new AccountOpenedEvent
        {
            AccountId = accountId,
            Owner = owner,
            InitialBalance = initialBalance,
            OpenedAt = DateTimeOffset.UtcNow
        };

        // Append to persistent stream
        await _eventStore.AppendAsync(
            streamName: StreamNameFor(accountId),
            events: new[] { @event });
    }

    /// <summary>Appends a <c>MoneyDepositedEvent</c> to the account's stream.</summary>
    /// <param name="accountId">Identifier used to build the stream name.</param>
    /// <param name="amount">Amount recorded on the event.</param>
    public async Task DepositAsync(int accountId, decimal amount)
    {
        var @event = new MoneyDepositedEvent
        {
            AccountId = accountId,
            Amount = amount,
            DepositedAt = DateTimeOffset.UtcNow
        };

        await _eventStore.AppendAsync(StreamNameFor(accountId), new[] { @event });
    }

    /// <summary>Appends a <c>MoneyWithdrawnEvent</c> to the account's stream.</summary>
    /// <param name="accountId">Identifier used to build the stream name.</param>
    /// <param name="amount">Amount recorded on the event.</param>
    public async Task WithdrawAsync(int accountId, decimal amount)
    {
        var @event = new MoneyWithdrawnEvent
        {
            AccountId = accountId,
            Amount = amount,
            WithdrawnAt = DateTimeOffset.UtcNow
        };

        await _eventStore.AppendAsync(StreamNameFor(accountId), new[] { @event });
    }

    // Single place that defines the per-account stream naming convention.
    private static string StreamNameFor(int accountId) => $"account-{accountId}";
}
|
|
```
|
|
|
|
### Atomic Multi-Event Append
|
|
|
|
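Events passed to a single `AppendAsync` call are appended atomically (all-or-nothing). The transfer below writes to three different streams using three separate calls, so atomicity applies to each call individually, not to the transfer as a whole: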
|
|
|
|
```csharp
|
|
/// <summary>
/// Records a transfer as three events that share one freshly generated transfer id:
/// a withdrawal on the source account stream, a deposit on the destination account
/// stream, and a completion record on the "transfers" stream.
/// </summary>
/// <param name="fromAccountId">Account the money is withdrawn from.</param>
/// <param name="toAccountId">Account the money is deposited to.</param>
/// <param name="amount">Amount recorded on all three events.</param>
/// <remarks>
/// Each stream is written by its own <c>AppendAsync</c> call, awaited in sequence.
/// </remarks>
public async Task TransferAsync(int fromAccountId, int toAccountId, decimal amount)
{
    string transferId = Guid.NewGuid().ToString();

    // Step 1: withdrawal on the source account stream.
    var withdrawal = new MoneyWithdrawnEvent
    {
        AccountId = fromAccountId,
        Amount = amount,
        TransferId = transferId,
        WithdrawnAt = DateTimeOffset.UtcNow
    };
    await _eventStore.AppendAsync($"account-{fromAccountId}", new object[] { withdrawal });

    // Step 2: deposit on the destination account stream.
    var deposit = new MoneyDepositedEvent
    {
        AccountId = toAccountId,
        Amount = amount,
        TransferId = transferId,
        DepositedAt = DateTimeOffset.UtcNow
    };
    await _eventStore.AppendAsync($"account-{toAccountId}", new object[] { deposit });

    // Step 3: completion record on the shared transfers stream.
    var completion = new TransferCompletedEvent
    {
        TransferId = transferId,
        FromAccountId = fromAccountId,
        ToAccountId = toAccountId,
        Amount = amount,
        CompletedAt = DateTimeOffset.UtcNow
    };
    await _eventStore.AppendAsync("transfers", new object[] { completion });
}
|
|
```
|
|
|
|
## Reading Events
|
|
|
|
### Read All Events
|
|
|
|
```csharp
|
|
/// <summary>
/// Computes an account's balance by replaying its stream from offset 0.
/// </summary>
/// <param name="accountId">Identifier used to build the stream name <c>account-{accountId}</c>.</param>
/// <returns>
/// The balance after folding every event: an opened event sets it to the initial
/// balance, deposits add, withdrawals subtract, and any other event leaves it unchanged.
/// Zero when the stream yields no events.
/// </returns>
public async Task<decimal> GetAccountBalanceAsync(int accountId)
{
    var streamName = $"account-{accountId}";
    decimal runningTotal = 0;

    // Replay the whole stream, starting at the first event.
    await foreach (var stored in _eventStore.ReadStreamAsync(streamName, 0))
    {
        switch (DeserializeEvent(stored))
        {
            case AccountOpenedEvent opened:
                runningTotal = opened.InitialBalance;
                break;

            case MoneyDepositedEvent deposited:
                runningTotal = runningTotal + deposited.Amount;
                break;

            case MoneyWithdrawnEvent withdrawn:
                runningTotal = runningTotal - withdrawn.Amount;
                break;

            // Any other event type does not affect the balance.
        }
    }

    return runningTotal;
}
|
|
```
|
|
|
|
### Read from Offset
|
|
|
|
Resume reading from a specific position:
|
|
|
|
```csharp
|
|
/// <summary>
/// Feeds the projection with every event on the "orders" stream that comes after
/// the given checkpoint, saving a new checkpoint after each event.
/// </summary>
/// <param name="lastProcessedOffset">Offset of the last event already handled; reading resumes at the next offset.</param>
public async Task CatchUpProjectionAsync(long lastProcessedOffset)
{
    long resumeAt = lastProcessedOffset + 1;

    await foreach (var stored in _eventStore.ReadStreamAsync("orders", resumeAt))
    {
        await UpdateProjectionAsync(stored);

        // Persist progress only after the projection has been updated for this event.
        lastProcessedOffset = stored.Offset;
        await SaveCheckpointAsync(lastProcessedOffset);
    }
}
|
|
```
|
|
|
|
### Read with Batch Size
|
|
|
|
Process events in batches for better performance:
|
|
|
|
```csharp
|
|
/// <summary>
/// Walks the "orders" stream from offset 0 and hands its events to
/// <c>ProcessBatchAsync</c> in groups of at most 100.
/// </summary>
/// <remarks>
/// Each iteration starts a new read at the current offset, stops collecting once
/// the batch is full, and then continues from one past the highest offset seen.
/// The method returns when a read yields no events.
/// </remarks>
public async Task ProcessEventsInBatchesAsync()
{
    const int batchSize = 100;
    long currentOffset = 0;

    for (;;)
    {
        var chunk = new List<StoredEvent>();

        await foreach (var stored in _eventStore.ReadStreamAsync("orders", currentOffset))
        {
            chunk.Add(stored);

            if (chunk.Count < batchSize)
            {
                continue;
            }

            break; // Batch is full
        }

        if (chunk.Count == 0)
        {
            return; // No more events
        }

        await ProcessBatchAsync(chunk);

        long highestOffset = chunk.Max(e => e.Offset);
        currentOffset = highestOffset + 1;
    }
}
|
|
```
|
|
|
|
## Event Sourcing Pattern
|
|
|
|
### Aggregate Root
|
|
|
|
```csharp
|
|
/// <summary>
/// Event-sourced account aggregate. Command methods validate, then raise an event;
/// state only ever changes inside <c>When</c>, so replaying the same events
/// rebuilds the same state.
/// </summary>
public class Account
{
    // Events raised by commands since the last MarkEventsAsCommitted call.
    private readonly List<object> _uncommittedEvents = new();

    public int AccountId { get; private set; }

    public string Owner { get; private set; } = string.Empty;

    public decimal Balance { get; private set; }

    public AccountStatus Status { get; private set; }

    /// <summary>Creates a new account by raising an <c>AccountOpenedEvent</c>.</summary>
    /// <exception cref="ArgumentException"><paramref name="initialBalance"/> is negative.</exception>
    public static Account Open(int accountId, string owner, decimal initialBalance)
    {
        if (initialBalance < 0)
        {
            throw new ArgumentException("Initial balance cannot be negative");
        }

        var created = new Account();
        var opened = new AccountOpenedEvent
        {
            AccountId = accountId,
            Owner = owner,
            InitialBalance = initialBalance,
            OpenedAt = DateTimeOffset.UtcNow
        };

        created.Apply(opened);
        return created;
    }

    /// <summary>Raises a <c>MoneyDepositedEvent</c> for the given amount.</summary>
    /// <exception cref="InvalidOperationException">The account is closed.</exception>
    /// <exception cref="ArgumentException"><paramref name="amount"/> is zero or negative.</exception>
    public void Deposit(decimal amount)
    {
        ThrowIfClosed("Cannot deposit to closed account");
        ThrowIfNotPositive(amount);

        var deposited = new MoneyDepositedEvent
        {
            AccountId = AccountId,
            Amount = amount,
            DepositedAt = DateTimeOffset.UtcNow
        };
        Apply(deposited);
    }

    /// <summary>Raises a <c>MoneyWithdrawnEvent</c> for the given amount.</summary>
    /// <exception cref="InvalidOperationException">The account is closed, or the balance is lower than <paramref name="amount"/>.</exception>
    /// <exception cref="ArgumentException"><paramref name="amount"/> is zero or negative.</exception>
    public void Withdraw(decimal amount)
    {
        ThrowIfClosed("Cannot withdraw from closed account");
        ThrowIfNotPositive(amount);

        if (Balance < amount)
        {
            throw new InvalidOperationException("Insufficient funds");
        }

        var withdrawn = new MoneyWithdrawnEvent
        {
            AccountId = AccountId,
            Amount = amount,
            WithdrawnAt = DateTimeOffset.UtcNow
        };
        Apply(withdrawn);
    }

    /// <summary>Raises an <c>AccountClosedEvent</c>.</summary>
    /// <exception cref="InvalidOperationException">The account is already closed, or its balance is not zero.</exception>
    public void Close()
    {
        ThrowIfClosed("Account already closed");

        if (Balance != 0)
        {
            throw new InvalidOperationException("Cannot close account with non-zero balance");
        }

        var closed = new AccountClosedEvent
        {
            AccountId = AccountId,
            ClosedAt = DateTimeOffset.UtcNow
        };
        Apply(closed);
    }

    // Shared guard for the command methods; the caller supplies its own message.
    private void ThrowIfClosed(string message)
    {
        if (Status == AccountStatus.Closed)
        {
            throw new InvalidOperationException(message);
        }
    }

    // Shared guard for Deposit and Withdraw.
    private static void ThrowIfNotPositive(decimal amount)
    {
        if (amount <= 0)
        {
            throw new ArgumentException("Amount must be positive");
        }
    }

    // Applies a newly raised event: update state first, then queue it for persistence.
    private void Apply(object @event)
    {
        When(@event);
        _uncommittedEvents.Add(@event);
    }

    // The only place where state is mutated. Unrecognised event types are ignored.
    private void When(object @event)
    {
        if (@event is AccountOpenedEvent opened)
        {
            AccountId = opened.AccountId;
            Owner = opened.Owner;
            Balance = opened.InitialBalance;
            Status = AccountStatus.Active;
        }
        else if (@event is MoneyDepositedEvent deposited)
        {
            Balance += deposited.Amount;
        }
        else if (@event is MoneyWithdrawnEvent withdrawn)
        {
            Balance -= withdrawn.Amount;
        }
        else if (@event is AccountClosedEvent)
        {
            Status = AccountStatus.Closed;
        }
    }

    /// <summary>
    /// Rebuilds an account by replaying previously stored events in order.
    /// Replayed events go through <c>When</c> only, so they are not queued as uncommitted.
    /// </summary>
    public static Account LoadFromHistory(IEnumerable<object> history)
    {
        var rebuilt = new Account();

        foreach (var past in history)
        {
            rebuilt.When(past);
        }

        return rebuilt;
    }

    /// <summary>Returns a read-only wrapper over the events raised since the last commit.</summary>
    public IReadOnlyList<object> GetUncommittedEvents()
    {
        return _uncommittedEvents.AsReadOnly();
    }

    /// <summary>Clears the list of uncommitted events.</summary>
    public void MarkEventsAsCommitted()
    {
        _uncommittedEvents.Clear();
    }
}
|
|
|
|
/// <summary>Lifecycle state of an account.</summary>
public enum AccountStatus
{
    /// <summary>First member, so it is the zero value and the default for an unset field.</summary>
    Active = 0,

    /// <summary>The account has been closed.</summary>
    Closed = 1
}
|
|
```
|
|
|
|
### Repository
|
|
|
|
```csharp
|
|
/// <summary>
/// Loads and saves <c>Account</c> aggregates using the stream <c>account-{accountId}</c>.
/// </summary>
public class AccountRepository
{
    private readonly IEventStreamStore _eventStore;

    public AccountRepository(IEventStreamStore eventStore) => _eventStore = eventStore;

    /// <summary>Rebuilds an account from every event in its stream.</summary>
    /// <exception cref="KeyNotFoundException">The stream yielded no events.</exception>
    public async Task<Account> GetByIdAsync(int accountId)
    {
        var streamName = $"account-{accountId}";
        var history = new List<object>();

        // Read from the very first event so the aggregate sees its full history.
        await foreach (var stored in _eventStore.ReadStreamAsync(streamName, 0))
        {
            history.Add(DeserializeEvent(stored));
        }

        if (history.Count == 0)
        {
            throw new KeyNotFoundException($"Account {accountId} not found");
        }

        return Account.LoadFromHistory(history);
    }

    /// <summary>
    /// Appends the account's uncommitted events to its stream and then marks them
    /// as committed. Does nothing when there are no uncommitted events.
    /// </summary>
    public async Task SaveAsync(Account account)
    {
        var pending = account.GetUncommittedEvents();

        if (pending.Count != 0)
        {
            var streamName = $"account-{account.AccountId}";
            await _eventStore.AppendAsync(streamName, pending.ToArray());

            // Only reached when the append did not throw.
            account.MarkEventsAsCommitted();
        }
    }

    // Resolves the CLR type named by the stored event, then deserializes the payload into it.
    private object DeserializeEvent(StoredEvent storedEvent)
    {
        var eventType = Type.GetType(storedEvent.EventType);
        if (eventType is null)
        {
            throw new InvalidOperationException($"Unknown event type: {storedEvent.EventType}");
        }

        var payload = JsonSerializer.Deserialize(storedEvent.Data, eventType);
        if (payload is null)
        {
            throw new InvalidOperationException($"Failed to deserialize event: {storedEvent.EventType}");
        }

        return payload;
    }
}
|
|
```
|
|
|
|
### Command Handler
|
|
|
|
```csharp
|
|
/// <summary>
/// Handles <c>OpenAccountCommand</c>: opens a new account aggregate, saves it
/// through the repository, and returns the account id.
/// </summary>
public class OpenAccountCommandHandler : ICommandHandler<OpenAccountCommand, int>
{
    private readonly AccountRepository _repository;

    /// <summary>Creates the handler with the repository used to persist accounts.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="repository"/> is null.</exception>
    public OpenAccountCommandHandler(AccountRepository repository)
    {
        // Without this assignment the readonly field stays null and HandleAsync
        // fails when it reaches SaveAsync.
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
    }

    /// <param name="command">Supplies the account id, owner and initial balance.</param>
    /// <param name="ct">Not used in this method; <c>SaveAsync</c> is called without a token.</param>
    /// <returns>The id of the opened account.</returns>
    public async Task<int> HandleAsync(OpenAccountCommand command, CancellationToken ct)
    {
        // Create new aggregate
        var account = Account.Open(
            command.AccountId,
            command.Owner,
            command.InitialBalance);

        // Save (appends events)
        await _repository.SaveAsync(account);

        return account.AccountId;
    }
}
|
|
|
|
/// <summary>
/// Handles <c>DepositCommand</c>: loads the account from its event stream,
/// applies the deposit, and saves the resulting events.
/// </summary>
public class DepositCommandHandler : ICommandHandler<DepositCommand>
{
    private readonly AccountRepository _repository;

    /// <summary>Creates the handler with the repository used to load and persist accounts.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="repository"/> is null.</exception>
    public DepositCommandHandler(AccountRepository repository)
    {
        // Without this assignment the readonly field stays null and HandleAsync
        // fails on its first repository call.
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
    }

    /// <param name="command">Supplies the account id and the amount to deposit.</param>
    /// <param name="ct">Not used in this method; the repository calls are made without a token.</param>
    public async Task HandleAsync(DepositCommand command, CancellationToken ct)
    {
        // Load aggregate from event stream
        var account = await _repository.GetByIdAsync(command.AccountId);

        // Execute business logic
        account.Deposit(command.Amount);

        // Save (appends new events)
        await _repository.SaveAsync(account);
    }
}
|
|
```
|
|
|
|
## Audit Log Pattern
|
|
|
|
Persistent streams provide natural audit trails:
|
|
|
|
```csharp
|
|
/// <summary>
/// Builds audit logs for an account by reading its stream
/// (<c>account-{accountId}</c>) from offset 0 and mapping each stored event
/// to an <c>AuditEntry</c>.
/// </summary>
public class AuditService
{
    private readonly IEventStreamStore _eventStore;

    /// <summary>Creates the service on top of the given event stream store.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="eventStore"/> is null.</exception>
    public AuditService(IEventStreamStore eventStore)
    {
        // Without this assignment the readonly field stays null and both
        // queries below fail with a NullReferenceException.
        _eventStore = eventStore ?? throw new ArgumentNullException(nameof(eventStore));
    }

    /// <summary>Returns one audit entry per event in the account's stream, in read order.</summary>
    public async Task<List<AuditEntry>> GetAccountAuditLogAsync(int accountId)
    {
        var auditLog = new List<AuditEntry>();

        await foreach (var storedEvent in _eventStore.ReadStreamAsync($"account-{accountId}", 0))
        {
            auditLog.Add(ToAuditEntry(storedEvent));
        }

        return auditLog;
    }

    /// <summary>
    /// Returns audit entries for events whose timestamp lies between
    /// <paramref name="from"/> and <paramref name="to"/>, both bounds inclusive.
    /// </summary>
    /// <remarks>The whole stream is read; filtering happens in memory.</remarks>
    public async Task<List<AuditEntry>> GetAccountAuditLogForPeriodAsync(
        int accountId,
        DateTimeOffset from,
        DateTimeOffset to)
    {
        var auditLog = new List<AuditEntry>();

        await foreach (var storedEvent in _eventStore.ReadStreamAsync($"account-{accountId}", 0))
        {
            if (storedEvent.Timestamp >= from && storedEvent.Timestamp <= to)
            {
                auditLog.Add(ToAuditEntry(storedEvent));
            }
        }

        return auditLog;
    }

    // Single mapping from a stored event to its audit entry, shared by both queries.
    private static AuditEntry ToAuditEntry(StoredEvent storedEvent)
    {
        return new AuditEntry
        {
            Offset = storedEvent.Offset,
            EventType = storedEvent.EventType,
            Timestamp = storedEvent.Timestamp,
            EventId = storedEvent.EventId,
            Data = JsonSerializer.Deserialize<dynamic>(storedEvent.Data)
        };
    }
}
|
|
```
|
|
|
|
## Stream Naming Conventions
|
|
|
|
### Per-Aggregate Streams
|
|
|
|
One stream per aggregate instance:
|
|
|
|
```csharp
|
|
// ✅ Good - One stream per account
var accountStream = $"account-{accountId}";
await _eventStore.AppendAsync(accountStream, events);

// ✅ Good - One stream per order
var orderStream = $"order-{orderId}";
await _eventStore.AppendAsync(orderStream, events);
|
|
```
|
|
|
|
### Category Streams
|
|
|
|
All aggregates of the same type share a single stream:
|
|
|
|
```csharp
|
|
// All account events in single stream
await _eventStore.AppendAsync("accounts", new[]
{
    new AccountOpenedEvent { AccountId = 123, /* ... */ }
});

await _eventStore.AppendAsync("accounts", new[]
{
    new AccountOpenedEvent { AccountId = 456, /* ... */ }
});

// Read specific account by filtering
await foreach (var evt in _eventStore.ReadStreamAsync("accounts", 0))
{
    if (evt.EventType != "AccountOpenedEvent")
    {
        continue;
    }

    // Deserialize returns null when the payload is the JSON literal null,
    // so check before reading AccountId.
    var opened = JsonSerializer.Deserialize<AccountOpenedEvent>(evt.Data);
    if (opened is not null && opened.AccountId == targetAccountId)
    {
        // Process event for specific account
    }
}
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Use one stream per aggregate instance for clean boundaries
|
|
- Include all data needed to process events
|
|
- Version events for schema evolution
|
|
- Use correlation IDs to track causation
|
|
- Implement idempotent event handlers
|
|
- Store snapshots for large streams (performance optimization)
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't modify events after appending
|
|
- Don't delete events (use compensating events)
|
|
- Don't store large binary data in events
|
|
- Don't skip validation in aggregate methods
|
|
- Don't expose uncommitted events outside the aggregate except as a read-only view for the repository to persist
|
|
- Don't load entire large streams without snapshots
|
|
|
|
## See Also
|
|
|
|
- [Getting Started](getting-started.md)
|
|
- [Event Replay](../event-replay/README.md)
|
|
- [Projections](../projections/README.md)
|
|
- [Event Sourcing Tutorial](../../tutorials/event-sourcing/README.md)
|