File size: 17,122 Bytes
4fc29a0
 
 
 
b3e6518
 
 
 
4fc29a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3d6247
 
 
 
 
 
4fc29a0
e3d6247
 
4fc29a0
e3d6247
 
4fc29a0
 
 
 
e3d6247
 
4fc29a0
e3d6247
 
 
4fc29a0
 
 
 
 
 
 
e3d6247
 
 
 
 
 
 
 
4fc29a0
e3d6247
 
4fc29a0
e3d6247
 
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
 
 
 
4fc29a0
e88b429
4fc29a0
e3d6247
4fc29a0
e3d6247
 
 
e88b429
4fc29a0
e3d6247
 
e88b429
 
 
 
 
 
 
 
 
 
 
 
e3d6247
e88b429
e3d6247
e88b429
e3d6247
b3e6518
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3d6247
9cd681c
 
 
 
 
 
 
 
b3e6518
9cd681c
b4a3e27
9cd681c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b4a3e27
b3e6518
9cd681c
 
 
b3e6518
9cd681c
 
 
 
 
 
 
b3e6518
 
9cd681c
b3e6518
9cd681c
 
 
 
 
 
 
 
 
 
 
 
b3e6518
9cd681c
 
 
 
b3e6518
9cd681c
 
 
b3e6518
9cd681c
b3e6518
 
9cd681c
b3e6518
e3d6247
e88b429
b4a3e27
e3d6247
4fc29a0
 
 
 
 
 
 
e3d6247
 
 
 
 
 
 
4fc29a0
 
 
e3d6247
 
4fc29a0
e3d6247
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
4fc29a0
 
 
 
e3d6247
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
 
 
 
 
 
4fc29a0
e3d6247
 
bb95a94
b3e6518
e88b429
b3e6518
 
0fd5c22
bb95a94
0fd5c22
 
bb95a94
e3d6247
 
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fc29a0
e3d6247
 
 
 
 
4fc29a0
 
e3d6247
 
 
 
 
 
4fc29a0
e3d6247
0fd5c22
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# 导入所需的库
import json  # 用于JSON数据处理
import time  # 用于时间相关操作
import traceback  # 用于异常堆栈跟踪
import uuid
from datetime import datetime
from typing import Optional, Dict, Any

from fastapi import FastAPI, HTTPException, Request, Depends, Response  # FastAPI框架相关组件
from fastapi.middleware.cors import CORSMiddleware  # CORS中间件
from fastapi.responses import StreamingResponse, JSONResponse  # 特殊响应类型
import httpx  # 异步HTTP客户端
import logging  # 日志管理
import random  # 随机数生成
import uvicorn  # ASGI服务器
import asyncio  # 异步IO

# Upstream API endpoint configuration
QWEN_API_URL = 'https://chat.qwenlm.ai/api/chat/completions'  # Qwen chat-completions endpoint
QWEN_MODELS_URL = 'https://chat.qwenlm.ai/api/models'  # Qwen model-list endpoint
MAX_RETRIES = 3  # maximum attempts for upstream requests
RETRY_DELAY = 1  # base retry delay in seconds (scaled linearly per attempt)

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# FastAPI application instance
app = FastAPI()

# CORS middleware: the proxy is fully open to cross-origin callers
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # allow every origin
    allow_credentials=True,  # allow credentials
    allow_methods=["*"],  # allow every HTTP method
    allow_headers=["*"],  # allow every request header
)

# Shared async HTTP client.
# NOTE(review): this client appears unused -- the request helpers below each
# create their own AsyncClient per call; confirm whether it can be removed.
client = httpx.AsyncClient()

async def fetch_with_retry(url, options, retries=MAX_RETRIES):
    """
    Send an HTTP request with a retry loop.

    Retries (with a linearly increasing delay) when the upstream answers with
    an HTML payload or HTTP 500, or when the connection itself fails.  On the
    final attempt a bad upstream response is still returned to the caller;
    only repeated connection failures raise.

    Args:
        url: target URL.
        options: dict with optional 'method', 'headers' and 'body' keys.
        retries: number of attempts (default MAX_RETRIES).

    Returns:
        httpx.Response from the first successful (or final) attempt.

    Raises:
        Exception: with a JSON-encoded summary after every attempt raised.
    """
    last_error = None
    auth_header = options.get('headers', {}).get('Authorization', '')
    # SECURITY FIX: never write the full bearer token to the logs -- keep only
    # a short prefix so individual sessions remain distinguishable.
    token_hint = auth_header[7:15] if auth_header.startswith('Bearer ') else ''
    if auth_header.startswith('Bearer '):
        logger.info(f"Processing request with session identifier: {token_hint}...")

    for attempt in range(retries):
        try:
            async with httpx.AsyncClient() as http_client:
                # Apply caller-supplied headers to the client
                if 'headers' in options:
                    http_client.headers.update(options['headers'])
                # Issue the request
                response = await http_client.request(
                    method=options.get('method', 'GET'),
                    url=url,
                    content=options.get('body'),
                    timeout=60
                )

                # Treat HTML payloads and HTTP 500 as retryable upstream errors
                content_type = response.headers.get('content-type', '')
                if 'text/html' in content_type or response.status_code == 500:
                    last_error = {
                        'status': response.status_code,
                        'contentType': content_type,
                        'responseText': response.text[:1000],
                        'headers': dict(response.headers)
                    }
                    if attempt < retries - 1:
                        logger.error(f"Retry attempt {attempt + 1} for session {token_hint}... failed")
                        await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                        continue
                return response
        except Exception as error:
            last_error = error
            logger.error(f"Connection error for session {token_hint}... on attempt {attempt + 1}")
            traceback.print_exc()
            if attempt < retries - 1:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                continue

    # Every attempt raised -- surface a structured error to the caller
    raise Exception(json.dumps({
        'error': True,
        'message': 'All retry attempts failed',
        'lastError': str(last_error),
        'retries': retries
    }))

async def process_line(line, previous_content):
    """
    Process one SSE data line, stripping content the client already received.

    The upstream may re-send cumulative content; when the current chunk
    starts with ``previous_content`` only the tail is forwarded.

    Returns:
        (sse_line, new_content) -- the formatted ``data: ...`` line and the
        freshly added text; (None, None) when nothing new arrived; for events
        without delta content, the re-serialized event and None; for
        unparseable lines, the raw line (newline-terminated) and None.
    """
    try:
        payload = json.loads(line[6:])  # drop the leading 'data: ' prefix

        delta_content = None
        choices = payload.get('choices')
        if choices and choices[0].get('delta'):
            delta_content = choices[0]['delta'].get('content')

        if not delta_content:
            # No delta content: forward the parsed event unchanged.
            return f"data: {json.dumps(payload)}\n\n", None

        fresh = delta_content
        if previous_content and delta_content.startswith(previous_content):
            # Cumulative re-send detected: keep only the unseen tail.
            fresh = delta_content[len(previous_content):]

        if not fresh:
            return None, None

        wrapped = {'choices': [{'delta': {'content': fresh}}]}
        return f"data: {json.dumps(wrapped)}\n\n", fresh
    except Exception:
        # Not valid JSON (or unexpected shape) -- pass the raw line through.
        return f"{line}\n\n", None

# async def handle_stream(response, previous_content):
#     """
#     处理流式响应,确保实时发送到客户端
#     """
#     buffer = ''
#     async for chunk in response.aiter_lines(encoding='utf-8'):
#         try:
#             # decoded_chunk = chunk.decode('utf-8')
#             decoded_chunk = chunk.strip()
#             print('decoded_chunk: ', decoded_chunk, ' ::end')
#             buffer += decoded_chunk
#
#             # 立即处理每个完整的数据行
#             while '\n' in buffer:
#                 # print('buffer: ', buffer)
#                 line, buffer = buffer.split('\n', 1)
#                 line = line.strip()
#                 if line.startswith('data: '):
#                     processed_line, new_content = await process_line(line, previous_content)
#                     if processed_line:
#                         # 确保每个响应都以正确的SSE格式发送
#                         # print('11data: ', json.dumps(processed_line, indent=4))
#                         yield processed_line
#                         # 立即刷新输出
#                         await asyncio.sleep(0)
#                         if new_content:
#                             previous_content = previous_content + new_content
#
#                 # print("data: [DONE]")
#                 # yield "data: [DONE]\n\n"
#
#         except Exception as e:
#             logger.error(f"Error processing chunk: {str(e)}")
#             yield f"data: {{\"error\":true,\"message\":\"{str(e)}\"}}\n\n"
#             continue
#
#     # 处理剩余的buffer
#     if buffer.strip():
#         line = buffer.strip()
#         if line.startswith('data: '):
#             processed_line, new_content = await process_line(line, previous_content)
#             if processed_line:
#                 yield processed_line
#
#     print("data: [DONE]")
#     yield "data: [DONE]\n\n"
def create_chat_completion_data(
        content: str, model: str, timestamp: int, finish_reason: Optional[str] = None
) -> Dict[str, Any]:
    """
    Build one OpenAI-style ``chat.completion.chunk`` payload.

    Args:
        content: delta text carried by this chunk.
        model: model name echoed back to the client.
        timestamp: unix epoch seconds for the ``created`` field.
        finish_reason: e.g. "stop" on the final chunk, None otherwise.

    Returns:
        A dict matching the OpenAI streaming-chunk schema, with a fresh
        random ``chatcmpl-`` id and ``usage`` set to None.
    """
    delta_choice = {
        "index": 0,
        "delta": {"content": content, "role": "assistant"},
        "finish_reason": finish_reason,
    }
    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion.chunk",
        "created": timestamp,
        "model": model,
        "choices": [delta_choice],
        "usage": None,
    }

async def handle_stream(response, previous_content):
    """
    Optimised streaming-response handler.

    Main improvements over the earlier version:
    1. plain string slicing instead of repeated startswith comparisons
    2. a content cache to suppress re-sent (cumulative) content
    3. reduced memory use

    NOTE(review): ``previous_content`` is accepted but never read here --
    the cache starts empty on every call; confirm callers rely only on
    ``content_cache``.
    """
    timestamp = int(datetime.now().timestamp())
    content_cache = ""  # content already forwarded to the client

    async def send_chunk(content: str):
        """Inner helper: wrap *content* in an OpenAI-style SSE chunk line."""
        chunk_data = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion.chunk",
            "created": timestamp,
            "model": "qwen",
            "choices": [{
                "index": 0,
                "delta": {
                    "content": content,
                    "role": "assistant"
                },
                "finish_reason": None
            }]
        }
        return f"data: {json.dumps(chunk_data)}\n\n"

    async for chunk in response.aiter_lines():
        if not chunk or not chunk.startswith('data: '):
            continue

        try:
            # Parse the SSE payload
            data = json.loads(chunk[6:])  # strip the 'data: ' prefix

            # Skip events that carry no delta content
            if not (data.get('choices') and
                    data['choices'][0].get('delta') and
                    data['choices'][0]['delta'].get('content')):
                continue

            current_content = data['choices'][0]['delta']['content']

            # Difference detection -- assumes the upstream may re-send
            # cumulative content; forward only the part not seen before
            if content_cache:
                # Find the first position where the chunk diverges from cache
                for i in range(min(len(current_content), len(content_cache))):
                    if current_content[i] != content_cache[i]:
                        new_content = current_content[i:]
                        break
                else:
                    # Shared prefix matches entirely: the new part is the tail
                    new_content = current_content[len(content_cache):]
            else:
                new_content = current_content

            # Emit only when something genuinely new arrived
            if new_content:
                yield await send_chunk(new_content)
                content_cache = current_content  # remember what was sent

        except json.JSONDecodeError as e:
            logger.error(f"JSON解析错误: {str(e)}")
            continue
        except Exception as e:
            logger.error(f"处理数据流时发生错误: {str(e)}")
            yield f"data: {{\"error\":true,\"message\":\"{str(e)}\"}}\n\n"

    # Send the SSE termination marker
    yield "data: [DONE]\n\n"



async def get_openai_auth_headers(request: Request) -> dict:
    """
    Extract the Authorization header from the incoming request.

    Args:
        request: the incoming FastAPI request.

    Returns:
        A dict carrying the original Authorization header, ready to be
        forwarded upstream.

    Raises:
        HTTPException: 401 when the header is absent.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(status_code=401, detail="Missing Authorization header")
    # SECURITY FIX: log only a short token prefix rather than the whole
    # credential, and don't assume a "Bearer " prefix when slicing.
    token = auth_header[7:] if auth_header.startswith("Bearer ") else auth_header
    logger.info(f"New request authenticated with session {token[:8]}...")
    return {"Authorization": auth_header}

async def make_request(method, url, headers, body, api_keys=None, retry_count=0):
    """
    Generic upstream request helper with optional API-key rotation.

    With several API keys, random keys are tried (each at most once) until one
    succeeds or three attempts are exhausted.  With zero or one key, the same
    key is retried up to three times.

    Args:
        method: HTTP method.
        url: target URL.
        headers: base request headers (Authorization is overwritten per key).
        body: raw request body.
        api_keys: optional list of bearer keys to rotate through.
        retry_count: starting attempt counter (normally 0).

    Returns:
        httpx.Response with status < 400.

    Raises:
        HTTPException: 500 when every attempt fails.
    """
    try:
        if api_keys and len(api_keys) > 1:
            # Multiple API keys: rotate randomly, never reusing a failed key
            remaining_keys = api_keys.copy()
            while remaining_keys and retry_count < 3:
                selected_key = random.choice(remaining_keys)
                remaining_keys.remove(selected_key)
                headers = {**headers, "Authorization": f"Bearer {selected_key}"}
                # SECURITY FIX: log only a key prefix, never the full secret
                logger.info(f"Attempting request with API key: {selected_key[:8]}...")
                try:
                    async with httpx.AsyncClient() as client:
                        r = await client.request(
                            method,
                            url,
                            headers=headers,
                            content=body,
                            timeout=600
                        )
                        if r.status_code < 400:
                            return r
                        logger.error(f"Request failed with key {selected_key[:8]}..., status code: {r.status_code}")
                except Exception as e:
                    logger.error(f"Request failed with key {selected_key[:8]}...: {str(e)}")
                retry_count += 1
            raise HTTPException(status_code=500, detail="All API keys failed")
        else:
            # Single key (explicit, or recovered from the incoming headers)
            while retry_count < 3:
                single_key = api_keys[0] if api_keys else headers.get("authorization", "").replace("Bearer ", "").strip()
                request_headers = {**headers, "Authorization": f"Bearer {single_key}"}
                logger.info(f"Attempting request with API key: {single_key[:8]}...")
                try:
                    async with httpx.AsyncClient() as client:
                        r = await client.request(
                            method,
                            url,
                            headers=request_headers,
                            content=body,
                            timeout=600
                        )
                        if r.status_code < 400:
                            return r
                        logger.error(f"Request attempt {retry_count + 1} failed for session {single_key[:8]}...")
                except Exception as e:
                    logger.error(f"Connection attempt {retry_count + 1} failed for session {single_key[:8]}...")
                retry_count += 1
            raise HTTPException(status_code=500, detail="Request failed after 3 retries")
    except HTTPException:
        # BUG FIX: let the deliberate HTTPExceptions above propagate unchanged
        # instead of being re-wrapped below (which mangled the detail text
        # into "500: All API keys failed").
        raise
    except Exception as e:
        logger.error(f"Request failed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

# API route definitions
@app.api_route(
    "/api/chat/completions",
    methods=["POST", "OPTIONS"],
)
async def chat_completions(request: Request, auth_headers: dict = Depends(get_openai_auth_headers)):
    """
    Chat-completion endpoint: forwards the OpenAI-style request body to the
    Qwen upstream API and relays the response (streaming or plain) back to
    the client.
    """
    # Sanitize incoming headers (drop hop-by-hop fields).
    # NOTE(review): this sanitized dict is never forwarded below -- the
    # upstream call builds its own headers; confirm whether this is dead code.
    headers = dict(request.headers)
    if "content-length" in headers:
        del headers["content-length"]
    if "host" in headers:
        del headers["host"]

    # Read and parse the request body
    request_body = await request.body()
    try:
        request_data = json.loads(request_body.decode('utf-8'))
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    # Extract the request parameters
    messages = request_data.get('messages')
    stream = request_data.get('stream', False)
    model = request_data.get('model')
    max_tokens = request_data.get('max_tokens')

    if not model:
        raise HTTPException(status_code=400, detail="Model parameter is required")

    # Build the upstream request payload
    qwen_request = {
        'model': model,
        'messages': messages,
        'stream': stream
    }
    if max_tokens is not None:
        qwen_request['max_tokens'] = max_tokens

    try:
        # Forward the request to the Qwen API
        response = await fetch_with_retry(QWEN_API_URL, {
            'method': 'POST',
            'headers': {
                'Content-Type': 'application/json',
                **auth_headers
            },
            'body': json.dumps(qwen_request),
            'stream': stream
        })

        # Relay the response
        if stream:
            previous_content = ''
            return StreamingResponse(handle_stream(response, previous_content), media_type="text/event-stream", headers={
                'Cache-Control': 'no-cache, no-transform',
                'Connection': 'keep-alive',
                'X-Accel-Buffering': 'no',  # disable Nginx buffering
                'Content-Type': 'text/event-stream',
                'Transfer-Encoding': 'chunked'
            },
             status_code=200,
             background=None # no background processing after the stream ends
            )
        else:
            return Response(content=response.content, status_code=response.status_code, headers=response.headers)
    except Exception as error:
        raise HTTPException(status_code=500, detail=str(error))

@app.get("/api/models")
async def models(request: Request, auth_headers: dict = Depends(get_openai_auth_headers)):
    """Return the upstream list of available models as JSON."""
    request_options = {
        'method': 'GET',
        'headers': {
            'Content-Type': 'application/json',
            **auth_headers
        },
        'timeout': 30
    }
    try:
        upstream = await fetch_with_retry(QWEN_MODELS_URL, request_options)
        return JSONResponse(content=upstream.json())
    except Exception as e:
        logger.error(f"Error in /api/models: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get('/')
async def index(request: Request):
    """Redirect the bare root path to the upstream chat UI."""
    redirect_headers = {"Location": "https://chat.qwenlm.ai"}
    return Response(status_code=302, headers=redirect_headers)

@app.get('/{path:path}')
@app.post('/{path:path}')
async def redirect_all(path: str, request: Request):
    """Catch-all redirect for any path not handled by the API routes."""
    # Paths that look like vulnerability probes (admin panels, PHP endpoints,
    # login pages) and all POSTs are bounced to localhost.
    probe_keywords = ('php', 'admin', 'login', 'wp-admin', 'manager', 'user', 'signin')
    lowered = path.lower()
    if any(keyword in lowered for keyword in probe_keywords):
        return Response(status_code=301, headers={"Location": "http://127.0.0.1"})
    if request.method == 'POST':
        return Response(status_code=301, headers={"Location": "http://127.0.0.1"})
    # Everything else goes to the author's profile page.
    return Response(status_code=302, headers={"Location": "https://linux.do/u/f-droid"})

# Script entry point: serve the app with uvicorn
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",  # listen on all interfaces
        port=8080,
        loop="asyncio",
        timeout_keep_alive=65,  # keep-alive above common load-balancer idle timeouts
        access_log=True,
        log_level="debug",
        http="h11",
        limit_concurrency=1000,
        backlog=2048
    )