164 lines
5.8 KiB
PL/PgSQL
164 lines
5.8 KiB
PL/PgSQL
-- ============================================================================
|
|
-- Svrnty.CQRS Event Streaming - PostgreSQL Schema
|
|
-- Phase 3: Exactly-Once Delivery & Read Receipts - Read Receipts
|
|
-- ============================================================================
|
|
|
|
-- ============================================================================
|
|
-- READ RECEIPTS (Consumer Progress Tracking)
|
|
-- ============================================================================
|
|
|
|
CREATE TABLE IF NOT EXISTS event_streaming.read_receipts (
|
|
-- Composite key
|
|
consumer_id VARCHAR(255) NOT NULL,
|
|
stream_name VARCHAR(255) NOT NULL,
|
|
|
|
-- Last acknowledged event
|
|
last_event_id VARCHAR(255) NOT NULL,
|
|
last_offset BIGINT NOT NULL,
|
|
last_acknowledged_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
|
|
-- Progress tracking
|
|
first_acknowledged_at TIMESTAMPTZ,
|
|
total_acknowledged BIGINT NOT NULL DEFAULT 0,
|
|
|
|
-- Additional metadata (optional)
|
|
consumer_instance VARCHAR(255),
|
|
consumer_metadata JSONB,
|
|
|
|
-- Constraints
|
|
PRIMARY KEY (consumer_id, stream_name)
|
|
);
|
|
|
|
-- Index for stream queries (find all consumers for a stream)
|
|
CREATE INDEX IF NOT EXISTS idx_read_receipts_stream_name
|
|
ON event_streaming.read_receipts (stream_name);
|
|
|
|
-- Index for cleanup operations
|
|
CREATE INDEX IF NOT EXISTS idx_read_receipts_last_acknowledged_at
|
|
ON event_streaming.read_receipts (last_acknowledged_at);
|
|
|
|
-- Index for finding lagging consumers
|
|
CREATE INDEX IF NOT EXISTS idx_read_receipts_stream_offset
|
|
ON event_streaming.read_receipts (stream_name, last_offset);
|
|
|
|
-- ============================================================================
|
|
-- HELPER VIEWS
|
|
-- ============================================================================
|
|
|
|
-- View for monitoring consumer progress
|
|
CREATE OR REPLACE VIEW event_streaming.consumer_progress AS
|
|
SELECT
|
|
consumer_id,
|
|
stream_name,
|
|
last_event_id,
|
|
last_offset,
|
|
last_acknowledged_at,
|
|
first_acknowledged_at,
|
|
total_acknowledged,
|
|
EXTRACT(EPOCH FROM (NOW() - last_acknowledged_at)) as seconds_since_last_ack,
|
|
CASE
|
|
WHEN last_acknowledged_at > NOW() - INTERVAL '1 minute' THEN 'ACTIVE'
|
|
WHEN last_acknowledged_at > NOW() - INTERVAL '5 minutes' THEN 'SLOW'
|
|
WHEN last_acknowledged_at > NOW() - INTERVAL '1 hour' THEN 'STALE'
|
|
ELSE 'DEAD'
|
|
END as health_status
|
|
FROM event_streaming.read_receipts
|
|
ORDER BY stream_name, last_offset DESC;
|
|
|
|
-- View for stream lag analysis
|
|
CREATE OR REPLACE VIEW event_streaming.stream_consumer_lag AS
|
|
SELECT
|
|
rr.stream_name,
|
|
rr.consumer_id,
|
|
rr.last_offset as consumer_offset,
|
|
COALESCE(
|
|
(SELECT MAX(offset) FROM event_streaming.events WHERE stream_name = rr.stream_name),
|
|
0
|
|
) as stream_head_offset,
|
|
COALESCE(
|
|
(SELECT MAX(offset) FROM event_streaming.events WHERE stream_name = rr.stream_name),
|
|
0
|
|
) - rr.last_offset as lag_events,
|
|
rr.last_acknowledged_at,
|
|
EXTRACT(EPOCH FROM (NOW() - rr.last_acknowledged_at)) as lag_seconds
|
|
FROM event_streaming.read_receipts rr
|
|
ORDER BY rr.stream_name, lag_events DESC;
|
|
|
|
-- View for consumer health summary
|
|
CREATE OR REPLACE VIEW event_streaming.consumer_health_summary AS
|
|
SELECT
|
|
consumer_id,
|
|
COUNT(*) as streams_tracked,
|
|
MIN(last_acknowledged_at) as oldest_ack,
|
|
MAX(last_acknowledged_at) as newest_ack,
|
|
SUM(total_acknowledged) as total_events_processed,
|
|
COUNT(*) FILTER (WHERE last_acknowledged_at > NOW() - INTERVAL '1 minute') as active_streams,
|
|
COUNT(*) FILTER (WHERE last_acknowledged_at <= NOW() - INTERVAL '1 hour') as stale_streams
|
|
FROM event_streaming.read_receipts
|
|
GROUP BY consumer_id
|
|
ORDER BY consumer_id;
|
|
|
|
-- ============================================================================
|
|
-- CLEANUP FUNCTIONS
|
|
-- ============================================================================
|
|
|
|
-- Function to clean up old read receipts
|
|
CREATE OR REPLACE FUNCTION event_streaming.cleanup_old_read_receipts(p_older_than TIMESTAMPTZ)
|
|
RETURNS TABLE(deleted_count INT) AS $$
|
|
DECLARE
|
|
v_deleted_count INT := 0;
|
|
BEGIN
|
|
-- Delete read receipts older than specified time
|
|
DELETE FROM event_streaming.read_receipts
|
|
WHERE last_acknowledged_at < p_older_than;
|
|
|
|
GET DIAGNOSTICS v_deleted_count = ROW_COUNT;
|
|
|
|
RETURN QUERY SELECT v_deleted_count;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
|
|
-- Function to identify lagging consumers
|
|
CREATE OR REPLACE FUNCTION event_streaming.get_lagging_consumers(
|
|
p_stream_name VARCHAR(255),
|
|
p_lag_threshold_events BIGINT DEFAULT 1000
|
|
)
|
|
RETURNS TABLE(
|
|
consumer_id VARCHAR(255),
|
|
consumer_offset BIGINT,
|
|
stream_head_offset BIGINT,
|
|
lag_events BIGINT,
|
|
lag_seconds NUMERIC
|
|
) AS $$
|
|
BEGIN
|
|
RETURN QUERY
|
|
SELECT
|
|
rr.consumer_id,
|
|
rr.last_offset as consumer_offset,
|
|
COALESCE(
|
|
(SELECT MAX(offset) FROM event_streaming.events WHERE stream_name = p_stream_name),
|
|
0
|
|
) as stream_head_offset,
|
|
COALESCE(
|
|
(SELECT MAX(offset) FROM event_streaming.events WHERE stream_name = p_stream_name),
|
|
0
|
|
) - rr.last_offset as lag_events,
|
|
EXTRACT(EPOCH FROM (NOW() - rr.last_acknowledged_at)) as lag_seconds
|
|
FROM event_streaming.read_receipts rr
|
|
WHERE rr.stream_name = p_stream_name
|
|
AND COALESCE(
|
|
(SELECT MAX(offset) FROM event_streaming.events WHERE stream_name = p_stream_name),
|
|
0
|
|
) - rr.last_offset > p_lag_threshold_events
|
|
ORDER BY lag_events DESC;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
|
|
-- ============================================================================
|
|
-- MIGRATION COMPLETE
|
|
-- ============================================================================
|
|
|
|
-- Summary
|
|
SELECT 'Migration 006 complete - Read Receipts' as status,
|
|
(SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'event_streaming' AND table_name = 'read_receipts') as new_table_count;
|