# Complexity_algo_cp / solver.py
# Isha184's picture
# Update solver.py
# 69bcc2d verified
import math
import random
from typing import Dict, List, Tuple, Optional
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
import io
def convert_numpy(obj):
    """Recursively replace NumPy values inside *obj* with native Python ones.

    Dicts, lists and plain tuples are walked recursively (dict keys are left
    untouched). Any NumPy scalar (``np.generic``: integers, floats, booleans,
    ...) is unwrapped with ``.item()`` and arrays are turned into nested lists,
    so the result can be handed to encoders that only understand built-in types.

    Args:
        obj: Arbitrary, possibly nested, value.

    Returns:
        A structure of the same shape with NumPy scalars/arrays replaced;
        anything else is returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: convert_numpy(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy(value) for value in obj]
    # Named tuples are left alone so their type is not silently downgraded.
    if isinstance(obj, tuple) and not hasattr(obj, "_fields"):
        return tuple(convert_numpy(value) for value in obj)
    if isinstance(obj, np.ndarray):
        # tolist() already yields Python scalars for numeric dtypes; recurse
        # anyway so object arrays holding NumPy values are handled too.
        return convert_numpy(obj.tolist())
    if isinstance(obj, np.generic):
        return obj.item()
    return obj
# ---------------------------
# Data utils
# ---------------------------
def make_template_dataframe():
    """Return a three-client example table showing the expected CSV layout.

    Columns are ``id``, ``x``, ``y``, ``demand``, ``tw_start``, ``tw_end`` and
    ``service``; each tuple below is one client row in that column order.
    """
    header = ("id", "x", "y", "demand", "tw_start", "tw_end", "service")
    sample_rows = [
        ("A", 10, 4, 1, 0, 50, 2),
        ("B", -5, -12, 2, 10, 30, 3),
        ("C", 15, 8, 1, 5, 20, 1),
    ]
    # Transpose the rows into per-column lists before building the frame.
    by_column = {
        name: list(values) for name, values in zip(header, zip(*sample_rows))
    }
    return pd.DataFrame(by_column)
def parse_uploaded_csv(file) -> pd.DataFrame:
    """Load a client table from a CSV upload and normalise its columns.

    Args:
        file: Either something ``pd.read_csv`` accepts directly (path or
            file-like object) or an object exposing a ``name`` attribute, in
            which case ``file.name`` is what gets read.

    Returns:
        A DataFrame with a fresh 0..n-1 index where ``id`` is a string column
        and ``x``, ``y``, ``demand``, ``tw_start``, ``tw_end`` and ``service``
        are numeric. Missing optional columns are filled with defaults
        (``tw_start`` = 0, ``tw_end`` = 999999, ``service`` = 0). Rows whose
        numeric fields cannot be parsed are dropped.

    Raises:
        ValueError: If any of ``id``, ``x``, ``y`` or ``demand`` is absent.
    """
    source = file.name if hasattr(file, "name") else file
    df = pd.read_csv(source)

    required = {"id", "x", "y", "demand"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    optional_defaults = {"tw_start": 0, "tw_end": 999999, "service": 0}
    for column, default in optional_defaults.items():
        if column not in df.columns:
            df[column] = default

    df["id"] = df["id"].astype(str)

    numeric_cols = ["x", "y", "demand", "tw_start", "tw_end", "service"]
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Only the solver's own columns decide whether a row is usable; a blank
    # cell in some unrelated extra column must not discard a valid client.
    return df.dropna(subset=numeric_cols).reset_index(drop=True)
def generate_random_instance(
    n_clients=15,
    n_vehicles=4,
    capacity=7,
    spread=10,  # smaller area = closer stops
    demand_min=1,
    demand_max=3,
    seed=42,
):
    """Create a reproducible random client table.

    Args:
        n_clients: Number of rows to generate.
        n_vehicles: Accepted for interface compatibility; not used here.
        capacity: Accepted for interface compatibility; not used here.
        spread: Coordinates are drawn uniformly from ``[-spread, spread)``.
        demand_min: Smallest integer demand (inclusive).
        demand_max: Largest integer demand (inclusive).
        seed: Seed for ``np.random.default_rng``.

    Returns:
        DataFrame with columns ``id`` ("C1", "C2", ...), ``x``, ``y``,
        ``demand``, ``tw_start``, ``tw_end`` and ``service``.
    """
    # The draws below must stay in this order: every call advances the same
    # generator, so reordering would change the instance produced by a seed.
    generator = np.random.default_rng(seed)
    coord_x = generator.uniform(-spread, spread, size=n_clients)
    coord_y = generator.uniform(-spread, spread, size=n_clients)
    demand = generator.integers(demand_min, demand_max + 1, size=n_clients)

    # Windows open at 0..39 and stay open for 30..44 time units
    # (the upper bound passed to integers() is exclusive).
    window_open = generator.integers(0, 40, size=n_clients)
    window_width = generator.integers(30, 45, size=n_clients)
    window_close = window_open + window_width

    # Service time is 2 or 3 time units.
    service_time = generator.integers(2, 4, size=n_clients)

    labels = [f"C{number}" for number in range(1, n_clients + 1)]
    return pd.DataFrame({
        "id": labels,
        "x": coord_x,
        "y": coord_y,
        "demand": demand,
        "tw_start": window_open,
        "tw_end": window_close,
        "service": service_time,
    })
# ---------------------------
# Geometry helpers
# ---------------------------
def euclid(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    """Return the straight-line distance between 2-D points *a* and *b*.

    Only the first two components of each point are read.
    """
    delta_x = a[0] - b[0]
    delta_y = a[1] - b[1]
    return float(math.hypot(delta_x, delta_y))
def total_distance(points: List[Tuple[float, float]]) -> float:
    """Return the length of the polyline that visits *points* in order.

    Sequences with fewer than two points have no legs and yield 0.
    """
    consecutive_pairs = zip(points, points[1:])
    return sum(euclid(start, end) for start, end in consecutive_pairs)
# ---------------------------
# Time-window aware clustering
# ---------------------------
def tw_aware_clusters(df: pd.DataFrame, depot: Tuple[float, float],
                      n_vehicles: int, capacity: float) -> List[List[int]]:
    """Assign clients to vehicles in order of time-window urgency.

    Each client gets an urgency value that grows with an earlier ``tw_end``,
    a narrower window and a shorter distance to the depot. Clients are taken
    from most to least urgent (ties broken by polar angle around the depot)
    and placed on the current vehicle; when the next demand would exceed
    ``capacity`` the fill moves on to the next vehicle and never goes back.
    The last vehicle receives everything that is left, even beyond capacity.

    Note:
        The sort yields row positions which are then used as ``df.loc``
        labels, so ``df`` is expected to carry a default 0..n-1 index.

    Args:
        df: Client table with ``x``, ``y``, ``demand``, ``tw_start``, ``tw_end``.
        depot: ``(x, y)`` of the depot.
        n_vehicles: Number of clusters to create.
        capacity: Load limit per vehicle.

    Returns:
        ``n_vehicles`` lists of client indices (some may be empty).
    """
    offset_x = df["x"].values - depot[0]
    offset_y = df["y"].values - depot[1]
    angle = np.arctan2(offset_y, offset_x)
    radius = np.sqrt(offset_x**2 + offset_y**2)

    # Earlier deadlines (smaller tw_end) -> higher urgency; shorter windows
    # add a small extra boost; distance to the depot dampens the value.
    deadline = df["tw_end"].values
    width = deadline - df["tw_start"].values
    urgency = (1.0 / (deadline + 1.0)) * (1.0 + 1.0 / (width + 1.0))
    urgency = urgency / (radius + 1.0)

    # lexsort treats the LAST key as primary: descending urgency, then angle.
    visit_order = np.lexsort((angle, -urgency))

    last_vehicle = n_vehicles - 1
    groups = [[] for _ in range(n_vehicles)]
    group_load = [0.0] * n_vehicles
    current = 0
    for row in visit_order:
        need = float(df.loc[row, "demand"])
        overflow = group_load[current] + need > capacity
        if overflow and current < last_vehicle:
            current += 1
        groups[current].append(int(row))
        group_load[current] += need
    return groups
# ---------------------------
# Schedule computation
# ---------------------------
def compute_schedule_for_route(route_idxs: List[int], depot: Tuple[float, float],
                               df: pd.DataFrame, speed: float = 1.0) -> Dict:
    """Simulate one vehicle driving ``route_idxs`` in order, starting at time 0.

    Travel time is Euclidean distance divided by ``speed`` (floored at 1e-9).
    A vehicle arriving before ``tw_start`` waits until the window opens;
    starting service after ``tw_end`` counts as lateness. The trip back to
    the depot is not part of the schedule.

    Args:
        route_idxs: Client indices (``df.loc`` labels) in visiting order.
        depot: ``(x, y)`` the vehicle starts from.
        df: Client table with ``x``, ``y``, ``tw_start``, ``tw_end``, ``service``.
        speed: Distance units travelled per time unit.

    Returns:
        Dict with ``arrivals`` (service start times), ``departures``,
        ``lateness_count``, ``total_lateness``, ``max_lateness`` and
        ``feasible`` (True when no stop is late).
    """
    effective_speed = max(speed, 1e-9)
    clock = 0.0
    here = depot
    service_starts: List[float] = []
    leave_times: List[float] = []
    overdue_amounts: List[float] = []  # only strictly positive lateness values

    for stop in route_idxs:
        there = (float(df.loc[stop, "x"]), float(df.loc[stop, "y"]))
        reached_at = clock + euclid(here, there) / effective_speed
        opens = float(df.loc[stop, "tw_start"])
        closes = float(df.loc[stop, "tw_end"])

        begin = max(reached_at, opens)  # wait if early
        overdue = max(0.0, begin - closes)
        if overdue > 0:
            overdue_amounts.append(overdue)

        clock = begin + float(df.loc[stop, "service"])
        service_starts.append(begin)
        leave_times.append(clock)
        here = there

    return {
        "arrivals": service_starts,
        "departures": leave_times,
        "lateness_count": len(overdue_amounts),
        "total_lateness": float(sum(overdue_amounts)),
        "max_lateness": float(max(overdue_amounts, default=0.0)),
        "feasible": not overdue_amounts,
    }
# ---------------------------
# TW-prioritized insertion heuristic
# ---------------------------
def build_route_by_insertion_tw(df: pd.DataFrame, idxs: List[int],
                                depot: Tuple[float, float], speed: float = 1.0,
                                lateness_weight: float = 8000.0) -> List[int]:
    """Order the clients in ``idxs`` with a lateness-penalised cheapest insertion.

    The most urgent client seeds the route. After that, every remaining client
    is tried at every position and the (client, position) pair with the lowest
    ``closed-tour distance + lateness_weight * total lateness`` is inserted,
    until no client is left.

    Args:
        df: Client table with ``x``, ``y``, ``tw_start``, ``tw_end``, ``service``.
        idxs: Client indices (``df.loc`` labels) to route; duplicates are ignored.
        depot: ``(x, y)`` where the tour starts and ends.
        speed: Distance units travelled per time unit.
        lateness_weight: Cost added per unit of total lateness.

    Returns:
        The client indices in visiting order (empty if ``idxs`` is empty).
    """
    if not idxs:
        return []

    def location(i):
        return (float(df.loc[i, "x"]), float(df.loc[i, "y"]))

    def urgency_score(i):
        # Larger value = more urgent: earlier deadline, tighter window,
        # closer to the depot.
        dist = euclid(depot, location(i))
        tw_s = float(df.loc[i, "tw_start"])
        tw_e = float(df.loc[i, "tw_end"])
        tw_window = max(1.0, tw_e - tw_s)
        return (1.0 / (tw_e + 1.0)) * (1.0 + 1.0 / (tw_window + 1.0)) / (dist + 1.0)

    # Scores never change, so rank once. The score grows with urgency, hence
    # the descending sort; dict.fromkeys drops duplicates while keeping the
    # caller's order, which makes tie-breaking deterministic.
    pending = sorted(dict.fromkeys(idxs), key=urgency_score, reverse=True)
    route = [pending.pop(0)]

    while pending:
        best_choice = None
        for client in pending:
            for pos in range(len(route) + 1):
                candidate = route[:pos] + [client] + route[pos:]
                pts = [depot] + [location(i) for i in candidate] + [depot]
                dist = total_distance(pts)
                sched = compute_schedule_for_route(candidate, depot, df, speed)
                cost = dist + sched["total_lateness"] * lateness_weight
                # Strict '<' keeps the first (most urgent) client on cost ties.
                if best_choice is None or cost < best_choice[2]:
                    best_choice = (client, pos, cost)
        client, pos, _ = best_choice
        route.insert(pos, client)
        pending.remove(client)
    return route
# ---------------------------
# Local search (2-opt + Or-opt)
# ---------------------------
def two_opt_tw(route, df, depot, speed=1.0, max_iter=300, lateness_weight=40000.0):
    """Improve *route* with first-improvement 2-opt under a lateness penalty.

    A move reverses the segment ``[i, k]``. Reversing the whole route is
    skipped. A move is accepted when it lowers
    ``closed-tour distance + lateness_weight * total lateness`` by more than
    1e-6; the scan then restarts from the new route. At most ``max_iter``
    moves are accepted.

    Returns:
        A new list; routes with two or fewer stops are returned as a copy.
    """
    if len(route) <= 2:
        return route[:]

    def xy(i):
        return (float(df.loc[i, "x"]), float(df.loc[i, "y"]))

    def penalised_cost(seq):
        path = [depot] + [xy(i) for i in seq] + [depot]
        length = total_distance(path)
        timing = compute_schedule_for_route(seq, depot, df, speed)
        return length + lateness_weight * timing["total_lateness"]

    size = len(route)

    def first_improvement(current, current_cost):
        """Return the first improving (route, cost) in (i, k) scan order, or None."""
        for i in range(size - 1):
            for k in range(i + 1, size):
                if (i, k) == (0, size - 1):
                    continue
                flipped = current[i:k + 1]
                flipped.reverse()
                trial = current[:i] + flipped + current[k + 1:]
                trial_cost = penalised_cost(trial)
                if trial_cost < current_cost - 1e-6:
                    return trial, trial_cost
        return None

    tour = route[:]
    tour_cost = penalised_cost(tour)
    for _ in range(max_iter):
        found = first_improvement(tour, tour_cost)
        if found is None:
            break
        tour, tour_cost = found
    return tour
def or_opt_tw(route, df, depot, speed=1.0, max_iter=100, lateness_weight=40000.0):
    """Improve *route* with first-improvement Or-opt under a lateness penalty.

    Segments of one, then two, consecutive stops are cut out and re-inserted
    at every other position. A relocation is accepted when it lowers
    ``closed-tour distance + lateness_weight * total lateness`` by more than
    1e-6; the scan then restarts. At most ``max_iter`` moves are accepted.

    Returns:
        A new list; routes with two or fewer stops are returned as a copy.
    """
    if len(route) <= 2:
        return route[:]

    def xy(i):
        return (float(df.loc[i, "x"]), float(df.loc[i, "y"]))

    def penalised_cost(seq):
        path = [depot] + [xy(i) for i in seq] + [depot]
        length = total_distance(path)
        timing = compute_schedule_for_route(seq, depot, df, speed)
        return length + lateness_weight * timing["total_lateness"]

    size = len(route)

    def first_improvement(current, current_cost):
        """Return the first improving (route, cost) in scan order, or None."""
        for seg_len in (1, 2):
            if seg_len >= size:
                continue
            for start in range(size - seg_len + 1):
                segment = current[start:start + seg_len]
                rest = current[:start] + current[start + seg_len:]
                for slot in range(len(rest) + 1):
                    # Re-inserting at the cut position recreates the input.
                    if slot == start:
                        continue
                    trial = rest[:slot] + segment + rest[slot:]
                    trial_cost = penalised_cost(trial)
                    if trial_cost < current_cost - 1e-6:
                        return trial, trial_cost
        return None

    tour = route[:]
    tour_cost = penalised_cost(tour)
    for _ in range(max_iter):
        found = first_improvement(tour, tour_cost)
        if found is None:
            break
        tour, tour_cost = found
    return tour
# ---------------------------
# Multi-phase route optimizer
# ---------------------------
def build_route_for_cluster_tw(df, idxs, depot, speed=1.0):
    """Turn one cluster of client indices into an ordered route.

    Runs the insertion construction first, then the 2-opt and Or-opt
    improvement passes, each with its default settings. An empty cluster
    yields an empty route.
    """
    if not idxs:
        return []
    ordered = build_route_by_insertion_tw(df, idxs, depot, speed)
    for improve in (two_opt_tw, or_opt_tw):
        ordered = improve(ordered, df, depot, speed)
    return ordered
# ---------------------------
# Redistribution helper: force-using empty vehicles
# ---------------------------
def redistribute_to_use_all_vehicles(routes: List[List[int]],
                                     df: pd.DataFrame,
                                     depot: Tuple[float, float],
                                     n_vehicles: int,
                                     capacity: float,
                                     speed: float = 1.0) -> List[List[int]]:
    """Put idle vehicles to work by peeling single clients off existing routes.

    While fewer than ``n_vehicles`` routes are non-empty, the route with the
    largest total lateness among those holding at least two stops is chosen.
    Its most problematic client (largest lateness, ties and the no-lateness
    case resolved by earliest ``tw_end``) whose demand fits ``capacity`` is
    moved alone onto the first empty vehicle, and the donor route is rebuilt.
    The input lists are not modified.

    Only routes with two or more stops are split: moving the sole client of a
    route to another vehicle would not activate anything and would repeat
    forever.

    Args:
        routes: Current routes, one list of client indices per vehicle.
        df: Client table with ``tw_end`` and ``demand`` (plus what the schedule
            and route builders read).
        depot: ``(x, y)`` of the depot.
        n_vehicles: Number of vehicle slots wanted in the result.
        capacity: Load limit per vehicle.
        speed: Distance units travelled per time unit.

    Returns:
        A list with at least ``n_vehicles`` routes; unused slots are empty.
    """
    routes = [list(r) for r in routes]
    if len(routes) < n_vehicles:
        routes.extend([] for _ in range(n_vehicles - len(routes)))

    def active_count():
        return sum(1 for r in routes if r)

    while active_count() < n_vehicles:
        splittable = [i for i, r in enumerate(routes) if len(r) >= 2]
        if not splittable:
            break
        empty_idx = next((i for i, r in enumerate(routes) if not r), None)
        if empty_idx is None:
            break

        schedules = {
            i: compute_schedule_for_route(routes[i], depot, df, speed)
            for i in splittable
        }
        # max() keeps the first route on ties.
        worst = max(splittable, key=lambda i: schedules[i]["total_lateness"])
        donor = routes[worst]
        arrivals = schedules[worst]["arrivals"]

        # (client, lateness, tw_end), worst offender first. With no lateness
        # at all this degenerates to "earliest deadline first".
        ranked = sorted(
            (
                (cli,
                 max(0.0, arrivals[pos] - float(df.loc[cli, "tw_end"])),
                 float(df.loc[cli, "tw_end"]))
                for pos, cli in enumerate(donor)
            ),
            key=lambda item: (-item[1], item[2]),
        )
        # A client whose demand exceeds capacity cannot ride alone on a vehicle.
        cli_to_move = next(
            (cli for cli, _, _ in ranked
             if float(df.loc[cli, "demand"]) <= capacity),
            None,
        )
        if cli_to_move is None:
            break

        left_behind = [c for c in donor if c != cli_to_move]
        if not left_behind:
            # The route held only repeats of one client; nothing to split.
            break

        routes[worst] = build_route_for_cluster_tw(df, left_behind, depot, speed)
        routes[empty_idx] = [cli_to_move]

    return routes
# ---------------------------
# -----------------------------------------------------
# Helper: Redistribute workload across routes (balance)
# -----------------------------------------------------
def redistribute_workload(routes, df, depot, speed, capacity):
    """Shift low-demand stops from heavily loaded routes to lightly loaded ones.

    A route is "heavy" when its load exceeds 90 % of ``capacity`` and "light"
    when it is below 50 %; both sets are fixed from the initial loads. For
    every heavy/light pair, the two smallest-demand stops of the heavy route
    are considered and the first one that fits is appended to the end of the
    light route (at most one transfer per pair). Stop order inside routes is
    not re-optimised and time windows are not consulted.

    The caller's route lists are left untouched; new lists are returned.

    Args:
        routes: One list of client indices per vehicle.
        df: Client table with ``x``, ``y`` and ``demand``.
        depot: ``(x, y)`` where every tour starts and ends.
        speed: Unused; kept for interface compatibility.
        capacity: Load limit per vehicle.

    Returns:
        ``(routes, per_route_dist, per_route_loads)`` after the transfers,
        with distances measured as closed depot-to-depot tours.
    """
    routes = [list(r) for r in routes]
    loads = [float(df.loc[r, "demand"].sum()) if r else 0.0 for r in routes]

    heavy = [i for i, load in enumerate(loads) if load > capacity * 0.9]
    light = [i for i, load in enumerate(loads) if load < capacity * 0.5]

    for hi in heavy:
        for li in light:
            if not routes[hi]:
                break  # donor is empty; nothing left to give to anyone
            smallest_first = sorted(
                routes[hi], key=lambda idx: df.loc[idx, "demand"]
            )
            for cust in smallest_first[:2]:
                demand = float(df.loc[cust, "demand"])
                if loads[li] + demand <= capacity:
                    routes[hi].remove(cust)
                    routes[li].append(cust)
                    loads[hi] -= demand
                    loads[li] += demand
                    break  # one transfer per light route

    per_route_dist = []
    for route in routes:
        if not route:
            per_route_dist.append(0.0)
            continue
        pts = [depot] + [(df.loc[i, "x"], df.loc[i, "y"]) for i in route] + [depot]
        per_route_dist.append(total_distance(pts))

    return routes, per_route_dist, loads
# ---------------------------
# Main solver
# ---------------------------
def solve_vrp_tw(df, depot=(0.0, 0.0), n_vehicles=4,
                 capacity=10, speed=1.0, force_all_vehicles=False) -> Dict:
    """Solve a capacitated VRP with time windows using a multi-phase heuristic.

    Phases: urgency-based clustering, optional round-robin re-assignment over
    all vehicles, splitting of over-capacity clusters, route construction and
    local search per chunk, workload redistribution, then reporting. All
    reported figures (distance, load, lateness) describe the final routes,
    i.e. they are computed after redistribution.

    Note:
        Splitting an over-capacity cluster can yield more routes than
        ``n_vehicles``; ``vehicles_used`` reports the actual number.

    Args:
        df: Client table with ``id``, ``x``, ``y``, ``demand``, ``tw_start``,
            ``tw_end`` and ``service``.
        depot: ``(x, y)`` where every tour starts and ends.
        n_vehicles: Number of vehicles to cluster for.
        capacity: Load limit per vehicle.
        speed: Distance units travelled per time unit.
        force_all_vehicles: If True, deal the clients out round-robin so each
            vehicle gets some, ignoring the initial cluster boundaries.

    Returns:
        Dict with ``routes``, ``total_distance``, ``per_route_distance``,
        ``assignments_table`` (one row per stop) and ``metrics``.
    """
    if len(df) == 0:
        return {
            "routes": [[] for _ in range(n_vehicles)],
            "total_distance": 0.0,
            "per_route_distance": [0.0] * n_vehicles,
            "assignments_table": pd.DataFrame(),
            "metrics": {}
        }

    # --- Step 1: Create initial clusters (time-window aware) ---
    clusters = tw_aware_clusters(df, depot, n_vehicles, capacity)

    # --- Step 2: Optionally force all vehicles to be used evenly ---
    if force_all_vehicles:
        all_clients = [i for cl in clusters for i in cl]
        clusters = [[] for _ in range(n_vehicles)]
        for i, idx in enumerate(all_clients):
            clusters[i % n_vehicles].append(idx)

    # --- Step 3: Build routes for each cluster (split if over capacity) ---
    routes = []
    for cl in clusters:
        if not cl:
            routes.append([])
            continue
        for chunk in _split_cluster_by_capacity(df, cl, capacity):
            routes.append(build_route_for_cluster_tw(df, chunk, depot, speed))

    # --- Step 4: Balance workload; this also yields final distances/loads ---
    routes, per_route_dist, per_route_loads = redistribute_workload(
        routes, df, depot, speed, capacity
    )
    total_dist = sum(per_route_dist)

    # --- Step 5: Time-window performance of the FINAL routes ---
    # Measured after redistribution because moving stops changes arrival times.
    total_late_count = 0
    total_late_time = 0.0
    max_late = 0.0
    for route in routes:
        if not route:
            continue
        sched = compute_schedule_for_route(route, depot, df, speed)
        total_late_count += sched["lateness_count"]
        total_late_time += sched["total_lateness"]
        max_late = max(max_late, sched["max_lateness"])

    # --- Step 6: Build assignment table for visualization ---
    rows = []
    for v, route in enumerate(routes):
        for seq, idx in enumerate(route, 1):
            rows.append({
                "vehicle": v + 1,
                "sequence": seq,
                "id": df.loc[idx, "id"],
                "x": float(df.loc[idx, "x"]),
                "y": float(df.loc[idx, "y"]),
                "demand": float(df.loc[idx, "demand"]),
            })
    assign_df = pd.DataFrame(
        rows, columns=["vehicle", "sequence", "id", "x", "y", "demand"]
    ).sort_values(["vehicle", "sequence"]).reset_index(drop=True)

    # --- Step 7: Summarise time-window status ---
    if total_late_count == 0:
        status = "OK"
    elif total_late_time < 300:
        status = "Minor Violations"
    else:
        status = "Violations"

    time_window_report = {
        "total_lateness_count": int(total_late_count),
        "total_lateness": round(total_late_time, 2),
        "max_lateness": round(max_late, 2),
        "status": status
    }

    # --- Step 8: Compile metrics ---
    metrics = {
        "vehicles_used": int(sum(1 for r in routes if r)),
        "total_distance": round(total_dist, 2),
        "per_route_distance": [round(d, 2) for d in per_route_dist],
        "per_route_load": [round(l, 2) for l in per_route_loads],
        "capacity": capacity,
        "time_window_report": time_window_report,
        "note": "Enhanced heuristic (TW-aware clustering → insertion → 2-opt → Or-opt). Auto lateness scaling + load redistribution."
    }
    # Convert NumPy values to native Python types
    metrics = convert_numpy(metrics)

    # --- Step 9: Return final structured result ---
    return {
        "routes": routes,
        "total_distance": total_dist,
        "per_route_distance": per_route_dist,
        "assignments_table": assign_df,
        "metrics": metrics,
    }


def _split_cluster_by_capacity(df, cluster, capacity):
    """Split *cluster* into consecutive chunks whose demand fits ``capacity``.

    A cluster that already fits is returned as a single chunk in its original
    order. Otherwise clients are sorted by ``tw_end`` and packed greedily; a
    single client larger than ``capacity`` still gets a chunk of its own.
    """
    if sum(df.loc[i, "demand"] for i in cluster) <= capacity:
        return [cluster]

    chunks, current, load = [], [], 0
    for i in sorted(cluster, key=lambda c: df.loc[c, "tw_end"]):
        d = df.loc[i, "demand"]
        if load + d > capacity and current:
            chunks.append(current)
            current, load = [i], d
        else:
            current.append(i)
            load += d
    if current:
        chunks.append(current)
    return chunks
# ---------------------------
# Visualization
# ---------------------------
def plot_solution(df, sol, depot=(0.0, 0.0)):
    """Render the routes in ``sol["routes"]`` and return the plot as a PIL image.

    The depot is drawn as a square. Each non-empty route is a closed coloured
    polyline (depot -> stops -> depot) whose stops are labelled with their
    visiting sequence number and their ``tw_start-tw_end`` window.

    Args:
        df: Client table with ``x``, ``y``, ``tw_start`` and ``tw_end``,
            indexed by the client indices used in the routes.
        sol: Solver result; only ``sol["routes"]`` is read.
        depot: ``(x, y)`` of the depot.

    Returns:
        A ``PIL.Image`` opened on an in-memory PNG.
    """
    routes = sol["routes"]
    fig, ax = plt.subplots(figsize=(8, 6))
    buf = io.BytesIO()
    # pyplot keeps every figure registered until it is closed, so close it
    # even when drawing or saving fails.
    try:
        ax.scatter([depot[0]], [depot[1]], s=120, marker="s", label="Depot", zorder=6)
        colors = plt.rcParams["axes.prop_cycle"].by_key().get(
            "color", ["C0", "C1", "C2", "C3", "C4", "C5"]
        )
        for v, route in enumerate(routes):
            if not route:
                continue
            c = colors[v % len(colors)]
            xs = [depot[0]] + [df.loc[i, "x"] for i in route] + [depot[0]]
            ys = [depot[1]] + [df.loc[i, "y"] for i in route] + [depot[1]]
            ax.plot(xs, ys, "-", lw=2, color=c, alpha=0.9, label=f"Vehicle {v+1}")
            ax.scatter(xs[1:-1], ys[1:-1], s=40, color=c, zorder=5)
            for k, idx in enumerate(route, 1):
                tw_s, tw_e = int(df.loc[idx, "tw_start"]), int(df.loc[idx, "tw_end"])
                ax.text(df.loc[idx, "x"], df.loc[idx, "y"], str(k),
                        fontsize=8, ha="center", va="center",
                        color="white",
                        bbox=dict(boxstyle="circle,pad=0.2", fc=c, ec="none", alpha=0.8))
                ax.annotate(f"{tw_s}-{tw_e}", (df.loc[idx, "x"], df.loc[idx, "y"]),
                            textcoords="offset points", xytext=(6, -6),
                            fontsize=7, color="black", alpha=0.7)
        ax.set_title("VRPTW Routes (Improved Heuristic)")
        ax.set_xlabel("X")
        ax.set_ylabel("Y")
        ax.grid(True, alpha=0.25)
        ax.legend(loc="best", fontsize=8, framealpha=0.9)
        ax.set_aspect("equal", adjustable="box")
        fig.savefig(buf, format="png", bbox_inches="tight", dpi=120)
    finally:
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf)