| """ |
| Tests for LangGraph allocation workflow. |
| Verifies workflow equivalence, decision logging, and performance. |
| """ |
|
|
| import pytest |
| import time |
| from datetime import date |
| from uuid import uuid4 |
|
|
| from app.schemas.allocation_state import AllocationState |
| from app.services.langgraph_nodes import ( |
| ml_effort_node, |
| route_planner_node, |
| fairness_check_node, |
| should_reoptimize, |
| ) |
| from app.services.langgraph_workflow import ( |
| create_allocation_graph, |
| get_workflow_visualization, |
| ) |
|
|
|
|
| class TestAllocationState: |
| """Tests for AllocationState schema.""" |
| |
| def test_allocation_state_defaults(self): |
| """AllocationState should have sensible defaults.""" |
| state = AllocationState() |
| |
| assert state.request == {} |
| assert state.config_used is None |
| assert state.decision_logs == [] |
| assert state.effort_matrix is None |
| assert state.explanations == {} |
| |
| def test_allocation_state_serialization(self): |
| """AllocationState should serialize to dict.""" |
| state = AllocationState( |
| request={"test": "data"}, |
| driver_models=[{"id": "d1", "name": "Driver 1"}], |
| ) |
| |
| data = state.model_dump(mode="json") |
| |
| assert data["request"] == {"test": "data"} |
| assert data["driver_models"] == [{"id": "d1", "name": "Driver 1"}] |
| |
| def test_allocation_state_deserialization(self): |
| """AllocationState should deserialize from dict.""" |
| data = { |
| "request": {"drivers": []}, |
| "config_used": {"gini_threshold": 0.35}, |
| "decision_logs": [{"agent_name": "TEST"}], |
| } |
| |
| state = AllocationState.model_validate(data) |
| |
| assert state.request == {"drivers": []} |
| assert state.config_used["gini_threshold"] == 0.35 |
| assert len(state.decision_logs) == 1 |
|
|
|
|
| class TestLangGraphNodes: |
| """Tests for individual LangGraph nodes.""" |
| |
| def test_should_reoptimize_returns_reoptimize(self): |
| """should_reoptimize should return 'reoptimize' when fairness check says REOPTIMIZE.""" |
| state = AllocationState( |
| fairness_check_1={"status": "REOPTIMIZE"}, |
| route_proposal_2=None, |
| ) |
| |
| result = should_reoptimize(state) |
| |
| assert result == "reoptimize" |
| |
| def test_should_reoptimize_returns_continue(self): |
| """should_reoptimize should return 'continue' when fairness check says ACCEPT.""" |
| state = AllocationState( |
| fairness_check_1={"status": "ACCEPT"}, |
| ) |
| |
| result = should_reoptimize(state) |
| |
| assert result == "continue" |
| |
| def test_should_reoptimize_skips_when_proposal2_exists(self): |
| """should_reoptimize should return 'continue' if proposal 2 already exists.""" |
| state = AllocationState( |
| fairness_check_1={"status": "REOPTIMIZE"}, |
| route_proposal_2={"allocation": []}, |
| ) |
| |
| result = should_reoptimize(state) |
| |
| assert result == "continue" |
|
|
|
|
| class TestWorkflowGraph: |
| """Tests for the LangGraph workflow.""" |
| |
| def test_create_allocation_graph(self): |
| """create_allocation_graph should return a compiled graph.""" |
| graph = create_allocation_graph() |
| |
| assert graph is not None |
| |
| assert hasattr(graph, 'invoke') or hasattr(graph, 'ainvoke') |
| |
| def test_workflow_visualization(self): |
| """get_workflow_visualization should return a Mermaid diagram.""" |
| diagram = get_workflow_visualization() |
| |
| assert "```mermaid" in diagram |
| assert "ml_effort" in diagram |
| assert "fairness_check_1" in diagram |
| assert "explainability" in diagram |
| |
| def test_graph_with_gemini_disabled(self): |
| """Graph should compile without Gemini node.""" |
| import os |
| os.environ.pop("GOOGLE_API_KEY", None) |
| |
| graph = create_allocation_graph(enable_gemini=False) |
| |
| assert graph is not None |
|
|
|
|
| class TestDecisionLogging: |
| """Tests for decision log generation.""" |
| |
| def test_ml_effort_node_creates_log(self): |
| """ml_effort_node should append to decision_logs.""" |
| |
| |
| pass |
| |
| def test_decision_log_format(self): |
| """Decision logs should have required fields.""" |
| log_entry = { |
| "timestamp": "2026-02-04T10:00:00", |
| "agent_name": "ML_EFFORT", |
| "step_type": "MATRIX_GENERATION", |
| "input_snapshot": {"num_drivers": 5}, |
| "output_snapshot": {"matrix_size": 25}, |
| } |
| |
| assert "timestamp" in log_entry |
| assert "agent_name" in log_entry |
| assert "step_type" in log_entry |
| assert "input_snapshot" in log_entry |
| assert "output_snapshot" in log_entry |
|
|
|
|
| class TestWorkflowPerformance: |
| """Performance tests for the workflow.""" |
| |
| @pytest.mark.slow |
| def test_state_serialization_performance(self): |
| """State serialization should be fast.""" |
| state = AllocationState( |
| request={"packages": [{"id": f"pkg_{i}"} for i in range(100)]}, |
| decision_logs=[{"step": i} for i in range(50)], |
| ) |
| |
| start = time.time() |
| for _ in range(100): |
| state.model_dump(mode="json") |
| elapsed = time.time() - start |
| |
| |
| assert elapsed < 1.0, f"Serialization too slow: {elapsed:.2f}s" |
|
|
|
|
| |
| class TestWorkflowEquivalence: |
| """Tests to verify LangGraph produces same results as original.""" |
| |
| @pytest.mark.skip(reason="Requires full DB setup - run manually") |
| async def test_workflow_produces_identical_results(self): |
| """LangGraph workflow should produce identical results to original endpoint.""" |
| |
| |
| |
| |
| pass |
|
|