-- ============================================================================ -- Svrnty.CQRS Event Streaming - PostgreSQL Schema -- Phase 3: Exactly-Once Delivery & Idempotency Store -- ============================================================================ -- ============================================================================ -- PROCESSED EVENTS (Duplicate Detection) -- ============================================================================ CREATE TABLE IF NOT EXISTS event_streaming.processed_events ( -- Composite key consumer_id VARCHAR(255) NOT NULL, event_id VARCHAR(255) NOT NULL, -- Processing metadata processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), -- Additional context (optional) processing_duration_ms INT, processor_instance VARCHAR(255), -- Constraints PRIMARY KEY (consumer_id, event_id) ); -- Index for cleanup operations CREATE INDEX IF NOT EXISTS idx_processed_events_processed_at ON event_streaming.processed_events (processed_at); -- Index for consumer queries CREATE INDEX IF NOT EXISTS idx_processed_events_consumer_id ON event_streaming.processed_events (consumer_id); -- ============================================================================ -- IDEMPOTENCY LOCKS (Distributed Locking) -- ============================================================================ CREATE TABLE IF NOT EXISTS event_streaming.idempotency_locks ( -- Primary key lock_key VARCHAR(255) PRIMARY KEY, -- Lock metadata acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), expires_at TIMESTAMPTZ NOT NULL, -- Who acquired the lock acquired_by VARCHAR(255), -- Lock payload (optional metadata) lock_data JSONB ); -- Index for expiration cleanup CREATE INDEX IF NOT EXISTS idx_idempotency_locks_expires_at ON event_streaming.idempotency_locks (expires_at); -- ============================================================================ -- CLEANUP FUNCTIONS -- ============================================================================ -- Function to clean up expired idempotency locks CREATE OR REPLACE FUNCTION event_streaming.cleanup_expired_idempotency_locks() RETURNS TABLE(deleted_count INT) AS $$ DECLARE v_deleted_count INT := 0; BEGIN -- Delete expired locks DELETE FROM event_streaming.idempotency_locks WHERE expires_at <= NOW(); GET DIAGNOSTICS v_deleted_count = ROW_COUNT; RETURN QUERY SELECT v_deleted_count; END; $$ LANGUAGE plpgsql; -- Function to clean up old processed events CREATE OR REPLACE FUNCTION event_streaming.cleanup_old_processed_events(p_older_than TIMESTAMPTZ) RETURNS TABLE(deleted_count INT) AS $$ DECLARE v_deleted_count INT := 0; BEGIN -- Delete processed events older than specified time DELETE FROM event_streaming.processed_events WHERE processed_at < p_older_than; GET DIAGNOSTICS v_deleted_count = ROW_COUNT; RETURN QUERY SELECT v_deleted_count; END; $$ LANGUAGE plpgsql; -- ============================================================================ -- HELPER VIEWS -- ============================================================================ -- View for monitoring idempotency lock usage CREATE OR REPLACE VIEW event_streaming.idempotency_lock_status AS SELECT lock_key, acquired_at, expires_at, acquired_by, CASE WHEN expires_at <= NOW() THEN 'EXPIRED' ELSE 'ACTIVE' END as status, EXTRACT(EPOCH FROM (expires_at - acquired_at)) as lock_duration_seconds, EXTRACT(EPOCH FROM (expires_at - NOW())) as remaining_seconds FROM event_streaming.idempotency_locks; -- View for processed events statistics CREATE OR REPLACE VIEW event_streaming.processed_events_stats AS SELECT consumer_id, COUNT(*) as total_processed, MIN(processed_at) as first_processed_at, MAX(processed_at) as last_processed_at, AVG(processing_duration_ms) as avg_processing_duration_ms FROM event_streaming.processed_events GROUP BY consumer_id; -- ============================================================================ -- MIGRATION COMPLETE -- ============================================================================ -- Summary SELECT 'Migration 005 complete - Idempotency Store' as status, (SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'event_streaming' AND table_name IN ('processed_events', 'idempotency_locks')) as new_table_count;