Files
steev/tools/validate_steev_child.py
T

1390 lines
74 KiB
Python
Executable File

#!/usr/bin/env python3
"""Validate the Steev-named personal-agent profile distribution."""
from __future__ import annotations
import hashlib
import json
import re
from pathlib import Path
import yaml
# Directory one level above the folder that contains this script
# (parents[1] of the fully resolved script path). Repository-relative
# paths used by the validator are joined onto it.
ROOT = Path(__file__).resolve().parents[1]
# Paths, relative to ROOT, that main() requires to exist. Each absent path is
# reported as a "missing:<path>" error.
REQUIRED = [
"AGENTS.md",
"INDEX.md",
"README.md",
"WORKBOARD.yaml",
"manifest.yaml",
"AGENT.md",
"CONTRACT.md",
"DISCLOSURE.md",
"docs/STEEV-MASTER.md",
"docs/contracts/personal-agent-profile-surface-contract.json",
"docs/contracts/personal-agent-bluebubbles-binding.json",
"docs/contracts/personal-agent-proton-rclone-package.json",
"docs/contracts/personal-agent-secondbrain-proposal-route.json",
"docs/contracts/personal-agent-conductor-curator-service-handoff.json",
"docs/contracts/personal-agent-runtime-readiness-snapshot.json",
"docs/contracts/personal-agent-desktop-exposure-contract.json",
"docs/evidence/2026-06-14-personal-agent-proton-rclone-runtime-reconciliation.md",
"docs/evidence/2026-06-15-personal-agent-core-seed-readiness-reconciliation.md",
"docs/evidence/2026-06-15-personal-agent-current-governed-boundary.md",
"docs/prd/2026-06-14-personal-agent-context-runtime-prd.md",
"docs/issues/2026-06-14-personal-agent-context-runtime-work-orders.md",
"docs/supersession/2026-06-14-personal-agent-context-runtime-supersession-register.md",
]
# Surface names that must appear in the "surfaces" list of the profile
# surface contract; absent names are reported as "contract_surfaces_missing".
REQUIRED_SURFACES = {
"imessage.read",
"mail.read",
"mail.draft",
"mail.send_with_confirmation",
"calendar.read",
"calendar.propose_event",
"calendar.write_with_confirmation",
"contacts.read",
"contacts.write_with_confirmation",
"drive.read",
"drive.write_with_confirmation",
"browser.host_runtime.full_control",
}
# Terms that must be listed under
# proof_redaction_policy.forbidden_in_core_or_proof in the profile surface
# contract; absent terms are reported as "contract_redaction_missing".
REQUIRED_REDACTION_TERMS = {
"raw_messages",
"mail_bodies",
"contact_details",
"calendar_event_details",
"drive_file_names",
"endpoint_payloads",
"credentials",
"cookies",
"keychain_values",
"password_manager_values",
"secret_values",
}
# Surface names that must appear in the "surfaces" list of the proton-rclone
# package contract; absent names are reported as
# "proton_rclone_surfaces_missing".
REQUIRED_PROTON_RCLONE_SURFACES = {
"mail.read",
"mail.draft",
"mail.send_with_confirmation",
"calendar.read",
"calendar.propose_event",
"calendar.write_with_confirmation",
"contacts.read",
"contacts.write_with_confirmation",
"drive.read",
"drive.write_with_confirmation",
}
# Effects that must show up in the union of "denied_effects" across all
# proton-rclone surfaces (not necessarily on every individual surface).
REQUIRED_PROTON_RCLONE_DENIED_EFFECTS = {
"send_without_confirmation",
"calendar_write_without_confirmation",
"contact_mutation_without_confirmation",
"drive_delete",
"orgbrain_write",
}
# "source_surface" values that the secondbrain proposal route must cover in
# its "source_routes"; absent values are reported as
# "memory_route_source_surfaces_missing".
REQUIRED_MEMORY_SOURCE_SURFACES = {
"imessage.read",
"mail.read",
"calendar.read",
"contacts.read",
"drive.read",
}
# Field names that must be present in proposal_envelope_contract.required_fields
# of the secondbrain proposal route.
REQUIRED_PROPOSAL_ENVELOPE_FIELDS = {
"schema_version",
"proposal_id",
"profile_identity",
"human_authority_principal",
"target_memory_domain",
"target_domain_term",
"source_capability_package",
"source_surface",
"proposal_type",
"target_lifecycle_state",
"source_handle_redacted",
"content_digest",
"redacted_summary",
"changed_fields",
"validator_plan",
"rollback_note",
"approval_state",
"proof_redaction",
}
# NOTE(review): the remaining sets in this group are not referenced in the
# portion of main() visible here; presumably they drive the handoff,
# runtime-readiness and desktop-exposure contract checks - confirm against
# the rest of the file.
REQUIRED_HANDOFF_SERVICES = {
"personal-agent.imessage.bluebubbles.readonly",
"personal-agent.proton-rclone.package-candidate",
"personal-agent.secondbrain.proposal-route",
}
# Same members as REQUIRED_MEMORY_SOURCE_SURFACES, kept as a separate constant.
REQUIRED_RUNTIME_SURFACES = {
"imessage.read",
"mail.read",
"calendar.read",
"contacts.read",
"drive.read",
}
REQUIRED_RUNTIME_GAPS = {
"proton-runtime-gate-repair-source-lock-refresh",
"proton-rclone-service-posture-disabled",
"proton-bridge-native-units-disabled-docker-route-active",
"stale-protonmail-bridge-container",
"proton-rclone-child-registered-core-s606",
"seed-local-acceptance-proven",
"proton-suite-provider-smoke-blocked",
"profile-exposure-route-required",
"longer-standing-runtime-proof-beyond-three-poll",
"secondbrain-apply-blocked",
"desktop-adapter-exposure-blocked",
}
REQUIRED_ADAPTER_SURFACES = {
"package.status",
"runtime.health",
"onboarding.state",
"profile.distribution",
"capability.catalog",
}
# Same five state names that main() requires in the contract's
# "readiness_states" and accepts as proton-rclone surface readiness values.
REQUIRED_DESKTOP_STATE_VOCABULARY = {
"ready",
"degraded",
"pending",
"blocked",
"disabled",
}
REQUIRED_DESKTOP_ROWS = {
"personal-agent.profile",
"personal-agent.imessage.read",
"personal-agent.mail.read",
"personal-agent.calendar.read",
"personal-agent.contacts.read",
"personal-agent.drive.read",
"personal-agent.seed-local-acceptance",
"personal-agent.proton-suite.provider-smoke",
"personal-agent.secondbrain.proposal",
"personal-agent.browser.host-runtime",
"personal-agent.write-actions",
}
# Maps "../"-prefixed paths of files in sibling workspaces to 64-character
# hex strings.
# NOTE(review): presumably the expected SHA-256 digest of each file (see
# sha256_file / resolve_external); the comparison is not visible in this
# part of the file - confirm.
GOVERNED_BOUNDARY_SOURCE_LOCKS = {
"../core/.sot/08-OUTPUTS/2026-06-15-s654-seed-main-branch-authority-gate-pickup.json": "d92e77e23418b67e27cc3058a9f415a9b4a023cfcd71e4465cbe859df9b8d7e4",
"../core/tools/check_personal_agent_21_seed_main_branch_authority_gate_pickup.py": "227eca8a3711a765994d0c5f6d4a430d8cc50719bf0e35ba340b9665baec665a",
"../seed/outputs/research/2026-06-15-cortex-os-seed-main-branch-authority-gate.json": "12515390f89263318f853c26918155b36376f7b976009101a026043d4d3c2379",
"../seed/tools/validate_cortex_os_seed_main_branch_authority_gate.py": "b7ce32bcfe48e8e568280c1659c09ec46729af8aa7d3c9e6433fb028506847e1",
"../proton-rclone/contracts/personal-agent-proton-suite-health-contract.json": "ec835d487aae52fe0aa251076caafbdb1fc7b7ec7a4923ca89de8c246f87495f",
"../proton-rclone/.sot/08-OUTPUTS/proton-suite-redacted-health-panel.json": "0cb6938f00618fa794081a04a45ecc258e14e9f31ded990d67845dd35f0f1207",
"../proton-rclone/.sot/08-OUTPUTS/proton-suite-health-panel-proof.json": "03ece893a3c7678365741cfdd01cb2c6cc2c30c20519e5d8649c25afac5ce31b",
}
# Per-path sets of additional hex digests, keyed by a path that also appears
# in GOVERNED_BOUNDARY_SOURCE_LOCKS.
# NOTE(review): the name suggests these are alternative digests accepted
# after a source file was repaired; usage is not visible here - confirm.
GOVERNED_BOUNDARY_REPAIRED_SOURCE_HASHES: dict[str, set[str]] = {
"../core/tools/check_personal_agent_21_seed_main_branch_authority_gate_pickup.py": {
"20fffdb88f1e7a023e715465aa944c33201bc83ccff218833d6ba72f900f0944",
},
}
# Text fragments expected in a governed-boundary document.
# NOTE(review): not referenced in the visible part of main(); presumably
# checked against the "current-governed-boundary" evidence file listed in
# REQUIRED - confirm. "No Core mutation" is a prefix of the entry that
# follows it, so the shorter entry is implied by the longer one.
GOVERNED_BOUNDARY_SNIPPETS = [
"PACR-015",
"Core S654",
"fd880ef15232895da05bc31ae4449e32418190ec",
"56a1a36cc51d3cd084a65e01eb77210f58d7b6fd",
"must not be used for current branch authority",
"Proton Suite health-panel architecture may inform Keyvault successor work",
"future-governed-route",
"No Core mutation",
"No Core mutation, Seed mutation, Proton mutation, Keyvault mutation",
"broad goal-completion claim",
]
# Fragments that AGENTS.md must contain; main() tests each with has_snippet
# (whitespace-insensitive) and reports "agents_missing:<snippet>" otherwise.
AGENT_CONTRACT_SNIPPETS = [
"# Steev Profile Endgoal",
"## Universal Cortex OS Agent Contract",
"## Repo-Custom Agent Contract",
"child-local personal-agent profile workspace",
"not Cortex OS Core authority",
"Profile Exposure authority",
"memory-domain authority",
"browser-host authority",
"Do not install or start Steev",
"read raw messages",
"read mail bodies",
"write durable memory",
"python3 tools/validate_steev_child.py",
]
# Fragments that README.md must contain; main() tests each with has_snippet
# and reports "readme_missing:<snippet>" otherwise.
README_CONTRACT_SNIPPETS = [
"## Cortex OS Boundary",
"child-local personal-agent profile workspace",
"does not own Core truth",
"Profile Exposure authority",
"credential authority",
"send authority",
"memory-domain authority",
"Do not install or start Steev",
"mutate `~/.hermes`",
"read raw messages",
"write durable memory",
]
def has_snippet(text: str, snippet: str) -> bool:
    """Report whether *snippet* occurs in *text*, ignoring whitespace layout.

    Every run of whitespace in both strings is collapsed to a single space
    before the substring test, so a phrase that is wrapped across lines or
    padded with extra spaces still matches. Leading and trailing whitespace
    is collapsed, not stripped.
    """
    collapse = re.compile(r"\s+").sub
    needle = collapse(" ", snippet)
    haystack = collapse(" ", text)
    return needle in haystack
def read_text(rel: str) -> str:
    """Return the UTF-8 decoded contents of the file at *rel* under ROOT."""
    target = ROOT / rel
    return target.read_text(encoding="utf-8")
def umbrella_root() -> Path:
    """Return the directory that "../"-relative paths are resolved against.

    Normally this is the parent of the resolved ROOT. When ROOT sits directly
    inside a directory named "worktrees", the result is three levels above
    ROOT instead.
    """
    repo = ROOT.resolve()
    container = repo.parent
    if container.name != "worktrees":
        return container
    # ROOT is <something>/worktrees/<name>: step past "worktrees" and the
    # directory that holds it.
    return container.parent.parent
def resolve_external(rel: str) -> Path:
    """Turn a contract-style relative path into a filesystem path.

    Paths beginning with "../" are joined onto umbrella_root() with that
    prefix removed; every other path is joined onto ROOT.
    """
    parent_prefix = "../"
    if not rel.startswith(parent_prefix):
        return ROOT / rel
    return umbrella_root() / rel[len(parent_prefix):]
def sha256_file(path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the bytes stored at *path*."""
    hasher = hashlib.sha256()
    hasher.update(path.read_bytes())
    return hasher.hexdigest()
def load_contract(errors: list[str]) -> dict:
    """Load the personal-agent profile surface contract.

    Delegates to load_json with the contract's fixed repository-relative
    path; *errors* is passed through so parse failures are recorded there.
    """
    contract_rel = "docs/contracts/personal-agent-profile-surface-contract.json"
    return load_json(contract_rel, errors)
def load_json(rel: str, errors: list[str]) -> dict:
    """Load the JSON object stored at *rel* under ROOT.

    Args:
        rel: Path of the JSON file, relative to ROOT.
        errors: List that receives an error code when the file exists but
            cannot be used.

    Returns:
        The decoded object. An empty dict is returned when the file is
        absent, cannot be decoded as UTF-8, is not valid JSON, or has a
        top-level value that is not a JSON object.
    """
    path = ROOT / rel
    if not path.exists():
        # Absence is not recorded here; main() reports missing REQUIRED paths.
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        errors.append(f"json_invalid:{rel}:{exc.lineno}:{exc.colno}")
        return {}
    except UnicodeDecodeError as exc:
        # Report undecodable bytes as a validation error instead of crashing.
        errors.append(f"json_not_utf8:{rel}:{exc.start}")
        return {}
    if not isinstance(payload, dict):
        # Callers use .get() on the result; a list or scalar at the top level
        # would otherwise raise AttributeError far from the cause.
        errors.append(f"json_not_object:{rel}:{type(payload).__name__}")
        return {}
    return payload
def main() -> int:
errors: list[str] = []
for rel in REQUIRED:
if not (ROOT / rel).exists():
errors.append(f"missing:{rel}")
board = ROOT / "WORKBOARD.yaml"
if board.exists():
text = board.read_text(encoding="utf-8")
for snippet in [
"STEEV-WORK-001",
"PACR-001",
"PACR-002",
"PACR-003",
"PACR-004",
"PACR-005",
"PACR-006",
"PACR-007",
"PACR-008",
"PACR-010",
"PACR-011",
"PACR-012",
"PACR-013",
"PACR-014",
"PACR-015",
"STEEV-WORK-003",
"STEEV-WORK-004",
"Steev Agent Contract Enforcement",
"Steev Navigation Index",
"source: INDEX.md",
"status: validated",
"owner: \"\"",
]:
if snippet not in text:
errors.append(f"workboard_missing:{snippet}")
agents = ROOT / "AGENTS.md"
if agents.exists():
text = agents.read_text(encoding="utf-8")
for snippet in AGENT_CONTRACT_SNIPPETS:
if not has_snippet(text, snippet):
errors.append(f"agents_missing:{snippet}")
readme = ROOT / "README.md"
if readme.exists():
text = readme.read_text(encoding="utf-8")
for snippet in README_CONTRACT_SNIPPETS:
if not has_snippet(text, snippet):
errors.append(f"readme_missing:{snippet}")
index = ROOT / "INDEX.md"
if index.exists():
text = index.read_text(encoding="utf-8")
for snippet in [
"Route: `steev`.",
"Category: child-local `personal-agent` profile workspace",
"Steev owns child-local profile identity",
"Steev is not Core authority, Runtime authority, Profile Exposure authority",
"Do not import raw messages, mail bodies, contacts",
"Stage: CLEAN.",
"Clean score: 100.",
"no readiness claim from local validation alone",
]:
if not has_snippet(text, snippet):
errors.append(f"index_missing:{snippet}")
manifest = ROOT / "manifest.yaml"
if manifest.exists():
data = yaml.safe_load(manifest.read_text(encoding="utf-8")) or {}
if data.get("profile_identity") != "personal-agent":
errors.append("manifest_profile_identity_not_personal_agent")
if data.get("display_name") != "Steev":
errors.append("manifest_display_name_not_steev")
if data.get("profile") != "steev":
errors.append("manifest_distribution_alias_not_steev")
contract = load_contract(errors)
if contract:
if contract.get("profile_identity") != "personal-agent":
errors.append("contract_profile_identity_not_personal_agent")
if contract.get("display_name") != "Steev":
errors.append("contract_display_name_not_steev")
if contract.get("distribution_alias") != "steev":
errors.append("contract_distribution_alias_not_steev")
memory = contract.get("memory_policy", {})
if memory.get("allowed_target") != "secondbrain-personal":
errors.append("contract_memory_allowed_target_not_secondbrain_personal")
if "orgbrain" not in memory.get("forbidden_targets", []):
errors.append("contract_memory_orgbrain_not_forbidden")
if "proposal-only" not in memory.get("durable_write_policy", ""):
errors.append("contract_memory_not_proposal_only")
credential = contract.get("credential_policy", {})
if credential.get("mode") != "keyvault-reference-names-only":
errors.append("contract_credential_policy_not_keyvault_refs_only")
redaction = set(contract.get("proof_redaction_policy", {}).get("forbidden_in_core_or_proof", []))
missing_redaction = sorted(REQUIRED_REDACTION_TERMS - redaction)
if missing_redaction:
errors.append(f"contract_redaction_missing:{','.join(missing_redaction)}")
states = set(contract.get("readiness_states", []))
for state in ["ready", "degraded", "pending", "blocked", "disabled"]:
if state not in states:
errors.append(f"contract_readiness_state_missing:{state}")
surfaces = contract.get("surfaces", [])
surface_names = {surface.get("name") for surface in surfaces}
missing_surfaces = sorted(REQUIRED_SURFACES - surface_names)
if missing_surfaces:
errors.append(f"contract_surfaces_missing:{','.join(missing_surfaces)}")
for surface in surfaces:
name = surface.get("name", "<unknown>")
if not surface.get("capability_package"):
errors.append(f"surface_missing_capability_package:{name}")
if not isinstance(surface.get("allowed_effects"), list):
errors.append(f"surface_allowed_effects_not_list:{name}")
denied = surface.get("denied_effects")
if not isinstance(denied, list):
errors.append(f"surface_denied_effects_not_list:{name}")
elif "orgbrain_write" not in denied:
errors.append(f"surface_orgbrain_write_not_denied:{name}")
if not surface.get("confirmation"):
errors.append(f"surface_missing_confirmation_policy:{name}")
binding = load_json("docs/contracts/personal-agent-bluebubbles-binding.json", errors)
if binding:
if binding.get("profile_identity") != "personal-agent":
errors.append("bluebubbles_binding_profile_identity_not_personal_agent")
if binding.get("surface") != "imessage.read":
errors.append("bluebubbles_binding_surface_not_imessage_read")
package = binding.get("capability_package", {})
if package.get("id") != "bluebubbles":
errors.append("bluebubbles_binding_package_not_bluebubbles")
if package.get("package_surface") != "bluebubbles.imessage.readonly":
errors.append("bluebubbles_binding_package_surface_not_readonly")
if package.get("live_connector") != "hermes-agent":
errors.append("bluebubbles_binding_live_connector_not_hermes")
if package.get("profile_local_connector_allowed") is not False:
errors.append("bluebubbles_binding_profile_local_connector_not_denied")
if package.get("duplicate_connector_allowed") is not False:
errors.append("bluebubbles_binding_duplicate_connector_not_denied")
policy = binding.get("binding_policy", {})
for key in [
"profile_consumes_package",
"package_owns_runtime_wrapper",
"package_owns_readonly_adapter",
"package_owns_redacted_health",
"package_owns_seed_candidate",
"profile_owns_surface_exposure",
]:
if policy.get(key) is not True:
errors.append(f"bluebubbles_binding_policy_not_true:{key}")
if policy.get("profile_runtime_readiness_claimed") is not False:
errors.append("bluebubbles_binding_profile_runtime_readiness_claimed")
memory = binding.get("memory_policy", {})
if memory.get("target") != "secondbrain-personal":
errors.append("bluebubbles_binding_memory_target_not_secondbrain_personal")
if "orgbrain" not in memory.get("forbidden", []):
errors.append("bluebubbles_binding_orgbrain_not_forbidden")
if "proposal-only" not in memory.get("durable_write_policy", ""):
errors.append("bluebubbles_binding_memory_not_proposal_only")
denied = set(binding.get("denied_effects", []))
for effect in [
"send_message",
"read_receipt",
"mark_read",
"attachment_content_download",
"secondbrain_durable_write",
"orgbrain_write",
"browser_full_control",
]:
if effect not in denied:
errors.append(f"bluebubbles_binding_denied_effect_missing:{effect}")
forbidden_fields = set(binding.get("proof_policy", {}).get("forbidden_fields", []))
for field in ["raw_messages", "message_text", "endpoint_payloads", "credentials", "secret_values"]:
if field not in forbidden_fields:
errors.append(f"bluebubbles_binding_forbidden_field_missing:{field}")
evidence = binding.get("bluebubbles_package_evidence", {})
if evidence.get("validator_command") != "python3 tools/validate_bluebubbles_child.py":
errors.append("bluebubbles_binding_validator_command_missing")
if evidence.get("validator_result_observed") != "ok":
errors.append("bluebubbles_binding_validator_result_not_ok")
if evidence.get("runtime_claims_from_validator") is not False:
errors.append("bluebubbles_binding_runtime_claims_not_false")
refs = set(evidence.get("referenced_artifacts", []))
for ref in [
"contracts/personal-agent-imessage-readonly-contract.json",
"contracts/runtime-compliance-boundary.json",
"contracts/secondbrain-proposal-envelope-contract.json",
"../secondbrain/docs/integration/2026-06-14-secondbrain-personal-agent-imessage-apply-contract.md",
]:
if ref not in refs:
errors.append(f"bluebubbles_binding_reference_missing:{ref}")
proton = load_json("docs/contracts/personal-agent-proton-rclone-package.json", errors)
if proton:
if proton.get("schema_version") != "personal-agent-proton-rclone-package/v1":
errors.append("proton_rclone_schema_version_invalid")
if proton.get("status") != "registered-child-local-package-degraded":
errors.append("proton_rclone_status_not_registered_child_local_degraded")
if proton.get("package_id") != "proton-rclone":
errors.append("proton_rclone_package_id_invalid")
if proton.get("profile_identity") != "personal-agent":
errors.append("proton_rclone_profile_identity_not_personal_agent")
if proton.get("display_name") != "Steev":
errors.append("proton_rclone_display_name_not_steev")
if proton.get("child_workspace_registered") is not True:
errors.append("proton_rclone_child_workspace_not_registered")
for key in [
"package_runtime_readiness_claimed",
"profile_runtime_readiness_claimed",
"seed_readiness_claimed",
"core_promotion_claimed",
]:
if proton.get(key) is not False:
errors.append(f"proton_rclone_overclaim:{key}")
if proton.get("child_workspace_candidate_created") is not True:
errors.append("proton_rclone_child_candidate_not_created")
candidate = proton.get("child_workspace_candidate", {})
expected_candidate = {
"path": "../proton-rclone",
"commit": "f8403f1e5927933a0a5e283d2020119336e4e5e7",
"validator_command": "python3 tools/validate_proton_rclone_child.py",
"validator_result_observed": "ok",
"core_registration_candidate_packet": "../proton-rclone/.sot/08-OUTPUTS/proton-rclone-core-registration-candidate-packet.json",
"live_redacted_health_proof": "../proton-rclone/.sot/08-OUTPUTS/proton-rclone-live-redacted-health.json",
"runtime_gate_repair_proof": "../proton-rclone/.sot/08-OUTPUTS/proton-rclone-runtime-gate-repair-proof.json",
"bridge_unit_convergence_proof": "../proton-rclone/.sot/08-OUTPUTS/proton-rclone-bridge-unit-convergence-proof.json",
"current_runtime_state_reconciliation": "../proton-rclone/.sot/08-OUTPUTS/proton-rclone-current-runtime-state-reconciliation.json",
"core_registration_pickup": "../proton-rclone/.sot/08-OUTPUTS/proton-rclone-core-registration-pickup.json",
"core_s606_registration_output": "../core/.sot/08-OUTPUTS/2026-06-14-s606-proton-rclone-child-registration.json",
"core_s641_governance_pickup": "../core/.sot/08-OUTPUTS/2026-06-15-s641-proton-suite-governance-pickup.json",
"core_s642_seed_refresh_pickup": "../core/.sot/08-OUTPUTS/2026-06-15-s642-seed-proton-suite-refresh-pickup.json",
"core_s643_seed_validator_repair_pickup": "../core/.sot/08-OUTPUTS/2026-06-15-s643-seed-personal-agent-validator-repair-pickup.json",
"seed_final_acceptance_gate": "../seed/outputs/research/2026-06-14-cortex-os-seed-personal-agent-final-full-tool-acceptance-gate.json",
"seed_boundary_decision": "../seed/outputs/research/2026-06-14-cortex-os-seed-personal-agent-core-promotion-productization-boundary-decision.json",
"seed_objective_completion_audit": "../seed/outputs/research/2026-06-14-cortex-os-seed-personal-agent-objective-completion-audit.json",
}
for key, expected in expected_candidate.items():
if candidate.get(key) != expected:
errors.append(f"proton_rclone_child_candidate_mismatch:{key}")
expected_hashes = {
"readonly_contract": "d233a763ddb4fa49f5ff0bff02f5ec28595539375a735585902e535452f18686",
"live_redacted_health": "eebbb75e69c407f6b1a82fc847c30185bfa3b28d95848ea501333141a3c50edf",
"runtime_gate_repair_proof": "e9ebe2268209b6e9262a2d651d0baf9170c710e425fc591891f8b4ed81f21fbb",
"current_runtime_state_reconciliation": "4562a62053ef4805833a41e9bba744ecf5ee9698d325f90b4a98191fe7aa579c",
"bridge_unit_convergence_proof": "8a7c07e331ff3b49ff5462caa9a691fd29f6e4db7fb4c968e8a44a99b152c46b",
"core_registration_pickup": "d7ebfa239026b4e6d2667f4337ae7acaf763251ee11123f8974581137f34aa46",
"core_s606_registration_output": "ff7e0f93a705ce9149d48879a4a00f30ad5abf5903d569a738ba7f26ccc60d59",
"core_s641_governance_pickup": "224b12db17306764208cc16ae6d8dc3df342c77c05c0cba65df11d7ba20b0de6",
"core_s642_seed_refresh_pickup": "b3604875422663033772ba09a1a96e6152b654bcb020d1acc2dc6ccb9f44541f",
"core_s643_seed_validator_repair_pickup": "c378f7e25c5cd2668060aada18f3a8a0ebdceb76c30431cae48e109e41610c5c",
"seed_final_acceptance_gate": "1d56599c5fbc763e95a5734fa4a507767371189c56ec26f0da36b232f12f4869",
"seed_boundary_decision": "230accd38c9608656935858db576d5b1b19d71184387ef9015d6b7945c0ae136",
"seed_objective_completion_audit": "5bda7600319daee01348870bbe3c7cb716457f5507cdac974adb614540e08951",
}
hashes = candidate.get("source_hashes", {})
for key, expected in expected_hashes.items():
if hashes.get(key) != expected:
errors.append(f"proton_rclone_child_hash_mismatch:{key}")
if candidate.get("core_registration_claimed") is not True:
errors.append("proton_rclone_child_candidate_core_registration_not_claimed")
if candidate.get("runtime_readiness_claimed") is not False:
errors.append("proton_rclone_child_candidate_runtime_overclaim")
boundary = proton.get("authority_boundary", {})
if boundary.get("profile_owns_surface_exposure") is not True:
errors.append("proton_rclone_profile_surface_boundary_missing")
if boundary.get("package_candidate_owns_runtime_inventory") is not True:
errors.append("proton_rclone_inventory_boundary_missing")
if boundary.get("legacy_repositories_are_reference_only") is not True:
errors.append("proton_rclone_legacy_reference_boundary_missing")
if boundary.get("duplicate_profile_local_connectors_allowed") is not False:
errors.append("proton_rclone_duplicate_connectors_allowed")
memory = proton.get("memory_policy", {})
if memory.get("target") != "secondbrain-personal":
errors.append("proton_rclone_memory_target_not_secondbrain_personal")
if "orgbrain" not in memory.get("forbidden", []):
errors.append("proton_rclone_orgbrain_not_forbidden")
if "proposal-only" not in memory.get("durable_write_policy", ""):
errors.append("proton_rclone_memory_not_proposal_only")
credential = proton.get("credential_policy", {})
if credential.get("mode") != "keyvault-reference-names-only":
errors.append("proton_rclone_credential_mode_invalid")
if credential.get("secret_values_in_contract") is not False:
errors.append("proton_rclone_secret_values_allowed")
if credential.get("credential_mutation_allowed") is not False:
errors.append("proton_rclone_credential_mutation_allowed")
surfaces = proton.get("surfaces", [])
surface_names = {surface.get("name") for surface in surfaces}
missing_surfaces = sorted(REQUIRED_PROTON_RCLONE_SURFACES - surface_names)
if missing_surfaces:
errors.append(f"proton_rclone_surfaces_missing:{','.join(missing_surfaces)}")
readiness_by_surface = {surface.get("name"): surface.get("readiness") for surface in surfaces}
if readiness_by_surface.get("drive.read") != "degraded":
errors.append("proton_rclone_drive_read_not_degraded")
for disabled_surface in [
"mail.send_with_confirmation",
"calendar.write_with_confirmation",
"contacts.write_with_confirmation",
"drive.write_with_confirmation",
]:
if readiness_by_surface.get(disabled_surface) != "disabled":
errors.append(f"proton_rclone_confirmation_surface_not_disabled:{disabled_surface}")
observed_denied: set[str] = set()
for surface in surfaces:
name = surface.get("name", "<unknown>")
if not surface.get("runtime_route"):
errors.append(f"proton_rclone_surface_missing_runtime_route:{name}")
if surface.get("readiness") not in {"ready", "degraded", "pending", "blocked", "disabled"}:
errors.append(f"proton_rclone_surface_readiness_invalid:{name}")
if not isinstance(surface.get("allowed_effects"), list):
errors.append(f"proton_rclone_allowed_effects_not_list:{name}")
denied = surface.get("denied_effects")
if not isinstance(denied, list):
errors.append(f"proton_rclone_denied_effects_not_list:{name}")
else:
observed_denied.update(denied)
if "orgbrain_write" not in denied:
errors.append(f"proton_rclone_orgbrain_write_not_denied:{name}")
if not surface.get("confirmation"):
errors.append(f"proton_rclone_confirmation_missing:{name}")
missing_denied = sorted(REQUIRED_PROTON_RCLONE_DENIED_EFFECTS - observed_denied)
if missing_denied:
errors.append(f"proton_rclone_denied_effects_missing:{','.join(missing_denied)}")
inventory = proton.get("runtime_inventory", {})
if inventory.get("overall_state") != "degraded":
errors.append("proton_rclone_inventory_overall_state_not_degraded")
if "MCP facades" not in inventory.get("chosen_runtime_path", ""):
errors.append("proton_rclone_inventory_chosen_path_missing_mcp")
mcp = {item.get("name"): item.get("observed_status") for item in inventory.get("mcp_servers", [])}
for name in ["proton-calendar", "proton-email", "proton-contacts"]:
if mcp.get(name) != "enabled":
errors.append(f"proton_rclone_mcp_not_enabled:{name}")
docker = {item.get("name"): item.get("observed_state") for item in inventory.get("docker_routes", [])}
for name in ["sdo-calendar-gate", "sdo-email-gate", "sdo-contacts-gate"]:
if name not in docker:
errors.append(f"proton_rclone_docker_route_missing:{name}")
if docker.get("sdo-email-gate") != "up":
errors.append("proton_rclone_email_gate_state_not_up")
if docker.get("sdo-contacts-gate") != "up":
errors.append("proton_rclone_contacts_gate_state_not_up")
units = {item.get("name"): item for item in inventory.get("systemd_user_units", [])}
for unit in ["proton-bridge.service", "proton-bridge-proxy.service", "rclone-rc.service", "rclone-proxy.service"]:
if units.get(unit, {}).get("unit_file_state") != "disabled":
errors.append(f"proton_rclone_unit_not_disabled:{unit}")
for unit in ["proton-bridge.service", "proton-bridge-proxy.service"]:
if units.get(unit, {}).get("observed_state") != "inactive-dead":
errors.append(f"proton_rclone_bridge_unit_not_inactive:{unit}")
rclone = inventory.get("rclone", {})
if rclone.get("remote") != "proton:":
errors.append("proton_rclone_remote_missing")
if rclone.get("about_probe") != "ok-redacted":
errors.append("proton_rclone_about_probe_not_redacted_ok")
if rclone.get("file_names_observed") is not False:
errors.append("proton_rclone_file_names_observed")
if rclone.get("file_contents_observed") is not False:
errors.append("proton_rclone_file_contents_observed")
legacy_states = {item.get("path"): item.get("state") for item in proton.get("legacy_sources", [])}
for path in [
"/home/svrnty/workspaces/cortex/L4-svrnty.api-proton",
"/home/svrnty/workspaces/cortex/L4-svrnty.tool-storage",
"/home/svrnty/workspaces/cortex/L5-vendor.lib-proton-bridge",
"/home/svrnty/workspaces/cortex/L6-vendor.lib-proton-api",
"/home/svrnty/workspaces/cortex/L6-vendor.lib-rclone",
]:
if legacy_states.get(path) != "legacy-reference":
errors.append(f"proton_rclone_legacy_source_not_reference:{path}")
duplicate_states = {item.get("id"): item.get("state") for item in proton.get("duplicate_skill_policy", [])}
for skill in ["skills/proton-tools", "proton-access", "proton-mail-operations", "proton-services"]:
state = duplicate_states.get(skill, "")
if "superseded" not in state:
errors.append(f"proton_rclone_duplicate_skill_not_superseded:{skill}")
forbidden_fields = set(proton.get("proof_policy", {}).get("forbidden_fields", []))
for field in [
"mail_bodies",
"mail_subjects",
"contact_details",
"calendar_event_details",
"drive_file_names",
"drive_file_contents",
"endpoint_payloads",
"credentials",
"secret_values",
]:
if field not in forbidden_fields:
errors.append(f"proton_rclone_forbidden_field_missing:{field}")
commands = set(proton.get("observed_commands", []))
for command in [
"hermes -p steev mcp list",
"rclone --config /home/svrnty/.config/rclone/rclone.conf listremotes",
"rclone --config /home/svrnty/.config/rclone/rclone.conf about proton: --json",
]:
if command not in commands:
errors.append(f"proton_rclone_observed_command_missing:{command}")
remaining_gates = proton.get("remaining_gates", {})
if remaining_gates.get("child_workspace_candidate") != "complete-child-local":
errors.append("proton_rclone_child_candidate_gate_missing")
if remaining_gates.get("registered_child_workspace") != "complete-core-s606-child-local":
errors.append("proton_rclone_registered_child_gate_missing")
if remaining_gates.get("email_gate_repair") != "complete-child-local":
errors.append("proton_rclone_email_gate_repair_not_complete")
if remaining_gates.get("contacts_gate_repair") != "complete-child-local":
errors.append("proton_rclone_contacts_gate_repair_not_complete")
if remaining_gates.get("seed_personal_agent_local_acceptance") != "complete-governed-local-jp-only":
errors.append("proton_rclone_seed_local_acceptance_gate_missing")
if remaining_gates.get("proton_suite_provider_smoke") != "blocked-follow-up":
errors.append("proton_rclone_provider_smoke_gate_missing")
if remaining_gates.get("proton_suite_seed_package_pickup") != "blocked-provider-smoke":
errors.append("proton_rclone_suite_seed_pickup_gate_missing")
if remaining_gates.get("secondbrain_durable_apply") != "blocked-follow-up":
errors.append("proton_rclone_secondbrain_durable_apply_gate_missing")
if remaining_gates.get("systemd_bridge_convergence") != "complete-child-local-docker-route-active":
errors.append("proton_rclone_bridge_convergence_not_complete")
if remaining_gates.get("seed_package_pickup") != "complete-governed-local-personal-agent":
errors.append("proton_rclone_seed_pickup_gate_invalid")
memory_route = load_json("docs/contracts/personal-agent-secondbrain-proposal-route.json", errors)
if memory_route:
if memory_route.get("schema_version") != "personal-agent-secondbrain-proposal-route/v1":
errors.append("memory_route_schema_version_invalid")
if memory_route.get("status") != "active-profile-memory-proposal-route":
errors.append("memory_route_status_invalid")
if memory_route.get("route_id") != "personal-agent-secondbrain-proposal-route":
errors.append("memory_route_id_invalid")
if memory_route.get("profile_identity") != "personal-agent":
errors.append("memory_route_profile_identity_not_personal_agent")
if memory_route.get("display_name") != "Steev":
errors.append("memory_route_display_name_not_steev")
if memory_route.get("target_memory_domain") != "secondbrain-personal":
errors.append("memory_route_target_not_secondbrain_personal")
if memory_route.get("target_domain_term") != "Personal Memory Domain":
errors.append("memory_route_domain_term_invalid")
if "orgbrain" not in memory_route.get("forbidden_memory_domains", []):
errors.append("memory_route_orgbrain_not_forbidden")
for key in [
"durable_write_allowed",
"direct_write_allowed",
"profile_runtime_readiness_claimed",
"secondbrain_runtime_readiness_claimed",
"seed_readiness_claimed",
]:
if memory_route.get(key) is not False:
errors.append(f"memory_route_overclaim:{key}")
boundary = memory_route.get("authority_boundary", {})
for key in [
"profile_owns_source_surface_exposure",
"secondbrain_owns_personal_memory_domain",
"curator_owns_hygiene_review_queue",
"capability_packages_emit_proposals_only",
]:
if boundary.get(key) is not True:
errors.append(f"memory_route_boundary_missing:{key}")
if boundary.get("apply_owner") != "secondbrain":
errors.append("memory_route_apply_owner_not_secondbrain")
if boundary.get("hygiene_owner") != "curator":
errors.append("memory_route_hygiene_owner_not_curator")
source_routes = memory_route.get("source_routes", [])
source_surface_names = {route.get("source_surface") for route in source_routes}
missing_sources = sorted(REQUIRED_MEMORY_SOURCE_SURFACES - source_surface_names)
if missing_sources:
errors.append(f"memory_route_source_surfaces_missing:{','.join(missing_sources)}")
for route in source_routes:
surface = route.get("source_surface", "<unknown>")
if route.get("capability_package") not in {"bluebubbles", "proton-rclone"}:
errors.append(f"memory_route_bad_capability_package:{surface}")
if not str(route.get("proposal_type", "")).startswith("secondbrain.memory.propose_"):
errors.append(f"memory_route_bad_proposal_type:{surface}")
if route.get("target_lifecycle_state") != "inbox":
errors.append(f"memory_route_lifecycle_not_inbox:{surface}")
if (
surface == "imessage.read"
and route.get("secondbrain_intake_contract")
!= "../secondbrain/docs/integration/2026-06-14-secondbrain-personal-agent-imessage-intake-contract.md"
):
errors.append("memory_route_imessage_intake_contract_missing")
if (
surface == "imessage.read"
and route.get("secondbrain_apply_contract")
!= "../secondbrain/docs/integration/2026-06-14-secondbrain-personal-agent-imessage-apply-contract.md"
):
errors.append("memory_route_imessage_apply_contract_missing")
allowed = route.get("allowed_effects")
if not isinstance(allowed, list) or "emit_redacted_proposal" not in allowed:
errors.append(f"memory_route_redacted_proposal_not_allowed:{surface}")
denied = route.get("denied_effects")
if not isinstance(denied, list):
errors.append(f"memory_route_denied_effects_not_list:{surface}")
else:
for effect in ["durable_memory_write", "orgbrain_write"]:
if effect not in denied:
errors.append(f"memory_route_denied_effect_missing:{surface}:{effect}")
envelope = memory_route.get("proposal_envelope_contract", {})
if envelope.get("schema_version") != "personal-agent.secondbrain.proposal-envelope.v1":
errors.append("memory_route_envelope_schema_invalid")
fields = set(envelope.get("required_fields", []))
missing_fields = sorted(REQUIRED_PROPOSAL_ENVELOPE_FIELDS - fields)
if missing_fields:
errors.append(f"memory_route_envelope_fields_missing:{','.join(missing_fields)}")
if envelope.get("target_memory_domain") != "secondbrain-personal":
errors.append("memory_route_envelope_target_invalid")
if envelope.get("approval_state") != "pending":
errors.append("memory_route_envelope_approval_not_pending")
if envelope.get("raw_payload_in_core_or_profile_proof") is not False:
errors.append("memory_route_envelope_allows_raw_payload_proof")
if envelope.get("durable_apply_authorized_by_envelope") is not False:
errors.append("memory_route_envelope_authorizes_apply")
apply_policy = memory_route.get("apply_policy", {})
if apply_policy.get("apply_route") != "Secondbrain governed memory write path":
errors.append("memory_route_apply_policy_route_invalid")
if apply_policy.get("apply_route_contract") != "../secondbrain/docs/integration/2026-06-14-secondbrain-personal-agent-imessage-apply-contract.md":
errors.append("memory_route_apply_policy_contract_missing")
if apply_policy.get("governed_apply_route_defined") is not True:
errors.append("memory_route_apply_policy_route_not_defined")
if apply_policy.get("apply_allowed_now") is not False:
errors.append("memory_route_apply_allowed_now")
if apply_policy.get("live_apply_executed") is not False:
errors.append("memory_route_live_apply_executed")
if apply_policy.get("durable_apply_without_approval") is not False:
errors.append("memory_route_apply_without_approval")
if apply_policy.get("requires_secondbrain_validator") != "python3 tools/validate_secondbrain_child.py":
errors.append("memory_route_secondbrain_validator_missing")
if apply_policy.get("focused_secondbrain_gate_command") != "python3 tools/check_secondbrain_personal_agent_imessage_intake.py":
errors.append("memory_route_focused_secondbrain_gate_command_missing")
if apply_policy.get("focused_secondbrain_apply_gate_command") != "python3 tools/check_secondbrain_personal_agent_imessage_apply.py":
errors.append("memory_route_focused_secondbrain_apply_gate_command_missing")
for key in [
"requires_focused_secondbrain_gate",
"requires_human_or_governed_approval",
"requires_local_evidence_and_handoff",
]:
if apply_policy.get(key) is not True:
errors.append(f"memory_route_apply_policy_missing:{key}")
if apply_policy.get("push_allowed") is not False:
errors.append("memory_route_apply_policy_allows_push")
cases = {case.get("case"): case.get("result") for case in memory_route.get("rejection_cases", [])}
expected_cases = {
"target_orgbrain": "rejected",
"direct_durable_write": "rejected",
"raw_payload_in_core_or_profile_proof": "rejected",
"apply_without_approval": "blocked",
}
for case, result in expected_cases.items():
if cases.get(case) != result:
errors.append(f"memory_route_rejection_case_missing:{case}")
refs = set(memory_route.get("referenced_secondbrain_contracts", []))
for ref in [
"../secondbrain/docs/integration/2026-06-09-secondbrain-personal-memory-domain-runtime-contract.md",
"../secondbrain/docs/integration/2026-06-09-secondbrain-governed-agent-retrieval-contract.md",
"../secondbrain/docs/integration/2026-06-09-secondbrain-governed-memory-write-path-contract.md",
"../secondbrain/docs/integration/2026-06-09-secondbrain-curator-hygiene-queue-contract.md",
"../secondbrain/docs/integration/2026-06-09-secondbrain-hermes-runtime-boundary.md",
"../secondbrain/docs/integration/2026-06-14-secondbrain-personal-agent-imessage-intake-contract.md",
"../secondbrain/docs/evidence/2026-06-14-secondbrain-personal-agent-imessage-intake-proof.md",
"../secondbrain/docs/integration/2026-06-14-secondbrain-personal-agent-imessage-apply-contract.md",
"../secondbrain/docs/evidence/2026-06-14-secondbrain-personal-agent-imessage-apply-proof.md",
]:
if ref not in refs:
errors.append(f"memory_route_secondbrain_ref_missing:{ref}")
forbidden_fields = set(memory_route.get("proof_policy", {}).get("forbidden_fields", []))
for field in [
"raw_messages",
"message_text",
"mail_bodies",
"contact_details",
"calendar_event_details",
"drive_file_names",
"endpoint_payloads",
"credentials",
"secret_values",
]:
if field not in forbidden_fields:
errors.append(f"memory_route_forbidden_field_missing:{field}")
remaining_gates = memory_route.get("remaining_gates", {})
if remaining_gates.get("secondbrain_governed_apply_route") != "defined-no-live-apply":
errors.append("memory_route_governed_apply_route_not_defined")
for gate in [
"secondbrain_durable_apply",
"curator_hygiene_apply_review",
"desktop_adapter_exposure",
"runtime_health_proof",
"seed_package_pickup",
]:
if remaining_gates.get(gate) != "blocked-follow-up":
errors.append(f"memory_route_remaining_gate_missing:{gate}")
service_handoff = load_json("docs/contracts/personal-agent-conductor-curator-service-handoff.json", errors)
if service_handoff:
if service_handoff.get("schema_version") != "personal-agent-conductor-curator-service-handoff/v1":
errors.append("service_handoff_schema_version_invalid")
if service_handoff.get("status") != "active-profile-service-handoff":
errors.append("service_handoff_status_invalid")
if service_handoff.get("handoff_id") != "personal-agent-conductor-curator-service-handoff":
errors.append("service_handoff_id_invalid")
if service_handoff.get("profile_identity") != "personal-agent":
errors.append("service_handoff_profile_identity_not_personal_agent")
if service_handoff.get("display_name") != "Steev":
errors.append("service_handoff_display_name_not_steev")
for key in [
"core_promotion_claimed",
"seed_readiness_claimed",
"runtime_readiness_claimed",
"desktop_integration_claimed",
]:
if service_handoff.get(key) is not False:
errors.append(f"service_handoff_overclaim:{key}")
boundary = service_handoff.get("authority_boundary", {})
for key in [
"profile_owns_capability_surface_contract",
"conductor_owns_future_route_selection_and_worker_contracts",
"curator_owns_future_hygiene_review_queue",
"secondbrain_owns_personal_memory_domain_apply",
"capability_packages_own_runtime_health",
]:
if boundary.get(key) is not True:
errors.append(f"service_handoff_boundary_missing:{key}")
memory = service_handoff.get("memory_policy", {})
if memory.get("target") != "secondbrain-personal":
errors.append("service_handoff_memory_target_not_secondbrain_personal")
if memory.get("target_domain_term") != "Personal Memory Domain":
errors.append("service_handoff_memory_domain_term_invalid")
if "orgbrain" not in memory.get("forbidden", []):
errors.append("service_handoff_orgbrain_not_forbidden")
if "proposal-only" not in memory.get("durable_write_policy", ""):
errors.append("service_handoff_memory_not_proposal_only")
services = service_handoff.get("service_identities", [])
service_ids = {service.get("service_id") for service in services}
missing_services = sorted(REQUIRED_HANDOFF_SERVICES - service_ids)
if missing_services:
errors.append(f"service_handoff_services_missing:{','.join(missing_services)}")
for service in services:
service_id = service.get("service_id", "<unknown>")
if not service.get("capability_package"):
errors.append(f"service_handoff_service_package_missing:{service_id}")
if not service.get("owner_route"):
errors.append(f"service_handoff_service_owner_missing:{service_id}")
if not service.get("health_shape"):
errors.append(f"service_handoff_service_health_missing:{service_id}")
if not service.get("readiness_state"):
errors.append(f"service_handoff_service_readiness_missing:{service_id}")
allowed = service.get("allowed_effects")
denied = service.get("denied_effects")
if not isinstance(allowed, list) or not allowed:
errors.append(f"service_handoff_allowed_effects_invalid:{service_id}")
if not isinstance(denied, list):
errors.append(f"service_handoff_denied_effects_invalid:{service_id}")
else:
for effect in ["orgbrain_write", "durable_memory_write"]:
if effect not in denied and service_id != "personal-agent.secondbrain.proposal-route":
errors.append(f"service_handoff_denied_effect_missing:{service_id}:{effect}")
if service_id == "personal-agent.secondbrain.proposal-route":
for effect in ["secondbrain_apply", "direct_memory_write", "orgbrain_write"]:
if effect not in denied:
errors.append(f"service_handoff_denied_effect_missing:{service_id}:{effect}")
conductor = service_handoff.get("conductor_handoff", {})
if conductor.get("target_workspace") != "../conductor":
errors.append("service_handoff_conductor_workspace_invalid")
if conductor.get("adoption_status") != "pending-conductor-lane-pickup":
errors.append("service_handoff_conductor_adoption_not_pending")
if conductor.get("validator_command") != "python3 tools/validate_conductor_child.py":
errors.append("service_handoff_conductor_validator_missing")
if conductor.get("validator_result_observed") != "ok":
errors.append("service_handoff_conductor_validator_not_ok")
conductor_forbidden = set(conductor.get("forbidden_conductor_effects", []))
for effect in ["runtime_start", "runtime_stop", "desktop_integration", "core_mutation", "secret_read", "raw_payload_import"]:
if effect not in conductor_forbidden:
errors.append(f"service_handoff_conductor_forbidden_missing:{effect}")
worker_expectations = set(conductor.get("worker_contract_expectations", []))
for expectation in ["one route per worker", "workspace-local validator", "redacted evidence", "no raw personal payloads"]:
if expectation not in worker_expectations:
errors.append(f"service_handoff_worker_expectation_missing:{expectation}")
curator = service_handoff.get("curator_handoff", {})
if curator.get("target_workspace") != "../curator":
errors.append("service_handoff_curator_workspace_invalid")
if curator.get("adoption_status") != "pending-curator-lane-pickup":
errors.append("service_handoff_curator_adoption_not_pending")
if curator.get("validator_command") != "python3 tools/validate_curator_child.py":
errors.append("service_handoff_curator_validator_missing")
if curator.get("validator_result_observed") != "ok":
errors.append("service_handoff_curator_validator_not_ok")
curator_reviews = set(curator.get("allowed_future_reviews", []))
for review in ["novelty_candidate", "stale_candidate", "duplicate_candidate", "supersession_candidate", "archive_candidate"]:
if review not in curator_reviews:
errors.append(f"service_handoff_curator_review_missing:{review}")
curator_forbidden = set(curator.get("forbidden_curator_effects", []))
for effect in ["direct_memory_write", "raw_payload_import", "orgbrain_write", "secret_read", "core_mutation", "seed_mutation"]:
if effect not in curator_forbidden:
errors.append(f"service_handoff_curator_forbidden_missing:{effect}")
apply = service_handoff.get("apply_expectations", {})
if apply.get("target") != "secondbrain-personal":
errors.append("service_handoff_apply_target_invalid")
if apply.get("apply_owner") != "secondbrain":
errors.append("service_handoff_apply_owner_invalid")
if apply.get("hygiene_owner") != "curator":
errors.append("service_handoff_hygiene_owner_invalid")
if apply.get("apply_route_defined") is not True:
errors.append("service_handoff_apply_route_not_defined")
if apply.get("apply_route_contract") != "../secondbrain/docs/integration/2026-06-14-secondbrain-personal-agent-imessage-apply-contract.md":
errors.append("service_handoff_apply_route_contract_missing")
if apply.get("apply_allowed_now") is not False:
errors.append("service_handoff_apply_allowed_now")
if apply.get("live_apply_executed") is not False:
errors.append("service_handoff_live_apply_executed")
if apply.get("durable_apply_without_approval") is not False:
errors.append("service_handoff_apply_without_approval")
for key in ["requires_proposal_envelope", "requires_approval", "requires_redacted_evidence"]:
if apply.get(key) is not True:
errors.append(f"service_handoff_apply_requirement_missing:{key}")
if apply.get("requires_secondbrain_validator") != "python3 tools/validate_secondbrain_child.py":
errors.append("service_handoff_secondbrain_validator_missing")
if "orgbrain" not in apply.get("forbidden_targets", []):
errors.append("service_handoff_apply_orgbrain_not_forbidden")
source_contracts = set(service_handoff.get("source_contracts", []))
for ref in [
"docs/contracts/personal-agent-profile-surface-contract.json",
"docs/contracts/personal-agent-bluebubbles-binding.json",
"docs/contracts/personal-agent-proton-rclone-package.json",
"docs/contracts/personal-agent-secondbrain-proposal-route.json",
"../secondbrain/docs/integration/2026-06-14-secondbrain-personal-agent-imessage-apply-contract.md",
]:
if ref not in source_contracts:
errors.append(f"service_handoff_source_contract_missing:{ref}")
forbidden_fields = set(service_handoff.get("proof_policy", {}).get("forbidden_fields", []))
for field in [
"raw_messages",
"message_text",
"mail_bodies",
"contact_details",
"calendar_event_details",
"drive_file_names",
"endpoint_payloads",
"credentials",
"secret_values",
"raw_transcripts",
]:
if field not in forbidden_fields:
errors.append(f"service_handoff_forbidden_field_missing:{field}")
remaining_gates = service_handoff.get("remaining_gates", {})
if remaining_gates.get("secondbrain_governed_apply_route") != "defined-no-live-apply":
errors.append("service_handoff_governed_apply_route_not_defined")
for gate in [
"conductor_lane_pickup",
"curator_personal_memory_hygiene_lane_pickup",
"secondbrain_durable_apply",
"desktop_adapter_exposure",
"seed_package_pickup",
]:
if remaining_gates.get(gate) != "blocked-follow-up":
errors.append(f"service_handoff_remaining_gate_missing:{gate}")
if remaining_gates.get("runtime_health_proof") != "complete-child-local":
errors.append("service_handoff_runtime_health_proof_not_child_complete")
runtime = load_json("docs/contracts/personal-agent-runtime-readiness-snapshot.json", errors)
if runtime:
if runtime.get("schema_version") != "personal-agent-runtime-readiness-snapshot/v1":
errors.append("runtime_snapshot_schema_version_invalid")
if runtime.get("status") != "active-redacted-runtime-snapshot":
errors.append("runtime_snapshot_status_invalid")
if runtime.get("profile_identity") != "personal-agent":
errors.append("runtime_snapshot_profile_identity_not_personal_agent")
if runtime.get("display_name") != "Steev":
errors.append("runtime_snapshot_display_name_not_steev")
if runtime.get("aggregate_runtime_state") != "degraded":
errors.append("runtime_snapshot_aggregate_not_degraded")
for key in ["runtime_readiness_claimed", "seed_readiness_claimed", "core_promotion_claimed"]:
if runtime.get(key) is not False:
errors.append(f"runtime_snapshot_overclaim:{key}")
if runtime.get("seed_local_acceptance_claimed") is not True:
errors.append("runtime_snapshot_seed_local_acceptance_not_claimed")
if runtime.get("memory_target") != "secondbrain-personal":
errors.append("runtime_snapshot_memory_target_not_secondbrain_personal")
if "orgbrain" not in runtime.get("forbidden_memory_targets", []):
errors.append("runtime_snapshot_orgbrain_not_forbidden")
surfaces = runtime.get("surface_states", [])
surface_names = {surface.get("surface") for surface in surfaces}
missing_surfaces = sorted(REQUIRED_RUNTIME_SURFACES - surface_names)
if missing_surfaces:
errors.append(f"runtime_snapshot_surfaces_missing:{','.join(missing_surfaces)}")
state_by_surface = {surface.get("surface"): surface.get("readiness_state") for surface in surfaces}
if state_by_surface.get("imessage.read") != "ready":
errors.append("runtime_snapshot_imessage_not_ready")
for surface in ["mail.read", "calendar.read", "contacts.read", "drive.read"]:
if state_by_surface.get(surface) != "degraded":
errors.append(f"runtime_snapshot_surface_not_degraded:{surface}")
for surface in surfaces:
name = surface.get("surface", "<unknown>")
if surface.get("readiness_state") not in {"ready", "degraded", "pending", "blocked", "disabled"}:
errors.append(f"runtime_snapshot_readiness_invalid:{name}")
if not surface.get("capability_package"):
errors.append(f"runtime_snapshot_package_missing:{name}")
if not surface.get("health_source"):
errors.append(f"runtime_snapshot_health_source_missing:{name}")
health = surface.get("redacted_health")
if not isinstance(health, dict):
errors.append(f"runtime_snapshot_health_not_object:{name}")
if "raw_mail_observed" in health and health.get("raw_mail_observed") is not False:
errors.append("runtime_snapshot_raw_mail_observed")
if "raw_calendar_events_observed" in health and health.get("raw_calendar_events_observed") is not False:
errors.append("runtime_snapshot_raw_calendar_observed")
if "raw_contacts_observed" in health and health.get("raw_contacts_observed") is not False:
errors.append("runtime_snapshot_raw_contacts_observed")
if "drive_file_names_observed" in health and health.get("drive_file_names_observed") is not False:
errors.append("runtime_snapshot_drive_file_names_observed")
if "drive_file_contents_observed" in health and health.get("drive_file_contents_observed") is not False:
errors.append("runtime_snapshot_drive_file_contents_observed")
imessage_health = next((surface.get("redacted_health", {}) for surface in surfaces if surface.get("surface") == "imessage.read"), {})
if imessage_health.get("validator_ok") is not True:
errors.append("runtime_snapshot_bluebubbles_validator_not_ok")
if imessage_health.get("read_only_imessage") is not True:
errors.append("runtime_snapshot_imessage_not_readonly")
if imessage_health.get("secondbrain_governed_apply_route") != "defined-no-live-apply":
errors.append("runtime_snapshot_secondbrain_apply_route_not_defined")
if imessage_health.get("package_runtime_claims") is not False:
errors.append("runtime_snapshot_bluebubbles_runtime_claim_overclaimed")
posture = runtime.get("supervisor_posture", {})
expected_posture = {
"mac_mini_bluebubbles": "package-validator-ok-redacted",
"proton_bridge_service": "inactive-disabled",
"proton_bridge_proxy_service": "inactive-disabled",
"rclone_rc_service": "disabled-inactive",
"rclone_proxy_service": "disabled-inactive",
}
for key, expected in expected_posture.items():
if posture.get(key) != expected:
errors.append(f"runtime_snapshot_posture_mismatch:{key}")
gaps = {gap.get("id") for gap in runtime.get("named_runtime_gaps", [])}
missing_gaps = sorted(REQUIRED_RUNTIME_GAPS - gaps)
if missing_gaps:
errors.append(f"runtime_snapshot_gaps_missing:{','.join(missing_gaps)}")
reboot = runtime.get("optional_reboot_power_loss_proof", {})
if reboot.get("status") != "not-run":
errors.append("runtime_snapshot_reboot_status_not_not_run")
if reboot.get("required_for_final_always_on_claim") is not True:
errors.append("runtime_snapshot_reboot_not_required_for_final")
commands = set(runtime.get("observed_commands", []))
for command in [
"python3 tools/validate_bluebubbles_child.py",
"hermes -p steev mcp list",
"rclone --config /home/svrnty/.config/rclone/rclone.conf about proton: --json",
]:
if command not in commands:
errors.append(f"runtime_snapshot_observed_command_missing:{command}")
forbidden_fields = set(runtime.get("proof_policy", {}).get("forbidden_fields", []))
for field in [
"raw_messages",
"message_text",
"mail_bodies",
"contact_details",
"calendar_event_details",
"drive_file_names",
"drive_file_contents",
"endpoint_payloads",
"credentials",
"secret_values",
]:
if field not in forbidden_fields:
errors.append(f"runtime_snapshot_forbidden_field_missing:{field}")
remaining_gates = runtime.get("remaining_gates", {})
if remaining_gates.get("secondbrain_governed_apply_route") != "defined-no-live-apply":
errors.append("runtime_snapshot_governed_apply_route_not_defined")
if remaining_gates.get("proton_rclone_child_candidate") != "complete-child-local":
errors.append("runtime_snapshot_child_candidate_gate_missing")
if remaining_gates.get("proton_email_gate_repair") != "complete-child-local":
errors.append("runtime_snapshot_email_gate_repair_not_complete")
if remaining_gates.get("proton_contacts_gate_repair") != "complete-child-local":
errors.append("runtime_snapshot_contacts_gate_repair_not_complete")
if remaining_gates.get("proton_bridge_systemd_convergence") != "complete-child-local-docker-route-active":
errors.append("runtime_snapshot_bridge_convergence_not_complete")
if remaining_gates.get("proton_rclone_child_registration") != "complete-core-s606-child-local":
errors.append("runtime_snapshot_child_registration_gate_missing")
if remaining_gates.get("seed_local_acceptance") != "complete-governed-local-jp-only":
errors.append("runtime_snapshot_seed_local_acceptance_gate_missing")
if remaining_gates.get("proton_suite_provider_smoke") != "blocked-follow-up":
errors.append("runtime_snapshot_provider_smoke_gate_missing")
if remaining_gates.get("profile_exposure_route") != "blocked-core-route-required":
errors.append("runtime_snapshot_profile_exposure_gate_missing")
if remaining_gates.get("longer_standing_runtime_proof") != "follow-up":
errors.append("runtime_snapshot_longer_standing_runtime_gate_missing")
for gate in ["secondbrain_durable_apply", "desktop_adapter_exposure", "final_acceptance_packet"]:
if remaining_gates.get(gate) != "blocked-follow-up":
errors.append(f"runtime_snapshot_remaining_gate_missing:{gate}")
if remaining_gates.get("reboot_power_loss_drill") != "optional-follow-up":
errors.append("runtime_snapshot_reboot_gate_missing")
desktop = load_json("docs/contracts/personal-agent-desktop-exposure-contract.json", errors)
if desktop:
if desktop.get("schema_version") != "personal-agent-desktop-exposure-contract/v1":
errors.append("desktop_exposure_schema_version_invalid")
if desktop.get("status") != "active-profile-desktop-exposure-contract":
errors.append("desktop_exposure_status_invalid")
if desktop.get("contract_id") != "personal-agent-desktop-exposure-contract":
errors.append("desktop_exposure_contract_id_invalid")
if desktop.get("profile_identity") != "personal-agent":
errors.append("desktop_exposure_profile_identity_not_personal_agent")
if desktop.get("display_name") != "Steev":
errors.append("desktop_exposure_display_name_not_steev")
for key in [
"desktop_integration_claimed",
"runtime_readiness_claimed",
"seed_readiness_claimed",
"core_promotion_claimed",
]:
if desktop.get(key) is not False:
errors.append(f"desktop_exposure_overclaim:{key}")
if desktop.get("seed_local_acceptance_claimed") is not True:
errors.append("desktop_exposure_seed_local_acceptance_not_claimed")
if desktop.get("adapter_workspace") != "../cortex-hermes-adapter":
errors.append("desktop_exposure_adapter_workspace_invalid")
if desktop.get("adapter_validator_command") != "python3 tools/validate_cortex_hermes_adapter_child.py":
errors.append("desktop_exposure_adapter_validator_missing")
if desktop.get("adapter_validator_result_observed") != "ok":
errors.append("desktop_exposure_adapter_validator_not_ok")
refs = set(desktop.get("adapter_reference_contracts", []))
for ref in [
"../cortex-hermes-adapter/contracts/desktop-dashboard-host-surface.md",
"../cortex-hermes-adapter/contracts/personal-agent-s518-runtime-host-surface-intake.json",
"../cortex-hermes-adapter/contracts/first-open-evidence.schema.json",
"../cortex-hermes-adapter/dashboard/package-view.sample.json",
]:
if ref not in refs:
errors.append(f"desktop_exposure_adapter_ref_missing:{ref}")
boundary = desktop.get("authority_boundary", {})
for key in [
"profile_owns_desktop_exposure_contract",
"adapter_owns_desktop_rendering",
"seed_owns_package_first_open_proof",
"core_owns_acceptance",
]:
if boundary.get(key) is not True:
errors.append(f"desktop_exposure_boundary_missing:{key}")
if boundary.get("profile_mutates_adapter") is not False:
errors.append("desktop_exposure_profile_mutates_adapter")
adapter_surfaces = set(desktop.get("allowed_adapter_surfaces", []))
if adapter_surfaces != REQUIRED_ADAPTER_SURFACES:
errors.append("desktop_exposure_adapter_surfaces_invalid")
state_vocabulary = set(desktop.get("state_vocabulary", []))
if state_vocabulary != REQUIRED_DESKTOP_STATE_VOCABULARY:
errors.append("desktop_exposure_state_vocabulary_invalid")
rows = desktop.get("desktop_rows", [])
row_ids = {row.get("row_id") for row in rows}
missing_rows = sorted(REQUIRED_DESKTOP_ROWS - row_ids)
if missing_rows:
errors.append(f"desktop_exposure_rows_missing:{','.join(missing_rows)}")
state_by_row = {row.get("row_id"): row.get("state") for row in rows}
expected_states = {
"personal-agent.profile": "degraded",
"personal-agent.imessage.read": "ready",
"personal-agent.mail.read": "degraded",
"personal-agent.calendar.read": "degraded",
"personal-agent.contacts.read": "degraded",
"personal-agent.drive.read": "degraded",
"personal-agent.seed-local-acceptance": "ready",
"personal-agent.proton-suite.provider-smoke": "blocked",
"personal-agent.secondbrain.proposal": "pending",
"personal-agent.browser.host-runtime": "blocked",
"personal-agent.write-actions": "disabled",
}
for row_id, expected in expected_states.items():
if state_by_row.get(row_id) != expected:
errors.append(f"desktop_exposure_row_state_invalid:{row_id}")
for row in rows:
row_id = row.get("row_id", "<unknown>")
if row.get("surface") not in REQUIRED_ADAPTER_SURFACES:
errors.append(f"desktop_exposure_row_surface_invalid:{row_id}")
if row.get("state") not in REQUIRED_DESKTOP_STATE_VOCABULARY:
errors.append(f"desktop_exposure_row_state_not_vocab:{row_id}")
if not row.get("source_contract"):
errors.append(f"desktop_exposure_row_source_missing:{row_id}")
if not row.get("visible_reason"):
errors.append(f"desktop_exposure_row_reason_missing:{row_id}")
false_effects = desktop.get("desktop_false_effects", {})
for effect in [
"adapter_mutated_by_profile",
"desktop_or_dashboard_opened",
"runtime_started",
"runtime_stopped",
"docker_started",
"profile_exposure_changed",
"memory_domain_access_granted",
"provider_call",
"secret_value_read",
"raw_payload_imported",
"seed_release_claim",
"runtime_readiness_claim",
"public_release_claim",
]:
if false_effects.get(effect) is not False:
errors.append(f"desktop_exposure_false_effect_not_false:{effect}")
memory = desktop.get("memory_policy", {})
if memory.get("target") != "secondbrain-personal":
errors.append("desktop_exposure_memory_target_not_secondbrain_personal")
if "orgbrain" not in memory.get("forbidden", []):
errors.append("desktop_exposure_orgbrain_not_forbidden")
if memory.get("desktop_displays_memory_content") is not False:
errors.append("desktop_exposure_displays_memory_content")
if memory.get("desktop_displays_redacted_state_only") is not True:
errors.append("desktop_exposure_not_redacted_state_only")
forbidden_fields = set(desktop.get("proof_policy", {}).get("forbidden_fields", []))
for field in [
"raw_messages",
"message_text",
"mail_bodies",
"contact_details",
"calendar_event_details",
"drive_file_names",
"drive_file_contents",
"endpoint_payloads",
"credentials",
"secret_values",
]:
if field not in forbidden_fields:
errors.append(f"desktop_exposure_forbidden_field_missing:{field}")
remaining_gates = desktop.get("remaining_gates", {})
if remaining_gates.get("secondbrain_governed_apply_route") != "defined-no-live-apply":
errors.append("desktop_exposure_governed_apply_route_not_defined")
if remaining_gates.get("seed_local_acceptance") != "complete-governed-local-jp-only":
errors.append("desktop_exposure_seed_local_acceptance_gate_missing")
if remaining_gates.get("proton_suite_seed_package_pickup") != "blocked-provider-smoke":
errors.append("desktop_exposure_proton_suite_seed_pickup_gate_missing")
if remaining_gates.get("profile_exposure_route") != "blocked-core-route-required":
errors.append("desktop_exposure_profile_exposure_gate_missing")
if remaining_gates.get("longer_standing_runtime_proof") != "follow-up":
errors.append("desktop_exposure_longer_standing_runtime_gate_missing")
for gate in [
"adapter_lane_pickup",
"desktop_ui_wiring",
"runtime_readiness_finalization",
"browser_host_runtime_approval",
"final_acceptance_packet",
]:
if remaining_gates.get(gate) != "blocked-follow-up":
errors.append(f"desktop_exposure_remaining_gate_missing:{gate}")
for rel in ["AGENT.md", "CONTRACT.md", "DISCLOSURE.md", "README.md", "docs/STEEV-MASTER.md"]:
path = ROOT / rel
if not path.exists():
continue
text = path.read_text(encoding="utf-8")
if "personal-agent" not in text:
errors.append(f"visible_personal_agent_note_missing:{rel}")
if "display name" not in text and "display/distribution alias" not in text:
errors.append(f"visible_display_alias_note_missing:{rel}")
supersession = ROOT / "docs/supersession/2026-06-14-personal-agent-context-runtime-supersession-register.md"
if supersession.exists():
text = supersession.read_text(encoding="utf-8")
for snippet in [
"active-authority",
"active-alias",
"active-capability-package",
"Personal-agent BlueBubbles binding",
"Proton/rclone package candidate",
"Personal-agent Secondbrain proposal/apply route",
"Personal-agent Conductor/Curator service handoff",
"Personal-agent runtime readiness snapshot",
"Personal-agent desktop exposure contract",
"superseded",
"legacy-reference",
"blocked-follow-up",
"secondbrain-personal",
"`orgbrain` as denied",
"PACR-010",
]:
if snippet not in text:
errors.append(f"supersession_missing:{snippet}")
for stale in ["SPCR-", "steev-personal-context-runtime"]:
if stale in text:
errors.append(f"supersession_stale_reference:{stale}")
reconciliation = ROOT / "docs/evidence/2026-06-15-personal-agent-core-seed-readiness-reconciliation.md"
if reconciliation.exists():
text = reconciliation.read_text(encoding="utf-8")
for snippet in [
"PACR-014",
"f8403f1e5927933a0a5e283d2020119336e4e5e7",
"Core S606 registration output",
"Core S641 Proton Suite governance pickup",
"Core S642 Seed Proton Suite refresh pickup",
"Core S643 Seed validator repair pickup",
"Seed final acceptance gate",
"Seed boundary decision",
"Seed objective audit",
"Steev is represented as Seed-local accepted",
"`profile_exposure_route`: Core route required",
"`secondbrain_durable_apply`: Secondbrain route and exact approval required",
"`proton_suite_provider_smoke`: blocked",
"No Core mutation",
"broad goal-completion claim",
]:
if snippet not in text:
errors.append(f"reconciliation_missing:{snippet}")
governed_boundary = ROOT / "docs/evidence/2026-06-15-personal-agent-current-governed-boundary.md"
if governed_boundary.exists():
text = governed_boundary.read_text(encoding="utf-8")
for snippet in GOVERNED_BOUNDARY_SNIPPETS:
if snippet not in text:
errors.append(f"governed_boundary_missing:{snippet}")
for rel, expected_hash in GOVERNED_BOUNDARY_SOURCE_LOCKS.items():
path = resolve_external(rel)
if not path.exists():
errors.append(f"governed_boundary_source_missing:{rel}")
continue
actual_hash = sha256_file(path)
allowed_hashes = GOVERNED_BOUNDARY_REPAIRED_SOURCE_HASHES.get(rel, set())
if actual_hash != expected_hash and actual_hash not in allowed_hashes:
errors.append(f"governed_boundary_source_hash_drift:{rel}:{actual_hash}")
result = {
"ok": not errors,
"validator": "personal-agent-profile-distribution-v8",
"checked": REQUIRED,
"errors": errors,
"warnings": [],
}
print(json.dumps(result, indent=2, sort_keys=True))
return 0 if result["ok"] else 1
# Script entry point: run the validator only when this file is executed
# directly, and hand main()'s integer result (0 when no errors were
# collected, 1 otherwise) to the interpreter as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)