"""FastAPI reverse proxy for the Qwen chat API.

Forwards OpenAI-style chat-completion requests to chat.qwenlm.ai, with
retry logic, multi-key rotation, and SSE stream de-duplication.
"""

# Standard library imports
import asyncio
import json
import logging
import random
import time
import traceback
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

# Third-party imports
import httpx
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse

# Upstream API endpoint configuration
QWEN_API_URL = 'https://chat.qwenlm.ai/api/chat/completions'  # chat completions endpoint
QWEN_MODELS_URL = 'https://chat.qwenlm.ai/api/models'  # model list endpoint
MAX_RETRIES = 3  # maximum retry attempts
RETRY_DELAY = 1  # base retry delay in seconds (scaled linearly per attempt)

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# FastAPI application instance
app = FastAPI()

# CORS middleware: permissive, this service is an open proxy front
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],      # allow all origins
    allow_credentials=True,   # allow credentials
    allow_methods=["*"],      # allow all HTTP methods
    allow_headers=["*"],      # allow all request headers
)

# Module-level async HTTP client.
# NOTE(review): request helpers below create their own short-lived clients;
# this one is kept for backward compatibility with any external users.
client = httpx.AsyncClient()


def _mask_token(token: str) -> str:
    """Return a redacted form of a secret (token / API key) safe for logging.

    Security: full bearer tokens must never appear in log output.
    """
    token = token or ""
    if len(token) <= 8:
        return "***"
    return f"{token[:4]}...{token[-4:]}"


async def fetch_with_retry(url, options, retries=MAX_RETRIES):
    """Send an HTTP request with linear-backoff retries.

    Args:
        url: Target URL.
        options: Dict with optional 'method' (default 'GET'), 'headers',
            'body' (raw content), and 'timeout' (seconds, default 60).
        retries: Maximum number of attempts.

    Returns:
        The httpx.Response. NOTE: on the final attempt an HTML/500 response
        is returned as-is (deliberate: the caller sees the upstream error).

    Raises:
        Exception: JSON-encoded summary if every attempt raised.
    """
    last_error = None
    auth_header = options.get('headers', {}).get('Authorization', '')
    # Redacted identifier used in log lines only — never log the full token.
    session_id = (
        _mask_token(auth_header[7:]) if auth_header.startswith('Bearer ') else 'unknown'
    )
    if auth_header.startswith('Bearer '):
        logger.info(f"Processing request with session identifier: {session_id}")

    for i in range(retries):
        try:
            async with httpx.AsyncClient() as http_client:
                # Apply caller-supplied request headers
                if 'headers' in options:
                    http_client.headers.update(options['headers'])
                # Send the request; honor a caller-supplied timeout
                response = await http_client.request(
                    method=options.get('method', 'GET'),
                    url=url,
                    content=options.get('body'),
                    timeout=options.get('timeout', 60),
                )
                # An HTML body or a 500 means the upstream failed — retry
                content_type = response.headers.get('content-type', '')
                if 'text/html' in content_type or response.status_code == 500:
                    last_error = {
                        'status': response.status_code,
                        'contentType': content_type,
                        'responseText': response.text[:1000],
                        'headers': dict(response.headers),
                    }
                    if i < retries - 1:
                        logger.error(f"Retry attempt {i+1} for session {session_id} failed")
                        await asyncio.sleep(RETRY_DELAY * (i + 1))
                        continue
                return response
        except Exception as error:
            last_error = error
            logger.error(f"Connection error for session {session_id} on attempt {i+1}")
            traceback.print_exc()
            if i < retries - 1:
                await asyncio.sleep(RETRY_DELAY * (i + 1))
                continue

    # All retries exhausted with exceptions
    raise Exception(json.dumps({
        'error': True,
        'message': 'All retry attempts failed',
        'lastError': str(last_error),
        'retries': retries,
    }))


async def process_line(line, previous_content):
    """Process one SSE data line, stripping content already sent.

    Args:
        line: Raw line beginning with 'data: '.
        previous_content: Content already emitted to the client.

    Returns:
        (formatted SSE string or None, new content delta or None).
    """
    try:
        data = json.loads(line[6:])  # strip the 'data: ' prefix
        if (data.get('choices')
                and data['choices'][0].get('delta')
                and data['choices'][0]['delta'].get('content')):
            current_content = data['choices'][0]['delta']['content']
            new_content = current_content
            # Upstream sometimes resends the full accumulated text; keep only the tail
            if current_content.startswith(previous_content) and len(previous_content) > 0:
                new_content = current_content[len(previous_content):]
            # Only emit a chunk when there is genuinely new content
            if new_content:
                new_data = {
                    'choices': [{
                        'delta': {
                            'content': new_content
                        }
                    }]
                }
                return f"data: {json.dumps(new_data)}\n\n", new_content
            return None, None
        else:
            # Pass through non-content events (e.g. finish markers) unchanged
            return f"data: {json.dumps(data)}\n\n", None
    except Exception:
        # Unparseable line: forward it verbatim rather than dropping it
        return f"{line}\n\n", None


def create_chat_completion_data(
    content: str,
    model: str,
    timestamp: int,
    finish_reason: Optional[str] = None,
) -> Dict[str, Any]:
    """Build an OpenAI-compatible chat.completion.chunk payload.

    NOTE(review): currently unused within this module; kept for external callers.
    """
    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion.chunk",
        "created": timestamp,
        "model": model,
        "choices": [
            {
                "index": 0,
                "delta": {"content": content, "role": "assistant"},
                "finish_reason": finish_reason,
            }
        ],
        "usage": None,
    }


async def handle_stream(response, previous_content):
    """Relay the upstream SSE stream, de-duplicating repeated content.

    The upstream may resend previously delivered text in each delta; a
    character-level diff against a cache yields only the new suffix.

    Args:
        response: Streaming httpx.Response from the upstream.
        previous_content: Initial already-sent content (unused beyond seeding).

    Yields:
        SSE-formatted 'data: ...' strings, terminated by 'data: [DONE]'.
    """
    timestamp = int(datetime.now().timestamp())
    content_cache = ""  # full content delivered so far

    async def send_chunk(content: str):
        """Format one content delta as an OpenAI-style SSE chunk."""
        chunk_data = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion.chunk",
            "created": timestamp,
            "model": "qwen",
            "choices": [{
                "index": 0,
                "delta": {
                    "content": content,
                    "role": "assistant"
                },
                "finish_reason": None
            }]
        }
        return f"data: {json.dumps(chunk_data)}\n\n"

    async for chunk in response.aiter_lines():
        if not chunk or not chunk.startswith('data: '):
            continue
        try:
            data = json.loads(chunk[6:])  # strip the 'data: ' prefix
            # Skip events that carry no content delta
            if not (data.get('choices')
                    and data['choices'][0].get('delta')
                    and data['choices'][0]['delta'].get('content')):
                continue
            current_content = data['choices'][0]['delta']['content']

            # Diff against the cache: find the first position that differs
            if content_cache:
                for i in range(min(len(current_content), len(content_cache))):
                    if current_content[i] != content_cache[i]:
                        new_content = current_content[i:]
                        break
                else:
                    # Shared prefix identical — new content is the overhang
                    new_content = current_content[len(content_cache):]
            else:
                new_content = current_content

            # Emit only when there is genuinely new content
            if new_content:
                yield await send_chunk(new_content)
                content_cache = current_content  # update the cache
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析错误: {str(e)}")
            continue
        except Exception as e:
            logger.error(f"处理数据流时发生错误: {str(e)}")
            # json.dumps guarantees the error payload is valid JSON even if
            # the message contains quotes or backslashes
            yield f"data: {json.dumps({'error': True, 'message': str(e)})}\n\n"

    # End-of-stream marker
    yield "data: [DONE]\n\n"


async def get_openai_auth_headers(request: Request) -> dict:
    """FastAPI dependency: extract and validate the Authorization header.

    Raises:
        HTTPException: 401 if the header is absent.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(status_code=401, detail="Missing Authorization header")
    # Log a redacted identifier only — never the full credential
    token = auth_header[7:] if auth_header.startswith("Bearer ") else auth_header
    logger.info(f"New request authenticated with session {_mask_token(token)}")
    return {"Authorization": auth_header}


async def make_request(method, url, headers, body, api_keys=None, retry_count=0):
    """Generic request helper supporting rotation over multiple API keys.

    With several keys, a random key is tried (without replacement) up to
    3 attempts; with zero or one key, the single credential is retried
    up to 3 times. Any response with status < 400 is returned.

    Raises:
        HTTPException: 500 when all attempts fail.
    """
    try:
        if api_keys and len(api_keys) > 1:
            # Multiple keys: rotate randomly without replacement
            remaining_keys = api_keys.copy()
            while remaining_keys and retry_count < 3:
                selected_key = random.choice(remaining_keys)
                remaining_keys.remove(selected_key)
                headers = {**headers, "Authorization": f"Bearer {selected_key}"}
                logger.info(f"Attempting request with API key: {_mask_token(selected_key)}")
                try:
                    async with httpx.AsyncClient() as http_client:
                        r = await http_client.request(
                            method, url,
                            headers=headers,
                            content=body,
                            timeout=600,
                        )
                        if r.status_code < 400:
                            return r
                        logger.error(
                            f"Request failed with key {_mask_token(selected_key)}, "
                            f"status code: {r.status_code}"
                        )
                except Exception as e:
                    logger.error(f"Request failed with key {_mask_token(selected_key)}: {str(e)}")
                retry_count += 1
            raise HTTPException(status_code=500, detail="All API keys failed")
        else:
            # Single key (or key taken from the incoming Authorization header)
            while retry_count < 3:
                single_key = (
                    api_keys[0] if api_keys
                    else headers.get("authorization", "").replace("Bearer ", "").strip()
                )
                request_headers = {**headers, "Authorization": f"Bearer {single_key}"}
                logger.info(f"Attempting request with API key: {_mask_token(single_key)}")
                try:
                    async with httpx.AsyncClient() as http_client:
                        r = await http_client.request(
                            method, url,
                            headers=request_headers,
                            content=body,
                            timeout=600,
                        )
                        if r.status_code < 400:
                            return r
                        logger.error(
                            f"Request attempt {retry_count + 1} failed for session "
                            f"{_mask_token(single_key)}"
                        )
                except Exception as e:
                    logger.error(
                        f"Connection attempt {retry_count + 1} failed for session "
                        f"{_mask_token(single_key)}"
                    )
                retry_count += 1
            raise HTTPException(status_code=500, detail="Request failed after 3 retries")
    except HTTPException:
        # Preserve our own HTTP errors instead of re-wrapping them below
        raise
    except Exception as e:
        logger.error(f"Request failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


# API route definitions
@app.api_route(
    "/api/chat/completions",
    methods=["POST", "OPTIONS"],
)
async def chat_completions(request: Request, auth_headers: dict = Depends(get_openai_auth_headers)):
    """Proxy a chat-completion request to the Qwen API (streaming or not)."""
    # Strip headers that must be recomputed for the outbound request
    headers = dict(request.headers)
    if "content-length" in headers:
        del headers["content-length"]
    if "host" in headers:
        del headers["host"]

    # Parse the request body
    request_body = await request.body()
    try:
        request_data = json.loads(request_body.decode('utf-8'))
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    # Extract request parameters
    messages = request_data.get('messages')
    stream = request_data.get('stream', False)
    model = request_data.get('model')
    max_tokens = request_data.get('max_tokens')
    if not model:
        raise HTTPException(status_code=400, detail="Model parameter is required")

    # Build the upstream request payload
    qwen_request = {
        'model': model,
        'messages': messages,
        'stream': stream,
    }
    if max_tokens is not None:
        qwen_request['max_tokens'] = max_tokens

    try:
        # Forward to the Qwen API with retries
        response = await fetch_with_retry(QWEN_API_URL, {
            'method': 'POST',
            'headers': {
                'Content-Type': 'application/json',
                **auth_headers,
            },
            'body': json.dumps(qwen_request),
            'stream': stream,
        })

        if stream:
            previous_content = ''
            return StreamingResponse(
                handle_stream(response, previous_content),
                media_type="text/event-stream",
                headers={
                    'Cache-Control': 'no-cache, no-transform',
                    'Connection': 'keep-alive',
                    'X-Accel-Buffering': 'no',  # disable Nginx buffering
                    'Content-Type': 'text/event-stream',
                    'Transfer-Encoding': 'chunked',
                },
                status_code=200,
                background=None,  # ensure no background processing
            )
        else:
            # httpx has already decoded the body, so encoding/length headers
            # from upstream would be wrong — drop hop-by-hop/encoding headers
            excluded = {'content-encoding', 'content-length', 'transfer-encoding', 'connection'}
            safe_headers = {
                k: v for k, v in response.headers.items() if k.lower() not in excluded
            }
            return Response(
                content=response.content,
                status_code=response.status_code,
                headers=safe_headers,
            )
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error))


@app.get("/api/models")
async def models(request: Request, auth_headers: dict = Depends(get_openai_auth_headers)):
    """Fetch the list of available models from the upstream API."""
    try:
        response = await fetch_with_retry(QWEN_MODELS_URL, {
            'method': 'GET',
            'headers': {
                'Content-Type': 'application/json',
                **auth_headers,
            },
            'timeout': 30,
        })
        response_data = response.json()
        return JSONResponse(content=response_data)
    except Exception as e:
        logger.error(f"Error in /api/models: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get('/')
async def index(request: Request):
    """Redirect the root path to the upstream chat UI."""
    return Response(status_code=302, headers={"Location": "https://chat.qwenlm.ai"})


@app.get('/{path:path}')
@app.post('/{path:path}')
async def redirect_all(path: str, request: Request):
    """Catch-all: deflect probes and redirect everything else."""
    # Probe paths commonly hit by scanners are bounced to localhost
    if any(keyword in path.lower()
           for keyword in ['php', 'admin', 'login', 'wp-admin', 'manager', 'user', 'signin']):
        return Response(status_code=301, headers={"Location": "http://127.0.0.1"})
    if request.method == 'POST':
        return Response(status_code=301, headers={"Location": "http://127.0.0.1"})
    return Response(status_code=302, headers={"Location": "https://linux.do/u/f-droid"})


# Main entry point
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8080,
        loop="asyncio",
        timeout_keep_alive=65,
        access_log=True,
        log_level="debug",
        http="h11",
        limit_concurrency=1000,
        backlog=2048,
    )