# qwenchat2api / app.py — OpenAI-compatible proxy for the Qwen chat API
# Author: kevin (copilot-assisted optimization, commit 9cd681c)
# 导入所需的库
import json # 用于JSON数据处理
import time # 用于时间相关操作
import traceback # 用于异常堆栈跟踪
import uuid
from datetime import datetime
from typing import Optional, Dict, Any
from fastapi import FastAPI, HTTPException, Request, Depends, Response # FastAPI框架相关组件
from fastapi.middleware.cors import CORSMiddleware # CORS中间件
from fastapi.responses import StreamingResponse, JSONResponse # 特殊响应类型
import httpx # 异步HTTP客户端
import logging # 日志管理
import random # 随机数生成
import uvicorn # ASGI服务器
import asyncio # 异步IO
# Upstream API endpoint configuration
QWEN_API_URL = 'https://chat.qwenlm.ai/api/chat/completions'  # Qwen chat-completions endpoint
QWEN_MODELS_URL = 'https://chat.qwenlm.ai/api/models'  # Qwen model-list endpoint
MAX_RETRIES = 3  # maximum number of request attempts
RETRY_DELAY = 1  # base delay between retries in seconds (scaled linearly per attempt)

# Application-wide logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
# Create the FastAPI application instance
app = FastAPI()

# Permissive CORS so browser-based clients can call the proxy from anywhere
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # allow any origin
    allow_credentials=True,   # allow cookies / auth headers
    allow_methods=["*"],      # allow any HTTP method
    allow_headers=["*"],      # allow any request header
)

# Shared async HTTP client.
# NOTE(review): the request helpers below each open their own AsyncClient and
# never use this instance — confirm whether it is still needed.
client = httpx.AsyncClient()
async def fetch_with_retry(url, options, retries=MAX_RETRIES):
    """
    Perform an async HTTP request with retries.

    Retries when the upstream responds with an HTML body or a 500 status
    (both treated as transient upstream failures), using a linearly
    increasing backoff of RETRY_DELAY * attempt seconds. On the final
    attempt a retryable response is returned as-is rather than retried.

    Args:
        url: Target URL.
        options: Dict with optional 'method', 'headers' and 'body' keys.
        retries: Maximum number of attempts.

    Returns:
        The httpx.Response of the first acceptable attempt.

    Raises:
        Exception: with a JSON-encoded summary after all attempts failed.
    """
    last_error = None
    auth_header = options.get('headers', {}).get('Authorization', '')
    # SECURITY FIX: never log the raw credential — keep only a short suffix
    # so requests can still be correlated in the logs.
    session_tag = f"...{auth_header[-6:]}" if auth_header.startswith('Bearer ') else 'anonymous'
    if auth_header.startswith('Bearer '):
        logger.info("Processing request with session identifier: %s", session_tag)
    for attempt in range(retries):
        try:
            # Fresh client per attempt; renamed so it does not shadow the
            # module-level `client`.
            async with httpx.AsyncClient() as http_client:
                if 'headers' in options:
                    http_client.headers.update(options['headers'])
                response = await http_client.request(
                    method=options.get('method', 'GET'),
                    url=url,
                    content=options.get('body'),
                    timeout=60
                )
                # HTML bodies and HTTP 500 indicate a retryable upstream failure.
                content_type = response.headers.get('content-type', '')
                if 'text/html' in content_type or response.status_code == 500:
                    last_error = {
                        'status': response.status_code,
                        'contentType': content_type,
                        'responseText': response.text[:1000],
                        'headers': dict(response.headers)
                    }
                    if attempt < retries - 1:
                        logger.error("Retry attempt %d for session %s failed", attempt + 1, session_tag)
                        await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                        continue
                return response
        except Exception as error:
            last_error = error
            logger.error("Connection error for session %s on attempt %d", session_tag, attempt + 1)
            traceback.print_exc()
            if attempt < retries - 1:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                continue
    # Every attempt failed; surface a structured error for the caller.
    raise Exception(json.dumps({
        'error': True,
        'message': 'All retry attempts failed',
        'lastError': str(last_error),
        'retries': retries
    }))
async def process_line(line, previous_content):
    """
    Process one SSE data line, stripping content the client already received.

    Returns a (formatted_sse_line, newly_added_content) pair. Lines whose
    delta repeats only already-sent text yield (None, None); payloads
    without delta content are re-emitted unchanged; unparsable lines are
    forwarded verbatim.
    """
    try:
        payload = json.loads(line[6:])  # drop the 'data: ' prefix
        choices = payload.get('choices')
        delta_text = None
        if choices and choices[0].get('delta'):
            delta_text = choices[0]['delta'].get('content')
        if not delta_text:
            # No delta content: pass the payload through untouched.
            return f"data: {json.dumps(payload)}\n\n", None
        # Strip the prefix we already streamed, if this chunk repeats it.
        fresh = delta_text
        if previous_content and delta_text.startswith(previous_content):
            fresh = delta_text[len(previous_content):]
        if not fresh:
            return None, None
        reply = {'choices': [{'delta': {'content': fresh}}]}
        return f"data: {json.dumps(reply)}\n\n", fresh
    except Exception:
        # Anything unparseable is forwarded as-is.
        return f"{line}\n\n", None
# async def handle_stream(response, previous_content):
# """
# 处理流式响应,确保实时发送到客户端
# """
# buffer = ''
# async for chunk in response.aiter_lines(encoding='utf-8'):
# try:
# # decoded_chunk = chunk.decode('utf-8')
# decoded_chunk = chunk.strip()
# print('decoded_chunk: ', decoded_chunk, ' ::end')
# buffer += decoded_chunk
#
# # 立即处理每个完整的数据行
# while '\n' in buffer:
# # print('buffer: ', buffer)
# line, buffer = buffer.split('\n', 1)
# line = line.strip()
# if line.startswith('data: '):
# processed_line, new_content = await process_line(line, previous_content)
# if processed_line:
# # 确保每个响应都以正确的SSE格式发送
# # print('11data: ', json.dumps(processed_line, indent=4))
# yield processed_line
# # 立即刷新输出
# await asyncio.sleep(0)
# if new_content:
# previous_content = previous_content + new_content
#
# # print("data: [DONE]")
# # yield "data: [DONE]\n\n"
#
# except Exception as e:
# logger.error(f"Error processing chunk: {str(e)}")
# yield f"data: {{\"error\":true,\"message\":\"{str(e)}\"}}\n\n"
# continue
#
# # 处理剩余的buffer
# if buffer.strip():
# line = buffer.strip()
# if line.startswith('data: '):
# processed_line, new_content = await process_line(line, previous_content)
# if processed_line:
# yield processed_line
#
# print("data: [DONE]")
# yield "data: [DONE]\n\n"
def create_chat_completion_data(
    content: str, model: str, timestamp: int, finish_reason: Optional[str] = None
) -> Dict[str, Any]:
    """Build a single OpenAI-style ``chat.completion.chunk`` payload."""
    choice = {
        "index": 0,
        "delta": {"content": content, "role": "assistant"},
        "finish_reason": finish_reason,
    }
    return {
        "id": f"chatcmpl-{uuid.uuid4()}",  # fresh id per chunk
        "object": "chat.completion.chunk",
        "created": timestamp,
        "model": model,
        "choices": [choice],
        "usage": None,
    }
async def handle_stream(response, previous_content):
    """
    Re-emit upstream SSE chunks in OpenAI chat.completion.chunk format.

    Keeps a cache of the last full content seen and emits only the part of
    each chunk that is actually new, which guards against upstreams that
    resend the accumulated text on every event.

    Args:
        response: Streaming response exposing ``aiter_lines()``.
        previous_content: Kept for interface compatibility (unused here).

    Yields:
        SSE-formatted ``data: ...`` strings, terminated by ``data: [DONE]``.
    """
    timestamp = int(datetime.now().timestamp())
    content_cache = ""  # full content emitted so far, used for diffing

    async def send_chunk(content: str):
        """Wrap `content` in an OpenAI chat.completion.chunk SSE frame."""
        chunk_data = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion.chunk",
            "created": timestamp,
            "model": "qwen",  # NOTE(review): hardcoded; upstream model name not forwarded
            "choices": [{
                "index": 0,
                "delta": {
                    "content": content,
                    "role": "assistant"
                },
                "finish_reason": None
            }]
        }
        return f"data: {json.dumps(chunk_data)}\n\n"

    async for chunk in response.aiter_lines():
        if not chunk or not chunk.startswith('data: '):
            continue
        try:
            data = json.loads(chunk[6:])  # strip the 'data: ' prefix
            # Skip events that carry no delta content.
            if not (data.get('choices') and
                    data['choices'][0].get('delta') and
                    data['choices'][0]['delta'].get('content')):
                continue
            current_content = data['choices'][0]['delta']['content']
            # Diff against the cache: emit only text not yet sent.
            if content_cache:
                for i in range(min(len(current_content), len(content_cache))):
                    if current_content[i] != content_cache[i]:
                        new_content = current_content[i:]
                        break
                else:
                    # Shared prefix — the new part is whatever extends the cache.
                    new_content = current_content[len(content_cache):]
            else:
                new_content = current_content
            if new_content:
                yield await send_chunk(new_content)
                content_cache = current_content  # update the cache
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析错误: {str(e)}")
            continue
        except Exception as e:
            logger.error(f"处理数据流时发生错误: {str(e)}")
            # BUGFIX: serialize via json.dumps so quotes/newlines inside the
            # exception message cannot produce malformed JSON on the wire.
            yield f"data: {json.dumps({'error': True, 'message': str(e)})}\n\n"
    # Signal end of stream.
    yield "data: [DONE]\n\n"
async def get_openai_auth_headers(request: Request) -> dict:
    """
    Extract the Authorization header from the incoming request.

    Args:
        request: Incoming FastAPI request.

    Returns:
        Dict containing the Authorization header to forward upstream.

    Raises:
        HTTPException: 401 when the header is absent.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(status_code=401, detail="Missing Authorization header")
    # SECURITY FIX: never write the raw credential to the log — keep only a
    # short suffix so requests can still be correlated.
    logger.info("New request authenticated with session ...%s", auth_header[-6:])
    return {"Authorization": auth_header}
async def make_request(method, url, headers, body, api_keys=None, retry_count=0):
    """
    Send an HTTP request, optionally rotating through multiple API keys.

    With more than one key, keys are tried in random order without
    repetition until one succeeds or all are exhausted. With a single (or
    no) key, the same request is retried up to three times.

    Args:
        method: HTTP method.
        url: Target URL.
        headers: Base headers; Authorization is overridden per attempt.
        body: Raw request body.
        api_keys: Optional list of bearer tokens to rotate through.
        retry_count: Starting attempt counter.

    Returns:
        The first httpx.Response with a status code below 400.

    Raises:
        HTTPException: 500 when every attempt fails.
    """
    def _mask(key: str) -> str:
        # SECURITY FIX: never log full credentials — keep only a short suffix.
        return f"...{key[-6:]}" if key else "<empty>"

    try:
        if api_keys and len(api_keys) > 1:
            # Multiple keys: try them in random order without repetition.
            remaining_keys = api_keys.copy()
            while remaining_keys and retry_count < 3:
                selected_key = random.choice(remaining_keys)
                remaining_keys.remove(selected_key)
                headers = {**headers, "Authorization": f"Bearer {selected_key}"}
                logger.info(f"Attempting request with API key: {_mask(selected_key)}")
                try:
                    async with httpx.AsyncClient() as http_client:
                        r = await http_client.request(
                            method,
                            url,
                            headers=headers,
                            content=body,
                            timeout=600
                        )
                        if r.status_code < 400:
                            return r
                        logger.error(f"Request failed with key {_mask(selected_key)}, status code: {r.status_code}")
                except Exception as e:
                    logger.error(f"Request failed with key {_mask(selected_key)}: {str(e)}")
                retry_count += 1
            raise HTTPException(status_code=500, detail="All API keys failed")
        else:
            # Single key (or none): retry the same credential up to 3 times.
            while retry_count < 3:
                single_key = api_keys[0] if api_keys else headers.get("authorization", "").replace("Bearer ", "").strip()
                request_headers = {**headers, "Authorization": f"Bearer {single_key}"}
                logger.info(f"Attempting request with API key: {_mask(single_key)}")
                try:
                    async with httpx.AsyncClient() as http_client:
                        r = await http_client.request(
                            method,
                            url,
                            headers=request_headers,
                            content=body,
                            timeout=600
                        )
                        if r.status_code < 400:
                            return r
                        logger.error(f"Request attempt {retry_count + 1} failed for session {_mask(single_key)}")
                except Exception as e:
                    logger.error(f"Connection attempt {retry_count + 1} failed for session {_mask(single_key)}: {str(e)}")
                retry_count += 1
            raise HTTPException(status_code=500, detail="Request failed after 3 retries")
    except HTTPException:
        # BUGFIX: let the deliberate HTTPExceptions above propagate unchanged
        # instead of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Request failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
# API route definitions
@app.api_route(
    "/api/chat/completions",
    methods=["POST", "OPTIONS"],
)
async def chat_completions(request: Request, auth_headers: dict = Depends(get_openai_auth_headers)):
    """
    Proxy an OpenAI-style chat-completion request to the Qwen API.

    Streaming requests are re-emitted as SSE with duplicate-content
    filtering; non-streaming responses are forwarded with hop-by-hop
    headers stripped.

    Raises:
        HTTPException: 400 for malformed input, 500 for upstream failures.
    """
    # Drop headers that would conflict with the re-issued upstream request.
    headers = dict(request.headers)
    headers.pop("content-length", None)
    headers.pop("host", None)

    # Parse and validate the request body.
    request_body = await request.body()
    try:
        request_data = json.loads(request_body.decode('utf-8'))
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    messages = request_data.get('messages')
    stream = request_data.get('stream', False)
    model = request_data.get('model')
    max_tokens = request_data.get('max_tokens')
    if not model:
        raise HTTPException(status_code=400, detail="Model parameter is required")

    # Build the upstream request payload.
    qwen_request = {
        'model': model,
        'messages': messages,
        'stream': stream
    }
    if max_tokens is not None:
        qwen_request['max_tokens'] = max_tokens

    try:
        response = await fetch_with_retry(QWEN_API_URL, {
            'method': 'POST',
            'headers': {
                'Content-Type': 'application/json',
                **auth_headers
            },
            'body': json.dumps(qwen_request),
            'stream': stream
        })
        if stream:
            previous_content = ''
            return StreamingResponse(
                handle_stream(response, previous_content),
                media_type="text/event-stream",
                headers={
                    'Cache-Control': 'no-cache, no-transform',
                    'Connection': 'keep-alive',
                    'X-Accel-Buffering': 'no',  # disable Nginx buffering
                    'Content-Type': 'text/event-stream',
                    'Transfer-Encoding': 'chunked'
                },
                status_code=200,
                background=None  # ensure nothing is deferred to background
            )
        # BUGFIX: httpx has already decoded the body, so forwarding upstream
        # 'content-encoding'/'content-length' headers verbatim would describe
        # a body we are not actually sending. Strip those plus hop-by-hop headers.
        forward_headers = {
            k: v for k, v in response.headers.items()
            if k.lower() not in ('content-encoding', 'content-length', 'transfer-encoding', 'connection')
        }
        return Response(content=response.content, status_code=response.status_code, headers=forward_headers)
    except HTTPException:
        raise
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error))
@app.get("/api/models")
async def models(request: Request, auth_headers: dict = Depends(get_openai_auth_headers)):
    """Endpoint returning the list of models available upstream."""
    request_options = {
        'method': 'GET',
        'headers': {'Content-Type': 'application/json', **auth_headers},
        'timeout': 30,
    }
    try:
        upstream = await fetch_with_retry(QWEN_MODELS_URL, request_options)
        return JSONResponse(content=upstream.json())
    except Exception as e:
        logger.error(f"Error in /api/models: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get('/')
async def index(request: Request):
    """Redirect the root path to the upstream chat UI."""
    redirect_headers = {"Location": "https://chat.qwenlm.ai"}
    return Response(status_code=302, headers=redirect_headers)
@app.get('/{path:path}')
@app.post('/{path:path}')
async def redirect_all(path: str, request: Request):
    """Catch-all route: deflect probe-looking paths and POSTs, redirect the rest."""
    blocked_keywords = ('php', 'admin', 'login', 'wp-admin', 'manager', 'user', 'signin')
    lowered = path.lower()
    # Scanner-style paths and any POST get bounced to localhost.
    looks_like_probe = any(word in lowered for word in blocked_keywords)
    if looks_like_probe or request.method == 'POST':
        return Response(status_code=301, headers={"Location": "http://127.0.0.1"})
    return Response(status_code=302, headers={"Location": "https://linux.do/u/f-droid"})
# Script entry point: serve the app with uvicorn when executed directly.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",            # listen on all interfaces
        port=8080,
        loop="asyncio",
        timeout_keep_alive=65,     # keep-alive slightly above common LB idle timeouts
        access_log=True,
        log_level="debug",
        http="h11",                # pure-Python HTTP/1.1 implementation
        limit_concurrency=1000,
        backlog=2048
    )