# backend/tests/test_comprehensive.py
import asyncio
import sys
import os
from unittest.mock import MagicMock, patch, AsyncMock
# Add project root to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from backend.agent.state import create_initial_state, AgentState
from backend.agent.nodes import planner_node, parallel_executor_node, synthetic_agent_node, ocr_agent_node
from langchain_core.messages import AIMessage, HumanMessage
# Color codes for output
GREEN = "\033[92m"
RED = "\033[91m"
RESET = "\033[0m"
YELLOW = "\033[93m"
def log(msg, color=RESET):
print(f"{color}{msg}{RESET}")
async def run_scenario_a_happy_path():
log("\n📌 SCENARIO A: Happy Path (Direct + Wolfram + Code)", YELLOW)
state = create_initial_state(session_id="test_happy")
state["ocr_text"] = "Mock Input"
# 1. Planner
with patch("backend.agent.nodes.get_model") as mock_get_model:
mock_llm = MagicMock()
async def mock_plan(*args, **kwargs):
return AIMessage(content="""
```json
{
"questions": [
{"id": 1, "type": "direct", "content": "Q1", "tool_input": null},
{"id": 2, "type": "wolfram", "content": "Q2", "tool_input": "W2"},
{"id": 3, "type": "code", "content": "Q3", "tool_input": "C3"}
]
}
```
""")
mock_llm.ainvoke.side_effect = mock_plan
mock_get_model.return_value = mock_llm
state = await planner_node(state)
if state["current_agent"] != "executor":
log("❌ Planner failed to route to executor", RED)
return False
# 2. Executor
with patch("backend.agent.nodes.get_model") as mock_get_model, \
patch("backend.agent.nodes.query_wolfram_alpha") as mock_wolfram, \
patch("backend.tools.code_executor.CodeTool.execute", new_callable=AsyncMock) as mock_code:
# Mocks
mock_get_model.return_value.ainvoke = AsyncMock(return_value=AIMessage(content="Direct Answer")) # For Direct
mock_wolfram.return_value = (True, "Wolfram Answer") # (Success, Result)
mock_code.return_value = {"success": True, "output": "Code Answer"} # Code Tool
# The executor calls get_model more than once: besides the direct-answer path,
# CodeTool's flow uses an LLM (get_model("qwen")) to generate the code before
# executing it. A single side_effect therefore has to serve both call sites,
# so distinguish them by inspecting the prompt content.
async def llm_side_effect(*args, **kwargs):
# args[0] is list of messages. Check content to distinguish.
msgs = args[0]
content = msgs[0].content if msgs else ""
if "CODEGEN_PROMPT" in str(content) or "Visualize" in str(content) or "code" in str(content):
return AIMessage(content="```python\nprint('Code Answer')\n```")
return AIMessage(content="Direct Answer")
mock_llm_exec = MagicMock()
mock_llm_exec.ainvoke.side_effect = llm_side_effect
mock_get_model.return_value = mock_llm_exec
state = await parallel_executor_node(state)
results = state.get("question_results", [])
if len(results) != 3:
log(f"❌ Expected 3 results, got {len(results)}", RED)
return False
# Check results (None defaults avoid StopIteration if a type is missing)
r1 = next((r for r in results if r["type"] == "direct"), None)
r2 = next((r for r in results if r["type"] == "wolfram"), None)
r3 = next((r for r in results if r["type"] == "code"), None)
if r1 and r2 and r3 and r1["result"] == "Direct Answer" and r2["result"] == "Wolfram Answer" and r3["result"] == "Code Answer":
log("✅ Executor produced correct results", GREEN)
else:
log(f"❌ Results mismatch: {results}", RED)
return False
# 3. Synthesizer
with patch("backend.agent.nodes.get_model") as mock_get_model:
mock_llm_synth = MagicMock()
mock_llm_synth.ainvoke = AsyncMock(return_value=AIMessage(content="## Bài 1...\n## Bài 2...\n## Bài 3..."))
mock_get_model.return_value = mock_llm_synth
state = await synthetic_agent_node(state)
if "## Bài 1" in state["final_response"]:
log("✅ Synthesis successful", GREEN)
return True
return False
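# A minimal sketch (assumption, not imported from backend) of the fenced-JSON
# parsing that planner_node is expected to perform on the mocked plans above;
# scenarios E and I below exercise the failure and empty branches of this
# contract. Defined here for documentation only and never called by the tests.
def _sketch_extract_plan(raw: str):
    import json
    import re
    match = re.search(r"```json\s*(.*?)```", raw, re.DOTALL)
    payload = match.group(1) if match else raw
    try:
        return json.loads(payload)  # e.g. {"questions": [...]}
    except json.JSONDecodeError:
        return None  # planner_node falls back to the reasoning agent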
async def run_scenario_b_partial_failure():
log("\n📌 SCENARIO B: Partial Failure (Rate Limit)", YELLOW)
state = create_initial_state(session_id="test_partial")
state["execution_plan"] = {
"questions": [
{"id": 1, "type": "direct", "content": "Q1"},
{"id": 2, "type": "wolfram", "content": "Q2"}
]
}
with patch("backend.agent.nodes.get_model") as mock_get_model, \
patch("backend.agent.nodes.model_manager.check_rate_limit") as mock_rate_limit:
mock_llm = MagicMock()
mock_llm.ainvoke = AsyncMock(return_value=AIMessage(content="OK"))
mock_get_model.return_value = mock_llm
# Rate limit side effect: Allow Kimi (Direct), Block Wolfram
def rl_side_effect(model_id):
if "wolfram" in model_id:
return False, "Over Quota"
return True, None
mock_rate_limit.side_effect = rl_side_effect
state = await parallel_executor_node(state)
results = state["question_results"]
q1 = results[0]
q2 = results[1]
if q1.get("result") == "OK" and q2.get("error") and "Rate limit" in q2["error"]:
log("✅ Partial failure handled correctly", GREEN)
return True
else:
log(f"❌ Failed: {results}", RED)
return False
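# Assumed executor contract (inferred from the assertions above, not verbatim
# backend code): check_rate_limit(model_id) is called per question, and a
# (False, reason) reply is recorded without aborting the batch, roughly:
#
#   allowed, reason = model_manager.check_rate_limit(model_id)
#   if not allowed:
#       result["error"] = f"Rate limit: {reason}"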
async def run_scenario_c_planner_optimization():
log("\n📌 SCENARIO C: Planner Optimization (All Direct)", YELLOW)
state = create_initial_state(session_id="test_opt")
state["messages"] = [HumanMessage(content="Hello")]
with patch("backend.agent.nodes.get_model") as mock_get_model:
mock_llm = MagicMock()
# Planner returns all direct questions
async def mock_plan(*args, **kwargs):
return AIMessage(content='```json\n{"questions": [{"id": 1, "type": "direct"}]}\n```')
mock_llm.ainvoke.side_effect = mock_plan
mock_get_model.return_value = mock_llm
state = await planner_node(state)
if state["current_agent"] == "reasoning":
log("✅ Optimized route: Planner -> Reasoning (Skipped Executor)", GREEN)
return True
else:
log(f"❌ Failed optimization. Agent is: {state['current_agent']}", RED)
return False
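# Planner routing contract these scenarios assume (inferred from the
# assertions, not copied from backend/agent/nodes.py):
#   all questions "direct"        -> current_agent = "reasoning" (skip executor)
#   any wolfram/code question     -> current_agent = "executor"
#   broken JSON or an empty plan  -> current_agent = "reasoning" (fallback)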
async def run_scenario_d_image_processing():
log("\n📌 SCENARIO D: Multi-Image Processing", YELLOW)
state = create_initial_state(session_id="test_img")
# Simulate two base64-encoded image strings
state["image_data_list"] = ["base64_img1", "base64_img2"]
# Mock the LLM used inside the OCR node
with patch("backend.agent.nodes.get_model") as mock_get_model:
mock_llm = MagicMock()
# Mock OCR response for parallel calls
async def ocr_response(*args, **kwargs):
return AIMessage(content="Recognized Text")
mock_llm.ainvoke.side_effect = ocr_response
mock_get_model.return_value = mock_llm
state = await ocr_agent_node(state)
# The OCR node should concatenate the per-image results into ocr_text
if "Recognized Text" in state.get("ocr_text", ""):
log("✅ Processed images in parallel via LLM Mock", GREEN)
return True
else:
log("❌ Image processing failed", RED)
return False
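# Hypothetical sketch of the fan-out ocr_agent_node is assumed to perform; the
# mocked ainvoke above is awaited once per image, consistent with something like:
#
#   tasks = [llm.ainvoke([HumanMessage(content=img)]) for img in images]
#   replies = await asyncio.gather(*tasks)
#   state["ocr_text"] = "\n\n".join(r.content for r in replies)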
async def run_scenario_e_planner_failure():
log("\n📌 SCENARIO E: Planner JSON Error (Recovery)", YELLOW)
log(" [Input]: User says 'Complex math'", RESET)
state = create_initial_state(session_id="test_fail_json")
with patch("backend.agent.nodes.get_model") as mock_get_model:
mock_llm = MagicMock()
# Planner returns BROKEN JSON
async def mock_bad_plan(*args, **kwargs):
return AIMessage(content='```json\n{ "questions": [INVALID_JSON... \n```')
mock_llm.ainvoke.side_effect = mock_bad_plan
mock_get_model.return_value = mock_llm
state = await planner_node(state)
log(f" [Output Agent]: {state['current_agent']}", RESET)
if state["current_agent"] == "reasoning":
log("✅ System recovered from bad JSON -> Fallback to Reasoning", GREEN)
return True
else:
log(f"❌ Failed to recover. Current agent: {state['current_agent']}", RED)
return False
async def run_scenario_f_unknown_tool():
log("\n📌 SCENARIO F: Unknown Tool in Plan (Hallucination)", YELLOW)
state = create_initial_state(session_id="test_unknown")
state["execution_plan"] = {
"questions": [
{"id": 1, "type": "magic_wand", "content": "Do magic", "tool_input": "abracadabra"}
]
}
# No deep tool mocking is needed here; the point is that the executor neither
# crashes on the unknown type nor silently drops the question.
state = await parallel_executor_node(state)
results = state.get("question_results", [])
if not results:
log("❌ No results generated", RED)
return False
res = results[0]
log(f" [Result]: Type={res['type']}, Error={res.get('error')}, Result={res.get('result')}", RESET)
# parallel_executor_node has no dedicated branch for unknown tool types: the
# dispatch handles "wolfram" and "code" explicitly and routes everything else
# to the direct-LLM branch (kimi-k2). A hallucinated type like "magic_wand"
# therefore either falls through to a direct answer or surfaces as an error;
# both are acceptable here. This fall-through is a feature, not a bug.
if res['type'] == 'magic_wand' and res.get("result") is not None:
# It tried to solve it with Kimi (Direct fallback)
log("✅ Unknown tool fell back to Direct LLM (Resilience)", GREEN)
return True
elif res.get("error"):
log("✅ Unknown tool reported error", GREEN)
return True
return False
async def run_scenario_g_executor_direct_failure():
log("\n📌 SCENARIO G: Executor Direct Tool Failure", YELLOW)
state = create_initial_state(session_id="test_g")
state["execution_plan"] = {"questions": [{"id": 1, "type": "direct", "content": "Fail me"}]}
with patch("backend.agent.nodes.get_model") as mock_get_model:
mock_llm = MagicMock()
mock_llm.ainvoke.side_effect = Exception("API 500 Error")
mock_get_model.return_value = mock_llm
state = await parallel_executor_node(state)
res = state["question_results"][0]
if res["error"] and "API 500 Error" in res["error"]:
log("✅ Direct tool failure handled gracefully (Error captured)", GREEN)
return True
return False
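# Implied by the assertion above (assumption, not verbatim backend code): the
# executor wraps each question's tool call in try/except and stores str(exc)
# under "error", so a single API 500 never aborts the whole batch:
#
#   try:
#       ...
#   except Exception as exc:
#       result["error"] = str(exc)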
async def run_scenario_h_synthesizer_failure():
log("\n📌 SCENARIO H: Synthesizer Failure (Fallback)", YELLOW)
state = create_initial_state(session_id="test_h")
state["question_results"] = [{"id": 1, "content": "Q", "result": "A"}]
with patch("backend.agent.nodes.get_model") as mock_get_model:
mock_llm = MagicMock()
mock_llm.ainvoke.side_effect = Exception("Synth Busy")
mock_get_model.return_value = mock_llm
# Should fall back to manual concatenation of the raw results
state = await synthetic_agent_node(state)
if "Lỗi khi tổng hợp" in state["final_response"] and "Kết quả gốc" in state["final_response"]:
log("✅ Synthesizer failed but returned raw results (Fallback)", GREEN)
return True
return False
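# Presumed fallback inside synthetic_agent_node (sketch; only the quoted
# string literals are taken from the assertion above):
#
#   try:
#       reply = await llm.ainvoke(prompt)
#   except Exception as exc:
#       raw = "\n".join(f"Câu {r['id']}: {r.get('result')}" for r in results)
#       state["final_response"] = f"Lỗi khi tổng hợp: {exc}\n\nKết quả gốc:\n{raw}"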
async def run_scenario_i_empty_plan():
log("\n📌 SCENARIO I: Empty Plan (Zero Questions)", YELLOW)
state = create_initial_state(session_id="test_i")
with patch("backend.agent.nodes.get_model") as mock_get_model:
mock_llm = MagicMock()
# Planner returns valid JSON but empty list
async def mock_clean_plan(*args, **kwargs):
return AIMessage(content='```json\n{"questions": []}\n```')
mock_llm.ainvoke.side_effect = mock_clean_plan
mock_get_model.return_value = mock_llm
state = await planner_node(state)
if state["current_agent"] == "reasoning":
log("✅ Empty plan redirected to Reasoning Agent", GREEN)
return True
return False
async def main():
log("🚀 STARTING ULTIMATE TEST SUITE (9 SCENARIOS)...\n")
scenario_fns = [
    run_scenario_a_happy_path,
    run_scenario_b_partial_failure,
    run_scenario_c_planner_optimization,
    run_scenario_d_image_processing,
    run_scenario_e_planner_failure,
    run_scenario_f_unknown_tool,
    run_scenario_g_executor_direct_failure,
    run_scenario_h_synthesizer_failure,
    run_scenario_i_empty_plan,
]
results = [await fn() for fn in scenario_fns]
print("\n" + "="*40)
if all(results):
log("🎉 ALL 9 SCENARIOS PASSED!", GREEN)
sys.exit(0)
else:
log("💥 SOME TESTS FAILED!", RED)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())