avfranco's picture
HF Space deploy snapshot (minimal allow-list)
557ee65
# src/ingestion/features.py
from typing import Dict, Any, List
from collections import defaultdict
import numpy as np
from tools.runner_ai import sec_to_min_km
def running_features(
    runs: List[Dict[str, Any]],
    consistency_window_weeks: int = 12,
) -> Dict[str, Any]:
    """Aggregate a list of parsed runs into chart/summary features.

    Args:
        runs: Run dicts. Keys read here: ``id``, ``start_time`` (a datetime),
            ``total_distance_m``, ``total_duration_s`` and ``records`` (a list
            of dicts that may carry ``hr_bpm``).
        consistency_window_weeks: Maximum number of calendar weeks, counted
            back from the most recent week containing a run, used for the
            consistency score. Defaults to 12.

    Returns:
        Dict with:
            - ``per_run``: one summary dict per input run
              (distance, duration, pace_min_per_km, avg_hr).
            - ``pace_series`` / ``hr_series`` / ``dates``: values for charts.
              Each list only receives an entry when that value is available
              for a run, so the three lists may differ in length.
            - ``weekly_km``: ``"<iso-year>-<iso-week>"`` -> kilometres.
            - ``consistency_score``: 0-100, the share of calendar weeks in
              the window that contain at least one run with distance > 0.
            - ``pace_trend``: least-squares slope of pace over run index (in
              input order), or None when fewer than 3 paces are available.
    """
    per_run = []
    pace_series = []
    hr_series = []
    dates = []
    # ISO (year, week) -> metres covered in that week.
    weekly = defaultdict(float)
    # ISO (year, week) -> ordinal of that week's Monday. Ordinals of
    # consecutive weeks differ by exactly 7, which lets us measure calendar
    # gaps between weeks without any extra imports.
    week_monday = {}

    for r in runs:
        dist_m = r.get("total_distance_m") or 0.0
        dur_s = r.get("total_duration_s") or 0.0
        start_time = r.get("start_time")

        # Average heart rate over the records that actually carry a reading.
        hrs = [
            rec.get("hr_bpm")
            for rec in (r.get("records") or [])
            if rec.get("hr_bpm") is not None
        ]
        avg_hr = int(sum(hrs) / len(hrs)) if hrs else None

        pace_min_per_km = None
        if dist_m and dur_s and dist_m > 0:
            pace_min_per_km = (dur_s / 60.0) / (dist_m / 1000.0)  # minutes per km

        per_run.append(
            {
                "id": r.get("id"),
                "start_time": start_time,
                "distance_m": dist_m,
                "duration_s": dur_s,
                "pace_min_per_km": pace_min_per_km,
                "avg_hr": avg_hr,
            }
        )

        if pace_min_per_km is not None:
            pace_series.append(pace_min_per_km)
        if avg_hr is not None:
            hr_series.append(avg_hr)
        if start_time:
            day = start_time.date()
            dates.append(day)
            iso_week = tuple(day.isocalendar()[0:2])  # (year, week)
            weekly[iso_week] += dist_m
            week_monday[iso_week] = day.toordinal() - day.weekday()

    # Weekly mileage in km, keyed by "<year>-<week>".
    weekly_km = {f"{y}-{w}": v / 1000.0 for (y, w), v in weekly.items()}

    # Consistency: fraction of calendar weeks with >= 1 run in the last N weeks.
    # The window ends at the most recent week that has a run and is capped at
    # the span actually covered by the data, so a short history is not
    # penalised. Weeks are compared by date rather than by their string key
    # ("2024-9" would otherwise sort after "2024-10"), and weeks with no run
    # inside the window count against the score.
    consistency_score = 0
    if weekly and consistency_window_weeks > 0:
        latest = max(week_monday.values())
        earliest = min(week_monday.values())
        span_weeks = (latest - earliest) // 7 + 1
        window = min(consistency_window_weeks, span_weeks)
        cutoff = latest - 7 * (window - 1)
        active_weeks = sum(
            1
            for key, metres in weekly.items()
            if metres > 0 and week_monday[key] >= cutoff
        )
        consistency_score = int((active_weeks / window) * 100)

    # Trend: slope of a straight-line least-squares fit of pace vs. run index.
    pace_trend = None
    if len(pace_series) >= 3:
        x = np.arange(len(pace_series))
        y = np.array(pace_series)
        A = np.vstack([x, np.ones(len(x))]).T
        slope, _intercept = np.linalg.lstsq(A, y, rcond=None)[0]
        pace_trend = float(slope)  # minutes per km per run index

    return {
        "per_run": per_run,
        "pace_series": pace_series,
        "hr_series": hr_series,
        "dates": dates,
        "weekly_km": weekly_km,
        "consistency_score": consistency_score,
        "pace_trend": pace_trend,
    }
def compute_per_run_features(run: Dict[str, Any]) -> Dict[str, Any]:
    """Compute summary features for a single parsed run.

    Args:
        run: Run dict (per the original note, the output of parse_gpx_file or
            parse_tcx_file). Keys read here: ``id``, ``sport``,
            ``start_time``, ``source_path``, ``total_distance_m``,
            ``total_duration_s`` and ``records`` (a list of dicts that may
            carry ``time``, ``hr_bpm``, ``cadence_rpm`` and ``altitude_m``).

    Returns:
        Features dict, or an empty dict when the run has no records and
        neither an aggregate distance nor an aggregate duration.
    """
    records = run.get("records") or []
    total_distance_m = run.get("total_distance_m")
    total_duration_s = run.get("total_duration_s")

    # A run without records is still processed when it carries an aggregate
    # distance or duration, so top-level values can be passed through.
    if not records and not (total_distance_m or total_duration_s):
        return {}

    # Fall back to the first/last record timestamps when no duration is given.
    # `records` must be checked first: a run with only an aggregate distance
    # reaches this point with an empty record list.
    if not total_duration_s and records:
        first_time = records[0].get("time")
        last_time = records[-1].get("time")
        if first_time and last_time:
            total_duration_s = (last_time - first_time).total_seconds()

    # Average pace in seconds per kilometre.
    avg_pace_s_per_km = None
    if total_distance_m and total_distance_m > 0 and total_duration_s:
        avg_pace_s_per_km = total_duration_s / (total_distance_m / 1000)

    # Human-readable pace; the exact output format is defined by
    # sec_to_min_km (imported from tools.runner_ai).
    avg_pace_min_per_km = None
    if avg_pace_s_per_km:
        avg_pace_min_per_km = sec_to_min_km(avg_pace_s_per_km)

    # Heart rate: only records that actually carry a reading are considered.
    hr_values = [r["hr_bpm"] for r in records if r.get("hr_bpm") is not None]
    avg_hr_bpm = sum(hr_values) / len(hr_values) if hr_values else None
    max_hr_bpm = max(hr_values) if hr_values else None

    # Cadence
    cadence_values = [r["cadence_rpm"] for r in records if r.get("cadence_rpm") is not None]
    avg_cadence_rpm = sum(cadence_values) / len(cadence_values) if cadence_values else None

    # Elevation gain/loss: sum of positive / negative steps between
    # consecutive altitude samples.
    elevation_values = [r["altitude_m"] for r in records if r.get("altitude_m") is not None]
    elevation_gain_m = 0.0
    elevation_loss_m = 0.0
    for previous, current in zip(elevation_values, elevation_values[1:]):
        diff = current - previous
        if diff > 0:
            elevation_gain_m += diff
        else:
            elevation_loss_m += abs(diff)

    # Example heart-rate zones: fixed fractions of the highest heart rate
    # observed in this run.
    hr_zones = {}
    if avg_hr_bpm:
        zones = [(0.5, "Zone1"), (0.6, "Zone2"), (0.7, "Zone3"), (0.8, "Zone4"), (0.9, "Zone5")]
        for fraction, name in zones:
            hr_zones[name] = min(int(max_hr_bpm * fraction), max_hr_bpm)

    return {
        "id": run.get("id"),
        "sport": run.get("sport"),
        "start_time": run.get("start_time"),
        "total_distance_m": total_distance_m,
        "total_duration_s": total_duration_s,
        "avg_pace_s_per_km": avg_pace_s_per_km,
        "avg_pace_min_per_km": avg_pace_min_per_km,
        "avg_hr_bpm": avg_hr_bpm,
        "max_hr_bpm": max_hr_bpm,
        "avg_cadence_rpm": avg_cadence_rpm,
        "elevation_gain_m": elevation_gain_m,
        "elevation_loss_m": elevation_loss_m,
        "hr_zones": hr_zones,
        "source_path": run.get("source_path"),
    }
def compute_features_batch(runs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Apply compute_per_run_features to each run, preserving input order.

    Args:
        runs: parsed run dicts, one per activity.
    Returns:
        A list with one features dict per input run.
    """
    batch: List[Dict[str, Any]] = []
    for single_run in runs:
        batch.append(compute_per_run_features(single_run))
    return batch