| | """ |
| | Simple inference module for WHOOP Overtraining Detection. |
| | |
| | This module provides an easy-to-use interface for detecting overtraining |
| | patterns using the pre-trained autoencoder model. |
| | |
| | Usage: |
| | from inference import OvertainingPredictor |
| | |
| | predictor = OvertainingPredictor.from_pretrained("iserrano0511/whoop-overtraining-detector") |
| | results = predictor.predict_from_csv("your_data.csv") |
| | """ |
| |
|
| | import json |
| | from pathlib import Path |
| | from typing import Dict, List, Optional, Union |
| | from dataclasses import dataclass |
| |
|
| | import numpy as np |
| | import pandas as pd |
| | import torch |
| | import torch.nn as nn |
| |
|
| |
|
| | |
| | |
| | |
| |
|
class ConvAutoencoder(nn.Module):
    """1D convolutional autoencoder for sequence anomaly detection.

    Tensors are (batch, seq_len, input_size) at the interface; internally the
    convolutions run channel-first, hence the permutes in encode/decode.
    NOTE: the module ordering inside each nn.Sequential is part of the
    checkpoint format (state-dict keys like ``encoder.0.weight``) and must
    not be changed.
    """

    def __init__(self, input_size: int, seq_len: int,
                 latent_size: int = 32, dropout: float = 0.2):
        super().__init__()

        self.input_size = input_size
        self.seq_len = seq_len
        self.latent_size = latent_size

        def conv_unit(c_in: int, c_out: int) -> list:
            # Conv -> BatchNorm -> ReLU; the caller appends Dropout where needed.
            return [nn.Conv1d(c_in, c_out, kernel_size=3, padding=1),
                    nn.BatchNorm1d(c_out),
                    nn.ReLU()]

        # Encoder: 64 -> 32 -> 16 channels, dropout between stages.
        enc = (conv_unit(input_size, 64) + [nn.Dropout(dropout)]
               + conv_unit(64, 32) + [nn.Dropout(dropout)]
               + conv_unit(32, 16))
        self.encoder = nn.Sequential(*enc)

        # kernel_size=3 with padding=1 preserves sequence length, so the
        # bottleneck sees 16 channels * seq_len positions.
        self.flatten_size = 16 * seq_len
        self.encoder_fc = nn.Linear(self.flatten_size, latent_size)

        # Decoder mirrors the encoder: 16 -> 32 -> 64 -> input_size channels,
        # with a raw (no activation) final convolution.
        self.decoder_fc = nn.Linear(latent_size, self.flatten_size)
        dec = (conv_unit(16, 32) + [nn.Dropout(dropout)]
               + conv_unit(32, 64) + [nn.Dropout(dropout)]
               + [nn.Conv1d(64, input_size, kernel_size=3, padding=1)])
        self.decoder = nn.Sequential(*dec)

    def encode(self, x: torch.Tensor) -> torch.Tensor:
        """Map (batch, seq_len, input_size) to a (batch, latent_size) code."""
        feats = self.encoder(x.permute(0, 2, 1))
        return self.encoder_fc(feats.reshape(feats.size(0), -1))

    def decode(self, z: torch.Tensor) -> torch.Tensor:
        """Map a (batch, latent_size) code back to (batch, seq_len, input_size)."""
        hidden = self.decoder_fc(z).view(-1, 16, self.seq_len)
        return self.decoder(hidden).permute(0, 2, 1)

    def forward(self, x: torch.Tensor):
        """Return (reconstruction, latent_code) for a batch of sequences."""
        latent = self.encode(x)
        return self.decode(latent), latent
| |
|
| |
|
| | |
| | |
| | |
| |
|
@dataclass
class PredictionResult:
    """Result of overtraining prediction for a single sample."""
    date: str                    # date of the last day in the evaluated window
    user_id: str                 # user the window belongs to
    reconstruction_error: float  # mean squared reconstruction error of the window
    is_anomaly: bool             # True when reconstruction_error exceeds the configured threshold
    anomaly_score: float         # error standardized by the training-error mean/std
    risk_level: str              # one of 'normal' | 'elevated' | 'high' | 'critical'
    recovery_score: Optional[float] = None  # recovery score of the window's last day, if available
    top_features: Optional[List[str]] = None  # not populated by the code in this module
| |
|
| |
|
| | |
| | |
| | |
| |
|
# The 33 engineered feature columns fed to the autoencoder, in the exact order
# selected by predict_from_csv / predict_dataframe before scaling.
# NOTE(review): this ordering presumably matches what the scaler and model were
# trained with — confirm against the training pipeline before editing.
FEATURE_COLUMNS = [
    # Raw physiology and baseline deviations
    'hrv', 'resting_heart_rate', 'respiratory_rate', 'skin_temp_deviation',
    'hrv_deviation', 'rhr_deviation', 'hrv_pct_baseline', 'rhr_pct_baseline',
    # Sleep quantity and quality
    'sleep_hours', 'sleep_efficiency', 'deep_sleep_ratio', 'rem_sleep_ratio',
    'sleep_debt_indicator', 'wake_ups', 'time_to_fall_asleep_min',
    # Strain and recovery
    'day_strain', 'recovery_score', 'strain_recovery_ratio',
    # Per-user rolling means (3- and 7-day windows)
    'hrv_rolling_3d', 'hrv_rolling_7d',
    'resting_heart_rate_rolling_3d', 'resting_heart_rate_rolling_7d',
    'day_strain_rolling_3d', 'day_strain_rolling_7d',
    'recovery_score_rolling_3d', 'recovery_score_rolling_7d',
    'sleep_efficiency_rolling_3d', 'sleep_efficiency_rolling_7d',
    # Cumulative strain and day-over-day changes
    'cumulative_strain_3d', 'cumulative_strain_7d',
    'hrv_change', 'rhr_change', 'recovery_change',
]
| |
|
| |
|
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Create derived features for overtraining detection.

    Expects columns: date, user_id, hrv, hrv_baseline, resting_heart_rate,
    rhr_baseline, sleep_hours, sleep_efficiency, deep_sleep_hours,
    rem_sleep_hours, day_strain, recovery_score. Returns a copy sorted by
    (user_id, date) with all derived columns added and NaNs filled.
    """
    out = df.copy()
    out['date'] = pd.to_datetime(out['date'])
    out = out.sort_values(['user_id', 'date']).reset_index(drop=True)

    # Deviations from each user's stored baselines.
    out['hrv_deviation'] = out['hrv'] - out['hrv_baseline']
    out['hrv_pct_baseline'] = out['hrv'] / out['hrv_baseline'] * 100
    out['rhr_deviation'] = out['resting_heart_rate'] - out['rhr_baseline']
    out['rhr_pct_baseline'] = out['resting_heart_rate'] / out['rhr_baseline'] * 100

    # Sleep quality; the denominator is clipped to avoid division by zero.
    out['sleep_debt_indicator'] = out['sleep_hours'] * (1 - out['sleep_efficiency'] / 100)
    safe_sleep = out['sleep_hours'].clip(lower=0.1)
    out['deep_sleep_ratio'] = out['deep_sleep_hours'] / safe_sleep
    out['rem_sleep_ratio'] = out['rem_sleep_hours'] / safe_sleep

    grouped = out.groupby('user_id')

    # Per-user rolling means over 3- and 7-day windows.
    for col in ('hrv', 'resting_heart_rate', 'day_strain', 'recovery_score', 'sleep_efficiency'):
        for window in (3, 7):
            out[f'{col}_rolling_{window}d'] = grouped[col].transform(
                lambda s, w=window: s.rolling(w, min_periods=1).mean())

    # Per-user cumulative strain over the same windows.
    for window in (3, 7):
        out[f'cumulative_strain_{window}d'] = grouped['day_strain'].transform(
            lambda s, w=window: s.rolling(w, min_periods=1).sum())

    # Strain relative to recovery; recovery clipped so the ratio stays finite.
    out['strain_recovery_ratio'] = out['day_strain'] / out['recovery_score'].clip(lower=1)

    # Day-over-day changes within each user (NaN on each user's first row).
    out['hrv_change'] = grouped['hrv'].diff()
    out['rhr_change'] = grouped['resting_heart_rate'].diff()
    out['recovery_change'] = grouped['recovery_score'].diff()

    # Backfill, then forward-fill, then zero-fill the remaining gaps.
    return out.bfill().ffill().fillna(0)
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class OvertainingPredictor: |
| | """ |
| | Easy-to-use predictor for overtraining detection. |
| | |
| | Example: |
| | predictor = OvertainingPredictor.from_pretrained("iserrano0511/whoop-overtraining-detector") |
| | results = predictor.predict_from_csv("data.csv") |
| | |
| | for r in results: |
| | if r.is_anomaly: |
| | print(f"{r.date}: {r.risk_level} risk (score: {r.anomaly_score:.2f})") |
| | """ |
| |
|
| | def __init__(self, model: nn.Module, config: dict, scaler): |
| | self.model = model |
| | self.config = config |
| | self.scaler = scaler |
| | self.device = next(model.parameters()).device |
| | self.seq_len = config.get('seq_len', 7) |
| | self.threshold = config.get('threshold', 0.5) |
| | self.train_error_mean = config.get('train_error_mean', 0.2) |
| | self.train_error_std = config.get('train_error_std', 0.1) |
| |
|
| | @classmethod |
| | def from_pretrained(cls, repo_id: str, device: str = 'auto') -> 'OvertainingPredictor': |
| | """ |
| | Load predictor from Hugging Face Hub. |
| | |
| | Args: |
| | repo_id: Hugging Face repo ID (e.g., "iserrano0511/whoop-overtraining-detector") |
| | device: 'cuda', 'cpu', or 'auto' |
| | """ |
| | try: |
| | from huggingface_hub import hf_hub_download |
| | except ImportError: |
| | raise ImportError("Please install huggingface_hub: pip install huggingface_hub") |
| |
|
| | try: |
| | import joblib |
| | except ImportError: |
| | raise ImportError("Please install joblib: pip install joblib") |
| |
|
| | |
| | if device == 'auto': |
| | device = 'cuda' if torch.cuda.is_available() else 'cpu' |
| | device = torch.device(device) |
| |
|
| | |
| | print(f"Downloading model from {repo_id}...") |
| | model_path = hf_hub_download(repo_id=repo_id, filename="model.pt") |
| | scaler_path = hf_hub_download(repo_id=repo_id, filename="scaler.pkl") |
| | config_path = hf_hub_download(repo_id=repo_id, filename="config.json") |
| |
|
| | |
| | with open(config_path, 'r') as f: |
| | config = json.load(f) |
| |
|
| | |
| | checkpoint = torch.load(model_path, map_location=device, weights_only=False) |
| | model = ConvAutoencoder( |
| | input_size=config['input_size'], |
| | seq_len=config['seq_len'], |
| | latent_size=config['latent_size'], |
| | dropout=config.get('dropout', 0.2) |
| | ) |
| | model.load_state_dict(checkpoint['model_state_dict']) |
| | model = model.to(device) |
| | model.eval() |
| |
|
| | |
| | scaler = joblib.load(scaler_path) |
| |
|
| | print(f"Model loaded successfully on {device}") |
| | return cls(model, config, scaler) |
| |
|
| | @classmethod |
| | def from_local(cls, model_path: str, scaler_path: str, config_path: str, |
| | device: str = 'auto') -> 'OvertainingPredictor': |
| | """Load predictor from local files.""" |
| | import joblib |
| |
|
| | if device == 'auto': |
| | device = 'cuda' if torch.cuda.is_available() else 'cpu' |
| | device = torch.device(device) |
| |
|
| | with open(config_path, 'r') as f: |
| | config = json.load(f) |
| |
|
| | checkpoint = torch.load(model_path, map_location=device, weights_only=False) |
| | model = ConvAutoencoder( |
| | input_size=config['input_size'], |
| | seq_len=config['seq_len'], |
| | latent_size=config['latent_size'], |
| | dropout=config.get('dropout', 0.2) |
| | ) |
| | model.load_state_dict(checkpoint['model_state_dict']) |
| | model = model.to(device) |
| | model.eval() |
| |
|
| | scaler = joblib.load(scaler_path) |
| | return cls(model, config, scaler) |
| |
|
| | def _get_risk_level(self, score: float) -> str: |
| | if score < 1.5: |
| | return 'normal' |
| | elif score < 2.5: |
| | return 'elevated' |
| | elif score < 3.5: |
| | return 'high' |
| | return 'critical' |
| |
|
| | def predict(self, data: np.ndarray, user_id: str = 'unknown', |
| | date: str = 'unknown', recovery_score: float = None) -> PredictionResult: |
| | """ |
| | Predict overtraining risk for a single sequence. |
| | |
| | Args: |
| | data: Scaled data of shape (seq_len, n_features) |
| | user_id: User identifier |
| | date: Date string |
| | recovery_score: Optional recovery score |
| | |
| | Returns: |
| | PredictionResult with risk assessment |
| | """ |
| | if len(data.shape) == 2: |
| | data = data.reshape(1, *data.shape) |
| |
|
| | x = torch.FloatTensor(data).to(self.device) |
| |
|
| | with torch.no_grad(): |
| | x_recon, _ = self.model(x) |
| | error = ((x - x_recon) ** 2).mean().item() |
| |
|
| | anomaly_score = (error - self.train_error_mean) / (self.train_error_std + 1e-8) |
| |
|
| | return PredictionResult( |
| | date=str(date), |
| | user_id=str(user_id), |
| | reconstruction_error=error, |
| | is_anomaly=error > self.threshold, |
| | anomaly_score=anomaly_score, |
| | risk_level=self._get_risk_level(anomaly_score), |
| | recovery_score=recovery_score, |
| | ) |
| |
|
| | def predict_from_csv(self, csv_path: str, user_id: Optional[str] = None) -> List[PredictionResult]: |
| | """ |
| | Predict overtraining risk from a CSV file. |
| | |
| | Args: |
| | csv_path: Path to CSV with WHOOP data |
| | user_id: Optional filter for specific user |
| | |
| | Returns: |
| | List of PredictionResult objects |
| | """ |
| | df = pd.read_csv(csv_path) |
| | df = engineer_features(df) |
| |
|
| | if user_id: |
| | df = df[df['user_id'] == user_id] |
| |
|
| | results = [] |
| | for uid in df['user_id'].unique(): |
| | user_data = df[df['user_id'] == uid].sort_values('date') |
| |
|
| | if len(user_data) < self.seq_len: |
| | continue |
| |
|
| | for i in range(len(user_data) - self.seq_len + 1): |
| | seq = user_data.iloc[i:i + self.seq_len] |
| | data = seq[FEATURE_COLUMNS].values.astype(np.float32) |
| | data_scaled = self.scaler.transform(data) |
| |
|
| | result = self.predict( |
| | data_scaled, |
| | user_id=uid, |
| | date=str(seq.iloc[-1]['date']), |
| | recovery_score=seq.iloc[-1]['recovery_score'] |
| | ) |
| | results.append(result) |
| |
|
| | return results |
| |
|
| | def predict_dataframe(self, df: pd.DataFrame) -> pd.DataFrame: |
| | """ |
| | Predict and return results as a DataFrame. |
| | |
| | Args: |
| | df: DataFrame with WHOOP data (will be feature-engineered) |
| | |
| | Returns: |
| | DataFrame with predictions |
| | """ |
| | results = [] |
| | df = engineer_features(df) |
| |
|
| | for uid in df['user_id'].unique(): |
| | user_data = df[df['user_id'] == uid].sort_values('date') |
| |
|
| | if len(user_data) < self.seq_len: |
| | continue |
| |
|
| | for i in range(len(user_data) - self.seq_len + 1): |
| | seq = user_data.iloc[i:i + self.seq_len] |
| | data = seq[FEATURE_COLUMNS].values.astype(np.float32) |
| | data_scaled = self.scaler.transform(data) |
| |
|
| | result = self.predict( |
| | data_scaled, |
| | user_id=uid, |
| | date=str(seq.iloc[-1]['date']), |
| | recovery_score=seq.iloc[-1]['recovery_score'] |
| | ) |
| | results.append({ |
| | 'date': result.date, |
| | 'user_id': result.user_id, |
| | 'reconstruction_error': result.reconstruction_error, |
| | 'is_anomaly': result.is_anomaly, |
| | 'anomaly_score': result.anomaly_score, |
| | 'risk_level': result.risk_level, |
| | 'recovery_score': result.recovery_score, |
| | }) |
| |
|
| | return pd.DataFrame(results) |
| |
|
| | def summary(self, results: List[PredictionResult]) -> str: |
| | """Generate a text summary of predictions.""" |
| | if not results: |
| | return "No results to summarize." |
| |
|
| | anomalies = [r for r in results if r.is_anomaly] |
| | risk_counts = {'normal': 0, 'elevated': 0, 'high': 0, 'critical': 0} |
| | for r in results: |
| | risk_counts[r.risk_level] += 1 |
| |
|
| | lines = [ |
| | "=" * 50, |
| | "OVERTRAINING DETECTION SUMMARY", |
| | "=" * 50, |
| | f"Total samples analyzed: {len(results)}", |
| | f"Anomalies detected: {len(anomalies)} ({len(anomalies)/len(results)*100:.1f}%)", |
| | "", |
| | "Risk Level Distribution:", |
| | ] |
| |
|
| | for level, count in risk_counts.items(): |
| | pct = count / len(results) * 100 |
| | bar = '#' * int(pct / 2) |
| | lines.append(f" {level:10}: {count:5} ({pct:5.1f}%) {bar}") |
| |
|
| | if anomalies: |
| | lines.extend(["", "Top Anomalies:", "-" * 30]) |
| | top = sorted(anomalies, key=lambda x: x.anomaly_score, reverse=True)[:5] |
| | for i, r in enumerate(top, 1): |
| | lines.append(f"{i}. {r.date} | {r.risk_level.upper()} | Score: {r.anomaly_score:.2f}") |
| |
|
| | lines.append("=" * 50) |
| | return "\n".join(lines) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
def main():
    """CLI entry point: score a CSV of WHOOP data and print a summary."""
    import argparse

    parser = argparse.ArgumentParser(description='WHOOP Overtraining Detection')
    parser.add_argument('--data', type=str, required=True, help='Path to CSV data file')
    parser.add_argument('--repo', type=str, default='iserrano0511/whoop-overtraining-detector',
                        help='Hugging Face repo ID')
    parser.add_argument('--user', type=str, default=None, help='Filter by user ID')
    parser.add_argument('--output', type=str, default=None, help='Output CSV path')
    args = parser.parse_args()

    predictor = OvertainingPredictor.from_pretrained(args.repo)
    results = predictor.predict_from_csv(args.data, user_id=args.user)
    print(predictor.summary(results))

    if args.output:
        # Flatten results into a table and persist them.
        rows = []
        for r in results:
            rows.append({
                'date': r.date,
                'user_id': r.user_id,
                'reconstruction_error': r.reconstruction_error,
                'is_anomaly': r.is_anomaly,
                'anomaly_score': r.anomaly_score,
                'risk_level': r.risk_level,
                'recovery_score': r.recovery_score,
            })
        pd.DataFrame(rows).to_csv(args.output, index=False)
        print(f"\nResults saved to {args.output}")


if __name__ == "__main__":
    main()
| |
|