143 lines
4.9 KiB
Python
143 lines
4.9 KiB
Python
"""GET /api/cortex-os/runtime-health - Cortex OS Runtime Health slice.

Public API surface used: api.register_route, api.logger.

No forced internal dependencies. This module does not import Hermes internals.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from typing import Any
|
|
|
|
# Path and HTTP method under which register() wires the handler.
ROUTE_PATH = "/api/cortex-os/runtime-health"
ROUTE_METHOD = "GET"

# Fixed literals stamped into every payload as "contract_id" / "checked_at".
CONTRACT_ID = "runtime-health/v0.1"
CHECKED_AT = "2026-05-29T00:00:00Z"

# Case-insensitive screen applied to signal detail text before it is emitted:
# URLs, home/workspace path fragments, standalone 2-5 digit numbers,
# credential-like words, error-dump words, and dotenv file names.
_FORBIDDEN_TEXT = re.compile(
    r"(https?://|/home/|workspaces/|\b\d{2,5}\b|token|secret|cookie|traceback|exception|\.env)",
    flags=re.IGNORECASE,
)
|
|
|
|
|
|
def register(api: Any) -> None:
    """Attach the read-only Runtime Health handler to the host route table.

    Args:
        api: Plugin API object. Only ``api.logger`` and
            ``api.register_route`` are used.
    """
    route_log = api.logger("svrnty.routes.cortex_os_runtime_health")
    api.register_route(ROUTE_PATH, ROUTE_METHOD, _handle_runtime_health)
    # Logged only after registration returned without raising.
    route_log.info("cortex os runtime health endpoint registered")
|
|
|
|
|
|
def _handle_runtime_health(handler: Any, parsed: Any) -> bool:
    """Answer a Runtime Health request with a JSON envelope.

    Args:
        handler: Request handler; its optional ``command`` attribute is
            compared against ``ROUTE_METHOD`` and the response is written
            through it.
        parsed: Parsed request target; a truthy ``query`` attribute is
            rejected.

    Returns:
        Always ``True``; every branch writes a response.
    """
    if getattr(handler, "command", ROUTE_METHOD) != ROUTE_METHOD:
        # A handler without a ``command`` attribute is treated as a GET.
        status_code = 405
        envelope = _error_envelope("method_not_allowed", "read only route")
    elif getattr(parsed, "query", ""):
        status_code = 400
        envelope = _error_envelope("query_not_allowed", "query targets are not accepted")
    else:
        status_code = 200
        envelope = {"ok": True, "result": runtime_health_payload(), "error": None}

    _write_json(handler, status_code, envelope)
    return True
|
|
|
|
|
|
def runtime_health_payload(host_signals: dict[str, Any] | None = None) -> dict[str, Any]:
    """Build the host-neutral Runtime Health envelope.

    Args:
        host_signals: Optional mapping of surface name to status, plus
            optional ``"<name>_detail"`` entries. ``None`` or an empty
            mapping yields "unknown" / "not_probed" signals.

    Returns:
        A dict carrying the contract id, the fixed timestamp, the derived
        overall status and its summary, the authority flags, the
        per-surface signals, and the static warning / redaction /
        source-trace metadata.
    """
    signal_rows = _summarize_host_signals(host_signals if host_signals else {})
    overall = _derive_status(signal_rows)

    # Only "read_only" is granted; every other capability is declared False.
    denied_capabilities = (
        "runtime_state_mutation",
        "tool_callable_authority",
        "mcp_exposure",
        "profile_exposure_change",
        "memory_domain_access",
        "delegated_memory_grant",
        "sharing",
        "installer_automation",
        "product_readiness_claim",
    )
    authority = {"read_only": True, **dict.fromkeys(denied_capabilities, False)}

    source_trace = {
        "host_adapter": "hermes",
        "host_surfaces": ["health", "agent_health", "dashboard_status"],
        "live_probe": False,
        "raw_payload_passthrough": False,
    }

    payload: dict[str, Any] = {
        "contract_id": CONTRACT_ID,
        "checked_at": CHECKED_AT,
        "status": overall,
        "readiness": "runtime_not_started",
        "summary": _summary_for(overall),
        "authority": authority,
        "signals": signal_rows,
        "warnings": ["deterministic_host_surface_inputs_only"],
        "errors": [],
        "redactions": ["host_specific_values", "raw_paths", "raw_urls", "secrets", "raw_payloads"],
        "source_trace": source_trace,
    }
    return payload
|
|
|
|
|
|
def _summarize_host_signals(host_signals: dict[str, Any]) -> list[dict[str, str]]:
    """Produce one name/status/detail row for each known host surface.

    Args:
        host_signals: Mapping of surface name to status, with optional
            ``"<name>_detail"`` entries. When it is empty, every surface
            is reported as "unknown" with detail "not_probed".

    Returns:
        Rows for "health", "agent_health" and "dashboard_status", in that
        order. Statuses are normalised and details are bounded/redacted.
    """
    probed = bool(host_signals)
    rows: list[dict[str, str]] = []
    for surface in ("health", "agent_health", "dashboard_status"):
        if probed:
            row = {
                "name": surface,
                "status": _clean_status(host_signals.get(surface, "unknown")),
                "detail": _bounded_text(host_signals.get(f"{surface}_detail", "declared_surface")),
            }
        else:
            row = {"name": surface, "status": "unknown", "detail": "not_probed"}
        rows.append(row)
    return rows
|
|
|
|
|
|
def _derive_status(signals: list[dict[str, str]]) -> str:
    """Collapse per-surface statuses into one overall status.

    "unavailable" outranks "degraded". "healthy" is reported only when
    every signal is healthy. Anything else, including an empty list, is
    "unknown".

    Args:
        signals: Rows that each carry a ``"status"`` key.

    Returns:
        One of "unavailable", "degraded", "healthy" or "unknown".
    """
    seen = set()
    for row in signals:
        seen.add(row["status"])

    # Worst state wins, checked in descending severity.
    for severity in ("unavailable", "degraded"):
        if severity in seen:
            return severity

    return "healthy" if seen == {"healthy"} else "unknown"
|
|
|
|
|
|
def _clean_status(value: Any) -> str:
    """Normalise *value* to one of the four recognised status words.

    The value is stringified, stripped and lower-cased. Anything that is
    not "healthy", "degraded", "unavailable" or "unknown" becomes
    "unknown".
    """
    recognised = ("healthy", "degraded", "unavailable", "unknown")
    candidate = str(value).strip().lower()
    if candidate in recognised:
        return candidate
    return "unknown"
|
|
|
|
|
|
def _summary_for(status: str) -> str:
    """Return the human-readable summary sentence for an overall status.

    Any status other than "healthy", "degraded" or "unavailable" gets the
    "not been live-probed" sentence.
    """
    sentences = (
        ("healthy", "Runtime Health signals are healthy."),
        ("degraded", "Runtime Health signals are degraded."),
        ("unavailable", "Runtime Health signals are unavailable."),
    )
    # Scanned with == rather than a dict lookup, so an unhashable argument
    # still falls through to the default sentence.
    for known_status, sentence in sentences:
        if status == known_status:
            return sentence
    return "Runtime Health has not been live-probed in this slice."
|
|
|
|
|
|
def _bounded_text(value: Any) -> str:
    """Reduce *value* to a short single-line detail string, or redact it.

    The value is stringified, stripped, and newlines are replaced with
    spaces. An empty result, or one matching ``_FORBIDDEN_TEXT``, yields
    "redacted". Otherwise the first 80 characters are returned.
    """
    flattened = str(value).strip().replace("\n", " ")
    # The screen runs on the full text, before truncation.
    if not flattened or _FORBIDDEN_TEXT.search(flattened):
        return "redacted"
    return flattened[:80]
|
|
|
|
|
|
def _error_envelope(code: str, message: str) -> dict[str, Any]:
    """Wrap an error code and message in the standard failure envelope.

    Returns:
        A dict with ``ok`` False, ``result`` None, and ``error`` holding
        the given code and message.
    """
    failure = {"code": code, "message": message}
    envelope: dict[str, Any] = {"ok": False, "result": None, "error": failure}
    return envelope
|
|
|
|
|
|
def _write_json(handler: Any, status_code: int, payload: dict[str, Any]) -> None:
    """Serialise *payload* as key-sorted JSON and send it through *handler*.

    Args:
        handler: Object exposing ``send_response``, ``send_header``,
            ``end_headers`` and a writable ``wfile``.
        status_code: HTTP status passed to ``send_response``.
        payload: JSON-serialisable mapping to emit as the response body.
    """
    encoded = json.dumps(payload, sort_keys=True).encode("utf-8")
    headers = (
        ("Content-Type", "application/json; charset=utf-8"),
        # Length is the byte count of the encoded body, not the character count.
        ("Content-Length", str(len(encoded))),
        ("Cache-Control", "no-store"),
    )

    handler.send_response(status_code)
    for header_name, header_value in headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(encoded)
|