| """ |
| Unit tests for RoutePlannerAgent. |
| Tests optimal assignment, penalty application, and fallback algorithms. |
| """ |
|
|
| import pytest |
| from unittest.mock import MagicMock, patch |
| from uuid import uuid4 |
|
|
| from app.services.route_planner_agent import RoutePlannerAgent |
| from app.schemas.agent_schemas import ( |
| EffortMatrixResult, |
| EffortBreakdown, |
| FairnessRecommendations, |
| ) |
|
|
|
|
| class MockDriver: |
| """Mock Driver for testing.""" |
| def __init__(self, id=None, is_ev=False, battery_range_km=None): |
| self.id = id or uuid4() |
| self.is_ev = is_ev |
| self.vehicle_type = "EV" if is_ev else "ICE" |
| self.battery_range_km = battery_range_km |
|
|
|
|
| class MockRoute: |
| """Mock Route for testing.""" |
| def __init__(self, id=None, total_distance_km=25.0, charging_time_minutes=0): |
| self.id = id or uuid4() |
| self.total_distance_km = total_distance_km |
| self.charging_time_minutes = charging_time_minutes |
|
|
|
|
| def create_mock_effort_result( |
| matrix: list[list[float]], |
| drivers: list[MockDriver], |
| routes: list[MockRoute], |
| ) -> EffortMatrixResult: |
| """Create mock EffortMatrixResult from matrix data.""" |
| driver_ids = [str(d.id) for d in drivers] |
| route_ids = [str(r.id) for r in routes] |
| |
| |
| breakdown = {} |
| for i, driver in enumerate(drivers): |
| for j, route in enumerate(routes): |
| key = f"{driver.id}:{route.id}" |
| effort = matrix[i][j] |
| breakdown[key] = EffortBreakdown( |
| physical_effort=effort * 0.4, |
| route_complexity=effort * 0.3, |
| time_pressure=effort * 0.3, |
| capacity_penalty=0.0, |
| total=effort, |
| ) |
| |
| all_values = [v for row in matrix for v in row] |
| |
| return EffortMatrixResult( |
| matrix=matrix, |
| breakdown=breakdown, |
| stats={ |
| "min": min(all_values) if all_values else 0.0, |
| "max": max(all_values) if all_values else 0.0, |
| "avg": sum(all_values) / len(all_values) if all_values else 0.0, |
| }, |
| driver_ids=driver_ids, |
| route_ids=route_ids, |
| ) |
|
|
|
|
| class TestRoutePlannerAgent: |
| """Test suite for RoutePlannerAgent.""" |
| |
| def test_basic_assignment(self): |
| """Test basic 2x2 assignment.""" |
| agent = RoutePlannerAgent() |
| |
| drivers = [MockDriver(), MockDriver()] |
| routes = [MockRoute(), MockRoute()] |
| |
| |
| matrix = [ |
| [10.0, 20.0], |
| [30.0, 15.0], |
| ] |
| |
| effort_result = create_mock_effort_result(matrix, drivers, routes) |
| result = agent.plan(effort_result, drivers, routes) |
| |
| assert len(result.allocation) == 2 |
| assert result.total_effort > 0 |
| assert len(result.per_driver_effort) == 2 |
| |
| def test_optimal_assignment_3x3(self): |
| """Test optimal assignment for 3x3 matrix.""" |
| agent = RoutePlannerAgent() |
| |
| drivers = [MockDriver(), MockDriver(), MockDriver()] |
| routes = [MockRoute(), MockRoute(), MockRoute()] |
| |
| |
| matrix = [ |
| [30.0, 10.0, 50.0], |
| [40.0, 35.0, 15.0], |
| [20.0, 25.0, 30.0], |
| ] |
| |
| effort_result = create_mock_effort_result(matrix, drivers, routes) |
| result = agent.plan(effort_result, drivers, routes) |
| |
| |
| assert len(result.allocation) == 3 |
| |
| assert result.total_effort <= 50.0 |
| |
| def test_more_drivers_than_routes(self): |
| """Test handling when there are more drivers than routes.""" |
| agent = RoutePlannerAgent() |
| |
| drivers = [MockDriver(), MockDriver(), MockDriver()] |
| routes = [MockRoute(), MockRoute()] |
| |
| matrix = [ |
| [10.0, 20.0], |
| [15.0, 25.0], |
| [30.0, 10.0], |
| ] |
| |
| effort_result = create_mock_effort_result(matrix, drivers, routes) |
| result = agent.plan(effort_result, drivers, routes) |
| |
| |
| assert len(result.allocation) == 2 |
| |
| def test_more_routes_than_drivers(self): |
| """Test handling when there are more routes than drivers.""" |
| agent = RoutePlannerAgent() |
| |
| drivers = [MockDriver(), MockDriver()] |
| routes = [MockRoute(), MockRoute(), MockRoute()] |
| |
| matrix = [ |
| [10.0, 20.0, 30.0], |
| [15.0, 25.0, 5.0], |
| ] |
| |
| effort_result = create_mock_effort_result(matrix, drivers, routes) |
| result = agent.plan(effort_result, drivers, routes) |
| |
| |
| assert len(result.allocation) == 2 |
| |
| def test_penalty_application(self): |
| """Test that penalties increase costs for specified drivers.""" |
| agent = RoutePlannerAgent() |
| |
| drivers = [MockDriver(), MockDriver()] |
| routes = [MockRoute(), MockRoute()] |
| |
| matrix = [ |
| [10.0, 50.0], |
| [50.0, 10.0], |
| ] |
| |
| effort_result = create_mock_effort_result(matrix, drivers, routes) |
| |
| |
| result1 = agent.plan(effort_result, drivers, routes) |
| |
| |
| penalties = {str(drivers[0].id): 10.0} |
| result2 = agent.plan( |
| effort_result, drivers, routes, |
| fairness_penalties=penalties, |
| proposal_number=2, |
| ) |
| |
| assert result2.proposal_number == 2 |
| |
| |
| def test_build_penalties_from_recommendations(self): |
| """Test building penalties from fairness recommendations.""" |
| agent = RoutePlannerAgent() |
| |
| driver_id = str(uuid4()) |
| recommendations = FairnessRecommendations( |
| penalize_high_effort_drivers=True, |
| high_effort_driver_ids=[driver_id], |
| penalty_factor=1.5, |
| ) |
| |
| per_driver_effort = {driver_id: 100.0, str(uuid4()): 50.0} |
| |
| penalties = agent.build_penalties_from_recommendations( |
| recommendations, per_driver_effort |
| ) |
| |
| assert penalties[driver_id] == 1.5 |
| |
| other_id = [k for k in per_driver_effort if k != driver_id][0] |
| assert penalties[other_id] == 1.0 |
| |
| def test_avg_effort_calculation(self): |
| """Test average effort is calculated correctly.""" |
| agent = RoutePlannerAgent() |
| |
| drivers = [MockDriver(), MockDriver()] |
| routes = [MockRoute(), MockRoute()] |
| |
| matrix = [ |
| [20.0, 40.0], |
| [40.0, 30.0], |
| ] |
| |
| effort_result = create_mock_effort_result(matrix, drivers, routes) |
| result = agent.plan(effort_result, drivers, routes) |
| |
| |
| expected_avg = result.total_effort / len(result.allocation) |
| assert abs(result.avg_effort - expected_avg) < 0.01 |
| |
| def test_per_driver_effort_tracking(self): |
| """Test that per-driver effort is tracked correctly.""" |
| agent = RoutePlannerAgent() |
| |
| drivers = [MockDriver(), MockDriver()] |
| routes = [MockRoute(), MockRoute()] |
| |
| matrix = [ |
| [25.0, 75.0], |
| [60.0, 40.0], |
| ] |
| |
| effort_result = create_mock_effort_result(matrix, drivers, routes) |
| result = agent.plan(effort_result, drivers, routes) |
| |
| |
| assert len(result.per_driver_effort) == 2 |
| |
| |
| total_from_per_driver = sum(result.per_driver_effort.values()) |
| assert abs(total_from_per_driver - result.total_effort) < 0.01 |
| |
| def test_empty_inputs(self): |
| """Test handling of empty inputs.""" |
| agent = RoutePlannerAgent() |
| |
| effort_result = EffortMatrixResult( |
| matrix=[], |
| breakdown={}, |
| stats={"min": 0, "max": 0, "avg": 0}, |
| driver_ids=[], |
| route_ids=[], |
| ) |
| |
| result = agent.plan(effort_result, [], []) |
| |
| assert result.allocation == [] |
| assert result.total_effort == 0.0 |
| assert result.avg_effort == 0.0 |
| |
| def test_greedy_fallback(self): |
| """Test greedy assignment fallback.""" |
| agent = RoutePlannerAgent() |
| |
| |
| cost_matrix = [ |
| [10.0, 30.0], |
| [20.0, 5.0], |
| ] |
| |
| assignments = agent._greedy_assignment(cost_matrix, 2, 2) |
| |
| assert len(assignments) == 2 |
| |
| |
| def test_snapshot_generation(self): |
| """Test input/output snapshot generation.""" |
| agent = RoutePlannerAgent() |
| |
| drivers = [MockDriver(), MockDriver()] |
| routes = [MockRoute(), MockRoute()] |
| |
| matrix = [[10.0, 20.0], [30.0, 15.0]] |
| effort_result = create_mock_effort_result(matrix, drivers, routes) |
| |
| input_snapshot = agent.get_input_snapshot(effort_result) |
| assert "matrix_shape" in input_snapshot |
| assert "effort_stats" in input_snapshot |
| |
| result = agent.plan(effort_result, drivers, routes) |
| output_snapshot = agent.get_output_snapshot(result) |
| assert "proposal_number" in output_snapshot |
| assert "total_effort" in output_snapshot |
|
|