z.ai2api_python / app /core /response_handlers.py
bluewinliang's picture
Upload 25 files
f4baae1 verified
"""
Response handlers for streaming and non-streaming responses
"""
import json
import time
from typing import Generator, Optional
import requests
from fastapi import HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from app.core.config import settings
from app.models.schemas import (
Message, Delta, Choice, Usage, OpenAIResponse,
UpstreamRequest, UpstreamData, UpstreamError, ModelItem
)
from app.utils.helpers import debug_log, call_upstream_api, transform_thinking_content
from app.core.token_manager import token_manager
from app.utils.sse_parser import SSEParser
from app.utils.tools import extract_tool_invocations, remove_tool_json_content
from app.utils.sse_tool_handler import SSEToolHandler
def create_openai_response_chunk(
model: str,
delta: Optional[Delta] = None,
finish_reason: Optional[str] = None
) -> OpenAIResponse:
"""Create OpenAI response chunk for streaming"""
return OpenAIResponse(
id=f"chatcmpl-{int(time.time())}",
object="chat.completion.chunk",
created=int(time.time()),
model=model,
choices=[Choice(
index=0,
delta=delta or Delta(),
finish_reason=finish_reason
)]
)
def handle_upstream_error(error: UpstreamError) -> Generator[str, None, None]:
"""Handle upstream error response"""
debug_log(f"上游错误: code={error.code}, detail={error.detail}")
# Send end chunk
end_chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
finish_reason="stop"
)
yield f"data: {end_chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
class ResponseHandler:
"""Base class for response handling"""
def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str):
self.upstream_req = upstream_req
self.chat_id = chat_id
self.auth_token = auth_token
def _call_upstream(self) -> requests.Response:
"""Call upstream API with error handling"""
max_retries = settings.MAX_RETRIES
retry_count = 0
while retry_count < max_retries:
try:
debug_log(f"尝试调用上游API (第 {retry_count + 1}/{max_retries} 次)")
response = call_upstream_api(self.upstream_req, self.chat_id, self.auth_token)
# Check if response is successful
if response.status_code == 200:
# Mark token as successful
token_manager.mark_token_success(self.auth_token)
debug_log("上游API调用成功")
return response
elif response.status_code in [401, 403]:
# Authentication/authorization error - mark token as failed
debug_log(f"Token认证失败 (状态码: {response.status_code}): {self.auth_token[:20]}...")
token_manager.mark_token_failed(self.auth_token)
# Try to get a new token
new_token = token_manager.get_next_token()
if new_token and new_token != self.auth_token:
debug_log(f"尝试使用新token: {new_token[:20]}...")
self.auth_token = new_token
retry_count += 1
continue
else:
debug_log("没有更多可用token")
return response
elif response.status_code in [429]:
# Rate limit - don't mark token as failed, just retry
debug_log(f"遇到速率限制 (状态码: {response.status_code}),等待后重试")
if retry_count < max_retries - 1:
import time
time.sleep(2 ** retry_count) # 指数退避
retry_count += 1
continue
else:
return response
elif response.status_code >= 500:
# Server error - retry without marking token as failed
debug_log(f"服务器错误 (状态码: {response.status_code}),稍后重试")
if retry_count < max_retries - 1:
import time
time.sleep(1)
retry_count += 1
continue
else:
return response
else:
# Other client errors, return response as-is
debug_log(f"客户端错误 (状态码: {response.status_code})")
return response
except Exception as e:
error_msg = str(e)
debug_log(f"调用上游失败 (尝试 {retry_count + 1}/{max_retries}): {error_msg}")
# 判断是否是连接问题还是token问题
is_connection_error = any(keyword in error_msg.lower() for keyword in [
'connection', 'timeout', 'network', 'dns', 'socket', 'ssl'
])
if is_connection_error:
debug_log("检测到网络连接问题,不标记token失败")
# 网络问题不标记token失败,直接重试
if retry_count < max_retries - 1:
import time
time.sleep(2) # 等待2秒后重试
retry_count += 1
continue
else:
raise Exception(f"网络连接问题,重试{max_retries}次后仍失败: {error_msg}")
else:
# 其他错误可能是token问题,标记失败并尝试新token
debug_log("检测到可能的token问题,标记token失败")
token_manager.mark_token_failed(self.auth_token)
# Try to get a new token
new_token = token_manager.get_next_token()
if new_token and new_token != self.auth_token and retry_count < max_retries - 1:
debug_log(f"尝试使用新token: {new_token[:20]}...")
self.auth_token = new_token
retry_count += 1
continue
else:
raise
# If we get here, all retries failed
raise Exception("所有重试尝试均失败")
def _handle_upstream_error(self, response: requests.Response) -> None:
"""Handle upstream error response"""
debug_log(f"上游返回错误状态: {response.status_code}")
if settings.DEBUG_LOGGING:
debug_log(f"上游错误响应: {response.text}")
class StreamResponseHandler(ResponseHandler):
"""Handler for streaming responses"""
def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
super().__init__(upstream_req, chat_id, auth_token)
self.has_tools = has_tools
self.buffered_content = ""
self.tool_calls = None
# Initialize SSE tool handler for improved tool processing
self.tool_handler = SSEToolHandler(chat_id, settings.PRIMARY_MODEL) if has_tools else None
# 思考状态跟踪
self.first_thinking_chunk = True
def handle(self) -> Generator[str, None, None]:
"""Handle streaming response"""
debug_log(f"开始处理流式响应 (chat_id={self.chat_id})")
try:
response = self._call_upstream()
except Exception:
yield "data: {\"error\": \"Failed to call upstream\"}\n\n"
return
if response.status_code != 200:
self._handle_upstream_error(response)
yield "data: {\"error\": \"Upstream error\"}\n\n"
return
# Send initial role chunk
first_chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(role="assistant")
)
yield f"data: {first_chunk.model_dump_json()}\n\n"
# Process stream
debug_log("开始读取上游SSE流")
sent_initial_answer = False
stream_ended_normally = False
try:
with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
for event in parser.iter_json_data(UpstreamData):
upstream_data = event['data']
# Check for errors
if self._has_error(upstream_data):
error = self._get_error(upstream_data)
yield from handle_upstream_error(error)
stream_ended_normally = True
break
debug_log(f"解析成功 - 类型: {upstream_data.type}, 阶段: {upstream_data.data.phase}, "
f"内容长度: {len(upstream_data.data.delta_content or '')}, 完成: {upstream_data.data.done}")
# Process content
yield from self._process_content_with_tools(upstream_data, sent_initial_answer)
# Update sent_initial_answer flag if we sent content
if not sent_initial_answer and (upstream_data.data.delta_content or upstream_data.data.edit_content):
sent_initial_answer = True
# Check if done
if upstream_data.data.done or upstream_data.data.phase == "done":
debug_log("检测到流结束信号")
yield from self._send_end_chunk()
stream_ended_normally = True
break
except Exception as e:
debug_log(f"SSE流处理异常: {e}")
# 流异常结束,发送错误响应
if not stream_ended_normally:
error_chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(content=f"\n\n[系统提示: 连接中断,响应可能不完整]")
)
yield f"data: {error_chunk.model_dump_json()}\n\n"
# 确保流正常结束
if not stream_ended_normally:
debug_log("流未正常结束,发送结束信号")
yield from self._send_end_chunk(force_stop=True)
def _has_error(self, upstream_data: UpstreamData) -> bool:
"""Check if upstream data contains error"""
return bool(
upstream_data.error or
upstream_data.data.error or
(upstream_data.data.inner and upstream_data.data.inner.error)
)
def _get_error(self, upstream_data: UpstreamData) -> UpstreamError:
"""Get error from upstream data"""
return (
upstream_data.error or
upstream_data.data.error or
(upstream_data.data.inner.error if upstream_data.data.inner else None)
)
def _process_content(
self,
upstream_data: UpstreamData,
sent_initial_answer: bool
) -> Generator[str, None, None]:
"""Process content from upstream data"""
content = upstream_data.data.delta_content or upstream_data.data.edit_content
if not content:
return
# Transform thinking content
if upstream_data.data.phase == "thinking":
content = transform_thinking_content(content)
# Buffer content if tools are enabled
if self.has_tools:
self.buffered_content += content
else:
# Handle initial answer content
if (not sent_initial_answer and
upstream_data.data.edit_content and
upstream_data.data.phase == "answer"):
content = self._extract_edit_content(upstream_data.data.edit_content)
if content:
debug_log(f"发送普通内容: {content}")
chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(content=content)
)
yield f"data: {chunk.model_dump_json()}\n\n"
sent_initial_answer = True
# Handle delta content
if upstream_data.data.delta_content:
if content:
if upstream_data.data.phase == "thinking":
# 第一个思考块添加<think>开始标签,其他块保持纯内容
if self.first_thinking_chunk:
formatted_content = f"<think>{content}"
self.first_thinking_chunk = False
else:
formatted_content = content
debug_log(f"发送思考内容: {content}")
chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(content=formatted_content)
)
else:
# 如果从thinking阶段转到其他阶段,需要结束thinking标签
if not self.first_thinking_chunk and upstream_data.data.phase == "answer":
# 先发送思考结束标签
thinking_end_chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(content="</think>")
)
yield f"data: {thinking_end_chunk.model_dump_json()}\n\n"
# 重置状态
self.first_thinking_chunk = True
debug_log(f"发送普通内容: {content}")
chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(content=content)
)
yield f"data: {chunk.model_dump_json()}\n\n"
def _extract_edit_content(self, edit_content: str) -> str:
"""Extract content from edit_content field"""
parts = edit_content.split("</details>")
return parts[1] if len(parts) > 1 else ""
def _send_end_chunk(self, force_stop: bool = False) -> Generator[str, None, None]:
"""Send end chunk and DONE signal"""
finish_reason = "stop"
if self.has_tools and not force_stop:
# Try to extract tool calls from buffered content
self.tool_calls = extract_tool_invocations(self.buffered_content)
if self.tool_calls:
debug_log(f"检测到工具调用: {len(self.tool_calls)} 个")
# Send tool calls with proper format
for i, tc in enumerate(self.tool_calls):
tool_call_delta = {
"index": i,
"id": tc.get("id"),
"type": tc.get("type", "function"),
"function": tc.get("function", {}),
}
out_chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(tool_calls=[tool_call_delta])
)
yield f"data: {out_chunk.model_dump_json()}\n\n"
finish_reason = "tool_calls"
else:
# Send regular content
trimmed_content = remove_tool_json_content(self.buffered_content)
if trimmed_content:
debug_log(f"发送常规内容: {len(trimmed_content)} 字符")
content_chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(content=trimmed_content)
)
yield f"data: {content_chunk.model_dump_json()}\n\n"
elif force_stop:
# 强制结束时,发送缓冲的内容(如果有)
if self.buffered_content:
debug_log(f"强制结束,发送缓冲内容: {len(self.buffered_content)} 字符")
content_chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
delta=Delta(content=self.buffered_content)
)
yield f"data: {content_chunk.model_dump_json()}\n\n"
# Send final chunk
end_chunk = create_openai_response_chunk(
model=settings.PRIMARY_MODEL,
finish_reason=finish_reason
)
yield f"data: {end_chunk.model_dump_json()}\n\n"
yield "data: [DONE]\n\n"
debug_log(f"流式响应完成 (finish_reason: {finish_reason})")
def _process_content_with_tools(
self,
upstream_data: UpstreamData,
sent_initial_answer: bool
) -> Generator[str, None, None]:
"""Process content with improved tool handling"""
# Handle tool calls with improved SSE tool handler
if self.has_tools and self.tool_handler:
# Check if this is a tool_call phase
if upstream_data.data.phase == "tool_call":
# Use the improved tool handler for tool call processing
yield from self.tool_handler.process_tool_call_phase(
upstream_data.data.model_dump(),
is_stream=True
)
return
elif upstream_data.data.phase == "other":
# Handle other phase which may contain tool completion signals
yield from self.tool_handler.process_other_phase(
upstream_data.data.model_dump(),
is_stream=True
)
return
# Fall back to original content processing
yield from self._process_content(upstream_data, sent_initial_answer)
class NonStreamResponseHandler(ResponseHandler):
"""Handler for non-streaming responses"""
def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
super().__init__(upstream_req, chat_id, auth_token)
self.has_tools = has_tools
# 思考状态跟踪
self.first_thinking_chunk = True
self.in_thinking_phase = False
def handle(self) -> JSONResponse:
"""Handle non-streaming response"""
debug_log(f"开始处理非流式响应 (chat_id={self.chat_id})")
try:
response = self._call_upstream()
except Exception as e:
debug_log(f"调用上游失败: {e}")
raise HTTPException(status_code=502, detail="Failed to call upstream")
if response.status_code != 200:
self._handle_upstream_error(response)
raise HTTPException(status_code=502, detail="Upstream error")
# Collect full response
full_content = []
debug_log("开始收集完整响应内容")
response_completed = False
try:
with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
for event in parser.iter_json_data(UpstreamData):
upstream_data = event['data']
if upstream_data.data.delta_content:
content = upstream_data.data.delta_content
if upstream_data.data.phase == "thinking":
content = transform_thinking_content(content)
# 处理思考内容的分块格式
if not self.in_thinking_phase:
# 进入思考阶段,添加开始标签
self.in_thinking_phase = True
if self.first_thinking_chunk:
content = f"<think>{content}"
self.first_thinking_chunk = False
else:
content = f"<think>{content}"
# 如果已经在思考阶段,保持纯内容
else:
# 如果从thinking阶段转到其他阶段
if self.in_thinking_phase:
# 添加结束标签到前一个内容
if full_content and not self.first_thinking_chunk:
full_content.append("</think>")
self.in_thinking_phase = False
self.first_thinking_chunk = True
if content:
full_content.append(content)
if upstream_data.data.done or upstream_data.data.phase == "done":
debug_log("检测到完成信号,停止收集")
response_completed = True
break
except Exception as e:
debug_log(f"非流式响应收集异常: {e}")
if not full_content:
# 如果没有收集到任何内容,抛出异常
raise HTTPException(status_code=502, detail=f"Response collection failed: {str(e)}")
else:
debug_log(f"部分内容收集成功,继续处理 ({len(full_content)} 个片段)")
if not response_completed and not full_content:
debug_log("响应未完成且无内容,可能是连接问题")
raise HTTPException(status_code=502, detail="Incomplete response from upstream")
# 如果响应结束时还在思考阶段,需要添加结束标签
if self.in_thinking_phase and not self.first_thinking_chunk:
full_content.append("</think>")
final_content = "".join(full_content)
debug_log(f"内容收集完成,最终长度: {len(final_content)}")
# Handle tool calls for non-streaming
tool_calls = None
finish_reason = "stop"
message_content = final_content
if self.has_tools:
tool_calls = extract_tool_invocations(final_content)
if tool_calls:
# Content must be null when tool_calls are present (OpenAI spec)
message_content = None
finish_reason = "tool_calls"
debug_log(f"提取到工具调用: {json.dumps(tool_calls, ensure_ascii=False)}")
else:
# Remove tool JSON from content
message_content = remove_tool_json_content(final_content)
if not message_content:
message_content = final_content # 保留原内容如果清理后为空
# Build response
response_data = OpenAIResponse(
id=f"chatcmpl-{int(time.time())}",
object="chat.completion",
created=int(time.time()),
model=settings.PRIMARY_MODEL,
choices=[Choice(
index=0,
message=Message(
role="assistant",
content=message_content,
tool_calls=tool_calls
),
finish_reason=finish_reason
)],
usage=Usage()
)
debug_log("非流式响应发送完成")
return JSONResponse(content=response_data.model_dump(exclude_none=True))