tsp / app.py
Soumyajit9979's picture
Update app.py
4c540ba verified
"""
FastAPI deployment for TSP optimization.
Supports both heuristic and ML-based approaches.
Upload an .xlsm workbook (sheets 'Lat-Long' and 'TSP agents') to get optimized routes.
"""
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
import pandas as pd
import numpy as np
import tempfile
import os
from math import radians, sin, cos, sqrt, atan2
from sklearn.cluster import KMeans
import time
# Application object that the route decorators in this module register on.
# The keyword arguments are passed to FastAPI as the API's metadata.
app = FastAPI(
    title="TSP Optimization API",
    description="Multi-agent TSP solver with heuristic and ML approaches",
    version="1.0.0",
)
# ==================== Core Functions ====================
def haversine_distance(p1, p2, radius_km=6371):
    """Compute the great-circle distance between two points (haversine formula).

    Args:
        p1: ``(latitude, longitude)`` of the first point, in decimal degrees.
        p2: ``(latitude, longitude)`` of the second point, in decimal degrees.
        radius_km: Radius of the sphere. The default, 6371, is Earth's mean
            radius in kilometres, so the result is in kilometres.

    Returns:
        Distance along the surface of the sphere, in the unit of ``radius_km``.
    """
    lat1, lon1 = radians(p1[0]), radians(p1[1])
    lat2, lon2 = radians(p2[0]), radians(p2[1])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points; without the clamp sqrt(1 - a) would raise a
    # math domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return radius_km * c
def compute_distance_matrix(coords):
    """Precompute pairwise great-circle (haversine) distances in kilometres.

    The whole matrix is evaluated with vectorised NumPy operations instead of
    one Python-level call per pair, using the same formula and the same
    6371 km sphere radius as ``haversine_distance``.

    Args:
        coords: Sequence or array of ``(latitude, longitude)`` rows in
            decimal degrees.

    Returns:
        A symmetric ``(n, n)`` float array whose ``[i, j]`` entry is the
        distance between point ``i`` and point ``j``; the diagonal is zero.
        Empty input yields a ``(0, 0)`` array.
    """
    points = np.asarray(coords, dtype=float)
    n = len(points)
    if n == 0:
        return np.zeros((0, 0))

    lat = np.radians(points[:, 0])
    lon = np.radians(points[:, 1])
    # Broadcasting a column against a row yields every pairwise difference.
    dlat = lat[None, :] - lat[:, None]
    dlon = lon[None, :] - lon[:, None]
    cos_lat = np.cos(lat)
    a = np.sin(dlat / 2) ** 2 + cos_lat[:, None] * cos_lat[None, :] * np.sin(dlon / 2) ** 2
    # Guard against rounding pushing `a` outside [0, 1] (would produce NaN).
    a = np.clip(a, 0.0, 1.0)
    return 6371 * (2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a)))
def route_distance(route, dist_matrix):
    """Length of the closed tour that visits ``route`` in order.

    The leg from the last stop back to the first one is included. A route
    with zero or one stop has length ``0.0``.
    """
    if len(route) <= 1:
        return 0.0

    length = 0
    # Consecutive legs first ...
    for here, there in zip(route, route[1:]):
        length += dist_matrix[here, there]
    # ... then the leg that closes the loop.
    length += dist_matrix[route[-1], route[0]]
    return length
def route_with_coords(route, coords):
    """Pair every index in ``route`` with its latitude and longitude.

    Returns a list of dicts with the keys ``index``, ``latitude`` and
    ``longitude``, in route order, using plain ``int``/``float`` values.
    """
    stops = []
    for idx in route:
        point = coords[idx]
        stops.append(
            {
                "index": int(idx),
                "latitude": float(point[0]),
                "longitude": float(point[1]),
            }
        )
    return stops
def nearest_neighbor_tsp(indices, dist_matrix, start_idx=None):
    """Build a tour greedily by always moving to the closest unvisited stop.

    ``indices`` are the stops to visit (row/column positions in
    ``dist_matrix``). The tour starts at ``start_idx`` when it is one of
    them, otherwise at ``indices[0]``. Returns the visiting order as a list.
    """
    count = len(indices)
    if count == 0:
        return []
    if count == 1:
        return [indices[0]]

    candidates = set(indices)
    if start_idx is None or start_idx not in candidates:
        start_idx = indices[0]

    tour = [start_idx]
    unvisited = candidates - {start_idx}
    while unvisited:
        here = tour[-1]
        closest = min(unvisited, key=lambda stop: dist_matrix[here, stop])
        unvisited.remove(closest)
        tour.append(closest)
    return tour
def two_opt_optimize(route, dist_matrix, max_iterations=50, window=20, time_limit_sec=0.5):
    """Improve a closed tour with a windowed, time-boxed 2-opt local search.

    Each pass scans segment pairs and applies the first reversal that
    shortens the tour, then starts a new pass. Unlike a scan that stops at
    the second-to-last stop, moves that involve the closing edge
    ``route[-1] -> route[0]`` are evaluated as well.

    Args:
        route: Visiting order (list of indices into ``dist_matrix``). It is
            modified in place; ``route[0]`` always keeps its position.
        dist_matrix: Pairwise distances, indexed as ``dist_matrix[a, b]``.
        max_iterations: Maximum number of passes (each pass applies at most
            one improving reversal).
        window: Maximum length, in stops, of the reversed segment.
        time_limit_sec: Elapsed-time budget for the whole search.

    Returns:
        The same ``route`` object after optimisation. Routes with fewer than
        four stops are returned untouched.
    """
    n = len(route)
    if n < 4:
        return route

    # Monotonic clock: the budget must not be affected by wall-clock changes.
    deadline = time.monotonic() + time_limit_sec
    w = max(2, min(window, n))
    improved = True
    iterations = 0

    while improved and iterations < max_iterations:
        if time.monotonic() > deadline:
            break
        improved = False
        iterations += 1
        for i in range(1, n - 1):
            # j may reach n - 1 so that the edge back to route[0] is considered.
            for j in range(i + 1, min(i + w, n)):
                if i == 1 and j == n - 1:
                    # Reversing everything after route[0] is the same cycle
                    # travelled backwards; nothing to gain.
                    continue
                a, b = route[i - 1], route[i]
                c, d = route[j], route[(j + 1) % n]
                current = dist_matrix[a, b] + dist_matrix[c, d]
                proposed = dist_matrix[a, c] + dist_matrix[b, d]
                # Small epsilon avoids looping on floating-point noise.
                if proposed + 1e-9 < current:
                    route[i:j + 1] = route[i:j + 1][::-1]
                    improved = True
                    break
            if improved or time.monotonic() > deadline:
                break
    return route
# ==================== HEURISTIC APPROACH ====================
def partition_locations_geographically(coords, num_agents):
    """Pure heuristic: split the locations into latitude bands, one per agent.

    Locations are sorted by latitude (column 0) and handed out in
    consecutive groups of ``len(coords) // num_agents``; any remainder goes
    to the last agent. When there are more agents than locations every
    location gets its own agent and the surplus agents receive nothing.

    Args:
        coords: Array-like of ``(latitude, longitude)`` rows.
        num_agents: Number of agents to split the locations between.

    Returns:
        Integer array with one agent id (``0 .. num_agents - 1``) per row of
        ``coords``.

    Raises:
        ValueError: If ``num_agents`` is smaller than 1.
    """
    if num_agents < 1:
        raise ValueError("num_agents must be at least 1")

    coords = np.asarray(coords)
    total = len(coords)
    labels = np.zeros(total, dtype=int)
    if total == 0:
        return labels

    order = np.argsort(coords[:, 0])
    # At least one point per band; otherwise the integer division below
    # would divide by zero when agents outnumber locations.
    band_size = max(1, total // num_agents)
    labels[order] = np.minimum(np.arange(total) // band_size, num_agents - 1)
    return labels
def solve_heuristic(coords, num_agents, dist_matrix):
    """Heuristic multi-agent TSP: latitude partitioning + nearest neighbour.

    Returns a dict mapping agent id to that agent's route. Agents that were
    assigned no location do not appear in the dict.
    """
    assignment = partition_locations_geographically(coords, num_agents)
    routes = {}
    for agent_id in range(num_agents):
        members = np.flatnonzero(assignment == agent_id)
        if len(members) == 0:
            continue
        routes[agent_id] = nearest_neighbor_tsp(members, dist_matrix)
    return routes
# ==================== ML APPROACH ====================
def solve_ml(coords, num_agents, dist_matrix):
    """ML-based multi-agent TSP: KMeans clustering, nearest neighbour, 2-opt.

    The locations are clustered on their raw coordinate values, one cluster
    per agent. Each cluster is toured starting from the member closest to
    the cluster centre and the tour is then refined with 2-opt.

    Args:
        coords: Array of ``(latitude, longitude)`` rows.
        num_agents: Number of agents (requested clusters).
        dist_matrix: Pairwise distances between the rows of ``coords``.

    Returns:
        Dict mapping every agent id in ``range(num_agents)`` to its route.
        Agents without locations map to an empty list.

    Errors raised by KMeans itself (for example on empty input) propagate.
    """
    # KMeans rejects n_clusters > n_samples, so never ask for more clusters
    # than there are locations; the surplus agents simply get empty routes.
    n_clusters = min(num_agents, len(coords))
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    labels = kmeans.fit_predict(coords)
    centers = kmeans.cluster_centers_

    agent_routes = {}
    for agent_id in range(num_agents):
        indices = np.where(labels == agent_id)[0]
        if len(indices) <= 1:
            # Nothing to order (also covers agents beyond n_clusters).
            agent_routes[agent_id] = list(indices)
            continue

        center = centers[agent_id]
        dists_to_center = [haversine_distance(coords[idx], center) for idx in indices]
        start_idx = indices[np.argmin(dists_to_center)]
        initial_route = nearest_neighbor_tsp(indices, dist_matrix, start_idx)
        agent_routes[agent_id] = two_opt_optimize(initial_route, dist_matrix)
    return agent_routes
# ==================== API ENDPOINTS ====================
@app.get("/")
async def root():
    """API health check and info."""
    # Path -> short description of each solver endpoint.
    endpoints = {
        "/heuristic": "POST - Solve using geographic partitioning heuristic",
        "/ml": "POST - Solve using ML-based clustering + optimization",
        "/compare": "POST - Run both and compare results",
    }
    info = {"name": "TSP Optimization API", "version": "1.0.0"}
    info["endpoints"] = endpoints
    return info
async def _load_problem(file):
    """Parse the uploaded workbook into ``(coords, num_agents, dist_matrix)``.

    ``coords`` holds the 'Latitude'/'Longitude' columns of sheet 'Lat-Long'
    as a float array, ``num_agents`` is the number of rows in sheet
    'TSP agents', and ``dist_matrix`` is the pairwise distance matrix of
    ``coords``.

    Raises:
        ValueError: If a coordinate is missing or there are no agents.
        Any error raised while reading the workbook propagates unchanged.
    """
    # NOTE(review): parsing and solving run synchronously inside the request
    # coroutine, so a large workbook blocks other requests while it is handled.
    payload = await file.read()

    tmp_path = None
    try:
        # The workbook is read by path, so the upload is written to a
        # temporary .xlsm file first; it is removed even when writing or
        # parsing fails.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsm") as tmp:
            tmp_path = tmp.name
            tmp.write(payload)
        # One call with a list of sheet names opens the workbook only once
        # and returns a dict keyed by sheet name.
        sheets = pd.read_excel(tmp_path, sheet_name=["Lat-Long", "TSP agents"])
    finally:
        if tmp_path is not None:
            os.unlink(tmp_path)

    coords = sheets["Lat-Long"][["Latitude", "Longitude"]].to_numpy(dtype=float)
    num_agents = len(sheets["TSP agents"])
    if np.isnan(coords).any():
        raise ValueError("sheet 'Lat-Long' has missing Latitude/Longitude values")
    if num_agents < 1:
        raise ValueError("sheet 'TSP agents' contains no agents")
    return coords, num_agents, compute_distance_matrix(coords)


def _summarize_routes(agent_routes, coords, dist_matrix):
    """Return ``(per-agent response entries, summed distance)`` for the routes."""
    agents = {}
    total_distance = 0
    for agent_id, route in agent_routes.items():
        dist = route_distance(route, dist_matrix)
        total_distance += dist
        agents[f"Agent {agent_id + 1}"] = {
            "distance_km": round(dist, 2),
            "route_indices": [int(idx) for idx in route],
            "route_points": route_with_coords(route, coords),
        }
    return agents, total_distance


@app.post("/heuristic")
async def solve_heuristic_endpoint(file: UploadFile = File(...)):
    """
    Solve TSP using pure heuristic approach.
    Upload: .xlsm file with sheets 'Lat-Long' and 'TSP agents'
    """
    try:
        coords, num_agents, dist_matrix = await _load_problem(file)
        agent_routes = solve_heuristic(coords, num_agents, dist_matrix)
        agents, total_distance = _summarize_routes(agent_routes, coords, dist_matrix)
        return {
            "method": "Heuristic (Geographic Partitioning)",
            "num_agents": num_agents,
            "num_locations": len(coords),
            "agents": agents,
            "total_distance_km": round(total_distance, 2),
        }
    except HTTPException:
        # Already a deliberate HTTP error; do not re-wrap it.
        raise
    except Exception as e:
        # Any failure while handling the upload is reported as a bad request.
        raise HTTPException(status_code=400, detail=f"Error processing file: {str(e)}") from e


@app.post("/ml")
async def solve_ml_endpoint(file: UploadFile = File(...)):
    """
    Solve TSP using ML-based approach (KMeans + 2-opt optimization).
    Upload: .xlsm file with sheets 'Lat-Long' and 'TSP agents'
    """
    try:
        coords, num_agents, dist_matrix = await _load_problem(file)
        agent_routes = solve_ml(coords, num_agents, dist_matrix)
        agents, total_distance = _summarize_routes(agent_routes, coords, dist_matrix)
        return {
            "method": "ML-Based (KMeans + 2-opt Optimization)",
            "num_agents": num_agents,
            "num_locations": len(coords),
            "agents": agents,
            "total_distance_km": round(total_distance, 2),
        }
    except HTTPException:
        # Already a deliberate HTTP error; do not re-wrap it.
        raise
    except Exception as e:
        # Any failure while handling the upload is reported as a bad request.
        raise HTTPException(status_code=400, detail=f"Error processing file: {str(e)}") from e


@app.post("/compare")
async def compare_endpoint(file: UploadFile = File(...)):
    """
    Run both heuristic and ML approaches and compare results.
    Upload: .xlsm file with sheets 'Lat-Long' and 'TSP agents'
    """
    try:
        coords, num_agents, dist_matrix = await _load_problem(file)

        # Heuristic
        heur_routes = solve_heuristic(coords, num_agents, dist_matrix)
        heur_agents, heur_total = _summarize_routes(heur_routes, coords, dist_matrix)

        # ML
        ml_routes = solve_ml(coords, num_agents, dist_matrix)
        ml_agents, ml_total = _summarize_routes(ml_routes, coords, dist_matrix)

        # Positive when the ML tour is shorter than the heuristic one.
        improvement = ((heur_total - ml_total) / heur_total * 100) if heur_total > 0 else 0

        return {
            "comparison": True,
            "num_agents": num_agents,
            "num_locations": len(coords),
            "heuristic": {
                "method": "Geographic Partitioning",
                "total_distance_km": round(heur_total, 2),
                "agents": heur_agents,
            },
            "ml": {
                "method": "KMeans + 2-opt",
                "total_distance_km": round(ml_total, 2),
                "agents": ml_agents,
            },
            "analysis": {
                "improvement_percent": round(improvement, 2),
                "better_method": "ML" if improvement > 0 else "Heuristic",
            },
        }
    except HTTPException:
        # Already a deliberate HTTP error; do not re-wrap it.
        raise
    except Exception as e:
        # Any failure while handling the upload is reported as a bad request.
        raise HTTPException(status_code=400, detail=f"Error processing file: {str(e)}") from e
if __name__ == "__main__":
    # Executed as a script: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here so that importing this module
    # does not need it.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)