"""Heuristic CVRP / ride-sharing solver.

Provides instance generation and validation, several clustering/route-building
heuristics (sweep, k-means, Clarke-Wright savings, greedy nearest-neighbor,
genetic algorithm), a 2-opt route improver, metrics/performance reporting, and
matplotlib-based visualizations rendered to PIL images (for Gradio).
Time windows are treated as *soft* constraints (violations are counted, not
enforced).
"""

import math
import random
import time
from typing import Dict, List, Tuple, Optional
from functools import lru_cache

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from PIL import Image
import io


# ---------------------------
# Data utils
# ---------------------------
def make_template_dataframe():
    """Blank template users can download/fill."""
    return pd.DataFrame(
        {
            "id": ["A", "B", "C"],
            "x": [10, -5, 15],
            "y": [4, -12, 8],
            "demand": [1, 2, 1],
            "tw_start": [0, 0, 0],      # optional: earliest arrival (soft)
            "tw_end": [9999, 9999, 9999],  # optional: latest arrival (soft)
            "service": [0, 0, 0],       # optional: service time at stop
        }
    )


def parse_uploaded_csv(file) -> pd.DataFrame:
    """Read an uploaded CSV into a clean instance DataFrame.

    Requires columns id/x/y/demand; fills optional tw_start/tw_end/service
    with permissive defaults, coerces numerics, and drops malformed rows.

    Raises:
        ValueError: if any required column is missing.
    """
    df = pd.read_csv(file.name if hasattr(file, "name") else file)
    required = {"id", "x", "y", "demand"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")
    # fill optional columns if absent
    if "tw_start" not in df.columns:
        df["tw_start"] = 0
    if "tw_end" not in df.columns:
        df["tw_end"] = 999999
    if "service" not in df.columns:
        df["service"] = 0
    # Normalize types
    df["id"] = df["id"].astype(str)
    for col in ["x", "y", "demand", "tw_start", "tw_end", "service"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    # Drop rows where coercion failed, then restore a clean RangeIndex
    # (the rest of the module indexes clients positionally via df.loc[i]).
    df = df.dropna()
    df.reset_index(drop=True, inplace=True)
    return df


def generate_random_instance(
    n_clients=30,
    n_vehicles=4,
    capacity=10,
    spread=50,
    demand_min=1,
    demand_max=3,
    seed=42,
) -> pd.DataFrame:
    """Generate a reproducible random instance with wide-open time windows."""
    rng = np.random.default_rng(seed)
    xs = rng.uniform(-spread, spread, size=n_clients)
    ys = rng.uniform(-spread, spread, size=n_clients)
    demands = rng.integers(demand_min, demand_max + 1, size=n_clients)
    df = pd.DataFrame(
        {
            "id": [f"C{i+1}" for i in range(n_clients)],
            "x": xs,
            "y": ys,
            "demand": demands,
            "tw_start": np.zeros(n_clients, dtype=float),
            "tw_end": np.full(n_clients, 999999.0),
            "service": np.zeros(n_clients, dtype=float),
        }
    )
    return df


# Validation
def validate_instance(df: pd.DataFrame, n_vehicles: int, capacity: float) -> None:
    """
    Validate that the VRP instance is feasible and properly formatted.
    Raises ValueError if validation fails.
    """
    # Check required columns
    required = {'x', 'y', 'demand', 'tw_start', 'tw_end', 'service'}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    # Check for empty dataframe
    if len(df) == 0:
        raise ValueError("DataFrame is empty - no clients to route")

    # Check for negative values in key fields
    if (df['demand'] < 0).any():
        raise ValueError("Negative demand values detected")
    if (df['service'] < 0).any():
        raise ValueError("Negative service time values detected")

    # Check time window consistency
    if (df['tw_start'] > df['tw_end']).any():
        invalid_rows = df[df['tw_start'] > df['tw_end']]
        raise ValueError(f"Invalid time windows (start > end) for {len(invalid_rows)} stops")

    # Check feasibility: total demand vs total capacity
    total_demand = df['demand'].sum()
    total_capacity = n_vehicles * capacity
    if total_demand > total_capacity:
        raise ValueError(
            f"Infeasible instance: total demand ({total_demand:.1f}) "
            f"exceeds total capacity ({total_capacity:.1f})"
        )

    # Check for NaN or inf values
    numeric_cols = ['x', 'y', 'demand', 'tw_start', 'tw_end', 'service']
    if df[numeric_cols].isna().any().any():
        raise ValueError("NaN values detected in numeric columns")
    if np.isinf(df[numeric_cols].values).any():
        raise ValueError("Infinite values detected in numeric columns")

    # Reject degenerate fleet parameters
    if capacity <= 0:
        raise ValueError("Vehicle capacity must be positive")
    if n_vehicles <= 0:
        raise ValueError("Number of vehicles must be positive")


# Geometry / distance helpers
def euclid(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    """Euclidean distance between two 2-D points."""
    return float(math.hypot(a[0] - b[0], a[1] - b[1]))


def total_distance(points: List[Tuple[float, float]]) -> float:
    """Sum of consecutive-leg distances along a polyline of points."""
    return sum(euclid(points[i], points[i + 1]) for i in range(len(points) - 1))


def build_distance_matrix(df: pd.DataFrame, depot: Tuple[float, float]) -> np.ndarray:
    """
    Build a distance matrix for all clients and depot.
    Matrix[i][j] = distance from i to j, where 0 is depot.
    Returns (n+1) x (n+1) matrix.
    """
    n = len(df)
    points = [depot] + [(df.loc[i, 'x'], df.loc[i, 'y']) for i in range(n)]
    dist_matrix = np.zeros((n + 1, n + 1))
    # Symmetric metric: fill the upper triangle and mirror it.
    for i in range(n + 1):
        for j in range(i + 1, n + 1):
            d = euclid(points[i], points[j])
            dist_matrix[i][j] = d
            dist_matrix[j][i] = d
    return dist_matrix


# Clustering algorithms
def sweep_clusters(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """
    Assign clients to vehicles by angular sweep around the depot,
    roughly balancing capacity (sum of 'demand').
    Returns indices (row numbers) per cluster.
    """
    dx = df["x"].values - depot[0]
    dy = df["y"].values - depot[1]
    ang = np.arctan2(dy, dx)
    order = np.argsort(ang)
    clusters: List[List[int]] = [[] for _ in range(n_vehicles)]
    loads = [0.0] * n_vehicles
    v = 0
    for idx in order:
        d = float(df.loc[idx, "demand"])
        # if adding to current vehicle exceeds capacity *by a lot*, move to next
        # (the last vehicle absorbs any overflow, so it may end up overloaded)
        if loads[v] + d > capacity and v < n_vehicles - 1:
            v += 1
        clusters[v].append(int(idx))
        loads[v] += d
    return clusters


def kmeans_clusters(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
    random_state: int = 42,
) -> List[List[int]]:
    """
    Assign clients using k-means clustering, then balance capacity.
    Returns indices (row numbers) per cluster.
    """
    if len(df) == 0:
        return [[] for _ in range(n_vehicles)]

    # K-means clustering
    X = df[['x', 'y']].values
    kmeans = KMeans(n_clusters=min(n_vehicles, len(df)), random_state=random_state, n_init=10)
    labels = kmeans.fit_predict(X)

    # Group by cluster
    initial_clusters: List[List[int]] = [[] for _ in range(n_vehicles)]
    for idx, label in enumerate(labels):
        initial_clusters[label].append(idx)

    # Balance capacity: if a cluster exceeds capacity, split it
    balanced_clusters: List[List[int]] = []
    for cluster in initial_clusters:
        if not cluster:
            continue
        # Sort by angle from depot for deterministic splitting
        cluster_sorted = sorted(cluster, key=lambda i: np.arctan2(
            df.loc[i, 'y'] - depot[1],
            df.loc[i, 'x'] - depot[0]
        ))
        current = []
        current_load = 0.0
        for idx in cluster_sorted:
            demand = float(df.loc[idx, 'demand'])
            if current_load + demand > capacity and current:
                balanced_clusters.append(current)
                current = [idx]
                current_load = demand
            else:
                current.append(idx)
                current_load += demand
        if current:
            balanced_clusters.append(current)

    # Pad with empty clusters if needed; splitting may also have produced more
    # clusters than vehicles, in which case the surplus is dropped here.
    while len(balanced_clusters) < n_vehicles:
        balanced_clusters.append([])
    return balanced_clusters[:n_vehicles]


def clarke_wright_savings(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """
    Clarke-Wright Savings algorithm for VRP clustering.
    Returns indices (row numbers) per route.
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]

    # Build distance matrix
    dist_matrix = build_distance_matrix(df, depot)

    # Calculate savings: s(i,j) = d(0,i) + d(0,j) - d(i,j)
    savings = []
    for i in range(1, n + 1):
        for j in range(i + 1, n + 1):
            s = dist_matrix[0][i] + dist_matrix[0][j] - dist_matrix[i][j]
            savings.append((s, i - 1, j - 1))  # Convert to 0-indexed client ids

    # Sort by savings (descending)
    savings.sort(reverse=True)

    # Initialize: each client in its own route
    routes: List[List[int]] = [[i] for i in range(n)]
    route_loads = [float(df.loc[i, 'demand']) for i in range(n)]
    # Track which route each client belongs to
    client_to_route = {i: i for i in range(n)}

    # Merge routes based on savings
    for saving, i, j in savings:
        route_i = client_to_route[i]
        route_j = client_to_route[j]

        # Skip if already in same route
        if route_i == route_j:
            continue

        # Check capacity constraint
        if route_loads[route_i] + route_loads[route_j] > capacity:
            continue

        # Check if i and j are at ends of their respective routes
        ri = routes[route_i]
        rj = routes[route_j]

        # Only merge if they're at route ends (interior merges would break the path)
        i_at_end = (i == ri[0] or i == ri[-1])
        j_at_end = (j == rj[0] or j == rj[-1])
        if not (i_at_end and j_at_end):
            continue

        # Merge routes so that i and j become adjacent
        if i == ri[-1] and j == rj[0]:
            new_route = ri + rj
        elif i == ri[0] and j == rj[-1]:
            new_route = rj + ri
        elif i == ri[-1] and j == rj[-1]:
            new_route = ri + rj[::-1]
        elif i == ri[0] and j == rj[0]:
            new_route = ri[::-1] + rj
        else:
            continue

        # Update routes
        routes[route_i] = new_route
        route_loads[route_i] += route_loads[route_j]
        routes[route_j] = []
        route_loads[route_j] = 0.0

        # Update client mapping
        for client in rj:
            client_to_route[client] = route_i

    # Filter out empty routes and limit to n_vehicles
    final_routes = [r for r in routes if r]

    # If too many routes, merge smallest ones
    while len(final_routes) > n_vehicles:
        # Find two smallest routes that can be merged
        route_sizes = [(sum(df.loc[r, 'demand']), idx) for idx, r in enumerate(final_routes)]
        route_sizes.sort()
        merged = False
        for i in range(len(route_sizes)):
            for j in range(i + 1, len(route_sizes)):
                idx1, idx2 = route_sizes[i][1], route_sizes[j][1]
                if route_sizes[i][0] + route_sizes[j][0] <= capacity:
                    final_routes[idx1].extend(final_routes[idx2])
                    final_routes.pop(idx2)
                    merged = True
                    break
            if merged:
                break
        if not merged:
            # Force merge smallest two even if over capacity
            idx1, idx2 = route_sizes[0][1], route_sizes[1][1]
            final_routes[idx1].extend(final_routes[idx2])
            final_routes.pop(idx2)

    # Pad with empty routes if needed
    while len(final_routes) < n_vehicles:
        final_routes.append([])
    return final_routes


def greedy_nearest_neighbor(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """
    Greedy Nearest Neighbor algorithm for VRP.
    Build routes by always adding the nearest unvisited client.
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]

    unvisited = set(range(n))
    routes = []

    while unvisited and len(routes) < n_vehicles:
        route = []
        current_load = 0.0
        current_pos = depot

        while unvisited:
            # Find nearest unvisited client that fits in current vehicle
            best_client = None
            best_distance = float('inf')
            for client_idx in unvisited:
                client_pos = (df.loc[client_idx, 'x'], df.loc[client_idx, 'y'])
                client_demand = df.loc[client_idx, 'demand']
                # Check capacity constraint
                if current_load + client_demand <= capacity:
                    distance = euclid(current_pos, client_pos)
                    if distance < best_distance:
                        best_distance = distance
                        best_client = client_idx

            if best_client is None:
                break  # No more clients fit in this vehicle

            # Add client to route
            route.append(best_client)
            unvisited.remove(best_client)
            current_load += df.loc[best_client, 'demand']
            current_pos = (df.loc[best_client, 'x'], df.loc[best_client, 'y'])

        if route:
            routes.append(route)
        else:
            # BUGFIX: no remaining client fits even in an empty vehicle
            # (individual demand > capacity). The original code looped forever
            # here; stop opening vehicles and force-assign leftovers below.
            break

    # If there are remaining unvisited clients, force them into existing
    # routes round-robin (capacity is intentionally ignored here).
    if unvisited and not routes:
        routes.append([])  # guard against modulo-by-zero below
    route_idx = 0
    for client_idx in list(unvisited):
        routes[route_idx].append(client_idx)
        route_idx = (route_idx + 1) % len(routes)

    # Pad with empty routes if needed
    while len(routes) < n_vehicles:
        routes.append([])
    return routes[:n_vehicles]


def genetic_algorithm_vrp(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
    population_size: int = 50,
    generations: int = 100,
    mutation_rate: float = 0.1,
    elite_size: int = 10,
) -> List[List[int]]:
    """
    Genetic Algorithm for VRP.
    Evolves a population of solutions over multiple generations.
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]

    # Individual representation: list of client indices with vehicle separators
    # Use -1 as vehicle separator
    def create_individual():
        """Create a random individual (solution)."""
        clients = list(range(n))
        random.shuffle(clients)
        # Insert vehicle separators randomly
        individual = []
        separators_added = 0
        for i, client in enumerate(clients):
            individual.append(client)
            # Add separator with some probability, but ensure we don't exceed
            # n_vehicles-1 separators
            if (separators_added < n_vehicles - 1 and
                    i < len(clients) - 1 and
                    random.random() < 0.3):
                individual.append(-1)
                separators_added += 1
        return individual

    def individual_to_routes(individual):
        """Convert individual to route format."""
        routes = []
        current_route = []
        for gene in individual:
            if gene == -1:  # Vehicle separator
                if current_route:
                    routes.append(current_route)
                    current_route = []
            else:
                current_route.append(gene)
        if current_route:
            routes.append(current_route)
        # Pad with empty routes
        while len(routes) < n_vehicles:
            routes.append([])
        return routes[:n_vehicles]

    def calculate_fitness(individual):
        """Calculate fitness (lower is better, so we'll use negative total distance)."""
        routes = individual_to_routes(individual)
        total_distance = 0.0
        penalty = 0.0
        for route in routes:
            if not route:
                continue
            # Check capacity constraint
            route_load = sum(df.loc[i, 'demand'] for i in route)
            if route_load > capacity:
                penalty += (route_load - capacity) * 1000  # Heavy penalty
            # Calculate route distance (depot -> clients -> depot)
            points = [depot] + [(df.loc[i, 'x'], df.loc[i, 'y']) for i in route] + [depot]
            route_distance = total_distance_func(points)
            total_distance += route_distance
        return -(total_distance + penalty)  # Negative because we want to minimize

    def total_distance_func(points):
        """Calculate total distance for a sequence of points."""
        return sum(euclid(points[i], points[i + 1]) for i in range(len(points) - 1))

    def crossover(parent1, parent2):
        """Order crossover (OX) for VRP."""
        # Remove separators for crossover
        p1_clients = [g for g in parent1 if g != -1]
        p2_clients = [g for g in parent2 if g != -1]
        if len(p1_clients) < 2:
            return parent1[:], parent2[:]

        # Standard order crossover
        size = len(p1_clients)
        start, end = sorted(random.sample(range(size), 2))
        child1 = [None] * size
        child2 = [None] * size

        # Copy segments
        child1[start:end] = p1_clients[start:end]
        child2[start:end] = p2_clients[start:end]

        # Fill remaining positions in the other parent's order
        def fill_child(child, other_parent):
            remaining = [x for x in other_parent if x not in child]
            j = 0
            for i in range(size):
                if child[i] is None:
                    child[i] = remaining[j]
                    j += 1

        fill_child(child1, p2_clients)
        fill_child(child2, p1_clients)

        # Add separators back randomly
        def add_separators(child):
            result = []
            separators_to_add = min(n_vehicles - 1, len(child) // 3)
            separator_positions = random.sample(range(1, len(child)), separators_to_add)
            for i, gene in enumerate(child):
                if i in separator_positions:
                    result.append(-1)
                result.append(gene)
            return result

        return add_separators(child1), add_separators(child2)

    def mutate(individual):
        """Mutation: swap two random clients."""
        if random.random() > mutation_rate:
            return individual
        clients = [i for i, g in enumerate(individual) if g != -1]
        if len(clients) < 2:
            return individual
        # Swap two random clients (on a copy, to keep parents intact)
        idx1, idx2 = random.sample(clients, 2)
        individual = individual[:]
        individual[idx1], individual[idx2] = individual[idx2], individual[idx1]
        return individual

    # Initialize population
    population = [create_individual() for _ in range(population_size)]

    # Evolution
    for generation in range(generations):
        # Calculate fitness for all individuals
        fitness_scores = [(individual, calculate_fitness(individual)) for individual in population]
        fitness_scores.sort(key=lambda x: x[1], reverse=True)  # Higher fitness is better

        # Select elite
        elite = [individual for individual, _ in fitness_scores[:elite_size]]

        # Create new population
        new_population = elite[:]
        while len(new_population) < population_size:
            # Tournament selection
            tournament_size = 5
            parent1 = max(random.sample(fitness_scores, min(tournament_size, len(fitness_scores))),
                          key=lambda x: x[1])[0]
            parent2 = max(random.sample(fitness_scores, min(tournament_size, len(fitness_scores))),
                          key=lambda x: x[1])[0]

            # Crossover
            child1, child2 = crossover(parent1, parent2)

            # Mutation
            child1 = mutate(child1)
            child2 = mutate(child2)

            new_population.extend([child1, child2])

        population = new_population[:population_size]

    # Return best solution
    final_fitness = [(individual, calculate_fitness(individual)) for individual in population]
    best_individual = max(final_fitness, key=lambda x: x[1])[0]
    return individual_to_routes(best_individual)


# Performance and Quality Analysis
def get_algorithm_complexity(algorithm: str, n_clients: int) -> str:
    """
    Return theoretical complexity of the algorithm.
    """
    if algorithm == 'sweep':
        return f"O(n log n) ≈ {n_clients * max(1, int(math.log2(max(n_clients, 2)))):.0f} ops"
    elif algorithm == 'kmeans':
        return f"O(k×n×i) ≈ {10 * n_clients * 10:.0f} ops (10 iters est.)"
    elif algorithm == 'greedy':
        return f"O(n²) ≈ {n_clients**2:.0f} ops"
    elif algorithm == 'clarke_wright':
        return f"O(n³) ≈ {n_clients**3:.0f} ops"
    elif algorithm == 'genetic':
        return f"O(g×p×n) ≈ {100 * 50 * n_clients:.0f} ops (100 gen, 50 pop)"
    else:
        return "Unknown"


def calculate_solution_quality_score(total_dist: float, dist_balance: float,
                                     load_balance: float, capacity_util: float) -> float:
    """
    Calculate a composite quality score (0-100, higher is better).
    Considers distance efficiency, balance, and utilization.
    """
    # Normalize components (lower CV and higher utilization are better)
    dist_score = max(0, 100 - (dist_balance * 100))  # Lower CV = higher score
    load_score = max(0, 100 - (load_balance * 100))  # Lower CV = higher score
    util_score = capacity_util * 100                 # Higher utilization = higher score

    # Weighted average
    quality_score = (dist_score * 0.3 + load_score * 0.3 + util_score * 0.4)
    return round(quality_score, 1)


def compare_algorithms(df: pd.DataFrame, depot: Tuple[float, float],
                       n_vehicles: int, capacity: float) -> Dict:
    """
    Run all algorithms on the same instance and compare results.
    """
    algorithms = ['greedy', 'clarke_wright', 'genetic']
    results = {}

    for alg in algorithms:
        try:
            result = solve_vrp(df, depot, n_vehicles, capacity, algorithm=alg)
            results[alg] = {
                'total_distance': result['metrics']['total_distance'],
                'vehicles_used': result['metrics']['vehicles_used'],
                'execution_time_ms': result['performance']['total_execution_time_ms'],
                'distance_balance_cv': result['metrics']['distance_balance_cv'],
                'load_balance_cv': result['metrics']['load_balance_cv'],
                'capacity_utilization': result['metrics']['capacity_utilization'],
                'quality_score': result['metrics']['solution_quality_score'],
                'clients_per_second': result['performance']['clients_per_second'],
            }
        except Exception as e:
            # Record the failure so the UI can display it instead of crashing.
            results[alg] = {'error': str(e)}

    # Find best performer in each category (errored algorithms rank last)
    comparison = {
        'results': results,
        'best_distance': min(results.keys(),
                             key=lambda k: results[k].get('total_distance', float('inf'))
                             if 'error' not in results[k] else float('inf')),
        'fastest': min(results.keys(),
                       key=lambda k: results[k].get('execution_time_ms', float('inf'))
                       if 'error' not in results[k] else float('inf')),
        'best_balance': min(results.keys(),
                            key=lambda k: (results[k].get('distance_balance_cv', float('inf')) +
                                           results[k].get('load_balance_cv', float('inf')))
                            if 'error' not in results[k] else float('inf')),
        'best_quality': max(results.keys(),
                            key=lambda k: results[k].get('quality_score', 0)
                            if 'error' not in results[k] else 0),
    }
    return comparison


def create_performance_chart(comparison_data: Dict) -> Image.Image:
    """
    Create a performance comparison chart.
    """
    algorithms = list(comparison_data['results'].keys())
    valid_algs = [alg for alg in algorithms if 'error' not in comparison_data['results'][alg]]

    if not valid_algs:
        # Create empty chart if no valid results
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.text(0.5, 0.5, 'No valid results to display', ha='center', va='center',
                transform=ax.transAxes)
        ax.set_title('Algorithm Comparison - No Data')
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        plt.close(fig)
        buf.seek(0)
        return Image.open(buf)

    # Create subplots for different metrics
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 10))

    # Distance comparison (category winner highlighted in red)
    distances = [comparison_data['results'][alg]['total_distance'] for alg in valid_algs]
    bars1 = ax1.bar(valid_algs, distances, color=['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)])
    ax1.set_title('Total Distance')
    ax1.set_ylabel('Distance')
    best_dist_alg = comparison_data['best_distance']
    if best_dist_alg in valid_algs:
        bars1[valid_algs.index(best_dist_alg)].set_color('#d62728')

    # Execution time comparison
    times = [comparison_data['results'][alg]['execution_time_ms'] for alg in valid_algs]
    bars2 = ax2.bar(valid_algs, times, color=['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)])
    ax2.set_title('Execution Time')
    ax2.set_ylabel('Time (ms)')
    fastest_alg = comparison_data['fastest']
    if fastest_alg in valid_algs:
        bars2[valid_algs.index(fastest_alg)].set_color('#d62728')

    # Quality score comparison
    quality_scores = [comparison_data['results'][alg]['quality_score'] for alg in valid_algs]
    bars3 = ax3.bar(valid_algs, quality_scores, color=['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)])
    ax3.set_title('Solution Quality Score')
    ax3.set_ylabel('Score (0-100)')
    best_quality_alg = comparison_data['best_quality']
    if best_quality_alg in valid_algs:
        bars3[valid_algs.index(best_quality_alg)].set_color('#d62728')

    # Capacity utilization comparison
    utilizations = [comparison_data['results'][alg]['capacity_utilization'] for alg in valid_algs]
    bars4 = ax4.bar(valid_algs, utilizations, color=['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)])
    ax4.set_title('Capacity Utilization')
    ax4.set_ylabel('Utilization (%)')

    plt.tight_layout()

    # Convert to PIL Image
    buf = io.BytesIO()
    fig.savefig(buf, format='png', bbox_inches='tight', dpi=100)
    plt.close(fig)
    buf.seek(0)
    return Image.open(buf)


# Route construction + 2-opt
def nearest_neighbor_route(
    pts: List[Tuple[float, float]],
    start_idx: int = 0,
) -> List[int]:
    """Greedy nearest-neighbor tour over pts, starting from start_idx."""
    n = len(pts)
    unvisited = set(range(n))
    route = [start_idx]
    unvisited.remove(start_idx)
    while unvisited:
        last = route[-1]
        nxt = min(unvisited, key=lambda j: euclid(pts[last], pts[j]))
        route.append(nxt)
        unvisited.remove(nxt)
    return route


def two_opt(route: List[int], pts: List[Tuple[float, float]], max_iter=200) -> List[int]:
    """Improve a route by repeated 2-opt segment reversals.

    Index 0 of the route is kept fixed (the depot); iterates until no
    improving move is found or max_iter passes elapse.
    """
    best = route[:]
    best_len = total_distance([pts[i] for i in best])
    n = len(route)
    improved = True
    it = 0
    while improved and it < max_iter:
        improved = False
        it += 1
        for i in range(1, n - 2):
            for k in range(i + 1, n - 1):
                new_route = best[:i] + best[i:k + 1][::-1] + best[k + 1:]
                new_len = total_distance([pts[i] for i in new_route])
                # Small epsilon avoids cycling on floating-point ties
                if new_len + 1e-9 < best_len:
                    best, best_len = new_route, new_len
                    improved = True
    return best


def build_route_for_cluster(
    df: pd.DataFrame,
    idxs: List[int],
    depot: Tuple[float, float],
) -> List[int]:
    """
    Build a TSP tour over cluster points and return client indices in visiting order.
    Returns client indices (not including the depot) but representing the order.
    """
    # Local point list: depot at 0, then cluster in order
    pts = [depot] + [(float(df.loc[i, "x"]), float(df.loc[i, "y"])) for i in idxs]
    # Greedy tour over all nodes
    rr = nearest_neighbor_route(pts, start_idx=0)
    # Ensure route starts at 0 and ends at 0 conceptually; we'll remove the 0s later
    # Optimize with 2-opt, but keep depot fixed by converting to a path that starts at 0
    rr = two_opt(rr, pts)
    # remove the depot index 0 from the sequence (keep order of clients)
    order = [idxs[i - 1] for i in rr if i != 0]
    return order


# Solve wrapper
# ---------------------------
def solve_vrp(
    df: pd.DataFrame,
    depot: Tuple[float, float] = (0.0, 0.0),
    n_vehicles: int = 4,
    capacity: float = 10,
    speed: float = 1.0,
    algorithm: str = 'sweep',
) -> Dict:
    """
    Solve VRP using specified algorithm with performance tracking.

    Args:
        df: Client data
        depot: Depot coordinates
        n_vehicles: Number of vehicles
        capacity: Vehicle capacity
        speed: Travel speed for time calculations
        algorithm: Clustering algorithm
            ('sweep', 'kmeans', 'greedy', 'clarke_wright', 'genetic')

    Returns:
        {
            'routes': List[List[int]] (row indices of df),
            'total_distance': float,
            'per_route_distance': List[float],
            'assignments_table': pd.DataFrame,
            'metrics': dict,
            'performance': dict
        }
    """
    start_time = time.time()

    # Validate instance
    validate_instance(df, n_vehicles, capacity)
    validation_time = time.time() - start_time

    # 1) cluster using selected algorithm
    # BUGFIX: 'sweep' is the default but was missing from the dispatch (as was
    # 'kmeans'), so solve_vrp(df) always raised "Unknown algorithm: sweep".
    clustering_start = time.time()
    if algorithm == 'sweep':
        clusters = sweep_clusters(df, depot=depot, n_vehicles=n_vehicles, capacity=capacity)
    elif algorithm == 'kmeans':
        clusters = kmeans_clusters(df, depot=depot, n_vehicles=n_vehicles, capacity=capacity)
    elif algorithm == 'greedy':
        clusters = greedy_nearest_neighbor(df, depot=depot, n_vehicles=n_vehicles, capacity=capacity)
    elif algorithm == 'clarke_wright':
        clusters = clarke_wright_savings(df, depot=depot, n_vehicles=n_vehicles, capacity=capacity)
    elif algorithm == 'genetic':
        clusters = genetic_algorithm_vrp(df, depot=depot, n_vehicles=n_vehicles, capacity=capacity)
    else:
        raise ValueError(f"Unknown algorithm: {algorithm}. "
                         f"Use 'sweep', 'kmeans', 'greedy', 'clarke_wright', or 'genetic'")
    clustering_time = time.time() - clustering_start

    # 2) route per cluster
    routing_start = time.time()
    routes: List[List[int]] = []
    per_route_dist: List[float] = []
    soft_tw_violations = 0
    per_route_loads: List[float] = []

    for cl in clusters:
        if len(cl) == 0:
            routes.append([])
            per_route_dist.append(0.0)
            per_route_loads.append(0.0)
            continue
        order = build_route_for_cluster(df, cl, depot)
        routes.append(order)

        # compute distance with depot as start/end
        pts = [depot] + [(df.loc[i, "x"], df.loc[i, "y"]) for i in order] + [depot]
        dist = total_distance(pts)
        per_route_dist.append(dist)

        # capacity + soft TW check
        load = float(df.loc[order, "demand"].sum()) if len(order) else 0.0
        per_route_loads.append(load)

        # simple arrival time simulation (speed distance units per time)
        t = 0.0
        prev = depot
        for i in order:
            cur = (df.loc[i, "x"], df.loc[i, "y"])
            t += euclid(prev, cur) / max(speed, 1e-9)
            tw_s = float(df.loc[i, "tw_start"])
            tw_e = float(df.loc[i, "tw_end"])
            if t < tw_s:
                t = tw_s  # wait
            if t > tw_e:
                soft_tw_violations += 1
            t += float(df.loc[i, "service"])
            prev = cur
        # back to depot time is irrelevant for TW in this simple model

    routing_time = time.time() - routing_start
    total_dist = float(sum(per_route_dist))

    # Build assignment table (guard against an all-empty routing result,
    # where sort_values would raise KeyError on a column-less frame)
    rows = []
    for v, route in enumerate(routes):
        for seq, idx in enumerate(route, start=1):
            rows.append(
                {
                    "vehicle": v + 1,
                    "sequence": seq,
                    "id": df.loc[idx, "id"],
                    "x": float(df.loc[idx, "x"]),
                    "y": float(df.loc[idx, "y"]),
                    "demand": float(df.loc[idx, "demand"]),
                }
            )
    if rows:
        assign_df = pd.DataFrame(rows).sort_values(["vehicle", "sequence"]).reset_index(drop=True)
    else:
        assign_df = pd.DataFrame(columns=["vehicle", "sequence", "id", "x", "y", "demand"])

    # Enhanced metrics
    active_routes = [d for d in per_route_dist if d > 0]
    active_loads = [l for l in per_route_loads if l > 0]

    # Load balancing: coefficient of variation
    load_balance = 0.0
    if len(active_loads) > 1:
        load_std = np.std(active_loads)
        load_mean = np.mean(active_loads)
        load_balance = load_std / load_mean if load_mean > 0 else 0.0

    # Distance balancing
    dist_balance = 0.0
    if len(active_routes) > 1:
        dist_std = np.std(active_routes)
        dist_mean = np.mean(active_routes)
        dist_balance = dist_std / dist_mean if dist_mean > 0 else 0.0

    # Capacity utilization
    total_capacity_used = sum(active_loads)
    total_capacity_available = capacity * len(active_loads)
    capacity_utilization = total_capacity_used / total_capacity_available if total_capacity_available > 0 else 0.0

    total_time = time.time() - start_time

    # Performance metrics
    performance = {
        "total_execution_time_ms": round(total_time * 1000, 2),
        "validation_time_ms": round(validation_time * 1000, 2),
        "clustering_time_ms": round(clustering_time * 1000, 2),
        "routing_time_ms": round(routing_time * 1000, 2),
        "clients_per_second": round(len(df) / total_time, 1) if total_time > 0 else 0,
        "algorithm_complexity": get_algorithm_complexity(algorithm, len(df)),
    }

    metrics = {
        "algorithm": algorithm,
        "vehicles_used": int(sum(1 for r in routes if len(r) > 0)),
        "vehicles_available": n_vehicles,
        "total_distance": round(total_dist, 3),
        "avg_distance_per_vehicle": round(np.mean(active_routes), 3) if active_routes else 0.0,
        "max_distance": round(max(active_routes), 3) if active_routes else 0.0,
        "min_distance": round(min(active_routes), 3) if active_routes else 0.0,
        "distance_balance_cv": round(dist_balance, 3),
        "per_route_distance": [round(d, 3) for d in per_route_dist],
        "per_route_load": [round(l, 3) for l in per_route_loads],
        "avg_load_per_vehicle": round(np.mean(active_loads), 3) if active_loads else 0.0,
        "load_balance_cv": round(load_balance, 3),
        "capacity_utilization": round(capacity_utilization * 100, 1),
        "capacity": capacity,
        "soft_time_window_violations": int(soft_tw_violations),
        "total_clients": len(df),
        "solution_quality_score": calculate_solution_quality_score(total_dist, dist_balance,
                                                                   load_balance, capacity_utilization),
        "note": f"Heuristic solution ({algorithm} → greedy → 2-opt). TW are soft. Lower CV = better balance.",
    }

    return {
        "routes": routes,
        "total_distance": total_dist,
        "per_route_distance": per_route_dist,
        "assignments_table": assign_df,
        "metrics": metrics,
        "performance": performance,
    }


# ---------------------------
# Visualization
# ---------------------------
def plot_solution(
    df: pd.DataFrame,
    sol: Dict,
    depot: Tuple[float, float] = (0.0, 0.0),
):
    """Render the routes of a solve_vrp() solution as a PIL image."""
    routes = sol["routes"]
    fig, ax = plt.subplots(figsize=(7.5, 6.5))
    ax.scatter([depot[0]], [depot[1]], s=120, marker="s", label="Depot", zorder=5)

    # color cycle
    colors = plt.rcParams["axes.prop_cycle"].by_key().get(
        "color", ["C0", "C1", "C2", "C3", "C4", "C5"]
    )

    for v, route in enumerate(routes):
        if not route:
            continue
        c = colors[v % len(colors)]
        xs = [depot[0]] + [float(df.loc[i, "x"]) for i in route] + [depot[0]]
        ys = [depot[1]] + [float(df.loc[i, "y"]) for i in route] + [depot[1]]
        ax.plot(xs, ys, "-", lw=2, color=c, alpha=0.9, label=f"Vehicle {v+1}")
        ax.scatter(xs[1:-1], ys[1:-1], s=36, color=c, zorder=4)
        # label sequence numbers lightly
        for k, idx in enumerate(route, start=1):
            ax.text(
                float(df.loc[idx, "x"]),
                float(df.loc[idx, "y"]),
                str(k),
                fontsize=8,
                ha="center",
                va="center",
                color="white",
                bbox=dict(boxstyle="circle,pad=0.2", fc=c, ec="none", alpha=0.7),
            )

    ax.set_title("Ride-Sharing / CVRP Routes (Heuristic)")
    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.grid(True, alpha=0.25)
    ax.legend(loc="best", fontsize=8, framealpha=0.9)
    ax.set_aspect("equal", adjustable="box")

    # ✅ Convert Matplotlib figure → PIL.Image for Gradio
    buf = io.BytesIO()
    fig.savefig(buf, format="png", bbox_inches="tight")
    plt.close(fig)
    buf.seek(0)
    return Image.open(buf)