import json
import logging
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Iterable, Mapping, Sequence, Tuple, Union

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import accuracy_score, f1_score
from sklearn.preprocessing import LabelEncoder

from .preprocessing import QSPreprocessor

# Type Aliases
FrameLike = Union[pd.DataFrame, Dict[str, Iterable[Any]]]
LabelArrayLike = Union[pd.Series, Sequence[str], np.ndarray]

# Output column names of the NRM hierarchy, in level order (lvl1..lvl4).
_HIERARCHY_COLUMNS: Tuple[str, ...] = ("ge", "group_element", "e", "element")

# Hierarchy returned for categories that are absent from the hierarchy map.
_UNKNOWN_HIERARCHY: Tuple[str, str, str, str] = (
    "Unknown",
    "Unknown",
    "Unknown",
    "Unknown",
)


@dataclass
class QSSelectiveCalibratedModel:
    """
    Selective QS classifier wrapping a calibrated estimator.

    This object bundles:
    - Schema normalization and enforcement
    - Schema drift logging
    - Label handling (class allowlist taken from the label encoder)
    - Calibrated, thresholded selective prediction
    - NRM hierarchy inference
    - Lineage and metadata exposure

    Design goals:
    - Self-contained schema handling (column snake-casing is implemented
      inline; the only package-local import is ``QSPreprocessor``)
    - Inference-only (the class exposes no ``fit`` method)
    - Routes predictions whose confidence is below ``threshold`` to
      ``others_label``
    - Audit-friendly (drift log file + lineage metadata)

    Notes
    -----
    ``TRAIN_SCHEMA`` is always overwritten in ``__post_init__`` with the
    ``feature_names_in_`` of the preprocessor pipeline; a value passed to the
    constructor is ignored.
    """

    calibrator: CalibratedClassifierCV
    preprocessor: QSPreprocessor
    label_encoder: LabelEncoder
    threshold: float
    others_label: str = "Others"

    # NRM hierarchy map: category → (lvl1, lvl2, lvl3, lvl4)
    nrm_hierarchy_map: Mapping[str, Tuple[str, str, str, str]] = field(
        default_factory=dict
    )

    # Versioning / lineage
    model_version: str = "v1.0.0"
    schema_version: str = "v1"
    hierarchy_version: str = "v1"
    model_name: str = field(init=False)

    # Drift logging
    drift_log_path: Path = Path("schema_drift.log")

    # -------------------------------------------------------------------------
    # Schema definitions
    # -------------------------------------------------------------------------
    TRAIN_SCHEMA: Tuple[str, ...] = field(default_factory=tuple)

    # Semantic aliases (inputs are already snake_cased by the time this runs)
    RENAME_MAP: Dict[str, str] = field(
        default_factory=lambda: {
            "desc": "description",
            "items": "item",
            "units": "uom",
            "quantity": "qty",
        }
    )

    # -------------------------------------------------------------------------
    # INIT
    # -------------------------------------------------------------------------
    def __post_init__(self):
        """
        Derive class space, schema, model name and logger from the fields.

        Raises
        ------
        AttributeError
            If the preprocessor pipeline exposes no ``feature_names_in_``.
        """
        # Canonical class space = allowlist from the fitted label encoder
        self.classes_ = self.label_encoder.classes_
        self.n_classes_ = len(self.classes_)

        self.threshold = float(self.threshold)
        self.drift_log_path = Path(self.drift_log_path)
        self.TRAIN_SCHEMA = self._get_train_schema()

        # Dynamically generate model name for logging and lineage
        base_algo = self._get_base_algorithm_name(self.calibrator)
        self.model_name = (
            f"QS_{base_algo}_SelectiveCalibratedModel_{self.model_version}"
        )

        self._logger = logging.getLogger(self.model_name)
        # Attach a stream handler only once per logger name, so repeated
        # instantiation does not duplicate every log line.
        if not self._logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
                )
            )
            self._logger.addHandler(handler)
        self._logger.setLevel(logging.INFO)

    # -------------------------------------------------------------------------
    # COLLAPSE LOGIC — intended to mirror the calibration pipeline
    # -------------------------------------------------------------------------
    def collapse_labels(self, y: pd.Series) -> pd.Series:
        """
        Collapse labels outside the known class space into ``others_label``.

        Parameters
        ----------
        y : pd.Series
            Raw labels; values are cast to ``str`` before comparison.

        Returns
        -------
        pd.Series
            Labels as strings, where every value not present in
            ``self.classes_`` is replaced by ``self.others_label``.
        """
        known = set(self.classes_)
        y_str = y.astype(str)
        # Vectorized: keep known labels, replace everything else.
        return y_str.where(y_str.isin(known), self.others_label)

    # -------------------------------------------------------------------------
    # DECODE LOGIC
    # -------------------------------------------------------------------------
    def decode_predictions(self, max_idx: np.ndarray) -> np.ndarray:
        """
        Decode argmax column indices back to string labels.

        argmax index → ``calibrator.classes_`` (encoded labels)
        → ``label_encoder.inverse_transform`` → decoded labels

        Parameters
        ----------
        max_idx : np.ndarray
            Column indices into the calibrator's probability matrix.

        Returns
        -------
        np.ndarray
            Decoded labels, one per index.
        """
        encoded = self.calibrator.classes_[max_idx]
        return self.label_encoder.inverse_transform(encoded)

    # -------------------------------------------------------------------------
    # Public API: high-level prediction with hierarchy + lineage
    # -------------------------------------------------------------------------
    def predict_full(
        self,
        X_raw: FrameLike,
    ) -> pd.DataFrame:
        """
        Run the full QS inference pipeline.

        Steps:
        - Normalize the BoQ schema (logging schema drift, if any)
        - Preprocess and predict categories with calibrated confidence
        - Infer NRM hierarchy levels from the predicted category
        - Place hierarchy columns: overwrite columns that are absent or
          entirely NaN; otherwise keep the original and add a
          ``Predicted_<col>`` column right after it
        - Attach scalar lineage metadata as ``meta_*`` columns

        Parameters
        ----------
        X_raw : FrameLike
            Raw input data (DataFrame or dict-like) in BoQ-style schema.

        Returns
        -------
        pd.DataFrame
            DataFrame containing:
            - normalized input columns (training schema order)
            - Predicted_Category
            - Prediction_Confidence (rounded to 4 decimals)
            - ge, group_element, e, element (NRM hierarchy)
            - lineage metadata columns
            The row index of the normalized input is preserved.
        """
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        # Predict on the already-normalized frame so that normalization (and
        # its drift logging) runs exactly once per call.
        y_pred, conf = self._selective_predict_normalized(df_norm)

        df_out = df_norm.copy()
        df_out["Predicted_Category"] = y_pred
        df_out["Prediction_Confidence"] = conf.round(4)

        # Build the hierarchy frame on df_out's own index: column assignment
        # aligns on index, so a fresh RangeIndex would yield NaNs whenever the
        # caller's frame has a non-default index. Passing `columns` keeps the
        # frame well-formed for empty input.
        hierarchy_df = pd.DataFrame(
            [
                self.infer_nrm_hierarchy(cat)
                for cat in df_out["Predicted_Category"].tolist()
            ],
            index=df_out.index,
            columns=list(_HIERARCHY_COLUMNS),
        )

        # ---------------------------------------------------------
        # SMART HIERARCHY PLACEMENT
        # ---------------------------------------------------------
        predicted_pairs: Dict[str, str] = {}
        added_hierarchy_cols = []
        for col in _HIERARCHY_COLUMNS:
            if col not in df_out.columns:
                # Hierarchy column is not part of the training schema.
                df_out[col] = hierarchy_df[col]
                added_hierarchy_cols.append(col)
            elif df_out[col].isna().all():
                # Original is empty: overwrite it directly, no duplicate.
                df_out[col] = hierarchy_df[col]
            else:
                # User provided data: preserve it, add the predicted column.
                pred_col = f"Predicted_{col}"
                df_out[pred_col] = hierarchy_df[col]
                predicted_pairs[col] = pred_col

        # Attach lineage metadata as columns (scalars only, so that each
        # value broadcasts cleanly over all rows).
        meta_cols = []
        for key, value in self.metadata().items():
            if isinstance(value, (str, int, float, bool)):
                col_name = f"meta_{key}"
                df_out[col_name] = value
                meta_cols.append(col_name)

        # ---------------------------------------------------------
        # SMART COLUMN ORDERING
        # ---------------------------------------------------------
        ordered_cols = []
        for col in self.TRAIN_SCHEMA:
            ordered_cols.append(col)
            # Predicted column goes immediately after its original.
            if col in predicted_pairs:
                ordered_cols.append(predicted_pairs[col])

        ordered_cols.extend(["Predicted_Category", "Prediction_Confidence"])
        ordered_cols.extend(added_hierarchy_cols)
        ordered_cols.extend(meta_cols)

        # Keep any column not explicitly placed above.
        placed = set(ordered_cols)
        ordered_cols.extend(c for c in df_out.columns if c not in placed)

        return df_out[ordered_cols]

    # -------------------------------------------------------------------------
    # Core selective prediction
    # -------------------------------------------------------------------------
    def _selective_predict_normalized(
        self,
        df_norm: pd.DataFrame,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Selective prediction on a frame that is ALREADY schema-normalized.

        Shared by ``selective_predict``, ``predict_full`` and ``evaluate`` so
        that thresholding logic lives in one place.

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            - y_pred: decoded label, or ``others_label`` where the max
              calibrated probability is below ``threshold``
            - max_conf: max calibrated probability per row
        """
        proba = self.calibrator.predict_proba(self.preprocessor.transform(df_norm))
        max_conf = proba.max(axis=1)
        pred_strings = self.decode_predictions(proba.argmax(axis=1))
        # Threshold is applied after decoding, on string labels.
        y_pred = np.where(
            max_conf >= self.threshold,
            pred_strings,
            self.others_label,
        )
        return y_pred, max_conf

    def selective_predict(
        self,
        X_raw: FrameLike,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Predict with selective routing using calibrated probabilities and
        the configured confidence threshold.

        Parameters
        ----------
        X_raw : FrameLike
            Raw input data (DataFrame or dict-like).

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            - y_pred: predicted labels or Others
            - max_conf: max calibrated confidence per sample
        """
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        return self._selective_predict_normalized(df_norm)

    def predict(self, X_raw: FrameLike) -> np.ndarray:
        """
        Standard prediction API: returns only labels (with Others routing).
        """
        y_pred, _ = self.selective_predict(X_raw)
        return y_pred

    def predict_with_confidence(
        self,
        X_raw: FrameLike,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Explicit alias for selective_predict for clarity in external APIs.
        """
        return self.selective_predict(X_raw)

    def predict_proba(self, X_raw: FrameLike) -> np.ndarray:
        """
        Standard probability prediction API using calibrated probabilities.

        Returns the calibrator's full probability matrix; no thresholding
        is applied.
        """
        df_norm = self._normalize_boq_schema(self._to_dataframe(X_raw))
        return self.calibrator.predict_proba(self.preprocessor.transform(df_norm))

    # -------------------------------------------------------------------------
    # Evaluation helper
    # -------------------------------------------------------------------------
    def evaluate(
        self,
        X_raw: FrameLike,
        y_true_raw: LabelArrayLike,
    ) -> Dict[str, Any]:
        """
        Evaluate the selective classifier against ground-truth labels.

        Steps:
        - Schema normalization is applied exactly once
        - y_true is validated, collapsed to the known class space, and
          checked against the label encoder
        - Predictions are decoded to string labels, then thresholded
        - Both X and y_true are re-indexed from 0 so rows align by position

        Parameters
        ----------
        X_raw : FrameLike
            Raw input features.
        y_true_raw : LabelArrayLike
            Ground-truth labels, one per row of ``X_raw``.

        Returns
        -------
        Dict[str, Any]
            macro_f1, weighted_f1, accuracy, others_rate, mean/std of
            accepted and rejected confidences (``None`` when the respective
            group is empty), and the threshold used.

        Raises
        ------
        ValueError
            If X and y_true differ in length, if y_true looks like a column
            name, or if the label encoder cannot transform the collapsed
            labels (e.g. ``others_label`` is not one of its classes).
        """
        # 1. Normalize X once (same path as production inference)
        df_norm = self._normalize_boq_schema(
            self._to_dataframe(X_raw)
        ).reset_index(drop=True)

        # 2. Validate and standardize y_true
        y_true = self.validate_y_true(y_true_raw)

        # Fail fast on misaligned inputs instead of after running the model.
        if len(y_true) != len(df_norm):
            raise ValueError(
                f"X and y_true have inconsistent lengths: "
                f"{len(df_norm)} rows vs {len(y_true)} labels."
            )

        # 3. Collapse y_true to the known class space
        y_true_collapsed = self.collapse_labels(y_true)

        # 4. Guardrail: the label encoder must be able to transform y_true;
        #    the result itself is not needed.
        self.label_encoder.transform(y_true_collapsed)

        # 5-7. Calibrated probabilities → decoded labels → threshold
        y_pred, max_conf = self._selective_predict_normalized(df_norm)

        # 8. Confidence diagnostics
        accepted_mask = y_pred != self.others_label
        accepted = max_conf[accepted_mask]
        rejected = max_conf[~accepted_mask]

        # 9. Metrics
        return {
            "macro_f1": float(f1_score(y_true_collapsed, y_pred, average="macro")),
            "weighted_f1": float(
                f1_score(y_true_collapsed, y_pred, average="weighted")
            ),
            "accuracy": float(accuracy_score(y_true_collapsed, y_pred)),
            "others_rate": float(np.mean(y_pred == self.others_label)),
            "accepted_conf_mean": float(accepted.mean()) if accepted.size else None,
            "accepted_conf_std": float(accepted.std()) if accepted.size else None,
            "rejected_conf_mean": float(rejected.mean()) if rejected.size else None,
            "rejected_conf_std": float(rejected.std()) if rejected.size else None,
            "threshold": self.threshold,
        }

    # -------------------------------------------------------------------------
    # NRM hierarchy inference
    # -------------------------------------------------------------------------
    def infer_nrm_hierarchy(self, category: str) -> Dict[str, str]:
        """
        Look up NRM hierarchy levels for a predicted category.

        Parameters
        ----------
        category : str
            Predicted NRM category.

        Returns
        -------
        Dict[str, str]
            Dictionary with keys: ge, group_element, e, element. All four
            values are "Unknown" when the category is not in
            ``nrm_hierarchy_map``.
        """
        levels = self.nrm_hierarchy_map.get(category, _UNKNOWN_HIERARCHY)
        lvl1, lvl2, lvl3, lvl4 = levels
        return {
            "ge": lvl1,
            "group_element": lvl2,
            "e": lvl3,
            "element": lvl4,
        }

    # -------------------------------------------------------------------------
    # Schema normalization + drift logging
    # -------------------------------------------------------------------------
    def _normalize_boq_schema(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize an incoming BoQ-like dataframe into ``TRAIN_SCHEMA``.

        Beware of train/serving skew if the training data was not normalized
        with the same logic.

        Responsibilities:
        - Snake-case the column names inline (no external helper)
        - Rename alias names → canonical names (only when the canonical
          name is not already present)
        - Drop duplicate column names, keeping the first occurrence
        - Add missing columns filled with NaN
        - Reorder to ``TRAIN_SCHEMA`` (extra columns are dropped)
        - Log schema drift events at each stage

        The input frame is not mutated.

        Raises
        ------
        ValueError
            If the final column set does not equal ``TRAIN_SCHEMA``.
        """
        # Cast labels to str so the .str accessor, sorting and JSON logging
        # all work even for non-string labels (e.g. a RangeIndex).
        original_cols = [str(c) for c in df.columns]

        # RAW DRIFT
        self._log_schema_drift(
            stage="raw",
            incoming_cols=set(original_cols),
            normalized_cols=set(),
            final_cols=set(),
        )

        df = df.copy()

        # ---------------------------------------------------------
        # INLINE SNAKE_CASE VECTORIZATION
        # ---------------------------------------------------------
        df.columns = (
            pd.Index(original_cols, dtype=object)
            .str.replace(
                r"(?<=[a-z0-9])(?=[A-Z])", "_", regex=True
            )  # Split CamelCase/PascalCase
            .str.lower()  # Convert to lowercase
            .str.replace(r"[\s/()-]+", "_", regex=True)  # Separators → '_'
            .str.replace(r"[^0-9a-z_]", "", regex=True)  # Remove invalid characters
            .str.strip("_")  # Trim leading/trailing '_'
        )

        # Apply rename map only where the alias exists and the canonical name
        # does not; otherwise the rename would create a duplicate column. An
        # alias left in place is dropped by the reorder below.
        rename_map = {
            alias: canonical
            for alias, canonical in self.RENAME_MAP.items()
            if alias in df.columns and canonical not in df.columns
        }
        df = df.rename(columns=rename_map)

        # Distinct raw names can collapse to one name (e.g. "Item" / "item").
        # Duplicate labels would pass the set-based validation yet yield a
        # frame with more columns than the preprocessor expects.
        dup_mask = df.columns.duplicated()
        if dup_mask.any():
            self._logger.warning(
                "Duplicate columns after normalization dropped "
                "(first occurrence kept): %s",
                sorted(set(df.columns[dup_mask])),
            )
            df = df.loc[:, ~dup_mask]

        # POST-NORMALIZATION DRIFT — logged before padding so that columns
        # missing from the input are actually reported.
        self._log_schema_drift(
            stage="post_normalization",
            incoming_cols=set(original_cols),
            normalized_cols=set(df.columns),
            final_cols=set(),
        )

        # Ensure all TRAIN_SCHEMA columns exist
        for col in self.TRAIN_SCHEMA:
            if col not in df.columns:
                df[col] = np.nan

        # Reorder to training schema
        df = df[list(self.TRAIN_SCHEMA)]

        # FINAL DRIFT
        self._log_schema_drift(
            stage="final",
            incoming_cols=set(original_cols),
            normalized_cols=set(df.columns),
            final_cols=set(df.columns),
        )

        # VALIDATE
        self._validate_final_schema(set(df.columns))

        return df

    def _log_schema_drift(
        self,
        stage: str,
        incoming_cols: set[str],
        normalized_cols: set[str],
        final_cols: set[str],
    ) -> None:
        """
        Log schema drift at different stages of normalization.

        Parameters
        ----------
        stage : str
            One of {"raw", "post_normalization", "final"} indicating when
            drift is logged. Any other value logs nothing.
        incoming_cols : set
            Columns present in the raw incoming dataframe.
        normalized_cols : set
            Columns after normalization (snake-casing, renaming).
        final_cols : set
            Columns after final filtering to TRAIN_SCHEMA.

        Notes
        -----
        - RAW drift reports incoming columns that are not in TRAIN_SCHEMA.
        - POST-NORMALIZATION drift reports extra/missing columns after
          canonicalization.
        - FINAL drift is expected to be empty.
        - When drift is found, a warning is logged and one JSON line is
          appended to ``drift_log_path`` (parent directories are created).
          File-system errors propagate to the caller.
        """
        expected = set(self.TRAIN_SCHEMA)

        event: Dict[str, Any] = {
            "stage": stage,
            "model_version": self.model_version,
            "schema_version": self.schema_version,
            "hierarchy_version": self.hierarchy_version,
            "model_name": self.model_name,
        }

        has_drift = False

        # Evaluate metrics selectively based on the active stage
        if stage == "raw":
            extra_in_raw = incoming_cols - expected
            if extra_in_raw:
                event["extra_in_raw"] = sorted(extra_in_raw)
                has_drift = True

        elif stage == "post_normalization":
            extra_after_norm = normalized_cols - expected
            missing_after_norm = expected - normalized_cols
            if extra_after_norm or missing_after_norm:
                event["extra_after_normalization"] = sorted(extra_after_norm)
                event["missing_after_normalization"] = sorted(missing_after_norm)
                has_drift = True

        elif stage == "final":
            missing_in_final = expected - final_cols
            extra_in_final = final_cols - expected
            if missing_in_final or extra_in_final:
                event["missing_in_final"] = sorted(missing_in_final)
                event["extra_in_final"] = sorted(extra_in_final)
                has_drift = True

        # Only append to file and log warning if actual drift occurred
        if not has_drift:
            return

        # Include exact column layouts for debugging context
        event["incoming_columns"] = sorted(incoming_cols)
        event["normalized_columns"] = sorted(normalized_cols)
        event["final_columns"] = sorted(final_cols)

        self._logger.warning(
            "Schema drift detected at stage [%s]: %s", stage, event
        )

        self.drift_log_path.parent.mkdir(parents=True, exist_ok=True)
        with self.drift_log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(event) + "\n")

    def read_schema_drift_log(self, max_lines: int = 200) -> str:
        """
        Return the most recent schema drift events as a single string.

        Parameters
        ----------
        max_lines : int
            Maximum number of lines to return from the drift log.
            Zero or negative values return an empty string.

        Returns
        -------
        str
            Tail of the drift log. Empty string if no log exists.
        """
        # Guard explicitly: a plain `lines[-0:]` slice would return the
        # whole file for max_lines == 0.
        if max_lines <= 0:
            return ""

        try:
            with self.drift_log_path.open("r", encoding="utf-8") as f:
                # Bounded deque keeps only the tail in memory.
                return "".join(deque(f, maxlen=max_lines))
        except FileNotFoundError:
            return ""

    # -------------------------------------------------------------------------
    # Lineage / metadata
    # -------------------------------------------------------------------------
    def metadata(self) -> Dict[str, Any]:
        """
        Return lineage and metadata for this object.

        Returns
        -------
        Dict[str, Any]
            model version, model name, schema version, hierarchy version,
            threshold, number of classes, and the class list.
        """
        return {
            "model_version": self.model_version,
            "model_name": self.model_name,
            "schema_version": self.schema_version,
            "hierarchy_version": self.hierarchy_version,
            "threshold": self.threshold,
            "n_classes": self.n_classes_,
            "classes": self.classes_.tolist(),
        }

    # -------------------------------------------------------------------------
    # Utilities
    # -------------------------------------------------------------------------
    @staticmethod
    def _to_dataframe(X_raw: FrameLike) -> pd.DataFrame:
        """
        Convert raw input into a pandas DataFrame.

        A DataFrame input is copied so the caller's object is never mutated;
        anything else is passed to the ``pd.DataFrame`` constructor.
        """
        if isinstance(X_raw, pd.DataFrame):
            return X_raw.copy()
        return pd.DataFrame(X_raw)

    @staticmethod
    def _get_base_algorithm_name(estimator: BaseEstimator) -> str:
        """
        Recursively unwrap meta-estimators to find the core algorithm name.

        Follows, in order, the attributes ``estimator``, ``base_estimator``
        and ``_final_estimator``; returns the class name of the first object
        that has none of them set.
        """
        for attr in ("estimator", "base_estimator", "_final_estimator"):
            inner = getattr(estimator, attr, None)
            # Skip unset attributes and string placeholders (e.g. a
            # "deprecated" sentinel), which are not estimators.
            if inner is None or isinstance(inner, str):
                continue
            return QSSelectiveCalibratedModel._get_base_algorithm_name(inner)

        return type(estimator).__name__

    def validate_y_true(self, y_true_raw: LabelArrayLike) -> pd.Series:
        """
        Validate and standardize y_true for evaluation.

        Responsibilities:
        - Wrap the input in a ``pd.Series``
        - Reset the index to 0..n-1 and cast values to ``str``
        - Reject a single value that equals a TRAIN_SCHEMA column name
          (a common mistake: passing the column name instead of the data)
        - Warn when y_true contains at most one unique value

        Raises
        ------
        ValueError
            If y_true is a single string matching a TRAIN_SCHEMA column.
        """
        y = pd.Series(y_true_raw)

        if (
            len(y) == 1
            and isinstance(y.iloc[0], str)
            and y.iloc[0] in self.TRAIN_SCHEMA
        ):
            raise ValueError(
                f"y_true appears to be the COLUMN NAME '{y.iloc[0]}' "
                f"instead of the actual label values."
            )

        y = y.reset_index(drop=True).astype(str)

        n_unique = y.nunique()
        if n_unique <= 1:
            self._logger.warning(
                "y_true has only %d unique value(s). "
                "This usually indicates a data extraction or alignment issue.",
                n_unique,
            )

        return y

    def _get_train_schema(self) -> tuple[str, ...]:
        """
        Return the input schema recorded on the preprocessor pipeline.

        Reads ``feature_names_in_`` from ``self.preprocessor.pipeline`` and
        returns it as a tuple, preserving its order. Production normalization
        reorders incoming frames to exactly this schema.

        Returns
        -------
        tuple[str, ...]
            Ordered column names from ``feature_names_in_``.

        Raises
        ------
        AttributeError
            If the pipeline has no ``feature_names_in_`` (not fitted).
        """
        pipeline = self.preprocessor.pipeline
        # Explicit raise instead of `assert`: asserts are stripped under -O.
        feature_names = getattr(pipeline, "feature_names_in_", None)
        if feature_names is None:
            raise AttributeError(
                "Preprocessor pipeline must be fitted before accessing "
                "feature_names_in_."
            )
        return tuple(feature_names)

    def _validate_final_schema(self, final_cols: set) -> None:
        """
        Ensure the final column set matches TRAIN_SCHEMA exactly.

        Raises
        ------
        ValueError
            If any TRAIN_SCHEMA column is missing or any extra column is
            present.
        """
        expected = set(self.TRAIN_SCHEMA)
        missing = expected - final_cols
        extra = final_cols - expected

        if missing or extra:
            raise ValueError(
                f"Final schema mismatch.\n"
                f"Missing columns: {sorted(missing)}\n"
                f"Extra columns: {sorted(extra)}\n"
                f"Expected: {sorted(expected)}\n"
                f"Got: {sorted(final_cols)}"
            )