import json
import logging
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, Mapping, Sequence, Tuple, Union

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.calibration import CalibratedClassifierCV, LabelEncoder
from sklearn.metrics import accuracy_score, f1_score

from .preprocessing import QSPreprocessor


# Tabular input accepted by the public prediction APIs: either a ready-made
# DataFrame or a column-name -> values mapping that pandas can build one from.
_ColumnMapping = Dict[str, Iterable[Any]]
FrameLike = Union[pd.DataFrame, _ColumnMapping]

# Container of ground-truth labels accepted by the evaluation API.
LabelArrayLike = Union[pd.Series, Sequence[str], np.ndarray]


@dataclass
class QSSelectiveCalibratedModel:
    """
    Production-ready selective QS classifier.

    This object is the single source of truth for:
    - Schema normalization and enforcement
    - Schema drift logging
    - Label encoding and class management
    - Calibrated, thresholded selective prediction
    - NRM hierarchy inference
    - Lineage and metadata exposure

    It is designed to be:
    - Mostly hermetic (the only project-internal dependency is QSPreprocessor)
    - Inference-only (no .fit methods)
    - Deterministic (same input → same output)
    - Selective (low-confidence predictions are routed to `others_label`)
    - Audit-ready (drift logs + lineage)
    - Deployment-safe (API- and batch-friendly)

    Notes
    -----
    `TRAIN_SCHEMA` is always re-derived from the fitted preprocessor in
    `__post_init__`; a value passed to the constructor is overwritten.
    """

    # Fitted probability model; must expose `classes_` and `predict_proba`.
    calibrator: CalibratedClassifierCV
    # Fitted feature pipeline wrapper; must expose `pipeline` and `transform`.
    preprocessor: QSPreprocessor
    # Fitted encoder mapping integer classes back to string labels.
    label_encoder: LabelEncoder
    # Minimum calibrated confidence required to accept a prediction.
    threshold: float
    # Label assigned to rejected (low-confidence) predictions.
    others_label: str = "Others"

    # Predicted category -> (ge, group_element, e, element).
    nrm_hierarchy_map: Mapping[str, Tuple[str, str, str, str]] = field(
        default_factory=dict
    )

    model_version: str = "v1.0.0"
    schema_version: str = "v1"
    hierarchy_version: str = "v1"

    # Computed in __post_init__ from the base algorithm name and model_version.
    model_name: str = field(init=False)

    # JSON-lines file that receives one record per detected drift event.
    drift_log_path: Path = Path("schema_drift.log")

    # Ordered input columns expected by the preprocessor (set in __post_init__).
    TRAIN_SCHEMA: Tuple[str, ...] = field(default_factory=tuple)

    # Alternative (already snake-cased) column names -> canonical names.
    RENAME_MAP: Dict[str, str] = field(
        default_factory=lambda: {
            "desc": "description",
            "items": "item",
            "units": "uom",
            "quantity": "qty",
        }
    )

    def __post_init__(self):
        """Derive classes, schema, model name and logger from the injected parts."""
        self.classes_ = self.label_encoder.classes_
        self.n_classes_ = len(self.classes_)

        self.threshold = float(self.threshold)
        self.drift_log_path = Path(self.drift_log_path)
        self.TRAIN_SCHEMA = self._get_train_schema()

        base_algo = self._get_base_algorithm_name(self.calibrator)
        self.model_name = (
            f"QS_{base_algo}_SelectiveCalibratedModel_{self.model_version}"
        )

        self._logger = logging.getLogger(self.model_name)
        # Loggers are process-wide singletons keyed by name: only attach a
        # handler the first time, otherwise every new instance would
        # duplicate each log line.
        if not self._logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
                )
            )
            self._logger.addHandler(handler)
            self._logger.setLevel(logging.INFO)

    # ------------------------------------------------------------------
    # Label handling
    # ------------------------------------------------------------------

    def collapse_labels(self, y: pd.Series) -> pd.Series:
        """
        Collapse labels to the set of known classes.

        Every label is cast to ``str``; any label that is not one of
        ``self.classes_`` is replaced by ``self.others_label``. Per the
        original author's note this is meant to mirror the collapse applied
        at calibration time.

        Parameters
        ----------
        y : pd.Series
            Raw labels.

        Returns
        -------
        pd.Series
            String labels with unseen/rare values replaced by `others_label`.
        """
        y_str = y.astype(str)
        known = set(self.classes_)
        return y_str.where(y_str.isin(known), self.others_label)

    def decode_predictions(self, max_idx: np.ndarray) -> np.ndarray:
        """
        Decode argmax column indices back to canonical string labels.

        argmax index → calibrator.classes_ (integers) →
        label_encoder.inverse_transform → strings
        """
        pred_integers = self.calibrator.classes_[max_idx]
        return self.label_encoder.inverse_transform(pred_integers)

    # ------------------------------------------------------------------
    # Prediction APIs
    # ------------------------------------------------------------------

    def _predict_normalized(
        self, df_norm: pd.DataFrame
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Run selective prediction on a frame already in TRAIN_SCHEMA form."""
        X_t = self.preprocessor.transform(df_norm)
        proba = self.calibrator.predict_proba(X_t)

        max_conf = proba.max(axis=1)
        pred_strings = self.decode_predictions(proba.argmax(axis=1))

        # Thresholding happens after decoding so the output is always strings.
        y_pred = np.where(
            max_conf >= self.threshold,
            pred_strings,
            self.others_label,
        )
        return y_pred, max_conf

    def predict_full(
        self,
        X_raw: FrameLike,
    ) -> pd.DataFrame:
        """
        Run the full QS inference pipeline:

        - Normalize BoQ schema (logging schema drift, if any)
        - Preprocess and predict categories
        - Compute calibrated confidence
        - Infer NRM hierarchy levels
        - Place hierarchy levels: a hierarchy column that is absent or entirely
          NaN is filled with the prediction; one that already holds data is
          kept and the prediction is added next to it as ``Predicted_<col>``
        - Attach scalar lineage metadata as ``meta_<key>`` columns

        Parameters
        ----------
        X_raw : FrameLike
            Raw input data (DataFrame or dict-like) in BoQ-style schema.

        Returns
        -------
        pd.DataFrame
            DataFrame (same row index as the input) containing:
            - normalized input columns, in TRAIN_SCHEMA order
            - Predicted_Category
            - Prediction_Confidence (rounded to 4 decimals)
            - ge, group_element, e, element (NRM hierarchy)
            - lineage metadata columns
        """
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        # Predict on the normalized frame directly; going through
        # selective_predict would normalize (and copy) the data a second time.
        y_pred, conf = self._predict_normalized(df_norm)

        df_out = df_norm.copy()
        df_out["Predicted_Category"] = y_pred
        df_out["Prediction_Confidence"] = conf.round(4)

        hierarchy_cols = ["ge", "group_element", "e", "element"]
        levels = [self.infer_nrm_hierarchy(cat) for cat in y_pred]

        # hierarchy column -> name of its Predicted_* companion (None if the
        # prediction was written into the hierarchy column itself).
        dynamic_pairs = {}
        for col in hierarchy_cols:
            # Plain lists are assigned positionally, so a non-default input
            # index cannot misalign predictions with their rows.
            predicted_level = [lvl[col] for lvl in levels]
            if col not in df_out.columns or df_out[col].isna().all():
                df_out[col] = predicted_level
                dynamic_pairs[col] = None
            else:
                pred_col = f"Predicted_{col}"
                df_out[pred_col] = predicted_level
                dynamic_pairs[col] = pred_col

        meta_cols = []
        for key, value in self.metadata().items():
            # Only scalars can be broadcast into a column (skips `classes`).
            if isinstance(value, (str, int, float, bool)):
                col_name = f"meta_{key}"
                df_out[col_name] = value
                meta_cols.append(col_name)

        ordered_cols = []
        for col in self.TRAIN_SCHEMA:
            ordered_cols.append(col)
            paired = dynamic_pairs.get(col)
            if paired is not None:
                ordered_cols.append(paired)

        ordered_cols.extend(["Predicted_Category", "Prediction_Confidence"])
        # Hierarchy columns that are not model features follow the prediction.
        ordered_cols.extend(c for c in hierarchy_cols if c not in ordered_cols)
        ordered_cols.extend(meta_cols)

        placed = set(ordered_cols)
        ordered_cols.extend(c for c in df_out.columns if c not in placed)

        return df_out[ordered_cols]

    def selective_predict(
        self,
        X_raw: FrameLike,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Predict with selective routing using calibrated probabilities and
        a frozen confidence threshold.

        Parameters
        ----------
        X_raw : FrameLike
            Raw input data (DataFrame or dict-like).

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            - y_pred: predicted labels, or `others_label` where the maximum
              calibrated probability is below `threshold`
            - max_conf: max calibrated confidence per sample
        """
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        return self._predict_normalized(df_norm)

    def predict(self, X_raw: FrameLike) -> np.ndarray:
        """
        Standard prediction API: returns only labels (with Others routing).
        """
        y_pred, _ = self.selective_predict(X_raw)
        return y_pred

    def predict_with_confidence(
        self,
        X_raw: FrameLike,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Explicit alias for selective_predict for clarity in external APIs.
        """
        return self.selective_predict(X_raw)

    def predict_proba(self, X_raw: FrameLike) -> np.ndarray:
        """
        Standard probability prediction API using calibrated probabilities.

        Returns the calibrator's full probability matrix; no thresholding
        is applied here.
        """
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        X_t = self.preprocessor.transform(df_norm)
        return self.calibrator.predict_proba(X_t)

    # ------------------------------------------------------------------
    # Evaluation
    # ------------------------------------------------------------------

    def evaluate(
        self,
        X_raw: FrameLike,
        y_true_raw: LabelArrayLike,
    ) -> Dict[str, Any]:
        """
        Governance-grade evaluation for the selective QS classifier.

        Steps:
        - Schema normalization is applied exactly once
        - y_true is validated, collapsed, and checked against the encoder
        - Predictions are decoded back to canonical string labels
        - Selective thresholding is applied post-decoding
        - Both sides are compared positionally (indices are reset)

        Parameters
        ----------
        X_raw : FrameLike
            Raw input data.
        y_true_raw : LabelArrayLike
            Ground-truth labels, one per row of `X_raw`.

        Returns
        -------
        Dict[str, Any]
            macro_f1, weighted_f1, accuracy, others_rate, mean/std of the
            confidence of accepted and rejected rows (None when a group is
            empty), and the threshold used.

        Raises
        ------
        ValueError
            If `X_raw` and `y_true_raw` differ in length. The label encoder
            is also expected to reject collapsed labels it does not know
            (e.g. when `others_label` is not one of its classes).
        """
        df_norm = self._normalize_boq_schema(
            self._to_dataframe(X_raw)
        ).reset_index(drop=True)

        y_true = self.validate_y_true(y_true_raw)
        if len(y_true) != len(df_norm):
            # Fail before running inference rather than inside the metrics.
            raise ValueError(
                f"X_raw has {len(df_norm)} rows but y_true has "
                f"{len(y_true)} labels."
            )

        y_true_collapsed = self.collapse_labels(y_true)

        # Result unused: this call only verifies that every collapsed label
        # is encodable by the fitted label encoder.
        self.label_encoder.transform(y_true_collapsed)

        y_pred, max_conf = self._predict_normalized(df_norm)

        # NOTE: a confident prediction of a class literally named
        # `others_label` is counted as rejected here.
        mask_acc = y_pred != self.others_label
        accepted = max_conf[mask_acc]
        rejected = max_conf[~mask_acc]

        return {
            "macro_f1": float(f1_score(y_true_collapsed, y_pred, average="macro")),
            "weighted_f1": float(
                f1_score(y_true_collapsed, y_pred, average="weighted")
            ),
            "accuracy": float(accuracy_score(y_true_collapsed, y_pred)),
            "others_rate": float(np.mean(y_pred == self.others_label)),
            "accepted_conf_mean": float(accepted.mean()) if accepted.size else None,
            "accepted_conf_std": float(accepted.std()) if accepted.size else None,
            "rejected_conf_mean": float(rejected.mean()) if rejected.size else None,
            "rejected_conf_std": float(rejected.std()) if rejected.size else None,
            "threshold": self.threshold,
        }

    # ------------------------------------------------------------------
    # NRM hierarchy
    # ------------------------------------------------------------------

    def infer_nrm_hierarchy(self, category: str) -> Dict[str, str]:
        """
        Infer NRM hierarchy levels from a predicted category.

        QS logic only — the mapping should be maintained as a single source
        of truth and versioned alongside the model.

        Parameters
        ----------
        category : str
            Predicted NRM category.

        Returns
        -------
        Dict[str, str]
            Dictionary with keys: ge, group_element, e, element. Every level
            is "Unknown" when the category is not in `nrm_hierarchy_map`.
        """
        lvl1, lvl2, lvl3, lvl4 = self.nrm_hierarchy_map.get(
            category,
            ("Unknown", "Unknown", "Unknown", "Unknown"),
        )
        return {
            "ge": lvl1,
            "group_element": lvl2,
            "e": lvl3,
            "element": lvl4,
        }

    # ------------------------------------------------------------------
    # Schema normalization and drift logging
    # ------------------------------------------------------------------

    def _normalize_boq_schema(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize any incoming BoQ-like dataframe into the canonical schema
        expected by the preprocessor and classifier.

        Beware of train/serving skew if the training data was not normalized
        with the same logic!

        Responsibilities:
        - Snake-case column names inline (said to mirror
          `utils.preprocessing.to_snake_case`, without importing it)
        - Rename alternative names → canonical names (a canonical column that
          is already present wins over its alias)
        - Drop duplicate column names, keeping the first occurrence
        - Log schema drift events, including columns that are missing
        - Add missing columns with safe defaults (NaN)
        - Select and order columns exactly as TRAIN_SCHEMA

        The row index of the input is preserved; the input is not mutated.

        Raises
        ------
        ValueError
            If the final column set does not match TRAIN_SCHEMA.
        """
        original_cols = list(df.columns)

        self._log_schema_drift(
            stage="raw",
            incoming_cols=set(original_cols),
            normalized_cols=set(),
            final_cols=set(),
        )

        df = df.copy()

        df.columns = (
            # Insert "_" at lower/digit → upper boundaries (camelCase).
            df.columns.str.replace(
                r"(?<=[a-z0-9])(?=[A-Z])", "_", regex=True
            )
            .str.lower()
            .str.replace(r"[\s/()-]+", "_", regex=True)
            .str.replace(r"[^0-9a-z_]", "", regex=True)
            .str.strip("_")
        )

        # Renaming an alias onto an existing canonical column would create two
        # columns with the same name, so the alias is left alone in that case
        # (it is discarded by the final schema selection).
        present = set(df.columns)
        rename_map = {
            alias: canonical
            for alias, canonical in self.RENAME_MAP.items()
            if alias in present and canonical not in present
        }
        df = df.rename(columns=rename_map)

        # Snake-casing can still collapse distinct raw names onto one name;
        # duplicates would make `df[col]` return a frame and are invisible to
        # the set-based schema validation below.
        dup_mask = df.columns.duplicated()
        if dup_mask.any():
            self._logger.warning(
                "Duplicate columns after normalization; keeping first "
                "occurrence of: %s",
                sorted(set(df.columns[dup_mask])),
            )
            df = df.loc[:, ~dup_mask].copy()

        # Logged BEFORE missing columns are filled, otherwise the "missing"
        # half of this event could never be non-empty.
        self._log_schema_drift(
            stage="post_normalization",
            incoming_cols=set(original_cols),
            normalized_cols=set(df.columns),
            final_cols=set(),
        )

        for col in self.TRAIN_SCHEMA:
            if col not in df.columns:
                df[col] = np.nan

        # Drops extras and enforces the training column order.
        df = df[list(self.TRAIN_SCHEMA)]

        self._log_schema_drift(
            stage="final",
            incoming_cols=set(original_cols),
            normalized_cols=set(df.columns),
            final_cols=set(df.columns),
        )

        self._validate_final_schema(set(df.columns))
        return df

    def _log_schema_drift(
        self,
        stage: str,
        incoming_cols: set[str],
        normalized_cols: set[str],
        final_cols: set[str],
    ) -> None:
        """
        Log schema drift at different stages of normalization.

        When drift is found, a warning is emitted on the instance logger and
        one JSON line is appended to `drift_log_path` (parent directories are
        created if needed). Nothing is written when there is no drift or
        when `stage` is not one of the recognized values.

        Parameters
        ----------
        stage : str
            One of {"raw", "post_normalization", "final"} indicating when
            drift is logged.
        incoming_cols : set
            Columns present in the raw incoming dataframe.
        normalized_cols : set
            Columns after normalization (snake-casing, renaming).
        final_cols : set
            Columns after final filtering to TRAIN_SCHEMA.

        Notes
        -----
        - RAW drift: raw column names that are not in TRAIN_SCHEMA.
        - POST-NORMALIZATION drift: extra or missing columns after
          canonicalization.
        - FINAL drift should always be empty; otherwise, the model will
          misbehave.
        """
        expected = set(self.TRAIN_SCHEMA)
        event: Dict[str, Any] = {
            "stage": stage,
            "model_version": self.model_version,
            "schema_version": self.schema_version,
            "hierarchy_version": self.hierarchy_version,
            "model_name": self.model_name,
        }

        has_drift = False

        if stage == "raw":
            extra_in_raw = incoming_cols - expected
            if extra_in_raw:
                event["extra_in_raw"] = sorted(extra_in_raw)
                has_drift = True

        elif stage == "post_normalization":
            extra_after_norm = normalized_cols - expected
            missing_after_norm = expected - normalized_cols
            if extra_after_norm or missing_after_norm:
                event["extra_after_normalization"] = sorted(extra_after_norm)
                event["missing_after_normalization"] = sorted(missing_after_norm)
                has_drift = True

        elif stage == "final":
            missing_in_final = expected - final_cols
            extra_in_final = final_cols - expected
            if missing_in_final or extra_in_final:
                event["missing_in_final"] = sorted(missing_in_final)
                event["extra_in_final"] = sorted(extra_in_final)
                has_drift = True

        if not has_drift:
            return

        event["incoming_columns"] = sorted(incoming_cols)
        event["normalized_columns"] = sorted(normalized_cols)
        event["final_columns"] = sorted(final_cols)

        self._logger.warning(
            "Schema drift detected at stage [%s]: %s", stage, event
        )

        self.drift_log_path.parent.mkdir(parents=True, exist_ok=True)
        with self.drift_log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")

    def read_schema_drift_log(self, max_lines: int = 200) -> str:
        """
        Return the most recent schema drift events as a single string.

        Parameters
        ----------
        max_lines : int
            Maximum number of lines to return from the drift log.

        Returns
        -------
        str
            Tail of the drift log. Empty string if no log exists or if
            `max_lines` is zero or negative.
        """
        # Guard explicitly: a `lines[-max_lines:]` slice would return the
        # WHOLE file for max_lines == 0.
        if max_lines <= 0 or not self.drift_log_path.exists():
            return ""
        with self.drift_log_path.open("r", encoding="utf-8") as f:
            # A bounded deque keeps only the tail while streaming the file.
            return "".join(deque(f, maxlen=max_lines))

    # ------------------------------------------------------------------
    # Lineage
    # ------------------------------------------------------------------

    def metadata(self) -> Dict[str, Any]:
        """
        Return lineage and metadata for this production object.

        This is the single source of truth for:
        - model version
        - model name
        - schema version
        - hierarchy version
        - threshold
        - number of classes and the class list
        """
        return {
            "model_version": self.model_version,
            "model_name": self.model_name,
            "schema_version": self.schema_version,
            "hierarchy_version": self.hierarchy_version,
            "threshold": self.threshold,
            "n_classes": self.n_classes_,
            "classes": self.classes_.tolist(),
        }

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_dataframe(X_raw: FrameLike) -> pd.DataFrame:
        """
        Convert raw input into a pandas DataFrame in a safe, predictable way.

        A DataFrame is copied so the caller's object is never mutated; any
        other input is handed to the DataFrame constructor.
        """
        if isinstance(X_raw, pd.DataFrame):
            return X_raw.copy()
        return pd.DataFrame(X_raw)

    @staticmethod
    def _get_base_algorithm_name(estimator: BaseEstimator) -> str:
        """Recursively unwrap meta-estimators to find the core algorithm name."""
        # Checked in this order: current wrapper attribute, legacy wrapper
        # attribute, then the private final-step attribute.
        for attr in ("estimator", "base_estimator", "_final_estimator"):
            inner = getattr(estimator, attr, None)
            if inner is not None:
                return QSSelectiveCalibratedModel._get_base_algorithm_name(inner)
        return type(estimator).__name__

    def validate_y_true(self, y_true_raw: LabelArrayLike) -> pd.Series:
        """
        Validate and standardize y_true for evaluation.

        Responsibilities:
        - Build a Series from the input (pandas rejects multi-dimensional data)
        - Reset the index so comparison with predictions is positional
        - Cast to string dtype
        - Prevent a common evaluation mistake: passing a column name
        - Governance guardrail: warn if y_true has at most 1 unique value

        Raises
        ------
        ValueError
            If the input is a single string that is a TRAIN_SCHEMA column name.
        """
        y = pd.Series(y_true_raw)

        if (
            len(y) == 1
            and isinstance(y.iloc[0], str)
            and y.iloc[0] in self.TRAIN_SCHEMA
        ):
            raise ValueError(
                f"y_true appears to be the COLUMN NAME '{y.iloc[0]}' "
                f"instead of the actual label values."
            )

        y = y.reset_index(drop=True).astype(str)

        n_unique = y.nunique()
        if n_unique <= 1:
            self._logger.warning(
                "y_true has only %d unique value(s). "
                "This usually indicates a data extraction or alignment issue.",
                n_unique,
            )

        return y

    def _get_train_schema(self) -> tuple[str, ...]:
        """
        Return the canonical input schema that the fitted QSPreprocessor expects.

        Reads `feature_names_in_` from `self.preprocessor.pipeline`, i.e. the
        column names, in order, recorded when the pipeline was fitted.

        Why this matters:
        - Production normalization must align to this schema exactly
        - Prevents feature misalignment and silent accuracy collapse

        Returns
        -------
        tuple[str, ...]
            Ordered column names that the preprocessor was trained on.

        Raises
        ------
        AssertionError
            If the pipeline has no `feature_names_in_` (not fitted).
        """
        pipeline = self.preprocessor.pipeline
        # Raised explicitly (not via `assert`) so the check survives
        # `python -O`; the exception type is kept for existing callers.
        if not hasattr(pipeline, "feature_names_in_"):
            raise AssertionError(
                "Preprocessor pipeline must be fitted before accessing feature_names_in_."
            )
        return tuple(pipeline.feature_names_in_)

    def _validate_final_schema(self, final_cols: set) -> None:
        """
        Ensure the final column set matches TRAIN_SCHEMA exactly.

        Raises
        ------
        ValueError
            If any expected column is missing or any extra column is present.
        """
        expected = set(self.TRAIN_SCHEMA)
        missing = expected - final_cols
        extra = final_cols - expected

        if missing or extra:
            raise ValueError(
                f"Final schema mismatch.\n"
                f"Missing columns: {sorted(missing)}\n"
                f"Extra columns: {sorted(extra)}\n"
                f"Expected: {sorted(expected)}\n"
                f"Got: {sorted(final_cols)}"
            )