Spaces:
Sleeping
Sleeping
File size: 12,825 Bytes
8f7da58 8193c41 8f7da58 0167890 8f7da58 8193c41 8f7da58 8193c41 8f7da58 a1d8504 8f7da58 1641cad 8f7da58 8193c41 8f7da58 8193c41 8f7da58 6febd52 8193c41 8f7da58 1641cad 8f7da58 1641cad 8193c41 8f7da58 1641cad 8f7da58 1641cad 8f7da58 6febd52 1641cad 6febd52 8193c41 6febd52 8f7da58 1641cad 8f7da58 1641cad 8f7da58 1641cad 5e3ba57 8f7da58 1641cad 8f7da58 1641cad 8f7da58 0167890 1641cad 5e3ba57 8f7da58 1641cad 8f7da58 8193c41 6febd52 8f7da58 1641cad 8193c41 8f7da58 1641cad 8193c41 8f7da58 5e3ba57 8f7da58 a1d8504 8f7da58 0167890 a1d8504 8f7da58 a1d8504 8f7da58 a1d8504 7d6d879 8f7da58 7d6d879 d0f8e17 7d6d879 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 
375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 | from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse
import httpx
from mcp.shared.exceptions import McpError
from livekit.agents import AgentSession, mcp
from livekit.agents.llm.tool_context import ToolError
from livekit.plugins import openai as openai_plugin
from src.agent.prompts.runtime import MCP_STARTUP_GREETING
from src.core.logger import logger
from src.core.settings import LLMSettings
# OpenAI-compatible endpoint used by build_llm_runtime when LLM_PROVIDER is "nvidia".
NVIDIA_OPENAI_BASE_URL = "https://integrate.api.nvidia.com/v1"

# Text of the RuntimeError raised by the stub that
# install_mcp_generate_reply_guard puts in place of session.generate_reply.
MCP_GENERATE_REPLY_BLOCK_MESSAGE = (
    "Manual generate_reply is disabled in MCP mode; "
    "use session.say(...) instead."
)

# ToolError templates (formatted with ``tool_name``) produced by
# normalize_mcp_tool_exception: one for timeouts, one for transport failures.
MCP_TOOL_TIMEOUT_MESSAGE = (
    "The external tool '{tool_name}' timed out. Do not retry '{tool_name}' "
    "again in this turn. Give the user a brief answer without it."
)
MCP_TOOL_UNAVAILABLE_MESSAGE = (
    "The external tool '{tool_name}' is temporarily unavailable. Do not retry "
    "'{tool_name}' again in this turn. Give the user a brief answer without it."
)
@dataclass(frozen=True)
class MCPRuntimeDecision:
    """Immutable outcome of ``resolve_mcp_runtime_mode``.

    ``reason`` is a short machine-readable tag such as ``"mcp_enabled"``,
    ``"mcp_disabled"``, ``"missing_nvidia_api_key"`` or
    ``"provider_not_supported:<provider>"``.
    """

    # True when the MCP runtime should be used.
    enabled: bool
    # Why MCP is (or is not) enabled; logged when MCP was requested but unavailable.
    reason: str
@dataclass(frozen=True)
class LLMRuntimeConfig:
    """Immutable result of ``build_llm_runtime``: LLM client plus optional MCP servers."""

    # LLM client instance (build_llm_runtime builds it with openai_plugin.LLM).
    llm: Any
    # MCP HTTP servers to attach, or None when the MCP runtime is off.
    mcp_servers: list[mcp.MCPServerHTTP] | None
    # Normalized provider name ("nvidia" or "ollama" when built by build_llm_runtime).
    provider: str
    # Model identifier sent to the provider.
    model: str

    @property
    def mcp_runtime_active(self) -> bool:
        """True whenever ``mcp_servers`` is set, even to an empty list."""
        servers = self.mcp_servers
        return servers is not None
class ConfiguredMCPServerHTTP(mcp.MCPServerHTTP):
    """``mcp.MCPServerHTTP`` with a single clamped timeout and normalized tool errors.

    - The ``timeout`` and ``client_session_timeout_seconds`` arguments of the
      base class both receive ``timeout_seconds`` clamped to at least one second.
    - Tool calls that fail with a timeout or transport error are re-raised as
      ``ToolError`` with a "do not retry" message (see
      ``normalize_mcp_tool_exception``); other exceptions propagate unchanged.

    NOTE(review): overrides the base class's private ``_make_function_tool``
    hook and reads private attributes (``_client``, ``_read_timeout``) of
    livekit-agents; re-check on library upgrades.
    """

    def __init__(
        self,
        *,
        url: str,
        timeout_seconds: float,
        headers: dict[str, Any] | None = None,
    ) -> None:
        effective_timeout = _bounded_timeout_seconds(timeout_seconds)
        super().__init__(
            url=url,
            headers=headers,
            timeout=effective_timeout,
            client_session_timeout_seconds=effective_timeout,
        )
        self._request_timeout_seconds = effective_timeout

    @property
    def request_timeout_seconds(self) -> float:
        """The clamped timeout passed to the base class at construction."""
        return self._request_timeout_seconds

    @property
    def client_session_timeout_seconds(self) -> float:
        """The base class's ``_read_timeout`` value.

        NOTE(review): presumably populated from ``client_session_timeout_seconds``
        by the base constructor — confirm against livekit-agents.
        """
        return self._read_timeout

    def _make_function_tool(
        self,
        name: str,
        description: str | None,
        input_schema: dict[str, Any],
        meta: dict[str, Any] | None,
    ) -> mcp.MCPTool:
        """Wrap the MCP tool *name* as a LiveKit function tool.

        The wrapper returns the JSON of a single content item, a JSON array
        for several items, and raises ``ToolError`` when the server reports an
        error, returns no content, or the client is gone.
        """

        async def _tool_called(raw_arguments: dict[str, Any]) -> Any:
            client = self._client
            if client is None:
                raise ToolError(
                    "Tool invocation failed: internal service is unavailable. "
                    "Please check that the MCPServer is still running."
                )

            try:
                tool_result = await client.call_tool(name, raw_arguments)
            except Exception as exc:
                tool_error = normalize_mcp_tool_exception(tool_name=name, exc=exc)
                if tool_error is None:
                    # Not a timeout/transport failure: let the caller see it as-is.
                    raise
                logger.warning(
                    "MCP tool invocation failed: tool=%s timeout=%s detail=%s",
                    name,
                    is_mcp_timeout_exception(exc),
                    describe_mcp_exception(exc),
                )
                raise tool_error from exc

            contents = tool_result.content
            if tool_result.isError:
                raise ToolError("\n".join(map(str, contents)))
            if not contents:
                raise ToolError(
                    f"Tool '{name}' completed without producing a result. "
                    "This might indicate an issue with internal processing."
                )
            if len(contents) == 1:
                return contents[0].model_dump_json()
            return json.dumps([item.model_dump() for item in contents])

        raw_schema: dict[str, Any] = dict(
            name=name,
            description=description,
            parameters=input_schema,
        )
        if meta:
            raw_schema.update(meta=meta)
        return mcp.function_tool(_tool_called, raw_schema=raw_schema)
def resolve_mcp_runtime_mode(
    *,
    mcp_enabled: bool,
    llm_provider: str,
    nvidia_api_key: str | None,
) -> MCPRuntimeDecision:
    """Decide whether the MCP runtime can be used with the given provider settings.

    Args:
        mcp_enabled: Whether MCP was requested.
        llm_provider: Provider name; compared case-insensitively after stripping.
        nvidia_api_key: Required (non-empty) when the provider is ``"nvidia"``.

    Returns:
        An ``MCPRuntimeDecision``; ``reason`` is one of ``"mcp_disabled"``,
        ``"provider_not_supported:<provider>"``, ``"missing_nvidia_api_key"``
        or ``"mcp_enabled"``.
    """
    normalized_provider = (llm_provider or "").strip().lower()

    if not mcp_enabled:
        reason = "mcp_disabled"
    elif normalized_provider not in {"nvidia", "ollama"}:
        reason = f"provider_not_supported:{normalized_provider}"
    elif normalized_provider == "nvidia" and not nvidia_api_key:
        reason = "missing_nvidia_api_key"
    else:
        return MCPRuntimeDecision(enabled=True, reason="mcp_enabled")

    return MCPRuntimeDecision(enabled=False, reason=reason)
def resolve_mcp_server_urls(
    *,
    mcp_server_url: str,
    mcp_extra_server_urls: str,
) -> list[str]:
    """Combine the primary MCP URL with a comma-separated list of extra URLs.

    Entries are whitespace-stripped; empty entries are dropped, and
    duplicates are removed while keeping first-seen order (primary first).
    ``None`` is accepted for either argument and treated as empty.
    """
    raw_entries = [mcp_server_url, *(mcp_extra_server_urls or "").split(",")]
    cleaned = ((entry or "").strip() for entry in raw_entries)
    # dict preserves insertion order, so fromkeys() is an ordered de-duplication.
    return list(dict.fromkeys(url for url in cleaned if url))
def normalize_mcp_tool_exception(*, tool_name: str, exc: Exception) -> ToolError | None:
    """Map a failed MCP tool call to a user-facing ``ToolError``, if recognized.

    Timeouts are checked first, then transport failures; each yields a
    ``ToolError`` from the matching message template formatted with
    *tool_name*. Any other exception yields ``None`` so the caller can re-raise it.
    """
    if is_mcp_timeout_exception(exc):
        template = MCP_TOOL_TIMEOUT_MESSAGE
    elif is_mcp_transport_exception(exc):
        template = MCP_TOOL_UNAVAILABLE_MESSAGE
    else:
        return None
    return ToolError(template.format(tool_name=tool_name))
def is_mcp_timeout_exception(exc: BaseException) -> bool:
    """Return True if *exc* or any exception in its cause/context chain is a timeout.

    Recognized timeouts:
    - builtin ``TimeoutError`` and ``asyncio.TimeoutError``. Before Python
      3.11 the latter is a distinct class that does not subclass
      ``TimeoutError``, so it must be listed explicitly; from 3.11 on it is
      an alias and the extra entry is harmless.
    - ``httpx.TimeoutException`` and its subclasses.
    - ``McpError`` whose message text looks like a timeout
      (see ``looks_like_timeout_message``).
    """
    import asyncio  # local: only needed for the pre-3.11 asyncio.TimeoutError class

    timeout_types = (TimeoutError, asyncio.TimeoutError, httpx.TimeoutException)
    for error in iter_exception_chain(exc):
        if isinstance(error, timeout_types):
            return True
        if isinstance(error, McpError) and looks_like_timeout_message(error.error.message):
            return True
    return False
def is_mcp_transport_exception(exc: BaseException) -> bool:
    """Return True if the exception chain of *exc* contains a transport-level failure.

    Any ``McpError``, ``httpx.RequestError`` or ``OSError`` anywhere in the
    cause/context chain counts.
    """
    transport_types = (McpError, httpx.RequestError, OSError)
    for link in iter_exception_chain(exc):
        if isinstance(link, transport_types):
            return True
    return False
def iter_exception_chain(exc: BaseException) -> tuple[BaseException, ...]:
    """Return *exc* followed by each exception it links to, outermost first.

    At every step ``__cause__`` is preferred, falling back to ``__context__``.
    ``__suppress_context__`` is not consulted, so context hidden by
    ``raise ... from None`` is still followed. The walk stops on reaching an
    exception object already visited, so cyclic chains terminate.
    """
    ordered: list[BaseException] = []
    visited_ids: set[int] = set()
    node: BaseException | None = exc
    while node is not None:
        if id(node) in visited_ids:
            break
        visited_ids.add(id(node))
        ordered.append(node)
        node = node.__cause__ or node.__context__
    return tuple(ordered)
def looks_like_timeout_message(message: str | None) -> bool:
    """Heuristically decide whether an error message describes a timeout.

    The message is stripped and lower-cased, then searched for any of a few
    timeout phrases. ``None`` and blank messages return False.
    """
    text = (message or "").strip().lower()
    timeout_markers = ("timed out", "timeout", "deadline exceeded", "read timed out")
    return bool(text) and any(marker in text for marker in timeout_markers)
def describe_mcp_exception(exc: BaseException) -> str:
    """Return a short human-readable description of *exc* for logging.

    Walks the cause/context chain and returns the first non-empty message:
    ``error.message`` for ``McpError``, ``str(error)`` otherwise. Falls back
    to the class name of *exc* when every message is empty.
    """
    for link in iter_exception_chain(exc):
        text = link.error.message if isinstance(link, McpError) else str(link)
        if text:
            return text
    return type(exc).__name__
def _bounded_timeout_seconds(timeout_seconds: float) -> float:
return max(float(timeout_seconds), 1.0)
def build_llm_runtime(
    llm_settings: LLMSettings,
) -> LLMRuntimeConfig:
    """Build the LLM client and, when MCP is usable, the MCP HTTP servers.

    Steps, in order:
    1. Normalize ``LLM_PROVIDER`` and compute the LLM and MCP timeouts
       (each clamped to at least one second).
    2. Decide whether the MCP runtime is usable (``_resolve_mcp_plan``) and
       create one ``ConfiguredMCPServerHTTP`` per resolved server URL.
    3. Resolve model, base URL, API key and extra request body for the
       provider (``_resolve_llm_endpoint``).
    4. Log the MCP outcome and construct the OpenAI-compatible LLM client.

    Args:
        llm_settings: Application LLM settings.

    Returns:
        An ``LLMRuntimeConfig`` whose ``mcp_servers`` is ``None`` whenever the
        MCP runtime is not active.

    Raises:
        ValueError: If the provider is unknown, or a setting the provider
            requires is missing or invalid.
    """
    provider = (llm_settings.LLM_PROVIDER or "").strip().lower()
    llm_timeout_seconds = _bounded_timeout_seconds(llm_settings.LLM_CONN_TIMEOUT_SEC)
    llm_timeout = build_mcp_http_timeout(llm_timeout_seconds)
    mcp_timeout_seconds = _bounded_timeout_seconds(llm_settings.MCP_CONN_TIMEOUT_SEC)

    mcp_decision, mcp_server_urls = _resolve_mcp_plan(llm_settings, provider)
    mcp_servers: list[ConfiguredMCPServerHTTP] | None = None
    if mcp_decision.enabled:
        mcp_servers = [
            ConfiguredMCPServerHTTP(url=url, timeout_seconds=mcp_timeout_seconds)
            for url in mcp_server_urls
        ]

    # Resolved before logging, so a configuration error aborts without
    # emitting a misleading "MCP runtime enabled" line.
    model, base_url, api_key, extra_body = _resolve_llm_endpoint(llm_settings, provider)

    _log_mcp_runtime(
        mcp_requested=llm_settings.MCP_ENABLED,
        decision=mcp_decision,
        provider=provider,
        model=model,
        server_urls=mcp_server_urls,
        llm_timeout_seconds=llm_timeout_seconds,
        mcp_timeout_seconds=mcp_timeout_seconds,
    )

    llm = openai_plugin.LLM(
        model=model,
        api_key=api_key,
        base_url=base_url,
        temperature=llm_settings.LLM_TEMPERATURE,
        max_completion_tokens=llm_settings.LLM_MAX_TOKENS,
        timeout=llm_timeout,
        _strict_tool_schema=False,
        extra_body=extra_body,
    )
    return LLMRuntimeConfig(
        llm=llm,
        mcp_servers=mcp_servers,
        provider=provider,
        model=model,
    )


def _resolve_mcp_plan(
    llm_settings: LLMSettings,
    provider: str,
) -> tuple[MCPRuntimeDecision, list[str]]:
    """Decide whether MCP is usable and, if so, which server URLs to use.

    If MCP is otherwise allowed but no server URL is configured, MCP is
    reported as disabled (reason ``"missing_mcp_server_url"``). Without this,
    an empty server list would still mark the runtime as MCP-active
    (``mcp_servers is not None``) and block ``generate_reply`` for no benefit.
    """
    decision = resolve_mcp_runtime_mode(
        mcp_enabled=llm_settings.MCP_ENABLED,
        llm_provider=provider,
        nvidia_api_key=llm_settings.NVIDIA_API_KEY,
    )
    if not decision.enabled:
        return decision, []
    server_urls = resolve_mcp_server_urls(
        mcp_server_url=llm_settings.MCP_SERVER_URL,
        mcp_extra_server_urls=llm_settings.MCP_EXTRA_SERVER_URLS,
    )
    if not server_urls:
        return MCPRuntimeDecision(enabled=False, reason="missing_mcp_server_url"), []
    return decision, server_urls


def _resolve_llm_endpoint(
    llm_settings: LLMSettings,
    provider: str,
) -> tuple[str, str, str, dict[str, Any]]:
    """Return ``(model, base_url, api_key, extra_body)`` for *provider*.

    Raises:
        ValueError: If *provider* is not ``"nvidia"`` or ``"ollama"``, or a
            setting required by that provider is empty or invalid.
    """
    if provider == "nvidia":
        if not llm_settings.NVIDIA_API_KEY:
            raise ValueError(
                "NVIDIA_API_KEY is required when LLM_PROVIDER=nvidia"
            )
        # Validated like OLLAMA_MODEL so a blank model fails at startup rather
        # than on the first request.
        model = (llm_settings.NVIDIA_MODEL or "").strip()
        if not model:
            raise ValueError("NVIDIA_MODEL is required when LLM_PROVIDER=nvidia")
        # Extra request body: sets chat_template_kwargs.enable_thinking=False.
        extra_body: dict[str, Any] = {"chat_template_kwargs": {"enable_thinking": False}}
        return model, NVIDIA_OPENAI_BASE_URL, llm_settings.NVIDIA_API_KEY, extra_body

    if provider == "ollama":
        model = (llm_settings.OLLAMA_MODEL or "").strip()
        if not model:
            raise ValueError("OLLAMA_MODEL is required when LLM_PROVIDER=ollama")
        base_url = (llm_settings.OLLAMA_BASE_URL or "").strip()
        if not base_url:
            raise ValueError("OLLAMA_BASE_URL is required when LLM_PROVIDER=ollama")
        validate_ollama_model_for_endpoint(base_url=base_url, model=model)
        api_key = resolve_ollama_api_key(llm_settings.OLLAMA_API_KEY)
        return model, base_url, api_key, {"think": False}

    raise ValueError(
        f"Unknown LLM provider: {provider}. Must be 'nvidia' or 'ollama'"
    )


def _log_mcp_runtime(
    *,
    mcp_requested: bool,
    decision: MCPRuntimeDecision,
    provider: str,
    model: str,
    server_urls: list[str],
    llm_timeout_seconds: float,
    mcp_timeout_seconds: float,
) -> None:
    """Log whether the MCP runtime is enabled, requested-but-unavailable, or off."""
    if mcp_requested and not decision.enabled:
        logger.warning(
            "MCP runtime requested but unavailable: reason=%s provider=%s",
            decision.reason,
            provider,
        )
    elif decision.enabled:
        logger.info(
            "MCP runtime enabled: mcp_servers=%s llm_provider=%s llm_model=%s llm_timeout_sec=%.2f mcp_timeout_sec=%.2f",
            server_urls,
            provider,
            model,
            llm_timeout_seconds,
            mcp_timeout_seconds,
        )
    else:
        logger.info("MCP runtime disabled (MCP_ENABLED=false)")
def resolve_ollama_api_key(api_key: str | None) -> str:
    """Return the stripped *api_key*, or the placeholder ``"ollama"`` when it is blank.

    The OpenAI-compatible client is always given a non-empty key, even for a
    local Ollama server configured without one.
    """
    cleaned = (api_key or "").strip()
    return cleaned or "ollama"
def validate_ollama_model_for_endpoint(*, base_url: str, model: str) -> None:
    """Reject ``:cloud`` model aliases when targeting the Ollama cloud OpenAI endpoint.

    Does nothing for any other *base_url*. The suffix check is case-insensitive.

    Raises:
        ValueError: If *base_url* is the Ollama cloud ``/v1`` endpoint and
            *model* ends with ``":cloud"``.
    """
    uses_cloud_endpoint = is_ollama_cloud_openai_endpoint(base_url)
    if uses_cloud_endpoint and model.lower().endswith(":cloud"):
        raise ValueError(
            "OLLAMA_MODEL cannot use ':cloud' aliases with OLLAMA_BASE_URL=https://ollama.com/v1. "
            "Use an exact model ID from https://ollama.com/v1/models (for example, qwen3-next:80b)."
        )
def is_ollama_cloud_openai_endpoint(base_url: str) -> bool:
    """Return True if *base_url* points at Ollama cloud's OpenAI-compatible ``/v1`` API.

    Matches hosts ``ollama.com``, ``www.ollama.com`` and ``api.ollama.com``
    (case-insensitive) with path exactly ``/v1``, ignoring trailing slashes.
    Blank or ``None`` input returns False.
    """
    candidate = (base_url or "").strip()
    if candidate:
        parts = urlparse(candidate)
        hostname = (parts.hostname or "").lower()
        normalized_path = (parts.path or "").rstrip("/")
        cloud_hosts = {"ollama.com", "www.ollama.com", "api.ollama.com"}
        return hostname in cloud_hosts and normalized_path == "/v1"
    return False
def build_mcp_http_timeout(timeout_seconds: float) -> httpx.Timeout:
    """Build an ``httpx.Timeout`` that applies one clamped value to every phase.

    Despite the name, this module also uses it for the LLM client's timeout.
    Clamping is delegated to ``_bounded_timeout_seconds``. That keeps the
    rule (float coercion, minimum of one second) identical to the MCP server
    timeouts, rather than duplicating a slightly different ``max()`` here.
    """
    bounded_timeout = _bounded_timeout_seconds(timeout_seconds)
    return httpx.Timeout(
        connect=bounded_timeout,
        read=bounded_timeout,
        write=bounded_timeout,
        pool=bounded_timeout,
    )
def install_mcp_generate_reply_guard(
    session: AgentSession,
    *,
    mcp_runtime_active: bool,
) -> None:
    """In MCP mode, shadow ``session.generate_reply`` with a stub that always raises.

    This is a no-op when *mcp_runtime_active* is false, or when a marker
    attribute shows the guard is already installed on this session. The
    original ``generate_reply`` is kept on the session as
    ``_open_voice_original_generate_reply``.

    The installed stub raises ``RuntimeError(MCP_GENERATE_REPLY_BLOCK_MESSAGE)``
    on every call.
    """
    if not mcp_runtime_active or getattr(
        session, "_open_voice_mcp_generate_reply_guard_installed", False
    ):
        return

    def _refuse_generate_reply(*_args: Any, **_kwargs: Any) -> Any:
        raise RuntimeError(MCP_GENERATE_REPLY_BLOCK_MESSAGE)

    # Mark first, then keep the original method reachable, then shadow it on
    # the instance. setattr avoids static-typing complaints about patching.
    setattr(session, "_open_voice_mcp_generate_reply_guard_installed", True)
    original_generate_reply = session.generate_reply
    setattr(session, "_open_voice_original_generate_reply", original_generate_reply)
    setattr(session, "generate_reply", _refuse_generate_reply)
    logger.info("MCP runtime policy active: manual generate_reply disabled")
def run_startup_greeting(
    session: AgentSession,
    *,
    mcp_runtime_active: bool,
) -> Any | None:
    """Start the agent's opening greeting.

    - MCP mode: speaks the fixed ``MCP_STARTUP_GREETING`` via ``session.say``
      (interruptible, not added to the chat context) and returns what
      ``session.say`` returns.
    - Otherwise: asks ``session.generate_reply`` to greet the user and
      returns ``None``, even when that call succeeds.

    Exceptions from either call are logged as warnings and ``None`` is
    returned; nothing is raised.
    """
    if not mcp_runtime_active:
        try:
            session.generate_reply(instructions="Greet the user and offer your assistance.")
        except Exception as exc:
            logger.warning(f"Startup greeting via generate_reply failed: {exc}")
        return None

    logger.info("MCP runtime startup greeting via session.say")
    try:
        return session.say(
            MCP_STARTUP_GREETING,
            allow_interruptions=True,
            add_to_chat_ctx=False,
        )
    except Exception as exc:
        logger.warning(f"MCP startup greeting could not start: {exc}")
        return None
|