Spaces:
Sleeping
Sleeping
| """Evaluate a trained synthetic datacenter verification modeling run.""" | |
| from __future__ import annotations | |
| import argparse | |
| from pathlib import Path | |
| from typing import Any | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.metrics import ( | |
| accuracy_score, | |
| brier_score_loss, | |
| classification_report, | |
| confusion_matrix, | |
| f1_score, | |
| log_loss, | |
| precision_recall_fscore_support, | |
| ) | |
| try: | |
| from .common import ( | |
| DEFAULT_SEED, | |
| LABELS, | |
| PROB_COLUMNS, | |
| RAW_PROB_COLUMNS, | |
| SELECTED_AUDIT_FEATURES, | |
| add_governance_outputs, | |
| apply_split_manifest, | |
| build_prediction_frame, | |
| load_feature_table, | |
| minimum_critical_coverage, | |
| model_input_frame, | |
| probability_frame, | |
| read_json, | |
| sha256_file, | |
| utc_now_iso, | |
| write_json, | |
| ) | |
| from .rule_baseline import predict_rule_labels | |
| except ImportError: # pragma: no cover - direct script execution | |
| from common import ( | |
| DEFAULT_SEED, | |
| LABELS, | |
| PROB_COLUMNS, | |
| RAW_PROB_COLUMNS, | |
| SELECTED_AUDIT_FEATURES, | |
| add_governance_outputs, | |
| apply_split_manifest, | |
| build_prediction_frame, | |
| load_feature_table, | |
| minimum_critical_coverage, | |
| model_input_frame, | |
| probability_frame, | |
| read_json, | |
| sha256_file, | |
| utc_now_iso, | |
| write_json, | |
| ) | |
| from rule_baseline import predict_rule_labels | |
# `latent_workload_class` values treated as "hard false positive" scenarios.
# `evidence_audit_sample` uses this set to pick correctly predicted label-2 rows
# for the audit sample.
# NOTE(review): the names look like two naming generations (short and extended)
# of the same scenario families - confirm against the dataset generator.
# Each name is listed exactly once; the previous literal repeated
# "large_batch_inference" and "reserved_but_unused_capacity", which a set
# silently collapses.
HARD_FALSE_POSITIVE_SCENARIOS = {
    "large_batch_inference",
    "synthetic_data_generation",
    "hpc_mpi_simulation",
    "nccl_benchmark",
    "hardware_burn_in",
    "storage_rebuild",
    "large_etl_data_movement",
    "reserved_but_unused_capacity",
    "maintenance_window",
    "model_parallel_inference",
    "embedding_generation",
    "synthetic_data_generation_gpu_heavy",
    "hpc_mpi_collective",
    "nccl_extended_benchmark",
    "hardware_burn_in_or_thermal_soak",
    "storage_rebuild_or_replication",
    "large_etl_or_data_movement",
    "distributed_database_or_graph_analytics",
    "maintenance_with_collector_gaps",
    "multi_tenant_fragmented_nontraining",
    "capacity_or_integrity_only_warning",
}
# Scenario names that denote training workloads (kept alphabetised; membership
# is what matters because this is a set).
# NOTE(review): not referenced elsewhere in the visible part of this module -
# presumably consumed by other modules or kept for reference; confirm.
TRAINING_SCENARIOS = {
    "adversarial_fragmented_training",
    "cloud_reservation_used_for_training",
    "cloud_training_redacted_runtime",
    "elastic_preempted_training",
    "fragmented_training_linked",
    "large_fine_tune",
    "large_fine_tune_standard",
    "multi_stage_training_pipeline",
    "pretraining",
    "pretraining_standard",
    "sparse_or_moe_bursty_training",
    "training_with_delayed_logs",
    "training_with_low_fabric_high_checkpoint",
    "training_without_semantic_logs",
    "underclocked_energy_capped_training",
    "underclocked_long_duration_training",
}
# File names expected inside a model run directory. The order is significant:
# it drives the artifact listing in validation_summary.md and the iteration
# order used when hashing artifacts for manifest.json.
REQUIRED_OUTPUTS = [
    # Run description and provenance.
    "README.md",
    "manifest.json",
    # Fitted artifacts and their column/split configuration.
    "model.joblib",
    "preprocessing.joblib",
    "feature_columns.json",
    "excluded_columns.json",
    "split_manifest.json",
    # Evaluation metrics.
    "metrics.json",
    "calibration_metrics.json",
    "confusion_matrix.csv",
    "classification_report.json",
    # Row-level predictions.
    "predictions_test.csv",
    "predictions_all.csv",
    # Diagnostics and audit material.
    "feature_importance.csv",
    "evidence_audit_sample.csv",
    "validation_summary.md",
]
def predict_for_model_run(model_run_dir: Path, features_path: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Score a feature table with the artifacts stored in a model run directory.

    Args:
        model_run_dir: Directory holding ``feature_columns.json``,
            ``preprocessing.joblib``, ``model.joblib`` and, optionally,
            ``split_manifest.json``.
        features_path: Location of the feature table to score.

    Returns:
        ``(features, predictions)``: the loaded feature table (with a ``split``
        column) and the prediction frame built from it.
    """
    features = load_feature_table(features_path)
    manifest_file = model_run_dir / "split_manifest.json"
    if not manifest_file.exists():
        # No recorded split: mark every row so downstream filters still work.
        features = features.copy()
        features["split"] = "unassigned"
    else:
        features = apply_split_manifest(features, read_json(manifest_file))

    columns = read_json(model_run_dir / "feature_columns.json")
    # NOTE: joblib.load unpickles arbitrary objects; only use trusted run directories.
    fitted_preprocessor = joblib.load(model_run_dir / "preprocessing.joblib")
    fitted_model = joblib.load(model_run_dir / "model.joblib")

    design_matrix = fitted_preprocessor.transform(model_input_frame(features, columns))
    raw_probabilities = probability_frame(fitted_model, design_matrix)
    governance = add_governance_outputs(features, raw_probabilities)
    return features, build_prediction_frame(features, raw_probabilities, governance)
def binary_prf(true_binary: pd.Series | np.ndarray, predicted_binary: pd.Series | np.ndarray) -> dict[str, float]:
    """Return binary precision, recall and F1 as plain floats.

    Both inputs are cast to ``bool`` before scoring. Undefined ratios are
    reported as 0 (``zero_division=0``).
    """
    scores = precision_recall_fscore_support(
        true_binary.astype(bool),
        predicted_binary.astype(bool),
        average="binary",
        zero_division=0,
    )
    # The fourth element (support) is intentionally dropped by the 3-name zip.
    return {name: float(value) for name, value in zip(("precision", "recall", "f1"), scores)}
def scenario_counts(frame: pd.DataFrame) -> dict[str, int]:
    """Count rows per ``latent_workload_class``, most frequent first.

    Returns an empty dict when ``frame`` has no rows. Keys are stringified
    class names and values are plain ``int`` counts.
    """
    if not len(frame):
        return {}
    tallies = frame["latent_workload_class"].value_counts()
    ordered = tallies.sort_values(ascending=False)
    counts: dict[str, int] = {}
    for scenario, occurrences in ordered.items():
        counts[str(scenario)] = int(occurrences)
    return counts
def calibration_by_label(test_predictions: pd.DataFrame) -> dict[str, Any]:
    """Summarise one-vs-rest calibration for every label in ``LABELS``.

    For each label the result holds the row count, the observed fraction of
    that label, the mean of the ``p_label_<label>`` column and a Brier score.
    """
    labels_true = test_predictions["label_0_to_4"].astype(int).to_numpy()
    row_count = int(len(test_predictions))
    summary: dict[str, Any] = {}
    for label in LABELS:
        hits = (labels_true == label).astype(int)
        scores = test_predictions[f"p_label_{label}"].to_numpy()
        if len(np.unique(hits)) > 1:
            brier = float(brier_score_loss(hits, scores))
        else:
            # Only one outcome present: compute the mean squared error directly
            # instead of calling brier_score_loss.
            brier = float(np.mean((scores - hits) ** 2))
        summary[str(label)] = {
            "rows": row_count,
            "observed_fraction": float(hits.mean()) if len(hits) else 0.0,
            "mean_predicted_probability": float(scores.mean()) if len(scores) else 0.0,
            "brier": brier,
        }
    return summary
def reliability_bins(probabilities: np.ndarray, observed: np.ndarray, bin_count: int = 10) -> tuple[list[dict[str, Any]], float]:
    """Bucket predictions into equal-width bins on [0, 1] and compute the ECE.

    Args:
        probabilities: Predicted probabilities.
        observed: Outcomes aligned with ``probabilities``.
        bin_count: Number of equal-width bins.

    Returns:
        ``(bins, ece)`` where ``bins`` has one dict per bin (empty bins report
        0.0 for both means) and ``ece`` is the count-weighted mean absolute gap
        between mean prediction and observed fraction.
    """
    total = len(probabilities)
    edges = np.linspace(0.0, 1.0, bin_count + 1)
    final_bin = bin_count - 1
    ece = 0.0
    rows: list[dict[str, Any]] = []
    for position, (low, high) in enumerate(zip(edges[:-1], edges[1:])):
        # Bins are half-open [low, high) except the last, which also includes 1.0.
        if position == final_bin:
            under_upper = probabilities <= high
        else:
            under_upper = probabilities < high
        members = (probabilities >= low) & under_upper
        size = int(members.sum())
        mean_probability = 0.0
        observed_rate = 0.0
        if size:
            mean_probability = float(probabilities[members].mean())
            observed_rate = float(observed[members].mean())
            ece += (size / total) * abs(mean_probability - observed_rate)
        rows.append(
            {
                "bin_index": position,
                "lower": float(low),
                "upper": float(high),
                "count": size,
                "mean_predicted_probability": mean_probability,
                "observed_fraction": observed_rate,
            }
        )
    return rows, float(ece)
def compute_calibration_metrics(test_predictions: pd.DataFrame) -> dict[str, Any]:
    """Build the calibration report for the test predictions.

    "Large training" is defined here as a true label of 3 or higher; its
    predicted probability is read from the ``p_large_training`` column.
    """
    labels_true = test_predictions["label_0_to_4"].astype(int).to_numpy()
    class_probabilities = test_predictions[PROB_COLUMNS].to_numpy()
    is_large = (labels_true >= 3).astype(int)
    p_large = test_predictions["p_large_training"].to_numpy()
    bins, ece = reliability_bins(p_large, is_large)

    report: dict[str, Any] = {}
    report["log_loss"] = float(log_loss(labels_true, class_probabilities, labels=LABELS))
    report["brier_large_training"] = float(brier_score_loss(is_large, p_large))
    report["expected_calibration_error_large_training"] = ece
    report["reliability_bins_large_training"] = bins
    report["calibration_by_label"] = calibration_by_label(test_predictions)
    return report
def subgroup_metrics(test_predictions: pd.DataFrame, group_column: str) -> list[dict[str, Any]]:
    """Compute per-group accuracy, macro F1 and large-training error counts.

    Args:
        test_predictions: Prediction frame for the test split.
        group_column: Column to group by; missing values form their own group.

    Returns:
        One dict per group, largest group first (ties broken by group name).
        An empty list when ``group_column`` is not a column of the frame.
    """
    if group_column not in test_predictions.columns:
        return []

    records: list[dict[str, Any]] = []
    for group_value, subset in test_predictions.groupby(group_column, dropna=False):
        labels_true = subset["label_0_to_4"].astype(int)
        labels_pred = subset["predicted_label"].astype(int)
        is_large = labels_true >= 3
        flagged = subset["p_large_training"] >= 0.5
        prf = binary_prf(is_large.to_numpy(), flagged.to_numpy())
        record: dict[str, Any] = {
            "group": str(group_value),
            "rows": int(len(subset)),
            "accuracy": float(accuracy_score(labels_true, labels_pred)),
            "macro_f1": float(f1_score(labels_true, labels_pred, labels=LABELS, average="macro", zero_division=0)),
            "large_training_precision_at_0_5": prf["precision"],
            "large_training_recall_at_0_5": prf["recall"],
            "large_training_false_positives": int(((~is_large) & flagged).sum()),
            "large_training_false_negatives": int((is_large & (~flagged)).sum()),
        }
        records.append(record)

    records.sort(key=lambda record: (-record["rows"], record["group"]))
    return records
def compute_metrics(test_predictions: pd.DataFrame, test_features: pd.DataFrame) -> tuple[dict[str, Any], dict[str, Any]]:
    """Compute the headline, governance, rule-baseline and subgroup metrics.

    Args:
        test_predictions: Prediction frame restricted to the test split.
        test_features: Feature rows for the same split, fed to the rule baseline.

    Returns:
        ``(metrics, calibration)``: the nested metrics dict (which embeds a short
        calibration summary) and the full calibration report.
    """
    labels_true = test_predictions["label_0_to_4"].astype(int)
    labels_pred = test_predictions["predicted_label"].astype(int)
    class_probabilities = test_predictions[PROB_COLUMNS].to_numpy()

    # "Large training" means label 3 or 4; it is scored both through the
    # predicted label and through the p_large_training >= 0.5 threshold.
    is_large = labels_true >= 3
    is_large_values = is_large.to_numpy()
    flagged_by_label = labels_pred >= 3
    flagged_by_threshold = test_predictions["p_large_training"] >= 0.5
    label_prf = binary_prf(is_large_values, flagged_by_label.to_numpy())
    threshold_prf = binary_prf(is_large_values, flagged_by_threshold.to_numpy())

    # One-hot encode the true labels for the multiclass Brier mean
    # (uses the label value as the column position).
    one_hot = np.zeros_like(class_probabilities)
    one_hot[np.arange(len(labels_true)), labels_true.to_numpy()] = 1.0

    rule_labels = predict_rule_labels(test_features)
    rule_flagged = rule_labels >= 3
    rule_prf = binary_prf(is_large_values, rule_flagged.to_numpy())

    threshold_false_positives = test_predictions[(labels_true < 3) & flagged_by_threshold]
    threshold_false_negatives = test_predictions[(labels_true >= 3) & (~flagged_by_threshold)]

    # Unparseable or missing coverage counts as 0.0, i.e. not "high coverage".
    coverage = pd.to_numeric(test_predictions["min_critical_coverage"], errors="coerce").fillna(0.0)
    covered_label0 = test_predictions[(labels_true == 0) & (coverage >= 0.95)]
    missed_label0 = covered_label0[covered_label0["predicted_label"] != 0]

    def f1_for(predicted: Any, average: str) -> float:
        return float(f1_score(labels_true, predicted, labels=LABELS, average=average, zero_division=0))

    dataset_section = {
        "test_rows": int(len(test_predictions)),
        "test_episodes": int(test_predictions["episode_id"].nunique()),
        "test_label_distribution": {
            str(label): int(count) for label, count in labels_true.value_counts().sort_index().items()
        },
    }
    model_section = {
        "accuracy": float(accuracy_score(labels_true, labels_pred)),
        "macro_f1": f1_for(labels_pred, "macro"),
        "weighted_f1": f1_for(labels_pred, "weighted"),
        "log_loss": float(log_loss(labels_true, class_probabilities, labels=LABELS)),
        "brier_multiclass_mean": float(np.mean((class_probabilities - one_hot) ** 2)),
    }
    governance_section = {
        "label_3_4_predicted_label": label_prf,
        "p_large_training_threshold_0_5": threshold_prf,
        "false_positive_scenarios_at_0_5": scenario_counts(threshold_false_positives),
        "false_negative_scenarios_at_0_5": scenario_counts(threshold_false_negatives),
        "label_0_missed_under_high_coverage": {
            "rows": int(len(covered_label0)),
            "missed_rows": int(len(missed_label0)),
            "missed_rate": float(len(missed_label0) / len(covered_label0)) if len(covered_label0) else 0.0,
        },
    }
    rule_section = {
        "accuracy": float(accuracy_score(labels_true, rule_labels)),
        "macro_f1": f1_for(rule_labels, "macro"),
        "weighted_f1": f1_for(rule_labels, "weighted"),
        "label_3_4_predicted_label": rule_prf,
    }
    subgroup_columns = (
        "latent_workload_class",
        "scenario_family",
        "data_quality_regime",
        "temporal_phase",
        "site_id",
        "window_length_seconds",
        "o4_missing_reason",
        "o7_missing_reason",
        "o12_missing_reason",
        "capacity_possible",
        "integrity_warning",
    )
    subgroup_section = {column: subgroup_metrics(test_predictions, column) for column in subgroup_columns}

    metrics: dict[str, Any] = {
        "dataset": dataset_section,
        "model": model_section,
        "governance": governance_section,
        "rule_baseline": rule_section,
        "subgroup_metrics": subgroup_section,
    }

    calibration = compute_calibration_metrics(test_predictions)
    # Only the scalar calibration figures are embedded in metrics; the bins and
    # per-label breakdown stay in the separate calibration report.
    summary_keys = ("log_loss", "brier_large_training", "expected_calibration_error_large_training")
    metrics["calibration"] = {key: calibration[key] for key in summary_keys}
    return metrics, calibration
def write_confusion_matrix(path: Path, test_predictions: pd.DataFrame) -> None:
    """Write the true-vs-predicted label confusion matrix to ``path`` as CSV.

    Rows are named ``true_<label>`` and columns ``pred_<label>``, in ``LABELS`` order.
    """
    counts = confusion_matrix(
        test_predictions["label_0_to_4"].astype(int),
        test_predictions["predicted_label"].astype(int),
        labels=LABELS,
    )
    row_names = [f"true_{label}" for label in LABELS]
    column_names = [f"pred_{label}" for label in LABELS]
    table = pd.DataFrame(counts, index=row_names, columns=column_names)
    table.to_csv(path, index_label="true_label")
def write_classification_report(path: Path, test_predictions: pd.DataFrame) -> None:
    """Write the per-label classification report for the test split as JSON."""
    labels_true = test_predictions["label_0_to_4"].astype(int)
    labels_pred = test_predictions["predicted_label"].astype(int)
    write_json(
        path,
        classification_report(labels_true, labels_pred, labels=LABELS, output_dict=True, zero_division=0),
    )
def compute_feature_importance(
    model_run_dir: Path,
    test_features: pd.DataFrame,
    test_predictions: pd.DataFrame,
    seed: int = DEFAULT_SEED,
    repeats: int = 3,
) -> pd.DataFrame:
    """Estimate permutation importance of every model feature on the test split.

    Importance is the drop in macro F1 after shuffling one input column,
    averaged over ``repeats`` shuffles. Columns with at most one distinct value
    are not shuffled and get an importance of 0.0.

    Args:
        model_run_dir: Directory holding the fitted model, preprocessor and
            feature column list.
        test_features: Feature rows of the test split.
        test_predictions: Prediction frame of the test split (source of true labels).
        seed: Seed for the shuffling random generator.
        repeats: Number of shuffles per column.

    Returns:
        One row per feature, sorted by descending mean importance, then name.
    """
    feature_columns = read_json(model_run_dir / "feature_columns.json")
    # NOTE: joblib.load unpickles arbitrary objects; only use trusted run directories.
    preprocessor = joblib.load(model_run_dir / "preprocessing.joblib")
    model = joblib.load(model_run_dir / "model.joblib")
    inputs = model_input_frame(test_features, feature_columns)
    labels_true = test_predictions["label_0_to_4"].astype(int)
    label_values = np.asarray(LABELS)

    def macro_f1_on(frame: pd.DataFrame) -> Any:
        """Predict the arg-max label for ``frame`` and score it against the truth."""
        class_probabilities = probability_frame(model, preprocessor.transform(frame))[PROB_COLUMNS].to_numpy()
        predicted = label_values[np.argmax(class_probabilities, axis=1)]
        return f1_score(labels_true, predicted, labels=LABELS, average="macro", zero_division=0)

    baseline = macro_f1_on(inputs)
    rng = np.random.default_rng(seed)
    records: list[dict[str, Any]] = []
    for column in feature_columns:
        permuted_scores: list[float] = []
        if inputs[column].nunique(dropna=False) > 1:
            for _ in range(repeats):
                shuffled_inputs = inputs.copy()
                shuffled_values = shuffled_inputs[column].to_numpy(copy=True)
                rng.shuffle(shuffled_values)
                shuffled_inputs[column] = shuffled_values
                permuted_scores.append(float(macro_f1_on(shuffled_inputs)))
            drops = [float(baseline - score) for score in permuted_scores]
        else:
            # Shuffling a constant column cannot change predictions.
            drops = [0.0] * repeats
        records.append(
            {
                "feature": column,
                "importance_mean": float(np.mean(drops)),
                "importance_std": float(np.std(drops)),
                "baseline_macro_f1": float(baseline),
                "permuted_macro_f1_mean": float(np.mean(permuted_scores)) if permuted_scores else float(baseline),
            }
        )
    table = pd.DataFrame(records)
    return table.sort_values(["importance_mean", "feature"], ascending=[False, True])
def evidence_audit_sample(test_predictions: pd.DataFrame, min_rows: int = 50) -> pd.DataFrame:
    """Assemble a categorised sample of test predictions for manual audit.

    Up to ten rows are drawn for each of five categories (correct label 0,
    correct label 2 in a hard false-positive scenario, correct label 3/4,
    false positives and false negatives at ``p_large_training >= 0.5``). A row
    is used at most once. If fewer than ``min_rows`` rows result, the sample is
    topped up with the remaining rows whose ``p_large_training`` is closest to 0.5.

    Args:
        test_predictions: Prediction frame for the test split.
        min_rows: Target minimum number of rows in the sample.

    Returns:
        The sample restricted to the audit columns that exist, with
        ``label_0_to_4`` renamed to ``true_label``.
    """
    taken: set[int] = set()
    pieces: list[pd.DataFrame] = []

    def add_sample(name: str, mask: pd.Series, count: int, sort_column: str | None = None, ascending: bool = False) -> None:
        """Append up to ``count`` not-yet-taken rows matching ``mask`` under category ``name``."""
        not_taken = ~test_predictions.index.isin(taken)
        pool = test_predictions[mask & not_taken].copy()
        if sort_column and sort_column in pool.columns:
            pool = pool.sort_values(sort_column, ascending=ascending)
        pool = pool.head(count)
        if not len(pool):
            return
        taken.update(int(row_label) for row_label in pool.index)
        pool["audit_category"] = name
        pieces.append(pool)

    actual = test_predictions["label_0_to_4"].astype(int)
    guessed = test_predictions["predicted_label"].astype(int)
    flagged = test_predictions["p_large_training"] >= 0.5

    add_sample(
        "correct_label_0",
        (actual == 0) & (guessed == 0),
        10,
        sort_column="negative_certification_confidence",
        ascending=False,
    )
    add_sample(
        "correct_label_2_hard_false_positive",
        (actual == 2)
        & (guessed == 2)
        & test_predictions["latent_workload_class"].isin(HARD_FALSE_POSITIVE_SCENARIOS),
        10,
        sort_column="p_large_training",
        ascending=False,
    )
    add_sample(
        "correct_label_3_4_likely_training",
        (actual >= 3) & (guessed >= 3),
        10,
        sort_column="p_large_training",
        ascending=False,
    )
    add_sample(
        "false_positive_large_training",
        (actual < 3) & flagged,
        10,
        sort_column="p_large_training",
        ascending=False,
    )
    # False negatives are sorted ascending: lowest p_large_training first.
    add_sample(
        "false_negative_large_training",
        (actual >= 3) & (~flagged),
        10,
        sort_column="p_large_training",
        ascending=True,
    )

    sample = pd.concat(pieces, axis=0) if pieces else test_predictions.iloc[0:0].copy()

    if len(sample) < min_rows:
        # Top up with the unused rows nearest the 0.5 decision threshold,
        # preferring higher severity among equally uncertain rows.
        shortfall = min_rows - len(sample)
        filler = test_predictions[~test_predictions.index.isin(taken)].copy()
        filler["uncertainty_distance"] = (filler["p_large_training"] - 0.5).abs()
        filler = filler.sort_values(["uncertainty_distance", "severity_score"], ascending=[True, False]).head(shortfall)
        filler["audit_category"] = "high_uncertainty_or_coverage_edge"
        sample = pd.concat([sample, filler], axis=0)

    wanted = [
        "audit_category",
        "feature_row_id",
        "split",
        "latent_workload_class",
        "label_0_to_4",
        "predicted_label",
        "p_large_training",
        "severity_score",
        "top_evidence",
        "critical_missing_layers",
        "integrity_warning",
    ]
    wanted.extend(name for name in SELECTED_AUDIT_FEATURES if name in sample.columns)
    available = [name for name in wanted if name in sample.columns]
    return sample.loc[:, available].rename(columns={"label_0_to_4": "true_label"})
def top_scenarios_text(scenarios: dict[str, int], limit: int = 5) -> str:
    """Render the first ``limit`` scenario counts as ``"name: count, ..."``.

    Entries are taken in the dict's own order. Returns ``"none"`` for an empty
    mapping.
    """
    if scenarios:
        leading = list(scenarios.items())[:limit]
        return ", ".join(f"{name}: {count}" for name, count in leading)
    return "none"
def write_run_readme(model_run_dir: Path, features_path: Path, metrics: dict[str, Any], calibration: dict[str, Any]) -> None:
    """Write ``README.md`` for a model run from its metrics and split manifest.

    Args:
        model_run_dir: Run directory; ``split_manifest.json`` is read from it and
            ``README.md`` is written into it.
        features_path: Feature table path quoted in the README. Its parent
            directory (or grandparent, when the parent is named ``features``)
            supplies the dataset name.
        metrics: Metrics dict with ``model``, ``governance`` and ``rule_baseline`` sections.
        calibration: Calibration report with the large-training Brier and ECE values.
    """
    headline = metrics["model"]
    gov = metrics["governance"]
    split_manifest = read_json(model_run_dir / "split_manifest.json")

    if features_path.parent.name == "features":
        dataset_dir = features_path.parent.parent
    else:
        dataset_dir = features_path.parent
    dataset_name = dataset_dir.name
    model_run_name = model_run_dir.name

    summary = split_manifest["summary"]
    total_rows = sum(part["rows"] for part in summary.values())
    seed = split_manifest["seed"]

    # NOTE(review): the template text below is reproduced line-for-line as it
    # appears in the source listing; blank lines or indentation inside the
    # template may have been lost upstream - confirm against the canonical file.
    readme = f"""# {model_run_name} model run
This directory contains a public runnable baseline for the `{dataset_name}` datacenter training-run verification dataset.
## Dataset
- Feature table: `{features_path}`
- Rows: {total_rows}
- Episode split: grouped by `episode_id`, scenario-stratified, seed `{seed}`
- Split rows: train {summary['train']['rows']}, validation {summary['validation']['rows']}, test {summary['test']['rows']}
- Split episodes: train {summary['train']['episodes']}, validation {summary['validation']['episodes']}, test {summary['test']['episodes']}
## Model
- Supervised model: calibrated scikit-learn histogram gradient boosting classifier
- Calibration: validation split only, held-out test evaluated once
- Rule baseline: deterministic evidence rules in `src/datacenter_verification_modeling/rule_baseline.py`
- Leakage exclusions: identifiers, labels, site id, episode id, raw manifest hash, scenario metadata, counterfactual metadata, and synthetic-only audit columns
## Headline Test Metrics
- Accuracy: {headline['accuracy']:.4f}
- Macro F1: {headline['macro_f1']:.4f}
- Weighted F1: {headline['weighted_f1']:.4f}
- Log loss: {headline['log_loss']:.4f}
- Label 3/4 precision by predicted label: {gov['label_3_4_predicted_label']['precision']:.4f}
- Label 3/4 recall by predicted label: {gov['label_3_4_predicted_label']['recall']:.4f}
- `p_large_training >= 0.5` precision: {gov['p_large_training_threshold_0_5']['precision']:.4f}
- `p_large_training >= 0.5` recall: {gov['p_large_training_threshold_0_5']['recall']:.4f}
- Rule baseline macro F1: {metrics['rule_baseline']['macro_f1']:.4f}
## Error Scenarios
- Largest false-positive scenarios at `p_large_training >= 0.5`: {top_scenarios_text(gov['false_positive_scenarios_at_0_5'])}
- Largest false-negative scenarios at `p_large_training >= 0.5`: {top_scenarios_text(gov['false_negative_scenarios_at_0_5'])}
## Calibration
- Brier score for `p_large_training`: {calibration['brier_large_training']:.4f}
- Expected calibration error for `p_large_training`: {calibration['expected_calibration_error_large_training']:.4f}
## Reproduce
```bash
python src/datacenter_verification_modeling/train_model.py \\
--features {features_path} \\
--output {model_run_dir} \\
--seed {seed}
```
```bash
python src/datacenter_verification_modeling/evaluate_model.py \\
--model-run {model_run_dir} \\
--features {features_path}
```
```bash
python src/datacenter_verification_modeling/predict.py \\
--model-run {model_run_dir} \\
--features {features_path} \\
--output {model_run_dir / 'predictions_all.csv'}
```
## Limitations
- This model is trained on synthetic data only.
- Performance numbers are not real-world deployment claims.
- Adjacent windows are correlated, so group splitting by `episode_id` is mandatory.
- Synthetic labels are generated from rules and latent scenarios, so the model may learn generator assumptions.
- Real datacenter deployment would require calibration on real telemetry and controlled drills.
- The model should assist audit triage; it should not be treated as sole proof of a violation.
"""
    (model_run_dir / "README.md").write_text(readme, encoding="utf-8")
def write_validation_summary(
    model_run_dir: Path,
    features_path: Path,
    metrics: dict[str, Any],
    validation_status: list[dict[str, Any]] | None,
) -> None:
    """Write ``validation_summary.md`` describing validation, artifacts and metrics.

    Args:
        model_run_dir: Run directory to inspect and write into.
        features_path: Feature table path quoted in the summary.
        metrics: Metrics dict with ``model`` and ``governance`` sections.
        validation_status: Validation results (dicts with ``name`` and
            ``returncode``). When ``None``, any status recorded in an existing
            ``manifest.json`` is used instead.
    """
    manifest_file = model_run_dir / "manifest.json"
    if validation_status is None and manifest_file.exists():
        recorded = read_json(manifest_file)
        validation_status = recorded.get("validation_status") or None

    artifact_lines: list[str] = []
    for filename in REQUIRED_OUTPUTS:
        # The summary itself always counts as present: it is the file being written.
        found = (model_run_dir / filename).exists() or filename == "validation_summary.md"
        state = "present" if found else "missing"
        artifact_lines.append(f"- `{filename}`: {state}")

    if validation_status:
        validation_lines = [
            f"- `{item['name']}`: return code {item['returncode']}; "
            f"{'PASS' if item['returncode'] == 0 else 'FAIL'}"
            for item in validation_status
        ]
    else:
        validation_lines = [
            "- Dataset validation was not rerun by this evaluation command; see `manifest.json` for any recorded training validation status."
        ]

    headline = metrics["model"]
    gov = metrics["governance"]
    validation_block = "\n".join(validation_lines)
    artifact_block = "\n".join(artifact_lines)

    # NOTE(review): the template text below is reproduced line-for-line as it
    # appears in the source listing; blank lines inside the template may have
    # been lost upstream - confirm against the canonical file.
    text = f"""# Validation Summary
Feature table: `{features_path}`
## Dataset Validation Before Training
{validation_block}
## Required Artifacts
{artifact_block}
## Test Metrics
- Accuracy: {headline['accuracy']:.4f}
- Macro F1: {headline['macro_f1']:.4f}
- Label 3/4 precision: {gov['label_3_4_predicted_label']['precision']:.4f}
- Label 3/4 recall: {gov['label_3_4_predicted_label']['recall']:.4f}
- `p_large_training >= 0.5` precision: {gov['p_large_training_threshold_0_5']['precision']:.4f}
- `p_large_training >= 0.5` recall: {gov['p_large_training_threshold_0_5']['recall']:.4f}
## Governance Checks
- Capacity gate applied to post-processed probabilities.
- Negative certification confidence is `p_label_0 * min_critical_coverage`.
- Integrity warnings are reported separately from positive training evidence.
- Raw model probabilities are retained as `raw_p_label_0` through `raw_p_label_4`.
"""
    (model_run_dir / "validation_summary.md").write_text(text, encoding="utf-8")
def write_manifest(
    model_run_dir: Path,
    features_path: Path,
    metrics: dict[str, Any],
    calibration: dict[str, Any],
    validation_status: list[dict[str, Any]] | None,
    training_metadata: dict[str, Any] | None,
) -> None:
    """Create or refresh ``manifest.json`` for a model run.

    Keys of an existing manifest are kept unless overwritten here. Falsy
    ``validation_status`` / ``training_metadata`` arguments fall back to the
    values already recorded in the manifest.

    Args:
        model_run_dir: Run directory containing the artifacts and the manifest.
        features_path: Feature table path recorded in the manifest.
        metrics: Metrics dict with ``model`` and ``governance`` sections.
        calibration: Calibration report with the large-training Brier and ECE values.
        validation_status: Validation results to record, if any.
        training_metadata: Training metadata to record, if any.
    """
    manifest_path = model_run_dir / "manifest.json"
    existing: dict[str, Any] = read_json(manifest_path) if manifest_path.exists() else {}

    # Hash every required artifact that exists, except the manifest itself
    # (it is the file being rewritten).
    artifact_hashes: dict[str, Any] = {}
    for filename in REQUIRED_OUTPUTS:
        artifact = model_run_dir / filename
        if not artifact.exists() or filename == "manifest.json":
            continue
        artifact_hashes[filename] = sha256_file(artifact)

    model_section = metrics["model"]
    label_prf = metrics["governance"]["label_3_4_predicted_label"]
    threshold_prf = metrics["governance"]["p_large_training_threshold_0_5"]

    manifest = dict(existing)
    manifest.update(
        {
            "created_or_updated_at": utc_now_iso(),
            "model_run_id": model_run_dir.name,
            "features_path": str(features_path),
            "model_type": "CalibratedClassifierCV over HistGradientBoostingClassifier",
            "calibration_method": existing.get("calibration_method", "sigmoid_on_validation_split"),
            "metrics_summary": {
                "accuracy": model_section["accuracy"],
                "macro_f1": model_section["macro_f1"],
                "weighted_f1": model_section["weighted_f1"],
                "label_3_4_precision": label_prf["precision"],
                "label_3_4_recall": label_prf["recall"],
                "p_large_training_precision_at_0_5": threshold_prf["precision"],
                "p_large_training_recall_at_0_5": threshold_prf["recall"],
                "brier_large_training": calibration["brier_large_training"],
                "ece_large_training": calibration["expected_calibration_error_large_training"],
            },
            "validation_status": validation_status or existing.get("validation_status", []),
            "training_metadata": training_metadata or existing.get("training_metadata", {}),
            "required_outputs": REQUIRED_OUTPUTS,
            "artifact_hashes": artifact_hashes,
            "limitations": [
                "Trained on synthetic data only.",
                "Performance numbers are not real-world deployment claims.",
                "Episode-level group splitting is required because adjacent windows are correlated.",
                "Synthetic labels encode generator assumptions.",
                "Real deployment requires real telemetry calibration and controlled drills.",
            ],
        }
    )
    write_json(manifest_path, manifest)
def evaluate_model_run(
    model_run_dir: Path,
    features_path: Path,
    validation_status: list[dict[str, Any]] | None = None,
    training_metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Evaluate a trained model run and write all evaluation artifacts.

    Args:
        model_run_dir: Run directory holding the fitted artifacts; outputs are
            written here (the directory is created if needed).
        features_path: Feature table to score.
        validation_status: Optional dataset validation results to record.
        training_metadata: Optional training metadata to record in the manifest.

    Returns:
        The metrics dict computed on the test split.

    Raises:
        ValueError: If no rows belong to the ``test`` split.
    """
    model_run_dir.mkdir(parents=True, exist_ok=True)

    all_features, all_predictions = predict_for_model_run(model_run_dir, features_path)
    all_predictions.to_csv(model_run_dir / "predictions_all.csv", index=False)

    held_out_predictions = all_predictions[all_predictions["split"] == "test"].copy()
    held_out_features = all_features[all_features["split"] == "test"].copy()
    if held_out_predictions.empty:
        raise ValueError("no test split rows available for evaluation")
    held_out_predictions.to_csv(model_run_dir / "predictions_test.csv", index=False)

    metrics, calibration = compute_metrics(held_out_predictions, held_out_features)
    for filename, payload in (("metrics.json", metrics), ("calibration_metrics.json", calibration)):
        write_json(model_run_dir / filename, payload)

    write_confusion_matrix(model_run_dir / "confusion_matrix.csv", held_out_predictions)
    write_classification_report(model_run_dir / "classification_report.json", held_out_predictions)

    compute_feature_importance(model_run_dir, held_out_features, held_out_predictions).to_csv(
        model_run_dir / "feature_importance.csv", index=False
    )
    evidence_audit_sample(held_out_predictions).to_csv(model_run_dir / "evidence_audit_sample.csv", index=False)

    write_run_readme(model_run_dir, features_path, metrics, calibration)
    # The manifest is written twice on purpose: the first pass makes
    # manifest.json exist before the validation summary checks for required
    # artifacts, and the second pass picks up the hash of the freshly written
    # validation_summary.md.
    write_manifest(model_run_dir, features_path, metrics, calibration, validation_status, training_metadata)
    write_validation_summary(model_run_dir, features_path, metrics, validation_status)
    write_manifest(model_run_dir, features_path, metrics, calibration, validation_status, training_metadata)
    return metrics
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: evaluate a model run and print headline metrics.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit code (always 0 when evaluation completes).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    for flag in ("--model-run", "--features"):
        parser.add_argument(flag, type=Path, required=True)
    options = parser.parse_args(argv)

    metrics = evaluate_model_run(options.model_run, options.features)

    # (printed name, key path into the metrics dict); values are looked up one
    # at a time, immediately before each line is printed.
    report = (
        ("accuracy", ("model", "accuracy")),
        ("macro_f1", ("model", "macro_f1")),
        ("label_3_4_precision", ("governance", "label_3_4_predicted_label", "precision")),
        ("label_3_4_recall", ("governance", "label_3_4_predicted_label", "recall")),
    )
    for printed_name, key_path in report:
        value: Any = metrics
        for key in key_path:
            value = value[key]
        print(f"{printed_name}: {value:.4f}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())