File size: 4,079 Bytes
fcf8749
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Route allocation service.
Uses SciPy's linear_sum_assignment (a Hungarian-style assignment solver) to optimally assign routes to drivers.
"""

from dataclasses import dataclass
from typing import List, Dict, Any, Tuple

import numpy as np
from scipy.optimize import linear_sum_assignment


@dataclass
class AllocationResult:
    """A single driver-to-route pairing produced by an allocation function.

    The indices are positions in the ``drivers`` and ``routes`` lists that
    were passed to the allocator, not identifiers taken from the dicts.
    """
    # Position of the driver in the ``drivers`` list given to the allocator.
    driver_index: int
    # Position of the route in the ``routes`` list given to the allocator.
    route_index: int
    # Cost the allocator recorded for this particular pairing.
    cost: float


def _numeric_field(record: Dict[str, Any], key: str, default: float) -> float:
    """Return ``record[key]`` as a float, using *default* if missing or None."""
    value = record.get(key)
    return default if value is None else float(value)


def build_cost_matrix(
    drivers: List[Dict[str, Any]],
    routes: List[Dict[str, Any]],
    *,
    default_capacity_kg: float = 100.0,
    overload_penalty_per_kg: float = 10.0,
) -> np.ndarray:
    """
    Build a cost matrix for the assignment problem.

    The cost of giving route ``j`` to driver ``i`` is the route's
    ``workload_score`` plus, when the route's ``total_weight_kg`` exceeds the
    driver's ``vehicle_capacity_kg``, a penalty proportional to the excess
    weight. Keys that are missing or explicitly ``None`` fall back to their
    defaults (score 0.0, weight 0.0, capacity ``default_capacity_kg``).

    Args:
        drivers: List of driver dicts (optional key ``vehicle_capacity_kg``)
        routes: List of route dicts (optional keys ``workload_score`` and
            ``total_weight_kg``)
        default_capacity_kg: Capacity assumed for drivers without one
        overload_penalty_per_kg: Cost added per kg of weight over capacity

    Returns:
        Float cost matrix of shape (num_drivers, num_routes)
    """
    capacities = np.array(
        [_numeric_field(d, "vehicle_capacity_kg", default_capacity_kg) for d in drivers],
        dtype=float,
    )
    scores = np.array(
        [_numeric_field(r, "workload_score", 0.0) for r in routes],
        dtype=float,
    )
    weights = np.array(
        [_numeric_field(r, "total_weight_kg", 0.0) for r in routes],
        dtype=float,
    )

    # Broadcast routes along columns and drivers along rows so that
    # overload[i, j] is how far route j's weight exceeds driver i's capacity.
    overload = weights[np.newaxis, :] - capacities[:, np.newaxis]

    # Only strictly over-capacity pairings are penalised; everything else
    # (including exactly-at-capacity) contributes nothing extra.
    penalty = np.where(overload > 0, overload * overload_penalty_per_kg, 0.0)

    return scores[np.newaxis, :] + penalty


def allocate_routes(
    drivers: List[Dict[str, Any]],
    routes: List[Dict[str, Any]],
) -> List[AllocationResult]:
    """
    Allocate routes to drivers by solving a linear sum assignment problem.

    Each driver receives at most one route and each route at most one driver;
    the pairing chosen minimizes the summed cost from ``build_cost_matrix``.
    When the counts differ, only ``min(len(drivers), len(routes))`` pairings
    are produced and the surplus drivers or routes are left unassigned.

    Args:
        drivers: List of driver dicts
        routes: List of route dicts with workload_score

    Returns:
        List of AllocationResult objects ordered by driver index; empty when
        there are no drivers or no routes
    """
    if not drivers or not routes:
        return []

    cost_matrix = build_cost_matrix(drivers, routes)

    # linear_sum_assignment handles rectangular matrices itself, so there is
    # no need to pad to a square with large dummy costs: every index it
    # returns already refers to a real driver and a real route.
    driver_ids, route_ids = linear_sum_assignment(cost_matrix)

    return [
        AllocationResult(
            driver_index=int(driver_idx),
            route_index=int(route_idx),
            # Convert the NumPy scalar so the field holds a plain float.
            cost=float(cost_matrix[driver_idx, route_idx]),
        )
        for driver_idx, route_idx in zip(driver_ids, route_ids)
    ]


def greedy_allocate(
    drivers: List[Dict[str, Any]],
    routes: List[Dict[str, Any]],
) -> List[AllocationResult]:
    """
    Positional fallback allocation with no optimization.

    Pairs the driver at position ``k`` with the route at position ``k`` for
    as many positions as both lists share. The recorded cost is the route's
    ``workload_score`` (0.0 when absent); the driver dicts are not inspected.

    Args:
        drivers: List of driver dicts
        routes: List of route dicts

    Returns:
        List of AllocationResult objects
    """
    # zip stops at the shorter input, which caps the number of pairings at
    # min(len(drivers), len(routes)).
    positioned_routes = zip(range(len(drivers)), routes)

    return [
        AllocationResult(
            driver_index=position,
            route_index=position,
            cost=route.get("workload_score", 0.0),
        )
        for position, route in positioned_routes
    ]