-- ============================================================================
-- Svrnty.CQRS Event Streaming - PostgreSQL Schema
-- Phase 2.2: Persistent and Ephemeral Stream Storage
-- ============================================================================

-- Create schema
CREATE SCHEMA IF NOT EXISTS event_streaming;

-- ============================================================================
-- PERSISTENT STREAMS (Event Sourcing / Event Log)
-- ============================================================================

-- Main events table with append-only semantics.
-- Note: OFFSET is a reserved word in PostgreSQL, so the column is quoted.
CREATE TABLE IF NOT EXISTS event_streaming.events (
    -- Primary identification
    stream_name VARCHAR(255) NOT NULL,
    "offset" BIGINT NOT NULL,

    -- Event metadata
    event_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,

    -- Event data (JSONB for queryability)
    event_data JSONB NOT NULL,

    -- Timestamps
    occurred_at TIMESTAMPTZ NOT NULL,
    stored_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Optimistic concurrency control
    version INT NOT NULL DEFAULT 1,

    -- Constraints
    PRIMARY KEY (stream_name, "offset"),
    UNIQUE (event_id)
);

-- Indexes for efficient queries
CREATE INDEX IF NOT EXISTS idx_events_stream_name
    ON event_streaming.events (stream_name);
CREATE INDEX IF NOT EXISTS idx_events_correlation_id
    ON event_streaming.events (correlation_id);
CREATE INDEX IF NOT EXISTS idx_events_event_type
    ON event_streaming.events (event_type);
CREATE INDEX IF NOT EXISTS idx_events_occurred_at
    ON event_streaming.events (occurred_at DESC);
CREATE INDEX IF NOT EXISTS idx_events_stored_at
    ON event_streaming.events (stored_at DESC);

-- JSONB index for querying event data
CREATE INDEX IF NOT EXISTS idx_events_event_data_gin
    ON event_streaming.events USING GIN (event_data);

-- Stream metadata view
CREATE OR REPLACE VIEW event_streaming.stream_metadata AS
SELECT
    stream_name,
    COUNT(*) AS length,
    MIN("offset") AS oldest_event_offset,
    MAX("offset") AS newest_event_offset,
    MIN(occurred_at) AS oldest_event_timestamp,
    MAX(occurred_at) AS newest_event_timestamp,
    MIN(stored_at) AS first_stored_at,
    MAX(stored_at) AS last_stored_at
FROM event_streaming.events
GROUP BY stream_name;
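-- Example usage (illustrative only, not executed by this migration).
-- A minimal sketch of how an application might append to and read from the
-- persistent event log; the stream name, event ids, and payload below are
-- hypothetical. The append uses event_streaming.get_next_offset(), defined in
-- the FUNCTIONS section further down; concurrent writers to the same stream
-- are assumed to retry on a primary-key conflict.
/*
-- Append one event to a stream
INSERT INTO event_streaming.events
    (stream_name, "offset", event_id, event_type, correlation_id, event_data, occurred_at)
VALUES (
    'orders',                                           -- hypothetical stream
    event_streaming.get_next_offset('orders'),
    'evt-0001',
    'OrderPlaced',
    'corr-0001',
    '{"orderId": "A-42", "total": 99.95}'::jsonb,
    NOW()
);

-- Read a stream slice starting at a given offset
SELECT "offset", event_type, event_data, occurred_at
FROM event_streaming.events
WHERE stream_name = 'orders'
  AND "offset" >= 0
ORDER BY "offset"
LIMIT 100;

-- Find all events for a correlation id (served by idx_events_correlation_id)
SELECT stream_name, "offset", event_type, event_data
FROM event_streaming.events
WHERE correlation_id = 'corr-0001'
ORDER BY occurred_at;
*/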
-- ============================================================================
-- EPHEMERAL STREAMS (Message Queue)
-- ============================================================================

-- Queue events table (messages are deleted after acknowledgment)
CREATE TABLE IF NOT EXISTS event_streaming.queue_events (
    -- Primary identification
    id BIGSERIAL PRIMARY KEY,
    stream_name VARCHAR(255) NOT NULL,

    -- Event metadata
    event_id VARCHAR(255) NOT NULL UNIQUE,
    event_type VARCHAR(500) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,

    -- Event data (JSONB for queryability)
    event_data JSONB NOT NULL,

    -- Queue metadata
    enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    delivery_count INT NOT NULL DEFAULT 0,

    -- Timestamps
    occurred_at TIMESTAMPTZ NOT NULL
);

-- Indexes for queue operations
CREATE INDEX IF NOT EXISTS idx_queue_events_stream_name
    ON event_streaming.queue_events (stream_name, enqueued_at);
CREATE INDEX IF NOT EXISTS idx_queue_events_event_id
    ON event_streaming.queue_events (event_id);

-- ============================================================================
-- IN-FLIGHT EVENT TRACKING (Visibility Timeout)
-- ============================================================================

CREATE TABLE IF NOT EXISTS event_streaming.in_flight_events (
    -- Primary identification
    event_id VARCHAR(255) PRIMARY KEY,
    stream_name VARCHAR(255) NOT NULL,

    -- Consumer tracking
    consumer_id VARCHAR(255) NOT NULL,

    -- Visibility timeout
    visible_after TIMESTAMPTZ NOT NULL,
    dequeued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Delivery tracking
    delivery_count INT NOT NULL DEFAULT 1,

    -- Event reference (for requeue)
    queue_event_id BIGINT NOT NULL,
    FOREIGN KEY (queue_event_id)
        REFERENCES event_streaming.queue_events (id)
        ON DELETE CASCADE
);

-- Indexes for timeout cleanup and per-consumer lookups
CREATE INDEX IF NOT EXISTS idx_in_flight_visible_after
    ON event_streaming.in_flight_events (visible_after);
CREATE INDEX IF NOT EXISTS idx_in_flight_consumer
    ON event_streaming.in_flight_events (consumer_id);

-- ============================================================================
-- DEAD LETTER QUEUE
-- ============================================================================

CREATE TABLE IF NOT EXISTS event_streaming.dead_letter_queue (
    -- Primary identification
    id BIGSERIAL PRIMARY KEY,
    stream_name VARCHAR(255) NOT NULL,

    -- Event metadata
    event_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,

    -- Event data
    event_data JSONB NOT NULL,

    -- Failure tracking
    original_enqueued_at TIMESTAMPTZ NOT NULL,
    moved_to_dlq_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    delivery_attempts INT NOT NULL,
    last_error TEXT,
    last_consumer_id VARCHAR(255),

    -- Original occurrence timestamp
    occurred_at TIMESTAMPTZ NOT NULL
);

-- Indexes for DLQ operations
CREATE INDEX IF NOT EXISTS idx_dlq_stream_name
    ON event_streaming.dead_letter_queue (stream_name, moved_to_dlq_at DESC);
CREATE INDEX IF NOT EXISTS idx_dlq_event_id
    ON event_streaming.dead_letter_queue (event_id);

-- ============================================================================
-- CONSUMER OFFSETS (Phase 2.3 - Offset Tracking)
-- ============================================================================

CREATE TABLE IF NOT EXISTS event_streaming.consumer_offsets (
    -- Composite key
    subscription_id VARCHAR(255) NOT NULL,
    consumer_id VARCHAR(255) NOT NULL,
    stream_name VARCHAR(255) NOT NULL,

    -- Position tracking
    current_offset BIGINT NOT NULL,

    -- Timestamps
    last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Constraints
    PRIMARY KEY (subscription_id, consumer_id, stream_name)
);

-- Index for consumer queries
CREATE INDEX IF NOT EXISTS idx_consumer_offsets_subscription
    ON event_streaming.consumer_offsets (subscription_id, stream_name);

-- ============================================================================
-- RETENTION POLICY TRACKING (Phase 2.4)
-- ============================================================================

CREATE TABLE IF NOT EXISTS event_streaming.retention_policies (
    stream_name VARCHAR(255) PRIMARY KEY,

    -- Time-based retention
    retention_days INT,

    -- Size-based retention
    max_size_bytes BIGINT,

    -- Count-based retention
    max_event_count BIGINT,

    -- Tracking
    deleted_event_count BIGINT NOT NULL DEFAULT 0,
    last_cleanup_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
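-- Example consumer workflow (illustrative only, not executed by this migration).
-- A minimal sketch of how the queue, in-flight, DLQ, and offset tables could be
-- used together; the consumer id, visibility timeout, subscription name, and
-- error text are hypothetical, and the real dequeue/ack logic lives in the
-- application layer rather than in this schema.
/*
-- 1. Dequeue: claim the oldest visible message for a stream and record it
--    in-flight with a 30-second visibility timeout.
WITH claimed AS (
    SELECT q.id, q.event_id, q.stream_name, q.delivery_count
    FROM event_streaming.queue_events q
    WHERE q.stream_name = 'orders'
      AND NOT EXISTS (
          SELECT 1
          FROM event_streaming.in_flight_events f
          WHERE f.queue_event_id = q.id
      )
    ORDER BY q.enqueued_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
INSERT INTO event_streaming.in_flight_events
    (event_id, stream_name, consumer_id, visible_after, delivery_count, queue_event_id)
SELECT event_id, stream_name, 'consumer-1', NOW() + INTERVAL '30 seconds',
       delivery_count + 1, id
FROM claimed
RETURNING event_id, queue_event_id;

-- 2. Acknowledge: delete the queue row; the in-flight row is removed by the
--    ON DELETE CASCADE foreign key.
DELETE FROM event_streaming.queue_events WHERE event_id = 'evt-1001';

-- 3. Give up: after too many delivery attempts, copy the message to the dead
--    letter queue and remove it from the live queue.
INSERT INTO event_streaming.dead_letter_queue
    (stream_name, event_id, event_type, correlation_id, event_data,
     original_enqueued_at, delivery_attempts, last_error, last_consumer_id, occurred_at)
SELECT stream_name, event_id, event_type, correlation_id, event_data,
       enqueued_at, delivery_count, 'handler failed after 5 attempts', 'consumer-1', occurred_at
FROM event_streaming.queue_events
WHERE event_id = 'evt-1001';

DELETE FROM event_streaming.queue_events WHERE event_id = 'evt-1001';

-- 4. Commit a consumer's position on a persistent stream.
INSERT INTO event_streaming.consumer_offsets
    (subscription_id, consumer_id, stream_name, current_offset)
VALUES ('order-projector', 'consumer-1', 'orders', 42)
ON CONFLICT (subscription_id, consumer_id, stream_name)
DO UPDATE SET current_offset = EXCLUDED.current_offset, last_updated = NOW();
*/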
-- ============================================================================
-- FUNCTIONS AND TRIGGERS
-- ============================================================================

-- Function to compute the next offset for a stream.
-- Concurrent appenders to the same stream may compute the same value; the
-- primary key on (stream_name, "offset") rejects the duplicate so the caller
-- can retry.
CREATE OR REPLACE FUNCTION event_streaming.get_next_offset(p_stream_name VARCHAR)
RETURNS BIGINT AS $$
DECLARE
    v_next_offset BIGINT;
BEGIN
    SELECT COALESCE(MAX("offset") + 1, 0)
    INTO v_next_offset
    FROM event_streaming.events
    WHERE stream_name = p_stream_name;

    RETURN v_next_offset;
END;
$$ LANGUAGE plpgsql;

-- Function to clean up expired in-flight events (visibility timeout).
-- Deleting the in-flight row makes the corresponding queue_events row visible
-- to consumers again; the delivery count is copied back so retries are counted.
-- An illustrative invocation appears at the end of this file.
CREATE OR REPLACE FUNCTION event_streaming.cleanup_expired_in_flight()
RETURNS TABLE(requeued_count INT) AS $$
DECLARE
    v_requeued_count INT := 0;
BEGIN
    -- Move expired in-flight events back to the queue
    WITH expired AS (
        DELETE FROM event_streaming.in_flight_events
        WHERE visible_after <= NOW()
        RETURNING event_id, queue_event_id, delivery_count
    )
    UPDATE event_streaming.queue_events q
    SET delivery_count = e.delivery_count
    FROM expired e
    WHERE q.id = e.queue_event_id;

    GET DIAGNOSTICS v_requeued_count = ROW_COUNT;

    RETURN QUERY SELECT v_requeued_count;
END;
$$ LANGUAGE plpgsql;

-- ============================================================================
-- PARTITIONING (Optional - for large datasets)
-- ============================================================================

-- Note: Partitioning is optional and should be enabled via configuration.
-- This is a template for monthly partitioning on the events table.
-- To enable partitioning:
--   1. Drop the existing events table
--   2. Recreate it as a partitioned table
--   3. Create partitions
-- Example (not executed by default):

/*
-- Drop and recreate as a partitioned table
DROP TABLE IF EXISTS event_streaming.events CASCADE;

CREATE TABLE event_streaming.events (
    stream_name VARCHAR(255) NOT NULL,
    "offset" BIGINT NOT NULL,
    event_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL,
    stored_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    version INT NOT NULL DEFAULT 1,
    PRIMARY KEY (stream_name, "offset", stored_at)
) PARTITION BY RANGE (stored_at);

-- Create monthly partitions
CREATE TABLE event_streaming.events_2025_01 PARTITION OF event_streaming.events
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE event_streaming.events_2025_02 PARTITION OF event_streaming.events
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- Continue creating partitions as needed...
*/

-- ============================================================================
-- GRANTS (Adjust as needed for your security model)
-- ============================================================================

-- Grant usage on schema
-- GRANT USAGE ON SCHEMA event_streaming TO your_app_user;

-- Grant permissions on tables
-- GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA event_streaming TO your_app_user;

-- Grant permissions on sequences
-- GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA event_streaming TO your_app_user;

-- ============================================================================
-- MIGRATION COMPLETE
-- ============================================================================

-- Summary (information_schema.tables also lists views, so filter on table_type)
SELECT
    'Migration 001 complete - Initial schema' AS status,
    (SELECT COUNT(*) FROM information_schema.tables
     WHERE table_schema = 'event_streaming' AND table_type = 'BASE TABLE') AS table_count,
    (SELECT COUNT(*) FROM information_schema.views
     WHERE table_schema = 'event_streaming') AS view_count;
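-- Example maintenance calls (illustrative only, not executed by this migration).
-- A sketch of housekeeping the application or an external scheduler might run
-- periodically; the policy values and the cleanup query below are hypothetical
-- and assume a simple time-based sweep driven by retention_policies.retention_days.
/*
-- Requeue messages whose visibility timeout has expired
SELECT * FROM event_streaming.cleanup_expired_in_flight();

-- Register (or update) a retention policy for a stream
INSERT INTO event_streaming.retention_policies (stream_name, retention_days)
VALUES ('orders', 90)
ON CONFLICT (stream_name)
DO UPDATE SET retention_days = EXCLUDED.retention_days, updated_at = NOW();

-- Apply time-based retention: delete events older than the configured window
DELETE FROM event_streaming.events e
USING event_streaming.retention_policies p
WHERE e.stream_name = p.stream_name
  AND p.retention_days IS NOT NULL
  AND e.stored_at < NOW() - make_interval(days => p.retention_days);
*/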