| from datetime import datetime, timezone |
| from typing import Any, Dict, Optional |
|
|
| from app.config.config import settings |
| from app.log.logger import get_model_logger |
| from app.service.client.api_client import GeminiApiClient |
|
|
| logger = get_model_logger() |
|
|
|
|
| class ModelService: |
| async def get_gemini_models(self, api_key: str) -> Optional[Dict[str, Any]]: |
| api_client = GeminiApiClient(base_url=settings.BASE_URL) |
| gemini_models = await api_client.get_models(api_key) |
|
|
| if gemini_models is None: |
| logger.error("从 API 客户端获取模型列表失败。") |
| return None |
|
|
| try: |
| filtered_models_list = [] |
| for model in gemini_models.get("models", []): |
| model_id = model["name"].split("/")[-1] |
| if model_id not in settings.FILTERED_MODELS: |
| filtered_models_list.append(model) |
| else: |
| logger.debug(f"Filtered out model: {model_id}") |
|
|
| gemini_models["models"] = filtered_models_list |
| return gemini_models |
| except Exception as e: |
| logger.error(f"处理模型列表时出错: {e}") |
| return None |
|
|
| async def get_gemini_openai_models(self, api_key: str) -> Optional[Dict[str, Any]]: |
| """获取 Gemini 模型并转换为 OpenAI 格式""" |
| gemini_models = await self.get_gemini_models(api_key) |
| if gemini_models is None: |
| return None |
| |
| return await self.convert_to_openai_models_format(gemini_models) |
|
|
| async def convert_to_openai_models_format( |
| self, gemini_models: Dict[str, Any] |
| ) -> Dict[str, Any]: |
| openai_format = {"object": "list", "data": [], "success": True} |
|
|
| for model in gemini_models.get("models", []): |
| model_id = model["name"].split("/")[-1] |
| openai_model = { |
| "id": model_id, |
| "object": "model", |
| "created": int(datetime.now(timezone.utc).timestamp()), |
| "owned_by": "google", |
| "permission": [], |
| "root": model["name"], |
| "parent": None, |
| } |
| openai_format["data"].append(openai_model) |
|
|
| if model_id in settings.SEARCH_MODELS: |
| search_model = openai_model.copy() |
| search_model["id"] = f"{model_id}-search" |
| openai_format["data"].append(search_model) |
| if model_id in settings.IMAGE_MODELS: |
| image_model = openai_model.copy() |
| image_model["id"] = f"{model_id}-image" |
| openai_format["data"].append(image_model) |
| if model_id in settings.THINKING_MODELS: |
| non_thinking_model = openai_model.copy() |
| non_thinking_model["id"] = f"{model_id}-non-thinking" |
| openai_format["data"].append(non_thinking_model) |
|
|
| if settings.CREATE_IMAGE_MODEL: |
| image_model = openai_model.copy() |
| image_model["id"] = f"{settings.CREATE_IMAGE_MODEL}-chat" |
| openai_format["data"].append(image_model) |
| return openai_format |
|
|
| async def check_model_support(self, model: str) -> bool: |
| if not model or not isinstance(model, str): |
| return False |
|
|
| model = model.strip() |
| if model.endswith("-search"): |
| model = model[:-7] |
| return model in settings.SEARCH_MODELS |
| if model.endswith("-image"): |
| model = model[:-6] |
| return model in settings.IMAGE_MODELS |
|
|
| return model not in settings.FILTERED_MODELS |
|
|