""" Gemma 4 client via OpenRouter free tier. Uses the OpenAI-compatible chat/completions endpoint. Supports function calling and reasoning mode (the latter is the risk-analyst agent's showcase capability). Notes from the smoke test (scripts/smoke_test.py): - Reasoning trace lives at message.reasoning_details[i].text — the earlier spec said .content, which is wrong on Gemma 4 / OpenRouter today. We also fall back to the top-level message.reasoning string that OpenRouter returns concatenated for convenience. - The shared :free upstream pool 429s very quickly (we got rate-limited after 2 calls). BYOK a Google AI Studio key into OpenRouter integrations for the demo. We retry on 429 with the fallback model, then exponential backoff up to a cap. """ import asyncio import json from typing import Optional import httpx from app.config import ( APP_URL, APP_NAME, INFERENCE_BACKEND, LLM_API_KEY, LLM_BASE_URL, MODEL_FALLBACK, MODEL_PRIMARY, ) class RateLimitedError(Exception): """All retries exhausted while upstream was rate-limiting.""" # Local inference is slower than cloud — the call has to wait for the # laptop GPU/CPU instead of OpenRouter's server. Bump the per-request # timeout so a slow first-token doesn't trip httpx before the model # finishes generating. _HTTP_TIMEOUT = 300.0 if INFERENCE_BACKEND == "ollama" else 120.0 async def call_gemma4( messages: list[dict], tools: Optional[list[dict]] = None, model: str = MODEL_PRIMARY, reasoning: bool = False, max_tokens: int = 4096, temperature: float = 0.3, retries: int = 3, ) -> dict: """ Call Gemma 4 via the configured INFERENCE_BACKEND. Returns raw response JSON. Retry policy: - On 429 with primary model: switch to fallback model, then retry with exponential backoff (2s, 4s, 8s). (Cloud only; Ollama queues instead of rate-limiting.) - On timeout: short retry. """ if INFERENCE_BACKEND != "ollama" and not LLM_API_KEY: raise RuntimeError("OPENROUTER_API_KEY is not set") headers = { "Authorization": f"Bearer {LLM_API_KEY or 'ollama'}", "Content-Type": "application/json", } # OpenRouter-only telemetry headers; Ollama would just ignore them # but cleaner to omit when irrelevant. if INFERENCE_BACKEND == "openrouter": headers["HTTP-Referer"] = APP_URL headers["X-Title"] = APP_NAME payload: dict = { "model": model, "messages": messages, "max_tokens": max_tokens, "temperature": temperature, } if tools: payload["tools"] = tools payload["tool_choice"] = "auto" if reasoning: payload["reasoning"] = {"enabled": True} current_model = model backoff = 2.0 async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT) as client: for attempt in range(retries + 1): try: resp = await client.post( f"{LLM_BASE_URL}/chat/completions", headers=headers, json={**payload, "model": current_model}, ) except httpx.TimeoutException: if attempt >= retries: raise await asyncio.sleep(backoff) backoff *= 2 continue if resp.status_code == 429: if current_model == MODEL_PRIMARY and attempt == 0: current_model = MODEL_FALLBACK continue if attempt >= retries: raise RateLimitedError( f"Both Gemma 4 free models rate-limited after {retries + 1} attempts. " "BYOK a Google AI Studio key at openrouter.ai/settings/integrations." ) await asyncio.sleep(backoff) backoff *= 2 continue # Transient upstream errors (502/503/504 are common when an # OpenRouter provider hiccups). Retry with backoff before # surfacing to the user as an agent error. if resp.status_code in (500, 502, 503, 504): if attempt >= retries: resp.raise_for_status() await asyncio.sleep(backoff) backoff *= 2 continue resp.raise_for_status() return resp.json() raise RuntimeError("call_gemma4 exited retry loop without returning") def extract_text(response: dict) -> str: choices = response.get("choices") or [] if not choices: return "" return (choices[0].get("message") or {}).get("content") or "" def extract_tool_calls(response: dict) -> list[dict]: choices = response.get("choices") or [] if not choices: return [] return (choices[0].get("message") or {}).get("tool_calls") or [] def extract_reasoning(response: dict) -> str: """ Extract the chain-of-thought reasoning trace from a Gemma 4 response. OpenRouter returns reasoning two ways for Gemma 4: 1. message.reasoning_details: [{type, text, format, index}, ...] 2. message.reasoning: "" The (1) form is canonical (per-block, indexable). The (2) form is a convenience concatenation. We prefer (1) and fall back to (2). """ choices = response.get("choices") or [] if not choices: return "" msg = choices[0].get("message") or {} details = msg.get("reasoning_details") or [] if details: parts = [] for d in details: if not isinstance(d, dict): continue text = d.get("text") or d.get("content") or "" if text: parts.append(text) if parts: return "\n".join(parts) reasoning = msg.get("reasoning") if isinstance(reasoning, str) and reasoning: return reasoning return "" def parse_json_response(text: str) -> Optional[dict]: """Strip ```json fences and parse. Return None on failure.""" if not text: return None clean = text.strip() if clean.startswith("```"): clean = clean.split("\n", 1)[1] if "\n" in clean else clean[3:] if clean.endswith("```"): clean = clean.rsplit("```", 1)[0] clean = clean.strip() if clean.startswith("json"): clean = clean[4:].strip() try: return json.loads(clean) except json.JSONDecodeError: return None async def run_tool_loop( system_prompt: str, user_prompt: str, tools: list[dict], tool_handlers: dict, max_iterations: int = 5, model: str = MODEL_PRIMARY, ) -> str: """ Run a Gemma 4 agent that can call tools. tool_handlers: dict mapping function name → async callable that returns a JSON-serializable result. """ messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] for _ in range(max_iterations): response = await call_gemma4(messages, tools=tools, model=model) tool_calls = extract_tool_calls(response) if not tool_calls: return extract_text(response) messages.append(response["choices"][0]["message"]) for tc in tool_calls: fn_name = tc["function"]["name"] try: fn_args = json.loads(tc["function"]["arguments"]) except json.JSONDecodeError: fn_args = {} handler = tool_handlers.get(fn_name) if handler: try: result = await handler(**fn_args) except Exception as e: result = {"error": str(e)} else: result = {"error": f"Unknown tool: {fn_name}"} messages.append({ "role": "tool", "tool_call_id": tc["id"], "content": json.dumps(result, default=str), }) return extract_text(response)