Spaces:
Sleeping
Sleeping
| """Evaluate stress splits for synthetic datacenter verification datasets.""" | |
| from __future__ import annotations | |
| import argparse | |
| from collections import Counter | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.ensemble import HistGradientBoostingClassifier | |
| from sklearn.metrics import accuracy_score, f1_score, log_loss, precision_recall_fscore_support | |
| try: | |
| from .common import ( | |
| DEFAULT_SEED, | |
| LABELS, | |
| PROB_COLUMNS, | |
| determine_feature_columns, | |
| ensure_dir, | |
| load_feature_table, | |
| make_episode_split, | |
| make_preprocessor, | |
| model_input_frame, | |
| probability_frame, | |
| write_json, | |
| ) | |
| except ImportError: # pragma: no cover - direct script execution | |
| from common import ( | |
| DEFAULT_SEED, | |
| LABELS, | |
| PROB_COLUMNS, | |
| determine_feature_columns, | |
| ensure_dir, | |
| load_feature_table, | |
| make_episode_split, | |
| make_preprocessor, | |
| model_input_frame, | |
| probability_frame, | |
| write_json, | |
| ) | |
# Scenario families evaluated one at a time as a held-out test set: rows whose
# scenario column equals the family form the test set, all other rows the
# training set. Families missing from the table are reported as an error entry.
FAMILY_HOLDOUTS = [
    "underclocked_energy_capped_training",
    "fragmented_training_linked",
    "multi_tenant_fragmented_nontraining",
    "model_parallel_inference",
    "hpc_mpi_collective",
]
# Source ablations: result name -> feature-name prefixes (or exact column
# names) removed from the feature list before retraining on the
# episode-grouped random split.
SOURCE_ABLATIONS = {
    "source_ablation_drop_fabric": ["o6_", "o7_", "fabric_telemetry_trust_level"],
    "source_ablation_drop_runtime_and_ml_logs": ["o10_", "o12_"],
    "source_ablation_drop_gpu_telemetry": ["o4_", "o5_", "gpu_telemetry_trust_level"],
    "source_ablation_drop_power": ["o8_", "o9_", "power_meter_trust_level"],
    "source_ablation_drop_storage": ["o11_"],
}
def _base_classifier(seed: int, max_iter: int) -> HistGradientBoostingClassifier:
    """Build the unfitted gradient-boosting classifier shared by all stress splits.

    Args:
        seed: Value passed as ``random_state``.
        max_iter: Value passed as ``max_iter``.

    Returns:
        A new ``HistGradientBoostingClassifier`` with early stopping enabled
        and, when the constructor accepts it, ``class_weight="balanced"``.
    """
    params: dict[str, Any] = dict(
        learning_rate=0.06,
        max_iter=max_iter,
        max_leaf_nodes=31,
        l2_regularization=0.04,
        early_stopping=True,
        validation_fraction=0.15,
        random_state=seed,
    )
    try:
        classifier = HistGradientBoostingClassifier(class_weight="balanced", **params)
    except TypeError:  # pragma: no cover
        # The constructor rejected ``class_weight`` (presumably an older
        # scikit-learn release -- confirm); fall back to an unweighted model.
        classifier = HistGradientBoostingClassifier(**params)
    return classifier
def _binary_prf(y_true: np.ndarray, p_large: np.ndarray) -> dict[str, float]:
    """Score the binary "large training" decision.

    A row is truly large when its label is 3 or higher and is predicted large
    when ``p_large`` is at least 0.5.

    Args:
        y_true: Integer labels of the test rows.
        p_large: Per-row probability of the large-training outcome.

    Returns:
        Dict with ``precision``, ``recall`` and ``f1`` as plain floats;
        undefined ratios are reported as 0.
    """
    actual_large = y_true >= 3
    flagged_large = p_large >= 0.5
    scores = precision_recall_fscore_support(
        actual_large,
        flagged_large,
        average="binary",
        zero_division=0,
    )
    # ``scores`` also carries a trailing support entry; zip stops after f1.
    return {name: float(value) for name, value in zip(("precision", "recall", "f1"), scores)}
| def _scenario_counts(df: pd.DataFrame, mask: pd.Series, scenario_column: str) -> dict[str, int]: | |
| if not mask.any(): | |
| return {} | |
| return {str(key): int(value) for key, value in df.loc[mask, scenario_column].value_counts().sort_values(ascending=False).items()} | |
| def _middle_probability_bins(p_large: np.ndarray) -> dict[str, int]: | |
| return { | |
| "0.1_to_0.3": int(((p_large >= 0.1) & (p_large < 0.3)).sum()), | |
| "0.3_to_0.7": int(((p_large >= 0.3) & (p_large < 0.7)).sum()), | |
| "0.7_to_0.9": int(((p_large >= 0.7) & (p_large < 0.9)).sum()), | |
| } | |
def _fit_predict_split(
    df: pd.DataFrame,
    train_mask: pd.Series,
    test_mask: pd.Series,
    feature_columns: list[str],
    seed: int,
    max_iter: int,
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Fit a fresh classifier on one split and score it on the held-out rows.

    Args:
        df: Feature table; must contain ``label_0_to_4``, ``feature_row_id``
            and ``episode_id``.
        train_mask: Boolean selector for the training rows of ``df``.
        test_mask: Boolean selector for the test rows of ``df``.
        feature_columns: Columns handed to the preprocessor and model.
        seed: Random state for the classifier.
        max_iter: Boosting iteration cap for the classifier.

    Returns:
        ``(predictions, metrics)``: a per-test-row frame with identifiers,
        the predicted label and class probabilities, and a dict of split
        sizes, accuracy, macro F1, large-training precision/recall/F1,
        false-positive/negative counts by scenario, mid-range probability
        bins and log loss (``None`` when log loss cannot be computed).

    Raises:
        ValueError: If either side of the split is empty or the training
            rows carry fewer than two distinct labels.
    """
    label_column = "label_0_to_4"
    train_rows = df[train_mask].copy()
    test_rows = df[test_mask].copy()
    if train_rows.empty or test_rows.empty:
        raise ValueError("stress split has empty train or test set")
    if train_rows[label_column].nunique() < 2:
        raise ValueError("stress split train set has fewer than two labels")

    # The preprocessor is fitted on the training rows only and then reused,
    # unchanged, to transform the test rows.
    preprocessor = make_preprocessor(train_rows, feature_columns)
    train_matrix = preprocessor.fit_transform(model_input_frame(train_rows, feature_columns))
    train_labels = train_rows[label_column].astype(int).to_numpy()
    test_matrix = preprocessor.transform(model_input_frame(test_rows, feature_columns))
    test_labels = test_rows[label_column].astype(int).to_numpy()

    classifier = _base_classifier(seed, max_iter)
    classifier.fit(train_matrix, train_labels)

    probabilities = probability_frame(classifier, test_matrix)
    class_probs = probabilities[PROB_COLUMNS].to_numpy()
    # Hard prediction: the label whose probability column is largest.
    predicted = np.asarray(LABELS)[np.argmax(class_probs, axis=1)]
    # "Large training" probability is the mass on labels 3 and 4 combined.
    p_large = probabilities["p_label_3"].to_numpy() + probabilities["p_label_4"].to_numpy()

    predictions = test_rows[["feature_row_id", "episode_id", label_column]].copy()
    optional_columns = ("scenario_family", "latent_workload_class", "site_id", "window_start")
    for name in optional_columns:
        if name in test_rows.columns:
            predictions[name] = test_rows[name].to_numpy()
    predictions["predicted_label"] = predicted
    predictions["p_large_training"] = p_large
    for name in PROB_COLUMNS:
        predictions[name] = probabilities[name].to_numpy()

    truly_large = test_labels >= 3
    flagged_large = p_large >= 0.5
    false_positive = (~truly_large) & flagged_large
    false_negative = truly_large & (~flagged_large)
    if "scenario_family" in test_rows.columns:
        scenario_column = "scenario_family"
    else:
        scenario_column = "latent_workload_class"

    metrics = {
        "train_rows": int(len(train_rows)),
        "test_rows": int(len(test_rows)),
        "train_episodes": int(train_rows["episode_id"].nunique()),
        "test_episodes": int(test_rows["episode_id"].nunique()),
        "test_label_distribution": {str(label): int(count) for label, count in Counter(test_labels).items()},
        "accuracy": float(accuracy_score(test_labels, predicted)),
        "macro_f1": float(f1_score(test_labels, predicted, labels=LABELS, average="macro", zero_division=0)),
        "large_training": _binary_prf(test_labels, p_large),
        "large_training_false_positive_count": int(false_positive.sum()),
        "large_training_false_negative_count": int(false_negative.sum()),
        "false_positive_scenarios": _scenario_counts(
            test_rows, pd.Series(false_positive, index=test_rows.index), scenario_column
        ),
        "false_negative_scenarios": _scenario_counts(
            test_rows, pd.Series(false_negative, index=test_rows.index), scenario_column
        ),
        "middle_probability_bins": _middle_probability_bins(p_large),
    }
    try:
        split_log_loss = float(log_loss(test_labels, class_probs, labels=LABELS))
    except ValueError:
        # Log loss could not be computed for this split; record it as missing.
        split_log_loss = None
    metrics["log_loss"] = split_log_loss
    return predictions, metrics
| def _drop_features(feature_columns: list[str], prefixes: list[str]) -> list[str]: | |
| dropped = [] | |
| for column in feature_columns: | |
| if any(column == prefix or column.startswith(prefix) for prefix in prefixes): | |
| continue | |
| dropped.append(column) | |
| return dropped | |
def evaluate_stress_splits(features_path: Path, output_dir: Path, seed: int = DEFAULT_SEED, max_iter: int = 120) -> dict[str, Any]:
    """Retrain a lightweight model on several stress splits and write the results.

    Splits evaluated, in order: the episode-grouped random baseline, a time
    holdout (rows at or after the 80th-percentile ``window_start``), one
    holdout per site, one holdout per family in ``FAMILY_HOLDOUTS`` and one
    feature ablation per entry in ``SOURCE_ABLATIONS`` (on the baseline
    split). Every split except the baseline records ``{"error": message}``
    instead of aborting when it cannot be fitted.

    Args:
        features_path: Location of the feature table to load.
        output_dir: Directory receiving ``stress_metrics.json`` and
            ``stress_summary.md``; an existing ``README.md`` there is updated.
        seed: Base random seed; each split uses a fixed offset from it.
        max_iter: Boosting iteration cap for every retrained model.

    Returns:
        The metrics dict that was written to ``stress_metrics.json``.

    Raises:
        ValueError: If the baseline episode-grouped random split cannot be
            fitted (empty side or fewer than two training labels).
    """
    ensure_dir(output_dir)
    df = load_feature_table(features_path)
    scenario_column = "scenario_family" if "scenario_family" in df.columns else "latent_workload_class"
    feature_columns, exclusion_meta = determine_feature_columns(df.copy())
    metrics: dict[str, Any] = {
        "features_path": str(features_path),
        "seed": int(seed),
        "max_iter": int(max_iter),
        "scenario_column": scenario_column,
        "model_feature_count": int(len(feature_columns)),
        "exclusion_metadata": exclusion_meta,
        "splits": {},
    }

    # Baseline split. Deliberately unguarded: every later comparison is made
    # against it, so a failure here should stop the run.
    split_df, _ = make_episode_split(df, seed=seed)
    random_train = split_df["split"].isin(["train", "validation"])
    random_test = split_df["split"] == "test"
    _, metrics["splits"]["episode_grouped_random"] = _fit_predict_split(
        split_df, random_train, random_test, feature_columns, seed, max_iter
    )

    # Time holdout: train on the earliest 80% of window starts, test on the rest.
    window_times = pd.to_datetime(df["window_start"], utc=True)
    cutoff = window_times.quantile(0.80)
    try:
        _, time_metrics = _fit_predict_split(
            df, window_times < cutoff, window_times >= cutoff, feature_columns, seed + 1, max_iter
        )
    except ValueError as exc:
        # A degenerate time split (e.g. nothing before the cutoff) is recorded
        # like any other failed stress split instead of discarding the run.
        time_metrics = {"error": str(exc)}
    metrics["splits"]["time_holdout"] = time_metrics

    site_results = {}
    for idx, site_id in enumerate(sorted(df["site_id"].dropna().unique())):
        test_mask = df["site_id"] == site_id
        train_mask = ~test_mask
        try:
            _, site_results[str(site_id)] = _fit_predict_split(df, train_mask, test_mask, feature_columns, seed + 10 + idx, max_iter)
        except ValueError as exc:
            site_results[str(site_id)] = {"error": str(exc)}
    metrics["splits"]["site_holdout"] = site_results

    # The string cast and membership set do not change between families, so
    # they are computed once rather than on every iteration.
    scenario_values = df[scenario_column].astype(str)
    present_families = set(scenario_values)
    family_results = {}
    for idx, family in enumerate(FAMILY_HOLDOUTS):
        if family not in present_families:
            family_results[family] = {"error": "family not present"}
            continue
        test_mask = scenario_values == family
        train_mask = ~test_mask
        try:
            _, family_results[family] = _fit_predict_split(df, train_mask, test_mask, feature_columns, seed + 30 + idx, max_iter)
        except ValueError as exc:
            family_results[family] = {"error": str(exc)}
    metrics["splits"]["scenario_family_holdout"] = family_results

    ablation_results = {}
    for idx, (name, prefixes) in enumerate(SOURCE_ABLATIONS.items()):
        ablated_features = _drop_features(feature_columns, prefixes)
        try:
            _, ablation_results[name] = _fit_predict_split(
                split_df, random_train, random_test, ablated_features, seed + 50 + idx, max_iter
            )
            ablation_results[name]["feature_count"] = int(len(ablated_features))
            # Copy so the returned metrics never alias the module-level constant.
            ablation_results[name]["dropped_prefixes"] = list(prefixes)
        except ValueError as exc:
            ablation_results[name] = {"error": str(exc)}
    metrics["splits"]["source_ablations"] = ablation_results

    write_json(output_dir / "stress_metrics.json", metrics)
    write_stress_summary(output_dir / "stress_summary.md", metrics)
    update_model_run_readme_with_stress(output_dir, metrics)
    return metrics
| def _split_line(name: str, item: dict[str, Any]) -> str: | |
| if "error" in item: | |
| return f"- {name}: ERROR: {item['error']}" | |
| return ( | |
| f"- {name}: rows={item['test_rows']}, macro F1={item['macro_f1']:.3f}, " | |
| f"large precision={item['large_training']['precision']:.3f}, " | |
| f"large recall={item['large_training']['recall']:.3f}, " | |
| f"FP={item['large_training_false_positive_count']}, FN={item['large_training_false_negative_count']}, " | |
| f"middle p bins={item['middle_probability_bins']}" | |
| ) | |
def write_stress_summary(path: Path, metrics: dict[str, Any]) -> None:
    """Write the markdown stress summary for one evaluation run.

    The report lists the core splits, the per-site holdouts, the scenario
    family holdouts (with false-negative/positive scenario tallies when
    non-empty) and the source ablations (with the macro F1 delta against the
    random baseline when both values exist), then a fixed interpretation
    section.

    Args:
        path: Destination file, written as UTF-8.
        metrics: Metrics dict holding ``features_path``, ``seed``,
            ``max_iter`` and the per-split results under ``splits``.
    """
    splits = metrics["splits"]
    report = [
        "# Synthetic v1 Stress Summary",
        "",
        f"Feature table: `{metrics['features_path']}`",
        f"Seed: `{metrics['seed']}`",
        f"Lightweight stress model max_iter: `{metrics['max_iter']}`",
        "",
        "## Core Splits",
        "",
        _split_line("episode_grouped_random", splits["episode_grouped_random"]),
        _split_line("time_holdout", splits["time_holdout"]),
        "",
        "## Site Holdout",
        "",
    ]
    report.extend(_split_line(site_id, result) for site_id, result in splits["site_holdout"].items())

    report += ["", "## Scenario Family Holdout", ""]
    for family, result in splits["scenario_family_holdout"].items():
        report.append(_split_line(family, result))
        # Error entries have no scenario tallies, and empty tallies are skipped.
        if result.get("false_negative_scenarios"):
            report.append(f" false negatives: {result['false_negative_scenarios']}")
        if result.get("false_positive_scenarios"):
            report.append(f" false positives: {result['false_positive_scenarios']}")

    report += ["", "## Source Ablations", ""]
    baseline = splits["episode_grouped_random"]
    baseline_f1 = None if "error" in baseline else baseline.get("macro_f1")
    for ablation, result in splits["source_ablations"].items():
        report.append(_split_line(ablation, result))
        if baseline_f1 is not None and "macro_f1" in result:
            report.append(f" macro F1 delta vs random baseline: {result['macro_f1'] - baseline_f1:+.3f}")

    report += [
        "",
        "## Interpretation",
        "",
        "- These are stress diagnostics, not tuned headline metrics.",
        "- Family holdouts identify hard families that still fail under scenario generalization.",
        "- Source ablations should degrade performance when the dropped layer carries independent evidence.",
        "- Middle probability bins indicate whether v1 has less all-or-nothing large-training probability mass than v0.",
    ]
    path.write_text("\n".join(report) + "\n", encoding="utf-8")
def update_model_run_readme_with_stress(model_run_dir: Path, metrics: dict[str, Any]) -> None:
    """Insert or refresh the ``## Stress Splits`` section of the run's README.

    Does nothing when ``model_run_dir / "README.md"`` does not exist. When the
    README already has a ``## Stress Splits`` section, that section alone is
    replaced: it ends at the next level-2 (``## ``) heading or at the end of
    the file. Otherwise the new section is inserted before ``## Reproduce``
    or, if that heading is absent, appended to the end.

    Args:
        model_run_dir: Directory containing the README to update.
        metrics: Metrics dict with per-split results under ``splits``;
            missing values (e.g. from error entries) are rendered as zero.
    """
    readme_path = model_run_dir / "README.md"
    if not readme_path.exists():
        return
    splits = metrics["splits"]
    baseline = splits["episode_grouped_random"]
    fragmented = splits["scenario_family_holdout"].get("fragmented_training_linked", {})
    underclocked = splits["scenario_family_holdout"].get("underclocked_energy_capped_training", {})
    runtime_drop = splits["source_ablations"].get("source_ablation_drop_runtime_and_ml_logs", {})
    baseline_f1 = baseline.get("macro_f1", 0.0)
    # No delta is reported when the runtime/ML-log ablation has no macro F1.
    runtime_delta = runtime_drop["macro_f1"] - baseline_f1 if "macro_f1" in runtime_drop else 0.0
    section = f"""## Stress Splits
Stress outputs are in:
- `stress_metrics.json`
- `stress_summary.md`
Key stress checks from the lightweight retraining script:
- Episode-grouped random split macro F1: {baseline.get('macro_f1', 0.0):.4f}; large-training recall: {baseline.get('large_training', {}).get('recall', 0.0):.4f}
- Time holdout macro F1: {splits['time_holdout'].get('macro_f1', 0.0):.4f}; large-training recall: {splits['time_holdout'].get('large_training', {}).get('recall', 0.0):.4f}
- Underclocked holdout large-training recall: {underclocked.get('large_training', {}).get('recall', 0.0):.4f}; false negatives: {underclocked.get('large_training_false_negative_count', 0)}
- Fragmented linked training holdout remains hard: {fragmented.get('large_training_false_negative_count', 0)} large-training false negatives
- Source ablation dropping runtime and ML logs changed macro F1 by {runtime_delta:+.3f} versus the random stress baseline
- Middle `p_large_training` bins are populated in stress checks: {baseline.get('middle_probability_bins', {})}
"""
    heading = "## Stress Splits\n"
    readme = readme_path.read_text(encoding="utf-8")
    start = readme.find(heading)
    if start != -1:
        # The old section stops at the next level-2 heading, whatever its
        # title, so unrelated sections that follow it are never discarded.
        # The search begins at the heading's own trailing newline so that a
        # heading on the very next line is still found.
        next_heading = readme.find("\n## ", start + len(heading) - 1)
        tail = "" if next_heading == -1 else readme[next_heading + 1:]
        readme = readme[:start].rstrip() + "\n\n" + section + tail
    else:
        marker = "## Reproduce\n"
        idx = readme.find(marker)
        if idx == -1:
            readme = readme.rstrip() + "\n\n" + section
        else:
            readme = readme[:idx].rstrip() + "\n\n" + section + readme[idx:]
    readme_path.write_text(readme, encoding="utf-8")
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: run the stress evaluation and print a recap.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read the
            process arguments.

    Returns:
        ``0`` once the evaluation has finished and the recap is printed.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    option_specs = (
        ("--features", {"type": Path, "required": True}),
        ("--model-run", {"type": Path, "required": True, "help": "Directory to write stress outputs into."}),
        ("--seed", {"type": int, "default": DEFAULT_SEED}),
        ("--max-iter", {"type": int, "default": 120}),
    )
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    args = parser.parse_args(argv)

    run_dir = args.model_run
    metrics = evaluate_stress_splits(args.features, run_dir, seed=args.seed, max_iter=args.max_iter)
    baseline = metrics["splits"]["episode_grouped_random"]
    print(f"episode_grouped_random_macro_f1: {baseline['macro_f1']:.4f}")
    print(f"episode_grouped_random_large_recall: {baseline['large_training']['recall']:.4f}")
    print(f"stress_metrics: {run_dir / 'stress_metrics.json'}")
    print(f"stress_summary: {run_dir / 'stress_summary.md'}")
    return 0
# Script entry point: exit the process with the integer returned by main().
if __name__ == "__main__":
    raise SystemExit(main())