import asyncio
import sys
import os
from unittest.mock import MagicMock, patch, AsyncMock

# Add project root to path (this file is assumed to sit two levels below it)
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from backend.agent.state import create_initial_state
from backend.agent.nodes import planner_node, parallel_executor_node, synthetic_agent_node, ocr_agent_node
from langchain_core.messages import AIMessage, HumanMessage

# Color codes for output
GREEN = "\033[92m"
RED = "\033[91m"
RESET = "\033[0m"
YELLOW = "\033[93m"


def log(msg, color=RESET):
    print(f"{color}{msg}{RESET}")


async def run_scenario_a_happy_path():
    log("\nπŸ“Œ SCENARIO A: Happy Path (Direct + Wolfram + Code)", YELLOW)
    state = create_initial_state(session_id="test_happy")
    state["ocr_text"] = "Mock Input"

    # 1. Planner
    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm = MagicMock()

        async def mock_plan(*args, **kwargs):
            return AIMessage(content="""
```json
{
    "questions": [
        {"id": 1, "type": "direct", "content": "Q1", "tool_input": null},
        {"id": 2, "type": "wolfram", "content": "Q2", "tool_input": "W2"},
        {"id": 3, "type": "code", "content": "Q3", "tool_input": "C3"}
    ]
}
```
""")

        mock_llm.ainvoke.side_effect = mock_plan
        mock_get_model.return_value = mock_llm

        state = await planner_node(state)

    if state["current_agent"] != "executor":
        log("❌ Planner failed to route to executor", RED)
        return False

    # 2. Executor
    with patch("backend.agent.nodes.get_model") as mock_get_model, \
         patch("backend.agent.nodes.query_wolfram_alpha") as mock_wolfram, \
         patch("backend.tools.code_executor.CodeTool.execute", new_callable=AsyncMock) as mock_code:

        # Tool mocks
        mock_wolfram.return_value = (True, "Wolfram Answer")                 # (success, result)
        mock_code.return_value = {"success": True, "output": "Code Answer"}  # code tool

        # get_model is called more than once in the executor: once for the direct
        # answer and once for code generation (nodes.py uses get_model("qwen") to
        # write the script before CodeTool.execute runs it). A single side_effect
        # that inspects the prompt lets one mock serve both calls.
        async def llm_side_effect(*args, **kwargs):
            # args[0] is the message list; check the content to tell the calls apart.
            msgs = args[0]
            content = msgs[0].content if msgs else ""
            if "CODEGEN_PROMPT" in str(content) or "Visualize" in str(content) or "code" in str(content):
                return AIMessage(content="```python\nprint('Code Answer')\n```")
            return AIMessage(content="Direct Answer")

        mock_llm_exec = MagicMock()
        mock_llm_exec.ainvoke.side_effect = llm_side_effect
        mock_get_model.return_value = mock_llm_exec

        state = await parallel_executor_node(state)

    results = state.get("question_results", [])
    if len(results) != 3:
        log(f"❌ Expected 3 results, got {len(results)}", RED)
        return False

    # Check results
    r1 = next(r for r in results if r["type"] == "direct")
    r2 = next(r for r in results if r["type"] == "wolfram")
    r3 = next(r for r in results if r["type"] == "code")

    if r1["result"] == "Direct Answer" and r2["result"] == "Wolfram Answer" and r3["result"] == "Code Answer":
        log("βœ… Executor produced correct results", GREEN)
    else:
        log(f"❌ Results mismatch: {results}", RED)
        return False

    # 3. Synthesizer
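    # synthetic_agent_node merges the per-question results into a single
    # final_response; the mock below stands in for that merge step.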
    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm_synth = MagicMock()
        # "BΓ i" is Vietnamese for "Problem"; the backend answers in Vietnamese.
        mock_llm_synth.ainvoke = AsyncMock(return_value=AIMessage(content="## BΓ i 1...\n## BΓ i 2...\n## BΓ i 3..."))
        mock_get_model.return_value = mock_llm_synth

        state = await synthetic_agent_node(state)

    if "## BΓ i 1" in state["final_response"]:
        log("βœ… Synthesis successful", GREEN)
        return True
    return False


async def run_scenario_b_partial_failure():
    log("\nπŸ“Œ SCENARIO B: Partial Failure (Rate Limit)", YELLOW)
    state = create_initial_state(session_id="test_partial")
    state["execution_plan"] = {
        "questions": [
            {"id": 1, "type": "direct", "content": "Q1"},
            {"id": 2, "type": "wolfram", "content": "Q2"}
        ]
    }

    with patch("backend.agent.nodes.get_model") as mock_get_model, \
         patch("backend.agent.nodes.model_manager.check_rate_limit") as mock_rate_limit:

        mock_llm = MagicMock()
        mock_llm.ainvoke = AsyncMock(return_value=AIMessage(content="OK"))
        mock_get_model.return_value = mock_llm

        # Rate-limit side effect: allow Kimi (direct), block Wolfram
        def rl_side_effect(model_id):
            if "wolfram" in model_id:
                return False, "Over Quota"
            return True, None

        mock_rate_limit.side_effect = rl_side_effect

        state = await parallel_executor_node(state)

    results = state["question_results"]
    q1 = results[0]
    q2 = results[1]

    if q1.get("result") == "OK" and q2.get("error") and "Rate limit" in q2["error"]:
        log("βœ… Partial failure handled correctly", GREEN)
        return True
    else:
        log(f"❌ Failed: {results}", RED)
        return False


async def run_scenario_c_planner_optimization():
    log("\nπŸ“Œ SCENARIO C: Planner Optimization (All Direct)", YELLOW)
    state = create_initial_state(session_id="test_opt")
    state["messages"] = [HumanMessage(content="Hello")]

    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm = MagicMock()

        # Planner returns an all-direct plan
        async def mock_plan(*args, **kwargs):
            return AIMessage(content='```json\n{"questions": [{"id": 1, "type": "direct"}]}\n```')

        mock_llm.ainvoke.side_effect = mock_plan
        mock_get_model.return_value = mock_llm

        state = await planner_node(state)

    if state["current_agent"] == "reasoning":
        log("βœ… Optimized route: Planner -> Reasoning (Skipped Executor)", GREEN)
        return True
    else:
        log(f"❌ Failed optimization. Agent is: {state['current_agent']}", RED)
        return False
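# For reference: the routing rule that scenarios A, C, E, and I collectively pin
# down, written out as a test-side sketch. This is an assumption about how
# planner_node behaves, not code imported from backend.agent.nodes.
def _expected_route(plan) -> str:
    """Empty/unparseable or all-direct plans skip the executor."""
    questions = (plan or {}).get("questions") or []
    if not questions or all(q.get("type") == "direct" for q in questions):
        return "reasoning"
    return "executor"
# e.g. _expected_route({"questions": []}) == "reasoning"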
async def run_scenario_d_image_processing():
    log("\nπŸ“Œ SCENARIO D: Multi-Image Processing", YELLOW)
    state = create_initial_state(session_id="test_img")

    # Simulate two image strings
    state["image_data_list"] = ["base64_img1", "base64_img2"]

    # Mock the LLM inside the OCR node
    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm = MagicMock()

        # Mock OCR response for the parallel calls
        async def ocr_response(*args, **kwargs):
            return AIMessage(content="Recognized Text")

        mock_llm.ainvoke.side_effect = ocr_response
        mock_get_model.return_value = mock_llm

        state = await ocr_agent_node(state)

    # The per-image results should be concatenated into ocr_text.
    if "Recognized Text" in state.get("ocr_text", ""):
        log("βœ… Processed images in parallel via LLM Mock", GREEN)
        return True
    else:
        log("❌ Image processing failed", RED)
        return False


async def run_scenario_e_planner_failure():
    log("\nπŸ“Œ SCENARIO E: Planner JSON Error (Recovery)", YELLOW)
    log(" [Input]: User says 'Complex math'", RESET)
    state = create_initial_state(session_id="test_fail_json")

    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm = MagicMock()

        # Planner returns BROKEN JSON
        async def mock_bad_plan(*args, **kwargs):
            return AIMessage(content='```json\n{ "questions": [INVALID_JSON... \n```')

        mock_llm.ainvoke.side_effect = mock_bad_plan
        mock_get_model.return_value = mock_llm

        state = await planner_node(state)

    log(f" [Output Agent]: {state['current_agent']}", RESET)
    if state["current_agent"] == "reasoning":
        log("βœ… System recovered from bad JSON -> Fallback to Reasoning", GREEN)
        return True
    else:
        log(f"❌ Failed to recover. Current agent: {state['current_agent']}", RED)
        return False


async def run_scenario_f_unknown_tool():
    log("\nπŸ“Œ SCENARIO F: Unknown Tool in Plan (Hallucination)", YELLOW)
    state = create_initial_state(session_id="test_unknown")
    state["execution_plan"] = {
        "questions": [
            {"id": 1, "type": "magic_wand", "content": "Do magic", "tool_input": "abracadabra"}
        ]
    }

    # No deep tool mocking here; the point is that the executor must not crash
    # on an unknown type — it should either answer the question or report an error.
    state = await parallel_executor_node(state)

    results = state.get("question_results", [])
    if not results:
        log("❌ No results generated", RED)
        return False

    res = results[0]
    log(f" [Result]: Type={res['type']}, Error={res.get('error')}, Result={res.get('result')}", RESET)

    # The dispatch in parallel_executor_node is:
    #   if q_type == "wolfram": ...
    #   elif q_type == "code": ...
    #   else:  # direct -> get_model("kimi-k2")
    # so an unknown type like "magic_wand" falls through to the direct (Kimi)
    # branch. That is a deliberate panic fallback, not a bug.
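    # Expected shape of each question_results entry, inferred from how this
    # suite reads them (field names are assumptions, not a checked schema):
    #   {"id": int, "type": str, "content": str, "result": str | None, "error": str | None}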
    if res["type"] == "magic_wand" and res.get("result") is not None:
        # It tried to solve the question with Kimi (direct fallback)
        log("βœ… Unknown tool fell back to Direct LLM (Resilience)", GREEN)
        return True
    elif res.get("error"):
        log("βœ… Unknown tool reported error", GREEN)
        return True
    return False


async def run_scenario_g_executor_direct_failure():
    log("\nπŸ“Œ SCENARIO G: Executor Direct Tool Failure", YELLOW)
    state = create_initial_state(session_id="test_g")
    state["execution_plan"] = {"questions": [{"id": 1, "type": "direct", "content": "Fail me"}]}

    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm = MagicMock()
        mock_llm.ainvoke.side_effect = Exception("API 500 Error")
        mock_get_model.return_value = mock_llm

        state = await parallel_executor_node(state)

    res = state["question_results"][0]
    if res["error"] and "API 500 Error" in res["error"]:
        log("βœ… Direct tool failure handled gracefully (Error captured)", GREEN)
        return True
    return False


async def run_scenario_h_synthesizer_failure():
    log("\nπŸ“Œ SCENARIO H: Synthesizer Failure (Fallback)", YELLOW)
    state = create_initial_state(session_id="test_h")
    state["question_results"] = [{"id": 1, "content": "Q", "result": "A"}]

    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm = MagicMock()
        mock_llm.ainvoke.side_effect = Exception("Synth Busy")
        mock_get_model.return_value = mock_llm

        # Should fall back to manual concatenation of the raw results.
        state = await synthetic_agent_node(state)

    # Vietnamese markers from the backend's fallback message:
    # "Lα»—i khi tα»•ng hợp" = "Error during synthesis", "KαΊΏt quαΊ£ gα»‘c" = "Original results".
    if "Lα»—i khi tα»•ng hợp" in state["final_response"] and "KαΊΏt quαΊ£ gα»‘c" in state["final_response"]:
        log("βœ… Synthesizer failed but returned raw results (Fallback)", GREEN)
        return True
    return False


async def run_scenario_i_empty_plan():
    log("\nπŸ“Œ SCENARIO I: Empty Plan (Zero Questions)", YELLOW)
    state = create_initial_state(session_id="test_i")

    with patch("backend.agent.nodes.get_model") as mock_get_model:
        mock_llm = MagicMock()

        # Planner returns valid JSON but an empty question list
        async def mock_clean_plan(*args, **kwargs):
            return AIMessage(content='```json\n{"questions": []}\n```')

        mock_llm.ainvoke.side_effect = mock_clean_plan
        mock_get_model.return_value = mock_llm

        state = await planner_node(state)

    if state["current_agent"] == "reasoning":
        log("βœ… Empty plan redirected to Reasoning Agent", GREEN)
        return True
    return False


async def main():
    log("πŸš€ STARTING ULTIMATE TEST SUITE (9 SCENARIOS)...\n")
    results = []
    results.append(await run_scenario_a_happy_path())
    results.append(await run_scenario_b_partial_failure())
    results.append(await run_scenario_c_planner_optimization())
    results.append(await run_scenario_d_image_processing())
    results.append(await run_scenario_e_planner_failure())
    results.append(await run_scenario_f_unknown_tool())
    results.append(await run_scenario_g_executor_direct_failure())
    results.append(await run_scenario_h_synthesizer_failure())
    results.append(await run_scenario_i_empty_plan())

    print("\n" + "=" * 40)
    if all(results):
        log("πŸŽ‰ ALL 9 SCENARIOS PASSED!", GREEN)
        sys.exit(0)
    else:
        log("πŸ’₯ SOME TESTS FAILED!", RED)
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
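# Run from the project root so the sys.path shim at the top can resolve
# backend.* (the exact file path below is illustrative):
#   python tests/agent/test_ultimate_suite.py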