-- dotnet-cqrs/Svrnty.CQRS.Events.PostgreSQL/Migrations/001_InitialSchema.sql
-- ============================================================================
-- Svrnty.CQRS Event Streaming - PostgreSQL Schema
-- Phase 2.2: Persistent and Ephemeral Stream Storage
-- ============================================================================
-- Create schema
CREATE SCHEMA IF NOT EXISTS event_streaming;
-- ============================================================================
-- PERSISTENT STREAMS (Event Sourcing / Event Log)
-- ============================================================================
-- Main events table with append-only semantics.
-- Note: "offset" is a reserved word in PostgreSQL and must be double-quoted.
CREATE TABLE IF NOT EXISTS event_streaming.events (
    -- Primary identification
    stream_name VARCHAR(255) NOT NULL,
    "offset" BIGINT NOT NULL,
    -- Event metadata
    event_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,
    -- Event data (JSONB for queryability)
    event_data JSONB NOT NULL,
    -- Timestamps
    occurred_at TIMESTAMPTZ NOT NULL,
    stored_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Optimistic concurrency control
    version INT NOT NULL DEFAULT 1,
    -- Constraints
    PRIMARY KEY (stream_name, "offset"),
    UNIQUE (event_id)
);
-- Indexes for efficient queries
CREATE INDEX IF NOT EXISTS idx_events_stream_name
ON event_streaming.events (stream_name);
CREATE INDEX IF NOT EXISTS idx_events_correlation_id
ON event_streaming.events (correlation_id);
CREATE INDEX IF NOT EXISTS idx_events_event_type
ON event_streaming.events (event_type);
CREATE INDEX IF NOT EXISTS idx_events_occurred_at
ON event_streaming.events (occurred_at DESC);
CREATE INDEX IF NOT EXISTS idx_events_stored_at
ON event_streaming.events (stored_at DESC);
-- JSONB index for querying event data
CREATE INDEX IF NOT EXISTS idx_events_event_data_gin
ON event_streaming.events USING GIN (event_data);
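-- Example (not executed): the GIN index accelerates JSONB containment queries
-- against the event payload. The 'orders' stream name and "customerId" key
-- below are illustrative assumptions, not part of this schema.
/*
SELECT event_id, event_type, occurred_at
FROM event_streaming.events
WHERE stream_name = 'orders'
  AND event_data @> '{"customerId": "12345"}';
*/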
-- Stream metadata view
CREATE OR REPLACE VIEW event_streaming.stream_metadata AS
SELECT
    stream_name,
    COUNT(*) AS length,
    MIN("offset") AS oldest_event_offset,
    MAX("offset") AS newest_event_offset,
    MIN(occurred_at) AS oldest_event_timestamp,
    MAX(occurred_at) AS newest_event_timestamp,
    MIN(stored_at) AS first_stored_at,
    MAX(stored_at) AS last_stored_at
FROM event_streaming.events
GROUP BY stream_name;
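-- Example (not executed): reading the aggregated metadata for a single stream.
-- The 'orders' stream name is an illustrative assumption.
/*
SELECT length, oldest_event_offset, newest_event_offset, newest_event_timestamp
FROM event_streaming.stream_metadata
WHERE stream_name = 'orders';
*/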
-- ============================================================================
-- EPHEMERAL STREAMS (Message Queue)
-- ============================================================================
-- Queue events table (messages are deleted after acknowledgment)
CREATE TABLE IF NOT EXISTS event_streaming.queue_events (
    -- Primary identification
    id BIGSERIAL PRIMARY KEY,
    stream_name VARCHAR(255) NOT NULL,
    -- Event metadata (event_id uniqueness is enforced by the inline constraint)
    event_id VARCHAR(255) NOT NULL UNIQUE,
    event_type VARCHAR(500) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,
    -- Event data (JSONB for queryability)
    event_data JSONB NOT NULL,
    -- Queue metadata
    enqueued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    delivery_count INT NOT NULL DEFAULT 0,
    -- Timestamps
    occurred_at TIMESTAMPTZ NOT NULL
);
-- Indexes for queue operations
CREATE INDEX IF NOT EXISTS idx_queue_events_stream_name
ON event_streaming.queue_events (stream_name, enqueued_at);
CREATE INDEX IF NOT EXISTS idx_queue_events_event_id
ON event_streaming.queue_events (event_id);
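-- Example (not executed): a minimal enqueue sketch. The stream name, event
-- type, identifiers, and payload are illustrative assumptions; the application
-- layer is expected to supply its own identifiers and serialized event data.
/*
INSERT INTO event_streaming.queue_events
    (stream_name, event_id, event_type, correlation_id, event_data, occurred_at)
VALUES
    ('orders', 'evt-0001', 'OrderPlaced', 'corr-0001',
     '{"orderId": "A-100", "total": 42.50}', NOW());
*/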
-- ============================================================================
-- IN-FLIGHT EVENT TRACKING (Visibility Timeout)
-- ============================================================================
CREATE TABLE IF NOT EXISTS event_streaming.in_flight_events (
    -- Primary identification
    event_id VARCHAR(255) PRIMARY KEY,
    stream_name VARCHAR(255) NOT NULL,
    -- Consumer tracking
    consumer_id VARCHAR(255) NOT NULL,
    -- Visibility timeout
    visible_after TIMESTAMPTZ NOT NULL,
    dequeued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Delivery tracking
    delivery_count INT NOT NULL DEFAULT 1,
    -- Event reference (for requeue)
    queue_event_id BIGINT NOT NULL,
    FOREIGN KEY (queue_event_id) REFERENCES event_streaming.queue_events(id) ON DELETE CASCADE
);
-- Indexes for timeout cleanup and per-consumer lookups
CREATE INDEX IF NOT EXISTS idx_in_flight_visible_after
ON event_streaming.in_flight_events (visible_after);
CREATE INDEX IF NOT EXISTS idx_in_flight_consumer
ON event_streaming.in_flight_events (consumer_id);
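-- Example (not executed): a minimal dequeue sketch with a visibility timeout.
-- The 'orders' stream, 'consumer-1' id, and 30-second timeout are illustrative
-- assumptions. FOR UPDATE SKIP LOCKED lets concurrent consumers claim different
-- rows; the claimed message stays in queue_events but is hidden by its
-- in_flight_events row until the consumer acknowledges it or the timeout
-- expires and cleanup_expired_in_flight() releases it.
/*
WITH claimed AS (
    SELECT q.id, q.event_id, q.stream_name
    FROM event_streaming.queue_events q
    WHERE q.stream_name = 'orders'
      AND NOT EXISTS (SELECT 1
                      FROM event_streaming.in_flight_events f
                      WHERE f.event_id = q.event_id)
    ORDER BY q.enqueued_at
    LIMIT 1
    FOR UPDATE OF q SKIP LOCKED
)
INSERT INTO event_streaming.in_flight_events
    (event_id, stream_name, consumer_id, visible_after, queue_event_id)
SELECT event_id, stream_name, 'consumer-1', NOW() + INTERVAL '30 seconds', id
FROM claimed
RETURNING event_id, queue_event_id;
*/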
-- ============================================================================
-- DEAD LETTER QUEUE
-- ============================================================================
CREATE TABLE IF NOT EXISTS event_streaming.dead_letter_queue (
    -- Primary identification
    id BIGSERIAL PRIMARY KEY,
    stream_name VARCHAR(255) NOT NULL,
    -- Event metadata
    event_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,
    -- Event data
    event_data JSONB NOT NULL,
    -- Failure tracking
    original_enqueued_at TIMESTAMPTZ NOT NULL,
    moved_to_dlq_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    delivery_attempts INT NOT NULL,
    last_error TEXT,
    last_consumer_id VARCHAR(255),
    -- Original occurrence timestamp
    occurred_at TIMESTAMPTZ NOT NULL
);
-- Indexes for DLQ operations
CREATE INDEX IF NOT EXISTS idx_dlq_stream_name
ON event_streaming.dead_letter_queue (stream_name, moved_to_dlq_at DESC);
CREATE INDEX IF NOT EXISTS idx_dlq_event_id
ON event_streaming.dead_letter_queue (event_id);
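-- Example (not executed): a minimal sketch for moving a poison message into
-- the dead letter queue after repeated failures. The event id, error text, and
-- consumer id are illustrative assumptions; the retry limit and error capture
-- belong to the consumer runtime, not this schema. Deleting the queue row also
-- removes any in-flight record via the ON DELETE CASCADE foreign key.
/*
WITH failed AS (
    DELETE FROM event_streaming.queue_events
    WHERE event_id = 'evt-0001'
    RETURNING stream_name, event_id, event_type, correlation_id,
              event_data, enqueued_at, delivery_count, occurred_at
)
INSERT INTO event_streaming.dead_letter_queue
    (stream_name, event_id, event_type, correlation_id, event_data,
     original_enqueued_at, delivery_attempts, last_error, last_consumer_id, occurred_at)
SELECT stream_name, event_id, event_type, correlation_id, event_data,
       enqueued_at, delivery_count, 'Handler threw after max retries', 'consumer-1', occurred_at
FROM failed;
*/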
-- ============================================================================
-- CONSUMER OFFSETS (Phase 2.3 - Offset Tracking)
-- ============================================================================
CREATE TABLE IF NOT EXISTS event_streaming.consumer_offsets (
    -- Composite key
    subscription_id VARCHAR(255) NOT NULL,
    consumer_id VARCHAR(255) NOT NULL,
    stream_name VARCHAR(255) NOT NULL,
    -- Position tracking
    current_offset BIGINT NOT NULL,
    -- Timestamps
    last_updated TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Constraints
    PRIMARY KEY (subscription_id, consumer_id, stream_name)
);
-- Index for consumer queries
CREATE INDEX IF NOT EXISTS idx_consumer_offsets_subscription
ON event_streaming.consumer_offsets (subscription_id, stream_name);
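-- Example (not executed): committing a consumer's position with an upsert.
-- The subscription, consumer, stream, and offset values are illustrative
-- assumptions; callers are expected to commit monotonically increasing offsets.
/*
INSERT INTO event_streaming.consumer_offsets
    (subscription_id, consumer_id, stream_name, current_offset)
VALUES ('order-projector', 'consumer-1', 'orders', 41)
ON CONFLICT (subscription_id, consumer_id, stream_name)
DO UPDATE SET current_offset = EXCLUDED.current_offset,
              last_updated = NOW();
*/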
-- ============================================================================
-- RETENTION POLICY TRACKING (Phase 2.4)
-- ============================================================================
CREATE TABLE IF NOT EXISTS event_streaming.retention_policies (
    stream_name VARCHAR(255) PRIMARY KEY,
    -- Time-based retention
    retention_days INT,
    -- Size-based retention
    max_size_bytes BIGINT,
    -- Count-based retention
    max_event_count BIGINT,
    -- Tracking
    deleted_event_count BIGINT NOT NULL DEFAULT 0,
    last_cleanup_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
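-- Example (not executed): a minimal time-based cleanup sketch driven by
-- retention_policies. This assumes a periodic cleanup job (not defined in this
-- migration) runs it; size- and count-based retention would need additional logic.
/*
DELETE FROM event_streaming.events e
USING event_streaming.retention_policies p
WHERE p.stream_name = e.stream_name
  AND p.retention_days IS NOT NULL
  AND e.stored_at < NOW() - make_interval(days => p.retention_days);
*/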
-- ============================================================================
-- FUNCTIONS AND TRIGGERS
-- ============================================================================
-- Function to compute the next append offset for a stream
CREATE OR REPLACE FUNCTION event_streaming.get_next_offset(p_stream_name VARCHAR)
RETURNS BIGINT AS $$
DECLARE
    v_next_offset BIGINT;
BEGIN
    SELECT COALESCE(MAX("offset") + 1, 0)
    INTO v_next_offset
    FROM event_streaming.events
    WHERE stream_name = p_stream_name;

    RETURN v_next_offset;
END;
$$ LANGUAGE plpgsql;
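-- Example (not executed): appending an event at the next offset. The stream
-- name, identifiers, and payload are illustrative assumptions. MAX("offset") + 1
-- is only safe when appends to a stream are serialized; concurrent writers that
-- compute the same offset are rejected by the (stream_name, "offset") primary key.
/*
INSERT INTO event_streaming.events
    (stream_name, "offset", event_id, event_type, correlation_id, event_data, occurred_at)
VALUES
    ('orders',
     event_streaming.get_next_offset('orders'),
     'evt-0002', 'OrderShipped', 'corr-0002',
     '{"orderId": "A-100"}', NOW());
*/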
-- Function to release expired in-flight events (visibility timeout).
-- Deleting the in-flight row makes the queue message visible to consumers again;
-- the delivery count observed while in flight is copied back onto the queue row.
CREATE OR REPLACE FUNCTION event_streaming.cleanup_expired_in_flight()
RETURNS TABLE(requeued_count INT) AS $$
DECLARE
    v_requeued_count INT := 0;
BEGIN
    WITH expired AS (
        DELETE FROM event_streaming.in_flight_events
        WHERE visible_after <= NOW()
        RETURNING event_id, queue_event_id, delivery_count
    )
    UPDATE event_streaming.queue_events q
    SET delivery_count = e.delivery_count
    FROM expired e
    WHERE q.id = e.queue_event_id;

    GET DIAGNOSTICS v_requeued_count = ROW_COUNT;
    RETURN QUERY SELECT v_requeued_count;
END;
$$ LANGUAGE plpgsql;
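-- Example (not executed): a periodic job, e.g. the host application or an
-- external scheduler (neither is defined by this migration), is expected to
-- invoke the visibility-timeout cleanup.
/*
SELECT requeued_count FROM event_streaming.cleanup_expired_in_flight();
*/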
-- ============================================================================
-- PARTITIONING (Optional - for large datasets)
-- ============================================================================
-- Note: Partitioning is optional and should be enabled via configuration
-- This is a template for monthly partitioning on the events table
-- To enable partitioning, run:
-- 1. Drop existing events table
-- 2. Recreate as partitioned table
-- 3. Create partitions
-- Example (not executed by default):
/*
-- Drop and recreate as partitioned table
DROP TABLE IF EXISTS event_streaming.events CASCADE;
CREATE TABLE event_streaming.events (
    stream_name VARCHAR(255) NOT NULL,
    "offset" BIGINT NOT NULL,
    event_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL,
    stored_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    version INT NOT NULL DEFAULT 1,
    -- The partition key (stored_at) must be part of the primary key
    PRIMARY KEY (stream_name, "offset", stored_at)
) PARTITION BY RANGE (stored_at);
-- Create monthly partitions
CREATE TABLE event_streaming.events_2025_01 PARTITION OF event_streaming.events
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE event_streaming.events_2025_02 PARTITION OF event_streaming.events
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- Continue creating partitions as needed...
*/
-- ============================================================================
-- GRANTS (Adjust as needed for your security model)
-- ============================================================================
-- Grant usage on schema
-- GRANT USAGE ON SCHEMA event_streaming TO your_app_user;
-- Grant permissions on tables
-- GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA event_streaming TO your_app_user;
-- Grant permissions on sequences
-- GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA event_streaming TO your_app_user;
-- ============================================================================
-- MIGRATION COMPLETE
-- ============================================================================
-- Summary
SELECT 'Migration 001 complete - Initial schema' AS status,
       (SELECT COUNT(*) FROM information_schema.tables
        WHERE table_schema = 'event_streaming' AND table_type = 'BASE TABLE') AS table_count,
       (SELECT COUNT(*) FROM information_schema.views
        WHERE table_schema = 'event_streaming') AS view_count;