77 lines
2.9 KiB
PL/PgSQL
77 lines
2.9 KiB
PL/PgSQL
-- Migration 002: Consumer Groups Support
|
|
-- Adds consumer group coordination and offset tracking
|
|
|
|
-- Consumer registrations table for tracking active consumers
|
|
CREATE TABLE IF NOT EXISTS event_streaming.consumer_registrations (
|
|
group_id VARCHAR(255) NOT NULL,
|
|
consumer_id VARCHAR(255) NOT NULL,
|
|
registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
metadata JSONB,
|
|
PRIMARY KEY (group_id, consumer_id)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_consumer_heartbeat
|
|
ON event_streaming.consumer_registrations(group_id, last_heartbeat);
|
|
|
|
COMMENT ON TABLE event_streaming.consumer_registrations IS
|
|
'Tracks active consumers in consumer groups with heartbeat monitoring';
|
|
|
|
COMMENT ON COLUMN event_streaming.consumer_registrations.group_id IS
|
|
'Consumer group identifier';
|
|
|
|
COMMENT ON COLUMN event_streaming.consumer_registrations.consumer_id IS
|
|
'Individual consumer identifier within the group';
|
|
|
|
COMMENT ON COLUMN event_streaming.consumer_registrations.last_heartbeat IS
|
|
'Last heartbeat timestamp from this consumer';
|
|
|
|
COMMENT ON COLUMN event_streaming.consumer_registrations.metadata IS
|
|
'Optional consumer metadata (hostname, version, etc.)';
|
|
|
|
-- Stored function for cleaning up stale consumers
|
|
CREATE OR REPLACE FUNCTION event_streaming.cleanup_stale_consumers(timeout_seconds INT)
|
|
RETURNS TABLE(group_id VARCHAR, consumer_id VARCHAR) AS $$
|
|
BEGIN
|
|
RETURN QUERY
|
|
DELETE FROM event_streaming.consumer_registrations
|
|
WHERE last_heartbeat < NOW() - (timeout_seconds || ' seconds')::INTERVAL
|
|
RETURNING event_streaming.consumer_registrations.group_id,
|
|
event_streaming.consumer_registrations.consumer_id;
|
|
END;
|
|
$$ LANGUAGE plpgsql;
|
|
|
|
COMMENT ON FUNCTION event_streaming.cleanup_stale_consumers IS
|
|
'Removes stale consumers that havent sent heartbeats within the timeout period';
|
|
|
|
-- View for consumer group status monitoring
|
|
CREATE OR REPLACE VIEW event_streaming.consumer_group_status AS
|
|
SELECT
|
|
cr.group_id,
|
|
cr.consumer_id,
|
|
cr.registered_at,
|
|
cr.last_heartbeat,
|
|
co.stream_name,
|
|
co.offset AS committed_offset,
|
|
co.committed_at,
|
|
CASE
|
|
WHEN cr.last_heartbeat > NOW() - INTERVAL '30 seconds' THEN 'active'
|
|
ELSE 'stale'
|
|
END AS status
|
|
FROM event_streaming.consumer_registrations cr
|
|
LEFT JOIN event_streaming.consumer_offsets co
|
|
ON cr.group_id = co.group_id
|
|
AND cr.consumer_id = co.consumer_id;
|
|
|
|
COMMENT ON VIEW event_streaming.consumer_group_status IS
|
|
'Provides comprehensive view of consumer group status including offsets and health';
|
|
|
|
-- Add additional index on consumer_offsets for group lookups
|
|
CREATE INDEX IF NOT EXISTS idx_consumer_offsets_group_stream
|
|
ON event_streaming.consumer_offsets(group_id, stream_name);
|
|
|
|
-- Migration version tracking
|
|
INSERT INTO event_streaming.schema_version (version, description, applied_at)
|
|
VALUES (2, 'Consumer Groups Support', NOW())
|
|
ON CONFLICT (version) DO NOTHING;
|