#!/usr/bin/env python3 """ 测试 done 阶段处理 """ import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from app.utils.sse_tool_handler import SSEToolHandler import json def test_done_phase_handling(): """测试 done 阶段的处理""" handler = SSEToolHandler("test-model", stream=True) print("🧪 测试 done 阶段处理\n") # 模拟完整的对话流程 test_chunks = [ # 回答阶段 { "phase": "answer", "delta_content": "这是回答内容", "edit_content": "" }, # 完成阶段 { "phase": "done", "done": True, "delta_content": "", "usage": { "prompt_tokens": 100, "completion_tokens": 50, "total_tokens": 150 } } ] output_chunks = [] for i, chunk in enumerate(test_chunks, 1): print(f"处理块 {i}: phase={chunk['phase']}") results = list(handler.process_sse_chunk(chunk)) output_chunks.extend(results) print(f" 输出数量: {len(results)}") for j, result in enumerate(results): if result.strip() == "data: [DONE]": print(f" 输出 {j+1}: [DONE] 标记") else: print(f" 输出 {j+1}: {result[:80]}{'...' if len(result) > 80 else ''}") print() print(f"📊 测试结果:") print(f" 总输出块数量: {len(output_chunks)}") # 验证输出内容 has_content = False has_final_chunk = False has_done_marker = False has_usage = False for output in output_chunks: if output.startswith("data: "): json_str = output[6:].strip() if json_str == "[DONE]": has_done_marker = True print(" ✅ 找到 [DONE] 标记") elif json_str: try: data = json.loads(json_str) if "choices" in data and data["choices"]: delta = data["choices"][0].get("delta", {}) content = delta.get("content", "") finish_reason = data["choices"][0].get("finish_reason") if content: has_content = True print(f" ✅ 找到内容: '{content}'") if finish_reason == "stop": has_final_chunk = True print(" ✅ 找到最终完成块") if "usage" in data: has_usage = True print(f" ✅ 找到 usage 信息: {data['usage']}") except json.JSONDecodeError as e: print(f" ❌ JSON 解析错误: {e}") # 验证结果 success = has_content and has_final_chunk and has_done_marker print(f"\n📋 验证结果:") print(f" 包含回答内容: {'✅' if has_content else '❌'}") print(f" 包含最终完成块: {'✅' if has_final_chunk else '❌'}") print(f" 包含 [DONE] 标记: {'✅' if has_done_marker else '❌'}") print(f" 包含 usage 信息: {'✅' if has_usage else '❌'}") if success: print("\n✅ done 阶段处理测试通过!") return True else: print("\n❌ done 阶段处理测试失败!") return False def test_done_phase_with_tool_call(): """测试带工具调用的 done 阶段处理""" handler = SSEToolHandler("test-model", stream=True) print("🧪 测试带工具调用的 done 阶段处理\n") # 模拟工具调用 + 回答 + 完成的流程 test_chunks = [ # 工具调用开始 { "phase": "tool_call", "edit_content": '{"type": "mcp", "data": {"metadata": {"id": "call_test", "name": "test_tool", "arguments": "{}", "result": "", "display_result": "", "duration": "...", "status": "completed", "is_error": false, "mcp_server": {"name": "mcp-server"}}, "thought": null, "ppt": null, "browser": null}}', "edit_index": 100 }, # 工具调用结束 { "phase": "other", "edit_content": 'null, "display_result": "", "duration": "...", "status": "completed", "is_error": false, "mcp_server": {"name": "mcp-server"}}, "thought": null, "ppt": null, "browser": null}}', "edit_index": 200 }, # 回答阶段 { "phase": "answer", "delta_content": "工具调用完成,这是回答。", "edit_content": "" }, # 完成阶段 { "phase": "done", "done": True, "delta_content": "" } ] output_chunks = [] for i, chunk in enumerate(test_chunks, 1): print(f"处理块 {i}: phase={chunk['phase']}") results = list(handler.process_sse_chunk(chunk)) output_chunks.extend(results) print(f" 输出数量: {len(results)}") print() # 检查是否有工具调用、回答内容和完成标记 has_tool_call = any("tool_calls" in output for output in output_chunks) has_answer_content = any("工具调用完成" in output for output in output_chunks) has_done_marker = any(output.strip() == "data: [DONE]" for output in output_chunks) print(f"📊 混合流程测试结果:") print(f" 包含工具调用: {'✅' if has_tool_call else '❌'}") print(f" 包含回答内容: {'✅' if has_answer_content else '❌'}") print(f" 包含 [DONE] 标记: {'✅' if has_done_marker else '❌'}") success = has_tool_call and has_answer_content and has_done_marker if success: print("\n✅ 混合流程 done 阶段测试通过!") return True else: print("\n❌ 混合流程 done 阶段测试失败!") return False def test_done_phase_warning_fix(): """测试 done 阶段不再产生警告""" handler = SSEToolHandler("test-model", stream=True) print("🧪 测试 done 阶段警告修复\n") # 模拟 done 阶段 chunk = { "phase": "done", "done": True, "delta_content": "" } print("处理 done 阶段块...") # 捕获日志输出(这里我们主要检查是否有异常) try: results = list(handler.process_sse_chunk(chunk)) print(f" 成功处理,输出 {len(results)} 个块") # 检查是否有 [DONE] 标记 has_done = any(output.strip() == "data: [DONE]" for output in results) print(f" 包含 [DONE] 标记: {'✅' if has_done else '❌'}") print("\n✅ done 阶段不再产生警告!") return True except Exception as e: print(f"\n❌ 处理 done 阶段时出错: {e}") return False if __name__ == "__main__": print("🔧 测试 done 阶段处理\n") test1_success = test_done_phase_handling() print("\n" + "="*50 + "\n") test2_success = test_done_phase_with_tool_call() print("\n" + "="*50 + "\n") test3_success = test_done_phase_warning_fix() print("\n" + "="*50) print("🎯 总结:") print(f" done 阶段基本处理: {'✅ 通过' if test1_success else '❌ 失败'}") print(f" done 阶段混合流程: {'✅ 通过' if test2_success else '❌ 失败'}") print(f" done 阶段警告修复: {'✅ 通过' if test3_success else '❌ 失败'}") if test1_success and test2_success and test3_success: print("\n🎉 所有测试通过!done 阶段处理完善!") print("\n💡 修复效果:") print(" - 不再显示 '未知的 SSE 阶段: done' 警告") print(" - 正确处理对话完成流程") print(" - 自动刷新缓冲区和完成工具调用") print(" - 发送标准的 OpenAI 完成标记") else: print("\n❌ 部分测试失败,需要进一步调试")