| """Shared runtime provider resolution for CLI, gateway, cron, and helpers.""" |
|
|
| from __future__ import annotations |
|
|
| import os |
| from typing import Any, Dict, Optional |
|
|
| from hermes_cli import auth as auth_mod |
| from hermes_cli.auth import ( |
| AuthError, |
| PROVIDER_REGISTRY, |
| format_auth_error, |
| resolve_provider, |
| resolve_nous_runtime_credentials, |
| resolve_codex_runtime_credentials, |
| resolve_api_key_provider_credentials, |
| resolve_external_process_provider_credentials, |
| has_usable_secret, |
| ) |
| from hermes_cli.config import load_config |
| from hermes_constants import OPENROUTER_BASE_URL |
|
|
|
|
| def _normalize_custom_provider_name(value: str) -> str: |
| return value.strip().lower().replace(" ", "-") |
|
|
|
|
| def _detect_api_mode_for_url(base_url: str) -> Optional[str]: |
| """Auto-detect api_mode from the resolved base URL. |
| |
| Direct api.openai.com endpoints need the Responses API for GPT-5.x |
| tool calls with reasoning (chat/completions returns 400). |
| """ |
| normalized = (base_url or "").strip().lower().rstrip("/") |
| if "api.openai.com" in normalized and "openrouter" not in normalized: |
| return "codex_responses" |
| return None |
|
|
|
|
| def _auto_detect_local_model(base_url: str) -> str: |
| """Query a local server for its model name when only one model is loaded.""" |
| if not base_url: |
| return "" |
| try: |
| import requests |
| url = base_url.rstrip("/") |
| if not url.endswith("/v1"): |
| url += "/v1" |
| resp = requests.get(url + "/models", timeout=5) |
| if resp.ok: |
| models = resp.json().get("data", []) |
| if len(models) == 1: |
| model_id = models[0].get("id", "") |
| if model_id: |
| return model_id |
| except Exception: |
| pass |
| return "" |
|
|
|
|
| def _get_model_config() -> Dict[str, Any]: |
| config = load_config() |
| model_cfg = config.get("model") |
| if isinstance(model_cfg, dict): |
| cfg = dict(model_cfg) |
| default = cfg.get("default", "").strip() |
| base_url = cfg.get("base_url", "").strip() |
| is_local = "localhost" in base_url or "127.0.0.1" in base_url |
| is_fallback = not default or default == "anthropic/claude-opus-4.6" |
| if is_local and is_fallback and base_url: |
| detected = _auto_detect_local_model(base_url) |
| if detected: |
| cfg["default"] = detected |
| return cfg |
| if isinstance(model_cfg, str) and model_cfg.strip(): |
| return {"default": model_cfg.strip()} |
| return {} |
|
|
|
|
| def _copilot_runtime_api_mode(model_cfg: Dict[str, Any], api_key: str) -> str: |
| configured_mode = _parse_api_mode(model_cfg.get("api_mode")) |
| if configured_mode: |
| return configured_mode |
|
|
| model_name = str(model_cfg.get("default") or "").strip() |
| if not model_name: |
| return "chat_completions" |
|
|
| try: |
| from hermes_cli.models import copilot_model_api_mode |
|
|
| return copilot_model_api_mode(model_name, api_key=api_key) |
| except Exception: |
| return "chat_completions" |
|
|
|
|
| _VALID_API_MODES = {"chat_completions", "codex_responses", "anthropic_messages"} |
|
|
|
|
| def _parse_api_mode(raw: Any) -> Optional[str]: |
| """Validate an api_mode value from config. Returns None if invalid.""" |
| if isinstance(raw, str): |
| normalized = raw.strip().lower() |
| if normalized in _VALID_API_MODES: |
| return normalized |
| return None |
|
|
|
|
| def resolve_requested_provider(requested: Optional[str] = None) -> str: |
| """Resolve provider request from explicit arg, config, then env.""" |
| if requested and requested.strip(): |
| return requested.strip().lower() |
|
|
| model_cfg = _get_model_config() |
| cfg_provider = model_cfg.get("provider") |
| if isinstance(cfg_provider, str) and cfg_provider.strip(): |
| return cfg_provider.strip().lower() |
|
|
| |
| |
| env_provider = os.getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower() |
| if env_provider: |
| return env_provider |
|
|
| return "auto" |
|
|
|
|
| def _get_named_custom_provider(requested_provider: str) -> Optional[Dict[str, Any]]: |
| requested_norm = _normalize_custom_provider_name(requested_provider or "") |
| if not requested_norm or requested_norm == "custom": |
| return None |
|
|
| |
| |
| |
| if requested_norm == "auto": |
| return None |
| if not requested_norm.startswith("custom:"): |
| try: |
| auth_mod.resolve_provider(requested_norm) |
| except AuthError: |
| pass |
| else: |
| return None |
|
|
| config = load_config() |
| custom_providers = config.get("custom_providers") |
| if not isinstance(custom_providers, list): |
| return None |
|
|
| for entry in custom_providers: |
| if not isinstance(entry, dict): |
| continue |
| name = entry.get("name") |
| base_url = entry.get("base_url") |
| if not isinstance(name, str) or not isinstance(base_url, str): |
| continue |
| name_norm = _normalize_custom_provider_name(name) |
| menu_key = f"custom:{name_norm}" |
| if requested_norm not in {name_norm, menu_key}: |
| continue |
| result = { |
| "name": name.strip(), |
| "base_url": base_url.strip(), |
| "api_key": str(entry.get("api_key", "") or "").strip(), |
| } |
| api_mode = _parse_api_mode(entry.get("api_mode")) |
| if api_mode: |
| result["api_mode"] = api_mode |
| return result |
|
|
| return None |
|
|
|
|
| def _resolve_named_custom_runtime( |
| *, |
| requested_provider: str, |
| explicit_api_key: Optional[str] = None, |
| explicit_base_url: Optional[str] = None, |
| ) -> Optional[Dict[str, Any]]: |
| custom_provider = _get_named_custom_provider(requested_provider) |
| if not custom_provider: |
| return None |
|
|
| base_url = ( |
| (explicit_base_url or "").strip() |
| or custom_provider.get("base_url", "") |
| ).rstrip("/") |
| if not base_url: |
| return None |
|
|
| api_key_candidates = [ |
| (explicit_api_key or "").strip(), |
| str(custom_provider.get("api_key", "") or "").strip(), |
| os.getenv("OPENAI_API_KEY", "").strip(), |
| os.getenv("OPENROUTER_API_KEY", "").strip(), |
| ] |
| api_key = next((candidate for candidate in api_key_candidates if has_usable_secret(candidate)), "") |
|
|
| return { |
| "provider": "custom", |
| "api_mode": custom_provider.get("api_mode") |
| or _detect_api_mode_for_url(base_url) |
| or "chat_completions", |
| "base_url": base_url, |
| "api_key": api_key, |
| "source": f"custom_provider:{custom_provider.get('name', requested_provider)}", |
| } |
|
|
|
|
| def _resolve_openrouter_runtime( |
| *, |
| requested_provider: str, |
| explicit_api_key: Optional[str] = None, |
| explicit_base_url: Optional[str] = None, |
| ) -> Dict[str, Any]: |
| model_cfg = _get_model_config() |
| cfg_base_url = model_cfg.get("base_url") if isinstance(model_cfg.get("base_url"), str) else "" |
| cfg_provider = model_cfg.get("provider") if isinstance(model_cfg.get("provider"), str) else "" |
| cfg_api_key = "" |
| for k in ("api_key", "api"): |
| v = model_cfg.get(k) |
| if isinstance(v, str) and v.strip(): |
| cfg_api_key = v.strip() |
| break |
| requested_norm = (requested_provider or "").strip().lower() |
| cfg_provider = cfg_provider.strip().lower() |
|
|
| env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip() |
| env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip() |
|
|
| use_config_base_url = False |
| if cfg_base_url.strip() and not explicit_base_url: |
| if requested_norm == "auto": |
| if (not cfg_provider or cfg_provider == "auto") and not env_openai_base_url: |
| use_config_base_url = True |
| elif requested_norm == "custom" and cfg_provider == "custom": |
| |
| use_config_base_url = True |
|
|
| |
| |
| |
| skip_openai_base = requested_norm == "openrouter" |
|
|
| |
| base_url = ( |
| (explicit_base_url or "").strip() |
| or (cfg_base_url.strip() if use_config_base_url else "") |
| or ("" if skip_openai_base else env_openai_base_url) |
| or env_openrouter_base_url |
| or OPENROUTER_BASE_URL |
| ).rstrip("/") |
|
|
| |
| |
| |
| |
| |
| _is_openrouter_url = "openrouter.ai" in base_url |
| if _is_openrouter_url: |
| api_key_candidates = [ |
| explicit_api_key, |
| os.getenv("OPENROUTER_API_KEY"), |
| os.getenv("OPENAI_API_KEY"), |
| ] |
| else: |
| |
| api_key_candidates = [ |
| explicit_api_key, |
| (cfg_api_key if use_config_base_url else ""), |
| os.getenv("OPENAI_API_KEY"), |
| os.getenv("OPENROUTER_API_KEY"), |
| ] |
| api_key = next( |
| (str(candidate or "").strip() for candidate in api_key_candidates if has_usable_secret(candidate)), |
| "", |
| ) |
|
|
| source = "explicit" if (explicit_api_key or explicit_base_url) else "env/config" |
|
|
| |
| |
| |
| |
| effective_provider = "custom" if requested_norm == "custom" else "openrouter" |
| if effective_provider == "custom" and not api_key and not _is_openrouter_url: |
| api_key = "no-key-required" |
|
|
| return { |
| "provider": effective_provider, |
| "api_mode": _parse_api_mode(model_cfg.get("api_mode")) |
| or _detect_api_mode_for_url(base_url) |
| or "chat_completions", |
| "base_url": base_url, |
| "api_key": api_key, |
| "source": source, |
| } |
|
|
|
|
| def resolve_runtime_provider( |
| *, |
| requested: Optional[str] = None, |
| explicit_api_key: Optional[str] = None, |
| explicit_base_url: Optional[str] = None, |
| ) -> Dict[str, Any]: |
| """Resolve runtime provider credentials for agent execution.""" |
| requested_provider = resolve_requested_provider(requested) |
|
|
| custom_runtime = _resolve_named_custom_runtime( |
| requested_provider=requested_provider, |
| explicit_api_key=explicit_api_key, |
| explicit_base_url=explicit_base_url, |
| ) |
| if custom_runtime: |
| custom_runtime["requested_provider"] = requested_provider |
| return custom_runtime |
|
|
| provider = resolve_provider( |
| requested_provider, |
| explicit_api_key=explicit_api_key, |
| explicit_base_url=explicit_base_url, |
| ) |
|
|
| if provider == "nous": |
| creds = resolve_nous_runtime_credentials( |
| min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))), |
| timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")), |
| ) |
| return { |
| "provider": "nous", |
| "api_mode": "chat_completions", |
| "base_url": creds.get("base_url", "").rstrip("/"), |
| "api_key": creds.get("api_key", ""), |
| "source": creds.get("source", "portal"), |
| "expires_at": creds.get("expires_at"), |
| "requested_provider": requested_provider, |
| } |
|
|
| if provider == "openai-codex": |
| creds = resolve_codex_runtime_credentials() |
| return { |
| "provider": "openai-codex", |
| "api_mode": "codex_responses", |
| "base_url": creds.get("base_url", "").rstrip("/"), |
| "api_key": creds.get("api_key", ""), |
| "source": creds.get("source", "hermes-auth-store"), |
| "last_refresh": creds.get("last_refresh"), |
| "requested_provider": requested_provider, |
| } |
|
|
| if provider == "copilot-acp": |
| creds = resolve_external_process_provider_credentials(provider) |
| return { |
| "provider": "copilot-acp", |
| "api_mode": "chat_completions", |
| "base_url": creds.get("base_url", "").rstrip("/"), |
| "api_key": creds.get("api_key", ""), |
| "command": creds.get("command", ""), |
| "args": list(creds.get("args") or []), |
| "source": creds.get("source", "process"), |
| "requested_provider": requested_provider, |
| } |
|
|
| |
| if provider == "anthropic": |
| from agent.anthropic_adapter import resolve_anthropic_token |
| token = resolve_anthropic_token() |
| if not token: |
| raise AuthError( |
| "No Anthropic credentials found. Set ANTHROPIC_TOKEN or ANTHROPIC_API_KEY, " |
| "run 'claude setup-token', or authenticate with 'claude /login'." |
| ) |
| |
| |
| |
| model_cfg = _get_model_config() |
| cfg_provider = str(model_cfg.get("provider") or "").strip().lower() |
| cfg_base_url = "" |
| if cfg_provider == "anthropic": |
| cfg_base_url = (model_cfg.get("base_url") or "").strip().rstrip("/") |
| base_url = cfg_base_url or "https://api.anthropic.com" |
| return { |
| "provider": "anthropic", |
| "api_mode": "anthropic_messages", |
| "base_url": base_url, |
| "api_key": token, |
| "source": "env", |
| "requested_provider": requested_provider, |
| } |
|
|
| |
| pconfig = PROVIDER_REGISTRY.get(provider) |
| if pconfig and pconfig.auth_type == "api_key": |
| creds = resolve_api_key_provider_credentials(provider) |
| model_cfg = _get_model_config() |
| base_url = creds.get("base_url", "").rstrip("/") |
| api_mode = "chat_completions" |
| if provider == "copilot": |
| api_mode = _copilot_runtime_api_mode(model_cfg, creds.get("api_key", "")) |
| else: |
| |
| configured_mode = _parse_api_mode(model_cfg.get("api_mode")) |
| if configured_mode: |
| api_mode = configured_mode |
| |
| |
| elif base_url.rstrip("/").endswith("/anthropic"): |
| api_mode = "anthropic_messages" |
| |
| |
| elif provider in ("minimax", "minimax-cn"): |
| api_mode = "anthropic_messages" |
| if base_url.rstrip("/").endswith("/v1"): |
| base_url = base_url.rstrip("/")[:-3] + "/anthropic" |
| return { |
| "provider": provider, |
| "api_mode": api_mode, |
| "base_url": base_url, |
| "api_key": creds.get("api_key", ""), |
| "source": creds.get("source", "env"), |
| "requested_provider": requested_provider, |
| } |
|
|
| runtime = _resolve_openrouter_runtime( |
| requested_provider=requested_provider, |
| explicit_api_key=explicit_api_key, |
| explicit_base_url=explicit_base_url, |
| ) |
| runtime["requested_provider"] = requested_provider |
| return runtime |
|
|
|
|
| def format_runtime_provider_error(error: Exception) -> str: |
| if isinstance(error, AuthError): |
| return format_auth_error(error) |
| return str(error) |
|
|