Spaces:
Running
Running
| # predict_blend.py | |
| import os, json, numpy as np, pandas as pd, torch, lightgbm as lgb | |
| import torch.nn as nn | |
| # ========================= | |
| # Config | |
| # ========================= | |
| from pathlib import Path | |
# Directory that contains this script; artifacts are located relative to it.
BASE_DIR = Path(__file__).resolve().parent
# Absolute path (as a plain string) of the folder the loaders read from.
ART_DIR = str(BASE_DIR.joinpath("artifacts_blend").resolve())
# Name of the categorical input column.
CAT_COL = "material"
# Numeric input columns, in the order they are handed to the models.
NUM_COLS = [
    "thickness",
    "diameter",
    "degree",
    "upper_radius",
    "lower_radius",
    "LB",
    "RB",
]
| # ========================= | |
| # FT-Transformer | |
| # ========================= | |
class FTTransformer(nn.Module):
    """Transformer regressor over one categorical and several numeric features.

    Every feature becomes one token of width ``d_model``: the material id goes
    through an embedding table and each numeric column through its own
    ``Linear(1, d_model)``. A learned CLS token is placed in front, the
    sequence is run through a pre-norm Transformer encoder, and the encoder
    output at the CLS position is passed to a small MLP head that emits one
    value per row.
    """

    def __init__(self, n_materials: int, n_num: int, d_model: int = 192, nhead: int = 8,
                 num_layers: int = 4, dim_ff: int = 768, dropout: float = 0.15):
        super().__init__()
        # Attribute names double as state_dict keys, so they must stay stable
        # for saved checkpoints to load.
        self.mat_emb = nn.Embedding(n_materials, d_model)

        per_column = []
        for _ in range(n_num):
            per_column.append(nn.Linear(1, d_model))
        self.num_linears = nn.ModuleList(per_column)

        self.cls = nn.Parameter(torch.zeros(1, 1, d_model))
        nn.init.trunc_normal_(self.cls, std=0.02)

        layer_kwargs = dict(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_ff,
            dropout=dropout,
            batch_first=True,
            activation='gelu',
            norm_first=True,
        )
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(**layer_kwargs),
            num_layers=num_layers,
        )

        self.head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, d_model),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, 1),
        )

    def forward(self, mat_ids, x_num):
        """Return a ``(batch, 1)`` tensor of predictions.

        ``mat_ids`` holds one embedding index per row; ``x_num`` is indexed as
        ``(batch, n_num)`` with one column per numeric projection.
        """
        batch = x_num.size(0)

        # One (batch, 1, d_model) token per numeric column.
        numeric_tokens = []
        for col, projection in enumerate(self.num_linears):
            numeric_tokens.append(projection(x_num[:, col:col + 1]).unsqueeze(1))
        num_tok = torch.cat(numeric_tokens, dim=1)

        mat_tok = self.mat_emb(mat_ids).unsqueeze(1)
        cls_tok = self.cls.expand(batch, -1, -1)

        sequence = torch.cat([cls_tok, mat_tok, num_tok], dim=1)
        encoded = self.encoder(sequence)
        # Only the CLS position feeds the regression head.
        return self.head(encoded[:, 0, :])
def _scale_like_fold(X_num: np.ndarray, mean: np.ndarray, scale: np.ndarray) -> np.ndarray:
    """Standardize ``X_num`` with a fold's stored ``mean`` and ``scale``.

    Computes ``(X_num - mean) / scale`` and returns the result as float32.
    """
    centered = X_num - mean
    standardized = centered / scale
    return standardized.astype(np.float32)
| # ========================= | |
| # Material label helpers | |
| # ========================= | |
def _canonize_list(materials):
    """Return every material label as a whitespace-stripped string, order preserved."""
    return [text.strip() for text in map(str, materials)]
def _build_alias2canon(canon_list):
    """Build a lookup from accepted spellings of a material to its canonical name.

    For each canonical name the following aliases are registered:

    * the name itself and its whitespace-stripped form;
    * if it contains a ``"."``, the form with trailing zeros and a trailing
      dot removed (``"590.0"`` -> ``"590"``);
    * if it parses as a float, ``str(float)`` and, for whole numbers,
      ``str(int)`` (``"590"`` -> ``"590.0"``, ``"1e3"`` -> ``"1000"``).

    When two names produce the same derived alias the later name wins, but a
    derived alias never replaces an exact canonical name: with
    ``["590", "590.0"]`` the key ``"590"`` still resolves to ``"590"``.

    Args:
        canon_list: Iterable of canonical material names (strings).

    Returns:
        dict mapping alias -> canonical name.
    """
    alias2canon = {}
    for canon in canon_list:
        stripped = canon.strip()
        alias2canon[stripped] = canon

        if "." in stripped:
            trimmed = stripped.rstrip("0").rstrip(".")
            # Names such as ".0" trim down to ""; registering that would make
            # blank input silently resolve to a material.
            if trimmed:
                alias2canon[trimmed] = canon

        try:
            value = float(stripped)
        except ValueError:
            # Not a numeric label, so there are no numeric spellings to add.
            continue
        alias2canon[str(value)] = canon
        if value.is_integer():
            alias2canon[str(int(value))] = canon

    # Exact canonical names take precedence over any alias derived above.
    alias2canon.update((canon, canon) for canon in canon_list)
    return alias2canon
| # ========================= | |
| # Loader helpers | |
| # ========================= | |
def _first_existing(*paths):
    """Return the first of ``paths`` that exists on disk, or ``None`` if none do."""
    return next((candidate for candidate in paths if os.path.exists(candidate)), None)
def _load_ft_folds(art_dir: str):
    """Load the FT-Transformer fold checkpoints found in ``art_dir``.

    Looks for ``ftt_fold1.pt`` ... ``ftt_fold10.pt``. Missing files before the
    first hit are skipped; once at least one checkpoint has been loaded, the
    first missing file ends the scan.

    Returns:
        A list with one dict per fold holding the eval-mode ``model``, the
        checkpoint's ``materials`` and ``num_cols``, and the scaler statistics
        ``scaler_mean`` / ``scaler_scale`` as float32 arrays.

    Raises:
        FileNotFoundError: if no checkpoint file exists.
    """
    loaded = []
    for fold_no in range(1, 11):
        ckpt_path = os.path.join(art_dir, f"ftt_fold{fold_no}.pt")
        if os.path.exists(ckpt_path):
            # NOTE(review): weights_only=False unpickles arbitrary Python
            # objects - only point this at checkpoints from a trusted source.
            checkpoint = torch.load(ckpt_path, map_location="cpu", weights_only=False)
            fold_materials = checkpoint["materials"]
            fold_num_cols = checkpoint["num_cols"]

            # The network is rebuilt with FTTransformer's default
            # hyper-parameters; only the vocabulary size and the number of
            # numeric columns come from the checkpoint.
            net = FTTransformer(len(fold_materials), len(fold_num_cols))
            net.load_state_dict(checkpoint["state_dict"])
            net.eval()

            entry = {
                "model": net,
                "materials": fold_materials,
                "num_cols": fold_num_cols,
                "scaler_mean": np.array(checkpoint["scaler_mean"], dtype=np.float32),
                "scaler_scale": np.array(checkpoint["scaler_scale"], dtype=np.float32),
            }
            loaded.append(entry)
        elif loaded:
            # A gap after at least one loaded fold marks the end of the run.
            break

    if loaded:
        return loaded
    raise FileNotFoundError("No FT checkpoints found in artifacts folder.")
def _load_lgbm_folds(art_dir: str):
    """Load the LightGBM fold models found in ``art_dir``.

    For folds 1..10 the file ``lgbm_fold{n}.txt`` is preferred, with the
    extension-less ``lgbm_fold{n}`` as a fallback. Missing folds before the
    first hit are skipped; after at least one model has been loaded, the first
    missing fold ends the scan.

    Returns:
        A list of ``lgb.Booster`` objects, one per fold found.

    Raises:
        FileNotFoundError: if no model file exists.
    """
    loaded = []
    for fold_no in range(1, 11):
        stem = os.path.join(art_dir, f"lgbm_fold{fold_no}")
        model_path = _first_existing(f"{stem}.txt", stem)
        if model_path is not None:
            loaded.append(lgb.Booster(model_file=model_path))
        elif loaded:
            # A gap after at least one loaded fold marks the end of the run.
            break

    if loaded:
        return loaded
    raise FileNotFoundError("No LightGBM model files found in artifacts folder.")
def _load_json_like(art_dir: str, basename: str) -> dict:
    """Parse ``{basename}.json`` (or, failing that, ``{basename}``) from ``art_dir`` as JSON.

    Raises:
        FileNotFoundError: if neither file exists.
    """
    candidates = (
        os.path.join(art_dir, f"{basename}.json"),
        os.path.join(art_dir, basename),
    )
    found = _first_existing(*candidates)
    if found is None:
        raise FileNotFoundError(f"Missing {basename}(.json) in {art_dir}")

    with open(found, "r", encoding="utf-8") as handle:
        payload = json.load(handle)
    return payload
def _load_materials(art_dir: str, folds_ft):
    """Return the material list from the ``materials`` JSON file in ``art_dir``.

    If that file does not exist, the list stored with the first FT fold is
    used instead.
    """
    try:
        meta = _load_json_like(art_dir, "materials")
    except FileNotFoundError:
        # No standalone file: fall back to what the first checkpoint recorded.
        return folds_ft[0]["materials"]
    return meta["materials"]
def _load_best_alpha(art_dir: str) -> float:
    """Read ``best_alpha`` from the ``blend_alpha`` JSON file in ``art_dir`` as a float."""
    blend_meta = _load_json_like(art_dir, "blend_alpha")
    raw_alpha = blend_meta["best_alpha"]
    return float(raw_alpha)
| # ========================= | |
| # Predictor | |
| # ========================= | |
class BlendPredictor:
    """Predict with a weighted blend of FT-Transformer and LightGBM fold ensembles.

    On construction every fold model, the material vocabulary and the saved
    blend weight are loaded from ``art_dir``. Each model family's prediction is
    the mean over its folds; the blend is
    ``alpha * ft + (1 - alpha) * lgbm``.

    Args:
        art_dir: Folder containing the fold checkpoints and JSON metadata.
        unknown_policy: ``"error"`` raises on materials that cannot be mapped
            to a known one; any other value maps them to the first known
            material instead.
    """

    def __init__(self, art_dir: str = ART_DIR, unknown_policy: str = "error"):
        self.art_dir = art_dir
        self.folds_ft = _load_ft_folds(art_dir)
        self.boosters = _load_lgbm_folds(art_dir)
        self.materials = _load_materials(art_dir, self.folds_ft)
        self.best_alpha = _load_best_alpha(art_dir)
        self.materials_canon = _canonize_list(self.materials)
        self.alias2canon = _build_alias2canon(self.materials_canon)
        # Position in the canonical list is the id fed to the embedding.
        self.mat2id = {m: i for i, m in enumerate(self.materials_canon)}
        self.unknown_policy = unknown_policy

    def _prep_df(self, df_new: pd.DataFrame) -> pd.DataFrame:
        """Validate ``df_new`` and return a copy ready for both model families.

        The copy has the material column stripped to strings, the numeric
        columns converted with ``pd.to_numeric``, and two helper columns:
        ``_mat_canon`` (canonical material name) and ``_mat_id`` (its index).

        Raises:
            ValueError: if a required column is missing, if a material is
                unknown while ``unknown_policy == "error"``, or if a numeric
                column holds a value that is missing or cannot be converted.
        """
        df = df_new.copy()
        need = [CAT_COL] + NUM_COLS
        missing = [c for c in need if c not in df.columns]
        if missing:
            raise ValueError(f"Missing columns in input: {missing}")

        df[CAT_COL] = df[CAT_COL].astype(str).str.strip()
        df["_mat_canon"] = df[CAT_COL].map(self.alias2canon)
        if self.unknown_policy == "error":
            unknown = df.loc[df["_mat_canon"].isna(), CAT_COL].unique().tolist()
            if unknown:
                raise ValueError(
                    f"Unknown materials in input {unknown}. "
                    f"Known materials: {self.materials_canon[:10]}{' ...' if len(self.materials_canon)>10 else ''}"
                )
        else:
            # Any other policy: substitute the first known material.
            df["_mat_canon"] = df["_mat_canon"].fillna(self.materials_canon[0])
        df["_mat_id"] = df["_mat_canon"].map(self.mat2id).astype(int)

        df[NUM_COLS] = df[NUM_COLS].apply(pd.to_numeric, errors="coerce")
        if df[NUM_COLS].isnull().any().any():
            bad = df[NUM_COLS].columns[df[NUM_COLS].isnull().any()].tolist()
            raise ValueError(f"Non-numeric values detected in columns: {bad}")
        return df

    def _ft_from_prepared(self, df: pd.DataFrame) -> np.ndarray:
        """Fold-averaged FT-Transformer prediction for a frame returned by ``_prep_df``."""
        Xn = df[NUM_COLS].values.astype(np.float32)
        mids = torch.tensor(df["_mat_id"].values, dtype=torch.long)
        preds = []
        for f in self.folds_ft:
            # Each fold carries its own scaler statistics.
            x_scaled = _scale_like_fold(Xn, f["scaler_mean"], f["scaler_scale"])
            x_t = torch.tensor(x_scaled, dtype=torch.float32)
            with torch.no_grad():
                p = f["model"](mids, x_t).cpu().numpy().ravel()
            preds.append(p)
        return np.mean(preds, axis=0)

    def _lgbm_from_prepared(self, df: pd.DataFrame) -> np.ndarray:
        """Fold-averaged LightGBM prediction for a frame returned by ``_prep_df``."""
        X = df[[CAT_COL] + NUM_COLS].copy()
        # Material is passed as a pandas categorical over the known vocabulary.
        X[CAT_COL] = pd.Categorical(df["_mat_canon"], categories=self.materials_canon)
        preds = [bst.predict(X, num_iteration=getattr(bst, "best_iteration", None))
                 for bst in self.boosters]
        return np.mean(preds, axis=0)

    def predict_ft(self, df_new: pd.DataFrame) -> np.ndarray:
        """Return the mean FT-Transformer prediction over all folds, one value per row."""
        return self._ft_from_prepared(self._prep_df(df_new))

    def predict_lgbm(self, df_new: pd.DataFrame) -> np.ndarray:
        """Return the mean LightGBM prediction over all folds, one value per row."""
        return self._lgbm_from_prepared(self._prep_df(df_new))

    def predict_blend(self, df_new: pd.DataFrame, alpha: float = None) -> np.ndarray:
        """Return ``alpha * ft + (1 - alpha) * lgbm`` for every row of ``df_new``.

        Args:
            df_new: Input rows; must contain the material and numeric columns.
            alpha: Weight of the FT-Transformer prediction. ``None`` uses the
                ``best_alpha`` loaded from the artifacts.
        """
        if alpha is None:
            alpha = self.best_alpha
        # Validate and convert the input once and share it between both model
        # families instead of copying and re-checking it for each of them.
        df = self._prep_df(df_new)
        p_dl = self._ft_from_prepared(df)
        p_lgb = self._lgbm_from_prepared(df)
        return alpha * p_dl + (1 - alpha) * p_lgb
| # ========================= | |
| # Example run | |
| # ========================= | |
if __name__ == "__main__":
    # Demo: two rows that share every numeric feature and differ only in material.
    shared_features = dict(
        thickness=1, diameter=20, degree=73,
        upper_radius=3, lower_radius=2,
        LB=0, RB=1,
    )
    sample_rows = [{**shared_features, "material": label} for label in ("590", "440")]
    df_new = pd.DataFrame(sample_rows)

    predictor = BlendPredictor(ART_DIR, unknown_policy="error")
    print("materials (trained):", predictor.materials_canon[:10])
    print("best_alpha:", predictor.best_alpha)

    # alpha=1.0 isolates the FT-Transformer, alpha=0.0 isolates LightGBM,
    # and the default uses the saved blend weight.
    dl_only = predictor.predict_blend(df_new, alpha=1.0)
    print("\nDL only :", dl_only)
    lgbm_only = predictor.predict_blend(df_new, alpha=0.0)
    print("LGBM only:", lgbm_only)
    blended = predictor.predict_blend(df_new)
    print("Blend :", blended)