# PostgreSQL Event Streaming - Testing Guide

This guide explains how to test the PostgreSQL event streaming implementation in Svrnty.CQRS.

## Prerequisites

1. **PostgreSQL Server**: You need a running PostgreSQL instance
   - Default connection: `Host=localhost;Port=5432;Database=svrnty_events;Username=postgres;Password=postgres`
   - You can use Docker: `docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=svrnty_events postgres:16`

2. **.NET 10 SDK**: Ensure you have .NET 10 installed
   - Check: `dotnet --version`

## Configuration

The sample application is configured via `Svrnty.Sample/appsettings.json`:

```json
"EventStreaming": {
  "UsePostgreSQL": true,
  "PostgreSQL": {
    "ConnectionString": "Host=localhost;Port=5432;Database=svrnty_events;Username=postgres;Password=postgres",
    "SchemaName": "event_streaming",
    "AutoMigrate": true,
    "MaxPoolSize": 100,
    "MinPoolSize": 5
  }
}
```

**Configuration Options:**
- `UsePostgreSQL`: Set to `true` to use PostgreSQL, `false` for in-memory storage
- `ConnectionString`: PostgreSQL connection string
- `SchemaName`: Database schema name (default: `event_streaming`)
- `AutoMigrate`: Automatically create database schema on startup (default: `true`)
- `MaxPoolSize`: Maximum connection pool size (default: `100`)
- `MinPoolSize`: Minimum connection pool size (default: `5`)

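
The connection string can also be supplied without editing `appsettings.json`. The sketch below assumes the sample application uses the default .NET configuration providers, in which an environment variable with `__` as the section separator overrides the matching JSON key; whether Svrnty.Sample actually loads environment variables is an assumption, and `build_connection_string` is a hypothetical helper:

```bash
# Assemble a connection string in the format used by appsettings.json.
# Arguments: host, port, database, username, password.
build_connection_string() {
  printf 'Host=%s;Port=%s;Database=%s;Username=%s;Password=%s' "$1" "$2" "$3" "$4" "$5"
}

# Usage (assumes the app reads environment variables as configuration):
#   export EventStreaming__PostgreSQL__ConnectionString="$(build_connection_string localhost 5432 svrnty_events postgres postgres)"
#   dotnet run --project Svrnty.Sample
```

This keeps credentials for a shared or CI database out of the checked-in settings file.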
## Quick Start

### Option 1: Using Docker PostgreSQL

```bash
# Start PostgreSQL
docker run -d --name svrnty-postgres \
  -p 5432:5432 \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=svrnty_events \
  postgres:16

# Wait for PostgreSQL to be ready
sleep 5

# Run the sample application from the repository root (adjust the path to your checkout)
cd /Users/mathias/Documents/workspaces/svrnty/dotnet-cqrs
dotnet run --project Svrnty.Sample
```

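
A fixed `sleep 5` can be too short on a slow machine. A minimal sketch of a readiness loop, assuming the PostgreSQL client tools (which provide `pg_isready`) are installed on the host; `wait_for_postgres` and its default values are hypothetical and not part of Svrnty.CQRS:

```bash
# Poll pg_isready once per second until PostgreSQL accepts connections
# or the retry budget is exhausted.
# Arguments (all optional): host, port, maximum number of attempts.
wait_for_postgres() {
  local host="${1:-localhost}"
  local port="${2:-5432}"
  local retries="${3:-30}"
  local attempt=1
  while [ "$attempt" -le "$retries" ]; do
    if pg_isready -h "$host" -p "$port" >/dev/null 2>&1; then
      echo "PostgreSQL is ready after $attempt attempt(s)"
      return 0
    fi
    sleep 1
    attempt=$((attempt + 1))
  done
  echo "PostgreSQL did not become ready after $retries attempts" >&2
  return 1
}

# Usage (in place of the fixed sleep):
#   wait_for_postgres localhost 5432 30 && dotnet run --project Svrnty.Sample
```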
### Option 2: Using Existing PostgreSQL

If you already have PostgreSQL running:

1. Update the connection string in `Svrnty.Sample/appsettings.json`
2. Run: `dotnet run --project Svrnty.Sample`

The database schema will be created automatically on first startup (if `AutoMigrate` is `true`).

## Testing Persistent Streams (Event Sourcing)

Persistent streams are append-only logs suitable for event sourcing.

### Test 1: Append Events via gRPC

```bash
# Terminal 1: Start the application
dotnet run --project Svrnty.Sample

# Terminal 2: Test persistent stream append
grpcurl -d '{
  "streamName": "user-123",
  "events": [
    {
      "eventType": "UserCreated",
      "eventId": "evt-001",
      "correlationId": "corr-001",
      "eventData": "{\"name\":\"Alice\",\"email\":\"alice@example.com\"}",
      "occurredAt": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
    }
  ]
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/AppendToStream
```

**Expected Response:**
```json
{
  "offsets": ["0"]
}
```

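
The `eventData` field carries a JSON document as an escaped string, which is easy to get wrong by hand. The following sketch builds the request body programmatically. `json_escape` and `build_append_payload` are hypothetical helpers, the field names mirror the Test 1 request, and the escaping covers only backslashes and double quotes (not newlines or control characters):

```bash
# Escape backslashes and double quotes so a JSON document can be embedded
# as a string value inside another JSON document.
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# Build a single-event append request.
# Arguments: stream name, event type, event ID, correlation ID, event data (raw JSON).
build_append_payload() {
  local stream="$1"
  local event_type="$2"
  local event_id="$3"
  local correlation_id="$4"
  local data
  data="$(json_escape "$5")"
  printf '{"streamName":"%s","events":[{"eventType":"%s","eventId":"%s","correlationId":"%s","eventData":"%s","occurredAt":"%s"}]}' \
    "$stream" "$event_type" "$event_id" "$correlation_id" "$data" \
    "$(date -u +%Y-%m-%dT%H:%M:%SZ)"
}

# Usage:
#   grpcurl -d "$(build_append_payload user-123 UserCreated evt-002 corr-002 '{"name":"Bob"}')" \
#     -plaintext localhost:6000 \
#     svrnty.cqrs.events.EventStreamService/AppendToStream
```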
### Test 2: Read Stream Events

```bash
grpcurl -d '{
  "streamName": "user-123",
  "fromOffset": "0"
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/ReadStream
```

**Expected Response:**
```json
{
  "events": [
    {
      "eventId": "evt-001",
      "eventType": "UserCreated",
      "correlationId": "corr-001",
      "eventData": "{\"name\":\"Alice\",\"email\":\"alice@example.com\"}",
      "occurredAt": "2025-12-09T...",
      "offset": "0"
    }
  ]
}
```

### Test 3: Get Stream Length

```bash
grpcurl -d '{
  "streamName": "user-123"
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/GetStreamLength
```

**Expected Response:**
```json
{
  "length": "1"
}
```

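
To turn this check into a pass/fail step in a script, the response can be parsed and compared against an expected value. This is a sketch under assumptions: `extract_length` and `assert_stream_length` are hypothetical helpers, the response shape is taken from the expected output above, and a missing `length` field is treated as `0` in case the JSON encoder omits zero values:

```bash
# Read a GetStreamLength response on stdin and print the numeric "length" value.
# Prints nothing if the field is absent.
extract_length() {
  sed -n 's/.*"length"[[:space:]]*:[[:space:]]*"\{0,1\}\([0-9][0-9]*\)"\{0,1\}.*/\1/p' | head -n 1
}

# Compare the length found on stdin with the expected value.
# Returns 0 on a match, 1 otherwise.
assert_stream_length() {
  local expected="$1"
  local actual
  actual="$(extract_length)"
  actual="${actual:-0}"
  if [ "$actual" = "$expected" ]; then
    echo "OK: length is $actual"
  else
    echo "FAIL: expected $expected, got $actual" >&2
    return 1
  fi
}

# Usage:
#   grpcurl -d '{"streamName": "user-123"}' -plaintext localhost:6000 \
#     svrnty.cqrs.events.EventStreamService/GetStreamLength | assert_stream_length 1
```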
### Test 4: Verify PostgreSQL Storage

Connect to PostgreSQL and verify the data:

```bash
# Using psql
psql -h localhost -U postgres -d svrnty_events <<'SQL'
-- Query persistent events
-- ("offset" is quoted because OFFSET is a reserved word in PostgreSQL)
SELECT stream_name, "offset", event_id, event_type, occurred_at, stored_at
FROM event_streaming.events
WHERE stream_name = 'user-123'
ORDER BY "offset";

-- Check stream metadata view
SELECT * FROM event_streaming.stream_metadata
WHERE stream_name = 'user-123';
SQL
```

## Testing Ephemeral Streams (Message Queue)

Ephemeral streams provide message queue semantics with visibility timeout and dead letter queue support.

### Test 5: Enqueue Events

```bash
grpcurl -d '{
  "streamName": "notifications",
  "events": [
    {
      "eventType": "EmailNotification",
      "eventId": "email-001",
      "correlationId": "corr-002",
      "eventData": "{\"to\":\"user@example.com\",\"subject\":\"Welcome\"}",
      "occurredAt": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
    },
    {
      "eventType": "SMSNotification",
      "eventId": "sms-001",
      "correlationId": "corr-003",
      "eventData": "{\"phone\":\"+1234567890\",\"message\":\"Welcome!\"}",
      "occurredAt": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
    }
  ]
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/EnqueueEvents
```

### Test 6: Dequeue Events (At-Least-Once Semantics)

```bash
# Dequeue first message
grpcurl -d '{
  "streamName": "notifications",
  "consumerId": "worker-1",
  "visibilityTimeout": "30s",
  "maxCount": 1
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/DequeueEvents
```

**Expected Response:**
```json
{
  "events": [
    {
      "eventId": "email-001",
      "eventType": "EmailNotification",
      ...
    }
  ]
}
```

### Test 7: Acknowledge Event (Success)

```bash
grpcurl -d '{
  "streamName": "notifications",
  "eventId": "email-001",
  "consumerId": "worker-1"
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/AcknowledgeEvent
```

This removes the event from the queue.

### Test 8: Negative Acknowledge (Failure)

```bash
# Dequeue next message
grpcurl -d '{
  "streamName": "notifications",
  "consumerId": "worker-2",
  "visibilityTimeout": "30s",
  "maxCount": 1
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/DequeueEvents

# Simulate processing failure - nack the message
grpcurl -d '{
  "streamName": "notifications",
  "eventId": "sms-001",
  "consumerId": "worker-2",
  "requeue": true
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/NegativeAcknowledgeEvent
```

The event is requeued and becomes available for dequeue again.

### Test 9: Dead Letter Queue

```bash
# Verify DLQ behavior (after max delivery attempts)
psql -h localhost -U postgres -d svrnty_events -c "
SELECT event_id, event_type, moved_at, reason, delivery_count
FROM event_streaming.dead_letter_queue
ORDER BY moved_at DESC;"
```

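
The query above only returns rows once an event has exhausted its delivery attempts. One way to get there is to dequeue and nack the same event repeatedly. This sketch assumes that each nack with `requeue: true` counts as a failed delivery and that the event moves to the dead letter queue after some maximum number of attempts. Neither the counting rule nor the limit is stated in this guide, so `drive_to_dlq` and its `attempts` argument are hypothetical:

```bash
# Dequeue and negatively acknowledge one event a fixed number of times.
# Arguments: stream name, event ID, consumer ID, number of attempts.
# Request fields and method names are copied from Tests 6 and 8.
drive_to_dlq() {
  local stream="$1"
  local event_id="$2"
  local consumer="$3"
  local attempts="$4"
  local i=1
  while [ "$i" -le "$attempts" ]; do
    grpcurl -d "{\"streamName\":\"$stream\",\"consumerId\":\"$consumer\",\"visibilityTimeout\":\"30s\",\"maxCount\":1}" \
      -plaintext localhost:6000 \
      svrnty.cqrs.events.EventStreamService/DequeueEvents
    grpcurl -d "{\"streamName\":\"$stream\",\"eventId\":\"$event_id\",\"consumerId\":\"$consumer\",\"requeue\":true}" \
      -plaintext localhost:6000 \
      svrnty.cqrs.events.EventStreamService/NegativeAcknowledgeEvent
    i=$((i + 1))
  done
}

# Usage (the attempt count 5 is a guess; set it to the configured maximum):
#   drive_to_dlq notifications sms-001 worker-2 5
```

Afterwards, rerun the dead letter queue query to see whether the event was moved.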
### Test 10: Get Pending Count

```bash
grpcurl -d '{
  "streamName": "notifications"
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/GetPendingCount
```

### Test 11: Verify Visibility Timeout

This test assumes at least one event is waiting in `test-queue`. Enqueue one first with the Test 5 command, using `"streamName": "test-queue"`.

```bash
# Dequeue a message
grpcurl -d '{
  "streamName": "test-queue",
  "consumerId": "worker-3",
  "visibilityTimeout": "5s",
  "maxCount": 1
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/DequeueEvents

# Immediately try to dequeue again (should get nothing - message is in-flight)
grpcurl -d '{
  "streamName": "test-queue",
  "consumerId": "worker-4",
  "visibilityTimeout": "5s",
  "maxCount": 1
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/DequeueEvents

# Wait 6 seconds and try again (should get the message - timeout expired)
sleep 6
grpcurl -d '{
  "streamName": "test-queue",
  "consumerId": "worker-4",
  "visibilityTimeout": "5s",
  "maxCount": 1
}' \
  -plaintext localhost:6000 \
  svrnty.cqrs.events.EventStreamService/DequeueEvents
```

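
The three dequeue calls are easier to verify when each response is reduced to an event count: one event, then none, then one again. A rough sketch, assuming the response shape shown in Test 6; `count_events` is a hypothetical helper that counts `"eventId"` keys in the response rather than parsing the JSON:

```bash
# Count the events in a DequeueEvents response read from stdin
# by counting occurrences of the "eventId" key.
count_events() {
  grep -o '"eventId"' | wc -l | tr -d ' '
}

# Usage:
#   grpcurl -d '{"streamName":"test-queue","consumerId":"worker-4","visibilityTimeout":"5s","maxCount":1}' \
#     -plaintext localhost:6000 \
#     svrnty.cqrs.events.EventStreamService/DequeueEvents | count_events
```

Escaped keys inside an `eventData` string do not match the pattern, because there the closing quote is preceded by a backslash.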
## Database Schema Verification

After running the application with `AutoMigrate: true`, verify the schema was created:

```bash
psql -h localhost -U postgres -d svrnty_events
```

```sql
-- List all tables in event_streaming schema
\dt event_streaming.*

-- Expected tables:
--   events
--   queue_events
--   in_flight_events
--   dead_letter_queue
--   consumer_offsets
--   retention_policies

-- Check table structures
\d event_streaming.events
\d event_streaming.queue_events
\d event_streaming.in_flight_events

-- View stream metadata
SELECT * FROM event_streaming.stream_metadata;

-- Check stored function
\df event_streaming.get_next_offset

-- Check indexes
\di event_streaming.*
```

## Performance Testing

### Bulk Insert Performance

```bash
# Create a test script
cat > test_bulk_insert.sh << 'SCRIPT'
#!/bin/bash
for i in {1..100}; do
  grpcurl -d "{
    \"streamName\": \"perf-test\",
    \"events\": [
      {
        \"eventType\": \"TestEvent\",
        \"eventId\": \"evt-$i\",
        \"correlationId\": \"corr-$i\",
        \"eventData\": \"{\\\"iteration\\\":$i}\",
        \"occurredAt\": \"$(date -u +%Y-%m-%dT%H:%M:%SZ)\"
      }
    ]
  }" -plaintext localhost:6000 svrnty.cqrs.events.EventStreamService/AppendToStream
done
SCRIPT

chmod +x test_bulk_insert.sh
time ./test_bulk_insert.sh
```

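
The `time` output gives total wall-clock time; dividing the event count by it yields a rough throughput figure. Note that each iteration starts a new `grpcurl` process and opens a new connection, so the result includes that overhead and is a lower bound rather than a measure of raw insert speed. `events_per_second` is a hypothetical helper:

```bash
# Print events per second with one decimal place.
# Arguments: number of events, elapsed time in seconds.
events_per_second() {
  awk -v n="$1" -v s="$2" 'BEGIN {
    if (s <= 0) { print "n/a"; exit }
    printf "%.1f\n", n / s
  }'
}

# Usage:
#   start=$(date +%s)
#   ./test_bulk_insert.sh > /dev/null
#   end=$(date +%s)
#   events_per_second 100 $((end - start))
```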
### Query Performance

```sql
-- Enable timing
\timing

-- Query event count
SELECT COUNT(*) FROM event_streaming.events;

-- Query by stream name (should use index)
-- ("offset" is quoted because OFFSET is a reserved word in PostgreSQL)
EXPLAIN ANALYZE
SELECT * FROM event_streaming.events
WHERE stream_name = 'perf-test'
ORDER BY "offset";

-- Query by event ID (should use unique index)
EXPLAIN ANALYZE
SELECT * FROM event_streaming.events
WHERE event_id = 'evt-50';
```

## Troubleshooting

### Connection Issues

If you see connection errors:

1. Verify PostgreSQL is running: `pg_isready -h localhost -p 5432`
2. Check the connection string in `appsettings.json`
3. Verify the database exists: `psql -h localhost -U postgres -l`
4. Check the logs: look for `Svrnty.CQRS.Events.PostgreSQL` log entries

### Schema Creation Issues

If auto-migration fails:

1. Check the PostgreSQL logs (when running in Docker): `docker logs svrnty-postgres`
2. Manually create the schema: `psql -h localhost -U postgres -d svrnty_events -f Svrnty.CQRS.Events.PostgreSQL/Migrations/001_InitialSchema.sql`
3. Verify permissions: the user needs the `CREATE` privilege on the database (to create the schema) and on the schema itself (to create tables, indexes, and functions)

### Type Resolution Errors

If you see "Could not resolve event type" warnings:

- Ensure your event classes are in the same assembly or referenced assemblies
- Event type names are stored as fully qualified names (e.g., `MyApp.Events.UserCreated, MyApp`)
- For testing, use events defined in Svrnty.Sample

## Switching Between Storage Backends

To switch back to in-memory storage:

```json
"EventStreaming": {
  "UsePostgreSQL": false
}
```

Or comment out the PostgreSQL configuration block in `appsettings.json`.

## Cleanup

```bash
# Stop and remove Docker container
docker stop svrnty-postgres
docker rm svrnty-postgres

# Or drop the database
psql -h localhost -U postgres -c "DROP DATABASE IF EXISTS svrnty_events;"
```

## Next Steps

After verifying the PostgreSQL implementation:

1. **Phase 2.3**: Implement Consumer Offset Tracking (IConsumerOffsetStore)
2. **Phase 2.4**: Implement Retention Policies
3. **Phase 2.5**: Add Event Replay API
4. **Phase 2.6**: Add Stream Configuration Extensions