797 lines
30 KiB
Python
797 lines
30 KiB
Python
"""Canvas command-center routes.
|
|
|
|
These routes are intentionally small: Hermes WebUI owns the command surface,
|
|
while the existing canva-editor Go service owns canonical project/screen state.
|
|
The proxy keeps browser calls same-origin and whitelisted so the first Stitch
|
|
slice can move without coupling WebUI to the design service internals.
|
|
"""
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import urllib.error
|
|
import urllib.parse
|
|
import urllib.request
|
|
|
|
|
|
# Base URL of the canva-editor service when CANVA_EDITOR_BASE_URL is unset.
_DEFAULT_CANVA_BASE = "http://localhost:8080"

# Canonical brand id that every alias below resolves to.
_PLANB_BRAND_ID = "607740e8-8b76-4455-9c4f-eadbe9b4168c"
_BRAND_ALIASES = dict.fromkeys(("planb", "plan-b", "goutez-planb"), _PLANB_BRAND_ID)

# CMO database locations probed, in order, after any environment overrides.
_DEFAULT_CMO_DB_PATHS = (
    "~/.hermes/profiles/cmo-planb/cmo.db",
    "~/.hermes/cmo-planb/cmo.db",
    "~/.hermes/cmo/cmo.db",
)

# Upstream paths the proxy accepts verbatim.
_ALLOWED_EXACT = frozenset((
    "/health",
    "/api/v1/capabilities",
    "/api/v1/projects",
    "/api/v1/generate",
    "/api/v1/generate/from-image",
))

# Per-project sub-resources the proxy accepts, with an optional trailing id.
_ALLOWED_PATTERN = re.compile(
    r"^/api/v1/projects/[A-Za-z0-9_\-]+/(screens|design-system|variants|prototype-edges|export)"
    r"(/[A-Za-z0-9_\-]+)?$"
)

# In-process event log: a lock-guarded list capped at _MAX_EVENTS entries,
# plus the id of the most recently emitted event.
_EVENT_LOCK = threading.Lock()
_EVENTS = []
_EVENT_CURSOR = 0
_MAX_EVENTS = 200
|
|
|
|
|
|
def register(api):
    """Register every Canvas route on *api* and log once registration is done."""
    log = api.logger("svrnty.routes.canvas")
    # Registration order matches the original call sequence.
    routes = (
        ("/api/canvas/status", "GET", _handle_status),
        ("/api/canvas/tools", "GET", _handle_tools),
        ("/api/canvas/proxy", "GET", _handle_proxy),
        ("/api/canvas/proxy", "POST", _handle_proxy),
        ("/api/canvas/proxy", "PUT", _handle_proxy),
        ("/api/canvas/command", "POST", _handle_command),
        ("/api/canvas/design-context", "GET", _handle_design_context),
        ("/api/canvas/events", "GET", _handle_events),
    )
    for route_path, http_method, route_handler in routes:
        api.register_route(route_path, http_method, route_handler)
    log.info("canvas endpoints registered")
|
|
|
|
|
|
def _handle_status(handler, _parsed):
    """Probe canva-editor and report reachability plus advertised capabilities.

    The response is always HTTP 200; the probe outcome is carried in the
    ``ok`` and ``detail`` fields. Capabilities are only requested after a
    2xx ``/health`` answer.
    """
    base = _base_url()
    healthy = False
    detail = "unreachable"
    capabilities = None

    def _get(path):
        # Both probes are plain GETs with a short two-second timeout.
        return urllib.request.urlopen(
            urllib.request.Request(base + path, method="GET"), timeout=2
        )

    try:
        with _get("/health") as health_resp:
            healthy = 200 <= health_resp.status < 300
            detail = "ok" if healthy else f"status {health_resp.status}"
        if healthy:
            with _get("/api/v1/capabilities") as caps_resp:
                if 200 <= caps_resp.status < 300:
                    capabilities = json.loads(caps_resp.read().decode("utf-8"))
    except Exception as exc:
        # Any failure (including one during the capabilities fetch) replaces
        # the detail text; ``healthy`` keeps whatever value it already had.
        detail = str(exc)

    report = {
        "ok": healthy,
        "service": "canva-editor",
        "base_url": base,
        "detail": detail,
        "capabilities": capabilities,
        "boundary": "Hermes WebUI plugin -> canva-editor HTTP API",
        "entrypoint": "/plugins/svrnty/canvas.html",
    }
    return _send_json(handler, report, 200)
|
|
|
|
|
|
def _handle_tools(handler, _parsed):
    """Return the static catalogue describing the Canvas tool surface."""
    # (name, method, path, purpose) rows, expanded into dicts below.
    catalogue = (
        (
            "canvas.status",
            "GET",
            "/api/canvas/status",
            "Check canva-editor reachability from Hermes WebUI.",
        ),
        (
            "canvas.create_project",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects",
            "Create an owned design project.",
        ),
        (
            "canvas.generate_screen",
            "POST",
            "/api/canvas/command",
            "Ask the Canvas bridge to generate and persist a screen with live progress events.",
        ),
        (
            "canvas.command",
            "POST",
            "/api/canvas/command",
            "CMO-facing command endpoint for prompt-to-canvas generation, variants, and event replay.",
        ),
        (
            "canvas.generate_from_image",
            "POST",
            "/api/canvas/proxy?path=/api/v1/generate/from-image",
            "Generate a structured screen spec from image input.",
        ),
        (
            "canvas.save_screen",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects/{project_id}/screens",
            "Persist generated screen specs as canonical canva-editor state.",
        ),
        (
            "canvas.update_screen",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects/{project_id}/screens/{screen_id}&method=PUT",
            "Persist simple structured edits to an existing screen spec.",
        ),
        (
            "canvas.design_context",
            "GET",
            "/api/canvas/design-context",
            "Seed brand and memory context for CMO canvas work.",
        ),
        (
            "canvas.record_variant",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects/{id}/variants",
            "Record parent/child lineage for generated design variations.",
        ),
        (
            "canvas.connect_prototype_edge",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects/{id}/prototype-edges",
            "Record a navigable relationship between two screens.",
        ),
        (
            "canvas.export_project",
            "GET",
            "/api/canvas/proxy?path=/api/v1/projects/{id}/export",
            "Export project JSON with screens, variants, prototype edges, and design system.",
        ),
        (
            "canvas.events",
            "GET",
            "/api/canvas/events",
            "SSE contract seed for live canvas updates.",
        ),
    )
    tools = [
        {"name": name, "method": method, "path": path, "purpose": purpose}
        for name, method, path, purpose in catalogue
    ]
    non_goals = [
        "full-canva-parity",
        "full-figma-parity",
        "publishing-without-jp-approval",
        "upstream-hermes-webui-edits",
    ]
    return _send_json(handler, {
        "ok": True,
        "tool_surface": "cmo-canvas-v0",
        "tools": tools,
        "non_goals": non_goals,
    }, 200)
|
|
|
|
|
|
def _handle_design_context(handler, parsed):
    """Serve the design context for the ``brand_id`` query parameter.

    A missing or blank ``brand_id`` falls back to ``"planb"``.
    """
    params = urllib.parse.parse_qs(parsed.query or "")
    requested = params.get("brand_id") or ["planb"]
    brand_id = requested[0].strip() or "planb"
    return _send_json(handler, _load_design_context(brand_id), 200)
|
|
|
|
|
|
def _handle_events(handler, _parsed):
    """Serve buffered Canvas events as a JSON delta or as a finite SSE body.

    canva-editor has no live event bus yet, so this route pins down the
    browser/backend contract in the meantime. Requests carrying ``since`` or
    ``format=json`` receive the events newer than ``since`` together with a
    cursor. All other requests receive a ``canvas.connected`` seed frame
    followed by a replay of up to the 20 most recent events, sent as one
    complete ``text/event-stream`` response.
    """
    params = urllib.parse.parse_qs(getattr(_parsed, "query", "") or "")
    wants_json = "since" in params or (params.get("format") or [""])[0] == "json"
    if wants_json:
        since = _safe_int((params.get("since") or ["0"])[0], 0)
        delta = _events_since(since)
        # With nothing new to report, hand back the current cursor instead.
        cursor = delta[-1]["id"] if delta else _current_event_cursor()
        return _send_json(handler, {"ok": True, "cursor": cursor, "events": delta}, 200)

    frames = [
        'event: canvas.connected\n'
        'data: {"ok":true,"source":"svrnty-hermes-webui-plugin"}\n\n'
    ]
    for evt in _events_since(0)[-20:]:
        frames.append("event: " + str(evt.get("type", "canvas.event")) + "\n")
        frames.append("data: " + json.dumps(evt) + "\n\n")
    payload = "".join(frames).encode("utf-8")

    handler.send_response(200)
    for header, value in (
        ("Content-Type", "text/event-stream; charset=utf-8"),
        ("Cache-Control", "no-store"),
        ("Connection", "close"),
        ("Content-Length", str(len(payload))),
    ):
        handler.send_header(header, value)
    handler.end_headers()
    handler.wfile.write(payload)
    return True
|
|
|
|
|
|
def _handle_command(handler, _parsed):
    """Run a CMO-style prompt-to-canvas command through canva-editor.

    The JSON body must carry a non-empty ``prompt``. Optional fields are
    ``variants`` (clamped to 1..6), ``brand_id``/``brandId``,
    ``project_name``/``projectName`` and
    ``parent_screen_id``/``parentScreenId``. Each variant is generated,
    persisted as a screen and, unless it becomes the lineage root itself,
    recorded as a variant of the root screen. Progress is published through
    ``_emit_event`` and the events emitted since the command started are
    echoed in the response.

    Responds 400 for an unreadable body or a missing prompt, the upstream
    status code for a canva-editor HTTP error, 502 when canva-editor is
    unreachable, and 500 for any other failure.
    """
    try:
        body = _read_json_body(handler)
    except ValueError as e:
        return _send_json(handler, {"ok": False, "error": str(e)}, 400)

    prompt = str(body.get("prompt") or "").strip()
    if not prompt:
        return _send_json(handler, {"ok": False, "error": "prompt is required"}, 400)

    variants = max(1, min(6, _safe_int(body.get("variants"), 1)))
    brand_id = str(body.get("brand_id") or body.get("brandId") or "planb").strip() or "planb"
    project_name = (
        str(body.get("project_name") or body.get("projectName") or "Hermes Canvas").strip()
        or "Hermes Canvas"
    )
    parent_screen_id = str(body.get("parent_screen_id") or body.get("parentScreenId") or "").strip()
    command_id = f"canvas-{int(time.time() * 1000)}"
    # Lineage root: the caller-supplied parent, else the first persisted screen.
    first_screen_id = parent_screen_id
    screens = []
    emitted_start = _current_event_cursor()
    design_context = _load_design_context(brand_id)
    # The compact view does not change per variant, so build it once.
    compact_context = _compact_design_context(design_context)

    _emit_event("canvas.command.accepted", {
        "command_id": command_id,
        "prompt": prompt,
        "variants": variants,
        "brand_id": brand_id,
        "design_context_status": design_context.get("status"),
        "design_context_version": design_context.get("source_version"),
    })
    try:
        project = _ensure_project(project_name)
        project_id = _record_id(project)
        if not project_id:
            raise RuntimeError("canva-editor project id missing")
        _emit_event("canvas.project.ready", {
            "command_id": command_id,
            "project": project,
        })

        # safe="" also escapes "/", so the id always stays one path segment.
        project_path = f"/api/v1/projects/{urllib.parse.quote(project_id, safe='')}"

        for idx in range(variants):
            user_variant_prompt = prompt if variants == 1 else (
                f"{prompt}\nVariant {idx + 1} of {variants}: choose a distinct composition."
            )
            variant_prompt = _compose_generation_prompt(user_variant_prompt, design_context)
            _emit_event("canvas.variant.started", {
                "command_id": command_id,
                "project_id": project_id,
                "variant_index": idx + 1,
                "variants": variants,
                "design_context_status": design_context.get("status"),
            })
            generated = _canva_json("/api/v1/generate", "POST", {"prompt": variant_prompt}, timeout=90)
            # Accept either key spelling, or treat the whole reply as the spec.
            spec = generated.get("screenSpec") or generated.get("screen_spec") or generated
            screen = _canva_json(
                project_path + "/screens",
                "POST",
                {"name": prompt[:42] or "Generated screen", "spec": spec},
                timeout=30,
            )
            screen_id = _record_id(screen)
            if idx == 0 and not first_screen_id:
                first_screen_id = screen_id
            elif first_screen_id and screen_id:
                _canva_json(
                    project_path + "/variants",
                    "POST",
                    {
                        "parentScreenId": first_screen_id,
                        "screenId": screen_id,
                        "prompt": user_variant_prompt,
                        "aspects": ["layout", "copy", "composition"],
                        "metadata": {"commandId": command_id, "variantIndex": idx + 1},
                    },
                    timeout=30,
                )
            item = {
                "screen": screen,
                "screenSpec": spec,
                "variant_index": idx + 1,
                "prompt": user_variant_prompt,
                "designContext": compact_context,
            }
            screens.append(item)
            _emit_event("canvas.screen.persisted", {
                "command_id": command_id,
                "project_id": project_id,
                **item,
            })

        _emit_event("canvas.command.completed", {
            "command_id": command_id,
            "project": project,
            "screens": len(screens),
        })
        emitted = _events_since(emitted_start)
        return _send_json(handler, {
            "ok": True,
            "command_id": command_id,
            "project": project,
            "screens": screens,
            "design_context": compact_context,
            "events": emitted,
            "cursor": emitted[-1]["id"] if emitted else _current_event_cursor(),
        }, 200)
    except urllib.error.HTTPError as e:
        # Forward the upstream error body and status code to the caller.
        detail = e.read().decode("utf-8", errors="replace") if e.fp else str(e)
        _emit_event("canvas.command.failed", {"command_id": command_id, "error": detail})
        return _send_json(handler, {"ok": False, "command_id": command_id, "error": detail}, e.code)
    except urllib.error.URLError as e:
        detail = f"canva-editor unreachable: {e.reason}"
        _emit_event("canvas.command.failed", {"command_id": command_id, "error": detail})
        return _send_json(handler, {"ok": False, "command_id": command_id, "error": detail}, 502)
    except Exception as e:
        # Route boundary: report anything unexpected instead of crashing.
        detail = str(e)
        _emit_event("canvas.command.failed", {"command_id": command_id, "error": detail})
        return _send_json(handler, {"ok": False, "command_id": command_id, "error": detail}, 500)
|
|
|
|
|
|
def _handle_proxy(handler, parsed):
    """Forward an allow-listed request to canva-editor and relay its answer.

    The upstream path comes from the ``path`` query parameter and must pass
    ``_is_allowed``. A POST may be turned into a PUT with ``method=PUT``; no
    other override is honoured. The upstream status, body and Content-Type
    are relayed, including for upstream HTTP errors.

    Responds 400 for a missing or invalid path or a malformed Content-Length,
    403 for a path outside the allowlist, 502 when canva-editor is
    unreachable, and 500 for any other proxy failure.
    """
    qs = urllib.parse.parse_qs(parsed.query or "")
    target_path = (qs.get("path") or [""])[0].strip()
    method_override = (qs.get("method") or [""])[0].strip().upper()
    if not target_path.startswith(("/health", "/api/v1/")):
        return _send_json(handler, {"ok": False, "error": "missing or invalid path"}, 400)
    if not _is_allowed(target_path):
        return _send_json(handler, {"ok": False, "error": f"path not allowed: {target_path}"}, 403)

    method = handler.command
    if method == "POST" and method_override in {"PUT"}:
        method = method_override
    body = b""
    if method in ("POST", "PUT", "PATCH"):
        try:
            length = int(handler.headers.get("Content-Length", "0") or 0)
        except ValueError:
            # A non-numeric header is a client error, not a server crash.
            return _send_json(handler, {"ok": False, "error": "invalid Content-Length"}, 400)
        if length > 0:
            body = handler.rfile.read(length)

    target_url = _base_url() + target_path
    req = urllib.request.Request(target_url, data=body if body else None, method=method)
    if body:
        req.add_header("Content-Type", handler.headers.get("Content-Type", "application/json"))

    try:
        with urllib.request.urlopen(req, timeout=45) as resp:
            resp_body = resp.read()
            resp_ctype = resp.headers.get("Content-Type", "application/json")
            resp_status = resp.status
    except urllib.error.HTTPError as e:
        # Upstream error responses are relayed rather than rewritten.
        resp_body = e.read() or b""
        resp_ctype = e.headers.get("Content-Type", "application/json") if e.headers else "application/json"
        resp_status = e.code
    except urllib.error.URLError as e:
        return _send_json(handler, {"ok": False, "error": f"canva-editor unreachable: {e.reason}"}, 502)
    except Exception as e:
        return _send_json(handler, {"ok": False, "error": f"proxy error: {e}"}, 500)

    handler.send_response(resp_status)
    handler.send_header("Content-Type", resp_ctype)
    handler.send_header("Content-Length", str(len(resp_body)))
    handler.send_header("Cache-Control", "no-store")
    handler.end_headers()
    handler.wfile.write(resp_body)
    return True
|
|
|
|
|
|
def _is_allowed(path: str) -> bool:
    """Return True when *path* may be forwarded to canva-editor.

    A path is allowed when it is listed in ``_ALLOWED_EXACT`` or when the
    whole string matches ``_ALLOWED_PATTERN``.
    """
    if path in _ALLOWED_EXACT:
        return True
    # fullmatch, not match: "$" also matches just before a trailing newline,
    # so match() would accept "<allowed path>\n".
    return _ALLOWED_PATTERN.fullmatch(path) is not None
|
|
|
|
|
|
def _base_url() -> str:
    """Return the canva-editor base URL with trailing slashes removed.

    ``CANVA_EDITOR_BASE_URL`` takes precedence over the built-in default.
    """
    configured = os.environ.get("CANVA_EDITOR_BASE_URL", _DEFAULT_CANVA_BASE)
    return configured.rstrip("/")
|
|
|
|
|
|
def _load_design_context(brand_id: str) -> dict:
    """Build the design context for *brand_id* from the CMO brand cache.

    The returned dict always carries the baseline keys. ``status`` is
    ``"missing"`` when no database or cache row is found, ``"error"`` when
    the database read fails or the cached JSON is unusable, and ``"ready"``
    once the cached profile has been merged in. Failures are described in
    the ``error`` key; this function does not raise for them.
    """
    resolved_brand_id = _resolve_brand_id(brand_id)
    base_context = {
        "brand_id": brand_id,
        "resolved_brand_id": resolved_brand_id,
        "source": "cmo.brand_profile_cache",
        "status": "missing",
        "brand": None,
        "brand_guideline_text": "",
        "design_md": "",
        "palette_tokens": [],
        "typography_tokens": [],
        "spacing_tokens": [],
        "voice_rules": [],
        "memory_hints": [],
        "secondbrain": {"status": "not_checked", "hints": []},
        "deferred_ports": ["secondbrain-writeback", "svrnty-vision"],
    }

    db_path = _cmo_db_path()
    base_context["cmo_db_path"] = db_path
    if not db_path:
        base_context["error"] = "CMO database not found"
        return base_context

    try:
        row = _read_brand_cache(db_path, resolved_brand_id)
    except sqlite3.Error as e:
        base_context["status"] = "error"
        base_context["error"] = f"CMO database read failed: {e}"
        return base_context

    if not row:
        base_context["error"] = f"brand_profile_cache row not found for {resolved_brand_id}"
        return base_context

    try:
        blob = json.loads(row["json"] or "{}")
    except ValueError as e:
        base_context["status"] = "error"
        base_context["error"] = f"brand_profile_cache JSON invalid: {e}"
        return base_context
    if not isinstance(blob, dict):
        # Valid JSON that is not an object (list, string, number) would make
        # every ``blob.get`` below raise AttributeError.
        base_context["status"] = "error"
        base_context["error"] = "brand_profile_cache JSON invalid: expected a JSON object"
        return base_context

    brand = blob.get("brand") if isinstance(blob.get("brand"), dict) else None
    guideline = str(blob.get("brand_guideline") or "")
    design_md = str(blob.get("design_md") or "")
    palette = _normalise_token_list(blob.get("palette"))
    typography = _normalise_token_list(blob.get("typography"))
    spacing = _normalise_token_list(blob.get("spacing"))
    voice_rules = _extract_voice_rules(blob.get("voice"), guideline)
    memory_hints = _build_memory_hints(brand, guideline, palette, typography, spacing, voice_rules)
    secondbrain = _load_secondbrain_hints(brand, resolved_brand_id)
    memory_hints.extend(secondbrain.get("hints") or [])

    base_context.update({
        "status": "ready",
        "cache_brand_id": row["brand_id"],
        "source_version": row["version"],
        "fetched_at": row["fetched_at"],
        "brand": brand,
        "brand_guideline_text": guideline,
        "brand_guideline_excerpt": _excerpt(guideline, 1800),
        "design_md": design_md,
        "design_md_excerpt": _excerpt(design_md, 1400),
        "palette_tokens": palette,
        "typography_tokens": typography,
        "spacing_tokens": spacing,
        "voice_rules": voice_rules,
        # Brand-derived hints come first; secondbrain hints fill the rest.
        "memory_hints": memory_hints[:14],
        "secondbrain": secondbrain,
        "tokens_imported": bool(blob.get("tokens_imported") or palette or typography or spacing),
    })
    return base_context
|
|
|
|
|
|
def _resolve_brand_id(brand_id: str) -> str:
    """Map a known alias (case-insensitive) to its brand id.

    Values that are not aliases are returned trimmed; ``None`` becomes ``""``.
    """
    trimmed = (brand_id or "").strip()
    alias_key = trimmed.lower()
    if alias_key in _BRAND_ALIASES:
        return _BRAND_ALIASES[alias_key]
    return trimmed
|
|
|
|
|
|
def _cmo_db_path() -> str:
    """Return the first existing CMO database path, or ``""`` when none exists.

    ``CANVAS_CMO_DB`` and ``CMO_DB`` are tried first, in that order, followed
    by the built-in default locations. ``~`` is expanded before the check.
    """
    overrides = (os.environ.get(name) for name in ("CANVAS_CMO_DB", "CMO_DB"))
    candidates = [value for value in overrides if value]
    candidates += _DEFAULT_CMO_DB_PATHS
    expanded_paths = (os.path.expanduser(candidate) for candidate in candidates)
    return next((found for found in expanded_paths if os.path.exists(found)), "")
|
|
|
|
|
|
def _read_brand_cache(db_path: str, brand_id: str):
    """Fetch the ``brand_profile_cache`` row for *brand_id*.

    Returns a ``sqlite3.Row`` or ``None`` when no row matches. The connection
    is always closed; ``sqlite3.Error`` propagates to the caller.
    """
    connection = sqlite3.connect(db_path)
    try:
        # Row objects let callers read columns by name.
        connection.row_factory = sqlite3.Row
        cursor = connection.execute(
            """SELECT brand_id, json, version, fetched_at
               FROM brand_profile_cache
               WHERE brand_id = ?""",
            (brand_id,),
        )
        return cursor.fetchone()
    finally:
        connection.close()
|
|
|
|
|
|
def _normalise_token_list(raw_tokens) -> list:
    """Reduce raw design tokens to ``path``/``type``/``description``/``value`` dicts.

    Anything other than a list yields ``[]``. Only the first 80 entries are
    considered and non-dict entries are skipped. String values are parsed as
    JSON when possible, and keys whose value is ``None`` or ``""`` are dropped.
    """
    if not isinstance(raw_tokens, list):
        return []

    def _normalise(entry):
        value = entry.get("value")
        if value is None:
            value = entry.get("valueJson")
        if isinstance(value, str):
            try:
                value = json.loads(value)
            except ValueError:
                # Not JSON (e.g. "#ffffff"): keep the raw string.
                pass
        fields = {
            "path": entry.get("path") or entry.get("name") or entry.get("key"),
            "type": entry.get("type"),
            "description": entry.get("description"),
            "value": value,
        }
        return {key: val for key, val in fields.items() if val not in (None, "")}

    return [_normalise(entry) for entry in raw_tokens[:80] if isinstance(entry, dict)]
|
|
|
|
|
|
def _extract_voice_rules(voice, guideline: str) -> list:
    """Collect up to 12 voice rules from *voice*, falling back to *guideline*.

    A dict is scanned for a fixed set of keys (lower-case first, then with a
    leading capital), taking string values and list items; a plain string is
    used as a single rule. When that yields nothing, up to 8 guideline lines
    mentioning voice, tone, avoid, must or never are used instead.
    """
    collected = []
    if isinstance(voice, dict):
        for key in ("tone", "voice", "dos", "donts", "rules", "pillars", "description"):
            entry = voice.get(key) or voice.get(key.capitalize())
            if isinstance(entry, str):
                stripped = entry.strip()
                if stripped:
                    collected.append(stripped)
            elif isinstance(entry, list):
                for item in entry:
                    text = str(item).strip()
                    if text:
                        collected.append(text)
    elif isinstance(voice, str) and voice.strip():
        collected.append(voice.strip())

    if not collected and guideline:
        keywords = ("voice", "tone", "avoid", "must", "never")
        for raw_line in guideline.splitlines():
            # Drop bullet dashes and surrounding whitespace.
            candidate = raw_line.strip(" -\t")
            lowered = candidate.lower()
            if not candidate or not any(word in lowered for word in keywords):
                continue
            collected.append(candidate)
            if len(collected) >= 8:
                break
    return collected[:12]
|
|
|
|
|
|
def _build_memory_hints(brand, guideline, palette, typography, spacing, voice_rules) -> list:
    """Turn brand profile data into at most 10 short prompt hints.

    Hints are added in a fixed order: brand name, brand description, palette
    colours (values starting with ``#``), typography token paths, spacing,
    voice constraints, and finally a guideline reminder.
    """
    hints = []

    if isinstance(brand, dict):
        name = brand.get("displayName") or brand.get("name")
        if name:
            hints.append(f"Brand: {name}")
        description = brand.get("description")
        if description:
            hints.append(f"Brand description: {_excerpt(str(description), 260)}")

    hex_colors = []
    for token in palette or ():
        value = token.get("value")
        if isinstance(value, dict):
            hex_colors += [str(part) for part in value.values() if str(part).startswith("#")]
        elif isinstance(value, str) and value.startswith("#"):
            hex_colors.append(value)
    if hex_colors:
        # dict.fromkeys de-duplicates while keeping first-seen order.
        unique_colors = dict.fromkeys(hex_colors[:8])
        hints.append("Use BTE palette tokens: " + ", ".join(unique_colors))

    type_paths = [str(token.get("path")) for token in typography or () if token.get("path")]
    if type_paths:
        hints.append("Typography context: " + ", ".join(type_paths[:8]))

    if spacing:
        hints.append("Respect BTE spacing tokens and density rhythm.")

    if voice_rules:
        top_rules = [_excerpt(rule, 180) for rule in voice_rules[:3]]
        hints.append("Voice constraints: " + " | ".join(top_rules))

    if guideline:
        hints.append("Use BTE brand guideline as primary public-plane creative truth.")

    return hints[:10]
|
|
|
|
|
|
def _load_secondbrain_hints(brand, brand_id: str) -> dict:
    """Query up to four knowledge capsules through the local ``psql`` client.

    Returns a dict with a ``status`` and a ``hints`` list. The status is
    ``"unconfigured"`` when neither a database URL nor a password is set,
    ``"psql_missing"`` when the client binary is not found, ``"timeout"``
    when the query exceeds 2.5 seconds, ``"error"`` for any other failure,
    and ``"ready"`` on success. This function does not raise for those cases.
    """
    environ = os.environ
    database_url = environ.get("SECONDBRAIN_DATABASE_URL")
    password = environ.get("SECONDBRAIN_DB_PASSWORD") or environ.get("PGPASSWORD")
    if not (database_url or password):
        return {"status": "unconfigured", "hints": []}

    sql = """
        SELECT COALESCE(json_agg(row_to_json(t))::text, '[]')
        FROM (
          SELECT id, title, summary, sector, created_at
          FROM knowledge_capsules
          WHERE sector IN ('svrnty', 'planb', 'personal')
            AND (
              title ILIKE '%design%' OR summary ILIKE '%design%' OR content ILIKE '%design%' OR
              title ILIKE '%brand%' OR summary ILIKE '%brand%' OR content ILIKE '%brand%' OR
              title ILIKE '%canvas%' OR summary ILIKE '%canvas%' OR content ILIKE '%canvas%' OR
              title ILIKE '%stitch%' OR summary ILIKE '%stitch%' OR content ILIKE '%stitch%'
            )
          ORDER BY created_at DESC
          LIMIT 4
        ) AS t;
    """.strip()

    child_env = environ.copy()
    if database_url:
        connection_args = [database_url]
    else:
        # Without a URL, connect from discrete settings and pass the password
        # through the child environment.
        child_env["PGPASSWORD"] = password
        connection_args = [
            "-h", environ.get("SECONDBRAIN_DB_HOST", "localhost"),
            "-p", environ.get("SECONDBRAIN_DB_PORT", "5435"),
            "-U", environ.get("SECONDBRAIN_DB_USER", "svrnty"),
            "-d", environ.get("SECONDBRAIN_DB_NAME", "secondbrain"),
        ]
    cmd = [environ.get("PSQL_BIN", "psql"), *connection_args, "-tA", "-c", sql]

    try:
        result = subprocess.run(
            cmd,
            env=child_env,
            check=False,
            capture_output=True,
            text=True,
            timeout=2.5,
        )
    except FileNotFoundError:
        return {"status": "psql_missing", "hints": []}
    except subprocess.TimeoutExpired:
        return {"status": "timeout", "hints": []}
    except Exception as exc:
        return {"status": "error", "hints": [], "error": str(exc)}

    if result.returncode != 0:
        return {"status": "error", "hints": [], "error": _excerpt(result.stderr, 240)}

    try:
        rows = json.loads((result.stdout or "[]").strip() or "[]")
    except ValueError as exc:
        return {"status": "error", "hints": [], "error": f"invalid secondbrain JSON: {exc}"}
    if not isinstance(rows, list):
        rows = []

    brand_name = ""
    if isinstance(brand, dict):
        brand_name = brand.get("displayName") or brand.get("name") or ""

    capsules = []
    hints = []
    for row in rows[:4]:
        if not isinstance(row, dict):
            continue
        title = str(row.get("title") or "Untitled capsule")
        summary = _excerpt(str(row.get("summary") or ""), 260)
        meta = {
            "id": row.get("id"),
            "title": title,
            "sector": row.get("sector"),
            "created_at": row.get("created_at"),
        }
        capsules.append({key: val for key, val in meta.items() if val not in (None, "")})
        headline = f"Secondbrain capsule {row.get('id')}: {title}"
        hints.append(f"{headline} — {summary}" if summary else headline)

    return {
        "status": "ready",
        "source": "secondbrain.knowledge_capsules",
        "brand_id": brand_id,
        "brand_name": brand_name,
        "capsules": capsules,
        "hints": hints,
    }
|
|
|
|
|
|
def _excerpt(text: str, limit: int) -> str:
    """Collapse whitespace runs in *text* and shorten it to about *limit* chars.

    Text that fits is returned as is. Longer text is cut to ``limit - 3``
    characters (never fewer than zero), right-stripped, and suffixed with
    ``"..."``. ``None`` or empty input yields ``""``.
    """
    flattened = " ".join((text or "").split())
    if len(flattened) > limit:
        keep = max(0, limit - 3)
        return flattened[:keep].rstrip() + "..."
    return flattened
|
|
|
|
|
|
def _compact_design_context(context: dict) -> dict:
    """Project a full design context down to its summary fields.

    Missing fields come back as ``None``, except ``memory_hints``, which
    defaults to an empty list.
    """
    passthrough = (
        "brand_id",
        "resolved_brand_id",
        "status",
        "source",
        "source_version",
        "fetched_at",
        "tokens_imported",
    )
    compact = {key: context.get(key) for key in passthrough}
    compact["memory_hints"] = context.get("memory_hints") or []
    secondbrain = context.get("secondbrain") or {}
    compact["secondbrain_status"] = secondbrain.get("status")
    return compact
|
|
|
|
|
|
def _compose_generation_prompt(prompt: str, context: dict) -> str:
    """Append brand context to *prompt* when the design context is ready.

    A missing or non-ready context returns *prompt* unchanged. Otherwise the
    trimmed prompt is followed by the brand name, up to eight memory hints, a
    guideline excerpt and a draft-only instruction, with sections separated
    by blank lines.
    """
    if not context or context.get("status") != "ready":
        return prompt

    brand = context.get("brand") or {}
    brand_name = brand.get("displayName") or brand.get("name")
    hints = context.get("memory_hints") or []
    guideline = context.get("brand_guideline_excerpt") or context.get("design_md_excerpt") or ""

    sections = [prompt.strip()]
    if brand_name:
        sections.append(f"Brand context: {brand_name}.")
    if hints:
        bullets = "".join("\n- " + str(hint) for hint in hints[:8])
        sections.append("BTE/CMO design constraints:" + bullets)
    if guideline:
        sections.append("Brand guideline excerpt:\n" + _excerpt(guideline, 1600))
    sections.append("Generate draft-only canvas output for Hermes WebUI; do not publish or imply approval.")
    # Empty sections (such as a blank prompt) are left out of the result.
    return "\n\n".join(filter(None, sections))
|
|
|
|
|
|
def _read_json_body(handler) -> dict:
    """Read the request body and decode it as a JSON object.

    Returns ``{}`` when Content-Length is absent, empty, zero or negative.

    Raises:
        ValueError: if Content-Length is not an integer, the body is not
            valid UTF-8 JSON, or the decoded value is not a JSON object.
    """
    raw_length = handler.headers.get("Content-Length", "0") or 0
    try:
        length = int(raw_length)
    except (TypeError, ValueError) as e:
        raise ValueError(f"invalid Content-Length: {raw_length!r}") from e
    if length <= 0:
        return {}
    raw = handler.rfile.read(length)
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (ValueError, RecursionError) as e:
        # UnicodeDecodeError and JSONDecodeError are both ValueError
        # subclasses; RecursionError covers pathologically nested input.
        raise ValueError(f"invalid JSON: {e}") from e
    if not isinstance(payload, dict):
        raise ValueError("body must be a JSON object")
    return payload
|
|
|
|
|
|
def _ensure_project(name: str) -> dict:
    """Return a canva-editor project to work in, creating one if none exist.

    An existing project whose ``name`` equals *name* is preferred. When
    projects exist but none match, the first listed project is reused. A new
    project called *name* is created only when the listing is empty or is
    not a list.
    """
    projects = _canva_json("/api/v1/projects", "GET", None, timeout=15)
    if isinstance(projects, list) and projects:
        # NOTE(review): assumes project records expose the same "name" field
        # that the create call sends -- confirm against canva-editor.
        for candidate in projects:
            if isinstance(candidate, dict) and candidate.get("name") == name:
                return candidate
        return projects[0]
    return _canva_json("/api/v1/projects", "POST", {"name": name}, timeout=15)
|
|
|
|
|
|
def _canva_json(path: str, method: str = "GET", payload=None, timeout: int = 45):
    """Call canva-editor at *path* and return the decoded JSON response.

    A non-``None`` *payload* is sent as a JSON body. An empty response body
    yields ``{}``. ``urllib`` errors and JSON decoding errors propagate.
    """
    encoded = None if payload is None else json.dumps(payload).encode("utf-8")
    headers = {} if encoded is None else {"Content-Type": "application/json"}
    request = urllib.request.Request(
        _base_url() + path, data=encoded, headers=headers, method=method
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return json.loads(raw.decode("utf-8")) if raw else {}
|
|
|
|
|
|
def _record_id(record) -> str:
    """Return the first truthy ``id``, ``ID`` or ``uuid`` of *record* as a string.

    Non-dict input, or a dict without a truthy identifier, yields ``""``.
    """
    if not isinstance(record, dict):
        return ""
    for key in ("id", "ID", "uuid"):
        identifier = record.get(key)
        if identifier:
            return str(identifier)
    return ""
|
|
|
|
|
|
def _emit_event(event_type: str, payload: dict) -> dict:
    """Append an event to the in-memory log and return a shallow copy of it.

    Each event gets the next cursor value as ``id``, the given ``type`` and
    the current time as ``at``, followed by the payload fields. Payload keys
    named ``id``, ``type`` or ``at`` are ignored so they cannot overwrite the
    values that cursor-based replay depends on. Only the newest
    ``_MAX_EVENTS`` events are retained.
    """
    global _EVENT_CURSOR
    with _EVENT_LOCK:
        _EVENT_CURSOR += 1
        event = {
            "id": _EVENT_CURSOR,
            "type": event_type,
            "at": time.time(),
        }
        reserved = frozenset(event)
        event.update((key, value) for key, value in payload.items() if key not in reserved)
        _EVENTS.append(event)
        # Trim from the front so only the newest _MAX_EVENTS entries remain.
        del _EVENTS[:-_MAX_EVENTS]
        return dict(event)
|
|
|
|
|
|
def _events_since(cursor: int) -> list:
    """Return shallow copies of the retained events whose id exceeds *cursor*."""
    with _EVENT_LOCK:
        newer = filter(lambda stored: stored["id"] > cursor, _EVENTS)
        # Materialise while the lock is still held.
        return list(map(dict, newer))
|
|
|
|
|
|
def _current_event_cursor() -> int:
    """Return the id of the most recently emitted event, read under the lock."""
    with _EVENT_LOCK:
        cursor = _EVENT_CURSOR
    return cursor
|
|
|
|
|
|
def _safe_int(value, fallback: int) -> int:
    """Convert *value* with ``int()``, returning *fallback* if that fails."""
    try:
        parsed = int(value)
    except Exception:
        # Deliberately broad: any conversion failure means "use the fallback".
        parsed = fallback
    return parsed
|
|
|
|
|
|
def _send_json(handler, payload: dict, status: int) -> bool:
    """Write *payload* as a UTF-8 JSON response with *status*; return True.

    The response carries Content-Type, Content-Length and a ``no-store``
    Cache-Control header, in that order.
    """
    encoded = json.dumps(payload).encode("utf-8")
    handler.send_response(status)
    for header, value in (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(encoded))),
        ("Cache-Control", "no-store"),
    ):
        handler.send_header(header, value)
    handler.end_headers()
    handler.wfile.write(encoded)
    return True
|