| import abc |
| import numpy as np |
| import networkx as nx |
| try: |
| import osmnx as ox |
| except ImportError: |
| ox = None |
| from typing import List, Dict, Any, Tuple |
| from dataclasses import dataclass, field |
| from models import Driver, Package |
|
|
| @dataclass |
| class Solution: |
| """Stores the result of the optimization.""" |
| assignments: Dict[str, List[str]] = field(default_factory=dict) |
| routes: Dict[str, List[Tuple[float, float]]] = field(default_factory=dict) |
| total_distance: float = 0.0 |
| fairness_score: float = 0.0 |
|
|
| class DistanceCache: |
| """Simple cache for network distances to avoid re-computation.""" |
| _cache = {} |
|
|
| @classmethod |
| def get_dist(cls, graph, source, target): |
| key = (source, target) |
| if key in cls._cache: |
| return cls._cache[key] |
| try: |
| val = nx.shortest_path_length(graph, source, target, weight='length') |
| cls._cache[key] = val |
| return val |
| except nx.NetworkXNoPath: |
| return float('inf') |
|
|
| @classmethod |
| def clear(cls): |
| cls._cache = {} |
|
|
| class RouteOptimizer(abc.ABC): |
| """Abstract Base Class for Route Optimization Strategies.""" |
| |
| @abc.abstractmethod |
| def solve(self, drivers: List[Driver], packages: List[Package], graph: Any) -> Solution: |
| """Calculate optimal routes.""" |
| pass |
|
|
| def _get_path_length(self, graph, path_nodes): |
| """Calculate total length of a path in meters.""" |
| length = 0 |
| for i in range(len(path_nodes) - 1): |
| if path_nodes[i] == path_nodes[i+1]: continue |
| length += DistanceCache.get_dist(graph, path_nodes[i], path_nodes[i+1]) |
| return length |
|
|
| def _get_route_coordinates(self, graph, path_nodes): |
| """Convert list of node IDs to (lat, lon) coordinates.""" |
| coords = [] |
| for i in range(len(path_nodes)): |
| |
| |
| if i < len(path_nodes) - 1 and path_nodes[i] != path_nodes[i+1]: |
| try: |
| segment = nx.shortest_path(graph, path_nodes[i], path_nodes[i+1], weight='length') |
| |
| for node in segment[:-1]: |
| coords.append((graph.nodes[node]['y'], graph.nodes[node]['x'])) |
| except nx.NetworkXNoPath: |
| pass |
| else: |
| |
| node = path_nodes[i] |
| coords.append((graph.nodes[node]['y'], graph.nodes[node]['x'])) |
| |
| |
| if path_nodes: |
| last = path_nodes[-1] |
| coords.append((graph.nodes[last]['y'], graph.nodes[last]['x'])) |
| |
| return coords |
|
|
| class SimpleNearestNeighbor(RouteOptimizer): |
| """Greedy strategy that assigns nearest available package to driver.""" |
|
|
| def solve(self, drivers: List[Driver], packages: List[Package], graph: Any) -> Solution: |
| solution = Solution() |
| DistanceCache.clear() |
| |
| unassigned_packages = packages.copy() |
| driver_paths = {d.id: [d.node_id] for d in drivers} |
| driver_distances = {d.id: 0.0 for d in drivers} |
| |
| while unassigned_packages: |
| for driver in drivers: |
| if not unassigned_packages: |
| break |
| |
| current_node = driver_paths[driver.id][-1] |
| |
| best_pkg = None |
| best_dist = float('inf') |
| |
| for pkg in unassigned_packages: |
| dist = DistanceCache.get_dist(graph, current_node, pkg.node_id) |
| if dist < best_dist: |
| best_dist = dist |
| best_pkg = pkg |
| |
| if best_pkg: |
| driver_paths[driver.id].append(best_pkg.node_id) |
| driver_distances[driver.id] += best_dist |
| solution.assignments.setdefault(driver.id, []).append(best_pkg.id) |
| unassigned_packages.remove(best_pkg) |
| |
| for driver in drivers: |
| solution.routes[driver.id] = self._get_route_coordinates(graph, driver_paths[driver.id]) |
| |
| solution.total_distance = sum(driver_distances.values()) |
| if driver_distances: |
| solution.fairness_score = np.var(list(driver_distances.values())) |
| |
| return solution |
|
|
| class ClusterAndRoute(RouteOptimizer): |
| """K-Means Clustering + TSP for balanced zones.""" |
| |
| def solve(self, drivers: List[Driver], packages: List[Package], graph: Any) -> Solution: |
| try: |
| from sklearn.cluster import KMeans |
| except ImportError: |
| print("sklearn not found, using Mock/Fallback or failing gracefully.") |
| return Solution() |
| |
| solution = Solution() |
| DistanceCache.clear() |
| |
| if not packages or not drivers: |
| return solution |
| |
| |
| pkg_coords = np.array([[p.lat, p.lon] for p in packages]) |
| n_clusters = min(len(packages), len(drivers)) |
| |
| kmeans = KMeans(n_clusters=n_clusters, random_state=42) |
| labels = kmeans.fit_predict(pkg_coords) |
| |
| cluster_map = {i: [] for i in range(n_clusters)} |
| for idx, label in enumerate(labels): |
| cluster_map[label].append(packages[idx]) |
| |
| |
| available_clusters = list(cluster_map.keys()) |
| driver_assignments = {} |
| |
| for driver in drivers: |
| if not available_clusters: |
| break |
| |
| best_c_idx = -1 |
| best_dist = float('inf') |
| |
| for c_idx in available_clusters: |
| pts = cluster_map[c_idx] |
| if not pts: continue |
| avg_lat = np.mean([p.lat for p in pts]) |
| avg_lon = np.mean([p.lon for p in pts]) |
| |
| |
| d = (driver.lat - avg_lat)**2 + (driver.lon - avg_lon)**2 |
| if d < best_dist: |
| best_dist = d |
| best_c_idx = c_idx |
| |
| if best_c_idx != -1: |
| driver_assignments[driver.id] = cluster_map[best_c_idx] |
| available_clusters.remove(best_c_idx) |
| |
| |
| total_dist = 0 |
| dist_values = [] |
| |
| for driver_id, pkgs in driver_assignments.items(): |
| driver = next(d for d in drivers if d.id == driver_id) |
| current_node = driver.node_id |
| route_nodes = [current_node] |
| unvisited = pkgs.copy() |
| driver_dist = 0 |
| route_pkg_ids = [] |
| |
| while unvisited: |
| best_p = None |
| best_d_val = float('inf') |
| |
| for p in unvisited: |
| d_val = DistanceCache.get_dist(graph, current_node, p.node_id) |
| if d_val < best_d_val: |
| best_d_val = d_val |
| best_p = p |
| |
| if best_p: |
| unvisited.remove(best_p) |
| current_node = best_p.node_id |
| route_nodes.append(current_node) |
| driver_dist += best_d_val |
| route_pkg_ids.append(best_p.id) |
| |
| solution.assignments[driver_id] = route_pkg_ids |
| total_dist += driver_dist |
| dist_values.append(driver_dist) |
| |
| solution.routes[driver_id] = self._get_route_coordinates(graph, route_nodes) |
|
|
| solution.total_distance = total_dist |
| if dist_values: |
| solution.fairness_score = np.var(dist_values) |
| |
| return solution |
|
|
| class EfficiencyVRP(RouteOptimizer): |
| """ |
| Efficiency Focused Strategy (Multi-Depot VRP Heuristic). |
| Goal: Minimize Total Cost irrespective of Fairness. |
| Logic: Assign each package to the NEAREST driver, then optimize that driver's route. |
| This differs from ClusterAndRoute because ClusterAndRoute forces 'n' clusters = 'n' drivers (roughly equal work). |
| EfficiencyVRP might leave some drivers IDLE if they are far away, saving fuel. |
| """ |
| |
| def solve(self, drivers: List[Driver], packages: List[Package], graph: Any) -> Solution: |
| solution = Solution() |
| DistanceCache.clear() |
| |
| if not packages or not drivers: |
| return solution |
| |
| driver_packages = {d.id: [] for d in drivers} |
| |
| |
| |
| |
| |
| for pkg in packages: |
| best_driver = None |
| best_dist = float('inf') |
| |
| for driver in drivers: |
| |
| |
| dist = DistanceCache.get_dist(graph, driver.node_id, pkg.node_id) |
| if dist < best_dist: |
| best_dist = dist |
| best_driver = driver |
| |
| if best_driver: |
| driver_packages[best_driver.id].append(pkg) |
| |
| |
| total_dist = 0 |
| dist_values = [] |
| |
| for driver in drivers: |
| my_pkgs = driver_packages[driver.id] |
| if not my_pkgs: |
| solution.routes[driver.id] = [] |
| dist_values.append(0) |
| continue |
| |
| |
| current_node = driver.node_id |
| route_nodes = [current_node] |
| unvisited = my_pkgs.copy() |
| driver_dist = 0 |
| route_pkg_ids = [] |
| |
| while unvisited: |
| best_p = None |
| best_d_val = float('inf') |
| |
| for p in unvisited: |
| d_val = DistanceCache.get_dist(graph, current_node, p.node_id) |
| if d_val < best_d_val: |
| best_d_val = d_val |
| best_p = p |
| |
| if best_p: |
| unvisited.remove(best_p) |
| current_node = best_p.node_id |
| route_nodes.append(current_node) |
| driver_dist += best_d_val |
| route_pkg_ids.append(best_p.id) |
| |
| solution.assignments[driver.id] = route_pkg_ids |
| total_dist += driver_dist |
| dist_values.append(driver_dist) |
| solution.routes[driver.id] = self._get_route_coordinates(graph, route_nodes) |
| |
| solution.total_distance = total_dist |
| if dist_values: |
| solution.fairness_score = np.var(dist_values) |
| |
| return solution |
|
|
|
|