Spaces:
Running
Running
File size: 7,046 Bytes
259349d 38bbab9 259349d 38bbab9 259349d 38bbab9 3a5c3c8 38bbab9 259349d 38bbab9 259349d 38bbab9 3a5c3c8 259349d 3a5c3c8 259349d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | from __future__ import annotations
import json
import threading
from dataclasses import asdict, dataclass
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from time import sleep
from typing import Any
from uuid import uuid4
from livekit import api as livekit_api
from src.api.livekit_tokens import create_room_token, ensure_agent_dispatched_sync
from src.core.logger import logger
from src.core.settings import settings
BOOTSTRAP_MAX_ATTEMPTS = 4
BOOTSTRAP_RETRY_DELAYS_SEC = (0.25, 0.5, 1.0)
RETRYABLE_TWIRP_CODES = {
livekit_api.TwirpErrorCode.ABORTED,
livekit_api.TwirpErrorCode.DEADLINE_EXCEEDED,
livekit_api.TwirpErrorCode.INTERNAL,
livekit_api.TwirpErrorCode.RESOURCE_EXHAUSTED,
livekit_api.TwirpErrorCode.UNAVAILABLE,
livekit_api.TwirpErrorCode.UNKNOWN,
}
CLIENT_BOOTSTRAP_ERROR_MESSAGE = "Could not initialize voice session. Please try again."
@dataclass(frozen=True)
class SessionBootstrapPayload:
room_name: str
token: str
participant_identity: str
session_id: str
dispatch_id: str | None
dispatch_worker_id: str | None
def build_session_bootstrap_payload() -> SessionBootstrapPayload:
"""Create a brand-new room/token/dispatch payload for a connect attempt."""
attempt = 1
while True:
try:
return _build_session_bootstrap_payload_once()
except Exception as exc:
if attempt >= BOOTSTRAP_MAX_ATTEMPTS or not _is_retryable_bootstrap_error(exc):
raise
delay = BOOTSTRAP_RETRY_DELAYS_SEC[min(attempt - 1, len(BOOTSTRAP_RETRY_DELAYS_SEC) - 1)]
logger.warning(
"Bootstrap payload attempt %s/%s failed with retryable error (%s): %s; retrying in %.2fs",
attempt,
BOOTSTRAP_MAX_ATTEMPTS,
type(exc).__name__,
exc,
delay,
)
sleep(delay)
attempt += 1
def _build_session_bootstrap_payload_once() -> SessionBootstrapPayload:
"""Create one room/token/dispatch payload without retries."""
room_name = f"voice-{uuid4().hex[:8]}"
token_data = create_room_token(room_name=room_name)
session_id = str(uuid4())
metadata_payload = {
"type": "session_meta",
"session_id": session_id,
"participant_id": token_data.identity,
}
dispatch = ensure_agent_dispatched_sync(
room_name=room_name,
agent_name=settings.livekit.LIVEKIT_AGENT_NAME,
metadata=json.dumps(metadata_payload),
reset_existing=True,
)
assigned_worker_id = None
for job in getattr(dispatch.state, "jobs", []):
state = getattr(job, "state", None)
if state and getattr(state, "worker_id", None):
assigned_worker_id = state.worker_id
break
payload = SessionBootstrapPayload(
room_name=room_name,
token=token_data.token,
participant_identity=token_data.identity,
session_id=session_id,
dispatch_id=getattr(dispatch, "id", None),
dispatch_worker_id=assigned_worker_id,
)
logger.info(
"Prepared session bootstrap payload room=%s participant=%s dispatch_id=%s worker_id=%s",
payload.room_name,
payload.participant_identity,
payload.dispatch_id or "none",
payload.dispatch_worker_id or "unassigned",
)
return payload
def _is_retryable_bootstrap_error(exc: Exception) -> bool:
if isinstance(exc, livekit_api.TwirpError):
return exc.code in RETRYABLE_TWIRP_CODES
if isinstance(exc, (TimeoutError, OSError, ConnectionError)):
return True
return False
def build_bootstrap_error_payload(_: Exception | None = None) -> dict[str, str]:
"""Build a client-safe error response payload without internal details."""
return {
"error": "bootstrap_failed",
"message": CLIENT_BOOTSTRAP_ERROR_MESSAGE,
}
class _SessionBootstrapHandler(BaseHTTPRequestHandler):
server_version = "OpenVoiceAgentBootstrap/1.0"
protocol_version = "HTTP/1.1"
def do_GET(self) -> None: # noqa: N802
if self.path.split("?", 1)[0] != "/session/bootstrap":
self._write_json({"error": "not_found"}, status=HTTPStatus.NOT_FOUND)
return
try:
payload = build_session_bootstrap_payload()
except Exception as exc: # pragma: no cover - exercised by integration flow
logger.exception("Failed to create session bootstrap payload: %s", exc)
self._write_json(
build_bootstrap_error_payload(exc),
status=HTTPStatus.INTERNAL_SERVER_ERROR,
)
return
self._write_json(asdict(payload), status=HTTPStatus.OK)
def do_OPTIONS(self) -> None: # noqa: N802
self.send_response(HTTPStatus.NO_CONTENT)
self._write_cors_headers()
self.send_header("Content-Length", "0")
self.end_headers()
def log_message(self, format: str, *args: Any) -> None:
# Route server access logs through project logger to keep output consistent.
logger.debug("Session bootstrap server: " + format, *args)
def _write_json(self, payload: dict[str, Any], *, status: HTTPStatus) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(status)
self._write_cors_headers()
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _write_cors_headers(self) -> None:
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
_server_lock = threading.Lock()
_bootstrap_server: ThreadingHTTPServer | None = None
_bootstrap_thread: threading.Thread | None = None
def ensure_session_bootstrap_server() -> str:
"""Start bootstrap server once and return its URL."""
global _bootstrap_server, _bootstrap_thread
with _server_lock:
if _bootstrap_server is None:
server = ThreadingHTTPServer(("127.0.0.1", 0), _SessionBootstrapHandler)
server.daemon_threads = True
thread = threading.Thread(
target=server.serve_forever,
name="session-bootstrap-server",
daemon=True,
)
thread.start()
_bootstrap_server = server
_bootstrap_thread = thread
logger.info(
"Session bootstrap server started at http://127.0.0.1:%s/session/bootstrap",
server.server_port,
)
if _bootstrap_server is None:
raise RuntimeError("Session bootstrap server did not initialize")
return f"http://127.0.0.1:{_bootstrap_server.server_port}/session/bootstrap"
|