File size: 9,902 Bytes
fcf8749 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 | """
Unit tests for RoutePlannerAgent.
Tests optimal assignment, penalty application, and fallback algorithms.
"""
import pytest
from unittest.mock import MagicMock, patch
from uuid import uuid4
from app.services.route_planner_agent import RoutePlannerAgent
from app.schemas.agent_schemas import (
EffortMatrixResult,
EffortBreakdown,
FairnessRecommendations,
)
class MockDriver:
"""Mock Driver for testing."""
def __init__(self, id=None, is_ev=False, battery_range_km=None):
self.id = id or uuid4()
self.is_ev = is_ev
self.vehicle_type = "EV" if is_ev else "ICE"
self.battery_range_km = battery_range_km
class MockRoute:
"""Mock Route for testing."""
def __init__(self, id=None, total_distance_km=25.0, charging_time_minutes=0):
self.id = id or uuid4()
self.total_distance_km = total_distance_km
self.charging_time_minutes = charging_time_minutes
def create_mock_effort_result(
matrix: list[list[float]],
drivers: list[MockDriver],
routes: list[MockRoute],
) -> EffortMatrixResult:
"""Create mock EffortMatrixResult from matrix data."""
driver_ids = [str(d.id) for d in drivers]
route_ids = [str(r.id) for r in routes]
# Create breakdown
breakdown = {}
for i, driver in enumerate(drivers):
for j, route in enumerate(routes):
key = f"{driver.id}:{route.id}"
effort = matrix[i][j]
breakdown[key] = EffortBreakdown(
physical_effort=effort * 0.4,
route_complexity=effort * 0.3,
time_pressure=effort * 0.3,
capacity_penalty=0.0,
total=effort,
)
all_values = [v for row in matrix for v in row]
return EffortMatrixResult(
matrix=matrix,
breakdown=breakdown,
stats={
"min": min(all_values) if all_values else 0.0,
"max": max(all_values) if all_values else 0.0,
"avg": sum(all_values) / len(all_values) if all_values else 0.0,
},
driver_ids=driver_ids,
route_ids=route_ids,
)
class TestRoutePlannerAgent:
"""Test suite for RoutePlannerAgent."""
def test_basic_assignment(self):
"""Test basic 2x2 assignment."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
# Simple cost matrix where diagonal is cheaper
matrix = [
[10.0, 20.0],
[30.0, 15.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
assert len(result.allocation) == 2
assert result.total_effort > 0
assert len(result.per_driver_effort) == 2
def test_optimal_assignment_3x3(self):
"""Test optimal assignment for 3x3 matrix."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute(), MockRoute()]
# Known optimal: D0->R1 (10), D1->R2 (15), D2->R0 (20) = 45
matrix = [
[30.0, 10.0, 50.0], # D0: best is R1 (10)
[40.0, 35.0, 15.0], # D1: best is R2 (15)
[20.0, 25.0, 30.0], # D2: best is R0 (20)
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Should find optimal or near-optimal
assert len(result.allocation) == 3
# Total should be close to optimal (45)
assert result.total_effort <= 50.0 # Allow some tolerance
def test_more_drivers_than_routes(self):
"""Test handling when there are more drivers than routes."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [
[10.0, 20.0],
[15.0, 25.0],
[30.0, 10.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Only 2 assignments possible
assert len(result.allocation) == 2
def test_more_routes_than_drivers(self):
"""Test handling when there are more routes than drivers."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute(), MockRoute()]
matrix = [
[10.0, 20.0, 30.0],
[15.0, 25.0, 5.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Each driver gets 1 route
assert len(result.allocation) == 2
def test_penalty_application(self):
"""Test that penalties increase costs for specified drivers."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [
[10.0, 50.0],
[50.0, 10.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
# Without penalty - should assign D0->R0, D1->R1
result1 = agent.plan(effort_result, drivers, routes)
# With heavy penalty on first driver
penalties = {str(drivers[0].id): 10.0} # 10x penalty
result2 = agent.plan(
effort_result, drivers, routes,
fairness_penalties=penalties,
proposal_number=2,
)
assert result2.proposal_number == 2
# The assignment may change due to penalties
def test_build_penalties_from_recommendations(self):
"""Test building penalties from fairness recommendations."""
agent = RoutePlannerAgent()
driver_id = str(uuid4())
recommendations = FairnessRecommendations(
penalize_high_effort_drivers=True,
high_effort_driver_ids=[driver_id],
penalty_factor=1.5,
)
per_driver_effort = {driver_id: 100.0, str(uuid4()): 50.0}
penalties = agent.build_penalties_from_recommendations(
recommendations, per_driver_effort
)
assert penalties[driver_id] == 1.5
# Other driver should have normal weight
other_id = [k for k in per_driver_effort if k != driver_id][0]
assert penalties[other_id] == 1.0
def test_avg_effort_calculation(self):
"""Test average effort is calculated correctly."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [
[20.0, 40.0],
[40.0, 30.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Average should be total / num_assignments
expected_avg = result.total_effort / len(result.allocation)
assert abs(result.avg_effort - expected_avg) < 0.01
def test_per_driver_effort_tracking(self):
"""Test that per-driver effort is tracked correctly."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [
[25.0, 75.0],
[60.0, 40.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Each driver should have an entry
assert len(result.per_driver_effort) == 2
# Efforts should sum to total
total_from_per_driver = sum(result.per_driver_effort.values())
assert abs(total_from_per_driver - result.total_effort) < 0.01
def test_empty_inputs(self):
"""Test handling of empty inputs."""
agent = RoutePlannerAgent()
effort_result = EffortMatrixResult(
matrix=[],
breakdown={},
stats={"min": 0, "max": 0, "avg": 0},
driver_ids=[],
route_ids=[],
)
result = agent.plan(effort_result, [], [])
assert result.allocation == []
assert result.total_effort == 0.0
assert result.avg_effort == 0.0
def test_greedy_fallback(self):
"""Test greedy assignment fallback."""
agent = RoutePlannerAgent()
# Use the private greedy method directly
cost_matrix = [
[10.0, 30.0],
[20.0, 5.0],
]
assignments = agent._greedy_assignment(cost_matrix, 2, 2)
assert len(assignments) == 2
# Should find good (if not optimal) assignment
def test_snapshot_generation(self):
"""Test input/output snapshot generation."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [[10.0, 20.0], [30.0, 15.0]]
effort_result = create_mock_effort_result(matrix, drivers, routes)
input_snapshot = agent.get_input_snapshot(effort_result)
assert "matrix_shape" in input_snapshot
assert "effort_stats" in input_snapshot
result = agent.plan(effort_result, drivers, routes)
output_snapshot = agent.get_output_snapshot(result)
assert "proposal_number" in output_snapshot
assert "total_effort" in output_snapshot
|