import asyncio
import json
import random
import time

from fastapi import Request
from fastapi.responses import StreamingResponse

from app.models import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import handle_gemini_error, update_api_call_stats
from app.utils.logging import log
| |
|
| | |
async def process_stream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    contents,
    system_instruction,
    current_api_key: str,
    key_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    FAKE_STREAMING,
    FAKE_STREAMING_INTERVAL
) -> StreamingResponse:
    """Handle a streaming chat API request and return an SSE response.

    Two modes are supported:

    * ``FAKE_STREAMING`` truthy — the answer is fetched with a single
      non-streaming call (retried across every managed API key in random
      order) while a parallel keep-alive stream holds the client connection
      open; the complete answer is then re-chunked into OpenAI-style SSE
      ``chat.completion.chunk`` events.
    * otherwise — ``GeminiClient.stream_chat`` output is proxied directly,
      each chunk wrapped in an OpenAI-compatible SSE frame.

    Args:
        chat_request: Parsed chat-completion request (``model`` is read here).
        http_request: Incoming FastAPI request (not read in this function).
        contents: Conversation contents already prepared for the Gemini API.
        system_instruction: Optional system instruction passed to the client.
        current_api_key: Key used for direct streaming and the keep-alive task.
        key_manager: Provides ``api_keys`` and ``reset_tried_keys_for_request``.
        safety_settings: Safety settings for regular models.
        safety_settings_g2: Safety settings used when the model name contains
            ``gemini-2.0-flash-exp``.
        api_call_stats: Stats structure passed to ``update_api_call_stats``.
        FAKE_STREAMING: Selects fake-streaming mode (see above).
        FAKE_STREAMING_INTERVAL: Currently unused in this function; kept for
            interface compatibility — TODO confirm callers still pass it
            deliberately.

    Returns:
        A ``StreamingResponse`` with ``text/event-stream`` media type whose
        event stream is terminated by a ``data: [DONE]`` frame.
    """

    async def stream_response_generator():
        if FAKE_STREAMING:
            # Fake-streaming mode: two background tasks feed a single queue.
            # A ``None`` item on the queue is the sentinel that ends the stream.
            queue = asyncio.Queue()
            keep_alive_task = None
            api_request_task = None

            try:
                async def keep_alive_sender():
                    """Emit empty-delta chunks so the client connection stays open."""
                    try:
                        keep_alive_client = GeminiClient(current_api_key)

                        keep_alive_generator = keep_alive_client.stream_chat(
                            chat_request,
                            contents,
                            safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                            system_instruction
                        )

                        async for line in keep_alive_generator:
                            # Only bare newlines are forwarded as keep-alive
                            # ticks; any real content from this stream is
                            # intentionally discarded.
                            if line == "\n":
                                formatted_chunk = {
                                    "id": "chatcmpl-keepalive",
                                    "object": "chat.completion.chunk",
                                    "created": int(time.time()),
                                    "model": chat_request.model,
                                    "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
                                }
                                await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")
                    except asyncio.CancelledError:
                        # Cancellation is the normal way this task terminates.
                        raise
                    except Exception as e:
                        log('error', f"保持连接任务出错: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        # NOTE(review): pushing the sentinel here terminates the
                        # whole stream even though api_request_handler might
                        # still succeed — confirm this fail-fast is intended.
                        await queue.put(None)
                        raise

                async def api_request_handler():
                    """Fetch the full answer (trying every key) and re-chunk it."""
                    success = False
                    try:
                        key_manager.reset_tried_keys_for_request()

                        # Try keys in a fresh random order for each request.
                        available_keys = key_manager.api_keys.copy()
                        random.shuffle(available_keys)

                        for attempt, api_key in enumerate(available_keys, 1):
                            try:
                                log('info', f"假流式模式: 尝试API密钥 {api_key[:8]}... ({attempt}/{len(available_keys)})",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                                non_stream_client = GeminiClient(api_key)

                                # complete_chat is blocking; run it off the event loop.
                                response_content = await asyncio.to_thread(
                                    non_stream_client.complete_chat,
                                    chat_request,
                                    contents,
                                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                                    system_instruction
                                )

                                if response_content and response_content.text:
                                    log('info', f"假流式模式: API密钥 {api_key[:8]}... 成功获取响应",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                                    # Split the complete answer into roughly ten
                                    # pieces to imitate incremental streaming.
                                    full_text = response_content.text
                                    chunk_size = max(len(full_text) // 10, 1)

                                    for i in range(0, len(full_text), chunk_size):
                                        chunk = full_text[i:i+chunk_size]
                                        formatted_chunk = {
                                            "id": "chatcmpl-someid",
                                            "object": "chat.completion.chunk",
                                            "created": int(time.time()),
                                            "model": chat_request.model,
                                            "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                                        }
                                        await queue.put(f"data: {json.dumps(formatted_chunk)}\n\n")

                                    success = True
                                    update_api_call_stats(api_call_stats, endpoint=api_key, model=chat_request.model)
                                    break
                                else:
                                    log('warning', f"假流式模式: API密钥 {api_key[:8]}... 返回空响应",
                                        extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})
                            except Exception as e:
                                # Per-key failure: record it and move to the next key.
                                error_detail = handle_gemini_error(e, api_key, key_manager)
                                log('error', f"假流式模式: API密钥 {api_key[:8]}... 请求失败: {error_detail}",
                                    extra={'key': api_key[:8], 'request_type': 'fake-stream', 'model': chat_request.model})

                        if not success:
                            # Every key failed: surface one error chunk to the client.
                            error_msg = "所有API密钥均请求失败,请稍后重试"
                            log('error', error_msg,
                                extra={'key': 'ALL', 'request_type': 'fake-stream', 'model': chat_request.model})

                            error_json = {
                                "id": "chatcmpl-error",
                                "object": "chat.completion.chunk",
                                "created": int(time.time()),
                                "model": chat_request.model,
                                "choices": [{"delta": {"content": f"\n\n[错误: {error_msg}]"}, "index": 0, "finish_reason": "error"}]
                            }
                            await queue.put(f"data: {json.dumps(error_json)}\n\n")

                        await queue.put("data: [DONE]\n\n")
                        # Sentinel: wake the consumer loop and end the stream.
                        await queue.put(None)

                    except asyncio.CancelledError:
                        log('info', "API请求任务被取消",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                        await queue.put(None)
                        raise
                    except Exception as e:
                        log('error', f"API请求任务出错: {str(e)}",
                            extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})

                        error_json = {
                            "id": "chatcmpl-error",
                            "object": "chat.completion.chunk",
                            "created": int(time.time()),
                            "model": chat_request.model,
                            "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                        }
                        await queue.put(f"data: {json.dumps(error_json)}\n\n")
                        await queue.put("data: [DONE]\n\n")
                        await queue.put(None)
                        raise

                keep_alive_task = asyncio.create_task(keep_alive_sender())
                api_request_task = asyncio.create_task(api_request_handler())

                # Drain the queue until some task pushes the ``None`` sentinel.
                while True:
                    chunk = await queue.get()
                    if chunk is None:
                        break
                    yield chunk

                # The real response finished: the keep-alive is no longer needed.
                if api_request_task.done() and not keep_alive_task.done():
                    keep_alive_task.cancel()

            except asyncio.CancelledError:
                # Client disconnected: stop both background tasks.
                log('info', "流式响应生成器被取消",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
            except Exception as e:
                log('error', f"流式响应生成器出错: {str(e)}",
                    extra={'key': current_api_key[:8], 'request_type': 'fake-stream'})
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()

                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[错误: {str(e)}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"
            finally:
                # Belt-and-braces cleanup on every exit path.
                if keep_alive_task and not keep_alive_task.done():
                    keep_alive_task.cancel()
                if api_request_task and not api_request_task.done():
                    api_request_task.cancel()
        else:
            # Real streaming mode: proxy stream_chat chunks as SSE events.
            gemini_client = GeminiClient(current_api_key)
            success = False

            try:
                async for chunk in gemini_client.stream_chat(
                    chat_request,
                    contents,
                    safety_settings_g2 if 'gemini-2.0-flash-exp' in chat_request.model else safety_settings,
                    system_instruction
                ):
                    # Skip empty chunks rather than emitting empty deltas.
                    if not chunk:
                        continue

                    formatted_chunk = {
                        "id": "chatcmpl-someid",
                        "object": "chat.completion.chunk",
                        "created": int(time.time()),
                        "model": chat_request.model,
                        "choices": [{"delta": {"role": "assistant", "content": chunk}, "index": 0, "finish_reason": None}]
                    }
                    success = True
                    yield f"data: {json.dumps(formatted_chunk)}\n\n"

                # Count the call only when at least one chunk was delivered.
                if success:
                    update_api_call_stats(api_call_stats, endpoint=current_api_key, model=chat_request.model)

                yield "data: [DONE]\n\n"

            except asyncio.CancelledError:
                # Client went away mid-stream; nothing more to send.
                log('info', "客户端连接已中断",
                    extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})
            except Exception as e:
                error_detail = handle_gemini_error(e, current_api_key, key_manager)
                log('error', f"流式请求失败: {error_detail}",
                    extra={'key': current_api_key[:8], 'request_type': 'stream', 'model': chat_request.model})

                error_json = {
                    "id": "chatcmpl-error",
                    "object": "chat.completion.chunk",
                    "created": int(time.time()),
                    "model": chat_request.model,
                    "choices": [{"delta": {"content": f"\n\n[错误: {error_detail}]"}, "index": 0, "finish_reason": "error"}]
                }
                yield f"data: {json.dumps(error_json)}\n\n"
                yield "data: [DONE]\n\n"
                # Bare re-raise preserves the original traceback (was ``raise e``).
                raise

    return StreamingResponse(stream_response_generator(), media_type="text/event-stream")