# src/ingestion/features.py
from collections import defaultdict
from datetime import timedelta
from typing import Any, Dict, List

import numpy as np

from tools.runner_ai import sec_to_min_km

# Maximum number of most-recent calendar weeks the consistency score looks at.
_CONSISTENCY_WINDOW_WEEKS = 12


def running_features(runs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Extract aggregate running features from a list of runs.

    Args:
        runs: list of run dicts. The keys read here are ``id``,
            ``start_time`` (an object exposing ``.date()``, e.g. a
            ``datetime``), ``total_distance_m``, ``total_duration_s`` and
            ``records`` (a list of dicts that may carry ``hr_bpm``).

    Returns:
        Running features dict with:
        - ``per_run``: per-run summaries (pace_min_per_km, avg_hr, ...)
        - ``pace_series`` / ``hr_series`` / ``dates``: series for charts, in
          the order the runs were supplied (runs lacking a value are skipped,
          so the three lists are not guaranteed to be index-aligned)
        - ``weekly_km``: distance in km per ISO week, keyed ``"<year>-<week>"``
        - ``consistency_score``: 0-100, the share of calendar weeks with a
          positive distance inside a window that ends at the week of the most
          recent run and spans at most 12 weeks (never reaching back before
          the first recorded week)
        - ``pace_trend``: least-squares slope of pace over run index
          (minutes per km per run), or ``None`` with fewer than 3 paces
    """
    per_run = []
    pace_series = []
    hr_series = []
    dates = []
    # Metres per ISO week, keyed by the Monday that opens the week. Keying on
    # a real date (rather than a "year-week" string) keeps weeks sortable and
    # lets us step back one calendar week at a time.
    weekly_m = defaultdict(float)

    for r in runs:
        dist_m = r.get("total_distance_m") or 0.0
        dur_s = r.get("total_duration_s") or 0.0
        start_time = r.get("start_time")

        # Average heart rate over the records that actually report one.
        hrs = [
            rec["hr_bpm"]
            for rec in (r.get("records") or [])
            if rec.get("hr_bpm") is not None
        ]
        avg_hr = int(sum(hrs) / len(hrs)) if hrs else None

        pace_min_per_km = None
        if dist_m > 0 and dur_s:
            pace_min_per_km = (dur_s / 60.0) / (dist_m / 1000.0)  # minutes per km

        per_run.append(
            {
                "id": r.get("id"),
                "start_time": start_time,
                "distance_m": dist_m,
                "duration_s": dur_s,
                "pace_min_per_km": pace_min_per_km,
                "avg_hr": avg_hr,
            }
        )

        if pace_min_per_km is not None:
            pace_series.append(pace_min_per_km)
        if avg_hr is not None:
            hr_series.append(avg_hr)
        if start_time:
            run_day = start_time.date()
            dates.append(run_day)
            week_start = run_day - timedelta(days=run_day.weekday())
            weekly_m[week_start] += dist_m

    # Weekly mileage (km). The label uses the ISO (year, week) of the week's
    # Monday, which is the ISO week of every run that fell in that week.
    weekly_km = {}
    for week_start, metres in weekly_m.items():
        iso_year, iso_week = week_start.isocalendar()[0:2]
        weekly_km[f"{iso_year}-{iso_week}"] = metres / 1000.0

    # Consistency: walk back calendar week by calendar week from the latest
    # run so that weeks without any run count against the score.
    consistency_score = 0
    if weekly_m:
        latest_week = max(weekly_m)
        earliest_week = min(weekly_m)
        span_weeks = (latest_week - earliest_week).days // 7 + 1
        window = min(span_weeks, _CONSISTENCY_WINDOW_WEEKS)
        active_weeks = sum(
            1
            for offset in range(window)
            # .get() so the lookup does not insert empty weeks into the bucket
            if weekly_m.get(latest_week - timedelta(weeks=offset), 0.0) > 0
        )
        consistency_score = int((active_weeks / window) * 100)

    # Trend: slope of a straight-line least-squares fit of pace against run
    # index; needs at least three points to be meaningful.
    pace_trend = None
    if len(pace_series) >= 3:
        x = np.arange(len(pace_series))
        y = np.array(pace_series)
        design = np.vstack([x, np.ones(len(x))]).T
        slope, _intercept = np.linalg.lstsq(design, y, rcond=None)[0]
        pace_trend = float(slope)  # minutes per km per run index

    return {
        "per_run": per_run,
        "pace_series": pace_series,
        "hr_series": hr_series,
        "dates": dates,
        "weekly_km": weekly_km,
        "consistency_score": consistency_score,
        "pace_trend": pace_trend,
    }


def compute_per_run_features(run: Dict[str, Any]) -> Dict[str, Any]:
    """Compute key features for a single run parsed from GPX/TCX.

    Args:
        run: dict output from parse_gpx_file or parse_tcx_file. The keys read
            here are ``id``, ``sport``, ``start_time``, ``source_path``,
            ``total_distance_m``, ``total_duration_s`` and ``records`` (a list
            of dicts that may carry ``time``, ``hr_bpm``, ``cadence_rpm`` and
            ``altitude_m``).

    Returns:
        Features dict, or ``{}`` when the run has neither records nor an
        aggregate distance/duration. Values that cannot be derived are
        ``None``; elevation gain/loss default to ``0.0`` and ``hr_zones`` to
        ``{}``.
    """
    records = run.get("records") or []
    total_distance_m = run.get("total_distance_m")
    total_duration_s = run.get("total_duration_s")

    # A run without records is still processed when it carries aggregate
    # distance or duration, so top-level values pass through and a simple
    # pace can be computed.
    if not records and not (total_distance_m or total_duration_s):
        return {}

    # Fall back to the first/last record timestamps when no aggregate
    # duration is present. Guarded on `records` because a run may reach this
    # point with a distance but no records at all.
    if not total_duration_s and records:
        first_time = records[0].get("time")
        last_time = records[-1].get("time")
        if first_time and last_time:
            total_duration_s = (last_time - first_time).total_seconds()

    # Pace in sec/km
    avg_pace_s_per_km = None
    if total_distance_m and total_distance_m > 0 and total_duration_s:
        avg_pace_s_per_km = total_duration_s / (total_distance_m / 1000)

    # Pace formatted by sec_to_min_km (presumably a min/km string; see
    # tools.runner_ai for the exact format).
    avg_pace_min_per_km = None
    if avg_pace_s_per_km:
        avg_pace_min_per_km = sec_to_min_km(avg_pace_s_per_km)

    # Heart rate
    hr_values = [rec["hr_bpm"] for rec in records if rec.get("hr_bpm") is not None]
    avg_hr_bpm = sum(hr_values) / len(hr_values) if hr_values else None
    max_hr_bpm = max(hr_values) if hr_values else None

    # Cadence
    cadence_values = [
        rec["cadence_rpm"] for rec in records if rec.get("cadence_rpm") is not None
    ]
    avg_cadence_rpm = (
        sum(cadence_values) / len(cadence_values) if cadence_values else None
    )

    # Elevation gain/loss: sum of positive / negative steps between
    # consecutive altitude samples (no smoothing is applied).
    elevation_values = [
        rec["altitude_m"] for rec in records if rec.get("altitude_m") is not None
    ]
    elevation_gain_m = 0.0
    elevation_loss_m = 0.0
    for previous, current in zip(elevation_values, elevation_values[1:]):
        diff = current - previous
        if diff > 0:
            elevation_gain_m += diff
        else:
            elevation_loss_m += abs(diff)

    # Optional: heart rate zones (example zones). Thresholds are fractions of
    # the highest heart rate seen in THIS run, not of a configured athlete
    # maximum.
    hr_zones = {}
    if avg_hr_bpm:
        zones = [
            (0.5, "Zone1"),
            (0.6, "Zone2"),
            (0.7, "Zone3"),
            (0.8, "Zone4"),
            (0.9, "Zone5"),
        ]
        max_hr = max_hr_bpm
        for fraction, name in zones:
            hr_zones[name] = min(int(max_hr * fraction), max_hr)

    return {
        "id": run.get("id"),
        "sport": run.get("sport"),
        "start_time": run.get("start_time"),
        "total_distance_m": total_distance_m,
        "total_duration_s": total_duration_s,
        "avg_pace_s_per_km": avg_pace_s_per_km,
        "avg_pace_min_per_km": avg_pace_min_per_km,
        "avg_hr_bpm": avg_hr_bpm,
        "max_hr_bpm": max_hr_bpm,
        "avg_cadence_rpm": avg_cadence_rpm,
        "elevation_gain_m": elevation_gain_m,
        "elevation_loss_m": elevation_loss_m,
        "hr_zones": hr_zones,
        "source_path": run.get("source_path"),
    }


def compute_features_batch(runs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Compute features for a batch of runs.

    Args:
        runs: list of run dicts, each in the shape accepted by
            ``compute_per_run_features``.

    Returns:
        One features dict per input run, in input order. A run with nothing
        to process contributes an empty dict rather than being dropped.
    """
    return [compute_per_run_features(r) for r in runs]