mohammedafeef's picture
Upload 16 files
9b9a6d7 verified
"""
utils.py – Shared helpers for all agents
Provides:
• safe_content(response) — extract string from LLM response, handles both
plain-string and list-of-parts content (newer langchain-google-genai versions)
• safe_llm_invoke(llm, messages, fallback) — LLM call with retry + network fallback
• strip_agent_prefix(text) — remove "[Orchestrator → X]" prefixes from instructions
"""
import logging
import time
from typing import Any
logger = logging.getLogger(__name__)
# ── Content normaliser ────────────────────────────────────────────────────────
def safe_content(response: Any, fallback: str = "") -> str:
"""
Extract a plain string from an LLM response.
langchain-google-genai ≥ 0.3 sometimes returns `content` as a list of
content-part dicts: [{"type": "text", "text": "..."}, ...]
Older versions return a plain string. Handle both.
"""
if response is None:
return fallback
content = getattr(response, "content", response)
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts = []
for part in content:
if isinstance(part, str):
parts.append(part)
elif isinstance(part, dict):
# {"type": "text", "text": "..."} or {"type": "text", "content": "..."}
parts.append(part.get("text") or part.get("content") or str(part))
else:
parts.append(str(part))
return " ".join(p for p in parts if p).strip()
return str(content).strip() if content else fallback
# ── Safe LLM invoke with retry ────────────────────────────────────────────────
def safe_llm_invoke(
llm: Any,
messages: list,
fallback: str = "I'm sorry, I'm having trouble connecting right now. Please try again.",
retries: int = 2,
delay: float = 1.5,
) -> str:
"""
Invoke an LLM with automatic retry on transient network/API errors.
Returns the response as a plain string (never raises).
"""
last_error = None
for attempt in range(1, retries + 1):
try:
response = llm.invoke(messages)
result = safe_content(response, fallback)
if result:
return result
except Exception as e:
last_error = e
err_str = str(e)
# Permanent errors — don't retry
if any(k in err_str for k in ("API_KEY", "invalid", "PERMISSION_DENIED", "400")):
logger.error(f"LLM permanent error: {e}")
return fallback
# Transient — retry
logger.warning(f"LLM attempt {attempt}/{retries} failed: {e}")
if attempt < retries:
time.sleep(delay)
logger.error(f"LLM failed after {retries} retries: {last_error}")
return fallback
# ── Prefix stripper ───────────────────────────────────────────────────────────
def strip_agent_prefix(text: str, agent_name: str) -> str:
"""
Remove the routing prefix the Orchestrator adds, e.g.:
"[Orchestrator → memory] WRITE symptom: headache"
→ "WRITE symptom: headache"
"""
markers = [
f"[Orchestrator → {agent_name}]",
f"[Orchestrator -> {agent_name}]",
f"[Orchestrator → {agent_name}]", # different arrow char
]
for marker in markers:
if marker in text:
return text.split("]", 1)[-1].strip()
return text.strip()