dotnet-cqrs/Svrnty.CQRS.Events.ConsumerGroups/Migrations/002_ConsumerGroups.sql

77 lines
2.9 KiB
PL/PgSQL

-- Migration 002: Consumer Groups Support
-- Adds consumer group coordination and offset tracking
-- Consumer registrations table for tracking active consumers
CREATE TABLE IF NOT EXISTS event_streaming.consumer_registrations (
group_id VARCHAR(255) NOT NULL,
consumer_id VARCHAR(255) NOT NULL,
registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata JSONB,
PRIMARY KEY (group_id, consumer_id)
);
CREATE INDEX IF NOT EXISTS idx_consumer_heartbeat
ON event_streaming.consumer_registrations(group_id, last_heartbeat);
COMMENT ON TABLE event_streaming.consumer_registrations IS
'Tracks active consumers in consumer groups with heartbeat monitoring';
COMMENT ON COLUMN event_streaming.consumer_registrations.group_id IS
'Consumer group identifier';
COMMENT ON COLUMN event_streaming.consumer_registrations.consumer_id IS
'Individual consumer identifier within the group';
COMMENT ON COLUMN event_streaming.consumer_registrations.last_heartbeat IS
'Last heartbeat timestamp from this consumer';
COMMENT ON COLUMN event_streaming.consumer_registrations.metadata IS
'Optional consumer metadata (hostname, version, etc.)';
-- Stored function for cleaning up stale consumers
CREATE OR REPLACE FUNCTION event_streaming.cleanup_stale_consumers(timeout_seconds INT)
RETURNS TABLE(group_id VARCHAR, consumer_id VARCHAR) AS $$
BEGIN
RETURN QUERY
DELETE FROM event_streaming.consumer_registrations
WHERE last_heartbeat < NOW() - (timeout_seconds || ' seconds')::INTERVAL
RETURNING event_streaming.consumer_registrations.group_id,
event_streaming.consumer_registrations.consumer_id;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION event_streaming.cleanup_stale_consumers IS
'Removes stale consumers that havent sent heartbeats within the timeout period';
-- View for consumer group status monitoring
CREATE OR REPLACE VIEW event_streaming.consumer_group_status AS
SELECT
cr.group_id,
cr.consumer_id,
cr.registered_at,
cr.last_heartbeat,
co.stream_name,
co.offset AS committed_offset,
co.committed_at,
CASE
WHEN cr.last_heartbeat > NOW() - INTERVAL '30 seconds' THEN 'active'
ELSE 'stale'
END AS status
FROM event_streaming.consumer_registrations cr
LEFT JOIN event_streaming.consumer_offsets co
ON cr.group_id = co.group_id
AND cr.consumer_id = co.consumer_id;
COMMENT ON VIEW event_streaming.consumer_group_status IS
'Provides comprehensive view of consumer group status including offsets and health';
-- Add additional index on consumer_offsets for group lookups
CREATE INDEX IF NOT EXISTS idx_consumer_offsets_group_stream
ON event_streaming.consumer_offsets(group_id, stream_name);
-- Migration version tracking
INSERT INTO event_streaming.schema_version (version, description, applied_at)
VALUES (2, 'Consumer Groups Support', NOW())
ON CONFLICT (version) DO NOTHING;