| """ |
| Route allocation service. |
| Uses Hungarian algorithm to optimally assign routes to drivers. |
| """ |
|
|
| from dataclasses import dataclass |
| from typing import List, Dict, Any, Tuple |
|
|
| import numpy as np |
| from scipy.optimize import linear_sum_assignment |
|
|
|
|
@dataclass
class AllocationResult:
    """A single driver-to-route pairing produced by an allocator."""

    # Position of the driver in the ``drivers`` list given to the allocator.
    driver_index: int
    # Position of the route in the ``routes`` list given to the allocator.
    route_index: int
    # Cost the allocator attached to this particular pairing.
    cost: float
|
|
|
|
def _numeric_column(
    records: List[Dict[str, Any]],
    key: str,
    default: float,
) -> np.ndarray:
    """Collect ``record[key]`` from every record into a 1-D float array.

    A missing key and an explicit ``None`` value are both replaced by
    ``default``.
    """
    values = []
    for record in records:
        value = record.get(key)
        values.append(default if value is None else value)
    return np.array(values, dtype=float)


def build_cost_matrix(
    drivers: List[Dict[str, Any]],
    routes: List[Dict[str, Any]],
    *,
    default_capacity_kg: float = 100.0,
    overload_penalty_per_kg: float = 10.0,
) -> np.ndarray:
    """
    Build a cost matrix for the assignment problem.

    The cost of giving route ``j`` to driver ``i`` is the route's
    ``workload_score`` plus, when the route's ``total_weight_kg`` exceeds
    the driver's ``vehicle_capacity_kg``, a penalty of
    ``overload_penalty_per_kg`` for every kilogram over capacity.

    Args:
        drivers: List of driver dicts. ``vehicle_capacity_kg`` is read
            from each; missing or ``None`` falls back to
            ``default_capacity_kg``.
        routes: List of route dicts. ``workload_score`` and
            ``total_weight_kg`` are read from each; missing or ``None``
            is treated as ``0.0``.
        default_capacity_kg: Capacity assumed for drivers that do not
            specify one.
        overload_penalty_per_kg: Cost added per kilogram by which a route
            exceeds a driver's capacity.

    Returns:
        Float cost matrix of shape (num_drivers, num_routes). Either
        dimension may be zero.
    """
    # Every per-route and per-driver value is looked up exactly once; the
    # pairwise combination is then done by NumPy broadcasting instead of a
    # nested Python loop.
    workload = _numeric_column(routes, "workload_score", 0.0)
    weight = _numeric_column(routes, "total_weight_kg", 0.0)
    capacity = _numeric_column(
        drivers, "vehicle_capacity_kg", default_capacity_kg
    )

    route_weight = weight[np.newaxis, :]      # shape (1, num_routes)
    driver_capacity = capacity[:, np.newaxis]  # shape (num_drivers, 1)

    # Only strictly overweight pairings are penalised.
    overloaded = route_weight > driver_capacity
    penalty = np.where(
        overloaded,
        (route_weight - driver_capacity) * overload_penalty_per_kg,
        0.0,
    )

    return workload[np.newaxis, :] + penalty
|
|
|
|
def allocate_routes(
    drivers: List[Dict[str, Any]],
    routes: List[Dict[str, Any]],
) -> List[AllocationResult]:
    """
    Allocate routes to drivers using the Hungarian algorithm.

    Solves the linear assignment problem on the matrix returned by
    ``build_cost_matrix``: each driver gets at most one route, each route
    at most one driver, and the summed cost of the chosen pairings is
    minimal.

    Args:
        drivers: List of driver dicts
        routes: List of route dicts with workload_score

    Returns:
        List of AllocationResult objects, one per matched pair. When the
        inputs differ in length the surplus drivers or routes are left
        unassigned, so the list has ``min(len(drivers), len(routes))``
        entries. Empty when either input is empty.
    """
    if not drivers or not routes:
        return []

    # linear_sum_assignment handles rectangular cost matrices natively, so
    # the (num_drivers, num_routes) matrix is passed as-is. Padding it to a
    # square with a large filler value yields the same optimum but costs
    # max(D, R)^2 memory and mixes huge constants into the arithmetic.
    cost_matrix = build_cost_matrix(drivers, routes)
    row_ind, col_ind = linear_sum_assignment(cost_matrix)

    # Convert NumPy scalars to plain Python numbers for callers.
    return [
        AllocationResult(
            driver_index=int(driver_idx),
            route_index=int(route_idx),
            cost=float(cost_matrix[driver_idx, route_idx]),
        )
        for driver_idx, route_idx in zip(row_ind, col_ind)
    ]
|
|
|
|
def greedy_allocate(
    drivers: List[Dict[str, Any]],
    routes: List[Dict[str, Any]],
) -> List[AllocationResult]:
    """
    Positional fallback allocation.

    Pairs the driver at position ``k`` with the route at position ``k``
    and stops as soon as either list runs out. No optimisation is done;
    the reported cost is the route's ``workload_score`` (``0.0`` when the
    key is absent).

    Args:
        drivers: List of driver dicts
        routes: List of route dicts

    Returns:
        List of AllocationResult objects
    """
    # zip() truncates to the shorter list, which yields exactly
    # min(len(drivers), len(routes)) pairings.
    return [
        AllocationResult(
            driver_index=position,
            route_index=position,
            cost=route.get("workload_score", 0.0),
        )
        for position, (_driver, route) in enumerate(zip(drivers, routes))
    ]
|
|