Spaces:
Sleeping
Sleeping
| import os | |
| import math | |
| import logging | |
| import json | |
| import numpy as np | |
| import pandas as pd | |
| from werkzeug.utils import secure_filename | |
| from werkzeug.exceptions import RequestEntityTooLarge | |
| from flask import Flask, render_template, request, jsonify, send_file | |
| import io | |
# Configure logging: timestamped records at INFO level and above.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

app = Flask(__name__)
# A fresh random key is generated every time the module is loaded.
app.secret_key = os.urandom(24)

# Configuration for robustness
app.config.update(MAX_CONTENT_LENGTH=50 * 1024 * 1024)  # 50MB limit
ALLOWED_EXTENSIONS = {'csv', 'json', 'xls', 'xlsx'}
# --- Helper Functions ---
def allowed_file(filename):
    """
    Tell whether *filename* carries an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is never allowed.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
def has_null_byte(content):
    """
    Report whether binary *content* contains a null byte.

    Only ``bytes`` objects are inspected; any other type yields False.
    """
    return isinstance(content, bytes) and b'\0' in content
# --- Core Logic: TSP Solver ---
def haversine_distance(coord1, coord2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees).

    coord1, coord2: (latitude, longitude) pairs.
    Returns the distance in kilometres, using an Earth radius of 6371 km.
    """
    lat1, lon1 = coord1
    lat2, lon2 = coord2
    R = 6371  # Earth radius in km

    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)

    a = (
        math.sin(delta_phi / 2.0) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2.0) ** 2
    )
    # Floating-point rounding can leave `a` marginally outside [0, 1] (for
    # example for antipodal points), which would make the square roots below
    # raise ValueError. Clamp it; NaN fails both comparisons and is passed on.
    if a > 1.0:
        a = 1.0
    elif a < 0.0:
        a = 0.0

    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c
def calculate_distance_matrix(locations):
    """
    Build the symmetric N x N matrix of pairwise haversine distances.

    locations: list of dicts with 'lat', 'lng'
    Returns a numpy array whose diagonal is zero.
    """
    points = [(entry['lat'], entry['lng']) for entry in locations]
    size = len(points)
    distances = np.zeros((size, size))

    # Each unordered pair is computed once and mirrored across the diagonal.
    for row, origin in enumerate(points):
        for col in range(row + 1, size):
            separation = haversine_distance(origin, points[col])
            distances[row, col] = separation
            distances[col, row] = separation
    return distances
def solve_tsp_nearest_neighbor(dist_matrix, start_index=0):
    """
    Build a closed tour greedily: always move to the closest unvisited node.

    dist_matrix: indexable as dist_matrix[a][b]
    start_index: node where the tour begins and ends
    Returns the list of node indices, with start_index first and last.
    """
    remaining = set(range(len(dist_matrix)))
    remaining.remove(start_index)

    tour = [start_index]
    while remaining:
        row = dist_matrix[tour[-1]]
        closest = min(remaining, key=lambda node: row[node])
        remaining.remove(closest)
        tour.append(closest)

    # Closed loop: come back to where the tour started.
    tour.append(start_index)
    return tour
def two_opt_improvement(route, dist_matrix):
    """
    2-Opt local search optimization.

    Repeatedly reverses interior segments of the route and keeps any reversal
    that makes the route strictly shorter, until a full pass finds none.

    route: sequence of node indices; its first and last entries stay in place
    dist_matrix: indexable as dist_matrix[a][b]
    Returns a (best_route, best_distance) tuple; best_route is a new list and
    the input route is left untouched.
    """
    def get_route_distance(r):
        d = 0
        for a, b in zip(r, r[1:]):
            d += dist_matrix[a][b]
        return d

    best_route = list(route)
    best_distance = get_route_distance(best_route)
    n = len(best_route)

    improved = True
    while improved:
        improved = False
        for i in range(1, n - 2):
            # j == i + 1 is a legitimate move: reversing a two-node segment
            # swaps neighbouring stops and replaces the edges on either side
            # of the pair. Skipping it leaves short routes (e.g. depot plus
            # three stops) impossible to improve.
            for j in range(i + 1, n - 1):
                new_route = (
                    best_route[:i]
                    + best_route[i:j + 1][::-1]
                    + best_route[j + 1:]
                )
                new_distance = get_route_distance(new_route)
                if new_distance < best_distance:
                    best_distance = new_distance
                    best_route = new_route
                    improved = True
    # No time or iteration limit is applied; every accepted move strictly
    # shortens the route, so the search ends at a local optimum.
    return best_route, best_distance
# --- Routes ---
def index():
    """Render the 'index.html' template and return the result."""
    # NOTE(review): no route decorator is visible on this view in this
    # section; confirm where it is registered with the app.
    page = render_template('index.html')
    return page
def health():
    """Return a fixed JSON "healthy" status together with HTTP 200."""
    # NOTE(review): no route decorator is visible on this view in this
    # section; confirm where it is registered with the app.
    payload = {"status": "healthy"}
    return jsonify(payload), 200
def optimize_route():
    """
    Compute an optimized closed route for the locations in the JSON body.

    The body must be a JSON object whose 'locations' entry is a list of 2 to
    100 objects, each with 'lat' and 'lng' values convertible to finite
    floats. The first location is treated as the depot (start and end).

    Returns a JSON response with the total distance in km, the number of
    stops, the ordered path (copies of the submitted objects with 'type' set
    to 'depot' or 'stop') and the visiting order as indices into the input.
    Invalid input yields HTTP 400; unexpected failures yield HTTP 500.
    """
    # NOTE(review): no route decorator is visible on this view in this
    # section; confirm where it is registered with the app.
    try:
        # silent=True returns None for a missing or unparseable JSON body
        # instead of raising, so bad requests are answered with 400 below
        # rather than falling through to the 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        locations = data.get('locations', [])
        if not isinstance(locations, list) or len(locations) < 2:
            return jsonify({"error": "Need at least 2 locations (Depot + 1 Stop)"}), 400
        if len(locations) > 100:  # Increased limit slightly
            return jsonify({"error": "Max 100 stops allowed for demo performance"}), 400

        # Validate coordinates up front: a missing key, a non-numeric value
        # or NaN/inf would otherwise surface as a 500 or as invalid JSON.
        coords = []
        for position, loc in enumerate(locations):
            try:
                if not isinstance(loc, dict):
                    raise TypeError("location is not an object")
                lat = float(loc['lat'])
                lng = float(loc['lng'])
                if not (math.isfinite(lat) and math.isfinite(lng)):
                    raise ValueError("coordinate is not finite")
            except (KeyError, TypeError, ValueError):
                return jsonify({
                    "error": f"Location {position} must have numeric 'lat' and 'lng'"
                }), 400
            coords.append({'lat': lat, 'lng': lng})

        # 1. Distance Matrix
        matrix = calculate_distance_matrix(coords)
        # 2. Initial Solution (NN)
        initial_route_indices = solve_tsp_nearest_neighbor(matrix, start_index=0)
        # 3. Optimization (2-Opt)
        optimized_indices, total_distance = two_opt_improvement(initial_route_indices, matrix)

        # 4. Format Result: map indices back to copies of the submitted
        # location objects and tag the depot.
        optimized_path = []
        for idx in optimized_indices:
            loc = dict(locations[idx])
            loc['type'] = 'depot' if idx == 0 else 'stop'
            optimized_path.append(loc)

        return jsonify({
            "status": "success",
            "total_distance_km": round(float(total_distance), 2),
            "stops_count": len(locations),
            "optimized_path": optimized_path,
            "original_indices": [int(idx) for idx in optimized_indices]
        })
    except Exception as e:
        # Top-level boundary: log with traceback and report the failure.
        logger.exception("Optimization error: %s", e)
        return jsonify({"error": str(e)}), 500
def _field_or_default(record, key, default):
    """Return record[key], or *default* when the key is absent or holds a float NaN."""
    value = record.get(key, default)
    if isinstance(value, float) and math.isnan(value):
        return default
    return value


def upload_file():
    """
    Parse an uploaded JSON, Excel or CSV file into a list of locations.

    The file is read from the 'file' part of the request. JSON may be either a
    list of location objects or an object with a 'locations' list; Excel and
    CSV files must have 'lat' and 'lng' columns. Rows whose coordinates are
    missing, non-numeric or not finite are skipped.

    Returns a JSON response with the cleaned locations (id, lat, lng, type).
    Bad uploads yield HTTP 400, oversized ones 413, processing failures 500.
    """
    # NOTE(review): no route decorator is visible on this view in this
    # section; confirm where it is registered with the app.
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file part"}), 400
        file = request.files['file']
        if not file.filename:
            return jsonify({"error": "No selected file"}), 400
        if not allowed_file(file.filename):
            return jsonify({"error": "File type not allowed"}), 400

        # Null byte check on the name as submitted; a sanitized name can no
        # longer contain one, so checking after sanitizing would never fire.
        if '\0' in file.filename:
            return jsonify({"error": "Invalid filename (null byte detected)"}), 400

        # Take the extension the same way allowed_file() does (lower-cased,
        # from the submitted name) so that e.g. "DATA.CSV" or a non-ASCII
        # name that passed the check is also dispatched to a parser.
        extension = file.filename.rsplit('.', 1)[1].lower()

        try:
            # Read file into memory
            if extension == 'json':
                data = json.load(file)
                if isinstance(data, dict):
                    data = data.get('locations', [])
                locations = data if isinstance(data, list) else []
            elif extension in ('xlsx', 'xls'):
                df = pd.read_excel(file)
                # Expect columns like 'lat', 'lng', optional 'type'
                if 'lat' not in df.columns or 'lng' not in df.columns:
                    return jsonify({"error": "Excel must have 'lat' and 'lng' columns"}), 400
                locations = df.to_dict(orient='records')
            elif extension == 'csv':
                df = pd.read_csv(file)
                if 'lat' not in df.columns or 'lng' not in df.columns:
                    return jsonify({"error": "CSV must have 'lat' and 'lng' columns"}), 400
                locations = df.to_dict(orient='records')
            else:
                return jsonify({"error": "Unsupported file format"}), 400

            # Normalize data
            cleaned_locations = []
            for idx, loc in enumerate(locations):
                if not isinstance(loc, dict):
                    continue  # Skip rows that are not objects
                try:
                    lat = float(loc['lat'])
                    lng = float(loc['lng'])
                except (KeyError, TypeError, ValueError):
                    continue  # Skip invalid rows
                # Empty spreadsheet cells arrive as NaN, which cannot be
                # represented in valid JSON.
                if not (math.isfinite(lat) and math.isfinite(lng)):
                    continue
                cleaned_locations.append({
                    "id": _field_or_default(loc, 'id', idx),
                    "lat": lat,
                    "lng": lng,
                    "type": _field_or_default(loc, 'type', 'stop')
                })

            if not cleaned_locations:
                return jsonify({"error": "No valid locations found in file"}), 400

            return jsonify({
                "status": "success",
                "locations": cleaned_locations,
                "message": f"Successfully loaded {len(cleaned_locations)} locations"
            })
        except Exception as e:
            logger.exception("File processing error: %s", e)
            return jsonify({"error": f"Failed to process file: {str(e)}"}), 500
    except RequestEntityTooLarge:
        return jsonify({"error": "File too large (Max 50MB)"}), 413
    except Exception as e:
        logger.exception("Upload error: %s", e)
        return jsonify({"error": str(e)}), 500
def request_entity_too_large(error):
    """Answer with a JSON "file too large" message and HTTP 413."""
    # NOTE(review): no error-handler decorator is visible in this section;
    # confirm where this function is registered with the app.
    body = {"error": "File too large (Max 50MB)"}
    return jsonify(body), 413
def internal_error(error):
    """Answer with a generic JSON error message and HTTP 500."""
    # NOTE(review): no error-handler decorator is visible in this section;
    # confirm where this function is registered with the app.
    body = {"error": "Internal Server Error"}
    return jsonify(body), 500
def not_found(error):
    """Answer with a JSON "not found" message and HTTP 404."""
    # NOTE(review): no error-handler decorator is visible in this section;
    # confirm where this function is registered with the app.
    body = {"error": "Resource Not Found"}
    return jsonify(body), 404
if __name__ == '__main__':
    # Listen on all interfaces; the port is read from the PORT environment
    # variable and falls back to 7860 when it is not set.
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))