500 lines
12 KiB
Markdown
500 lines
12 KiB
Markdown
# Getting Started with Event Streaming
|
|
|
|
Your first event stream - from installation to publishing and consuming events.
|
|
|
|
## Installation
|
|
|
|
### Install NuGet Package
|
|
|
|
```bash
|
|
# For development (in-memory storage)
|
|
dotnet add package Svrnty.CQRS.Events
|
|
|
|
# For production (PostgreSQL storage)
|
|
dotnet add package Svrnty.CQRS.Events.PostgreSQL
|
|
```
|
|
|
|
## Configuration
|
|
|
|
### In-Memory (Development)
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events;
|
|
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Register in-memory event streaming
|
|
builder.Services.AddInMemoryEventStreaming();
|
|
|
|
var app = builder.Build();
|
|
app.Run();
|
|
```
|
|
|
|
### PostgreSQL (Production)
|
|
|
|
**appsettings.json:**
|
|
```json
|
|
{
|
|
"ConnectionStrings": {
|
|
"EventStore": "Host=localhost;Database=eventstore;Username=postgres;Password=postgres"
|
|
}
|
|
}
|
|
```
|
|
|
|
**Program.cs:**
|
|
```csharp
|
|
using Svrnty.CQRS.Events.PostgreSQL;
|
|
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Register PostgreSQL event streaming
|
|
builder.Services.AddPostgresEventStreaming(
|
|
builder.Configuration.GetConnectionString("EventStore"));
|
|
|
|
var app = builder.Build();
|
|
app.Run();
|
|
```
|
|
|
|
### Database Migration
|
|
|
|
PostgreSQL storage automatically migrates the database on startup:
|
|
|
|
```bash
|
|
# Start PostgreSQL
|
|
docker run -d --name postgres \
|
|
-e POSTGRES_PASSWORD=postgres \
|
|
-p 5432:5432 \
|
|
postgres:16
|
|
|
|
# Run application - tables created automatically
|
|
dotnet run
|
|
```
|
|
|
|
## Define Events
|
|
|
|
Events are immutable records describing facts:
|
|
|
|
```csharp
|
|
public record UserRegisteredEvent
|
|
{
|
|
public int UserId { get; init; }
|
|
public string Email { get; init; } = string.Empty;
|
|
public string Name { get; init; } = string.Empty;
|
|
public DateTimeOffset RegisteredAt { get; init; }
|
|
}
|
|
|
|
public record EmailVerifiedEvent
|
|
{
|
|
public int UserId { get; init; }
|
|
public string Email { get; init; } = string.Empty;
|
|
public DateTimeOffset VerifiedAt { get; init; }
|
|
}
|
|
```
|
|
|
|
## Publishing Events
|
|
|
|
### Append to Persistent Stream
|
|
|
|
```csharp
|
|
public class UserService
|
|
{
|
|
private readonly IEventStreamStore _eventStore;
|
|
|
|
public UserService(IEventStreamStore eventStore)
|
|
{
|
|
_eventStore = eventStore;
|
|
}
|
|
|
|
public async Task RegisterUserAsync(string email, string name)
|
|
{
|
|
var userId = GenerateUserId();
|
|
|
|
var @event = new UserRegisteredEvent
|
|
{
|
|
UserId = userId,
|
|
Email = email,
|
|
Name = name,
|
|
RegisteredAt = DateTimeOffset.UtcNow
|
|
};
|
|
|
|
// Append to persistent stream
|
|
await _eventStore.AppendAsync(
|
|
streamName: "users",
|
|
events: new[] { @event });
|
|
|
|
Console.WriteLine($"User registered: {userId}");
|
|
}
|
|
}
|
|
```
|
|
|
|
### Publish Multiple Events
|
|
|
|
```csharp
|
|
public async Task RegisterAndVerifyUserAsync(string email, string name)
|
|
{
|
|
var userId = GenerateUserId();
|
|
var now = DateTimeOffset.UtcNow;
|
|
|
|
// Publish multiple events atomically
|
|
await _eventStore.AppendAsync("users", new object[]
|
|
{
|
|
new UserRegisteredEvent
|
|
{
|
|
UserId = userId,
|
|
Email = email,
|
|
Name = name,
|
|
RegisteredAt = now
|
|
},
|
|
new EmailVerifiedEvent
|
|
{
|
|
UserId = userId,
|
|
Email = email,
|
|
VerifiedAt = now
|
|
}
|
|
});
|
|
}
|
|
```
|
|
|
|
### Enqueue to Ephemeral Stream
|
|
|
|
For background jobs and notifications:
|
|
|
|
```csharp
|
|
public async Task SendWelcomeEmailAsync(int userId, string email)
|
|
{
|
|
var command = new SendEmailCommand
|
|
{
|
|
To = email,
|
|
Subject = "Welcome!",
|
|
Body = "Thanks for registering."
|
|
};
|
|
|
|
// Enqueue to ephemeral stream (message queue)
|
|
await _eventStore.EnqueueAsync(
|
|
streamName: "email-queue",
|
|
message: command);
|
|
}
|
|
```
|
|
|
|
## Consuming Events
|
|
|
|
### Read from Persistent Stream
|
|
|
|
```csharp
|
|
public class EventConsumer
|
|
{
|
|
private readonly IEventStreamStore _eventStore;
|
|
|
|
public EventConsumer(IEventStreamStore eventStore)
|
|
{
|
|
_eventStore = eventStore;
|
|
}
|
|
|
|
public async Task ProcessUserEventsAsync()
|
|
{
|
|
// Read all events from beginning
|
|
await foreach (var storedEvent in _eventStore.ReadStreamAsync(
|
|
streamName: "users",
|
|
fromOffset: 0))
|
|
{
|
|
Console.WriteLine($"Event {storedEvent.Offset}: {storedEvent.EventType}");
|
|
|
|
// Deserialize event
|
|
var eventData = JsonSerializer.Deserialize(
|
|
storedEvent.Data,
|
|
Type.GetType(storedEvent.EventType));
|
|
|
|
// Process event
|
|
await ProcessEventAsync(eventData);
|
|
}
|
|
}
|
|
|
|
private async Task ProcessEventAsync(object? eventData)
|
|
{
|
|
switch (eventData)
|
|
{
|
|
case UserRegisteredEvent registered:
|
|
Console.WriteLine($"User {registered.UserId} registered: {registered.Email}");
|
|
break;
|
|
|
|
case EmailVerifiedEvent verified:
|
|
Console.WriteLine($"Email verified: {verified.Email}");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
### Read from Specific Offset
|
|
|
|
```csharp
|
|
// Resume from last processed offset
|
|
long lastProcessedOffset = 1000;
|
|
|
|
await foreach (var @event in _eventStore.ReadStreamAsync("users", fromOffset: lastProcessedOffset + 1))
|
|
{
|
|
await ProcessEventAsync(@event);
|
|
lastProcessedOffset = @event.Offset;
|
|
|
|
// Save checkpoint
|
|
await SaveCheckpointAsync(lastProcessedOffset);
|
|
}
|
|
```
|
|
|
|
### Dequeue from Ephemeral Stream
|
|
|
|
```csharp
|
|
public class EmailWorker : BackgroundService
|
|
{
|
|
private readonly IEventStreamStore _eventStore;
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
{
|
|
try
|
|
{
|
|
// Dequeue message with 5-minute visibility timeout
|
|
var message = await _eventStore.DequeueAsync(
|
|
streamName: "email-queue",
|
|
visibilityTimeout: TimeSpan.FromMinutes(5),
|
|
cancellationToken: stoppingToken);
|
|
|
|
if (message == null)
|
|
{
|
|
// No messages available, wait
|
|
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
|
|
continue;
|
|
}
|
|
|
|
// Process message
|
|
var command = JsonSerializer.Deserialize<SendEmailCommand>(message.Data);
|
|
await SendEmailAsync(command);
|
|
|
|
// Acknowledge successful processing
|
|
await _eventStore.AcknowledgeAsync("email-queue", message.MessageId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
// Error - message will be redelivered after visibility timeout
|
|
Console.WriteLine($"Error processing message: {ex.Message}");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
## Complete Example
|
|
|
|
**Define Events:**
|
|
```csharp
|
|
public record OrderPlacedEvent
|
|
{
|
|
public int OrderId { get; init; }
|
|
public string CustomerName { get; init; } = string.Empty;
|
|
public decimal TotalAmount { get; init; }
|
|
public List<OrderItem> Items { get; init; } = new();
|
|
}
|
|
|
|
public record OrderItem
|
|
{
|
|
public int ProductId { get; init; }
|
|
public int Quantity { get; init; }
|
|
public decimal Price { get; init; }
|
|
}
|
|
```
|
|
|
|
**Publisher Service:**
|
|
```csharp
|
|
public class OrderService
|
|
{
|
|
private readonly IEventStreamStore _eventStore;
|
|
|
|
public OrderService(IEventStreamStore eventStore)
|
|
{
|
|
_eventStore = eventStore;
|
|
}
|
|
|
|
public async Task<int> PlaceOrderAsync(string customerName, List<OrderItem> items)
|
|
{
|
|
var orderId = GenerateOrderId();
|
|
|
|
var @event = new OrderPlacedEvent
|
|
{
|
|
OrderId = orderId,
|
|
CustomerName = customerName,
|
|
TotalAmount = items.Sum(i => i.Price * i.Quantity),
|
|
Items = items
|
|
};
|
|
|
|
// Append to persistent stream
|
|
await _eventStore.AppendAsync("orders", new[] { @event });
|
|
|
|
// Enqueue notification
|
|
await _eventStore.EnqueueAsync("order-notifications", new
|
|
{
|
|
OrderId = orderId,
|
|
CustomerName = customerName,
|
|
Type = "OrderPlaced"
|
|
});
|
|
|
|
return orderId;
|
|
}
|
|
}
|
|
```
|
|
|
|
**Consumer Worker:**
|
|
```csharp
|
|
public class OrderNotificationWorker : BackgroundService
|
|
{
|
|
private readonly IEventStreamStore _eventStore;
|
|
private readonly ILogger<OrderNotificationWorker> _logger;
|
|
|
|
public OrderNotificationWorker(
|
|
IEventStreamStore eventStore,
|
|
ILogger<OrderNotificationWorker> logger)
|
|
{
|
|
_eventStore = eventStore;
|
|
_logger = logger;
|
|
}
|
|
|
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
|
{
|
|
_logger.LogInformation("Order notification worker started");
|
|
|
|
while (!stoppingToken.IsCancellationRequested)
|
|
{
|
|
try
|
|
{
|
|
var message = await _eventStore.DequeueAsync(
|
|
"order-notifications",
|
|
TimeSpan.FromMinutes(5),
|
|
stoppingToken);
|
|
|
|
if (message == null)
|
|
{
|
|
await Task.Delay(100, stoppingToken);
|
|
continue;
|
|
}
|
|
|
|
// Process notification
|
|
var notification = JsonSerializer.Deserialize<dynamic>(message.Data);
|
|
_logger.LogInformation("Sending notification for order {OrderId}", notification.OrderId);
|
|
|
|
await SendNotificationAsync(notification);
|
|
await _eventStore.AcknowledgeAsync("order-notifications", message.MessageId);
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
// Shutdown requested
|
|
break;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error processing notification");
|
|
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
|
|
}
|
|
}
|
|
|
|
_logger.LogInformation("Order notification worker stopped");
|
|
}
|
|
|
|
private async Task SendNotificationAsync(dynamic notification)
|
|
{
|
|
// Send email, SMS, push notification, etc.
|
|
await Task.Delay(100);
|
|
}
|
|
}
|
|
```
|
|
|
|
**Registration:**
|
|
```csharp
|
|
var builder = WebApplication.CreateBuilder(args);
|
|
|
|
// Event streaming
|
|
builder.Services.AddPostgresEventStreaming(
|
|
builder.Configuration.GetConnectionString("EventStore"));
|
|
|
|
// Services
|
|
builder.Services.AddScoped<OrderService>();
|
|
|
|
// Background workers
|
|
builder.Services.AddHostedService<OrderNotificationWorker>();
|
|
|
|
var app = builder.Build();
|
|
app.Run();
|
|
```
|
|
|
|
## Testing
|
|
|
|
### Unit Testing with In-Memory Store
|
|
|
|
```csharp
|
|
public class OrderServiceTests
|
|
{
|
|
[Fact]
|
|
public async Task PlaceOrder_PublishesEvent()
|
|
{
|
|
// Arrange
|
|
var services = new ServiceCollection();
|
|
services.AddInMemoryEventStreaming();
|
|
var provider = services.BuildServiceProvider();
|
|
|
|
var store = provider.GetRequiredService<IEventStreamStore>();
|
|
var service = new OrderService(store);
|
|
|
|
// Act
|
|
var orderId = await service.PlaceOrderAsync("John Doe", new List<OrderItem>
|
|
{
|
|
new() { ProductId = 1, Quantity = 2, Price = 10.00m }
|
|
});
|
|
|
|
// Assert
|
|
var events = new List<StoredEvent>();
|
|
await foreach (var evt in store.ReadStreamAsync("orders", 0))
|
|
{
|
|
events.Add(evt);
|
|
}
|
|
|
|
Assert.Single(events);
|
|
Assert.Equal("OrderPlacedEvent", events[0].EventType);
|
|
}
|
|
}
|
|
```
|
|
|
|
## Next Steps
|
|
|
|
- Learn about [Persistent Streams](persistent-streams.md) for event sourcing
|
|
- Explore [Ephemeral Streams](ephemeral-streams.md) for message queues
|
|
- Design events with [Events and Workflows](events-and-workflows.md)
|
|
- Configure [Subscriptions](subscriptions.md) for consuming events
|
|
- Use [Consumer Groups](../consumer-groups/getting-started.md) for load balancing
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Use persistent streams for audit logs and event sourcing
|
|
- Use ephemeral streams for background jobs and notifications
|
|
- Acknowledge messages after successful processing
|
|
- Handle deserialization errors gracefully
|
|
- Use correlation IDs for distributed tracing
|
|
- Version your events for schema evolution
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't modify events after appending
|
|
- Don't process messages without acknowledging or nacking
|
|
- Don't store large payloads in events (use references)
|
|
- Don't forget error handling in consumers
|
|
- Don't block event processing with synchronous I/O
|
|
- Don't skip checkpointing in long-running consumers
|
|
|
|
## See Also
|
|
|
|
- [Event Streaming Overview](../README.md)
|
|
- [Persistent Streams](persistent-streams.md)
|
|
- [Ephemeral Streams](ephemeral-streams.md)
|
|
- [PostgreSQL Storage](../storage/postgresql-storage.md)
|
|
- [Consumer Groups](../consumer-groups/README.md)
|