import re
import torch
import sklearn
import numpy as np
import pandas as pd
from typing import Optional
from core.config import settings
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sentence_transformers import SentenceTransformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def to_snake_case(df: pd.DataFrame) -> pd.DataFrame:
    """Return a deep copy of ``df`` whose column labels are snake_case.

    The input frame is not modified. Labels are cast to ``str`` first so
    that non-string labels (e.g. integers) do not break the ``.str``
    accessor.

    Args:
        df: Frame whose column labels should be normalised.

    Returns:
        A deep copy of ``df`` with rewritten column labels.
    """
    df = df.copy(deep=True)
    df.columns = (
        df.columns.astype(str)
        .str.replace(r"(?<=[a-z0-9])(?=[A-Z])", "_", regex=True)  # Split CamelCase/PascalCase
        .str.lower()  # Convert to lowercase
        .str.replace(r"[\s/()-]+", "_", regex=True)  # Replace separators with '_'
        .str.replace(r"[^0-9a-z_]", "", regex=True)  # Remove invalid characters
        .str.strip("_")  # Trim leading/trailing '_'
    )
    return df


# =========================================================
# 1. TEXT CLEANING
# =========================================================
# Words/phrases stripped from descriptions before embedding.
_BOILERPLATE_PHRASES = (
    "supply",
    "install",
    "installation",
    "complete",
    "including",
    "accordance",
    "as per",
    "per drawing",
    "testing",
    "commissioning",
    "works",
    "work",
    "etc",
    "shown",
    "detailed",
)

# Whole-word match only, longest phrase first, so that "installation" is
# removed as a unit and words such as "network" or "stretch" are left intact.
_BOILERPLATE_RE = re.compile(
    r"\b(?:"
    + "|".join(
        re.escape(phrase)
        for phrase in sorted(_BOILERPLATE_PHRASES, key=len, reverse=True)
    )
    + r")\b"
)
_NON_LETTER_RE = re.compile(r"[^a-zA-Z\s]")
_WHITESPACE_RE = re.compile(r"\s+")


def clean_description(text):
    """Normalise a free-text description.

    The text is lower-cased, every character that is not an ASCII letter or
    whitespace is replaced by a space, boilerplate words/phrases are removed
    (whole words only) and runs of whitespace are collapsed.

    Args:
        text: Raw description. Missing values (``None``/NaN/``pd.NA``) are
            accepted; any other non-string value is converted with ``str``.

    Returns:
        The cleaned string, or ``""`` when ``text`` is missing.
    """
    if pd.isna(text):
        return ""

    text = str(text).lower()
    # Strip punctuation/digits and collapse whitespace first so multi-word
    # phrases ("as per") match regardless of the original separators.
    text = _NON_LETTER_RE.sub(" ", text)
    text = _WHITESPACE_RE.sub(" ", text)
    text = _BOILERPLATE_RE.sub(" ", text)
    return _WHITESPACE_RE.sub(" ", text).strip()


# =========================================================
# 2.
# ITEM SPEC EXTRACTION
# =========================================================
# Output keys of extract_specs(), in column order.
_SPEC_KEYS = (
    "diameter_mm",
    "thickness_mm",
    "size_1_mm",
    "size_2_mm",
    "capacity_kw",
    "cable_size_mm2",
)

# A number with an optional decimal part, e.g. "25" or "2.5".
_SPEC_NUMBER = r"(\d+(?:\.\d+)?)"
# "mm" that is not the start of "mm2"/"mm²" (those are cable cross-sections).
_DIAMETER_RE = re.compile(_SPEC_NUMBER + r"\s*mm(?![2²])")
_THICKNESS_RE = re.compile(_SPEC_NUMBER + r"\s*(?:mm)?\s*thick")
_SIZE_RE = re.compile(_SPEC_NUMBER + r"\s*x\s*" + _SPEC_NUMBER)
_CAPACITY_RE = re.compile(_SPEC_NUMBER + r"\s*kw")
_CABLE_RE = re.compile(_SPEC_NUMBER + r"\s*mm(?:2|²)")


def extract_specs(item):
    """Extract numeric specifications from an item string.

    Each value is taken from the first match of its pattern in the
    lower-cased text; a spec that is not found is ``np.nan``.

    Args:
        item: Item text. Missing values are accepted; other non-string
            values are converted with ``str``.

    Returns:
        A dict with the keys ``diameter_mm``, ``thickness_mm``,
        ``size_1_mm``, ``size_2_mm``, ``capacity_kw`` and
        ``cable_size_mm2`` (always all of them, so every row has the same
        schema), each mapped to a float or ``np.nan``.
    """
    specs = dict.fromkeys(_SPEC_KEYS, np.nan)
    if pd.isna(item):
        return specs

    text = str(item).lower()

    m = _DIAMETER_RE.search(text)
    if m:
        specs["diameter_mm"] = float(m.group(1))

    m = _THICKNESS_RE.search(text)
    if m:
        specs["thickness_mm"] = float(m.group(1))

    m = _SIZE_RE.search(text)
    if m:
        specs["size_1_mm"] = float(m.group(1))
        specs["size_2_mm"] = float(m.group(2))

    m = _CAPACITY_RE.search(text)
    if m:
        specs["capacity_kw"] = float(m.group(1))

    m = _CABLE_RE.search(text)
    if m:
        specs["cable_size_mm2"] = float(m.group(1))

    return specs


# =========================================================
# 3. MATERIAL FLAGS
# =========================================================
# Material keywords looked up (as substrings) in the lower-cased text.
_MATERIAL_KEYWORDS = ("steel", "concrete", "copper", "pvc", "hdpe", "ppr")


def extract_material_flags(text):
    """Return 0/1 flags telling which material keywords occur in ``text``.

    Args:
        text: Free text. Missing values are treated as an empty string;
            other non-string values are converted with ``str``.

    Returns:
        A dict ``{"is_steel": 0|1, "is_concrete": ..., "is_copper": ...,
        "is_pvc": ..., "is_hdpe": ..., "is_ppr": ...}``.
    """
    text = "" if pd.isna(text) else str(text).lower()
    return {f"is_{material}": int(material in text) for material in _MATERIAL_KEYWORDS}


# =========================================================
# 4. UOM NORMALIZATION
# =========================================================
# Unit-of-measure family -> raw unit spellings (already stripped/lower-cased).
_UOM_FAMILIES = {
    "linear": ("m", "lm"),
    "area": ("m2", "m²", "ha"),
    "volume": ("m3", "m³", "m3/h", "m3/"),
    "weight": ("kg", "t", "ton"),
    "count": ("nr", "no", "item", "unit", "points", "sys", "set", "sets", "lot", "pc"),
    "time": ("h", "hr", "hour", "hours", "week", "months"),
    "lump_sum": ("lump sum", "sum"),
}
_UOM_TO_FAMILY = {
    unit: family for family, units in _UOM_FAMILIES.items() for unit in units
}


def normalize_uom(u):
    """Map a raw unit of measure to a coarse unit family.

    Args:
        u: Raw unit value; it is converted with ``str``, stripped and
            lower-cased before lookup.

    Returns:
        ``"unknown"`` for missing values, one of ``"linear"``, ``"area"``,
        ``"volume"``, ``"weight"``, ``"count"``, ``"time"``, ``"lump_sum"``
        for recognised units, and ``"other"`` for anything else.
    """
    if pd.isna(u):
        return "unknown"
    return _UOM_TO_FAMILY.get(str(u).strip().lower(), "other")


# =========================================================
# 6.
# EMBEDDING CACHE
# =========================================================
# Process-wide cache of loaded embedding models, keyed by the requested name.
_EMBEDDER_CACHE: dict[str, SentenceTransformer] = {}

# Width assumed for description embeddings when the model cannot be queried.
_DEFAULT_EMBEDDING_DIM = 384


def get_cached_embedder(model_name: str) -> SentenceTransformer:
    """Return the cached SentenceTransformer for ``model_name``, loading it on first use.

    Args:
        model_name: Cache key for the embedder.

    Returns:
        The cached ``SentenceTransformer`` instance.
    """
    if model_name not in _EMBEDDER_CACHE:
        # NOTE(review): the weights are always read from the local
        # all-MiniLM-L6-v2 directory under settings.MODEL_DIR, whatever
        # ``model_name`` is; the name only selects the cache slot. The
        # earlier variant loaded SentenceTransformer(model_name) directly.
        _EMBEDDER_CACHE[model_name] = SentenceTransformer(
            str(
                settings.MODEL_DIR
                / "embedding_model/SentenceTransformer/all-MiniLM-L6-v2"
            )
        )  # Loaded from local
    return _EMBEDDER_CACHE[model_name]


# =========================================================
# 5. CUSTOM TRANSFORMERS
# =========================================================
class TextEmbeddingTransformer(BaseEstimator, TransformerMixin):
    """Embed cleaned descriptions with an already-loaded sentence encoder.

    Args:
        model: Object exposing ``encode(list_of_str, show_progress_bar=...)``.
    """

    def __init__(self, model):
        self.model_name = "all-MiniLM-L6-v2"
        self.model = model  # Fast loading since model is already in memory

    def fit(self, X, y=None):
        """No-op; nothing is learned from the data."""
        return self

    def transform(self, X):
        """Clean every entry of the Series ``X`` and return its embeddings."""
        cleaned = X.apply(clean_description).tolist()
        return self.model.encode(cleaned, show_progress_bar=False)

    def get_feature_names_out(self, input_features=None):
        """Return ``desc_embed_<i>`` names, one per embedding dimension."""
        # Ask the injected model for its width so the names match whatever
        # encoder was supplied; fall back to the MiniLM width otherwise.
        dim_getter = getattr(self.model, "get_sentence_embedding_dimension", None)
        dim = dim_getter() if callable(dim_getter) else None
        return np.array(
            [f"desc_embed_{i}" for i in range(dim or _DEFAULT_EMBEDDING_DIM)]
        )


class ItemSpecExtractor(BaseEstimator, TransformerMixin):
    """Turn item strings into the numeric columns produced by ``extract_specs``."""

    def fit(self, X, y=None):
        """Record the output column names."""
        # Probe with an empty string rather than X.iloc[0]: that works for
        # empty input and does not depend on the first row being present.
        self.feature_names_ = list(extract_specs(""))
        return self

    def transform(self, X):
        """Return a float array with one row per item and one column per spec."""
        rows = [extract_specs(item) for item in X]
        # Pin the columns so the width and order are stable even when rows
        # carry no keys or the input is empty.
        columns = list(extract_specs(""))
        return pd.DataFrame(rows, columns=columns).to_numpy(dtype=float)

    def get_feature_names_out(self, input_features=None):
        """Return the spec column names recorded by ``fit``."""
        return np.array(self.feature_names_)


class MaterialFlagExtractor(BaseEstimator, TransformerMixin):
    """Turn free text into the 0/1 columns produced by ``extract_material_flags``."""

    def fit(self, X, y=None):
        """Record the output column names."""
        # Probe with an empty string so empty input does not raise.
        self.feature_names_ = list(extract_material_flags(""))
        return self

    def transform(self, X):
        """Return an array with one row per text and one column per material flag."""
        rows = [extract_material_flags(text) for text in X]
        columns = list(extract_material_flags(""))
        return pd.DataFrame(rows, columns=columns).to_numpy()

    def get_feature_names_out(self, input_features=None):
        """Return the flag column names recorded by ``fit``."""
        return np.array(self.feature_names_)


class UoMFamilyTransformer(BaseEstimator, TransformerMixin):
    """One-hot encode the unit family returned by ``normalize_uom``."""

    def __init__(self):
        self.encoder = OneHotEncoder(handle_unknown="ignore")

    @staticmethod
    def _to_family_frame(X) -> pd.DataFrame:
        """Map every raw unit to its family in a one-column ``uom`` frame."""
        # Map element by element instead of Series.apply: on a categorical
        # column apply may skip missing values, so they would never reach
        # normalize_uom and never become "unknown" (pandas behaviour -
        # TODO confirm for the pinned pandas version).
        return pd.DataFrame({"uom": [normalize_uom(u) for u in X]})

    def fit(self, X, y=None):
        """Fit the one-hot encoder on the unit families found in ``X``."""
        self.encoder.fit(self._to_family_frame(X))
        return self

    def transform(self, X):
        """Return the one-hot encoding of the unit family of each entry."""
        return self.encoder.transform(self._to_family_frame(X))

    def get_feature_names_out(self, input_features=None):
        """Return the encoder's output names, prefixed with ``uom``."""
        return self.encoder.get_feature_names_out(["uom"])


class BooleanFlagTransformer(BaseEstimator, TransformerMixin):
    """Emit a single ``basket_flag`` column: 1 where the value is present, else 0."""

    def fit(self, X, y=None):
        """No-op; nothing is learned from the data."""
        return self

    def transform(self, X):
        """Return an ``(n, 1)`` int array marking non-missing entries of ``X``."""
        return X.notna().astype(int).to_numpy().reshape(-1, 1)

    def get_feature_names_out(self, input_features=None):
        """Return the single output name ``basket_flag``."""
        return np.array(["basket_flag"])


class NotesPresenceTransformer(BaseEstimator, TransformerMixin):
    """Emit a single ``notes_flag`` column: 1 where the value is present, else 0."""

    def fit(self, X, y=None):
        """No-op; nothing is learned from the data."""
        return self

    def transform(self, X):
        """Return an ``(n, 1)`` int array marking non-missing entries of ``X``."""
        return X.notna().astype(int).to_numpy().reshape(-1, 1)

    def get_feature_names_out(self, input_features=None):
        """Return the single output name ``notes_flag``."""
        return np.array(["notes_flag"])


class QtyMissingFlag(BaseEstimator, TransformerMixin):
    """Emit a single ``qty_missing`` column: 1 where the value is missing, else 0."""

    def fit(self, X, y=None):
        """No-op; nothing is learned from the data."""
        return self

    def transform(self, X):
        """Return an ``(n, 1)`` int array marking missing entries of ``X``."""
        return X.isna().astype(int).to_numpy().reshape(-1, 1)

    def get_feature_names_out(self, input_features=None):
        """Return the single output name ``qty_missing``."""
        return np.array(["qty_missing"])


class CachedTextEmbeddingTransformer(BaseEstimator, TransformerMixin):
    """
    Clone-safe, cached SentenceTransformer embedding block.

    The encoder is never stored on the instance; it is fetched through the
    module-level get_cached_embedder(), so it is loaded once per process
    and shared by every clone of this transformer.
    """

    def __init__(self, model_name="all-MiniLM-L6-v2", device="cpu"):
        self.model_name = model_name
        self.device = (
            device  # kept for API symmetry; device is handled by the cached model
        )
        self._model = None  # placeholder only; the model lives in the global cache

    def _load(self):
        """Return the shared encoder from the module-level cache."""
        # Use global cache shared with HybridNRMSelectiveClassifier
        return get_cached_embedder(self.model_name)

    def fit(self, X, y=None):
        """Make sure the encoder is loaded; nothing is learned from the data."""
        self._load()
        return self

    def transform(self, X):
        """Clean every entry of the Series ``X`` and return its embeddings."""
        model = self._load()
        cleaned = X.apply(clean_description).tolist()
        return model.encode(cleaned, show_progress_bar=False)

    def get_feature_names_out(self, input_features=None):
        """Return ``desc_embed_0`` ... ``desc_embed_383`` (384-dimensional embeddings)."""
        return np.array([f"desc_embed_{i}" for i in range(_DEFAULT_EMBEDDING_DIM)])


# =========================================================
# 6. FINAL PREPROCESSOR
# =========================================================
def select_st_device(prefer_gpu: bool = True) -> str:
    """Return ``"cuda"`` when a GPU is preferred and available, else ``"cpu"``."""
    if prefer_gpu and torch.cuda.is_available():
        return "cuda"
    return "cpu"


class QSPreprocessor(BaseEstimator, TransformerMixin):
    """End-to-end feature builder: drop columns, fix dtypes, run a ColumnTransformer.

    Args:
        prefer_gpu: Forwarded to ``select_st_device`` when fitting.
    """

    def __init__(self, prefer_gpu: bool = True):
        # Sklearn initialization - parameters should be set here, but heavy
        # resources (like models) should be loaded in fit() to avoid issues
        # with cloning and memory bloat.
        self.sklearn_version = sklearn.__version__
        self.pipeline: Optional[ColumnTransformer] = None
        self.prefer_gpu = prefer_gpu
        self.cols_to_drop = [
            "unnamed_25",
            "ref",
            "project_code",
            "client",
            "contractor",
            "source",
        ]
        # Datatype standardization rules
        self.categorical_cols = [
            "ge",
            "group_element",
            "e",
            "element",
            "uom",
            "project",
            "typology",
            "location",
            "base_date",
            "package",
            "rate_scope",
            "client_location",
            "basket_of_goods",
        ]
        self.text_cols = ["description", "item", "notes"]
        self.numerical_cols = ["qty", "rate_base_date", "rate_q1_2025"]

    # ---------------------------------------------------------
    # STEP 0 — DROP COLUMNS
    # ---------------------------------------------------------
    def _drop_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` without those ``cols_to_drop`` that are present."""
        return df.drop(columns=[c for c in self.cols_to_drop if c in df.columns])

    # ---------------------------------------------------------
    # STEP 1 — DATATYPE STANDARDIZATION
    # ---------------------------------------------------------
    def _standardize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with the configured columns cast to fixed dtypes.

        Categorical columns become ``category``, text columns ``string`` and
        numerical columns ``float32`` (unparseable values become NaN).
        Columns that are absent from ``df`` are skipped.
        """
        df = df.copy()

        # categorical
        for col in self.categorical_cols:
            if col in df.columns:
                df[col] = df[col].astype("category")

        # text
        for col in self.text_cols:
            if col in df.columns:
                df[col] = df[col].astype("string")

        # numeric
        for col in self.numerical_cols:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce").astype("float32")

        return df

    # ---------------------------------------------------------
    # FIT
    # ---------------------------------------------------------
    def fit(self, df, y=None):
        """Build the ColumnTransformer and fit it on the cleaned ``df``.

        Args:
            df: Raw frame with snake_case column names.
            y: Ignored; present for scikit-learn API compatibility.

        Returns:
            ``self``.
        """
        # GPU/CPU fallback
        self.device = select_st_device(prefer_gpu=self.prefer_gpu)

        # -----------------------------
        # TEXT EMBEDDING
        # -----------------------------
        # Clone-safe cached embedding transformer: the encoder itself is
        # held in the module-level cache, not on this object.
        self.desc_embed = CachedTextEmbeddingTransformer(
            model_name="all-MiniLM-L6-v2", device=self.device
        )

        # Step 0: drop garbage columns
        df = self._drop_columns(df)

        # Step 1: enforce datatype standardization
        df = self._standardize_dtypes(df)

        # -----------------------------
        # NUMERICAL PIPELINE
        # -----------------------------
        numerical_cols = ["qty", "rate_base_date", "rate_q1_2025"]

        numerical_block = ColumnTransformer(
            transformers=[
                # Impute missing qty with median before scaling. Only 0.2%
                # missing, so median is a robust choice that won't skew the data.
                (
                    "qty_impute",
                    SimpleImputer(strategy="median"),
                    ["qty"],
                ),
                # No missing values in rate columns, so just passthrough
                # before scaling.
                (
                    "other_nums",
                    "passthrough",
                    ["rate_base_date", "rate_q1_2025"],
                ),
            ],
            remainder="drop",
            verbose_feature_names_out=False,
        )

        numerical_pipeline = Pipeline(
            [
                ("numerical_block", numerical_block),
                ("scaler", StandardScaler()),
            ]
        )

        # -----------------------------
        # CATEGORICAL PIPELINE
        # -----------------------------
        # Define column groups AFTER cleaning
        categorical_cols = [
            "ge",
            "group_element",
            "e",
            "element",
            "project",
            "typology",
            "location",
            "base_date",
            "package",
            "rate_scope",
            "client_location",
        ]

        categorical_pipeline = Pipeline(
            [
                # Impute missing categorical values with 'missing' placeholder
                ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
                ("ohe", OneHotEncoder(handle_unknown="ignore")),
            ]
        )

        # --------------------------
        # ITEM SPECS PIPELINE
        # --------------------------
        # Specs that were not found are NaN; replace them with 0.0.
        item_specs_pipeline = Pipeline(
            [
                ("extractor", ItemSpecExtractor()),
                ("imputer", SimpleImputer(strategy="constant", fill_value=0.0)),
            ]
        )

        # -----------------------------
        # MAIN COLUMN TRANSFORMER
        # -----------------------------
        # Parallel transformations not serial, so order doesn't matter. Just
        # need to ensure all steps are included and correctly reference the
        # columns.
        self.pipeline = ColumnTransformer(
            transformers=[
                ("desc_embed", self.desc_embed, "description"),
                ("item_specs", item_specs_pipeline, "item"),
                ("materials", MaterialFlagExtractor(), "description"),
                ("uom_family", UoMFamilyTransformer(), "uom"),
                ("categorical", categorical_pipeline, categorical_cols),
                ("numerical", numerical_pipeline, numerical_cols),
                ("basket_flag", BooleanFlagTransformer(), "basket_of_goods"),
                ("notes_flag", NotesPresenceTransformer(), "notes"),
                ("qty_missing", QtyMissingFlag(), "qty"),
            ],
            remainder="drop",
            verbose_feature_names_out=False,
        )

        self.pipeline.fit(df)
        return self

    # ---------------------------------------------------------
    # TRANSFORM
    # ---------------------------------------------------------
    def transform(self, df):
        """Clean ``df`` and run it through the fitted ColumnTransformer.

        Raises:
            ValueError: If ``fit`` has not been called yet.
        """
        # Fail fast before doing any work on the frame.
        if self.pipeline is None:
            raise ValueError("The preprocessor has not been fitted yet.")

        df = self._drop_columns(df)
        df = self._standardize_dtypes(df)
        return self.pipeline.transform(df)

    # ---------------------------------------------------------
    # GET FEATURE NAMES OUT
    # ---------------------------------------------------------
    def get_feature_names_out(self, input_features=None):
        """Return the output feature names of the fitted ColumnTransformer.

        Raises:
            ValueError: If ``fit`` has not been called yet.
        """
        if self.pipeline is None:
            raise ValueError("The preprocessor has not been fitted yet.")
        return self.pipeline.get_feature_names_out(input_features=input_features)


# =========================================================
# HIERARCHY MAPPING OF NRM CATEGORIES
# =========================================================
def build_nrm_hierarchy_map(df):
    """
    Build a full hierarchy mapping from the dataset using clear, non-NRM names.

    Dataset columns:
    - ge → Level 1
    - group_element → Level 2
    - e → Level 3
    - element → Level 4
    - category → Leaf class (target)

    Args:
        df: Frame containing all five columns listed above.

    Returns:
        Dict mapping each stripped ``category`` string to a tuple
        ``(level1, level2, level3, level4)`` of stripped strings. When one
        category occurs with several hierarchies, the last one seen wins.
        The key ``"Others"`` is always set to four ``"Others"`` entries.

    Raises:
        ValueError: If a required column is missing from ``df``.
    """
    required_cols = ["ge", "group_element", "e", "element", "category"]

    for col in required_cols:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    # Drop duplicates to avoid redundant mappings
    unique_rows = df[
        ["category", "ge", "group_element", "e", "element"]
    ].drop_duplicates()

    hierarchy_map = {}
    # Plain tuples are enough here and avoid building a Series per row.
    for leaf, *levels in unique_rows.itertuples(index=False, name=None):
        # Build mapping: category → (Level1, Level2, Level3, Level4)
        hierarchy_map[str(leaf).strip()] = tuple(
            str(level).strip() for level in levels
        )

    # Ensure Others exists.
    # NOTE(review): this also replaces an "Others" entry derived from the
    # data - confirm that overriding is intended.
    hierarchy_map["Others"] = ("Others", "Others", "Others", "Others")

    return hierarchy_map