import os import time import requests from typing import List, Dict # ================== 环境变量 ================== LITELLM_BASE_URL = os.environ.get("LITELLM_BASE_URL", "http://localhost:7860") LITELLM_MASTER_KEY = os.environ["LITELLM_MASTER_KEY"] CLIPROXY_BASE_URL = os.environ["CLIPROXY_BASE_URL"] CLIPROXY_API_KEY = os.environ.get("CLIPROXY_API_KEY", "") CREDENTIAL_NAME = os.environ.get("LITELLM_CREDENTIAL_NAME", "cliproxy") FORCE_CREDENTIAL = os.environ.get("FORCE_USE_CREDENTIAL", "false").lower() == "true" PRIMARY_MODEL_GROUP = os.environ.get("FALLBACK_PRIMARY_MODEL", "cliproxy/*") SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL_SECONDS", 3600)) SYNC_TAG = "cliproxy-synced" HEADERS = {"Authorization": f"Bearer {LITELLM_MASTER_KEY}"} PROVIDER_PRIORITY = { "cliproxy": 0, "moonshotai": 1, "kimi": 1, "anthropic": 2, "google": 3, } # ================== 基础工具 ================== def wait_for_litellm(): while True: try: r = requests.get(f"{LITELLM_BASE_URL}/health", headers=HEADERS, timeout=5) if r.status_code == 200: print("✅ LiteLLM 已就绪") return except Exception: pass time.sleep(5) def infer_provider(owner: str, model_id: str) -> str: owner = owner.lower() model_id = model_id.lower() if owner == "cliproxy": return "openai" if "gemini" in model_id or owner == "google": return "gemini" if "claude" in model_id or owner == "anthropic": return "anthropic" if owner in ("moonshotai", "kimi"): return "openai" return "openai" def is_system_model(name: str) -> bool: n = name.lower() return n.startswith(("container", "hf", "litellm", "internal")) def sort_fallback(models: Dict[str, str]) -> List[str]: def score(item): name, owner = item if "embed" in name.lower(): return (99, name) return (PROVIDER_PRIORITY.get(owner.lower(), 50), name) return [n for n, _ in sorted(models.items(), key=score)] # ================== LiteLLM API ================== def get_existing_models() -> List[dict]: r = requests.get(f"{LITELLM_BASE_URL}/v1/models", headers=HEADERS) r.raise_for_status() return r.json().get("data", []) def get_cliproxy_models() -> List[dict]: headers = {} if CLIPROXY_API_KEY: headers["Authorization"] = f"Bearer {CLIPROXY_API_KEY}" r = requests.get(f"{CLIPROXY_BASE_URL}/models", headers=headers) r.raise_for_status() data = r.json() return data["data"] if isinstance(data, dict) else data def add_model(original_id: str, owner: str, use_credential: bool): provider = infer_provider(owner, original_id) name = f"{owner}/{original_id}" params = {"model": f"{provider}/{original_id}"} if use_credential: params["litellm_credential_name"] = CREDENTIAL_NAME else: params["api_base"] = CLIPROXY_BASE_URL.rstrip("/") + "/v1" params["api_key"] = CLIPROXY_API_KEY info = { "owned_by": owner, "tags": [SYNC_TAG], "model_type": "embedding" if "embed" in original_id.lower() else "chat", } payload = { "model_name": name, "litellm_params": params, "model_info": info, } r = requests.post(f"{LITELLM_BASE_URL}/model/new", json=payload, headers=HEADERS) if r.ok: print(f"➕ 新增模型 {name}") else: print(f"❌ 新增失败 {name}: {r.text}") def update_model(model_id: str, params: dict, info: dict): payload = {"id": model_id, "litellm_params": params, "model_info": info} r = requests.post(f"{LITELLM_BASE_URL}/model/update", json=payload, headers=HEADERS) if r.ok: print(f"🔄 更新模型 {model_id}") else: print(f"❌ 更新失败 {model_id}: {r.text}") def delete_model(model_id: str, name: str): r = requests.post(f"{LITELLM_BASE_URL}/model/delete", json={"id": model_id}, headers=HEADERS) if r.ok: print(f"🗑️ 删除模型 {name}") def update_fallback(models: List[str]): requests.post( f"{LITELLM_BASE_URL}/fallback/delete", json={"model": PRIMARY_MODEL_GROUP, "fallback_type": "general"}, headers=HEADERS, ) if models: requests.post( f"{LITELLM_BASE_URL}/fallback", json={ "model": PRIMARY_MODEL_GROUP, "fallback_models": models[:50], "fallback_type": "general", }, headers=HEADERS, ) # ================== 幂等同步主逻辑 ================== def sync(): print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 同步开始") use_credential = FORCE_CREDENTIAL clip_models = get_cliproxy_models() clip_map = {f"{m['owned_by']}/{m['id']}": m["owned_by"] for m in clip_models} existing = get_existing_models() by_name: Dict[str, List[dict]] = {} for m in existing: if "model_name" in m: by_name.setdefault(m["model_name"], []).append(m) # === 同步 / 更新 / 去重 === for name, owner in clip_map.items(): original_id = name.split("/", 1)[1] provider = infer_provider(owner, original_id) desired_params = {"model": f"{provider}/{original_id}"} if use_credential: desired_params["litellm_credential_name"] = CREDENTIAL_NAME else: desired_params["api_base"] = CLIPROXY_BASE_URL.rstrip("/") + "/v1" desired_params["api_key"] = CLIPROXY_API_KEY desired_info = { "owned_by": owner, "tags": [SYNC_TAG], "model_type": "embedding" if "embed" in original_id.lower() else "chat", } models = by_name.get(name, []) if not models: add_model(original_id, owner, use_credential) continue primary = models[0] current_params = primary.get("litellm_params", {}) has_cred = "litellm_credential_name" in current_params if has_cred != use_credential: update_model(primary["id"], desired_params, desired_info) for dup in models[1:]: delete_model(dup["id"], name) # === 删除失效模型 === for name, models in by_name.items(): if name not in clip_map: for m in models: if SYNC_TAG in m.get("tags", []): delete_model(m["id"], name) # === fallback === sorted_models = sort_fallback(clip_map) fallback_models = [ m for m in sorted_models if "embed" not in m.lower() and not is_system_model(m) ] update_fallback(fallback_models) print("✅ 同步完成\n") # ================== 守护 ================== if __name__ == "__main__": wait_for_litellm() print(f"🚀 同步守护启动,每 {SYNC_INTERVAL}s 执行一次") while True: try: sync() except Exception as e: print("同步异常:", e) time.sleep(SYNC_INTERVAL)