"""Keep a LiteLLM proxy's model list and fallback chain in sync with CLIProxy.

The script polls CLIProxy for its model list, registers missing models in
LiteLLM, removes synced models that CLIProxy no longer offers, and rebuilds
the fallback chain of the primary model group. It runs forever, once every
``SYNC_INTERVAL`` seconds.
"""

import os
import time
from typing import Dict, List, Set, Tuple

import requests

# ---------- Environment ----------
LITELLM_BASE_URL = os.environ.get("LITELLM_BASE_URL", "http://localhost:7860")
LITELLM_MASTER_KEY = os.environ["LITELLM_MASTER_KEY"]
# e.g. https://xxx.hf.space (without the /v1 suffix)
CLIPROXY_BASE_URL = os.environ["CLIPROXY_BASE_URL"]
# API key of the CLIProxy instance
CLIPROXY_API_KEY = os.environ["CLIPROXY_API_KEY"]
PRIMARY_MODEL_GROUP = os.environ.get("FALLBACK_PRIMARY_MODEL", "cliproxy/*")
SYNC_INTERVAL = int(os.environ.get("SYNC_INTERVAL_SECONDS", 3600))
MAX_RETRIES = int(os.environ.get("MAX_RETRIES", 5))
RETRY_DELAY = int(os.environ.get("RETRY_DELAY_SECONDS", 5))
# Upper bound for every HTTP call; without it `requests` may block forever.
REQUEST_TIMEOUT = int(os.environ.get("REQUEST_TIMEOUT_SECONDS", 30))

SYNC_TAG = "cliproxy-synced"
MASTER_HEADERS = {"Authorization": f"Bearer {LITELLM_MASTER_KEY}"}


# ---------- Naming ----------
def _qualified_name(original_id: str, owner: str) -> str:
    """Return the LiteLLM model name for a CLIProxy model.

    The name is ``owner/original_id`` unless ``original_id`` already starts
    with ``owner/``, in which case it is used unchanged. Both the lookup in
    ``sync`` and the registration in ``add_model_to_litellm`` use this single
    rule so that the names they compare always agree.
    """
    prefix = f"{owner}/"
    if original_id.startswith(prefix):
        return original_id
    return prefix + original_id


# ---------- Wait for LiteLLM ----------
def wait_for_litellm_ready(timeout: int = 180) -> None:
    """Poll LiteLLM's ``/health`` endpoint until it answers with HTTP 200.

    Args:
        timeout: Maximum number of seconds to keep polling.

    Raises:
        RuntimeError: If no 200 response was received within ``timeout``.
    """
    print("⏳ 等待 LiteLLM Proxy 启动...")
    start = time.time()
    while time.time() - start < timeout:
        try:
            resp = requests.get(
                f"{LITELLM_BASE_URL}/health",
                headers=MASTER_HEADERS,
                timeout=5,
            )
            if resp.status_code == 200:
                print("✅ LiteLLM Proxy 已就绪")
                return
        except requests.RequestException:
            # Connection errors are expected while the proxy is still booting.
            pass
        time.sleep(5)
    raise RuntimeError(f"❌ LiteLLM Proxy 未在 {timeout}s 内就绪")


# ---------- CLIProxy model listing ----------
def get_cliproxy_models() -> List[Dict]:
    """Fetch the model list from CLIProxy.

    Returns:
        The ``data`` array of the ``/models`` response.

    Raises:
        requests.RequestException: On network failure or a non-2xx status.
        KeyError: If the response has no ``data`` field. This is deliberately
            not defaulted to an empty list, because an empty list would make
            ``sync`` delete every synced model.
    """
    headers = {"Authorization": f"Bearer {CLIPROXY_API_KEY}"}
    # CLIProxy's /models endpoint is usually not under /v1; adjust if needed.
    resp = requests.get(
        f"{CLIPROXY_BASE_URL.rstrip('/')}/models",
        headers=headers,
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    return resp.json()["data"]


# ---------- Existing LiteLLM model ids ----------
def get_existing_model_ids() -> Set[str]:
    """Return the ids of all models LiteLLM lists under ``/v1/models``.

    Raises:
        requests.RequestException: On network failure or a non-2xx status.
    """
    resp = requests.get(
        f"{LITELLM_BASE_URL}/v1/models",
        headers=MASTER_HEADERS,
        timeout=REQUEST_TIMEOUT,
    )
    resp.raise_for_status()
    return {m["id"] for m in resp.json().get("data", [])}


# ---------- Synced-model detection (used for cleanup) ----------
def is_synced_model(model_id: str) -> bool:
    """Tell whether ``model_id`` follows the naming rule of synced models.

    A synced id has the form ``<prefix>/<name>`` with a non-empty name and a
    prefix other than ``openai`` (e.g. ``antigravity/xxx``).

    NOTE(review): this is a naming heuristic only; any model registered by
    other means with such an id is also treated as synced by the cleanup step.
    """
    prefix, sep, name = model_id.partition("/")
    if not sep:
        return False
    return prefix != "openai" and bool(name)


# ---------- Model deletion ----------
def delete_model(model_name: str) -> int:
    """Ask LiteLLM to delete ``model_name`` and return the HTTP status code.

    NOTE(review): LiteLLM's documented ``/model/delete`` appears to be a POST
    that takes ``{"id": ...}``; confirm that this DELETE + ``model_name`` form
    is accepted by the deployed version.

    Raises:
        requests.RequestException: On network failure.
    """
    resp = requests.delete(
        f"{LITELLM_BASE_URL}/model/delete",
        json={"model_name": model_name},
        headers=MASTER_HEADERS,
        timeout=REQUEST_TIMEOUT,
    )
    return resp.status_code


# ---------- Model registration ----------
def add_model_to_litellm(original_id: str, owner: str) -> bool:
    """Register a CLIProxy model in LiteLLM.

    The model is routed through LiteLLM's ``openai/`` provider to the CLIProxy
    ``/v1`` endpoint, using the base URL and API key from the environment.

    Args:
        original_id: Model id exactly as reported by CLIProxy.
        owner: Value of the model's ``owned_by`` field.

    Returns:
        True if LiteLLM answered 200 or 201, False otherwise.

    Raises:
        requests.RequestException: On network failure.
    """
    new_name = _qualified_name(original_id, owner)
    payload = {
        "model_name": new_name,
        "litellm_params": {
            "model": f"openai/{original_id}",
            "api_base": CLIPROXY_BASE_URL.rstrip("/") + "/v1",
            "api_key": CLIPROXY_API_KEY,
        },
        "model_info": {
            "owned_by": owner,
            "tags": [SYNC_TAG],
            "access_groups": [f"{owner}-models"],
        },
    }
    resp = requests.post(
        f"{LITELLM_BASE_URL}/model/new",
        json=payload,
        headers=MASTER_HEADERS,
        timeout=REQUEST_TIMEOUT,
    )
    if resp.status_code in (200, 201):
        return True
    print(f" ❌ 添加模型 {new_name} 失败: {resp.status_code} {resp.text}")
    return False


# ---------- Fallback chain ----------
def update_fallback_chain(model_names: List[str], max_fallbacks: int = 50) -> None:
    """Replace the general fallback chain of ``PRIMARY_MODEL_GROUP``.

    The existing rule is deleted first. If ``model_names`` is empty the rule
    stays cleared; otherwise the first ``max_fallbacks`` names are posted as
    the new chain.

    Args:
        model_names: Candidate fallback models, in priority order.
        max_fallbacks: Maximum number of models to put into the chain.

    Raises:
        requests.RequestException: On network failure.
    """
    # Clear the current rule first; the response is intentionally ignored.
    requests.delete(
        f"{LITELLM_BASE_URL}/fallback/{PRIMARY_MODEL_GROUP}?fallback_type=general",
        headers=MASTER_HEADERS,
        timeout=REQUEST_TIMEOUT,
    )
    if not model_names:
        print("⚠️ 没有可用的 Fallback 模型,已清空规则。")
        return

    fallback_models = model_names[:max_fallbacks]
    payload = {
        "model": PRIMARY_MODEL_GROUP,
        "fallback_models": fallback_models,
        "fallback_type": "general",
    }
    resp = requests.post(
        f"{LITELLM_BASE_URL}/fallback",
        json=payload,
        headers=MASTER_HEADERS,
        timeout=REQUEST_TIMEOUT,
    )
    if resp.ok:
        print(f"✅ Fallback 已更新,包含 {len(fallback_models)} 个模型。")
    else:
        print(f"❌ Fallback 更新失败: {resp.status_code} {resp.text}")


# ---------- Main sync pass ----------
def sync() -> None:
    """Run one full synchronisation pass.

    Steps: list CLIProxy models, list LiteLLM models, add the missing ones,
    delete synced-looking models that CLIProxy no longer offers, then rebuild
    the fallback chain from the models that are actually registered.

    Raises:
        requests.RequestException: On network failure or a non-2xx listing.
        KeyError: If a listing response lacks an expected field.
    """
    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 开始同步…")

    # 1. Current CLIProxy models: LiteLLM name -> (original id, owner).
    #    The original id must be kept; it is what LiteLLM forwards upstream.
    cliproxy_map: Dict[str, Tuple[str, str]] = {}
    for m in get_cliproxy_models():
        owner = m.get("owned_by", "unknown")
        original_id = m["id"]
        cliproxy_map[_qualified_name(original_id, owner)] = (original_id, owner)

    # 2. Ids currently known to LiteLLM.
    existing_ids = get_existing_model_ids()

    # 3. Add models that are missing, remembering which ones are registered.
    added_count = 0
    registered: List[str] = []
    for new_name, (original_id, owner) in cliproxy_map.items():
        if new_name in existing_ids:
            registered.append(new_name)
            continue
        if add_model_to_litellm(original_id=original_id, owner=owner):
            print(f" ➕ 新增模型: {new_name}")
            added_count += 1
            registered.append(new_name)
        else:
            print(f" ❌ 新增 {new_name} 失败")

    # 4. Delete stale models: only ids matching the sync naming rule that
    #    CLIProxy no longer reports.
    deleted_count = 0
    for model_id in existing_ids:
        if not is_synced_model(model_id) or model_id in cliproxy_map:
            continue
        status = delete_model(model_id)
        if status == 200:
            print(f" 🗑️ 删除失效模型: {model_id}")
            deleted_count += 1
        else:
            print(f" ❌ 删除 {model_id} 失败 (status {status})")

    # 5. Rebuild the fallback chain, skipping models whose registration failed
    #    so the chain never points at a model LiteLLM does not know.
    update_fallback_chain(registered)
    print(f"同步完成:新增 {added_count}, 删除 {deleted_count}\n")


def _sync_with_retries() -> bool:
    """Run ``sync`` up to ``MAX_RETRIES`` times, pausing ``RETRY_DELAY`` between tries.

    Returns:
        True if one attempt completed, False if every attempt failed. Failures
        are logged rather than raised so the daemon survives transient outages.
    """
    attempts = max(1, MAX_RETRIES)
    for attempt in range(1, attempts + 1):
        try:
            sync()
        except (requests.RequestException, KeyError, ValueError) as exc:
            print(f"⚠️ 同步失败 ({attempt}/{attempts}): {exc}")
            if attempt < attempts:
                time.sleep(RETRY_DELAY)
        else:
            return True
    print("❌ 本轮同步多次重试后仍失败,等待下一个周期。")
    return False


def main() -> None:
    """Wait for LiteLLM, then synchronise every ``SYNC_INTERVAL`` seconds forever."""
    wait_for_litellm_ready()
    print("开始守护同步任务...")
    while True:
        _sync_with_retries()
        time.sleep(SYNC_INTERVAL)


# ---------- Daemon entry point ----------
if __name__ == "__main__":
    main()