| |
| """ |
| 测试 done 阶段处理 |
| """ |
|
|
| import sys |
| import os |
| sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| from app.utils.sse_tool_handler import SSEToolHandler |
| import json |
|
|
| def test_done_phase_handling(): |
| """测试 done 阶段的处理""" |
| |
| handler = SSEToolHandler("test-model", stream=True) |
| |
| print("🧪 测试 done 阶段处理\n") |
| |
| |
| test_chunks = [ |
| |
| { |
| "phase": "answer", |
| "delta_content": "这是回答内容", |
| "edit_content": "" |
| }, |
| |
| { |
| "phase": "done", |
| "done": True, |
| "delta_content": "", |
| "usage": { |
| "prompt_tokens": 100, |
| "completion_tokens": 50, |
| "total_tokens": 150 |
| } |
| } |
| ] |
| |
| output_chunks = [] |
| |
| for i, chunk in enumerate(test_chunks, 1): |
| print(f"处理块 {i}: phase={chunk['phase']}") |
| |
| results = list(handler.process_sse_chunk(chunk)) |
| output_chunks.extend(results) |
| |
| print(f" 输出数量: {len(results)}") |
| for j, result in enumerate(results): |
| if result.strip() == "data: [DONE]": |
| print(f" 输出 {j+1}: [DONE] 标记") |
| else: |
| print(f" 输出 {j+1}: {result[:80]}{'...' if len(result) > 80 else ''}") |
| print() |
| |
| print(f"📊 测试结果:") |
| print(f" 总输出块数量: {len(output_chunks)}") |
| |
| |
| has_content = False |
| has_final_chunk = False |
| has_done_marker = False |
| has_usage = False |
| |
| for output in output_chunks: |
| if output.startswith("data: "): |
| json_str = output[6:].strip() |
| if json_str == "[DONE]": |
| has_done_marker = True |
| print(" ✅ 找到 [DONE] 标记") |
| elif json_str: |
| try: |
| data = json.loads(json_str) |
| if "choices" in data and data["choices"]: |
| delta = data["choices"][0].get("delta", {}) |
| content = delta.get("content", "") |
| finish_reason = data["choices"][0].get("finish_reason") |
| |
| if content: |
| has_content = True |
| print(f" ✅ 找到内容: '{content}'") |
| |
| if finish_reason == "stop": |
| has_final_chunk = True |
| print(" ✅ 找到最终完成块") |
| |
| if "usage" in data: |
| has_usage = True |
| print(f" ✅ 找到 usage 信息: {data['usage']}") |
| |
| except json.JSONDecodeError as e: |
| print(f" ❌ JSON 解析错误: {e}") |
| |
| |
| success = has_content and has_final_chunk and has_done_marker |
| |
| print(f"\n📋 验证结果:") |
| print(f" 包含回答内容: {'✅' if has_content else '❌'}") |
| print(f" 包含最终完成块: {'✅' if has_final_chunk else '❌'}") |
| print(f" 包含 [DONE] 标记: {'✅' if has_done_marker else '❌'}") |
| print(f" 包含 usage 信息: {'✅' if has_usage else '❌'}") |
| |
| if success: |
| print("\n✅ done 阶段处理测试通过!") |
| return True |
| else: |
| print("\n❌ done 阶段处理测试失败!") |
| return False |
|
|
| def test_done_phase_with_tool_call(): |
| """测试带工具调用的 done 阶段处理""" |
| |
| handler = SSEToolHandler("test-model", stream=True) |
| |
| print("🧪 测试带工具调用的 done 阶段处理\n") |
| |
| |
| test_chunks = [ |
| |
| { |
| "phase": "tool_call", |
| "edit_content": '<glm_block view="">{"type": "mcp", "data": {"metadata": {"id": "call_test", "name": "test_tool", "arguments": "{}", "result": "", "display_result": "", "duration": "...", "status": "completed", "is_error": false, "mcp_server": {"name": "mcp-server"}}, "thought": null, "ppt": null, "browser": null}}</glm_block>', |
| "edit_index": 100 |
| }, |
| |
| { |
| "phase": "other", |
| "edit_content": 'null, "display_result": "", "duration": "...", "status": "completed", "is_error": false, "mcp_server": {"name": "mcp-server"}}, "thought": null, "ppt": null, "browser": null}}</glm_block>', |
| "edit_index": 200 |
| }, |
| |
| { |
| "phase": "answer", |
| "delta_content": "工具调用完成,这是回答。", |
| "edit_content": "" |
| }, |
| |
| { |
| "phase": "done", |
| "done": True, |
| "delta_content": "" |
| } |
| ] |
| |
| output_chunks = [] |
| |
| for i, chunk in enumerate(test_chunks, 1): |
| print(f"处理块 {i}: phase={chunk['phase']}") |
| |
| results = list(handler.process_sse_chunk(chunk)) |
| output_chunks.extend(results) |
| |
| print(f" 输出数量: {len(results)}") |
| print() |
| |
| |
| has_tool_call = any("tool_calls" in output for output in output_chunks) |
| has_answer_content = any("工具调用完成" in output for output in output_chunks) |
| has_done_marker = any(output.strip() == "data: [DONE]" for output in output_chunks) |
| |
| print(f"📊 混合流程测试结果:") |
| print(f" 包含工具调用: {'✅' if has_tool_call else '❌'}") |
| print(f" 包含回答内容: {'✅' if has_answer_content else '❌'}") |
| print(f" 包含 [DONE] 标记: {'✅' if has_done_marker else '❌'}") |
| |
| success = has_tool_call and has_answer_content and has_done_marker |
| |
| if success: |
| print("\n✅ 混合流程 done 阶段测试通过!") |
| return True |
| else: |
| print("\n❌ 混合流程 done 阶段测试失败!") |
| return False |
|
|
| def test_done_phase_warning_fix(): |
| """测试 done 阶段不再产生警告""" |
| |
| handler = SSEToolHandler("test-model", stream=True) |
| |
| print("🧪 测试 done 阶段警告修复\n") |
| |
| |
| chunk = { |
| "phase": "done", |
| "done": True, |
| "delta_content": "" |
| } |
| |
| print("处理 done 阶段块...") |
| |
| |
| try: |
| results = list(handler.process_sse_chunk(chunk)) |
| print(f" 成功处理,输出 {len(results)} 个块") |
| |
| |
| has_done = any(output.strip() == "data: [DONE]" for output in results) |
| print(f" 包含 [DONE] 标记: {'✅' if has_done else '❌'}") |
| |
| print("\n✅ done 阶段不再产生警告!") |
| return True |
| |
| except Exception as e: |
| print(f"\n❌ 处理 done 阶段时出错: {e}") |
| return False |
|
|
| if __name__ == "__main__": |
| print("🔧 测试 done 阶段处理\n") |
| |
| test1_success = test_done_phase_handling() |
| print("\n" + "="*50 + "\n") |
| test2_success = test_done_phase_with_tool_call() |
| print("\n" + "="*50 + "\n") |
| test3_success = test_done_phase_warning_fix() |
| |
| print("\n" + "="*50) |
| print("🎯 总结:") |
| print(f" done 阶段基本处理: {'✅ 通过' if test1_success else '❌ 失败'}") |
| print(f" done 阶段混合流程: {'✅ 通过' if test2_success else '❌ 失败'}") |
| print(f" done 阶段警告修复: {'✅ 通过' if test3_success else '❌ 失败'}") |
| |
| if test1_success and test2_success and test3_success: |
| print("\n🎉 所有测试通过!done 阶段处理完善!") |
| print("\n💡 修复效果:") |
| print(" - 不再显示 '未知的 SSE 阶段: done' 警告") |
| print(" - 正确处理对话完成流程") |
| print(" - 自动刷新缓冲区和完成工具调用") |
| print(" - 发送标准的 OpenAI 完成标记") |
| else: |
| print("\n❌ 部分测试失败,需要进一步调试") |
|
|