# menopause-ml / menopause.py
# (repository header: uploaded by techatcreated, v1, commit 66d45ea verified)
"""
SWAN Menopause Stage Prediction (pre / peri / post) using self-reported features
Uses only the uploaded SWAN TSV file (no synthetic data, no external datasets).
Outputs:
- saved artifacts in ./swan_ml_output/
- documentation.md summarizing steps and results
- optional CSV outputs for stage predictions and symptom predictions (separate files)
Notes:
- The script attempts to locate a menopause-stage column heuristically (common names like MENOSTAT,
MENO, MENOSYM, MENOP etc.). Please verify the chosen stage column against the codebook.
- Self-reported features are identified using name-pattern heuristics (VMS/HOT/SLEEP/CESD/STRESS/MOOD/SMOK/ALCOH/EXER/PHYS/VAG/URINE/SEX/PAIN etc).
- Duplicate column names are tolerantly handled by renaming duplicates.
"""
import os, re, sys, argparse
import numpy as np
import pandas as pd
import importlib
import sklearn
import matplotlib
# Use a non-interactive backend by default so the script can run on servers/CI
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from sklearn.inspection import permutation_importance
from sklearn.preprocessing import label_binarize
# --------------------------
# Environment / CLI defaults
# --------------------------
# Defaults may be overridden by environment variables or CLI args below
DATA_PATH = os.environ.get('MENOPAUSE_DATA', "ICPSR_31901/DS0001/31901-0001-Data.tsv")
OUTPUT_DIR = os.environ.get('MENOPAUSE_OUT', "swan_ml_output")
# Parse CLI args (safe to parse here for a script; this will be ignored when imported)
parser = argparse.ArgumentParser(description='Run menopause stage prediction pipeline')
parser.add_argument('--data', '-d', default=DATA_PATH, help='Path to SWAN TSV file')
parser.add_argument('--output', '-o', default=OUTPUT_DIR, help='Output directory for artifacts')
parser.add_argument('--show', action='store_true', help='Show plots interactively (default: off)')
parser.add_argument('--stage-col', default=None, help='Override detected stage column name')
# Symptom cycle prediction CLI options
parser.add_argument('--predict-symptoms', action='store_true', help='Run symptom cycle prediction from CSV input')
parser.add_argument('--symptoms-input', default=None, help='Input CSV for symptom predictions')
parser.add_argument('--symptoms-output', default=None, help='Output CSV to write symptom predictions')
parser.add_argument('--lmp-col', default='LMP', help='Column name used as LMP (date string or day-of-month integer)')
parser.add_argument('--date-col', default=None, help='Column name for target date; if omitted, uses today or VISIT date if present')
parser.add_argument('--cycle-length', type=int, default=28, help='Average cycle length in days for symptom prediction')
# Dual prediction CLI options (separate inputs/outputs for each model)
parser.add_argument('--predict-dual', action='store_true', help='Run stage + symptom predictions using separate input/output files')
parser.add_argument('--stage-input', default=None, help='Input CSV for menopause stage predictions')
parser.add_argument('--stage-output', default=None, help='Output CSV for menopause stage predictions')
parser.add_argument('--stage-model', default='RandomForest', help='Model for stage prediction: RandomForest or LogisticRegression')
parser.add_argument('--forecast-dir', default=OUTPUT_DIR, help='Directory containing saved forecast models')
parser.add_argument('--menopause-stage-col', default=None, help='(Deprecated) Kept for backward compatibility; symptom forecasting no longer uses menopause stage')
# Parse CLI args only when script is run directly; when imported (e.g., during testing), avoid consuming external argv
if __name__ == '__main__':
    args = parser.parse_args()
else:
    # Use defaults when module is imported to avoid interfering with external CLI (pytest, etc.)
    # parse_args([]) parses an explicit empty argv, so sys.argv is never touched on import
    args = parser.parse_args([])
# Re-bind module-level settings from the parsed (or defaulted) arguments
DATA_PATH = args.data
OUTPUT_DIR = args.output
SHOW_PLOTS = bool(args.show)
STAGE_COL_OVERRIDE = args.stage_col
# If user only wants symptom-cycle predictions, provide a fast-path before loading the large TSV
# Define a light-weight cycle-based symptom forecaster and CSV helper so users can run predictions
# without training the menopause models (useful for small CSV inputs).
class SymptomCycleForecaster:
    """Light-weight, rule-based forecaster for cycle-linked symptoms.

    Models hot-flash and mood-symptom likelihood as Gaussian bumps over the
    menstrual cycle day (computed from a last-menstrual-period value), so
    predictions can be produced from a small CSV without training the heavy
    menopause-stage models.
    """
    def __init__(self, cycle_length=28, hot_mu=14, hot_sigma=5, mood_mu=26, mood_sigma=4,
                 base_hot=0.1, amp_hot=0.4, base_mood=0.1, amp_mood=0.45, threshold=0.5):
        # cycle_length: assumed average cycle length in days
        self.cycle_length = cycle_length
        # Gaussian peak day (mu) and spread (sigma) for each symptom curve
        self.hot_mu = hot_mu
        self.hot_sigma = hot_sigma
        self.mood_mu = mood_mu
        self.mood_sigma = mood_sigma
        # baseline probability + peak amplitude for each symptom curve
        self.base_hot = base_hot
        self.amp_hot = amp_hot
        self.base_mood = base_mood
        self.amp_mood = amp_mood
        # probability cutoff used to turn probabilities into boolean predictions
        self.threshold = threshold
    def _parse_lmp(self, lmp, reference_date=None):
        """Parse an LMP value (date string or day-of-month integer) to datetime.

        Integers are interpreted as a day in the reference month (clamped to
        1..28, which is valid for every month). Returns None when the value
        cannot be interpreted.
        """
        if pd.isna(lmp):
            return None
        try:
            lmp_int = int(lmp)
            if reference_date is None:
                ref = pd.Timestamp(datetime.today()).to_pydatetime()
            else:
                ref = pd.to_datetime(reference_date, errors='coerce')
                if pd.isna(ref):
                    ref = pd.Timestamp(datetime.today()).to_pydatetime()
                else:
                    ref = ref.to_pydatetime()
            # clamp to a valid day-of-month (28 is safe for every month)
            day = max(1, min(lmp_int, 28))
            return datetime(ref.year, ref.month, day)
        except (TypeError, ValueError):
            # Not an integer: try parsing as a date string.
            parsed = pd.to_datetime(lmp, errors='coerce')
            # BUGFIX: errors='coerce' yields NaT (not an exception) for
            # unparseable input; previously NaT escaped as a "parsed" LMP and
            # broke the day arithmetic in compute_cycle_day. Treat as missing.
            if pd.isna(parsed):
                return None
            try:
                return parsed.to_pydatetime()
            except Exception:
                return None
    def compute_cycle_day(self, lmp, target_date=None):
        """Return the 1-based cycle day at target_date, or None if LMP unknown.

        target_date defaults to today; unparseable target dates also fall back
        to today rather than failing.
        """
        if target_date is None:
            tdate = datetime.today()
        else:
            tdate = pd.to_datetime(target_date, errors='coerce')
            if pd.isna(tdate):
                tdate = datetime.today()
            else:
                tdate = tdate.to_pydatetime()
        lmp_date = self._parse_lmp(lmp, reference_date=tdate)
        if lmp_date is None:
            return None
        delta = (tdate - lmp_date).days
        if delta < 0:
            # LMP appears to be after the target date (e.g. a later day of the
            # same month); assume it belongs to the previous cycle.
            lmp_date = lmp_date - timedelta(days=self.cycle_length)
            delta = (tdate - lmp_date).days
        cycle_day = (delta % self.cycle_length) + 1
        return int(cycle_day)
    def _gauss_prob(self, day, mu, sigma, base, amp):
        """Gaussian bump probability at `day`, clipped to [0, 1]; NaN if day is None."""
        if day is None:
            return np.nan
        val = base + amp * np.exp(-0.5 * ((day - mu) / float(sigma)) ** 2)
        return float(min(max(val, 0.0), 1.0))
    def predict_single(self, lmp, target_date=None):
        """Predict hot-flash / mood probabilities for one LMP + target date.

        Returns a dict with 'cycle_day', per-symptom probabilities, and boolean
        predictions thresholded at self.threshold (None when day is unknown).
        """
        day = self.compute_cycle_day(lmp, target_date=target_date)
        hot_p = self._gauss_prob(day, self.hot_mu, self.hot_sigma, self.base_hot, self.amp_hot)
        mood_p = self._gauss_prob(day, self.mood_mu, self.mood_sigma, self.base_mood, self.amp_mood)
        return {
            'cycle_day': day,
            'hotflash_prob': hot_p,
            'hotflash_pred': hot_p >= self.threshold if not np.isnan(hot_p) else None,
            'mood_prob': mood_p,
            'mood_pred': mood_p >= self.threshold if not np.isnan(mood_p) else None
        }
    def predict_df(self, df, lmp_col='LMP', date_col=None, menopause_stage_col=None):
        """Row-wise symptom prediction over a DataFrame.

        Parameters
        ----------
        df : pd.DataFrame with an LMP column (and optionally a date column)
        lmp_col : name of the LMP column
        date_col : optional name of the target-date column (None -> today)
        menopause_stage_col : deprecated, accepted for backward compatibility

        Returns the input columns with the prediction columns appended.
        """
        df = df.copy()
        results = df.apply(
            lambda row: pd.Series(self.predict_single(
                lmp=row.get(lmp_col),
                target_date=(row.get(date_col) if date_col is not None else None)
            )), axis=1
        )
        out = pd.concat([df.reset_index(drop=True), results.reset_index(drop=True)], axis=1)
        return out
def predict_symptoms_from_csv(input_csv, output_csv, lmp_col='LMP', date_col=None,
                              menopause_stage_col=None, cycle_length=28, **kwargs):
    """Read a CSV with LMP values, append cycle-based symptom predictions, and
    write the augmented table to output_csv.

    `menopause_stage_col` is accepted only for backward compatibility and is
    passed through unused; extra keyword arguments are ignored.
    """
    frame = pd.read_csv(input_csv)
    forecaster = SymptomCycleForecaster(cycle_length=cycle_length)
    predictions = forecaster.predict_df(
        frame, lmp_col=lmp_col, date_col=date_col, menopause_stage_col=menopause_stage_col
    )
    predictions.to_csv(output_csv, index=False)
    print(f"Wrote symptom predictions for {predictions.shape[0]} rows to {output_csv}")
    print("Sample predictions (first 5 rows):")
    preview_cols = [lmp_col] + ['cycle_day','hotflash_prob','hotflash_pred','mood_prob','mood_pred']
    print(predictions[preview_cols].head().to_string())
# If the user requested only symptom predictions from a CSV, run fast-path and exit
if args.predict_symptoms:
    if not args.symptoms_input or not args.symptoms_output:
        print("Error: --symptoms-input and --symptoms-output are required when --predict-symptoms is set")
        sys.exit(1)
    else:
        predict_symptoms_from_csv(
            input_csv=args.symptoms_input,
            output_csv=args.symptoms_output,
            lmp_col=args.lmp_col,
            date_col=args.date_col,
            menopause_stage_col=None,
            cycle_length=args.cycle_length
        )
    # Successful fast-path run: stop before the heavy TSV load below
    sys.exit(0)
# Fast-path for dual predictions (separate stage + symptoms) without loading large TSV
if args.predict_dual:
    if not args.stage_input or not args.stage_output or not args.symptoms_input or not args.symptoms_output:
        print("Error: --stage-input, --stage-output, --symptoms-input, and --symptoms-output are required when --predict-dual is set")
        sys.exit(1)
    # Load saved pipeline directly via joblib to avoid initializing full training pipeline
    import joblib
    model_file = os.path.join(args.forecast_dir, 'rf_pipeline.pkl' if args.stage_model == 'RandomForest' else 'lr_pipeline.pkl')
    try:
        pipeline = joblib.load(model_file)
    except Exception as e:
        print(f"ERROR: Could not load model file '{model_file}': {e}")
        print("Please train the models first (run the script without --predict-dual) or provide correct --forecast-dir")
        sys.exit(1)
    # Stage predictions
    try:
        stage_data = pd.read_csv(args.stage_input)
    except Exception as e:
        print(f"ERROR: Could not read stage input CSV '{args.stage_input}': {e}")
        sys.exit(1)
    # Columns treated as identifiers (carried through, never used as features)
    id_cols = ['ID', 'id', 'SWANID', 'individual', 'Individual', 'subject', 'Subject']
    feature_cols = [c for c in stage_data.columns if c not in id_cols]
    # Attempt to load feature metadata so we can reindex inputs to expected features
    import json
    metadata_path = os.path.join(args.forecast_dir, 'forecast_metadata.json')
    try:
        with open(metadata_path, 'r') as f:
            metadata = json.load(f)
        expected_features = metadata.get('feature_names', feature_cols)
    except Exception:
        # No metadata available: trust the input file's own columns
        expected_features = feature_cols
    # Reindex so missing training features become NaN (imputed by the pipeline)
    X = stage_data.reindex(columns=expected_features, fill_value=np.nan)
    preds = pd.DataFrame({'predicted_stage': pipeline.predict(X), 'model': args.stage_model})
    try:
        proba = pipeline.predict_proba(X)
        # The last named step of the pipeline is the fitted classifier
        final_est = pipeline.named_steps[list(pipeline.named_steps.keys())[-1]]
        preds['confidence'] = np.max(proba, axis=1)
        for i, cls in enumerate(final_est.classes_):
            preds[f'prob_{cls}'] = proba[:, i]
    except Exception:
        preds['confidence'] = np.nan
    id_data = stage_data[[c for c in id_cols if c in stage_data.columns]] if any(c in stage_data.columns for c in id_cols) else None
    if id_data is not None:
        stage_results = pd.concat([id_data.reset_index(drop=True), preds.reset_index(drop=True)], axis=1)
    else:
        # No identifier column supplied: synthesize a 1-based 'individual' index
        stage_results = preds.reset_index(drop=True)
        stage_results.insert(0, 'individual', range(1, len(stage_results) + 1))
    stage_results.to_csv(args.stage_output, index=False)
    print(f"Wrote stage predictions for {stage_results.shape[0]} rows to {args.stage_output}")
    # Symptom predictions (independent input/output)
    try:
        symptom_data = pd.read_csv(args.symptoms_input)
    except Exception as e:
        print(f"ERROR: Could not read symptom input CSV '{args.symptoms_input}': {e}")
        sys.exit(1)
    # Prefer an explicit --date-col; otherwise fall back to a 'date' column if present
    date_col = args.date_col if args.date_col else ('date' if 'date' in symptom_data.columns else None)
    fore = SymptomCycleForecaster(cycle_length=args.cycle_length)
    symptom_results = fore.predict_df(symptom_data, lmp_col=args.lmp_col, date_col=date_col)
    symptom_results.to_csv(args.symptoms_output, index=False)
    print(f"Wrote symptom predictions for {symptom_results.shape[0]} rows to {args.symptoms_output}")
    sys.exit(0)
# Ensure the artifact directory exists before the training run writes into it
os.makedirs(OUTPUT_DIR, exist_ok=True)
# --------------------------
# Utility: make column names unique (pandas allows duplicates)
# --------------------------
def make_unique_columns(cols):
    """Return a copy of `cols` where repeated names get `__dupN` suffixes.

    The first occurrence keeps its original name; the k-th repeat becomes
    `name__dupk`, matching the duplicate-handling note in the module docstring.
    """
    occurrences = {}
    renamed = []
    for name in cols:
        if name in occurrences:
            occurrences[name] += 1
            renamed.append(f"{name}__dup{occurrences[name]}")
        else:
            occurrences[name] = 0
            renamed.append(name)
    return renamed
# --------------------------
# 1. Load data
# --------------------------
# Guard: only run training and heavy data loading when script is executed directly
if __name__ == '__main__' and os.path.exists(DATA_PATH):
    print("Loading data from:", DATA_PATH)
    df = pd.read_csv(DATA_PATH, sep='\t', low_memory=False)
    print("Original shape:", df.shape)
    # make column names unique for robust selection (duplicates -> __dup1, __dup2)
    df.columns = make_unique_columns(df.columns.tolist())
    # Show a few columns (first 40) so user can inspect if running interactively
    print("First 40 column names (for inspection):")
    print(df.columns[:40].tolist())
    # --------------------------
    # 2. Identify candidate self-reported features and menopause-stage variable
    # --------------------------
    # Heuristic patterns for self-report variables (adjust if you'd like to include additional columns)
    selfreport_patterns = [
        r'VMS', r'HOT', r'HOTFL', r'NIGHTSW', r'SLEEP', r'CESD', r'STRESS', r'MOOD',
        r'SMOK', r'ALCOH', r'ALCO', r'EXER', r'PHYS', r'ACTIV', r'VAG', r'URINE', r'SEX', r'PAIN',
        r'FATIG', r'IRRIT', r'ANXI', r'DEPRESS', r'BLEED', r'MENSE', r'PERIOD', r'LMP',
        r'HOTSW', r'QOL', r'DRY'
    ]
    # Exclude laboratory/biomarker variable name patterns
    biomarker_exclude = r'E2|FSH|GLUCOSE|CHOLESTEROL|HDL|TRIG|SHBG|DHEAS|INSULIN|BMD|BP|HEIGHT|WEIGHT'
    upper_cols = {c: c.upper() for c in df.columns}
    selfreport_cols = []
    for orig, up in upper_cols.items():
        for pat in selfreport_patterns:
            if re.search(pat, up):
                # skip biomarkers that match both symptom patterns and biomarker patterns
                if re.search(biomarker_exclude, up):
                    continue
                selfreport_cols.append(orig)
                break
    # Also include basic self-report demographics commonly present (AGE, RACE)
    for dem in ['AGE7','AGE','RACE','LANGINT7','LANGINT']:
        if dem in df.columns and dem not in selfreport_cols:
            selfreport_cols.append(dem)
    # Deduplicate preserving order
    seen=set()
    selfreport_cols = [x for x in selfreport_cols if not (x in seen or seen.add(x))]
    print(f"Found {len(selfreport_cols)} candidate self-reported columns (first 50 shown):")
    print(selfreport_cols[:50])
    # Identify menopause-stage variable heuristically
    stage_cand_patterns = [r'MENOSTAT', r'MENOSYM', r'MENO', r'MENOP', r'MENST', r'MENSE', r'STATUS']
    stage_candidates = [c for c in df.columns if any(re.search(p, c, flags=re.I) for p in stage_cand_patterns)]
    print("Stage-like candidate columns (found):", stage_candidates[:10])
    # If user provided an override for stage column via CLI, honor it (if present in data)
    if STAGE_COL_OVERRIDE:
        if STAGE_COL_OVERRIDE in df.columns:
            print(f"Using overridden stage column: {STAGE_COL_OVERRIDE}")
            stage_candidates = [STAGE_COL_OVERRIDE]
        else:
            print(f"Warning: requested stage column '{STAGE_COL_OVERRIDE}' not present in data; proceeding with heuristic detection")
    # If multiple candidates choose one with few unique values (likely coded categories)
    stage_col = None
    for c in stage_candidates:
        nunique = df[c].nunique(dropna=True)
        # prefer small discrete sets (e.g., 2-6 categories)
        if 1 < nunique <= 20:
            stage_col = c
            break
    if stage_col is None and stage_candidates:
        # fallback to first candidate
        stage_col = stage_candidates[0]
    if stage_col is None:
        raise RuntimeError("No menopause-stage-like column found automatically. Inspect df.columns and pick the proper variable (e.g., MENOSTAT).")
    print("Selected stage column:", stage_col, " unique values:", df[stage_col].nunique(dropna=True))
    print("Sample raw counts:")
    print(df[stage_col].value_counts(dropna=False).head(20))
    # --------------------------
    # 3. Create working dataframe with self-report features + stage
    # --------------------------
    use_cols = [stage_col] + [c for c in selfreport_cols if c in df.columns and c != stage_col]
    data = df[use_cols].copy()
    # Replace common SWAN missing codes with NaN
    missing_values = [-9, -8, -7, -1, '.', 'NA', 'N/A', '999', 9999]
    data.replace(missing_values, np.nan, inplace=True)
    # Try convert object columns to numeric when appropriate
    for col in data.columns:
        if data[col].dtype == object:
            coerced = pd.to_numeric(data[col].astype(str).str.strip(), errors='coerce')
            # If many values become numeric, use numeric version; else leave as categorical string
            if coerced.notna().sum() > len(coerced) * 0.5:
                data[col] = coerced
            else:
                # replace blank/'nan' strings with np.nan
                data[col] = data[col].astype(str).str.strip().replace({'nan': np.nan, '': np.nan})
    # --------------------------
    # 4. Map stage variable to standardized labels {pre, peri, post}
    # *Important*: this is heuristic. Verify using the codebook and adjust mapping if needed.
    # --------------------------
    def map_stage_to_labels(series):
        """Map a raw stage column to {'pre','peri','post'} labels (heuristic).

        Tries textual labels first, then a numeric min/median/max mapping;
        returns an all-NaN series when no mapping applies.
        """
        # Try textual mapping first
        s = series.copy()
        try:
            uniques = [str(x).lower() for x in s.dropna().unique()]
        except Exception:
            uniques = []
        # textual mapping
        if any(x in ['pre','premenopausal','premenopause','pre-menopausal'] for x in uniques):
            s = s.astype(str).str.lower()
            s = s.replace({'premenopausal':'pre','pre-menopausal':'pre','pre-menopause':'pre','pre':'pre'})
            s = s.replace({'perimenopausal':'peri','peri-menopausal':'peri','peri':'peri'})
            s = s.replace({'postmenopausal':'post','post-menopausal':'post','post':'post'})
            return s.map({'pre':'pre','peri':'peri','post':'post'})
        # numeric mapping heuristic: map min->pre, median->peri, max->post
        num = pd.to_numeric(s, errors='coerce')
        num_unique = sorted(num.dropna().unique().tolist())
        if len(num_unique) >= 3:
            mapping = {num_unique[0]:'pre', num_unique[len(num_unique)//2]:'peri', num_unique[-1]:'post'}
            return num.map(mapping)
        # 2-level mapping (assume 1->pre,2->post) or fallback
        if len(num_unique) == 2:
            return num.map({num_unique[0]:'pre', num_unique[1]:'post'})
        # If not mappable, return NaN series
        return pd.Series([np.nan]*len(s), index=s.index)
    mapped_stage = map_stage_to_labels(data[stage_col])
    # If mapping failed (too many NaNs), attempt a simple bleed-based heuristic (last menstrual period)
    if mapped_stage.isna().mean() > 0.9:
        bleed_candidates = [c for c in data.columns if re.search(r'LMP|BLEED|PERIOD|MENSTR', c, flags=re.I)]
        if len(bleed_candidates) > 0:
            lcol = bleed_candidates[0]
            lnum = pd.to_numeric(data[lcol], errors='coerce')
            mapped_stage = pd.Series(index=data.index, dtype=object)
            # missing bleed/LMP value -> assume post; value present -> assume pre
            mapped_stage[lnum.isna()] = 'post'
            mapped_stage[lnum.notna()] = 'pre'
        else:
            raise RuntimeError("Failed to map stage variable to pre/peri/post and no bleed/LMP variable found.")
    data['_menopause_stage'] = mapped_stage
    print("Mapped stage counts (after heuristic mapping):")
    print(data['_menopause_stage'].value_counts(dropna=False))
    # Drop rows with no mapped stage
    data = data[~data['_menopause_stage'].isna()].copy()
    print("Rows available for modeling:", data.shape[0])
    # --------------------------
    # 5. Feature selection for modeling
    # Keep only self-report fields with enough non-missing values and >1 unique value
    # --------------------------
    feature_candidates = [c for c in use_cols if c != stage_col]
    selected_features = []
    for c in feature_candidates:
        non_null = data[c].notna().sum()
        # require at least 2% nonmissing or minimum 50 observations
        if non_null < max(50, len(data) * 0.02):
            continue
        if data[c].nunique(dropna=True) <= 1:
            continue
        selected_features.append(c)
    print("Number of features selected for modeling:", len(selected_features))
    print("First 40 features (if many):", selected_features[:40])
    # --------------------------
    # 6. Preprocessing pipeline
    # Numeric features: impute mean
    # Categorical features: impute most frequent + one-hot encode
    # Normalization: only added for logistic regression pipeline (tree-based RF doesn't need scaling)
    # --------------------------
    numeric_feats = [c for c in selected_features if pd.api.types.is_numeric_dtype(data[c])]
    cat_feats = [c for c in selected_features if c not in numeric_feats]
    from sklearn.pipeline import Pipeline
    from sklearn.compose import ColumnTransformer
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean'))
    ])
    # Construct OneHotEncoder in a sklearn-version compatible way
    try:
        ohe = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
    except TypeError:
        # older sklearn versions use `sparse` kwarg
        ohe = OneHotEncoder(handle_unknown='ignore', sparse=False)
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('onehot', ohe)
    ])
    preprocessor = ColumnTransformer(transformers=[
        ('num', numeric_transformer, numeric_feats),
        ('cat', categorical_transformer, cat_feats)
    ], remainder='drop')
    # Two pipelines: RandomForest (no scaling) and LogisticRegression (scaling)
    rf_pipeline = Pipeline(steps=[
        ('pre', preprocessor),
        ('rf', RandomForestClassifier(n_estimators=200, random_state=42, n_jobs=-1))
    ])
    lr_pipeline = Pipeline(steps=[
        ('pre', preprocessor),
        ('scaler', StandardScaler()),
        ('lr', LogisticRegression(solver='lbfgs', max_iter=1000))
    ])
    # --------------------------
    # 7. Prepare data, train/test split
    # --------------------------
    X = data[selected_features].copy()
    y = data['_menopause_stage'].copy().astype(str)  # values: 'pre','peri','post' (hopefully)
    print("Target class distribution:")
    print(y.value_counts())
    # Stratified split
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, stratify=y, random_state=42)
    print("Train / test sizes:", X_train.shape[0], X_test.shape[0])
    # --------------------------
    # 8. Train models
    # --------------------------
    print("Training RandomForest...")
    rf_pipeline.fit(X_train, y_train)
    print("RandomForest trained.")
    print("Training LogisticRegression (multinomial)...")
    lr_pipeline.fit(X_train, y_train)
    print("LogisticRegression trained.")
    # --------------------------
    # 9. Predictions and assessment
    # --------------------------
    def evaluate_model(pipeline, X_test, y_test, model_name, output_dir=OUTPUT_DIR):
        """Print/save a classification report and a confusion-matrix PNG.

        Returns (y_pred, confusion_matrix) for the fitted pipeline.
        """
        y_pred = pipeline.predict(X_test)
        report = classification_report(y_test, y_pred)
        print(f"\n=== {model_name} Classification Report ===\n{report}")
        # confusion matrix
        labels = sorted(y_test.unique())
        cm = confusion_matrix(y_test, y_pred, labels=labels)
        print(f"{model_name} Confusion Matrix (rows=true, cols=pred):\nLabels: {labels}\n{cm}")
        # Save classification report
        with open(os.path.join(output_dir, f"classification_report_{model_name.replace(' ','_')}.txt"), "w") as f:
            f.write(report)
        # Plot confusion matrix with matplotlib
        fig, ax = plt.subplots(figsize=(5,4))
        im = ax.imshow(cm, interpolation='nearest')
        ax.set_xticks(range(len(labels))); ax.set_xticklabels(labels, rotation=45)
        ax.set_yticks(range(len(labels))); ax.set_yticklabels(labels)
        ax.set_title(f"{model_name} Confusion Matrix")
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                ax.text(j, i, format(cm[i, j], 'd'), ha="center", va="center")
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, f"{model_name.replace(' ','_')}_confusion_matrix.png"))
        # Show plots only when requested; otherwise close to free resources (non-interactive default)
        if SHOW_PLOTS:
            plt.show()
        else:
            plt.close('all')
        return y_pred, cm
    rf_pred, rf_cm = evaluate_model(rf_pipeline, X_test, y_test, "RandomForest")
    lr_pred, lr_cm = evaluate_model(lr_pipeline, X_test, y_test, "LogisticRegression")
    # 10. Feature importance
    # Extract feature names after preprocessing (numerics stay same; categorical one-hot create names)
    pre = rf_pipeline.named_steps['pre']
    # Get numeric feature names
    feature_names = []
    if len(numeric_feats) > 0:
        feature_names.extend(numeric_feats)
    if len(cat_feats) > 0:
        # Get onehot output names
        ohe = pre.named_transformers_['cat'].named_steps['onehot']
        try:
            cat_onehot_names = ohe.get_feature_names_out(cat_feats)
        except Exception:
            # fallback
            cat_onehot_names = []
        feature_names.extend(cat_onehot_names.tolist() if hasattr(cat_onehot_names, 'tolist') else list(cat_onehot_names))
    # Feature importances from RandomForest
    rf_model = rf_pipeline.named_steps['rf']
    importances = rf_model.feature_importances_
    imp_df = pd.DataFrame({'feature': feature_names, 'importance': importances}).sort_values('importance', ascending=False)
    imp_df.to_csv(os.path.join(OUTPUT_DIR, "rf_feature_importances.csv"), index=False)
    print("\nTop 20 RF feature importances:")
    print(imp_df.head(20).to_string(index=False))
    # Permutation importance (robust)
    print("Computing permutation importance (this can take some time)...")
    perm = permutation_importance(rf_pipeline, X_test, y_test, n_repeats=10, random_state=42, n_jobs=-1)
    perm_idx = perm.importances_mean.argsort()[::-1]
    perm_df = pd.DataFrame({
        'feature': np.array(feature_names)[perm_idx],
        'importance_mean': perm.importances_mean[perm_idx],
        'importance_std': perm.importances_std[perm_idx]
    })
    perm_df.to_csv(os.path.join(OUTPUT_DIR, "rf_permutation_importances.csv"), index=False)
    print("Top 20 permutation importances:")
    print(perm_df.head(20).to_string(index=False))
    # Plot RF top features
    topn = min(20, imp_df.shape[0])
    fig, ax = plt.subplots(figsize=(8,6))
    ax.barh(imp_df['feature'].head(topn)[::-1], imp_df['importance'].head(topn)[::-1])
    ax.set_title("RandomForest: Top feature importances")
    ax.set_xlabel("Importance")
    plt.tight_layout()
    plt.savefig(os.path.join(OUTPUT_DIR, "rf_top_feature_importances.png"))
    if SHOW_PLOTS:
        plt.show()
    else:
        plt.close('all')
# 11. ROC curves (one-vs-rest) if predict_proba available
def plot_multiclass_roc(pipeline, X_test, y_test, model_name):
    """Save one-vs-rest ROC curve PNGs (one per class) for a fitted pipeline.

    Writes into OUTPUT_DIR; shows figures only when SHOW_PLOTS. Prints a
    message and returns early when the pipeline lacks predict_proba.
    """
    if not hasattr(pipeline, "predict_proba"):
        print(f"{model_name} has no predict_proba; skipping ROC plot.")
        return
    # Must use same class order as pipeline's final estimator
    final_est = pipeline.named_steps[list(pipeline.named_steps.keys())[-1]]
    classes = final_est.classes_
    y_test_bin = label_binarize(y_test, classes=classes)
    y_score = pipeline.predict_proba(X_test)
    for i, cls in enumerate(classes):
        fpr, tpr, _ = roc_curve(y_test_bin[:, i], y_score[:, i])
        roc_auc = auc(fpr, tpr)
        plt.figure()
        plt.plot(fpr, tpr, label=f"AUC = {roc_auc:.3f}")
        # diagonal = chance-level reference line
        plt.plot([0,1],[0,1], linestyle='--')
        plt.title(f"{model_name} ROC for class {cls}")
        plt.xlabel("False Positive Rate"); plt.ylabel("True Positive Rate")
        plt.legend(loc='lower right')
        plt.savefig(os.path.join(OUTPUT_DIR, f"{model_name.replace(' ','_')}_ROC_{cls}.png"))
        if SHOW_PLOTS:
            plt.show()
        else:
            plt.close('all')
print("Plotting ROC curves for RandomForest and LogisticRegression (if available)...")
# The globals() check skips plotting when training above did not run
# (e.g. the data file was absent or the module was imported).
if __name__ == '__main__' and 'rf_pipeline' in globals():
    plot_multiclass_roc(rf_pipeline, X_test, y_test, "RandomForest")
    plot_multiclass_roc(lr_pipeline, X_test, y_test, "LogisticRegression")
# ==========================================================================================
# 12. FORECASTING MODULE: Predict menopausal stage for new individuals
# ==========================================================================================
class MenopauseForecast:
    """
    Forecasting module for predicting menopausal stage (pre/peri/post) given self-reported features.
    This class encapsulates the trained models and preprocessing pipeline to make predictions
    on new data with the same features used during training.
    """
    def __init__(self, rf_pipeline, lr_pipeline, feature_names, stage_classes):
        """
        Initialize the forecaster with trained pipelines.
        Parameters:
        -----------
        rf_pipeline : sklearn Pipeline
            Trained RandomForest pipeline
        lr_pipeline : sklearn Pipeline
            Trained LogisticRegression pipeline
        feature_names : list
            List of feature column names used for training
        stage_classes : list
            List of possible menopause stage classes (e.g., ['pre', 'peri', 'post'])
        """
        self.rf_pipeline = rf_pipeline
        self.lr_pipeline = lr_pipeline
        self.feature_names = feature_names
        self.stage_classes = stage_classes
        self.models = {
            'RandomForest': rf_pipeline,
            'LogisticRegression': lr_pipeline
        }

    @staticmethod
    def _final_estimator(pipeline):
        """Return the last named step of a pipeline (the fitted classifier).

        Centralizes the `named_steps` lookup that was previously duplicated
        in predict_single and predict_batch.
        """
        return pipeline.named_steps[list(pipeline.named_steps.keys())[-1]]

    def predict_single(self, feature_dict, model='RandomForest', return_proba=True):
        """
        Predict menopausal stage for a single individual.
        Parameters:
        -----------
        feature_dict : dict
            Dictionary with feature names as keys and values for prediction.
            Example: {'HOT7': 1, 'SLEEP7': 2, 'CESD': 10, ...}
        model : str
            Which model to use for prediction: 'RandomForest' or 'LogisticRegression'
        return_proba : bool
            If True, return prediction probabilities; otherwise just the class label
        Returns:
        --------
        dict : Contains 'stage', 'confidence', and optionally 'probabilities'
        """
        if model not in self.models:
            raise ValueError(f"Model '{model}' not found. Available: {list(self.models.keys())}")
        # Create DataFrame with single row, reindex to match training features
        # (missing features become NaN and are imputed by the pipeline)
        X = pd.DataFrame([feature_dict]).reindex(columns=self.feature_names, fill_value=np.nan)
        pipeline = self.models[model]
        prediction = pipeline.predict(X)[0]
        result = {
            'stage': prediction,
            'model': model,
            'confidence': None,
            'probabilities': None
        }
        if return_proba:
            try:
                proba = pipeline.predict_proba(X)[0]
                result['confidence'] = float(np.max(proba))
                result['probabilities'] = {
                    cls: float(prob)
                    for cls, prob in zip(self._final_estimator(pipeline).classes_, proba)
                }
            except Exception as e:
                # best-effort: prediction is still returned without probabilities
                print(f"Warning: Could not compute probabilities: {e}")
        return result

    def predict_batch(self, df, model='RandomForest', return_proba=True):
        """
        Predict menopausal stage for multiple individuals (batch prediction).
        Parameters:
        -----------
        df : pd.DataFrame
            DataFrame with feature columns matching training features.
            Missing values will be handled by the preprocessing pipeline.
        model : str
            Which model to use: 'RandomForest' or 'LogisticRegression'
        return_proba : bool
            If True, return prediction probabilities
        Returns:
        --------
        pd.DataFrame : Contains 'predicted_stage', 'confidence', and probability columns
        """
        if model not in self.models:
            raise ValueError(f"Model '{model}' not found. Available: {list(self.models.keys())}")
        # Reindex to match training features
        X = df.reindex(columns=self.feature_names, fill_value=np.nan)
        pipeline = self.models[model]
        predictions = pipeline.predict(X)
        result_df = pd.DataFrame({
            'predicted_stage': predictions,
            'model': model
        })
        if return_proba:
            try:
                proba = pipeline.predict_proba(X)
                final_est = self._final_estimator(pipeline)
                result_df['confidence'] = np.max(proba, axis=1)
                # Add probability column for each class
                for i, cls in enumerate(final_est.classes_):
                    result_df[f'prob_{cls}'] = proba[:, i]
            except Exception as e:
                # best-effort: predictions are still returned without probabilities
                print(f"Warning: Could not compute probabilities: {e}")
        return result_df

    def compare_models(self, feature_dict):
        """
        Compare predictions from both RandomForest and LogisticRegression models.
        Parameters:
        -----------
        feature_dict : dict
            Feature values for the individual
        Returns:
        --------
        dict : Predictions and probabilities from both models
        """
        rf_result = self.predict_single(feature_dict, model='RandomForest', return_proba=True)
        lr_result = self.predict_single(feature_dict, model='LogisticRegression', return_proba=True)
        return {
            'RandomForest': rf_result,
            'LogisticRegression': lr_result
        }

    def get_feature_info(self):
        """Return information about required features."""
        return {
            'num_features': len(self.feature_names),
            'feature_names': self.feature_names,
            'stage_classes': self.stage_classes
        }
def create_forecast_example():
    """
    Create an example forecast instance and demonstrate usage.

    This function is robust: if the training artifacts (`rf_pipeline`, `lr_pipeline`,
    `selected_features`, `X_train`, `X_test`) are not available in memory (e.g., when
    the module is imported in another process), it attempts to load saved pipelines
    from `OUTPUT_DIR` via `load_forecast_model()` and uses placeholder inputs.

    Returns:
    --------
    MenopauseForecast : the forecaster instance used for the demonstration
    """
    print("\n" + "="*80)
    print("FORECASTING MODULE EXAMPLE: Predicting Menopausal Stage")
    print("="*80)
    # Determine pipelines and feature metadata (use in-memory if available, else load from disk)
    # NOTE(review): rf_pipeline, lr_pipeline, selected_features and y are
    # module-level globals created by the training section of this script;
    # referencing them raises NameError when training has not run in-process,
    # which is what triggers the disk-loading fallback below.
    try:
        _rf = rf_pipeline
        _lr = lr_pipeline
        _features = selected_features
        _stage_classes = sorted(y.unique().tolist())
        has_training = True
    except NameError:
        print("Training artifacts not present in memory; attempting to load from disk...")
        try:
            _loaded = load_forecast_model(OUTPUT_DIR)
            _rf = _loaded.rf_pipeline
            _lr = _loaded.lr_pipeline
            _features = _loaded.feature_names
            _stage_classes = _loaded.stage_classes
            has_training = False
        except Exception as e:
            # Neither in-memory artifacts nor saved pipelines: cannot proceed.
            raise RuntimeError(f"Failed to initialize forecaster from disk: {e}")
    forecast = MenopauseForecast(
        rf_pipeline=_rf,
        lr_pipeline=_lr,
        feature_names=_features,
        stage_classes=_stage_classes
    )
    print(f"\nForecaster initialized with {len(_features)} features")
    print(f"Predicting stages: {_stage_classes}")
    # Example 1: Single individual prediction
    print("\n--- Example 1: Predict for a single individual ---")
    example_individual = {}
    # Only the first few features are filled in; the pipeline imputes the rest.
    n_example_feats = min(10, len(_features))
    if has_training:
        # Use per-feature medians from the training data so the demo input
        # looks realistic.
        for feat in _features[:n_example_feats]:
            try:
                example_individual[feat] = float(pd.to_numeric(X_train[feat], errors='coerce').median())
            except Exception:
                # Fallback to mode or NaN
                try:
                    example_individual[feat] = X_train[feat].mode().iloc[0]
                except Exception:
                    example_individual[feat] = np.nan
    else:
        # No training DF available; provide NaN placeholders to let pipeline impute
        for feat in _features[:n_example_feats]:
            example_individual[feat] = np.nan
    result = forecast.predict_single(example_individual, model='RandomForest', return_proba=True)
    # NOTE(review): the keys 'stage', 'confidence' and 'probabilities' assume a
    # dict return from predict_single(); predict_batch() in the same class uses
    # the column name 'predicted_stage' instead -- confirm the two APIs agree.
    print(f"Predicted stage: {result.get('stage')}")
    print(f"Confidence: {result.get('confidence'):.3f}" if result.get('confidence') is not None else "Confidence: None")
    if result.get('probabilities'):
        print("Stage probabilities:")
        for stage, prob in sorted(result['probabilities'].items()):
            print(f" {stage}: {prob:.3f}")
    # Example 2: Compare models
    print("\n--- Example 2: Compare RandomForest vs LogisticRegression ---")
    comparison = forecast.compare_models(example_individual)
    for model_name, cres in comparison.items():
        print(f"\n{model_name}:")
        print(f" Predicted stage: {cres.get('stage')}")
        print(f" Confidence: {cres.get('confidence'):.3f}" if cres.get('confidence') is not None else " Confidence: None")
    # Example 3: Batch prediction on a small sample (either X_test if available or placeholder rows)
    print("\n--- Example 3: Batch prediction (small sample) ---")
    if has_training:
        try:
            test_sample = X_test.iloc[:5].copy()
            batch_results = forecast.predict_batch(test_sample, model='RandomForest', return_proba=True)
            print(batch_results.to_string())
        except Exception as e:
            print(f"Batch prediction failed on training sample: {e}")
    else:
        # Create a small placeholder DataFrame with feature columns filled with NaN
        placeholder = pd.DataFrame([{f: np.nan for f in _features[:n_example_feats]}])
        batch_results = forecast.predict_batch(placeholder, model='RandomForest', return_proba=True)
        print(batch_results.to_string())
    return forecast
def save_forecast_model(forecast_instance, output_dir=OUTPUT_DIR):
    """
    Save the forecast model instance for later use.

    Persists feature/class metadata as JSON plus both fitted sklearn
    pipelines via joblib, so `load_forecast_model()` can fully rebuild the
    forecaster in another process.

    Parameters:
    -----------
    forecast_instance : MenopauseForecast
        The forecaster to save
    output_dir : str
        Directory to save metadata and pipelines (created if missing)
    """
    import json
    import joblib
    # Bug fix: previously this function raised FileNotFoundError when
    # output_dir did not exist yet (e.g. fresh checkout, custom --out dir).
    os.makedirs(output_dir, exist_ok=True)
    metadata = {
        'feature_names': forecast_instance.feature_names,
        'stage_classes': forecast_instance.stage_classes,
        'num_features': len(forecast_instance.feature_names)
    }
    # Save metadata as JSON
    with open(os.path.join(output_dir, 'forecast_metadata.json'), 'w') as f:
        json.dump(metadata, f, indent=2)
    # Save trained pipelines using joblib (allows full reuse)
    joblib.dump(forecast_instance.rf_pipeline, os.path.join(output_dir, 'rf_pipeline.pkl'))
    joblib.dump(forecast_instance.lr_pipeline, os.path.join(output_dir, 'lr_pipeline.pkl'))
    print(f"Forecast model saved to {output_dir}")
    print(f" - forecast_metadata.json")
    print(f" - rf_pipeline.pkl")
    print(f" - lr_pipeline.pkl")
def load_forecast_model(output_dir=OUTPUT_DIR):
    """
    Load a previously saved forecast model.

    Parameters:
    -----------
    output_dir : str
        Directory containing 'forecast_metadata.json', 'rf_pipeline.pkl' and
        'lr_pipeline.pkl' (as written by save_forecast_model())

    Returns:
    --------
    MenopauseForecast : The loaded forecaster
    """
    import json
    import joblib

    def _artifact(name):
        # All saved artifacts live directly inside output_dir.
        return os.path.join(output_dir, name)

    # Metadata first: feature names and predictable stage classes.
    with open(_artifact('forecast_metadata.json'), 'r') as fh:
        meta = json.load(fh)
    # Then the two fitted sklearn pipelines, and rebuild the forecaster.
    restored = MenopauseForecast(
        rf_pipeline=joblib.load(_artifact('rf_pipeline.pkl')),
        lr_pipeline=joblib.load(_artifact('lr_pipeline.pkl')),
        feature_names=meta['feature_names'],
        stage_classes=meta['stage_classes']
    )
    print(f"Forecast model loaded from {output_dir}")
    return restored
# Initialize and demonstrate the forecasting module
# Symptom cycle forecasting (defined earlier near CLI args)
class SymptomCycleForecaster:
"""
Predicts the probability of hot flashes and mood changes within a menstrual cycle
based on last menstrual period (LMP) date and target date.
"""
def __init__(self, cycle_length=28, hot_mu=14, hot_sigma=5, mood_mu=26, mood_sigma=4,
base_hot=0.1, amp_hot=0.4, base_mood=0.1, amp_mood=0.45, threshold=0.5):
self.cycle_length = cycle_length
self.hot_mu = hot_mu
self.hot_sigma = hot_sigma
self.mood_mu = mood_mu
self.mood_sigma = mood_sigma
self.base_hot = base_hot
self.amp_hot = amp_hot
self.base_mood = base_mood
self.amp_mood = amp_mood
self.threshold = threshold
def _parse_lmp(self, lmp, reference_date=None):
"""Parse LMP input which may be a full date string or an integer day-of-month."""
if pd.isna(lmp):
return None
# If numeric day (int-like), construct a date in the same month as reference_date
try:
lmp_int = int(lmp)
if reference_date is None:
ref = pd.Timestamp(datetime.today()).to_pydatetime()
else:
ref = pd.to_datetime(reference_date, errors='coerce')
if pd.isna(ref):
ref = pd.Timestamp(datetime.today()).to_pydatetime()
else:
ref = ref.to_pydatetime()
# Clamp day to valid range
day = max(1, min(lmp_int, 28))
return datetime(ref.year, ref.month, day)
except Exception:
# Try parse as full date string
try:
return pd.to_datetime(lmp, errors='coerce').to_pydatetime()
except Exception:
return None
def compute_cycle_day(self, lmp, target_date=None):
"""Return 1-based cycle day (1..cycle_length) or None if cannot compute."""
if target_date is None:
tdate = datetime.today()
else:
tdate = pd.to_datetime(target_date, errors='coerce')
if pd.isna(tdate):
tdate = datetime.today()
else:
tdate = tdate.to_pydatetime()
lmp_date = self._parse_lmp(lmp, reference_date=tdate)
if lmp_date is None:
return None
delta = (tdate - lmp_date).days
if delta < 0:
# If LMP is in the future, assume it refers to previous cycle (subtract one month)
lmp_date = lmp_date - timedelta(days=self.cycle_length)
delta = (tdate - lmp_date).days
cycle_day = (delta % self.cycle_length) + 1
return int(cycle_day)
def _gauss_prob(self, day, mu, sigma, base, amp):
if day is None:
return np.nan
val = base + amp * np.exp(-0.5 * ((day - mu) / float(sigma)) ** 2)
return float(min(max(val, 0.0), 1.0))
def predict_single(self, lmp, target_date=None):
day = self.compute_cycle_day(lmp, target_date=target_date)
hot_p = self._gauss_prob(day, self.hot_mu, self.hot_sigma, self.base_hot, self.amp_hot)
mood_p = self._gauss_prob(day, self.mood_mu, self.mood_sigma, self.base_mood, self.amp_mood)
return {
'cycle_day': day,
'hotflash_prob': hot_p,
'hotflash_pred': hot_p >= self.threshold if not np.isnan(hot_p) else None,
'mood_prob': mood_p,
'mood_pred': mood_p >= self.threshold if not np.isnan(mood_p) else None
}
def predict_df(self, df, lmp_col='LMP', date_col=None, menopause_stage_col=None):
df = df.copy()
results = df.apply(
lambda row: pd.Series(self.predict_single(
lmp=row.get(lmp_col),
target_date=(row.get(date_col) if date_col is not None else None)
)), axis=1
)
out = pd.concat([df.reset_index(drop=True), results.reset_index(drop=True)], axis=1)
return out
def predict_symptoms_from_csv(input_csv, output_csv, lmp_col='LMP', date_col=None,
                              menopause_stage_col=None, cycle_length=28, **kwargs):
    """Read input CSV, predict hot flashes/mood by cycle day, and write output CSV."""
    frame = pd.read_csv(input_csv)
    forecaster = SymptomCycleForecaster(cycle_length=cycle_length)
    predictions = forecaster.predict_df(
        frame, lmp_col=lmp_col, date_col=date_col, menopause_stage_col=menopause_stage_col
    )
    predictions.to_csv(output_csv, index=False)
    # Brief console summary of what was written.
    print(f"Wrote symptom predictions for {predictions.shape[0]} rows to {output_csv}")
    print("Sample predictions (first 5 rows):")
    preview_cols = [lmp_col, 'cycle_day', 'hotflash_prob', 'hotflash_pred', 'mood_prob', 'mood_pred']
    print(predictions[preview_cols].head().to_string())
# CLI integration: run symptom prediction if requested
if __name__ == '__main__':
    # If symptom prediction requested via CLI, run fast-path and exit before
    # any expensive model training/loading takes place.
    # NOTE(review): `args` comes from the argparse section earlier in this
    # file; attribute names used here must stay in sync with those flags.
    if args.predict_symptoms:
        if not args.symptoms_input or not args.symptoms_output:
            print("Error: --symptoms-input and --symptoms-output are required when --predict-symptoms is set")
            sys.exit(1)
        else:
            predict_symptoms_from_csv(
                input_csv=args.symptoms_input,
                output_csv=args.symptoms_output,
                lmp_col=args.lmp_col,
                date_col=args.date_col,
                cycle_length=args.cycle_length
            )
            sys.exit(0)
    # Dual predictions are handled in the early fast-path above to avoid training.
    # Default behavior: create demo forecaster, save trained models and show summary
    forecast_model = create_forecast_example()
    save_forecast_model(forecast_model)
    print("\n" + "="*80)
    print("FORECASTING MODULE SUMMARY")
    print("="*80)
    # The triple-quoted block below is user-facing help text printed verbatim;
    # it is runtime output, not a docstring.
    print("""
The MenopauseForecast class provides three main methods for predictions:
1. predict_single(feature_dict, model='RandomForest', return_proba=True)
- Predict stage for one individual given feature values
- Returns predicted stage and confidence scores
2. predict_batch(df, model='RandomForest', return_proba=True)
- Predict stages for multiple individuals
- Returns DataFrame with predictions and probabilities for each stage
3. compare_models(feature_dict)
- Compare predictions from both RandomForest and LogisticRegression
- Useful for validating model agreement
Usage in your own code:
from menopause import load_forecast_model
# Load the trained forecaster
forecast = load_forecast_model('swan_ml_output')
# Predict for an individual
features = {'HOT7': 1, 'SLEEP7': 2, 'CESD': 10, ...}
result = forecast.predict_single(features, model='RandomForest')
# Predict for multiple individuals
results_df = forecast.predict_batch(your_dataframe, model='RandomForest')
""")
# ==========================================================================================
# 13. CSV INPUT/OUTPUT FUNCTIONALITY: Batch prediction from CSV files
# ==========================================================================================
def predict_from_csv(input_csv, forecast_instance, output_csv=None, model='RandomForest', output_dir=OUTPUT_DIR):
    """
    Read individual data from CSV, make predictions, and save results.

    Parameters:
    -----------
    input_csv : str
        Path to input CSV file with feature columns for individuals.
        CSV should have columns matching training features (or subset).
    forecast_instance : MenopauseForecast
        The trained forecaster instance
    output_csv : str or None
        Path to output CSV file (default: input_csv with '_predictions' appended)
    model : str
        Which model to use ('RandomForest' or 'LogisticRegression')
    output_dir : str
        Unused here; kept for interface compatibility with other helpers.

    Returns:
    --------
    pd.DataFrame or None : Results with predictions and confidence scores,
        or None when the input file does not exist.

    Example:
    --------
        forecast = load_forecast_model('swan_ml_output')
        results = predict_from_csv('individuals.csv', forecast)
        # Results saved to 'individuals_predictions.csv'
    """
    # Read input CSV (os is already imported at module level; the previous
    # redundant function-local `import os` has been removed).
    print(f"Reading input data from: {input_csv}")
    try:
        data = pd.read_csv(input_csv)
    except FileNotFoundError:
        print(f"ERROR: File not found: {input_csv}")
        return None
    n_samples = len(data)
    print(f"Loaded {n_samples} individuals")
    # Identify feature columns (exclude ID columns)
    id_cols = ['ID', 'id', 'SWANID', 'individual', 'Individual', 'subject', 'Subject']
    feature_cols = [c for c in data.columns if c not in id_cols]
    # Keep any present ID columns aside so they can be re-attached to output
    # (computed once instead of the previous duplicated membership checks).
    present_ids = [c for c in id_cols if c in data.columns]
    id_data = data[present_ids] if present_ids else None
    # Make predictions
    print(f"Making predictions using {model}...")
    predictions = forecast_instance.predict_batch(
        data[feature_cols],
        model=model,
        return_proba=True
    )
    # Combine with original IDs, or synthesize a 1-based 'individual' index
    if id_data is not None:
        results = pd.concat([id_data.reset_index(drop=True), predictions.reset_index(drop=True)], axis=1)
    else:
        results = predictions.reset_index(drop=True)
        results.insert(0, 'individual', range(1, n_samples + 1))
    # Set output file path
    if output_csv is None:
        base, ext = os.path.splitext(input_csv)
        output_csv = f"{base}_predictions{ext}"
    # Save results
    print(f"Saving predictions to: {output_csv}")
    results.to_csv(output_csv, index=False)
    return results
def predict_dual_from_csv(stage_input_csv, stage_output_csv, symptoms_input_csv, symptoms_output_csv,
                          forecast_dir=OUTPUT_DIR, model='RandomForest', lmp_col='LMP',
                          date_col=None, cycle_length=28):
    """Run menopause stage prediction and symptom-cycle prediction using separate
    input and output files for each model.

    Parameters:
    -----------
    stage_input_csv : str
        CSV with feature columns (plus optional ID columns) for the stage classifier
    stage_output_csv : str or None
        Where to write stage predictions (None: derived from stage_input_csv)
    symptoms_input_csv : str
        CSV with an LMP column (and optionally a date column) for the cycle model
    symptoms_output_csv : str or None
        Where to write symptom predictions (None: derived from symptoms_input_csv)
    forecast_dir : str
        Directory holding artifacts written by save_forecast_model()
    model : str
        'RandomForest' or 'LogisticRegression'
    lmp_col, date_col : str
        Column names for last menstrual period and target date
    cycle_length : int
        Assumed menstrual cycle length in days

    Returns:
    --------
    dict or None : {'stage': stage_results_df, 'symptoms': symptom_results_df},
        or None if an input file or the saved model cannot be loaded.
    """
    print(f"Reading stage input data from: {stage_input_csv}")
    try:
        stage_data = pd.read_csv(stage_input_csv)
    except FileNotFoundError:
        print(f"ERROR: File not found: {stage_input_csv}")
        return None
    # Load forecast model
    try:
        forecast = load_forecast_model(output_dir=forecast_dir)
    except Exception as e:
        print(f"ERROR: Could not load forecast model from '{forecast_dir}': {e}")
        return None
    # Identify id and feature columns
    id_cols = ['ID', 'id', 'SWANID', 'individual', 'Individual', 'subject', 'Subject']
    feature_cols = [c for c in stage_data.columns if c not in id_cols]
    # Make stage predictions
    print(f"Making menopause stage predictions using {model}...")
    stage_preds = forecast.predict_batch(stage_data[feature_cols], model=model, return_proba=True)
    id_data = stage_data[[c for c in id_cols if c in stage_data.columns]] if any(c in stage_data.columns for c in id_cols) else None
    if id_data is not None:
        # Re-attach original identifier columns to the predictions.
        stage_results = pd.concat([id_data.reset_index(drop=True), stage_preds.reset_index(drop=True)], axis=1)
    else:
        # No ID columns present: synthesize a 1-based 'individual' index.
        stage_results = stage_preds.reset_index(drop=True)
        stage_results.insert(0, 'individual', range(1, len(stage_results) + 1))
    # Default stage output path if not provided
    if stage_output_csv is None:
        base, ext = os.path.splitext(stage_input_csv)
        stage_output_csv = f"{base}_stage_predictions{ext}"
    print(f"Saving stage predictions to: {stage_output_csv}")
    stage_results.to_csv(stage_output_csv, index=False)
    # Symptom predictions (independent of the stage model)
    print(f"Reading symptom input data from: {symptoms_input_csv}")
    try:
        symptom_data = pd.read_csv(symptoms_input_csv)
    except FileNotFoundError:
        print(f"ERROR: File not found: {symptoms_input_csv}")
        return None
    # Auto-detect a 'date' column when the caller did not specify one.
    if date_col is None and 'date' in symptom_data.columns:
        date_col = 'date'
    fore = SymptomCycleForecaster(cycle_length=cycle_length)
    symptom_results = fore.predict_df(symptom_data, lmp_col=lmp_col, date_col=date_col)
    # Default symptom output path if not provided
    if symptoms_output_csv is None:
        base, ext = os.path.splitext(symptoms_input_csv)
        symptoms_output_csv = f"{base}_symptom_predictions{ext}"
    print(f"Saving symptom predictions to: {symptoms_output_csv}")
    symptom_results.to_csv(symptoms_output_csv, index=False)
    return {'stage': stage_results, 'symptoms': symptom_results}
def predict_combined_from_csv(*args, **kwargs):
    """Deprecated: combined predictions are removed in favor of separate input/output files.

    Raises:
    -------
    ValueError : always; callers must migrate to predict_dual_from_csv()
    """
    message = ("Combined predictions are deprecated. "
               "Use predict_dual_from_csv() with separate stage and symptom input/output files.")
    raise ValueError(message)
def create_demo_csv(forecast_instance, num_individuals=5, output_file='demo_individuals.csv', output_dir=OUTPUT_DIR):
    """
    Create a demo CSV file with sample individuals for testing predictions.

    Feature values are drawn uniformly from 1..5 (a typical SWAN Likert range);
    they are plausible placeholders, not samples from the real data.

    Parameters:
    -----------
    forecast_instance : MenopauseForecast
        The trained forecaster (used to get feature names)
    num_individuals : int
        Number of demo individuals to generate
    output_file : str
        Name of the output CSV file
    output_dir : str
        Directory to save demo file (created if missing)

    Returns:
    --------
    str : Path to created CSV file
    """
    # Get feature names from forecaster
    feature_names = forecast_instance.feature_names
    # Bug fix: use a local seeded Generator instead of np.random.seed(42),
    # which silently clobbered the caller's global NumPy random state.
    rng = np.random.default_rng(42)
    demo_data = {}
    # Add individual ID
    demo_data['individual'] = [f"Individual_{i+1}" for i in range(num_individuals)]
    # Generate random feature values (using ranges typical for SWAN data)
    for feat in feature_names:
        # Random values between 1 and 5 (typical Likert scale for SWAN)
        demo_data[feat] = rng.integers(1, 6, size=num_individuals)
    # Create DataFrame
    demo_df = pd.DataFrame(demo_data)
    # Create full path; ensure output directory exists before writing
    full_path = os.path.join(output_dir, output_file)
    os.makedirs(output_dir, exist_ok=True)
    # Save demo file
    demo_df.to_csv(full_path, index=False)
    print(f"✅ Demo CSV created: {full_path}")
    print(f" Individuals: {num_individuals}")
    print(f" Features: {len(feature_names)}")
    print(f" File shape: {demo_df.shape}")
    return full_path
def add_performance_metrics_to_csv(results_df, y_test=None, model_name='RandomForest'):
    """
    Compute summary performance metrics for a predictions DataFrame.

    If true labels are available, computes accuracy and weighted precision,
    recall and F1-score against the 'predicted_stage' column, formatted as
    '#'-prefixed comment lines suitable for appending to a CSV file.

    Parameters:
    -----------
    results_df : pd.DataFrame
        Results dataframe with a 'predicted_stage' column
    y_test : array-like, optional
        True labels; when None, no metrics are computed
    model_name : str
        Name of model used (echoed in the metrics header)

    Returns:
    --------
    tuple : (results_df, metrics_text) where metrics_text is a comment-line
        string, or (results_df, None) when y_test is None.
        (Fixed: the docstring previously claimed a bare DataFrame was returned.)
    """
    # Guard clause: nothing to score against.
    if y_test is None:
        return results_df, None
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    acc = accuracy_score(y_test, results_df['predicted_stage'])
    prec = precision_score(y_test, results_df['predicted_stage'], average='weighted', zero_division=0)
    recall = recall_score(y_test, results_df['predicted_stage'], average='weighted', zero_division=0)
    f1 = f1_score(y_test, results_df['predicted_stage'], average='weighted', zero_division=0)
    # Emit as '#' comment lines so appending to a CSV does not break parsers
    # configured to skip comments.
    metrics_text = f"\n# Performance Metrics ({model_name})\n"
    metrics_text += f"# Accuracy: {acc:.3f}\n"
    metrics_text += f"# Precision (weighted): {prec:.3f}\n"
    metrics_text += f"# Recall (weighted): {recall:.3f}\n"
    metrics_text += f"# F1-Score (weighted): {f1:.3f}\n"
    return results_df, metrics_text