# Source file: boq-api/utils/model.py
# Repository upload by gabcares: "Upload 80 files" (commit 72fdabd, verified), 28.2 kB.
import json
import logging
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.base import BaseEstimator
from dataclasses import dataclass, field
from .preprocessing import QSPreprocessor
from sklearn.metrics import accuracy_score, f1_score
from sklearn.calibration import CalibratedClassifierCV, LabelEncoder
from typing import Any, Dict, Iterable, Mapping, Sequence, Tuple, Union
# Type aliases used in the public prediction / evaluation signatures.
# FrameLike: tabular input, either a DataFrame or a dict of column name -> iterable of values.
FrameLike = Union[pd.DataFrame, Dict[str, Iterable[Any]]]
# LabelArrayLike: a collection of ground-truth labels (Series, sequence of str, or ndarray).
LabelArrayLike = Union[pd.Series, Sequence[str], np.ndarray]
@dataclass
class QSSelectiveCalibratedModel:
    """
    Inference-only selective QS classifier.

    Bundles a fitted preprocessor, a calibrated classifier, a fitted label
    encoder and a frozen confidence threshold, and is responsible for:
    - Schema normalization and enforcement
    - Schema drift logging
    - Label collapsing and decoding
    - Calibrated, thresholded selective prediction (low confidence → 'Others')
    - NRM hierarchy inference
    - Lineage and metadata exposure

    The class exposes no ``fit`` method; every component must already be
    fitted when the object is constructed.
    """

    calibrator: CalibratedClassifierCV
    preprocessor: QSPreprocessor
    label_encoder: LabelEncoder
    threshold: float
    others_label: str = "Others"
    # NRM hierarchy map: category → (lvl1, lvl2, lvl3, lvl4)
    nrm_hierarchy_map: Mapping[str, Tuple[str, str, str, str]] = field(
        default_factory=dict
    )
    # Versioning / lineage
    model_version: str = "v1.0.0"
    schema_version: str = "v1"
    hierarchy_version: str = "v1"
    # Derived in __post_init__ from the base algorithm name and model_version.
    model_name: str = field(init=False)
    # Drift logging (JSON-lines file, appended to on every drift event)
    drift_log_path: Path = Path("schema_drift.log")
    # Overwritten in __post_init__ with the preprocessor's feature_names_in_.
    TRAIN_SCHEMA: Tuple[str, ...] = field(default_factory=tuple)
    # Semantic aliases (inputs are already snake_cased by the time this runs)
    RENAME_MAP: Dict[str, str] = field(
        default_factory=lambda: {
            "desc": "description",
            "items": "item",
            "units": "uom",
            "quantity": "qty",
        }
    )

    # Output columns of infer_nrm_hierarchy, in hierarchy order.
    # Deliberately un-annotated so the dataclass machinery ignores it.
    _HIERARCHY_COLS = ("ge", "group_element", "e", "element")

    # ---------------------------------------------------------------------
    # INIT
    # ---------------------------------------------------------------------
    def __post_init__(self):
        """Derive class space, schema, model name and logger from the fitted parts."""
        # Canonical class space = exact allowlist from the fitted label encoder
        self.classes_ = self.label_encoder.classes_
        self.n_classes_ = len(self.classes_)
        self.threshold = float(self.threshold)
        self.drift_log_path = Path(self.drift_log_path)
        self.TRAIN_SCHEMA = self._get_train_schema()

        # Model name is generated dynamically for logging and lineage
        base_algo = self._get_base_algorithm_name(self.calibrator)
        self.model_name = (
            f"QS_{base_algo}_SelectiveCalibratedModel_{self.model_version}"
        )

        self._logger = logging.getLogger(self.model_name)
        # Attach a handler only once per logger name to avoid duplicate output
        # when several instances share the same model name.
        if not self._logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
                )
            )
            self._logger.addHandler(handler)
        self._logger.setLevel(logging.INFO)

    # ---------------------------------------------------------------------
    # Label collapse / decode
    # ---------------------------------------------------------------------
    def collapse_labels(self, y: pd.Series) -> pd.Series:
        """
        Collapse labels outside the known class space into ``others_label``.

        Values are cast to ``str`` first; any value not present in
        ``self.classes_`` is replaced by ``self.others_label``.
        """
        known = set(self.classes_)
        y_str = y.astype(str)
        # Vectorized: keep known labels, replace everything else.
        return y_str.where(y_str.isin(known), self.others_label)

    def decode_predictions(self, max_idx: np.ndarray) -> np.ndarray:
        """
        Decode argmax column indices back to canonical string labels.

        argmax index → calibrator.classes_ → label_encoder.inverse_transform
        """
        pred_integers = self.calibrator.classes_[max_idx]
        return self.label_encoder.inverse_transform(pred_integers)

    # ---------------------------------------------------------------------
    # Private prediction helpers (operate on already-normalized frames)
    # ---------------------------------------------------------------------
    def _proba_from_normalized(self, df_norm: pd.DataFrame) -> np.ndarray:
        """Return calibrated probabilities for a schema-normalized frame."""
        X_t = self.preprocessor.transform(df_norm)
        return self.calibrator.predict_proba(X_t)

    def _select_from_proba(self, proba: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Apply argmax decoding and threshold routing to a probability matrix."""
        max_conf = proba.max(axis=1)
        pred_strings = self.decode_predictions(proba.argmax(axis=1))
        # Thresholding happens AFTER decoding so 'Others' is a string label.
        y_pred = np.where(
            max_conf >= self.threshold,
            pred_strings,
            self.others_label,
        )
        return y_pred, max_conf

    # ---------------------------------------------------------------------
    # Public API: high-level prediction with hierarchy + lineage
    # ---------------------------------------------------------------------
    def predict_full(
        self,
        X_raw: FrameLike,
    ) -> pd.DataFrame:
        """
        Run the full inference pipeline and return an enriched DataFrame.

        Steps: normalize the schema (once), predict with threshold routing,
        infer the NRM hierarchy per predicted category, place the hierarchy
        columns, and attach scalar lineage metadata as ``meta_*`` columns.

        Parameters
        ----------
        X_raw : FrameLike
            Raw input data (DataFrame or dict-like).

        Returns
        -------
        pd.DataFrame
            The normalized input columns (TRAIN_SCHEMA order), each followed
            by its ``Predicted_<col>`` twin where one was created, then
            ``Predicted_Category``, ``Prediction_Confidence``, the ``meta_*``
            columns and finally any remaining columns. The input's row index
            is preserved.
        """
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        # Predict directly on the normalized frame; going through
        # selective_predict would normalize (and drift-log) a second time.
        y_pred, conf = self._select_from_proba(self._proba_from_normalized(df_norm))

        df_out = df_norm.copy()
        df_out["Predicted_Category"] = y_pred
        df_out["Prediction_Confidence"] = conf.round(4)

        # Explicit columns keep the frame well-formed even for zero rows.
        hierarchy_df = pd.DataFrame(
            [self.infer_nrm_hierarchy(cat) for cat in df_out["Predicted_Category"]],
            columns=list(self._HIERARCHY_COLS),
        )

        # Hierarchy placement:
        # - original column absent or entirely NaN → write predictions into it
        # - original column has data → keep it, add 'Predicted_<col>' next to it
        predicted_twin: Dict[str, str] = {}
        for col in self._HIERARCHY_COLS:
            # Positional values: df_out keeps the caller's index, so an
            # index-aligned assignment could scatter or blank the values.
            values = hierarchy_df[col].to_numpy()
            if col not in df_out.columns or df_out[col].isna().all():
                df_out[col] = values
            else:
                pred_col = f"Predicted_{col}"
                df_out[pred_col] = values
                predicted_twin[col] = pred_col

        # Attach lineage metadata as columns (scalars only, so that the
        # assignment broadcasts instead of failing on list values).
        meta_cols = []
        for key, value in self.metadata().items():
            if isinstance(value, (str, int, float, bool)):
                col_name = f"meta_{key}"
                df_out[col_name] = value
                meta_cols.append(col_name)

        # Column ordering: schema columns (each followed by its predicted
        # twin), then model outputs, then lineage, then whatever is left.
        ordered_cols = []
        for col in self.TRAIN_SCHEMA:
            ordered_cols.append(col)
            if col in predicted_twin:
                ordered_cols.append(predicted_twin[col])
        ordered_cols.extend(["Predicted_Category", "Prediction_Confidence"])
        ordered_cols.extend(meta_cols)
        already_placed = set(ordered_cols)
        ordered_cols.extend(c for c in df_out.columns if c not in already_placed)
        return df_out[ordered_cols]

    # ---------------------------------------------------------------------
    # Core selective prediction
    # ---------------------------------------------------------------------
    def selective_predict(
        self,
        X_raw: FrameLike,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Predict with selective routing using calibrated probabilities and
        the frozen confidence threshold.

        Parameters
        ----------
        X_raw : FrameLike
            Raw input data (DataFrame or dict-like).

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            - y_pred: predicted labels, or ``others_label`` below threshold
            - max_conf: max calibrated probability per sample
        """
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        return self._select_from_proba(self._proba_from_normalized(df_norm))

    def predict(self, X_raw: FrameLike) -> np.ndarray:
        """Return only the labels from selective_predict (with Others routing)."""
        y_pred, _ = self.selective_predict(X_raw)
        return y_pred

    def predict_with_confidence(
        self,
        X_raw: FrameLike,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Explicit alias for selective_predict, for clarity in external APIs."""
        return self.selective_predict(X_raw)

    def predict_proba(self, X_raw: FrameLike) -> np.ndarray:
        """Return the calibrated probability matrix for the normalized input."""
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        return self._proba_from_normalized(df_norm)

    # ---------------------------------------------------------------------
    # Evaluation helper
    # ---------------------------------------------------------------------
    def evaluate(
        self,
        X_raw: FrameLike,
        y_true_raw: LabelArrayLike,
    ) -> Dict[str, Any]:
        """
        Evaluate the selective classifier against ground-truth labels.

        X is normalized once and re-indexed 0..n-1; y_true is validated,
        collapsed with collapse_labels and checked against the label encoder;
        predictions follow the same decode-then-threshold path as inference.

        Returns
        -------
        Dict[str, Any]
            macro_f1, weighted_f1, accuracy, others_rate, mean/std of the
            confidence for accepted and rejected rows (``None`` when the
            group is empty), and the threshold.

        Raises
        ------
        ValueError
            If X and y_true differ in length, if y_true looks like a column
            name (see validate_y_true), or if the label encoder cannot
            transform the collapsed labels.
        """
        # 1. Normalize X once (same path as production inference)
        df_norm = self._normalize_boq_schema(
            self._to_dataframe(X_raw)
        ).reset_index(drop=True)

        # 2. Validate and standardize y_true
        y_true = self.validate_y_true(y_true_raw)
        # Fail fast with a clear message instead of after running the model.
        if len(y_true) != len(df_norm):
            raise ValueError(
                f"X and y_true have inconsistent lengths: "
                f"{len(df_norm)} rows vs {len(y_true)} labels."
            )

        # 3. Collapse y_true into the known class space
        y_true_collapsed = self.collapse_labels(y_true)

        # 4. Guardrail: the label encoder must be able to transform y_true
        #    (raises if e.g. others_label is not one of its classes).
        self.label_encoder.transform(y_true_collapsed)

        # 5-7. Calibrated probabilities → decode → threshold
        y_pred, max_conf = self._select_from_proba(
            self._proba_from_normalized(df_norm)
        )

        # 8. Confidence diagnostics
        mask_acc = y_pred != self.others_label
        accepted = max_conf[mask_acc]
        rejected = max_conf[~mask_acc]

        # 9. Metrics
        return {
            "macro_f1": float(f1_score(y_true_collapsed, y_pred, average="macro")),
            "weighted_f1": float(
                f1_score(y_true_collapsed, y_pred, average="weighted")
            ),
            "accuracy": float(accuracy_score(y_true_collapsed, y_pred)),
            "others_rate": float(np.mean(y_pred == self.others_label)),
            "accepted_conf_mean": float(accepted.mean()) if accepted.size else None,
            "accepted_conf_std": float(accepted.std()) if accepted.size else None,
            "rejected_conf_mean": float(rejected.mean()) if rejected.size else None,
            "rejected_conf_std": float(rejected.std()) if rejected.size else None,
            "threshold": self.threshold,
        }

    # ---------------------------------------------------------------------
    # NRM hierarchy inference
    # ---------------------------------------------------------------------
    def infer_nrm_hierarchy(self, category: str) -> Dict[str, str]:
        """
        Look up the NRM hierarchy levels for a predicted category.

        Parameters
        ----------
        category : str
            Predicted category.

        Returns
        -------
        Dict[str, str]
            Keys ``ge``, ``group_element``, ``e``, ``element``. All four are
            ``"Unknown"`` when the category is not in ``nrm_hierarchy_map``.
        """
        lvl1, lvl2, lvl3, lvl4 = self.nrm_hierarchy_map.get(
            category,
            ("Unknown", "Unknown", "Unknown", "Unknown"),
        )
        return {
            "ge": lvl1,
            "group_element": lvl2,
            "e": lvl3,
            "element": lvl4,
        }

    # ---------------------------------------------------------------------
    # Schema normalization + drift logging
    # ---------------------------------------------------------------------
    def _normalize_boq_schema(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize an incoming dataframe into exactly the TRAIN_SCHEMA columns.

        Steps:
        - snake_case the column names (inline regex pipeline)
        - rename aliases from RENAME_MAP to canonical names; an alias is left
          untouched when its canonical column is already present
        - drop duplicate column labels (first occurrence wins, with a warning)
        - add missing TRAIN_SCHEMA columns filled with NaN
        - select / reorder to TRAIN_SCHEMA (extra columns are dropped)
        - log schema drift at the raw, post-normalization and final stages

        The input frame is not modified.

        Raises
        ------
        ValueError
            If the final column set does not match TRAIN_SCHEMA.
        """
        incoming = set(df.columns)

        # RAW DRIFT
        self._log_schema_drift(
            stage="raw",
            incoming_cols=incoming,
            normalized_cols=set(),
            final_cols=set(),
        )

        df = df.copy()
        df.columns = (
            df.columns.str.replace(
                r"(?<=[a-z0-9])(?=[A-Z])", "_", regex=True
            )  # Split CamelCase/PascalCase
            .str.lower()  # Convert to lowercase
            .str.replace(r"[\s/()-]+", "_", regex=True)  # Separators → '_'
            .str.replace(r"[^0-9a-z_]", "", regex=True)  # Remove invalid characters
            .str.strip("_")  # Trim leading/trailing '_'
        )

        # Rename aliases that are present. Skip an alias whose canonical
        # target already exists: renaming would create two columns with the
        # same label. The alias then stays an "extra" column, is reported by
        # the post-normalization drift log and is dropped by the reorder.
        present = set(df.columns)
        rename_map = {
            alias: canonical
            for alias, canonical in self.RENAME_MAP.items()
            if alias in present and canonical not in present
        }
        df = df.rename(columns=rename_map)

        # Distinct raw names can still collapse to one label (e.g. "Item Ref"
        # and "item_ref"). The set-based validation below cannot see that, so
        # resolve it here.
        dup_mask = df.columns.duplicated()
        if dup_mask.any():
            self._logger.warning(
                "Duplicate columns after normalization; keeping first occurrence: %s",
                sorted(set(df.columns[dup_mask])),
            )
            df = df.loc[:, ~dup_mask]

        # Ensure all TRAIN_SCHEMA columns exist
        for col in self.TRAIN_SCHEMA:
            if col not in df.columns:
                df[col] = np.nan

        # POST-NORMALIZATION DRIFT
        self._log_schema_drift(
            stage="post_normalization",
            incoming_cols=incoming,
            normalized_cols=set(df.columns),
            final_cols=set(),
        )

        # Reorder to training schema (drops anything not in the schema)
        df = df[list(self.TRAIN_SCHEMA)]
        final = set(df.columns)

        # FINAL DRIFT
        self._log_schema_drift(
            stage="final",
            incoming_cols=incoming,
            normalized_cols=final,
            final_cols=final,
        )

        self._validate_final_schema(final)
        return df

    def _log_schema_drift(
        self,
        stage: str,
        incoming_cols: set[str],
        normalized_cols: set[str],
        final_cols: set[str],
    ) -> None:
        """
        Log schema drift for one normalization stage.

        Nothing is logged or written when the stage shows no drift. On drift,
        a warning is emitted and one JSON line is appended to
        ``drift_log_path`` (parent directories are created if needed).

        Parameters
        ----------
        stage : str
            One of {"raw", "post_normalization", "final"}; any other value
            is a no-op.
        incoming_cols : set
            Columns of the raw incoming dataframe.
        normalized_cols : set
            Columns after normalization.
        final_cols : set
            Columns after selection to TRAIN_SCHEMA.
        """
        expected = set(self.TRAIN_SCHEMA)
        findings: Dict[str, Any] = {}

        # Only the comparison relevant to the active stage is evaluated.
        if stage == "raw":
            extra = incoming_cols - expected
            if extra:
                findings["extra_in_raw"] = sorted(extra)
        elif stage == "post_normalization":
            extra = normalized_cols - expected
            missing = expected - normalized_cols
            if extra or missing:
                findings["extra_after_normalization"] = sorted(extra)
                findings["missing_after_normalization"] = sorted(missing)
        elif stage == "final":
            missing = expected - final_cols
            extra = final_cols - expected
            if missing or extra:
                findings["missing_in_final"] = sorted(missing)
                findings["extra_in_final"] = sorted(extra)

        if not findings:
            return

        event: Dict[str, Any] = {
            "stage": stage,
            "model_version": self.model_version,
            "schema_version": self.schema_version,
            "hierarchy_version": self.hierarchy_version,
            "model_name": self.model_name,
            **findings,
            # Exact column layouts for debugging context
            "incoming_columns": sorted(incoming_cols),
            "normalized_columns": sorted(normalized_cols),
            "final_columns": sorted(final_cols),
        }
        self._logger.warning(
            "Schema drift detected at stage [%s]: %s", stage, event
        )
        self.drift_log_path.parent.mkdir(parents=True, exist_ok=True)
        with self.drift_log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")

    def read_schema_drift_log(self, max_lines: int = 200) -> str:
        """
        Return the most recent schema drift events as a single string.

        Parameters
        ----------
        max_lines : int
            Maximum number of trailing lines to return. Values <= 0 yield
            an empty string.

        Returns
        -------
        str
            Tail of the drift log. Empty string if no log exists.
        """
        from collections import deque

        # Guard explicitly: a slice like lines[-0:] would return the WHOLE file.
        if max_lines <= 0 or not self.drift_log_path.exists():
            return ""
        with self.drift_log_path.open("r", encoding="utf-8") as f:
            # A bounded deque keeps only the tail while streaming the file.
            tail = deque(f, maxlen=max_lines)
        return "".join(tail)

    # ---------------------------------------------------------------------
    # Lineage / metadata
    # ---------------------------------------------------------------------
    def metadata(self) -> Dict[str, Any]:
        """
        Return lineage and metadata for this object.

        Keys: model_version, model_name, schema_version, hierarchy_version,
        threshold, n_classes, classes (as a plain list).
        """
        return {
            "model_version": self.model_version,
            "model_name": self.model_name,
            "schema_version": self.schema_version,
            "hierarchy_version": self.hierarchy_version,
            "threshold": self.threshold,
            "n_classes": self.n_classes_,
            "classes": self.classes_.tolist(),
        }

    # ---------------------------------------------------------------------
    # Utilities
    # ---------------------------------------------------------------------
    @staticmethod
    def _to_dataframe(X_raw: FrameLike) -> pd.DataFrame:
        """
        Convert raw input into a pandas DataFrame.

        - DataFrame → a copy (the caller's frame is never mutated)
        - non-empty mapping whose values are all scalars → a one-row frame
          (``pd.DataFrame`` itself rejects all-scalar dicts)
        - anything else → ``pd.DataFrame(X_raw)``
        """
        if isinstance(X_raw, pd.DataFrame):
            return X_raw.copy()
        if (
            isinstance(X_raw, Mapping)
            and X_raw
            and all(pd.api.types.is_scalar(v) for v in X_raw.values())
        ):
            return pd.DataFrame([dict(X_raw)])
        return pd.DataFrame(X_raw)

    @staticmethod
    def _get_base_algorithm_name(estimator: BaseEstimator) -> str:
        """Recursively unwrap meta-estimators and return the innermost class name."""
        # Checked in order; the first non-None wrapped estimator is followed.
        for attr in ("estimator", "base_estimator", "_final_estimator"):
            inner = getattr(estimator, attr, None)
            if inner is not None:
                return QSSelectiveCalibratedModel._get_base_algorithm_name(inner)
        return type(estimator).__name__

    def validate_y_true(self, y_true_raw: LabelArrayLike) -> pd.Series:
        """
        Validate and standardize y_true for evaluation.

        Returns a string-typed Series with a fresh 0..n-1 index. Logs a
        warning when the labels contain at most one unique value.

        Raises
        ------
        ValueError
            If y_true is a single string that matches a TRAIN_SCHEMA column
            name (a column name was probably passed instead of the values).
        """
        y = pd.Series(y_true_raw)
        if (
            len(y) == 1
            and isinstance(y.iloc[0], str)
            and y.iloc[0] in self.TRAIN_SCHEMA
        ):
            raise ValueError(
                f"y_true appears to be the COLUMN NAME '{y.iloc[0]}' "
                f"instead of the actual label values."
            )
        y = y.reset_index(drop=True).astype(str)
        n_unique = y.nunique()
        if n_unique <= 1:
            self._logger.warning(
                "y_true has only %d unique value(s). "
                "This usually indicates a data extraction or alignment issue.",
                n_unique,
            )
        return y

    def _get_train_schema(self) -> tuple[str, ...]:
        """
        Return the input schema recorded on the fitted preprocessor.

        Reads ``feature_names_in_`` from ``self.preprocessor.pipeline`` and
        returns it as a tuple, preserving its order.

        Raises
        ------
        AttributeError
            If the pipeline has no ``feature_names_in_`` attribute.
        """
        pipeline = self.preprocessor.pipeline
        # Explicit raise instead of `assert`, which is stripped under -O.
        if not hasattr(pipeline, "feature_names_in_"):
            raise AttributeError(
                "Preprocessor pipeline must be fitted before accessing feature_names_in_."
            )
        return tuple(pipeline.feature_names_in_)

    def _validate_final_schema(self, final_cols: set) -> None:
        """
        Ensure the final column set matches TRAIN_SCHEMA exactly.

        Raises
        ------
        ValueError
            If any expected column is missing or any extra column is present.
        """
        expected = set(self.TRAIN_SCHEMA)
        missing = expected - final_cols
        extra = final_cols - expected
        if missing or extra:
            raise ValueError(
                f"Final schema mismatch.\n"
                f"Missing columns: {sorted(missing)}\n"
                f"Extra columns: {sorted(extra)}\n"
                f"Expected: {sorted(expected)}\n"
                f"Got: {sorted(final_cols)}"
            )