# History note (pasted GitHub UI residue, kept for provenance):
# author Sbboss — "RAG, language updates" — commit 0b2d478
"""LLM clients for Azure OpenAI and Azure AI Foundry Agents."""
from __future__ import annotations
from typing import Any
import httpx
import anyio
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from ..core.config import get_settings
from ..core.errors import LLMError
class AzureOpenAIClient:
"""Minimal Azure OpenAI chat completions client."""
def __init__(self) -> None:
self._settings = get_settings()
async def chat(
self, transcript: str, prompt: str | None = None, language: str | None = None
) -> str:
"""Call Azure OpenAI chat completions and return assistant text."""
system_prompt = (
"You are a concise, helpful assistant. "
"Answer briefly and ask a clarifying question if needed."
)
if language:
system_prompt += f" Reply in the same language as the user ({language})."
user_content = f"Transcript: {transcript}"
if prompt:
user_content += f"\nUser instruction: {prompt}"
base = self._normalize_endpoint(self._settings.azure_openai_endpoint)
url = (
f"{base}/openai/deployments/"
f"{self._settings.azure_openai_deployment}/chat/completions"
f"?api-version={self._settings.azure_openai_api_version}"
)
payload = {
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content},
],
"temperature": 0.2,
"max_tokens": 300,
}
headers = {"api-key": self._settings.azure_openai_api_key}
last_exc: httpx.HTTPStatusError | None = None
try:
for attempt in range(3):
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(url, json=payload, headers=headers)
response.raise_for_status()
data: dict[str, Any] = response.json()
last_exc = None
break
except httpx.HTTPStatusError as exc:
last_exc = exc
body = exc.response.text or ""
if exc.response.status_code == 400 and "content_filter" in body:
# retry twice, then return guardrail message
if attempt < 2:
continue
raise LLMError(
code="llm_guardrail",
message="Query is violating some guardrails.",
details={"body": body},
) from exc
raise LLMError(
code="llm_http",
message=f"LLM request failed with status {exc.response.status_code}.",
details={"body": body},
) from exc
except httpx.HTTPError as exc:
raise LLMError(code="llm_http", message="LLM request failed.") from exc
if last_exc is not None:
raise LLMError(
code="llm_http",
message=f"LLM request failed with status {last_exc.response.status_code}.",
details={"body": last_exc.response.text},
) from last_exc
try:
content = data["choices"][0]["message"]["content"]
except (KeyError, IndexError, TypeError) as exc:
raise LLMError(code="llm_response", message="Invalid LLM response.") from exc
text = str(content).strip()
if not text:
raise LLMError(code="llm_empty", message="Empty LLM response.")
return text
def _normalize_endpoint(self, endpoint: str) -> str:
cleaned = endpoint.rstrip("/")
marker = "/openai/"
if marker in cleaned:
cleaned = cleaned.split(marker, 1)[0]
return cleaned
class FoundryAgentClient:
"""Azure AI Foundry Agent client using a connection string."""
def __init__(self) -> None:
self._settings = get_settings()
if not hasattr(AIProjectClient, "from_connection_string"):
raise LLMError(
code="llm_config",
message=(
"azure-ai-projects is missing from_connection_string(). "
"Install azure-ai-projects==1.0.0b10."
),
)
self._credential = DefaultAzureCredential(
exclude_managed_identity_credential=True
)
self._client = AIProjectClient.from_connection_string(
credential=self._credential,
conn_str=self._settings.foundry_project_conn_str,
)
async def chat(
self, transcript: str, prompt: str | None = None, language: str | None = None
) -> str:
"""Send a message to the Foundry agent and return the reply text."""
user_content = f"Transcript: {transcript}"
if prompt:
user_content += f"\nUser instruction: {prompt}"
if language:
user_content += f"\nDetected language: {language}. Reply in the same language."
try:
return await anyio.to_thread.run_sync(self._chat_sync, user_content)
except LLMError:
raise
except Exception as exc:
raise LLMError(
code="llm_http",
message="LLM request failed.",
details={"error": repr(exc)},
) from exc
def _chat_sync(self, user_content: str) -> str:
thread_id = self._client.agents.create_thread().id
self._client.agents.create_message(
thread_id=thread_id, role="user", content=user_content
)
run = self._client.agents.create_and_process_run(
thread_id=thread_id, agent_id=self._settings.foundry_agent_id
)
messages = self._client.agents.list_messages(thread_id=thread_id)
text = self._extract_assistant_text(messages, run_id=getattr(run, "id", None))
if not text:
raise LLMError(
code="llm_empty",
message="Empty LLM response.",
details={
"messages_type": type(messages).__name__,
"messages_repr": self._safe_repr(messages),
},
)
return text
def _extract_assistant_text(
self, messages: Any, run_id: str | None = None
) -> str | None:
data = getattr(messages, "data", None)
if data is None and isinstance(messages, dict):
data = messages.get("data")
if not data:
return None
def get(field: str, obj: Any, default: Any = None) -> Any:
if isinstance(obj, dict):
return obj.get(field, default)
return getattr(obj, field, default)
candidates: list[Any] = []
for m in data:
if get("role", m) != "assistant":
continue
if run_id is None or get("run_id", m) == run_id:
candidates.append(m)
if not candidates:
candidates = [m for m in data if get("role", m) == "assistant"]
if not candidates:
return None
msg = candidates[0]
content = get("content", msg, []) or []
parts: list[str] = []
for block in content:
btype = get("type", block)
if btype == "text":
text_obj = get("text", block, {})
value = get("value", text_obj)
if value:
parts.append(value)
final = "\n".join(parts).strip()
return final or None
def _safe_repr(self, value: Any) -> str:
try:
return repr(value)[:2000]
except Exception:
return "<unreprable>"
class LLMClient:
    """LLM router that dispatches to the configured provider.

    Provider clients are created lazily on first use: eagerly constructing
    ``FoundryAgentClient`` in ``__init__`` performed credential setup (and
    could raise ``llm_config``) even when the configured provider was
    ``azure_openai``, breaking Azure-only deployments at startup.
    """

    def __init__(self) -> None:
        self._settings = get_settings()
        # Lazily instantiated provider clients (see class docstring).
        self._azure: AzureOpenAIClient | None = None
        self._foundry: FoundryAgentClient | None = None

    async def chat(
        self,
        transcript: str,
        prompt: str | None = None,
        provider: str | None = None,
        language: str | None = None,
    ) -> str:
        """Route a chat request to *provider* (or the configured default).

        Raises:
            LLMError: ``llm_provider`` when the provider name is unknown.
        """
        provider = provider or self._settings.llm_provider
        if provider == "foundry_agent":
            if self._foundry is None:
                self._foundry = FoundryAgentClient()
            return await self._foundry.chat(transcript, prompt, language)
        if provider == "azure_openai":
            if self._azure is None:
                self._azure = AzureOpenAIClient()
            return await self._azure.chat(transcript, prompt, language)
        raise LLMError(code="llm_provider", message="Unsupported LLM provider.")