# ============================================================================= # app/providers.py # 09.03.2026 # LLM + Search Provider Registry + Fallback Chain # Universal MCP Hub (Sandboxed) - based on PyFundaments Architecture # Copyright 2026 - Volkan Kücükbudak # Apache License V. 2 + ESOL 1.1 # Repo: https://github.com/VolkanSah/Universal-MCP-Hub-sandboxed # ============================================================================= # ARCHITECTURE NOTE: # This file lives exclusively in app/ and is ONLY started by app/app.py. # NO direct access to fundaments/*, .env, or Guardian (main.py). # All config comes from app/.pyfun via app/config.py. # # PROVIDER PRINCIPLE: # No key = no provider = no tool = no crash. # Server always starts, just with fewer providers. # Adding a new provider = update .pyfun + add class here. Never touch mcp.py! # # FALLBACK CHAIN: # Defined in .pyfun per provider via fallback_to field. # anthropic → fails → gemini → fails → openrouter → fails → RuntimeError # Visited set prevents infinite loops. # # SECURITY NOTE: # API keys are NEVER logged or included in exception messages. # All errors are sanitized before propagation — only HTTP status codes # and safe_url (query params stripped) are ever exposed in logs. # # HOW TO ADD A NEW LLM PROVIDER — 3 steps, nothing else to touch: # 1. Add class below (copy a dummy, implement complete()) # 2. Register name → class in _PROVIDER_CLASSES dict # 3. Add [LLM_PROVIDER.yourprovider] block in app/.pyfun # → env_key, base_url, default_model, fallback_to # # DEPENDENCY CHAIN (app/* only, no fundaments!): # config.py → parses app/.pyfun — single source of truth # providers.py → LLM + Search registry + fallback chain # tools.py → calls providers.llm_complete() / providers.search() # mcp.py → calls providers.list_active_llm() / list_active_search() # ============================================================================= import os import logging import httpx from . import config logging.getLogger("httpx").setLevel(logging.WARNING) logger = logging.getLogger("providers") # ============================================================================= # SECTION 1 — Base Provider # Shared HTTP logic — implemented ONCE, reused by all providers. # ============================================================================= class BaseProvider: """ Base class for all LLM providers. Subclasses only implement complete() — HTTP logic lives here. """ def __init__(self, name: str, cfg: dict): self.name = name self.key = os.getenv(cfg.get("env_key", "")) self.base_url = cfg.get("base_url", "") self.fallback = cfg.get("fallback_to", "") self.timeout = int(config.get_limits().get("REQUEST_TIMEOUT_SEC", "60")) self.model = cfg.get("default_model", "") # Safe key hint for debug logs — never log the full key self._key_hint = ( f"{self.key[:4]}...{self.key[-4:]}" if self.key and len(self.key) > 8 else "***" ) async def complete(self, prompt: str, model: str, max_tokens: int) -> str: """Override in each provider subclass.""" raise NotImplementedError async def _post(self, url: str, headers: dict, payload: dict) -> dict: """ Shared HTTP POST — used by all providers. Raises RuntimeError with sanitized message on non-2xx responses. API keys are never included in raised exceptions or log output. """ safe_url = url.split("?")[0] # strip query params (may contain API keys) logger.debug(f"POST → {safe_url}") async with httpx.AsyncClient() as client: r = await client.post( url, headers=headers, json=payload, timeout=self.timeout, ) try: r.raise_for_status() except httpx.HTTPStatusError as e: # Sanitize: only status code + safe_url, never headers or body raise RuntimeError( f"HTTP {e.response.status_code} from {safe_url}" ) from None return r.json() # ============================================================================= # SECTION 2 — LLM Provider Implementations # Only the API-specific parsing logic differs per provider. # ============================================================================= class AnthropicProvider(BaseProvider): """Anthropic Claude API — Messages endpoint.""" async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str: cfg = config.get_active_llm_providers().get("anthropic", {}) data = await self._post( f"{self.base_url}/messages", headers={ "x-api-key": self.key, "anthropic-version": cfg.get("api_version_header", "2023-06-01"), "content-type": "application/json", }, payload={ "model": model or self.model, "max_tokens": max_tokens, "messages": [{"role": "user", "content": prompt}], }, ) return data["content"][0]["text"] class GeminiProvider(BaseProvider): """Google Gemini API — generateContent endpoint.""" async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str: m = model or self.model safe_url = f"{self.base_url}/models/{m}:generateContent" async with httpx.AsyncClient() as client: r = await client.post( safe_url, params={"key": self.key}, # key in query param, never in logs json={ "contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": max_tokens}, }, timeout=self.timeout, ) try: r.raise_for_status() except httpx.HTTPStatusError as e: raise RuntimeError( f"HTTP {e.response.status_code} from {safe_url}" ) from None return r.json()["candidates"][0]["content"]["parts"][0]["text"] class OpenRouterProvider(BaseProvider): """OpenRouter API — OpenAI-compatible chat completions endpoint. Required headers: HTTP-Referer + X-Title (required by OpenRouter for free models and rate limit attribution). """ async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str: data = await self._post( f"{self.base_url}/chat/completions", headers={ "Authorization": f"Bearer {self.key}", "HTTP-Referer": os.getenv("APP_URL", "https://huggingface.co"), "X-Title": os.getenv("HUB_NAME", "Universal MCP Hub"), # required! "content-type": "application/json", }, payload={ "model": model or self.model, "max_tokens": max_tokens, "messages": [{"role": "user", "content": prompt}], }, ) return data["choices"][0]["message"]["content"] class HuggingFaceProvider(BaseProvider): """HuggingFace Inference API — OpenAI-compatible serverless endpoint. base_url in .pyfun: https://api-inference.huggingface.co/v1 Model goes in payload, not in URL. Free tier: max ~8B models. PRO required for 70B+. """ async def complete(self, prompt: str, model: str = None, max_tokens: int = 512) -> str: m = model or self.model data = await self._post( f"{self.base_url}/chat/completions", headers={ "Authorization": f"Bearer {self.key}", "content-type": "application/json", }, payload={ "model": m, "max_tokens": max_tokens, "messages": [{"role": "user", "content": prompt}], }, ) return data["choices"][0]["message"]["content"] # ============================================================================= # DUMMY PROVIDERS — copy, uncomment, adapt # Steps: (1) uncomment class (2) add to _PROVIDER_CLASSES (3) add to .pyfun # ============================================================================= # --- OpenAI ------------------------------------------------------------------- # .pyfun block to add: # # [LLM_PROVIDER.openai] # active = "true" # base_url = "https://api.openai.com/v1" # env_key = "OPENAI_API_KEY" # default_model = "gpt-4o-mini" # models = "gpt-4o, gpt-4o-mini, gpt-3.5-turbo" # fallback_to = "" # [LLM_PROVIDER.openai_END] # # class OpenAIProvider(BaseProvider): # """OpenAI API — OpenAI-compatible chat completions endpoint.""" # # async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str: # data = await self._post( # f"{self.base_url}/chat/completions", # headers={ # "Authorization": f"Bearer {self.key}", # "content-type": "application/json", # }, # payload={ # "model": model or self.model, # "max_tokens": max_tokens, # "messages": [{"role": "user", "content": prompt}], # }, # ) # return data["choices"][0]["message"]["content"] # --- Mistral ------------------------------------------------------------------ # .pyfun block to add: # # [LLM_PROVIDER.mistral] # active = "true" # base_url = "https://api.mistral.ai/v1" # env_key = "MISTRAL_API_KEY" # default_model = "mistral-large-latest" # models = "mistral-large-latest, mistral-small-latest, codestral-latest" # fallback_to = "" # [LLM_PROVIDER.mistral_END] # # class MistralProvider(BaseProvider): # """Mistral AI API — OpenAI-compatible chat completions endpoint.""" # # async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str: # data = await self._post( # f"{self.base_url}/chat/completions", # headers={ # "Authorization": f"Bearer {self.key}", # "content-type": "application/json", # }, # payload={ # "model": model or self.model, # "max_tokens": max_tokens, # "messages": [{"role": "user", "content": prompt}], # }, # ) # return data["choices"][0]["message"]["content"] # --- xAI (Grok) --------------------------------------------------------------- # .pyfun block to add: # # [LLM_PROVIDER.xai] # active = "true" # base_url = "https://api.x.ai/v1" # env_key = "XAI_API_KEY" # default_model = "grok-3-mini" # models = "grok-3, grok-3-mini, grok-3-fast" # fallback_to = "" # [LLM_PROVIDER.xai_END] # # class XAIProvider(BaseProvider): # """xAI Grok API — OpenAI-compatible chat completions endpoint.""" # # async def complete(self, prompt: str, model: str = None, max_tokens: int = 1024) -> str: # data = await self._post( # f"{self.base_url}/chat/completions", # headers={ # "Authorization": f"Bearer {self.key}", # "content-type": "application/json", # }, # payload={ # "model": model or self.model, # "max_tokens": max_tokens, # "messages": [{"role": "user", "content": prompt}], # }, # ) # return data["choices"][0]["message"]["content"] # ============================================================================= # SECTION 3 — Provider Registry # Built from .pyfun [LLM_PROVIDERS] at initialize(). # Maps provider names → classes. # To activate a dummy: uncomment class above + add entry here. # ============================================================================= _PROVIDER_CLASSES = { "anthropic": AnthropicProvider, "gemini": GeminiProvider, "openrouter": OpenRouterProvider, "huggingface": HuggingFaceProvider, # "openai": OpenAIProvider, # ← uncomment to activate # "mistral": MistralProvider, # ← uncomment to activate # "xai": XAIProvider, # ← uncomment to activate } _registry: dict = {} def initialize() -> None: """ Build provider registry from .pyfun [LLM_PROVIDERS]. Called once by mcp.py during startup sequence. Skips providers with missing ENV keys — no crash, just fewer tools. """ global _registry active = config.get_active_llm_providers() for name, cfg in active.items(): env_key = cfg.get("env_key", "") if not env_key or not os.getenv(env_key): logger.info(f"Provider '{name}' skipped — ENV key not set.") continue cls = _PROVIDER_CLASSES.get(name) if not cls: logger.info(f"Provider '{name}' has no handler yet — skipped.") continue _registry[name] = cls(name, cfg) logger.info(f"Provider registered: {name}") # ============================================================================= # SECTION 4 — LLM Execution + Fallback Chain # ============================================================================= async def llm_complete( prompt: str, provider_name: str = None, model: str = None, max_tokens: int = 1024, ) -> str: """ Send prompt to LLM provider with automatic fallback chain. Fallback order is defined in .pyfun via fallback_to field. Raises RuntimeError if all providers in the chain fail. Args: prompt: Input text to send to the model. provider_name: Provider name override. Defaults to default_provider from .pyfun [TOOL.llm_complete]. model: Model name override. Defaults to provider's default_model. max_tokens: Max tokens in response. Default: 1024. Returns: Model response as plain text string. """ if not provider_name: tools_cfg = config.get_active_tools() provider_name = tools_cfg.get("llm_complete", {}).get("default_provider", "anthropic") visited = set() current = provider_name while current and current not in visited: visited.add(current) provider = _registry.get(current) if not provider: logger.warning(f"Provider '{current}' not in registry — trying fallback.") else: try: result = await provider.complete(prompt, model, max_tokens) logger.info(f"Response from provider: '{current}'") return f"[{current}] {result}" except Exception as e: # Log only exception type + sanitized message — never raw {e} # which may contain headers, keys, or response bodies logger.warning( f"Provider '{current}' failed: {type(e).__name__}: {e} — trying fallback." ) cfg = config.get_active_llm_providers().get(current, {}) current = cfg.get("fallback_to", "") raise RuntimeError("All providers failed — no fallback available.") # Alias — used internally by tools.py complete = llm_complete # ============================================================================= # SECTION 5 — Search Execution # Search providers not yet implemented — returns placeholder. # Add BraveProvider, TavilyProvider here when ready. # ============================================================================= async def search( query: str, provider_name: str = None, max_results: int = 5, ) -> str: """ Search the web via configured search provider. Search providers not yet implemented — placeholder until BraveProvider ready. Args: query: Search query string. provider_name: Provider name override (e.g. 'brave', 'tavily'). max_results: Maximum number of results. Default: 5. Returns: Formatted search results as plain text string. """ # TODO: implement BraveProvider, TavilyProvider # Same pattern as LLM providers — add class + register in _SEARCH_REGISTRY logger.info(f"web_search called — query: '{query}' — search providers not yet active.") return f"Search not yet implemented. Query was: {query}" # ============================================================================= # SECTION 6 — Registry Helpers # Used by mcp.py for tool registration decisions. # ============================================================================= def list_active_llm() -> list: """Returns list of active LLM provider names.""" return list(_registry.keys()) def list_active_search() -> list: """ Returns list of active search provider names. Empty until search providers are implemented. """ # TODO: return list(_search_registry.keys()) when search providers are ready return [] def get(name: str) -> BaseProvider: """Get a specific provider instance by name.""" return _registry.get(name) # ============================================================================= # Direct execution guard # ============================================================================= if __name__ == "__main__": print("WARNING: Run via main.py → app.py, not directly.")