# --- Hosting-page residue captured with this file (not part of the program) ---
# idacy's picture
# Upload live inference API deployment files
# e4b1ed6 verified
# Raw
# History Blame Contribute Delete
# 16 kB
"""Evaluate stress splits for synthetic datacenter verification datasets."""
from __future__ import annotations
import argparse
from collections import Counter
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score, log_loss, precision_recall_fscore_support
try:
from .common import (
DEFAULT_SEED,
LABELS,
PROB_COLUMNS,
determine_feature_columns,
ensure_dir,
load_feature_table,
make_episode_split,
make_preprocessor,
model_input_frame,
probability_frame,
write_json,
)
except ImportError: # pragma: no cover - direct script execution
from common import (
DEFAULT_SEED,
LABELS,
PROB_COLUMNS,
determine_feature_columns,
ensure_dir,
load_feature_table,
make_episode_split,
make_preprocessor,
model_input_frame,
probability_frame,
write_json,
)
# Scenario families that evaluate_stress_splits holds out one at a time:
# every row of the family becomes the test set and all other rows train.
FAMILY_HOLDOUTS = [
    "underclocked_energy_capped_training",
    "fragmented_training_linked",
    "multi_tenant_fragmented_nontraining",
    "model_parallel_inference",
    "hpc_mpi_collective",
]

# Ablation name -> column-name prefixes (or exact column names) that
# _drop_features removes from the feature list before the model is refit
# on the episode-grouped random split.
SOURCE_ABLATIONS = {
    "source_ablation_drop_fabric": ["o6_", "o7_", "fabric_telemetry_trust_level"],
    "source_ablation_drop_runtime_and_ml_logs": ["o10_", "o12_"],
    "source_ablation_drop_gpu_telemetry": ["o4_", "o5_", "gpu_telemetry_trust_level"],
    "source_ablation_drop_power": ["o8_", "o9_", "power_meter_trust_level"],
    "source_ablation_drop_storage": ["o11_"],
}
def _base_classifier(seed: int, max_iter: int) -> HistGradientBoostingClassifier:
    """Build the gradient-boosting classifier shared by every stress split.

    Args:
        seed: Value passed as ``random_state``.
        max_iter: Value passed as ``max_iter``.

    Returns:
        An unfitted ``HistGradientBoostingClassifier``. Balanced class weights
        are requested first; if the constructor raises ``TypeError`` for that
        keyword, the same configuration is built without it.
    """
    hyperparameters: dict[str, Any] = dict(
        learning_rate=0.06,
        max_iter=max_iter,
        max_leaf_nodes=31,
        l2_regularization=0.04,
        early_stopping=True,
        validation_fraction=0.15,
        random_state=seed,
    )
    try:
        classifier = HistGradientBoostingClassifier(class_weight="balanced", **hyperparameters)
    except TypeError:  # pragma: no cover
        # NOTE(review): presumably covers scikit-learn releases whose
        # constructor has no ``class_weight`` parameter - confirm.
        classifier = HistGradientBoostingClassifier(**hyperparameters)
    return classifier
def _binary_prf(y_true: np.ndarray, p_large: np.ndarray) -> dict[str, float]:
    """Score the binary "large training" decision.

    A row is a true positive class member when its label is at least 3, and
    is predicted positive when ``p_large`` is at least 0.5.

    Args:
        y_true: Integer labels for the evaluated rows.
        p_large: Predicted large-training probability for the same rows.

    Returns:
        Dict with float ``precision``, ``recall`` and ``f1``; undefined
        ratios are reported as 0 (``zero_division=0``).
    """
    actual_large = y_true >= 3
    flagged_large = p_large >= 0.5
    scores = precision_recall_fscore_support(
        actual_large,
        flagged_large,
        average="binary",
        zero_division=0,
    )
    # The fourth element (support) is intentionally ignored.
    metric_names = ("precision", "recall", "f1")
    return {metric: float(score) for metric, score in zip(metric_names, scores)}
def _scenario_counts(df: pd.DataFrame, mask: pd.Series, scenario_column: str) -> dict[str, int]:
    """Count the rows selected by ``mask`` per value of ``scenario_column``.

    Args:
        df: Frame containing ``scenario_column``.
        mask: Boolean selector for the rows of ``df`` to count.
        scenario_column: Name of the column whose values are tallied.

    Returns:
        Mapping of ``str(value)`` to its count as a plain ``int``, most
        frequent value first. Empty when ``mask`` selects no rows.
    """
    if not mask.any():
        return {}
    # value_counts() already orders by descending count, so a second
    # sort_values(ascending=False) would only repeat the work.
    tallies = df.loc[mask, scenario_column].value_counts()
    return {str(scenario): int(count) for scenario, count in tallies.items()}
def _middle_probability_bins(p_large: np.ndarray) -> dict[str, int]:
    """Count probabilities that fall in three mid-range bands.

    Each band is half-open: the lower edge is included and the upper edge is
    excluded. Values below 0.1 or at/above 0.9 are not counted in any band.

    Args:
        p_large: Predicted large-training probabilities.

    Returns:
        Dict keyed ``0.1_to_0.3``, ``0.3_to_0.7`` and ``0.7_to_0.9`` with the
        number of values in each band.
    """
    bands = (
        ("0.1_to_0.3", 0.1, 0.3),
        ("0.3_to_0.7", 0.3, 0.7),
        ("0.7_to_0.9", 0.7, 0.9),
    )
    counts: dict[str, int] = {}
    for label, lower, upper in bands:
        in_band = (p_large >= lower) & (p_large < upper)
        counts[label] = int(in_band.sum())
    return counts
def _fit_predict_split(
    df: pd.DataFrame,
    train_mask: pd.Series,
    test_mask: pd.Series,
    feature_columns: list[str],
    seed: int,
    max_iter: int,
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Fit a fresh classifier on the train rows and score it on the test rows.

    Args:
        df: Feature table containing labels, ids and feature columns.
        train_mask: Boolean selector for training rows of ``df``.
        test_mask: Boolean selector for evaluation rows of ``df``.
        feature_columns: Columns handed to the preprocessor and model.
        seed: Random state for the classifier.
        max_iter: Boosting iteration cap for the classifier.

    Returns:
        ``(predictions, metrics)``: a per-row prediction frame for the test
        rows and a dict of JSON-friendly summary metrics.

    Raises:
        ValueError: If either side of the split is empty, or the training
            rows contain fewer than two distinct labels.
    """
    fit_rows = df[train_mask].copy()
    eval_rows = df[test_mask].copy()
    if fit_rows.empty or eval_rows.empty:
        raise ValueError("stress split has empty train or test set")
    if fit_rows["label_0_to_4"].nunique() < 2:
        raise ValueError("stress split train set has fewer than two labels")

    # The preprocessor is fitted on the training rows only and then reused,
    # unchanged, to transform the evaluation rows.
    preprocessor = make_preprocessor(fit_rows, feature_columns)
    x_train = preprocessor.fit_transform(model_input_frame(fit_rows, feature_columns))
    y_train = fit_rows["label_0_to_4"].astype(int).to_numpy()
    x_test = preprocessor.transform(model_input_frame(eval_rows, feature_columns))
    y_test = eval_rows["label_0_to_4"].astype(int).to_numpy()

    classifier = _base_classifier(seed, max_iter)
    classifier.fit(x_train, y_train)

    probabilities = probability_frame(classifier, x_test)
    class_probs = probabilities[PROB_COLUMNS].to_numpy()
    predicted = np.asarray(LABELS)[class_probs.argmax(axis=1)]
    # "Large training" probability is the combined mass of labels 3 and 4.
    p_large = probabilities["p_label_3"].to_numpy() + probabilities["p_label_4"].to_numpy()

    # Per-row output: ids and label, optional context columns when present,
    # then the prediction and the per-class probabilities.
    predictions = eval_rows[["feature_row_id", "episode_id", "label_0_to_4"]].copy()
    optional_columns = ("scenario_family", "latent_workload_class", "site_id", "window_start")
    for name in optional_columns:
        if name in eval_rows.columns:
            predictions[name] = eval_rows[name].to_numpy()
    predictions["predicted_label"] = predicted
    predictions["p_large_training"] = p_large
    for name in PROB_COLUMNS:
        predictions[name] = probabilities[name].to_numpy()

    actual_large = y_test >= 3
    called_large = p_large >= 0.5
    false_positive = ~actual_large & called_large
    false_negative = actual_large & ~called_large

    if "scenario_family" in eval_rows.columns:
        scenario_column = "scenario_family"
    else:
        scenario_column = "latent_workload_class"

    metrics = {
        "train_rows": int(len(fit_rows)),
        "test_rows": int(len(eval_rows)),
        "train_episodes": int(fit_rows["episode_id"].nunique()),
        "test_episodes": int(eval_rows["episode_id"].nunique()),
        "test_label_distribution": {str(label): int(count) for label, count in Counter(y_test).items()},
        "accuracy": float(accuracy_score(y_test, predicted)),
        "macro_f1": float(f1_score(y_test, predicted, labels=LABELS, average="macro", zero_division=0)),
        "large_training": _binary_prf(y_test, p_large),
        "large_training_false_positive_count": int(false_positive.sum()),
        "large_training_false_negative_count": int(false_negative.sum()),
        "false_positive_scenarios": _scenario_counts(
            eval_rows, pd.Series(false_positive, index=eval_rows.index), scenario_column
        ),
        "false_negative_scenarios": _scenario_counts(
            eval_rows, pd.Series(false_negative, index=eval_rows.index), scenario_column
        ),
        "middle_probability_bins": _middle_probability_bins(p_large),
    }
    # log_loss is recorded as None when scikit-learn rejects the inputs.
    try:
        metrics["log_loss"] = float(log_loss(y_test, class_probs, labels=LABELS))
    except ValueError:
        metrics["log_loss"] = None
    return predictions, metrics
def _drop_features(feature_columns: list[str], prefixes: list[str]) -> list[str]:
    """Return ``feature_columns`` without the columns matched by ``prefixes``.

    A column is removed when its name starts with any entry of ``prefixes``;
    an entry equal to a full column name therefore removes exactly that
    column (and any column that extends it).

    Args:
        feature_columns: Candidate feature names, in model order.
        prefixes: Name prefixes (or exact names) to remove.

    Returns:
        A new list with the surviving columns in their original order.
    """
    # str.startswith accepts a tuple of prefixes and is also true for an
    # exact match, so no separate equality test is needed.
    blocked = tuple(prefixes)
    return [column for column in feature_columns if not column.startswith(blocked)]
def evaluate_stress_splits(features_path: Path, output_dir: Path, seed: int = DEFAULT_SEED, max_iter: int = 120) -> dict[str, Any]:
    """Run every stress split and write the results into ``output_dir``.

    The splits are: an episode-grouped random baseline, a time holdout (rows
    at or after the 80th-percentile ``window_start`` are tested), one holdout
    per site, one holdout per family in ``FAMILY_HOLDOUTS``, and one feature
    ablation per entry of ``SOURCE_ABLATIONS`` on the baseline split.

    Args:
        features_path: Feature table to load.
        output_dir: Directory that receives ``stress_metrics.json`` and
            ``stress_summary.md``; an existing ``README.md`` there is updated.
        seed: Base seed; each split uses a fixed offset from it.
        max_iter: Boosting iteration cap for every fitted model.

    Returns:
        The metrics dict that was written to ``stress_metrics.json``.

    Raises:
        ValueError: If the baseline split cannot be fitted. Failures of any
            other split are stored as ``{"error": message}`` in its slot.
    """
    ensure_dir(output_dir)
    df = load_feature_table(features_path)
    scenario_column = "scenario_family" if "scenario_family" in df.columns else "latent_workload_class"
    feature_columns, exclusion_meta = determine_feature_columns(df.copy())
    metrics: dict[str, Any] = {
        "features_path": str(features_path),
        "seed": int(seed),
        "max_iter": int(max_iter),
        "scenario_column": scenario_column,
        "model_feature_count": int(len(feature_columns)),
        "exclusion_metadata": exclusion_meta,
        "splits": {},
    }
    splits = metrics["splits"]

    # Baseline: train+validation episodes versus test episodes. Errors here
    # propagate because callers read the baseline metrics directly.
    split_df, _ = make_episode_split(df, seed=seed)
    random_train = split_df["split"].isin(["train", "validation"])
    random_test = split_df["split"] == "test"
    _, splits["episode_grouped_random"] = _fit_predict_split(
        split_df, random_train, random_test, feature_columns, seed, max_iter
    )

    # Time holdout: a degenerate cutoff (for example all windows sharing one
    # timestamp) leaves one side empty; record that like the other holdouts
    # instead of aborting the whole run.
    window_starts = pd.to_datetime(df["window_start"], utc=True)
    cutoff = window_starts.quantile(0.80)
    try:
        _, splits["time_holdout"] = _fit_predict_split(
            df, window_starts < cutoff, window_starts >= cutoff, feature_columns, seed + 1, max_iter
        )
    except ValueError as exc:
        splits["time_holdout"] = {"error": str(exc)}

    site_results: dict[str, Any] = {}
    for idx, site_id in enumerate(sorted(df["site_id"].dropna().unique())):
        test_mask = df["site_id"] == site_id
        try:
            _, site_results[str(site_id)] = _fit_predict_split(
                df, ~test_mask, test_mask, feature_columns, seed + 10 + idx, max_iter
            )
        except ValueError as exc:
            site_results[str(site_id)] = {"error": str(exc)}
    splits["site_holdout"] = site_results

    # The string view of the scenario column does not change between
    # families, so build it (and the membership set) once.
    scenario_values = df[scenario_column].astype(str)
    present_families = set(scenario_values)
    family_results: dict[str, Any] = {}
    for idx, family in enumerate(FAMILY_HOLDOUTS):
        if family not in present_families:
            family_results[family] = {"error": "family not present"}
            continue
        test_mask = scenario_values == family
        try:
            _, family_results[family] = _fit_predict_split(
                df, ~test_mask, test_mask, feature_columns, seed + 30 + idx, max_iter
            )
        except ValueError as exc:
            family_results[family] = {"error": str(exc)}
    splits["scenario_family_holdout"] = family_results

    ablation_results: dict[str, Any] = {}
    for idx, (name, prefixes) in enumerate(SOURCE_ABLATIONS.items()):
        ablated_features = _drop_features(feature_columns, prefixes)
        try:
            _, ablation_results[name] = _fit_predict_split(
                split_df, random_train, random_test, ablated_features, seed + 50 + idx, max_iter
            )
            ablation_results[name]["feature_count"] = int(len(ablated_features))
            ablation_results[name]["dropped_prefixes"] = prefixes
        except ValueError as exc:
            ablation_results[name] = {"error": str(exc)}
    splits["source_ablations"] = ablation_results

    write_json(output_dir / "stress_metrics.json", metrics)
    write_stress_summary(output_dir / "stress_summary.md", metrics)
    update_model_run_readme_with_stress(output_dir, metrics)
    return metrics
def _split_line(name: str, item: dict[str, Any]) -> str:
    """Format one split's result as a single Markdown bullet.

    Args:
        name: Label shown at the start of the bullet.
        item: Either a metrics dict for the split or a dict with an
            ``error`` key.

    Returns:
        ``- name: ERROR: ...`` when ``item`` has an ``error`` key, otherwise a
        bullet listing test rows, macro F1, large-training precision and
        recall, false-positive/negative counts and the middle-probability
        bins.
    """
    if "error" not in item:
        fields = [
            f"rows={item['test_rows']}",
            f"macro F1={item['macro_f1']:.3f}",
            f"large precision={item['large_training']['precision']:.3f}",
            f"large recall={item['large_training']['recall']:.3f}",
            f"FP={item['large_training_false_positive_count']}",
            f"FN={item['large_training_false_negative_count']}",
            f"middle p bins={item['middle_probability_bins']}",
        ]
        return f"- {name}: " + ", ".join(fields)
    return f"- {name}: ERROR: {item['error']}"
def write_stress_summary(path: Path, metrics: dict[str, Any]) -> None:
    """Render the stress metrics as a Markdown report and write it to ``path``.

    The report has a header, then sections for the core splits, site
    holdouts, scenario-family holdouts (with any false-negative and
    false-positive scenario counts), source ablations (with the macro F1
    delta against the random baseline when both values exist) and a fixed
    interpretation note.

    Args:
        path: Destination file; written as UTF-8.
        metrics: Metrics dict containing ``features_path``, ``seed``,
            ``max_iter`` and ``splits``.
    """
    splits = metrics["splits"]

    report = [
        "# Synthetic v1 Stress Summary",
        "",
        f"Feature table: `{metrics['features_path']}`",
        f"Seed: `{metrics['seed']}`",
        f"Lightweight stress model max_iter: `{metrics['max_iter']}`",
        "",
        "## Core Splits",
        "",
    ]
    for core_name in ("episode_grouped_random", "time_holdout"):
        report.append(_split_line(core_name, splits[core_name]))

    report += ["", "## Site Holdout", ""]
    for site_id, result in splits["site_holdout"].items():
        report.append(_split_line(site_id, result))

    report += ["", "## Scenario Family Holdout", ""]
    for family, result in splits["scenario_family_holdout"].items():
        report.append(_split_line(family, result))
        # Error entries have neither key, so .get() skips them.
        if result.get("false_negative_scenarios"):
            report.append(f" false negatives: {result['false_negative_scenarios']}")
        if result.get("false_positive_scenarios"):
            report.append(f" false positives: {result['false_positive_scenarios']}")

    report += ["", "## Source Ablations", ""]
    baseline = splits["episode_grouped_random"]
    baseline_f1 = None if "error" in baseline else baseline.get("macro_f1")
    for name, result in splits["source_ablations"].items():
        report.append(_split_line(name, result))
        if baseline_f1 is not None and "macro_f1" in result:
            report.append(f" macro F1 delta vs random baseline: {result['macro_f1'] - baseline_f1:+.3f}")

    report += [
        "",
        "## Interpretation",
        "",
        "- These are stress diagnostics, not tuned headline metrics.",
        "- Family holdouts identify hard families that still fail under scenario generalization.",
        "- Source ablations should degrade performance when the dropped layer carries independent evidence.",
        "- Middle probability bins indicate whether v1 has less all-or-nothing large-training probability mass than v0.",
    ]
    path.write_text("\n".join(report) + "\n", encoding="utf-8")
def update_model_run_readme_with_stress(model_run_dir: Path, metrics: dict[str, Any]) -> None:
    """Insert or refresh the "## Stress Splits" section of the run README.

    Does nothing when ``model_run_dir / "README.md"`` does not exist. An
    existing section is replaced in place; otherwise the section is inserted
    before "## Reproduce", or appended when that heading is absent.

    Args:
        model_run_dir: Directory holding the README to update.
        metrics: Metrics dict whose ``splits`` entry holds the results.
            Values missing from a split (for example when it recorded an
            error) are reported with the defaults 0.0 / 0 / {}.
    """
    readme_path = model_run_dir / "README.md"
    if not readme_path.exists():
        return

    splits = metrics["splits"]
    baseline = splits["episode_grouped_random"]
    time_holdout = splits["time_holdout"]
    family_holdouts = splits["scenario_family_holdout"]
    fragmented = family_holdouts.get("fragmented_training_linked", {})
    underclocked = family_holdouts.get("underclocked_energy_capped_training", {})
    runtime_drop = splits["source_ablations"].get("source_ablation_drop_runtime_and_ml_logs", {})

    baseline_f1 = baseline.get("macro_f1", 0.0)
    baseline_recall = baseline.get("large_training", {}).get("recall", 0.0)
    time_f1 = time_holdout.get("macro_f1", 0.0)
    time_recall = time_holdout.get("large_training", {}).get("recall", 0.0)
    underclocked_recall = underclocked.get("large_training", {}).get("recall", 0.0)
    underclocked_fn = underclocked.get("large_training_false_negative_count", 0)
    fragmented_fn = fragmented.get("large_training_false_negative_count", 0)
    middle_bins = baseline.get("middle_probability_bins", {})
    runtime_delta = runtime_drop["macro_f1"] - baseline_f1 if "macro_f1" in runtime_drop else 0.0

    section_lines = [
        "## Stress Splits",
        "Stress outputs are in:",
        "- `stress_metrics.json`",
        "- `stress_summary.md`",
        "Key stress checks from the lightweight retraining script:",
        f"- Episode-grouped random split macro F1: {baseline_f1:.4f}; large-training recall: {baseline_recall:.4f}",
        f"- Time holdout macro F1: {time_f1:.4f}; large-training recall: {time_recall:.4f}",
        f"- Underclocked holdout large-training recall: {underclocked_recall:.4f}; false negatives: {underclocked_fn}",
        f"- Fragmented linked training holdout remains hard: {fragmented_fn} large-training false negatives",
        f"- Source ablation dropping runtime and ML logs changed macro F1 by {runtime_delta:+.3f} versus the random stress baseline",
        f"- Middle `p_large_training` bins are populated in stress checks: {middle_bins}",
    ]
    section = "\n".join(section_lines) + "\n"

    readme = readme_path.read_text(encoding="utf-8")
    start = readme.find("## Stress Splits\n")
    if start != -1:
        # Replace only the existing section: it ends at the next level-2
        # heading, whatever that heading is called. Cutting to a specific
        # later heading (or to end of file) would delete unrelated sections.
        next_heading = readme.find("\n## ", start)
        remainder = readme[next_heading + 1:] if next_heading != -1 else ""
        readme = readme[:start].rstrip() + "\n\n" + section + remainder
    else:
        insert_at = readme.find("## Reproduce\n")
        if insert_at == -1:
            readme = readme.rstrip() + "\n\n" + section
        else:
            readme = readme[:insert_at].rstrip() + "\n\n" + section + readme[insert_at:]
    readme_path.write_text(readme, encoding="utf-8")
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: run the stress evaluation and print a recap.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        ``0`` after the evaluation finishes and the recap is printed.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--features", type=Path, required=True)
    cli.add_argument("--model-run", type=Path, required=True, help="Directory to write stress outputs into.")
    cli.add_argument("--seed", type=int, default=DEFAULT_SEED)
    cli.add_argument("--max-iter", type=int, default=120)
    options = cli.parse_args(argv)

    run_dir = options.model_run
    results = evaluate_stress_splits(
        options.features,
        run_dir,
        seed=options.seed,
        max_iter=options.max_iter,
    )

    # Only the episode-grouped random baseline is echoed to stdout; the full
    # results are in the two files named below.
    baseline = results["splits"]["episode_grouped_random"]
    print(f"episode_grouped_random_macro_f1: {baseline['macro_f1']:.4f}")
    print(f"episode_grouped_random_large_recall: {baseline['large_training']['recall']:.4f}")
    print(f"stress_metrics: {run_dir / 'stress_metrics.json'}")
    print(f"stress_summary: {run_dir / 'stress_summary.md'}")
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())