MouleeswaranM's picture
Upload folder using huggingface_hub
fcf8749 verified
raw
history blame
11.5 kB
import abc
import numpy as np
import networkx as nx
try:
import osmnx as ox
except ImportError:
ox = None
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass, field
from models import Driver, Package
@dataclass
class Solution:
    """Stores the result of the optimization.

    Instances start empty (all defaults) and are filled in by a
    ``RouteOptimizer.solve`` implementation.
    """
    # Driver ID -> list of package IDs, in the order the solver visits them.
    # Solvers only create an entry when they assign at least one package,
    # so idle drivers may be absent.
    assignments: Dict[str, List[str]] = field(default_factory=dict)
    # Driver ID -> list of (lat, lon) coordinates for the path.
    routes: Dict[str, List[Tuple[float, float]]] = field(default_factory=dict)
    # Sum of the per-driver route distances accumulated by the solver
    # (same unit as the graph's edge 'length' attribute).
    total_distance: float = 0.0
    # Variance of the per-driver route distances; lower means more even load.
    fairness_score: float = 0.0
class DistanceCache:
    """Simple class-level cache for network distances to avoid re-computation.

    Entries are keyed by ``(source, target)`` only; the graph is NOT part of
    the key. Callers must therefore call :meth:`clear` before querying a
    different (or modified) graph, otherwise stale distances are returned.
    """

    # Shared by every caller in the process: (source, target) -> length.
    _cache = {}

    @classmethod
    def get_dist(cls, graph, source, target):
        """Return the shortest-path length from ``source`` to ``target``.

        The length is computed with ``nx.shortest_path_length`` using the
        edge attribute ``'length'`` as weight, and memoized.

        Args:
            graph: networkx graph to search.
            source: start node ID.
            target: end node ID.

        Returns:
            The weighted path length, or ``float('inf')`` when no path
            exists. Unreachable pairs are cached as well, so repeated
            queries for them do not trigger a new graph search.

        Raises:
            Any networkx error other than ``NetworkXNoPath`` is propagated
            unchanged (only the no-path case is handled here).
        """
        key = (source, target)
        try:
            return cls._cache[key]
        except KeyError:
            pass
        try:
            val = nx.shortest_path_length(graph, source, target, weight='length')
        except nx.NetworkXNoPath:
            val = float('inf')
        cls._cache[key] = val
        return val

    @classmethod
    def clear(cls):
        """Empty the cache in place.

        Clearing in place (instead of rebinding ``cls._cache``) keeps a
        single shared dict even when this is invoked through a subclass.
        """
        cls._cache.clear()
class RouteOptimizer(abc.ABC):
    """Abstract Base Class for Route Optimization Strategies.

    Subclasses implement :meth:`solve`; the private helpers below convert a
    sequence of stop nodes into a length or into drawable coordinates.
    """

    @abc.abstractmethod
    def solve(self, drivers: List[Driver], packages: List[Package], graph: Any) -> Solution:
        """Calculate optimal routes.

        Args:
            drivers: drivers available for delivery.
            packages: packages that need to be delivered.
            graph: road network the routes are computed on.

        Returns:
            A ``Solution`` describing assignments, routes and metrics.
        """

    def _get_path_length(self, graph, path_nodes):
        """Calculate total length of a path in meters.

        Sums the cached shortest-path distance of every consecutive pair of
        stops; identical consecutive stops contribute nothing. The result
        is ``inf`` if any leg has no path. The unit is that of the graph's
        edge 'length' attribute.
        """
        length = 0
        for start, end in zip(path_nodes, path_nodes[1:]):
            if start != end:
                length += DistanceCache.get_dist(graph, start, end)
        return length

    def _get_route_coordinates(self, graph, path_nodes):
        """Convert list of node IDs to (lat, lon) coordinates.

        Each leg between consecutive stops is expanded to the full shortest
        path so the route can be drawn along the network. Every node on the
        expanded route is emitted exactly once per visit: the final stop is
        not duplicated, and repeated consecutive stops collapse to one point.

        Args:
            graph: networkx graph whose nodes carry 'y' (lat) and 'x' (lon).
            path_nodes: ordered stop node IDs, starting at the driver's node.

        Returns:
            List of ``(lat, lon)`` tuples; empty when ``path_nodes`` is empty.
        """
        if not path_nodes:
            return []

        expanded = [path_nodes[0]]
        for start, end in zip(path_nodes, path_nodes[1:]):
            if start == end:
                continue
            try:
                segment = nx.shortest_path(graph, start, end, weight='length')
            except nx.NetworkXNoPath:
                # Best effort for visualization: keep both stops and join
                # them directly rather than dropping a stop from the route.
                segment = [start, end]
            # The first node of the segment is already the tail of `expanded`.
            expanded.extend(segment[1:])

        return [(graph.nodes[node]['y'], graph.nodes[node]['x']) for node in expanded]
class SimpleNearestNeighbor(RouteOptimizer):
    """Greedy strategy that assigns nearest available package to driver.

    Drivers take turns in list order; on each turn a driver picks the
    unassigned package closest (by network distance) to its current
    position and moves there.
    """

    def solve(self, drivers: List[Driver], packages: List[Package], graph: Any) -> Solution:
        """Assign packages round-robin, each driver taking its nearest one.

        Args:
            drivers: drivers to route; each starts at its ``node_id``.
            packages: packages to deliver.
            graph: road network used for distances and route geometry.

        Returns:
            A ``Solution`` with a route for every driver. Packages that no
            driver can reach (or all packages when ``drivers`` is empty)
            are left unassigned instead of blocking the solver.
        """
        solution = Solution()
        DistanceCache.clear()  # Clear cache for new run

        unassigned_packages = list(packages)
        driver_paths = {d.id: [d.node_id] for d in drivers}
        driver_distances = {d.id: 0.0 for d in drivers}

        while unassigned_packages:
            assigned_this_round = False
            for driver in drivers:
                if not unassigned_packages:
                    break
                current_node = driver_paths[driver.id][-1]

                best_pkg = None
                best_dist = float('inf')
                for pkg in unassigned_packages:
                    dist = DistanceCache.get_dist(graph, current_node, pkg.node_id)
                    if dist < best_dist:
                        best_dist = dist
                        best_pkg = pkg

                if best_pkg is None:
                    # Nothing reachable from this driver's current position.
                    continue

                driver_paths[driver.id].append(best_pkg.node_id)
                driver_distances[driver.id] += best_dist
                solution.assignments.setdefault(driver.id, []).append(best_pkg.id)
                unassigned_packages.remove(best_pkg)
                assigned_this_round = True

            if not assigned_this_round:
                # No driver made progress (no drivers at all, or every
                # remaining package is unreachable): stop rather than
                # spinning forever.
                break

        for driver in drivers:
            solution.routes[driver.id] = self._get_route_coordinates(graph, driver_paths[driver.id])

        solution.total_distance = sum(driver_distances.values())
        if driver_distances:
            solution.fairness_score = np.var(list(driver_distances.values()))
        return solution
class ClusterAndRoute(RouteOptimizer):
    """K-Means Clustering + TSP for balanced zones.

    Packages are clustered by (lat, lon) into at most one cluster per
    driver, each driver is greedily matched to the nearest free cluster
    centroid, and the cluster is then toured with a nearest-neighbor
    heuristic on the road network.
    """

    def solve(self, drivers: List[Driver], packages: List[Package], graph: Any) -> Solution:
        """Cluster packages, match clusters to drivers, and route each one.

        Args:
            drivers: drivers to route; each starts at its ``node_id``.
            packages: packages to deliver.
            graph: road network used for distances and route geometry.

        Returns:
            A ``Solution`` covering only drivers that received a cluster.
            An empty ``Solution`` is returned when scikit-learn is not
            installed or when either input list is empty.
        """
        try:
            from sklearn.cluster import KMeans
        except ImportError:
            print("sklearn not found, using Mock/Fallback or failing gracefully.")
            return Solution()  # Fallback

        solution = Solution()
        DistanceCache.clear()
        if not packages or not drivers:
            return solution

        # 1. Cluster packages
        pkg_coords = np.array([[p.lat, p.lon] for p in packages])
        n_clusters = min(len(packages), len(drivers))
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = kmeans.fit_predict(pkg_coords)

        cluster_map = {i: [] for i in range(n_clusters)}
        for pkg, label in zip(packages, labels):
            cluster_map[int(label)].append(pkg)

        # Centroid of every non-empty cluster, computed once instead of once
        # per driver. Empty clusters can never be matched, so they are
        # left out.
        centroids = {
            c_idx: (np.mean([p.lat for p in pts]), np.mean([p.lon for p in pts]))
            for c_idx, pts in cluster_map.items()
            if pts
        }

        # 2. Match drivers to clusters (Greedy by Centroid)
        available_clusters = list(centroids)
        driver_assignments = {}  # driver ID -> (start node, packages)
        for driver in drivers:
            if not available_clusters:
                break
            best_c_idx = -1
            best_dist = float('inf')
            for c_idx in available_clusters:
                avg_lat, avg_lon = centroids[c_idx]
                # Squared Euclidean distance is enough for ranking.
                d = (driver.lat - avg_lat) ** 2 + (driver.lon - avg_lon) ** 2
                if d < best_dist:
                    best_dist = d
                    best_c_idx = c_idx
            if best_c_idx != -1:
                driver_assignments[driver.id] = (driver.node_id, cluster_map[best_c_idx])
                available_clusters.remove(best_c_idx)

        # 3. Route within clusters (Nearest Neighbor TSP)
        total_dist = 0
        dist_values = []
        for driver_id, (start_node, pkgs) in driver_assignments.items():
            route_nodes, route_pkg_ids, driver_dist = self._nearest_neighbor_tour(
                graph, start_node, pkgs
            )
            solution.assignments[driver_id] = route_pkg_ids
            total_dist += driver_dist
            dist_values.append(driver_dist)
            solution.routes[driver_id] = self._get_route_coordinates(graph, route_nodes)

        solution.total_distance = total_dist
        if dist_values:
            solution.fairness_score = np.var(dist_values)
        return solution

    def _nearest_neighbor_tour(self, graph, start_node, pkgs):
        """Visit ``pkgs`` greedily, always moving to the nearest one next.

        Returns:
            ``(route_nodes, route_pkg_ids, tour_dist)``. Packages that
            cannot be reached from the current position are skipped, which
            guarantees the loop terminates.
        """
        current_node = start_node
        route_nodes = [current_node]
        route_pkg_ids = []
        tour_dist = 0
        unvisited = list(pkgs)

        while unvisited:
            best_p = None
            best_d_val = float('inf')
            for p in unvisited:
                d_val = DistanceCache.get_dist(graph, current_node, p.node_id)
                if d_val < best_d_val:
                    best_d_val = d_val
                    best_p = p
            if best_p is None:
                # Every remaining package is unreachable from here.
                break
            unvisited.remove(best_p)
            current_node = best_p.node_id
            route_nodes.append(current_node)
            tour_dist += best_d_val
            route_pkg_ids.append(best_p.id)

        return route_nodes, route_pkg_ids, tour_dist
class EfficiencyVRP(RouteOptimizer):
    """
    Efficiency Focused Strategy (Multi-Depot VRP Heuristic).
    Goal: Minimize Total Cost irrespective of Fairness.
    Logic: Assign each package to the NEAREST driver, then optimize that driver's route.

    This differs from ClusterAndRoute because ClusterAndRoute forces 'n' clusters = 'n' drivers (roughly equal work).
    EfficiencyVRP might leave some drivers IDLE if they are far away, saving fuel.
    """

    def solve(self, drivers: List[Driver], packages: List[Package], graph: Any) -> Solution:
        """Assign every package to its closest driver, then route each driver.

        Args:
            drivers: drivers to route; each starts at its ``node_id``.
            packages: packages to deliver.
            graph: road network used for distances and route geometry.

        Returns:
            A ``Solution``. Idle drivers get an empty route and count as
            distance 0 in the fairness score. Packages unreachable from
            every driver are left unassigned. An empty ``Solution`` is
            returned when either input list is empty.
        """
        solution = Solution()
        DistanceCache.clear()
        if not packages or not drivers:
            return solution

        driver_packages = {d.id: [] for d in drivers}

        # 1. Assignment Phase: Assign each package to the closest driver (by road dist)
        # This minimizes the "stem distance" and generally creates tight local routes.
        # This disregards load balancing (fairness).
        for pkg in packages:
            best_driver = None
            best_dist = float('inf')
            for driver in drivers:
                # Network distance is used directly; the cache keeps the
                # repeated lookups cheap.
                dist = DistanceCache.get_dist(graph, driver.node_id, pkg.node_id)
                if dist < best_dist:
                    best_dist = dist
                    best_driver = driver
            if best_driver is not None:
                driver_packages[best_driver.id].append(pkg)

        # 2. Routing Phase: Optimize each driver's assigned pile
        total_dist = 0
        dist_values = []
        for driver in drivers:
            my_pkgs = driver_packages[driver.id]
            if not my_pkgs:
                solution.routes[driver.id] = []
                dist_values.append(0)
                continue

            # TSP solving for this driver
            route_nodes, route_pkg_ids, driver_dist = self._nearest_neighbor_tour(
                graph, driver.node_id, my_pkgs
            )
            solution.assignments[driver.id] = route_pkg_ids
            total_dist += driver_dist
            dist_values.append(driver_dist)
            solution.routes[driver.id] = self._get_route_coordinates(graph, route_nodes)

        solution.total_distance = total_dist
        if dist_values:
            solution.fairness_score = np.var(dist_values)
        return solution

    def _nearest_neighbor_tour(self, graph, start_node, pkgs):
        """Visit ``pkgs`` greedily, always moving to the nearest one next.

        Returns:
            ``(route_nodes, route_pkg_ids, tour_dist)``. A package can be
            reachable from the driver's start yet unreachable from a later
            stop (e.g. on a directed graph); such packages are skipped so
            the loop always terminates.
        """
        current_node = start_node
        route_nodes = [current_node]
        route_pkg_ids = []
        tour_dist = 0
        unvisited = list(pkgs)

        while unvisited:
            best_p = None
            best_d_val = float('inf')
            for p in unvisited:
                d_val = DistanceCache.get_dist(graph, current_node, p.node_id)
                if d_val < best_d_val:
                    best_d_val = d_val
                    best_p = p
            if best_p is None:
                # Every remaining package is unreachable from here.
                break
            unvisited.remove(best_p)
            current_node = best_p.node_id
            route_nodes.append(current_node)
            tour_dist += best_d_val
            route_pkg_ids.append(best_p.id)

        return route_nodes, route_pkg_ids, tour_dist