# idacy's picture
# Upload live inference API deployment files
# e4b1ed6 verified
# Raw
# History Blame Contribute Delete
# 7.71 kB
"""Deterministic evidence-rule baseline for synthetic v0.
The rule baseline is not intended to replace the supervised model; it provides a leakage-free point of comparison and encodes the study's governance constraints around capacity, missingness, and multi-layer evidence.
"""
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
try:
from .common import LABELS, PROB_COLUMNS, as_bool, load_feature_table, normalize_probability_array, row_bool, row_float
except ImportError: # pragma: no cover - direct script execution
from common import LABELS, PROB_COLUMNS, as_bool, load_feature_table, normalize_probability_array, row_bool, row_float
# Substrings looked for inside the lower-cased runtime framework class
# (`marker in runtime`); any hit contributes to the false-positive pattern flag.
FALSE_POSITIVE_RUNTIME_MARKERS = [
    "inference", "hpc", "mpi", "nccl", "benchmark",
    "burn_in", "storage", "etl", "synthetic_data",
]

# Lower-cased declared workload classes that, by exact set membership,
# also contribute to the false-positive pattern flag.
FALSE_POSITIVE_DECLARED_CLASSES = {
    "inference", "hpc", "benchmark", "burn_in",
    "data", "reserved", "none",
}
def evidence_flags(row: pd.Series) -> dict[str, bool]:
    """Threshold one feature row into the named evidence flags used by the rules.

    Raw observables are read through ``row_bool`` / ``row_float`` and compared
    against fixed cut-offs. The two string columns (runtime framework class and
    declared workload class) are optional: when absent they are treated as the
    empty string.

    Args:
        row: A single row of the feature table.

    Returns:
        A dict keyed by flag name (``"capacity"``, ``"allocation"``,
        ``"gpu_activity"``, ... ``"policy_scale"``) holding each flag's
        truth value.
    """
    # Capacity / allocation observables.
    has_capacity = row_bool(row, "capacity_possible")
    conflict_score = row_float(row, "o17_external_capacity_conflict_score")
    peak_gpus = row_float(row, "o2_max_concurrent_normalized_gpus")
    alloc_hours = row_float(row, "o2_allocation_duration_hours")
    gpu_hours_vs_policy = row_float(row, "o2_gpu_hours_policy_ratio")
    compute_vs_policy = row_float(row, "policy_compute_ratio")

    # GPU telemetry.
    util_p95 = row_float(row, "o4_gpu_util_p95")
    duty_over_70 = row_float(row, "o4_gpu_util_duty_gt_70")
    tensor_active_p95 = row_float(row, "o4_sm_tensor_active_p95")

    # Fabric and rack power.
    sync_footprint = row_float(row, "o7_synchronized_fabric_footprint")
    collective_periodicity = row_float(row, "o7_collective_periodicity_score")
    port_util_p95 = row_float(row, "o7_scaleout_port_util_p95")
    rack_power_p95 = row_float(row, "o8_rack_power_fraction_p95")

    # Storage behaviour.
    ckpt_periodicity = row_float(row, "o11_checkpoint_periodicity_score")
    ckpt_write_tb = row_float(row, "o11_checkpoint_write_tb_per_event")
    rw_training_pattern = row_float(row, "o11_read_write_training_pattern_score")

    # Declared ML metadata.
    has_signed_logs = row_bool(row, "o12_signed_ml_logs_present")
    param_count_b = row_float(row, "o12_declared_parameter_count_b")
    tokens_b = row_float(row, "o12_training_tokens_b")
    has_optimizer_state = row_bool(row, "o12_optimizer_state_present")

    # Coverage / integrity observables. The extra 1.0 argument is presumably
    # the fallback used when the coverage column is missing -- confirm in common.
    critical_coverage = row_float(row, "o14_min_critical_coverage", 1.0)
    critical_gap = row_float(row, "o14_gap_fraction_critical")
    confidential_fraction = row_float(row, "o13_confidential_compute_mode_fraction")
    reset_count = row_float(row, "o14_counter_reset_count")

    # Optional categorical columns, normalised to lower case for matching.
    runtime_class = str(row.get("o10_runtime_framework_class", "")).lower()
    declared_workload = str(row.get("o2_declared_workload_class", "")).lower()

    # Policy-scale: any one of the three scale indicators reaches its cut-off.
    at_policy_scale = (
        compute_vs_policy >= 1.0 or gpu_hours_vs_policy >= 1.0 or peak_gpus >= 512
    )
    # Allocation evidence: policy scale, or a mid-size allocation held for 12h+.
    allocation_flag = at_policy_scale or (peak_gpus >= 256 and alloc_hours >= 12)

    gpu_activity_flag = (
        util_p95 >= 70 or duty_over_70 >= 0.45 or tensor_active_p95 >= 0.60
    )
    fabric_sync_flag = (
        sync_footprint >= 256
        or collective_periodicity >= 0.55
        or port_util_p95 >= 0.65
    )
    physical_support_flag = rack_power_p95 >= 0.55
    storage_flag = (
        ckpt_periodicity >= 0.55
        or ckpt_write_tb >= 0.25
        or rw_training_pattern >= 0.60
    )
    runtime_flag = any(
        tag in runtime_class
        for tag in ("training", "fine_tune", "pytorch_distributed")
    )
    ml_flag = (
        has_signed_logs
        or param_count_b >= 50
        or tokens_b >= 100
        or has_optimizer_state
    )

    # Integrity concern: any telemetry gap, low coverage, heavy confidential
    # compute use, counter reset, unapproved physical change, or counters
    # reported as disabled by confidential-compute mode.
    integrity_flag = (
        critical_gap > 0.05
        or critical_coverage < 0.80
        or confidential_fraction > 0.50
        or reset_count > 0
        or row_bool(row, "o15_unapproved_physical_change_near_window")
        or str(row.get("o4_missing_reason", "")) == "counter_disabled_by_cc_mode"
    )

    false_positive_flag = (
        any(marker in runtime_class for marker in FALSE_POSITIVE_RUNTIME_MARKERS)
        or declared_workload in FALSE_POSITIVE_DECLARED_CLASSES
    )

    # A large allocation with no GPU, fabric, storage or ML signal.
    idle_reservation_flag = (
        peak_gpus >= 512
        and util_p95 < 30
        and sync_footprint < 64
        and collective_periodicity < 0.20
        and not (storage_flag or ml_flag)
    )

    flags = {
        "capacity": has_capacity,
        "external_conflict": conflict_score >= 0.5,
        "allocation": allocation_flag,
        "gpu_activity": gpu_activity_flag,
        "fabric_sync": fabric_sync_flag,
        "physical_support": physical_support_flag,
        "storage_semantic": storage_flag,
        "runtime_semantic": runtime_flag,
        "ml_semantic": ml_flag,
        "signed_logs": has_signed_logs,
        "integrity": integrity_flag,
        "false_positive_pattern": false_positive_flag,
        "reserved_without_activity": idle_reservation_flag,
        "strong_coverage": critical_coverage >= 0.90 and critical_gap <= 0.05,
        "policy_scale": at_policy_scale,
    }
    return flags
def predict_rule_label(row: pd.Series) -> int:
    """Map one feature row to an integer label from 0 to 4 via ordered rules.

    The rules are evaluated top to bottom and the first match wins, so their
    order is part of the contract.

    Args:
        row: A single row of the feature table, passed to ``evidence_flags``.

    Returns:
        The rule-assigned label (0, 1, 2, 3 or 4).
    """
    flags = evidence_flags(row)

    capacity = flags["capacity"]
    physical_support = flags["physical_support"]
    policy_scale = flags["policy_scale"]

    # Count how many "primary" (allocation / GPU / fabric) and "semantic"
    # (runtime / storage / ML metadata) evidence layers fired.
    primary_count = sum(
        int(flags[name]) for name in ("allocation", "gpu_activity", "fabric_sync")
    )
    semantic_count = sum(
        int(flags[name])
        for name in ("runtime_semantic", "storage_semantic", "ml_semantic")
    )

    # No capacity and no external conflict: label depends only on coverage.
    if not (capacity or flags["external_conflict"]):
        if flags["strong_coverage"]:
            return 0
        return 1

    # Label 4: signed logs at policy scale with two primary layers, or all
    # three primary layers plus physical support and two semantic layers.
    signed_at_scale = flags["signed_logs"] and policy_scale and primary_count >= 2
    fully_corroborated = (
        primary_count == 3
        and physical_support
        and semantic_count >= 2
        and policy_scale
    )
    if signed_at_scale or fully_corroborated:
        return 4

    if flags["reserved_without_activity"]:
        return 1

    # Label 2: a known false-positive pattern with some activity, or an
    # integrity concern with activity but no ML metadata.
    benign_but_active = flags["false_positive_pattern"] and (
        flags["gpu_activity"] or flags["fabric_sync"] or physical_support
    )
    integrity_only = (
        flags["integrity"]
        and not flags["ml_semantic"]
        and (primary_count >= 1 or physical_support)
    )
    if benign_but_active or integrity_only:
        return 2

    if primary_count >= 2 and (physical_support or semantic_count >= 1):
        return 3

    if primary_count >= 1 or physical_support or semantic_count >= 1:
        return 2

    # Nothing fired: fall back on whether capacity is possible at all.
    if capacity:
        return 1
    return 0
def predict_rule_labels(df: pd.DataFrame) -> pd.Series:
    """Run ``predict_rule_label`` over every row of ``df``.

    Args:
        df: Feature table with one row per item to classify.

    Returns:
        The per-row labels produced by a row-wise ``apply``, cast to ``int``.
    """
    per_row = df.apply(predict_rule_label, axis=1)
    return per_row.astype(int)
def predict_rule_probabilities(df: pd.DataFrame, confidence: float = 0.82) -> pd.DataFrame:
    """Turn the hard rule labels into a per-class probability table.

    The predicted class of each row receives ``confidence``; the remaining
    mass ``1 - confidence`` is split evenly over the other classes. The matrix
    is then passed through ``normalize_probability_array``.

    Args:
        df: Feature table with one row per item to classify.
        confidence: Probability assigned to the predicted class of each row.

    Returns:
        A DataFrame indexed like ``df`` with one column per entry of
        ``PROB_COLUMNS``.
    """
    predicted = predict_rule_labels(df)

    n_classes = len(LABELS)
    remainder_share = (1.0 - confidence) / (n_classes - 1)

    # Column position of each row's predicted label within LABELS.
    winning_columns = np.asarray(
        [LABELS.index(int(label)) for label in predicted],
        dtype=int,
    )

    matrix = np.full((len(df), n_classes), remainder_share, dtype=float)
    # One assignment per row: (row i, its winning column) <- confidence.
    matrix[np.arange(len(winning_columns)), winning_columns] = confidence

    matrix = normalize_probability_array(matrix)
    return pd.DataFrame(matrix, columns=PROB_COLUMNS, index=df.index)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: label a feature table with the rule baseline.

    Loads the table given by ``--features``, predicts a rule label per row and
    either writes the result to ``--output`` as CSV (creating parent
    directories) or, when no output is given, prints the first 20 rows.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Always 0.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--features", type=Path, required=True)
    cli.add_argument("--output", type=Path)
    options = cli.parse_args(argv)

    features = load_feature_table(options.features)
    predicted = predict_rule_labels(features)

    # Keep only the identifying / reference columns next to the prediction.
    kept_columns = ["feature_row_id", "episode_id", "latent_workload_class", "label_0_to_4"]
    report = features[kept_columns].copy()
    report["rule_predicted_label"] = predicted

    destination = options.output
    if not destination:
        # No output path: show a preview on stdout instead of writing a file.
        print(report.head(20).to_string(index=False))
        return 0

    destination.parent.mkdir(parents=True, exist_ok=True)
    report.to_csv(destination, index=False)
    return 0


# Script entry: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())