"""
Simple inference module for WHOOP Overtraining Detection.
This module provides an easy-to-use interface for detecting overtraining
patterns using the pre-trained autoencoder model.
Usage:
    from inference import OvertrainingPredictor
    predictor = OvertrainingPredictor.from_pretrained("iserrano0511/whoop-overtraining-detector")
    results = predictor.predict_from_csv("your_data.csv")
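Or from the command line (see main() at the bottom of this file):
    python inference.py --data your_data.csv --output results.csv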
"""
import json
from dataclasses import dataclass
from typing import List, Optional
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
# ============================================================================
# MODEL ARCHITECTURE (self-contained)
# ============================================================================
class ConvAutoencoder(nn.Module):
"""1D Convolutional autoencoder for sequence anomaly detection."""
def __init__(self, input_size: int, seq_len: int,
latent_size: int = 32, dropout: float = 0.2):
super().__init__()
self.input_size = input_size
self.seq_len = seq_len
self.latent_size = latent_size
# Encoder
self.encoder = nn.Sequential(
nn.Conv1d(input_size, 64, kernel_size=3, padding=1),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.Dropout(dropout),
nn.Conv1d(64, 32, kernel_size=3, padding=1),
nn.BatchNorm1d(32),
nn.ReLU(),
nn.Dropout(dropout),
nn.Conv1d(32, 16, kernel_size=3, padding=1),
nn.BatchNorm1d(16),
nn.ReLU(),
)
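        # padding=1 with kernel_size=3 preserves seq_len through every conv,
        # so the encoder output always flattens to 16 channels * seq_len steps.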
self.flatten_size = 16 * seq_len
self.encoder_fc = nn.Linear(self.flatten_size, latent_size)
# Decoder
self.decoder_fc = nn.Linear(latent_size, self.flatten_size)
self.decoder = nn.Sequential(
nn.Conv1d(16, 32, kernel_size=3, padding=1),
nn.BatchNorm1d(32),
nn.ReLU(),
nn.Dropout(dropout),
nn.Conv1d(32, 64, kernel_size=3, padding=1),
nn.BatchNorm1d(64),
nn.ReLU(),
nn.Dropout(dropout),
nn.Conv1d(64, input_size, kernel_size=3, padding=1),
)
def encode(self, x: torch.Tensor) -> torch.Tensor:
x = x.permute(0, 2, 1) # (batch, seq, features) -> (batch, features, seq)
x = self.encoder(x)
x = x.view(x.size(0), -1)
return self.encoder_fc(x)
def decode(self, z: torch.Tensor) -> torch.Tensor:
x = self.decoder_fc(z)
x = x.view(x.size(0), 16, self.seq_len)
x = self.decoder(x)
return x.permute(0, 2, 1) # Back to (batch, seq, features)
def forward(self, x: torch.Tensor):
z = self.encode(x)
return self.decode(z), z
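# Illustrative shape check for the architecture above (the sizes here are
# assumptions; the published checkpoint's config.json supplies the real
# input_size/seq_len/latent_size):
#   model = ConvAutoencoder(input_size=33, seq_len=7)  # 33 = len(FEATURE_COLUMNS)
#   x = torch.randn(4, 7, 33)                          # (batch, seq, features)
#   recon, z = model(x)
#   assert recon.shape == (4, 7, 33) and z.shape == (4, 32)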
# ============================================================================
# DATA CLASSES
# ============================================================================
@dataclass
class PredictionResult:
"""Result of overtraining prediction for a single sample."""
date: str
user_id: str
reconstruction_error: float
is_anomaly: bool
anomaly_score: float
risk_level: str # 'normal', 'elevated', 'high', 'critical'
recovery_score: Optional[float] = None
top_features: Optional[List[str]] = None
# ============================================================================
# FEATURE ENGINEERING
# ============================================================================
FEATURE_COLUMNS = [
'hrv', 'resting_heart_rate', 'respiratory_rate', 'skin_temp_deviation',
'hrv_deviation', 'rhr_deviation', 'hrv_pct_baseline', 'rhr_pct_baseline',
'sleep_hours', 'sleep_efficiency', 'deep_sleep_ratio', 'rem_sleep_ratio',
'sleep_debt_indicator', 'wake_ups', 'time_to_fall_asleep_min',
'day_strain', 'recovery_score', 'strain_recovery_ratio',
'hrv_rolling_3d', 'hrv_rolling_7d',
'resting_heart_rate_rolling_3d', 'resting_heart_rate_rolling_7d',
'day_strain_rolling_3d', 'day_strain_rolling_7d',
'recovery_score_rolling_3d', 'recovery_score_rolling_7d',
'sleep_efficiency_rolling_3d', 'sleep_efficiency_rolling_7d',
'cumulative_strain_3d', 'cumulative_strain_7d',
'hrv_change', 'rhr_change', 'recovery_change',
]
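# 33 features in total. The order above must match the order used when the
# scaler and model were trained; reordering silently corrupts predictions.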
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
"""Create derived features for overtraining detection."""
df = df.copy()
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values(['user_id', 'date']).reset_index(drop=True)
    # Baseline deviations (clip baselines to avoid division by zero)
    df['hrv_deviation'] = df['hrv'] - df['hrv_baseline']
    df['hrv_pct_baseline'] = (df['hrv'] / df['hrv_baseline'].clip(lower=1e-6)) * 100
    df['rhr_deviation'] = df['resting_heart_rate'] - df['rhr_baseline']
    df['rhr_pct_baseline'] = (df['resting_heart_rate'] / df['rhr_baseline'].clip(lower=1e-6)) * 100
# Sleep indicators
df['sleep_debt_indicator'] = df['sleep_hours'] * (1 - df['sleep_efficiency'] / 100)
df['deep_sleep_ratio'] = df['deep_sleep_hours'] / df['sleep_hours'].clip(lower=0.1)
df['rem_sleep_ratio'] = df['rem_sleep_hours'] / df['sleep_hours'].clip(lower=0.1)
# Rolling averages
for col in ['hrv', 'resting_heart_rate', 'day_strain', 'recovery_score', 'sleep_efficiency']:
df[f'{col}_rolling_3d'] = df.groupby('user_id')[col].transform(
lambda x: x.rolling(3, min_periods=1).mean())
df[f'{col}_rolling_7d'] = df.groupby('user_id')[col].transform(
lambda x: x.rolling(7, min_periods=1).mean())
# Cumulative strain
df['cumulative_strain_3d'] = df.groupby('user_id')['day_strain'].transform(
lambda x: x.rolling(3, min_periods=1).sum())
df['cumulative_strain_7d'] = df.groupby('user_id')['day_strain'].transform(
lambda x: x.rolling(7, min_periods=1).sum())
# Strain-to-recovery ratio
df['strain_recovery_ratio'] = df['day_strain'] / df['recovery_score'].clip(lower=1)
# Day-over-day changes
df['hrv_change'] = df.groupby('user_id')['hrv'].diff()
df['rhr_change'] = df.groupby('user_id')['resting_heart_rate'].diff()
df['recovery_change'] = df.groupby('user_id')['recovery_score'].diff()
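    # diff() leaves a NaN on each user's first row. Fill backward then forward,
    # then zero anything still missing. Note the fill is applied frame-wide, so
    # values can bleed across adjacent users at their boundaries.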
df = df.bfill().ffill().fillna(0)
return df
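# engineer_features assumes the raw frame already contains: date, user_id, hrv,
# hrv_baseline, resting_heart_rate, rhr_baseline, respiratory_rate,
# skin_temp_deviation, sleep_hours, sleep_efficiency, deep_sleep_hours,
# rem_sleep_hours, wake_ups, time_to_fall_asleep_min, day_strain, and
# recovery_score; every other entry in FEATURE_COLUMNS is derived above.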
# ============================================================================
# PREDICTOR CLASS
# ============================================================================
class OvertrainingPredictor:
"""
Easy-to-use predictor for overtraining detection.
Example:
        predictor = OvertrainingPredictor.from_pretrained("iserrano0511/whoop-overtraining-detector")
results = predictor.predict_from_csv("data.csv")
for r in results:
if r.is_anomaly:
print(f"{r.date}: {r.risk_level} risk (score: {r.anomaly_score:.2f})")
"""
def __init__(self, model: nn.Module, config: dict, scaler):
self.model = model
self.config = config
self.scaler = scaler
self.device = next(model.parameters()).device
self.seq_len = config.get('seq_len', 7)
self.threshold = config.get('threshold', 0.5)
self.train_error_mean = config.get('train_error_mean', 0.2)
self.train_error_std = config.get('train_error_std', 0.1)
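        # The fallbacks above apply only if config.json omits these keys; a
        # published checkpoint is expected to ship its own calibrated threshold
        # and training-error statistics.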
@classmethod
    def from_pretrained(cls, repo_id: str, device: str = 'auto') -> 'OvertrainingPredictor':
"""
Load predictor from Hugging Face Hub.
Args:
repo_id: Hugging Face repo ID (e.g., "iserrano0511/whoop-overtraining-detector")
device: 'cuda', 'cpu', or 'auto'
"""
        try:
            from huggingface_hub import hf_hub_download
        except ImportError:
            raise ImportError("Please install huggingface_hub: pip install huggingface_hub") from None
        try:
            import joblib
        except ImportError:
            raise ImportError("Please install joblib: pip install joblib") from None
# Determine device
if device == 'auto':
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device)
# Download files
print(f"Downloading model from {repo_id}...")
model_path = hf_hub_download(repo_id=repo_id, filename="model.pt")
scaler_path = hf_hub_download(repo_id=repo_id, filename="scaler.pkl")
config_path = hf_hub_download(repo_id=repo_id, filename="config.json")
# Load config
with open(config_path, 'r') as f:
config = json.load(f)
# Load model
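        # weights_only=False lets torch.load unpickle arbitrary objects in the
        # checkpoint dict, so only load checkpoints from sources you trust.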
checkpoint = torch.load(model_path, map_location=device, weights_only=False)
model = ConvAutoencoder(
input_size=config['input_size'],
seq_len=config['seq_len'],
latent_size=config['latent_size'],
dropout=config.get('dropout', 0.2)
)
model.load_state_dict(checkpoint['model_state_dict'])
model = model.to(device)
model.eval()
# Load scaler
scaler = joblib.load(scaler_path)
print(f"Model loaded successfully on {device}")
return cls(model, config, scaler)
@classmethod
    def from_local(cls, model_path: str, scaler_path: str, config_path: str,
                   device: str = 'auto') -> 'OvertrainingPredictor':
"""Load predictor from local files."""
import joblib
if device == 'auto':
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device)
with open(config_path, 'r') as f:
config = json.load(f)
checkpoint = torch.load(model_path, map_location=device, weights_only=False)
model = ConvAutoencoder(
input_size=config['input_size'],
seq_len=config['seq_len'],
latent_size=config['latent_size'],
dropout=config.get('dropout', 0.2)
)
model.load_state_dict(checkpoint['model_state_dict'])
model = model.to(device)
model.eval()
scaler = joblib.load(scaler_path)
return cls(model, config, scaler)
def _get_risk_level(self, score: float) -> str:
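        """Map a z-scored reconstruction error to a coarse risk band:
        < 1.5 sd above the training mean -> 'normal', < 2.5 -> 'elevated',
        < 3.5 -> 'high', anything beyond -> 'critical'."""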
if score < 1.5:
return 'normal'
elif score < 2.5:
return 'elevated'
elif score < 3.5:
return 'high'
return 'critical'
    def predict(self, data: np.ndarray, user_id: str = 'unknown',
                date: str = 'unknown',
                recovery_score: Optional[float] = None) -> PredictionResult:
"""
Predict overtraining risk for a single sequence.
Args:
data: Scaled data of shape (seq_len, n_features)
user_id: User identifier
date: Date string
recovery_score: Optional recovery score
Returns:
PredictionResult with risk assessment
"""
        if data.ndim == 2:  # single sequence: add a batch dimension
            data = data[np.newaxis, ...]
x = torch.FloatTensor(data).to(self.device)
with torch.no_grad():
x_recon, _ = self.model(x)
error = ((x - x_recon) ** 2).mean().item()
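        # Normalize the error into a z-score against the reconstruction errors
        # seen on the (presumed healthy) training data; 1e-8 guards a zero std.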
anomaly_score = (error - self.train_error_mean) / (self.train_error_std + 1e-8)
return PredictionResult(
date=str(date),
user_id=str(user_id),
reconstruction_error=error,
is_anomaly=error > self.threshold,
anomaly_score=anomaly_score,
risk_level=self._get_risk_level(anomaly_score),
recovery_score=recovery_score,
)
    def _predict_sequences(self, df: pd.DataFrame) -> List[PredictionResult]:
        """Slide a seq_len window over each user's already-engineered data."""
        results = []
        for uid in df['user_id'].unique():
            user_data = df[df['user_id'] == uid].sort_values('date')
            if len(user_data) < self.seq_len:
                continue
            for i in range(len(user_data) - self.seq_len + 1):
                seq = user_data.iloc[i:i + self.seq_len]
                data = seq[FEATURE_COLUMNS].values.astype(np.float32)
                data_scaled = self.scaler.transform(data)
                results.append(self.predict(
                    data_scaled,
                    user_id=uid,
                    date=str(seq.iloc[-1]['date']),
                    recovery_score=seq.iloc[-1]['recovery_score'],
                ))
        return results
    def predict_from_csv(self, csv_path: str, user_id: Optional[str] = None) -> List[PredictionResult]:
        """
        Predict overtraining risk from a CSV file.
        Args:
            csv_path: Path to CSV with WHOOP data
            user_id: Optional filter for a specific user
        Returns:
            List of PredictionResult objects
        """
        df = engineer_features(pd.read_csv(csv_path))
        if user_id:
            df = df[df['user_id'] == user_id]
        return self._predict_sequences(df)
    def predict_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Predict and return results as a DataFrame.
        Args:
            df: DataFrame with raw WHOOP data (feature engineering is applied here)
        Returns:
            DataFrame with one row of predictions per sliding window
        """
        results = self._predict_sequences(engineer_features(df))
        return pd.DataFrame([{
            'date': r.date,
            'user_id': r.user_id,
            'reconstruction_error': r.reconstruction_error,
            'is_anomaly': r.is_anomaly,
            'anomaly_score': r.anomaly_score,
            'risk_level': r.risk_level,
            'recovery_score': r.recovery_score,
        } for r in results])
def summary(self, results: List[PredictionResult]) -> str:
"""Generate a text summary of predictions."""
if not results:
return "No results to summarize."
anomalies = [r for r in results if r.is_anomaly]
risk_counts = {'normal': 0, 'elevated': 0, 'high': 0, 'critical': 0}
for r in results:
risk_counts[r.risk_level] += 1
lines = [
"=" * 50,
"OVERTRAINING DETECTION SUMMARY",
"=" * 50,
f"Total samples analyzed: {len(results)}",
f"Anomalies detected: {len(anomalies)} ({len(anomalies)/len(results)*100:.1f}%)",
"",
"Risk Level Distribution:",
]
for level, count in risk_counts.items():
pct = count / len(results) * 100
bar = '#' * int(pct / 2)
lines.append(f" {level:10}: {count:5} ({pct:5.1f}%) {bar}")
if anomalies:
lines.extend(["", "Top Anomalies:", "-" * 30])
top = sorted(anomalies, key=lambda x: x.anomaly_score, reverse=True)[:5]
for i, r in enumerate(top, 1):
lines.append(f"{i}. {r.date} | {r.risk_level.upper()} | Score: {r.anomaly_score:.2f}")
lines.append("=" * 50)
return "\n".join(lines)
# ============================================================================
# CLI
# ============================================================================
def main():
import argparse
parser = argparse.ArgumentParser(description='WHOOP Overtraining Detection')
parser.add_argument('--data', type=str, required=True, help='Path to CSV data file')
parser.add_argument('--repo', type=str, default='iserrano0511/whoop-overtraining-detector',
help='Hugging Face repo ID')
parser.add_argument('--user', type=str, default=None, help='Filter by user ID')
parser.add_argument('--output', type=str, default=None, help='Output CSV path')
args = parser.parse_args()
    predictor = OvertrainingPredictor.from_pretrained(args.repo)
results = predictor.predict_from_csv(args.data, user_id=args.user)
print(predictor.summary(results))
if args.output:
df = pd.DataFrame([{
'date': r.date,
'user_id': r.user_id,
'reconstruction_error': r.reconstruction_error,
'is_anomaly': r.is_anomaly,
'anomaly_score': r.anomaly_score,
'risk_level': r.risk_level,
'recovery_score': r.recovery_score,
} for r in results])
df.to_csv(args.output, index=False)
print(f"\nResults saved to {args.output}")
if __name__ == "__main__":
main()