code2api / main.py
hins111's picture
Update main.py
fd88cfe verified
raw
history blame
13.1 kB
# ===================================================================
# main.py (已修复 Hugging Face 健康检查问题)
# ===================================================================
import json
import os
import time
import uuid
import threading
from typing import Any, Dict, List, Optional, TypedDict, Union
import requests
from fastapi import FastAPI, HTTPException, Depends, Query
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
# --- 类型定义和全局变量 ---
class CodeGeeXToken(TypedDict):
token: str
is_valid: bool
last_used: float
error_count: int
VALID_CLIENT_KEYS: set = set()
CODEGEEX_TOKENS: List[CodeGeeXToken] = []
CODEGEEX_MODELS: List[str] = ["claude-3-7-sonnet", "claude-sonnet-4"]
token_rotation_lock = threading.Lock()
MAX_ERROR_COUNT = 3
ERROR_COOLDOWN = 300
DEBUG_MODE = os.environ.get("DEBUG_MODE", "false").lower() == "true"
# --- Pydantic 模型 ---
class ChatMessage(BaseModel):
role: str
content: Union[str, List[Dict[str, Any]]]
reasoning_content: Optional[str] = None
# ... (其他模型保持不变)
class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatMessage]
stream: bool = True
temperature: Optional[float] = None
max_tokens: Optional[int] = None
top_p: Optional[float] = None
class ModelInfo(BaseModel):
id: str
object: str = "model"
created: int
owned_by: str
class ModelList(BaseModel):
object: str = "list"
data: List[ModelInfo]
class ChatCompletionChoice(BaseModel):
message: ChatMessage
index: int = 0
finish_reason: str = "stop"
class ChatCompletionResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion"
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[ChatCompletionChoice]
usage: Dict[str, int] = Field(default_factory=lambda: {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
class StreamChoice(BaseModel):
delta: Dict[str, Any] = Field(default_factory=dict)
index: int = 0
finish_reason: Optional[str] = None
class StreamResponse(BaseModel):
id: str = Field(default_factory=lambda: f"chatcmpl-{uuid.uuid4().hex}")
object: str = "chat.completion.chunk"
created: int = Field(default_factory=lambda: int(time.time()))
model: str
choices: List[StreamChoice]
# --- FastAPI App ---
app = FastAPI(title="CodeGeeX OpenAI API Adapter")
security = HTTPBearer(auto_error=False)
def log_debug(message: str):
if DEBUG_MODE:
print(f"[DEBUG] {message}")
# --- 配置加载函数 (从 Secrets 读取) ---
def load_client_api_keys_from_secrets():
global VALID_CLIENT_KEYS
try:
keys_str = os.environ.get("CLIENT_API_KEYS")
if not keys_str: raise ValueError("Secret 'CLIENT_API_KEYS' not found.")
keys = json.loads(keys_str)
VALID_CLIENT_KEYS = set(keys) if isinstance(keys, list) else set()
print(f"Successfully loaded {len(VALID_CLIENT_KEYS)} client API keys.")
except Exception as e:
print(f"FATAL: Error loading client API keys: {e}")
VALID_CLIENT_KEYS = set()
def load_codegeex_tokens_from_secrets():
global CODEGEEX_TOKENS
CODEGEEX_TOKENS = []
try:
tokens_str = os.environ.get("CODEGEEX_TOKENS")
if not tokens_str: raise ValueError("Secret 'CODEGEEX_TOKENS' not found.")
tokens = json.loads(tokens_str)
if not isinstance(tokens, list): raise TypeError("Secret 'CODEGEEX_TOKENS' must be a JSON list.")
for token in tokens:
if isinstance(token, str) and token:
CODEGEEX_TOKENS.append({"token": token, "is_valid": True, "last_used": 0, "error_count": 0})
print(f"Successfully loaded {len(CODEGEEX_TOKENS)} CodeGeeX tokens.")
except Exception as e:
print(f"FATAL: Error loading CodeGeeX tokens: {e}")
@app.on_event("startup")
async def startup():
print("Starting CodeGeeX OpenAI API Adapter server...")
load_client_api_keys_from_secrets()
load_codegeex_tokens_from_secrets()
print("Server initialization completed.")
# --- THE FIX: Add a root endpoint for the health check ---
@app.get("/")
def health_check():
"""Health check endpoint for Hugging Face."""
return {"status": "ok", "message": "CodeGeeX API Adapter is running."}
# ---------------------------------------------------------
# --- 原始路由 ---
def get_models_list_response() -> ModelList:
return ModelList(data=[ModelInfo(id=model, created=int(time.time()), owned_by="anthropic") for model in CODEGEEX_MODELS])
@app.get("/v1/models", response_model=ModelList)
async def list_v1_models(_: None = Depends(authenticate_client)):
return get_models_list_response()
# ... (所有其他函数和路由都与之前的文件一致,无需改动)
# The rest of the original code is unchanged.
def get_best_codegeex_token() -> Optional[CodeGeeXToken]:
with token_rotation_lock:
now = time.time()
valid_tokens = [t for t in CODEGEEX_TOKENS if t["is_valid"] and (t["error_count"] < MAX_ERROR_COUNT or now - t["last_used"] > ERROR_COOLDOWN)]
if not valid_tokens: return None
for token in valid_tokens:
if token["error_count"] >= MAX_ERROR_COUNT and now - token["last_used"] > ERROR_COOLDOWN: token["error_count"] = 0
valid_tokens.sort(key=lambda x: (x["last_used"], x["error_count"]))
token = valid_tokens[0]
token["last_used"] = now
return token
def _convert_messages_to_codegeex_format(messages: List[ChatMessage]):
if not messages: return "", []
last_user_msg = next((msg for msg in reversed(messages) if msg.role == "user"), None)
if not last_user_msg: raise HTTPException(status_code=400, detail="No user message found.")
prompt = last_user_msg.content if isinstance(last_user_msg.content, str) else ""
history, user_content, assistant_content = [], "", ""
for msg in messages:
if msg == last_user_msg: break
if msg.role == "user":
if user_content and assistant_content: history.append({"query": user_content, "answer": assistant_content, "id": f"{uuid.uuid4()}"}); user_content, assistant_content = "", ""
user_content = msg.content if isinstance(msg.content, str) else ""
elif msg.role == "assistant":
assistant_content = msg.content if isinstance(msg.content, str) else ""
if user_content: history.append({"query": user_content, "answer": assistant_content, "id": f"{uuid.uuid4()}"}); user_content, assistant_content = "", ""
if user_content and not assistant_content: prompt = user_content + "\n" + prompt
return prompt, history
async def authenticate_client(auth: Optional[HTTPAuthorizationCredentials] = Depends(security)):
if not VALID_CLIENT_KEYS: raise HTTPException(status_code=503, detail="Service unavailable: Client API keys not configured.")
if not auth or not auth.credentials: raise HTTPException(status_code=401, detail="API key required.", headers={"WWW-Authenticate": "Bearer"})
if auth.credentials not in VALID_CLIENT_KEYS: raise HTTPException(status_code=403, detail="Invalid client API key.")
@app.get("/models", response_model=ModelList)
async def list_models_no_auth():
return get_models_list_response()
def _codegeex_stream_generator(response, model: str):
stream_id = f"chatcmpl-{uuid.uuid4().hex}"
created_time = int(time.time())
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={'role': 'assistant'})]).json()}\n\n"
buffer = ""
try:
for chunk in response.iter_content(chunk_size=1024):
if not chunk: continue
buffer += chunk.decode("utf-8", errors='ignore')
while "\n\n" in buffer:
event_data, buffer = buffer.split("\n\n", 1)
event_data = event_data.strip()
if not event_data: continue
event_type, data_json = None, None
for line in event_data.split("\n"):
if line.startswith("event:"): event_type = line[6:].strip()
elif line.startswith("data:"):
try: data_json = json.loads(line[5:].strip())
except: continue
if not event_type or not data_json: continue
if event_type == "add":
delta = data_json.get("text", "")
if delta: yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={'content': delta})]).json()}\n\n"
elif event_type == "finish":
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={}, finish_reason='stop')]).json()}\n\n"
yield "data: [DONE]\n\n"
return
except Exception as e:
log_debug(f"Stream processing error: {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
yield f"data: {StreamResponse(id=stream_id, created=created_time, model=model, choices=[StreamChoice(delta={}, finish_reason='stop')]).json()}\n\n"
yield "data: [DONE]\n\n"
def _build_codegeex_non_stream_response(response, model: str) -> ChatCompletionResponse:
full_content = ""
buffer = ""
for chunk in response.iter_content(chunk_size=1024):
if not chunk: continue
buffer += chunk.decode("utf-8", errors='ignore')
while "\n\n" in buffer:
event_data, buffer = buffer.split("\n\n", 1)
event_data = event_data.strip()
if not event_data: continue
event_type, data_json = None, None
for line in event_data.split("\n"):
if line.startswith("event:"): event_type = line[6:].strip()
elif line.startswith("data:"):
try: data_json = json.loads(line[5:].strip())
except: continue
if not event_type or not data_json: continue
if event_type == "add": full_content += data_json.get("text", "")
elif event_type == "finish":
finish_text = data_json.get("text", "")
if finish_text: full_content = finish_text
return ChatCompletionResponse(model=model, choices=[ChatCompletionChoice(message=ChatMessage(role="assistant", content=full_content))])
return ChatCompletionResponse(model=model, choices=[ChatCompletionChoice(message=ChatMessage(role="assistant", content=full_content))])
@app.post("/v1/chat/completions")
async def chat_completions(request: ChatCompletionRequest, _: None = Depends(authenticate_client)):
if request.model not in CODEGEEX_MODELS: raise HTTPException(status_code=404, detail=f"Model '{request.model}' not found.")
if not request.messages: raise HTTPException(status_code=400, detail="No messages provided.")
try: prompt, history = _convert_messages_to_codegeex_format(request.messages)
except Exception as e: raise HTTPException(status_code=400, detail=f"Failed to process messages: {e}")
for attempt in range(len(CODEGEEX_TOKENS) + 1):
if attempt == len(CODEGEEX_TOKENS): raise HTTPException(status_code=503, detail="All attempts to contact CodeGeeX API failed.")
token = get_best_codegeex_token()
if not token: raise HTTPException(status_code=503, detail="No valid CodeGeeX tokens available.")
try:
payload = {"user_role": 0, "ide": "VSCode", "prompt": prompt, "model": request.model, "history": history, "talkId": f"{uuid.uuid4()}", "plugin_version": "", "locale": "", "agent": None, "candidates": {"candidate_msg_id": "", "candidate_type": "", "selected_candidate": ""}, "ide_version": "", "machineId": ""}
headers = {"User-Agent": "Mozilla/5.0", "Accept": "text/event-stream", "Content-Type": "application/json", "code-token": token["token"]}
response = requests.post("https://codegeex.cn/prod/code/chatCodeSseV3/chat", data=json.dumps(payload), headers=headers, stream=True, timeout=300.0)
response.raise_for_status()
if request.stream: return StreamingResponse(_codegeex_stream_generator(response, request.model), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"})
else: return _build_codegeex_non_stream_response(response, request.model)
except requests.HTTPError as e:
status_code = getattr(e.response, "status_code", 500)
with token_rotation_lock:
if status_code in [401, 403]: token["is_valid"] = False
elif status_code in [429, 500, 502, 503, 504]: token["error_count"] += 1
except Exception as e:
with token_rotation_lock: token["error_count"] += 1