#!/usr/bin/env python3 """Archive inactive Codex threads after explicit operator approval.""" from __future__ import annotations import argparse import json import os import shutil import sqlite3 import time from dataclasses import dataclass from pathlib import Path from typing import Any SCHEMA_VERSION = "cto-codex-inactive-thread-archive.v1" WORK_ITEM_ID = "CTO-WORK-095" DEFAULT_INACTIVE_DAYS = 7 SECONDS_PER_DAY = 24 * 60 * 60 @dataclass(frozen=True) class ThreadMeta: thread_id: str updated_at: int rollout_path: Path rollout_bytes: int def approval_token(inactive_days: int) -> str: return f"I approve {WORK_ITEM_ID} archive-only Codex threads older than {inactive_days} days." def query_all(db_path: Path, sql: str, params: tuple[Any, ...] = ()) -> list[list[Any]]: if not db_path.exists(): return [] with sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) as conn: return [list(row) for row in conn.execute(sql, params).fetchall()] def safe_rollout_path(codex_home: Path, value: str) -> Path: path = Path(value).expanduser() if not path.is_absolute(): path = codex_home / path return path def rollout_file_bytes(codex_home: Path, path: Path) -> int: sessions_dir = (codex_home / "sessions").resolve() resolved = path.resolve() try: resolved.relative_to(sessions_dir) except ValueError: return 0 return resolved.stat().st_size if resolved.exists() and resolved.is_file() else 0 def load_archive_candidates(codex_home: Path, *, now: int, inactive_days: int) -> list[ThreadMeta]: cutoff = now - inactive_days * SECONDS_PER_DAY rows = query_all( codex_home / "state_5.sqlite", """ select id, rollout_path, updated_at from threads where archived = 0 and updated_at <= ? order by updated_at asc """, (cutoff,), ) candidates: list[ThreadMeta] = [] for thread_id, rollout_path, updated_at in rows: path = safe_rollout_path(codex_home, str(rollout_path)) candidates.append( ThreadMeta( thread_id=str(thread_id), updated_at=int(updated_at), rollout_path=path, rollout_bytes=rollout_file_bytes(codex_home, path), ) ) return candidates def default_backup_dir(codex_home: Path, *, now: int) -> Path: stamp = time.strftime("%Y%m%dT%H%M%SZ", time.gmtime(now)) return codex_home / "backups" / "cto-codex-retention" / stamp def sqlite_backup(source: Path, destination: Path) -> None: destination.parent.mkdir(parents=True, exist_ok=True) if not source.exists(): return with sqlite3.connect(f"file:{source}?mode=ro", uri=True) as src: with sqlite3.connect(destination) as dst: src.backup(dst) def copy_if_exists(source: Path, destination: Path) -> bool: if not source.exists(): return False destination.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(source, destination) return True def backup_codex_state(codex_home: Path, backup_dir: Path) -> dict[str, object]: copied: list[str] = [] for name in ["state_5.sqlite", "logs_2.sqlite"]: source = codex_home / name destination = backup_dir / name sqlite_backup(source, destination) if destination.exists(): copied.append(str(destination)) for name in [ "state_5.sqlite-wal", "state_5.sqlite-shm", "logs_2.sqlite-wal", "logs_2.sqlite-shm", ]: source = codex_home / name destination = backup_dir / name if copy_if_exists(source, destination): copied.append(str(destination)) return { "backup_dir": str(backup_dir), "copied_files": copied, "copied_file_count": len(copied), } def archive_threads(codex_home: Path, candidates: list[ThreadMeta], *, archived_at: int) -> int: if not candidates: return 0 candidate_ids = [candidate.thread_id for candidate in candidates] with sqlite3.connect(codex_home / "state_5.sqlite") as conn: conn.execute("begin immediate") try: before = conn.total_changes conn.executemany( """ update threads set archived = 1, archived_at = ? where id = ? and archived = 0 """, [(archived_at, thread_id) for thread_id in candidate_ids], ) changed = conn.total_changes - before conn.commit() except Exception: conn.rollback() raise return int(changed) def build_report( codex_home: Path, *, inactive_days: int, execute: bool, provided_approval_token: str, backup_dir: Path | None, ) -> dict[str, object]: codex_home = codex_home.expanduser() now = int(time.time()) candidates = load_archive_candidates(codex_home, now=now, inactive_days=inactive_days) expected_token = approval_token(inactive_days) approval_valid = provided_approval_token == expected_token resolved_backup_dir = backup_dir.expanduser() if backup_dir else default_backup_dir(codex_home, now=now) errors: list[str] = [] backup: dict[str, object] = { "backup_dir": str(resolved_backup_dir), "copied_files": [], "copied_file_count": 0, } archived_thread_count = 0 mutation_performed = False if execute and not approval_valid: errors.append("approval_token_invalid") if execute and not (codex_home / "state_5.sqlite").exists(): errors.append("state_db_missing") if execute and not errors: backup = backup_codex_state(codex_home, resolved_backup_dir) if int(backup.get("copied_file_count", 0)) < 1: errors.append("backup_failed") else: archived_thread_count = archive_threads(codex_home, candidates, archived_at=now) mutation_performed = archived_thread_count > 0 return { "schema_version": SCHEMA_VERSION, "work_item_id": WORK_ITEM_ID, "codex_home": str(codex_home), "inactive_days": inactive_days, "metadata_only_candidate_selection": True, "raw_transcript_bodies_read": False, "raw_thread_text_fields_read": False, "session_jsonl_deleted": False, "logs_deleted_or_truncated": False, "sqlite_checkpoint_or_vacuum": False, "core_source_mutation": False, "execute_requested": execute, "approval_token_expected": expected_token, "approval_token_valid": approval_valid, "mutation_performed": mutation_performed, "candidate_summary": { "criteria": f"threads.archived = 0 and updated_at older than {inactive_days} days", "thread_count": len(candidates), "session_jsonl_bytes": sum(candidate.rollout_bytes for candidate in candidates), "oldest_updated_at": min((candidate.updated_at for candidate in candidates), default=None), "newest_updated_at": max((candidate.updated_at for candidate in candidates), default=None), }, "backup": backup, "archive_result": { "archived_thread_count": archived_thread_count, "mutation": "threads.archived=1 plus archived_at timestamp", }, "blocked_without_separate_approval": [ "delete session JSONL files", "delete or truncate Codex logs", "run SQLite checkpoint or vacuum", "read transcript bodies or thread text fields", "import raw transcripts into Cortex OS Core", ], "false_effects": { "delete_session_jsonl": False, "delete_logs": False, "sqlite_checkpoint_or_vacuum": False, "raw_transcript_body_read": False, "raw_thread_text_field_read": False, "core_source_mutation": False, "runtime_start": False, "secret_value_read": False, }, "errors": errors, "ok": not errors, } def validate_report(report: dict[str, object]) -> list[str]: errors: list[str] = [] if report.get("schema_version") != SCHEMA_VERSION: errors.append("schema_version_invalid") if report.get("work_item_id") != WORK_ITEM_ID: errors.append("work_item_id_invalid") for field in [ "metadata_only_candidate_selection", "raw_transcript_bodies_read", "raw_thread_text_fields_read", "session_jsonl_deleted", "logs_deleted_or_truncated", "sqlite_checkpoint_or_vacuum", "core_source_mutation", ]: expected = field == "metadata_only_candidate_selection" if report.get(field) is not expected: errors.append(f"{field}_invalid") candidate_summary = report.get("candidate_summary") if not isinstance(candidate_summary, dict): errors.append("candidate_summary_missing") elif not isinstance(candidate_summary.get("thread_count"), int): errors.append("candidate_thread_count_invalid") false_effects = report.get("false_effects") if not isinstance(false_effects, dict): errors.append("false_effects_missing") else: for key, value in false_effects.items(): if value is not False: errors.append(f"false_effect_not_false:{key}") blocked = report.get("blocked_without_separate_approval") if not isinstance(blocked, list): errors.append("blocked_without_separate_approval_missing") else: for required in ["delete session JSONL files", "run SQLite checkpoint or vacuum"]: if required not in blocked: errors.append(f"blocked_boundary_missing:{required}") return errors def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--codex-home", default=os.environ.get("CODEX_HOME", str(Path.home() / ".codex"))) parser.add_argument("--inactive-days", type=int, default=DEFAULT_INACTIVE_DAYS) parser.add_argument("--backup-dir") parser.add_argument("--execute", action="store_true") parser.add_argument("--approval-token", default="") parser.add_argument("--check", action="store_true") args = parser.parse_args() report = build_report( Path(args.codex_home), inactive_days=args.inactive_days, execute=args.execute, provided_approval_token=args.approval_token, backup_dir=Path(args.backup_dir) if args.backup_dir else None, ) errors = validate_report(report) if report.get("errors"): errors.extend(str(error) for error in report["errors"]) if args.check: print( json.dumps( { "ok": not errors, "validator": "cto-codex-inactive-thread-archive", "errors": errors, "warnings": [], }, indent=2, sort_keys=True, ) ) return 0 if not errors else 1 report["ok"] = not errors report["errors"] = errors print(json.dumps(report, indent=2, sort_keys=True)) return 0 if not errors else 1 if __name__ == "__main__": raise SystemExit(main())