idacy's picture
Upload live inference API deployment files
e4b1ed6 verified
Raw
History Blame Contribute Delete
30.3 kB
"""Evaluate a trained synthetic datacenter verification modeling run."""
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Any
import joblib
import numpy as np
import pandas as pd
from sklearn.metrics import (
accuracy_score,
brier_score_loss,
classification_report,
confusion_matrix,
f1_score,
log_loss,
precision_recall_fscore_support,
)
try:
from .common import (
DEFAULT_SEED,
LABELS,
PROB_COLUMNS,
RAW_PROB_COLUMNS,
SELECTED_AUDIT_FEATURES,
add_governance_outputs,
apply_split_manifest,
build_prediction_frame,
load_feature_table,
minimum_critical_coverage,
model_input_frame,
probability_frame,
read_json,
sha256_file,
utc_now_iso,
write_json,
)
from .rule_baseline import predict_rule_labels
except ImportError: # pragma: no cover - direct script execution
from common import (
DEFAULT_SEED,
LABELS,
PROB_COLUMNS,
RAW_PROB_COLUMNS,
SELECTED_AUDIT_FEATURES,
add_governance_outputs,
apply_split_manifest,
build_prediction_frame,
load_feature_table,
minimum_critical_coverage,
model_input_frame,
probability_frame,
read_json,
sha256_file,
utc_now_iso,
write_json,
)
from rule_baseline import predict_rule_labels
# Values of `latent_workload_class` that `evidence_audit_sample` accepts for its
# "correct_label_2_hard_false_positive" audit category.
# The original literal listed "large_batch_inference" and
# "reserved_but_unused_capacity" twice; a set silently drops repeats, so each
# name is now written exactly once and the resulting set is unchanged.
HARD_FALSE_POSITIVE_SCENARIOS = {
    # Short scenario names (presumably an earlier naming scheme -- TODO confirm).
    "large_batch_inference",
    "synthetic_data_generation",
    "hpc_mpi_simulation",
    "nccl_benchmark",
    "hardware_burn_in",
    "storage_rebuild",
    "large_etl_data_movement",
    "reserved_but_unused_capacity",
    "maintenance_window",
    # Longer, more specific scenario names.
    "model_parallel_inference",
    "embedding_generation",
    "synthetic_data_generation_gpu_heavy",
    "hpc_mpi_collective",
    "nccl_extended_benchmark",
    "hardware_burn_in_or_thermal_soak",
    "storage_rebuild_or_replication",
    "large_etl_or_data_movement",
    "distributed_database_or_graph_analytics",
    "maintenance_with_collector_gaps",
    "multi_tenant_fragmented_nontraining",
    "capacity_or_integrity_only_warning",
}
# Scenario names treated as training workloads, kept in alphabetical order.
# NOTE(review): nothing in the visible part of this module reads this set.
TRAINING_SCENARIOS = {
    "adversarial_fragmented_training",
    "cloud_reservation_used_for_training",
    "cloud_training_redacted_runtime",
    "elastic_preempted_training",
    "fragmented_training_linked",
    "large_fine_tune",
    "large_fine_tune_standard",
    "multi_stage_training_pipeline",
    "pretraining",
    "pretraining_standard",
    "sparse_or_moe_bursty_training",
    "training_with_delayed_logs",
    "training_with_low_fabric_high_checkpoint",
    "training_without_semantic_logs",
    "underclocked_energy_capped_training",
    "underclocked_long_duration_training",
}
# File names expected inside a model-run directory.
# `write_validation_summary` reports each entry as present/missing and
# `write_manifest` records this list and hashes the entries that exist.
# The order is preserved in both outputs, so do not reorder casually.
REQUIRED_OUTPUTS = [
    "README.md",
    "manifest.json",
    "model.joblib",
    "preprocessing.joblib",
    "feature_columns.json",
    "excluded_columns.json",
    "split_manifest.json",
    "metrics.json",
    "calibration_metrics.json",
    "confusion_matrix.csv",
    "classification_report.json",
    "predictions_test.csv",
    "predictions_all.csv",
    "feature_importance.csv",
    "evidence_audit_sample.csv",
    "validation_summary.md",
]
def predict_for_model_run(model_run_dir: Path, features_path: Path) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Score a feature table with the artifacts stored in a model-run directory.

    Args:
        model_run_dir: Directory containing ``feature_columns.json``,
            ``preprocessing.joblib`` and ``model.joblib``; a
            ``split_manifest.json`` is used when present.
        features_path: Location passed to ``load_feature_table``.

    Returns:
        ``(features, predictions)`` where ``features`` is the loaded table
        after split assignment and ``predictions`` is the frame produced by
        ``build_prediction_frame``.
    """
    features = load_feature_table(features_path)

    manifest_file = model_run_dir / "split_manifest.json"
    if not manifest_file.exists():
        # Without a manifest every row gets a placeholder split name.
        features = features.copy()
        features["split"] = "unassigned"
    else:
        features = apply_split_manifest(features, read_json(manifest_file))

    columns = read_json(model_run_dir / "feature_columns.json")
    # joblib.load unpickles its input: only use model-run directories you trust.
    fitted_preprocessor = joblib.load(model_run_dir / "preprocessing.joblib")
    fitted_model = joblib.load(model_run_dir / "model.joblib")

    design_matrix = fitted_preprocessor.transform(model_input_frame(features, columns))
    raw = probability_frame(fitted_model, design_matrix)
    governed = add_governance_outputs(features, raw)
    return features, build_prediction_frame(features, raw, governed)
def binary_prf(true_binary: pd.Series | np.ndarray, predicted_binary: pd.Series | np.ndarray) -> dict[str, float]:
    """Return binary precision, recall and F1 as plain floats.

    Both inputs are cast to ``bool`` before scoring; undefined ratios are
    reported as 0 (``zero_division=0``).
    """
    scores = precision_recall_fscore_support(
        true_binary.astype(bool),
        predicted_binary.astype(bool),
        average="binary",
        zero_division=0,
    )
    # The fourth element (support) is not reported; zip stops after three names.
    return {name: float(score) for name, score in zip(("precision", "recall", "f1"), scores)}
def scenario_counts(frame: pd.DataFrame) -> dict[str, int]:
    """Count rows per ``latent_workload_class``, largest count first.

    Returns an empty dict for a frame with no rows.
    """
    if not len(frame):
        return {}
    ranked = frame["latent_workload_class"].value_counts().sort_values(ascending=False)
    counts: dict[str, int] = {}
    for scenario, rows in ranked.items():
        counts[str(scenario)] = int(rows)
    return counts
def calibration_by_label(test_predictions: pd.DataFrame) -> dict[str, Any]:
    """Summarise one-vs-rest calibration for every label in ``LABELS``.

    For each label the result holds the row count, the observed fraction of
    that label, the mean of column ``p_label_<label>`` and a Brier score.
    Keys of the returned dict are the labels rendered as strings.
    """
    actual = test_predictions["label_0_to_4"].astype(int).to_numpy()
    row_count = int(len(test_predictions))

    summary: dict[str, Any] = {}
    for label in LABELS:
        hits = (actual == label).astype(int)
        scores = test_predictions[f"p_label_{label}"].to_numpy()

        observed_fraction = float(hits.mean()) if len(hits) else 0.0
        mean_score = float(scores.mean()) if len(scores) else 0.0
        if len(np.unique(hits)) > 1:
            brier = float(brier_score_loss(hits, scores))
        else:
            # Only one class observed: compute the squared error directly
            # rather than calling sklearn.
            brier = float(np.mean((scores - hits) ** 2))

        summary[str(label)] = {
            "rows": row_count,
            "observed_fraction": observed_fraction,
            "mean_predicted_probability": mean_score,
            "brier": brier,
        }
    return summary
def reliability_bins(probabilities: np.ndarray, observed: np.ndarray, bin_count: int = 10) -> tuple[list[dict[str, Any]], float]:
    """Bucket probabilities into equal-width bins over [0, 1] and compute ECE.

    Every bin is half-open ``[lower, upper)`` except the last, which also
    includes its upper edge. Empty bins report 0.0 for both averages and add
    nothing to the expected calibration error.

    Returns:
        ``(bins, ece)``: one dict per bin, and the count-weighted mean absolute
        gap between mean predicted probability and observed fraction.
    """
    edges = np.linspace(0.0, 1.0, bin_count + 1)
    sample_count = len(probabilities)
    final_position = bin_count - 1

    ece = 0.0
    rows: list[dict[str, Any]] = []
    for position, (low, high) in enumerate(zip(edges[:-1], edges[1:])):
        at_or_above = probabilities >= low
        if position == final_position:
            members = at_or_above & (probabilities <= high)
        else:
            members = at_or_above & (probabilities < high)

        size = int(members.sum())
        mean_score = 0.0
        hit_rate = 0.0
        if size:
            mean_score = float(probabilities[members].mean())
            hit_rate = float(observed[members].mean())
            ece += (size / sample_count) * abs(mean_score - hit_rate)

        rows.append(
            {
                "bin_index": position,
                "lower": float(low),
                "upper": float(high),
                "count": size,
                "mean_predicted_probability": mean_score,
                "observed_fraction": hit_rate,
            }
        )
    return rows, float(ece)
def compute_calibration_metrics(test_predictions: pd.DataFrame) -> dict[str, Any]:
    """Compute calibration metrics for the test predictions.

    "Large training" means a true label of 3 or higher; its probability is
    read from column ``p_large_training``.

    Returns:
        Dict with the multiclass ``log_loss``, the Brier score, expected
        calibration error and reliability bins for large training, and the
        per-label summary from ``calibration_by_label``.
    """
    y_true = test_predictions["label_0_to_4"].astype(int).to_numpy()
    probabilities = test_predictions[PROB_COLUMNS].to_numpy()
    large_observed = (y_true >= 3).astype(int)
    large_probability = test_predictions["p_large_training"].to_numpy()
    bins, ece = reliability_bins(large_probability, large_observed)

    multiclass_log_loss = float(log_loss(y_true, probabilities, labels=LABELS))

    # Mirror calibration_by_label: when only one class is observed, compute the
    # squared error directly instead of relying on how the installed sklearn
    # infers the positive label. The value is identical when both classes occur.
    if len(np.unique(large_observed)) > 1:
        brier_large = float(brier_score_loss(large_observed, large_probability))
    else:
        brier_large = float(np.mean((large_probability - large_observed) ** 2))

    return {
        "log_loss": multiclass_log_loss,
        "brier_large_training": brier_large,
        "expected_calibration_error_large_training": ece,
        "reliability_bins_large_training": bins,
        "calibration_by_label": calibration_by_label(test_predictions),
    }
def subgroup_metrics(test_predictions: pd.DataFrame, group_column: str) -> list[dict[str, Any]]:
    """Compute accuracy and large-training error metrics per group value.

    Args:
        test_predictions: Prediction frame with ``label_0_to_4``,
            ``predicted_label`` and ``p_large_training`` columns.
        group_column: Column to group by; missing values form their own group.

    Returns:
        One dict per group, ordered by descending row count and then by group
        name. An empty list when ``group_column`` is not a column of the frame.
    """
    if group_column not in test_predictions.columns:
        return []

    def summarize(value: Any, part: pd.DataFrame) -> dict[str, Any]:
        actual = part["label_0_to_4"].astype(int)
        predicted = part["predicted_label"].astype(int)
        is_large = actual >= 3
        flagged = part["p_large_training"] >= 0.5
        prf = binary_prf(is_large.to_numpy(), flagged.to_numpy())
        return {
            "group": str(value),
            "rows": int(len(part)),
            "accuracy": float(accuracy_score(actual, predicted)),
            "macro_f1": float(f1_score(actual, predicted, labels=LABELS, average="macro", zero_division=0)),
            "large_training_precision_at_0_5": prf["precision"],
            "large_training_recall_at_0_5": prf["recall"],
            "large_training_false_positives": int(((~is_large) & flagged).sum()),
            "large_training_false_negatives": int((is_large & (~flagged)).sum()),
        }

    summaries = [
        summarize(value, part)
        for value, part in test_predictions.groupby(group_column, dropna=False)
    ]
    summaries.sort(key=lambda item: (-item["rows"], item["group"]))
    return summaries
def compute_metrics(test_predictions: pd.DataFrame, test_features: pd.DataFrame) -> tuple[dict[str, Any], dict[str, Any]]:
    """Build the full test-set metrics report.

    Args:
        test_predictions: Prediction frame for the test rows.
        test_features: Feature rows for the same split, passed to
            ``predict_rule_labels`` for the rule baseline.

    Returns:
        ``(metrics, calibration)``. ``metrics`` has the sections ``dataset``,
        ``model``, ``governance``, ``rule_baseline``, ``subgroup_metrics`` and
        a short ``calibration`` summary; ``calibration`` is the complete result
        of ``compute_calibration_metrics``.
    """
    y_true = test_predictions["label_0_to_4"].astype(int)
    y_pred = test_predictions["predicted_label"].astype(int)
    probabilities = test_predictions[PROB_COLUMNS].to_numpy()

    # "Large training" is label 3 or 4; it is scored both from the predicted
    # label and from the p_large_training >= 0.5 threshold.
    is_large = y_true >= 3
    flagged_by_label = y_pred >= 3
    flagged_by_threshold = test_predictions["p_large_training"] >= 0.5
    label_prf = binary_prf(is_large.to_numpy(), flagged_by_label.to_numpy())
    threshold_prf = binary_prf(is_large.to_numpy(), flagged_by_threshold.to_numpy())

    # One-hot targets; the true label value is used directly as the column position.
    one_hot = np.zeros_like(probabilities)
    one_hot[np.arange(len(y_true)), y_true.to_numpy()] = 1.0

    rule_labels = predict_rule_labels(test_features)
    rule_flagged = rule_labels >= 3
    rule_prf = binary_prf(is_large.to_numpy(), rule_flagged.to_numpy())

    false_positives = test_predictions[(y_true < 3) & flagged_by_threshold]
    false_negatives = test_predictions[(y_true >= 3) & (~flagged_by_threshold)]

    # Unparseable or missing coverage counts as 0.0 and so never qualifies as "high".
    coverage = pd.to_numeric(test_predictions["min_critical_coverage"], errors="coerce").fillna(0.0)
    high_coverage_label0 = test_predictions[(y_true == 0) & (coverage >= 0.95)]
    label0_missed = high_coverage_label0[high_coverage_label0["predicted_label"] != 0]

    def f1(predicted: Any, average: str) -> float:
        return float(f1_score(y_true, predicted, labels=LABELS, average=average, zero_division=0))

    dataset_section = {
        "test_rows": int(len(test_predictions)),
        "test_episodes": int(test_predictions["episode_id"].nunique()),
        "test_label_distribution": {
            str(label): int(count) for label, count in y_true.value_counts().sort_index().items()
        },
    }
    model_section = {
        "accuracy": float(accuracy_score(y_true, y_pred)),
        "macro_f1": f1(y_pred, "macro"),
        "weighted_f1": f1(y_pred, "weighted"),
        "log_loss": float(log_loss(y_true, probabilities, labels=LABELS)),
        "brier_multiclass_mean": float(np.mean((probabilities - one_hot) ** 2)),
    }
    high_coverage_rows = len(high_coverage_label0)
    governance_section = {
        "label_3_4_predicted_label": label_prf,
        "p_large_training_threshold_0_5": threshold_prf,
        "false_positive_scenarios_at_0_5": scenario_counts(false_positives),
        "false_negative_scenarios_at_0_5": scenario_counts(false_negatives),
        "label_0_missed_under_high_coverage": {
            "rows": int(high_coverage_rows),
            "missed_rows": int(len(label0_missed)),
            "missed_rate": float(len(label0_missed) / high_coverage_rows) if high_coverage_rows else 0.0,
        },
    }
    rule_section = {
        "accuracy": float(accuracy_score(y_true, rule_labels)),
        "macro_f1": f1(rule_labels, "macro"),
        "weighted_f1": f1(rule_labels, "weighted"),
        "label_3_4_predicted_label": rule_prf,
    }
    subgroup_columns = (
        "latent_workload_class",
        "scenario_family",
        "data_quality_regime",
        "temporal_phase",
        "site_id",
        "window_length_seconds",
        "o4_missing_reason",
        "o7_missing_reason",
        "o12_missing_reason",
        "capacity_possible",
        "integrity_warning",
    )
    subgroup_section = {
        column: subgroup_metrics(test_predictions, column) for column in subgroup_columns
    }

    metrics: dict[str, Any] = {
        "dataset": dataset_section,
        "model": model_section,
        "governance": governance_section,
        "rule_baseline": rule_section,
        "subgroup_metrics": subgroup_section,
    }

    calibration = compute_calibration_metrics(test_predictions)
    # metrics.json carries only the scalar calibration values.
    headline_keys = ("log_loss", "brier_large_training", "expected_calibration_error_large_training")
    metrics["calibration"] = {key: calibration[key] for key in headline_keys}
    return metrics, calibration
def write_confusion_matrix(path: Path, test_predictions: pd.DataFrame) -> None:
    """Write the confusion matrix over ``LABELS`` to ``path`` as CSV.

    Rows are named ``true_<label>`` and columns ``pred_<label>``; the index
    column header is ``true_label``.
    """
    actual = test_predictions["label_0_to_4"].astype(int)
    predicted = test_predictions["predicted_label"].astype(int)
    counts = confusion_matrix(actual, predicted, labels=LABELS)
    row_names = [f"true_{label}" for label in LABELS]
    column_names = [f"pred_{label}" for label in LABELS]
    table = pd.DataFrame(counts, index=row_names, columns=column_names)
    table.to_csv(path, index_label="true_label")
def write_classification_report(path: Path, test_predictions: pd.DataFrame) -> None:
    """Write scikit-learn's classification report for the test rows as JSON."""
    actual = test_predictions["label_0_to_4"].astype(int)
    predicted = test_predictions["predicted_label"].astype(int)
    write_json(
        path,
        classification_report(actual, predicted, labels=LABELS, output_dict=True, zero_division=0),
    )
def compute_feature_importance(
    model_run_dir: Path,
    test_features: pd.DataFrame,
    test_predictions: pd.DataFrame,
    seed: int = DEFAULT_SEED,
    repeats: int = 3,
) -> pd.DataFrame:
    """Estimate permutation importance of each feature by the drop in macro F1.

    Each column listed in ``feature_columns.json`` is shuffled ``repeats``
    times and the model re-scored on the test rows. Columns with at most one
    distinct value (NaN counted) are not shuffled and get zero importance.

    Returns:
        One row per feature, sorted by descending mean importance and then by
        feature name.
    """
    feature_columns = read_json(model_run_dir / "feature_columns.json")
    # joblib.load unpickles its input: only use model-run directories you trust.
    preprocessor = joblib.load(model_run_dir / "preprocessing.joblib")
    model = joblib.load(model_run_dir / "model.joblib")
    x_test = model_input_frame(test_features, feature_columns)
    y_true = test_predictions["label_0_to_4"].astype(int)

    def macro_f1_for(frame: pd.DataFrame) -> Any:
        """Predict the arg-max label for ``frame`` and score it against ``y_true``."""
        class_probabilities = probability_frame(model, preprocessor.transform(frame))[PROB_COLUMNS].to_numpy()
        predicted = np.asarray(LABELS)[np.argmax(class_probabilities, axis=1)]
        return f1_score(y_true, predicted, labels=LABELS, average="macro", zero_division=0)

    baseline_score = macro_f1_for(x_test)
    rng = np.random.default_rng(seed)

    records: list[dict[str, Any]] = []
    for column in feature_columns:
        permuted_scores: list[float] = []
        is_constant = x_test[column].nunique(dropna=False) <= 1
        if is_constant:
            drops = [0.0] * repeats
        else:
            drops = []
            for _ in range(repeats):
                shuffled = x_test.copy()
                column_values = shuffled[column].to_numpy(copy=True)
                rng.shuffle(column_values)
                shuffled[column] = column_values
                permuted_score = macro_f1_for(shuffled)
                permuted_scores.append(float(permuted_score))
                drops.append(float(baseline_score - permuted_score))

        # Constant columns have no permuted scores; report the baseline for them.
        permuted_mean = float(np.mean(permuted_scores)) if permuted_scores else float(baseline_score)
        records.append(
            {
                "feature": column,
                "importance_mean": float(np.mean(drops)),
                "importance_std": float(np.std(drops)),
                "baseline_macro_f1": float(baseline_score),
                "permuted_macro_f1_mean": permuted_mean,
            }
        )

    table = pd.DataFrame(records)
    return table.sort_values(["importance_mean", "feature"], ascending=[False, True])
def evidence_audit_sample(test_predictions: pd.DataFrame, min_rows: int = 50) -> pd.DataFrame:
    """Select a categorised sample of test predictions for manual audit.

    Up to ten rows are taken for each fixed category (correct label 0, correct
    label 2 on hard false-positive scenarios, correct label 3/4, large-training
    false positives and false negatives). A row is used by at most one
    category. If fewer than ``min_rows`` rows result, the remainder is filled
    with the unused rows whose ``p_large_training`` is closest to 0.5.

    Returns:
        The sampled rows restricted to the audit columns that exist, with
        ``label_0_to_4`` renamed to ``true_label``.
    """
    true_label = test_predictions["label_0_to_4"].astype(int)
    predicted_label = test_predictions["predicted_label"].astype(int)
    large_pred = test_predictions["p_large_training"] >= 0.5

    per_category = 10
    # (category name, row mask, sort column, ascending)
    plan = [
        (
            "correct_label_0",
            (true_label == 0) & (predicted_label == 0),
            "negative_certification_confidence",
            False,
        ),
        (
            "correct_label_2_hard_false_positive",
            (true_label == 2)
            & (predicted_label == 2)
            & test_predictions["latent_workload_class"].isin(HARD_FALSE_POSITIVE_SCENARIOS),
            "p_large_training",
            False,
        ),
        (
            "correct_label_3_4_likely_training",
            (true_label >= 3) & (predicted_label >= 3),
            "p_large_training",
            False,
        ),
        ("false_positive_large_training", (true_label < 3) & large_pred, "p_large_training", False),
        ("false_negative_large_training", (true_label >= 3) & (~large_pred), "p_large_training", True),
    ]

    taken: set[int] = set()
    pieces: list[pd.DataFrame] = []
    for name, mask, sort_column, ascending in plan:
        # Rows already claimed by an earlier category are excluded.
        pool = test_predictions[mask & (~test_predictions.index.isin(taken))].copy()
        if sort_column and sort_column in pool.columns:
            pool = pool.sort_values(sort_column, ascending=ascending)
        pool = pool.head(per_category)
        if not len(pool):
            continue
        taken.update(int(row_label) for row_label in pool.index)
        pool["audit_category"] = name
        pieces.append(pool)

    sample = pd.concat(pieces, axis=0) if pieces else test_predictions.iloc[0:0].copy()

    if len(sample) < min_rows:
        filler = test_predictions[~test_predictions.index.isin(taken)].copy()
        filler["uncertainty_distance"] = (filler["p_large_training"] - 0.5).abs()
        filler = filler.sort_values(
            ["uncertainty_distance", "severity_score"], ascending=[True, False]
        ).head(min_rows - len(sample))
        filler["audit_category"] = "high_uncertainty_or_coverage_edge"
        sample = pd.concat([sample, filler], axis=0)

    columns = [
        "audit_category",
        "feature_row_id",
        "split",
        "latent_workload_class",
        "label_0_to_4",
        "predicted_label",
        "p_large_training",
        "severity_score",
        "top_evidence",
        "critical_missing_layers",
        "integrity_warning",
    ]
    columns += [column for column in SELECTED_AUDIT_FEATURES if column in sample.columns]
    available = [column for column in columns if column in sample.columns]
    return sample.loc[:, available].rename(columns={"label_0_to_4": "true_label"})
def top_scenarios_text(scenarios: dict[str, int], limit: int = 5) -> str:
    """Render the first ``limit`` entries as ``"name: count, ..."``.

    Entries are taken in the dict's own order. Returns ``"none"`` for an
    empty mapping.
    """
    if scenarios:
        leading = list(scenarios.items())[:limit]
        parts = [f"{name}: {count}" for name, count in leading]
        return ", ".join(parts)
    return "none"
def write_run_readme(model_run_dir: Path, features_path: Path, metrics: dict[str, Any], calibration: dict[str, Any]) -> None:
    """Render ``README.md`` for a model run and write it into ``model_run_dir``.

    Split sizes and the seed are read from ``split_manifest.json`` in the run
    directory. The dataset name is the directory above ``features_path``, or
    one level higher when that directory is itself called ``features``.
    """
    model_metrics = metrics["model"]
    governance = metrics["governance"]
    split_manifest = read_json(model_run_dir / "split_manifest.json")

    features_dir = features_path.parent
    if features_dir.name == "features":
        dataset_dir = features_dir.parent
    else:
        dataset_dir = features_dir
    dataset_name = dataset_dir.name
    model_run_name = model_run_dir.name

    # The template text is emitted verbatim; its lines must stay at column 0.
    readme = f"""# {model_run_name} model run
This directory contains a public runnable baseline for the `{dataset_name}` datacenter training-run verification dataset.
## Dataset
- Feature table: `{features_path}`
- Rows: {sum(split['rows'] for split in split_manifest['summary'].values())}
- Episode split: grouped by `episode_id`, scenario-stratified, seed `{split_manifest['seed']}`
- Split rows: train {split_manifest['summary']['train']['rows']}, validation {split_manifest['summary']['validation']['rows']}, test {split_manifest['summary']['test']['rows']}
- Split episodes: train {split_manifest['summary']['train']['episodes']}, validation {split_manifest['summary']['validation']['episodes']}, test {split_manifest['summary']['test']['episodes']}
## Model
- Supervised model: calibrated scikit-learn histogram gradient boosting classifier
- Calibration: validation split only, held-out test evaluated once
- Rule baseline: deterministic evidence rules in `src/datacenter_verification_modeling/rule_baseline.py`
- Leakage exclusions: identifiers, labels, site id, episode id, raw manifest hash, scenario metadata, counterfactual metadata, and synthetic-only audit columns
## Headline Test Metrics
- Accuracy: {model_metrics['accuracy']:.4f}
- Macro F1: {model_metrics['macro_f1']:.4f}
- Weighted F1: {model_metrics['weighted_f1']:.4f}
- Log loss: {model_metrics['log_loss']:.4f}
- Label 3/4 precision by predicted label: {governance['label_3_4_predicted_label']['precision']:.4f}
- Label 3/4 recall by predicted label: {governance['label_3_4_predicted_label']['recall']:.4f}
- `p_large_training >= 0.5` precision: {governance['p_large_training_threshold_0_5']['precision']:.4f}
- `p_large_training >= 0.5` recall: {governance['p_large_training_threshold_0_5']['recall']:.4f}
- Rule baseline macro F1: {metrics['rule_baseline']['macro_f1']:.4f}
## Error Scenarios
- Largest false-positive scenarios at `p_large_training >= 0.5`: {top_scenarios_text(governance['false_positive_scenarios_at_0_5'])}
- Largest false-negative scenarios at `p_large_training >= 0.5`: {top_scenarios_text(governance['false_negative_scenarios_at_0_5'])}
## Calibration
- Brier score for `p_large_training`: {calibration['brier_large_training']:.4f}
- Expected calibration error for `p_large_training`: {calibration['expected_calibration_error_large_training']:.4f}
## Reproduce
```bash
python src/datacenter_verification_modeling/train_model.py \\
--features {features_path} \\
--output {model_run_dir} \\
--seed {split_manifest['seed']}
```
```bash
python src/datacenter_verification_modeling/evaluate_model.py \\
--model-run {model_run_dir} \\
--features {features_path}
```
```bash
python src/datacenter_verification_modeling/predict.py \\
--model-run {model_run_dir} \\
--features {features_path} \\
--output {model_run_dir / 'predictions_all.csv'}
```
## Limitations
- This model is trained on synthetic data only.
- Performance numbers are not real-world deployment claims.
- Adjacent windows are correlated, so group splitting by `episode_id` is mandatory.
- Synthetic labels are generated from rules and latent scenarios, so the model may learn generator assumptions.
- Real datacenter deployment would require calibration on real telemetry and controlled drills.
- The model should assist audit triage; it should not be treated as sole proof of a violation.
"""
    readme_path = model_run_dir / "README.md"
    readme_path.write_text(readme, encoding="utf-8")
def write_validation_summary(
    model_run_dir: Path,
    features_path: Path,
    metrics: dict[str, Any],
    validation_status: list[dict[str, Any]] | None,
) -> None:
    """Write ``validation_summary.md`` into ``model_run_dir``.

    Args:
        model_run_dir: Run directory that is checked for the required artifacts.
        features_path: Feature table path echoed in the summary.
        metrics: Metrics dict providing the ``model`` and ``governance`` sections.
        validation_status: Items with ``name`` and ``returncode``. When ``None``,
            the ``validation_status`` stored in an existing ``manifest.json``
            is used instead, if any.
    """
    manifest_file = model_run_dir / "manifest.json"
    if validation_status is None and manifest_file.exists():
        existing_manifest = read_json(manifest_file)
        validation_status = existing_manifest.get("validation_status") or None

    def artifact_state(filename: str) -> str:
        # The summary is the file being written now, so it always counts as present.
        found = (model_run_dir / filename).exists() or filename == "validation_summary.md"
        return "present" if found else "missing"

    required_rows = [f"- `{filename}`: {artifact_state(filename)}" for filename in REQUIRED_OUTPUTS]

    validation_lines = []
    if not validation_status:
        validation_lines.append("- Dataset validation was not rerun by this evaluation command; see `manifest.json` for any recorded training validation status.")
    else:
        for item in validation_status:
            check_name = item["name"]
            return_code = item["returncode"]
            verdict = "PASS" if return_code == 0 else "FAIL"
            validation_lines.append(f"- `{check_name}`: return code {return_code}; {verdict}")

    model_metrics = metrics["model"]
    governance = metrics["governance"]
    # The template text is emitted verbatim; its lines must stay at column 0.
    text = f"""# Validation Summary
Feature table: `{features_path}`
## Dataset Validation Before Training
{chr(10).join(validation_lines)}
## Required Artifacts
{chr(10).join(required_rows)}
## Test Metrics
- Accuracy: {model_metrics['accuracy']:.4f}
- Macro F1: {model_metrics['macro_f1']:.4f}
- Label 3/4 precision: {governance['label_3_4_predicted_label']['precision']:.4f}
- Label 3/4 recall: {governance['label_3_4_predicted_label']['recall']:.4f}
- `p_large_training >= 0.5` precision: {governance['p_large_training_threshold_0_5']['precision']:.4f}
- `p_large_training >= 0.5` recall: {governance['p_large_training_threshold_0_5']['recall']:.4f}
## Governance Checks
- Capacity gate applied to post-processed probabilities.
- Negative certification confidence is `p_label_0 * min_critical_coverage`.
- Integrity warnings are reported separately from positive training evidence.
- Raw model probabilities are retained as `raw_p_label_0` through `raw_p_label_4`.
"""
    summary_path = model_run_dir / "validation_summary.md"
    summary_path.write_text(text, encoding="utf-8")
def write_manifest(
    model_run_dir: Path,
    features_path: Path,
    metrics: dict[str, Any],
    calibration: dict[str, Any],
    validation_status: list[dict[str, Any]] | None,
    training_metadata: dict[str, Any] | None,
) -> None:
    """Create or update ``manifest.json`` for a model run.

    Keys of an existing manifest are kept unless overwritten below. Falsy
    ``validation_status`` / ``training_metadata`` arguments fall back to the
    values already stored in the manifest. SHA-256 hashes are recorded for
    every required output that exists, except the manifest itself.
    """
    manifest_path = model_run_dir / "manifest.json"
    existing: dict[str, Any] = read_json(manifest_path) if manifest_path.exists() else {}

    artifact_hashes: dict[str, Any] = {}
    for filename in REQUIRED_OUTPUTS:
        artifact = model_run_dir / filename
        # The manifest cannot contain its own hash.
        if artifact.exists() and filename != "manifest.json":
            artifact_hashes[filename] = sha256_file(artifact)

    timestamp = utc_now_iso()
    model_section = metrics["model"]
    governance = metrics["governance"]
    by_label = governance["label_3_4_predicted_label"]
    by_threshold = governance["p_large_training_threshold_0_5"]
    metrics_summary = {
        "accuracy": model_section["accuracy"],
        "macro_f1": model_section["macro_f1"],
        "weighted_f1": model_section["weighted_f1"],
        "label_3_4_precision": by_label["precision"],
        "label_3_4_recall": by_label["recall"],
        "p_large_training_precision_at_0_5": by_threshold["precision"],
        "p_large_training_recall_at_0_5": by_threshold["recall"],
        "brier_large_training": calibration["brier_large_training"],
        "ece_large_training": calibration["expected_calibration_error_large_training"],
    }

    manifest = dict(existing)
    manifest.update(
        {
            "created_or_updated_at": timestamp,
            "model_run_id": model_run_dir.name,
            "features_path": str(features_path),
            "model_type": "CalibratedClassifierCV over HistGradientBoostingClassifier",
            "calibration_method": existing.get("calibration_method", "sigmoid_on_validation_split"),
            "metrics_summary": metrics_summary,
            "validation_status": validation_status or existing.get("validation_status", []),
            "training_metadata": training_metadata or existing.get("training_metadata", {}),
            "required_outputs": REQUIRED_OUTPUTS,
            "artifact_hashes": artifact_hashes,
            "limitations": [
                "Trained on synthetic data only.",
                "Performance numbers are not real-world deployment claims.",
                "Episode-level group splitting is required because adjacent windows are correlated.",
                "Synthetic labels encode generator assumptions.",
                "Real deployment requires real telemetry calibration and controlled drills.",
            ],
        }
    )
    write_json(manifest_path, manifest)
def evaluate_model_run(
    model_run_dir: Path,
    features_path: Path,
    validation_status: list[dict[str, Any]] | None = None,
    training_metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Evaluate a trained model run and write all evaluation artifacts.

    Scores every row of the feature table, writes the prediction CSVs, then
    computes metrics, calibration, the confusion matrix, the classification
    report, feature importance and the audit sample on the ``test`` split, and
    finally writes the README, manifest and validation summary.

    Returns:
        The metrics dict that is also written to ``metrics.json``.

    Raises:
        ValueError: If no prediction row belongs to the ``test`` split.
    """
    model_run_dir.mkdir(parents=True, exist_ok=True)

    feature_df, predictions = predict_for_model_run(model_run_dir, features_path)
    predictions.to_csv(model_run_dir / "predictions_all.csv", index=False)

    prediction_is_test = predictions["split"] == "test"
    feature_is_test = feature_df["split"] == "test"
    test_predictions = predictions[prediction_is_test].copy()
    test_features = feature_df[feature_is_test].copy()
    if test_predictions.empty:
        raise ValueError("no test split rows available for evaluation")
    test_predictions.to_csv(model_run_dir / "predictions_test.csv", index=False)

    metrics, calibration = compute_metrics(test_predictions, test_features)
    write_json(model_run_dir / "metrics.json", metrics)
    write_json(model_run_dir / "calibration_metrics.json", calibration)
    write_confusion_matrix(model_run_dir / "confusion_matrix.csv", test_predictions)
    write_classification_report(model_run_dir / "classification_report.json", test_predictions)

    compute_feature_importance(model_run_dir, test_features, test_predictions).to_csv(
        model_run_dir / "feature_importance.csv", index=False
    )
    evidence_audit_sample(test_predictions).to_csv(
        model_run_dir / "evidence_audit_sample.csv", index=False
    )

    write_run_readme(model_run_dir, features_path, metrics, calibration)
    # The manifest is written on both sides of the summary: the first write
    # lets the summary's artifact check find manifest.json, and the second
    # lets the manifest's artifact hashes cover validation_summary.md.
    write_manifest(model_run_dir, features_path, metrics, calibration, validation_status, training_metadata)
    write_validation_summary(model_run_dir, features_path, metrics, validation_status)
    write_manifest(model_run_dir, features_path, metrics, calibration, validation_status, training_metadata)
    return metrics
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: evaluate a model run and print headline metrics.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit code, always 0 when evaluation completes.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    for flag in ("--model-run", "--features"):
        parser.add_argument(flag, type=Path, required=True)
    args = parser.parse_args(argv)

    metrics = evaluate_model_run(args.model_run, args.features)

    # (printed name, key path into the metrics dict); values are looked up
    # one at a time so each line is printed before the next lookup happens.
    headline = (
        ("accuracy", ("model", "accuracy")),
        ("macro_f1", ("model", "macro_f1")),
        ("label_3_4_precision", ("governance", "label_3_4_predicted_label", "precision")),
        ("label_3_4_recall", ("governance", "label_3_4_predicted_label", "recall")),
    )
    for title, key_path in headline:
        value = metrics
        for key in key_path:
            value = value[key]
        print(f"{title}: {value:.4f}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())