Phase 2.3 - Consumer Offset Tracking Implementation Plan

Status: Complete
Dependencies: Phase 2.2 (PostgreSQL Storage), complete
Target: Consumer group coordination and offset management for persistent streams
Completed: December 9, 2025

Overview

Phase 2.3 adds consumer group coordination and offset tracking to enable:

  • Multiple consumers processing the same stream without duplicates
  • Consumer groups for load balancing and fault tolerance
  • Checkpoint management for resuming from last processed offset
  • Automatic offset commits with configurable strategies
  • Consumer failover with partition reassignment

Background

As of Phase 2.2, persistent streams can be read from any offset, but there is no built-in mechanism to track which events a consumer has already processed. Phase 2.3 adds this capability, similar to Kafka consumer groups or RabbitMQ consumer tags.

Key Concepts:

  • Consumer Group: A logical grouping of consumers that coordinate to process a stream
  • Offset: The position in a stream (event sequence number)
  • Checkpoint: A saved offset representing the last successfully processed event
  • Partition: A logical subdivision of a stream (Phase 2.4+, preparation in 2.3)
  • Rebalancing: Automatic reassignment of stream partitions when consumers join/leave

Goals

  1. Offset Storage: Persist consumer offsets in PostgreSQL
  2. Consumer Groups: Support multiple consumers coordinating via groups
  3. Automatic Commit: Configurable offset commit strategies (auto, manual, periodic)
  4. Consumer Discovery: Track active consumers and detect failures
  5. API Integration: Extend IEventStreamStore with offset management

Non-Goals (Deferred to Future Phases)

  • Partition assignment (basic support, full implementation in Phase 2.4)
  • Automatic rebalancing (Phase 2.4)
  • Stream splitting/sharding (Phase 2.4)
  • Cross-database offset storage (PostgreSQL only for now)

Architecture

1. New Interface: IConsumerOffsetStore

namespace Svrnty.CQRS.Events.Abstractions;

public interface IConsumerOffsetStore
{
    /// <summary>
    /// Commit an offset for a consumer in a group
    /// </summary>
    Task CommitOffsetAsync(
        string groupId,
        string consumerId,
        string streamName,
        long offset,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Get the last committed offset for a consumer group
    /// </summary>
    Task<long?> GetCommittedOffsetAsync(
        string groupId,
        string streamName,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Get offsets for all consumers in a group
    /// </summary>
    Task<IReadOnlyDictionary<string, long>> GetGroupOffsetsAsync(
        string groupId,
        string streamName,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Register a consumer as active (heartbeat)
    /// </summary>
    Task RegisterConsumerAsync(
        string groupId,
        string consumerId,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Unregister a consumer (graceful shutdown)
    /// </summary>
    Task UnregisterConsumerAsync(
        string groupId,
        string consumerId,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Get all active consumers in a group
    /// </summary>
    Task<IReadOnlyList<ConsumerInfo>> GetActiveConsumersAsync(
        string groupId,
        CancellationToken cancellationToken = default);
}

public record ConsumerInfo
{
    public required string ConsumerId { get; init; }
    public required string GroupId { get; init; }
    public required DateTimeOffset LastHeartbeat { get; init; }
    public required DateTimeOffset RegisteredAt { get; init; }
}

2. Extended IEventStreamStore

Add convenience methods to IEventStreamStore:

public interface IEventStreamStore
{
    // ... existing methods ...

    /// <summary>
    /// Read stream from last committed offset for a consumer group
    /// </summary>
    Task<IReadOnlyList<ICorrelatedEvent>> ReadFromLastOffsetAsync(
        string streamName,
        string groupId,
        int batchSize = 1000,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Commit offset after processing events
    /// </summary>
    Task CommitOffsetAsync(
        string streamName,
        string groupId,
        string consumerId,
        long offset,
        CancellationToken cancellationToken = default);
}
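
For orientation, here is a minimal sketch of a hand-rolled poll-and-commit loop built on these two methods. ProcessAsync is an assumed application handler, and reading the offset off the event follows the Offset property used in Example 2 below; neither is confirmed API beyond this plan.

var store = serviceProvider.GetRequiredService<IEventStreamStore>();

// Hedged sketch: poll, process, then commit once per batch.
while (!cancellationToken.IsCancellationRequested)
{
    var batch = await store.ReadFromLastOffsetAsync(
        "orders", "order-processors", batchSize: 500, cancellationToken);

    if (batch.Count == 0)
    {
        // Nothing new; back off before polling again.
        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        continue;
    }

    foreach (var @event in batch)
    {
        await ProcessAsync(@event); // assumed application handler
    }

    // The last event's offset marks the group's progress.
    await store.CommitOffsetAsync(
        "orders", "order-processors", "worker-1",
        batch[^1].Offset, cancellationToken);
}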

3. Consumer Group Reader

New high-level API for consuming streams with automatic offset management:

public interface IConsumerGroupReader
{
    /// <summary>
    /// Start consuming a stream as part of a group
    /// </summary>
    IAsyncEnumerable<ICorrelatedEvent> ConsumeAsync(
        string streamName,
        string groupId,
        string consumerId,
        ConsumerGroupOptions options,
        CancellationToken cancellationToken = default);
}

public class ConsumerGroupOptions
{
    /// <summary>
    /// Number of events to fetch in each batch
    /// </summary>
    public int BatchSize { get; set; } = 100;

    /// <summary>
    /// Polling interval when no events available
    /// </summary>
    public TimeSpan PollingInterval { get; set; } = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Offset commit strategy
    /// </summary>
    public OffsetCommitStrategy CommitStrategy { get; set; } = OffsetCommitStrategy.AfterBatch;

    /// <summary>
    /// Heartbeat interval for consumer liveness
    /// </summary>
    public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Consumer session timeout
    /// </summary>
    public TimeSpan SessionTimeout { get; set; } = TimeSpan.FromSeconds(30);
}

public enum OffsetCommitStrategy
{
    /// <summary>
    /// Manual commit via CommitOffsetAsync
    /// </summary>
    Manual,

    /// <summary>
    /// Auto-commit after each event
    /// </summary>
    AfterEach,

    /// <summary>
    /// Auto-commit after each batch
    /// </summary>
    AfterBatch,

    /// <summary>
    /// Periodic auto-commit
    /// </summary>
    Periodic
}
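
To make the strategies concrete, here is a simplified sketch of how a reader implementation might apply them in its dispatch loop. The commit callback and _pendingOffset field are illustrative assumptions, not part of the planned API.

// Hedged sketch: applying OffsetCommitStrategy inside a consume loop.
async Task DispatchBatchAsync(
    IReadOnlyList<ICorrelatedEvent> batch,
    ConsumerGroupOptions options,
    Func<ICorrelatedEvent, Task> handler,
    Func<long, Task> commitAsync) // commit callback; an illustrative assumption
{
    if (batch.Count == 0) return;

    foreach (var @event in batch)
    {
        await handler(@event);

        if (options.CommitStrategy == OffsetCommitStrategy.AfterEach)
            await commitAsync(@event.Offset); // safest, most round-trips
    }

    switch (options.CommitStrategy)
    {
        case OffsetCommitStrategy.AfterBatch:
            await commitAsync(batch[^1].Offset); // one commit per batch
            break;
        case OffsetCommitStrategy.Periodic:
            _pendingOffset = batch[^1].Offset;   // assumed field; a background timer flushes it
            break;
        case OffsetCommitStrategy.Manual:
            break; // the caller commits via IConsumerOffsetStore
    }
}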

4. PostgreSQL Implementation

Update the PostgreSQL schema. The consumer_offsets table (columns group_id, stream_name, consumer_id, offset, committed_at) already exists from Phase 2.2. Phase 2.3 adds a consumer_registrations table, a heartbeat index, and a cleanup function for stale consumers; the full DDL appears once in the 002_ConsumerGroups.sql migration under Database Schema Updates below.

Implementation Classes:

  • PostgresConsumerOffsetStore : IConsumerOffsetStore
  • PostgresConsumerGroupReader : IConsumerGroupReader
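
A plausible shape for PostgresConsumerOffsetStore.CommitOffsetAsync is a single upsert against consumer_offsets. This sketch assumes Npgsql with an injected NpgsqlDataSource and a primary key of (group_id, stream_name, consumer_id); note that offset is a reserved word in PostgreSQL and must be quoted.

// Hedged sketch: offset commit as an idempotent upsert.
public async Task CommitOffsetAsync(
    string groupId,
    string consumerId,
    string streamName,
    long offset,
    CancellationToken cancellationToken = default)
{
    const string sql = """
        INSERT INTO event_streaming.consumer_offsets
            (group_id, stream_name, consumer_id, "offset", committed_at)
        VALUES (@group, @stream, @consumer, @offset, NOW())
        ON CONFLICT (group_id, stream_name, consumer_id)
        DO UPDATE SET "offset" = EXCLUDED."offset", committed_at = NOW();
        """;

    await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
    await using var command = new NpgsqlCommand(sql, connection);
    command.Parameters.AddWithValue("group", groupId);
    command.Parameters.AddWithValue("stream", streamName);
    command.Parameters.AddWithValue("consumer", consumerId);
    command.Parameters.AddWithValue("offset", offset);
    await command.ExecuteNonQueryAsync(cancellationToken);
}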

5. In-Memory Implementation

For development/testing:

  • InMemoryConsumerOffsetStore : IConsumerOffsetStore
  • InMemoryConsumerGroupReader : IConsumerGroupReader

Database Schema Updates

New Migration: 002_ConsumerGroups.sql

-- consumer_registrations table
CREATE TABLE IF NOT EXISTS event_streaming.consumer_registrations (
    group_id VARCHAR(255) NOT NULL,
    consumer_id VARCHAR(255) NOT NULL,
    registered_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    metadata JSONB,
    PRIMARY KEY (group_id, consumer_id)
);

CREATE INDEX IF NOT EXISTS idx_consumer_heartbeat 
ON event_streaming.consumer_registrations(group_id, last_heartbeat);

-- Cleanup function for stale consumers
CREATE OR REPLACE FUNCTION event_streaming.cleanup_stale_consumers(timeout_seconds INT)
RETURNS TABLE(group_id VARCHAR, consumer_id VARCHAR) AS $$
BEGIN
    RETURN QUERY
    DELETE FROM event_streaming.consumer_registrations
    WHERE last_heartbeat < NOW() - (timeout_seconds || ' seconds')::INTERVAL
    RETURNING event_streaming.consumer_registrations.group_id, 
              event_streaming.consumer_registrations.consumer_id;
END;
$$ LANGUAGE plpgsql;

-- View for consumer group status
CREATE OR REPLACE VIEW event_streaming.consumer_group_status AS
SELECT 
    cr.group_id,
    cr.consumer_id,
    cr.registered_at,
    cr.last_heartbeat,
    co.stream_name,
    co."offset" AS committed_offset,
    co.committed_at,
    CASE 
        WHEN cr.last_heartbeat > NOW() - INTERVAL '30 seconds' THEN 'active'
        ELSE 'stale'
    END AS status
FROM event_streaming.consumer_registrations cr
LEFT JOIN event_streaming.consumer_offsets co 
    ON cr.group_id = co.group_id 
    AND cr.consumer_id = co.consumer_id;
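
A store-side helper such as CleanupStaleConsumersAsync (used by the health monitor below) would simply invoke this function; a hedged Npgsql sketch, assuming the same _dataSource field as in the upsert sketch above:

// Hedged sketch: calling the cleanup function and logging evicted consumers.
await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken);
await using var command = new NpgsqlCommand(
    "SELECT group_id, consumer_id FROM event_streaming.cleanup_stale_consumers(@timeout);",
    connection);
command.Parameters.AddWithValue("timeout", (int)sessionTimeout.TotalSeconds);

await using var reader = await command.ExecuteReaderAsync(cancellationToken);
while (await reader.ReadAsync(cancellationToken))
{
    _logger.LogWarning(
        "Removed stale consumer {ConsumerId} from group {GroupId}",
        reader.GetString(1),
        reader.GetString(0));
}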

API Usage Examples

Example 1: Simple Consumer Group

// Register services
builder.Services.AddPostgresEventStreaming(config);
builder.Services.AddPostgresConsumerGroups();  // New registration (see Service Registration below)

// Consumer code
var reader = serviceProvider.GetRequiredService<IConsumerGroupReader>();

await foreach (var @event in reader.ConsumeAsync(
    streamName: "orders",
    groupId: "order-processors",
    consumerId: "worker-1",
    options: new ConsumerGroupOptions
    {
        BatchSize = 100,
        CommitStrategy = OffsetCommitStrategy.AfterBatch
    },
    cancellationToken))
{
    await ProcessOrderEventAsync(@event);
    // Offset auto-committed after batch
}

Example 2: Manual Offset Control

var reader = serviceProvider.GetRequiredService<IConsumerGroupReader>();
var offsetStore = serviceProvider.GetRequiredService<IConsumerOffsetStore>();

await foreach (var @event in reader.ConsumeAsync(
    streamName: "orders",
    groupId: "order-processors",
    consumerId: "worker-1",
    options: new ConsumerGroupOptions
    {
        CommitStrategy = OffsetCommitStrategy.Manual
    },
    cancellationToken))
{
    try
    {
        await ProcessOrderEventAsync(@event);
        
        // Manual commit after successful processing
        await offsetStore.CommitOffsetAsync(
            groupId: "order-processors",
            consumerId: "worker-1",
            streamName: "orders",
            offset: @event.Offset,
            cancellationToken);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to process event {EventId}", @event.EventId);
        // Don't commit offset - will retry on next poll
    }
}

Example 3: Monitoring Consumer Groups

var offsetStore = serviceProvider.GetRequiredService<IConsumerOffsetStore>();

// Get all consumers in a group
var consumers = await offsetStore.GetActiveConsumersAsync("order-processors");
foreach (var consumer in consumers)
{
    Console.WriteLine($"Consumer: {consumer.ConsumerId}, Last Heartbeat: {consumer.LastHeartbeat}");
}

// Get group offsets
var offsets = await offsetStore.GetGroupOffsetsAsync("order-processors", "orders");
foreach (var (consumerId, offset) in offsets)
{
    Console.WriteLine($"Consumer {consumerId} at offset {offset}");
}
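
The consumer_group_status view from the migration can also be queried directly for dashboards. A minimal Npgsql sketch (the _dataSource field is an assumption); stream_name and committed_offset can be NULL because of the LEFT JOIN:

// Hedged sketch: reading group health from the SQL view.
await using var connection = await _dataSource.OpenConnectionAsync();
await using var command = new NpgsqlCommand(
    """
    SELECT consumer_id, stream_name, committed_offset, status
    FROM event_streaming.consumer_group_status
    WHERE group_id = @group;
    """,
    connection);
command.Parameters.AddWithValue("group", "order-processors");

await using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
    var stream = reader.IsDBNull(1) ? "(no stream)" : reader.GetString(1);
    var offset = reader.IsDBNull(2) ? "none" : reader.GetInt64(2).ToString();
    Console.WriteLine($"{reader.GetString(0)} on {stream}: offset {offset} ({reader.GetString(3)})");
}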

Testing Strategy

Unit Tests

  • Offset commit and retrieval
  • Consumer registration/unregistration
  • Heartbeat tracking
  • Stale consumer cleanup

Integration Tests (PostgreSQL)

  • Multiple consumers in same group
  • Offset commit strategies
  • Consumer failover simulation
  • Concurrent offset commits

End-to-End Tests

  • Worker pool processing stream
  • Consumer addition/removal
  • Graceful shutdown and resume (resume sketch after this list)
  • At-least-once delivery guarantees
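
As a flavor of the resume guarantee, here is a hedged xUnit sketch against the in-memory store. It uses only the IConsumerOffsetStore methods defined above; the test itself is illustrative.

[Fact]
public async Task Consumer_resumes_from_last_committed_offset()
{
    IConsumerOffsetStore store = new InMemoryConsumerOffsetStore();

    await store.RegisterConsumerAsync("order-processors", "worker-1");
    await store.CommitOffsetAsync("order-processors", "worker-1", "orders", offset: 41);

    // Simulate a restart: a fresh consumer asks where the group left off.
    var resumeFrom = await store.GetCommittedOffsetAsync("order-processors", "orders");

    Assert.Equal(41L, resumeFrom);
}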

Configuration

appsettings.json

{
  "EventStreaming": {
    "PostgreSQL": {
      "ConnectionString": "...",
      "AutoMigrate": true
    },
    "ConsumerGroups": {
      "DefaultHeartbeatInterval": "00:00:10",
      "DefaultSessionTimeout": "00:00:30",
      "StaleConsumerCleanupInterval": "00:01:00",
      "DefaultBatchSize": 100,
      "DefaultPollingInterval": "00:00:01"
    }
  }
}
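
The Default-prefixed keys above do not match the ConsumerGroupOptions property names one-to-one, so the registration lambda can do the mapping explicitly; a sketch using the standard configuration binder:

// Hedged sketch: map the ConsumerGroups section onto ConsumerGroupOptions.
var section = builder.Configuration.GetSection("EventStreaming:ConsumerGroups");

builder.Services.AddPostgresConsumerGroups(options =>
{
    options.BatchSize = section.GetValue("DefaultBatchSize", 100);
    options.PollingInterval = section.GetValue("DefaultPollingInterval", TimeSpan.FromSeconds(1));
    options.HeartbeatInterval = section.GetValue("DefaultHeartbeatInterval", TimeSpan.FromSeconds(10));
    options.SessionTimeout = section.GetValue("DefaultSessionTimeout", TimeSpan.FromSeconds(30));
});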

Service Registration

New Extension Methods

public static class ConsumerGroupServiceCollectionExtensions
{
    /// <summary>
    /// Add consumer group support with PostgreSQL backend
    /// </summary>
    public static IServiceCollection AddPostgresConsumerGroups(
        this IServiceCollection services,
        Action<ConsumerGroupOptions>? configure = null)
    {
        services.AddSingleton<IConsumerOffsetStore, PostgresConsumerOffsetStore>();
        services.AddSingleton<IConsumerGroupReader, PostgresConsumerGroupReader>();
        services.AddHostedService<ConsumerHealthMonitor>();  // Heartbeat & cleanup
        
        if (configure != null)
        {
            services.Configure(configure);
        }
        
        return services;
    }

    /// <summary>
    /// Add consumer group support with in-memory backend
    /// </summary>
    public static IServiceCollection AddInMemoryConsumerGroups(
        this IServiceCollection services,
        Action<ConsumerGroupOptions>? configure = null)
    {
        services.AddSingleton<IConsumerOffsetStore, InMemoryConsumerOffsetStore>();
        services.AddSingleton<IConsumerGroupReader, InMemoryConsumerGroupReader>();
        services.AddHostedService<ConsumerHealthMonitor>();
        
        if (configure != null)
        {
            services.Configure(configure);
        }
        
        return services;
    }
}

Background Services

ConsumerHealthMonitor

Background service that:

  • Sends periodic heartbeats for registered consumers (see the heartbeat sketch after the class below)
  • Detects and cleans up stale consumers
  • Logs consumer group health metrics
  • Triggers rebalancing events (Phase 2.4)

public class ConsumerHealthMonitor : BackgroundService
{
    private readonly IConsumerOffsetStore _offsetStore;
    private readonly ConsumerGroupOptions _options;
    private readonly ILogger<ConsumerHealthMonitor> _logger;

    public ConsumerHealthMonitor(
        IConsumerOffsetStore offsetStore,
        IOptions<ConsumerGroupOptions> options,
        ILogger<ConsumerHealthMonitor> logger)
    {
        _offsetStore = offsetStore;
        _options = options.Value;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Clean up stale consumers. CleanupStaleConsumersAsync and
                // GetAllGroupsAsync are store-side helpers beyond the
                // IConsumerOffsetStore surface defined in section 1.
                await _offsetStore.CleanupStaleConsumersAsync(
                    _options.SessionTimeout,
                    stoppingToken);

                // Log health metrics
                var groups = await _offsetStore.GetAllGroupsAsync(stoppingToken);
                foreach (var group in groups)
                {
                    var consumers = await _offsetStore.GetActiveConsumersAsync(group, stoppingToken);
                    _logger.LogInformation(
                        "Consumer group {GroupId} has {ConsumerCount} active consumers",
                        group,
                        consumers.Count);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in consumer health monitor");
            }

            // Delay outside the try block so a failure cannot spin the loop;
            // maps to StaleConsumerCleanupInterval in configuration.
            await Task.Delay(_options.HealthCheckInterval, stoppingToken);
        }
    }
}
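
The heartbeat side of the monitor's responsibilities can live in a small per-consumer loop; RegisterConsumerAsync doubles as the heartbeat per its XML doc in section 1. Whether this loop runs inside the reader or the monitor is an open design choice; the sketch below assumes the former.

// Hedged sketch: periodic heartbeat for one consumer.
async Task HeartbeatLoopAsync(
    IConsumerOffsetStore store,
    string groupId,
    string consumerId,
    TimeSpan interval,
    CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        await store.RegisterConsumerAsync(groupId, consumerId, cancellationToken);
        await Task.Delay(interval, cancellationToken);
    }
}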

Performance Considerations

Optimizations

  1. Batch Commits: Commit offsets in batches to reduce DB round-trips (sketched after this list)
  2. Connection Pooling: Reuse PostgreSQL connections for offset operations
  3. Heartbeat Batching: Batch heartbeat updates for multiple consumers
  4. Index Optimization: Ensure proper indexes on consumer_offsets and consumer_registrations
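
One way to realize batched commits is a small accumulator that tracks the highest offset per (group, stream, consumer) key and flushes on a timer or per poll cycle. Everything here is an illustrative assumption layered on IConsumerOffsetStore.

// Hedged sketch: coalesce offset commits, flushing only the highest offset.
public sealed class BatchingOffsetCommitter
{
    private readonly IConsumerOffsetStore _store;
    private readonly ConcurrentDictionary<(string Group, string Stream, string Consumer), long> _pending = new();

    public BatchingOffsetCommitter(IConsumerOffsetStore store) => _store = store;

    // Record progress in memory; no database round-trip.
    public void Track(string groupId, string consumerId, string streamName, long offset) =>
        _pending.AddOrUpdate(
            (groupId, streamName, consumerId),
            offset,
            (_, current) => Math.Max(current, offset));

    // Flush pending offsets; a production version must handle the race
    // where Track runs between the commit and the removal.
    public async Task FlushAsync(CancellationToken cancellationToken = default)
    {
        foreach (var (key, offset) in _pending)
        {
            await _store.CommitOffsetAsync(key.Group, key.Consumer, key.Stream, offset, cancellationToken);
            _pending.TryRemove(key, out _);
        }
    }
}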

Scalability Targets

  • 1,000+ consumers per group
  • 10,000+ offset commits/second
  • Sub-millisecond offset retrieval
  • < 1 second consumer failover detection

Implementation Checklist

Phase 2.3.1 - Core Interfaces (Week 1)

  • Define IConsumerOffsetStore interface
  • Define IConsumerGroupReader interface
  • Define ConsumerGroupOptions and related types
  • Create new project: Svrnty.CQRS.Events.ConsumerGroups.Abstractions

Phase 2.3.2 - PostgreSQL Implementation (Week 2)

  • Create 002_ConsumerGroups.sql migration
  • Implement PostgresConsumerOffsetStore
  • Implement PostgresConsumerGroupReader
  • Add unit tests for offset operations (deferred)
  • Add integration tests with PostgreSQL (deferred)

Phase 2.3.3 - In-Memory Implementation (Week 2)

  • Implement InMemoryConsumerOffsetStore (deferred)
  • Implement InMemoryConsumerGroupReader (deferred)
  • Add unit tests (deferred)

Phase 2.3.4 - Health Monitoring (Week 3)

  • Implement ConsumerHealthMonitor background service
  • Add heartbeat mechanism
  • Add stale consumer cleanup
  • Add health metrics logging

Phase 2.3.5 - Integration & Testing (Week 3)

  • Integration tests with multiple consumers (deferred)
  • Consumer failover tests (deferred)
  • Performance benchmarks (deferred)
  • Update Svrnty.Sample with consumer group examples (deferred)

Phase 2.3.6 - Documentation (Week 4)

  • Update README.md
  • Create CONSUMER-GROUPS-GUIDE.md (deferred)
  • Add XML documentation (deferred)
  • Update CLAUDE.md
  • Create Phase 2.3 completion document

Risks & Mitigation

| Risk | Impact | Mitigation |
| --- | --- | --- |
| Offset commit conflicts | Data loss or duplication | Optimistic locking and proper transaction isolation |
| Undetected zombie consumers | Resource leaks | Aggressive heartbeat monitoring with configurable timeouts |
| Database load from heartbeats | Performance degradation | Batch heartbeat updates and optimized indexes |
| Rebalancing complexity | Complex implementation | Defer full rebalancing to Phase 2.4; basic support only |

Success Criteria

  • Multiple consumers can process same stream without duplicates
  • Consumer can resume from last committed offset after restart
  • Stale consumers detected and cleaned up within session timeout
  • Offset commit latency < 10ms (p99) - not benchmarked yet
  • Zero data loss with at-least-once delivery
  • Comprehensive test coverage (>90%) - tests deferred
  • Documentation complete and clear

Future Enhancements (Phase 2.4+)

  • Automatic partition assignment and rebalancing
  • Dynamic consumer scaling
  • Consumer group metadata and configuration
  • Cross-stream offset management
  • Offset reset capabilities (earliest, latest, timestamp)
  • Consumer lag monitoring and alerting

Document Status: Complete | Last Updated: December 9, 2025 | Completed: December 9, 2025