Phase 2.4 - Retention Policies Implementation Plan

Status: Complete
Completed: 2025-12-10
Dependencies: Phase 2.2 (PostgreSQL Storage), Phase 2.3 (Consumer Groups)
Target: Automatic retention policies with time-based and size-based cleanup for persistent streams

Note: Table partitioning (Phase 2.4.5) has been deferred to a future phase because it requires data migration and is not critical for the initial release.

Overview

Phase 2.4 adds automatic retention policies to manage event stream lifecycle and prevent unbounded growth. This enables:

  • Time-based retention: Automatically delete events older than a specified duration (e.g., 30 days)
  • Size-based retention: Keep only the most recent N events per stream
  • Automatic cleanup: Background service to enforce retention policies
  • Table partitioning (deferred): PostgreSQL partitioning for better performance with large volumes
  • Per-stream configuration: Different retention policies for different streams

Background

Currently (Phase 2.3), persistent streams grow indefinitely. While this is correct for pure event sourcing, many use cases require automatic cleanup:

  • Compliance: GDPR and data retention regulations
  • Cost management: Storage costs for high-volume streams
  • Performance: Query performance degrades with very large tables
  • Operational simplicity: Automatic maintenance without manual intervention

Key Concepts:

  • Retention Policy: Rules defining how long events are kept
  • Time-based Retention: Delete events older than X days/hours
  • Size-based Retention: Keep only the last N events per stream
  • Table Partitioning: Split large tables into smaller partitions by time
  • Cleanup Window: Time window when cleanup runs (to avoid peak hours)

Goals

  1. Retention Policy API: Define and store retention policies per stream
  2. Time-based Cleanup: Automatically delete events older than configured duration
  3. Size-based Cleanup: Automatically trim streams to maximum event count
  4. Table Partitioning (deferred): Partition event_store table by month for performance
  5. Background Service: Scheduled cleanup service respecting configured policies
  6. Monitoring: Metrics for cleanup operations and retained event counts

Non-Goals (Deferred to Future Phases)

  • Custom retention logic (Phase 3.x)
  • Event archiving to cold storage (Phase 3.x)
  • Retention policies for ephemeral streams (they're already auto-deleted)
  • Cross-database retention coordination (PostgreSQL only for now)

Architecture

1. New Interface: IRetentionPolicy

namespace Svrnty.CQRS.Events.Abstractions;

public interface IRetentionPolicy
{
    /// <summary>
    /// Stream name this policy applies to. Use "*" for default policy.
    /// </summary>
    string StreamName { get; }

    /// <summary>
    /// Maximum age for events (null = no time-based retention)
    /// </summary>
    TimeSpan? MaxAge { get; }

    /// <summary>
    /// Maximum number of events to retain (null = no size-based retention)
    /// </summary>
    long? MaxEventCount { get; }

    /// <summary>
    /// Whether this policy is enabled
    /// </summary>
    bool Enabled { get; }
}

public record RetentionPolicyConfig : IRetentionPolicy
{
    public required string StreamName { get; init; }
    public TimeSpan? MaxAge { get; init; }
    public long? MaxEventCount { get; init; }
    public bool Enabled { get; init; } = true;
}
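The "*" convention implies a lookup order for streams that have no policy of their own. The following is a minimal sketch of that lookup, assuming an exact stream match takes precedence over the default. `PolicySketch` and `PolicyResolver` are hypothetical names used only for illustration; they are not part of the planned API.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Local stand-in for RetentionPolicyConfig so the sketch compiles on its own.
public record PolicySketch(string StreamName, TimeSpan? MaxAge, long? MaxEventCount, bool Enabled = true);

public static class PolicyResolver
{
    public const string DefaultStreamName = "*";

    // Hypothetical helper: an exact stream match wins, otherwise fall back to "*".
    // Returns null when neither a specific nor a default policy exists.
    public static PolicySketch? Resolve(
        IReadOnlyDictionary<string, PolicySketch> policies,
        string streamName)
    {
        if (policies.TryGetValue(streamName, out var specific))
            return specific;

        return policies.TryGetValue(DefaultStreamName, out var fallback) ? fallback : null;
    }
}
```

With this ordering, a long-lived stream such as an audit log keeps its own MaxAge even when the default policy is shorter.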

2. New Interface: IRetentionPolicyStore

public interface IRetentionPolicyStore
{
    /// <summary>
    /// Set retention policy for a stream
    /// </summary>
    Task SetPolicyAsync(IRetentionPolicy policy, CancellationToken cancellationToken = default);

    /// <summary>
    /// Get retention policy for a specific stream
    /// </summary>
    Task<IRetentionPolicy?> GetPolicyAsync(string streamName, CancellationToken cancellationToken = default);

    /// <summary>
    /// Get all configured retention policies
    /// </summary>
    Task<IReadOnlyList<IRetentionPolicy>> GetAllPoliciesAsync(CancellationToken cancellationToken = default);

    /// <summary>
    /// Delete retention policy for a stream
    /// </summary>
    Task DeletePolicyAsync(string streamName, CancellationToken cancellationToken = default);

    /// <summary>
    /// Apply retention policies and return cleanup statistics
    /// </summary>
    Task<RetentionCleanupResult> ApplyRetentionPoliciesAsync(CancellationToken cancellationToken = default);
}

public record RetentionCleanupResult
{
    public required int StreamsProcessed { get; init; }
    public required long EventsDeleted { get; init; }
    public required TimeSpan Duration { get; init; }
    public required DateTimeOffset CompletedAt { get; init; }
}

3. PostgreSQL Table Partitioning

Deferred design, kept for reference: update the event_store table to use declarative partitioning by month:

-- New partitioned table (migration creates this)
CREATE TABLE event_streaming.event_store_partitioned (
    id BIGSERIAL NOT NULL,
    stream_name VARCHAR(255) NOT NULL,
    event_id VARCHAR(255) NOT NULL,
    correlation_id VARCHAR(255) NOT NULL,
    event_type VARCHAR(500) NOT NULL,
    event_data JSONB NOT NULL,
    occurred_at TIMESTAMPTZ NOT NULL,
    stored_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    "offset" BIGINT NOT NULL,  -- quoted: OFFSET is a reserved word in PostgreSQL
    metadata JSONB,
    PRIMARY KEY (id, stored_at)
) PARTITION BY RANGE (stored_at);

-- Create initial partitions (two shown as examples; the plan is last 3 months + current + next month)
CREATE TABLE event_streaming.event_store_2024_11 PARTITION OF event_streaming.event_store_partitioned
    FOR VALUES FROM ('2024-11-01') TO ('2024-12-01');

CREATE TABLE event_streaming.event_store_2024_12 PARTITION OF event_streaming.event_store_partitioned
    FOR VALUES FROM ('2024-12-01') TO ('2025-01-01');

-- Function to automatically create partitions for next month
CREATE OR REPLACE FUNCTION event_streaming.create_partition_for_next_month()
RETURNS void AS $$
DECLARE
    next_month_start DATE;
    next_month_end DATE;
    partition_name TEXT;
BEGIN
    next_month_start := DATE_TRUNC('month', NOW() + INTERVAL '1 month');
    next_month_end := next_month_start + INTERVAL '1 month';
    partition_name := 'event_store_' || TO_CHAR(next_month_start, 'YYYY_MM');

    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS event_streaming.%I PARTITION OF event_streaming.event_store_partitioned FOR VALUES FROM (%L) TO (%L)',
        partition_name,
        next_month_start,
        next_month_end
    );
END;
$$ LANGUAGE plpgsql;
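The naming and range arithmetic in create_partition_for_next_month() can be mirrored in application code, for example to log or verify which partition should exist. This is a sketch only; `PartitionNaming` is a hypothetical helper, and it assumes the same `event_store_YYYY_MM` naming and half-open month ranges as the SQL above.

```csharp
using System;
using System.Globalization;

public static class PartitionNaming
{
    // Returns the partition name plus its inclusive start and exclusive end
    // for the month that follows nowUtc.
    public static (string Name, DateTime Start, DateTime End) ForNextMonth(DateTime nowUtc)
    {
        // First day of the current month, then step forward one month.
        var start = new DateTime(nowUtc.Year, nowUtc.Month, 1, 0, 0, 0, DateTimeKind.Utc).AddMonths(1);
        var end = start.AddMonths(1);

        // Invariant culture keeps the Gregorian year regardless of server locale.
        var name = "event_store_" + start.ToString("yyyy_MM", CultureInfo.InvariantCulture);
        return (name, start, end);
    }
}
```

For a call made on 2024-12-15 this yields `event_store_2025_01`, covering 2025-01-01 up to (but not including) 2025-02-01.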

4. Retention Policies Table

CREATE TABLE event_streaming.retention_policies (
    stream_name VARCHAR(255) PRIMARY KEY,
    max_age_seconds INT,  -- NULL = no time-based retention
    max_event_count BIGINT,  -- NULL = no size-based retention
    enabled BOOLEAN NOT NULL DEFAULT true,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Default policy for all streams (stream_name = '*')
INSERT INTO event_streaming.retention_policies (stream_name, max_age_seconds, max_event_count)
VALUES ('*', NULL, NULL);  -- No retention by default

COMMENT ON TABLE event_streaming.retention_policies IS
'Retention policies for event streams. stream_name="*" is the default policy.';
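Because `max_age_seconds` is an INT column while the API exposes `TimeSpan? MaxAge`, a store implementation needs some conversion between the two. A possible mapping is sketched below; `MaxAgeMapping` is a hypothetical helper, and the choice to reject sub-second and out-of-range values is an assumption rather than documented behavior.

```csharp
using System;

public static class MaxAgeMapping
{
    // Converts MaxAge to the INT column value; null means "no time-based retention".
    public static int? ToSeconds(TimeSpan? maxAge)
    {
        if (!maxAge.HasValue) return null;

        var seconds = maxAge.Value.TotalSeconds;
        if (seconds < 1 || seconds > int.MaxValue)
            throw new ArgumentOutOfRangeException(
                nameof(maxAge), "MaxAge must be between 1 second and int.MaxValue seconds.");

        // Fractions of a second are dropped because the column stores whole seconds.
        return (int)Math.Floor(seconds);
    }

    // Converts the column value back to the API representation.
    public static TimeSpan? FromSeconds(int? seconds) =>
        seconds.HasValue ? TimeSpan.FromSeconds(seconds.Value) : (TimeSpan?)null;
}
```

The seven-year audit retention used later in this plan (7 * 365 days, about 220.8 million seconds) fits comfortably in a 32-bit integer, which tops out at roughly 68 years.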

5. Background Service: RetentionPolicyService

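using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

public class RetentionPolicyService : BackgroundService
{
    private readonly IRetentionPolicyStore _policyStore;
    private readonly RetentionServiceOptions _options;
    private readonly ILogger<RetentionPolicyService> _logger;

    // Assumes the options are registered through the standard options pattern
    public RetentionPolicyService(
        IRetentionPolicyStore policyStore,
        IOptions<RetentionServiceOptions> options,
        ILogger<RetentionPolicyService> logger)
    {
        _policyStore = policyStore;
        _options = options.Value;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_options.Enabled)
        {
            _logger.LogInformation("Retention service is disabled");
            return;
        }

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Wait for configured cleanup interval
                await Task.Delay(_options.CleanupInterval, stoppingToken);

                // Check if we're in the cleanup window
                if (!IsInCleanupWindow())
                {
                    _logger.LogDebug("Outside cleanup window, skipping retention");
                    continue;
                }

                _logger.LogInformation("Starting retention policy enforcement");

                var result = await _policyStore.ApplyRetentionPoliciesAsync(stoppingToken);

                _logger.LogInformation(
                    "Retention cleanup complete: {StreamsProcessed} streams, {EventsDeleted} events deleted in {Duration}",
                    result.StreamsProcessed,
                    result.EventsDeleted,
                    result.Duration);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown: leave the loop without logging an error
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error during retention policy enforcement");
            }
        }
    }

    private bool IsInCleanupWindow()
    {
        var now = DateTime.UtcNow.TimeOfDay;
        var start = _options.CleanupWindowStart;
        var end = _options.CleanupWindowEnd;

        // A window such as 22:00-04:00 crosses midnight, so its two bounds are OR-ed
        return start <= end
            ? now >= start && now <= end
            : now >= start || now <= end;
    }
}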

public class RetentionServiceOptions
{
    /// <summary>
    /// How often to check and enforce retention policies
    /// Default: 1 hour
    /// </summary>
    public TimeSpan CleanupInterval { get; set; } = TimeSpan.FromHours(1);

    /// <summary>
    /// Start of cleanup window (UTC time)
    /// Default: 2 AM
    /// </summary>
    public TimeSpan CleanupWindowStart { get; set; } = TimeSpan.FromHours(2);

    /// <summary>
    /// End of cleanup window (UTC time)
    /// Default: 6 AM
    /// </summary>
    public TimeSpan CleanupWindowEnd { get; set; } = TimeSpan.FromHours(6);

    /// <summary>
    /// Whether the retention service is enabled
    /// Default: true
    /// </summary>
    public bool Enabled { get; set; } = true;
}
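The window options are plain times of day, so a window whose end is earlier than its start (for example 22:00 to 04:00) has to be read as crossing midnight. The check can be isolated as a pure function for testing; `CleanupWindow.Contains` below is a hypothetical helper written under that assumption, with both bounds treated as inclusive.

```csharp
using System;

public static class CleanupWindow
{
    // True when timeOfDay falls inside [start, end], where the window may wrap past midnight.
    public static bool Contains(TimeSpan start, TimeSpan end, TimeSpan timeOfDay)
    {
        return start <= end
            ? timeOfDay >= start && timeOfDay <= end    // same-day window, e.g. 02:00-06:00
            : timeOfDay >= start || timeOfDay <= end;   // wraps midnight, e.g. 22:00-04:00
    }
}
```

With the defaults (02:00 to 06:00 UTC), 03:00 is inside the window and 07:00 is not; with a 22:00 to 04:00 window, both 23:00 and 01:00 are inside while 12:00 is outside.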

Database Migration: 003_RetentionPolicies.sql

-- Retention policies table
CREATE TABLE IF NOT EXISTS event_streaming.retention_policies (
    stream_name VARCHAR(255) PRIMARY KEY,
    max_age_seconds INT,
    max_event_count BIGINT,
    enabled BOOLEAN NOT NULL DEFAULT true,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Default retention policy (no retention)
INSERT INTO event_streaming.retention_policies (stream_name, max_age_seconds, max_event_count)
VALUES ('*', NULL, NULL)
ON CONFLICT (stream_name) DO NOTHING;

-- Function to apply time-based retention for a stream
CREATE OR REPLACE FUNCTION event_streaming.apply_time_retention(
    p_stream_name VARCHAR,
    p_max_age_seconds INT
)
RETURNS BIGINT AS $$
DECLARE
    deleted_count BIGINT;
BEGIN
    DELETE FROM event_streaming.event_store
    WHERE stream_name = p_stream_name
      AND stored_at < NOW() - (p_max_age_seconds || ' seconds')::INTERVAL;

    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;

-- Function to apply size-based retention for a stream
CREATE OR REPLACE FUNCTION event_streaming.apply_size_retention(
    p_stream_name VARCHAR,
    p_max_event_count BIGINT
)
RETURNS BIGINT AS $$
DECLARE
    deleted_count BIGINT;
    current_count BIGINT;
    events_to_delete BIGINT;
BEGIN
    -- Count current events
    SELECT COUNT(*) INTO current_count
    FROM event_streaming.event_store
    WHERE stream_name = p_stream_name;

    -- Calculate how many to delete
    events_to_delete := current_count - p_max_event_count;

    IF events_to_delete <= 0 THEN
        RETURN 0;
    END IF;

    -- Delete oldest events beyond max count
    DELETE FROM event_streaming.event_store
    WHERE id IN (
        SELECT id
        FROM event_streaming.event_store
        WHERE stream_name = p_stream_name
        ORDER BY "offset" ASC  -- quoted: OFFSET is a reserved word in PostgreSQL
        LIMIT events_to_delete
    );

    GET DIAGNOSTICS deleted_count = ROW_COUNT;
    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;

-- Function to apply all retention policies
CREATE OR REPLACE FUNCTION event_streaming.apply_all_retention_policies()
RETURNS TABLE(stream_name VARCHAR, events_deleted BIGINT) AS $$
DECLARE
    policy RECORD;
    deleted BIGINT;
    total_deleted BIGINT := 0;
BEGIN
    FOR policy IN
        SELECT rp.stream_name, rp.max_age_seconds, rp.max_event_count
        FROM event_streaming.retention_policies rp
        WHERE rp.enabled = true
          AND (rp.max_age_seconds IS NOT NULL OR rp.max_event_count IS NOT NULL)
    LOOP
        deleted := 0;

        -- Apply time-based retention
        IF policy.max_age_seconds IS NOT NULL THEN
            IF policy.stream_name = '*' THEN
                -- Default policy: apply only to streams without a policy row of their own,
                -- so a stream-specific policy (e.g. a longer max age) is never overridden
                DELETE FROM event_streaming.event_store es
                WHERE es.stored_at < NOW() - (policy.max_age_seconds || ' seconds')::INTERVAL
                  AND NOT EXISTS (
                      SELECT 1
                      FROM event_streaming.retention_policies sp
                      WHERE sp.stream_name = es.stream_name
                  );
                GET DIAGNOSTICS deleted = ROW_COUNT;
            ELSE
                -- Apply to specific stream
                SELECT event_streaming.apply_time_retention(policy.stream_name, policy.max_age_seconds)
                INTO deleted;
            END IF;
        END IF;

        -- Apply size-based retention (not applied to the '*' default policy)
        IF policy.max_event_count IS NOT NULL AND policy.stream_name != '*' THEN
            SELECT deleted + event_streaming.apply_size_retention(policy.stream_name, policy.max_event_count)
            INTO deleted;
        END IF;

        IF deleted > 0 THEN
            stream_name := policy.stream_name;
            events_deleted := deleted;
            RETURN NEXT;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- View for retention policy status
CREATE OR REPLACE VIEW event_streaming.retention_policy_status AS
SELECT
    rp.stream_name,
    rp.max_age_seconds,
    rp.max_event_count,
    rp.enabled,
    COUNT(es.id) AS current_event_count,
    MIN(es.stored_at) AS oldest_event,
    MAX(es.stored_at) AS newest_event,
    EXTRACT(EPOCH FROM (NOW() - MIN(es.stored_at))) AS oldest_age_seconds
FROM event_streaming.retention_policies rp
LEFT JOIN event_streaming.event_store es ON es.stream_name = rp.stream_name
WHERE rp.stream_name != '*'
GROUP BY rp.stream_name, rp.max_age_seconds, rp.max_event_count, rp.enabled;

-- Migration version tracking
INSERT INTO event_streaming.schema_version (version, description, applied_at)
VALUES (3, 'Retention Policies', NOW())
ON CONFLICT (version) DO NOTHING;
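apply_all_retention_policies() returns one row per policy that deleted at least one event, whereas ApplyRetentionPoliciesAsync returns a single RetentionCleanupResult. One way to bridge the two is sketched below; `CleanupRow`, `CleanupSummary`, and `CleanupAggregation` are hypothetical stand-ins, not names from the actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One row as returned by the SQL function: (stream_name, events_deleted).
public record CleanupRow(string StreamName, long EventsDeleted);

// Local stand-in mirroring the shape of RetentionCleanupResult.
public record CleanupSummary(int StreamsProcessed, long EventsDeleted, TimeSpan Duration, DateTimeOffset CompletedAt);

public static class CleanupAggregation
{
    // Folds the per-policy rows into one summary; timing is measured by the caller.
    public static CleanupSummary Summarize(
        IEnumerable<CleanupRow> rows,
        DateTimeOffset startedAt,
        DateTimeOffset completedAt)
    {
        var list = rows.ToList();
        return new CleanupSummary(
            StreamsProcessed: list.Count,
            EventsDeleted: list.Sum(r => r.EventsDeleted),
            Duration: completedAt - startedAt,
            CompletedAt: completedAt);
    }
}
```

Under this mapping StreamsProcessed counts only policies that actually removed events, because the SQL function emits no row when nothing was deleted.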

API Usage Examples

Example 1: Configure Time-based Retention

var policyStore = serviceProvider.GetRequiredService<IRetentionPolicyStore>();

// Keep user events for 90 days
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "user-events",
    MaxAge = TimeSpan.FromDays(90),
    Enabled = true
});

// Keep audit logs for 7 years (compliance)
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "audit-logs",
    MaxAge = TimeSpan.FromDays(7 * 365),
    Enabled = true
});

Example 2: Configure Size-based Retention

// Keep only last 10,000 events for analytics stream
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "analytics-events",
    MaxEventCount = 10000,
    Enabled = true
});

Example 3: Combined Time and Size Retention

// Both limits apply: events older than 30 days are deleted, and the stream is also trimmed to its most recent 1M events
await policyStore.SetPolicyAsync(new RetentionPolicyConfig
{
    StreamName = "orders",
    MaxAge = TimeSpan.FromDays(30),
    MaxEventCount = 1_000_000,
    Enabled = true
});
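When both limits are set, the migration applies time-based cleanup first and size-based trimming second, so an event survives only if it passes both. The small in-memory model below illustrates that behavior; `CombinedRetention.Survivors` is a hypothetical helper for reasoning and tests, not part of the store API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CombinedRetention
{
    // storedAt is assumed to be ordered oldest to newest, as events are by offset.
    public static IReadOnlyList<DateTimeOffset> Survivors(
        IReadOnlyList<DateTimeOffset> storedAt,
        TimeSpan? maxAge,
        long? maxEventCount,
        DateTimeOffset now)
    {
        var kept = storedAt.ToList();

        // Time-based step: the SQL deletes rows with stored_at < now - max age.
        if (maxAge.HasValue)
        {
            var cutoff = now - maxAge.Value;
            kept = kept.Where(t => t >= cutoff).ToList();
        }

        // Size-based step: drop the oldest events beyond the maximum count.
        if (maxEventCount.HasValue && kept.Count > maxEventCount.Value)
            kept = kept.Skip(kept.Count - (int)maxEventCount.Value).ToList();

        return kept;
    }
}
```

For events stored 40, 20, 10, 5, and 1 days ago with MaxAge of 30 days and MaxEventCount of 3, the 40-day-old event is removed by age and the 20-day-old event by count, leaving the three newest.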

Example 4: Manual Cleanup Trigger

var policyStore = serviceProvider.GetRequiredService<IRetentionPolicyStore>();

// Manually trigger retention cleanup
var result = await policyStore.ApplyRetentionPoliciesAsync();

Console.WriteLine($"Cleaned up {result.EventsDeleted} events from {result.StreamsProcessed} streams in {result.Duration}");

Example 5: Monitor Retention Status

// Get all retention policies
var policies = await policyStore.GetAllPoliciesAsync();

foreach (var policy in policies)
{
    Console.WriteLine($"Stream: {policy.StreamName}");
    Console.WriteLine($"  Max Age: {policy.MaxAge}");
    Console.WriteLine($"  Max Count: {policy.MaxEventCount}");
    Console.WriteLine($"  Enabled: {policy.Enabled}");
}

Configuration

appsettings.json

{
  "EventStreaming": {
    "Retention": {
      "Enabled": true,
      "CleanupInterval": "01:00:00",
      "CleanupWindowStart": "02:00:00",
      "CleanupWindowEnd": "06:00:00"
    },
    "DefaultRetentionPolicy": {
      "MaxAge": "30.00:00:00",
      "MaxEventCount": null,
      "Enabled": false
    }
  }
}
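The duration values in this file use the standard .NET TimeSpan text form `[d.]hh:mm:ss`, so "30.00:00:00" means 30 days and "01:00:00" means one hour. The sketch below shows how such strings parse; `RetentionSettingsFormat` is a hypothetical helper, and how the real application binds this section is not specified in this plan.

```csharp
using System;
using System.Globalization;

public static class RetentionSettingsFormat
{
    // Parses the constant ("c") TimeSpan format: [-][d.]hh:mm:ss[.fffffff]
    public static TimeSpan Parse(string value) =>
        TimeSpan.ParseExact(value, "c", CultureInfo.InvariantCulture);
}
```

Writing 30 days as "720:00:00" would not parse, since the hours component must stay below 24; the day count belongs before the dot.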

Implementation Checklist

Phase 2.4.1 - Core Interfaces (Week 1)

  • Define IRetentionPolicy interface
  • Define IRetentionPolicyStore interface
  • Define RetentionPolicyConfig record
  • Define RetentionServiceOptions
  • Define RetentionCleanupResult record

Phase 2.4.2 - Database Schema (Week 1)

  • Create 003_RetentionPolicies.sql migration
  • Create retention_policies table
  • Create apply_time_retention() function
  • Create apply_size_retention() function
  • Create apply_all_retention_policies() function
  • Create retention_policy_status view

Phase 2.4.3 - PostgreSQL Implementation (Week 2)

  • Implement PostgresRetentionPolicyStore
  • Implement time-based cleanup logic
  • Implement size-based cleanup logic
  • Add cleanup metrics and logging
  • Add unit tests (deferred)

Phase 2.4.4 - Background Service (Week 2)

  • Implement RetentionPolicyService
  • Add cleanup window logic (with midnight crossing support)
  • Add configurable intervals
  • Add service registration extensions
  • Add health checks (deferred)
  • Integration tests (deferred)

Phase 2.4.5 - Table Partitioning (Week 3) ⏸️ Deferred

  • Create partitioned event_store table
  • Create initial partitions
  • Create auto-partition function
  • Migrate existing data (if needed)
  • Performance testing

Note: Table partitioning has been deferred as it requires data migration and is not critical for initial release. Will be implemented in a future phase when migration strategy is finalized.

Phase 2.4.6 - Documentation (Week 3)

  • Update README.md
  • Update CLAUDE.md
  • Update Phase 2.4 plan to complete

Performance Considerations

Cleanup Strategy

  • Batch Deletes: Delete in batches to avoid long-running transactions
  • Off-Peak Hours: Run cleanup during the configured window (default: 02:00-06:00 UTC)
  • Index Optimization: Ensure indexes on stored_at and stream_name
  • Vacuum: Run VACUUM ANALYZE after large deletes
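The batching idea can be sketched as a loop that keeps deleting fixed-size chunks until a chunk comes back short. Note that the SQL functions in the migration above delete in a single statement, so this is an illustration of the strategy rather than the shipped code; `deleteBatch` is a hypothetical callback that deletes up to the given number of rows in its own transaction and returns the number removed.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BatchedCleanup
{
    // Repeats deleteBatch until fewer rows than batchSize are removed, and returns the total.
    public static async Task<long> DeleteInBatchesAsync(
        Func<int, CancellationToken, Task<int>> deleteBatch,
        int batchSize,
        CancellationToken cancellationToken = default)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        long total = 0;
        while (true)
        {
            // Stop between batches on shutdown so no transaction is held open.
            cancellationToken.ThrowIfCancellationRequested();

            var deleted = await deleteBatch(batchSize, cancellationToken);
            total += deleted;

            // A short (or empty) batch means nothing is left to delete.
            if (deleted < batchSize) return total;
        }
    }
}
```

Each iteration commits independently, which keeps lock duration and transaction size bounded at the cost of the cleanup no longer being atomic across the whole stream.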

Partitioning Benefits

  • Query Performance: Partition pruning for time-range queries
  • Maintenance: Drop old partitions instead of DELETE (instant)
  • Parallel Operations: Multiple partitions can be processed in parallel
  • Backup/Restore: Partition-level backup and restore

Success Criteria

  • Time-based retention policies can be configured per stream
  • Size-based retention policies can be configured per stream
  • Background service enforces retention policies automatically
  • Cleanup respects configured time windows (with midnight crossing support)
  • Table partitioning improves query performance (deferred)
  • Old partitions can be dropped instantly (deferred)
  • Retention metrics are logged and observable
  • Documentation is complete

Risks & Mitigation

| Risk | Impact | Mitigation |
|---|---|---|
| Accidental data loss | Critical | Require explicit policy configuration, disable default retention |
| Long-running deletes | Performance impact | Batch deletes, run during off-peak hours |
| Partition migration | Downtime | Create partitioned table separately, migrate incrementally |
| Misconfigured policies | Data loss or retention failure | Policy validation, dry-run mode |
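Policy validation, listed above as a mitigation, could take the form of a check run before SetPolicyAsync stores anything. The rules below are assumptions drawn from this plan (positive limits, and size-based retention not being applied to the "*" policy); `PolicyValidation` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;

public static class PolicyValidation
{
    // Returns a list of problems; an empty list means the policy looks safe to store.
    public static IReadOnlyList<string> Validate(string streamName, TimeSpan? maxAge, long? maxEventCount)
    {
        var errors = new List<string>();

        if (string.IsNullOrWhiteSpace(streamName))
            errors.Add("StreamName is required.");

        if (maxAge.HasValue && maxAge.Value <= TimeSpan.Zero)
            errors.Add("MaxAge must be positive.");

        if (maxEventCount.HasValue && maxEventCount.Value <= 0)
            errors.Add("MaxEventCount must be positive.");

        // The migration skips size-based retention for the default policy.
        if (streamName == "*" && maxEventCount.HasValue)
            errors.Add("MaxEventCount is not applied to the default (\"*\") policy.");

        return errors;
    }
}
```

Returning all problems at once, rather than throwing on the first, lets a caller surface every misconfiguration in a single response.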

Future Enhancements (Phase 3.x)

  • Event archiving to S3/blob storage before deletion
  • Custom retention logic via user-defined functions
  • Retention policy templates
  • Retention compliance reporting
  • Cross-region retention coordination

Document Status: Complete
Last Updated: December 10, 2025
Next Review: Upon Phase 2.3 completion confirmation