|
|
"""Helper functions to fetch and filter free models from OpenRouter API.""" |
|
|
|
|
|
import json |
|
|
import logging |
|
|
import os |
|
|
import time |
|
|
from pathlib import Path |
|
|
from typing import Any |
|
|
|
|
|
import requests |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
OPENROUTER_API_URL = "https://openrouter.ai/api/v1/models" |
|
|
CACHE_DIR = Path(".cache") |
|
|
CACHE_FILE = CACHE_DIR / "openrouter_models.json" |
|
|
CACHE_DURATION_SECONDS = 24 * 60 * 60 |
|
|
|
|
|
|
|
|
def is_free_model(model: dict[str, Any]) -> bool:
    """
    Check if a model is free based on its ID or pricing.

    A model counts as free when its id carries the ":free" tag, or when
    both its prompt and completion prices evaluate to zero.

    Args:
        model: Model dictionary from OpenRouter API

    Returns:
        True if the model is free, False otherwise
    """
    # The ":free" suffix in the id is an explicit free-tier marker.
    if ":free" in model.get("id", ""):
        return True

    pricing = model.get("pricing", {})
    prices = (pricing.get("prompt", "0"), pricing.get("completion", "0"))

    try:
        # Missing/empty prices are treated as zero cost.
        return all((float(p) if p else 0.0) == 0.0 for p in prices)
    except (ValueError, TypeError):
        # Non-numeric price strings: fall back to literal zero-ish values.
        return all(p in ["0", None, ""] for p in prices)
|
|
|
|
|
|
|
|
def _load_cache() -> tuple[list[dict[str, Any]] | None, float | None]:
    """
    Load cached models from file.

    Returns:
        Tuple of (cached_models, cache_timestamp) or (None, None) if cache doesn't exist or is invalid
    """
    if not CACHE_FILE.exists():
        return None, None

    try:
        cache_data = json.loads(CACHE_FILE.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, IOError) as e:
        logger.warning(f"Error loading cache: {e}")
        return None, None

    models = cache_data.get("models", None)
    timestamp = cache_data.get("timestamp", None)

    # Treat a cache file missing either field as absent.
    if models is None or timestamp is None:
        return None, None
    return models, timestamp
|
|
|
|
|
|
|
|
def _save_cache(models: list[dict[str, Any]]) -> None:
    """
    Save models to cache file.

    Write failures are logged as warnings rather than raised, since the
    cache is best-effort.

    Args:
        models: List of model dictionaries to cache
    """
    try:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)

        # Timestamp lets readers decide whether the cache is still fresh.
        payload = {"models": models, "timestamp": time.time()}

        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(payload, f)

        logger.info(f"Cached {len(models)} free models to {CACHE_FILE}")
    except IOError as e:
        logger.warning(f"Error saving cache: {e}")
|
|
|
|
|
|
|
|
def fetch_free_models() -> list[dict[str, Any]]:
    """
    Fetch all free models from OpenRouter API.

    Uses file-based cache that refreshes once per day. When the API is
    unreachable, an expired cache (if any) is served as a fallback.

    Returns:
        List of free model dictionaries with metadata
    """
    cached_models, cache_timestamp = _load_cache()

    # Serve straight from cache while it is still fresh.
    if cached_models is not None and cache_timestamp is not None:
        age_seconds = time.time() - cache_timestamp
        if age_seconds < CACHE_DURATION_SECONDS:
            logger.info(f"Using cached models (age: {age_seconds / 3600:.1f} hours)")
            return cached_models
        logger.info(f"Cache expired (age: {age_seconds / 3600:.1f} hours), fetching fresh data")

    try:
        response = requests.get(OPENROUTER_API_URL, timeout=10)
        response.raise_for_status()

        all_models = response.json().get("data", [])
        free_models = [m for m in all_models if is_free_model(m)]

        logger.info(f"Fetched {len(free_models)} free models from OpenRouter")

        _save_cache(free_models)
        return free_models
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching models from OpenRouter: {e}")
        if cached_models is not None:
            logger.warning("API call failed, using expired cache as fallback")
            return cached_models
        return []
    except Exception as e:
        # Malformed payloads etc. — degrade to stale cache or empty list.
        logger.error(f"Unexpected error fetching models: {e}")
        if cached_models is not None:
            logger.warning("Unexpected error, using expired cache as fallback")
            return cached_models
        return []
|
|
|
|
|
|
|
|
def get_model_config(model: dict[str, Any]) -> dict[str, Any]:
    """
    Extract model configuration from OpenRouter API response.

    Args:
        model: Model dictionary from OpenRouter API

    Returns:
        Model configuration dictionary with type, model, max_context,
        tokenizer, model_id, name, and description
    """
    model_id = model.get("id", "")
    context_length = model.get("context_length")

    # Prefer the explicit Hugging Face id for the tokenizer; otherwise
    # derive one from the model id ("org/name:variant" -> "hf/org/name").
    # (The previous version also read architecture["tokenizer"] but never
    # used it; that dead lookup has been removed.)
    hugging_face_id = model.get("hugging_face_id")
    if hugging_face_id:
        tokenizer = f"hf/{hugging_face_id}"
    else:
        parts = model_id.split("/")
        if len(parts) > 1:
            org = parts[0]
            # Strip any ":variant" suffix (e.g. ":free") from the name.
            model_name = parts[-1].split(":")[0]
            tokenizer = f"hf/{org}/{model_name}"
        else:
            model_name = model_id.split(":")[0]
            tokenizer = f"hf/{model_name}"

    # Defensive fallback; the branches above always produce "hf/..." values.
    if not tokenizer:
        tokenizer = "gpt2"

    # Assume a 128K context window when the API does not report one.
    if context_length is None:
        context_length = 131072

    return {
        "type": "free_openrouter",
        "model": f"openrouter/{model_id}",
        "max_context": context_length,
        "tokenizer": tokenizer,
        "model_id": model_id,
        "name": model.get("name", model_id),
        "description": model.get("description", ""),
    }
|
|
|
|
|
|