Spaces:
Sleeping
Sleeping
| """Sklearn model loading and one-row inference for the live API.""" | |
| from __future__ import annotations | |
| import os | |
| from pathlib import Path | |
| from typing import Any | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| from src.datacenter_verification_api import __version__, build_info | |
| from src.datacenter_verification_api.feature_completion import ( | |
| FeatureSchema, | |
| complete_features, | |
| jsonable, | |
| normalize_mapping, | |
| number_or_none, | |
| ) | |
| from src.datacenter_verification_api.schemas import MetadataResponse, PredictRequest, PredictResponse | |
| from src.datacenter_verification_modeling.common import ( | |
| LABELS, | |
| PROB_COLUMNS, | |
| add_governance_outputs, | |
| model_input_frame, | |
| probability_frame, | |
| read_json, | |
| ) | |
| REPO_ROOT = Path(__file__).resolve().parents[2] | |
| DEFAULT_MODEL_RUN_DIR = Path("data/model_runs/synthetic_v1_baseline") | |
| DEFAULT_FEATURE_TABLE = Path("data/synthetic_v1/features/window_features_all.csv") | |
| KNOWN_NON_MODEL_METADATA_FIELDS = { | |
| "capacity_evidence_only", | |
| "integrity_evidence_only", | |
| "physical_evidence_only", | |
| } | |
| NO_ACTIVE_ALLOCATION_PROBABILITIES = { | |
| 0: 0.97, | |
| 1: 0.02, | |
| 2: 0.006, | |
| 3: 0.002, | |
| 4: 0.002, | |
| } | |
| NO_ACTIVE_ALLOCATION_KEYS = { | |
| "o2_max_concurrent_normalized_gpus", | |
| "o2_allocation_duration_hours", | |
| } | |
| def resolve_repo_path(value: str | Path | None, default: Path) -> Path: | |
| path = Path(value) if value else default | |
| if path.is_absolute(): | |
| return path | |
| return (REPO_ROOT / path).resolve() | |
| def repo_relative(path: Path | None) -> str | None: | |
| if path is None: | |
| return None | |
| try: | |
| return path.resolve().relative_to(REPO_ROOT).as_posix() | |
| except ValueError: | |
| return path.name | |
| def read_json_if_exists(path: Path) -> Any: | |
| return read_json(path) if path.exists() else {} | |
| def split_semicolon(value: Any) -> list[str]: | |
| if value is None: | |
| return [] | |
| try: | |
| if pd.isna(value): | |
| return [] | |
| except (TypeError, ValueError): | |
| pass | |
| return [item.strip() for item in str(value).split(";") if item.strip()] | |
| class ModelService: | |
| """Loaded model artifacts plus base-row lookup for prediction requests.""" | |
| def __init__(self, model_run_dir: Path, feature_table_path: Path | None = None) -> None: | |
| self.model_run_dir = model_run_dir | |
| self.feature_table_path = feature_table_path | |
| self.manifest = read_json_if_exists(model_run_dir / "manifest.json") | |
| self.metrics = read_json_if_exists(model_run_dir / "metrics.json") | |
| self.feature_columns: list[str] = read_json(model_run_dir / "feature_columns.json") | |
| self.preprocessor = joblib.load(model_run_dir / "preprocessing.joblib") | |
| self.model = joblib.load(model_run_dir / "model.joblib") | |
| self.feature_table = self._load_feature_table(feature_table_path) | |
| self.feature_schema = FeatureSchema.from_frame(self.feature_columns, self.feature_table) | |
| self.feature_lookup = self._build_feature_lookup(self.feature_table) | |
| self.model_run_id = str(self.manifest.get("model_run_id") or model_run_dir.name) | |
| def from_env(cls) -> "ModelService": | |
| model_run_dir = resolve_repo_path(os.getenv("DCV_MODEL_RUN_DIR"), DEFAULT_MODEL_RUN_DIR) | |
| feature_table = resolve_repo_path(os.getenv("DCV_FEATURE_TABLE"), DEFAULT_FEATURE_TABLE) | |
| return cls(model_run_dir=model_run_dir, feature_table_path=feature_table) | |
| def _load_feature_table(self, path: Path | None) -> pd.DataFrame | None: | |
| if path is None or not path.exists(): | |
| return None | |
| frame = pd.read_csv(path) | |
| if "feature_row_id" not in frame.columns: | |
| raise ValueError(f"feature table lacks feature_row_id column: {path}") | |
| return frame | |
| def _build_feature_lookup(self, frame: pd.DataFrame | None) -> dict[str, dict[str, Any]]: | |
| if frame is None: | |
| return {} | |
| rows: dict[str, dict[str, Any]] = {} | |
| for record in frame.to_dict(orient="records"): | |
| row_id = record.get("feature_row_id") | |
| if row_id is not None: | |
| rows[str(row_id)] = dict(record) | |
| return rows | |
| def dataset_id(self) -> str | None: | |
| training_metadata = self.manifest.get("training_metadata", {}) | |
| if isinstance(training_metadata, dict): | |
| path = training_metadata.get("features_path") or self.manifest.get("features_path") | |
| else: | |
| path = self.manifest.get("features_path") | |
| if isinstance(path, str) and "synthetic_v1" in path: | |
| return "synthetic_v1" | |
| if self.feature_table is not None and "dataset_id" in self.feature_table.columns: | |
| values = self.feature_table["dataset_id"].dropna().unique() | |
| if len(values): | |
| return str(values[0]) | |
| return None | |
| def dataset_scale(self) -> str | None: | |
| if self.feature_table is not None and "dataset_id" in self.feature_table.columns: | |
| values = self.feature_table["dataset_id"].dropna().unique() | |
| if len(values): | |
| return str(values[0]) | |
| return None | |
| def model_type(self) -> str | None: | |
| value = self.manifest.get("model_type") | |
| if isinstance(value, str): | |
| return value | |
| training_metadata = self.manifest.get("training_metadata", {}) | |
| if isinstance(training_metadata, dict): | |
| inner = training_metadata.get("model_type") | |
| if isinstance(inner, str): | |
| return inner | |
| return type(self.model).__name__ | |
| def metadata(self) -> MetadataResponse: | |
| metrics_summary = self.manifest.get("metrics_summary") | |
| if not isinstance(metrics_summary, dict): | |
| metrics_summary = { | |
| "model": self.metrics.get("model", {}), | |
| "governance": self.metrics.get("governance", {}), | |
| "calibration": self.metrics.get("calibration", {}), | |
| } | |
| build = build_info() | |
| return MetadataResponse( | |
| api_version=__version__, | |
| build_sha=build.sha, | |
| build_source=build.source, | |
| model_run_id=self.model_run_id, | |
| model_run_dir=repo_relative(self.model_run_dir) or self.model_run_dir.name, | |
| feature_table=repo_relative(self.feature_table_path), | |
| dataset_id=self.dataset_id, | |
| dataset_scale=self.dataset_scale, | |
| model_type=self.model_type, | |
| metrics_summary=metrics_summary, | |
| feature_count=len(self.feature_columns), | |
| feature_columns=self.feature_columns, | |
| supported_labels=LABELS, | |
| base_row_lookup_enabled=bool(self.feature_lookup), | |
| ) | |
| def build_feature_row(self, request: PredictRequest) -> tuple[dict[str, Any], list[str], list[str]]: | |
| warnings: list[str] = [] | |
| debug_warnings: list[str] = [] | |
| base_row = None | |
| if request.feature_row_id: | |
| base_row = self.feature_lookup.get(request.feature_row_id) | |
| if base_row is None: | |
| warnings.append( | |
| "The selected datapoint was not found in the live API reference table. " | |
| "Live scoring may be less reliable because many model inputs may be missing." | |
| ) | |
| row: dict[str, Any] | |
| has_base_row = base_row is not None | |
| if base_row is not None: | |
| row = dict(base_row) | |
| else: | |
| row = {column: None for column in self.feature_columns} | |
| if request.feature_row_id: | |
| row["feature_row_id"] = request.feature_row_id | |
| context = normalize_mapping(request.context, self.feature_schema) | |
| features = normalize_mapping(request.features, self.feature_schema) | |
| changed_keys: set[str] = set() | |
| feature_keys: set[str] = set(features) | |
| for source_name, values in [("context", context), ("features", features)]: | |
| for key, value in values.items(): | |
| row[key] = value | |
| changed_keys.add(key) | |
| if key in {"feature_row_id", *KNOWN_NON_MODEL_METADATA_FIELDS}: | |
| if key in KNOWN_NON_MODEL_METADATA_FIELDS: | |
| debug_warnings.append(f"{source_name} field is metadata-only and was not sent to the model: {key}") | |
| elif key not in self.feature_columns: | |
| warnings.append(f"The live API ignored an unrecognized input field: {key}") | |
| for column in self.feature_columns: | |
| row.setdefault(column, None) | |
| row, completion_warnings = complete_features( | |
| row, | |
| changed_keys, | |
| has_base_row=has_base_row, | |
| derive=request.derive, | |
| edited_feature_keys=feature_keys, | |
| ) | |
| warnings.extend(completion_warnings) | |
| if not has_base_row: | |
| null_count = sum(1 for column in self.feature_columns if row.get(column) is None) | |
| if null_count > len(self.feature_columns) // 3: | |
| warnings.append( | |
| f"The live API is missing {null_count} of {len(self.feature_columns)} model inputs. " | |
| "Choose a sampled datapoint before editing controls for a more reliable live score." | |
| ) | |
| return row, warnings, debug_warnings | |
| def no_active_allocation_counterfactual(self, request: PredictRequest, row: dict[str, Any]) -> bool: | |
| if not request.derive: | |
| return False | |
| if not (NO_ACTIVE_ALLOCATION_KEYS & set(request.features)): | |
| return False | |
| allocation = number_or_none(row.get("o2_max_concurrent_normalized_gpus")) | |
| duration = number_or_none(row.get("o2_allocation_duration_hours")) | |
| return allocation is not None and allocation <= 0 and duration is not None and duration <= 0 | |
| def predict(self, request: PredictRequest) -> PredictResponse: | |
| row, warnings, debug_warnings = self.build_feature_row(request) | |
| record = { | |
| column: (np.nan if row.get(column) is None else row.get(column)) | |
| for column in sorted(set(row) | set(self.feature_columns)) | |
| } | |
| frame = pd.DataFrame([record]) | |
| model_frame = model_input_frame(frame, self.feature_columns) | |
| transformed = self.preprocessor.transform(model_frame) | |
| raw_probabilities = probability_frame(self.model, transformed) | |
| governance = add_governance_outputs(frame, raw_probabilities) | |
| raw = raw_probabilities.iloc[0] | |
| post = governance.iloc[0] | |
| probabilities = [float(post[f"p_label_{label}"]) for label in LABELS] | |
| predicted_label = int(post["predicted_label"]) | |
| p_large_training = float(post["p_large_training"]) | |
| severity_score = float(post["severity_score"]) | |
| negative_certification_confidence = float(post["negative_certification_confidence"]) | |
| integrity_warning = bool(post["integrity_warning"]) | |
| top_evidence = split_semicolon(post["top_evidence"]) | |
| if self.no_active_allocation_counterfactual(request, row): | |
| probabilities = [float(NO_ACTIVE_ALLOCATION_PROBABILITIES[label]) for label in LABELS] | |
| predicted_label = 0 | |
| p_large_training = float(NO_ACTIVE_ALLOCATION_PROBABILITIES[3] + NO_ACTIVE_ALLOCATION_PROBABILITIES[4]) | |
| severity_score = float(sum(label * NO_ACTIVE_ALLOCATION_PROBABILITIES[label] for label in LABELS)) | |
| negative_certification_confidence = float( | |
| NO_ACTIVE_ALLOCATION_PROBABILITIES[0] * float(post["min_critical_coverage"]) | |
| ) | |
| integrity_warning = False | |
| top_evidence = ["no active allocation", "no strong positive evidence"] | |
| debug_warnings.append("applied no-active-allocation counterfactual override") | |
| completed_features = { | |
| column: jsonable(row.get(column)) | |
| for column in self.feature_columns | |
| if request.return_completed_features | |
| } | |
| return PredictResponse( | |
| model_run_id=self.model_run_id, | |
| feature_row_id=request.feature_row_id or jsonable(row.get("feature_row_id")), | |
| predicted_label=predicted_label, | |
| p_large_training=p_large_training, | |
| severity_score=severity_score, | |
| negative_certification_confidence=negative_certification_confidence, | |
| integrity_warning=integrity_warning, | |
| capacity_possible=bool(post["capacity_possible"]), | |
| min_critical_coverage=float(post["min_critical_coverage"]), | |
| probabilities=probabilities, | |
| probability_by_label={str(label): float(probabilities[index]) for index, label in enumerate(LABELS)}, | |
| raw_probability_by_label={str(label): float(raw[f"p_label_{label}"]) for label in LABELS}, | |
| top_evidence=top_evidence, | |
| critical_missing_layers=split_semicolon(post["critical_missing_layers"]), | |
| input_warnings=warnings, | |
| debug_warnings=debug_warnings, | |
| completed_features=completed_features, | |
| ) | |