""" Simple inference module for WHOOP Overtraining Detection. This module provides an easy-to-use interface for detecting overtraining patterns using the pre-trained autoencoder model. Usage: from inference import OvertainingPredictor predictor = OvertainingPredictor.from_pretrained("iserrano0511/whoop-overtraining-detector") results = predictor.predict_from_csv("your_data.csv") """ import json from pathlib import Path from typing import Dict, List, Optional, Union from dataclasses import dataclass import numpy as np import pandas as pd import torch import torch.nn as nn # ============================================================================ # MODEL ARCHITECTURE (self-contained) # ============================================================================ class ConvAutoencoder(nn.Module): """1D Convolutional autoencoder for sequence anomaly detection.""" def __init__(self, input_size: int, seq_len: int, latent_size: int = 32, dropout: float = 0.2): super().__init__() self.input_size = input_size self.seq_len = seq_len self.latent_size = latent_size # Encoder self.encoder = nn.Sequential( nn.Conv1d(input_size, 64, kernel_size=3, padding=1), nn.BatchNorm1d(64), nn.ReLU(), nn.Dropout(dropout), nn.Conv1d(64, 32, kernel_size=3, padding=1), nn.BatchNorm1d(32), nn.ReLU(), nn.Dropout(dropout), nn.Conv1d(32, 16, kernel_size=3, padding=1), nn.BatchNorm1d(16), nn.ReLU(), ) self.flatten_size = 16 * seq_len self.encoder_fc = nn.Linear(self.flatten_size, latent_size) # Decoder self.decoder_fc = nn.Linear(latent_size, self.flatten_size) self.decoder = nn.Sequential( nn.Conv1d(16, 32, kernel_size=3, padding=1), nn.BatchNorm1d(32), nn.ReLU(), nn.Dropout(dropout), nn.Conv1d(32, 64, kernel_size=3, padding=1), nn.BatchNorm1d(64), nn.ReLU(), nn.Dropout(dropout), nn.Conv1d(64, input_size, kernel_size=3, padding=1), ) def encode(self, x: torch.Tensor) -> torch.Tensor: x = x.permute(0, 2, 1) # (batch, seq, features) -> (batch, features, seq) x = self.encoder(x) x = x.view(x.size(0), -1) return self.encoder_fc(x) def decode(self, z: torch.Tensor) -> torch.Tensor: x = self.decoder_fc(z) x = x.view(x.size(0), 16, self.seq_len) x = self.decoder(x) return x.permute(0, 2, 1) # Back to (batch, seq, features) def forward(self, x: torch.Tensor): z = self.encode(x) return self.decode(z), z # ============================================================================ # DATA CLASSES # ============================================================================ @dataclass class PredictionResult: """Result of overtraining prediction for a single sample.""" date: str user_id: str reconstruction_error: float is_anomaly: bool anomaly_score: float risk_level: str # 'normal', 'elevated', 'high', 'critical' recovery_score: Optional[float] = None top_features: Optional[List[str]] = None # ============================================================================ # FEATURE ENGINEERING # ============================================================================ FEATURE_COLUMNS = [ 'hrv', 'resting_heart_rate', 'respiratory_rate', 'skin_temp_deviation', 'hrv_deviation', 'rhr_deviation', 'hrv_pct_baseline', 'rhr_pct_baseline', 'sleep_hours', 'sleep_efficiency', 'deep_sleep_ratio', 'rem_sleep_ratio', 'sleep_debt_indicator', 'wake_ups', 'time_to_fall_asleep_min', 'day_strain', 'recovery_score', 'strain_recovery_ratio', 'hrv_rolling_3d', 'hrv_rolling_7d', 'resting_heart_rate_rolling_3d', 'resting_heart_rate_rolling_7d', 'day_strain_rolling_3d', 'day_strain_rolling_7d', 'recovery_score_rolling_3d', 'recovery_score_rolling_7d', 
# ============================================================================
# DATA CLASSES
# ============================================================================

@dataclass
class PredictionResult:
    """Result of overtraining prediction for a single sample."""
    date: str
    user_id: str
    reconstruction_error: float
    is_anomaly: bool
    anomaly_score: float
    risk_level: str  # 'normal', 'elevated', 'high', 'critical'
    recovery_score: Optional[float] = None
    top_features: Optional[List[str]] = None


# ============================================================================
# FEATURE ENGINEERING
# ============================================================================

FEATURE_COLUMNS = [
    'hrv', 'resting_heart_rate', 'respiratory_rate', 'skin_temp_deviation',
    'hrv_deviation', 'rhr_deviation', 'hrv_pct_baseline', 'rhr_pct_baseline',
    'sleep_hours', 'sleep_efficiency', 'deep_sleep_ratio', 'rem_sleep_ratio',
    'sleep_debt_indicator', 'wake_ups', 'time_to_fall_asleep_min',
    'day_strain', 'recovery_score', 'strain_recovery_ratio',
    'hrv_rolling_3d', 'hrv_rolling_7d',
    'resting_heart_rate_rolling_3d', 'resting_heart_rate_rolling_7d',
    'day_strain_rolling_3d', 'day_strain_rolling_7d',
    'recovery_score_rolling_3d', 'recovery_score_rolling_7d',
    'sleep_efficiency_rolling_3d', 'sleep_efficiency_rolling_7d',
    'cumulative_strain_3d', 'cumulative_strain_7d',
    'hrv_change', 'rhr_change', 'recovery_change',
]


def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Create derived features for overtraining detection."""
    df = df.copy()
    df['date'] = pd.to_datetime(df['date'])
    df = df.sort_values(['user_id', 'date']).reset_index(drop=True)

    # Baseline deviations
    df['hrv_deviation'] = df['hrv'] - df['hrv_baseline']
    df['hrv_pct_baseline'] = (df['hrv'] / df['hrv_baseline']) * 100
    df['rhr_deviation'] = df['resting_heart_rate'] - df['rhr_baseline']
    df['rhr_pct_baseline'] = (df['resting_heart_rate'] / df['rhr_baseline']) * 100

    # Sleep indicators
    df['sleep_debt_indicator'] = df['sleep_hours'] * (1 - df['sleep_efficiency'] / 100)
    df['deep_sleep_ratio'] = df['deep_sleep_hours'] / df['sleep_hours'].clip(lower=0.1)
    df['rem_sleep_ratio'] = df['rem_sleep_hours'] / df['sleep_hours'].clip(lower=0.1)

    # Rolling averages
    for col in ['hrv', 'resting_heart_rate', 'day_strain', 'recovery_score',
                'sleep_efficiency']:
        df[f'{col}_rolling_3d'] = df.groupby('user_id')[col].transform(
            lambda x: x.rolling(3, min_periods=1).mean())
        df[f'{col}_rolling_7d'] = df.groupby('user_id')[col].transform(
            lambda x: x.rolling(7, min_periods=1).mean())

    # Cumulative strain
    df['cumulative_strain_3d'] = df.groupby('user_id')['day_strain'].transform(
        lambda x: x.rolling(3, min_periods=1).sum())
    df['cumulative_strain_7d'] = df.groupby('user_id')['day_strain'].transform(
        lambda x: x.rolling(7, min_periods=1).sum())

    # Strain-to-recovery ratio
    df['strain_recovery_ratio'] = df['day_strain'] / df['recovery_score'].clip(lower=1)

    # Day-over-day changes
    df['hrv_change'] = df.groupby('user_id')['hrv'].diff()
    df['rhr_change'] = df.groupby('user_id')['resting_heart_rate'].diff()
    df['recovery_change'] = df.groupby('user_id')['recovery_score'].diff()

    df = df.bfill().ffill().fillna(0)
    return df
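
# Minimal illustrative input (column names inferred from the code above; the
# values are made up). engineer_features() expects one row per user per day
# with the raw WHOOP fields plus per-user baselines:
#
#   raw = pd.DataFrame([{
#       'user_id': 'u1', 'date': '2024-05-01',
#       'hrv': 62.0, 'hrv_baseline': 68.0,
#       'resting_heart_rate': 55.0, 'rhr_baseline': 52.0,
#       'respiratory_rate': 14.8, 'skin_temp_deviation': 0.3,
#       'sleep_hours': 6.5, 'sleep_efficiency': 88.0,
#       'deep_sleep_hours': 1.2, 'rem_sleep_hours': 1.5,
#       'wake_ups': 2, 'time_to_fall_asleep_min': 14,
#       'day_strain': 15.2, 'recovery_score': 48.0,
#   }])
#   features = engineer_features(raw)  # adds the derived FEATURE_COLUMNS
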
# ============================================================================
# PREDICTOR CLASS
# ============================================================================

class OvertrainingPredictor:
    """
    Easy-to-use predictor for overtraining detection.

    Example:
        predictor = OvertrainingPredictor.from_pretrained("iserrano0511/whoop-overtraining-detector")
        results = predictor.predict_from_csv("data.csv")

        for r in results:
            if r.is_anomaly:
                print(f"{r.date}: {r.risk_level} risk (score: {r.anomaly_score:.2f})")
    """

    def __init__(self, model: nn.Module, config: dict, scaler):
        self.model = model
        self.config = config
        self.scaler = scaler
        self.device = next(model.parameters()).device
        self.seq_len = config.get('seq_len', 7)
        self.threshold = config.get('threshold', 0.5)
        self.train_error_mean = config.get('train_error_mean', 0.2)
        self.train_error_std = config.get('train_error_std', 0.1)

    @classmethod
    def from_pretrained(cls, repo_id: str, device: str = 'auto') -> 'OvertrainingPredictor':
        """
        Load predictor from the Hugging Face Hub.

        Args:
            repo_id: Hugging Face repo ID (e.g., "iserrano0511/whoop-overtraining-detector")
            device: 'cuda', 'cpu', or 'auto'
        """
        try:
            from huggingface_hub import hf_hub_download
        except ImportError:
            raise ImportError("Please install huggingface_hub: pip install huggingface_hub")
        try:
            import joblib
        except ImportError:
            raise ImportError("Please install joblib: pip install joblib")

        # Determine device
        if device == 'auto':
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        device = torch.device(device)

        # Download files
        print(f"Downloading model from {repo_id}...")
        model_path = hf_hub_download(repo_id=repo_id, filename="model.pt")
        scaler_path = hf_hub_download(repo_id=repo_id, filename="scaler.pkl")
        config_path = hf_hub_download(repo_id=repo_id, filename="config.json")

        # Load config
        with open(config_path, 'r') as f:
            config = json.load(f)

        # Load model
        checkpoint = torch.load(model_path, map_location=device, weights_only=False)
        model = ConvAutoencoder(
            input_size=config['input_size'],
            seq_len=config['seq_len'],
            latent_size=config['latent_size'],
            dropout=config.get('dropout', 0.2),
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model = model.to(device)
        model.eval()

        # Load scaler
        scaler = joblib.load(scaler_path)

        print(f"Model loaded successfully on {device}")
        return cls(model, config, scaler)

    @classmethod
    def from_local(cls, model_path: str, scaler_path: str, config_path: str,
                   device: str = 'auto') -> 'OvertrainingPredictor':
        """Load predictor from local files."""
        import joblib

        if device == 'auto':
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        device = torch.device(device)

        with open(config_path, 'r') as f:
            config = json.load(f)

        checkpoint = torch.load(model_path, map_location=device, weights_only=False)
        model = ConvAutoencoder(
            input_size=config['input_size'],
            seq_len=config['seq_len'],
            latent_size=config['latent_size'],
            dropout=config.get('dropout', 0.2),
        )
        model.load_state_dict(checkpoint['model_state_dict'])
        model = model.to(device)
        model.eval()

        scaler = joblib.load(scaler_path)
        return cls(model, config, scaler)

    def _get_risk_level(self, score: float) -> str:
        if score < 1.5:
            return 'normal'
        elif score < 2.5:
            return 'elevated'
        elif score < 3.5:
            return 'high'
        return 'critical'

    def predict(self, data: np.ndarray, user_id: str = 'unknown',
                date: str = 'unknown',
                recovery_score: Optional[float] = None) -> PredictionResult:
        """
        Predict overtraining risk for a single sequence.

        Args:
            data: Scaled data of shape (seq_len, n_features)
            user_id: User identifier
            date: Date string
            recovery_score: Optional recovery score

        Returns:
            PredictionResult with risk assessment
        """
        if len(data.shape) == 2:
            data = data.reshape(1, *data.shape)

        x = torch.FloatTensor(data).to(self.device)
        with torch.no_grad():
            x_recon, _ = self.model(x)
            error = ((x - x_recon) ** 2).mean().item()

        # Standardize the reconstruction error against the training distribution.
        anomaly_score = (error - self.train_error_mean) / (self.train_error_std + 1e-8)

        return PredictionResult(
            date=str(date),
            user_id=str(user_id),
            reconstruction_error=error,
            is_anomaly=error > self.threshold,
            anomaly_score=anomaly_score,
            risk_level=self._get_risk_level(anomaly_score),
            recovery_score=recovery_score,
        )
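
    # Example (hypothetical names and values): predict() takes one
    # scaler-transformed window of shape (seq_len, len(FEATURE_COLUMNS))
    # and returns a PredictionResult:
    #
    #   window = predictor.scaler.transform(features[FEATURE_COLUMNS].values[-7:])
    #   result = predictor.predict(window, user_id='u1', date='2024-05-01')
    #   print(result.risk_level, f"{result.anomaly_score:.2f}")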
    def predict_from_csv(self, csv_path: str,
                         user_id: Optional[str] = None) -> List[PredictionResult]:
        """
        Predict overtraining risk from a CSV file.

        Args:
            csv_path: Path to CSV with WHOOP data
            user_id: Optional filter for a specific user

        Returns:
            List of PredictionResult objects
        """
        df = pd.read_csv(csv_path)
        df = engineer_features(df)

        if user_id:
            df = df[df['user_id'] == user_id]

        results = []
        for uid in df['user_id'].unique():
            user_data = df[df['user_id'] == uid].sort_values('date')
            if len(user_data) < self.seq_len:
                continue

            # Slide a seq_len-day window over each user's history.
            for i in range(len(user_data) - self.seq_len + 1):
                seq = user_data.iloc[i:i + self.seq_len]
                data = seq[FEATURE_COLUMNS].values.astype(np.float32)
                data_scaled = self.scaler.transform(data)

                result = self.predict(
                    data_scaled,
                    user_id=uid,
                    date=str(seq.iloc[-1]['date']),
                    recovery_score=seq.iloc[-1]['recovery_score'],
                )
                results.append(result)

        return results

    def predict_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Predict and return results as a DataFrame.

        Args:
            df: DataFrame with WHOOP data (will be feature-engineered)

        Returns:
            DataFrame with predictions
        """
        results = []
        df = engineer_features(df)

        for uid in df['user_id'].unique():
            user_data = df[df['user_id'] == uid].sort_values('date')
            if len(user_data) < self.seq_len:
                continue

            for i in range(len(user_data) - self.seq_len + 1):
                seq = user_data.iloc[i:i + self.seq_len]
                data = seq[FEATURE_COLUMNS].values.astype(np.float32)
                data_scaled = self.scaler.transform(data)

                result = self.predict(
                    data_scaled,
                    user_id=uid,
                    date=str(seq.iloc[-1]['date']),
                    recovery_score=seq.iloc[-1]['recovery_score'],
                )
                results.append({
                    'date': result.date,
                    'user_id': result.user_id,
                    'reconstruction_error': result.reconstruction_error,
                    'is_anomaly': result.is_anomaly,
                    'anomaly_score': result.anomaly_score,
                    'risk_level': result.risk_level,
                    'recovery_score': result.recovery_score,
                })

        return pd.DataFrame(results)

    def summary(self, results: List[PredictionResult]) -> str:
        """Generate a text summary of predictions."""
        if not results:
            return "No results to summarize."

        anomalies = [r for r in results if r.is_anomaly]
        risk_counts = {'normal': 0, 'elevated': 0, 'high': 0, 'critical': 0}
        for r in results:
            risk_counts[r.risk_level] += 1

        lines = [
            "=" * 50,
            "OVERTRAINING DETECTION SUMMARY",
            "=" * 50,
            f"Total samples analyzed: {len(results)}",
            f"Anomalies detected: {len(anomalies)} ({len(anomalies) / len(results) * 100:.1f}%)",
            "",
            "Risk Level Distribution:",
        ]
        for level, count in risk_counts.items():
            pct = count / len(results) * 100
            bar = '#' * int(pct / 2)
            lines.append(f"  {level:10}: {count:5} ({pct:5.1f}%) {bar}")

        if anomalies:
            lines.extend(["", "Top Anomalies:", "-" * 30])
            top = sorted(anomalies, key=lambda x: x.anomaly_score, reverse=True)[:5]
            for i, r in enumerate(top, 1):
                lines.append(
                    f"{i}. {r.date} | {r.risk_level.upper()} | Score: {r.anomaly_score:.2f}")
        lines.append("=" * 50)
        return "\n".join(lines)


# ============================================================================
# CLI
# ============================================================================

def main():
    import argparse

    parser = argparse.ArgumentParser(description='WHOOP Overtraining Detection')
    parser.add_argument('--data', type=str, required=True,
                        help='Path to CSV data file')
    parser.add_argument('--repo', type=str,
                        default='iserrano0511/whoop-overtraining-detector',
                        help='Hugging Face repo ID')
    parser.add_argument('--user', type=str, default=None, help='Filter by user ID')
    parser.add_argument('--output', type=str, default=None, help='Output CSV path')
    args = parser.parse_args()

    predictor = OvertrainingPredictor.from_pretrained(args.repo)
    results = predictor.predict_from_csv(args.data, user_id=args.user)

    print(predictor.summary(results))

    if args.output:
        df = pd.DataFrame([{
            'date': r.date,
            'user_id': r.user_id,
            'reconstruction_error': r.reconstruction_error,
            'is_anomaly': r.is_anomaly,
            'anomaly_score': r.anomaly_score,
            'risk_level': r.risk_level,
            'recovery_score': r.recovery_score,
        } for r in results])
        df.to_csv(args.output, index=False)
        print(f"\nResults saved to {args.output}")


if __name__ == "__main__":
    main()
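
# Example invocation (hypothetical file names; the CSV must contain the raw
# columns consumed by engineer_features above):
#
#   python inference.py --data whoop_export.csv --user u1 --output results.csv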