Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import base64 | |
| import json | |
| import re | |
| from dataclasses import dataclass | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import requests | |
# OpenRouter REST endpoints (OpenAI-compatible chat-completions schema).
OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions"
OPENROUTER_MODELS_URL = "https://openrouter.ai/api/v1/models"
@dataclass
class ChatResult:
    """Normalized result of an OpenRouter chat-completion call.

    Bug fix: the class was declared with bare annotations but no
    ``@dataclass`` decorator, so the keyword construction in
    ``chat_completion`` (``ChatResult(content=..., model=..., ...)``)
    would raise ``TypeError: ChatResult() takes no arguments``.
    ``dataclass`` was already imported at the top of the file.
    """

    content: str                          # message text ("" if absent; non-str content is JSON-dumped)
    model: str                            # model id reported by the API (falls back to the requested one)
    native_finish_reason: Optional[str]   # choices[0].finish_reason, if present
    tool_calls: Any                       # raw tool_calls payload from the message, if any
    raw: dict                             # full decoded response body
def list_models(api_key: str) -> dict:
    """Fetch the OpenRouter model catalog and return the decoded JSON body.

    Raises ``requests.HTTPError`` on any non-2xx response.
    """
    resp = requests.get(
        OPENROUTER_MODELS_URL,
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json()
def choose_free_vision_model(api_key: str, preferred: List[str]) -> str:
    """Pick a free vision-capable model id from the OpenRouter catalog.

    The caller's ``preferred`` ids win when available; otherwise any
    ``:free`` model whose metadata hints at image support is returned.
    Raises ``RuntimeError`` if nothing matches.
    """
    catalog = list_models(api_key).get("data", [])
    known_ids = {entry.get("id") for entry in catalog if isinstance(entry, dict)}

    # First choice: an exact match from the caller's preference list.
    for candidate in preferred:
        if candidate in known_ids:
            return candidate

    # Fallback: any ":free" model with a vision hint in its metadata.
    for entry in catalog:
        if not isinstance(entry, dict):
            continue
        model_id = entry.get("id", "")
        if ":free" not in model_id:
            continue
        # Crude heuristic: vision models tend to mention "vision"/"image"
        # somewhere in their metadata, or carry "vl" in the id.
        blob = json.dumps(entry).lower()
        if "vision" in blob or "image" in blob or "vl" in model_id.lower():
            return model_id

    raise RuntimeError("Could not find any free vision-capable model in /models. Set OPENROUTER_MODEL explicitly.")
def choose_any_free_text_model(api_key: str) -> str:
    """Return the first ``:free`` model id advertised by the /models endpoint.

    Raises ``RuntimeError`` when the catalog contains no free model.
    """
    for entry in list_models(api_key).get("data", []):
        if not isinstance(entry, dict):
            continue
        model_id = entry.get("id", "")
        # Free-tier ids carry a ":free" suffix; no further filtering is done
        # (vision-only ids are not excluded here).
        if ":free" in model_id:
            return model_id
    raise RuntimeError("Could not find any free text-capable model in /models.")
| def _img_bytes_to_data_url(png_bytes: bytes) -> str: | |
| b64 = base64.b64encode(png_bytes).decode("utf-8") | |
| return f"data:image/png;base64,{b64}" | |
def make_user_message_with_images(prompt_text: str, images: List[bytes]) -> dict:
    """
    Build an OpenAI-style user message: one text part followed by one
    ``image_url`` part per image. OpenRouter follows the OpenAI chat
    schema, which uses the snake-case ``image_url`` key.
    """
    parts: List[dict] = [{"type": "text", "text": prompt_text}]
    for img in images:
        # Each image is shipped inline as a base64 PNG data: URL.
        data_url = "data:image/png;base64," + base64.b64encode(img).decode("utf-8")
        parts.append({"type": "image_url", "image_url": {"url": data_url}})
    return {"role": "user", "content": parts}
def chat_completion(
    api_key: str,
    model: str,
    messages: List[dict],
    temperature: float = 0.0,
    max_tokens: int = 1200,
) -> ChatResult:
    """POST a chat completion to OpenRouter and normalize the response.

    Prints the server's error body before raising ``requests.HTTPError``
    on a non-200 status. Non-string message content is JSON-encoded so
    ``ChatResult.content`` is always a ``str``.
    """
    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    r = requests.post(
        OPENROUTER_CHAT_URL,
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        json=payload,
        timeout=180,
    )
    if r.status_code != 200:
        # Surface the error body for debugging before raise_for_status fires.
        print(f"API Error {r.status_code}: {r.text}", flush=True)
    r.raise_for_status()

    data = r.json()
    # OpenAI-like shape: choices[0].message holds the assistant reply.
    first_choice = (data.get("choices") or [{}])[0]
    message = first_choice.get("message") or {}
    raw_content = message.get("content") or ""
    return ChatResult(
        content=raw_content if isinstance(raw_content, str) else json.dumps(raw_content),
        model=data.get("model") or model,
        native_finish_reason=first_choice.get("finish_reason"),
        tool_calls=message.get("tool_calls"),
        raw=data,
    )
# Greedy DOTALL patterns used by robust_json_loads to fish an embedded
# JSON object/array out of surrounding prose (widest brace/bracket span).
_JSON_OBJ_RE = re.compile(r"\{.*\}", re.DOTALL)
_JSON_ARR_RE = re.compile(r"\[.*\]", re.DOTALL)
def robust_json_loads(text: str) -> Any:
    """
    Extract the first valid JSON object/array from a messy LLM output.

    Tries the whole (stripped) string first, then the widest embedded
    ``{...}`` span, then the widest embedded ``[...]`` span.
    Raises ``ValueError`` when nothing parses.
    """
    if not text:
        raise ValueError("Empty model output.")

    stripped = text.strip()

    # Cheapest path: the whole string is already valid JSON.
    try:
        return json.loads(stripped)
    except Exception:
        pass

    # Otherwise scan for an embedded object, then an embedded array.
    for pattern in (re.compile(r"\{.*\}", re.DOTALL), re.compile(r"\[.*\]", re.DOTALL)):
        found = pattern.search(stripped)
        if not found:
            continue
        try:
            return json.loads(found.group(0))
        except Exception:
            pass

    raise ValueError("Could not parse JSON from model output.")
def repair_to_json(api_key: str, bad_text: str, model: str) -> str:
    """
    Uses a free text model to rewrite messy output into strict JSON only.

    Returns the model's reply stripped of surrounding whitespace; the
    caller is expected to parse/validate it.
    """
    system_prompt = (
        "You are a strict JSON formatter. "
        "Return ONLY valid JSON. No markdown, no commentary. "
        "Preserve keys/values if possible."
    )
    user_prompt = f"Convert this into valid JSON ONLY:\n\n{bad_text}"

    result = chat_completion(
        api_key=api_key,
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.0,
        max_tokens=1200,
    )
    return result.content.strip()