"""Response handlers for streaming and non-streaming responses.

The module converts upstream SSE events into OpenAI-compatible payloads:

* ``StreamResponseHandler`` re-emits the upstream stream as
  ``chat.completion.chunk`` SSE lines.
* ``NonStreamResponseHandler`` drains the upstream stream and returns a single
  ``chat.completion`` JSON response.
"""

import json
import time
from typing import Generator, Optional

import requests
from fastapi import HTTPException
from fastapi.responses import JSONResponse, StreamingResponse

from app.core.config import settings
from app.models.schemas import (
    Message, Delta, Choice, Usage, OpenAIResponse,
    UpstreamRequest, UpstreamData, UpstreamError, ModelItem
)
from app.utils.helpers import debug_log, call_upstream_api, transform_thinking_content
from app.core.token_manager import token_manager
from app.utils.sse_parser import SSEParser
from app.utils.tools import extract_tool_invocations, remove_tool_json_content
from app.utils.sse_tool_handler import SSEToolHandler


# NOTE(review): in the reviewed source these three literals were empty strings
# (the markup appears to have been stripped). An empty separator makes
# ``str.split`` raise ``ValueError`` and an empty tag turns the "thinking"
# start/end markers into no-ops, contradicting the surrounding comments.
# The values below are the conventional ones -- confirm them against the
# upstream payload format and ``transform_thinking_content``.
THINK_OPEN_TAG = "<think>"
THINK_CLOSE_TAG = "</think>"
EDIT_CONTENT_SEPARATOR = "</details>"

# Lower-case substrings of an exception message that classify a failed upstream
# call as a network problem rather than a token problem.
_CONNECTION_ERROR_KEYWORDS = ('connection', 'timeout', 'network', 'dns', 'socket', 'ssl')


def _sse_event(chunk: OpenAIResponse) -> str:
    """Serialize *chunk* as a single SSE ``data:`` line."""
    return f"data: {chunk.model_dump_json()}\n\n"


def create_openai_response_chunk(
    model: str,
    delta: Optional[Delta] = None,
    finish_reason: Optional[str] = None
) -> OpenAIResponse:
    """Create an OpenAI ``chat.completion.chunk`` object for streaming.

    Args:
        model: Model name reported in the chunk.
        delta: Incremental payload; an empty ``Delta`` is used when omitted.
        finish_reason: Finish reason to attach, or ``None`` for an
            intermediate chunk.

    Returns:
        The populated ``OpenAIResponse`` with exactly one choice at index 0.
    """
    # Read the clock once so ``id`` and ``created`` can never disagree.
    created = int(time.time())
    return OpenAIResponse(
        id=f"chatcmpl-{created}",
        object="chat.completion.chunk",
        created=created,
        model=model,
        choices=[Choice(
            index=0,
            delta=delta or Delta(),
            finish_reason=finish_reason
        )]
    )


def handle_upstream_error(error: UpstreamError) -> Generator[str, None, None]:
    """Log an upstream error and terminate the client stream.

    Yields a final chunk with ``finish_reason="stop"`` followed by the
    ``[DONE]`` sentinel; the error details themselves are only logged.
    """
    debug_log(f"上游错误: code={error.code}, detail={error.detail}")

    # Send end chunk
    end_chunk = create_openai_response_chunk(
        model=settings.PRIMARY_MODEL,
        finish_reason="stop"
    )
    yield _sse_event(end_chunk)
    yield "data: [DONE]\n\n"


class ResponseHandler:
    """Base class for response handling: owns the upstream call and retries."""

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str):
        self.upstream_req = upstream_req
        self.chat_id = chat_id
        self.auth_token = auth_token

    def _fail_current_token(self) -> Optional[str]:
        """Mark the current token as failed and look up a replacement.

        Returns:
            A token different from the current one, or ``None`` when the token
            manager has nothing else to offer. ``self.auth_token`` is NOT
            modified; the caller decides whether to switch.
        """
        token_manager.mark_token_failed(self.auth_token)
        candidate = token_manager.get_next_token()
        if candidate and candidate != self.auth_token:
            return candidate
        return None

    def _call_upstream(self) -> requests.Response:
        """Call the upstream API with retry and token-rotation handling.

        Retry policy (at most ``settings.MAX_RETRIES`` attempts):

        * 200: mark the token successful and return the response.
        * 401/403: mark the token failed and retry with a different token,
          or return the response when no other token is available.
        * 429: exponential back-off (``2 ** retry_count`` seconds), token kept.
        * >= 500: wait one second and retry, token kept.
        * Other status codes: returned as-is.
        * Exceptions whose message looks network-related: wait two seconds and
          retry without penalizing the token; any other exception is treated
          as a possible token problem and triggers token rotation.

        Returns:
            The last upstream response (not necessarily a 200).

        Raises:
            Exception: when retries are exhausted, or the original exception
                when a non-network failure cannot be retried.
        """
        max_retries = settings.MAX_RETRIES
        retry_count = 0

        while retry_count < max_retries:
            has_retry_left = retry_count < max_retries - 1
            try:
                debug_log(f"尝试调用上游API (第 {retry_count + 1}/{max_retries} 次)")
                response = call_upstream_api(self.upstream_req, self.chat_id, self.auth_token)
                status = response.status_code

                if status == 200:
                    # Mark token as successful
                    token_manager.mark_token_success(self.auth_token)
                    debug_log("上游API调用成功")
                    return response

                if status in (401, 403):
                    # Authentication/authorization error - mark token as failed
                    debug_log(f"Token认证失败 (状态码: {status}): {self.auth_token[:20]}...")
                    new_token = self._fail_current_token()
                    if new_token is None:
                        debug_log("没有更多可用token")
                        return response
                    debug_log(f"尝试使用新token: {new_token[:20]}...")
                    self.auth_token = new_token
                    retry_count += 1
                    continue

                if status == 429:
                    # Rate limit - don't mark token as failed, just retry
                    debug_log(f"遇到速率限制 (状态码: {status}),等待后重试")
                    if not has_retry_left:
                        return response
                    time.sleep(2 ** retry_count)  # exponential back-off
                    retry_count += 1
                    continue

                if status >= 500:
                    # Server error - retry without marking token as failed
                    debug_log(f"服务器错误 (状态码: {status}),稍后重试")
                    if not has_retry_left:
                        return response
                    time.sleep(1)
                    retry_count += 1
                    continue

                # Other client errors, return response as-is
                debug_log(f"客户端错误 (状态码: {status})")
                return response

            except Exception as e:
                error_msg = str(e)
                debug_log(f"调用上游失败 (尝试 {retry_count + 1}/{max_retries}): {error_msg}")

                # Decide whether this is a connectivity problem or a token problem.
                lowered = error_msg.lower()
                is_connection_error = any(
                    keyword in lowered for keyword in _CONNECTION_ERROR_KEYWORDS
                )

                if is_connection_error:
                    # Network problems do not penalize the token; just retry.
                    debug_log("检测到网络连接问题,不标记token失败")
                    if not has_retry_left:
                        raise Exception(
                            f"网络连接问题,重试{max_retries}次后仍失败: {error_msg}"
                        ) from e
                    time.sleep(2)  # wait two seconds before retrying
                    retry_count += 1
                    continue

                # Anything else may be a token problem: fail it and rotate.
                debug_log("检测到可能的token问题,标记token失败")
                new_token = self._fail_current_token()
                if new_token is None or not has_retry_left:
                    raise
                debug_log(f"尝试使用新token: {new_token[:20]}...")
                self.auth_token = new_token
                retry_count += 1
                continue

        # If we get here, all retries failed
        raise Exception("所有重试尝试均失败")

    def _handle_upstream_error(self, response: requests.Response) -> None:
        """Log a non-200 upstream response (body only when debug logging is on)."""
        debug_log(f"上游返回错误状态: {response.status_code}")
        if settings.DEBUG_LOGGING:
            debug_log(f"上游错误响应: {response.text}")


class StreamResponseHandler(ResponseHandler):
    """Handler for streaming responses."""

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools
        # With tools enabled, content is buffered and emitted at end of stream.
        self.buffered_content = ""
        self.tool_calls = None
        # Initialize SSE tool handler for improved tool processing
        self.tool_handler = SSEToolHandler(chat_id, settings.PRIMARY_MODEL) if has_tools else None
        # Thinking-state tracking: True until the first thinking chunk is sent.
        self.first_thinking_chunk = True

    def handle(self) -> Generator[str, None, None]:
        """Stream the upstream response as OpenAI-style SSE lines.

        Yields:
            ``data: ...`` SSE lines: an initial role chunk, content/tool chunks,
            a final chunk carrying the finish reason and ``data: [DONE]``.
            When the upstream call fails or returns a non-200 status a single
            ``data: {"error": ...}`` line is yielded instead.
        """
        debug_log(f"开始处理流式响应 (chat_id={self.chat_id})")

        try:
            response = self._call_upstream()
        except Exception as e:
            # Log the cause, matching the non-streaming handler.
            debug_log(f"调用上游失败: {e}")
            yield "data: {\"error\": \"Failed to call upstream\"}\n\n"
            return

        if response.status_code != 200:
            self._handle_upstream_error(response)
            yield "data: {\"error\": \"Upstream error\"}\n\n"
            return

        # Send initial role chunk
        first_chunk = create_openai_response_chunk(
            model=settings.PRIMARY_MODEL,
            delta=Delta(role="assistant")
        )
        yield _sse_event(first_chunk)

        # Process stream
        debug_log("开始读取上游SSE流")

        # NOTE(review): this flag flips on ANY delta/edit content, including
        # thinking-phase deltas, so the "initial answer" edit_content path in
        # _process_content is skipped once thinking output was seen -- confirm
        # that this is intended.
        sent_initial_answer = False
        stream_ended_normally = False

        try:
            with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
                for event in parser.iter_json_data(UpstreamData):
                    upstream_data = event['data']
                    data = upstream_data.data

                    # Check for errors
                    if self._has_error(upstream_data):
                        error = self._get_error(upstream_data)
                        yield from handle_upstream_error(error)
                        stream_ended_normally = True
                        break

                    debug_log(f"解析成功 - 类型: {upstream_data.type}, 阶段: {data.phase}, "
                              f"内容长度: {len(data.delta_content or '')}, 完成: {data.done}")

                    # Process content
                    yield from self._process_content_with_tools(upstream_data, sent_initial_answer)

                    # Update sent_initial_answer flag if we sent content
                    if not sent_initial_answer and (data.delta_content or data.edit_content):
                        sent_initial_answer = True

                    # Check if done
                    if data.done or data.phase == "done":
                        debug_log("检测到流结束信号")
                        yield from self._send_end_chunk()
                        stream_ended_normally = True
                        break

        except Exception as e:
            debug_log(f"SSE流处理异常: {e}")
            # The stream broke off: tell the client the answer may be partial.
            if not stream_ended_normally:
                error_chunk = create_openai_response_chunk(
                    model=settings.PRIMARY_MODEL,
                    delta=Delta(content="\n\n[系统提示: 连接中断,响应可能不完整]")
                )
                yield _sse_event(error_chunk)

        # Make sure the client always receives a terminating chunk.
        if not stream_ended_normally:
            debug_log("流未正常结束,发送结束信号")
            yield from self._send_end_chunk(force_stop=True)

    def _has_error(self, upstream_data: UpstreamData) -> bool:
        """Return True when any of the three error slots of the event is set."""
        return bool(
            upstream_data.error or
            upstream_data.data.error or
            (upstream_data.data.inner and upstream_data.data.inner.error)
        )

    def _get_error(self, upstream_data: UpstreamData) -> Optional[UpstreamError]:
        """Return the first error found on the event, or ``None`` if there is none."""
        return (
            upstream_data.error or
            upstream_data.data.error or
            (upstream_data.data.inner.error if upstream_data.data.inner else None)
        )

    def _process_content(
        self,
        upstream_data: UpstreamData,
        sent_initial_answer: bool
    ) -> Generator[str, None, None]:
        """Turn one upstream event into zero or more content chunks.

        Args:
            upstream_data: Parsed upstream event.
            sent_initial_answer: Whether content was already seen on this
                stream; gates the one-off ``edit_content`` handling.

        Yields:
            SSE lines for the content carried by the event. Nothing is yielded
            when tools are enabled: the content is appended to
            ``self.buffered_content`` instead.
        """
        data = upstream_data.data
        content = data.delta_content or data.edit_content
        if not content:
            return

        # Transform thinking content
        if data.phase == "thinking":
            content = transform_thinking_content(content)

        # Buffer content if tools are enabled
        if self.has_tools:
            self.buffered_content += content
            return

        # Handle initial answer content. A dedicated variable is used so the
        # delta handled below is not overwritten by the extracted edit text.
        if not sent_initial_answer and data.edit_content and data.phase == "answer":
            initial_answer = self._extract_edit_content(data.edit_content)
            if initial_answer:
                debug_log(f"发送普通内容: {initial_answer}")
                yield _sse_event(create_openai_response_chunk(
                    model=settings.PRIMARY_MODEL,
                    delta=Delta(content=initial_answer)
                ))

        # Handle delta content
        if not data.delta_content or not content:
            return

        if data.phase == "thinking":
            # Only the first thinking chunk carries the opening tag.
            if self.first_thinking_chunk:
                formatted_content = f"{THINK_OPEN_TAG}{content}"
                self.first_thinking_chunk = False
            else:
                formatted_content = content

            debug_log(f"发送思考内容: {content}")
            chunk = create_openai_response_chunk(
                model=settings.PRIMARY_MODEL,
                delta=Delta(content=formatted_content)
            )
        else:
            # Leaving the thinking phase: close the thinking tag first.
            if not self.first_thinking_chunk and data.phase == "answer":
                thinking_end_chunk = create_openai_response_chunk(
                    model=settings.PRIMARY_MODEL,
                    delta=Delta(content=THINK_CLOSE_TAG)
                )
                yield _sse_event(thinking_end_chunk)
                # Reset state so a later thinking phase opens a new tag.
                self.first_thinking_chunk = True

            debug_log(f"发送普通内容: {content}")
            chunk = create_openai_response_chunk(
                model=settings.PRIMARY_MODEL,
                delta=Delta(content=content)
            )

        yield _sse_event(chunk)

    def _extract_edit_content(self, edit_content: str) -> str:
        """Return the text after the first separator in *edit_content*.

        Returns an empty string when the separator does not occur.
        """
        parts = edit_content.split(EDIT_CONTENT_SEPARATOR)
        return parts[1] if len(parts) > 1 else ""

    def _send_end_chunk(self, force_stop: bool = False) -> Generator[str, None, None]:
        """Flush buffered output, then send the final chunk and ``[DONE]``.

        Args:
            force_stop: True when the stream ended abnormally; tool extraction
                is skipped and any buffered content is sent verbatim.
        """
        finish_reason = "stop"

        if self.has_tools and not force_stop:
            # Try to extract tool calls from buffered content
            self.tool_calls = extract_tool_invocations(self.buffered_content)

            if self.tool_calls:
                debug_log(f"检测到工具调用: {len(self.tool_calls)} 个")
                # Send tool calls with proper format
                for i, tc in enumerate(self.tool_calls):
                    tool_call_delta = {
                        "index": i,
                        "id": tc.get("id"),
                        "type": tc.get("type", "function"),
                        "function": tc.get("function", {}),
                    }
                    out_chunk = create_openai_response_chunk(
                        model=settings.PRIMARY_MODEL,
                        delta=Delta(tool_calls=[tool_call_delta])
                    )
                    yield _sse_event(out_chunk)
                finish_reason = "tool_calls"
            else:
                # Send regular content
                trimmed_content = remove_tool_json_content(self.buffered_content)
                if trimmed_content:
                    debug_log(f"发送常规内容: {len(trimmed_content)} 字符")
                    content_chunk = create_openai_response_chunk(
                        model=settings.PRIMARY_MODEL,
                        delta=Delta(content=trimmed_content)
                    )
                    yield _sse_event(content_chunk)
        elif force_stop and self.buffered_content:
            # Forced end: send whatever was buffered so far.
            debug_log(f"强制结束,发送缓冲内容: {len(self.buffered_content)} 字符")
            content_chunk = create_openai_response_chunk(
                model=settings.PRIMARY_MODEL,
                delta=Delta(content=self.buffered_content)
            )
            yield _sse_event(content_chunk)

        # Send final chunk
        end_chunk = create_openai_response_chunk(
            model=settings.PRIMARY_MODEL,
            finish_reason=finish_reason
        )
        yield _sse_event(end_chunk)
        yield "data: [DONE]\n\n"
        debug_log(f"流式响应完成 (finish_reason: {finish_reason})")

    def _process_content_with_tools(
        self,
        upstream_data: UpstreamData,
        sent_initial_answer: bool
    ) -> Generator[str, None, None]:
        """Route an event to the SSE tool handler or to plain content handling.

        Events in the ``tool_call`` and ``other`` phases are delegated to
        ``self.tool_handler`` when tools are enabled; everything else goes
        through ``_process_content``.
        """
        if self.has_tools and self.tool_handler:
            phase = upstream_data.data.phase

            if phase == "tool_call":
                # Use the improved tool handler for tool call processing
                yield from self.tool_handler.process_tool_call_phase(
                    upstream_data.data.model_dump(),
                    is_stream=True
                )
                return

            if phase == "other":
                # Handle other phase which may contain tool completion signals
                yield from self.tool_handler.process_other_phase(
                    upstream_data.data.model_dump(),
                    is_stream=True
                )
                return

        # Fall back to original content processing
        yield from self._process_content(upstream_data, sent_initial_answer)


class NonStreamResponseHandler(ResponseHandler):
    """Handler for non-streaming responses."""

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools
        # Thinking-state tracking
        self.first_thinking_chunk = True
        self.in_thinking_phase = False

    def handle(self) -> JSONResponse:
        """Collect the whole upstream stream and return one JSON completion.

        Returns:
            A ``JSONResponse`` holding the ``chat.completion`` payload. When
            tools are enabled and tool invocations are found, the message
            content is ``None`` and ``finish_reason`` is ``"tool_calls"``.

        Raises:
            HTTPException: 502 when the upstream call fails, returns a non-200
                status, or ends without yielding any content.
        """
        debug_log(f"开始处理非流式响应 (chat_id={self.chat_id})")

        try:
            response = self._call_upstream()
        except Exception as e:
            debug_log(f"调用上游失败: {e}")
            raise HTTPException(status_code=502, detail="Failed to call upstream") from e

        if response.status_code != 200:
            self._handle_upstream_error(response)
            raise HTTPException(status_code=502, detail="Upstream error")

        # Collect full response
        full_content = []
        debug_log("开始收集完整响应内容")
        response_completed = False

        try:
            with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
                for event in parser.iter_json_data(UpstreamData):
                    data = event['data'].data

                    if data.delta_content:
                        content = data.delta_content

                        if data.phase == "thinking":
                            content = transform_thinking_content(content)
                            # Entering the thinking phase: prefix the opening
                            # tag. Later thinking chunks stay untouched.
                            if not self.in_thinking_phase:
                                self.in_thinking_phase = True
                                self.first_thinking_chunk = False
                                content = f"{THINK_OPEN_TAG}{content}"
                        elif self.in_thinking_phase:
                            # Leaving the thinking phase: close the tag that
                            # was opened on the previously collected content.
                            if full_content and not self.first_thinking_chunk:
                                full_content.append(THINK_CLOSE_TAG)
                            self.in_thinking_phase = False
                            self.first_thinking_chunk = True

                        if content:
                            full_content.append(content)

                    if data.done or data.phase == "done":
                        debug_log("检测到完成信号,停止收集")
                        response_completed = True
                        break

        except Exception as e:
            debug_log(f"非流式响应收集异常: {e}")
            if not full_content:
                # Nothing was collected at all: surface the failure.
                raise HTTPException(
                    status_code=502,
                    detail=f"Response collection failed: {str(e)}"
                ) from e
            debug_log(f"部分内容收集成功,继续处理 ({len(full_content)} 个片段)")

        if not response_completed and not full_content:
            debug_log("响应未完成且无内容,可能是连接问题")
            raise HTTPException(status_code=502, detail="Incomplete response from upstream")

        # The stream ended while still thinking: close the tag.
        if self.in_thinking_phase and not self.first_thinking_chunk:
            full_content.append(THINK_CLOSE_TAG)

        final_content = "".join(full_content)
        debug_log(f"内容收集完成,最终长度: {len(final_content)}")

        # Handle tool calls for non-streaming
        tool_calls = None
        finish_reason = "stop"
        message_content = final_content

        if self.has_tools:
            tool_calls = extract_tool_invocations(final_content)
            if tool_calls:
                # Content must be null when tool_calls are present (OpenAI spec)
                message_content = None
                finish_reason = "tool_calls"
                debug_log(f"提取到工具调用: {json.dumps(tool_calls, ensure_ascii=False)}")
            else:
                # Remove tool JSON from content; keep the original text when
                # the cleanup leaves nothing behind.
                message_content = remove_tool_json_content(final_content) or final_content

        # Build response
        created = int(time.time())
        response_data = OpenAIResponse(
            id=f"chatcmpl-{created}",
            object="chat.completion",
            created=created,
            model=settings.PRIMARY_MODEL,
            choices=[Choice(
                index=0,
                message=Message(
                    role="assistant",
                    content=message_content,
                    tool_calls=tool_calls
                ),
                finish_reason=finish_reason
            )],
            usage=Usage()
        )

        debug_log("非流式响应发送完成")
        return JSONResponse(content=response_data.model_dump(exclude_none=True))