| import httpx |
| import asyncio |
| import json |
| from typing import List, Dict, Optional, Any |
|
|
| |
| import config as app_config |
|
|
| _model_cache: Optional[Dict[str, List[str]]] = None |
| _cache_lock = asyncio.Lock() |
|
|
| async def fetch_and_parse_models_config() -> Optional[Dict[str, List[str]]]: |
| """ |
| Fetches the model configuration JSON from the URL specified in app_config. |
| Parses it and returns a dictionary with 'vertex_models' and 'vertex_express_models'. |
| Returns None if fetching or parsing fails. |
| """ |
| if not app_config.MODELS_CONFIG_URL: |
| print("ERROR: MODELS_CONFIG_URL is not set in the environment/config.") |
| return None |
|
|
| print(f"Fetching model configuration from: {app_config.MODELS_CONFIG_URL}") |
| try: |
| async with httpx.AsyncClient() as client: |
| response = await client.get(app_config.MODELS_CONFIG_URL) |
| response.raise_for_status() |
| data = response.json() |
| |
| |
| if isinstance(data, dict) and \ |
| "vertex_models" in data and isinstance(data["vertex_models"], list) and \ |
| "vertex_express_models" in data and isinstance(data["vertex_express_models"], list): |
| print("Successfully fetched and parsed model configuration.") |
| |
| |
| return { |
| "vertex_models": data["vertex_models"], |
| "vertex_express_models": data["vertex_express_models"] |
| } |
| else: |
| print(f"ERROR: Fetched model configuration has an invalid structure: {data}") |
| return None |
| except httpx.RequestError as e: |
| print(f"ERROR: HTTP request failed while fetching model configuration: {e}") |
| return None |
| except json.JSONDecodeError as e: |
| print(f"ERROR: Failed to decode JSON from model configuration: {e}") |
| return None |
| except Exception as e: |
| print(f"ERROR: An unexpected error occurred while fetching/parsing model configuration: {e}") |
| return None |
|
|
| async def get_models_config() -> Dict[str, List[str]]: |
| """ |
| Returns the cached model configuration. |
| If not cached, fetches and caches it. |
| Returns a default empty structure if fetching fails. |
| """ |
| global _model_cache |
| async with _cache_lock: |
| if _model_cache is None: |
| print("Model cache is empty. Fetching configuration...") |
| _model_cache = await fetch_and_parse_models_config() |
| if _model_cache is None: |
| print("WARNING: Using default empty model configuration due to fetch/parse failure.") |
| _model_cache = {"vertex_models": [], "vertex_express_models": []} |
| return _model_cache |
|
|
| async def get_vertex_models() -> List[str]: |
| config = await get_models_config() |
| return config.get("vertex_models", []) |
|
|
| async def get_vertex_express_models() -> List[str]: |
| config = await get_models_config() |
| return config.get("vertex_express_models", []) |
|
|
| async def refresh_models_config_cache() -> bool: |
| """ |
| Forces a refresh of the model configuration cache. |
| Returns True if successful, False otherwise. |
| """ |
| global _model_cache |
| print("Attempting to refresh model configuration cache...") |
| async with _cache_lock: |
| new_config = await fetch_and_parse_models_config() |
| if new_config is not None: |
| _model_cache = new_config |
| print("Model configuration cache refreshed successfully.") |
| return True |
| else: |
| print("ERROR: Failed to refresh model configuration cache.") |
| |
| |
| return False |