"""Canvas command-center routes.

These routes are intentionally small: Hermes WebUI owns the command surface,
while the existing canva-editor Go service owns canonical project/screen state.
The proxy keeps browser calls same-origin and whitelisted so the first Stitch
slice can move without coupling WebUI to the design service internals.
"""

import json
import os
import re
import sqlite3
import subprocess
import threading
import time
import urllib.error
import urllib.parse
import urllib.request

# Fallback canva-editor address when CANVA_EDITOR_BASE_URL is not set.
_DEFAULT_CANVA_BASE = "http://localhost:8080"
_PLANB_BRAND_ID = "607740e8-8b76-4455-9c4f-eadbe9b4168c"
# Human-friendly brand aliases (looked up lower-cased) -> canonical brand id.
_BRAND_ALIASES = {
    "planb": _PLANB_BRAND_ID,
    "plan-b": _PLANB_BRAND_ID,
    "goutez-planb": _PLANB_BRAND_ID,
}
# Candidate CMO database locations, probed in order after the env overrides.
_DEFAULT_CMO_DB_PATHS = (
    "~/.hermes/profiles/cmo-planb/cmo.db",
    "~/.hermes/cmo-planb/cmo.db",
    "~/.hermes/cmo/cmo.db",
)
# Upstream paths the proxy forwards verbatim.
_ALLOWED_EXACT = frozenset({
    "/health",
    "/api/v1/capabilities",
    "/api/v1/projects",
    "/api/v1/generate",
    "/api/v1/generate/from-image",
})
# Project-scoped upstream paths: /api/v1/projects/<id>/<resource>[/<sub-id>].
_ALLOWED_PATTERN = re.compile(
    r"^/api/v1/projects/[A-Za-z0-9_\-]+/(screens|design-system|variants|prototype-edges|export)"
    r"(/[A-Za-z0-9_\-]+)?$"
)
# In-process replay log for canvas events; guarded by _EVENT_LOCK.
_EVENT_LOCK = threading.Lock()
_EVENTS = []
_EVENT_CURSOR = 0
_MAX_EVENTS = 200


def register(api):
    """Wire the Canvas proxy, health, design-context, and SSE seed routes."""
    log = api.logger("svrnty.routes.canvas")
    routes = (
        ("/api/canvas/status", "GET", _handle_status),
        ("/api/canvas/tools", "GET", _handle_tools),
        ("/api/canvas/proxy", "GET", _handle_proxy),
        ("/api/canvas/proxy", "POST", _handle_proxy),
        ("/api/canvas/proxy", "PUT", _handle_proxy),
        ("/api/canvas/command", "POST", _handle_command),
        ("/api/canvas/design-context", "GET", _handle_design_context),
        ("/api/canvas/events", "GET", _handle_events),
    )
    for path, method, route_handler in routes:
        api.register_route(path, method, route_handler)
    log.info("canvas endpoints registered")


def _handle_status(handler, _parsed):
    """Report whether canva-editor answers ``/health`` and, if so, its capabilities.

    Always responds with HTTP 200; reachability is conveyed through the
    ``ok`` and ``detail`` fields of the JSON body.
    """
    base = _base_url()
    ok = False
    detail = "unreachable"
    capabilities = None
    capabilities_error = None
    try:
        req = urllib.request.Request(base + "/health", method="GET")
        with urllib.request.urlopen(req, timeout=2) as resp:
            ok = 200 <= resp.status < 300
            detail = "ok" if ok else f"status {resp.status}"
    except Exception as e:
        detail = str(e)
    if ok:
        # Capabilities are optional extras: a failure here must not make a
        # healthy service look contradictory (ok=True with an error detail).
        try:
            caps_req = urllib.request.Request(base + "/api/v1/capabilities", method="GET")
            with urllib.request.urlopen(caps_req, timeout=2) as caps_resp:
                if 200 <= caps_resp.status < 300:
                    capabilities = json.loads(caps_resp.read().decode("utf-8"))
        except Exception as e:
            capabilities_error = str(e)
    payload = {
        "ok": ok,
        "service": "canva-editor",
        "base_url": base,
        "detail": detail,
        "capabilities": capabilities,
        "boundary": "Hermes WebUI plugin -> canva-editor HTTP API",
        "entrypoint": "/plugins/svrnty/canvas.html",
    }
    if capabilities_error is not None:
        payload["capabilities_error"] = capabilities_error
    return _send_json(handler, payload, 200)


def _handle_tools(handler, _parsed):
    """Return the static catalogue of Canvas tools exposed by this plugin."""
    return _send_json(handler, {
        "ok": True,
        "tool_surface": "cmo-canvas-v0",
        "tools": [
            {
                "name": "canvas.status",
                "method": "GET",
                "path": "/api/canvas/status",
                "purpose": "Check canva-editor reachability from Hermes WebUI.",
            },
            {
                "name": "canvas.create_project",
                "method": "POST",
                "path": "/api/canvas/proxy?path=/api/v1/projects",
                "purpose": "Create an owned design project.",
            },
            {
                "name": "canvas.generate_screen",
                "method": "POST",
                "path": "/api/canvas/command",
                "purpose": "Ask the Canvas bridge to generate and persist a screen with live progress events.",
            },
            {
                "name": "canvas.command",
                "method": "POST",
                "path": "/api/canvas/command",
                "purpose": "CMO-facing command endpoint for prompt-to-canvas generation, variants, and event replay.",
            },
            {
                "name": "canvas.generate_from_image",
                "method": "POST",
                "path": "/api/canvas/proxy?path=/api/v1/generate/from-image",
                "purpose": "Generate a structured screen spec from image input.",
            },
            {
                "name": "canvas.save_screen",
                "method": "POST",
                "path": "/api/canvas/proxy?path=/api/v1/projects/{project_id}/screens",
                "purpose": "Persist generated screen specs as canonical canva-editor state.",
            },
            {
                "name": "canvas.update_screen",
                "method": "POST",
                "path": "/api/canvas/proxy?path=/api/v1/projects/{project_id}/screens/{screen_id}&method=PUT",
                "purpose": "Persist simple structured edits to an existing screen spec.",
            },
            {
                "name": "canvas.design_context",
                "method": "GET",
                "path": "/api/canvas/design-context",
                "purpose": "Seed brand and memory context for CMO canvas work.",
            },
            {
                "name": "canvas.record_variant",
                "method": "POST",
                "path": "/api/canvas/proxy?path=/api/v1/projects/{id}/variants",
                "purpose": "Record parent/child lineage for generated design variations.",
            },
            {
                "name": "canvas.connect_prototype_edge",
                "method": "POST",
                "path": "/api/canvas/proxy?path=/api/v1/projects/{id}/prototype-edges",
                "purpose": "Record a navigable relationship between two screens.",
            },
            {
                "name": "canvas.export_project",
                "method": "GET",
                "path": "/api/canvas/proxy?path=/api/v1/projects/{id}/export",
                "purpose": "Export project JSON with screens, variants, prototype edges, and design system.",
            },
            {
                "name": "canvas.events",
                "method": "GET",
                "path": "/api/canvas/events",
                "purpose": "SSE contract seed for live canvas updates.",
            },
        ],
        "non_goals": [
            "full-canva-parity",
            "full-figma-parity",
            "publishing-without-jp-approval",
            "upstream-hermes-webui-edits",
        ],
    }, 200)


def _handle_design_context(handler, parsed):
    """Return the design context for ``?brand_id=...`` (defaults to ``planb``)."""
    qs = urllib.parse.parse_qs(parsed.query or "")
    brand_id = (qs.get("brand_id") or ["planb"])[0].strip() or "planb"
    context = _load_design_context(brand_id)
    return _send_json(handler, context, 200)


def _handle_events(handler, _parsed):
    """Return replayable Canvas events.

    The canva-editor service does not expose a live event bus yet. This route
    establishes the browser/backend contract now. The Canvas panel polls JSON
    deltas, while SSE callers still receive a valid seed stream plus replay.
    """
    qs = urllib.parse.parse_qs(getattr(_parsed, "query", "") or "")

    # JSON polling mode: selected by ``since=<cursor>`` or ``format=json``.
    if "since" in qs or (qs.get("format") or [""])[0] == "json":
        since = _safe_int((qs.get("since") or ["0"])[0], 0)
        events = _events_since(since)
        cursor = events[-1]["id"] if events else _current_event_cursor()
        return _send_json(handler, {"ok": True, "cursor": cursor, "events": events}, 200)

    # SSE mode: one finite body (connected marker + last 20 events), then close.
    replay = _events_since(0)[-20:]
    chunks = [
        "event: canvas.connected\n"
        "data: {\"ok\":true,\"source\":\"svrnty-hermes-webui-plugin\"}\n\n"
    ]
    for event in replay:
        event_name = event.get("type", "canvas.event")
        chunks.append(f"event: {event_name}\n")
        chunks.append("data: " + json.dumps(event) + "\n\n")
    body = "".join(chunks).encode("utf-8")
    handler.send_response(200)
    handler.send_header("Content-Type", "text/event-stream; charset=utf-8")
    handler.send_header("Cache-Control", "no-store")
    handler.send_header("Connection", "close")
    handler.send_header("Content-Length", str(len(body)))
    handler.end_headers()
    handler.wfile.write(body)
    return True


def _handle_command(handler, _parsed):
    """Run a CMO-style prompt-to-canvas command through canva-editor.

    Reads a JSON object body (``prompt`` required; ``variants`` clamped to
    1..6; optional brand/project/parent-screen fields in snake_case or
    camelCase), generates and persists one screen per variant, records
    variant lineage, and emits progress events along the way. Upstream
    failures are reported as JSON with the upstream status (HTTP errors),
    502 (unreachable) or 500 (anything else).
    """
    try:
        body = _read_json_body(handler)
    except ValueError as e:
        return _send_json(handler, {"ok": False, "error": str(e)}, 400)

    prompt = str(body.get("prompt") or "").strip()
    if not prompt:
        return _send_json(handler, {"ok": False, "error": "prompt is required"}, 400)

    variants = max(1, min(6, _safe_int(body.get("variants"), 1)))
    brand_id = str(body.get("brand_id") or body.get("brandId") or "planb").strip() or "planb"
    project_name = (
        str(body.get("project_name") or body.get("projectName") or "Hermes Canvas").strip()
        or "Hermes Canvas"
    )
    parent_screen_id = str(body.get("parent_screen_id") or body.get("parentScreenId") or "").strip()
    command_id = f"canvas-{int(time.time() * 1000)}"
    # Lineage parent: the caller-supplied screen, else the first generated one.
    first_screen_id = parent_screen_id
    screens = []
    # Remember the cursor so the response can include only this command's events.
    emitted_start = _current_event_cursor()
    design_context = _load_design_context(brand_id)
    _emit_event("canvas.command.accepted", {
        "command_id": command_id,
        "prompt": prompt,
        "variants": variants,
        "brand_id": brand_id,
        "design_context_status": design_context.get("status"),
        "design_context_version": design_context.get("source_version"),
    })
    try:
        project = _ensure_project(project_name)
        project_id = _record_id(project)
        if not project_id:
            raise RuntimeError("canva-editor project id missing")
        _emit_event("canvas.project.ready", {
            "command_id": command_id,
            "project": project,
        })
        for idx in range(variants):
            user_variant_prompt = prompt if variants == 1 else (
                f"{prompt}\nVariant {idx + 1} of {variants}: choose a distinct composition."
            )
            variant_prompt = _compose_generation_prompt(user_variant_prompt, design_context)
            _emit_event("canvas.variant.started", {
                "command_id": command_id,
                "project_id": project_id,
                "variant_index": idx + 1,
                "variants": variants,
                "design_context_status": design_context.get("status"),
            })
            generated = _canva_json("/api/v1/generate", "POST", {"prompt": variant_prompt}, timeout=90)
            # Accept either key spelling, or treat the whole reply as the spec.
            spec = generated.get("screenSpec") or generated.get("screen_spec") or generated
            screen = _canva_json(
                f"/api/v1/projects/{urllib.parse.quote(project_id)}/screens",
                "POST",
                {"name": prompt[:42] or "Generated screen", "spec": spec},
                timeout=30,
            )
            screen_id = _record_id(screen)
            if idx == 0 and not first_screen_id:
                # No explicit parent: the first screen becomes the lineage root.
                first_screen_id = screen_id
            elif first_screen_id and screen_id:
                _canva_json(
                    f"/api/v1/projects/{urllib.parse.quote(project_id)}/variants",
                    "POST",
                    {
                        "parentScreenId": first_screen_id,
                        "screenId": screen_id,
                        "prompt": user_variant_prompt,
                        "aspects": ["layout", "copy", "composition"],
                        "metadata": {"commandId": command_id, "variantIndex": idx + 1},
                    },
                    timeout=30,
                )
            item = {
                "screen": screen,
                "screenSpec": spec,
                "variant_index": idx + 1,
                "prompt": user_variant_prompt,
                "designContext": _compact_design_context(design_context),
            }
            screens.append(item)
            _emit_event("canvas.screen.persisted", {
                "command_id": command_id,
                "project_id": project_id,
                **item,
            })
        _emit_event("canvas.command.completed", {
            "command_id": command_id,
            "project": project,
            "screens": len(screens),
        })
        emitted = _events_since(emitted_start)
        return _send_json(handler, {
            "ok": True,
            "command_id": command_id,
            "project": project,
            "screens": screens,
            "design_context": _compact_design_context(design_context),
            "events": emitted,
            "cursor": emitted[-1]["id"] if emitted else _current_event_cursor(),
        }, 200)
    except urllib.error.HTTPError as e:
        detail = e.read().decode("utf-8", errors="replace") if e.fp else str(e)
        _emit_event("canvas.command.failed", {"command_id": command_id, "error": detail})
        return _send_json(handler, {"ok": False, "command_id": command_id, "error": detail}, e.code)
    except urllib.error.URLError as e:
        detail = f"canva-editor unreachable: {e.reason}"
        _emit_event("canvas.command.failed", {"command_id": command_id, "error": detail})
        return _send_json(handler, {"ok": False, "command_id": command_id, "error": detail}, 502)
    except Exception as e:
        detail = str(e)
        _emit_event("canvas.command.failed", {"command_id": command_id, "error": detail})
        return _send_json(handler, {"ok": False, "command_id": command_id, "error": detail}, 500)


def _handle_proxy(handler, parsed):
    """Forward a whitelisted request to canva-editor and relay its response.

    The upstream path comes from ``?path=``; ``?method=PUT`` lets a POST be
    forwarded as PUT. Returns 400 for a missing/invalid path or a malformed
    Content-Length, 403 for a path outside the whitelist, 502 when the
    upstream is unreachable and 500 for other proxy errors. Upstream HTTP
    error responses are relayed with their own status and body.
    """
    qs = urllib.parse.parse_qs(parsed.query or "")
    target_path = (qs.get("path") or [""])[0].strip()
    method_override = (qs.get("method") or [""])[0].strip().upper()
    if not target_path.startswith(("/health", "/api/v1/")):
        return _send_json(handler, {"ok": False, "error": "missing or invalid path"}, 400)
    if not _is_allowed(target_path):
        return _send_json(handler, {"ok": False, "error": f"path not allowed: {target_path}"}, 403)

    method = handler.command
    if method == "POST" and method_override in {"PUT"}:
        method = method_override

    body = b""
    if method in ("POST", "PUT", "PATCH"):
        try:
            length = int(handler.headers.get("Content-Length", "0") or 0)
        except (TypeError, ValueError):
            # A garbage header is a client error, not an unhandled exception.
            return _send_json(handler, {"ok": False, "error": "invalid Content-Length header"}, 400)
        if length > 0:
            body = handler.rfile.read(length)

    target_url = _base_url() + target_path
    req = urllib.request.Request(target_url, data=body if body else None, method=method)
    if body:
        req.add_header("Content-Type", handler.headers.get("Content-Type", "application/json"))
    try:
        with urllib.request.urlopen(req, timeout=45) as resp:
            resp_body = resp.read()
            resp_ctype = resp.headers.get("Content-Type", "application/json")
            resp_status = resp.status
    except urllib.error.HTTPError as e:
        # Relay upstream error responses rather than masking them.
        resp_body = e.read() or b""
        resp_ctype = e.headers.get("Content-Type", "application/json") if e.headers else "application/json"
        resp_status = e.code
    except urllib.error.URLError as e:
        return _send_json(handler, {"ok": False, "error": f"canva-editor unreachable: {e.reason}"}, 502)
    except Exception as e:
        return _send_json(handler, {"ok": False, "error": f"proxy error: {e}"}, 500)

    handler.send_response(resp_status)
    handler.send_header("Content-Type", resp_ctype)
    handler.send_header("Content-Length", str(len(resp_body)))
    handler.send_header("Cache-Control", "no-store")
    handler.end_headers()
    handler.wfile.write(resp_body)
    return True


def _is_allowed(path: str) -> bool:
    """Return True when *path* is on the upstream whitelist.

    ``fullmatch`` is used because ``$`` alone also matches just before a
    trailing newline, which would let ``".../screens\\n"`` through.
    """
    return path in _ALLOWED_EXACT or bool(_ALLOWED_PATTERN.fullmatch(path))


def _base_url() -> str:
    """Return the canva-editor base URL (env override, no trailing slash)."""
    return os.environ.get("CANVA_EDITOR_BASE_URL", _DEFAULT_CANVA_BASE).rstrip("/")


def _load_design_context(brand_id: str) -> dict:
    """Build the design context for *brand_id* from the CMO brand cache.

    Always returns a dict. ``status`` is ``"missing"`` when the database or
    row cannot be found, ``"error"`` when the database read fails or the
    cached JSON is invalid / not an object, and ``"ready"`` otherwise.
    """
    resolved_brand_id = _resolve_brand_id(brand_id)
    base_context = {
        "brand_id": brand_id,
        "resolved_brand_id": resolved_brand_id,
        "source": "cmo.brand_profile_cache",
        "status": "missing",
        "brand": None,
        "brand_guideline_text": "",
        "design_md": "",
        "palette_tokens": [],
        "typography_tokens": [],
        "spacing_tokens": [],
        "voice_rules": [],
        "memory_hints": [],
        "secondbrain": {"status": "not_checked", "hints": []},
        "deferred_ports": ["secondbrain-writeback", "svrnty-vision"],
    }
    db_path = _cmo_db_path()
    base_context["cmo_db_path"] = db_path
    if not db_path:
        base_context["error"] = "CMO database not found"
        return base_context

    try:
        row = _read_brand_cache(db_path, resolved_brand_id)
    except sqlite3.Error as e:
        base_context["status"] = "error"
        base_context["error"] = f"CMO database read failed: {e}"
        return base_context
    if not row:
        base_context["error"] = f"brand_profile_cache row not found for {resolved_brand_id}"
        return base_context

    try:
        blob = json.loads(row["json"] or "{}")
    except ValueError as e:
        base_context["status"] = "error"
        base_context["error"] = f"brand_profile_cache JSON invalid: {e}"
        return base_context
    if not isinstance(blob, dict):
        # Valid JSON that is not an object (list, number, ...) would otherwise
        # crash on ``blob.get`` below.
        base_context["status"] = "error"
        base_context["error"] = (
            f"brand_profile_cache JSON invalid: expected an object, got {type(blob).__name__}"
        )
        return base_context

    brand = blob.get("brand") if isinstance(blob.get("brand"), dict) else None
    guideline = str(blob.get("brand_guideline") or "")
    design_md = str(blob.get("design_md") or "")
    palette = _normalise_token_list(blob.get("palette"))
    typography = _normalise_token_list(blob.get("typography"))
    spacing = _normalise_token_list(blob.get("spacing"))
    voice_rules = _extract_voice_rules(blob.get("voice"), guideline)
    memory_hints = _build_memory_hints(brand, guideline, palette, typography, spacing, voice_rules)
    secondbrain = _load_secondbrain_hints(brand, resolved_brand_id)
    memory_hints.extend(secondbrain.get("hints") or [])
    base_context.update({
        "status": "ready",
        "cache_brand_id": row["brand_id"],
        "source_version": row["version"],
        "fetched_at": row["fetched_at"],
        "brand": brand,
        "brand_guideline_text": guideline,
        "brand_guideline_excerpt": _excerpt(guideline, 1800),
        "design_md": design_md,
        "design_md_excerpt": _excerpt(design_md, 1400),
        "palette_tokens": palette,
        "typography_tokens": typography,
        "spacing_tokens": spacing,
        "voice_rules": voice_rules,
        "memory_hints": memory_hints[:14],
        "secondbrain": secondbrain,
        "tokens_imported": bool(blob.get("tokens_imported") or palette or typography or spacing),
    })
    return base_context


def _resolve_brand_id(brand_id: str) -> str:
    """Map a known alias (case-insensitive) to its brand id; else return the trimmed input."""
    value = (brand_id or "").strip()
    return _BRAND_ALIASES.get(value.lower(), value)


def _cmo_db_path() -> str:
    """Return the first existing CMO database path, or ``""`` when none exists.

    ``CANVAS_CMO_DB`` and ``CMO_DB`` are tried first, then the defaults.
    """
    candidates = []
    for env_name in ("CANVAS_CMO_DB", "CMO_DB"):
        env_path = os.environ.get(env_name)
        if env_path:
            candidates.append(env_path)
    candidates.extend(_DEFAULT_CMO_DB_PATHS)
    for path in candidates:
        expanded = os.path.expanduser(path)
        if os.path.exists(expanded):
            return expanded
    return ""


def _read_brand_cache(db_path: str, brand_id: str):
    """Fetch the ``brand_profile_cache`` row for *brand_id*, or ``None``.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        row = conn.execute(
            """SELECT brand_id, json, version, fetched_at
               FROM brand_profile_cache
               WHERE brand_id = ?""",
            (brand_id,),
        ).fetchone()
        return row
    finally:
        conn.close()


def _normalise_token_list(raw_tokens) -> list:
    """Reduce raw design tokens to ``path``/``type``/``description``/``value`` dicts.

    Non-list input yields ``[]``; only the first 80 entries are considered and
    non-dict entries are skipped. String values are JSON-decoded when they
    parse, otherwise kept as-is. Keys whose value is ``None`` or ``""`` are
    dropped from each token.
    """
    if not isinstance(raw_tokens, list):
        return []
    tokens = []
    for item in raw_tokens[:80]:
        if not isinstance(item, dict):
            continue
        token = {
            "path": item.get("path") or item.get("name") or item.get("key"),
            "type": item.get("type"),
            "description": item.get("description"),
        }
        value = item.get("value")
        if value is None:
            value = item.get("valueJson")
        if isinstance(value, str):
            try:
                value = json.loads(value)
            except ValueError:
                # Not JSON (e.g. "#ffffff"): keep the raw string.
                pass
        token["value"] = value
        tokens.append({k: v for k, v in token.items() if v not in (None, "")})
    return tokens


def _extract_voice_rules(voice, guideline: str) -> list:
    """Collect up to 12 voice/tone rules from *voice*, falling back to *guideline*.

    *voice* may be a dict (well-known keys, lower-case or capitalised) or a
    plain string. When it yields nothing, guideline lines mentioning
    voice/tone/avoid/must/never are used instead (at most 8).
    """
    rules = []
    if isinstance(voice, dict):
        for key in ("tone", "voice", "dos", "donts", "rules", "pillars", "description"):
            value = voice.get(key) or voice.get(key[0].upper() + key[1:])
            if isinstance(value, str) and value.strip():
                rules.append(value.strip())
            elif isinstance(value, list):
                rules.extend(str(item).strip() for item in value if str(item).strip())
    elif isinstance(voice, str) and voice.strip():
        rules.append(voice.strip())

    if not rules and guideline:
        for line in guideline.splitlines():
            clean = line.strip(" -\t")
            if clean and any(word in clean.lower() for word in ("voice", "tone", "avoid", "must", "never")):
                rules.append(clean)
            if len(rules) >= 8:
                break
    return rules[:12]


def _build_memory_hints(brand, guideline, palette, typography, spacing, voice_rules) -> list:
    """Summarise brand data into at most 10 short prompt hints."""
    hints = []
    if isinstance(brand, dict):
        name = brand.get("displayName") or brand.get("name")
        description = brand.get("description")
        if name:
            hints.append(f"Brand: {name}")
        if description:
            hints.append(f"Brand description: {_excerpt(str(description), 260)}")
    if palette:
        colors = []
        for token in palette:
            value = token.get("value")
            if isinstance(value, dict):
                colors.extend(str(v) for v in value.values() if str(v).startswith("#"))
            elif isinstance(value, str) and value.startswith("#"):
                colors.append(value)
        if colors:
            # De-duplicate BEFORE truncating so repeated values do not crowd
            # out distinct colours from the 8-colour budget.
            unique_colors = list(dict.fromkeys(colors))[:8]
            hints.append("Use BTE palette tokens: " + ", ".join(unique_colors))
    if typography:
        type_paths = [str(t.get("path")) for t in typography if t.get("path")]
        if type_paths:
            hints.append("Typography context: " + ", ".join(type_paths[:8]))
    if spacing:
        hints.append("Respect BTE spacing tokens and density rhythm.")
    if voice_rules:
        hints.append("Voice constraints: " + " | ".join(_excerpt(rule, 180) for rule in voice_rules[:3]))
    if guideline:
        hints.append("Use BTE brand guideline as primary public-plane creative truth.")
    return hints[:10]


def _load_secondbrain_hints(brand, brand_id: str) -> dict:
    """Read a few relevant knowledge capsules through local psql when configured.

    Never raises: every failure mode is reported through the ``status`` field
    (``unconfigured``, ``psql_missing``, ``timeout``, ``error``) with an empty
    ``hints`` list; success returns ``status == "ready"``.
    """
    password = os.environ.get("SECONDBRAIN_DB_PASSWORD") or os.environ.get("PGPASSWORD")
    database_url = os.environ.get("SECONDBRAIN_DATABASE_URL")
    if not database_url and not password:
        return {"status": "unconfigured", "hints": []}

    psql = os.environ.get("PSQL_BIN", "psql")
    sql = """
        SELECT COALESCE(json_agg(row_to_json(t))::text, '[]')
        FROM (
            SELECT id, title, summary, sector, created_at
            FROM knowledge_capsules
            WHERE sector IN ('svrnty', 'planb', 'personal')
              AND (
                title ILIKE '%design%' OR summary ILIKE '%design%' OR content ILIKE '%design%'
                OR title ILIKE '%brand%' OR summary ILIKE '%brand%' OR content ILIKE '%brand%'
                OR title ILIKE '%canvas%' OR summary ILIKE '%canvas%' OR content ILIKE '%canvas%'
                OR title ILIKE '%stitch%' OR summary ILIKE '%stitch%' OR content ILIKE '%stitch%'
              )
            ORDER BY created_at DESC
            LIMIT 4
        ) AS t;
    """.strip()

    cmd = [psql]
    env = os.environ.copy()
    if database_url:
        cmd.append(database_url)
    else:
        # No URL: connect with discrete parameters; password goes via the env.
        env["PGPASSWORD"] = password
        cmd.extend([
            "-h", os.environ.get("SECONDBRAIN_DB_HOST", "localhost"),
            "-p", os.environ.get("SECONDBRAIN_DB_PORT", "5435"),
            "-U", os.environ.get("SECONDBRAIN_DB_USER", "svrnty"),
            "-d", os.environ.get("SECONDBRAIN_DB_NAME", "secondbrain"),
        ])
    cmd.extend(["-tA", "-c", sql])

    try:
        result = subprocess.run(
            cmd,
            env=env,
            check=False,
            capture_output=True,
            text=True,
            timeout=2.5,
        )
    except FileNotFoundError:
        return {"status": "psql_missing", "hints": []}
    except subprocess.TimeoutExpired:
        return {"status": "timeout", "hints": []}
    except Exception as e:
        return {"status": "error", "hints": [], "error": str(e)}
    if result.returncode != 0:
        return {"status": "error", "hints": [], "error": _excerpt(result.stderr, 240)}

    try:
        rows = json.loads((result.stdout or "[]").strip() or "[]")
    except ValueError as e:
        return {"status": "error", "hints": [], "error": f"invalid secondbrain JSON: {e}"}
    if not isinstance(rows, list):
        rows = []

    hints = []
    capsules = []
    brand_name = ""
    if isinstance(brand, dict):
        brand_name = brand.get("displayName") or brand.get("name") or ""
    for row in rows[:4]:
        if not isinstance(row, dict):
            continue
        title = str(row.get("title") or "Untitled capsule")
        summary = _excerpt(str(row.get("summary") or ""), 260)
        capsule = {
            "id": row.get("id"),
            "title": title,
            "sector": row.get("sector"),
            "created_at": row.get("created_at"),
        }
        capsules.append({k: v for k, v in capsule.items() if v not in (None, "")})
        hint = f"Secondbrain capsule {row.get('id')}: {title}"
        if summary:
            hint += f" — {summary}"
        hints.append(hint)
    return {
        "status": "ready",
        "source": "secondbrain.knowledge_capsules",
        "brand_id": brand_id,
        "brand_name": brand_name,
        "capsules": capsules,
        "hints": hints,
    }


def _excerpt(text: str, limit: int) -> str:
    """Collapse whitespace in *text* and shorten it to *limit* chars with ``...``."""
    cleaned = " ".join((text or "").split())
    if len(cleaned) <= limit:
        return cleaned
    return cleaned[: max(0, limit - 3)].rstrip() + "..."
def _compact_design_context(context: dict) -> dict:
    """Return the small subset of *context* that is embedded in responses/events."""
    return {
        "brand_id": context.get("brand_id"),
        "resolved_brand_id": context.get("resolved_brand_id"),
        "status": context.get("status"),
        "source": context.get("source"),
        "source_version": context.get("source_version"),
        "fetched_at": context.get("fetched_at"),
        "tokens_imported": context.get("tokens_imported"),
        "memory_hints": context.get("memory_hints") or [],
        "secondbrain_status": (context.get("secondbrain") or {}).get("status"),
    }


def _compose_generation_prompt(prompt: str, context: dict) -> str:
    """Append brand constraints from *context* to *prompt*.

    The prompt is returned unchanged unless ``context["status"] == "ready"``.
    """
    if not context or context.get("status") != "ready":
        return prompt
    pieces = [prompt.strip()]
    brand = context.get("brand") or {}
    brand_name = brand.get("displayName") or brand.get("name")
    if brand_name:
        pieces.append(f"Brand context: {brand_name}.")
    hints = context.get("memory_hints") or []
    if hints:
        pieces.append("BTE/CMO design constraints:\n- " + "\n- ".join(str(h) for h in hints[:8]))
    guideline = context.get("brand_guideline_excerpt") or context.get("design_md_excerpt") or ""
    if guideline:
        pieces.append("Brand guideline excerpt:\n" + _excerpt(guideline, 1600))
    pieces.append("Generate draft-only canvas output for Hermes WebUI; do not publish or imply approval.")
    return "\n\n".join(piece for piece in pieces if piece)


def _read_json_body(handler) -> dict:
    """Read and decode the request body as a JSON object.

    Returns ``{}`` when Content-Length is absent, empty, zero or negative.

    Raises:
        ValueError: if Content-Length is not an integer, the body is not
            valid UTF-8 JSON, or the decoded value is not an object.
    """
    try:
        length = int(handler.headers.get("Content-Length", "0") or 0)
    except (TypeError, ValueError) as e:
        raise ValueError("invalid Content-Length header") from e
    if length <= 0:
        return {}
    raw = handler.rfile.read(length)
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (ValueError, RecursionError) as e:
        # UnicodeDecodeError and JSONDecodeError are both ValueError
        # subclasses; RecursionError covers pathologically nested input.
        raise ValueError(f"invalid JSON: {e}") from e
    if not isinstance(payload, dict):
        raise ValueError("body must be a JSON object")
    return payload


def _ensure_project(name: str) -> dict:
    """Return an existing canva-editor project, creating one named *name* if none.

    NOTE(review): when the upstream list is non-empty the FIRST project is
    reused regardless of *name* — confirm this is the intended behaviour.
    """
    projects = _canva_json("/api/v1/projects", "GET", None, timeout=15)
    if isinstance(projects, list) and projects:
        return projects[0]
    return _canva_json("/api/v1/projects", "POST", {"name": name}, timeout=15)


def _canva_json(path: str, method: str = "GET", payload=None, timeout: int = 45):
    """Call canva-editor at *path* and return the decoded JSON response.

    *payload*, when not ``None``, is sent as a JSON body. An empty response
    body yields ``{}``. Errors from ``urllib`` (``HTTPError``/``URLError``)
    and JSON decoding propagate to the caller.
    """
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(_base_url() + path, data=data, method=method)
    if data is not None:
        req.add_header("Content-Type", "application/json")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        raw = resp.read()
    if not raw:
        return {}
    return json.loads(raw.decode("utf-8"))


def _record_id(record) -> str:
    """Return the record's ``id``/``ID``/``uuid`` as a string, or ``""``."""
    if not isinstance(record, dict):
        return ""
    return str(record.get("id") or record.get("ID") or record.get("uuid") or "")


def _emit_event(event_type: str, payload: dict) -> dict:
    """Append an event to the bounded replay log and return a copy of it.

    The bookkeeping fields ``id``, ``type`` and ``at`` always win over
    same-named keys in *payload*, so a payload can never corrupt the cursor
    that ``_events_since`` relies on.
    """
    global _EVENT_CURSOR
    with _EVENT_LOCK:
        _EVENT_CURSOR += 1
        reserved = {
            "id": _EVENT_CURSOR,
            "type": event_type,
            "at": time.time(),
        }
        # Reserved keys first (stable key order), payload next, then the
        # reserved values re-applied so they cannot be overwritten.
        event = {**reserved, **payload}
        event.update(reserved)
        _EVENTS.append(event)
        # Keep only the newest _MAX_EVENTS entries.
        del _EVENTS[:-_MAX_EVENTS]
        return dict(event)


def _events_since(cursor: int) -> list:
    """Return copies of all logged events whose id is greater than *cursor*."""
    with _EVENT_LOCK:
        return [dict(event) for event in _EVENTS if event["id"] > cursor]


def _current_event_cursor() -> int:
    """Return the id of the most recently emitted event (0 if none)."""
    with _EVENT_LOCK:
        return _EVENT_CURSOR


def _safe_int(value, fallback: int) -> int:
    """Convert *value* with ``int()``, returning *fallback* when it cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers float infinities.
        return fallback


def _send_json(handler, payload: dict, status: int) -> bool:
    """Write *payload* as a JSON response with *status*; always returns True."""
    body = json.dumps(payload).encode("utf-8")
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json; charset=utf-8")
    handler.send_header("Content-Length", str(len(body)))
    handler.send_header("Cache-Control", "no-store")
    handler.end_headers()
    handler.wfile.write(body)
    return True