import asyncio
import json
import time
import random
from fastapi import Request
from fastapi.responses import StreamingResponse
from app.models import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import handle_gemini_error, update_api_call_stats
from .logging_utils import log
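
# SSE note: every chunk below is emitted as a "data: <chat.completion.chunk JSON>\n\n"
# Server-Sent Event, and the stream is terminated with "data: [DONE]\n\n"
# (the OpenAI-compatible streaming wire format).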

# Streaming request handler
async def process_stream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    contents,
    system_instruction,
    current_api_key: str,
    key_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    FAKE_STREAMING,
    FAKE_STREAMING_INTERVAL
) -> StreamingResponse:
    """Handle a streaming API request."""
    # Generator that streams the response directly to the client
    async def stream_response_generator():
        # In fake-streaming mode, walk the API keys in random order
        if FAKE_STREAMING:
            # Queue used to pass data between the tasks
            queue = asyncio.Queue()
            keep_alive_task = None
            api_request_task = None
            try:
                # Keep-alive task: keeps sending newline heartbeats
                async def keep_alive_sender():
                    try:
                        # Gemini client used to produce the keep-alive newlines
                        keep_alive_client = GeminiClient(current_api_key)
                        # Start the keep-alive generator
                        keep_alive_generator = keep_alive_client.stream_chat(
                            chat_request,
                            contents,
                            safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                            system_instruction
                        )
                        # Keep sending newlines until the task is cancelled
                        async for line in keep_alive_generator:
                            if line == "\n":
                                # Wrap the newline in an SSE-formatted chunk
                                formatted_chunk = {
                                    "id": "chatcmpl-keepalive",
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": chat_request.model,
                                    "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
                                }
                                # Put the formatted keep-alive chunk into the queue
                                await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
                    except asyncio.CancelledError:
                        # log('info', "Keep-alive task cancelled",
                        #     extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        raise
                    except Exception as e:
                        log('error', f"Keep-alive task failed: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        # Signal the failure to the consumer via the end-of-queue sentinel
                        await queue.put(None)
                        raise

                # Task that walks the API keys in random order and requests the content
                async def api_request_handler():
                    success = False
                    try:
                        # Reset the keys already tried for this request
                        key_manager.reset_tried_keys_for_request()
                        # Get the available API keys
                        available_keys = key_manager.api_keys.copy()
                        random.shuffle(available_keys)  # randomize the key order
                        # Try each API key until one returns a response
                        for attempt, api_key in enumerate(available_keys, 1):
                            try:
                                log('info', f"Fake streaming: trying API key {api_key[:8]}... ({attempt}/{len(available_keys)})",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
                                # New client bound to the current API key
                                non_stream_client = GeminiClient(api_key)
                                # Request the content non-streaming
                                response_content = await asyncio.to_thread(
                                    non_stream_client.complete_chat,
                                    chat_request,
                                    contents,
                                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                                    system_instruction
                                )
                                # Check whether the response is usable
                                if response_content and response_content.text:
                                    log('info', f"Fake streaming: API key {api_key[:8]}... returned a response",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
                                    # Split the full response into small chunks to simulate streaming
                                    full_text = response_content.text
                                    chunk_size = max(len(full_text) // 10, 1)  # roughly 10 chunks, each at least 1 character
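                                    # e.g. a 95-character response gives chunk_size = 9,
                                    # which yields 11 chunks (ceil(95 / 9))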
                                    for i in range(0, len(full_text), chunk_size):
                                        chunk = full_text[i:i+chunk_size]
                                        formatted_chunk = {
                                            "id": "chatcmpl-someid",
                                            "object": "chat.completion.chunk",
                                            "created": int(time.time()),
                                            "model": chat_request.model,
                                            "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                                        }
                                        # Put the formatted content chunk into the queue
                                        await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
                                    success = True
                                    # Update the API call statistics
                                    update_api_call_stats(api_call_stats, api_key)
                                    break  # got a response; stop trying keys
                                else:
                                    log('warning', f"Fake streaming: API key {api_key[:8]}... returned an empty response",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
                            except Exception as e:
                                error_detail = handle_gemini_error(e, api_key, key_manager)
                                log('error', f"Fake streaming: API key {api_key[:8]}... request failed: {error_detail}",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
                                # Fall through and try the next API key
                        # Every API key failed
                        if not success:
                            error_msg = "All API keys failed; please retry later"
                            log('error', error_msg,
                                extra={'key': 'ALL', 'request_type': 'fake-stream', 'model': chat_request.model})
                            # Push the error message onto the queue
                            error_json = {
                                "id": "chatcmpl-error",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": chat_request.model,
                                "choices": [{"delta": {"content": f"\n\n[Error: {error_msg}]"}, "index": 0, "finish_reason": "error"}]
                            }
                            await queue.put(f"data: {json.dumps(error_json)}\n\n")
                        # Push the completion marker
                        await queue.put("data: [DONE]\n\n")
                        # None is the end-of-queue sentinel
                        await queue.put(None)
                    except asyncio.CancelledError:
                        log('info', "API request task cancelled",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        # None is the end-of-queue sentinel
                        await queue.put(None)
                        raise
                    except Exception as e:
                        log('error', f"API request task failed: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        # Push the error message onto the queue
                        error_json = {
                            "id": "chatcmpl-error",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": chat_request.model,
                            "choices": [{"delta": {"content": f"\n\n[Error: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                        }
                        await queue.put(f"data: {json.dumps(error_json)}\n\n")
                        await queue.put("data: [DONE]\n\n")
                        # None is the end-of-queue sentinel
                        await queue.put(None)
                        raise
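
                # Producer/consumer wiring: both tasks above push SSE strings into
                # `queue`; the loop below is the single consumer that forwards them
                # to the client, with None acting as the end-of-stream sentinel.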
                # Start the keep-alive task
                keep_alive_task = asyncio.create_task(keep_alive_sender())
                # Start the API request task
                api_request_task = asyncio.create_task(api_request_handler())
                # Drain the queue and forward the chunks to the client
                while True:
                    chunk = await queue.get()
                    if chunk is None:  # None marks the end of the queue
                        break
                    yield chunk
                    # Once the API request task has finished, cancel the keep-alive task
                    if api_request_task.done() and not keep_alive_task.done():
                        keep_alive_task.cancel()
            except asyncio.CancelledError:
                log('info', "Stream response generator cancelled",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                # Cancel both tasks
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
            except Exception as e:
                log('error', f"Stream response generator failed: {str(e)}",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                # Cancel both tasks
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
                # Send the error message to the client
                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[Error: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"
            finally:
                # Make sure both tasks are cancelled
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
        else:
            # Original (real) streaming path
            gemini_client = GeminiClient(current_api_key)
            success = False
            try:
                # Iterate the generator directly and forward the response chunks
                async for chunk in gemini_client.stream_chat(
                    chat_request,
                    contents,
                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                    system_instruction
                ):
                    # Skip empty chunks
                    if not chunk:
                        continue
                    formatted_chunk = {
                        "id": "chatcmpl-someid",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": chat_request.model,
                        "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                    }
                    success = True  # a single successful chunk counts as success
                    yield f"data: {json.dumps(formatted_chunk)}\n\n"
                # Update the API call statistics once a response was received
                if success:
                    update_api_call_stats(api_call_stats, current_api_key)
                yield "data: [DONE]\n\n"
            except asyncio.CancelledError:
                extra_log_cancel = {'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model, 'error_message': 'Client disconnected'}
                log('info', "Client connection interrupted", extra=extra_log_cancel)
            except Exception as e:
                error_detail = handle_gemini_error(e, current_api_key, key_manager)
                log('error', f"Streaming request failed: {error_detail}",
                    extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
                # Send the error message to the client
                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[Error: {error_detail}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"
                # Re-raise so process_request can catch the exception
                raise

    return StreamingResponse(stream_response_generator(), media_type="text/event-stream")
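

# Usage sketch (illustrative only): how a FastAPI route might delegate to
# process_stream_request. `prepare_request_context` and `pick_api_key` are
# hypothetical helpers standing in for the application's real content
# preparation and key selection; the settings and stats objects are assumed
# to be configured elsewhere in the application.
#
#   @app.post("/v1/chat/completions")
#   async def chat_completions(chat_request: ChatCompletionRequest, http_request: Request):
#       contents, system_instruction = prepare_request_context(chat_request)  # hypothetical
#       return await process_stream_request(
#           chat_request, http_request, contents, system_instruction,
#           pick_api_key(key_manager),  # hypothetical key selection
#           key_manager, safety_settings, safety_settings_g2,
#           api_call_stats, FAKE_STREAMING, FAKE_STREAMING_INTERVAL,
#       )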