CC: prepare generic VISION package candidate

This commit is contained in:
Svrnty
2026-06-06 08:25:14 -04:00
parent 8045f46b06
commit d62c5eb744
23 changed files with 1406 additions and 9 deletions
+148
View File
@@ -0,0 +1,148 @@
"""Visual Evidence adapter for Cortex OS VISION package candidates."""
from __future__ import annotations
import json
from typing import Any, Protocol
from pydantic import BaseModel, Field
PACKAGE_ID = "visual-perception-package-candidate"
DEFAULT_VALIDATION_STATUS = "candidate_validated"
class VlmAnalysisResponse(Protocol):
justification: str
model_id: str
raw_scores_json: str
class VisualEvidence(BaseModel):
"""Normalized visual observation record for host disclosure and handoff."""
producing_package_id: str = PACKAGE_ID
producing_tool_id: str
capability_surface: str
source_reference: str
provider_mode: str
retention_disclosure: str
observed_content_summary: str
extracted_claims: list[str] = Field(default_factory=list)
confidence: str
caveats: list[str] = Field(default_factory=list)
timestamp: str
validation_status: str
model_id: str | None = None
raw_observation_ref: str | None = None
def visual_evidence_from_vlm_response(
response: VlmAnalysisResponse,
*,
source_reference: str,
provider_mode: str,
retention_disclosure: str,
timestamp: str,
producing_tool_id: str = "vision.image_analyze",
capability_surface: str = "vision.image_analyze",
confidence: str = "medium",
caveats: list[str] | None = None,
validation_status: str = DEFAULT_VALIDATION_STATUS,
) -> VisualEvidence:
"""Adapt the current VLM response into a Cortex OS Visual Evidence record."""
_require_non_empty("source_reference", source_reference)
_require_non_empty("provider_mode", provider_mode)
_require_non_empty("retention_disclosure", retention_disclosure)
_require_non_empty("timestamp", timestamp)
raw_text = response.raw_scores_json.strip()
parsed = _parse_json_object(raw_text)
summary = _summary_from_response(parsed, response.justification, raw_text)
claims = _claims_from_raw(parsed, raw_text)
if not claims and summary:
claims = [f"summary: {summary}"]
return VisualEvidence(
producing_tool_id=producing_tool_id,
capability_surface=capability_surface,
source_reference=source_reference,
provider_mode=provider_mode,
retention_disclosure=retention_disclosure,
observed_content_summary=summary,
extracted_claims=claims,
confidence=confidence,
caveats=caveats or [],
timestamp=timestamp,
validation_status=validation_status,
model_id=response.model_id,
raw_observation_ref="AnalyzeResponse.raw_scores_json",
)
def _require_non_empty(field_name: str, value: str) -> None:
if not value or not value.strip():
raise ValueError(f"{field_name} is required for Visual Evidence")
def _parse_json_object(raw_text: str) -> dict[str, Any] | None:
if not raw_text:
return None
try:
parsed = json.loads(raw_text)
except json.JSONDecodeError:
return None
return parsed if isinstance(parsed, dict) else None
def _summary_from_response(
parsed: dict[str, Any] | None, justification: str, raw_text: str
) -> str:
if parsed:
for key in (
"observed_content_summary",
"summary",
"description",
"caption",
"text",
):
value = parsed.get(key)
if isinstance(value, str) and value.strip():
return _compact(value)
if justification.strip():
return _compact(justification)
return _compact(raw_text)
def _claims_from_raw(parsed: dict[str, Any] | None, raw_text: str) -> list[str]:
if not parsed:
return [f"raw: {_compact(raw_text)}"] if raw_text else []
claims: list[str] = []
for key, value in parsed.items():
claims.append(f"{key}: {_format_claim_value(value)}")
return claims
def _format_claim_value(value: Any) -> str:
if isinstance(value, list):
if not value:
return "[]"
if all(
isinstance(item, str | int | float | bool) or item is None
for item in value
):
return ", ".join(str(item) for item in value)
return json.dumps(value, sort_keys=True, separators=(",", ":"))
if isinstance(value, dict):
return json.dumps(value, sort_keys=True, separators=(",", ":"))
if value is None:
return "null"
return str(value)
def _compact(text: str, limit: int = 500) -> str:
compacted = " ".join(text.split())
if len(compacted) <= limit:
return compacted
return compacted[: limit - 3].rstrip() + "..."