| | """ |
| | Single-dataset statistical comparison: control vs competitors via |
| | Nadeau–Bengio corrected t-tests on outer folds (k-fold CV), with Holm FWER control. |
| | No Wilcoxon. No Friedman. Test metrics are shown for context only. |
| | """ |
| |
|
| | import argparse |
| | import re |
| | from pathlib import Path |
| | from typing import Dict, List, Optional, Tuple |
| |
|
| | import numpy as np |
| | import pandas as pd |
| | from math import sqrt |
| |
|
| | from statsmodels.stats.multitest import multipletests |
| | from scipy.stats import t as tdist |
| |
|
# Seed NumPy's global RNG for reproducibility.
# NOTE(review): nothing in this module appears to draw random numbers —
# this seed may be vestigial; confirm before removing.
np.random.seed(42)
| |
|
| | |
| | |
| | |
| |
|
| |
|
| | def _parse_list_in_brackets(inner: str) -> Optional[List[float]]: |
| | parts = [p.strip() for p in inner.split(",") if p.strip()] |
| | out: List[float] = [] |
| | for p in parts: |
| | try: |
| | out.append(float(p)) |
| | except Exception: |
| | return None |
| | return out |
| |
|
| |
|
def parse_log_file(
    log_path: Path, expected_k: int = 5
) -> Tuple[Optional[List[float]], Optional[List[float]]]:
    """
    Pull the K outer-fold validation RMSE and MAE arrays out of a run log.

    The log is expected to contain lines of the form:
        VAL FOLD RMSEs: [v1, ..., vK]
        VAL FOLD MAEs: [v1, ..., vK]

    Returns (rmse_list, mae_list); (None, None) when the file is
    unreadable, either pattern is absent, a value fails to parse, or a
    list does not hold exactly ``expected_k`` entries.
    """
    try:
        text = log_path.read_text()
    except Exception:
        return None, None

    extracted: List[List[float]] = []
    for pattern in (
        r"VAL\s+FOLD\s+RMSEs:\s*\[(.*?)\]",
        r"VAL\s+FOLD\s+MAEs:\s*\[(.*?)\]",
    ):
        match = re.search(pattern, text, flags=re.IGNORECASE)
        if match is None:
            return None, None
        scores = _parse_list_in_brackets(match.group(1))
        if scores is None or len(scores) != expected_k:
            return None, None
        extracted.append(scores)
    return extracted[0], extracted[1]
| |
|
| |
|
def parse_csv_file(csv_path: Path) -> Optional[Dict[str, float]]:
    """
    Load the first row of a *_final_results.csv summary into a dict of floats.

    Required columns:
        val_rmse_mean, val_rmse_std, val_mae_mean, val_mae_std,
        test_rmse_mean, test_rmse_ci_low, test_rmse_ci_high,
        test_mae_mean, test_mae_ci_low, test_mae_ci_high

    Returns None when the file cannot be read or any required column
    is missing.
    """
    required = (
        "val_rmse_mean",
        "val_rmse_std",
        "val_mae_mean",
        "val_mae_std",
        "test_rmse_mean",
        "test_rmse_ci_low",
        "test_rmse_ci_high",
        "test_mae_mean",
        "test_mae_ci_low",
        "test_mae_ci_high",
    )
    try:
        frame = pd.read_csv(csv_path)
    except Exception:
        return None
    if any(col not in frame.columns for col in required):
        return None
    first = frame.iloc[0]
    return {col: float(first[col]) for col in required}
| |
|
| |
|
def clean_base_and_exp_id(base_name: str) -> Tuple[str, str]:
    """
    Drop a trailing ``_YYYYMMDD_HHMMSS`` timestamp from *base_name*.

    The experiment id is the first two underscore-separated tokens of
    the cleaned name (e.g. 'gat_ecfp' or 'polyatomic_polyatomic'); if
    fewer than two tokens exist, the cleaned name itself is used.

    Returns (cleaned_name, exp_id).
    """
    cleaned = re.sub(r"_\d{8}_\d{6}$", "", base_name)
    tokens = cleaned.split("_")
    if len(tokens) >= 2:
        return cleaned, f"{tokens[0]}_{tokens[1]}"
    return cleaned, cleaned
| |
|
| |
|
def discover_dataset(results_dir: Path, expected_k: int = 5) -> Dict[str, Dict]:
    """
    Walk one dataset directory and collect per-experiment results.

    Each model-family subdirectory is scanned for ``*.txt`` run logs
    paired with a ``<clean>_final_results.csv`` summary; the first valid
    (log, csv) pair per exp_id is kept.

    NOTE(review): an entry is stored even when the log lacks complete
    fold arrays (``rmse_folds``/``mae_folds`` may be None), and it then
    blocks later candidates for the same exp_id — confirm this is the
    intended precedence.

    Returns:
        data[exp_id] = {
            'rmse_folds': [K] or None,
            'mae_folds' : [K] or None,
            'summary'   : {...},
            'log_path'  : str,
            'csv_path'  : str
        }
    """
    discovered: Dict[str, Dict] = {}
    for family_dir in results_dir.iterdir():
        if not family_dir.is_dir():
            continue
        for log_path in sorted(family_dir.glob("*.txt")):
            clean_name, exp_id = clean_base_and_exp_id(log_path.stem)
            if exp_id in discovered:
                continue  # first valid pair wins
            summary_csv = log_path.with_name(f"{clean_name}_final_results.csv")
            if not summary_csv.exists():
                continue
            summary = parse_csv_file(summary_csv)
            if summary is None:
                continue
            rmse_folds, mae_folds = parse_log_file(log_path, expected_k=expected_k)
            discovered[exp_id] = {
                "rmse_folds": rmse_folds,
                "mae_folds": mae_folds,
                "summary": summary,
                "log_path": str(log_path),
                "csv_path": str(summary_csv),
            }
    return discovered
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def nb_corrected_t(diffs: np.ndarray, k: int) -> Tuple[float, float, float]:
    """
    Nadeau–Bengio corrected resampled t statistic for k-fold CV differences.

    Args:
        diffs: length-k array of per-fold (competitor - control) losses;
            a negative mean favors the control model.
        k: number of outer folds.

    Returns:
        (t_stat, SE_NB, mean_diff). The correction uses rho0 = 1/(k-1),
        so SE_NB = sqrt((1/k + 1/(k-1)) * s^2) with s^2 the unbiased
        sample variance across folds. For k <= 1 both t and SE are NaN;
        for a zero SE, t is +/-inf matching the sign of the mean
        (0.0 when the mean is also zero).

    Raises:
        ValueError: if diffs does not contain exactly k values.
    """
    d = np.asarray(diffs, dtype=float)
    if d.size != k:
        raise ValueError("diffs length must equal k")
    mean_diff = float(d.mean())
    if k <= 1:
        # Variance is undefined with a single fold.
        return np.nan, np.nan, mean_diff
    sample_var = float(np.var(d, ddof=1))
    se_corrected = sqrt((1.0 / k + 1.0 / (k - 1.0)) * sample_var)
    if se_corrected > 0:
        return mean_diff / se_corrected, se_corrected, mean_diff
    # Degenerate case: identical differences across folds.
    if mean_diff < 0:
        return -np.inf, se_corrected, mean_diff
    if mean_diff > 0:
        return np.inf, se_corrected, mean_diff
    return 0.0, se_corrected, mean_diff
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def main():
    """CLI entry point: write the single-dataset NB-corrected t report.

    Workflow:
      1. Parse CLI arguments; resolve the control exp_id (exact match
         first, otherwise a unique case-insensitive substring match).
      2. Discover per-model fold arrays and CSV summaries under
         --results_dir via discover_dataset().
      3. Write a report containing a context table of Test/Val metrics,
         per-competitor NB-corrected one-sided t-tests on outer-fold
         differences (RMSE and MAE), and Holm-adjusted p-values per
         metric family.
    """
    ap = argparse.ArgumentParser(
        description="Single-dataset comparison: NB-corrected t on outer folds (control vs all) with Holm; prints Test metrics for context."
    )
    ap.add_argument(
        "--results_dir",
        type=str,
        required=True,
        help="Dataset directory (e.g., ./logs_hyperparameter/qm9) with model-family subfolders.",
    )
    ap.add_argument(
        "--output_dir", type=str, required=True, help="Directory to save the report."
    )
    ap.add_argument(
        "--exp_name",
        type=str,
        required=True,
        help="Base name for the output report file.",
    )
    ap.add_argument(
        "--control_model",
        type=str,
        required=True,
        help="Control exp_id (e.g., 'polyatomic_polyatomic'). Unique substring allowed.",
    )
    ap.add_argument(
        "--k", type=int, default=5, help="Expected number of outer folds (default: 5)."
    )
    ap.add_argument(
        "--alpha",
        type=float,
        default=0.05,
        help="FWER level for Holm; also used for NB CIs if --print_ci.",
    )
    ap.add_argument(
        "--print_ci",
        action="store_true",
        help="Also print NB-style CIs for the mean fold difference.",
    )
    args = ap.parse_args()

    results_dir = Path(args.results_dir)
    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    report_path = out_dir / f"{args.exp_name}_NB_single_dataset.txt"

    # Collect fold arrays + CSV summaries; abort early if nothing usable.
    data = discover_dataset(results_dir, expected_k=args.k)
    if not data:
        raise SystemExit(
            f"No models found under {results_dir} with valid logs + *_final_results.csv"
        )

    exp_ids = sorted(data.keys())

    # Resolve the control model: exact exp_id match first, otherwise a
    # unique case-insensitive substring match; ambiguity or no match is
    # a hard error listing the available ids.
    ctrl = args.control_model
    if ctrl not in data:
        matches = [e for e in exp_ids if ctrl.lower() in e.lower()]
        if len(matches) == 1:
            ctrl = matches[0]
        else:
            raise SystemExit(
                f"Control '{args.control_model}' not found. Available exp_ids: {exp_ids}"
            )

    with open(report_path, "w") as f:
        # --- Report banner ---
        f.write("=" * 110 + "\n")
        f.write(
            f"Dataset: {results_dir.name} — Control vs competitors (NB-corrected t on outer folds; Holm across competitors)\n"
        )
        f.write("=" * 110 + "\n\n")
        f.write(f"Control exp_id: {ctrl}\n")
        f.write(f"k folds: {args.k}, alpha: {args.alpha}\n\n")

        # --- Context table: held-out Test metrics (with CIs) and Val mean±sd ---
        # NOTE(review): header ends with "\n", so the dashed rule below is
        # one character longer than the visible header text.
        header = f"{'Model (exp_id)':<26} | {'Test RMSE (95% CI)':<30} | {'Test MAE (95% CI)':<30} | {'Val RMSE mean±sd':<22} | {'Val MAE mean±sd':<22}\n"
        f.write(header)
        f.write("-" * len(header) + "\n")
        for exp_id in exp_ids:
            s = data[exp_id]["summary"]
            line = (
                f"{exp_id:<26} | "
                f"{s['test_rmse_mean']:.6f} [{s['test_rmse_ci_low']:.6f}, {s['test_rmse_ci_high']:.6f}] | "
                f"{s['test_mae_mean']:.6f} [{s['test_mae_ci_low']:.6f}, {s['test_mae_ci_high']:.6f}] | "
                f"{s['val_rmse_mean']:.6f} ± {s['val_rmse_std']:.6f} | "
                f"{s['val_mae_mean']:.6f} ± {s['val_mae_std']:.6f}\n"
            )
            f.write(line)
        f.write("\n")

        # --- Pairwise NB-corrected t-tests: control vs each competitor ---
        comps = [e for e in exp_ids if e != ctrl]
        rows = []  # one result row per comparable competitor
        pvals_rmse, labels_rmse = [], []  # RMSE family for Holm
        pvals_mae, labels_mae = [], []  # MAE family for Holm

        # Control fold arrays (may be None in data; coerce to empty array).
        ctrl_rmse = np.array(
            data[ctrl]["rmse_folds"] if data[ctrl]["rmse_folds"] is not None else [],
            dtype=float,
        )
        ctrl_mae = np.array(
            data[ctrl]["mae_folds"] if data[ctrl]["mae_folds"] is not None else [],
            dtype=float,
        )

        if ctrl_rmse.size != args.k or ctrl_mae.size != args.k:
            # Without complete control folds no paired differences exist.
            f.write(
                "WARNING: Control model missing complete fold arrays; NB testing cannot proceed.\n"
            )
        else:
            for comp in comps:
                comp_rmse = data[comp]["rmse_folds"]
                comp_mae = data[comp]["mae_folds"]
                if comp_rmse is None or comp_mae is None:
                    continue  # competitor lacks fold arrays entirely
                comp_rmse = np.array(comp_rmse, dtype=float)
                comp_mae = np.array(comp_mae, dtype=float)
                if comp_rmse.size != args.k or comp_mae.size != args.k:
                    continue  # incomplete folds: skip rather than mis-pair

                # Per-fold loss differences; positive values mean the
                # competitor did worse (control better).
                d_rmse = comp_rmse - ctrl_rmse
                d_mae = comp_mae - ctrl_mae

                t_rmse, se_rmse, mean_rmse = nb_corrected_t(d_rmse, k=args.k)
                t_mae, se_mae, mean_mae = nb_corrected_t(d_mae, k=args.k)

                # NB test uses k-1 degrees of freedom.
                df = args.k - 1
                # One-sided upper-tail p: tests H1 mean(comp - ctrl) > 0,
                # i.e. control superiority on a loss metric.
                p_rmse = float(tdist.sf(t_rmse, df=df))
                p_mae = float(tdist.sf(t_mae, df=df))

                pvals_rmse.append(p_rmse)
                labels_rmse.append(f"{ctrl} vs {comp}")
                pvals_mae.append(p_mae)
                labels_mae.append(f"{ctrl} vs {comp}")

                row = {
                    "comparison": f"{ctrl} vs {comp}",
                    "mean_diff_RMSE(comp-ctrl)": mean_rmse,
                    "t_NB_RMSE": t_rmse,
                    "p_one_sided_RMSE": p_rmse,
                    "mean_diff_MAE(comp-ctrl)": mean_mae,
                    "t_NB_MAE": t_mae,
                    "p_one_sided_MAE": p_mae,
                }

                if args.print_ci:
                    # Two-sided (1 - alpha) CI for the mean fold
                    # difference using the NB-corrected SE.
                    tcrit = float(tdist.ppf(1 - args.alpha / 2, df=df))
                    row["NB_CI_RMSE_low"] = mean_rmse - tcrit * se_rmse
                    row["NB_CI_RMSE_high"] = mean_rmse + tcrit * se_rmse
                    row["NB_CI_MAE_low"] = mean_mae - tcrit * se_mae
                    row["NB_CI_MAE_high"] = mean_mae + tcrit * se_mae

                rows.append(row)

        df_rows = pd.DataFrame(rows)
        if not df_rows.empty:
            f.write("--- NB-corrected t (outer folds) per competitor ---\n")
            f.write(
                df_rows.to_string(index=False, float_format=lambda x: f"{x:.6f}")
                + "\n\n"
            )
        else:
            f.write(
                "No comparable competitors with complete fold arrays. Nothing to test.\n\n"
            )

        # Write one Holm-adjusted table per metric family. Closure over
        # the open file handle `f`; the local `df` here (a DataFrame)
        # shadows the degrees-of-freedom variable used above.
        def holm_table(pvals: List[float], labels: List[str], title: str):
            if not pvals:
                f.write(f"{title}\n(no p-values)\n\n")
                return
            reject, p_adj, _, _ = multipletests(
                pvals=pvals, alpha=args.alpha, method="holm"
            )
            df = pd.DataFrame(
                {
                    "comparison": labels,
                    "p_raw": pvals,
                    "p_holm": p_adj,
                    "Significant": reject.astype(bool),
                }
            )
            f.write(title + "\n")
            f.write(
                df.sort_values("p_raw").to_string(
                    index=False, float_format=lambda x: f"{x:.6f}"
                )
                + "\n\n"
            )

        holm_table(
            pvals_rmse, labels_rmse, "--- Holm-adjusted p-values (RMSE family) ---"
        )
        holm_table(
            pvals_mae, labels_mae, "--- Holm-adjusted p-values (MAE family) ---"
        )

        # --- Methodology notes appended to the report ---
        f.write("=" * 110 + "\nNotes:\n")
        f.write(
            "• Tests are within-dataset, one-sided for control superiority, on outer-fold differences with Nadeau–Bengio SE correction (df = k-1).\n"
        )
        f.write(
            "• Holm controls family-wise error across competitors per metric family.\n"
        )
        f.write(
            "• Held-out Test metrics above are for context only; no fold-based omnibus tests are used.\n"
        )

    print(f"Report successfully written to: {report_path}")
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|