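"""Ultimate end-to-end test suite for the agent pipeline (9 scenarios).

Each scenario builds a fresh AgentState with create_initial_state, patches
backend.agent.nodes.get_model (plus the Wolfram and code tools where needed)
with mocks, runs one or more agent nodes (planner, parallel executor,
synthesizer, OCR), and checks the resulting state.
"""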
import asyncio
import sys
import os
import io
import json
from unittest.mock import MagicMock, patch, AsyncMock
from datetime import datetime

# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from backend.agent.state import create_initial_state, AgentState
from backend.agent.nodes import planner_node, parallel_executor_node, synthetic_agent_node, ocr_agent_node
from langchain_core.messages import AIMessage, HumanMessage

# Color codes for output
GREEN = "\033[92m"
RED = "\033[91m"
RESET = "\033[0m"
YELLOW = "\033[93m"


def log(msg, color=RESET):
    print(f"{color}{msg}{RESET}")

async def run_scenario_a_happy_path():
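    """Happy path: a three-question plan (direct + Wolfram + code) flows
    planner -> executor -> synthesizer with all model and tool calls mocked."""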
| log("\n📌 SCENARIO A: Happy Path (Direct + Wolfram + Code)", YELLOW) | |
| state = create_initial_state(session_id="test_happy") | |
| state["ocr_text"] = "Mock Input" | |
| # 1. Planner | |
| with patch("backend.agent.nodes.get_model") as mock_get_model: | |
| mock_llm = MagicMock() | |
| async def mock_plan(*args, **kwargs): | |
| return AIMessage(content=""" | |
| ```json | |
| { | |
| "questions": [ | |
| {"id": 1, "type": "direct", "content": "Q1", "tool_input": null}, | |
| {"id": 2, "type": "wolfram", "content": "Q2", "tool_input": "W2"}, | |
| {"id": 3, "type": "code", "content": "Q3", "tool_input": "C3"} | |
| ] | |
| } | |
| ``` | |
| """) | |
| mock_llm.ainvoke.side_effect = mock_plan | |
| mock_get_model.return_value = mock_llm | |
| state = await planner_node(state) | |
| if state["current_agent"] != "executor": | |
| log("❌ Planner failed to route to executor", RED) | |
| return False | |
    # 2. Executor
    with patch("backend.agent.nodes.get_model") as mock_get_model, \
         patch("backend.agent.nodes.query_wolfram_alpha") as mock_wolfram, \
         patch("backend.tools.code_executor.CodeTool.execute", new_callable=AsyncMock) as mock_code:

        # Tool mocks
        mock_wolfram.return_value = (True, "Wolfram Answer")  # (success, result)
        mock_code.return_value = {"success": True, "output": "Code Answer"}

        # The executor calls get_model both for direct answers and for code
        # generation (the code branch asks an LLM to write code before
        # CodeTool.execute runs), so a single mock with a content-aware
        # side_effect covers both call sites.
        async def llm_side_effect(*args, **kwargs):
            # args[0] is the list of messages; inspect the content to tell
            # a code-generation prompt from a direct question.
            msgs = args[0]
            content = msgs[0].content if msgs else ""
            if "CODEGEN_PROMPT" in str(content) or "Visualize" in str(content) or "code" in str(content):
                return AIMessage(content="```python\nprint('Code Answer')\n```")
            return AIMessage(content="Direct Answer")

        mock_llm_exec = MagicMock()
        mock_llm_exec.ainvoke.side_effect = llm_side_effect
        mock_get_model.return_value = mock_llm_exec
        state = await parallel_executor_node(state)

    results = state.get("question_results", [])
    if len(results) != 3:
        log(f"❌ Expected 3 results, got {len(results)}", RED)
        return False

    # Check results
    r1 = next(r for r in results if r["type"] == "direct")
    r2 = next(r for r in results if r["type"] == "wolfram")
    r3 = next(r for r in results if r["type"] == "code")

    if r1["result"] == "Direct Answer" and r2["result"] == "Wolfram Answer" and r3["result"] == "Code Answer":
        log("✅ Executor produced correct results", GREEN)
    else:
        log(f"❌ Results mismatch: {results}", RED)
        return False
    # 3. Synthesizer
    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm_synth = MagicMock()
        mock_llm_synth.ainvoke = AsyncMock(return_value=AIMessage(content="## Bài 1...\n## Bài 2...\n## Bài 3..."))
        mock_get_model.return_value = mock_llm_synth

        state = await synthetic_agent_node(state)

    # "Bài 1" is the Vietnamese "Problem 1" heading the synthesizer is expected to emit
    if "## Bài 1" in state["final_response"]:
        log("✅ Synthesis successful", GREEN)
        return True
    return False

async def run_scenario_b_partial_failure():
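    """Partial failure: the rate limiter blocks the Wolfram question while the
    direct question succeeds; the executor must keep the good result and record
    a rate-limit error for the blocked one."""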
| log("\n📌 SCENARIO B: Partial Failure (Rate Limit)", YELLOW) | |
| state = create_initial_state(session_id="test_partial") | |
| state["execution_plan"] = { | |
| "questions": [ | |
| {"id": 1, "type": "direct", "content": "Q1"}, | |
| {"id": 2, "type": "wolfram", "content": "Q2"} | |
| ] | |
| } | |
| with patch("backend.agent.nodes.get_model") as mock_get_model, \ | |
| patch("backend.agent.nodes.model_manager.check_rate_limit") as mock_rate_limit: | |
| mock_llm = MagicMock() | |
| mock_llm.ainvoke = AsyncMock(return_value=AIMessage(content="OK")) | |
| mock_get_model.return_value = mock_llm | |
| # Rate limit side effect: Allow Kimi (Direct), Block Wolfram | |
| def rl_side_effect(model_id): | |
| if "wolfram" in model_id: | |
| return False, "Over Quota" | |
| return True, None | |
| mock_rate_limit.side_effect = rl_side_effect | |
| state = await parallel_executor_node(state) | |
| results = state["question_results"] | |
| q1 = results[0] | |
| q2 = results[1] | |
| if q1.get("result") == "OK" and q2.get("error") and "Rate limit" in q2["error"]: | |
| log("✅ Partial failure handled correctly", GREEN) | |
| return True | |
| else: | |
| log(f"❌ Failed: {results}", RED) | |
| return False | |

async def run_scenario_c_planner_optimization():
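    """Planner optimization: a plan containing only direct questions should be
    routed straight to the reasoning agent, skipping the executor."""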
| log("\n📌 SCENARIO C: Planner Optimization (All Direct)", YELLOW) | |
| state = create_initial_state(session_id="test_opt") | |
| state["messages"] = [HumanMessage(content="Hello")] | |
| with patch("backend.agent.nodes.get_model") as mock_get_model: | |
| mock_llm = MagicMock() | |
| # Planner returns all direct questions | |
| async def mock_plan(*args, **kwargs): | |
| return AIMessage(content='```json\n{"questions": [{"id": 1, "type": "direct"}]}\n```') | |
| mock_llm.ainvoke.side_effect = mock_plan | |
| mock_get_model.return_value = mock_llm | |
| state = await planner_node(state) | |
| if state["current_agent"] == "reasoning": | |
| log("✅ Optimized route: Planner -> Reasoning (Skipped Executor)", GREEN) | |
| return True | |
| else: | |
| log(f"❌ Failed optimization. Agent is: {state['current_agent']}", RED) | |
| return False | |

async def run_scenario_d_image_processing():
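    """Multi-image processing: two images go through the OCR node with a mocked
    LLM; the recognized text must end up concatenated in ocr_text."""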
| log("\n📌 SCENARIO D: Multi-Image Processing", YELLOW) | |
| state = create_initial_state(session_id="test_img") | |
| # Simulate 2 images strings | |
| state["image_data_list"] = ["base64_img1", "base64_img2"] | |
| # Mock LLM within OCR Node | |
| # Mock LLM within OCR Node | |
| with patch("backend.agent.nodes.get_model") as mock_get_model: | |
| mock_llm = MagicMock() | |
| # Mock OCR response for parallel calls | |
| async def ocr_response(*args, **kwargs): | |
| return AIMessage(content="Recognized Text") | |
| mock_llm.ainvoke.side_effect = ocr_response | |
| mock_get_model.return_value = mock_llm | |
| state = await ocr_agent_node(state) | |
| ocr_res = state.get("ocr_results", []) | |
| # Check if OCR text contains result (it should be concatenated) | |
| if "Recognized Text" in state.get("ocr_text", ""): | |
| log("✅ Processed images in parallel via LLM Mock", GREEN) | |
| return True | |
| else: | |
| log("❌ Image processing failed", RED) | |
| return False | |

async def run_scenario_e_planner_failure():
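    """Planner recovery: when the planner LLM returns broken JSON, planner_node
    should fall back to the reasoning agent instead of crashing."""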
| log("\n📌 SCENARIO E: Planner JSON Error (Recovery)", YELLOW) | |
| log(" [Input]: User says 'Complex math'", RESET) | |
| state = create_initial_state(session_id="test_fail_json") | |
| with patch("backend.agent.nodes.get_model") as mock_get_model: | |
| mock_llm = MagicMock() | |
| # Planner returns BROKEN JSON | |
| async def mock_bad_plan(*args, **kwargs): | |
| return AIMessage(content='```json\n{ "questions": [INVALID_JSON... \n```') | |
| mock_llm.ainvoke.side_effect = mock_bad_plan | |
| mock_get_model.return_value = mock_llm | |
| state = await planner_node(state) | |
| log(f" [Output Agent]: {state['current_agent']}", RESET) | |
| if state["current_agent"] == "reasoning": | |
| log("✅ System recovered from bad JSON -> Fallback to Reasoning", GREEN) | |
| return True | |
| else: | |
| log(f"❌ Failed to recover. Current agent: {state['current_agent']}", RED) | |
| return False | |

async def run_scenario_f_unknown_tool():
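    """Unknown tool: a plan with an unrecognized question type must not crash
    the executor; it should either fall back to the direct LLM or record an error."""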
| log("\n📌 SCENARIO F: Unknown Tool in Plan (Hallucination)", YELLOW) | |
| state = create_initial_state(session_id="test_unknown") | |
| state["execution_plan"] = { | |
| "questions": [ | |
| {"id": 1, "type": "magic_wand", "content": "Do magic", "tool_input": "abracadabra"} | |
| ] | |
| } | |
| # We don't need to mock tools deeply here, just ensure executor doesn't crash | |
| # and marks it as error or handles it | |
| state = await parallel_executor_node(state) | |
| results = state.get("question_results", []) | |
| if not results: | |
| log("❌ No results generated", RED) | |
| return False | |
| res = results[0] | |
| log(f" [Result]: Type={res['type']}, Error={res.get('error')}, Result={res.get('result')}", RESET) | |
| # Depending on implementation, it might default to 'direct' or 'kimi-k2' logic OR return error. | |
| # Looking at parallel_executor_node code: | |
| # else: # direct ... llm = get_model("kimi-k2") | |
| # So unknown types fall through to "Direct" (Kimi). This is a features, not a bug (Panic fallback). | |
| # Wait, my parallel_executor_node code: | |
| # if q_type == "wolfram": ... | |
| # elif q_type == "code": ... | |
| # else: # direct | |
| # So "magic_wand" falls to "direct" -> calls Kimi. | |
| if res['type'] == 'magic_wand' and res.get("result") is not None: | |
| # It tried to solve it with Kimi (Direct fallback) | |
| log("✅ Unknown tool fell back to Direct LLM (Resilience)", GREEN) | |
| return True | |
| elif res.get("error"): | |
| log("✅ Unknown tool reported error", GREEN) | |
| return True | |
| return False | |

async def run_scenario_g_executor_direct_failure():
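    """Direct tool failure: when the direct LLM raises, the executor should
    capture the exception message in the result's error field."""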
| log("\n📌 SCENARIO G: Executor Direct Tool Failure", YELLOW) | |
| state = create_initial_state(session_id="test_g") | |
| state["execution_plan"] = {"questions": [{"id": 1, "type": "direct", "content": "Fail me"}]} | |
| with patch("backend.agent.nodes.get_model") as mock_get_model: | |
| mock_llm = MagicMock() | |
| mock_llm.ainvoke.side_effect = Exception("API 500 Error") | |
| mock_get_model.return_value = mock_llm | |
| state = await parallel_executor_node(state) | |
| res = state["question_results"][0] | |
| if res["error"] and "API 500 Error" in res["error"]: | |
| log("✅ Direct tool failure handled gracefully (Error captured)", GREEN) | |
| return True | |
| return False | |

async def run_scenario_h_synthesizer_failure():
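    """Synthesizer failure: when the synthesis LLM raises, synthetic_agent_node
    should fall back to returning the raw per-question results."""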
| log("\n📌 SCENARIO H: Synthesizer Failure (Fallback)", YELLOW) | |
| state = create_initial_state(session_id="test_h") | |
| state["question_results"] = [{"id": 1, "content": "Q", "result": "A"}] | |
| with patch("backend.agent.nodes.get_model") as mock_get_model: | |
| mock_llm = MagicMock() | |
| mock_llm.ainvoke.side_effect = Exception("Synth Busy") | |
| mock_get_model.return_value = mock_llm | |
| # Should fallback to manual concatenation | |
| state = await synthetic_agent_node(state) | |
| if "Lỗi khi tổng hợp" in state["final_response"] and "Kết quả gốc" in state["final_response"]: | |
| log("✅ Synthesizer failed but returned raw results (Fallback)", GREEN) | |
| return True | |
| return False | |

async def run_scenario_i_empty_plan():
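    """Empty plan: a valid plan with zero questions should be redirected to the
    reasoning agent."""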
| log("\n📌 SCENARIO I: Empty Plan (Zero Questions)", YELLOW) | |
| state = create_initial_state(session_id="test_i") | |
| with patch("backend.agent.nodes.get_model") as mock_get_model: | |
| mock_llm = MagicMock() | |
| # Planner returns valid JSON but empty list | |
| async def mock_clean_plan(*args, **kwargs): | |
| return AIMessage(content='```json\n{"questions": []}\n```') | |
| mock_llm.ainvoke.side_effect = mock_clean_plan | |
| mock_get_model.return_value = mock_llm | |
| state = await planner_node(state) | |
| if state["current_agent"] == "reasoning": | |
| log("✅ Empty plan redirected to Reasoning Agent", GREEN) | |
| return True | |
| return False | |

async def main():
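    """Run all nine scenarios sequentially and exit non-zero if any fails."""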
| log("🚀 STARTING ULTIMATE TEST SUITE (9 SCENARIOS)...\n") | |
| results = [] | |
| results.append(await run_scenario_a_happy_path()) | |
| results.append(await run_scenario_b_partial_failure()) | |
| results.append(await run_scenario_c_planner_optimization()) | |
| results.append(await run_scenario_d_image_processing()) | |
| results.append(await run_scenario_e_planner_failure()) | |
| results.append(await run_scenario_f_unknown_tool()) | |
| results.append(await run_scenario_g_executor_direct_failure()) | |
| results.append(await run_scenario_h_synthesizer_failure()) | |
| results.append(await run_scenario_i_empty_plan()) | |
| print("\n" + "="*40) | |
| if all(results): | |
| log("🎉 ALL 9 SCENARIOS PASSED!", GREEN) | |
| exit(0) | |
| else: | |
| log("💥 SOME TESTS FAILED!", RED) | |
| exit(1) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |