FlutIQ / app /llm /client.py
kredd25's picture
v0.15: dual-mode (Cloud + Edge) + expert-briefing dossier reframe
a159716
"""
Gemma 4 client via OpenRouter free tier.
Uses the OpenAI-compatible chat/completions endpoint. Supports function
calling and reasoning mode (the latter is the risk-analyst agent's
showcase capability).
Notes from the smoke test (scripts/smoke_test.py):
- Reasoning trace lives at message.reasoning_details[i].text — the
earlier spec said .content, which is wrong on Gemma 4 / OpenRouter
today. We also fall back to the top-level message.reasoning string
that OpenRouter returns concatenated for convenience.
- The shared :free upstream pool 429s very quickly (we got rate-limited
after 2 calls). BYOK a Google AI Studio key into OpenRouter
integrations for the demo. We retry on 429 with the fallback model,
then exponential backoff up to a cap.
"""
import asyncio
import json
from typing import Optional
import httpx
from app.config import (
APP_URL,
APP_NAME,
INFERENCE_BACKEND,
LLM_API_KEY,
LLM_BASE_URL,
MODEL_FALLBACK,
MODEL_PRIMARY,
)
class RateLimitedError(Exception):
"""All retries exhausted while upstream was rate-limiting."""
# Local inference is slower than cloud — the call has to wait for the
# laptop GPU/CPU instead of OpenRouter's server. Bump the per-request
# timeout so a slow first-token doesn't trip httpx before the model
# finishes generating.
_HTTP_TIMEOUT = 300.0 if INFERENCE_BACKEND == "ollama" else 120.0
async def call_gemma4(
messages: list[dict],
tools: Optional[list[dict]] = None,
model: str = MODEL_PRIMARY,
reasoning: bool = False,
max_tokens: int = 4096,
temperature: float = 0.3,
retries: int = 3,
) -> dict:
"""
Call Gemma 4 via the configured INFERENCE_BACKEND. Returns raw response JSON.
Retry policy:
- On 429 with primary model: switch to fallback model, then retry
with exponential backoff (2s, 4s, 8s). (Cloud only; Ollama queues
instead of rate-limiting.)
- On timeout: short retry.
"""
if INFERENCE_BACKEND != "ollama" and not LLM_API_KEY:
raise RuntimeError("OPENROUTER_API_KEY is not set")
headers = {
"Authorization": f"Bearer {LLM_API_KEY or 'ollama'}",
"Content-Type": "application/json",
}
# OpenRouter-only telemetry headers; Ollama would just ignore them
# but cleaner to omit when irrelevant.
if INFERENCE_BACKEND == "openrouter":
headers["HTTP-Referer"] = APP_URL
headers["X-Title"] = APP_NAME
payload: dict = {
"model": model,
"messages": messages,
"max_tokens": max_tokens,
"temperature": temperature,
}
if tools:
payload["tools"] = tools
payload["tool_choice"] = "auto"
if reasoning:
payload["reasoning"] = {"enabled": True}
current_model = model
backoff = 2.0
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client:
for attempt in range(retries + 1):
try:
resp = await client.post(
f"{LLM_BASE_URL}/chat/completions",
headers=headers,
json={**payload, "model": current_model},
)
except httpx.TimeoutException:
if attempt >= retries:
raise
await asyncio.sleep(backoff)
backoff *= 2
continue
if resp.status_code == 429:
if current_model == MODEL_PRIMARY and attempt == 0:
current_model = MODEL_FALLBACK
continue
if attempt >= retries:
raise RateLimitedError(
f"Both Gemma 4 free models rate-limited after {retries + 1} attempts. "
"BYOK a Google AI Studio key at openrouter.ai/settings/integrations."
)
await asyncio.sleep(backoff)
backoff *= 2
continue
# Transient upstream errors (502/503/504 are common when an
# OpenRouter provider hiccups). Retry with backoff before
# surfacing to the user as an agent error.
if resp.status_code in (500, 502, 503, 504):
if attempt >= retries:
resp.raise_for_status()
await asyncio.sleep(backoff)
backoff *= 2
continue
resp.raise_for_status()
return resp.json()
raise RuntimeError("call_gemma4 exited retry loop without returning")
def extract_text(response: dict) -> str:
choices = response.get("choices") or []
if not choices:
return ""
return (choices[0].get("message") or {}).get("content") or ""
def extract_tool_calls(response: dict) -> list[dict]:
choices = response.get("choices") or []
if not choices:
return []
return (choices[0].get("message") or {}).get("tool_calls") or []
def extract_reasoning(response: dict) -> str:
"""
Extract the chain-of-thought reasoning trace from a Gemma 4 response.
OpenRouter returns reasoning two ways for Gemma 4:
1. message.reasoning_details: [{type, text, format, index}, ...]
2. message.reasoning: "<concatenated string>"
The (1) form is canonical (per-block, indexable). The (2) form is a
convenience concatenation. We prefer (1) and fall back to (2).
"""
choices = response.get("choices") or []
if not choices:
return ""
msg = choices[0].get("message") or {}
details = msg.get("reasoning_details") or []
if details:
parts = []
for d in details:
if not isinstance(d, dict):
continue
text = d.get("text") or d.get("content") or ""
if text:
parts.append(text)
if parts:
return "\n".join(parts)
reasoning = msg.get("reasoning")
if isinstance(reasoning, str) and reasoning:
return reasoning
return ""
def parse_json_response(text: str) -> Optional[dict]:
"""Strip ```json fences and parse. Return None on failure."""
if not text:
return None
clean = text.strip()
if clean.startswith("```"):
clean = clean.split("\n", 1)[1] if "\n" in clean else clean[3:]
if clean.endswith("```"):
clean = clean.rsplit("```", 1)[0]
clean = clean.strip()
if clean.startswith("json"):
clean = clean[4:].strip()
try:
return json.loads(clean)
except json.JSONDecodeError:
return None
async def run_tool_loop(
system_prompt: str,
user_prompt: str,
tools: list[dict],
tool_handlers: dict,
max_iterations: int = 5,
model: str = MODEL_PRIMARY,
) -> str:
"""
Run a Gemma 4 agent that can call tools.
tool_handlers: dict mapping function name → async callable that
returns a JSON-serializable result.
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
for _ in range(max_iterations):
response = await call_gemma4(messages, tools=tools, model=model)
tool_calls = extract_tool_calls(response)
if not tool_calls:
return extract_text(response)
messages.append(response["choices"][0]["message"])
for tc in tool_calls:
fn_name = tc["function"]["name"]
try:
fn_args = json.loads(tc["function"]["arguments"])
except json.JSONDecodeError:
fn_args = {}
handler = tool_handlers.get(fn_name)
if handler:
try:
result = await handler(**fn_args)
except Exception as e:
result = {"error": str(e)}
else:
result = {"error": f"Unknown tool: {fn_name}"}
messages.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": json.dumps(result, default=str),
})
return extract_text(response)