|
|
""" |
|
|
Response handlers for streaming and non-streaming responses |
|
|
""" |
|
|
|
|
|
import json |
|
|
import time |
|
|
from typing import Generator, Optional |
|
|
import requests |
|
|
from fastapi import HTTPException |
|
|
from fastapi.responses import JSONResponse, StreamingResponse |
|
|
|
|
|
from app.core.config import settings |
|
|
from app.models.schemas import ( |
|
|
Message, Delta, Choice, Usage, OpenAIResponse, |
|
|
UpstreamRequest, UpstreamData, UpstreamError, ModelItem |
|
|
) |
|
|
from app.utils.helpers import debug_log, call_upstream_api, transform_thinking_content |
|
|
from app.core.token_manager import token_manager |
|
|
from app.utils.sse_parser import SSEParser |
|
|
from app.utils.tools import extract_tool_invocations, remove_tool_json_content |
|
|
from app.utils.sse_tool_handler import SSEToolHandler |
|
|
|
|
|
|
|
|
def create_openai_response_chunk(
    model: str,
    delta: Optional[Delta] = None,
    finish_reason: Optional[str] = None
) -> OpenAIResponse:
    """Build one SSE chunk in the OpenAI ``chat.completion.chunk`` format.

    Args:
        model: Model name to report in the chunk.
        delta: Incremental payload for the single choice; an empty ``Delta``
            is substituted when omitted.
        finish_reason: Terminal reason (``"stop"``, ``"tool_calls"``, ...)
            or ``None`` for an intermediate chunk.

    Returns:
        An ``OpenAIResponse`` holding exactly one ``Choice`` at index 0.
    """
    single_choice = Choice(
        index=0,
        delta=delta or Delta(),
        finish_reason=finish_reason,
    )
    return OpenAIResponse(
        id=f"chatcmpl-{int(time.time())}",
        object="chat.completion.chunk",
        created=int(time.time()),
        model=model,
        choices=[single_choice],
    )
|
|
|
|
|
|
|
|
def handle_upstream_error(error: UpstreamError) -> Generator[str, None, None]:
    """Terminate an SSE stream after an upstream error.

    Logs the upstream error code/detail, then yields a final chunk with
    ``finish_reason="stop"`` followed by the ``[DONE]`` sentinel so the
    client closes the stream cleanly.
    """
    debug_log(f"上游错误: code={error.code}, detail={error.detail}")

    closing_chunk = create_openai_response_chunk(
        model=settings.PRIMARY_MODEL,
        finish_reason="stop",
    )
    yield f"data: {closing_chunk.model_dump_json()}\n\n"
    yield "data: [DONE]\n\n"
|
|
|
|
|
|
|
|
class ResponseHandler:
    """Base class for response handling.

    Owns the upstream HTTP call shared by the streaming and non-streaming
    handlers, including token rotation, backoff and retry logic.
    """

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str):
        self.upstream_req = upstream_req  # request payload forwarded upstream
        self.chat_id = chat_id            # upstream conversation id
        self.auth_token = auth_token      # current token; may be rotated on failure

    def _call_upstream(self) -> requests.Response:
        """Call the upstream API with retry, token rotation and backoff.

        Per attempt (up to ``settings.MAX_RETRIES``):
        - 200: mark the token healthy and return the response.
        - 401/403: mark the token failed and retry with the next token.
        - 429: exponential backoff (``2 ** retry_count`` seconds), then retry.
        - >=500: fixed 1s delay, then retry.
        - other status: returned as-is (client error; retrying will not help).
        Exceptions whose message looks network-related are retried without
        penalizing the token; any other exception marks the token failed and
        rotates to a new one, re-raising when rotation is impossible.

        Returns:
            The last ``requests.Response`` obtained (not necessarily 200).

        Raises:
            Exception: when every retry attempt has been exhausted.
        """
        max_retries = settings.MAX_RETRIES
        retry_count = 0

        while retry_count < max_retries:
            try:
                debug_log(f"尝试调用上游API (第 {retry_count + 1}/{max_retries} 次)")
                response = call_upstream_api(self.upstream_req, self.chat_id, self.auth_token)

                if response.status_code == 200:
                    token_manager.mark_token_success(self.auth_token)
                    debug_log("上游API调用成功")
                    return response
                elif response.status_code in [401, 403]:
                    # Auth failure: this token is bad; try to rotate to the next one.
                    debug_log(f"Token认证失败 (状态码: {response.status_code}): {self.auth_token[:20]}...")
                    token_manager.mark_token_failed(self.auth_token)

                    new_token = token_manager.get_next_token()
                    if new_token and new_token != self.auth_token:
                        debug_log(f"尝试使用新token: {new_token[:20]}...")
                        self.auth_token = new_token
                        retry_count += 1
                        continue
                    else:
                        debug_log("没有更多可用token")
                        return response
                elif response.status_code in [429]:
                    # Rate limited: exponential backoff before the next attempt.
                    # (Uses the module-level `time` import; the redundant
                    # function-local `import time` was removed.)
                    debug_log(f"遇到速率限制 (状态码: {response.status_code}),等待后重试")
                    if retry_count < max_retries - 1:
                        time.sleep(2 ** retry_count)
                        retry_count += 1
                        continue
                    else:
                        return response
                elif response.status_code >= 500:
                    # Server-side error: short fixed delay, then retry.
                    debug_log(f"服务器错误 (状态码: {response.status_code}),稍后重试")
                    if retry_count < max_retries - 1:
                        time.sleep(1)
                        retry_count += 1
                        continue
                    else:
                        return response
                else:
                    # Remaining 4xx codes are caller errors; do not retry.
                    debug_log(f"客户端错误 (状态码: {response.status_code})")
                    return response

            except Exception as e:
                error_msg = str(e)
                debug_log(f"调用上游失败 (尝试 {retry_count + 1}/{max_retries}): {error_msg}")

                # Heuristic: connection-level failures are transient and
                # should not count against the token itself.
                is_connection_error = any(keyword in error_msg.lower() for keyword in [
                    'connection', 'timeout', 'network', 'dns', 'socket', 'ssl'
                ])

                if is_connection_error:
                    debug_log("检测到网络连接问题,不标记token失败")
                    if retry_count < max_retries - 1:
                        time.sleep(2)
                        retry_count += 1
                        continue
                    else:
                        raise Exception(f"网络连接问题,重试{max_retries}次后仍失败: {error_msg}")
                else:
                    # Anything else is treated as a token problem: penalize
                    # the current token and try the next available one.
                    debug_log("检测到可能的token问题,标记token失败")
                    token_manager.mark_token_failed(self.auth_token)

                    new_token = token_manager.get_next_token()
                    if new_token and new_token != self.auth_token and retry_count < max_retries - 1:
                        debug_log(f"尝试使用新token: {new_token[:20]}...")
                        self.auth_token = new_token
                        retry_count += 1
                        continue
                    else:
                        raise

        raise Exception("所有重试尝试均失败")

    def _handle_upstream_error(self, response: requests.Response) -> None:
        """Log a non-200 upstream response; body is logged only in debug mode."""
        debug_log(f"上游返回错误状态: {response.status_code}")
        if settings.DEBUG_LOGGING:
            debug_log(f"上游错误响应: {response.text}")
|
|
class StreamResponseHandler(ResponseHandler):
    """Handler for streaming (SSE) responses.

    Reads the upstream SSE stream and re-emits it as OpenAI-style
    ``chat.completion.chunk`` events. Maintains three pieces of state:
    - ``buffered_content``: accumulates raw content when tools are enabled,
      so tool-call JSON can be extracted at end of stream;
    - ``first_thinking_chunk``: tracks whether a ``<think>`` tag still needs
      to be opened/closed around "thinking"-phase content;
    - ``tool_handler``: delegate for "tool_call"/"other" phases (tools only).
    """

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools
        # Raw content buffer; only filled when has_tools is True (see _process_content).
        self.buffered_content = ""
        # Tool invocations extracted from buffered_content at end of stream.
        self.tool_calls = None
        # Phase delegate; None when the request declared no tools.
        self.tool_handler = SSEToolHandler(chat_id, settings.PRIMARY_MODEL) if has_tools else None
        # True until the first thinking-phase chunk opens a <think> tag.
        self.first_thinking_chunk = True

    def handle(self) -> Generator[str, None, None]:
        """Yield SSE lines for the whole streaming response.

        Sequence: initial role chunk -> per-event content chunks ->
        end chunk + ``[DONE]``. Upstream/call errors produce a best-effort
        JSON error line; an interrupted stream is force-closed so the
        client always receives a terminating signal.
        """
        debug_log(f"开始处理流式响应 (chat_id={self.chat_id})")

        try:
            response = self._call_upstream()
        except Exception:
            # Upstream unreachable after all retries: emit a raw error event.
            yield "data: {\"error\": \"Failed to call upstream\"}\n\n"
            return

        if response.status_code != 200:
            self._handle_upstream_error(response)
            yield "data: {\"error\": \"Upstream error\"}\n\n"
            return

        # First chunk only carries the assistant role, per OpenAI streaming convention.
        first_chunk = create_openai_response_chunk(
            model=settings.PRIMARY_MODEL,
            delta=Delta(role="assistant")
        )
        yield f"data: {first_chunk.model_dump_json()}\n\n"

        debug_log("开始读取上游SSE流")
        sent_initial_answer = False
        stream_ended_normally = False

        try:
            with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
                for event in parser.iter_json_data(UpstreamData):
                    upstream_data = event['data']

                    # Upstream-reported error: close the stream via the
                    # shared error generator and stop reading.
                    if self._has_error(upstream_data):
                        error = self._get_error(upstream_data)
                        yield from handle_upstream_error(error)
                        stream_ended_normally = True
                        break

                    debug_log(f"解析成功 - 类型: {upstream_data.type}, 阶段: {upstream_data.data.phase}, "
                              f"内容长度: {len(upstream_data.data.delta_content or '')}, 完成: {upstream_data.data.done}")

                    yield from self._process_content_with_tools(upstream_data, sent_initial_answer)

                    # Track that some content has been seen; _process_content
                    # keys its edit_content special case off this flag.
                    if not sent_initial_answer and (upstream_data.data.delta_content or upstream_data.data.edit_content):
                        sent_initial_answer = True

                    if upstream_data.data.done or upstream_data.data.phase == "done":
                        debug_log("检测到流结束信号")
                        yield from self._send_end_chunk()
                        stream_ended_normally = True
                        break

        except Exception as e:
            debug_log(f"SSE流处理异常: {e}")
            if not stream_ended_normally:
                # NOTE(review): f-string below has no placeholders; plain
                # string would do. Left untouched (runtime message).
                error_chunk = create_openai_response_chunk(
                    model=settings.PRIMARY_MODEL,
                    delta=Delta(content=f"\n\n[系统提示: 连接中断,响应可能不完整]")
                )
                yield f"data: {error_chunk.model_dump_json()}\n\n"

        # Guarantee termination even when the loop exited abnormally.
        if not stream_ended_normally:
            debug_log("流未正常结束,发送结束信号")
            yield from self._send_end_chunk(force_stop=True)

    def _has_error(self, upstream_data: UpstreamData) -> bool:
        """Return True if any of the three error slots is populated."""
        return bool(
            upstream_data.error or
            upstream_data.data.error or
            (upstream_data.data.inner and upstream_data.data.inner.error)
        )

    def _get_error(self, upstream_data: UpstreamData) -> UpstreamError:
        """Return the first populated error (same precedence as _has_error)."""
        return (
            upstream_data.error or
            upstream_data.data.error or
            (upstream_data.data.inner.error if upstream_data.data.inner else None)
        )

    def _process_content(
        self,
        upstream_data: UpstreamData,
        sent_initial_answer: bool
    ) -> Generator[str, None, None]:
        """Emit chunks for one upstream event (non-tool path buffers nothing).

        With tools enabled the content is only buffered; otherwise
        thinking-phase content is wrapped in <think>...</think> and answer
        content is forwarded as plain chunks.
        """
        content = upstream_data.data.delta_content or upstream_data.data.edit_content

        if not content:
            return

        if upstream_data.data.phase == "thinking":
            content = transform_thinking_content(content)

        if self.has_tools:
            # Tool mode: defer everything to _send_end_chunk, which decides
            # between tool_calls chunks and trimmed plain content.
            self.buffered_content += content
        else:
            # Special case: the very first answer arriving via edit_content
            # carries a "<details>...</details>answer" payload; emit only
            # the part after </details>.
            if (not sent_initial_answer and
                    upstream_data.data.edit_content and
                    upstream_data.data.phase == "answer"):
                content = self._extract_edit_content(upstream_data.data.edit_content)
                if content:
                    debug_log(f"发送普通内容: {content}")
                    chunk = create_openai_response_chunk(
                        model=settings.PRIMARY_MODEL,
                        delta=Delta(content=content)
                    )
                    yield f"data: {chunk.model_dump_json()}\n\n"
                    # NOTE(review): this assignment rebinds the local
                    # parameter only; the caller's flag is updated
                    # independently in handle(). Dead store — confirm intent.
                    sent_initial_answer = True

            if upstream_data.data.delta_content:
                if content:
                    if upstream_data.data.phase == "thinking":
                        # Open the <think> tag on the first thinking chunk only.
                        if self.first_thinking_chunk:
                            formatted_content = f"<think>{content}"
                            self.first_thinking_chunk = False
                        else:
                            formatted_content = content

                        debug_log(f"发送思考内容: {content}")
                        chunk = create_openai_response_chunk(
                            model=settings.PRIMARY_MODEL,
                            delta=Delta(content=formatted_content)
                        )
                    else:
                        # Transition thinking -> answer: close the <think>
                        # tag before forwarding answer content.
                        if not self.first_thinking_chunk and upstream_data.data.phase == "answer":
                            thinking_end_chunk = create_openai_response_chunk(
                                model=settings.PRIMARY_MODEL,
                                delta=Delta(content="</think>")
                            )
                            yield f"data: {thinking_end_chunk.model_dump_json()}\n\n"
                            # Reset so a later thinking phase re-opens the tag.
                            self.first_thinking_chunk = True

                        debug_log(f"发送普通内容: {content}")
                        chunk = create_openai_response_chunk(
                            model=settings.PRIMARY_MODEL,
                            delta=Delta(content=content)
                        )
                    # Shared emit point for both branches above.
                    yield f"data: {chunk.model_dump_json()}\n\n"

    def _extract_edit_content(self, edit_content: str) -> str:
        """Return the text after the first </details> marker, or ""."""
        parts = edit_content.split("</details>")
        return parts[1] if len(parts) > 1 else ""

    def _send_end_chunk(self, force_stop: bool = False) -> Generator[str, None, None]:
        """Flush buffered content / tool calls, then send end chunk + [DONE].

        Args:
            force_stop: True when the stream terminated abnormally; the raw
                buffer is flushed as-is and tool extraction is skipped.
        """
        finish_reason = "stop"

        if self.has_tools and not force_stop:
            # Tool mode: decide between tool_calls and plain content now
            # that the full buffered text is available.
            self.tool_calls = extract_tool_invocations(self.buffered_content)

            if self.tool_calls:
                debug_log(f"检测到工具调用: {len(self.tool_calls)} 个")

                # One chunk per tool call, indexed per the OpenAI delta format.
                for i, tc in enumerate(self.tool_calls):
                    tool_call_delta = {
                        "index": i,
                        "id": tc.get("id"),
                        "type": tc.get("type", "function"),
                        "function": tc.get("function", {}),
                    }

                    out_chunk = create_openai_response_chunk(
                        model=settings.PRIMARY_MODEL,
                        delta=Delta(tool_calls=[tool_call_delta])
                    )
                    yield f"data: {out_chunk.model_dump_json()}\n\n"

                finish_reason = "tool_calls"
            else:
                # No tool calls found: strip any tool JSON remnants and
                # emit the remaining text as one content chunk.
                trimmed_content = remove_tool_json_content(self.buffered_content)
                if trimmed_content:
                    debug_log(f"发送常规内容: {len(trimmed_content)} 字符")
                    content_chunk = create_openai_response_chunk(
                        model=settings.PRIMARY_MODEL,
                        delta=Delta(content=trimmed_content)
                    )
                    yield f"data: {content_chunk.model_dump_json()}\n\n"
        elif force_stop:
            # Abnormal termination: flush whatever was buffered, untrimmed.
            if self.buffered_content:
                debug_log(f"强制结束,发送缓冲内容: {len(self.buffered_content)} 字符")
                content_chunk = create_openai_response_chunk(
                    model=settings.PRIMARY_MODEL,
                    delta=Delta(content=self.buffered_content)
                )
                yield f"data: {content_chunk.model_dump_json()}\n\n"

        end_chunk = create_openai_response_chunk(
            model=settings.PRIMARY_MODEL,
            finish_reason=finish_reason
        )
        yield f"data: {end_chunk.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"
        debug_log(f"流式响应完成 (finish_reason: {finish_reason})")

    def _process_content_with_tools(
        self,
        upstream_data: UpstreamData,
        sent_initial_answer: bool
    ) -> Generator[str, None, None]:
        """Route one event: tool phases go to the tool handler, rest to _process_content."""
        if self.has_tools and self.tool_handler:
            if upstream_data.data.phase == "tool_call":
                yield from self.tool_handler.process_tool_call_phase(
                    upstream_data.data.model_dump(),
                    is_stream=True
                )
                return
            elif upstream_data.data.phase == "other":
                yield from self.tool_handler.process_other_phase(
                    upstream_data.data.model_dump(),
                    is_stream=True
                )
                return

        yield from self._process_content(upstream_data, sent_initial_answer)
|
|
|
|
|
|
|
|
class NonStreamResponseHandler(ResponseHandler):
    """Handler for non-streaming responses.

    Consumes the whole upstream SSE stream, joins the content (wrapping
    thinking-phase text in <think>...</think>), optionally extracts tool
    calls, and returns a single OpenAI-style ``chat.completion`` JSON body.
    """

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools
        # True until the first thinking chunk has opened a <think> tag.
        self.first_thinking_chunk = True
        # True while the last seen phase was "thinking".
        self.in_thinking_phase = False

    def handle(self) -> JSONResponse:
        """Collect the full upstream response and return it as JSON.

        Raises:
            HTTPException: 502 when the upstream call fails, returns a
                non-200 status, or no usable content could be collected.
        """
        debug_log(f"开始处理非流式响应 (chat_id={self.chat_id})")

        try:
            response = self._call_upstream()
        except Exception as e:
            debug_log(f"调用上游失败: {e}")
            raise HTTPException(status_code=502, detail="Failed to call upstream")

        if response.status_code != 200:
            self._handle_upstream_error(response)
            raise HTTPException(status_code=502, detail="Upstream error")

        # Collected content fragments, joined at the end.
        full_content = []
        debug_log("开始收集完整响应内容")
        response_completed = False

        try:
            with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
                for event in parser.iter_json_data(UpstreamData):
                    upstream_data = event['data']

                    if upstream_data.data.delta_content:
                        content = upstream_data.data.delta_content

                        if upstream_data.data.phase == "thinking":
                            content = transform_thinking_content(content)

                            # Entering a thinking phase: open a <think> tag.
                            # NOTE(review): both branches below are identical;
                            # first_thinking_chunk only matters for whether a
                            # closing tag is emitted later. Confirm intent.
                            if not self.in_thinking_phase:
                                self.in_thinking_phase = True
                                if self.first_thinking_chunk:
                                    content = f"<think>{content}"
                                    self.first_thinking_chunk = False
                                else:
                                    content = f"<think>{content}"
                        else:
                            # Leaving a thinking phase: close the tag before
                            # appending answer content.
                            if self.in_thinking_phase:
                                if full_content and not self.first_thinking_chunk:
                                    full_content.append("</think>")
                                self.in_thinking_phase = False
                                self.first_thinking_chunk = True

                        if content:
                            full_content.append(content)

                    if upstream_data.data.done or upstream_data.data.phase == "done":
                        debug_log("检测到完成信号,停止收集")
                        response_completed = True
                        break

        except Exception as e:
            debug_log(f"非流式响应收集异常: {e}")
            if not full_content:
                # Nothing collected at all: surface the failure.
                raise HTTPException(status_code=502, detail=f"Response collection failed: {str(e)}")
            else:
                # Best effort: keep whatever was collected before the error.
                debug_log(f"部分内容收集成功,继续处理 ({len(full_content)} 个片段)")

        if not response_completed and not full_content:
            debug_log("响应未完成且无内容,可能是连接问题")
            raise HTTPException(status_code=502, detail="Incomplete response from upstream")

        # Stream ended while still inside a thinking phase: close the tag.
        if self.in_thinking_phase and not self.first_thinking_chunk:
            full_content.append("</think>")

        final_content = "".join(full_content)
        debug_log(f"内容收集完成,最终长度: {len(final_content)}")

        tool_calls = None
        finish_reason = "stop"
        message_content = final_content

        if self.has_tools:
            tool_calls = extract_tool_invocations(final_content)
            if tool_calls:
                # Tool-call responses carry no text content, per OpenAI format.
                message_content = None
                finish_reason = "tool_calls"
                debug_log(f"提取到工具调用: {json.dumps(tool_calls, ensure_ascii=False)}")
            else:
                # Strip tool JSON remnants; fall back to the raw text when
                # stripping would leave nothing.
                message_content = remove_tool_json_content(final_content)
                if not message_content:
                    message_content = final_content

        response_data = OpenAIResponse(
            id=f"chatcmpl-{int(time.time())}",
            object="chat.completion",
            created=int(time.time()),
            model=settings.PRIMARY_MODEL,
            choices=[Choice(
                index=0,
                message=Message(
                    role="assistant",
                    content=message_content,
                    tool_calls=tool_calls
                ),
                finish_reason=finish_reason
            )],
            usage=Usage()
        )

        debug_log("非流式响应发送完成")
        return JSONResponse(content=response_data.model_dump(exclude_none=True))