#!/usr/bin/env python3
"""Plan Codex retention cleanup without mutating Codex state."""

from __future__ import annotations

import argparse
import json
import os
import sqlite3
import time
from contextlib import closing
from dataclasses import dataclass
from pathlib import Path
from typing import Any

SCHEMA_VERSION = "cto-codex-retention-policy-plan.v1"
DEFAULT_INACTIVE_DAYS = 7
DEFAULT_DELETE_AFTER_ARCHIVED_DAYS = 30
DEFAULT_LARGE_FILE_MIB = 10
SECONDS_PER_DAY = 24 * 60 * 60

# Day thresholds reported under "age_buckets" in the thread summary.
_AGE_BUCKET_DAYS = (1, 7, 14, 30)


@dataclass(frozen=True)
class ThreadMeta:
    """Metadata for one row of the ``threads`` table plus its rollout file size."""

    # Timestamps are compared against ``int(time.time())`` by the planner.
    updated_at: int
    archived: bool
    archived_at: int | None
    rollout_path: Path
    # Size in bytes of the rollout file; 0 when missing or outside ``sessions``.
    rollout_bytes: int


def _stat_size(path: Path) -> int:
    """Return the size of *path* in bytes, or 0 if it cannot be stat'ed.

    Files may disappear between discovery and ``stat`` while other processes
    are writing to the directory; that must not abort a read-only report.
    """
    try:
        return path.stat().st_size
    except OSError:
        return 0


def bytes_on_disk(path: Path) -> int:
    """Return the total size in bytes of a file, or of all files under a directory.

    A missing path counts as 0 bytes. Files that vanish or become unreadable
    while the tree is being walked are counted as 0 instead of raising.
    """
    if not path.exists():
        return 0
    if path.is_file():
        return _stat_size(path)
    return sum(_stat_size(item) for item in path.rglob("*") if item.is_file())


def query_all(db_path: Path, sql: str, params: tuple[Any, ...] = ()) -> list[list[Any]]:
    """Run *sql* against *db_path* in read-only mode and return all rows as lists.

    Returns an empty list when the database file does not exist.
    Errors raised by ``sqlite3`` (for example a missing table) propagate.
    """
    if not db_path.exists():
        return []
    # Build the URI with percent-encoding: with raw interpolation a path that
    # contains '#', '?' or '%' is mis-parsed, and '?mode=ro' can end up inside
    # the fragment, silently dropping the read-only guarantee.
    uri = f"{db_path.resolve().as_uri()}?mode=ro"
    # sqlite3's own context manager only scopes a transaction; ``closing``
    # is what actually releases the connection.
    with closing(sqlite3.connect(uri, uri=True)) as conn:
        return [list(row) for row in conn.execute(sql, params).fetchall()]


def safe_rollout_path(codex_home: Path, value: str) -> Path:
    """Turn a stored rollout path into a path anchored at *codex_home*.

    ``~`` is expanded and relative values are joined onto *codex_home*.
    No containment check happens here; ``rollout_file_bytes`` does that.
    """
    path = Path(value).expanduser()
    if not path.is_absolute():
        path = codex_home / path
    return path


def rollout_file_bytes(codex_home: Path, path: Path) -> int:
    """Return the size of a rollout file, but only if it lives under ``sessions``.

    Both paths are resolved first, so symlinks or ``..`` segments that lead
    outside ``<codex_home>/sessions`` yield 0, as does a missing file.
    """
    sessions_dir = (codex_home / "sessions").resolve()
    resolved = path.resolve()
    try:
        resolved.relative_to(sessions_dir)
    except ValueError:
        return 0
    return _stat_size(resolved) if resolved.is_file() else 0


def load_threads(codex_home: Path) -> list[ThreadMeta]:
    """Load thread metadata from ``state_5.sqlite`` under *codex_home*.

    Only the four metadata columns named in the query are selected.
    Returns an empty list when the state database does not exist.
    """
    state_db = codex_home / "state_5.sqlite"
    rows = query_all(
        state_db,
        """
        select rollout_path, updated_at, archived, archived_at
        from threads
        """,
    )
    threads: list[ThreadMeta] = []
    for rollout_path, updated_at, archived, archived_at in rows:
        path = safe_rollout_path(codex_home, str(rollout_path))
        threads.append(
            ThreadMeta(
                updated_at=int(updated_at),
                archived=bool(archived),
                archived_at=int(archived_at) if archived_at is not None else None,
                rollout_path=path,
                rollout_bytes=rollout_file_bytes(codex_home, path),
            )
        )
    return threads


def _total_rollout_bytes(threads: list[ThreadMeta]) -> int:
    """Sum the rollout file sizes of *threads*."""
    return sum(thread.rollout_bytes for thread in threads)


def summarize_threads(
    threads: list[ThreadMeta],
    *,
    now: int,
    inactive_days: int,
    delete_after_archived_days: int,
    large_file_mib: int,
) -> dict[str, object]:
    """Classify threads into retention candidate groups and count them.

    Args:
        threads: Thread metadata to classify.
        now: Reference timestamp in the same unit as ``updated_at``.
        inactive_days: Non-archived threads not updated for at least this
            many days become archive-only candidates.
        delete_after_archived_days: Archived threads whose ``archived_at`` is
            at least this many days old become delete candidates.
        large_file_mib: Non-archived threads whose rollout file is at least
            this many MiB are reported as large active sessions.

    Returns:
        A JSON-serializable summary with counts, byte totals and age buckets.
    """
    inactive_cutoff = now - inactive_days * SECONDS_PER_DAY
    delete_cutoff = now - delete_after_archived_days * SECONDS_PER_DAY
    large_cutoff = large_file_mib * 1024 * 1024

    active = [thread for thread in threads if not thread.archived]
    archived = [thread for thread in threads if thread.archived]
    archive_candidates = [thread for thread in active if thread.updated_at <= inactive_cutoff]
    # Archived rows without an ``archived_at`` timestamp are never delete candidates.
    delete_candidates = [
        thread
        for thread in archived
        if thread.archived_at is not None and thread.archived_at <= delete_cutoff
    ]
    large_active = [thread for thread in active if thread.rollout_bytes >= large_cutoff]

    return {
        "thread_count": len(threads),
        "active_thread_count": len(active),
        "archived_thread_count": len(archived),
        "session_jsonl_bytes": _total_rollout_bytes(threads),
        "archive_only_candidates": {
            "criteria": f"archived=false and updated_at older than {inactive_days} days",
            "thread_count": len(archive_candidates),
            "session_jsonl_bytes": _total_rollout_bytes(archive_candidates),
            "mutation_required": "state_5.sqlite threads.archived update",
            "approval_required": True,
        },
        "destructive_delete_candidates": {
            "criteria": f"archived=true and archived_at older than {delete_after_archived_days} days",
            "thread_count": len(delete_candidates),
            "session_jsonl_bytes": _total_rollout_bytes(delete_candidates),
            "mutation_required": "session JSONL deletion",
            "approval_required": True,
        },
        "large_active_sessions": {
            "criteria": f"archived=false and rollout file >= {large_file_mib} MiB",
            "thread_count": len(large_active),
            "session_jsonl_bytes": _total_rollout_bytes(large_active),
            "approval_required": False,
        },
        # Buckets are cumulative: a thread idle for 20 days counts in 1d, 7d and 14d.
        "age_buckets": {
            f"inactive_over_{days}d": sum(
                1 for thread in active if thread.updated_at <= now - days * SECONDS_PER_DAY
            )
            for days in _AGE_BUCKET_DAYS
        },
    }


def log_pressure(codex_home: Path) -> dict[str, object]:
    """Summarize row counts and estimated bytes of ``logs_2.sqlite``.

    Reports overall totals plus the five targets with the largest summed
    ``estimated_bytes``. All figures are 0 / empty when the database is missing.
    """
    logs_db = codex_home / "logs_2.sqlite"
    rows = query_all(
        logs_db,
        """
        select target, count(*) as rows, coalesce(sum(estimated_bytes), 0) as bytes
        from logs
        group by target
        order by bytes desc
        limit 5
        """,
    )
    total = query_all(logs_db, "select count(*), coalesce(sum(estimated_bytes), 0) from logs")
    return {
        "row_count": total[0][0] if total else 0,
        "estimated_bytes": total[0][1] if total else 0,
        "top_targets": [
            {"target": row[0], "rows": row[1], "estimated_bytes": row[2]} for row in rows
        ],
        "mutation_required_for_reduction": "logs table deletion plus SQLite checkpoint/vacuum",
        "approval_required": True,
    }


def build_plan(
    codex_home: Path,
    *,
    inactive_days: int,
    delete_after_archived_days: int,
    large_file_mib: int,
) -> dict[str, object]:
    """Assemble the full retention plan document for *codex_home*.

    Reads file sizes and database metadata only; nothing is written.
    The three keyword arguments are passed straight to ``summarize_threads``.
    """
    codex_home = codex_home.expanduser()
    now = int(time.time())
    threads = load_threads(codex_home)
    thread_summary = summarize_threads(
        threads,
        now=now,
        inactive_days=inactive_days,
        delete_after_archived_days=delete_after_archived_days,
        large_file_mib=large_file_mib,
    )
    return {
        "schema_version": SCHEMA_VERSION,
        "codex_home": str(codex_home),
        "metadata_only": True,
        "raw_transcript_bodies_read": False,
        "raw_thread_text_fields_read": False,
        "mutation_performed": False,
        "policy": {
            "prevention_default": "Use `codex exec --ephemeral` for disposable non-interactive worker runs.",
            "phase_0": "Run this planner and the pressure reporter. No mutation.",
            "phase_1": "Backup `state_5.sqlite`, `logs_2.sqlite`, WAL, and SHM files before any mutation.",
            "phase_2": "Archive-only candidate threads by DB flag if explicitly approved.",
            "phase_3": "Delete archived session JSONL only after a separate destructive approval.",
            "phase_4": "Delete/truncate logs and checkpoint/vacuum only after Codex is stopped and destructive approval is explicit.",
        },
        "current_state": {
            "codex_home_bytes": bytes_on_disk(codex_home),
            "sessions_bytes": bytes_on_disk(codex_home / "sessions"),
            "logs_sqlite_bytes": bytes_on_disk(codex_home / "logs_2.sqlite"),
            "logs_sqlite_wal_bytes": bytes_on_disk(codex_home / "logs_2.sqlite-wal"),
        },
        "thread_retention": thread_summary,
        "log_retention": log_pressure(codex_home),
        "approval_boundaries": [
            "archive threads by updating `threads.archived`",
            "delete session JSONL files",
            "delete or truncate Codex log rows",
            "run SQLite checkpoint or vacuum",
            "touch raw transcript bodies",
            "import raw transcripts into Cortex OS Core",
        ],
        "recommended_next": "Ask for explicit archive-only approval first; keep delete/vacuum as separate later approvals.",
        "false_effects": {
            "archive_threads": False,
            "delete_session_jsonl": False,
            "delete_logs": False,
            "sqlite_checkpoint_or_vacuum": False,
            "raw_transcript_body_read": False,
            "raw_thread_text_field_read": False,
            "core_source_mutation": False,
            "runtime_start": False,
            "secret_value_read": False,
            "product_readiness_claim": False,
        },
    }


def validate_plan(plan: dict[str, object]) -> list[str]:
    """Check the invariants of a plan document and return error codes.

    An empty list means the plan is valid. Checks cover the schema version,
    the top-level safety flags, the required approval boundaries, that every
    entry of ``false_effects`` is exactly ``False``, and that
    ``thread_retention.active_thread_count`` is an integer.
    """
    errors: list[str] = []
    if plan.get("schema_version") != SCHEMA_VERSION:
        errors.append("schema_version_invalid")

    # ``metadata_only`` must be exactly True; the other three exactly False.
    for field in ["metadata_only", "raw_transcript_bodies_read", "raw_thread_text_fields_read", "mutation_performed"]:
        expected = field == "metadata_only"
        if plan.get(field) is not expected:
            errors.append(f"{field}_invalid")

    boundaries = plan.get("approval_boundaries")
    if not isinstance(boundaries, list):
        boundaries = []
    for required in [
        "delete session JSONL files",
        "run SQLite checkpoint or vacuum",
        "import raw transcripts into Cortex OS Core",
    ]:
        if required not in boundaries:
            errors.append(f"approval_boundary_missing:{required}")

    false_effects = plan.get("false_effects")
    if isinstance(false_effects, dict):
        for field, value in false_effects.items():
            if value is not False:
                errors.append(f"false_effect_not_false:{field}")
    else:
        # Fail closed: a plan without this section must not validate.
        errors.append("false_effects_invalid")

    thread_retention = plan.get("thread_retention")
    if not isinstance(thread_retention, dict):
        thread_retention = {}
    active_count = thread_retention.get("active_thread_count")
    # ``bool`` is a subclass of ``int``; a True/False here is not a count.
    if isinstance(active_count, bool) or not isinstance(active_count, int):
        errors.append("active_thread_count_invalid")
    return errors


def _non_negative_int(value: str) -> int:
    """Argparse type: parse *value* as an integer that is >= 0."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 0:
        # A negative threshold would move the cutoff into the future and
        # flag every thread as a candidate.
        raise argparse.ArgumentTypeError(f"must be >= 0, got {number}")
    return number


def main() -> int:
    """Parse CLI arguments, build and validate the plan, and print JSON.

    With ``--check`` only the validation result is printed; otherwise the
    full plan is printed. Returns 0 when the plan validates, 1 otherwise.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--codex-home", default=os.environ.get("CODEX_HOME", str(Path.home() / ".codex")))
    parser.add_argument("--inactive-days", type=_non_negative_int, default=DEFAULT_INACTIVE_DAYS)
    parser.add_argument(
        "--delete-after-archived-days",
        type=_non_negative_int,
        default=DEFAULT_DELETE_AFTER_ARCHIVED_DAYS,
    )
    parser.add_argument("--large-file-mib", type=_non_negative_int, default=DEFAULT_LARGE_FILE_MIB)
    parser.add_argument("--check", action="store_true")
    args = parser.parse_args()

    plan = build_plan(
        Path(args.codex_home),
        inactive_days=args.inactive_days,
        delete_after_archived_days=args.delete_after_archived_days,
        large_file_mib=args.large_file_mib,
    )
    errors = validate_plan(plan)
    if args.check:
        report = {
            "ok": not errors,
            "validator": "cto-codex-retention-policy-plan",
            "errors": errors,
            "warnings": [],
        }
        print(json.dumps(report, indent=2, sort_keys=True))
    else:
        print(json.dumps(plan, indent=2, sort_keys=True))
    return 0 if not errors else 1


if __name__ == "__main__":
    raise SystemExit(main())