kodu2api / main.py
hins111's picture
Create main.py
9d831fa verified
# ===================================================================
# main.py (已修改以适配 Hugging Face Secrets)
# ===================================================================
import httpx
import time
import json
import os
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any, Union
# --- Pydantic 模型定义 ---
class ChatMessage(BaseModel):
role: str
content: str
class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatMessage]
stream: bool = False
temperature: Optional[float] = None
max_tokens: Optional[int] = None
top_p: Optional[float] = None
# --- 全局变量 ---
config: Dict[str, Any] = {}
app = FastAPI(title="Kodu2API Adapter", description="将 Kodu AI API 转换为 OpenAI 格式")
security = HTTPBearer()
# --- 核心函数 (已修改) ---
def load_config():
"""
从环境变量加载配置 (适配 Hugging Face Secrets).
"""
global config
print("Loading configuration from environment variables...")
try:
# 必需的 Secret: Kodu Refresh Token
kodu_refresh_token = os.environ.get("KODU_REFRESH_TOKEN")
if not kodu_refresh_token:
raise ValueError("Secret 'KODU_REFRESH_TOKEN' is required but not found.")
# 必需的 Secret: 你的服务密码 (API Keys)
service_api_keys_str = os.environ.get("SERVICE_API_KEYS")
if not service_api_keys_str:
raise ValueError("Secret 'SERVICE_API_KEYS' is required but not found.")
service_api_keys = json.loads(service_api_keys_str)
# 可选的 Secret: Kodu 模型映射
kodu_models_str = os.environ.get("KODU_MODELS")
kodu_models = json.loads(kodu_models_str) if kodu_models_str else {}
# 可选的 Secret: 代理 URL
proxy_url = os.environ.get("PROXY_URL") or None
# 可选的 Secret: Base URL
base_url = os.environ.get("BASE_URL", "https://api.kodu.ai")
# 组合成 config 字典
config = {
"kodu_refresh_token": kodu_refresh_token,
"kodu_access_token": "", # 初始为空,后续会自动刷新
"kodu_models": kodu_models,
"service_api_keys": service_api_keys,
"proxy_url": proxy_url,
"base_url": base_url
}
# 验证 service_api_keys 的格式
if not isinstance(config["service_api_keys"], list) or not all(isinstance(i, str) for i in config["service_api_keys"]):
raise TypeError("Secret 'SERVICE_API_KEYS' must be a JSON list of strings (e.g., [\"sk-key1\", \"sk-key2\"]).")
print("Configuration loaded successfully from secrets.")
print(f"Loaded {len(config['service_api_keys'])} service API key(s).")
print(f"Loaded {len(config['kodu_models'])} Kodu model mapping(s).")
if config["proxy_url"]:
print(f"Using proxy: {config['proxy_url']}")
except Exception as e:
print(f"FATAL: Could not load configuration from secrets. Error: {e}")
# 在配置失败时,将 config 设置为空字典以阻止应用不完整地运行
config = {}
raise
async def refresh_access_token():
"""
使用 refresh_token 获取新的 access_token.
"""
if not config:
print("Cannot refresh token, config not loaded.")
return False
print("Attempting to refresh Kodu access token...")
async with httpx.AsyncClient(proxies=config.get("proxy_url")) as client:
try:
response = await client.post(
f"{config['base_url']}/auth/refresh-token",
headers={"Authorization": f"Bearer {config['kodu_refresh_token']}"}
)
response.raise_for_status()
config['kodu_access_token'] = response.json().get('accessToken')
print("Successfully refreshed Kodu access token.")
return True
except httpx.HTTPStatusError as e:
print(f"Error refreshing access token: {e.response.status_code} - {e.response.text}")
return False
except Exception as e:
print(f"An unexpected error occurred during token refresh: {e}")
return False
async def authenticate_client(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
验证客户端提供的 API Key.
"""
if not config:
raise HTTPException(status_code=503, detail="Service Unavailable: Server configuration is not loaded.")
if credentials.credentials not in config.get("service_api_keys", []):
raise HTTPException(status_code=401, detail="Invalid API Key")
# --- FastAPI 事件和路由 ---
@app.on_event("startup")
async def startup_event():
"""
应用启动时执行的事件.
"""
load_config()
if config:
await refresh_access_token()
@app.get("/v1/models", dependencies=[Depends(authenticate_client)])
async def list_models():
"""
列出可用的模型.
"""
model_data = [
{"id": model_name, "object": "model", "owned_by": "kodu-ai", "created": int(time.time())}
for model_name in config.get("kodu_models", {})
]
return {"object": "list", "data": model_data}
@app.post("/v1/chat/completions", dependencies=[Depends(authenticate_client)])
async def create_chat_completion(request: ChatCompletionRequest):
"""
处理聊天请求.
"""
if not config.get('kodu_access_token'):
# 尝试再次刷新 token
if not await refresh_access_token():
raise HTTPException(status_code=503, detail="Kodu AI service unavailable, could not refresh token.")
kodu_model = config.get("kodu_models", {}).get(request.model)
if not kodu_model:
raise HTTPException(status_code=404, detail=f"Model '{request.model}' not found or mapped.")
payload = {
"model": kodu_model,
"messages": [{"role": msg.role, "content": msg.content} for msg in request.messages],
"stream": request.stream,
}
headers = {
"Authorization": f"Bearer {config['kodu_access_token']}",
"Content-Type": "application/json",
}
async def stream_generator():
async with httpx.AsyncClient(proxies=config.get("proxy_url"), timeout=300) as client:
try:
async with client.stream("POST", f"{config['base_url']}/chat/completions", headers=headers, json=payload) as response:
if response.status_code != 200:
error_content = await response.aread()
print(f"Kodu API Error: {response.status_code} - {error_content.decode()}")
# 尝试刷新 token 并重试一次
if response.status_code == 401:
print("Received 401, attempting to refresh token and retry...")
await refresh_access_token()
# 需要重新构建流请求,这里为了简化,直接返回错误
yield f"data: {json.dumps({'error': {'message': f'Kodu API Error: {error_content.decode()}'}})}\n\n"
yield "data: [DONE]\n\n"
return
async for chunk in response.aiter_bytes():
yield chunk.decode('utf-8')
except Exception as e:
print(f"An error occurred during streaming: {e}")
yield f"data: {json.dumps({'error': {'message': 'An internal error occurred.'}})}\n\n"
yield "data: [DONE]\n\n"
if request.stream:
return StreamingResponse(stream_generator(), media_type="text/event-stream")
else:
async with httpx.AsyncClient(proxies=config.get("proxy_url"), timeout=300) as client:
try:
response = await client.post(f"{config['base_url']}/chat/completions", headers=headers, json=payload)
if response.status_code == 401:
print("Received 401, refreshing token and retrying...")
await refresh_access_token()
headers["Authorization"] = f"Bearer {config['kodu_access_token']}"
response = await client.post(f"{config['base_url']}/chat/completions", headers=headers, json=payload)
response.raise_for_status()
return JSONResponse(content=response.json())
except httpx.HTTPStatusError as e:
raise HTTPException(status_code=e.response.status_code, detail=e.response.text)