svrnty-hermes-webui-plugin/routes/canvas.py
2026-05-28 21:44:02 -04:00

797 lines
30 KiB
Python

"""Canvas command-center routes.
These routes are intentionally small: Hermes WebUI owns the command surface,
while the existing canva-editor Go service owns canonical project/screen state.
The proxy keeps browser calls same-origin and whitelisted so the first Stitch
slice can move without coupling WebUI to the design service internals.
"""
import json
import os
import re
import sqlite3
import subprocess
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
# Base URL of the canva-editor service used when CANVA_EDITOR_BASE_URL is unset.
_DEFAULT_CANVA_BASE = "http://localhost:8080"

# Canonical brand id plus the human-friendly aliases that resolve to it.
_PLANB_BRAND_ID = "607740e8-8b76-4455-9c4f-eadbe9b4168c"
_BRAND_ALIASES = dict.fromkeys(("planb", "plan-b", "goutez-planb"), _PLANB_BRAND_ID)

# Candidate CMO SQLite locations, probed in order after any env overrides.
_DEFAULT_CMO_DB_PATHS = (
    "~/.hermes/profiles/cmo-planb/cmo.db",
    "~/.hermes/cmo-planb/cmo.db",
    "~/.hermes/cmo/cmo.db",
)

# Proxy allow-list: exact upstream paths, plus per-project sub-resources.
_ALLOWED_EXACT = frozenset((
    "/health",
    "/api/v1/capabilities",
    "/api/v1/projects",
    "/api/v1/generate",
    "/api/v1/generate/from-image",
))
_ALLOWED_PATTERN = re.compile(
    r"^/api/v1/projects/[A-Za-z0-9_\-]+/(screens|design-system|variants|prototype-edges|export)"
    r"(/[A-Za-z0-9_\-]+)?$"
)

# In-process event log: guarded by _EVENT_LOCK, capped at _MAX_EVENTS entries.
_EVENT_LOCK = threading.Lock()
_EVENTS = []
_EVENT_CURSOR = 0
_MAX_EVENTS = 200
def register(api):
    """Register every Canvas endpoint (status, tools, proxy, command, context, events)."""
    log = api.logger("svrnty.routes.canvas")
    # (path, HTTP verb, handler) triples, registered in this exact order.
    route_table = (
        ("/api/canvas/status", "GET", _handle_status),
        ("/api/canvas/tools", "GET", _handle_tools),
        ("/api/canvas/proxy", "GET", _handle_proxy),
        ("/api/canvas/proxy", "POST", _handle_proxy),
        ("/api/canvas/proxy", "PUT", _handle_proxy),
        ("/api/canvas/command", "POST", _handle_command),
        ("/api/canvas/design-context", "GET", _handle_design_context),
        ("/api/canvas/events", "GET", _handle_events),
    )
    for route_path, verb, route_handler in route_table:
        api.register_route(route_path, verb, route_handler)
    log.info("canvas endpoints registered")
def _handle_status(handler, _parsed):
    """Report whether canva-editor is reachable and, if so, its capabilities.

    The health probe alone decides ``ok`` and ``detail``. The capabilities
    probe is best-effort: when it fails, ``capabilities`` stays ``None`` and
    the reason is reported under ``capabilities_error`` instead of overwriting
    ``detail`` (which previously produced ``ok: true`` next to an error text).
    Always answers with HTTP 200; the outcome is in the JSON body.
    """
    base = _base_url()
    ok = False
    detail = "unreachable"
    capabilities = None
    capabilities_error = None
    try:
        req = urllib.request.Request(base + "/health", method="GET")
        with urllib.request.urlopen(req, timeout=2) as resp:
            ok = 200 <= resp.status < 300
            detail = "ok" if ok else f"status {resp.status}"
    except Exception as e:
        # Status endpoint boundary: any probe failure becomes the detail text.
        detail = str(e)
    if ok:
        try:
            caps_req = urllib.request.Request(base + "/api/v1/capabilities", method="GET")
            with urllib.request.urlopen(caps_req, timeout=2) as caps_resp:
                if 200 <= caps_resp.status < 300:
                    capabilities = json.loads(caps_resp.read().decode("utf-8"))
                else:
                    capabilities_error = f"status {caps_resp.status}"
        except Exception as e:
            capabilities_error = str(e)
    payload = {
        "ok": ok,
        "service": "canva-editor",
        "base_url": base,
        "detail": detail,
        "capabilities": capabilities,
        "boundary": "Hermes WebUI plugin -> canva-editor HTTP API",
        "entrypoint": "/plugins/svrnty/canvas.html",
    }
    if capabilities_error:
        payload["capabilities_error"] = capabilities_error
    return _send_json(handler, payload, 200)
def _handle_tools(handler, _parsed):
    """Return the static catalogue of Canvas tools exposed by this plugin."""
    # One row per tool: (name, method, path, purpose).
    catalogue = (
        (
            "canvas.status",
            "GET",
            "/api/canvas/status",
            "Check canva-editor reachability from Hermes WebUI.",
        ),
        (
            "canvas.create_project",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects",
            "Create an owned design project.",
        ),
        (
            "canvas.generate_screen",
            "POST",
            "/api/canvas/command",
            "Ask the Canvas bridge to generate and persist a screen with live progress events.",
        ),
        (
            "canvas.command",
            "POST",
            "/api/canvas/command",
            "CMO-facing command endpoint for prompt-to-canvas generation, variants, and event replay.",
        ),
        (
            "canvas.generate_from_image",
            "POST",
            "/api/canvas/proxy?path=/api/v1/generate/from-image",
            "Generate a structured screen spec from image input.",
        ),
        (
            "canvas.save_screen",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects/{project_id}/screens",
            "Persist generated screen specs as canonical canva-editor state.",
        ),
        (
            "canvas.update_screen",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects/{project_id}/screens/{screen_id}&method=PUT",
            "Persist simple structured edits to an existing screen spec.",
        ),
        (
            "canvas.design_context",
            "GET",
            "/api/canvas/design-context",
            "Seed brand and memory context for CMO canvas work.",
        ),
        (
            "canvas.record_variant",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects/{id}/variants",
            "Record parent/child lineage for generated design variations.",
        ),
        (
            "canvas.connect_prototype_edge",
            "POST",
            "/api/canvas/proxy?path=/api/v1/projects/{id}/prototype-edges",
            "Record a navigable relationship between two screens.",
        ),
        (
            "canvas.export_project",
            "GET",
            "/api/canvas/proxy?path=/api/v1/projects/{id}/export",
            "Export project JSON with screens, variants, prototype edges, and design system.",
        ),
        (
            "canvas.events",
            "GET",
            "/api/canvas/events",
            "SSE contract seed for live canvas updates.",
        ),
    )
    columns = ("name", "method", "path", "purpose")
    response = {
        "ok": True,
        "tool_surface": "cmo-canvas-v0",
        "tools": [dict(zip(columns, row)) for row in catalogue],
        "non_goals": [
            "full-canva-parity",
            "full-figma-parity",
            "publishing-without-jp-approval",
            "upstream-hermes-webui-edits",
        ],
    }
    return _send_json(handler, response, 200)
def _handle_design_context(handler, parsed):
    """Serve the design context for ``?brand_id=...`` (defaults to ``planb``)."""
    params = urllib.parse.parse_qs(parsed.query or "")
    requested = params.get("brand_id") or ["planb"]
    brand_id = requested[0].strip()
    if not brand_id:
        # A blank brand_id falls back to the default brand.
        brand_id = "planb"
    return _send_json(handler, _load_design_context(brand_id), 200)
def _handle_events(handler, _parsed):
    """Serve recorded Canvas events as a JSON delta or a one-shot SSE replay.

    With ``since=<cursor>`` or ``format=json`` in the query string, respond
    with a JSON object holding the events newer than the cursor. Otherwise
    respond with a finite ``text/event-stream`` body: a ``canvas.connected``
    seed frame followed by the last 20 recorded events, then close.
    """
    query = urllib.parse.parse_qs(getattr(_parsed, "query", "") or "")
    wants_json = "since" in query or (query.get("format") or [""])[0] == "json"
    if wants_json:
        since = _safe_int((query.get("since") or ["0"])[0], 0)
        events = _events_since(since)
        if events:
            cursor = events[-1]["id"]
        else:
            cursor = _current_event_cursor()
        return _send_json(handler, {"ok": True, "cursor": cursor, "events": events}, 200)

    frames = [
        "event: canvas.connected\n"
        "data: {\"ok\":true,\"source\":\"svrnty-hermes-webui-plugin\"}\n\n"
    ]
    for event in _events_since(0)[-20:]:
        event_name = event.get("type", "canvas.event")
        frames.extend((f"event: {event_name}\n", "data: " + json.dumps(event) + "\n\n"))
    body = "".join(frames).encode("utf-8")

    handler.send_response(200)
    for header, value in (
        ("Content-Type", "text/event-stream; charset=utf-8"),
        ("Cache-Control", "no-store"),
        ("Connection", "close"),
        ("Content-Length", str(len(body))),
    ):
        handler.send_header(header, value)
    handler.end_headers()
    handler.wfile.write(body)
    return True
def _handle_command(handler, _parsed):
    """Turn a prompt into one or more persisted canva-editor screens.

    Reads a JSON body (``prompt`` required; ``variants`` clamped to 1..6;
    ``brand_id``/``project_name``/``parent_screen_id`` accepted in snake or
    camel case), generates each variant through canva-editor, stores it as a
    screen, links later variants to the first (or supplied parent) screen and
    emits progress events along the way. The response carries the events
    recorded since the command started.
    """
    try:
        body = _read_json_body(handler)
    except ValueError as e:
        return _send_json(handler, {"ok": False, "error": str(e)}, 400)

    def text_field(snake, camel, default):
        # Accept either key spelling and normalise to a stripped string.
        return str(body.get(snake) or body.get(camel) or default).strip()

    prompt = str(body.get("prompt") or "").strip()
    if not prompt:
        return _send_json(handler, {"ok": False, "error": "prompt is required"}, 400)
    variants = max(1, min(6, _safe_int(body.get("variants"), 1)))
    brand_id = text_field("brand_id", "brandId", "planb") or "planb"
    project_name = text_field("project_name", "projectName", "Hermes Canvas") or "Hermes Canvas"
    parent_screen_id = text_field("parent_screen_id", "parentScreenId", "")
    command_id = f"canvas-{int(time.time() * 1000)}"
    first_screen_id = parent_screen_id
    screens = []
    emitted_start = _current_event_cursor()
    design_context = _load_design_context(brand_id)

    def fail(detail, status):
        # Record the failure in the event log, then answer the caller.
        _emit_event("canvas.command.failed", {"command_id": command_id, "error": detail})
        return _send_json(handler, {"ok": False, "command_id": command_id, "error": detail}, status)

    _emit_event("canvas.command.accepted", {
        "command_id": command_id,
        "prompt": prompt,
        "variants": variants,
        "brand_id": brand_id,
        "design_context_status": design_context.get("status"),
        "design_context_version": design_context.get("source_version"),
    })
    try:
        project = _ensure_project(project_name)
        project_id = _record_id(project)
        if not project_id:
            raise RuntimeError("canva-editor project id missing")
        _emit_event("canvas.project.ready", {
            "command_id": command_id,
            "project": project,
        })
        for idx in range(variants):
            if variants == 1:
                user_variant_prompt = prompt
            else:
                user_variant_prompt = (
                    f"{prompt}\nVariant {idx + 1} of {variants}: choose a distinct composition."
                )
            variant_prompt = _compose_generation_prompt(user_variant_prompt, design_context)
            _emit_event("canvas.variant.started", {
                "command_id": command_id,
                "project_id": project_id,
                "variant_index": idx + 1,
                "variants": variants,
                "design_context_status": design_context.get("status"),
            })
            generated = _canva_json("/api/v1/generate", "POST", {"prompt": variant_prompt}, timeout=90)
            # The spec may be wrapped under either key, or be the response itself.
            spec = generated.get("screenSpec") or generated.get("screen_spec") or generated
            screen = _canva_json(
                f"/api/v1/projects/{urllib.parse.quote(project_id)}/screens",
                "POST",
                {"name": prompt[:42] or "Generated screen", "spec": spec},
                timeout=30,
            )
            screen_id = _record_id(screen)
            if idx == 0 and not first_screen_id:
                # No parent supplied: the first generated screen becomes the parent.
                first_screen_id = screen_id
            elif first_screen_id and screen_id:
                lineage = {
                    "parentScreenId": first_screen_id,
                    "screenId": screen_id,
                    "prompt": user_variant_prompt,
                    "aspects": ["layout", "copy", "composition"],
                    "metadata": {"commandId": command_id, "variantIndex": idx + 1},
                }
                _canva_json(
                    f"/api/v1/projects/{urllib.parse.quote(project_id)}/variants",
                    "POST",
                    lineage,
                    timeout=30,
                )
            item = {
                "screen": screen,
                "screenSpec": spec,
                "variant_index": idx + 1,
                "prompt": user_variant_prompt,
                "designContext": _compact_design_context(design_context),
            }
            screens.append(item)
            _emit_event("canvas.screen.persisted", {
                "command_id": command_id,
                "project_id": project_id,
                **item,
            })
        _emit_event("canvas.command.completed", {
            "command_id": command_id,
            "project": project,
            "screens": len(screens),
        })
        emitted = _events_since(emitted_start)
        return _send_json(handler, {
            "ok": True,
            "command_id": command_id,
            "project": project,
            "screens": screens,
            "design_context": _compact_design_context(design_context),
            "events": emitted,
            "cursor": emitted[-1]["id"] if emitted else _current_event_cursor(),
        }, 200)
    except urllib.error.HTTPError as e:
        # Upstream answered with an error status: relay its body and code.
        return fail(e.read().decode("utf-8", errors="replace") if e.fp else str(e), e.code)
    except urllib.error.URLError as e:
        return fail(f"canva-editor unreachable: {e.reason}", 502)
    except Exception as e:
        return fail(str(e), 500)
def _handle_proxy(handler, parsed):
    """Forward a whitelisted request to canva-editor and relay its response.

    Query parameters:
        path:   upstream path; must start with ``/health`` or ``/api/v1/``
                and pass ``_is_allowed`` (400 / 403 otherwise).
        method: optional override; only ``PUT`` on a ``POST`` request is
                honoured.

    A request body is forwarded for POST/PUT/PATCH. A malformed
    ``Content-Length`` header is answered with 400 rather than raising out of
    the handler. Upstream HTTP errors are relayed with their status and body;
    connection failures become 502 and anything else 500.
    """
    qs = urllib.parse.parse_qs(parsed.query or "")
    target_path = (qs.get("path") or [""])[0].strip()
    method_override = (qs.get("method") or [""])[0].strip().upper()
    if not target_path.startswith(("/health", "/api/v1/")):
        return _send_json(handler, {"ok": False, "error": "missing or invalid path"}, 400)
    if not _is_allowed(target_path):
        return _send_json(handler, {"ok": False, "error": f"path not allowed: {target_path}"}, 403)

    method = handler.command
    if method == "POST" and method_override in {"PUT"}:
        method = method_override

    body = b""
    if method in ("POST", "PUT", "PATCH"):
        try:
            length = int(handler.headers.get("Content-Length", "0") or 0)
        except ValueError:
            return _send_json(handler, {"ok": False, "error": "invalid Content-Length"}, 400)
        if length > 0:
            body = handler.rfile.read(length)

    target_url = _base_url() + target_path
    req = urllib.request.Request(target_url, data=body if body else None, method=method)
    if body:
        req.add_header("Content-Type", handler.headers.get("Content-Type", "application/json"))
    try:
        with urllib.request.urlopen(req, timeout=45) as resp:
            resp_body = resp.read()
            resp_ctype = resp.headers.get("Content-Type", "application/json")
            resp_status = resp.status
    except urllib.error.HTTPError as e:
        # Non-2xx from upstream is a valid answer to relay, not a proxy failure.
        resp_body = e.read() or b""
        resp_ctype = e.headers.get("Content-Type", "application/json") if e.headers else "application/json"
        resp_status = e.code
    except urllib.error.URLError as e:
        return _send_json(handler, {"ok": False, "error": f"canva-editor unreachable: {e.reason}"}, 502)
    except Exception as e:
        return _send_json(handler, {"ok": False, "error": f"proxy error: {e}"}, 500)

    handler.send_response(resp_status)
    handler.send_header("Content-Type", resp_ctype)
    handler.send_header("Content-Length", str(len(resp_body)))
    handler.send_header("Cache-Control", "no-store")
    handler.end_headers()
    handler.wfile.write(resp_body)
    return True
def _is_allowed(path: str) -> bool:
    """Return True when *path* is on the exact allow-list or matches the project pattern."""
    if path in _ALLOWED_EXACT:
        return True
    return _ALLOWED_PATTERN.match(path) is not None
def _base_url() -> str:
    """Return the canva-editor base URL (env override or default) without trailing slashes."""
    configured = os.environ.get("CANVA_EDITOR_BASE_URL", _DEFAULT_CANVA_BASE)
    return configured.rstrip("/")
def _load_design_context(brand_id: str) -> dict:
    """Build the design context for *brand_id* from the CMO brand profile cache.

    Always returns a dict. ``status`` is ``"ready"`` when a cache row was read
    and parsed, ``"missing"`` when no database or row was found, and
    ``"error"`` when the database read failed or the cached JSON is unusable
    (unparseable, or parseable but not a JSON object). ``error`` carries the
    reason for the non-ready outcomes.
    """
    resolved_brand_id = _resolve_brand_id(brand_id)
    base_context = {
        "brand_id": brand_id,
        "resolved_brand_id": resolved_brand_id,
        "source": "cmo.brand_profile_cache",
        "status": "missing",
        "brand": None,
        "brand_guideline_text": "",
        "design_md": "",
        "palette_tokens": [],
        "typography_tokens": [],
        "spacing_tokens": [],
        "voice_rules": [],
        "memory_hints": [],
        "secondbrain": {"status": "not_checked", "hints": []},
        "deferred_ports": ["secondbrain-writeback", "svrnty-vision"],
    }
    db_path = _cmo_db_path()
    base_context["cmo_db_path"] = db_path
    if not db_path:
        base_context["error"] = "CMO database not found"
        return base_context
    try:
        row = _read_brand_cache(db_path, resolved_brand_id)
    except sqlite3.Error as e:
        base_context["status"] = "error"
        base_context["error"] = f"CMO database read failed: {e}"
        return base_context
    if not row:
        base_context["error"] = f"brand_profile_cache row not found for {resolved_brand_id}"
        return base_context
    try:
        blob = json.loads(row["json"] or "{}")
    except (TypeError, ValueError) as e:
        base_context["status"] = "error"
        base_context["error"] = f"brand_profile_cache JSON invalid: {e}"
        return base_context
    if not isinstance(blob, dict):
        # Valid JSON that is not an object (list, string, number) would crash
        # every ``blob.get`` below, so report it like any other bad payload.
        base_context["status"] = "error"
        base_context["error"] = (
            f"brand_profile_cache JSON invalid: expected an object, got {type(blob).__name__}"
        )
        return base_context

    brand = blob.get("brand") if isinstance(blob.get("brand"), dict) else None
    guideline = str(blob.get("brand_guideline") or "")
    design_md = str(blob.get("design_md") or "")
    palette = _normalise_token_list(blob.get("palette"))
    typography = _normalise_token_list(blob.get("typography"))
    spacing = _normalise_token_list(blob.get("spacing"))
    voice_rules = _extract_voice_rules(blob.get("voice"), guideline)
    memory_hints = _build_memory_hints(brand, guideline, palette, typography, spacing, voice_rules)
    secondbrain = _load_secondbrain_hints(brand, resolved_brand_id)
    memory_hints.extend(secondbrain.get("hints") or [])
    base_context.update({
        "status": "ready",
        "cache_brand_id": row["brand_id"],
        "source_version": row["version"],
        "fetched_at": row["fetched_at"],
        "brand": brand,
        "brand_guideline_text": guideline,
        "brand_guideline_excerpt": _excerpt(guideline, 1800),
        "design_md": design_md,
        "design_md_excerpt": _excerpt(design_md, 1400),
        "palette_tokens": palette,
        "typography_tokens": typography,
        "spacing_tokens": spacing,
        "voice_rules": voice_rules,
        # Brand hints plus secondbrain hints, capped for prompt size.
        "memory_hints": memory_hints[:14],
        "secondbrain": secondbrain,
        "tokens_imported": bool(blob.get("tokens_imported") or palette or typography or spacing),
    })
    return base_context
def _resolve_brand_id(brand_id: str) -> str:
    """Map a known alias (case-insensitive) to its brand id; pass other ids through stripped."""
    cleaned = (brand_id or "").strip()
    alias_key = cleaned.lower()
    if alias_key in _BRAND_ALIASES:
        return _BRAND_ALIASES[alias_key]
    return cleaned
def _cmo_db_path() -> str:
    """Return the first existing CMO database path, or "" when none exists.

    Paths from the CANVAS_CMO_DB and CMO_DB environment variables are tried
    first (in that order), followed by the built-in default locations.
    """
    env_overrides = [os.environ.get(var) for var in ("CANVAS_CMO_DB", "CMO_DB")]
    search_order = [value for value in env_overrides if value]
    search_order += list(_DEFAULT_CMO_DB_PATHS)
    for candidate in map(os.path.expanduser, search_order):
        if os.path.exists(candidate):
            return candidate
    return ""
def _read_brand_cache(db_path: str, brand_id: str):
    """Fetch the brand_profile_cache row for *brand_id* as a ``sqlite3.Row`` (or None)."""
    query = """SELECT brand_id, json, version, fetched_at
FROM brand_profile_cache
WHERE brand_id = ?"""
    connection = sqlite3.connect(db_path)
    try:
        # Row factory lets callers read columns by name.
        connection.row_factory = sqlite3.Row
        cursor = connection.execute(query, (brand_id,))
        return cursor.fetchone()
    finally:
        connection.close()
def _normalise_token_list(raw_tokens) -> list:
    """Reduce raw design tokens to dicts with path/type/description/value.

    Non-list input yields ``[]``. Only the first 80 entries are considered and
    non-dict entries are skipped. String values are JSON-decoded when possible.
    Fields that are ``None`` or ``""`` are omitted from each result.
    """
    if not isinstance(raw_tokens, list):
        return []
    normalised = []
    for entry in raw_tokens[:80]:
        if isinstance(entry, dict):
            decoded = entry.get("value")
            if decoded is None:
                decoded = entry.get("valueJson")
            if isinstance(decoded, str):
                try:
                    decoded = json.loads(decoded)
                except ValueError:
                    # Not JSON: keep the raw string as the value.
                    pass
            fields = (
                ("path", entry.get("path") or entry.get("name") or entry.get("key")),
                ("type", entry.get("type")),
                ("description", entry.get("description")),
                ("value", decoded),
            )
            normalised.append({k: v for k, v in fields if v not in (None, "")})
    return normalised
def _extract_voice_rules(voice, guideline: str) -> list:
    """Collect up to 12 voice/tone rules from *voice*, falling back to *guideline*.

    A dict *voice* is read through a fixed set of keys (lower-case or with a
    capitalised first letter); a string *voice* is used as a single rule. When
    nothing was collected, guideline lines mentioning voice-related keywords
    are used instead.
    """
    collected = []
    if isinstance(voice, dict):
        for key in ("tone", "voice", "dos", "donts", "rules", "pillars", "description"):
            entry = voice.get(key) or voice.get(key[0].upper() + key[1:])
            if isinstance(entry, list):
                for member in entry:
                    text = str(member).strip()
                    if text:
                        collected.append(text)
            elif isinstance(entry, str) and entry.strip():
                collected.append(entry.strip())
    elif isinstance(voice, str) and voice.strip():
        collected.append(voice.strip())

    if guideline and not collected:
        keywords = ("voice", "tone", "avoid", "must", "never")
        for raw_line in guideline.splitlines():
            # Drop list bullets and surrounding whitespace before matching.
            candidate = raw_line.strip(" -\t")
            lowered = candidate.lower()
            if candidate and any(word in lowered for word in keywords):
                collected.append(candidate)
            if len(collected) >= 8:
                break
    return collected[:12]
def _build_memory_hints(brand, guideline, palette, typography, spacing, voice_rules) -> list:
    """Summarise brand context as short prompt hints (at most 10).

    Args:
        brand: brand dict (``displayName``/``name``/``description``) or None.
        guideline: brand guideline text; only its presence is used here.
        palette: normalised palette tokens; ``#``-prefixed values are listed.
        typography: normalised typography tokens; their paths are listed.
        spacing: normalised spacing tokens; only their presence is used.
        voice_rules: voice rule strings; the first three are quoted.

    Returns:
        A list of hint strings in a fixed order.
    """
    hints = []
    if isinstance(brand, dict):
        name = brand.get("displayName") or brand.get("name")
        description = brand.get("description")
        if name:
            hints.append(f"Brand: {name}")
        if description:
            hints.append(f"Brand description: {_excerpt(str(description), 260)}")
    if palette:
        colors = []
        for token in palette:
            value = token.get("value")
            if isinstance(value, dict):
                colors.extend(str(v) for v in value.values() if str(v).startswith("#"))
            elif isinstance(value, str) and value.startswith("#"):
                colors.append(value)
        if colors:
            # De-duplicate (order-preserving) BEFORE truncating, so repeated
            # colours cannot crowd distinct ones out of the eight slots.
            unique_colors = list(dict.fromkeys(colors))[:8]
            hints.append("Use BTE palette tokens: " + ", ".join(unique_colors))
    if typography:
        type_paths = [str(t.get("path")) for t in typography if t.get("path")]
        if type_paths:
            hints.append("Typography context: " + ", ".join(type_paths[:8]))
    if spacing:
        hints.append("Respect BTE spacing tokens and density rhythm.")
    if voice_rules:
        hints.append("Voice constraints: " + " | ".join(_excerpt(rule, 180) for rule in voice_rules[:3]))
    if guideline:
        hints.append("Use BTE brand guideline as primary public-plane creative truth.")
    return hints[:10]
def _load_secondbrain_hints(brand, brand_id: str) -> dict:
    """Read a few relevant knowledge capsules through local psql when configured.

    Returns a dict whose ``status`` is one of ``unconfigured`` (no database
    URL or password in the environment), ``psql_missing``, ``timeout``,
    ``error`` or ``ready``. ``hints`` is always present; on ``ready`` the dict
    also carries the capsule summaries and the brand name taken from *brand*.
    The query itself is static; *brand_id* is only echoed in the result.
    """
    password = os.environ.get("SECONDBRAIN_DB_PASSWORD") or os.environ.get("PGPASSWORD")
    database_url = os.environ.get("SECONDBRAIN_DATABASE_URL")
    if not database_url and not password:
        return {"status": "unconfigured", "hints": []}
    psql = os.environ.get("PSQL_BIN", "psql")
    sql = """
SELECT COALESCE(json_agg(row_to_json(t))::text, '[]')
FROM (
SELECT id, title, summary, sector, created_at
FROM knowledge_capsules
WHERE sector IN ('svrnty', 'planb', 'personal')
AND (
title ILIKE '%design%' OR summary ILIKE '%design%' OR content ILIKE '%design%' OR
title ILIKE '%brand%' OR summary ILIKE '%brand%' OR content ILIKE '%brand%' OR
title ILIKE '%canvas%' OR summary ILIKE '%canvas%' OR content ILIKE '%canvas%' OR
title ILIKE '%stitch%' OR summary ILIKE '%stitch%' OR content ILIKE '%stitch%'
)
ORDER BY created_at DESC
LIMIT 4
) AS t;
""".strip()
    cmd = [psql]
    env = os.environ.copy()
    if database_url:
        cmd.append(database_url)
    else:
        # No URL: connect with discrete host/port/user/db and pass the
        # password through the child environment only.
        env["PGPASSWORD"] = password
        cmd.extend([
            "-h", os.environ.get("SECONDBRAIN_DB_HOST", "localhost"),
            "-p", os.environ.get("SECONDBRAIN_DB_PORT", "5435"),
            "-U", os.environ.get("SECONDBRAIN_DB_USER", "svrnty"),
            "-d", os.environ.get("SECONDBRAIN_DB_NAME", "secondbrain"),
        ])
    # -tA: tuples only, unaligned, so stdout is just the JSON text.
    cmd.extend(["-tA", "-c", sql])
    try:
        result = subprocess.run(
            cmd,
            env=env,
            check=False,
            capture_output=True,
            text=True,
            timeout=2.5,
        )
    except FileNotFoundError:
        return {"status": "psql_missing", "hints": []}
    except subprocess.TimeoutExpired:
        return {"status": "timeout", "hints": []}
    except Exception as e:
        # Best-effort enrichment: never let a psql problem break the caller.
        return {"status": "error", "hints": [], "error": str(e)}
    if result.returncode != 0:
        return {"status": "error", "hints": [], "error": _excerpt(result.stderr, 240)}
    try:
        rows = json.loads((result.stdout or "[]").strip() or "[]")
    except ValueError as e:
        return {"status": "error", "hints": [], "error": f"invalid secondbrain JSON: {e}"}
    if not isinstance(rows, list):
        rows = []

    hints = []
    capsules = []
    brand_name = ""
    if isinstance(brand, dict):
        brand_name = brand.get("displayName") or brand.get("name") or ""
    for row in rows[:4]:
        if not isinstance(row, dict):
            continue
        title = str(row.get("title") or "Untitled capsule")
        summary = _excerpt(str(row.get("summary") or ""), 260)
        capsule = {
            "id": row.get("id"),
            "title": title,
            "sector": row.get("sector"),
            "created_at": row.get("created_at"),
        }
        capsules.append({k: v for k, v in capsule.items() if v not in (None, "")})
        hint = f"Secondbrain capsule {row.get('id')}: {title}"
        if summary:
            # Separate title and summary; they were previously run together.
            hint += f" - {summary}"
        hints.append(hint)
    return {
        "status": "ready",
        "source": "secondbrain.knowledge_capsules",
        "brand_id": brand_id,
        "brand_name": brand_name,
        "capsules": capsules,
        "hints": hints,
    }
def _excerpt(text: str, limit: int) -> str:
    """Collapse whitespace in *text* and shorten it to about *limit* chars with "..."."""
    collapsed = " ".join((text or "").split())
    if len(collapsed) > limit:
        # Reserve three characters for the ellipsis.
        keep = max(0, limit - 3)
        return collapsed[:keep].rstrip() + "..."
    return collapsed
def _compact_design_context(context: dict) -> dict:
    """Return the small subset of *context* that is embedded in responses and events."""
    copied_keys = (
        "brand_id",
        "resolved_brand_id",
        "status",
        "source",
        "source_version",
        "fetched_at",
        "tokens_imported",
    )
    compact = {key: context.get(key) for key in copied_keys}
    compact["memory_hints"] = context.get("memory_hints") or []
    secondbrain = context.get("secondbrain") or {}
    compact["secondbrain_status"] = secondbrain.get("status")
    return compact
def _compose_generation_prompt(prompt: str, context: dict) -> str:
    """Append brand context to *prompt* when the design context is ready.

    When *context* is empty or its status is not ``"ready"`` the prompt is
    returned untouched. Otherwise the brand name, up to eight memory hints, a
    guideline excerpt and a draft-only instruction are appended as paragraphs.
    """
    is_ready = bool(context) and context.get("status") == "ready"
    if not is_ready:
        return prompt

    sections = [prompt.strip()]

    brand = context.get("brand") or {}
    brand_name = brand.get("displayName") or brand.get("name")
    if brand_name:
        sections.append(f"Brand context: {brand_name}.")

    hints = context.get("memory_hints") or []
    if hints:
        bullet_lines = "\n- ".join(str(h) for h in hints[:8])
        sections.append("BTE/CMO design constraints:\n- " + bullet_lines)

    # Prefer the brand guideline excerpt; fall back to the design.md excerpt.
    guideline = context.get("brand_guideline_excerpt") or context.get("design_md_excerpt") or ""
    if guideline:
        sections.append("Brand guideline excerpt:\n" + _excerpt(guideline, 1600))

    sections.append("Generate draft-only canvas output for Hermes WebUI; do not publish or imply approval.")
    return "\n\n".join(filter(None, sections))
def _read_json_body(handler) -> dict:
    """Read and decode the request body as a JSON object.

    Returns:
        The decoded dict, or ``{}`` when the request has no body.

    Raises:
        ValueError: the Content-Length header is not an integer, the body is
            not valid UTF-8 JSON, or the JSON value is not an object.
    """
    raw_length = handler.headers.get("Content-Length", "0") or 0
    try:
        length = int(raw_length)
    except (TypeError, ValueError) as e:
        raise ValueError(f"invalid Content-Length: {raw_length!r}") from e
    if length <= 0:
        return {}
    raw = handler.rfile.read(length)
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (ValueError, RecursionError) as e:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError;
        # RecursionError guards against pathologically nested input.
        raise ValueError(f"invalid JSON: {e}") from e
    if not isinstance(payload, dict):
        raise ValueError("body must be a JSON object")
    return payload
def _ensure_project(name: str) -> dict:
    """Return a canva-editor project to work in, creating one when none exist.

    Prefers an existing project whose ``name`` equals *name*; the argument was
    previously ignored whenever any project existed. When no project matches,
    the first listed project is reused (the earlier behaviour), and a new
    project called *name* is created only when the listing is empty or is not
    a list.

    NOTE(review): assumes project records expose a ``name`` field, mirroring
    the create payload - confirm against the canva-editor API.
    """
    projects = _canva_json("/api/v1/projects", "GET", None, timeout=15)
    if isinstance(projects, list) and projects:
        for project in projects:
            if isinstance(project, dict) and project.get("name") == name:
                return project
        return projects[0]
    return _canva_json("/api/v1/projects", "POST", {"name": name}, timeout=15)
def _canva_json(path: str, method: str = "GET", payload=None, timeout: int = 45):
    """Call canva-editor at *path* and return the decoded JSON response.

    *payload*, when given, is sent as a JSON body. An empty response body
    yields ``{}``. Errors from ``urllib`` and JSON decoding propagate.
    """
    encoded = json.dumps(payload).encode("utf-8") if payload is not None else None
    request = urllib.request.Request(_base_url() + path, data=encoded, method=method)
    if encoded is not None:
        request.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw_body = response.read()
    return json.loads(raw_body.decode("utf-8")) if raw_body else {}
def _record_id(record) -> str:
    """Return the record's id as a string, trying ``id``, ``ID`` then ``uuid``; "" if none."""
    if isinstance(record, dict):
        for key in ("id", "ID", "uuid"):
            candidate = record.get(key)
            if candidate:
                return str(candidate)
    return ""
def _emit_event(event_type: str, payload: dict) -> dict:
    """Append an event to the in-process log and return a copy of it.

    The event gets the next cursor value as ``id``, plus ``type`` and a
    ``time.time()`` timestamp under ``at``. Payload keys are merged in after
    those fields but can never replace them: an ``id``/``type``/``at`` key in
    *payload* used to overwrite the reserved value and could corrupt cursor
    comparisons. The log is trimmed to the newest ``_MAX_EVENTS`` entries.
    """
    global _EVENT_CURSOR
    with _EVENT_LOCK:
        _EVENT_CURSOR += 1
        event = {
            "id": _EVENT_CURSOR,
            "type": event_type,
            "at": time.time(),
        }
        for key, value in (payload or {}).items():
            # setdefault keeps the reserved fields authoritative.
            event.setdefault(key, value)
        _EVENTS.append(event)
        del _EVENTS[:-_MAX_EVENTS]
        return dict(event)
def _events_since(cursor: int) -> list:
    """Return copies of all logged events whose id is greater than *cursor*."""
    newer = []
    with _EVENT_LOCK:
        for event in _EVENTS:
            if event["id"] > cursor:
                # Copy so callers cannot mutate the shared log entries.
                newer.append(dict(event))
    return newer
def _current_event_cursor() -> int:
    """Return the id of the most recently emitted event (0 if none yet)."""
    with _EVENT_LOCK:
        latest = _EVENT_CURSOR
    return latest
def _safe_int(value, fallback: int) -> int:
    """Convert *value* with ``int()``; return *fallback* if the conversion raises."""
    try:
        parsed = int(value)
    except Exception:
        parsed = fallback
    return parsed
def _send_json(handler, payload: dict, status: int) -> bool:
    """Write *payload* as a UTF-8 JSON response with *status*; always returns True."""
    encoded = json.dumps(payload).encode("utf-8")
    handler.send_response(status)
    response_headers = (
        ("Content-Type", "application/json; charset=utf-8"),
        ("Content-Length", str(len(encoded))),
        ("Cache-Control", "no-store"),
    )
    for header_name, header_value in response_headers:
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(encoded)
    return True