# =================================================================== # main.py (最终修复版:修正函数定义顺序) # =================================================================== import json import os import time import uuid import threading from typing import Any, Dict, List, Optional, TypedDict, Union import requests from fastapi import FastAPI, HTTPException, Depends, Query from fastapi.responses import StreamingResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel, Field # --- 类型定义和全局变量 --- class CodeGeeXToken(TypedDict): token: str is_valid: bool last_used: float error_count: int VALID_CLIENT_KEYS: set = set() CODEGEEX_TOKENS: List[CodeGeeXToken] = [] CODEGEEX_MODELS: List[str] = ["claude-3-7-sonnet", "claude-sonnet-4"] token_rotation_lock = threading.Lock() MAX_ERROR_COUNT = 3 ERROR_COOLDOWN = 300 DEBUG_MODE = os.environ.get("DEBUG_MODE", "false").lower() == "true" # --- Pydantic 模型 --- class ChatMessage(BaseModel): role: str content: Union[str, List[Dict[str, Any]]] reasoning_content: Optional[str] = None class ChatCompletionRequest(BaseModel): model: str messages: List[ChatMessage] stream: bool = True temperature: Optional[float] = None max_tokens: Optional[int] = None top_p: Optional[float] = None class ModelInfo(BaseModel): id: str object: str = "model" created: int owned_by: str class ModelList(BaseModel): object: str = "list" data: List[ModelInfo] class ChatCompletionChoice(BaseModel): message: ChatMessage index: int = 0 finish_reason: str = "stop" class ChatCompletionResponse(BaseModel): id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}") object: str = "chat.completion" created: int = Field(default_factory=lambda: int(time.time())) model: str choices: List[ChatCompletionChoice] usage: Dict[str, int] = Field(default_factory=lambda: {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}) class StreamChoice(BaseModel): delta: Dict[str, Any] = Field(default_factory=dict) index: int = 0 finish_reason: Optional[str] = None class StreamResponse(BaseModel): id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}") object: str = "chat.completion.chunk" created: int = Field(default_factory=lambda: int(time.time())) model: str choices: List[StreamChoice] # --- FastAPI App --- app = FastAPI(title="CodeGeeX OpenAI API Adapter") security = HTTPBearer(auto_error=False) def log_debug(message: str): if DEBUG_MODE: print(f"[DEBUG] {message}") # --- 配置加载函数 (从 Secrets 读取) --- def load_client_api_keys_from_secrets(): global VALID_CLIENT_KEYS try: keys_str = os.environ.get("CLIENT_API_KEYS") if not keys_str: raise ValueError("Secret 'CLIENT_API_KEYS' not found.") keys = json.loads(keys_str) VALID_CLIENT_KEYS = set(keys) if isinstance(keys, list) else set() print(f"Successfully loaded {len(VALID_CLIENT_KEYS)} client API keys.") except Exception as e: print(f"FATAL: Error loading client API keys: {e}") VALID_CLIENT_KEYS = set() def load_codegeex_tokens_from_secrets(): global CODEGEEX_TOKENS CODEGEEX_TOKENS = [] try: tokens_str = os.environ.get("CODEGEEX_TOKENS") if not tokens_str: raise ValueError("Secret 'CODEGEEX_TOKENS' not found.") tokens = json.loads(tokens_str) if not isinstance(tokens, list): raise TypeError("Secret 'CODEGEEX_TOKENS' must be a JSON list.") for token in tokens: if isinstance(token, str) and token: CODEGEEX_TOKENS.append({"token": token, "is_valid": True, "last_used": 0, "error_count": 0}) print(f"Successfully loaded {len(CODEGEEX_TOKENS)} CodeGeeX tokens.") except Exception as e: print(f"FATAL: Error loading CodeGeeX tokens: {e}") # --- 核心工具函数和认证 (确保在使用前定义) --- def get_best_codegeex_token() -> Optional[CodeGeeXToken]: with token_rotation_lock: now = time.time() valid_tokens = [t for t in CODEGEEX_TOKENS if t["is_valid"] and (t["error_count"] < MAX_ERROR_COUNT or now - t["last_used"] > ERROR_COOLDOWN)] if not valid_tokens: return None for token in valid_tokens: if token["error_count"] >= MAX_ERROR_COUNT and now - token["last_used"] > ERROR_COOLDOWN: token["error_count"] = 0 valid_tokens.sort(key=lambda x: (x["last_used"], x["error_count"])) token = valid_tokens[0] token["last_used"] = now return token def _convert_messages_to_codegeex_format(messages: List[ChatMessage]): if not messages: return "", [] last_user_msg = next((msg for msg in reversed(messages) if msg.role == "user"), None) if not last_user_msg: raise HTTPException(status_code=400, detail="No user message found.") prompt = last_user_msg.content if isinstance(last_user_msg.content, str) else "" history, user_content, assistant_content = [], "", "" for msg in messages: if msg == last_user_msg: break if msg.role == "user": if user_content and assistant_content: history.append({"query": user_content, "answer": assistant_content, "id": f"{uuid.uuid4()}"}); user_content, assistant_content = "", "" user_content = msg.content if isinstance(msg.content, str) else "" elif msg.role == "assistant": assistant_content = msg.content if isinstance(msg.content, str) else "" if user_content: history.append({"query": user_content, "answer": assistant_content, "id": f"{uuid.uuid4()}"}); user_content, assistant_content = "", "" if user_content and not assistant_content: prompt = user_content + "\n" + prompt return prompt, history async def authenticate_client(auth: Optional[HTTPAuthorizationCredentials] = Depends(security)): if not VALID_CLIENT_KEYS: raise HTTPException(status_code=503, detail="Service unavailable: Client API keys not configured.") if not auth or not auth.credentials: raise HTTPException(status_code=401, detail="API key required.", headers={"WWW-Authenticate": "Bearer"}) if auth.credentials not in VALID_CLIENT_KEYS: raise HTTPException(status_code=403, detail="Invalid client API key.") # --- FastAPI 事件和路由 --- @app.on_event("startup") async def startup(): print("Starting CodeGeeX OpenAI API Adapter server...") load_client_api_keys_from_secrets() load_codegeex_tokens_from_secrets() print("Server initialization completed.") @app.get("/") def health_check(): return {"status": "ok", "message": "CodeGeeX API Adapter is running."} def get_models_list_response() -> ModelList: return ModelList(data=[ModelInfo(id=model, created=int(time.time()), owned_by="anthropic") for model in CODEGEEX_MODELS]) @app.get("/v1/models", response_model=ModelList) async def list_v1_models(_: None = Depends(authenticate_client)): return get_models_list_response() @app.get("/models", response_model=ModelList) async def list_models_no_auth(): return get_models_list_response() def _codegeex_stream_generator(response, model: str): stream_id = f"chatcmpl-{uuid.uuid4().hex}" created_time = int(time.time()) yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={'role': 'assistant'})]).json()}\n\n" buffer = "" try: for chunk in response.iter_content(chunk_size=1024): if not chunk: continue buffer += chunk.decode("utf-8", errors='ignore') while "\n\n" in buffer: event_data, buffer = buffer.split("\n\n", 1) event_data = event_data.strip() if not event_data: continue event_type, data_json = None, None for line in event_data.split("\n"): if line.startswith("event:"): event_type = line[6:].strip() elif line.startswith("data:"): try: data_json = json.loads(line[5:].strip()) except: continue if not event_type or not data_json: continue if event_type == "add": delta = data_json.get("text", "") if delta: yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={'content': delta})]).json()}\n\n" elif event_type == "finish": yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={}, finish_reason='stop')]).json()}\n\n" yield "data: [DONE]\n\n" return except Exception as e: log_debug(f"Stream processing error: {e}") yield f"data: {json.dumps({'error': str(e)})}\n\n" yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={}, finish_reason='stop')]).json()}\n\n" yield "data: [DONE]\n\n" def _build_codegeex_non_stream_response(response, model: str) -> ChatCompletionResponse: full_content = "" buffer = "" for chunk in response.iter_content(chunk_size=1024): if not chunk: continue buffer += chunk.decode("utf-8", errors='ignore') while "\n\n" in buffer: event_data, buffer = buffer.split("\n\n", 1) event_data = event_data.strip() if not event_data: continue event_type, data_json = None, None for line in event_data.split("\n"): if line.startswith("event:"): event_type = line[6:].strip() elif line.startswith("data:"): try: data_json = json.loads(line[5:].strip()) except: continue if not event_type or not data_json: continue if event_type == "add": full_content += data_json.get("text", "") elif event_type == "finish": finish_text = data_json.get("text", "") if finish_text: full_content = finish_text return ChatCompletionResponse(model=model, choices=[ChatCompletionChoice(message=ChatMessage(role="assistant", content=full_content))]) return ChatCompletionResponse(model=model, choices=[ChatCompletionChoice(message=ChatMessage(role="assistant", content=full_content))]) @app.post("/v1/chat/completions") async def chat_completions(request: ChatCompletionRequest, _: None = Depends(authenticate_client)): if request.model not in CODEGEEX_MODELS: raise HTTPException(status_code=404, detail=f"Model '{request.model}' not found.") if not request.messages: raise HTTPException(status_code=400, detail="No messages provided.") try: prompt, history = _convert_messages_to_codegeex_format(request.messages) except Exception as e: raise HTTPException(status_code=400, detail=f"Failed to process messages: {e}") for attempt in range(len(CODEGEEX_TOKENS) + 1): if attempt == len(CODEGEEX_TOKENS): raise HTTPException(status_code=503, detail="All attempts to contact CodeGeeX API failed.") token = get_best_codegeex_token() if not token: raise HTTPException(status_code=503, detail="No valid CodeGeeX tokens available.") try: payload = {"user_role": 0, "ide": "VSCode", "prompt": prompt, "model": request.model, "history": history, "talkId": f"{uuid.uuid4()}", "plugin_version": "", "locale": "", "agent": None, "candidates": {"candidate_msg_id": "", "candidate_type": "", "selected_candidate": ""}, "ide_version": "", "machineId": ""} headers = {"User-Agent": "Mozilla/5.0", "Accept": "text/event-stream", "Content-Type": "application/json", "code-token": token["token"]} response = requests.post("https://codegeex.cn/prod/code/chatCodeSseV3/chat", data=json.dumps(payload), headers=headers, stream=True, timeout=300.0) response.raise_for_status() if request.stream: return StreamingResponse(_codegeex_stream_generator(response, request.model), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}) else: return _build_codegeex_non_stream_response(response, request.model) except requests.HTTPError as e: status_code = getattr(e.response, "status_code", 500) with token_rotation_lock: if status_code in [401, 403]: token["is_valid"] = False elif status_code in [429, 500, 502, 503, 504]: token["error_count"] += 1 except Exception as e: with token_rotation_lock: token["error_count"] += 1