"""
Response handlers for streaming and non-streaming responses
"""
import json
import time
from typing import Generator, Optional
import requests
from fastapi import HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
from app.core.config import settings
from app.models.schemas import (
Message, Delta, Choice, Usage, OpenAIResponse,
UpstreamRequest, UpstreamData, UpstreamError, ModelItem
)
from app.utils.helpers import debug_log, call_upstream_api, transform_thinking_content
from app.core.token_manager import token_manager
from app.utils.sse_parser import SSEParser
from app.utils.tools import extract_tool_invocations, remove_tool_json_content
from app.utils.sse_tool_handler import SSEToolHandler
def create_openai_response_chunk(
    model: str,
    delta: Optional[Delta] = None,
    finish_reason: Optional[str] = None
) -> OpenAIResponse:
    """Build one SSE chunk in OpenAI ``chat.completion.chunk`` format.

    Args:
        model: Model name echoed back to the client.
        delta: Incremental payload for this chunk; an empty ``Delta`` is
            used when omitted.
        finish_reason: Terminal reason ("stop", "tool_calls", ...) or
            ``None`` for intermediate chunks.
    """
    single_choice = Choice(
        index=0,
        delta=delta or Delta(),
        finish_reason=finish_reason,
    )
    return OpenAIResponse(
        id=f"chatcmpl-{int(time.time())}",
        object="chat.completion.chunk",
        created=int(time.time()),
        model=model,
        choices=[single_choice],
    )
def handle_upstream_error(error: UpstreamError) -> Generator[str, None, None]:
    """Log an upstream error and terminate the SSE stream cleanly.

    Yields a final ``stop`` chunk followed by the ``[DONE]`` sentinel so
    the client always observes a well-formed end of stream.
    """
    debug_log(f"上游错误: code={error.code}, detail={error.detail}")
    # Close the stream with a terminal chunk instead of dropping it abruptly.
    closing_chunk = create_openai_response_chunk(
        model=settings.PRIMARY_MODEL,
        finish_reason="stop",
    )
    yield f"data: {closing_chunk.model_dump_json()}\n\n"
    yield "data: [DONE]\n\n"
class ResponseHandler:
    """Base class for response handling.

    Wraps a single upstream request with token-aware retry logic:
    - 401/403 marks the current token failed and rotates to a new one;
    - 429 backs off exponentially without blaming the token;
    - 5xx retries after a short pause;
    - network-level exceptions retry without marking the token failed,
      while other exceptions rotate the token.
    """

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str):
        self.upstream_req = upstream_req
        self.chat_id = chat_id
        self.auth_token = auth_token  # may be replaced during retries

    def _call_upstream(self) -> requests.Response:
        """Call the upstream API, retrying up to ``settings.MAX_RETRIES`` times.

        Returns:
            The upstream response (possibly non-200 when retries are exhausted
            or the error is not retryable).

        Raises:
            Exception: when every attempt failed with an exception.
        """
        max_retries = settings.MAX_RETRIES
        retry_count = 0
        while retry_count < max_retries:
            try:
                debug_log(f"尝试调用上游API (第 {retry_count + 1}/{max_retries} 次)")
                response = call_upstream_api(self.upstream_req, self.chat_id, self.auth_token)
                # Check if response is successful
                if response.status_code == 200:
                    # Success: record that this token works.
                    token_manager.mark_token_success(self.auth_token)
                    debug_log("上游API调用成功")
                    return response
                elif response.status_code in [401, 403]:
                    # Authentication/authorization error - mark token as failed.
                    debug_log(f"Token认证失败 (状态码: {response.status_code}): {self.auth_token[:20]}...")
                    token_manager.mark_token_failed(self.auth_token)
                    # Try to rotate to a fresh token before retrying.
                    new_token = token_manager.get_next_token()
                    if new_token and new_token != self.auth_token:
                        debug_log(f"尝试使用新token: {new_token[:20]}...")
                        self.auth_token = new_token
                        retry_count += 1
                        continue
                    else:
                        debug_log("没有更多可用token")
                        return response
                elif response.status_code in [429]:
                    # Rate limit - the token is fine; back off and retry.
                    debug_log(f"遇到速率限制 (状态码: {response.status_code}),等待后重试")
                    if retry_count < max_retries - 1:
                        # 'time' is imported at module level; no local import needed.
                        time.sleep(2 ** retry_count)  # exponential backoff
                        retry_count += 1
                        continue
                    else:
                        return response
                elif response.status_code >= 500:
                    # Server error - retry shortly without marking the token failed.
                    debug_log(f"服务器错误 (状态码: {response.status_code}),稍后重试")
                    if retry_count < max_retries - 1:
                        time.sleep(1)
                        retry_count += 1
                        continue
                    else:
                        return response
                else:
                    # Other client errors are returned to the caller as-is.
                    debug_log(f"客户端错误 (状态码: {response.status_code})")
                    return response
            except Exception as e:
                error_msg = str(e)
                debug_log(f"调用上游失败 (尝试 {retry_count + 1}/{max_retries}): {error_msg}")
                # Heuristic: distinguish network problems from token problems
                # by scanning the exception message.
                is_connection_error = any(keyword in error_msg.lower() for keyword in [
                    'connection', 'timeout', 'network', 'dns', 'socket', 'ssl'
                ])
                if is_connection_error:
                    debug_log("检测到网络连接问题,不标记token失败")
                    # Network issue: keep the token, wait, then retry.
                    if retry_count < max_retries - 1:
                        time.sleep(2)  # wait 2 seconds before retrying
                        retry_count += 1
                        continue
                    else:
                        raise Exception(f"网络连接问题,重试{max_retries}次后仍失败: {error_msg}")
                else:
                    # Possibly a token problem: mark it failed and try a new one.
                    debug_log("检测到可能的token问题,标记token失败")
                    token_manager.mark_token_failed(self.auth_token)
                    new_token = token_manager.get_next_token()
                    if new_token and new_token != self.auth_token and retry_count < max_retries - 1:
                        debug_log(f"尝试使用新token: {new_token[:20]}...")
                        self.auth_token = new_token
                        retry_count += 1
                        continue
                    else:
                        raise
        # If we get here, all retries failed
        raise Exception("所有重试尝试均失败")

    def _handle_upstream_error(self, response: requests.Response) -> None:
        """Log details of a non-200 upstream response (no exception raised here)."""
        debug_log(f"上游返回错误状态: {response.status_code}")
        if settings.DEBUG_LOGGING:
            debug_log(f"上游错误响应: {response.text}")
class StreamResponseHandler(ResponseHandler):
    """Handler for streaming responses.

    Relays the upstream SSE stream to the client in OpenAI chunk format,
    wrapping "thinking" phase content in <think>...</think> tags and,
    when tools are enabled, buffering answer content so tool-call JSON
    can be extracted at end of stream.
    """

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools
        # When tools are enabled, content is accumulated here instead of being
        # streamed directly (see _process_content / _send_end_chunk).
        self.buffered_content = ""
        self.tool_calls = None
        # Initialize SSE tool handler for improved tool processing
        self.tool_handler = SSEToolHandler(chat_id, settings.PRIMARY_MODEL) if has_tools else None
        # Thinking-state tracking: True until the first thinking chunk
        # (which carries the opening <think> tag) has been emitted.
        self.first_thinking_chunk = True

    def handle(self) -> Generator[str, None, None]:
        """Handle streaming response.

        Yields SSE lines ("data: ...\\n\\n") and always terminates the stream
        with an end chunk plus "[DONE]", even on upstream failure.
        """
        debug_log(f"开始处理流式响应 (chat_id={self.chat_id})")
        try:
            response = self._call_upstream()
        except Exception:
            yield "data: {\"error\": \"Failed to call upstream\"}\n\n"
            return
        if response.status_code != 200:
            self._handle_upstream_error(response)
            yield "data: {\"error\": \"Upstream error\"}\n\n"
            return
        # Send initial role chunk
        first_chunk = create_openai_response_chunk(
            model=settings.PRIMARY_MODEL,
            delta=Delta(role="assistant")
        )
        yield f"data: {first_chunk.model_dump_json()}\n\n"
        # Process stream
        debug_log("开始读取上游SSE流")
        sent_initial_answer = False
        stream_ended_normally = False
        try:
            with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
                for event in parser.iter_json_data(UpstreamData):
                    upstream_data = event['data']
                    # Check for errors
                    if self._has_error(upstream_data):
                        error = self._get_error(upstream_data)
                        yield from handle_upstream_error(error)
                        stream_ended_normally = True
                        break
                    debug_log(f"解析成功 - 类型: {upstream_data.type}, 阶段: {upstream_data.data.phase}, "
                              f"内容长度: {len(upstream_data.data.delta_content or '')}, 完成: {upstream_data.data.done}")
                    # Process content
                    yield from self._process_content_with_tools(upstream_data, sent_initial_answer)
                    # Update sent_initial_answer flag if we sent content
                    if not sent_initial_answer and (upstream_data.data.delta_content or upstream_data.data.edit_content):
                        sent_initial_answer = True
                    # Check if done
                    if upstream_data.data.done or upstream_data.data.phase == "done":
                        debug_log("检测到流结束信号")
                        yield from self._send_end_chunk()
                        stream_ended_normally = True
                        break
        except Exception as e:
            debug_log(f"SSE流处理异常: {e}")
            # Stream ended abnormally: tell the client the response may be truncated.
            if not stream_ended_normally:
                error_chunk = create_openai_response_chunk(
                    model=settings.PRIMARY_MODEL,
                    delta=Delta(content=f"\n\n[系统提示: 连接中断,响应可能不完整]")
                )
                yield f"data: {error_chunk.model_dump_json()}\n\n"
        # Guarantee a well-formed end of stream on every path.
        if not stream_ended_normally:
            debug_log("流未正常结束,发送结束信号")
            yield from self._send_end_chunk(force_stop=True)

    def _has_error(self, upstream_data: UpstreamData) -> bool:
        """Check if upstream data contains an error at any of the three levels."""
        return bool(
            upstream_data.error or
            upstream_data.data.error or
            (upstream_data.data.inner and upstream_data.data.inner.error)
        )

    def _get_error(self, upstream_data: UpstreamData) -> UpstreamError:
        """Get the first non-None error from upstream data (same order as _has_error)."""
        return (
            upstream_data.error or
            upstream_data.data.error or
            (upstream_data.data.inner.error if upstream_data.data.inner else None)
        )

    def _process_content(
        self,
        upstream_data: UpstreamData,
        sent_initial_answer: bool
    ) -> Generator[str, None, None]:
        """Process content from upstream data.

        With tools enabled, content is only buffered; otherwise it is
        emitted as OpenAI chunks, with <think> tag handling for the
        thinking phase.
        """
        content = upstream_data.data.delta_content or upstream_data.data.edit_content
        if not content:
            return
        # Transform thinking content
        if upstream_data.data.phase == "thinking":
            content = transform_thinking_content(content)
        # Buffer content if tools are enabled
        if self.has_tools:
            self.buffered_content += content
        else:
            # Handle initial answer content delivered via edit_content
            if (not sent_initial_answer and
                    upstream_data.data.edit_content and
                    upstream_data.data.phase == "answer"):
                content = self._extract_edit_content(upstream_data.data.edit_content)
                if content:
                    debug_log(f"发送普通内容: {content}")
                    chunk = create_openai_response_chunk(
                        model=settings.PRIMARY_MODEL,
                        delta=Delta(content=content)
                    )
                    yield f"data: {chunk.model_dump_json()}\n\n"
                    # NOTE(review): this assignment is local to the method and does
                    # not propagate to the caller; handle() maintains its own flag.
                    sent_initial_answer = True
            # Handle delta content
            if upstream_data.data.delta_content:
                if content:
                    if upstream_data.data.phase == "thinking":
                        # First thinking chunk gets the opening <think> tag;
                        # subsequent chunks are emitted as plain content.
                        if self.first_thinking_chunk:
                            formatted_content = f"<think>{content}"
                            self.first_thinking_chunk = False
                        else:
                            formatted_content = content
                        debug_log(f"发送思考内容: {content}")
                        chunk = create_openai_response_chunk(
                            model=settings.PRIMARY_MODEL,
                            delta=Delta(content=formatted_content)
                        )
                    else:
                        # Transitioning from thinking to another phase: close the tag first.
                        if not self.first_thinking_chunk and upstream_data.data.phase == "answer":
                            # Emit the closing </think> tag before the answer content.
                            thinking_end_chunk = create_openai_response_chunk(
                                model=settings.PRIMARY_MODEL,
                                delta=Delta(content="</think>")
                            )
                            yield f"data: {thinking_end_chunk.model_dump_json()}\n\n"
                            # Reset the thinking state.
                            self.first_thinking_chunk = True
                        debug_log(f"发送普通内容: {content}")
                        chunk = create_openai_response_chunk(
                            model=settings.PRIMARY_MODEL,
                            delta=Delta(content=content)
                        )
                    yield f"data: {chunk.model_dump_json()}\n\n"

    def _extract_edit_content(self, edit_content: str) -> str:
        """Extract the answer text that follows the closing </details> tag.

        Returns "" when no </details> tag is present.
        """
        parts = edit_content.split("</details>")
        return parts[1] if len(parts) > 1 else ""

    def _send_end_chunk(self, force_stop: bool = False) -> Generator[str, None, None]:
        """Send end chunk and DONE signal.

        When tools are enabled (and not force-stopped), first flushes either
        extracted tool calls or the buffered content stripped of tool JSON.
        On force_stop, flushes the raw buffer so no content is lost.
        """
        finish_reason = "stop"
        if self.has_tools and not force_stop:
            # Try to extract tool calls from buffered content
            self.tool_calls = extract_tool_invocations(self.buffered_content)
            if self.tool_calls:
                debug_log(f"检测到工具调用: {len(self.tool_calls)} 个")
                # Send tool calls with proper format
                for i, tc in enumerate(self.tool_calls):
                    tool_call_delta = {
                        "index": i,
                        "id": tc.get("id"),
                        "type": tc.get("type", "function"),
                        "function": tc.get("function", {}),
                    }
                    out_chunk = create_openai_response_chunk(
                        model=settings.PRIMARY_MODEL,
                        delta=Delta(tool_calls=[tool_call_delta])
                    )
                    yield f"data: {out_chunk.model_dump_json()}\n\n"
                finish_reason = "tool_calls"
            else:
                # Send regular content with any tool JSON removed
                trimmed_content = remove_tool_json_content(self.buffered_content)
                if trimmed_content:
                    debug_log(f"发送常规内容: {len(trimmed_content)} 字符")
                    content_chunk = create_openai_response_chunk(
                        model=settings.PRIMARY_MODEL,
                        delta=Delta(content=trimmed_content)
                    )
                    yield f"data: {content_chunk.model_dump_json()}\n\n"
        elif force_stop:
            # On forced stop, flush any buffered content (if present).
            if self.buffered_content:
                debug_log(f"强制结束,发送缓冲内容: {len(self.buffered_content)} 字符")
                content_chunk = create_openai_response_chunk(
                    model=settings.PRIMARY_MODEL,
                    delta=Delta(content=self.buffered_content)
                )
                yield f"data: {content_chunk.model_dump_json()}\n\n"
        # Send final chunk
        end_chunk = create_openai_response_chunk(
            model=settings.PRIMARY_MODEL,
            finish_reason=finish_reason
        )
        yield f"data: {end_chunk.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"
        debug_log(f"流式响应完成 (finish_reason: {finish_reason})")

    def _process_content_with_tools(
        self,
        upstream_data: UpstreamData,
        sent_initial_answer: bool
    ) -> Generator[str, None, None]:
        """Process content, routing tool-related phases to the SSE tool handler."""
        # Handle tool calls with improved SSE tool handler
        if self.has_tools and self.tool_handler:
            # Check if this is a tool_call phase
            if upstream_data.data.phase == "tool_call":
                # Use the improved tool handler for tool call processing
                yield from self.tool_handler.process_tool_call_phase(
                    upstream_data.data.model_dump(),
                    is_stream=True
                )
                return
            elif upstream_data.data.phase == "other":
                # Handle other phase which may contain tool completion signals
                yield from self.tool_handler.process_other_phase(
                    upstream_data.data.model_dump(),
                    is_stream=True
                )
                return
        # Fall back to original content processing
        yield from self._process_content(upstream_data, sent_initial_answer)
class NonStreamResponseHandler(ResponseHandler):
    """Handler for non-streaming responses.

    Consumes the whole upstream SSE stream and returns a single OpenAI
    ``chat.completion`` JSON response, wrapping any "thinking" phase
    content in <think>...</think> tags and extracting tool calls when
    tools are enabled.
    """

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools
        # Thinking-phase state tracking.
        self.first_thinking_chunk = True  # no <think> tag emitted yet
        self.in_thinking_phase = False    # currently inside a thinking phase

    def handle(self) -> JSONResponse:
        """Handle non-streaming response.

        Raises:
            HTTPException(502): when the upstream call fails, returns a
                non-200 status, or yields no usable content.
        """
        debug_log(f"开始处理非流式响应 (chat_id={self.chat_id})")
        try:
            response = self._call_upstream()
        except Exception as e:
            debug_log(f"调用上游失败: {e}")
            raise HTTPException(status_code=502, detail="Failed to call upstream")
        if response.status_code != 200:
            self._handle_upstream_error(response)
            raise HTTPException(status_code=502, detail="Upstream error")
        # Collect full response
        full_content = []
        debug_log("开始收集完整响应内容")
        response_completed = False
        try:
            with SSEParser(response, debug_mode=settings.DEBUG_LOGGING) as parser:
                for event in parser.iter_json_data(UpstreamData):
                    upstream_data = event['data']
                    if upstream_data.data.delta_content:
                        content = upstream_data.data.delta_content
                        if upstream_data.data.phase == "thinking":
                            content = transform_thinking_content(content)
                            # Entering the thinking phase: prepend the opening tag.
                            # (Was a duplicated if/else whose branches were
                            # identical; collapsed - the flag is simply cleared.)
                            if not self.in_thinking_phase:
                                self.in_thinking_phase = True
                                content = f"<think>{content}"
                                self.first_thinking_chunk = False
                        else:
                            # Leaving the thinking phase: close the tag first.
                            if self.in_thinking_phase:
                                if full_content and not self.first_thinking_chunk:
                                    full_content.append("</think>")
                                self.in_thinking_phase = False
                                self.first_thinking_chunk = True
                        if content:
                            full_content.append(content)
                    if upstream_data.data.done or upstream_data.data.phase == "done":
                        debug_log("检测到完成信号,停止收集")
                        response_completed = True
                        break
        except Exception as e:
            debug_log(f"非流式响应收集异常: {e}")
            if not full_content:
                # Nothing collected at all: surface the failure.
                raise HTTPException(status_code=502, detail=f"Response collection failed: {str(e)}")
            else:
                debug_log(f"部分内容收集成功,继续处理 ({len(full_content)} 个片段)")
        if not response_completed and not full_content:
            debug_log("响应未完成且无内容,可能是连接问题")
            raise HTTPException(status_code=502, detail="Incomplete response from upstream")
        # Close the thinking tag if the stream ended mid-thinking.
        if self.in_thinking_phase and not self.first_thinking_chunk:
            full_content.append("</think>")
        final_content = "".join(full_content)
        debug_log(f"内容收集完成,最终长度: {len(final_content)}")
        # Handle tool calls for non-streaming
        tool_calls = None
        finish_reason = "stop"
        message_content = final_content
        if self.has_tools:
            tool_calls = extract_tool_invocations(final_content)
            if tool_calls:
                # Content must be null when tool_calls are present (OpenAI spec)
                message_content = None
                finish_reason = "tool_calls"
                debug_log(f"提取到工具调用: {json.dumps(tool_calls, ensure_ascii=False)}")
            else:
                # Remove tool JSON from content
                message_content = remove_tool_json_content(final_content)
                if not message_content:
                    # Fall back to the raw content if stripping emptied it.
                    message_content = final_content
        # Build response
        response_data = OpenAIResponse(
            id=f"chatcmpl-{int(time.time())}",
            object="chat.completion",
            created=int(time.time()),
            model=settings.PRIMARY_MODEL,
            choices=[Choice(
                index=0,
                message=Message(
                    role="assistant",
                    content=message_content,
                    tool_calls=tool_calls
                ),
                finish_reason=finish_reason
            )],
            usage=Usage()
        )
        debug_log("非流式响应发送完成")
        return JSONResponse(content=response_data.model_dump(exclude_none=True))