| """Models.dev registry integration — primary database for providers and models. |
| |
| Fetches from https://models.dev/api.json — a community-maintained database |
| of 4000+ models across 109+ providers. Provides: |
| |
| - **Provider metadata**: name, base URL, env vars, documentation link |
| - **Model metadata**: context window, max output, cost/M tokens, capabilities |
| (reasoning, tools, vision, PDF, audio), modalities, knowledge cutoff, |
| open-weights flag, family grouping, deprecation status |
| |
| Data resolution order (like TypeScript OpenCode): |
| 1. Bundled snapshot (ships with the package — offline-first) |
| 2. Disk cache (~/.hermes/models_dev_cache.json) |
| 3. Network fetch (https://models.dev/api.json) |
| 4. Background refresh every 60 minutes |
| |
| Other modules should import the dataclasses and query functions from here |
| rather than parsing the raw JSON themselves. |
| """ |
|
|
| import json |
| import logging |
| import time |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| from utils import atomic_json_write |
|
|
| import requests |
|
|
| logger = logging.getLogger(__name__) |
|
|
| MODELS_DEV_URL = "https://models.dev/api.json" |
| _MODELS_DEV_CACHE_TTL = 3600 |
|
|
| |
| _models_dev_cache: Dict[str, Any] = {} |
| _models_dev_cache_time: float = 0 |
|
|
|
|
| |
| |
| |
|
|
| @dataclass |
| class ModelInfo: |
| """Full metadata for a single model from models.dev.""" |
|
|
| id: str |
| name: str |
| family: str |
| provider_id: str |
|
|
| |
| reasoning: bool = False |
| tool_call: bool = False |
| attachment: bool = False |
| temperature: bool = False |
| structured_output: bool = False |
| open_weights: bool = False |
|
|
| |
| input_modalities: Tuple[str, ...] = () |
| output_modalities: Tuple[str, ...] = () |
|
|
| |
| context_window: int = 0 |
| max_output: int = 0 |
| max_input: Optional[int] = None |
|
|
| |
| cost_input: float = 0.0 |
| cost_output: float = 0.0 |
| cost_cache_read: Optional[float] = None |
| cost_cache_write: Optional[float] = None |
|
|
| |
| knowledge_cutoff: str = "" |
| release_date: str = "" |
| status: str = "" |
| interleaved: Any = False |
|
|
| def has_cost_data(self) -> bool: |
| return self.cost_input > 0 or self.cost_output > 0 |
|
|
| def supports_vision(self) -> bool: |
| return self.attachment or "image" in self.input_modalities |
|
|
| def supports_pdf(self) -> bool: |
| return "pdf" in self.input_modalities |
|
|
| def supports_audio_input(self) -> bool: |
| return "audio" in self.input_modalities |
|
|
| def format_cost(self) -> str: |
| """Human-readable cost string, e.g. '$3.00/M in, $15.00/M out'.""" |
| if not self.has_cost_data(): |
| return "unknown" |
| parts = [f"${self.cost_input:.2f}/M in", f"${self.cost_output:.2f}/M out"] |
| if self.cost_cache_read is not None: |
| parts.append(f"cache read ${self.cost_cache_read:.2f}/M") |
| return ", ".join(parts) |
|
|
| def format_capabilities(self) -> str: |
| """Human-readable capabilities, e.g. 'reasoning, tools, vision, PDF'.""" |
| caps = [] |
| if self.reasoning: |
| caps.append("reasoning") |
| if self.tool_call: |
| caps.append("tools") |
| if self.supports_vision(): |
| caps.append("vision") |
| if self.supports_pdf(): |
| caps.append("PDF") |
| if self.supports_audio_input(): |
| caps.append("audio") |
| if self.structured_output: |
| caps.append("structured output") |
| if self.open_weights: |
| caps.append("open weights") |
| return ", ".join(caps) if caps else "basic" |
|
|
|
|
| @dataclass |
| class ProviderInfo: |
| """Full metadata for a provider from models.dev.""" |
|
|
| id: str |
| name: str |
| env: Tuple[str, ...] |
| api: str |
| doc: str = "" |
| model_count: int = 0 |
|
|
|
|
| |
| |
| |
|
|
| |
| PROVIDER_TO_MODELS_DEV: Dict[str, str] = { |
| "openrouter": "openrouter", |
| "anthropic": "anthropic", |
| "openai": "openai", |
| "openai-codex": "openai", |
| "zai": "zai", |
| "kimi-coding": "kimi-for-coding", |
| "stepfun": "stepfun", |
| "kimi-coding-cn": "kimi-for-coding", |
| "minimax": "minimax", |
| "minimax-cn": "minimax-cn", |
| "deepseek": "deepseek", |
| "alibaba": "alibaba", |
| "qwen-oauth": "alibaba", |
| "copilot": "github-copilot", |
| "ai-gateway": "vercel", |
| "opencode-zen": "opencode", |
| "opencode-go": "opencode-go", |
| "kilocode": "kilo", |
| "fireworks": "fireworks-ai", |
| "huggingface": "huggingface", |
| "gemini": "google", |
| "google": "google", |
| "xai": "xai", |
| "xiaomi": "xiaomi", |
| "nvidia": "nvidia", |
| "groq": "groq", |
| "mistral": "mistral", |
| "togetherai": "togetherai", |
| "perplexity": "perplexity", |
| "cohere": "cohere", |
| "ollama-cloud": "ollama-cloud", |
| } |
|
|
| |
| _MODELS_DEV_TO_PROVIDER: Optional[Dict[str, str]] = None |
|
|
|
|
|
|
| def _get_cache_path() -> Path: |
| """Return path to disk cache file.""" |
| from hermes_constants import get_hermes_home |
| return get_hermes_home() / "models_dev_cache.json" |
|
|
|
|
| def _load_disk_cache() -> Dict[str, Any]: |
| """Load models.dev data from disk cache.""" |
| try: |
| cache_path = _get_cache_path() |
| if cache_path.exists(): |
| with open(cache_path, encoding="utf-8") as f: |
| return json.load(f) |
| except Exception as e: |
| logger.debug("Failed to load models.dev disk cache: %s", e) |
| return {} |
|
|
|
|
| def _save_disk_cache(data: Dict[str, Any]) -> None: |
| """Save models.dev data to disk cache atomically.""" |
| try: |
| cache_path = _get_cache_path() |
| atomic_json_write(cache_path, data, indent=None, separators=(",", ":")) |
| except Exception as e: |
| logger.debug("Failed to save models.dev disk cache: %s", e) |
|
|
|
|
| def fetch_models_dev(force_refresh: bool = False) -> Dict[str, Any]: |
| """Fetch models.dev registry. In-memory cache (1hr) + disk fallback. |
| |
| Returns the full registry dict keyed by provider ID, or empty dict on failure. |
| """ |
| global _models_dev_cache, _models_dev_cache_time |
|
|
| |
| if ( |
| not force_refresh |
| and _models_dev_cache |
| and (time.time() - _models_dev_cache_time) < _MODELS_DEV_CACHE_TTL |
| ): |
| return _models_dev_cache |
|
|
| |
| try: |
| response = requests.get(MODELS_DEV_URL, timeout=15) |
| response.raise_for_status() |
| data = response.json() |
| if isinstance(data, dict) and data: |
| _models_dev_cache = data |
| _models_dev_cache_time = time.time() |
| _save_disk_cache(data) |
| logger.debug( |
| "Fetched models.dev registry: %d providers, %d total models", |
| len(data), |
| sum(len(p.get("models", {})) for p in data.values() if isinstance(p, dict)), |
| ) |
| return data |
| except Exception as e: |
| logger.debug("Failed to fetch models.dev: %s", e) |
|
|
| |
| |
| if not _models_dev_cache: |
| _models_dev_cache = _load_disk_cache() |
| if _models_dev_cache: |
| _models_dev_cache_time = time.time() - _MODELS_DEV_CACHE_TTL + 300 |
| logger.debug("Loaded models.dev from disk cache (%d providers)", len(_models_dev_cache)) |
|
|
| return _models_dev_cache |
|
|
|
|
| def lookup_models_dev_context(provider: str, model: str) -> Optional[int]: |
| """Look up context_length for a provider+model combo in models.dev. |
| |
| Returns the context window in tokens, or None if not found. |
| Handles case-insensitive matching and filters out context=0 entries. |
| """ |
| mdev_provider_id = PROVIDER_TO_MODELS_DEV.get(provider) |
| if not mdev_provider_id: |
| return None |
|
|
| data = fetch_models_dev() |
| provider_data = data.get(mdev_provider_id) |
| if not isinstance(provider_data, dict): |
| return None |
|
|
| models = provider_data.get("models", {}) |
| if not isinstance(models, dict): |
| return None |
|
|
| |
| entry = models.get(model) |
| if entry: |
| ctx = _extract_context(entry) |
| if ctx: |
| return ctx |
|
|
| |
| model_lower = model.lower() |
| for mid, mdata in models.items(): |
| if mid.lower() == model_lower: |
| ctx = _extract_context(mdata) |
| if ctx: |
| return ctx |
|
|
| return None |
|
|
|
|
| def _extract_context(entry: Dict[str, Any]) -> Optional[int]: |
| """Extract context_length from a models.dev model entry. |
| |
| Returns None for invalid/zero values (some audio/image models have context=0). |
| """ |
| if not isinstance(entry, dict): |
| return None |
| limit = entry.get("limit") |
| if not isinstance(limit, dict): |
| return None |
| ctx = limit.get("context") |
| if isinstance(ctx, (int, float)) and ctx > 0: |
| return int(ctx) |
| return None |
|
|
|
|
| |
| |
| |
|
|
|
|
| @dataclass |
| class ModelCapabilities: |
| """Structured capability metadata for a model from models.dev.""" |
|
|
| supports_tools: bool = True |
| supports_vision: bool = False |
| supports_reasoning: bool = False |
| context_window: int = 200000 |
| max_output_tokens: int = 8192 |
| model_family: str = "" |
|
|
|
|
| def _get_provider_models(provider: str) -> Optional[Dict[str, Any]]: |
| """Resolve a Hermes provider ID to its models dict from models.dev. |
| |
| Returns the models dict or None if the provider is unknown or has no data. |
| """ |
| mdev_provider_id = PROVIDER_TO_MODELS_DEV.get(provider) |
| if not mdev_provider_id: |
| return None |
|
|
| data = fetch_models_dev() |
| provider_data = data.get(mdev_provider_id) |
| if not isinstance(provider_data, dict): |
| return None |
|
|
| models = provider_data.get("models", {}) |
| if not isinstance(models, dict): |
| return None |
|
|
| return models |
|
|
|
|
| def _find_model_entry(models: Dict[str, Any], model: str) -> Optional[Dict[str, Any]]: |
| """Find a model entry by exact match, then case-insensitive fallback.""" |
| |
| entry = models.get(model) |
| if isinstance(entry, dict): |
| return entry |
|
|
| |
| model_lower = model.lower() |
| for mid, mdata in models.items(): |
| if mid.lower() == model_lower and isinstance(mdata, dict): |
| return mdata |
|
|
| return None |
|
|
|
|
| def get_model_capabilities(provider: str, model: str) -> Optional[ModelCapabilities]: |
| """Look up full capability metadata from models.dev cache. |
| |
| Uses the existing fetch_models_dev() and PROVIDER_TO_MODELS_DEV mapping. |
| Returns None if model not found. |
| |
| Extracts from model entry fields: |
| - reasoning (bool) → supports_reasoning |
| - tool_call (bool) → supports_tools |
| - attachment (bool) → supports_vision |
| - limit.context (int) → context_window |
| - limit.output (int) → max_output_tokens |
| - family (str) → model_family |
| """ |
| models = _get_provider_models(provider) |
| if models is None: |
| return None |
|
|
| entry = _find_model_entry(models, model) |
| if entry is None: |
| return None |
|
|
| |
| supports_tools = bool(entry.get("tool_call", False)) |
| |
| |
| input_mods = entry.get("modalities", {}) |
| if isinstance(input_mods, dict): |
| input_mods = input_mods.get("input", []) |
| else: |
| input_mods = [] |
| supports_vision = bool(entry.get("attachment", False)) or "image" in input_mods |
| supports_reasoning = bool(entry.get("reasoning", False)) |
|
|
| |
| limit = entry.get("limit", {}) |
| if not isinstance(limit, dict): |
| limit = {} |
|
|
| ctx = limit.get("context") |
| context_window = int(ctx) if isinstance(ctx, (int, float)) and ctx > 0 else 200000 |
|
|
| out = limit.get("output") |
| max_output_tokens = int(out) if isinstance(out, (int, float)) and out > 0 else 8192 |
|
|
| model_family = entry.get("family", "") or "" |
|
|
| return ModelCapabilities( |
| supports_tools=supports_tools, |
| supports_vision=supports_vision, |
| supports_reasoning=supports_reasoning, |
| context_window=context_window, |
| max_output_tokens=max_output_tokens, |
| model_family=model_family, |
| ) |
|
|
|
|
| def list_provider_models(provider: str) -> List[str]: |
| """Return all model IDs for a provider from models.dev. |
| |
| Returns an empty list if the provider is unknown or has no data. |
| """ |
| from hermes_cli.models import normalize_provider |
| provider = normalize_provider(provider) or provider |
| |
| models = _get_provider_models(provider) |
| if models is None: |
| return [] |
| return [ |
| mid for mid in models.keys() |
| if not _should_hide_from_provider_catalog(provider, mid) |
| ] |
|
|
|
|
| |
| |
| import re |
| _NOISE_PATTERNS: re.Pattern = re.compile( |
| r"-tts\b|embedding|live-|-(preview|exp)-\d{2,4}[-_]|" |
| r"-image\b|-image-preview\b|-customtools\b", |
| re.IGNORECASE, |
| ) |
|
|
| |
| |
| |
| |
| _GOOGLE_HIDDEN_MODELS = frozenset({ |
| |
| |
| "gemma-4-31b-it", |
| "gemma-4-26b-it", |
| "gemma-4-26b-a4b-it", |
| "gemma-3-1b", |
| "gemma-3-1b-it", |
| "gemma-3-2b", |
| "gemma-3-2b-it", |
| "gemma-3-4b", |
| "gemma-3-4b-it", |
| "gemma-3-12b", |
| "gemma-3-12b-it", |
| "gemma-3-27b", |
| "gemma-3-27b-it", |
| |
| |
| "gemini-1.5-flash", |
| "gemini-1.5-pro", |
| "gemini-1.5-flash-8b", |
| "gemini-2.0-flash", |
| "gemini-2.0-flash-lite", |
| }) |
|
|
|
|
| def _should_hide_from_provider_catalog(provider: str, model_id: str) -> bool: |
| provider_lower = (provider or "").strip().lower() |
| model_lower = (model_id or "").strip().lower() |
| if provider_lower in {"gemini", "google"} and model_lower in _GOOGLE_HIDDEN_MODELS: |
| return True |
| return False |
|
|
|
|
| def list_agentic_models(provider: str) -> List[str]: |
| """Return model IDs suitable for agentic use from models.dev. |
| |
| Filters for tool_call=True and excludes noise (TTS, embedding, |
| dated preview snapshots, live/streaming, image-only models). |
| Returns an empty list on any failure. |
| """ |
| models = _get_provider_models(provider) |
| if models is None: |
| return [] |
|
|
| result = [] |
| for mid, entry in models.items(): |
| if not isinstance(entry, dict): |
| continue |
| if _should_hide_from_provider_catalog(provider, mid): |
| continue |
| if not entry.get("tool_call", False): |
| continue |
| if _NOISE_PATTERNS.search(mid): |
| continue |
| result.append(mid) |
| return result |
|
|
|
|
|
|
| |
| |
| |
|
|
| def _parse_model_info(model_id: str, raw: Dict[str, Any], provider_id: str) -> ModelInfo: |
| """Convert a raw models.dev model entry dict into a ModelInfo dataclass.""" |
| limit = raw.get("limit") or {} |
| if not isinstance(limit, dict): |
| limit = {} |
|
|
| cost = raw.get("cost") or {} |
| if not isinstance(cost, dict): |
| cost = {} |
|
|
| modalities = raw.get("modalities") or {} |
| if not isinstance(modalities, dict): |
| modalities = {} |
|
|
| input_mods = modalities.get("input") or [] |
| output_mods = modalities.get("output") or [] |
|
|
| ctx = limit.get("context") |
| ctx_int = int(ctx) if isinstance(ctx, (int, float)) and ctx > 0 else 0 |
| out = limit.get("output") |
| out_int = int(out) if isinstance(out, (int, float)) and out > 0 else 0 |
| inp = limit.get("input") |
| inp_int = int(inp) if isinstance(inp, (int, float)) and inp > 0 else None |
|
|
| return ModelInfo( |
| id=model_id, |
| name=raw.get("name", "") or model_id, |
| family=raw.get("family", "") or "", |
| provider_id=provider_id, |
| reasoning=bool(raw.get("reasoning", False)), |
| tool_call=bool(raw.get("tool_call", False)), |
| attachment=bool(raw.get("attachment", False)), |
| temperature=bool(raw.get("temperature", False)), |
| structured_output=bool(raw.get("structured_output", False)), |
| open_weights=bool(raw.get("open_weights", False)), |
| input_modalities=tuple(input_mods) if isinstance(input_mods, list) else (), |
| output_modalities=tuple(output_mods) if isinstance(output_mods, list) else (), |
| context_window=ctx_int, |
| max_output=out_int, |
| max_input=inp_int, |
| cost_input=float(cost.get("input", 0) or 0), |
| cost_output=float(cost.get("output", 0) or 0), |
| cost_cache_read=float(cost["cache_read"]) if "cache_read" in cost and cost["cache_read"] is not None else None, |
| cost_cache_write=float(cost["cache_write"]) if "cache_write" in cost and cost["cache_write"] is not None else None, |
| knowledge_cutoff=raw.get("knowledge", "") or "", |
| release_date=raw.get("release_date", "") or "", |
| status=raw.get("status", "") or "", |
| interleaved=raw.get("interleaved", False), |
| ) |
|
|
|
|
| def _parse_provider_info(provider_id: str, raw: Dict[str, Any]) -> ProviderInfo: |
| """Convert a raw models.dev provider entry dict into a ProviderInfo.""" |
| env = raw.get("env") or [] |
| models = raw.get("models") or {} |
| return ProviderInfo( |
| id=provider_id, |
| name=raw.get("name", "") or provider_id, |
| env=tuple(env) if isinstance(env, list) else (), |
| api=raw.get("api", "") or "", |
| doc=raw.get("doc", "") or "", |
| model_count=len(models) if isinstance(models, dict) else 0, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| def get_provider_info(provider_id: str) -> Optional[ProviderInfo]: |
| """Get full provider metadata from models.dev. |
| |
| Accepts either a Hermes provider ID (e.g. "kilocode") or a models.dev |
| ID (e.g. "kilo"). Returns None if the provider is not in the catalog. |
| """ |
| |
| mdev_id = PROVIDER_TO_MODELS_DEV.get(provider_id, provider_id) |
|
|
| data = fetch_models_dev() |
| raw = data.get(mdev_id) |
| if not isinstance(raw, dict): |
| return None |
|
|
| return _parse_provider_info(mdev_id, raw) |
|
|
|
|
| |
| |
| |
|
|
| def get_model_info( |
| provider_id: str, model_id: str |
| ) -> Optional[ModelInfo]: |
| """Get full model metadata from models.dev. |
| |
| Accepts Hermes or models.dev provider ID. Tries exact match then |
| case-insensitive fallback. Returns None if not found. |
| """ |
| mdev_id = PROVIDER_TO_MODELS_DEV.get(provider_id, provider_id) |
|
|
| data = fetch_models_dev() |
| pdata = data.get(mdev_id) |
| if not isinstance(pdata, dict): |
| return None |
|
|
| models = pdata.get("models", {}) |
| if not isinstance(models, dict): |
| return None |
|
|
| |
| raw = models.get(model_id) |
| if isinstance(raw, dict): |
| return _parse_model_info(model_id, raw, mdev_id) |
|
|
| |
| model_lower = model_id.lower() |
| for mid, mdata in models.items(): |
| if mid.lower() == model_lower and isinstance(mdata, dict): |
| return _parse_model_info(mid, mdata, mdev_id) |
|
|
| return None |
|
|