from __future__ import annotations import os from dataclasses import dataclass, field from src import observability as obs from src.models.provider import ModelProvider, model_error def _default_compat_model() -> str: """The served model id this thin client sends by default. Pulls the ``balanced`` tier's model from the catalogue (the single source of truth) so there is no hard-coded model name to drift; falls back to a constant if the catalogue cannot be read. Note this is the *served id* (what a raw OpenAI-compatible call expects), not the ``openai/`` LiteLLM string. """ try: from src.models import modal_catalogue key = modal_catalogue.default_key_for_profile("balanced") if key: entry = modal_catalogue.entry_by_key(key) if entry: return entry["served_model_id"] except Exception: # pragma: no cover - defensive: catalogue unavailable pass return "google/gemma-4-12B" @dataclass class OpenAICompatProvider(ModelProvider): """Thin client for any OpenAI-compatible chat-completions endpoint. The engine's live path routes through the LiteLLM gateway (:class:`~src.models.litellm_provider.LiteLLMProvider`); this client remains for the legacy single-provider :func:`build_from_env` path and as the home of the role→system personas. It targets the small models served on Modal — there is no OpenAI / generic-cloud default. Driven by env so no scenario hard-codes a provider: MODAL_LLM_BASE_URL — endpoint URL ending in /v1 (offline when unset) MODAL_LLM_KEY — endpoint bearer token (a self-served vLLM accepts any; defaults to "EMPTY") MODEL_BALANCED — model id to send (falls back to the catalogue default) """ model: str = field(default_factory=lambda: os.getenv("MODEL_BALANCED") or _default_compat_model()) base_url: str | None = field(default_factory=lambda: os.getenv("MODAL_LLM_BASE_URL") or None) api_key: str | None = field(default_factory=lambda: os.getenv("MODAL_LLM_KEY") or None) max_tokens: int = 256 temperature: float = 0.9 _client: object = field(default=None, init=False, repr=False) _last_usage: dict = field(default_factory=dict, init=False, repr=False) def _get_client(self): if self._client is None: try: import openai except ImportError as exc: raise ImportError("openai package is required for OpenAICompatProvider. Run: uv add openai") from exc kwargs: dict = {} if self.base_url: kwargs["base_url"] = self.base_url # A self-served vLLM endpoint accepts any token; the SDK still requires # one, so default to the conventional placeholder rather than erroring. kwargs["api_key"] = self.api_key or "EMPTY" self._client = openai.OpenAI(**kwargs) return self._client def complete(self, role: str, prompt: str) -> str: from src.models.provider import estimate_tokens client = self._get_client() system = self._system_for_role(role) span_attrs = { "gen_ai.system": "openai-compatible", "gen_ai.request.model": self.model, "llm.api_base": self.base_url or "", "mal.role": role, } with obs.span("llm.call", **span_attrs): try: resp = client.chat.completions.create( model=self.model, messages=[ {"role": "system", "content": system}, {"role": "user", "content": prompt}, ], max_tokens=self.max_tokens, temperature=self.temperature, ) text = resp.choices[0].message.content.strip() usage = getattr(resp, "usage", None) if usage is not None: self._last_usage = { "prompt_tokens": getattr(usage, "prompt_tokens", 0) or 0, "completion_tokens": getattr(usage, "completion_tokens", 0) or 0, "total_tokens": getattr(usage, "total_tokens", 0) or 0, } else: p, c = estimate_tokens(prompt), estimate_tokens(text) self._last_usage = {"prompt_tokens": p, "completion_tokens": c, "total_tokens": p + c} obs.add_span_attrs( **{ "gen_ai.usage.input_tokens": int(self._last_usage["prompt_tokens"]), "gen_ai.usage.output_tokens": int(self._last_usage["completion_tokens"]), "llm.prompt": prompt, "llm.completion": text, } ) obs.record_llm_call( self.model, prompt_tokens=int(self._last_usage["prompt_tokens"]), completion_tokens=int(self._last_usage["completion_tokens"]), ) obs.log( "llm.call", role=role, model=self.model, structured=False, prompt_tokens=int(self._last_usage["prompt_tokens"]), completion_tokens=int(self._last_usage["completion_tokens"]), ) obs.log("llm.exchange", level="debug", role=role, model=self.model, prompt=prompt, completion=text) return text except Exception as exc: self._last_usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0} obs.log("llm.error", level="warning", model=self.model, role=role, error=str(exc)) return model_error(exc) @staticmethod def _system_for_role(role: str) -> str: personas = { "seedkeeper": ( "You are the Seedkeeper of Thousand Token Wood — an ancient, gentle observer " "who notices what grows, what fades, and what strange new thing just sprouted. " "Describe the world in one vivid, specific sentence. Be concrete and surprising. " "Do not explain. Do not moralize. Just observe." ), "mischief-critic": ( "You are the Wood's Reckoning — the keeper who, at the tale's end, records what " "has become REAL in the wood. Do not critique the storytelling or mention 'the " "scene'; never break the spell. In one or two sentences, set down as settled " "in-world fact the new law, creature, or transformation the cast brought into " "being. Be specific. Be final." ), "pocket-actor": ( "You are a Pocket Actor — a tiny character living inside the scene who wants " "something impossible and speaks with great urgency about it. " "Speak in first person. One or two sentences. Be specific and a little absurd." ), "echo": ( "You are the Echo — you take the most recent thing said or dropped into the wood " "and return it transformed by the forest's logic. Speak one vivid sentence aloud. " "Make it weirder and more alive; never simply repeat what you heard." ), "clue-gatherer": ( "You are a Clue Gatherer in a mystery scenario. " "Extract one specific, concrete clue from the current scene. " "State it plainly. Do not speculate." ), "hypothesis-former": ( "You are a Hypothesis Former. Based on the clues so far, propose one testable " "explanation in a single sentence. Be specific. Start with 'Hypothesis:'." ), "devils-advocate": ( "You are the Devil's Advocate. Challenge the current hypothesis with one " "specific counter-argument or overlooked fact. Be brief and sharp." ), "scene-whisperer": ( "You are a scene whisperer for a magical forest world. " "Describe a new atmospheric detail in one vivid sentence. Be evocative." ), } return personas.get(role, f"You are a {role}. Respond in one or two sentences.") def has_live_credentials() -> bool: """True when *any* inference backend is configured (else the offline stub runs). Single source of truth for the online/offline decision, shared by :func:`build_from_env` and the :class:`~src.models.router.ModelRouter` so they never disagree. Two backends can satisfy it (ADR-0015 / ADR-0024): * **Modal** — ``MODAL_WORKSPACE`` (the engine templates each profile's endpoint URL from it) or ``MODAL_LLM_BASE_URL`` (a single explicit OpenAI-compatible endpoint), the small models you deploy yourself; or * **Hugging Face** — ``HF_TOKEN`` (the serverless Inference Providers router) or ``HF_INFERENCE_BASE_URL`` (a self-hosted TGI / dedicated endpoint). There is no generic cloud key — everything routes to models you host or to HF's inference router. Delegates to :mod:`src.models.inference` so the chip, the router, and this gate agree on what "live" means. """ from src.models import inference return bool(inference.configured_backends()) def build_from_env() -> ModelProvider: """Return the best available single provider based on environment configuration. Kept for backward compatibility with Phase-1 agents that take one provider. Manifest-driven agents use the per-profile :class:`ModelRouter` instead. """ from src.models.provider import DeterministicTinyModel if has_live_credentials(): return OpenAICompatProvider() return DeterministicTinyModel()