import os import math import json import requests import gpxpy def haversine(lat1, lon1, lat2, lon2): """Calculate the great-circle distance between two points on the Earth in meters.""" R = 6371000.0 # Radius of Earth in meters phi1 = math.radians(lat1) phi2 = math.radians(lat2) delta_phi = math.radians(lat2 - lat1) delta_lambda = math.radians(lon2 - lon1) a = math.sin(delta_phi / 2.0)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2.0)**2 c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a)) return R * c def fetch_elevations_open_meteo(coords): """ Fetch elevation coordinates in batches of 100 from the Open-Meteo elevation API. Returns a list of floats representing elevation in meters. """ elevations = [] batch_size = 100 for i in range(0, len(coords), batch_size): batch = coords[i:i+batch_size] lats = ",".join(f"{c[0]:.6f}" for c in batch) lons = ",".join(f"{c[1]:.6f}" for c in batch) url = f"https://api.open-meteo.com/v1/elevation?latitude={lats}&longitude={lons}" try: print(f"[gpx_parser] Fetching elevation batch {i//batch_size + 1}...") response = requests.get(url, timeout=10) if response.status_code == 200: data = response.json() batch_elevations = data.get("elevation", []) if len(batch_elevations) == len(batch): elevations.extend(batch_elevations) else: print("[gpx_parser] Elevation list size mismatch. Filling with 0.0") elevations.extend([0.0] * len(batch)) else: print(f"[gpx_parser] API error {response.status_code}. Using 0.0 for batch.") elevations.extend([0.0] * len(batch)) except Exception as e: print(f"[gpx_parser] Network/parsing exception: {e}. Using 0.0 for batch.") elevations.extend([0.0] * len(batch)) return elevations def smooth_elevations(elevations, window_size=5): """Apply a simple moving average window to smooth out elevation profile data.""" if not elevations: return [] smoothed = [] for i in range(len(elevations)): start = max(0, i - window_size // 2) end = min(len(elevations), i + window_size // 2 + 1) window = elevations[start:end] smoothed.append(sum(window) / len(window)) return smoothed def calculate_elevation_gain_loss(elevations, threshold=2.0): """ Calculate cumulative elevation gain and loss in meters. Filters out noise using a threshold value (minimum elevation delta). """ gain = 0.0 loss = 0.0 if len(elevations) < 2: return gain, loss last_val = elevations[0] for val in elevations[1:]: diff = val - last_val if abs(diff) >= threshold: if diff > 0: gain += diff else: loss += abs(diff) last_val = val return gain, loss def fetch_overpass_pois(min_lat, min_lon, max_lat, max_lon): """ Fetch POIs (water, spring, huts, camps, shelter, viewpoint, peak, phone) from Overpass API in the bounding box. """ url = "https://overpass-api.de/api/interpreter" query = f""" [out:json][timeout:25]; ( node["amenity"="drinking_water"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["natural"="spring"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["amenity"="water_point"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["amenity"="fountain"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["tourism"="alpine_hut"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["tourism"="wilderness_hut"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["tourism"="camp_site"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["amenity"="shelter"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["tourism"="viewpoint"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["natural"="peak"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); node["amenity"="phone"]({min_lat:.5f},{min_lon:.5f},{max_lat:.5f},{max_lon:.5f}); ); out body; """ headers = { 'User-Agent': 'TrailheadTrekPlanner/1.0 (skushwaha@hckthn.com)' } try: print(f"[gpx_parser] Querying Overpass API for POIs in bbox: [{min_lat:.5f}, {min_lon:.5f}, {max_lat:.5f}, {max_lon:.5f}]...") response = requests.get(url, params={'data': query}, headers=headers, timeout=25) if response.status_code == 200: data = response.json() elements = data.get("elements", []) pois = [] for el in elements: lat = el.get("lat") lon = el.get("lon") tags = el.get("tags", {}) # Determine type poi_type = "unknown" if "amenity" in tags: poi_type = tags["amenity"] elif "natural" in tags: poi_type = tags["natural"] elif "tourism" in tags: poi_type = tags["tourism"] name = tags.get("name", tags.get("water", poi_type.replace("_", " ").title())) pois.append({ "id": el.get("id"), "lat": lat, "lon": lon, "type": poi_type, "name": name }) print(f"[gpx_parser] Overpass returned {len(pois)} raw POIs.") return pois else: print(f"[gpx_parser] Overpass API returned status code {response.status_code}: {response.text}") return [] except Exception as e: print(f"[gpx_parser] Overpass query failed: {e}") return [] def filter_pois_near_track(points, pois, buffer_meters=150.0): """ Filter POIs that are within buffer_meters of the track. Returns list of POIs with distance and closest track point index. """ enhanced_pois = [] if not points or not pois: return enhanced_pois for poi in pois: min_dist = float('inf') closest_idx = -1 for idx, pt in enumerate(points): d = haversine(poi["lat"], poi["lon"], pt["lat"], pt["lon"]) if d < min_dist: min_dist = d closest_idx = idx if min_dist <= buffer_meters: enhanced_pois.append({ "id": poi.get("id", 0), "lat": poi["lat"], "lon": poi["lon"], "type": poi["type"], "name": poi["name"], "distance": round(min_dist, 1), "track_index": closest_idx }) print(f"[gpx_parser] Filtered {len(enhanced_pois)} POIs within {buffer_meters}m buffer.") return enhanced_pois def extract_pois_from_gpx(gpx): """ Extract POIs from GPX waypoints and track point extensions. Returns a list of POI dictionaries. """ pois = [] # 1. Parse from waypoints for wpt in gpx.waypoints: desc = wpt.description or "" poi_type = "unknown" if "Type: " in desc: parts = desc.split(",") poi_type = parts[0].replace("Type: ", "").strip() elif wpt.name: # guess type from name/attributes name_l = wpt.name.lower() if "water" in name_l or "spring" in name_l or "fountain" in name_l: poi_type = "drinking_water" elif "camp" in name_l: poi_type = "camp_site" elif "hut" in name_l or "refuge" in name_l: poi_type = "alpine_hut" elif "shelter" in name_l: poi_type = "shelter" pois.append({ "lat": wpt.latitude, "lon": wpt.longitude, "name": wpt.name or "Waypoint", "type": poi_type, "distance": 0.0 }) # 2. Parse from track point extensions idx = 0 for track in gpx.tracks: for segment in track.segments: for pt in segment.points: if pt.extensions: for ext in pt.extensions: tag_name = ext.tag if hasattr(ext, 'tag') else '' if 'poi' in tag_name: poi_type = ext.attrib.get('type', 'unknown') poi_name = ext.attrib.get('name', 'Waypoint') try: dist = float(ext.attrib.get('distance', 0.0)) except ValueError: dist = 0.0 pois.append({ "lat": pt.latitude, "lon": pt.longitude, "name": poi_name, "type": poi_type, "distance": dist, "track_index": idx }) idx += 1 return pois def save_enhanced_gpx(original_gpx_path, output_gpx_path, pois): """ Save enhanced GPX file with POIs loaded as waypoints and extensions. """ with open(original_gpx_path, "r", encoding="utf-8") as f: gpx = gpxpy.parse(f) # Overwrite waypoints gpx.waypoints = [] for poi in pois: wpt = gpxpy.gpx.GPXWaypoint(latitude=poi['lat'], longitude=poi['lon'], name=poi['name']) wpt.description = f"Type: {poi['type']}, Distance from track: {poi['distance']:.1f}m" gpx.waypoints.append(wpt) # Add extensions to trackpoints points = [] for track in gpx.tracks: for segment in track.segments: points.extend(segment.points) import xml.etree.ElementTree as ET for poi in pois: idx = poi.get('track_index') if idx is not None and 0 <= idx < len(points): pt = points[idx] # Create sub-element under extensions poi_el = ET.Element('poi', type=poi['type'], name=poi['name'], distance=f"{poi['distance']:.1f}") pt.extensions.append(poi_el) with open(output_gpx_path, "w", encoding="utf-8") as f: f.write(gpx.to_xml()) print(f"[gpx_parser] Saved enhanced GPX with {len(pois)} POIs to {output_gpx_path}") def parse_gpx_file(file_path, cache_dir="./temp", buffer_meters=150.0): """ Parse a GPX file, fetch missing elevations, smooth the profile, and compute trek statistics. Caches results locally to allow offline usage. """ # Create cache directory if needed os.makedirs(cache_dir, exist_ok=True) # Check cache first file_name = os.path.basename(file_path) cache_path = os.path.join(cache_dir, f"{file_name}.cache.json") if os.path.exists(cache_path): try: with open(cache_path, "r", encoding="utf-8") as f: print(f"[gpx_parser] Loading cached GPX data from {cache_path}") return json.load(f) except Exception as e: print(f"[gpx_parser] Cache read error: {e}, parsing raw file...") print(f"[gpx_parser] Parsing raw GPX file: {file_path}") with open(file_path, "r", encoding="utf-8") as f: gpx = gpxpy.parse(f) # Extract track points points_raw = [] for track in gpx.tracks: for segment in track.segments: for pt in segment.points: points_raw.append({ "lat": pt.latitude, "lon": pt.longitude, "ele": pt.elevation }) # If GPX had no track points, look in waypoints or route points if not points_raw: for route in gpx.routes: for pt in route.points: points_raw.append({ "lat": pt.latitude, "lon": pt.longitude, "ele": pt.elevation }) # Still empty? Check waypoints if not points_raw and gpx.waypoints: for wpt in gpx.waypoints: points_raw.append({ "lat": wpt.latitude, "lon": wpt.longitude, "ele": wpt.elevation }) if not points_raw: raise ValueError("No trackpoints, routepoints, or waypoints found in GPX file.") # Check if elevations are missing (all None or 0.0) has_elevation = any(pt["ele"] is not None for pt in points_raw) if not has_elevation: print("[gpx_parser] GPX has no elevation data. Fetching from Open-Meteo elevation API...") coords = [(pt["lat"], pt["lon"]) for pt in points_raw] elevations = fetch_elevations_open_meteo(coords) for i, ele in enumerate(elevations): points_raw[i]["ele"] = ele else: # Fill in any scattered missing elevations for pt in points_raw: if pt["ele"] is None: pt["ele"] = 0.0 # Smooth elevations raw_elevations = [pt["ele"] for pt in points_raw] smoothed_eles = smooth_elevations(raw_elevations) for i, ele in enumerate(smoothed_eles): points_raw[i]["ele"] = ele # Calculate cumulative distances (in meters) and build final points list points_data = [] cum_dist = 0.0 points_data.append({ "lat": points_raw[0]["lat"], "lon": points_raw[0]["lon"], "ele": points_raw[0]["ele"], "cum_dist": 0.0 }) for i in range(1, len(points_raw)): p1 = points_raw[i-1] p2 = points_raw[i] d = haversine(p1["lat"], p1["lon"], p2["lat"], p2["lon"]) cum_dist += d points_data.append({ "lat": p2["lat"], "lon": p2["lon"], "ele": p2["ele"], "cum_dist": cum_dist }) # Calculate statistics total_distance_m = cum_dist total_distance_km = total_distance_m / 1000.0 gain, loss = calculate_elevation_gain_loss(smoothed_eles) min_ele = min(smoothed_eles) if smoothed_eles else 0.0 max_ele = max(smoothed_eles) if smoothed_eles else 0.0 # Naismith's Rule: 5 km/h base speed + 1 hour per 600m ascent naismith_hours = (total_distance_km / 5.0) + (gain / 600.0) estimated_days = max(1.0, naismith_hours / 8.0) # Pre-parse waypoints if they exist in GPX waypoints = [] for wpt in gpx.waypoints: waypoints.append({ "name": wpt.name or "Waypoint", "lat": wpt.latitude, "lon": wpt.longitude, "ele": wpt.elevation or 0.0, "desc": wpt.description or "" }) # Generate checkpoints checkpoints = [] if waypoints: for wpt in waypoints: min_d = float('inf') closest_pt = points_data[0] for pt in points_data: d = haversine(wpt["lat"], wpt["lon"], pt["lat"], pt["lon"]) if d < min_d: min_d = d closest_pt = pt checkpoints.append({ "name": wpt["name"], "lat": wpt["lat"], "lon": wpt["lon"], "ele": closest_pt["ele"], "cum_dist": closest_pt["cum_dist"] / 1000.0 }) checkpoints.sort(key=lambda c: c["cum_dist"]) else: checkpoints = generate_checkpoints(points_data, interval_meters=1000.0) # Parse existing POIs from GPX pois = extract_pois_from_gpx(gpx) # If no POIs exist (like raw user upload), fetch from Overpass API (planning mode online) if not pois: lats = [pt["lat"] for pt in points_data] lons = [pt["lon"] for pt in points_data] min_lat, max_lat = min(lats) - 0.002, max(lats) + 0.002 min_lon, max_lon = min(lons) - 0.002, max(lons) + 0.002 raw_pois = fetch_overpass_pois(min_lat, min_lon, max_lat, max_lon) pois = filter_pois_near_track(points_data, raw_pois, buffer_meters) result = { "file_name": file_name, "total_distance_km": round(total_distance_km, 2), "elevation_gain_m": round(gain, 1), "elevation_loss_m": round(loss, 1), "min_elevation_m": round(min_ele, 1), "max_elevation_m": round(max_ele, 1), "estimated_days": round(estimated_days, 1), "naismith_hours": round(naismith_hours, 1), "points": points_data, "checkpoints": checkpoints, "pois": pois } # Save cache try: with open(cache_path, "w", encoding="utf-8") as f: json.dump(result, f, indent=2) print(f"[gpx_parser] Saved parsed GPX data cache to {cache_path}") except Exception as e: print(f"[gpx_parser] Cache write error: {e}") # Start offline map tiles pre-fetching in background try: start_tile_download(result) except Exception as e: print(f"[gpx_parser] Error starting background tile download: {e}") return result def generate_checkpoints(points_data, interval_meters=1000.0): """Helper to partition track into regular distance checkpoints.""" if not points_data: return [] checkpoints = [] start_pt = points_data[0] checkpoints.append({ "name": "Start", "lat": start_pt["lat"], "lon": start_pt["lon"], "ele": start_pt["ele"], "cum_dist": 0.0 }) total_dist = points_data[-1]["cum_dist"] next_checkpoint_dist = interval_meters pt_idx = 1 while next_checkpoint_dist < total_dist: while pt_idx < len(points_data) and points_data[pt_idx]["cum_dist"] < next_checkpoint_dist: pt_idx += 1 if pt_idx >= len(points_data): break p1 = points_data[pt_idx - 1] p2 = points_data[pt_idx] if abs(p1["cum_dist"] - next_checkpoint_dist) < abs(p2["cum_dist"] - next_checkpoint_dist): chosen = p1 else: chosen = p2 checkpoints.append({ "name": f"Km {next_checkpoint_dist / 1000.0:.1f}", "lat": chosen["lat"], "lon": chosen["lon"], "ele": chosen["ele"], "cum_dist": round(chosen["cum_dist"] / 1000.0, 2) }) next_checkpoint_dist += interval_meters end_pt = points_data[-1] if len(checkpoints) == 1 or (total_dist / 1000.0 - checkpoints[-1]["cum_dist"]) > 0.1: checkpoints.append({ "name": "End", "lat": end_pt["lat"], "lon": end_pt["lon"], "ele": end_pt["ele"], "cum_dist": round(total_dist / 1000.0, 2) }) return checkpoints def deg2num(lat_deg, lon_deg, zoom): """Convert latitude and longitude to OSM tile X and Y coordinates at a given zoom level.""" lat_rad = math.radians(lat_deg) n = 2.0 ** zoom xtile = int((lon_deg + 180.0) / 360.0 * n) ytile = int((1.0 - math.log(math.tan(lat_rad) + (1.0 / math.cos(lat_rad))) / math.pi) / 2.0 * n) return (xtile, ytile) def download_tiles_for_bbox(min_lat, min_lon, max_lat, max_lon, output_dir="./assets/tiles", max_tiles=120): """ Download OSM map tiles for a given bounding box at zoom levels 13 to 16. Restricts zoom levels if the bounding box covers too many tiles. """ import os import requests import time os.makedirs(output_dir, exist_ok=True) zooms = [13, 14, 15, 16] # Calculate total tiles across zoom levels tile_requests = [] for zoom in zooms: x1, y1 = deg2num(max_lat, min_lon, zoom) x2, y2 = deg2num(min_lat, max_lon, zoom) x_start, x_end = min(x1, x2), max(x1, x2) y_start, y_end = min(y1, y2), max(y1, y2) for x in range(x_start, x_end + 1): for y in range(y_start, y_end + 1): tile_requests.append((zoom, x, y)) total_tiles = len(tile_requests) print(f"[tiles] Bounding box requires {total_tiles} tiles across zoom levels 13-16.") if total_tiles > max_tiles: print(f"[tiles] Bounding box too large ({total_tiles} > {max_tiles}). Restricting to zoom 13-15.") zooms = [13, 14, 15] tile_requests = [] for zoom in zooms: x1, y1 = deg2num(max_lat, min_lon, zoom) x2, y2 = deg2num(min_lat, max_lon, zoom) x_start, x_end = min(x1, x2), max(x1, x2) y_start, y_end = min(y1, y2), max(y1, y2) for x in range(x_start, x_end + 1): for y in range(y_start, y_end + 1): tile_requests.append((zoom, x, y)) total_tiles = len(tile_requests) print(f"[tiles] Bounding box now requires {total_tiles} tiles.") headers = { 'User-Agent': 'TrailheadTrekPlanner/1.0 (skushwaha@hckthn.com)' } downloaded = 0 skipped = 0 for zoom, x, y in tile_requests: tile_dir = os.path.join(output_dir, str(zoom), str(x)) os.makedirs(tile_dir, exist_ok=True) tile_path = os.path.join(tile_dir, f"{y}.png") if os.path.exists(tile_path): skipped += 1 continue url = f"https://tile.openstreetmap.org/{zoom}/{x}/{y}.png" try: response = requests.get(url, headers=headers, timeout=5) if response.status_code == 200: with open(tile_path, "wb") as f: f.write(response.content) downloaded += 1 # Small sleep to respect OSM servers usage policy time.sleep(0.05) else: print(f"[tiles] Failed to download tile {zoom}/{x}/{y}: HTTP {response.status_code}") except Exception as e: print(f"[tiles] Exception downloading tile {zoom}/{x}/{y}: {e}") print(f"[tiles] Finished tile sync: downloaded {downloaded}, cached {skipped} (Total: {total_tiles})") return downloaded, skipped, total_tiles def start_tile_download(data): """Trigger the offline tile downloading in a background thread.""" import threading points = data.get("points", []) if not points: return lats = [pt["lat"] for pt in points] lons = [pt["lon"] for pt in points] min_lat, max_lat = min(lats), max(lats) min_lon, max_lon = min(lons), max(lons) # Buffer coordinates slightly to ensure surrounding area is fully covered min_lat -= 0.005 max_lat += 0.005 min_lon -= 0.005 max_lon += 0.005 t = threading.Thread(target=download_tiles_for_bbox, args=(min_lat, min_lon, max_lat, max_lon)) t.daemon = True t.start() print("[tiles] Started background thread to sync offline tiles.")