"""POST /api/transcribe + voice-message audio processor.

Migrated from hermes-webui fork commit 014b9eef (now reverted) per Phase 2.1
of the SVRNTY-HERMES Plugin Protocol. Uses the loader's new public API method
`api.register_audio_attachment_processor` so streaming.py can pull transcripts
of voice-message attachments into the agent-visible text WITHOUT any further
fork patch.

Configuration (read at call time, never persisted):
    HERMES_WEBUI_STT_URL    external STT endpoint (OpenAI-shape or WhisperX)
    HERMES_WEBUI_STT_KEY    optional bearer token

Endpoints + processors:
    POST /api/transcribe          direct one-shot transcription
    audio_attachment_processor    called by streaming.py before agent receives msg

Public API surface used: register_route, register_audio_attachment_processor,
logger. No forced internal dependencies.
"""

import email
import email.parser
import email.policy
import io
import json
import mimetypes
import os
import re
import tempfile
import urllib.request
import uuid

_VOICE_MSG_AUDIO_EXTS = ('.m4a', '.aac', '.oga', '.opus', '.wav', '.mp3',
                         '.flac', '.ogg', '.webm')

# Content-Disposition parameter matchers. The leading `(?:^|[;\s])` anchors the
# match to a parameter boundary so that `name="..."` is never found *inside*
# `filename="..."` (which contains the substring `name="`).
_DISPOSITION_NAME_RE = re.compile(r'(?:^|[;\s])name="([^"]+)"')
_DISPOSITION_FILENAME_RE = re.compile(r'(?:^|[;\s])filename="([^"]+)"')

# Characters that must not appear inside a quoted multipart header parameter:
# control characters (incl. CR/LF), the double quote and the backslash.
_UNSAFE_FILENAME_CHARS_RE = re.compile(r'[\x00-\x1f\x7f"\\]')

# A plain file extension that is safe to use as a temp-file suffix.
_SAFE_SUFFIX_RE = re.compile(r'\.[A-Za-z0-9]{1,16}')


def register(api):
    """Wire route + audio processor.

    Registers `_handle_transcribe` for POST /api/transcribe and
    `_transcribe_audio_attachments` as the audio-attachment processor, then
    logs one info line through the logger obtained from `api`.
    """
    log = api.logger("svrnty.routes.transcribe")
    api.register_route("/api/transcribe", "POST", _handle_transcribe)
    api.register_audio_attachment_processor(_transcribe_audio_attachments)
    log.info("transcribe endpoint + audio processor registered")


def _sanitize_multipart_filename(fname: str) -> str:
    """Return `fname` made safe for a quoted Content-Disposition parameter.

    Control characters, double quotes and backslashes are replaced with `_`
    so the name cannot terminate the quoted string or inject extra header
    lines. An empty name falls back to 'audio.webm'.
    """
    cleaned = _UNSAFE_FILENAME_CHARS_RE.sub('_', fname or '')
    return cleaned or 'audio.webm'


def _extract_transcript(data) -> str:
    """Pull the transcript text out of a decoded STT JSON response.

    Prefers the top-level `text` field (OpenAI shape); when that is empty,
    joins the `text` of every dict entry in `segments` (WhisperX shape) with
    single spaces. Non-dict segment entries are skipped.

    Raises:
        ValueError: if `data` is not a JSON object (dict).
    """
    if not isinstance(data, dict):
        raise ValueError('unexpected STT response: expected a JSON object')
    text = str(data.get('text') or '').strip()
    if text:
        return text
    segs = data.get('segments') or []
    return ' '.join(
        str(seg.get('text') or '').strip()
        for seg in segs
        if isinstance(seg, dict)
    ).strip()


def _external_stt_transcribe(audio_path: str, url: str, api_key: str) -> str:
    """POST audio to an external STT endpoint (multipart `file`).

    Handles OpenAI-shaped servers (top-level `text`) and WhisperX-style
    servers (`segments[].text`). Stdlib only.

    Args:
        audio_path: path of the audio file to upload; read fully into memory.
        url: STT endpoint URL.
        api_key: bearer token; no Authorization header is sent when empty.

    Returns:
        The stripped transcript, or '' when the response carries no text.

    Raises:
        OSError / urllib.error.URLError: on file or network failure.
        ValueError: on a non-JSON or non-object response body.
    """
    boundary = '----webui' + uuid.uuid4().hex
    # The basename ends up inside a quoted header value, so strip anything
    # that could break out of the quotes or start a new header line.
    fname = _sanitize_multipart_filename(os.path.basename(audio_path))
    ctype = mimetypes.guess_type(fname)[0] or 'application/octet-stream'
    with open(audio_path, 'rb') as f:
        audio = f.read()
    body = b''.join([
        ('--' + boundary + '\r\n'
         'Content-Disposition: form-data; name="file"; filename="' + fname + '"\r\n'
         'Content-Type: ' + ctype + '\r\n\r\n').encode(),
        audio,
        ('\r\n--' + boundary + '\r\n'
         'Content-Disposition: form-data; name="model"\r\n\r\nwhisper-1').encode(),
        ('\r\n--' + boundary + '--\r\n').encode(),
    ])
    headers = {'Content-Type': 'multipart/form-data; boundary=' + boundary}
    if api_key:
        headers['Authorization'] = 'Bearer ' + api_key
    req = urllib.request.Request(url, data=body, headers=headers)
    with urllib.request.urlopen(req, timeout=300) as resp:
        data = json.loads(resp.read())
    return _extract_transcript(data)


def _transcribe_audio_attachments(attachments) -> str:
    """Audio-attachment processor — registered via the loader.

    Scans attachments for voice-message audio files; transcribes each via the
    configured STT endpoint; returns a single text block to prepend to the
    agent-visible message. Empty string when no audio / STT not configured.

    An attachment counts as audio when its basename starts with
    'voice-message', its `mime` starts with 'audio/', or its extension is in
    `_VOICE_MSG_AUDIO_EXTS`. Non-dict entries and entries without a `path`
    are ignored. A failed transcription is reported on stdout and skipped so
    one bad file does not block the rest.
    """
    stt_url = os.environ.get('HERMES_WEBUI_STT_URL', '').strip()
    if not stt_url or not attachments:
        return ''
    stt_key = os.environ.get('HERMES_WEBUI_STT_KEY', '').strip()
    parts = []
    for att in attachments or []:
        if not isinstance(att, dict):
            continue
        path = str(att.get('path') or '')
        mime = str(att.get('mime') or '').lower()
        name = str(att.get('name') or '') or path
        is_audio = (
            os.path.basename(name).startswith('voice-message')
            or mime.startswith('audio/')
            or os.path.splitext(name)[1].lower() in _VOICE_MSG_AUDIO_EXTS
        )
        if not is_audio or not path:
            continue
        try:
            text = _external_stt_transcribe(path, stt_url, stt_key)
        except Exception as exc:
            # Best-effort: keep going, but include the reason so the failure
            # can actually be diagnosed from the log line.
            print(f'[svrnty] voice-message transcription failed for {name}: {exc!r}',
                  flush=True)
            text = ''
        if text:
            parts.append(text)
    return '[Voice message transcript]\n' + '\n\n'.join(parts) if parts else ''


def _handle_transcribe(handler, parsed):
    """POST /api/transcribe — direct one-shot transcription.

    Reads a multipart form with field `file` (the recorded audio blob), writes
    it to a temp file, sends it to the configured STT endpoint, returns
    `{"ok": true, "transcript": "..."}`.

    Error responses (all `{"ok": false, "error": ...}`):
        503  HERMES_WEBUI_STT_URL is not set
        400  non-multipart request, invalid/negative Content-Length, empty
             body, or missing `file` field
        500  any failure while writing the temp file or calling the STT server

    `parsed` is accepted for the route-handler signature and is not used.
    Returns whatever `_send_json` returns (True).
    """
    stt_url = os.environ.get('HERMES_WEBUI_STT_URL', '').strip()
    if not stt_url:
        return _send_json(handler, {'ok': False,
                                    'error': 'HERMES_WEBUI_STT_URL not configured'}, 503)
    ctype = handler.headers.get('Content-Type', '')
    if 'multipart' not in ctype.lower():
        return _send_json(handler, {'ok': False,
                                    'error': 'multipart/form-data required'}, 400)
    # Content-Length is client-controlled: a non-numeric value must not crash
    # the handler, and a negative one must never reach rfile.read(), where it
    # would mean "read until EOF" and block the connection.
    try:
        length = int(handler.headers.get('Content-Length', '0') or 0)
    except (TypeError, ValueError):
        length = -1
    if length < 0:
        return _send_json(handler, {'ok': False,
                                    'error': 'invalid Content-Length'}, 400)
    if not length:
        return _send_json(handler, {'ok': False, 'error': 'empty body'}, 400)
    body = handler.rfile.read(length)
    file_bytes, fname = _parse_multipart_file(body, ctype, field_name='file')
    if file_bytes is None:
        return _send_json(handler, {'ok': False,
                                    'error': "missing 'file' field"}, 400)
    fname = fname or 'audio.webm'
    # Only a plain alphanumeric extension is allowed into the temp-file name;
    # anything else from the client-supplied filename falls back to .webm.
    ext = os.path.splitext(fname)[1]
    suffix = ext if _SAFE_SUFFIX_RE.fullmatch(ext) else '.webm'
    temp_path = None
    try:
        with tempfile.NamedTemporaryFile(prefix='svrnty-stt-', suffix=suffix,
                                         delete=False) as tmp:
            temp_path = tmp.name
            tmp.write(file_bytes)
        transcript = _external_stt_transcribe(
            temp_path, stt_url,
            os.environ.get('HERMES_WEBUI_STT_KEY', '').strip())
        return _send_json(handler, {'ok': True, 'transcript': transcript}, 200)
    except Exception as e:
        return _send_json(handler, {'ok': False, 'error': str(e)}, 500)
    finally:
        # delete=False above, so the temp file is removed here on every path.
        if temp_path and os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass


def _parse_multipart_file(body: bytes, content_type: str, field_name: str = 'file'):
    """Parse a multipart body and return (file_bytes, filename) for the named field.

    Stdlib only. cgi.FieldStorage was removed in Python 3.13 (PEP 594), so we
    parse via the email module which is the documented replacement.

    The field is matched on the `name` parameter of each part's
    Content-Disposition header, independent of where `filename` appears in
    that header. `filename` is None when the part carries no filename.

    Returns (None, None) when the named field is absent.
    """
    # Construct a fake email message so email.parser handles the multipart split.
    full = b'Content-Type: ' + content_type.encode() + b'\r\n\r\n' + body
    parser = email.parser.BytesParser(policy=email.policy.default)
    msg = parser.parsebytes(full)
    if not msg.is_multipart():
        return None, None
    for part in msg.iter_parts():
        disp = str(part.get('Content-Disposition', ''))
        m = _DISPOSITION_NAME_RE.search(disp)
        if not m or m.group(1) != field_name:
            continue
        fn_m = _DISPOSITION_FILENAME_RE.search(disp)
        filename = fn_m.group(1) if fn_m else None
        payload = part.get_payload(decode=True)
        return payload, filename
    return None, None


def _send_json(handler, payload: dict, status: int) -> bool:
    """Write `payload` as a JSON response with the given HTTP status.

    Sends Content-Type, Content-Length and `Cache-Control: no-store` headers,
    then the UTF-8 encoded body. Always returns True.
    """
    body = json.dumps(payload).encode('utf-8')
    handler.send_response(status)
    handler.send_header('Content-Type', 'application/json; charset=utf-8')
    handler.send_header('Content-Length', str(len(body)))
    handler.send_header('Cache-Control', 'no-store')
    handler.end_headers()
    handler.wfile.write(body)
    return True