File size: 7,046 Bytes
259349d
 
 
 
 
 
 
38bbab9
259349d
 
 
38bbab9
 
259349d
 
 
 
38bbab9
 
 
 
 
 
 
 
 
 
3a5c3c8
38bbab9
259349d
 
 
 
 
 
 
 
 
 
 
 
 
38bbab9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
259349d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
38bbab9
 
 
 
 
 
 
 
3a5c3c8
 
 
 
 
 
 
 
259349d
 
 
 
 
 
 
 
 
 
 
 
 
 
3a5c3c8
259349d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from __future__ import annotations

import json
import threading
from dataclasses import asdict, dataclass
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from time import sleep
from typing import Any
from uuid import uuid4

from livekit import api as livekit_api

from src.api.livekit_tokens import create_room_token, ensure_agent_dispatched_sync
from src.core.logger import logger
from src.core.settings import settings

# Upper bound on total attempts made by build_session_bootstrap_payload()
# (the first try counts as attempt 1).
BOOTSTRAP_MAX_ATTEMPTS = 4
# Seconds to sleep before each retry, indexed by (attempt - 1); the index is
# clamped, so the last entry is reused if attempts outnumber entries.
BOOTSTRAP_RETRY_DELAYS_SEC = (0.25, 0.5, 1.0)
# Twirp error codes that _is_retryable_bootstrap_error() treats as retryable;
# a TwirpError carrying any other code is not retried.
RETRYABLE_TWIRP_CODES = {
    livekit_api.TwirpErrorCode.ABORTED,
    livekit_api.TwirpErrorCode.DEADLINE_EXCEEDED,
    livekit_api.TwirpErrorCode.INTERNAL,
    livekit_api.TwirpErrorCode.RESOURCE_EXHAUSTED,
    livekit_api.TwirpErrorCode.UNAVAILABLE,
    livekit_api.TwirpErrorCode.UNKNOWN,
}
# Fixed, generic message placed in error responses so that no internal
# exception detail is sent to the client.
CLIENT_BOOTSTRAP_ERROR_MESSAGE = "Could not initialize voice session. Please try again."


@dataclass(frozen=True)
class SessionBootstrapPayload:
    """Immutable bundle of values produced for a single connect attempt.

    Instances are serialized with ``dataclasses.asdict`` and returned as the
    JSON body of the ``/session/bootstrap`` endpoint.
    """

    # Freshly generated room name of the form "voice-<8 hex chars>".
    room_name: str
    # Token string taken from the result of create_room_token().
    token: str
    # Identity taken from the same create_room_token() result.
    participant_identity: str
    # Random UUID4 string; also embedded in the dispatch metadata.
    session_id: str
    # "id" attribute of the dispatch object, or None if it has none.
    dispatch_id: str | None
    # worker_id of the first dispatch job that reports one, else None.
    dispatch_worker_id: str | None


def build_session_bootstrap_payload() -> SessionBootstrapPayload:
    """Build a new room/token/dispatch payload, retrying transient failures.

    Each attempt delegates to ``_build_session_bootstrap_payload_once``. When
    an attempt raises, the exception is re-raised unchanged if the attempt
    budget (``BOOTSTRAP_MAX_ATTEMPTS``) is used up or the error is not
    classified as retryable; otherwise a warning is logged, the function
    sleeps for the configured delay, and tries again.

    Returns:
        The payload from the first attempt that succeeds.

    Raises:
        Exception: Whatever the final (or first non-retryable) attempt raised.
    """
    attempts_made = 0
    while True:
        attempts_made += 1
        try:
            return _build_session_bootstrap_payload_once()
        except Exception as exc:
            budget_exhausted = attempts_made >= BOOTSTRAP_MAX_ATTEMPTS
            if budget_exhausted or not _is_retryable_bootstrap_error(exc):
                raise

            # Clamp the index so the last configured delay is reused once the
            # attempt count runs past the end of the delay table.
            last_slot = len(BOOTSTRAP_RETRY_DELAYS_SEC) - 1
            pause = BOOTSTRAP_RETRY_DELAYS_SEC[min(attempts_made - 1, last_slot)]
            logger.warning(
                "Bootstrap payload attempt %s/%s failed with retryable error (%s): %s; retrying in %.2fs",
                attempts_made,
                BOOTSTRAP_MAX_ATTEMPTS,
                type(exc).__name__,
                exc,
                pause,
            )
            sleep(pause)


def _build_session_bootstrap_payload_once() -> SessionBootstrapPayload:
    """Perform a single, non-retrying bootstrap: room name, token, dispatch.

    Generates a random room name, requests a room token for it, then asks for
    the configured agent to be dispatched to that room with JSON metadata
    carrying the session id and participant identity.

    Returns:
        A ``SessionBootstrapPayload`` describing the prepared session.
    """
    room_name = f"voice-{uuid4().hex[:8]}"
    token_data = create_room_token(room_name=room_name)
    session_id = str(uuid4())

    dispatch = ensure_agent_dispatched_sync(
        room_name=room_name,
        agent_name=settings.livekit.LIVEKIT_AGENT_NAME,
        metadata=json.dumps(
            {
                "type": "session_meta",
                "session_id": session_id,
                "participant_id": token_data.identity,
            }
        ),
        # NOTE(review): exact semantics of reset_existing live in the helper.
        reset_existing=True,
    )

    # Walk the dispatch jobs and take the first truthy worker_id found on a
    # truthy job state; jobs without a state or worker are skipped.
    job_states = (getattr(job, "state", None) for job in getattr(dispatch.state, "jobs", []))
    candidate_worker_ids = (
        getattr(job_state, "worker_id", None) for job_state in job_states if job_state
    )
    worker_id = next((wid for wid in candidate_worker_ids if wid), None)

    result = SessionBootstrapPayload(
        room_name=room_name,
        token=token_data.token,
        participant_identity=token_data.identity,
        session_id=session_id,
        dispatch_id=getattr(dispatch, "id", None),
        dispatch_worker_id=worker_id,
    )
    logger.info(
        "Prepared session bootstrap payload room=%s participant=%s dispatch_id=%s worker_id=%s",
        result.room_name,
        result.participant_identity,
        result.dispatch_id or "none",
        result.dispatch_worker_id or "unassigned",
    )
    return result


def _is_retryable_bootstrap_error(exc: Exception) -> bool:
    """Tell whether a bootstrap failure is worth another attempt.

    Twirp errors are retryable only when their code is listed in
    ``RETRYABLE_TWIRP_CODES``. Any other exception is retryable only if it is
    a ``TimeoutError``, ``OSError`` or ``ConnectionError``.
    """
    if not isinstance(exc, livekit_api.TwirpError):
        # TimeoutError and ConnectionError are OSError subclasses; they are
        # listed explicitly to keep the intent readable.
        return isinstance(exc, (TimeoutError, OSError, ConnectionError))
    return exc.code in RETRYABLE_TWIRP_CODES


def build_bootstrap_error_payload(_: Exception | None = None) -> dict[str, str]:
    """Return the generic error body sent to clients when bootstrap fails.

    The exception argument is accepted but deliberately ignored, so nothing
    about the underlying failure ends up in the response.
    """
    return dict(error="bootstrap_failed", message=CLIENT_BOOTSTRAP_ERROR_MESSAGE)


class _SessionBootstrapHandler(BaseHTTPRequestHandler):
    server_version = "OpenVoiceAgentBootstrap/1.0"
    protocol_version = "HTTP/1.1"

    def do_GET(self) -> None:  # noqa: N802
        if self.path.split("?", 1)[0] != "/session/bootstrap":
            self._write_json({"error": "not_found"}, status=HTTPStatus.NOT_FOUND)
            return

        try:
            payload = build_session_bootstrap_payload()
        except Exception as exc:  # pragma: no cover - exercised by integration flow
            logger.exception("Failed to create session bootstrap payload: %s", exc)
            self._write_json(
                build_bootstrap_error_payload(exc),
                status=HTTPStatus.INTERNAL_SERVER_ERROR,
            )
            return

        self._write_json(asdict(payload), status=HTTPStatus.OK)

    def do_OPTIONS(self) -> None:  # noqa: N802
        self.send_response(HTTPStatus.NO_CONTENT)
        self._write_cors_headers()
        self.send_header("Content-Length", "0")
        self.end_headers()

    def log_message(self, format: str, *args: Any) -> None:
        # Route server access logs through project logger to keep output consistent.
        logger.debug("Session bootstrap server: " + format, *args)

    def _write_json(self, payload: dict[str, Any], *, status: HTTPStatus) -> None:
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self._write_cors_headers()
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Cache-Control", "no-store")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _write_cors_headers(self) -> None:
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")


# Held by ensure_session_bootstrap_server() while it checks or creates the server.
_server_lock = threading.Lock()
# The running bootstrap server, or None until the first successful start.
_bootstrap_server: ThreadingHTTPServer | None = None
# The thread running the server's serve_forever loop, or None until started.
_bootstrap_thread: threading.Thread | None = None


def ensure_session_bootstrap_server() -> str:
    """Start the bootstrap HTTP server on first use and return its URL.

    The first call binds a ``ThreadingHTTPServer`` to 127.0.0.1 on an
    OS-assigned port and runs it on a daemon thread; later calls reuse that
    server. The check-and-create step runs under ``_server_lock``.

    Returns:
        The ``http://127.0.0.1:<port>/session/bootstrap`` URL of the server.

    Raises:
        OSError: If the listening socket cannot be created or bound.
        RuntimeError: If the serving thread cannot be started.
    """
    global _bootstrap_server, _bootstrap_thread

    with _server_lock:
        server = _bootstrap_server
        if server is None:
            server = ThreadingHTTPServer(("127.0.0.1", 0), _SessionBootstrapHandler)
            server.daemon_threads = True
            thread = threading.Thread(
                target=server.serve_forever,
                name="session-bootstrap-server",
                daemon=True,
            )
            try:
                thread.start()
            except Exception:
                # The socket is already bound at this point; close it so a
                # failed start does not leak a listening port. The module
                # state is left untouched so a later call can retry cleanly.
                server.server_close()
                raise
            # Publish only after the thread is running.
            _bootstrap_server = server
            _bootstrap_thread = thread
            logger.info(
                "Session bootstrap server started at http://127.0.0.1:%s/session/bootstrap",
                server.server_port,
            )

        return f"http://127.0.0.1:{server.server_port}/session/bootstrap"