# (Hugging Face Spaces status-banner residue removed from extracted source)
# Standard library imports
import asyncio
import json
import logging
import random
import time
import traceback
import uuid
from datetime import datetime
from typing import Any, Dict, Optional

# Third-party imports
import httpx  # async HTTP client
import uvicorn  # ASGI server
from fastapi import Depends, FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
# Upstream API endpoint configuration.
QWEN_API_URL = 'https://chat.qwenlm.ai/api/chat/completions'  # chat-completions endpoint
QWEN_MODELS_URL = 'https://chat.qwenlm.ai/api/models'  # model-list endpoint
MAX_RETRIES = 3  # maximum attempts for upstream requests
RETRY_DELAY = 1  # base retry delay in seconds (scaled linearly per attempt)

# Logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# FastAPI application instance.
app = FastAPI()

# CORS middleware: fully open (all origins/methods/headers, credentials allowed).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Module-level async HTTP client.
# NOTE(review): appears unused — the request helpers below each create their
# own per-request AsyncClient — and it is never closed. Confirm before removing.
client = httpx.AsyncClient()
async def fetch_with_retry(url, options, retries=MAX_RETRIES):
    """
    Perform an HTTP request with linear-backoff retries.

    Args:
        url: Target URL.
        options: Dict with optional keys 'method' (default 'GET'),
            'headers' (dict) and 'body' (raw request content).
        retries: Number of attempts before giving up.

    Returns:
        The httpx.Response of the first attempt that is neither an HTML
        error page nor an HTTP 500. NOTE: if the final attempt still hits
        that retryable condition, the bad response is returned as-is
        (preserved from the original behavior).

    Raises:
        Exception: with a JSON-encoded summary after all attempts fail
            with a connection-level error.
    """
    last_error = None
    auth_header = options.get('headers', {}).get('Authorization', '')
    # Security fix: log only a short masked fragment of the bearer token,
    # never the full credential.
    token_tag = f"{auth_header[7:15]}..." if auth_header.startswith('Bearer ') else 'anonymous'
    if auth_header.startswith('Bearer '):
        logger.info(f"Processing request with session identifier: {token_tag}")
    for i in range(retries):
        try:
            async with httpx.AsyncClient() as http_client:
                # Apply caller-supplied headers to this client instance.
                if 'headers' in options:
                    http_client.headers.update(options['headers'])
                response = await http_client.request(
                    method=options.get('method', 'GET'),
                    url=url,
                    content=options.get('body'),
                    timeout=60,
                )
                # Treat HTML bodies and HTTP 500 as retryable upstream failures
                # (the upstream serves HTML error pages on some faults).
                content_type = response.headers.get('content-type', '')
                if 'text/html' in content_type or response.status_code == 500:
                    last_error = {
                        'status': response.status_code,
                        'contentType': content_type,
                        'responseText': response.text[:1000],
                        'headers': dict(response.headers),
                    }
                    if i < retries - 1:
                        logger.error(f"Retry attempt {i + 1} for session {token_tag} failed")
                        await asyncio.sleep(RETRY_DELAY * (i + 1))  # linear backoff
                        continue
                return response
        except Exception as error:
            last_error = error
            logger.error(f"Connection error for session {token_tag} on attempt {i + 1}")
            traceback.print_exc()
            if i < retries - 1:
                await asyncio.sleep(RETRY_DELAY * (i + 1))
                continue
    # Every attempt failed: surface a structured error for the caller.
    raise Exception(json.dumps({
        'error': True,
        'message': 'All retry attempts failed',
        'lastError': str(last_error),
        'retries': retries,
    }))
async def process_line(line, previous_content):
    """
    Parse one SSE 'data: ...' line and strip content the client already has.

    Returns a (formatted_sse_line, newly_added_content) pair. Both elements
    are None when the line carries nothing new; lines that are not valid
    JSON are forwarded verbatim.
    """
    try:
        payload = json.loads(line[6:])  # drop the 'data: ' prefix
        choices = payload.get('choices')
        if choices and choices[0].get('delta') and choices[0]['delta'].get('content'):
            chunk_text = choices[0]['delta']['content']
            # Upstream may resend cumulative text: keep only the new suffix.
            fresh = chunk_text
            if previous_content and chunk_text.startswith(previous_content):
                fresh = chunk_text[len(previous_content):]
            if not fresh:
                # Entirely duplicate content — emit nothing.
                return None, None
            rebuilt = {'choices': [{'delta': {'content': fresh}}]}
            return f"data: {json.dumps(rebuilt)}\n\n", fresh
        # No delta content: pass the event through unchanged.
        return f"data: {json.dumps(payload)}\n\n", None
    except Exception:
        # Unparseable line — forward it untouched.
        return f"{line}\n\n", None
# NOTE: a previous buffered, line-splitting implementation of handle_stream
# (fully commented out) was removed here; the active implementation follows.
def create_chat_completion_data(
    content: str, model: str, timestamp: int, finish_reason: Optional[str] = None
) -> Dict[str, Any]:
    """Build one OpenAI-style chat.completion.chunk payload with a fresh id."""
    choice = {
        "index": 0,
        "delta": {"content": content, "role": "assistant"},
        "finish_reason": finish_reason,
    }
    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion.chunk",
        "created": timestamp,
        "model": model,
        "choices": [choice],
        "usage": None,
    }
async def handle_stream(response, previous_content):
    """
    Re-emit an upstream SSE stream as OpenAI-style chunk events.

    The upstream may send cumulative content in each delta; this generator
    diffs each chunk against the content already forwarded and emits only
    the new suffix, finishing with a 'data: [DONE]' sentinel.

    Args:
        response: Streaming response object exposing aiter_lines().
        previous_content: Kept for interface compatibility; the internal
            diff cache always starts empty (preserved from the original).
    """
    timestamp = int(datetime.now().timestamp())
    content_cache = ""  # full content already forwarded to the client

    def format_chunk(content: str) -> str:
        """Wrap *content* in an SSE chat.completion.chunk event."""
        chunk_data = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion.chunk",
            "created": timestamp,
            "model": "qwen",  # NOTE(review): model name hard-coded here
            "choices": [{
                "index": 0,
                "delta": {"content": content, "role": "assistant"},
                "finish_reason": None,
            }],
        }
        return f"data: {json.dumps(chunk_data)}\n\n"

    async for chunk in response.aiter_lines():
        if not chunk or not chunk.startswith('data: '):
            continue
        try:
            data = json.loads(chunk[6:])  # strip the 'data: ' prefix
            # Skip events that carry no delta content.
            if not (data.get('choices')
                    and data['choices'][0].get('delta')
                    and data['choices'][0]['delta'].get('content')):
                continue
            current_content = data['choices'][0]['delta']['content']
            # Diff against the cache: find where new content begins.
            if content_cache:
                for i in range(min(len(current_content), len(content_cache))):
                    if current_content[i] != content_cache[i]:
                        new_content = current_content[i:]
                        break
                else:
                    # The shared prefix matches entirely; the tail is new.
                    new_content = current_content[len(content_cache):]
            else:
                new_content = current_content
            # Emit only when there is genuinely new content.
            if new_content:
                yield format_chunk(new_content)
                content_cache = current_content  # update the cache
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析错误: {str(e)}")
            continue
        except Exception as e:
            logger.error(f"处理数据流时发生错误: {str(e)}")
            # Bug fix: build the error event with json.dumps so the payload
            # stays valid JSON even when the message contains quotes or
            # backslashes (the old f-string interpolation broke on those).
            yield f"data: {json.dumps({'error': True, 'message': str(e)})}\n\n"

    # End-of-stream sentinel.
    yield "data: [DONE]\n\n"
async def get_openai_auth_headers(request: Request) -> dict:
    """
    Extract the Authorization header for forwarding upstream.

    Args:
        request: Incoming FastAPI request.

    Returns:
        Dict containing the raw Authorization header.

    Raises:
        HTTPException: 401 when the header is absent.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(status_code=401, detail="Missing Authorization header")
    # Security fix: log only a short masked fragment of the credential,
    # not the entire token as before.
    logger.info(f"New request authenticated with session {auth_header[7:15]}...")
    return {"Authorization": auth_header}
async def make_request(method, url, headers, body, api_keys=None, retry_count=0):
    """
    Send an upstream request, rotating over multiple API keys when given.

    Args:
        method: HTTP method string.
        url: Target URL.
        headers: Base headers; Authorization is overwritten per attempt.
        body: Raw request content.
        api_keys: Optional list of bearer keys to try.
        retry_count: Starting attempt counter (attempts stop at 3).

    Returns:
        The first httpx.Response with status < 400.

    Raises:
        HTTPException: 500 when every attempt fails.
    """
    def _mask(key: str) -> str:
        # Security fix: never log a full credential — show at most 8 chars.
        return f"{key[:8]}..." if key else "<empty>"

    try:
        if api_keys and len(api_keys) > 1:
            # Multiple keys: pick randomly without replacement per attempt.
            remaining_keys = api_keys.copy()
            while remaining_keys and retry_count < 3:
                selected_key = random.choice(remaining_keys)
                remaining_keys.remove(selected_key)
                headers = {**headers, "Authorization": f"Bearer {selected_key}"}
                logger.info(f"Attempting request with API key: {_mask(selected_key)}")
                try:
                    async with httpx.AsyncClient() as http_client:
                        r = await http_client.request(
                            method, url, headers=headers, content=body, timeout=600
                        )
                        if r.status_code < 400:
                            return r
                        logger.error(
                            f"Request failed with key {_mask(selected_key)}, "
                            f"status code: {r.status_code}"
                        )
                except Exception as e:
                    logger.error(f"Request failed with key {_mask(selected_key)}: {str(e)}")
                retry_count += 1
            raise HTTPException(status_code=500, detail="All API keys failed")
        else:
            # Single key (or none): retry the same credential up to 3 times,
            # falling back to the bearer token in the incoming headers.
            while retry_count < 3:
                single_key = (
                    api_keys[0] if api_keys
                    else headers.get("authorization", "").replace("Bearer ", "").strip()
                )
                request_headers = {**headers, "Authorization": f"Bearer {single_key}"}
                logger.info(f"Attempting request with API key: {_mask(single_key)}")
                try:
                    async with httpx.AsyncClient() as http_client:
                        r = await http_client.request(
                            method, url, headers=request_headers, content=body, timeout=600
                        )
                        if r.status_code < 400:
                            return r
                        logger.error(
                            f"Request attempt {retry_count + 1} failed "
                            f"for session {_mask(single_key)}"
                        )
                except Exception as e:
                    logger.error(
                        f"Connection attempt {retry_count + 1} failed "
                        f"for session {_mask(single_key)}: {str(e)}"
                    )
                retry_count += 1
            raise HTTPException(status_code=500, detail="Request failed after 3 retries")
    except HTTPException:
        # Bug fix: the blanket handler below used to catch our own deliberate
        # HTTPExceptions and re-wrap them, mangling the detail. Re-raise as-is.
        raise
    except Exception as e:
        logger.error(f"Request failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
| # API路由定义 | |
async def chat_completions(request: Request, auth_headers: dict = Depends(get_openai_auth_headers)):
    """
    Proxy an OpenAI-style chat-completions request to the Qwen API.

    Validates the JSON body, forwards it upstream via fetch_with_retry, and
    returns either a pass-through SSE stream or the raw upstream response.

    NOTE(review): no route decorator is visible in this chunk — registration
    is presumably done elsewhere (e.g. app.add_api_route); confirm.

    Raises:
        HTTPException: 400 on bad JSON or missing model; 500 on upstream failure.
    """
    # Strip headers that must be recomputed for an outgoing request.
    headers = dict(request.headers)
    headers.pop("content-length", None)
    headers.pop("host", None)

    request_body = await request.body()
    try:
        request_data = json.loads(request_body.decode('utf-8'))
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    # Extract the request parameters we forward.
    messages = request_data.get('messages')
    stream = request_data.get('stream', False)
    model = request_data.get('model')
    max_tokens = request_data.get('max_tokens')
    if not model:
        raise HTTPException(status_code=400, detail="Model parameter is required")

    # Build the upstream payload.
    qwen_request = {'model': model, 'messages': messages, 'stream': stream}
    if max_tokens is not None:
        qwen_request['max_tokens'] = max_tokens

    try:
        response = await fetch_with_retry(QWEN_API_URL, {
            'method': 'POST',
            'headers': {'Content-Type': 'application/json', **auth_headers},
            'body': json.dumps(qwen_request),
            'stream': stream,
        })
        if stream:
            previous_content = ''
            return StreamingResponse(
                handle_stream(response, previous_content),
                media_type="text/event-stream",
                headers={
                    'Cache-Control': 'no-cache, no-transform',
                    'Connection': 'keep-alive',
                    'X-Accel-Buffering': 'no',  # disable Nginx buffering
                    'Content-Type': 'text/event-stream',
                    'Transfer-Encoding': 'chunked',
                },
                status_code=200,
                background=None,  # nothing to run after the response
            )
        # Non-streaming: forward body and status. Bug fix: drop codec and
        # hop-by-hop headers — httpx has already decoded the body, so passing
        # content-encoding/content-length through corrupts client replies.
        passthrough = {
            k: v for k, v in response.headers.items()
            if k.lower() not in (
                'content-encoding', 'content-length',
                'transfer-encoding', 'connection',
            )
        }
        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=passthrough,
        )
    except HTTPException:
        raise
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error))
async def models(request: Request, auth_headers: dict = Depends(get_openai_auth_headers)):
    """Fetch the upstream model list and return it as a JSON response."""
    options = {
        'method': 'GET',
        'headers': {'Content-Type': 'application/json', **auth_headers},
        'timeout': 30,  # NOTE(review): fetch_with_retry uses its own 60s timeout
    }
    try:
        upstream = await fetch_with_retry(QWEN_MODELS_URL, options)
        return JSONResponse(content=upstream.json())
    except Exception as e:
        logger.error(f"Error in /api/models: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
async def index(request: Request):
    """Redirect the root path to the upstream chat UI."""
    location = {"Location": "https://chat.qwenlm.ai"}
    return Response(status_code=302, headers=location)
async def redirect_all(path: str, request: Request):
    """Catch-all route: bounce scanner probes and unknown paths elsewhere."""
    # Paths commonly hit by vulnerability scanners get pointed back at the client.
    probe_terms = ('php', 'admin', 'login', 'wp-admin', 'manager', 'user', 'signin')
    lowered = path.lower()
    looks_like_probe = any(term in lowered for term in probe_terms)
    if looks_like_probe or request.method == 'POST':
        return Response(status_code=301, headers={"Location": "http://127.0.0.1"})
    # Everything else is redirected to the author's profile page.
    return Response(status_code=302, headers={"Location": "https://linux.do/u/f-droid"})
# Entry point: run the ASGI app directly with uvicorn.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",  # listen on all interfaces
        port=8080,
        loop="asyncio",
        timeout_keep_alive=65,  # keep-alive above a typical 60s LB idle timeout
        access_log=True,
        log_level="debug",
        http="h11",  # pure-Python HTTP/1.1 implementation
        limit_concurrency=1000,
        backlog=2048,
    )