# CA / solver.py
# Uploaded by Kshitij2604 ("Upload 5 files", commit 1236727, verified)
import math
import random
import time
from typing import Dict, List, Tuple, Optional
from functools import lru_cache
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from PIL import Image
import io
# ---------------------------
# Data utils
# ---------------------------
def make_template_dataframe():
    """Return a three-row example table that users can download and fill in."""
    # tw_start / tw_end are the optional (soft) earliest / latest arrival
    # times; service is the optional service time at the stop.
    columns = ["id", "x", "y", "demand", "tw_start", "tw_end", "service"]
    rows = [
        ("A", 10, 4, 1, 0, 9999, 0),
        ("B", -5, -12, 2, 0, 9999, 0),
        ("C", 15, 8, 1, 0, 9999, 0),
    ]
    return pd.DataFrame(rows, columns=columns)
def parse_uploaded_csv(file) -> pd.DataFrame:
    """Read a client CSV and normalise it to the columns the solver uses.

    Args:
        file: Anything ``pandas.read_csv`` accepts. If the object has a
            ``name`` attribute, that attribute is passed to ``read_csv``
            instead of the object itself.

    Returns:
        A DataFrame with a fresh 0..n-1 index containing at least the columns
        ``id``, ``x``, ``y``, ``demand``, ``tw_start``, ``tw_end`` and
        ``service``. Any extra columns from the CSV are kept.

    Raises:
        ValueError: If any of ``id``, ``x``, ``y``, ``demand`` is missing.
    """
    source = file.name if hasattr(file, "name") else file
    df = pd.read_csv(source)

    required = {"id", "x", "y", "demand"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    # Fill optional columns if absent.
    optional_defaults = {"tw_start": 0, "tw_end": 999999, "service": 0}
    for col, default in optional_defaults.items():
        if col not in df.columns:
            df[col] = default

    # Normalise types; unparseable numeric cells become NaN.
    df["id"] = df["id"].astype(str)
    numeric_cols = ["x", "y", "demand", "tw_start", "tw_end", "service"]
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Drop only rows that are unusable for routing. A blank cell in an
    # unrelated extra column (e.g. a free-text note) must not discard a client.
    df = df.dropna(subset=["id"] + numeric_cols)
    return df.reset_index(drop=True)
def generate_random_instance(
    n_clients=30,
    n_vehicles=4,
    capacity=10,
    spread=50,
    demand_min=1,
    demand_max=3,
    seed=42,
) -> pd.DataFrame:
    """Create a reproducible random client table.

    Coordinates are uniform in ``[-spread, spread)`` and demands are integers
    in ``[demand_min, demand_max]``. Time windows are wide open (0..999999)
    and service times are zero. ``n_vehicles`` and ``capacity`` are accepted
    but not used by this function.
    """
    rng = np.random.default_rng(seed)

    # The draw order (x, then y, then demand) fixes the output for a seed.
    table = {}
    table["id"] = [f"C{k}" for k in range(1, n_clients + 1)]
    table["x"] = rng.uniform(-spread, spread, size=n_clients)
    table["y"] = rng.uniform(-spread, spread, size=n_clients)
    table["demand"] = rng.integers(demand_min, demand_max + 1, size=n_clients)
    table["tw_start"] = np.zeros(n_clients, dtype=float)
    table["tw_end"] = np.full(n_clients, 999999.0)
    table["service"] = np.zeros(n_clients, dtype=float)
    return pd.DataFrame(table)
# Validation
def validate_instance(df: pd.DataFrame, n_vehicles: int, capacity: float) -> None:
    """Check that a VRP instance is well-formed and not trivially infeasible.

    Checks run from the most basic to the most derived so that the first error
    reported is the root cause: fleet parameters, required columns, non-empty
    table, NaN/inf values, negative values, time-window order, and finally
    total demand against total fleet capacity.

    Args:
        df: Client table with columns ``x``, ``y``, ``demand``, ``tw_start``,
            ``tw_end`` and ``service``.
        n_vehicles: Number of vehicles available.
        capacity: Capacity of each vehicle.

    Raises:
        ValueError: On the first failed check.
    """
    # Fleet parameters first: with capacity <= 0 the total-capacity comparison
    # below would otherwise report a misleading "Infeasible instance".
    if capacity <= 0:
        raise ValueError("Vehicle capacity must be positive")
    if n_vehicles <= 0:
        raise ValueError("Number of vehicles must be positive")

    required = {'x', 'y', 'demand', 'tw_start', 'tw_end', 'service'}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    if len(df) == 0:
        raise ValueError("DataFrame is empty - no clients to route")

    # NaN / inf before any comparison: comparisons against NaN are False and
    # would let bad rows slip through the checks below.
    numeric_cols = ['x', 'y', 'demand', 'tw_start', 'tw_end', 'service']
    if df[numeric_cols].isna().any().any():
        raise ValueError("NaN values detected in numeric columns")
    if np.isinf(df[numeric_cols].values).any():
        raise ValueError("Infinite values detected in numeric columns")

    if (df['demand'] < 0).any():
        raise ValueError("Negative demand values detected")
    if (df['service'] < 0).any():
        raise ValueError("Negative service time values detected")

    bad_windows = df['tw_start'] > df['tw_end']
    if bad_windows.any():
        invalid_rows = df[bad_windows]
        raise ValueError(f"Invalid time windows (start > end) for {len(invalid_rows)} stops")

    # Necessary (not sufficient) feasibility condition.
    total_demand = df['demand'].sum()
    total_capacity = n_vehicles * capacity
    if total_demand > total_capacity:
        raise ValueError(
            f"Infeasible instance: total demand ({total_demand:.1f}) "
            f"exceeds total capacity ({total_capacity:.1f})"
        )
# Geometry / distance helpers
def euclid(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    """Return the straight-line distance between the 2-D points a and b."""
    dx = a[0] - b[0]
    dy = a[1] - b[1]
    return float(math.hypot(dx, dy))
def total_distance(points: List[Tuple[float, float]]) -> float:
    """Return the length of the open path visiting points in the given order."""
    # Pair every point with its successor; fewer than two points yields 0.
    return sum(euclid(here, there) for here, there in zip(points, points[1:]))
def build_distance_matrix(df: pd.DataFrame, depot: Tuple[float, float]) -> np.ndarray:
    """Build the symmetric Euclidean distance matrix for the depot and clients.

    Row/column 0 is the depot; row/column ``k + 1`` is the client stored under
    index label ``k`` of ``df`` (labels ``0..n-1`` are looked up, as elsewhere
    in this module).

    Args:
        df: Client table with ``x`` and ``y`` columns.
        depot: Depot coordinates.

    Returns:
        An ``(n + 1) x (n + 1)`` float array with a zero diagonal.
    """
    n = len(df)
    coords = np.empty((n + 1, 2), dtype=float)
    coords[0] = (depot[0], depot[1])
    if n:
        labels = list(range(n))
        coords[1:, 0] = df.loc[labels, 'x'].to_numpy(dtype=float)
        coords[1:, 1] = df.loc[labels, 'y'].to_numpy(dtype=float)

    # Broadcast to all pairwise coordinate differences in one shot instead of
    # a Python double loop with per-cell DataFrame lookups.
    delta = coords[:, np.newaxis, :] - coords[np.newaxis, :, :]
    return np.hypot(delta[..., 0], delta[..., 1])
# Clustering algorithms
def sweep_clusters(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """Partition clients among vehicles by an angular sweep around the depot.

    Clients are visited in increasing polar angle (``arctan2``) relative to
    the depot. The current vehicle is filled until the next client would push
    its load above ``capacity``; then the sweep moves on to the next vehicle.
    The last vehicle takes every remaining client, even past its capacity.

    Returns:
        One list of row numbers per vehicle (possibly empty).
    """
    angles = np.arctan2(df["y"].values - depot[1], df["x"].values - depot[0])

    clusters: List[List[int]] = [[] for _ in range(n_vehicles)]
    loads = [0.0] * n_vehicles
    last_vehicle = n_vehicles - 1
    current = 0

    for row in np.argsort(angles):
        demand = float(df.loc[row, "demand"])
        would_overflow = loads[current] + demand > capacity
        # Advance only while a further vehicle is still available.
        if would_overflow and current < last_vehicle:
            current += 1
        clusters[current].append(int(row))
        loads[current] += demand

    return clusters
def kmeans_clusters(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
    random_state: int = 42,
) -> List[List[int]]:
    """Assign clients to vehicles with k-means, then split overloaded clusters.

    Each k-means cluster is walked in order of polar angle around the depot
    and cut into capacity-respecting pieces. If the cuts produce more pieces
    than vehicles, the surplus pieces are merged into the least-loaded kept
    pieces so that every client stays assigned (those vehicles may then
    exceed ``capacity``).

    Args:
        df: Client table with ``x``, ``y`` and ``demand`` columns.
        depot: Depot coordinates, used only for the angular ordering.
        n_vehicles: Number of vehicles (and maximum number of clusters).
        capacity: Capacity of each vehicle.
        random_state: Seed forwarded to ``KMeans``.

    Returns:
        Exactly ``n_vehicles`` lists of row numbers (possibly empty).
    """
    if len(df) == 0:
        return [[] for _ in range(n_vehicles)]

    coords = df[['x', 'y']].values
    model = KMeans(
        n_clusters=min(n_vehicles, len(df)),
        random_state=random_state,
        n_init=10,
    )
    labels = model.fit_predict(coords)

    groups: List[List[int]] = [[] for _ in range(n_vehicles)]
    for row, label in enumerate(labels):
        groups[label].append(row)

    def angle_from_depot(row):
        return np.arctan2(df.loc[row, 'y'] - depot[1], df.loc[row, 'x'] - depot[0])

    pieces: List[List[int]] = []
    piece_loads: List[float] = []
    for group in groups:
        if not group:
            continue
        current: List[int] = []
        current_load = 0.0
        # Angular order makes the split deterministic.
        for row in sorted(group, key=angle_from_depot):
            demand = float(df.loc[row, 'demand'])
            if current and current_load + demand > capacity:
                pieces.append(current)
                piece_loads.append(current_load)
                current, current_load = [row], demand
            else:
                current.append(row)
                current_load += demand
        if current:
            pieces.append(current)
            piece_loads.append(current_load)

    if len(pieces) > n_vehicles:
        # Truncating here would silently drop clients; fold the surplus
        # pieces into whichever kept piece currently carries the least load.
        kept = pieces[:n_vehicles]
        kept_loads = piece_loads[:n_vehicles]
        for extra, extra_load in zip(pieces[n_vehicles:], piece_loads[n_vehicles:]):
            target = min(range(len(kept)), key=kept_loads.__getitem__)
            kept[target].extend(extra)
            kept_loads[target] += extra_load
        pieces = kept

    # Pad with empty clusters if needed.
    pieces.extend([] for _ in range(n_vehicles - len(pieces)))
    return pieces
def clarke_wright_savings(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """Group clients into routes with the Clarke-Wright savings heuristic.

    Every client starts in its own route. Client pairs are processed in order
    of decreasing saving ``s(i, j) = d(0, i) + d(0, j) - d(i, j)`` and their
    routes are joined when both clients sit at a route end and the combined
    load fits ``capacity``. If more than ``n_vehicles`` routes remain, routes
    are merged pairwise (preferring pairs that fit the capacity, otherwise the
    two lightest) until the count fits.

    Returns:
        At least ``n_vehicles`` lists of row numbers (padded with empty lists).
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]

    dist = build_distance_matrix(df, depot)

    # Matrix index 0 is the depot, so client c lives at matrix index c + 1.
    savings = [
        (dist[0][a] + dist[0][b] - dist[a][b], a - 1, b - 1)
        for a in range(1, n + 1)
        for b in range(a + 1, n + 1)
    ]
    savings.sort(reverse=True)

    routes: List[List[int]] = [[c] for c in range(n)]
    loads = [float(df.loc[c, 'demand']) for c in range(n)]
    owner = {c: c for c in range(n)}  # client -> slot of the route holding it

    for _, i, j in savings:
        slot_i, slot_j = owner[i], owner[j]
        if slot_i == slot_j:
            continue
        if loads[slot_i] + loads[slot_j] > capacity:
            continue

        left, right = routes[slot_i], routes[slot_j]
        i_first, i_last = i == left[0], i == left[-1]
        j_first, j_last = j == right[0], j == right[-1]

        # Join so that i and j become neighbours; interior clients fall
        # through to the final branch and are skipped.
        if i_last and j_first:
            merged = left + right
        elif i_first and j_last:
            merged = right + left
        elif i_last and j_last:
            merged = left + right[::-1]
        elif i_first and j_first:
            merged = left[::-1] + right
        else:
            continue

        routes[slot_i] = merged
        loads[slot_i] += loads[slot_j]
        routes[slot_j] = []
        loads[slot_j] = 0.0
        for client in right:
            owner[client] = slot_i

    final_routes = [r for r in routes if r]

    while len(final_routes) > n_vehicles:
        # (load, position) pairs, lightest first.
        sized = sorted(
            (sum(df.loc[r, 'demand']), pos) for pos, r in enumerate(final_routes)
        )
        pair = next(
            (
                (sized[a][1], sized[b][1])
                for a in range(len(sized))
                for b in range(a + 1, len(sized))
                if sized[a][0] + sized[b][0] <= capacity
            ),
            None,
        )
        if pair is None:
            # Nothing fits: force-merge the two lightest routes anyway.
            pair = (sized[0][1], sized[1][1])
        keep, drop = pair
        final_routes[keep].extend(final_routes[drop])
        final_routes.pop(drop)

    # Pad with empty routes if needed.
    while len(final_routes) < n_vehicles:
        final_routes.append([])
    return final_routes
def greedy_nearest_neighbor(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
) -> List[List[int]]:
    """Build routes by repeatedly visiting the nearest client that still fits.

    Each vehicle starts at the depot and keeps adding the closest unvisited
    client whose demand fits the remaining capacity (ties go to the lowest
    row number). When nothing fits, the next vehicle starts. Clients that
    cannot be placed this way are still assigned: each gets its own vehicle
    while unused vehicles remain, otherwise it joins the least-loaded route
    (which then exceeds ``capacity``).

    Args:
        df: Client table with ``x``, ``y`` and ``demand`` columns, indexed
            ``0..n-1``.
        depot: Depot coordinates.
        n_vehicles: Number of vehicles.
        capacity: Capacity of each vehicle.

    Returns:
        ``n_vehicles`` lists of row numbers (possibly empty).
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]

    # Read the table once instead of on every distance evaluation.
    xs = [float(df.loc[i, 'x']) for i in range(n)]
    ys = [float(df.loc[i, 'y']) for i in range(n)]
    demands = [float(df.loc[i, 'demand']) for i in range(n)]

    unvisited = set(range(n))
    routes: List[List[int]] = []
    loads: List[float] = []

    while unvisited and len(routes) < n_vehicles:
        route: List[int] = []
        load = 0.0
        pos_x, pos_y = depot[0], depot[1]

        while unvisited:
            best_client = None
            best_distance = float('inf')
            for client in sorted(unvisited):
                if load + demands[client] > capacity:
                    continue
                distance = math.hypot(pos_x - xs[client], pos_y - ys[client])
                if distance < best_distance:
                    best_distance = distance
                    best_client = client
            if best_client is None:
                break  # No more clients fit in this vehicle.
            route.append(best_client)
            unvisited.remove(best_client)
            load += demands[best_client]
            pos_x, pos_y = xs[best_client], ys[best_client]

        if not route:
            # Even an empty vehicle could not take any remaining client, so
            # starting further vehicles would loop forever without progress.
            break
        routes.append(route)
        loads.append(load)

    # Force the leftovers in so that no client is lost.
    for client in sorted(unvisited):
        if len(routes) < n_vehicles:
            routes.append([client])
            loads.append(demands[client])
        elif routes:
            target = min(range(len(routes)), key=loads.__getitem__)
            routes[target].append(client)
            loads[target] += demands[client]

    # Pad with empty routes if needed.
    while len(routes) < n_vehicles:
        routes.append([])
    return routes[:n_vehicles]
def genetic_algorithm_vrp(
    df: pd.DataFrame,
    depot: Tuple[float, float],
    n_vehicles: int,
    capacity: float,
    population_size: int = 50,
    generations: int = 100,
    mutation_rate: float = 0.1,
    elite_size: int = 10,
    seed: Optional[int] = None,
) -> List[List[int]]:
    """Search for a VRP solution with a simple genetic algorithm.

    An individual is a permutation of the client row numbers interleaved with
    at most ``n_vehicles - 1`` separators (``-1``) that cut it into routes.
    Fitness is the negated sum of the closed route lengths (depot -> clients
    -> depot) plus a penalty of 1000 per unit of load above ``capacity``.
    Each generation keeps the ``elite_size`` best individuals and fills the
    rest with children of tournament-selected parents (order crossover with
    freshly drawn separators, then swap mutation).

    Args:
        df: Client table with ``x``, ``y`` and ``demand`` columns, indexed
            ``0..n-1``.
        depot: Depot coordinates.
        n_vehicles: Number of vehicles (maximum number of routes).
        capacity: Capacity of each vehicle (soft, enforced via penalty).
        population_size: Individuals per generation.
        generations: Number of generations to evolve.
        mutation_rate: Probability that a child is mutated.
        elite_size: Individuals copied unchanged into the next generation.
        seed: If given, a private ``random.Random(seed)`` drives the search so
            the result is reproducible. If ``None`` the module-level
            ``random`` generator is used.

    Returns:
        ``n_vehicles`` lists of row numbers (possibly empty) for the best
        individual of the final population.
    """
    n = len(df)
    if n == 0:
        return [[] for _ in range(n_vehicles)]

    rng = random if seed is None else random.Random(seed)
    separator = -1
    overload_penalty = 1000
    tournament_size = 5

    # Read the table once; fitness is evaluated thousands of times and must
    # not go through per-cell DataFrame lookups.
    xs = [float(df.loc[i, 'x']) for i in range(n)]
    ys = [float(df.loc[i, 'y']) for i in range(n)]
    demands = [float(df.loc[i, 'demand']) for i in range(n)]
    depot_x, depot_y = float(depot[0]), float(depot[1])

    def create_individual():
        """Random permutation with separators inserted with probability 0.3."""
        clients = list(range(n))
        rng.shuffle(clients)
        individual = []
        separators_added = 0
        last = len(clients) - 1
        for pos, client in enumerate(clients):
            individual.append(client)
            # Never after the last client and never more than n_vehicles - 1.
            if (separators_added < n_vehicles - 1
                    and pos < last
                    and rng.random() < 0.3):
                individual.append(separator)
                separators_added += 1
        return individual

    def individual_to_routes(individual):
        """Cut an individual at its separators into n_vehicles routes."""
        routes = []
        current_route = []
        for gene in individual:
            if gene == separator:
                if current_route:
                    routes.append(current_route)
                    current_route = []
            else:
                current_route.append(gene)
        if current_route:
            routes.append(current_route)
        while len(routes) < n_vehicles:
            routes.append([])
        return routes[:n_vehicles]

    def route_length(route):
        """Length of depot -> route -> depot."""
        length = 0.0
        px, py = depot_x, depot_y
        for client in route:
            length += math.hypot(px - xs[client], py - ys[client])
            px, py = xs[client], ys[client]
        return length + math.hypot(px - depot_x, py - depot_y)

    def calculate_fitness(individual):
        """Negated cost, so that a higher fitness is a better solution."""
        distance = 0.0
        penalty = 0.0
        for route in individual_to_routes(individual):
            if not route:
                continue
            route_load = sum(demands[client] for client in route)
            if route_load > capacity:
                penalty += (route_load - capacity) * overload_penalty
            distance += route_length(route)
        return -(distance + penalty)

    def crossover(parent1, parent2):
        """Order crossover (OX) on the client sequence; separators redrawn."""
        p1_clients = [g for g in parent1 if g != separator]
        p2_clients = [g for g in parent2 if g != separator]
        size = len(p1_clients)
        if size < 2:
            return parent1[:], parent2[:]

        start, end = sorted(rng.sample(range(size), 2))

        def make_child(primary, secondary):
            # Keep primary[start:end] in place; fill the other positions with
            # the remaining clients in the order they appear in secondary.
            kept = set(primary[start:end])
            filler = (g for g in secondary if g not in kept)
            return [
                primary[pos] if start <= pos < end else next(filler)
                for pos in range(size)
            ]

        def add_separators(child):
            count = max(0, min(n_vehicles - 1, len(child) // 3))
            cut_points = set(rng.sample(range(1, len(child)), count))
            result = []
            for pos, gene in enumerate(child):
                if pos in cut_points:
                    result.append(separator)
                result.append(gene)
            return result

        child1 = make_child(p1_clients, p2_clients)
        child2 = make_child(p2_clients, p1_clients)
        return add_separators(child1), add_separators(child2)

    def mutate(individual):
        """With probability mutation_rate, swap two random clients."""
        if rng.random() > mutation_rate:
            return individual
        client_positions = [pos for pos, g in enumerate(individual) if g != separator]
        if len(client_positions) < 2:
            return individual
        pos1, pos2 = rng.sample(client_positions, 2)
        mutated = individual[:]  # copy: elites share list objects
        mutated[pos1], mutated[pos2] = mutated[pos2], mutated[pos1]
        return mutated

    def tournament(scored):
        contenders = rng.sample(scored, min(tournament_size, len(scored)))
        return max(contenders, key=lambda pair: pair[1])[0]

    population = [create_individual() for _ in range(population_size)]

    for _ in range(generations):
        scored = [(ind, calculate_fitness(ind)) for ind in population]
        scored.sort(key=lambda pair: pair[1], reverse=True)

        new_population = [ind for ind, _ in scored[:elite_size]]
        while len(new_population) < population_size:
            parent1 = tournament(scored)
            parent2 = tournament(scored)
            child1, child2 = crossover(parent1, parent2)
            new_population.extend([mutate(child1), mutate(child2)])

        population = new_population[:population_size]

    best_individual = max(population, key=calculate_fitness)
    return individual_to_routes(best_individual)
# Performance and Quality Analysis
def get_algorithm_complexity(algorithm: str, n_clients: int) -> str:
    """Describe the nominal complexity of an algorithm for n_clients clients.

    The genetic estimate assumes 100 generations and a population of 50.
    Algorithms other than 'greedy', 'clarke_wright' and 'genetic' yield
    "Unknown".
    """
    if algorithm == 'greedy':
        ops = n_clients**2
        return f"O(n²) ≈ {ops:.0f} ops"
    if algorithm == 'clarke_wright':
        ops = n_clients**3
        return f"O(n³) ≈ {ops:.0f} ops"
    if algorithm == 'genetic':
        ops = 100 * 50 * n_clients
        return f"O(g×p×n) ≈ {ops:.0f} ops (100 gen, 50 pop)"
    return "Unknown"
def calculate_solution_quality_score(total_dist: float, dist_balance: float, load_balance: float, capacity_util: float) -> float:
    """Combine balance and utilisation into one score from 0 to 100.

    Args:
        total_dist: Total distance of the solution (currently not used in the
            score).
        dist_balance: Coefficient of variation of the route distances
            (lower is better).
        load_balance: Coefficient of variation of the route loads
            (lower is better).
        capacity_util: Used capacity as a fraction of available capacity
            (1.0 means exactly full).

    Returns:
        The weighted score (30% distance balance, 30% load balance, 40%
        utilisation), rounded to one decimal place.
    """
    dist_score = max(0, 100 - (dist_balance * 100))
    load_score = max(0, 100 - (load_balance * 100))
    # Overloaded vehicles give a utilisation above 1.0; cap the component so
    # the score stays within 0-100 and overloading is not rewarded.
    util_score = min(100, max(0, capacity_util * 100))

    quality_score = dist_score * 0.3 + load_score * 0.3 + util_score * 0.4
    return round(quality_score, 1)
def compare_algorithms(df: pd.DataFrame, depot: Tuple[float, float], n_vehicles: int, capacity: float) -> Dict:
    """Solve one instance with every algorithm and summarise the outcome.

    Returns a dict with the per-algorithm figures under 'results' (or an
    {'error': message} entry for an algorithm that raised) plus the name of
    the winner for distance, speed, balance and quality. Failed algorithms
    rank last in every category.
    """
    results = {}
    for name in ('greedy', 'clarke_wright', 'genetic'):
        try:
            outcome = solve_vrp(df, depot, n_vehicles, capacity, algorithm=name)
            metrics = outcome['metrics']
            perf = outcome['performance']
            results[name] = {
                'total_distance': metrics['total_distance'],
                'vehicles_used': metrics['vehicles_used'],
                'execution_time_ms': perf['total_execution_time_ms'],
                'distance_balance_cv': metrics['distance_balance_cv'],
                'load_balance_cv': metrics['load_balance_cv'],
                'capacity_utilization': metrics['capacity_utilization'],
                'quality_score': metrics['solution_quality_score'],
                'clients_per_second': perf['clients_per_second'],
            }
        except Exception as e:
            # Best effort: record the failure and keep comparing the others.
            results[name] = {'error': str(e)}

    worst = float('inf')

    def figure(name, field, fallback):
        """Value of field for an algorithm, or fallback if it failed."""
        entry = results[name]
        if 'error' in entry:
            return fallback
        return entry.get(field, fallback)

    def imbalance(name):
        if 'error' in results[name]:
            return worst
        return figure(name, 'distance_balance_cv', worst) + figure(name, 'load_balance_cv', worst)

    return {
        'results': results,
        'best_distance': min(results, key=lambda k: figure(k, 'total_distance', worst)),
        'fastest': min(results, key=lambda k: figure(k, 'execution_time_ms', worst)),
        'best_balance': min(results, key=imbalance),
        'best_quality': max(results, key=lambda k: figure(k, 'quality_score', 0)),
    }
def create_performance_chart(comparison_data: Dict) -> Image.Image:
    """Render the comparison as a 2x2 grid of bar charts and return it as an image.

    The panels show total distance, execution time, quality score and
    capacity utilisation for every algorithm without an error. In the first
    three panels the winning algorithm's bar is recoloured red. If no
    algorithm succeeded, a placeholder chart is returned instead.
    """
    results = comparison_data['results']
    valid_algs = [alg for alg in results if 'error' not in results[alg]]

    def to_image(figure, **save_kwargs):
        """Save figure as PNG into memory, close it, and open it with PIL."""
        buf = io.BytesIO()
        figure.savefig(buf, format='png', bbox_inches='tight', **save_kwargs)
        plt.close(figure)
        buf.seek(0)
        return Image.open(buf)

    if not valid_algs:
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.text(0.5, 0.5, 'No valid results to display', ha='center', va='center', transform=ax.transAxes)
        ax.set_title('Algorithm Comparison - No Data')
        return to_image(fig)

    fig, axes = plt.subplots(2, 2, figsize=(12, 10))
    palette = ['#1f77b4', '#ff7f0e', '#2ca02c'][:len(valid_algs)]

    # (axis, result field, title, y label, key of the winner to highlight)
    panels = [
        (axes[0][0], 'total_distance', 'Total Distance', 'Distance', 'best_distance'),
        (axes[0][1], 'execution_time_ms', 'Execution Time', 'Time (ms)', 'fastest'),
        (axes[1][0], 'quality_score', 'Solution Quality Score', 'Score (0-100)', 'best_quality'),
        (axes[1][1], 'capacity_utilization', 'Capacity Utilization', 'Utilization (%)', None),
    ]
    for ax, field, title, ylabel, winner_key in panels:
        heights = [results[alg][field] for alg in valid_algs]
        bars = ax.bar(valid_algs, heights, color=palette)
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        if winner_key is None:
            continue
        winner = comparison_data[winner_key]
        if winner in valid_algs:
            bars[valid_algs.index(winner)].set_color('#d62728')

    plt.tight_layout()
    return to_image(fig, dpi=100)
# Route construction + 2-opt
def nearest_neighbor_route(
    pts: List[Tuple[float, float]],
    start_idx: int = 0,
) -> List[int]:
    """Order all point indices greedily, always moving to the closest unvisited one.

    Returns the visiting order as indices into pts, beginning with start_idx.
    """
    tour = [start_idx]
    remaining = set(range(len(pts)))
    remaining.remove(start_idx)

    while remaining:
        here = pts[tour[-1]]
        closest = min(remaining, key=lambda cand: euclid(here, pts[cand]))
        tour.append(closest)
        remaining.remove(closest)

    return tour
def two_opt(route: List[int], pts: List[Tuple[float, float]], max_iter=200) -> List[int]:
    """Shorten a path by reversing interior segments (2-opt).

    The first and last positions of route are never moved. A reversal is
    accepted when it shortens the path by more than 1e-9, and passes repeat
    until one brings no improvement or max_iter passes have run. The input
    list is not modified.
    """
    def path_length(order):
        return total_distance([pts[node] for node in order])

    best = route[:]
    best_len = path_length(best)
    n = len(route)

    passes = 0
    while passes < max_iter:
        passes += 1
        improved = False
        for i in range(1, n - 2):
            for k in range(i + 1, n - 1):
                # Reverse positions i..k; everything else stays in place.
                candidate = best[:i] + list(reversed(best[i:k + 1])) + best[k + 1:]
                candidate_len = path_length(candidate)
                if candidate_len + 1e-9 < best_len:
                    best, best_len = candidate, candidate_len
                    improved = True
        if not improved:
            break

    return best
def build_route_for_cluster(
    df: pd.DataFrame,
    idxs: List[int],
    depot: Tuple[float, float],
) -> List[int]:
    """Order the clients of one cluster into a depot-to-depot tour.

    A nearest-neighbour tour starting at the depot is built and then improved
    with 2-opt on the closed tour (depot -> clients -> depot), so the return
    leg is part of the optimised length.

    Args:
        df: Client table with ``x`` and ``y`` columns.
        idxs: Row indices of the clients in this cluster.
        depot: Depot coordinates.

    Returns:
        The entries of ``idxs`` in visiting order (the depot is not included).
    """
    if not idxs:
        return []

    # Local point list: depot at 0, then the cluster's clients in given order.
    pts = [depot] + [(float(df.loc[i, "x"]), float(df.loc[i, "y"])) for i in idxs]

    tour = nearest_neighbor_route(pts, start_idx=0)

    # Close the tour before 2-opt. two_opt keeps the first and last positions
    # fixed, so without the trailing depot the last client could never move
    # and the return leg would be ignored.
    tour = two_opt(tour + [0], pts)

    # Drop both depot entries and map local positions back to row indices.
    return [idxs[node - 1] for node in tour if node != 0]
# Solve wrapper
# ---------------------------
def solve_vrp(
    df: pd.DataFrame,
    depot: Tuple[float, float] = (0.0, 0.0),
    n_vehicles: int = 4,
    capacity: float = 10,
    speed: float = 1.0,
    algorithm: str = 'sweep',
) -> Dict:
    """Solve a capacitated VRP heuristically and report metrics and timings.

    The instance is validated, clients are grouped per vehicle by the chosen
    algorithm, and each group is ordered with nearest-neighbour + 2-opt.
    Time windows are soft: late arrivals are only counted.

    Args:
        df: Client data with columns ``id``, ``x``, ``y``, ``demand``,
            ``tw_start``, ``tw_end`` and ``service``, indexed ``0..n-1``.
        depot: Depot coordinates.
        n_vehicles: Number of vehicles.
        capacity: Vehicle capacity.
        speed: Travel speed (distance units per time unit) for arrival times.
        algorithm: Grouping algorithm: 'sweep', 'greedy', 'clarke_wright' or
            'genetic'.

    Returns:
        {
            'routes': List[List[int]] (row indices of df),
            'total_distance': float,
            'per_route_distance': List[float],
            'assignments_table': pd.DataFrame,
            'metrics': dict,
            'performance': dict
        }

    Raises:
        ValueError: If the instance fails validation or ``algorithm`` is not
            one of the supported names.
    """
    start_time = time.time()

    validate_instance(df, n_vehicles, capacity)
    validation_time = time.time() - start_time

    # 1) Group clients per vehicle. 'sweep' is the declared default and must
    # therefore be dispatchable.
    cluster_builders = {
        'sweep': sweep_clusters,
        'greedy': greedy_nearest_neighbor,
        'clarke_wright': clarke_wright_savings,
        'genetic': genetic_algorithm_vrp,
    }
    clustering_start = time.time()
    if algorithm not in cluster_builders:
        raise ValueError(
            f"Unknown algorithm: {algorithm}. "
            "Use 'sweep', 'greedy', 'clarke_wright', or 'genetic'"
        )
    clusters = cluster_builders[algorithm](
        df, depot=depot, n_vehicles=n_vehicles, capacity=capacity
    )
    clustering_time = time.time() - clustering_start

    # 2) Order each group into a route and measure it.
    routing_start = time.time()
    routes: List[List[int]] = []
    per_route_dist: List[float] = []
    per_route_loads: List[float] = []
    soft_tw_violations = 0
    effective_speed = max(speed, 1e-9)  # guard against division by zero

    for cl in clusters:
        if len(cl) == 0:
            routes.append([])
            per_route_dist.append(0.0)
            per_route_loads.append(0.0)
            continue

        order = build_route_for_cluster(df, cl, depot)
        routes.append(order)

        # Distance with the depot as start and end.
        pts = [depot] + [(df.loc[i, "x"], df.loc[i, "y"]) for i in order] + [depot]
        per_route_dist.append(total_distance(pts))

        load = float(df.loc[order, "demand"].sum()) if len(order) else 0.0
        per_route_loads.append(load)

        # Simple arrival-time simulation; the trip back to the depot is
        # irrelevant for time windows in this model.
        t = 0.0
        prev = depot
        for i in order:
            cur = (df.loc[i, "x"], df.loc[i, "y"])
            t += euclid(prev, cur) / effective_speed
            tw_s = float(df.loc[i, "tw_start"])
            tw_e = float(df.loc[i, "tw_end"])
            if t < tw_s:
                t = tw_s  # wait until the window opens
            if t > tw_e:
                soft_tw_violations += 1
            t += float(df.loc[i, "service"])
            prev = cur

    routing_time = time.time() - routing_start
    total_dist = float(sum(per_route_dist))

    # Assignment table: one row per (vehicle, stop).
    rows = [
        {
            "vehicle": v + 1,
            "sequence": seq,
            "id": df.loc[idx, "id"],
            "x": float(df.loc[idx, "x"]),
            "y": float(df.loc[idx, "y"]),
            "demand": float(df.loc[idx, "demand"]),
        }
        for v, route in enumerate(routes)
        for seq, idx in enumerate(route, start=1)
    ]
    # Explicit columns keep sort_values working when no stop was assigned.
    assign_df = (
        pd.DataFrame(rows, columns=["vehicle", "sequence", "id", "x", "y", "demand"])
        .sort_values(["vehicle", "sequence"])
        .reset_index(drop=True)
    )

    # Metrics consider only routes that actually travel / carry something.
    active_routes = [d for d in per_route_dist if d > 0]
    active_loads = [l for l in per_route_loads if l > 0]

    def coefficient_of_variation(values):
        """std / mean, or 0.0 with fewer than two values or a zero mean."""
        if len(values) <= 1:
            return 0.0
        mean = np.mean(values)
        return np.std(values) / mean if mean > 0 else 0.0

    load_balance = coefficient_of_variation(active_loads)
    dist_balance = coefficient_of_variation(active_routes)

    total_capacity_used = sum(active_loads)
    total_capacity_available = capacity * len(active_loads)
    capacity_utilization = (
        total_capacity_used / total_capacity_available
        if total_capacity_available > 0
        else 0.0
    )

    total_time = time.time() - start_time

    performance = {
        "total_execution_time_ms": round(total_time * 1000, 2),
        "validation_time_ms": round(validation_time * 1000, 2),
        "clustering_time_ms": round(clustering_time * 1000, 2),
        "routing_time_ms": round(routing_time * 1000, 2),
        "clients_per_second": round(len(df) / total_time, 1) if total_time > 0 else 0,
        "algorithm_complexity": get_algorithm_complexity(algorithm, len(df)),
    }

    metrics = {
        "algorithm": algorithm,
        "vehicles_used": int(sum(1 for r in routes if len(r) > 0)),
        "vehicles_available": n_vehicles,
        "total_distance": round(total_dist, 3),
        "avg_distance_per_vehicle": round(np.mean(active_routes), 3) if active_routes else 0.0,
        "max_distance": round(max(active_routes), 3) if active_routes else 0.0,
        "min_distance": round(min(active_routes), 3) if active_routes else 0.0,
        "distance_balance_cv": round(dist_balance, 3),
        "per_route_distance": [round(d, 3) for d in per_route_dist],
        "per_route_load": [round(l, 3) for l in per_route_loads],
        "avg_load_per_vehicle": round(np.mean(active_loads), 3) if active_loads else 0.0,
        "load_balance_cv": round(load_balance, 3),
        "capacity_utilization": round(capacity_utilization * 100, 1),
        "capacity": capacity,
        "soft_time_window_violations": int(soft_tw_violations),
        "total_clients": len(df),
        "solution_quality_score": calculate_solution_quality_score(total_dist, dist_balance, load_balance, capacity_utilization),
        "note": f"Heuristic solution ({algorithm} → greedy → 2-opt). TW are soft. Lower CV = better balance.",
    }

    return {
        "routes": routes,
        "total_distance": total_dist,
        "per_route_distance": per_route_dist,
        "assignments_table": assign_df,
        "metrics": metrics,
        "performance": performance,
    }
# ---------------------------
# Visualization
# ---------------------------
def plot_solution(
    df: pd.DataFrame,
    sol: Dict,
    depot: Tuple[float, float] = (0.0, 0.0),
):
    """Draw the routes of a solution and return the picture as a PIL image.

    The depot is a square marker. Every non-empty route is drawn as a closed
    line (depot -> stops -> depot) in its own colour, with each stop labelled
    by its position in the route.
    """
    routes = sol["routes"]
    depot_x, depot_y = depot[0], depot[1]

    fig, ax = plt.subplots(figsize=(7.5, 6.5))
    ax.scatter([depot_x], [depot_y], s=120, marker="s", label="Depot", zorder=5)

    # Colour cycle of the active matplotlib style, with a fallback.
    palette = plt.rcParams["axes.prop_cycle"].by_key().get(
        "color", ["C0","C1","C2","C3","C4","C5"]
    )

    for vehicle_no, route in enumerate(routes, start=1):
        if not route:
            continue
        colour = palette[(vehicle_no - 1) % len(palette)]
        stop_x = [float(df.loc[i, "x"]) for i in route]
        stop_y = [float(df.loc[i, "y"]) for i in route]

        ax.plot(
            [depot_x] + stop_x + [depot_x],
            [depot_y] + stop_y + [depot_y],
            "-",
            lw=2,
            color=colour,
            alpha=0.9,
            label=f"Vehicle {vehicle_no}",
        )
        ax.scatter(stop_x, stop_y, s=36, color=colour, zorder=4)

        # Sequence number on top of every stop.
        badge = dict(boxstyle="circle,pad=0.2", fc=colour, ec="none", alpha=0.7)
        for seq, (px, py) in enumerate(zip(stop_x, stop_y), start=1):
            ax.text(
                px,
                py,
                str(seq),
                fontsize=8,
                ha="center",
                va="center",
                color="white",
                bbox=badge,
            )

    ax.set_title("Ride-Sharing / CVRP Routes (Heuristic)")
    ax.set_xlabel("X")
    ax.set_ylabel("Y")
    ax.grid(True, alpha=0.25)
    ax.legend(loc="best", fontsize=8, framealpha=0.9)
    ax.set_aspect("equal", adjustable="box")

    # Matplotlib figure -> in-memory PNG -> PIL.Image (e.g. for Gradio).
    png = io.BytesIO()
    fig.savefig(png, format="png", bbox_inches="tight")
    plt.close(fig)
    png.seek(0)
    return Image.open(png)