From d694ca5f8a0cd225ab9047b361d22566b6936214 Mon Sep 17 00:00:00 2001 From: Svrnty Date: Thu, 4 Jun 2026 13:30:15 -0400 Subject: [PATCH] CC: Add Codex inactive archive executor --- ...CODEX-RETENTION-ARCHIVE-EXECUTOR-PACKET.md | 89 +++++ WORKBOARD.yaml | 5 + tools/archive_codex_inactive_threads.py | 325 ++++++++++++++++++ 3 files changed, 419 insertions(+) create mode 100644 .sot/03-PROTOCOLS/CTO-CODEX-RETENTION-ARCHIVE-EXECUTOR-PACKET.md create mode 100644 tools/archive_codex_inactive_threads.py diff --git a/.sot/03-PROTOCOLS/CTO-CODEX-RETENTION-ARCHIVE-EXECUTOR-PACKET.md b/.sot/03-PROTOCOLS/CTO-CODEX-RETENTION-ARCHIVE-EXECUTOR-PACKET.md new file mode 100644 index 0000000..0ecb0db --- /dev/null +++ b/.sot/03-PROTOCOLS/CTO-CODEX-RETENTION-ARCHIVE-EXECUTOR-PACKET.md @@ -0,0 +1,89 @@ +--- +name: cto-codex-retention-archive-executor-packet +tier: T1 +status: validated +owner: jp +source: CTO-WORK-095 +last_reviewed: 2026-06-04 +description: Local CTO packet for the guarded Codex inactive-thread archive executor. +--- + +# CTO Codex Retention Archive Executor Packet + +Local planning SOT only. Not a Core Protocol. Not active Core authority. + +## Claim + +Codex retention cleanup now has a guarded archive-only executor. Default mode is dry-run. Mutation requires an exact approval token. + +## Context + +`CTO-WORK-094` defined the retention policy. The next vertical move is executable archive-only cleanup, not more Core output tuning. + +This packet keeps the destructive boundary intact. It prepares phase 2 only: set inactive Codex thread records to archived. It does not delete session JSONL, truncate logs, checkpoint, vacuum, read transcript bodies, or import transcripts into Core. 
+ +## Executor + +Dry-run: + +```bash +python3 tools/archive_codex_inactive_threads.py +``` + +Focused check: + +```bash +python3 tools/archive_codex_inactive_threads.py --check +``` + +Approved archive-only execution: + +```bash +python3 tools/archive_codex_inactive_threads.py --execute --approval-token "I approve CTO-WORK-095 archive-only Codex threads older than 7 days." +``` + +## Guardrails + +- candidate selection reads only `id`, `rollout_path`, `updated_at`, `archived`, and file size; +- raw transcript bodies are not read; +- thread title, preview, and first user message are not read; +- default execution is dry-run; +- execution requires the exact approval token; +- backup runs before DB mutation; +- mutation is limited to `threads.archived=1` and `archived_at`; +- session JSONL deletion is blocked; +- log deletion or truncation is blocked; +- SQLite checkpoint or vacuum is blocked; +- Core source mutation is blocked. + +## Backup + +Before any approved archive update, the executor backs up: + +- `state_5.sqlite`; +- `logs_2.sqlite`; +- `state_5.sqlite-wal` when present; +- `state_5.sqlite-shm` when present; +- `logs_2.sqlite-wal` when present; +- `logs_2.sqlite-shm` when present. + +Default backup path is inside `~/.codex/backups/cto-codex-retention/<YYYYMMDDTHHMMSSZ>/` (UTC timestamp of the run). + +## Approval Boundary + +Still blocked without separate approval: + +- delete archived session JSONL; +- delete or truncate Codex logs; +- run SQLite checkpoint or vacuum; +- read raw transcript bodies; +- import raw transcripts into Cortex OS Core. + +## Decision + +Use this executor only after JP gives the exact archive-only approval token. Keep delete, log cleanup, checkpoint, and vacuum as later decisions. + +## New Issues + +- must-fix: obtain exact approval token before running `--execute`. +- follow-up: after archive-only execution, re-run retention planner and decide whether deletion is still worth separate approval. 
+
diff --git a/WORKBOARD.yaml b/WORKBOARD.yaml
index d1b1b7c..7fde9a6 100644
--- a/WORKBOARD.yaml
+++ b/WORKBOARD.yaml
@@ -471,3 +471,8 @@ items:
     status: validated
     source: .sot/03-PROTOCOLS/CTO-CODEX-RETENTION-POLICY-PACKET.md
     owner: ""
+  - id: CTO-WORK-095
+    title: Codex Retention Archive Executor Packet
+    status: validated
+    source: .sot/03-PROTOCOLS/CTO-CODEX-RETENTION-ARCHIVE-EXECUTOR-PACKET.md
+    owner: ""
diff --git a/tools/archive_codex_inactive_threads.py b/tools/archive_codex_inactive_threads.py
new file mode 100644
index 0000000..c65f12a
--- /dev/null
+++ b/tools/archive_codex_inactive_threads.py
@@ -0,0 +1,396 @@
+#!/usr/bin/env python3
+"""Archive inactive Codex threads after explicit operator approval."""
+
+from __future__ import annotations
+
+import argparse
+import contextlib
+import json
+import os
+import shutil
+import sqlite3
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+SCHEMA_VERSION = "cto-codex-inactive-thread-archive.v1"
+WORK_ITEM_ID = "CTO-WORK-095"
+DEFAULT_INACTIVE_DAYS = 7
+SECONDS_PER_DAY = 24 * 60 * 60
+STATE_DB_NAME = "state_5.sqlite"
+LOGS_DB_NAME = "logs_2.sqlite"
+
+
+@dataclass(frozen=True)
+class ThreadMeta:
+    """Metadata-only description of one archive candidate thread."""
+
+    thread_id: str
+    updated_at: int
+    rollout_path: Path
+    rollout_bytes: int
+
+
+def approval_token(inactive_days: int) -> str:
+    """Return the exact sentence an operator must supply to allow mutation."""
+    return f"I approve {WORK_ITEM_ID} archive-only Codex threads older than {inactive_days} days."
+
+
+def read_only_uri(db_path: Path) -> str:
+    """Return a read-only SQLite URI for ``db_path``.
+
+    ``Path.as_uri`` percent-encodes the path, so ``?``, ``#`` or ``%`` in a
+    directory name cannot be misread as URI syntax.
+    """
+    return f"{db_path.absolute().as_uri()}?mode=ro"
+
+
+def query_all(db_path: Path, sql: str, params: tuple[Any, ...] = ()) -> list[list[Any]]:
+    """Run a read-only query and return every row as a list.
+
+    A missing database yields an empty result instead of an error.
+    """
+    if not db_path.exists():
+        return []
+    # sqlite3's own context manager only scopes a transaction; closing() is
+    # what releases the connection.
+    with contextlib.closing(sqlite3.connect(read_only_uri(db_path), uri=True)) as conn:
+        return [list(row) for row in conn.execute(sql, params).fetchall()]
+
+
+def safe_rollout_path(codex_home: Path, value: str) -> Path:
+    """Expand ``~`` in ``value`` and anchor relative paths at ``codex_home``."""
+    path = Path(value).expanduser()
+    if not path.is_absolute():
+        path = codex_home / path
+    return path
+
+
+def rollout_file_bytes(codex_home: Path, path: Path) -> int:
+    """Return the size of a rollout file, or 0 when it is not counted.
+
+    Only regular files that resolve inside ``codex_home/sessions`` are
+    measured. The file is stat'ed, never opened.
+    """
+    sessions_dir = (codex_home / "sessions").resolve()
+    resolved = path.resolve()
+    try:
+        resolved.relative_to(sessions_dir)
+    except ValueError:
+        return 0
+    try:
+        return resolved.stat().st_size if resolved.is_file() else 0
+    except OSError:
+        # The file disappeared or became unreadable after the is_file() check.
+        return 0
+
+
+def load_archive_candidates(codex_home: Path, *, now: int, inactive_days: int) -> list[ThreadMeta]:
+    """Select unarchived threads last updated at or before the cutoff.
+
+    Only ``id``, ``rollout_path`` and ``updated_at`` are read from the
+    ``threads`` table. Rows are returned oldest first.
+    """
+    cutoff = now - inactive_days * SECONDS_PER_DAY
+    rows = query_all(
+        codex_home / STATE_DB_NAME,
+        """
+        select id, rollout_path, updated_at
+        from threads
+        where archived = 0 and updated_at <= ?
+        order by updated_at asc
+        """,
+        (cutoff,),
+    )
+    candidates: list[ThreadMeta] = []
+    for thread_id, rollout_path, updated_at in rows:
+        path = safe_rollout_path(codex_home, str(rollout_path))
+        candidates.append(
+            ThreadMeta(
+                thread_id=str(thread_id),
+                updated_at=int(updated_at),
+                rollout_path=path,
+                rollout_bytes=rollout_file_bytes(codex_home, path),
+            )
+        )
+    return candidates
+
+
+def default_backup_dir(codex_home: Path, *, now: int) -> Path:
+    """Return the UTC-timestamped default backup directory."""
+    stamp = time.strftime("%Y%m%dT%H%M%SZ", time.gmtime(now))
+    return codex_home / "backups" / "cto-codex-retention" / stamp
+
+
+def sqlite_backup(source: Path, destination: Path) -> None:
+    """Copy ``source`` to ``destination`` with SQLite's online backup API.
+
+    A missing source is skipped; callers detect that by checking whether
+    ``destination`` exists afterwards.
+    """
+    destination.parent.mkdir(parents=True, exist_ok=True)
+    if not source.exists():
+        return
+    with contextlib.closing(sqlite3.connect(read_only_uri(source), uri=True)) as src:
+        with contextlib.closing(sqlite3.connect(destination)) as dst:
+            src.backup(dst)
+
+
+def copy_if_exists(source: Path, destination: Path) -> bool:
+    """Copy ``source`` with its metadata; return False when it is absent."""
+    if not source.exists():
+        return False
+    destination.parent.mkdir(parents=True, exist_ok=True)
+    shutil.copy2(source, destination)
+    return True
+
+
+def backup_codex_state(codex_home: Path, backup_dir: Path) -> dict[str, object]:
+    """Back up both Codex databases and their WAL/SHM sidecars.
+
+    Returns the backup directory plus the list and count of files written.
+    """
+    copied: list[str] = []
+    for name in [STATE_DB_NAME, LOGS_DB_NAME]:
+        source = codex_home / name
+        destination = backup_dir / name
+        sqlite_backup(source, destination)
+        if destination.exists():
+            copied.append(str(destination))
+    for name in [
+        "state_5.sqlite-wal",
+        "state_5.sqlite-shm",
+        "logs_2.sqlite-wal",
+        "logs_2.sqlite-shm",
+    ]:
+        source = codex_home / name
+        destination = backup_dir / name
+        if copy_if_exists(source, destination):
+            copied.append(str(destination))
+    return {
+        "backup_dir": str(backup_dir),
+        "copied_files": copied,
+        "copied_file_count": len(copied),
+    }
+
+
+def archive_threads(codex_home: Path, candidates: list[ThreadMeta], *, archived_at: int) -> int:
+    """Mark the candidate threads archived and return how many rows changed.
+
+    All updates share one ``begin immediate`` transaction that is rolled back
+    on any error. Rows already archived are left untouched.
+    """
+    if not candidates:
+        return 0
+    updates = [(archived_at, candidate.thread_id) for candidate in candidates]
+    with contextlib.closing(sqlite3.connect(codex_home / STATE_DB_NAME)) as conn:
+        conn.execute("begin immediate")
+        try:
+            before = conn.total_changes
+            conn.executemany(
+                """
+                update threads
+                set archived = 1, archived_at = ?
+                where id = ? and archived = 0
+                """,
+                updates,
+            )
+            changed = conn.total_changes - before
+            conn.commit()
+        except Exception:
+            conn.rollback()
+            raise
+    return int(changed)
+
+
+def build_report(
+    codex_home: Path,
+    *,
+    inactive_days: int,
+    execute: bool,
+    provided_approval_token: str,
+    backup_dir: Path | None,
+) -> dict[str, object]:
+    """Plan the archive-only cleanup and, when approved, execute it.
+
+    Without ``execute`` this is a dry run. With ``execute`` the databases are
+    backed up first and threads are archived only when the approval token
+    matches exactly and no error has been recorded.
+    """
+    codex_home = codex_home.expanduser()
+    now = int(time.time())
+    candidates = load_archive_candidates(codex_home, now=now, inactive_days=inactive_days)
+    expected_token = approval_token(inactive_days)
+    approval_valid = provided_approval_token == expected_token
+    resolved_backup_dir = backup_dir.expanduser() if backup_dir else default_backup_dir(codex_home, now=now)
+
+    errors: list[str] = []
+    backup: dict[str, object] = {
+        "backup_dir": str(resolved_backup_dir),
+        "copied_files": [],
+        "copied_file_count": 0,
+    }
+    archived_thread_count = 0
+    mutation_performed = False
+
+    if inactive_days < 1:
+        # A zero or negative window would also select threads that are in use.
+        errors.append("inactive_days_invalid")
+    if execute and not approval_valid:
+        errors.append("approval_token_invalid")
+    if execute and not (codex_home / STATE_DB_NAME).exists():
+        errors.append("state_db_missing")
+
+    if execute and not errors:
+        backup = backup_codex_state(codex_home, resolved_backup_dir)
+        # The mutated table lives in the state DB, so that specific backup
+        # must exist; any other copied file is not enough.
+        copied_files = backup.get("copied_files")
+        state_backup = str(resolved_backup_dir / STATE_DB_NAME)
+        if not isinstance(copied_files, list) or state_backup not in copied_files:
+            errors.append("backup_failed")
+        else:
+            archived_thread_count = archive_threads(codex_home, candidates, archived_at=now)
+            mutation_performed = archived_thread_count > 0
+
+    return {
+        "schema_version": SCHEMA_VERSION,
+        "work_item_id": WORK_ITEM_ID,
+        "codex_home": str(codex_home),
+        "inactive_days": inactive_days,
+        "metadata_only_candidate_selection": True,
+        "raw_transcript_bodies_read": False,
+        "raw_thread_text_fields_read": False,
+        "session_jsonl_deleted": False,
+        "logs_deleted_or_truncated": False,
+        "sqlite_checkpoint_or_vacuum": False,
+        "core_source_mutation": False,
+        "execute_requested": execute,
+        "approval_token_expected": expected_token,
+        "approval_token_valid": approval_valid,
+        "mutation_performed": mutation_performed,
+        "candidate_summary": {
+            "criteria": f"threads.archived = 0 and updated_at older than {inactive_days} days",
+            "thread_count": len(candidates),
+            "session_jsonl_bytes": sum(candidate.rollout_bytes for candidate in candidates),
+            "oldest_updated_at": min((candidate.updated_at for candidate in candidates), default=None),
+            "newest_updated_at": max((candidate.updated_at for candidate in candidates), default=None),
+        },
+        "backup": backup,
+        "archive_result": {
+            "archived_thread_count": archived_thread_count,
+            "mutation": "threads.archived=1 plus archived_at timestamp",
+        },
+        "blocked_without_separate_approval": [
+            "delete session JSONL files",
+            "delete or truncate Codex logs",
+            "run SQLite checkpoint or vacuum",
+            "read transcript bodies or thread text fields",
+            "import raw transcripts into Cortex OS Core",
+        ],
+        "false_effects": {
+            "delete_session_jsonl": False,
+            "delete_logs": False,
+            "sqlite_checkpoint_or_vacuum": False,
+            "raw_transcript_body_read": False,
+            "raw_thread_text_field_read": False,
+            "core_source_mutation": False,
+            "runtime_start": False,
+            "secret_value_read": False,
+        },
+        "errors": errors,
+        "ok": not errors,
+    }
+
+
+def validate_report(report: dict[str, object]) -> list[str]:
+    """Return the invariant violations found in ``report`` (empty when valid)."""
+    errors: list[str] = []
+    if report.get("schema_version") != SCHEMA_VERSION:
+        errors.append("schema_version_invalid")
+    if report.get("work_item_id") != WORK_ITEM_ID:
+        errors.append("work_item_id_invalid")
+    for field in [
+        "metadata_only_candidate_selection",
+        "raw_transcript_bodies_read",
+        "raw_thread_text_fields_read",
+        "session_jsonl_deleted",
+        "logs_deleted_or_truncated",
+        "sqlite_checkpoint_or_vacuum",
+        "core_source_mutation",
+    ]:
+        # Only the metadata-only flag may be True; every other flag must be False.
+        expected = field == "metadata_only_candidate_selection"
+        if report.get(field) is not expected:
+            errors.append(f"{field}_invalid")
+    candidate_summary = report.get("candidate_summary")
+    if not isinstance(candidate_summary, dict):
+        errors.append("candidate_summary_missing")
+    elif not isinstance(candidate_summary.get("thread_count"), int):
+        errors.append("candidate_thread_count_invalid")
+    false_effects = report.get("false_effects")
+    if not isinstance(false_effects, dict):
+        errors.append("false_effects_missing")
+    else:
+        for key, value in false_effects.items():
+            if value is not False:
+                errors.append(f"false_effect_not_false:{key}")
+    blocked = report.get("blocked_without_separate_approval")
+    if not isinstance(blocked, list):
+        errors.append("blocked_without_separate_approval_missing")
+    else:
+        for required in ["delete session JSONL files", "run SQLite checkpoint or vacuum"]:
+            if required not in blocked:
+                errors.append(f"blocked_boundary_missing:{required}")
+    return errors
+
+
+def main() -> int:
+    """Parse arguments, build and validate the report, print it as JSON.
+
+    Returns 0 when no error was found and 1 otherwise.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--codex-home", default=os.environ.get("CODEX_HOME", str(Path.home() / ".codex")))
+    parser.add_argument("--inactive-days", type=int, default=DEFAULT_INACTIVE_DAYS)
+    parser.add_argument("--backup-dir")
+    parser.add_argument("--execute", action="store_true")
+    parser.add_argument("--approval-token", default="")
+    parser.add_argument("--check", action="store_true")
+    args = parser.parse_args()
+
+    report = build_report(
+        Path(args.codex_home),
+        inactive_days=args.inactive_days,
+        execute=args.execute,
+        provided_approval_token=args.approval_token,
+        backup_dir=Path(args.backup_dir) if args.backup_dir else None,
+    )
+    errors = validate_report(report)
+    if report.get("errors"):
+        errors.extend(str(error) for error in report["errors"])
+
+    if args.check:
+        print(
+            json.dumps(
+                {
+                    "ok": not errors,
+                    "validator": "cto-codex-inactive-thread-archive",
+                    "errors": errors,
+                    "warnings": [],
+                },
+                indent=2,
+                sort_keys=True,
+            )
+        )
+        return 0 if not errors else 1
+
+    report["ok"] = not errors
+    report["errors"] = errors
+    print(json.dumps(report, indent=2, sort_keys=True))
+    return 0 if not errors else 1
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())