200 lines
6.9 KiB
PL/PgSQL
200 lines
6.9 KiB
PL/PgSQL
-- Migration 003: Retention Policies
|
|
-- Adds automatic retention policy enforcement for event streams
|
|
|
|
-- Retention policies table
|
|
CREATE TABLE IF NOT EXISTS event_streaming.retention_policies (
|
|
stream_name VARCHAR(255) PRIMARY KEY,
|
|
max_age_seconds INT, -- NULL = no time-based retention
|
|
max_event_count BIGINT, -- NULL = no size-based retention
|
|
enabled BOOLEAN NOT NULL DEFAULT true,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_retention_policies_enabled
|
|
ON event_streaming.retention_policies(enabled)
|
|
WHERE enabled = true;
|
|
|
|
COMMENT ON TABLE event_streaming.retention_policies IS
|
|
'Retention policies for event streams. stream_name="*" is the default policy for all streams.';
|
|
|
|
COMMENT ON COLUMN event_streaming.retention_policies.stream_name IS
|
|
'Stream name or "*" for default policy';
|
|
|
|
COMMENT ON COLUMN event_streaming.retention_policies.max_age_seconds IS
|
|
'Maximum age in seconds. Events older than this are deleted. NULL = no time-based retention.';
|
|
|
|
COMMENT ON COLUMN event_streaming.retention_policies.max_event_count IS
|
|
'Maximum number of events to retain. Oldest events beyond this count are deleted. NULL = no size-based retention.';
|
|
|
|
-- Default retention policy (no retention by default)
|
|
INSERT INTO event_streaming.retention_policies (stream_name, max_age_seconds, max_event_count, enabled)
|
|
VALUES ('*', NULL, NULL, false)
|
|
ON CONFLICT (stream_name) DO NOTHING;
|
|
|
|
-- Function to apply time-based retention for a specific stream
|
|
CREATE OR REPLACE FUNCTION event_streaming.apply_time_retention(
|
|
p_stream_name VARCHAR,
|
|
p_max_age_seconds INT
|
|
)
|
|
RETURNS BIGINT AS $$
|
|
DECLARE
|
|
deleted_count BIGINT;
|
|
cutoff_time TIMESTAMPTZ;
|
|
BEGIN
|
|
cutoff_time := NOW() - (p_max_age_seconds || ' seconds')::INTERVAL;
|
|
|
|
DELETE FROM event_streaming.event_store
|
|
WHERE stream_name = p_stream_name
|
|
AND stored_at < cutoff_time;
|
|
|
|
GET DIAGNOSTICS deleted_count = ROW_COUNT;
|
|
RETURN deleted_count;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
|
|
COMMENT ON FUNCTION event_streaming.apply_time_retention IS
|
|
'Delete events older than max_age_seconds for a specific stream';
|
|
|
|
-- Function to apply size-based retention for a specific stream
|
|
CREATE OR REPLACE FUNCTION event_streaming.apply_size_retention(
|
|
p_stream_name VARCHAR,
|
|
p_max_event_count BIGINT
|
|
)
|
|
RETURNS BIGINT AS $$
|
|
DECLARE
|
|
deleted_count BIGINT;
|
|
current_count BIGINT;
|
|
events_to_delete BIGINT;
|
|
BEGIN
|
|
-- Count current events
|
|
SELECT COUNT(*) INTO current_count
|
|
FROM event_streaming.event_store
|
|
WHERE stream_name = p_stream_name;
|
|
|
|
-- Calculate how many to delete
|
|
events_to_delete := current_count - p_max_event_count;
|
|
|
|
IF events_to_delete <= 0 THEN
|
|
RETURN 0;
|
|
END IF;
|
|
|
|
-- Delete oldest events beyond max count (by offset)
|
|
DELETE FROM event_streaming.event_store
|
|
WHERE id IN (
|
|
SELECT id
|
|
FROM event_streaming.event_store
|
|
WHERE stream_name = p_stream_name
|
|
ORDER BY offset ASC
|
|
LIMIT events_to_delete
|
|
);
|
|
|
|
GET DIAGNOSTICS deleted_count = ROW_COUNT;
|
|
RETURN deleted_count;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
|
|
COMMENT ON FUNCTION event_streaming.apply_size_retention IS
|
|
'Delete oldest events beyond max_event_count for a specific stream';
|
|
|
|
-- Function to apply all retention policies
|
|
CREATE OR REPLACE FUNCTION event_streaming.apply_all_retention_policies()
|
|
RETURNS TABLE(stream_name VARCHAR, events_deleted BIGINT) AS $$
|
|
DECLARE
|
|
policy RECORD;
|
|
time_deleted BIGINT;
|
|
size_deleted BIGINT;
|
|
total_deleted BIGINT;
|
|
all_streams CURSOR FOR
|
|
SELECT DISTINCT es.stream_name
|
|
FROM event_streaming.event_store es;
|
|
BEGIN
|
|
-- Process each policy
|
|
FOR policy IN
|
|
SELECT rp.stream_name, rp.max_age_seconds, rp.max_event_count
|
|
FROM event_streaming.retention_policies rp
|
|
WHERE rp.enabled = true
|
|
AND (rp.max_age_seconds IS NOT NULL OR rp.max_event_count IS NOT NULL)
|
|
LOOP
|
|
time_deleted := 0;
|
|
size_deleted := 0;
|
|
total_deleted := 0;
|
|
|
|
-- Handle wildcard policy (applies to all streams)
|
|
IF policy.stream_name = '*' THEN
|
|
-- Apply time-based retention to all streams
|
|
IF policy.max_age_seconds IS NOT NULL THEN
|
|
FOR stream_rec IN all_streams LOOP
|
|
SELECT event_streaming.apply_time_retention(stream_rec.stream_name, policy.max_age_seconds)
|
|
INTO time_deleted;
|
|
total_deleted := total_deleted + time_deleted;
|
|
END LOOP;
|
|
END IF;
|
|
|
|
-- Size-based retention doesn't make sense for wildcard policy
|
|
-- It would need to be per-stream
|
|
ELSE
|
|
-- Apply to specific stream
|
|
IF policy.max_age_seconds IS NOT NULL THEN
|
|
SELECT event_streaming.apply_time_retention(policy.stream_name, policy.max_age_seconds)
|
|
INTO time_deleted;
|
|
END IF;
|
|
|
|
IF policy.max_event_count IS NOT NULL THEN
|
|
SELECT event_streaming.apply_size_retention(policy.stream_name, policy.max_event_count)
|
|
INTO size_deleted;
|
|
END IF;
|
|
|
|
total_deleted := time_deleted + size_deleted;
|
|
END IF;
|
|
|
|
IF total_deleted > 0 THEN
|
|
stream_name := policy.stream_name;
|
|
events_deleted := total_deleted;
|
|
RETURN NEXT;
|
|
END IF;
|
|
END LOOP;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
|
|
COMMENT ON FUNCTION event_streaming.apply_all_retention_policies IS
|
|
'Apply all enabled retention policies and return statistics. Called by background service.';
|
|
|
|
-- View for retention policy status and monitoring
|
|
CREATE OR REPLACE VIEW event_streaming.retention_policy_status AS
|
|
SELECT
|
|
rp.stream_name,
|
|
rp.max_age_seconds,
|
|
rp.max_event_count,
|
|
rp.enabled,
|
|
rp.created_at,
|
|
rp.updated_at,
|
|
COUNT(es.id) AS current_event_count,
|
|
MIN(es.stored_at) AS oldest_event_time,
|
|
MAX(es.stored_at) AS newest_event_time,
|
|
EXTRACT(EPOCH FROM (NOW() - MIN(es.stored_at))) AS oldest_age_seconds,
|
|
CASE
|
|
WHEN rp.max_age_seconds IS NOT NULL
|
|
AND MIN(es.stored_at) < NOW() - (rp.max_age_seconds || ' seconds')::INTERVAL
|
|
THEN true
|
|
ELSE false
|
|
END AS has_events_exceeding_max_age,
|
|
CASE
|
|
WHEN rp.max_event_count IS NOT NULL
|
|
AND COUNT(es.id) > rp.max_event_count
|
|
THEN true
|
|
ELSE false
|
|
END AS has_events_exceeding_max_count
|
|
FROM event_streaming.retention_policies rp
|
|
LEFT JOIN event_streaming.event_store es ON es.stream_name = rp.stream_name
|
|
WHERE rp.stream_name != '*'
|
|
GROUP BY rp.stream_name, rp.max_age_seconds, rp.max_event_count, rp.enabled, rp.created_at, rp.updated_at;
|
|
|
|
COMMENT ON VIEW event_streaming.retention_policy_status IS
|
|
'Monitor retention policy enforcement status. Shows streams with events exceeding retention limits.';
|
|
|
|
-- Migration version tracking
|
|
INSERT INTO event_streaming.schema_version (version, description, applied_at)
|
|
VALUES (3, 'Retention Policies', NOW())
|
|
ON CONFLICT (version) DO NOTHING;
|