FairRelay / brain /app /services /allocation.py
MouleeswaranM's picture
Upload folder using huggingface_hub
fcf8749 verified
"""
Route allocation service.
Uses Hungarian algorithm to optimally assign routes to drivers.
"""
from dataclasses import dataclass
from typing import List, Dict, Any, Tuple
import numpy as np
from scipy.optimize import linear_sum_assignment
@dataclass
class AllocationResult:
"""Result of route allocation for a driver."""
driver_index: int
route_index: int
cost: float
def build_cost_matrix(
drivers: List[Dict[str, Any]],
routes: List[Dict[str, Any]],
) -> np.ndarray:
"""
Build a cost matrix for the assignment problem.
In Phase 1, the cost is simply the route workload score.
Future phases could incorporate driver preferences, history, etc.
Args:
drivers: List of driver dicts
routes: List of route dicts with workload_score
Returns:
Cost matrix of shape (num_drivers, num_routes)
"""
num_drivers = len(drivers)
num_routes = len(routes)
# Initialize cost matrix
cost_matrix = np.zeros((num_drivers, num_routes))
for i, driver in enumerate(drivers):
for j, route in enumerate(routes):
# Base cost is the route workload score
cost = route.get("workload_score", 0.0)
# Future: could add driver-specific costs here
# e.g., penalize if route weight exceeds vehicle capacity
vehicle_capacity = driver.get("vehicle_capacity_kg", 100.0)
route_weight = route.get("total_weight_kg", 0.0)
if route_weight > vehicle_capacity:
# Penalize over-capacity assignments
cost += (route_weight - vehicle_capacity) * 10
cost_matrix[i, j] = cost
return cost_matrix
def allocate_routes(
drivers: List[Dict[str, Any]],
routes: List[Dict[str, Any]],
) -> List[AllocationResult]:
"""
Allocate routes to drivers using the Hungarian algorithm.
The Hungarian algorithm finds the optimal assignment that minimizes
total cost (in this case, tries to balance workload).
Args:
drivers: List of driver dicts
routes: List of route dicts with workload_score
Returns:
List of AllocationResult objects
"""
num_drivers = len(drivers)
num_routes = len(routes)
if num_drivers == 0 or num_routes == 0:
return []
# Handle rectangular matrices (more drivers than routes or vice versa)
if num_drivers != num_routes:
# Pad to make square matrix
n = max(num_drivers, num_routes)
cost_matrix = np.full((n, n), fill_value=1e9) # High cost for dummy assignments
actual_costs = build_cost_matrix(drivers, routes)
cost_matrix[:num_drivers, :num_routes] = actual_costs
else:
cost_matrix = build_cost_matrix(drivers, routes)
# Solve assignment problem
row_ind, col_ind = linear_sum_assignment(cost_matrix)
# Extract valid assignments (ignore dummy assignments)
results = []
for driver_idx, route_idx in zip(row_ind, col_ind):
if driver_idx < num_drivers and route_idx < num_routes:
results.append(AllocationResult(
driver_index=int(driver_idx),
route_index=int(route_idx),
cost=cost_matrix[driver_idx, route_idx],
))
return results
def greedy_allocate(
drivers: List[Dict[str, Any]],
routes: List[Dict[str, Any]],
) -> List[AllocationResult]:
"""
Simple greedy allocation as a fallback.
Assigns each route to the next available driver in order.
Args:
drivers: List of driver dicts
routes: List of route dicts
Returns:
List of AllocationResult objects
"""
results = []
num_assignments = min(len(drivers), len(routes))
for i in range(num_assignments):
results.append(AllocationResult(
driver_index=i,
route_index=i,
cost=routes[i].get("workload_score", 0.0),
))
return results