#!/usr/bin/env python3
"""Probe installed Codex CLI retention support without mutating Codex state."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
# Schema identifier written into every probe report and checked by validate_probe.
SCHEMA_VERSION = "cto-codex-native-retention-probe.v1"
|
|
# Lowercase substrings that term_hits searches for in the lower-cased help text.
RETENTION_TERMS = ("retention", "cleanup", "prune", "archive")
|
|
|
|
|
|
def run_help(codex_bin: str, args: list[str]) -> dict[str, object]:
    """Run ``codex_bin`` with ``args`` and describe the outcome as a plain dict.

    The child process gets a ten second timeout, its output is captured as
    text, and a non-zero exit status is reported rather than raised.

    Args:
        codex_bin: Executable to invoke.
        args: Arguments placed after the executable.

    Returns:
        A dict with the keys ``command``, ``returncode``, ``stdout``,
        ``stderr`` and ``ok``. ``ok`` is ``True`` only for exit status 0.
        If ``subprocess.run`` raises (for example the binary is missing or
        the timeout expires), ``returncode`` is ``None``, ``stdout`` is empty,
        ``stderr`` carries the exception text and ``ok`` is ``False``.
    """
    argv = [codex_bin, *args]
    report: dict[str, object] = {"command": argv}
    try:
        proc = subprocess.run(
            argv,
            check=False,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except Exception as exc:
        # Best-effort probe: any launch failure becomes a structured result.
        report.update(returncode=None, stdout="", stderr=str(exc), ok=False)
    else:
        report.update(
            returncode=proc.returncode,
            stdout=proc.stdout,
            stderr=proc.stderr,
            ok=proc.returncode == 0,
        )
    return report
|
|
|
|
|
|
def parse_version(output: str) -> str | None:
    """Extract the first ``X.Y.Z`` dotted-number triple found in ``output``.

    Args:
        output: Text to search, e.g. the stdout of a ``--version`` call.

    Returns:
        The matched ``X.Y.Z`` substring, or ``None`` when no such triple occurs.
    """
    found = re.search(r"(\d+\.\d+\.\d+)", output)
    if found is None:
        return None
    return found.group(1)
|
|
|
|
|
|
def read_version_cache(codex_home: Path) -> dict[str, object]:
    """Summarise ``<codex_home>/version.json`` without modifying it.

    Args:
        codex_home: Directory expected to contain ``version.json``.

    Returns:
        A dict that always contains ``path`` and ``present``.

        * File missing: ``present`` is ``False`` and nothing else is set.
        * File exists but cannot be read: ``read_error`` holds the OS error.
        * Content is not UTF-8, not valid JSON, or not a JSON object:
          ``parse_error`` describes the problem.
        * Otherwise ``latest_version``, ``last_checked_at`` and
          ``dismissed_version`` are copied from the file (``None`` for keys
          the file does not define).
    """
    path = codex_home / "version.json"
    if not path.exists():
        return {"path": str(path), "present": False}

    try:
        raw = path.read_text(encoding="utf-8")
    except OSError as exc:
        # E.g. the path is a directory or is not readable; report, don't crash.
        return {"path": str(path), "present": True, "read_error": str(exc)}
    except UnicodeDecodeError as exc:
        return {"path": str(path), "present": True, "parse_error": str(exc)}

    try:
        payload = json.loads(raw)
    except json.JSONDecodeError as exc:
        return {"path": str(path), "present": True, "parse_error": str(exc)}

    # Valid JSON may still be a list, string, number or null, none of which
    # support the key lookups below.
    if not isinstance(payload, dict):
        return {
            "path": str(path),
            "present": True,
            "parse_error": f"expected a JSON object, got {type(payload).__name__}",
        }

    return {
        "path": str(path),
        "present": True,
        "latest_version": payload.get("latest_version"),
        "last_checked_at": payload.get("last_checked_at"),
        "dismissed_version": payload.get("dismissed_version"),
    }
|
|
|
|
|
|
def term_hits(text: str) -> list[str]:
    """List the entries of ``RETENTION_TERMS`` that occur in ``text``.

    ``text`` is lower-cased first and each term is matched as a plain
    substring; the result keeps the order of ``RETENTION_TERMS``.
    """
    haystack = text.lower()
    return list(filter(lambda term: term in haystack, RETENTION_TERMS))
|
|
|
|
|
|
def _version_key(value: object) -> tuple[int, ...] | None:
    """Return the first ``X.Y.Z`` in ``value`` as an integer tuple, else ``None``."""
    if not isinstance(value, str):
        return None
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", value)
    if match is None:
        return None
    return tuple(int(part) for part in match.groups())


def build_probe(codex_home: Path, codex_bin: str | None) -> dict[str, object]:
    """Assemble the retention probe report for the installed Codex CLI.

    The function runs ``--version`` plus a fixed set of help/feature listing
    commands through ``run_help`` and reads ``version.json`` through
    ``read_version_cache``; it performs no other actions itself.

    Args:
        codex_home: Codex home directory holding the ``version.json`` cache.
        codex_bin: Explicit path to the Codex executable. When falsy, the
            executable is looked up on ``PATH`` with ``shutil.which("codex")``.

    Returns:
        The probe report. Notable fields:

        * ``installed_version`` / ``version_cache`` / ``update_available``:
          ``update_available`` is ``True`` only when the cached latest version
          is numerically newer than the installed one. If either value cannot
          be parsed as ``X.Y.Z`` it falls back to a plain inequality check.
        * ``native_retention_terms_seen`` and
          ``native_retention_cleanup_command_advertised``: derived from
          ``term_hits`` over the combined stdout of the help commands.
        * ``ephemeral_prevention_advertised``: whether ``--ephemeral`` appears
          in that combined stdout.
        * ``help_surfaces_checked``: the commands that were actually run, so
          it is empty when no executable was found.
        * The remaining boolean fields and ``false_effects`` are constant
          attestation flags consumed by ``validate_probe``.
    """
    # One table drives both the commands executed and the surfaces reported,
    # so the report cannot drift from what was actually run.
    surfaces: tuple[tuple[str, ...], ...] = (
        ("--help",),
        ("exec", "--help"),
        ("exec", "resume", "--help"),
        ("resume", "--help"),
        ("features", "list"),
    )

    resolved_bin = codex_bin or shutil.which("codex")
    help_results: list[dict[str, object]] = []
    surfaces_checked: list[str] = []
    installed_version = None
    if resolved_bin:
        version_result = run_help(resolved_bin, ["--version"])
        installed_version = parse_version(str(version_result.get("stdout", "")))
        help_results = [run_help(resolved_bin, list(surface)) for surface in surfaces]
        surfaces_checked = ["codex " + " ".join(surface) for surface in surfaces]
    combined_help = "\n".join(str(result.get("stdout", "")) for result in help_results)

    version_cache = read_version_cache(codex_home)
    latest_version = version_cache.get("latest_version")
    installed_key = _version_key(installed_version)
    latest_key = _version_key(latest_version)
    if installed_key is not None and latest_key is not None:
        # Ordered comparison: a stale cache older than the installed build
        # must not be reported as an available update.
        update_available = latest_key > installed_key
    else:
        update_available = bool(
            installed_version and latest_version and installed_version != latest_version
        )

    retention_hits = term_hits(combined_help)
    ephemeral_advertised = "--ephemeral" in combined_help

    # Only claim what was actually observed in the help output.
    if retention_hits:
        decision = "Installed Codex advertises possible retention cleanup terms; inspect before custom mutation."
    elif not resolved_bin:
        decision = "Codex binary not found; native retention support could not be probed."
    elif ephemeral_advertised:
        decision = "Installed Codex advertises ephemeral prevention but no native retention cleanup command."
    else:
        decision = "Installed Codex advertises neither ephemeral prevention nor a native retention cleanup command."

    if update_available:
        recommended_next = "Decide whether to update Codex and re-run this probe before archive-only execution."
    else:
        recommended_next = "Use guarded archive executor only with exact approval token."

    return {
        "schema_version": SCHEMA_VERSION,
        "codex_home": str(codex_home),
        "codex_bin": resolved_bin,
        "metadata_only": True,
        "mutation_performed": False,
        "raw_transcript_bodies_read": False,
        "raw_thread_text_fields_read": False,
        "state_db_mutation": False,
        "session_jsonl_deleted": False,
        "logs_deleted_or_truncated": False,
        "installed_version": installed_version,
        "version_cache": version_cache,
        "update_available": update_available,
        "native_retention_cleanup_command_advertised": bool(retention_hits),
        "native_retention_terms_seen": retention_hits,
        "ephemeral_prevention_advertised": ephemeral_advertised,
        "help_surfaces_checked": surfaces_checked,
        "decision": decision,
        "recommended_next": recommended_next,
        "false_effects": {
            "codex_update": False,
            "archive_threads": False,
            "delete_session_jsonl": False,
            "delete_logs": False,
            "sqlite_checkpoint_or_vacuum": False,
            "raw_transcript_body_read": False,
            "raw_thread_text_field_read": False,
            "core_source_mutation": False,
        },
    }
|
|
|
|
|
|
def validate_probe(probe: dict[str, object]) -> list[str]:
    """Check a probe report and return a list of error codes (empty when valid).

    Checks, in order:

    * ``schema_version`` equals ``SCHEMA_VERSION``.
    * ``metadata_only`` is exactly ``True`` and each mutation/read flag is
      exactly ``False`` (identity comparison, so truthy stand-ins fail).
    * ``codex_bin`` and ``installed_version`` are truthy.
    * ``ephemeral_prevention_advertised`` is exactly ``True``.
    * ``false_effects`` is a dict whose values are all exactly ``False``.
    """
    problems: list[str] = []

    if probe.get("schema_version") != SCHEMA_VERSION:
        problems.append("schema_version_invalid")

    required_flags = {
        "metadata_only": True,
        "mutation_performed": False,
        "raw_transcript_bodies_read": False,
        "raw_thread_text_fields_read": False,
        "state_db_mutation": False,
        "session_jsonl_deleted": False,
        "logs_deleted_or_truncated": False,
    }
    problems.extend(
        f"{name}_invalid"
        for name, wanted in required_flags.items()
        if probe.get(name) is not wanted
    )

    required_values = (
        ("codex_bin", "codex_bin_missing"),
        ("installed_version", "installed_version_missing"),
    )
    for key, code in required_values:
        if not probe.get(key):
            problems.append(code)

    if probe.get("ephemeral_prevention_advertised") is not True:
        problems.append("ephemeral_prevention_not_advertised")

    effects = probe.get("false_effects")
    if isinstance(effects, dict):
        problems.extend(
            f"false_effect_not_false:{key}"
            for key, value in effects.items()
            if value is not False
        )
    else:
        problems.append("false_effects_missing")

    return problems
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: build the probe, validate it, print JSON.

    Options:
        --codex-home: Codex home directory. Defaults to ``$CODEX_HOME`` when
            set, otherwise ``~/.codex``.
        --codex-bin: Explicit Codex executable passed through to ``build_probe``.
        --check: Print only a validator summary instead of the full probe.

    Returns:
        ``0`` when validation produced no errors, ``1`` otherwise.
    """
    fallback_home = str(Path.home() / ".codex")
    cli = argparse.ArgumentParser()
    cli.add_argument("--codex-home", default=os.environ.get("CODEX_HOME", fallback_home))
    cli.add_argument("--codex-bin")
    cli.add_argument("--check", action="store_true")
    options = cli.parse_args()

    home = Path(options.codex_home).expanduser()
    probe = build_probe(home, options.codex_bin)
    errors = validate_probe(probe)
    passed = not errors

    if options.check:
        # Check mode reports the validation verdict only, not the probe body.
        report: dict[str, object] = {
            "ok": passed,
            "validator": "cto-codex-native-retention-probe",
            "errors": errors,
            "warnings": [],
        }
    else:
        probe["ok"] = passed
        probe["errors"] = errors
        report = probe

    print(json.dumps(report, indent=2, sort_keys=True))
    return 0 if passed else 1
|
|
|
|
|
|
# When executed as a script, use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|