Spaces:
Running
Running
| # src/ingestion/distance.py | |
| from math import radians, sin, cos, sqrt, atan2 | |
| from typing import List, Dict, Any, Optional | |
# Earth radius, in meters, used for the spherical distance model below.
EARTH_RADIUS_M = 6371000.0


def haversine_m(
    lat1: Optional[float],
    lon1: Optional[float],
    lat2: Optional[float],
    lon2: Optional[float],
) -> Optional[float]:
    """Compute distance in meters between two lat/lon points.

    Uses the haversine formula on a sphere of radius ``EARTH_RADIUS_M``.

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.

    Returns:
        The great-circle distance in meters, or ``None`` if any of the four
        coordinates is ``None``.
    """
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = radians(lat2 - lat1)
    dlambda = radians(lon2 - lon1)

    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlambda / 2) ** 2
    # Mathematically 0 <= a <= 1, but floating-point rounding can push it a
    # hair outside that range for (near-)antipodal points, which would make
    # sqrt(1 - a) raise "math domain error". Clamp before taking roots.
    a = min(1.0, max(0.0, a))

    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return EARTH_RADIUS_M * c
def compute_total_distance(records: List[Dict[str, Any]]) -> Optional[float]:
    """
    Compute total distance for an activity.

    Priority:
      1) Use device-reported cumulative ``distance_m`` (TCX/FIT): when more
         than one record carries a non-None ``distance_m``, the last such
         value is returned as a float.
      2) Reconstruct from GPS coordinates (GPX/TCX lacking distance): sum the
         haversine distance between consecutive records that have both
         ``lat`` and ``lon``.

    Records without a complete lat/lon fix are skipped during reconstruction,
    so the distance across a gap is measured between the nearest valid fixes
    on either side instead of being dropped.

    Returns meters, or None if unavailable (no records, or a reconstructed
    total that is not greater than zero).
    """
    if not records:
        return None

    # 1 - Device-reported cumulative distance
    dist_values = [
        r.get("distance_m") for r in records if r.get("distance_m") is not None
    ]
    if len(dist_values) > 1:
        return float(dist_values[-1])

    # 2 - Reconstruct from GPS points
    total = 0.0
    last_fix = None  # (lat, lon) of the most recent record with a full fix
    for rec in records:
        lat, lon = rec.get("lat"), rec.get("lon")
        if lat is None or lon is None:
            # No usable position here; keep the previous fix so the next
            # valid point is still connected to it.
            continue
        if last_fix is not None:
            d = haversine_m(last_fix[0], last_fix[1], lat, lon)
            if d is not None:
                total += d
        last_fix = (lat, lon)

    return total if total > 0 else None