FairRelay / brain /tests /test_route_planner_agent.py
MouleeswaranM's picture
Upload folder using huggingface_hub
fcf8749 verified
raw
history blame
9.9 kB
"""
Unit tests for RoutePlannerAgent.
Tests optimal assignment, penalty application, and fallback algorithms.
"""
import pytest
from unittest.mock import MagicMock, patch
from uuid import uuid4
from app.services.route_planner_agent import RoutePlannerAgent
from app.schemas.agent_schemas import (
EffortMatrixResult,
EffortBreakdown,
FairnessRecommendations,
)
class MockDriver:
"""Mock Driver for testing."""
def __init__(self, id=None, is_ev=False, battery_range_km=None):
self.id = id or uuid4()
self.is_ev = is_ev
self.vehicle_type = "EV" if is_ev else "ICE"
self.battery_range_km = battery_range_km
class MockRoute:
"""Mock Route for testing."""
def __init__(self, id=None, total_distance_km=25.0, charging_time_minutes=0):
self.id = id or uuid4()
self.total_distance_km = total_distance_km
self.charging_time_minutes = charging_time_minutes
def create_mock_effort_result(
matrix: list[list[float]],
drivers: list[MockDriver],
routes: list[MockRoute],
) -> EffortMatrixResult:
"""Create mock EffortMatrixResult from matrix data."""
driver_ids = [str(d.id) for d in drivers]
route_ids = [str(r.id) for r in routes]
# Create breakdown
breakdown = {}
for i, driver in enumerate(drivers):
for j, route in enumerate(routes):
key = f"{driver.id}:{route.id}"
effort = matrix[i][j]
breakdown[key] = EffortBreakdown(
physical_effort=effort * 0.4,
route_complexity=effort * 0.3,
time_pressure=effort * 0.3,
capacity_penalty=0.0,
total=effort,
)
all_values = [v for row in matrix for v in row]
return EffortMatrixResult(
matrix=matrix,
breakdown=breakdown,
stats={
"min": min(all_values) if all_values else 0.0,
"max": max(all_values) if all_values else 0.0,
"avg": sum(all_values) / len(all_values) if all_values else 0.0,
},
driver_ids=driver_ids,
route_ids=route_ids,
)
class TestRoutePlannerAgent:
"""Test suite for RoutePlannerAgent."""
def test_basic_assignment(self):
"""Test basic 2x2 assignment."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
# Simple cost matrix where diagonal is cheaper
matrix = [
[10.0, 20.0],
[30.0, 15.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
assert len(result.allocation) == 2
assert result.total_effort > 0
assert len(result.per_driver_effort) == 2
def test_optimal_assignment_3x3(self):
"""Test optimal assignment for 3x3 matrix."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute(), MockRoute()]
# Known optimal: D0->R1 (10), D1->R2 (15), D2->R0 (20) = 45
matrix = [
[30.0, 10.0, 50.0], # D0: best is R1 (10)
[40.0, 35.0, 15.0], # D1: best is R2 (15)
[20.0, 25.0, 30.0], # D2: best is R0 (20)
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Should find optimal or near-optimal
assert len(result.allocation) == 3
# Total should be close to optimal (45)
assert result.total_effort <= 50.0 # Allow some tolerance
def test_more_drivers_than_routes(self):
"""Test handling when there are more drivers than routes."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [
[10.0, 20.0],
[15.0, 25.0],
[30.0, 10.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Only 2 assignments possible
assert len(result.allocation) == 2
def test_more_routes_than_drivers(self):
"""Test handling when there are more routes than drivers."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute(), MockRoute()]
matrix = [
[10.0, 20.0, 30.0],
[15.0, 25.0, 5.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Each driver gets 1 route
assert len(result.allocation) == 2
def test_penalty_application(self):
"""Test that penalties increase costs for specified drivers."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [
[10.0, 50.0],
[50.0, 10.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
# Without penalty - should assign D0->R0, D1->R1
result1 = agent.plan(effort_result, drivers, routes)
# With heavy penalty on first driver
penalties = {str(drivers[0].id): 10.0} # 10x penalty
result2 = agent.plan(
effort_result, drivers, routes,
fairness_penalties=penalties,
proposal_number=2,
)
assert result2.proposal_number == 2
# The assignment may change due to penalties
def test_build_penalties_from_recommendations(self):
"""Test building penalties from fairness recommendations."""
agent = RoutePlannerAgent()
driver_id = str(uuid4())
recommendations = FairnessRecommendations(
penalize_high_effort_drivers=True,
high_effort_driver_ids=[driver_id],
penalty_factor=1.5,
)
per_driver_effort = {driver_id: 100.0, str(uuid4()): 50.0}
penalties = agent.build_penalties_from_recommendations(
recommendations, per_driver_effort
)
assert penalties[driver_id] == 1.5
# Other driver should have normal weight
other_id = [k for k in per_driver_effort if k != driver_id][0]
assert penalties[other_id] == 1.0
def test_avg_effort_calculation(self):
"""Test average effort is calculated correctly."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [
[20.0, 40.0],
[40.0, 30.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Average should be total / num_assignments
expected_avg = result.total_effort / len(result.allocation)
assert abs(result.avg_effort - expected_avg) < 0.01
def test_per_driver_effort_tracking(self):
"""Test that per-driver effort is tracked correctly."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [
[25.0, 75.0],
[60.0, 40.0],
]
effort_result = create_mock_effort_result(matrix, drivers, routes)
result = agent.plan(effort_result, drivers, routes)
# Each driver should have an entry
assert len(result.per_driver_effort) == 2
# Efforts should sum to total
total_from_per_driver = sum(result.per_driver_effort.values())
assert abs(total_from_per_driver - result.total_effort) < 0.01
def test_empty_inputs(self):
"""Test handling of empty inputs."""
agent = RoutePlannerAgent()
effort_result = EffortMatrixResult(
matrix=[],
breakdown={},
stats={"min": 0, "max": 0, "avg": 0},
driver_ids=[],
route_ids=[],
)
result = agent.plan(effort_result, [], [])
assert result.allocation == []
assert result.total_effort == 0.0
assert result.avg_effort == 0.0
def test_greedy_fallback(self):
"""Test greedy assignment fallback."""
agent = RoutePlannerAgent()
# Use the private greedy method directly
cost_matrix = [
[10.0, 30.0],
[20.0, 5.0],
]
assignments = agent._greedy_assignment(cost_matrix, 2, 2)
assert len(assignments) == 2
# Should find good (if not optimal) assignment
def test_snapshot_generation(self):
"""Test input/output snapshot generation."""
agent = RoutePlannerAgent()
drivers = [MockDriver(), MockDriver()]
routes = [MockRoute(), MockRoute()]
matrix = [[10.0, 20.0], [30.0, 15.0]]
effort_result = create_mock_effort_result(matrix, drivers, routes)
input_snapshot = agent.get_input_snapshot(effort_result)
assert "matrix_shape" in input_snapshot
assert "effort_stats" in input_snapshot
result = agent.plan(effort_result, drivers, routes)
output_snapshot = agent.get_output_snapshot(result)
assert "proposal_number" in output_snapshot
assert "total_effort" in output_snapshot