| """Shared model-switching logic for CLI and gateway /model commands. |
| |
| Both the CLI (cli.py) and gateway (gateway/run.py) /model handlers |
| share the same core pipeline: |
| |
| parse flags -> alias resolution -> provider resolution -> |
| credential resolution -> normalize model name -> |
| metadata lookup -> build result |
| |
| This module ties together the foundation layers: |
| |
| - ``agent.models_dev`` -- models.dev catalog, ModelInfo, ProviderInfo |
| - ``hermes_cli.providers`` -- canonical provider identity + overlays |
| - ``hermes_cli.model_normalize`` -- per-provider name formatting |
| |
| Provider switching uses the ``--provider`` flag exclusively. |
| No colon-based ``provider:model`` syntax — colons are reserved for |
| OpenRouter variant suffixes (``:free``, ``:extended``, ``:fast``). |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| import re |
| from dataclasses import dataclass |
| from typing import List, NamedTuple, Optional |
|
|
| from hermes_cli.providers import ( |
| custom_provider_slug, |
| determine_api_mode, |
| get_label, |
| is_aggregator, |
| resolve_provider_full, |
| ) |
| from hermes_cli.model_normalize import ( |
| normalize_model_for_provider, |
| ) |
| from agent.models_dev import ( |
| ModelCapabilities, |
| ModelInfo, |
| get_model_capabilities, |
| get_model_info, |
| list_provider_models, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
|
|
# Warning text returned by ``_check_hermes_model_warning`` when the selected
# model is detected as a Nous Hermes 3/4 chat model.
_HERMES_MODEL_WARNING = (
    "Nous Research Hermes 3 & 4 models are NOT agentic and are not designed "
    "for use with Hermes Agent. They lack the tool-calling capabilities "
    "required for agent workflows. Consider using an agentic model instead "
    "(Claude, GPT, Gemini, DeepSeek, etc.)."
)
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Case-insensitive detector for Nous Hermes 3/4 model names.
#
# The pattern requires the literal token "hermes" to sit at the very start of
# the name or directly after a "/" or ":" separator, followed by an optional
# "-", "_" or space and then the digit 3 or 4.  The digit must in turn be
# followed by one of "-", "_", ".", ":" or the end of the string, so a name
# such as "hermes-30b" does not match.
_NOUS_HERMES_NON_AGENTIC_RE = re.compile(
    r"(?:^|[/:])hermes[-_ ]?[34](?:[-_.:]|$)",
    re.IGNORECASE,
)
|
|
|
|
def is_nous_hermes_non_agentic(model_name: str) -> bool:
    """Tell whether *model_name* names a Nous Hermes 3/4 chat model.

    This is the single detection helper behind the non-agentic warning; both
    this module and ``cli.py`` are expected to call it so the two call sites
    cannot drift apart.

    Args:
        model_name: Model identifier to inspect.  Empty/falsy values are
            accepted and never match.

    Returns:
        ``True`` when ``_NOUS_HERMES_NON_AGENTIC_RE`` finds a match anywhere
        in *model_name*, ``False`` otherwise.
    """
    if model_name:
        return _NOUS_HERMES_NON_AGENTIC_RE.search(model_name) is not None
    return False
|
|
|
|
def _check_hermes_model_warning(model_name: str) -> str:
    """Return the non-agentic warning for Nous Hermes 3/4 models.

    Args:
        model_name: Model identifier to inspect.

    Returns:
        ``_HERMES_MODEL_WARNING`` when ``is_nous_hermes_non_agentic`` accepts
        *model_name*; the empty string otherwise.
    """
    return _HERMES_MODEL_WARNING if is_nous_hermes_non_agentic(model_name) else ""
|
|
|
|
| |
| |
| |
| |
|
|
class ModelIdentity(NamedTuple):
    """A ``(vendor, family)`` pair that a short alias expands to.

    ``resolve_alias`` searches a provider catalog for model IDs starting with
    ``family`` (or ``vendor/family`` on aggregator providers).
    """

    # Vendor slug, e.g. "anthropic"; used as the "vendor/" prefix on aggregators.
    vendor: str
    # Leading part of the model ID shared by the family, e.g. "claude-sonnet".
    family: str
|
|
|
|
# Short alias -> (vendor, family) identity.  ``resolve_alias`` expands an alias
# by searching the active provider's catalog for IDs that start with the
# family (or "vendor/family" on aggregators), so entries here name a family
# prefix rather than an exact model ID.
MODEL_ALIASES: dict[str, ModelIdentity] = {
    # anthropic
    "sonnet": ModelIdentity("anthropic", "claude-sonnet"),
    "opus": ModelIdentity("anthropic", "claude-opus"),
    "haiku": ModelIdentity("anthropic", "claude-haiku"),
    "claude": ModelIdentity("anthropic", "claude"),

    # openai
    "gpt5": ModelIdentity("openai", "gpt-5"),
    "gpt": ModelIdentity("openai", "gpt"),
    "codex": ModelIdentity("openai", "codex"),
    "o3": ModelIdentity("openai", "o3"),
    "o4": ModelIdentity("openai", "o4"),

    # google
    "gemini": ModelIdentity("google", "gemini"),

    # deepseek
    "deepseek": ModelIdentity("deepseek", "deepseek-chat"),

    # x-ai
    "grok": ModelIdentity("x-ai", "grok"),

    # meta-llama
    "llama": ModelIdentity("meta-llama", "llama"),

    # qwen
    "qwen": ModelIdentity("qwen", "qwen"),

    # minimax
    "minimax": ModelIdentity("minimax", "minimax"),

    # nvidia
    "nemotron": ModelIdentity("nvidia", "nemotron"),

    # moonshotai
    "kimi": ModelIdentity("moonshotai", "kimi"),

    # z-ai
    "glm": ModelIdentity("z-ai", "glm"),

    # stepfun
    "step": ModelIdentity("stepfun", "step"),

    # xiaomi
    "mimo": ModelIdentity("xiaomi", "mimo"),

    # arcee-ai
    "trinity": ModelIdentity("arcee-ai", "trinity"),
}
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
class DirectAlias(NamedTuple):
    """An alias that maps straight to one model, skipping catalog lookup.

    ``resolve_alias`` returns ``provider`` and ``model`` verbatim for a direct
    alias; ``switch_model`` uses ``base_url`` (when non-empty) as the endpoint.
    """

    # Exact model ID to use.
    model: str
    # Provider slug the model is served by.
    provider: str
    # Endpoint URL override; may be empty.
    base_url: str
|
|
|
|
| |
# Direct aliases shipped with the code (currently none).  ``_load_direct_aliases``
# starts from a copy of this table and layers config.yaml entries on top.
_BUILTIN_DIRECT_ALIASES: dict[str, DirectAlias] = {}


# Effective direct-alias table.  Starts empty and is filled lazily by
# ``_ensure_direct_aliases`` the first time an alias is resolved.
DIRECT_ALIASES: dict[str, DirectAlias] = {}
|
|
|
|
def _load_direct_aliases() -> dict[str, DirectAlias]:
    """Load direct aliases from config.yaml ``model_aliases:`` section.

    Config format::

        model_aliases:
          qwen:
            model: "qwen3.5:397b"
            provider: custom
            base_url: "https://ollama.com/v1"
          minimax:
            model: "minimax-m2.7"
            provider: custom
            base_url: "https://ollama.com/v1"

    Each entry is validated on its own, so one malformed entry no longer
    discards every other alias.  Alias names and field values are coerced to
    ``str`` because YAML may yield non-string scalars (e.g. an alias called
    ``4``) and later code calls string methods on them.

    Returns:
        A new dict containing the built-in aliases overlaid with the valid
        user-defined ones.  Keys are stripped and lower-cased.  When the
        config cannot be loaded, only the built-ins are returned.
    """
    merged = dict(_BUILTIN_DIRECT_ALIASES)

    # Loading is best-effort: a missing or broken config must not prevent
    # model switching, so fall back to the built-ins and leave a debug trace.
    try:
        from hermes_cli.config import load_config
        user_aliases = load_config().get("model_aliases")
    except Exception as exc:
        logger.debug("Could not load model_aliases from config: %s", exc)
        return merged

    if not isinstance(user_aliases, dict):
        return merged

    for name, entry in user_aliases.items():
        if not isinstance(entry, dict):
            continue
        model = entry.get("model", "")
        if not model:
            # An alias without a target model is meaningless; skip it.
            continue
        alias_key = str(name).strip().lower()
        if not alias_key:
            continue
        merged[alias_key] = DirectAlias(
            model=str(model),
            provider=str(entry.get("provider") or "custom"),
            base_url=str(entry.get("base_url") or ""),
        )
    return merged
|
|
|
|
def _ensure_direct_aliases() -> None:
    """Populate ``DIRECT_ALIASES`` on first use.

    The table is (re)loaded via ``_load_direct_aliases`` whenever it is
    currently empty, so a configuration without any aliases is re-read on
    every call.
    """
    global DIRECT_ALIASES
    if DIRECT_ALIASES:
        return
    DIRECT_ALIASES = _load_direct_aliases()
|
|
|
|
| |
| |
| |
|
|
@dataclass
class ModelSwitchResult:
    """Outcome of one ``switch_model`` call.

    Only ``success`` is required; every other field defaults to an empty
    value so failure results can be built from just the fields that apply.
    """

    # True when the switch was accepted.
    success: bool
    # Final model ID after alias resolution and normalisation.
    new_model: str = ""
    # Provider slug the model will be used on.
    target_provider: str = ""
    # True when target_provider differs from the provider that was active.
    provider_changed: bool = False
    # Credentials / endpoint / API mode resolved for the target provider.
    api_key: str = ""
    base_url: str = ""
    api_mode: str = ""
    # Human-readable failure reason (set when success is False).
    error_message: str = ""
    # Non-fatal notes; multiple warnings are joined with " | ".
    warning_message: str = ""
    # Display name of the target provider.
    provider_label: str = ""
    # Alias name the input was resolved through, or "" when none was used.
    resolved_via_alias: str = ""
    # Metadata looked up for the new model; None when unavailable.
    capabilities: Optional[ModelCapabilities] = None
    model_info: Optional[ModelInfo] = None
    # Echo of the caller's "persist this switch" flag.
    is_global: bool = False
|
|
|
|
@dataclass
class CustomAutoResult:
    """Outcome of switching to the bare ``custom`` provider with auto-detect.

    Only ``success`` is required; the remaining fields default to ``""``.
    """

    # True when a model was detected and the switch can proceed.
    success: bool
    # Detected model ID.
    model: str = ""
    # Endpoint and credential to use for the custom provider.
    base_url: str = ""
    api_key: str = ""
    # Human-readable failure reason (set when success is False).
    error_message: str = ""
|
|
|
|
| |
| |
| |
|
|
def parse_model_flags(raw_args: str) -> tuple[str, str, bool]:
    """Parse --provider and --global flags from /model command args.

    Flags are recognised only as whole whitespace-separated tokens, so text
    that merely *contains* ``--global`` (for example inside a model name) is
    left untouched.  A trailing ``--provider`` with no value is discarded
    instead of being folded into the model name.

    Args:
        raw_args: Everything the user typed after ``/model``.  ``None`` or an
            empty string is treated as "no arguments".

    Returns:
        ``(model_input, explicit_provider, is_global)``.

    Examples::

        "sonnet"                                  -> ("sonnet", "", False)
        "sonnet --global"                         -> ("sonnet", "", True)
        "sonnet --provider anthropic"             -> ("sonnet", "anthropic", False)
        "--provider my-ollama"                    -> ("", "my-ollama", False)
        "sonnet --provider anthropic --global"    -> ("sonnet", "anthropic", True)
    """
    # A single Unicode dash (U+2012..U+2015) directly in front of a flag name
    # is rewritten to the ASCII "--" so the flag is still recognised.
    normalized = re.sub(
        r'[\u2012\u2013\u2014\u2015](provider|global)', r'--\1', raw_args or "",
    )

    tokens = normalized.split()

    # --global is position-independent; strip it first so it can never be
    # mistaken for the value of --provider.
    is_global = "--global" in tokens
    tokens = [tok for tok in tokens if tok != "--global"]

    explicit_provider = ""
    model_tokens: list[str] = []
    idx = 0
    while idx < len(tokens):
        token = tokens[idx]
        if token == "--provider":
            if idx + 1 < len(tokens):
                explicit_provider = tokens[idx + 1]
                idx += 2
            else:
                # Dangling flag without a value: ignore it.
                idx += 1
            continue
        model_tokens.append(token)
        idx += 1

    return (" ".join(model_tokens), explicit_provider, is_global)
|
|
|
|
| |
| |
| |
|
|
| def _model_sort_key(model_id: str, prefix: str) -> tuple: |
| """Sort key for model version preference. |
| |
| Extracts version numbers after the family prefix and returns a sort key |
| that prefers higher versions. Suffix tokens (``pro``, ``omni``, etc.) |
| are used as tiebreakers, with common quality indicators ranked. |
| |
| Examples (with prefix ``"mimo"``):: |
| |
| mimo-v2.5-pro → (-2.5, 0, 'pro') # highest version wins |
| mimo-v2.5 → (-2.5, 1, '') # no suffix = lower than pro |
| mimo-v2-pro → (-2.0, 0, 'pro') |
| mimo-v2-omni → (-2.0, 1, 'omni') |
| mimo-v2-flash → (-2.0, 1, 'flash') |
| """ |
| |
| rest = model_id[len(prefix):] |
| if rest.startswith("/"): |
| rest = rest[1:] |
| rest = rest.lstrip("-").strip() |
|
|
| |
| |
| |
| |
| nums: list[float] = [] |
| suffix_buf = "" |
| state = "start" |
| num_buf = "" |
|
|
| for ch in rest: |
| if state == "start": |
| if ch in "vV": |
| state = "in_version" |
| elif ch.isdigit(): |
| state = "in_version" |
| num_buf += ch |
| elif ch in "-_.": |
| pass |
| else: |
| state = "in_suffix" |
| suffix_buf += ch |
| elif state == "in_version": |
| if ch.isdigit(): |
| num_buf += ch |
| elif ch == ".": |
| if "." in num_buf: |
| |
| try: |
| nums.append(float(num_buf.rstrip("."))) |
| except ValueError: |
| pass |
| num_buf = "" |
| else: |
| num_buf += ch |
| elif ch in "-_.": |
| if num_buf: |
| try: |
| nums.append(float(num_buf.rstrip("."))) |
| except ValueError: |
| pass |
| num_buf = "" |
| state = "between" |
| else: |
| if num_buf: |
| try: |
| nums.append(float(num_buf.rstrip("."))) |
| except ValueError: |
| pass |
| num_buf = "" |
| state = "in_suffix" |
| suffix_buf += ch |
| elif state == "between": |
| if ch.isdigit(): |
| state = "in_version" |
| num_buf = ch |
| elif ch in "vV": |
| state = "in_version" |
| elif ch in "-_.": |
| pass |
| else: |
| state = "in_suffix" |
| suffix_buf += ch |
| elif state == "in_suffix": |
| suffix_buf += ch |
|
|
| |
| if num_buf and state == "in_version": |
| try: |
| nums.append(float(num_buf.rstrip("."))) |
| except ValueError: |
| pass |
|
|
| suffix = suffix_buf.lower().strip("-_.") |
| suffix = suffix.strip() |
|
|
| |
| version_key = tuple(-n for n in nums) |
|
|
| |
| |
| _SUFFIX_RANK = {"pro": 0, "max": 0, "plus": 0, "turbo": 0} |
| suffix_rank = _SUFFIX_RANK.get(suffix, 1) |
|
|
| return version_key + (suffix_rank, suffix) |
|
|
|
|
def resolve_alias(
    raw_input: str,
    current_provider: str,
) -> Optional[tuple[str, str, str]]:
    """Resolve a short alias against the current provider's catalog.

    Resolution order:

    1. Direct aliases (``DIRECT_ALIASES``), matched by alias name.
    2. Direct aliases matched by their target model name, so typing the
       model itself still picks up the alias's provider.
    3. :data:`MODEL_ALIASES`: the current provider's models.dev catalog,
       extended with the static ``_PROVIDER_MODELS`` list, is searched for
       IDs starting with ``vendor/family`` (aggregator providers) or just
       ``family`` (all others), and the candidate with the **highest
       version** according to ``_model_sort_key`` is chosen.

    Args:
        raw_input: Text typed by the user; stripped and lower-cased before
            lookup.
        current_provider: Provider slug whose catalog is searched.

    Returns:
        ``(provider, resolved_model_id, alias_name)`` if a match is found,
        or ``None`` if the alias doesn't exist or no matching model is
        available on the current provider.
    """
    key = raw_input.strip().lower()

    # Direct aliases bypass the catalog entirely.
    _ensure_direct_aliases()
    direct = DIRECT_ALIASES.get(key)
    if direct is not None:
        return (direct.provider, direct.model, key)

    # Reverse lookup: the input may be the model a direct alias points at.
    for alias_name, da in DIRECT_ALIASES.items():
        if da.model.lower() == key:
            return (da.provider, da.model, alias_name)

    identity = MODEL_ALIASES.get(key)
    if identity is None:
        return None
    vendor, family = identity

    # Copy before extending: the list handed back by list_provider_models()
    # belongs to that module and must not be mutated from here.
    catalog = list(list_provider_models(current_provider) or [])
    try:
        from hermes_cli.models import _PROVIDER_MODELS
        static = _PROVIDER_MODELS.get(current_provider, [])
        if static:
            seen = {m.lower() for m in catalog}
            # ``seen`` is deliberately not updated, matching the prior
            # behaviour for static lists that contain case variants.
            catalog.extend(m for m in static if m.lower() not in seen)
    except Exception as exc:
        # The static list is only a supplement; carry on with what we have.
        logger.debug(
            "Static model list unavailable for %s: %s", current_provider, exc,
        )

    # Aggregators expose "vendor/model" IDs; native providers use bare IDs.
    aggregator = is_aggregator(current_provider)
    prefix = f"{vendor}/{family}" if aggregator else family
    prefix_lower = prefix.lower()

    matches = [mid for mid in catalog if mid.lower().startswith(prefix_lower)]
    if not matches:
        return None

    # min() returns the first smallest element, i.e. the same pick as a
    # stable sort followed by [0].
    best = min(matches, key=lambda m: _model_sort_key(m, prefix))
    return (current_provider, best, key)
|
|
|
|
def get_authenticated_provider_slugs(
    current_provider: str = "",
    user_providers: dict = None,
    custom_providers: list | None = None,
) -> list[str]:
    """Collect the slugs of every provider that has credentials.

    Thin wrapper around ``list_authenticated_providers()`` called with
    ``max_models=0`` so that no per-provider model list is kept.

    Args:
        current_provider: Slug of the active provider (forwarded as-is).
        user_providers: ``providers:`` mapping from config (forwarded as-is).
        custom_providers: ``custom_providers:`` list from config (forwarded).

    Returns:
        Provider slugs in the order ``list_authenticated_providers`` yields
        them; an empty list if that call (or reading a slug) raises.
    """
    slugs: list[str] = []
    try:
        entries = list_authenticated_providers(
            current_provider=current_provider,
            user_providers=user_providers,
            custom_providers=custom_providers,
            max_models=0,
        )
        for entry in entries:
            slugs.append(entry["slug"])
    except Exception:
        return []
    return slugs
|
|
|
|
def _resolve_alias_fallback(
    raw_input: str,
    authenticated_providers: list[str] = (),
) -> Optional[tuple[str, str, str]]:
    """Resolve an alias by trying each authenticated provider in turn.

    When *authenticated_providers* is empty the search falls back to
    ``("openrouter", "nous")`` (backwards compatibility for non-interactive
    callers).

    Returns:
        The first non-``None`` result of ``resolve_alias``; ``None`` when no
        provider can resolve *raw_input*.
    """
    candidates = authenticated_providers if authenticated_providers else ("openrouter", "nous")
    # Lazy generator: stop calling resolve_alias() after the first hit.
    attempts = (resolve_alias(raw_input, candidate) for candidate in candidates)
    return next((hit for hit in attempts if hit is not None), None)
|
|
|
|
| |
| |
| |
|
|
def switch_model(
    raw_input: str,
    current_provider: str,
    current_model: str,
    current_base_url: str = "",
    current_api_key: str = "",
    is_global: bool = False,
    explicit_provider: str = "",
    user_providers: dict = None,
    custom_providers: list | None = None,
) -> ModelSwitchResult:
    """Core model-switching pipeline shared between CLI and gateway.

    Resolution chain:

    If --provider given:
        a. Resolve provider via resolve_provider_full()
        b. Resolve credentials
        c. If model given, resolve alias on target provider or use as-is
        d. If no model, auto-detect from endpoint

    If no --provider:
        a. Try alias resolution on current provider
        b. If alias exists but not on current provider -> fallback
        c. On aggregator, try vendor/model slug conversion
        d. Aggregator catalog search
        e. detect_provider_for_model() as last resort
        f. Resolve credentials
        g. Normalize model name for target provider

    Finally:
        h. Get full model metadata from models.dev
        i. Build result

    Args:
        raw_input: The model name (after flag parsing).
        current_provider: The currently active provider.
        current_model: The currently active model name.  Not read by the
            body of this function.
        current_base_url: The currently active base URL.
        current_api_key: The currently active API key.
        is_global: Whether to persist the switch.  Only echoed back on the
            result; nothing is persisted here.
        explicit_provider: From --provider flag (empty = no explicit provider).
        user_providers: The ``providers:`` dict from config.yaml (for user endpoints).
        custom_providers: The ``custom_providers:`` list from config.yaml.

    Returns:
        ModelSwitchResult with all information the caller needs.  Failures
        are reported through ``success=False`` plus ``error_message``.
    """
    # Imported lazily inside the function rather than at module level.
    from hermes_cli.models import (
        copilot_model_api_mode,
        detect_provider_for_model,
        validate_requested_model,
        opencode_model_api_mode,
    )
    from hermes_cli.runtime_provider import resolve_runtime_provider

    resolved_alias = ""
    new_model = raw_input.strip()
    target_provider = current_provider

    # -----------------------------------------------------------------
    # Path A: the user named a provider with --provider
    # -----------------------------------------------------------------
    if explicit_provider:
        # a. Resolve the provider definition (built-in, user or custom).
        pdef = resolve_provider_full(
            explicit_provider,
            user_providers,
            custom_providers,
        )
        if pdef is None:
            _switch_err = (
                f"Unknown provider '{explicit_provider}'. "
                f"Check 'hermes model' for available providers, or define it "
                f"in config.yaml under 'providers:'."
            )
            # Best-effort hint: append up to three config problems, since a
            # malformed config is one way a provider can go missing.
            try:
                from hermes_cli.config import validate_config_structure
                _cfg_issues = validate_config_structure()
                if _cfg_issues:
                    _switch_err += "\n\nRun 'hermes doctor' — config issues detected:"
                    for _ci in _cfg_issues[:3]:
                        _switch_err += f"\n • {_ci.message}"
            except Exception:
                pass
            return ModelSwitchResult(
                success=False,
                is_global=is_global,
                error_message=_switch_err,
            )

        target_provider = pdef.id

        # d. No model given: try to auto-detect one from the endpoint.
        if not new_model:
            if pdef.base_url:
                from hermes_cli.runtime_provider import _auto_detect_local_model
                detected = _auto_detect_local_model(pdef.base_url)
                if detected:
                    new_model = detected
                else:
                    return ModelSwitchResult(
                        success=False,
                        target_provider=target_provider,
                        provider_label=pdef.name,
                        is_global=is_global,
                        error_message=(
                            f"No model detected on {pdef.name} ({pdef.base_url}). "
                            f"Specify the model explicitly: /model <model-name> --provider {explicit_provider}"
                        ),
                    )
            else:
                return ModelSwitchResult(
                    success=False,
                    target_provider=target_provider,
                    provider_label=pdef.name,
                    is_global=is_global,
                    error_message=(
                        f"Provider '{pdef.name}' has no base URL configured. "
                        f"Specify a model: /model <model-name> --provider {explicit_provider}"
                    ),
                )

        # c. Resolve an alias on the target provider.  The provider part of
        #    the alias result is discarded: --provider wins.
        alias_result = resolve_alias(new_model, target_provider)
        if alias_result is not None:
            _, new_model, resolved_alias = alias_result

    # -----------------------------------------------------------------
    # Path B: no --provider flag
    # -----------------------------------------------------------------
    else:
        # a. Try alias resolution on the current provider.
        alias_result = resolve_alias(raw_input, current_provider)

        if alias_result is not None:
            target_provider, new_model, resolved_alias = alias_result
            logger.debug(
                "Alias '%s' resolved to %s on %s",
                resolved_alias, new_model, target_provider,
            )
        else:
            # b. Known alias that the current provider cannot serve: try the
            #    other authenticated providers.
            key = raw_input.strip().lower()
            if key in MODEL_ALIASES:
                authed = get_authenticated_provider_slugs(
                    current_provider=current_provider,
                    user_providers=user_providers,
                    custom_providers=custom_providers,
                )
                fallback_result = _resolve_alias_fallback(raw_input, authed)
                if fallback_result is not None:
                    target_provider, new_model, resolved_alias = fallback_result
                    logger.debug(
                        "Alias '%s' resolved via fallback to %s on %s",
                        resolved_alias, new_model, target_provider,
                    )
                else:
                    identity = MODEL_ALIASES[key]
                    return ModelSwitchResult(
                        success=False,
                        is_global=is_global,
                        error_message=(
                            f"Alias '{key}' maps to {identity.vendor}/{identity.family} "
                            f"but no matching model was found in any provider catalog. "
                            f"Try specifying the full model name."
                        ),
                    )
            else:
                # c. Not an alias.  On an aggregator, a slash-less
                #    "left:right" input is rewritten to "left/right" (left
                #    part lower-cased).
                colon_pos = raw_input.find(":")
                if colon_pos > 0 and "/" not in raw_input and is_aggregator(current_provider):
                    left = raw_input[:colon_pos].strip().lower()
                    right = raw_input[colon_pos + 1:].strip()
                    if left and right:
                        # Both halves must be non-empty to form a slug.
                        new_model = f"{left}/{right}"
                        logger.debug(
                            "Converted vendor:model '%s' to aggregator slug '%s'",
                            raw_input, new_model,
                        )

        # d. Aggregator catalog search: adopt the catalog's spelling on a
        #    case-insensitive exact match; otherwise match the part after
        #    "vendor/" so a bare model name finds its full slug.
        if is_aggregator(target_provider) and not resolved_alias:
            catalog = list_provider_models(target_provider)
            if catalog:
                new_model_lower = new_model.lower()
                for mid in catalog:
                    if mid.lower() == new_model_lower:
                        new_model = mid
                        break
                else:
                    # for/else: runs only when no exact match broke the loop.
                    for mid in catalog:
                        if "/" in mid:
                            _, bare = mid.split("/", 1)
                            if bare.lower() == new_model_lower:
                                new_model = mid
                                break

        # e. Last resort: ask detect_provider_for_model().  Skipped for
        #    custom/local providers and localhost endpoints.
        _base = current_base_url or ""
        is_custom = current_provider in ("custom", "local") or (
            "localhost" in _base or "127.0.0.1" in _base
        )

        if (
            target_provider == current_provider
            and not is_custom
            and not resolved_alias
        ):
            detected = detect_provider_for_model(new_model, current_provider)
            if detected:
                target_provider, new_model = detected

    # -----------------------------------------------------------------
    # Common path: both branches converge here
    # -----------------------------------------------------------------

    provider_changed = target_provider != current_provider
    provider_label = get_label(target_provider)
    if target_provider.startswith("custom:"):
        # For "custom:" slugs prefer the name from the resolved definition.
        custom_pdef = resolve_provider_full(
            target_provider,
            user_providers,
            custom_providers,
        )
        if custom_pdef is not None:
            provider_label = custom_pdef.name

    # Resolve credentials.  Start from the current session's values.
    api_key = current_api_key
    base_url = current_base_url
    api_mode = ""

    if provider_changed or explicit_provider:
        # Switching (or explicitly re-selecting) a provider: credentials are
        # mandatory, so a resolution failure aborts the switch.
        try:
            runtime = resolve_runtime_provider(requested=target_provider)
            api_key = runtime.get("api_key", "")
            base_url = runtime.get("base_url", "")
            api_mode = runtime.get("api_mode", "")
        except Exception as e:
            return ModelSwitchResult(
                success=False,
                target_provider=target_provider,
                provider_label=provider_label,
                is_global=is_global,
                error_message=(
                    f"Could not resolve credentials for provider "
                    f"'{provider_label}': {e}"
                ),
            )
    else:
        # Same provider: refresh credentials opportunistically and keep the
        # current session values if that fails.
        try:
            runtime = resolve_runtime_provider(requested=current_provider)
            api_key = runtime.get("api_key", "")
            base_url = runtime.get("base_url", "")
            api_mode = runtime.get("api_mode", "")
        except Exception:
            pass

    # A direct alias may carry its own endpoint, which overrides the
    # provider's.  api_mode is cleared so it is re-derived further down.
    if resolved_alias:
        _ensure_direct_aliases()
        _da = DIRECT_ALIASES.get(resolved_alias)
        if _da is not None and _da.base_url:
            base_url = _da.base_url
            api_mode = ""
            if not api_key:
                # Placeholder for alias endpoints that come without a key.
                api_key = "no-key-required"

    # Normalize the model name for the target provider.
    new_model = normalize_model_for_provider(new_model, target_provider)

    # Validate the model; a validator crash is reported as a rejection.
    try:
        validation = validate_requested_model(
            new_model,
            target_provider,
            api_key=api_key,
            base_url=base_url,
        )
    except Exception as e:
        validation = {
            "accepted": False,
            "persist": False,
            "recognized": False,
            "message": f"Could not validate `{new_model}`: {e}",
        }

    if not validation.get("accepted"):
        msg = validation.get("message", "Invalid model")
        return ModelSwitchResult(
            success=False,
            new_model=new_model,
            target_provider=target_provider,
            provider_label=provider_label,
            is_global=is_global,
            error_message=msg,
        )

    # The validator may suggest a corrected model ID.
    if validation.get("corrected_model"):
        new_model = validation["corrected_model"]

    # Copilot: the API mode depends on the model.
    if target_provider in {"copilot", "github-copilot"}:
        api_mode = copilot_model_api_mode(new_model, api_key=api_key)

    # OpenCode: the API mode depends on provider and model.
    if target_provider in {"opencode-zen", "opencode-go", "opencode"}:
        api_mode = opencode_model_api_mode(target_provider, new_model)

    # Still unknown: derive the API mode from provider and base URL.
    if not api_mode:
        api_mode = determine_api_mode(target_provider, base_url)

    # OpenCode Zen/Go in anthropic_messages mode: drop a trailing "/v1" from
    # the base URL.  NOTE(review): presumably the Anthropic-style client
    # appends its own version path — confirm against the client code.
    if (
        api_mode == "anthropic_messages"
        and target_provider in {"opencode-zen", "opencode-go"}
        and isinstance(base_url, str)
        and base_url
    ):
        base_url = re.sub(r"/v1/?$", "", base_url)

    # h. Model metadata lookups.
    capabilities = get_model_capabilities(target_provider, new_model)

    model_info = get_model_info(target_provider, new_model)

    # Collect non-fatal warnings (validator note + Hermes 3/4 warning).
    warnings: list[str] = []
    if validation.get("message"):
        warnings.append(validation["message"])
    hermes_warn = _check_hermes_model_warning(new_model)
    if hermes_warn:
        warnings.append(hermes_warn)

    # i. Build the result.
    return ModelSwitchResult(
        success=True,
        new_model=new_model,
        target_provider=target_provider,
        provider_changed=provider_changed,
        api_key=api_key,
        base_url=base_url,
        api_mode=api_mode,
        warning_message=" | ".join(warnings) if warnings else "",
        provider_label=provider_label,
        resolved_via_alias=resolved_alias,
        capabilities=capabilities,
        model_info=model_info,
        is_global=is_global,
    )
|
|
|
|
| |
| |
| |
|
|
def list_authenticated_providers(
    current_provider: str = "",
    current_base_url: str = "",
    user_providers: dict = None,
    custom_providers: list | None = None,
    max_models: int = 8,
) -> List[dict]:
    """Detect which providers have credentials and list their curated models.

    Uses the curated model lists from hermes_cli/models.py (OPENROUTER_MODELS,
    _PROVIDER_MODELS), NOT the full models.dev catalog.  Providers named in
    ``_MODELS_DEV_PREFERRED`` additionally have their list passed through
    ``_merge_with_models_dev``.

    Providers are gathered in five passes; a slug emitted by an earlier pass
    is never emitted again:

    1. Providers mapped in ``PROVIDER_TO_MODELS_DEV`` whose API-key
       environment variable is set.
    2. ``HERMES_OVERLAYS`` providers with credentials in the environment,
       the auth store, the credential pool or (Anthropic only) external
       OAuth credential files.
    3. ``CANONICAL_PROVIDERS`` entries not covered above.
    4. Endpoints from the ``providers:`` config dict (*user_providers*).
    5. Entries from the ``custom_providers:`` config list, grouped by
       ``(base_url, api_key)``.

    Args:
        current_provider: Slug of the active provider; used for ``is_current``.
        current_base_url: Active base URL.  A custom provider group whose URL
            equals it is listed under *current_provider*'s slug.
        user_providers: ``providers:`` mapping from config.yaml.
        custom_providers: ``custom_providers:`` list from config.yaml.
        max_models: Maximum number of model IDs per built-in provider.  Not
            applied to user-defined endpoints.

    Returns:
        A list of dicts, current provider first and then by descending model
        count, each with:

        - slug: str, the --provider value to use
        - name: str, display name
        - is_current: bool
        - is_user_defined: bool
        - models: list[str], curated model IDs (up to max_models)
        - total_models: int, total curated count
        - source: str, one of "built-in", "hermes", "canonical", "user-config"
        - api_url: str, endpoint URL (user-defined entries only)
    """
    import os
    from agent.models_dev import (
        PROVIDER_TO_MODELS_DEV,
        fetch_models_dev,
        get_provider_info as _mdev_pinfo,
    )
    from hermes_cli.auth import PROVIDER_REGISTRY
    from hermes_cli.models import (
        OPENROUTER_MODELS, _PROVIDER_MODELS,
        _MODELS_DEV_PREFERRED, _merge_with_models_dev,
    )
    from hermes_cli.providers import HERMES_OVERLAYS

    # ---- local helpers ---------------------------------------------------

    def _env_has_any(var_names) -> bool:
        """True when at least one of *var_names* is set to a non-empty value."""
        return any(os.environ.get(var) for var in var_names)

    # Holds the auth store once it has been read, so the file is loaded at
    # most once per call instead of once per provider.
    _auth_store_cache: list = []

    def _auth_store_mentions(*slugs: str) -> bool:
        """True when the auth store lists any of *slugs* (may raise)."""
        if not _auth_store_cache:
            from hermes_cli.auth import _load_auth_store
            _auth_store_cache.append(_load_auth_store())
        store = _auth_store_cache[0]
        if not store:
            return False
        providers_store = store.get("providers", {})
        pool_store = store.get("credential_pool", {})
        return any(s in providers_store for s in slugs) or any(
            s in pool_store for s in slugs
        )

    def _pool_has_credentials(slug: str) -> bool:
        """True when the credential pool for *slug* holds credentials (may raise)."""
        from agent.credential_pool import load_pool
        return bool(load_pool(slug).has_credentials())

    def _append_unique(target: list, cfg_models) -> None:
        """Append the model names of a ``models:`` dict or list, skipping duplicates."""
        if isinstance(cfg_models, (dict, list)):
            # Iterating a dict yields its keys, i.e. the model names.
            for model_name in cfg_models:
                if model_name and model_name not in target:
                    target.append(model_name)

    # ---- shared state ----------------------------------------------------

    results: List[dict] = []
    seen_slugs: set = set()
    seen_mdev_ids: set = set()

    data = fetch_models_dev()

    # Curated model lists keyed by hermes provider slug.
    curated: dict[str, list[str]] = dict(_PROVIDER_MODELS)
    curated["openrouter"] = [mid for mid, _ in OPENROUTER_MODELS]
    # Nous reuses the OpenRouter list unless it has its own.
    if "nous" not in curated:
        curated["nous"] = curated["openrouter"]
    if "ollama-cloud" not in curated:
        from hermes_cli.models import fetch_ollama_cloud_models
        curated["ollama-cloud"] = fetch_ollama_cloud_models()

    # ---- 1. providers known to models.dev --------------------------------
    for hermes_id, mdev_id in PROVIDER_TO_MODELS_DEV.items():
        # Several hermes slugs can map to one models.dev ID; list it once.
        if mdev_id in seen_mdev_ids:
            continue
        pdata = data.get(mdev_id)
        if not isinstance(pdata, dict):
            continue

        pconfig = PROVIDER_REGISTRY.get(hermes_id)
        # Providers with another auth type are left to the later passes.
        if pconfig and pconfig.auth_type != "api_key":
            continue
        if pconfig and pconfig.api_key_env_vars:
            env_vars = list(pconfig.api_key_env_vars)
        else:
            env_vars = pdata.get("env", [])
            if not isinstance(env_vars, list):
                continue

        if not _env_has_any(env_vars):
            continue

        model_ids = curated.get(hermes_id, [])
        if hermes_id in _MODELS_DEV_PREFERRED:
            model_ids = _merge_with_models_dev(hermes_id, model_ids)

        pinfo = _mdev_pinfo(mdev_id)
        results.append({
            "slug": hermes_id,
            "name": pinfo.name if pinfo else mdev_id,
            "is_current": hermes_id == current_provider or mdev_id == current_provider,
            "is_user_defined": False,
            "models": model_ids[:max_models],
            "total_models": len(model_ids),
            "source": "built-in",
        })
        seen_slugs.add(hermes_id.lower())
        seen_mdev_ids.add(mdev_id)

    # ---- 2. Hermes overlay providers -------------------------------------
    # Overlay keys may be models.dev IDs; map them back to hermes slugs.
    _mdev_to_hermes = {v: k for k, v in PROVIDER_TO_MODELS_DEV.items()}

    for pid, overlay in HERMES_OVERLAYS.items():
        if pid.lower() in seen_slugs:
            continue
        hermes_slug = _mdev_to_hermes.get(pid, pid)
        if hermes_slug.lower() in seen_slugs:
            continue

        # Credential sources, cheapest first; stop at the first hit.
        has_creds = False
        if overlay.extra_env_vars:
            has_creds = _env_has_any(overlay.extra_env_vars)

        if not has_creds and overlay.auth_type == "api_key":
            for _key in (pid, hermes_slug):
                pcfg = PROVIDER_REGISTRY.get(_key)
                if pcfg and pcfg.api_key_env_vars and _env_has_any(pcfg.api_key_env_vars):
                    has_creds = True
                    break

        if not has_creds:
            try:
                has_creds = _auth_store_mentions(pid, hermes_slug)
            except Exception as exc:
                logger.debug("Auth store check failed for %s: %s", pid, exc)

        if not has_creds:
            try:
                has_creds = _pool_has_credentials(hermes_slug)
            except Exception as exc:
                logger.debug("Credential pool check failed for %s: %s", hermes_slug, exc)

        # Anthropic may also be authenticated through OAuth credentials read
        # by the adapter (Hermes' own or Claude Code's).
        if not has_creds and hermes_slug == "anthropic":
            try:
                from agent.anthropic_adapter import (
                    read_claude_code_credentials,
                    read_hermes_oauth_credentials,
                )
                hermes_creds = read_hermes_oauth_credentials()
                cc_creds = read_claude_code_credentials()
                if (hermes_creds and hermes_creds.get("accessToken")) or \
                   (cc_creds and cc_creds.get("accessToken")):
                    has_creds = True
            except Exception as exc:
                logger.debug("Anthropic external creds check failed: %s", exc)

        if not has_creds:
            continue

        model_ids = curated.get(hermes_slug, []) or curated.get(pid, [])
        if hermes_slug in _MODELS_DEV_PREFERRED:
            model_ids = _merge_with_models_dev(hermes_slug, model_ids)

        results.append({
            "slug": hermes_slug,
            "name": get_label(hermes_slug),
            "is_current": hermes_slug == current_provider or pid == current_provider,
            "is_user_defined": False,
            "models": model_ids[:max_models],
            "total_models": len(model_ids),
            "source": "hermes",
        })
        seen_slugs.add(pid.lower())
        seen_slugs.add(hermes_slug.lower())

    # ---- 3. canonical providers not covered above ------------------------
    try:
        from hermes_cli.models import CANONICAL_PROVIDERS as _canon_provs
    except ImportError:
        _canon_provs = []

    for _cp in _canon_provs:
        if _cp.slug.lower() in seen_slugs:
            continue

        _cp_config = PROVIDER_REGISTRY.get(_cp.slug)
        _cp_has_creds = False
        if _cp_config and _cp_config.api_key_env_vars:
            _cp_has_creds = _env_has_any(_cp_config.api_key_env_vars)

        if not _cp_has_creds:
            try:
                _cp_has_creds = _auth_store_mentions(_cp.slug)
            except Exception as exc:
                logger.debug("Auth store check failed for %s: %s", _cp.slug, exc)
        if not _cp_has_creds:
            try:
                _cp_has_creds = _pool_has_credentials(_cp.slug)
            except Exception as exc:
                logger.debug("Credential pool check failed for %s: %s", _cp.slug, exc)

        if not _cp_has_creds:
            continue

        _cp_model_ids = curated.get(_cp.slug, [])
        results.append({
            "slug": _cp.slug,
            "name": _cp.label,
            "is_current": _cp.slug == current_provider,
            "is_user_defined": False,
            "models": _cp_model_ids[:max_models],
            "total_models": len(_cp_model_ids),
            "source": "canonical",
        })
        seen_slugs.add(_cp.slug.lower())

    # ---- 4. user-defined endpoints (``providers:``) ----------------------
    # (name, url) pairs emitted here, so the custom_providers pass below can
    # skip entries describing the same endpoint.
    _section3_emitted_pairs: set = set()
    if user_providers and isinstance(user_providers, dict):
        for ep_name, ep_cfg in user_providers.items():
            if not isinstance(ep_cfg, dict):
                continue
            # A built-in provider of the same name takes precedence.
            if ep_name.lower() in seen_slugs:
                continue
            display_name = ep_cfg.get("name", "") or ep_name
            # Accept any of the URL key spellings.
            api_url = (
                ep_cfg.get("base_url", "")
                or ep_cfg.get("api", "")
                or ep_cfg.get("url", "")
                or ""
            )
            default_model = ep_cfg.get("default_model", "") or ep_cfg.get("model", "")

            # Default model first, then everything under ``models:``.
            models_list = []
            if default_model:
                models_list.append(default_model)
            _append_unique(models_list, ep_cfg.get("models", []))

            results.append({
                "slug": ep_name,
                "name": display_name,
                "is_current": ep_name == current_provider,
                "is_user_defined": True,
                "models": models_list,
                "total_models": len(models_list),
                "source": "user-config",
                "api_url": api_url,
            })
            seen_slugs.add(ep_name.lower())
            seen_slugs.add(custom_provider_slug(display_name).lower())
            _pair = (
                str(display_name).strip().lower(),
                str(api_url).strip().rstrip("/").lower(),
            )
            if _pair[0] and _pair[1]:
                _section3_emitted_pairs.add(_pair)

    # ---- 5. ``custom_providers:`` list -----------------------------------
    if custom_providers and isinstance(custom_providers, list):
        # Entries sharing (url, key) are one endpoint exposing several
        # models; dicts keep insertion order, so first-seen order is kept.
        groups: dict[tuple, dict] = {}
        for entry in custom_providers:
            if not isinstance(entry, dict):
                continue

            raw_name = (entry.get("name") or "").strip()
            api_url = (
                entry.get("base_url", "")
                or entry.get("url", "")
                or entry.get("api", "")
                or ""
            ).strip().rstrip("/")
            if not raw_name or not api_url:
                continue
            api_key = (entry.get("api_key") or "").strip()

            group_key = (api_url, api_key)
            if group_key not in groups:
                # Keep only the part of the name before an em dash or " - ".
                display_name = raw_name
                for sep in ("—", " - "):
                    if sep in display_name:
                        display_name = display_name.split(sep)[0].strip()
                        break
                if not display_name:
                    display_name = raw_name
                # The endpoint in use keeps the active provider's slug so it
                # is flagged as current.
                if (
                    current_base_url
                    and api_url == current_base_url.strip().rstrip("/")
                ):
                    slug = current_provider or custom_provider_slug(display_name)
                else:
                    slug = custom_provider_slug(display_name)
                groups[group_key] = {
                    "slug": slug,
                    "name": display_name,
                    "api_url": api_url,
                    "models": [],
                }

            group_models = groups[group_key]["models"]
            default_model = (entry.get("model") or "").strip()
            if default_model and default_model not in group_models:
                group_models.append(default_model)
            _append_unique(group_models, entry.get("models", {}))

        _section4_emitted_slugs: set = set()
        for grp in groups.values():
            slug = grp["slug"]
            # Slug already taken by an earlier pass: that entry wins.
            if slug.lower() in seen_slugs and slug.lower() not in _section4_emitted_slugs:
                continue
            # Slug taken by another group of this pass (same name, different
            # endpoint): disambiguate with a numeric suffix.
            if slug.lower() in _section4_emitted_slugs:
                base_slug = slug
                n = 2
                while f"{base_slug}-{n}".lower() in seen_slugs:
                    n += 1
                slug = f"{base_slug}-{n}"
                grp["slug"] = slug
            # Same (name, url) already listed from ``providers:``.
            _pair_key = (
                str(grp["name"]).strip().lower(),
                str(grp["api_url"]).strip().rstrip("/").lower(),
            )
            if _pair_key[0] and _pair_key[1] and _pair_key in _section3_emitted_pairs:
                continue
            results.append({
                "slug": slug,
                "name": grp["name"],
                "is_current": slug == current_provider,
                "is_user_defined": True,
                "models": grp["models"],
                "total_models": len(grp["models"]),
                "source": "user-config",
                "api_url": grp["api_url"],
            })
            seen_slugs.add(slug.lower())
            _section4_emitted_slugs.add(slug.lower())

    # Current provider first, then by descending model count.
    results.sort(key=lambda r: (not r["is_current"], -r["total_models"]))

    return results
|
|