boq-api / utils /preprocessing.py
gabcares's picture
Upload 80 files
72fdabd verified
Raw
History Blame Contribute Delete
18 kB
import re
import torch
import sklearn
import numpy as np
import pandas as pd
from typing import Optional
from core.config import settings
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sentence_transformers import SentenceTransformer
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import OneHotEncoder, StandardScaler
def to_snake_case(df: pd.DataFrame) -> pd.DataFrame:
df = df.copy(deep=True)
df.columns = (
df.columns.str.replace(
r"(?<=[a-z0-9])(?=[A-Z])", "_", regex=True
) # Split CamelCase/PascalCase
.str.lower() # Convert to lowercase
.str.replace(r"[\s/()-]+", "_", regex=True) # Replace separators with '_'
.str.replace(r"[^0-9a-z_]", "", regex=True) # Remove invalid characters
.str.strip("_") # Trim leading/trailing '_'
)
return df
# =========================================================
# 1. TEXT CLEANING
# =========================================================
def clean_description(text):
if pd.isna(text):
return ""
text = text.lower()
boilerplate = [
"supply",
"install",
"installation",
"complete",
"including",
"accordance",
"as per",
"per drawing",
"testing",
"commissioning",
"works",
"work",
"etc",
"shown",
"detailed",
]
for b in boilerplate:
text = text.replace(b, " ")
text = re.sub(r"[^a-zA-Z\s]", " ", text)
text = re.sub(r"\s+", " ", text).strip()
return text
# =========================================================
# 2. ITEM SPEC EXTRACTION
# =========================================================
def extract_specs(item):
if pd.isna(item):
return {}
text = item.lower()
specs = {}
m = re.search(r"(\d+)\s*mm", text)
specs["diameter_mm"] = float(m.group(1)) if m else np.nan
m = re.search(r"(\d+)\s*(mm)?\s*thick", text)
specs["thickness_mm"] = float(m.group(1)) if m else np.nan
m = re.search(r"(\d+)\s*x\s*(\d+)", text)
specs["size_1_mm"] = float(m.group(1)) if m else np.nan
specs["size_2_mm"] = float(m.group(2)) if m else np.nan
m = re.search(r"(\d+)\s*kw", text)
specs["capacity_kw"] = float(m.group(1)) if m else np.nan
m = re.search(r"(\d+)\s*mm2", text)
specs["cable_size_mm2"] = float(m.group(1)) if m else np.nan
return specs
# =========================================================
# 3. MATERIAL FLAGS
# =========================================================
def extract_material_flags(text):
if pd.isna(text):
text = ""
text = text.lower()
return {
"is_steel": int("steel" in text),
"is_concrete": int("concrete" in text),
"is_copper": int("copper" in text),
"is_pvc": int("pvc" in text),
"is_hdpe": int("hdpe" in text),
"is_ppr": int("ppr" in text),
}
# =========================================================
# 4. UOM NORMALIZATION
# =========================================================
def normalize_uom(u):
if pd.isna(u):
return "unknown"
u = str(u).strip().lower()
if u in ["m", "lm"]:
return "linear"
if u in ["m2", "m²", "ha"]:
return "area"
if u in ["m3", "m³", "m3/h", "m3/"]:
return "volume"
if u in ["kg", "t", "ton"]:
return "weight"
if u in ["nr", "no", "item", "unit", "points", "sys", "set", "sets", "lot", "pc"]:
return "count"
if u in ["h", "hr", "hour", "hours", "week", "months"]:
return "time"
if u in ["lump sum", "sum"]:
return "lump_sum"
return "other"
# =========================================================
# 6. EMBEDDING CACHE
# =========================================================
_EMBEDDER_CACHE: dict[str, SentenceTransformer] = {}
def get_cached_embedder(model_name: str) -> SentenceTransformer:
if model_name not in _EMBEDDER_CACHE:
# _EMBEDDER_CACHE[model_name] = SentenceTransformer(model_name)
_EMBEDDER_CACHE[model_name] = SentenceTransformer(
str(settings.MODEL_DIR / "embedding_model/SentenceTransformer/all-MiniLM-L6-v2")
) # Loaded from local
return _EMBEDDER_CACHE[model_name]
# =========================================================
# 5. CUSTOM TRANSFORMERS
# =========================================================
class TextEmbeddingTransformer(BaseEstimator, TransformerMixin):
def __init__(self, model):
self.model_name = "all-MiniLM-L6-v2"
self.model = model # Fast loading since model is already in memory
def fit(self, X, y=None):
return self
def transform(self, X):
cleaned = X.apply(clean_description).tolist()
return self.model.encode(cleaned, show_progress_bar=False)
def get_feature_names_out(self, input_features=None):
return np.array([f"desc_embed_{i}" for i in range(384)])
class ItemSpecExtractor(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
example = extract_specs(X.iloc[0])
self.feature_names_ = list(example.keys())
return self
def transform(self, X):
rows = [extract_specs(item) for item in X]
return pd.DataFrame(rows).values
def get_feature_names_out(self, input_features=None):
return np.array(self.feature_names_)
class MaterialFlagExtractor(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
example = extract_material_flags(X.iloc[0])
self.feature_names_ = list(example.keys())
return self
def transform(self, X):
rows = [extract_material_flags(text) for text in X]
return pd.DataFrame(rows).values
def get_feature_names_out(self, input_features=None):
return np.array(self.feature_names_)
class UoMFamilyTransformer(BaseEstimator, TransformerMixin):
def __init__(self):
self.encoder = OneHotEncoder(handle_unknown="ignore")
def fit(self, X, y=None):
normalized = X.apply(normalize_uom).to_frame()
self.encoder.fit(normalized)
return self
def transform(self, X):
normalized = X.apply(normalize_uom).to_frame()
return self.encoder.transform(normalized)
def get_feature_names_out(self, input_features=None):
return self.encoder.get_feature_names_out(["uom"])
class BooleanFlagTransformer(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X):
return X.notna().astype(int).values.reshape(-1, 1)
def get_feature_names_out(self, input_features=None):
return np.array(["basket_flag"])
class NotesPresenceTransformer(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X):
return X.notna().astype(int).values.reshape(-1, 1)
def get_feature_names_out(self, input_features=None):
return np.array(["notes_flag"])
class QtyMissingFlag(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X):
return X.isna().astype(int).values.reshape(-1, 1)
def get_feature_names_out(self, input_features=None):
return np.array(["qty_missing"])
class CachedTextEmbeddingTransformer(BaseEstimator, TransformerMixin):
"""
Clone-safe, cached SentenceTransformer embedding block.
Loads MiniLM once and reuses it across all clones and transforms.
Uses the global get_cached_embedder() so MiniLM loads once per process.
"""
def __init__(self, model_name="all-MiniLM-L6-v2", device="cpu"):
self.model_name = model_name
self.device = (
device # kept for API symmetry; device is handled by the cached model
)
self._model = None # cache survives cloning
def _load(self):
# Use global cache shared with HybridNRMSelectiveClassifier
return get_cached_embedder(self.model_name)
def fit(self, X, y=None):
# Ensure model is loaded once
self._load()
return self
def transform(self, X):
model = self._load()
cleaned = X.apply(clean_description).tolist()
return model.encode(cleaned, show_progress_bar=False)
# Return feature names like desc_emb_0, desc_emb_1, ..., desc_emb_383 for 384-dimensional embeddings
def get_feature_names_out(self, input_features=None):
return np.array([f"desc_embed_{i}" for i in range(384)])
# =========================================================
# 6. FINAL PREPROCESSOR
# =========================================================
def select_st_device(prefer_gpu: bool = True) -> str:
if prefer_gpu and torch.cuda.is_available():
return "cuda"
return "cpu"
class QSPreprocessor(BaseEstimator, TransformerMixin):
def __init__(self, prefer_gpu: bool = True):
# Sklearn initialization- parameters should be set here, but heavy resources (like models) should be loaded in fit() to avoid issues with cloning and memory bloat.
self.sklearn_version = sklearn.__version__
self.pipeline: Optional[ColumnTransformer] = None
self.prefer_gpu = prefer_gpu
self.cols_to_drop = [
"unnamed_25",
"ref",
"project_code",
"client",
"contractor",
"source",
]
# Datatype standardization rules
self.categorical_cols = [
"ge",
"group_element",
"e",
"element",
"uom",
"project",
"typology",
"location",
"base_date",
"package",
"rate_scope",
"client_location",
"basket_of_goods",
]
self.text_cols = ["description", "item", "notes"]
self.numerical_cols = ["qty", "rate_base_date", "rate_q1_2025"]
# ---------------------------------------------------------
# STEP 0 — DROP COLUMNS
# ---------------------------------------------------------
def _drop_columns(self, df: pd.DataFrame) -> pd.DataFrame:
return df.drop(columns=[c for c in self.cols_to_drop if c in df.columns])
# ---------------------------------------------------------
# STEP 1 — DATATYPE STANDARDIZATION
# ---------------------------------------------------------
def _standardize_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
# categorical
for col in self.categorical_cols:
if col in df.columns:
df[col] = df[col].astype("category")
# text
for col in self.text_cols:
if col in df.columns:
df[col] = df[col].astype("string")
# numeric
for col in self.numerical_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce").astype("float32")
return df
# ---------------------------------------------------------
# FIT
# ---------------------------------------------------------
def fit(self, df, y=None):
# GPU/CPU fallback
self.device = select_st_device(prefer_gpu=self.prefer_gpu)
# -----------------------------
# TEXT EMBEDDING
# -----------------------------
# ⭐ Clone-safe cached embedding transformer
# Load SentenceTransformer model once during initialization to avoid repeated loading during fit/transform
self.desc_embed = CachedTextEmbeddingTransformer(
model_name="all-MiniLM-L6-v2", device=self.device
)
# Step 0: drop garbage columns
df = self._drop_columns(df)
# Step 1: enforce datatype standardization
df = self._standardize_dtypes(df)
# -----------------------------
# NUMERICAL PIPELINE
# -----------------------------
numerical_cols = ["qty", "rate_base_date", "rate_q1_2025"]
numerical_block = ColumnTransformer(
transformers=[
(
"qty_impute",
SimpleImputer(strategy="median"),
["qty"],
), # Impute missing qty with median before scaling. Only 0.2% missing, so median is a robust choice that won't skew the data.
(
"other_nums",
"passthrough",
["rate_base_date", "rate_q1_2025"],
), # No missing values in rate columns, so just passthrough before scaling.
],
remainder="drop",
verbose_feature_names_out=False,
)
numerical_pipeline = Pipeline(
[
("numerical_block", numerical_block),
("scaler", StandardScaler()),
]
)
# -----------------------------
# CATEGORICAL PIPELINE
# -----------------------------
# Define column groups AFTER cleaning
categorical_cols = [
"ge",
"group_element",
"e",
"element",
"project",
"typology",
"location",
"base_date",
"package",
"rate_scope",
"client_location",
]
categorical_pipeline = Pipeline(
[
# Impute missing categorical values with 'missing' placeholder
("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
("ohe", OneHotEncoder(handle_unknown="ignore")),
]
)
# --------------------------
# ITEM SPECS PIPELINE
# --------------------------
# Handle missing
item_specs_pipeline = Pipeline(
[
("extractor", ItemSpecExtractor()),
("imputer", SimpleImputer(strategy="constant", fill_value=0.0)),
]
)
# -----------------------------
# MAIN COLUMN TRANSFORMER
# -----------------------------
self.pipeline = ColumnTransformer(
# Parallel transformations not serial, so order doesn't matter. Just need to ensure all steps are included and correctly reference the columns.
transformers=[
("desc_embed", self.desc_embed, "description"),
("item_specs", item_specs_pipeline, "item"),
("materials", MaterialFlagExtractor(), "description"),
("uom_family", UoMFamilyTransformer(), "uom"),
("categorical", categorical_pipeline, categorical_cols),
("numerical", numerical_pipeline, numerical_cols),
("basket_flag", BooleanFlagTransformer(), "basket_of_goods"),
("notes_flag", NotesPresenceTransformer(), "notes"),
("qty_missing", QtyMissingFlag(), "qty"),
],
remainder="drop",
verbose_feature_names_out=False,
)
self.pipeline.fit(df)
return self
# ---------------------------------------------------------
# TRANSFORM
# ---------------------------------------------------------
def transform(self, df):
df = self._drop_columns(df)
df = self._standardize_dtypes(df)
if self.pipeline is None:
raise ValueError("The preprocessor has not been fitted yet.")
return self.pipeline.transform(df)
# ---------------------------------------------------------
# GET FEATURE NAMES OUT
# ---------------------------------------------------------
def get_feature_names_out(self, input_features=None):
if self.pipeline is None:
raise ValueError("The preprocessor has not been fitted yet.")
return self.pipeline.get_feature_names_out(input_features=input_features)
# =========================================================
# HIERARCHY MAPPING OF NRM CATEGORIES
# =========================================================
def build_nrm_hierarchy_map(df):
"""
Build a full hierarchy mapping from the dataset using clear, non-NRM names.
Dataset columns:
- ge → Level 1
- group_element → Level 2
- e → Level 3
- element → Level 4
- category → Leaf class (target)
"""
required_cols = ["ge", "group_element", "e", "element", "category"]
for col in required_cols:
if col not in df.columns:
raise ValueError(f"Missing required column: {col}")
hierarchy_map = {}
# Drop duplicates to avoid redundant mappings
unique_rows = df[
["category", "ge", "group_element", "e", "element"]
].drop_duplicates()
for _, row in unique_rows.iterrows():
leaf = str(row["category"]).strip()
level1 = str(row["ge"]).strip()
level2 = str(row["group_element"]).strip()
level3 = str(row["e"]).strip()
level4 = str(row["element"]).strip()
# Build mapping: category → (Level1, Level2, Level3, Level4)
hierarchy_map[leaf] = (level1, level2, level3, level4)
# Ensure Others exists
hierarchy_map["Others"] = ("Others", "Others", "Others", "Others")
return hierarchy_map