-- Migration 002: Consumer Groups Support
-- Adds consumer group coordination and offset tracking.
--
-- Relies on objects this file does not create: the event_streaming schema and the
-- tables event_streaming.consumer_offsets and event_streaming.schema_version.
-- Every statement is re-runnable (IF NOT EXISTS / OR REPLACE / ON CONFLICT DO NOTHING).

-- Consumer registrations table for tracking active consumers.
-- One row per (group_id, consumer_id); last_heartbeat is what the cleanup
-- function and the status view below use to decide whether a consumer is stale.
CREATE TABLE IF NOT EXISTS event_streaming.consumer_registrations (
    group_id       VARCHAR(255) NOT NULL,
    consumer_id    VARCHAR(255) NOT NULL,
    registered_at  TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    last_heartbeat TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    metadata       JSONB,
    PRIMARY KEY (group_id, consumer_id)
);

-- Supports per-group heartbeat lookups.
-- NOTE(review): cleanup_stale_consumers() filters on last_heartbeat alone, which an
-- index led by group_id serves poorly; consider a separate index on last_heartbeat
-- if this table grows large -- confirm with EXPLAIN first.
CREATE INDEX IF NOT EXISTS idx_consumer_heartbeat
    ON event_streaming.consumer_registrations (group_id, last_heartbeat);

COMMENT ON TABLE event_streaming.consumer_registrations IS
    'Tracks active consumers in consumer groups with heartbeat monitoring';
COMMENT ON COLUMN event_streaming.consumer_registrations.group_id IS
    'Consumer group identifier';
COMMENT ON COLUMN event_streaming.consumer_registrations.consumer_id IS
    'Individual consumer identifier within the group';
COMMENT ON COLUMN event_streaming.consumer_registrations.last_heartbeat IS
    'Last heartbeat timestamp from this consumer';
COMMENT ON COLUMN event_streaming.consumer_registrations.metadata IS
    'Optional consumer metadata (hostname, version, etc.)';

-- Stored function for cleaning up stale consumers.
--
-- Deletes every registration whose last_heartbeat is older than
-- NOW() - timeout_seconds and returns the (group_id, consumer_id) of each
-- deleted row.
--
-- Parameters:
--   timeout_seconds  maximum allowed heartbeat age, in seconds.
--                    NULL deletes nothing (the cutoff comparison yields NULL).
--                    Negative values raise invalid_parameter_value, because they
--                    would put the cutoff in the future and delete live consumers.
CREATE OR REPLACE FUNCTION event_streaming.cleanup_stale_consumers(timeout_seconds INT)
RETURNS TABLE(group_id VARCHAR, consumer_id VARCHAR) AS $$
BEGIN
    IF timeout_seconds < 0 THEN
        RAISE EXCEPTION 'timeout_seconds must be non-negative, got %', timeout_seconds
            USING ERRCODE = 'invalid_parameter_value';
    END IF;

    -- Column references are alias-qualified because the OUT columns of
    -- RETURNS TABLE share their names with the table's columns.
    RETURN QUERY
    DELETE FROM event_streaming.consumer_registrations AS cr
    WHERE cr.last_heartbeat < NOW() - make_interval(secs => timeout_seconds)
    RETURNING cr.group_id, cr.consumer_id;
END;
$$ LANGUAGE plpgsql;

-- The argument list keeps this statement unambiguous if an overload is ever added.
COMMENT ON FUNCTION event_streaming.cleanup_stale_consumers(INT) IS
    'Removes stale consumers that have not sent heartbeats within the timeout period';

-- View for consumer group status monitoring.
-- Yields one row per registration and matching consumer_offsets row; offset
-- columns are NULL for a consumer with no matching offsets row (LEFT JOIN).
-- The 'active'/'stale' status uses a fixed 30-second heartbeat threshold that is
-- independent of the timeout passed to cleanup_stale_consumers().
CREATE OR REPLACE VIEW event_streaming.consumer_group_status AS
SELECT
    cr.group_id,
    cr.consumer_id,
    cr.registered_at,
    cr.last_heartbeat,
    co.stream_name,
    co."offset" AS committed_offset,  -- quoted: OFFSET is a reserved word
    co.committed_at,
    CASE
        WHEN cr.last_heartbeat > NOW() - INTERVAL '30 seconds' THEN 'active'
        ELSE 'stale'
    END AS status
FROM event_streaming.consumer_registrations AS cr
LEFT JOIN event_streaming.consumer_offsets AS co
    ON cr.group_id = co.group_id
   AND cr.consumer_id = co.consumer_id;

COMMENT ON VIEW event_streaming.consumer_group_status IS
    'Provides comprehensive view of consumer group status including offsets and health';

-- Add additional index on consumer_offsets for group lookups.
CREATE INDEX IF NOT EXISTS idx_consumer_offsets_group_stream
    ON event_streaming.consumer_offsets (group_id, stream_name);

-- Migration version tracking; a repeated run leaves the existing row untouched.
INSERT INTO event_streaming.schema_version (version, description, applied_at)
VALUES (2, 'Consumer Groups Support', NOW())
ON CONFLICT (version) DO NOTHING;