Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import threading | |
| from dataclasses import asdict, dataclass | |
| from http import HTTPStatus | |
| from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer | |
| from time import sleep | |
| from typing import Any | |
| from uuid import uuid4 | |
| from livekit import api as livekit_api | |
| from src.api.livekit_tokens import create_room_token, ensure_agent_dispatched_sync | |
| from src.core.logger import logger | |
| from src.core.settings import settings | |
# Retry policy for building a bootstrap payload. The pause after a failed
# attempt is looked up by attempt number; the last entry is reused once the
# attempt number runs past the end of the tuple.
BOOTSTRAP_RETRY_DELAYS_SEC = (0.25, 0.5, 1.0)
BOOTSTRAP_MAX_ATTEMPTS = 4

# Twirp error codes for which a failed bootstrap attempt is retried.
RETRYABLE_TWIRP_CODES = {
    livekit_api.TwirpErrorCode.UNAVAILABLE,
    livekit_api.TwirpErrorCode.UNKNOWN,
    livekit_api.TwirpErrorCode.INTERNAL,
    livekit_api.TwirpErrorCode.ABORTED,
    livekit_api.TwirpErrorCode.DEADLINE_EXCEEDED,
    livekit_api.TwirpErrorCode.RESOURCE_EXHAUSTED,
}

# Generic message returned to clients; deliberately carries no internal detail.
CLIENT_BOOTSTRAP_ERROR_MESSAGE = "Could not initialize voice session. Please try again."
| class SessionBootstrapPayload: | |
| room_name: str | |
| token: str | |
| participant_identity: str | |
| session_id: str | |
| dispatch_id: str | None | |
| dispatch_worker_id: str | None | |
def build_session_bootstrap_payload() -> SessionBootstrapPayload:
    """Build a fresh room/token/dispatch payload, retrying retryable failures.

    Each attempt delegates to ``_build_session_bootstrap_payload_once``. A
    failure is re-raised unchanged when the attempt budget
    (``BOOTSTRAP_MAX_ATTEMPTS``) is used up or the error is not classified as
    retryable; otherwise the function logs a warning, sleeps, and tries again.

    Returns:
        The payload produced by the first successful attempt.

    Raises:
        Exception: whatever the final failed attempt raised.
    """
    attempt = 1
    while True:
        try:
            return _build_session_bootstrap_payload_once()
        except Exception as exc:
            attempts_exhausted = attempt >= BOOTSTRAP_MAX_ATTEMPTS
            if attempts_exhausted or not _is_retryable_bootstrap_error(exc):
                raise
            # Clamp the index so later attempts reuse the last configured delay.
            delay_index = min(attempt - 1, len(BOOTSTRAP_RETRY_DELAYS_SEC) - 1)
            delay = BOOTSTRAP_RETRY_DELAYS_SEC[delay_index]
            logger.warning(
                "Bootstrap payload attempt %s/%s failed with retryable error (%s): %s; retrying in %.2fs",
                attempt,
                BOOTSTRAP_MAX_ATTEMPTS,
                type(exc).__name__,
                exc,
                delay,
            )
            sleep(delay)
            attempt += 1
def _build_session_bootstrap_payload_once() -> SessionBootstrapPayload:
    """Run a single, non-retrying room/token/dispatch bootstrap.

    Generates a room name and session id, requests a room token, dispatches
    the configured agent with session metadata, and packages the results.

    Returns:
        A ``SessionBootstrapPayload`` describing the prepared session.
    """
    room_name = f"voice-{uuid4().hex[:8]}"
    token_data = create_room_token(room_name=room_name)
    session_id = str(uuid4())

    # Metadata is passed to the dispatch as a JSON string.
    dispatch_metadata = json.dumps(
        {
            "type": "session_meta",
            "session_id": session_id,
            "participant_id": token_data.identity,
        }
    )
    dispatch = ensure_agent_dispatched_sync(
        room_name=room_name,
        agent_name=settings.livekit.LIVEKIT_AGENT_NAME,
        metadata=dispatch_metadata,
        reset_existing=True,
    )

    # Take the worker id from the first job whose state reports one.
    job_states = (getattr(job, "state", None) for job in getattr(dispatch.state, "jobs", []))
    assigned_worker_id = next(
        (
            job_state.worker_id
            for job_state in job_states
            if job_state and getattr(job_state, "worker_id", None)
        ),
        None,
    )

    payload = SessionBootstrapPayload(
        room_name=room_name,
        token=token_data.token,
        participant_identity=token_data.identity,
        session_id=session_id,
        dispatch_id=getattr(dispatch, "id", None),
        dispatch_worker_id=assigned_worker_id,
    )
    logger.info(
        "Prepared session bootstrap payload room=%s participant=%s dispatch_id=%s worker_id=%s",
        payload.room_name,
        payload.participant_identity,
        payload.dispatch_id or "none",
        payload.dispatch_worker_id or "unassigned",
    )
    return payload
def _is_retryable_bootstrap_error(exc: Exception) -> bool:
    """Return True when *exc* should trigger another bootstrap attempt.

    Twirp errors are retryable only if their code is listed in
    ``RETRYABLE_TWIRP_CODES``; any other exception is retryable only if it is
    a ``TimeoutError``, ``OSError`` or ``ConnectionError``.
    """
    if not isinstance(exc, livekit_api.TwirpError):
        return isinstance(exc, (TimeoutError, OSError, ConnectionError))
    return exc.code in RETRYABLE_TWIRP_CODES
def build_bootstrap_error_payload(_: Exception | None = None) -> dict[str, str]:
    """Return the client-facing error body for a failed bootstrap.

    The exception argument is accepted but intentionally unused, so no
    internal detail reaches the response.
    """
    error_body: dict[str, str] = {"error": "bootstrap_failed"}
    error_body["message"] = CLIENT_BOOTSTRAP_ERROR_MESSAGE
    return error_body
class _SessionBootstrapHandler(BaseHTTPRequestHandler):
    """HTTP handler serving ``GET /session/bootstrap`` and CORS preflight requests."""

    server_version = "OpenVoiceAgentBootstrap/1.0"
    protocol_version = "HTTP/1.1"

    def do_GET(self) -> None:  # noqa: N802
        """Answer the bootstrap endpoint with a new payload, or 404 for other paths."""
        # Ignore any query string when matching the route.
        route, _sep, _query = self.path.partition("?")
        if route != "/session/bootstrap":
            self._write_json({"error": "not_found"}, status=HTTPStatus.NOT_FOUND)
            return

        try:
            payload = build_session_bootstrap_payload()
        except Exception as exc:  # pragma: no cover - exercised by integration flow
            logger.exception("Failed to create session bootstrap payload: %s", exc)
            self._write_json(
                build_bootstrap_error_payload(exc),
                status=HTTPStatus.INTERNAL_SERVER_ERROR,
            )
        else:
            self._write_json(asdict(payload), status=HTTPStatus.OK)

    def do_OPTIONS(self) -> None:  # noqa: N802
        """Reply to a preflight request with 204, CORS headers and an empty body."""
        self.send_response(HTTPStatus.NO_CONTENT)
        self._write_cors_headers()
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format: str, *args: Any) -> None:
        """Forward the base handler's log lines to the project logger at debug level."""
        prefixed_format = f"Session bootstrap server: {format}"
        logger.debug(prefixed_format, *args)

    def _write_json(self, payload: dict[str, Any], *, status: HTTPStatus) -> None:
        """Send *payload* as a UTF-8 JSON response with the given status."""
        encoded = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self._write_cors_headers()
        response_headers = (
            ("Content-Type", "application/json; charset=utf-8"),
            ("Cache-Control", "no-store"),
            ("Content-Length", str(len(encoded))),
        )
        for header_name, header_value in response_headers:
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(encoded)

    def _write_cors_headers(self) -> None:
        """Emit the CORS headers shared by every response."""
        cors_headers = (
            ("Access-Control-Allow-Origin", "*"),
            ("Access-Control-Allow-Methods", "GET, OPTIONS"),
            ("Access-Control-Allow-Headers", "Content-Type"),
        )
        for header_name, header_value in cors_headers:
            self.send_header(header_name, header_value)
# Process-wide singleton state for the bootstrap HTTP server; all writes
# happen while holding ``_server_lock``.
_server_lock = threading.Lock()
_bootstrap_server: ThreadingHTTPServer | None = None
_bootstrap_thread: threading.Thread | None = None


def ensure_session_bootstrap_server() -> str:
    """Start the bootstrap server on first call and return its endpoint URL.

    The server binds to 127.0.0.1 on an OS-assigned port and is served from a
    daemon thread. Later calls reuse the running server.

    Returns:
        The ``http://127.0.0.1:<port>/session/bootstrap`` URL.

    Raises:
        RuntimeError: if the server is unexpectedly unset after initialisation.
    """
    global _bootstrap_server, _bootstrap_thread

    with _server_lock:
        if _bootstrap_server is None:
            server = ThreadingHTTPServer(("127.0.0.1", 0), _SessionBootstrapHandler)
            server.daemon_threads = True
            thread = threading.Thread(
                target=server.serve_forever,
                name="session-bootstrap-server",
                daemon=True,
            )
            try:
                thread.start()
            except Exception:
                # The constructor already bound the listening socket; release
                # it so a failed start does not leak it.
                server.server_close()
                raise
            _bootstrap_server = server
            _bootstrap_thread = thread
            logger.info(
                "Session bootstrap server started at http://127.0.0.1:%s/session/bootstrap",
                server.server_port,
            )

    # Defensive guard; it also narrows the Optional type for the return below.
    if _bootstrap_server is None:
        raise RuntimeError("Session bootstrap server did not initialize")
    return f"http://127.0.0.1:{_bootstrap_server.server_port}/session/bootstrap"