dotnet-cqrs/docs/event-streaming/storage/database-schema.md

10 KiB

Database Schema

PostgreSQL database schema for event streaming.

Overview

The PostgreSQL storage implementation uses a carefully designed schema optimized for event sourcing, message queuing, and consumer group coordination.

Core Tables

events (Persistent Streams)

Stores events in append-only log format:

-- Append-only event log shared by all persistent streams.
-- OFFSET is a reserved word in PostgreSQL, so the column name must be
-- double-quoted ("offset") wherever it is used as an identifier.
CREATE TABLE events (
    "offset" BIGSERIAL PRIMARY KEY,              -- append position, one sequence for all streams
    event_id TEXT NOT NULL UNIQUE,
    stream_name TEXT NOT NULL,
    event_type TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB,
    timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    correlation_id TEXT,
    causation_id TEXT,
    version INTEGER NOT NULL DEFAULT 1
);

CREATE INDEX idx_events_stream_name ON events(stream_name);
CREATE INDEX idx_events_timestamp ON events(timestamp);
CREATE INDEX idx_events_event_type ON events(event_type);
-- Partial index: rows without a correlation id are not indexed.
CREATE INDEX idx_events_correlation_id ON events(correlation_id) WHERE correlation_id IS NOT NULL;

Columns:

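  • offset - Sequential position in the log, auto-incrementing; the sequence is shared by all streams, so offsets within a single stream are increasing but not necessarily contiguous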
  • event_id - Unique identifier (GUID)
  • stream_name - Name of the stream
  • event_type - Full type name (e.g., "OrderPlacedEvent")
  • data - JSON event payload
  • metadata - Additional metadata (JSON)
  • timestamp - When event was appended
  • correlation_id - Links related events
  • causation_id - Event/command that caused this event
  • version - Event schema version

messages (Ephemeral Streams)

Stores messages for queue semantics:

-- Queue storage for ephemeral streams.
-- OFFSET is a reserved word in PostgreSQL, so the column name must be
-- double-quoted ("offset") wherever it is used as an identifier.
CREATE TABLE messages (
    "offset" BIGSERIAL PRIMARY KEY,
    message_id TEXT NOT NULL UNIQUE,
    stream_name TEXT NOT NULL,
    data JSONB NOT NULL,
    metadata JSONB,
    enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    visibility_timeout TIMESTAMPTZ,              -- NULL until a timeout has been set
    delivery_attempts INTEGER NOT NULL DEFAULT 0,
    max_delivery_attempts INTEGER NOT NULL DEFAULT 5,
    dead_letter_stream TEXT
);

CREATE INDEX idx_messages_stream_visibility ON messages(stream_name, visibility_timeout);
CREATE INDEX idx_messages_visibility_timeout ON messages(visibility_timeout);

Columns:

  • offset - Sequential number
  • message_id - Unique identifier
  • stream_name - Queue name
  • data - JSON message payload
  • visibility_timeout - Time at which a dequeued message becomes visible again (NULL means the message is visible now)
  • delivery_attempts - How many times dequeued
  • max_delivery_attempts - Move to DLQ after this many attempts
  • dead_letter_stream - Where to move failed messages

Consumer Groups

consumer_offsets

Tracks consumer group positions:

-- Committed read position per (stream, group, consumer).
-- OFFSET is a reserved word in PostgreSQL, so the column name must be
-- double-quoted ("offset") wherever it is used as an identifier.
CREATE TABLE consumer_offsets (
    stream_name TEXT NOT NULL,
    group_id TEXT NOT NULL,
    consumer_id TEXT NOT NULL,
    "offset" BIGINT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (stream_name, group_id, consumer_id)
);

-- NOTE(review): (stream_name, group_id) is a leading prefix of the primary
-- key, so the primary-key index can already serve these lookups; this index
-- is kept for compatibility but looks redundant.
CREATE INDEX idx_consumer_offsets_group ON consumer_offsets(stream_name, group_id);

Usage:

-- Get consumer position
-- ("offset" is quoted because OFFSET is a reserved word in PostgreSQL.)
SELECT "offset" FROM consumer_offsets
WHERE stream_name = 'orders'
  AND group_id = 'order-processing'
  AND consumer_id = 'worker-1';

-- Commit offset
-- Upsert keyed on the primary key: inserts the first commit for a consumer,
-- otherwise overwrites the stored position and refreshes updated_at.
INSERT INTO consumer_offsets (stream_name, group_id, consumer_id, "offset")
VALUES ('orders', 'order-processing', 'worker-1', 1000)
ON CONFLICT (stream_name, group_id, consumer_id)
DO UPDATE SET "offset" = EXCLUDED."offset", updated_at = NOW();

consumer_registrations

Tracks active consumers with heartbeats:

-- One row per registered consumer, keyed by (stream, group, consumer).
-- last_heartbeat starts at registration time; session_timeout_ms defaults
-- to 30000 ms.
CREATE TABLE consumer_registrations (
    stream_name        TEXT        NOT NULL,
    group_id           TEXT        NOT NULL,
    consumer_id        TEXT        NOT NULL,
    registered_at      TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    last_heartbeat     TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    session_timeout_ms INTEGER     NOT NULL DEFAULT 30000,
    CONSTRAINT consumer_registrations_pkey
        PRIMARY KEY (stream_name, group_id, consumer_id)
);

-- Supports range scans on heartbeat age.
CREATE INDEX idx_consumer_registrations_heartbeat
    ON consumer_registrations (last_heartbeat);

Heartbeat Function:

-- Stamps last_heartbeat with the current transaction time for the single
-- registration identified by (p_stream_name, p_group_id, p_consumer_id).
-- Updates nothing if no such registration exists.
CREATE OR REPLACE FUNCTION update_consumer_heartbeat(
    p_stream_name TEXT,
    p_group_id TEXT,
    p_consumer_id TEXT
) RETURNS VOID AS $$
BEGIN
    UPDATE consumer_registrations AS reg
       SET last_heartbeat = CURRENT_TIMESTAMP
     WHERE reg.consumer_id = p_consumer_id
       AND reg.group_id    = p_group_id
       AND reg.stream_name = p_stream_name;
END;
$$ LANGUAGE plpgsql;

Cleanup Function:

-- Deletes every registration whose last heartbeat is older than that row's
-- own session_timeout_ms and returns the number of rows removed.
CREATE OR REPLACE FUNCTION cleanup_stale_consumers()
RETURNS INTEGER AS $$
DECLARE
    removed_rows INTEGER;
BEGIN
    DELETE FROM consumer_registrations AS reg
    WHERE reg.last_heartbeat
          < CURRENT_TIMESTAMP - reg.session_timeout_ms * INTERVAL '1 millisecond';

    -- ROW_COUNT reports how many rows the DELETE above affected.
    GET DIAGNOSTICS removed_rows = ROW_COUNT;
    RETURN removed_rows;
END;
$$ LANGUAGE plpgsql;

Retention Policies

retention_policies

Stores retention policy configuration:

-- One retention policy per stream. Both limits are nullable; policies are
-- enabled by default.
CREATE TABLE retention_policies (
    stream_name     TEXT,
    max_age_seconds INTEGER,
    max_event_count INTEGER,
    enabled         BOOLEAN     NOT NULL DEFAULT TRUE,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT retention_policies_pkey PRIMARY KEY (stream_name)
);

CREATE INDEX idx_retention_policies_enabled
    ON retention_policies (enabled);

Apply Retention Functions:

-- Time-based retention
-- Removes events of p_stream_name whose timestamp is more than
-- p_max_age_seconds in the past and returns how many rows were deleted.
CREATE OR REPLACE FUNCTION apply_time_retention(
    p_stream_name TEXT,
    p_max_age_seconds INTEGER
) RETURNS INTEGER AS $$
DECLARE
    removed_rows INTEGER;
BEGIN
    DELETE FROM events AS e
    WHERE e.timestamp < CURRENT_TIMESTAMP - p_max_age_seconds * INTERVAL '1 second'
      AND e.stream_name = p_stream_name;

    -- ROW_COUNT reports how many rows the DELETE above affected.
    GET DIAGNOSTICS removed_rows = ROW_COUNT;
    RETURN removed_rows;
END;
$$ LANGUAGE plpgsql;

-- Size-based retention (keeps the newest p_max_event_count events)
-- Deletes the oldest events of p_stream_name so that at most
-- p_max_event_count remain, and returns how many rows were deleted.
-- A NULL or negative count deletes nothing.
CREATE OR REPLACE FUNCTION apply_size_retention(
    p_stream_name TEXT,
    p_max_event_count INTEGER
) RETURNS INTEGER AS $$
DECLARE
    deleted_count INTEGER;
BEGIN
    -- OFFSET NULL would mean "skip nothing" and a negative OFFSET raises an
    -- error, so reject both before touching any rows.
    IF p_max_event_count IS NULL OR p_max_event_count < 0 THEN
        RETURN 0;
    END IF;

    -- "offset" comes from one sequence shared by all streams, so values within
    -- a stream are not contiguous and MAX("offset") - N is not the N-th newest
    -- row. Instead, locate the (N+1)-th newest row of this stream and delete
    -- it together with everything older. When the stream holds N rows or
    -- fewer the subquery yields NULL and nothing is deleted.
    DELETE FROM events
    WHERE stream_name = p_stream_name
      AND "offset" <= (
          SELECT "offset"
          FROM events
          WHERE stream_name = p_stream_name
          ORDER BY "offset" DESC
          LIMIT 1 OFFSET p_max_event_count
      );

    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;

Stream Configuration

stream_configurations

Per-stream configuration:

-- Per-stream settings, one row per stream. Each configuration area is an
-- optional JSONB document.
CREATE TABLE stream_configurations (
    stream_name           TEXT,
    retention_config      JSONB,
    dead_letter_config    JSONB,
    lifecycle_config      JSONB,
    performance_config    JSONB,
    access_control_config JSONB,
    tags                  JSONB,
    created_at            TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at            TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    CONSTRAINT stream_configurations_pkey PRIMARY KEY (stream_name)
);

Example Configuration:

-- Registers configuration for the 'orders' stream; the JSONB columns that
-- are not listed stay NULL.
INSERT INTO stream_configurations
    (stream_name, retention_config, performance_config)
VALUES
    (
        'orders',
        CAST('{"maxAge": "90.00:00:00", "maxSizeBytes": 10737418240}' AS JSONB),
        CAST('{"batchSize": 1000, "enableCompression": true}' AS JSONB)
    );

Views

consumer_group_status

View for monitoring consumer health:

-- One row per registered consumer with its committed offset, the head of
-- its stream, the lag between the two and a staleness flag.
-- "offset" is quoted throughout because OFFSET is a reserved word in
-- PostgreSQL; an unquoted MAX(offset) does not parse.
CREATE VIEW consumer_group_status AS
SELECT
    cr.stream_name,
    cr.group_id,
    cr.consumer_id,
    co."offset" AS current_offset,               -- NULL when nothing has been committed yet
    head.stream_head,                            -- NULL when the stream has no events
    head.stream_head - COALESCE(co."offset", 0) AS lag,
    cr.last_heartbeat,
    CASE
        WHEN cr.last_heartbeat < NOW() - (cr.session_timeout_ms || ' milliseconds')::INTERVAL
        THEN true
        ELSE false
    END AS is_stale
FROM consumer_registrations AS cr
LEFT JOIN consumer_offsets AS co
    ON cr.stream_name = co.stream_name
    AND cr.group_id = co.group_id
    AND cr.consumer_id = co.consumer_id
-- The aggregate always yields exactly one row, so CROSS JOIN never drops a
-- consumer; the stream head is looked up once per row instead of twice.
CROSS JOIN LATERAL (
    SELECT MAX(e."offset") AS stream_head
    FROM events AS e
    WHERE e.stream_name = cr.stream_name
) AS head;

Usage:

-- Monitor consumer lag: consumers more than 1000 behind, worst first
SELECT
    stream_name,
    group_id,
    consumer_id,
    current_offset,
    stream_head,
    lag,
    last_heartbeat,
    is_stale
FROM consumer_group_status
WHERE lag > 1000
ORDER BY lag DESC;

-- Find stale consumers
SELECT
    stream_name,
    group_id,
    consumer_id,
    current_offset,
    stream_head,
    lag,
    last_heartbeat,
    is_stale
FROM consumer_group_status
WHERE is_stale;

retention_policy_status

View for retention policy monitoring:

-- One row per retention policy, alongside the current event count and the
-- oldest and newest event timestamps of that stream.
CREATE VIEW retention_policy_status AS
SELECT
    rp.stream_name,
    rp.max_age_seconds,
    rp.max_event_count,
    rp.enabled,
    stats.current_event_count,
    stats.oldest_event,
    stats.newest_event
FROM retention_policies AS rp
-- An aggregate without GROUP BY always returns one row, so streams with no
-- events still appear (count 0, NULL timestamps).
CROSS JOIN LATERAL (
    SELECT
        COUNT(*)         AS current_event_count,
        MIN(e.timestamp) AS oldest_event,
        MAX(e.timestamp) AS newest_event
    FROM events AS e
    WHERE e.stream_name = rp.stream_name
) AS stats;

Indexes

Performance Indexes

-- Stream reads (most common query)
-- "offset" is quoted because OFFSET is a reserved word in PostgreSQL.
CREATE INDEX idx_events_stream_offset ON events(stream_name, "offset");

-- Correlation queries
-- NOTE(review): same columns and predicate as idx_events_correlation_id,
-- which is created together with the events table; only one of the two is
-- needed.
CREATE INDEX idx_events_correlation ON events(correlation_id)
WHERE correlation_id IS NOT NULL;

-- Time-based queries
CREATE INDEX idx_events_timestamp_stream ON events(timestamp, stream_name);

-- Message queue dequeue (critical for performance)
-- Covers only messages that have a visibility timeout set.
CREATE INDEX idx_messages_dequeue ON messages(stream_name, visibility_timeout)
WHERE visibility_timeout IS NOT NULL;

Partial Indexes

-- Only index messages that have no visibility timeout set
-- An index predicate may only use IMMUTABLE functions, so NOW() cannot
-- appear here. Messages whose timeout has already expired are found through
-- the (stream_name, visibility_timeout) indexes with the time comparison
-- done in the query itself.
CREATE INDEX idx_messages_visible ON messages(stream_name, "offset")
WHERE visibility_timeout IS NULL;

-- Index for finding active consumers
-- "Active" depends on the current time, which cannot be part of an index
-- predicate. Index last_heartbeat as a trailing column instead and filter at
-- query time, e.g. WHERE last_heartbeat > NOW() - INTERVAL '5 minutes'.
CREATE INDEX idx_consumers_active ON consumer_registrations(stream_name, group_id, last_heartbeat);

Partitioning (Optional)

For very large event stores, consider partitioning:

-- Partition events by stream_name
-- A primary key or unique constraint on a partitioned table must include
-- every partition key column. LIKE ... INCLUDING ALL would copy the
-- single-column primary key on "offset" and the UNIQUE constraint on
-- event_id, and the statement would be rejected. Copy only the column
-- definitions and defaults, and declare a key that contains stream_name.
-- NOTE(review): event_id can then no longer be enforced as unique across
-- all partitions; index the partitioned table as needed afterwards.
CREATE TABLE events_partitioned (
    LIKE events INCLUDING DEFAULTS,
    PRIMARY KEY (stream_name, "offset")
) PARTITION BY HASH (stream_name);

CREATE TABLE events_partition_0 PARTITION OF events_partitioned
    FOR VALUES WITH (MODULUS 4, REMAINDER 0);

CREATE TABLE events_partition_1 PARTITION OF events_partitioned
    FOR VALUES WITH (MODULUS 4, REMAINDER 1);

CREATE TABLE events_partition_2 PARTITION OF events_partitioned
    FOR VALUES WITH (MODULUS 4, REMAINDER 2);

CREATE TABLE events_partition_3 PARTITION OF events_partitioned
    FOR VALUES WITH (MODULUS 4, REMAINDER 3);

Or partition by time:

-- Partition events by month
-- A primary key or unique constraint on a partitioned table must include
-- every partition key column. LIKE ... INCLUDING ALL would copy the
-- single-column primary key on "offset" and the UNIQUE constraint on
-- event_id, and the statement would be rejected. Copy only the column
-- definitions and defaults, and declare a key that contains timestamp.
-- NOTE(review): event_id can then no longer be enforced as unique across
-- all partitions; index the partitioned table as needed afterwards.
CREATE TABLE events_partitioned (
    LIKE events INCLUDING DEFAULTS,
    PRIMARY KEY ("offset", timestamp)
) PARTITION BY RANGE (timestamp);

-- Range bounds are half-open: FROM is inclusive, TO is exclusive.
CREATE TABLE events_2025_01 PARTITION OF events_partitioned
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE events_2025_02 PARTITION OF events_partitioned
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

Maintenance

Vacuum

-- Regular vacuum
-- Vacuums each table and refreshes its planner statistics in the same pass.
VACUUM ANALYZE events;
VACUUM ANALYZE messages;

-- Full vacuum (reclaims disk space)
-- NOTE(review): VACUUM FULL rewrites the whole table and is documented to
-- hold an exclusive lock while it runs - confirm a maintenance window before
-- using it on a live events table.
VACUUM FULL events;

Reindex

-- Rebuild indexes
-- Recreates every index on each listed table.
-- NOTE(review): a plain REINDEX is documented to block writes to the table
-- while it runs; check whether REINDEX ... CONCURRENTLY is available on the
-- deployed PostgreSQL version before running this in production.
REINDEX TABLE events;
REINDEX TABLE messages;

Statistics

-- Update statistics
-- Refreshes planner statistics for the listed tables without vacuuming them.
ANALYZE events;
ANALYZE messages;
ANALYZE consumer_offsets;

See Also