CC: Add Codex retention policy plan
This commit is contained in:
@@ -0,0 +1,262 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Plan Codex retention cleanup without mutating Codex state."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
# Identifier stamped into every emitted plan and checked by validate_plan().
SCHEMA_VERSION = "cto-codex-retention-policy-plan.v1"

# Default policy thresholds; each can be overridden from the command line.
DEFAULT_INACTIVE_DAYS = 7
DEFAULT_DELETE_AFTER_ARCHIVED_DAYS = 30
DEFAULT_LARGE_FILE_MIB = 10

# Conversion factor used to turn day-based thresholds into second-based cutoffs.
SECONDS_PER_DAY = 60 * 60 * 24
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class ThreadMeta:
    """Immutable metadata record for one row of the ``threads`` table.

    Only metadata is carried: timestamps, the archive flag, and the location
    and size of the rollout file. No transcript text is stored here.
    """

    # Last-update timestamp. It is compared against cutoffs derived from
    # ``int(time.time())``, so presumably Unix epoch seconds -- confirm
    # against the Codex schema.
    updated_at: int
    # Whether the thread is flagged as archived.
    archived: bool
    # Archive timestamp (same unit as ``updated_at``), or None when unset.
    archived_at: int | None
    # Location of the thread's rollout file.
    rollout_path: Path
    # Size of the rollout file in bytes, as measured by rollout_file_bytes().
    rollout_bytes: int
|
||||
|
||||
|
||||
def bytes_on_disk(path: Path) -> int:
    """Return the number of bytes occupied by ``path``.

    Args:
        path: A file or directory. Anything else (missing path, dangling
            symlink, special file) counts as zero bytes.

    Returns:
        The file's size, or the summed size of every regular file found
        recursively below a directory.

    The measured tree may be written to while this runs, so an entry can
    disappear or become unreadable between being listed and being stat-ed.
    Such entries are skipped instead of aborting the whole measurement.
    """
    try:
        if path.is_file():
            return path.stat().st_size
    except OSError:
        # The file vanished or became unreadable after the is_file() check.
        return 0
    if not path.is_dir():
        return 0

    total = 0
    for item in path.rglob("*"):
        try:
            if item.is_file():
                total += item.stat().st_size
        except OSError:
            # Entry removed or inaccessible mid-walk; leave it out of the sum.
            continue
    return total
|
||||
|
||||
|
||||
def query_all(db_path: Path, sql: str, params: tuple[Any, ...] = ()) -> list[list[Any]]:
    """Run a query against a SQLite database opened read-only.

    Args:
        db_path: Location of the SQLite file.
        sql: Statement to execute.
        params: Positional parameters bound to ``sql``.

    Returns:
        Every result row as a list of column values, or an empty list when
        ``db_path`` does not exist.

    Raises:
        sqlite3.Error: If the database cannot be opened or the query fails.
    """
    if not db_path.exists():
        return []
    # as_uri() percent-encodes characters such as '#', '?' and '%' that would
    # otherwise be parsed as URI syntax and make SQLite open the wrong path.
    uri = f"{db_path.absolute().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    try:
        return [list(row) for row in conn.execute(sql, params)]
    finally:
        # A sqlite3 connection used as a context manager only commits or
        # rolls back; it must be closed explicitly to release the handle.
        conn.close()
|
||||
|
||||
|
||||
def safe_rollout_path(codex_home: Path, value: str) -> Path:
    """Turn a stored rollout path string into a ``Path``.

    A leading ``~`` is expanded. A relative result is anchored under
    ``codex_home``; an absolute result is returned unchanged. No containment
    check is performed here.
    """
    candidate = Path(value).expanduser()
    return candidate if candidate.is_absolute() else codex_home / candidate
|
||||
|
||||
|
||||
def rollout_file_bytes(codex_home: Path, path: Path) -> int:
    """Return the size of a rollout file located under ``<codex_home>/sessions``.

    Both paths are resolved before comparison. The result is 0 when the
    resolved ``path`` lies outside the sessions directory, does not exist,
    or is not a regular file.
    """
    sessions_root = (codex_home / "sessions").resolve()
    target = path.resolve()
    try:
        # relative_to() raises ValueError when target is not below the root.
        target.relative_to(sessions_root)
    except ValueError:
        return 0
    if not (target.exists() and target.is_file()):
        return 0
    return target.stat().st_size
|
||||
|
||||
|
||||
def load_threads(codex_home: Path) -> list[ThreadMeta]:
    """Load metadata for every row of ``threads`` in ``state_5.sqlite``.

    Only the path, timestamp, and archive columns are selected. Each stored
    rollout path is normalised with safe_rollout_path() and sized with
    rollout_file_bytes(). Returns an empty list when the database file is
    absent, because query_all() yields no rows in that case.
    """
    records = query_all(
        codex_home / "state_5.sqlite",
        """
        select rollout_path, updated_at, archived, archived_at
        from threads
        """,
    )
    loaded: list[ThreadMeta] = []
    for raw_path, raw_updated, raw_archived, raw_archived_at in records:
        location = safe_rollout_path(codex_home, str(raw_path))
        meta = ThreadMeta(
            updated_at=int(raw_updated),
            archived=bool(raw_archived),
            archived_at=None if raw_archived_at is None else int(raw_archived_at),
            rollout_path=location,
            rollout_bytes=rollout_file_bytes(codex_home, location),
        )
        loaded.append(meta)
    return loaded
|
||||
|
||||
|
||||
def summarize_threads(
    threads: list[ThreadMeta],
    *,
    now: int,
    inactive_days: int,
    delete_after_archived_days: int,
    large_file_mib: int,
) -> dict[str, object]:
    """Summarise thread metadata into retention candidate groups.

    Args:
        threads: Thread metadata records to classify.
        now: Reference timestamp, in the same unit as ``updated_at``.
        inactive_days: Age after which a non-archived thread becomes an
            archive candidate.
        delete_after_archived_days: Age of ``archived_at`` after which an
            archived thread becomes a delete candidate.
        large_file_mib: Rollout size, in MiB, at or above which a
            non-archived thread is reported as large.

    Returns:
        A dict of overall counts, three candidate groups (each with its
        criteria text, thread count, and byte total), and fixed 1/7/14/30-day
        inactivity buckets for non-archived threads.
    """
    inactive_cutoff = now - inactive_days * SECONDS_PER_DAY
    delete_cutoff = now - delete_after_archived_days * SECONDS_PER_DAY
    large_cutoff = large_file_mib * 1024 * 1024

    def total_bytes(group: list[ThreadMeta]) -> int:
        return sum(entry.rollout_bytes for entry in group)

    def inactive_since(days: int) -> int:
        # Number of non-archived threads last updated at least `days` ago.
        threshold = now - days * SECONDS_PER_DAY
        return sum(1 for entry in active if entry.updated_at <= threshold)

    active: list[ThreadMeta] = []
    archived: list[ThreadMeta] = []
    for entry in threads:
        (archived if entry.archived else active).append(entry)

    to_archive = [entry for entry in active if entry.updated_at <= inactive_cutoff]
    to_delete = [
        entry
        for entry in archived
        # Archived threads without a recorded archive time are never candidates.
        if entry.archived_at is not None and entry.archived_at <= delete_cutoff
    ]
    oversized = [entry for entry in active if entry.rollout_bytes >= large_cutoff]

    archive_section = {
        "criteria": f"archived=false and updated_at older than {inactive_days} days",
        "thread_count": len(to_archive),
        "session_jsonl_bytes": total_bytes(to_archive),
        "mutation_required": "state_5.sqlite threads.archived update",
        "approval_required": True,
    }
    delete_section = {
        "criteria": f"archived=true and archived_at older than {delete_after_archived_days} days",
        "thread_count": len(to_delete),
        "session_jsonl_bytes": total_bytes(to_delete),
        "mutation_required": "session JSONL deletion",
        "approval_required": True,
    }
    large_section = {
        "criteria": f"archived=false and rollout file >= {large_file_mib} MiB",
        "thread_count": len(oversized),
        "session_jsonl_bytes": total_bytes(oversized),
        "approval_required": False,
    }
    buckets = {
        "inactive_over_1d": inactive_since(1),
        "inactive_over_7d": inactive_since(7),
        "inactive_over_14d": inactive_since(14),
        "inactive_over_30d": inactive_since(30),
    }
    return {
        "thread_count": len(threads),
        "active_thread_count": len(active),
        "archived_thread_count": len(archived),
        "session_jsonl_bytes": total_bytes(threads),
        "archive_only_candidates": archive_section,
        "destructive_delete_candidates": delete_section,
        "large_active_sessions": large_section,
        "age_buckets": buckets,
    }
|
||||
|
||||
|
||||
def log_pressure(codex_home: Path) -> dict[str, object]:
    """Report row counts and estimated byte totals from ``logs_2.sqlite``.

    Two read-only queries are issued through query_all(): the five targets
    with the largest summed ``estimated_bytes``, then the overall totals.
    Totals fall back to 0 when query_all() returns no rows (for example when
    the database file is missing).
    """
    database = codex_home / "logs_2.sqlite"
    per_target = query_all(
        database,
        """
        select target, count(*) as rows, coalesce(sum(estimated_bytes), 0) as bytes
        from logs
        group by target
        order by bytes desc
        limit 5
        """,
    )
    overall = query_all(database, "select count(*), coalesce(sum(estimated_bytes), 0) from logs")

    if overall:
        row_count = overall[0][0]
        estimated_bytes = overall[0][1]
    else:
        row_count = 0
        estimated_bytes = 0

    top_targets = []
    for entry in per_target:
        top_targets.append({"target": entry[0], "rows": entry[1], "estimated_bytes": entry[2]})

    return {
        "row_count": row_count,
        "estimated_bytes": estimated_bytes,
        "top_targets": top_targets,
        "mutation_required_for_reduction": "logs table deletion plus SQLite checkpoint/vacuum",
        "approval_required": True,
    }
|
||||
|
||||
|
||||
def build_plan(
    codex_home: Path,
    *,
    inactive_days: int,
    delete_after_archived_days: int,
    large_file_mib: int,
) -> dict[str, object]:
    """Assemble the full retention plan document for a Codex home directory.

    Args:
        codex_home: Codex home directory; a leading ``~`` is expanded.
        inactive_days: Forwarded to summarize_threads().
        delete_after_archived_days: Forwarded to summarize_threads().
        large_file_mib: Forwarded to summarize_threads().

    Returns:
        A dict holding the schema version, the static policy text and approval
        boundaries, current on-disk sizes, and the thread and log summaries.
        Every mutation and effect flag in the document is hard-coded to False.
    """
    home = codex_home.expanduser()
    now = int(time.time())
    thread_summary = summarize_threads(
        load_threads(home),
        now=now,
        inactive_days=inactive_days,
        delete_after_archived_days=delete_after_archived_days,
        large_file_mib=large_file_mib,
    )

    policy = {
        "prevention_default": "Use `codex exec --ephemeral` for disposable non-interactive worker runs.",
        "phase_0": "Run this planner and the pressure reporter. No mutation.",
        "phase_1": "Backup `state_5.sqlite`, `logs_2.sqlite`, WAL, and SHM files before any mutation.",
        "phase_2": "Archive-only candidate threads by DB flag if explicitly approved.",
        "phase_3": "Delete archived session JSONL only after a separate destructive approval.",
        "phase_4": "Delete/truncate logs and checkpoint/vacuum only after Codex is stopped and destructive approval is explicit.",
    }
    current_state = {
        "codex_home_bytes": bytes_on_disk(home),
        "sessions_bytes": bytes_on_disk(home / "sessions"),
        "logs_sqlite_bytes": bytes_on_disk(home / "logs_2.sqlite"),
        "logs_sqlite_wal_bytes": bytes_on_disk(home / "logs_2.sqlite-wal"),
    }
    log_summary = log_pressure(home)
    approval_boundaries = [
        "archive threads by updating `threads.archived`",
        "delete session JSONL files",
        "delete or truncate Codex log rows",
        "run SQLite checkpoint or vacuum",
        "touch raw transcript bodies",
        "import raw transcripts into Cortex OS Core",
    ]
    # Each listed effect is reported as not having happened.
    false_effects = dict.fromkeys(
        [
            "archive_threads",
            "delete_session_jsonl",
            "delete_logs",
            "sqlite_checkpoint_or_vacuum",
            "raw_transcript_body_read",
            "raw_thread_text_field_read",
            "core_source_mutation",
            "runtime_start",
            "secret_value_read",
            "product_readiness_claim",
        ],
        False,
    )

    return {
        "schema_version": SCHEMA_VERSION,
        "codex_home": str(home),
        "metadata_only": True,
        "raw_transcript_bodies_read": False,
        "raw_thread_text_fields_read": False,
        "mutation_performed": False,
        "policy": policy,
        "current_state": current_state,
        "thread_retention": thread_summary,
        "log_retention": log_summary,
        "approval_boundaries": approval_boundaries,
        "recommended_next": "Ask for explicit archive-only approval first; keep delete/vacuum as separate later approvals.",
        "false_effects": false_effects,
    }
|
||||
|
||||
|
||||
def validate_plan(plan: dict[str, object]) -> list[str]:
    """Check a plan document against the invariants this planner guarantees.

    Args:
        plan: Plan document as produced by build_plan().

    Returns:
        A list of error codes; empty when the plan is valid. Checks cover the
        schema version, the four top-level safety flags, three required
        approval boundaries, the ``false_effects`` section (which must be a
        dict whose values are all ``False``), and the presence of an integer
        ``thread_retention.active_thread_count``.
    """
    errors: list[str] = []
    if plan.get("schema_version") != SCHEMA_VERSION:
        errors.append("schema_version_invalid")

    # Identity comparison: truthy/falsy stand-ins such as 1 or 0 are rejected.
    expected_flags = {
        "metadata_only": True,
        "raw_transcript_bodies_read": False,
        "raw_thread_text_fields_read": False,
        "mutation_performed": False,
    }
    for field, expected in expected_flags.items():
        if plan.get(field) is not expected:
            errors.append(f"{field}_invalid")

    boundaries = plan.get("approval_boundaries")
    if not isinstance(boundaries, list):
        boundaries = []
    for required in (
        "delete session JSONL files",
        "run SQLite checkpoint or vacuum",
        "import raw transcripts into Cortex OS Core",
    ):
        if required not in boundaries:
            errors.append(f"approval_boundary_missing:{required}")

    false_effects = plan.get("false_effects")
    if isinstance(false_effects, dict):
        for field, value in false_effects.items():
            if value is not False:
                errors.append(f"false_effect_not_false:{field}")
    else:
        # A missing or malformed section must not pass silently; the other
        # sections already yield errors when they are absent.
        errors.append("false_effects_invalid")

    thread_retention = plan.get("thread_retention")
    active_count = thread_retention.get("active_thread_count") if isinstance(thread_retention, dict) else None
    # bool is a subclass of int, so exclude it explicitly from a valid count.
    if isinstance(active_count, bool) or not isinstance(active_count, int):
        errors.append("active_thread_count_invalid")
    return errors
|
||||
|
||||
|
||||
def main() -> int:
    """Parse command-line options, build the plan, validate it, and print JSON.

    With ``--check`` only the validation result is printed; otherwise the
    full plan is printed. Returns 0 when validation found no errors and 1
    otherwise.
    """
    cli = argparse.ArgumentParser()
    default_home = os.environ.get("CODEX_HOME", str(Path.home() / ".codex"))
    cli.add_argument("--codex-home", default=default_home)
    cli.add_argument("--inactive-days", type=int, default=DEFAULT_INACTIVE_DAYS)
    cli.add_argument("--delete-after-archived-days", type=int, default=DEFAULT_DELETE_AFTER_ARCHIVED_DAYS)
    cli.add_argument("--large-file-mib", type=int, default=DEFAULT_LARGE_FILE_MIB)
    cli.add_argument("--check", action="store_true")
    options = cli.parse_args()

    plan = build_plan(
        Path(options.codex_home),
        inactive_days=options.inactive_days,
        delete_after_archived_days=options.delete_after_archived_days,
        large_file_mib=options.large_file_mib,
    )
    problems = validate_plan(plan)

    if options.check:
        payload: dict[str, object] = {
            "ok": not problems,
            "validator": "cto-codex-retention-policy-plan",
            "errors": problems,
            "warnings": [],
        }
    else:
        payload = plan
    print(json.dumps(payload, indent=2, sort_keys=True))
    # The exit status reflects validation in both output modes.
    return 1 if problems else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user