| import re
|
| import torch
|
| import sklearn
|
| import numpy as np
|
| import pandas as pd
|
| from typing import Optional
|
| from core.config import settings
|
| from sklearn.pipeline import Pipeline
|
| from sklearn.impute import SimpleImputer
|
| from sklearn.compose import ColumnTransformer
|
| from sentence_transformers import SentenceTransformer
|
| from sklearn.base import BaseEstimator, TransformerMixin
|
| from sklearn.preprocessing import OneHotEncoder, StandardScaler
|
|
|
|
|
def to_snake_case(df: pd.DataFrame) -> pd.DataFrame:
    """Return a deep copy of ``df`` whose column labels are snake_case.

    Each label is converted as follows:

    1. an underscore is inserted at every lower/digit -> upper boundary
       (``groupElement`` -> ``group_Element``);
    2. the label is lower-cased;
    3. runs of whitespace, ``/``, ``(``, ``)`` and ``-`` become one ``_``;
    4. every character outside ``[0-9a-z_]`` is removed;
    5. runs of underscores are collapsed and leading/trailing ones stripped.

    Args:
        df: Frame whose columns should be renamed. It is not modified.

    Returns:
        A deep copy of ``df`` with normalised column labels. Non-string
        labels are converted with ``str()`` first.
    """

    def _snake(label) -> str:
        name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", str(label))
        name = re.sub(r"[\s/()-]+", "_", name.lower())
        name = re.sub(r"[^0-9a-z_]", "", name)
        # Removing a symbol that sat between two separators ("a & b") leaves
        # "a__b"; collapse such runs so the label stays a clean identifier.
        return re.sub(r"_+", "_", name).strip("_")

    renamed = df.copy(deep=True)
    renamed.columns = pd.Index(
        [_snake(label) for label in df.columns],
        name=df.columns.name,
    )
    return renamed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Generic wording that is stripped from descriptions before embedding.
_BOILERPLATE_PHRASES = (
    "supply",
    "install",
    "installation",
    "complete",
    "including",
    "accordance",
    "as per",
    "per drawing",
    "testing",
    "commissioning",
    "works",
    "work",
    "etc",
    "shown",
    "detailed",
)

# One alternation anchored on word boundaries. Longest phrases come first so
# that a longer phrase is never shadowed by a shorter one it contains.
_BOILERPLATE_RE = re.compile(
    r"\b(?:"
    + "|".join(
        r"\s+".join(re.escape(word) for word in phrase.split())
        for phrase in sorted(_BOILERPLATE_PHRASES, key=len, reverse=True)
    )
    + r")\b"
)


def clean_description(text):
    """Normalise a free-text description for embedding.

    The text is lower-cased, boilerplate phrases are removed, every
    character that is not an ASCII letter or whitespace is replaced by a
    space, and whitespace is collapsed.

    Boilerplate is matched on whole words only. Plain substring replacement
    would damage unrelated words ("network" -> "net") and, because "install"
    is a prefix of "installation", would leave a stray "ation" behind.

    Args:
        text: Description value. Missing values (``None``, ``NaN``,
            ``pd.NA``) yield an empty string; other non-string values are
            converted with ``str()``.

    Returns:
        The cleaned, single-spaced, lower-case string.
    """
    if pd.isna(text):
        return ""

    lowered = str(text).lower()
    without_boilerplate = _BOILERPLATE_RE.sub(" ", lowered)
    letters_only = re.sub(r"[^a-zA-Z\s]", " ", without_boilerplate)
    return re.sub(r"\s+", " ", letters_only).strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Integer or decimal literal, captured.
_SPEC_NUMBER = r"(\d+(?:\.\d+)?)"

# (output keys, pattern) pairs; the order defines the key order of the result.
_SPEC_RULES = (
    # "mm" must not be followed by a digit or "²", otherwise a cable size
    # such as "25mm2" would also be reported as a 25 mm diameter.
    (("diameter_mm",), re.compile(_SPEC_NUMBER + r"\s*mm(?![0-9²])")),
    (("thickness_mm",), re.compile(_SPEC_NUMBER + r"\s*(?:mm)?\s*thick")),
    (("size_1_mm", "size_2_mm"), re.compile(_SPEC_NUMBER + r"\s*x\s*" + _SPEC_NUMBER)),
    (("capacity_kw",), re.compile(_SPEC_NUMBER + r"\s*kw")),
    (("cable_size_mm2",), re.compile(_SPEC_NUMBER + r"\s*mm[2²]")),
)


def extract_specs(item):
    """Pull numeric specifications out of an item string.

    The text is lower-cased and searched once per rule. For each rule the
    first match wins, and its captured number(s) are stored as ``float``.

    Args:
        item: Item text. Missing values (``None``, ``NaN``, ``pd.NA``) are
            accepted; other non-string values are converted with ``str()``.

    Returns:
        A dict that always has the keys ``diameter_mm``, ``thickness_mm``,
        ``size_1_mm``, ``size_2_mm``, ``capacity_kw`` and ``cable_size_mm2``
        in that order. Values are ``float``, or ``NaN`` when nothing matched
        or the input was missing. Returning the full key set for missing
        input keeps the schema independent of the data.
    """
    specs = {key: np.nan for keys, _ in _SPEC_RULES for key in keys}
    if pd.isna(item):
        return specs

    text = str(item).lower()
    for keys, pattern in _SPEC_RULES:
        match = pattern.search(text)
        if match is None:
            continue
        for key, raw in zip(keys, match.groups()):
            specs[key] = float(raw)
    return specs
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Flag name -> pattern searched in the lower-cased text. Most materials are
# plain substrings so that compounds ("steelwork", "upvc") still count.
# "ppr" is the exception: it must not be preceded by a letter, because the
# bare substring also occurs in common words such as "approx" and "approved".
_MATERIAL_PATTERNS = {
    "is_steel": re.compile(r"steel"),
    "is_concrete": re.compile(r"concrete"),
    "is_copper": re.compile(r"copper"),
    "is_pvc": re.compile(r"pvc"),
    "is_hdpe": re.compile(r"hdpe"),
    "is_ppr": re.compile(r"(?<![a-z])ppr"),
}


def extract_material_flags(text):
    """Return 0/1 indicators for materials mentioned in ``text``.

    Args:
        text: Free text. Missing values (``None``, ``NaN``, ``pd.NA``) are
            treated as an empty string; other non-string values are
            converted with ``str()``.

    Returns:
        A dict with the integer flags ``is_steel``, ``is_concrete``,
        ``is_copper``, ``is_pvc``, ``is_hdpe`` and ``is_ppr`` (in that order).
        Matching is case-insensitive.
    """
    lowered = "" if pd.isna(text) else str(text).lower()
    return {
        flag: int(pattern.search(lowered) is not None)
        for flag, pattern in _MATERIAL_PATTERNS.items()
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Family -> accepted spellings (already lower-cased, without trailing dots).
_UOM_FAMILIES = {
    "linear": ("m", "lm"),
    "area": ("m2", "m²", "ha"),
    "volume": ("m3", "m³", "m3/h", "m3/"),
    "weight": ("kg", "t", "ton", "tons"),
    "count": (
        "nr", "no", "nos", "item", "items", "unit", "units", "point",
        "points", "sys", "set", "sets", "lot", "lots", "pc", "pcs",
    ),
    "time": ("h", "hr", "hrs", "hour", "hours", "week", "weeks", "month", "months"),
    "lump_sum": ("lump sum", "sum"),
}

# Flattened spelling -> family table, built once at import time.
_UOM_LOOKUP = {
    spelling: family
    for family, spellings in _UOM_FAMILIES.items()
    for spelling in spellings
}


def normalize_uom(u):
    """Map a raw unit-of-measure value to a coarse family label.

    The value is converted to ``str``, lower-cased, has internal whitespace
    collapsed, and has surrounding whitespace and trailing dots removed
    before lookup. "No." and "Lump  Sum" therefore resolve like "no" and
    "lump sum".

    Args:
        u: Raw unit value; may be missing.

    Returns:
        One of ``"linear"``, ``"area"``, ``"volume"``, ``"weight"``,
        ``"count"``, ``"time"``, ``"lump_sum"``; ``"unknown"`` for missing
        values; ``"other"`` for anything not in the table.
    """
    if pd.isna(u):
        return "unknown"

    unit = " ".join(str(u).lower().split()).rstrip(". ")
    return _UOM_LOOKUP.get(unit, "other")
|
|
|
|
|
|
|
|
|
|
|
|
|
# Process-wide cache of loaded embedders, keyed by the requested model name.
_EMBEDDER_CACHE: dict[str, SentenceTransformer] = {}


def get_cached_embedder(model_name: str) -> SentenceTransformer:
    """Return the embedder for ``model_name``, loading it at most once.

    Models are loaded from
    ``settings.MODEL_DIR / "embedding_model" / "SentenceTransformer" / <name>``.
    If no such directory exists for ``model_name``, the bundled
    ``all-MiniLM-L6-v2`` directory is used instead, and the name is aliased
    to that single instance rather than loading a second copy.

    NOTE(review): assumes ``settings.MODEL_DIR`` is a ``pathlib.Path`` and
    that other models would live next to ``all-MiniLM-L6-v2`` under their
    own name. Confirm against the deployment layout.

    Args:
        model_name: Name of the embedding model; also the cache key.

    Returns:
        The cached ``SentenceTransformer`` instance.
    """
    cached = _EMBEDDER_CACHE.get(model_name)
    if cached is not None:
        return cached

    default_name = "all-MiniLM-L6-v2"
    base_dir = settings.MODEL_DIR / "embedding_model" / "SentenceTransformer"

    # Fall back to the bundled model when the requested one is not on disk.
    resolved = model_name if (base_dir / model_name).exists() else default_name

    if resolved not in _EMBEDDER_CACHE:
        _EMBEDDER_CACHE[resolved] = SentenceTransformer(str(base_dir / resolved))

    # Alias the requested name so later lookups hit the cache directly.
    _EMBEDDER_CACHE[model_name] = _EMBEDDER_CACHE[resolved]
    return _EMBEDDER_CACHE[model_name]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TextEmbeddingTransformer(BaseEstimator, TransformerMixin):
    """Embed cleaned description text with a caller-supplied model.

    Args:
        model: Object exposing ``encode(list_of_str, show_progress_bar=False)``.
            If it also exposes ``get_sentence_embedding_dimension()``, that
            value sets the number of reported feature names.
    """

    # Width reported when the model cannot tell us its embedding size.
    _DEFAULT_DIM = 384

    def __init__(self, model):
        # Informational label only; it is not a constructor parameter.
        self.model_name = "all-MiniLM-L6-v2"
        self.model = model

    def fit(self, X, y=None):
        """No-op; the transformer is stateless."""
        return self

    def transform(self, X):
        """Clean every entry of ``X`` and return the model's encodings."""
        cleaned = [clean_description(text) for text in X]
        return self.model.encode(cleaned, show_progress_bar=False)

    def _embedding_dim(self) -> int:
        """Return the model's embedding width, or the default if unknown."""
        getter = getattr(self.model, "get_sentence_embedding_dimension", None)
        dim = getter() if callable(getter) else None
        return int(dim) if dim else self._DEFAULT_DIM

    def get_feature_names_out(self, input_features=None):
        """Return ``desc_embed_0 .. desc_embed_{d-1}`` for embedding width d."""
        return np.array([f"desc_embed_{i}" for i in range(self._embedding_dim())])
|
|
|
|
|
class ItemSpecExtractor(BaseEstimator, TransformerMixin):
    """Turn item strings into a fixed-width numeric spec matrix.

    Feature names come from ``extract_specs`` applied to an empty string,
    not from the first training row. The schema therefore does not depend
    on the data: an empty input, or a missing first item, cannot change the
    number or order of columns.
    """

    @staticmethod
    def _schema():
        """Return the spec names ``extract_specs`` produces for real text."""
        return list(extract_specs(""))

    def fit(self, X, y=None):
        """Record the output feature names; ``X`` is not inspected."""
        self.feature_names_ = self._schema()
        return self

    def transform(self, X):
        """Return a float array of shape ``(len(X), n_specs)``.

        Specs that were not found, or not reported at all for a row, are
        ``NaN``.
        """
        names = getattr(self, "feature_names_", None) or self._schema()
        rows = [extract_specs(item) for item in X]
        values = [[row.get(name, np.nan) for name in names] for row in rows]
        # Explicit reshape keeps the (0, n_specs) shape for an empty batch.
        return np.array(values, dtype=float).reshape(len(rows), len(names))

    def get_feature_names_out(self, input_features=None):
        """Return the spec names recorded by ``fit``."""
        return np.array(self.feature_names_)
|
|
|
|
|
class MaterialFlagExtractor(BaseEstimator, TransformerMixin):
    """Turn free text into a fixed-width matrix of 0/1 material flags.

    Feature names come from ``extract_material_flags`` applied to an empty
    string, so fitting does not depend on the first row and works on empty
    input.
    """

    @staticmethod
    def _schema():
        """Return the flag names ``extract_material_flags`` produces."""
        return list(extract_material_flags(""))

    def fit(self, X, y=None):
        """Record the output feature names; ``X`` is not inspected."""
        self.feature_names_ = self._schema()
        return self

    def transform(self, X):
        """Return an integer array of shape ``(len(X), n_flags)``."""
        names = getattr(self, "feature_names_", None) or self._schema()
        rows = [extract_material_flags(text) for text in X]
        values = [[row[name] for name in names] for row in rows]
        # Explicit reshape keeps the (0, n_flags) shape for an empty batch.
        return np.array(values, dtype=np.int64).reshape(len(rows), len(names))

    def get_feature_names_out(self, input_features=None):
        """Return the flag names recorded by ``fit``."""
        return np.array(self.feature_names_)
|
|
|
|
|
class UoMFamilyTransformer(BaseEstimator, TransformerMixin):
    """One-hot encode the unit-of-measure family of each value."""

    def __init__(self):
        self.encoder = OneHotEncoder(handle_unknown="ignore")

    @staticmethod
    def _families(X) -> pd.DataFrame:
        """Return a one-column frame (``"uom"``) of normalised families.

        Values are iterated explicitly rather than via ``Series.apply``.
        On categorical data ``apply`` maps over the categories and,
        depending on the pandas version, skips missing values, so
        ``normalize_uom`` would never get to label them ``"unknown"``.

        The column is always named ``"uom"`` so that it matches the name
        passed to the encoder in ``get_feature_names_out``.
        """
        values = np.asarray(X, dtype=object).ravel()
        return pd.DataFrame({"uom": [normalize_uom(value) for value in values]})

    def fit(self, X, y=None):
        """Fit the encoder on the families present in ``X``."""
        self.encoder.fit(self._families(X))
        return self

    def transform(self, X):
        """Return the one-hot encoding of each value's family."""
        return self.encoder.transform(self._families(X))

    def get_feature_names_out(self, input_features=None):
        """Return the encoder's feature names, prefixed with ``uom``."""
        return self.encoder.get_feature_names_out(["uom"])
|
|
|
|
|
class BooleanFlagTransformer(BaseEstimator, TransformerMixin):
    """Emit a single 0/1 column marking which entries of ``X`` are present.

    ``X`` must expose ``notna()`` (for example a pandas Series).
    """

    def fit(self, X, y=None):
        """No-op; nothing is learned."""
        return self

    def transform(self, X):
        """Return an ``(n, 1)`` int array: 1 where the value is not missing."""
        present = X.notna()
        as_int = present.astype(int)
        return np.reshape(as_int.values, (-1, 1))

    def get_feature_names_out(self, input_features=None):
        """Return the single output name, ``basket_flag``."""
        names = ["basket_flag"]
        return np.array(names)
|
|
|
|
|
class NotesPresenceTransformer(BaseEstimator, TransformerMixin):
    """Emit a single 0/1 column: 1 where the entry of ``X`` is not missing.

    ``X`` must expose ``notna()`` (for example a pandas Series).
    """

    def fit(self, X, y=None):
        """No-op; nothing is learned."""
        return self

    def transform(self, X):
        """Return the presence indicator as an ``(n, 1)`` int array."""
        indicator = X.notna().astype(int)
        column = indicator.values
        return column.reshape((-1, 1))

    def get_feature_names_out(self, input_features=None):
        """Return the single output name, ``notes_flag``."""
        return np.array(["notes_flag"])
|
|
|
|
|
class QtyMissingFlag(BaseEstimator, TransformerMixin):
    """Emit a single 0/1 column: 1 where the entry of ``X`` is missing.

    ``X`` must expose ``isna()`` (for example a pandas Series).
    """

    def fit(self, X, y=None):
        """No-op; nothing is learned."""
        return self

    def transform(self, X):
        """Return the missingness indicator as an ``(n, 1)`` int array."""
        missing = X.isna()
        flags = missing.astype(int).values
        return np.reshape(flags, (-1, 1))

    def get_feature_names_out(self, input_features=None):
        """Return the single output name, ``qty_missing``."""
        label = "qty_missing"
        return np.array([label])
|
|
|
|
|
class CachedTextEmbeddingTransformer(BaseEstimator, TransformerMixin):
    """Embedding block that shares one loaded model per process.

    The model is never stored on the instance. It is fetched through
    ``get_cached_embedder`` whenever needed, so cloning or pickling this
    transformer does not copy model weights.

    Args:
        model_name: Name passed to ``get_cached_embedder``.
        device: Device passed to the model's ``encode`` call (for example
            ``"cpu"`` or ``"cuda"``). ``None`` leaves the choice to the model.
    """

    # Width reported when the model is not loaded or does not report a size.
    _DEFAULT_DIM = 384

    def __init__(self, model_name="all-MiniLM-L6-v2", device="cpu"):
        self.model_name = model_name
        self.device = device
        # Kept for backward compatibility; the model lives in the shared cache.
        self._model = None

    def _load(self):
        """Return the shared embedder for ``self.model_name``."""
        return get_cached_embedder(self.model_name)

    def fit(self, X, y=None):
        """Make sure the model is loaded; nothing is learned from ``X``."""
        self._load()
        return self

    def transform(self, X):
        """Clean every entry of ``X`` and encode it on ``self.device``."""
        model = self._load()
        cleaned = [clean_description(text) for text in X]
        # Pass the device explicitly; otherwise the configured device is
        # never applied and the model runs wherever it was loaded.
        return model.encode(cleaned, show_progress_bar=False, device=self.device)

    def _embedding_dim(self) -> int:
        """Return the cached model's embedding width, or the default.

        Only the cache is consulted, so asking for feature names never
        triggers a model load.
        """
        model = _EMBEDDER_CACHE.get(self.model_name)
        getter = getattr(model, "get_sentence_embedding_dimension", None)
        dim = getter() if callable(getter) else None
        return int(dim) if dim else self._DEFAULT_DIM

    def get_feature_names_out(self, input_features=None):
        """Return ``desc_embed_0 .. desc_embed_{d-1}`` for embedding width d."""
        return np.array([f"desc_embed_{i}" for i in range(self._embedding_dim())])
|
|
|
|
|
|
|
|
|
|
|
def select_st_device(prefer_gpu: bool = True) -> str:
    """Pick the torch device string for embedding.

    Returns ``"cuda"`` only when ``prefer_gpu`` is truthy and
    ``torch.cuda.is_available()`` reports a usable GPU; otherwise ``"cpu"``.
    CUDA availability is not queried when ``prefer_gpu`` is falsy.
    """
    gpu_usable = prefer_gpu and torch.cuda.is_available()
    return "cuda" if gpu_usable else "cpu"
|
|
|
|
|
class QSPreprocessor(BaseEstimator, TransformerMixin):
    """Build one feature matrix from text, categorical and numeric columns.

    ``fit`` assembles and fits a single ``ColumnTransformer`` that combines:

    * description embeddings (``CachedTextEmbeddingTransformer``),
    * numeric specs parsed from ``item`` (missing specs filled with 0.0),
    * material flags parsed from ``description``,
    * a one-hot unit-of-measure family from ``uom``,
    * one-hot encoding of the remaining categorical columns,
    * scaled numeric columns (``qty`` is median-imputed first),
    * presence flags for ``basket_of_goods`` and ``notes``, and a
      missingness flag for ``qty``.

    Input frames are expected to use the snake_case column names listed in
    ``__init__``. Columns named in ``cols_to_drop`` are removed if present.

    Args:
        prefer_gpu: Passed to ``select_st_device`` at fit time to choose the
            embedding device.
    """

    # Members of ``categorical_cols`` that get a dedicated transformer
    # instead of the generic impute + one-hot pipeline.
    _SPECIAL_CATEGORICALS = ("uom", "basket_of_goods")

    def __init__(self, prefer_gpu: bool = True):
        self.sklearn_version = sklearn.__version__
        self.pipeline: Optional[ColumnTransformer] = None
        self.prefer_gpu = prefer_gpu

        self.cols_to_drop = [
            "unnamed_25",
            "ref",
            "project_code",
            "client",
            "contractor",
            "source",
        ]

        self.categorical_cols = [
            "ge",
            "group_element",
            "e",
            "element",
            "uom",
            "project",
            "typology",
            "location",
            "base_date",
            "package",
            "rate_scope",
            "client_location",
            "basket_of_goods",
        ]

        self.text_cols = ["description", "item", "notes"]
        self.numerical_cols = ["qty", "rate_base_date", "rate_q1_2025"]

    def _drop_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` without the ``cols_to_drop`` columns it contains."""
        return df.drop(columns=[c for c in self.cols_to_drop if c in df.columns])

    def _standardize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with the configured columns cast.

        Categorical columns become ``category``, text columns become
        ``string``, and numeric columns are coerced to ``float32``
        (unparseable values become ``NaN``). Columns absent from ``df`` are
        skipped.
        """
        df = df.copy()

        for col in self.categorical_cols:
            if col in df.columns:
                df[col] = df[col].astype("category")

        for col in self.text_cols:
            if col in df.columns:
                df[col] = df[col].astype("string")

        for col in self.numerical_cols:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce").astype("float32")

        return df

    def _check_columns(self, df: pd.DataFrame) -> None:
        """Raise ``ValueError`` naming every column the pipeline needs but lacks."""
        required = dict.fromkeys(
            [*self.text_cols, *self.categorical_cols, *self.numerical_cols]
        )
        missing = [col for col in required if col not in df.columns]
        if missing:
            raise ValueError(f"Input is missing required columns: {missing}")

    def fit(self, df, y=None):
        """Build and fit the column transformer on ``df``.

        Args:
            df: Training frame with the expected snake_case columns.
            y: Ignored.

        Returns:
            ``self``.

        Raises:
            ValueError: If a required column is missing.
        """
        self.device = select_st_device(prefer_gpu=self.prefer_gpu)
        self.desc_embed = CachedTextEmbeddingTransformer(
            model_name="all-MiniLM-L6-v2", device=self.device
        )

        df = self._standardize_dtypes(self._drop_columns(df))
        self._check_columns(df)

        # Column groups are derived from the instance configuration so the
        # lists used here cannot drift from those used for dtype handling.
        numerical_cols = list(self.numerical_cols)
        passthrough_nums = [col for col in numerical_cols if col != "qty"]
        categorical_cols = [
            col
            for col in self.categorical_cols
            if col not in self._SPECIAL_CATEGORICALS
        ]

        numerical_block = ColumnTransformer(
            transformers=[
                ("qty_impute", SimpleImputer(strategy="median"), ["qty"]),
                ("other_nums", "passthrough", passthrough_nums),
            ],
            remainder="drop",
            verbose_feature_names_out=False,
        )
        numerical_pipeline = Pipeline(
            [
                ("numerical_block", numerical_block),
                ("scaler", StandardScaler()),
            ]
        )

        categorical_pipeline = Pipeline(
            [
                ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
                ("ohe", OneHotEncoder(handle_unknown="ignore")),
            ]
        )

        item_specs_pipeline = Pipeline(
            [
                ("extractor", ItemSpecExtractor()),
                ("imputer", SimpleImputer(strategy="constant", fill_value=0.0)),
            ]
        )

        pipeline = ColumnTransformer(
            transformers=[
                ("desc_embed", self.desc_embed, "description"),
                ("item_specs", item_specs_pipeline, "item"),
                ("materials", MaterialFlagExtractor(), "description"),
                ("uom_family", UoMFamilyTransformer(), "uom"),
                ("categorical", categorical_pipeline, categorical_cols),
                ("numerical", numerical_pipeline, numerical_cols),
                ("basket_flag", BooleanFlagTransformer(), "basket_of_goods"),
                ("notes_flag", NotesPresenceTransformer(), "notes"),
                ("qty_missing", QtyMissingFlag(), "qty"),
            ],
            remainder="drop",
            verbose_feature_names_out=False,
        )

        # Publish the pipeline only after fitting succeeded, so a failed fit
        # never leaves a half-built transformer behind.
        pipeline.fit(df)
        self.pipeline = pipeline
        return self

    def transform(self, df):
        """Apply the fitted pipeline to ``df`` and return the feature matrix.

        Raises:
            ValueError: If called before ``fit`` or if a required column is
                missing.
        """
        # Fail fast before spending time copying and casting the frame.
        if self.pipeline is None:
            raise ValueError("The preprocessor has not been fitted yet.")

        df = self._standardize_dtypes(self._drop_columns(df))
        self._check_columns(df)
        return self.pipeline.transform(df)

    def get_feature_names_out(self, input_features=None):
        """Return the output feature names of the fitted pipeline.

        Raises:
            ValueError: If called before ``fit``.
        """
        if self.pipeline is None:
            raise ValueError("The preprocessor has not been fitted yet.")
        return self.pipeline.get_feature_names_out(input_features=input_features)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_nrm_hierarchy_map(df):
    """
    Build a full hierarchy mapping from the dataset using clear, non-NRM names.

    Dataset columns:
        - ge → Level 1
        - group_element → Level 2
        - e → Level 3
        - element → Level 4
        - category → Leaf class (target)

    Every distinct (category, level 1..4) combination contributes one entry.
    Keys and level values are converted with ``str()`` and stripped of
    surrounding whitespace. If a category occurs with several different
    hierarchies, the last one encountered wins. Rows whose category is
    missing are skipped so they cannot create a bogus ``"nan"`` leaf.

    Args:
        df: Frame containing the five columns above.

    Returns:
        ``{leaf: (level1, level2, level3, level4)}``. An ``"Others"`` entry
        mapping to ``("Others", "Others", "Others", "Others")`` is always
        set, replacing any ``"Others"`` category found in the data.

    Raises:
        ValueError: If a required column is missing; the first missing
            column is named.
    """
    level_cols = ["ge", "group_element", "e", "element"]
    for col in [*level_cols, "category"]:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    hierarchy_map = {}

    unique_rows = df[["category", *level_cols]].drop_duplicates()
    for leaf, *levels in unique_rows.itertuples(index=False, name=None):
        if pd.isna(leaf):
            continue
        hierarchy_map[str(leaf).strip()] = tuple(
            str(level).strip() for level in levels
        )

    # Fallback class for anything outside the known leaves.
    hierarchy_map["Others"] = ("Others", "Others", "Others", "Others")

    return hierarchy_map
|
|
|