Spaces:
Running
Running
| """LLM clients for Azure OpenAI and Azure AI Foundry Agents.""" | |
| from __future__ import annotations | |
| from typing import Any | |
| import httpx | |
| import anyio | |
| from azure.ai.projects import AIProjectClient | |
| from azure.identity import DefaultAzureCredential | |
| from ..core.config import get_settings | |
| from ..core.errors import LLMError | |
class AzureOpenAIClient:
    """Minimal Azure OpenAI chat-completions client.

    Talks to the ``chat/completions`` REST endpoint directly via httpx,
    authenticating with the API key from ``get_settings()``.
    """

    def __init__(self) -> None:
        self._settings = get_settings()

    async def chat(
        self, transcript: str, prompt: str | None = None, language: str | None = None
    ) -> str:
        """Call Azure OpenAI chat completions and return assistant text.

        Args:
            transcript: Transcribed user speech forming the user message.
            prompt: Optional extra instruction appended to the user message.
            language: Optional language hint; the model is asked to reply in it.

        Returns:
            The assistant's reply text, stripped of surrounding whitespace.

        Raises:
            LLMError: On HTTP failure (``llm_http``), guardrail rejection
                (``llm_guardrail``), malformed (``llm_response``) or empty
                (``llm_empty``) responses.
        """
        system_prompt = (
            "You are a concise, helpful assistant. "
            "Answer briefly and ask a clarifying question if needed."
        )
        if language:
            system_prompt += f" Reply in the same language as the user ({language})."
        user_content = f"Transcript: {transcript}"
        if prompt:
            user_content += f"\nUser instruction: {prompt}"
        base = self._normalize_endpoint(self._settings.azure_openai_endpoint)
        url = (
            f"{base}/openai/deployments/"
            f"{self._settings.azure_openai_deployment}/chat/completions"
            f"?api-version={self._settings.azure_openai_api_version}"
        )
        payload = {
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            # Low temperature for stable, concise assistant replies.
            "temperature": 0.2,
            "max_tokens": 300,
        }
        headers = {"api-key": self._settings.azure_openai_api_key}
        data = await self._post_with_retry(url, payload, headers)
        try:
            content = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError) as exc:
            raise LLMError(code="llm_response", message="Invalid LLM response.") from exc
        text = str(content).strip()
        if not text:
            raise LLMError(code="llm_empty", message="Empty LLM response.")
        return text

    async def _post_with_retry(
        self, url: str, payload: dict[str, Any], headers: dict[str, str]
    ) -> dict[str, Any]:
        """POST ``payload`` to ``url`` and return the parsed JSON body.

        A 400 response whose body mentions ``content_filter`` is retried up
        to two more times before being surfaced as a guardrail error; any
        other HTTP status error fails immediately. Transport-level errors
        (timeouts, connection failures) are wrapped as ``llm_http``.
        """
        try:
            # One client reused across attempts avoids re-establishing
            # connections on content-filter retries.
            async with httpx.AsyncClient(timeout=30.0) as client:
                for attempt in range(3):
                    try:
                        response = await client.post(url, json=payload, headers=headers)
                        response.raise_for_status()
                        return response.json()
                    except httpx.HTTPStatusError as exc:
                        body = exc.response.text or ""
                        if exc.response.status_code == 400 and "content_filter" in body:
                            # Retry twice, then report the guardrail rejection.
                            if attempt < 2:
                                continue
                            raise LLMError(
                                code="llm_guardrail",
                                message="Query is violating some guardrails.",
                                details={"body": body},
                            ) from exc
                        raise LLMError(
                            code="llm_http",
                            message=f"LLM request failed with status {exc.response.status_code}.",
                            details={"body": body},
                        ) from exc
        except httpx.HTTPError as exc:
            raise LLMError(code="llm_http", message="LLM request failed.") from exc
        # Unreachable: every loop path returns or raises. Kept so static
        # analysis sees an explicit terminal path.
        raise LLMError(code="llm_http", message="LLM request failed.")

    def _normalize_endpoint(self, endpoint: str) -> str:
        """Return the bare resource root for *endpoint*.

        Accepts endpoints that already carry an ``/openai/...`` path — or a
        bare trailing ``/openai`` — and strips it, so ``chat()`` can append
        the deployment path exactly once.
        """
        cleaned = endpoint.rstrip("/")
        marker = "/openai/"
        if marker in cleaned:
            cleaned = cleaned.split(marker, 1)[0]
        # Also strip a bare trailing "/openai" (e.g. from "https://r/openai/")
        # which would otherwise yield a doubled "/openai/openai/..." URL.
        if cleaned.endswith("/openai"):
            cleaned = cleaned[: -len("/openai")]
        return cleaned
class FoundryAgentClient:
    """Azure AI Foundry Agent client using a connection string."""

    def __init__(self) -> None:
        self._settings = get_settings()
        # Some azure-ai-projects releases lack from_connection_string();
        # fail fast with a clear message rather than an AttributeError later.
        if not hasattr(AIProjectClient, "from_connection_string"):
            raise LLMError(
                code="llm_config",
                message=(
                    "azure-ai-projects is missing from_connection_string(). "
                    "Install azure-ai-projects==1.0.0b10."
                ),
            )
        self._credential = DefaultAzureCredential(
            exclude_managed_identity_credential=True
        )
        self._client = AIProjectClient.from_connection_string(
            credential=self._credential,
            conn_str=self._settings.foundry_project_conn_str,
        )

    async def chat(
        self, transcript: str, prompt: str | None = None, language: str | None = None
    ) -> str:
        """Send a message to the Foundry agent and return the reply text."""
        lines = [f"Transcript: {transcript}"]
        if prompt:
            lines.append(f"User instruction: {prompt}")
        if language:
            lines.append(f"Detected language: {language}. Reply in the same language.")
        message = "\n".join(lines)
        try:
            # The SDK is synchronous; run the round-trip off the event loop.
            return await anyio.to_thread.run_sync(self._chat_sync, message)
        except LLMError:
            raise
        except Exception as exc:
            raise LLMError(
                code="llm_http",
                message="LLM request failed.",
                details={"error": repr(exc)},
            ) from exc

    def _chat_sync(self, user_content: str) -> str:
        """Blocking agent round-trip: thread -> message -> run -> reply."""
        agents = self._client.agents
        thread = agents.create_thread()
        agents.create_message(thread_id=thread.id, role="user", content=user_content)
        run = agents.create_and_process_run(
            thread_id=thread.id, agent_id=self._settings.foundry_agent_id
        )
        listing = agents.list_messages(thread_id=thread.id)
        reply = self._extract_assistant_text(listing, run_id=getattr(run, "id", None))
        if reply:
            return reply
        raise LLMError(
            code="llm_empty",
            message="Empty LLM response.",
            details={
                "messages_type": type(listing).__name__,
                "messages_repr": self._safe_repr(listing),
            },
        )

    def _extract_assistant_text(
        self, messages: Any, run_id: str | None = None
    ) -> str | None:
        """Pull the assistant's text out of a list_messages() result.

        Handles both attribute-style SDK models and plain dicts. Prefers
        messages tagged with *run_id*, falling back to any assistant message.
        Returns None when nothing usable is found.
        """
        raw = getattr(messages, "data", None)
        if raw is None and isinstance(messages, dict):
            raw = messages.get("data")
        if not raw:
            return None

        def field(obj: Any, name: str, default: Any = None) -> Any:
            # Dicts and SDK model objects are read the same way.
            return obj.get(name, default) if isinstance(obj, dict) else getattr(obj, name, default)

        assistants = [m for m in raw if field(m, "role") == "assistant"]
        matched = [
            m for m in assistants
            if run_id is None or field(m, "run_id") == run_id
        ]
        chosen = matched or assistants
        if not chosen:
            return None
        # NOTE(review): takes the first assistant entry — presumably the SDK
        # lists newest messages first; confirm against azure-ai-projects docs.
        blocks = field(chosen[0], "content", []) or []
        pieces: list[str] = []
        for block in blocks:
            if field(block, "type") != "text":
                continue
            value = field(field(block, "text", {}), "value")
            if value:
                pieces.append(value)
        joined = "\n".join(pieces).strip()
        return joined or None

    def _safe_repr(self, value: Any) -> str:
        """repr() capped at 2000 chars; never raises."""
        try:
            text = repr(value)
        except Exception:
            return "<unreprable>"
        return text[:2000]
class LLMClient:
    """LLM router that dispatches to the configured provider.

    Provider clients are created lazily on first use: FoundryAgentClient's
    __init__ validates the SDK and builds Azure credentials (and can raise
    ``llm_config``), so it must not run when only Azure OpenAI is used —
    and vice versa.
    """

    def __init__(self) -> None:
        self._settings = get_settings()
        # Populated on demand in chat(); see class docstring.
        self._azure = None
        self._foundry = None

    async def chat(
        self,
        transcript: str,
        prompt: str | None = None,
        provider: str | None = None,
        language: str | None = None,
    ) -> str:
        """Route a chat request to *provider*, defaulting to the configured one.

        Args:
            transcript: Transcribed user speech.
            prompt: Optional extra user instruction.
            provider: ``"foundry_agent"`` or ``"azure_openai"``; falls back to
                ``settings.llm_provider`` when omitted.
            language: Optional language hint forwarded to the provider.

        Raises:
            LLMError: ``llm_provider`` for an unsupported provider, or any
                error propagated from the underlying client.
        """
        provider = provider or self._settings.llm_provider
        if provider == "foundry_agent":
            if self._foundry is None:
                self._foundry = FoundryAgentClient()
            return await self._foundry.chat(transcript, prompt, language)
        if provider == "azure_openai":
            if self._azure is None:
                self._azure = AzureOpenAIClient()
            return await self._azure.chat(transcript, prompt, language)
        raise LLMError(code="llm_provider", message="Unsupported LLM provider.")