# predict_blend_thinning.py
import os, json, numpy as np, pandas as pd, torch, lightgbm as lgb
import torch.nn as nn
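
# Loads per-fold FT-Transformer and LightGBM checkpoints for the "thinning"
# target and serves blended predictions: alpha * FT + (1 - alpha) * LGBM,
# with alpha read from blend_alpha_thinning.json in the artifacts folder.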

# =========================
# Config (defaults; auto-overridden if columns_thinning.json exists)
# =========================
ART_DIR = r"C:\_vscode\CATIA_Project\artifacts_blend_thinning"
CAT_COL = "material"
NUM_COLS = ["thickness", "diameter", "degree", "upper_radius", "lower_radius", "LB", "RB"]

# =========================
# FT-Transformer
# =========================
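# Each input row becomes a token sequence: a learned [CLS] token, one
# embedding token for the material, and one token per numeric feature
# (each value projected to d_model by its own Linear(1, d_model)).
# The encoder output at the [CLS] position feeds the regression head.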
class FTTransformer(nn.Module):
    def __init__(self, n_materials: int, n_num: int, d_model: int = 192, nhead: int = 8,
                 num_layers: int = 4, dim_ff: int = 768, dropout: float = 0.15):
        super().__init__()
        self.mat_emb = nn.Embedding(n_materials, d_model)
        self.num_linears = nn.ModuleList([nn.Linear(1, d_model) for _ in range(n_num)])
        self.cls = nn.Parameter(torch.zeros(1, 1, d_model))
        nn.init.trunc_normal_(self.cls, std=0.02)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_ff,
            dropout=dropout, batch_first=True, activation='gelu', norm_first=True
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, d_model),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, 1)
        )
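
    # Token layout per sample: [CLS, material, num_1 .. num_n], so `tokens`
    # has shape (B, 2 + n_num, d_model) and the head reads h[:, 0, :].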
    def forward(self, mat_ids, x_num):
        B = x_num.size(0)
        mat_tok = self.mat_emb(mat_ids).unsqueeze(1)
        num_tok = torch.cat(
            [lin(x_num[:, i:i+1]).unsqueeze(1) for i, lin in enumerate(self.num_linears)],
            dim=1
        )
        tokens = torch.cat([self.cls.expand(B, -1, -1), mat_tok, num_tok], dim=1)
        h = self.encoder(tokens)
        return self.head(h[:, 0, :])
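
# Re-applies the training-time feature standardization z = (x - mean) / scale
# using the statistics stored in each fold checkpoint.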
def _scale_like_fold(X_num: np.ndarray, mean: np.ndarray, scale: np.ndarray) -> np.ndarray:
    return ((X_num - mean) / scale).astype(np.float32)

# =========================
# Material label helpers
# =========================
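# Material labels may arrive as numeric-looking strings ("1", "1.0", " 1.0 ").
# The alias map sends every such spelling to the canonical training label,
# e.g. both "1" and "1.0" map to "1.0" when the model was trained on "1.0".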
def _canonize_list(materials):
    return [str(m).strip() for m in materials]

def _build_alias2canon(canon_list):
    alias2canon = {}
    for c in canon_list:
        alias2canon[c] = c
        s = c.strip()
        alias2canon[s] = c
        if "." in s:
            alias2canon[s.rstrip("0").rstrip(".")] = c
        try:
            v = float(s)
            alias2canon[str(v)] = c
            if v.is_integer():
                alias2canon[str(int(v))] = c
        except ValueError:  # non-numeric label; keep the string aliases only
            pass
    return alias2canon

# =========================
# Loader helpers
# =========================
def _first_existing(*paths):
    for p in paths:
        if os.path.exists(p):
            return p
    return None

def _load_columns_meta(art_dir: str):
    """Use the column definitions from columns_thinning.json (or columns.json) if present."""
    meta = None
    p = _first_existing(os.path.join(art_dir, "columns_thinning.json"),
                        os.path.join(art_dir, "columns.json"))
    if p:
        with open(p, "r", encoding="utf-8") as f:
            meta = json.load(f)
    return meta
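
# Fold checkpoints are probed as fold1..fold10; scanning stops at the first
# gap once at least one checkpoint has been found, so partial runs still load.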
def _load_ft_folds(art_dir: str):
    folds = []
    for fold in range(1, 11):
        p = os.path.join(art_dir, f"ftt_thinning_fold{fold}.pt")
        if not os.path.exists(p):
            if folds:
                break
            continue
        ckpt = torch.load(p, map_location="cpu", weights_only=False)
        materials = ckpt["materials"]
        num_cols = ckpt["num_cols"]
        model = FTTransformer(len(materials), len(num_cols))
        model.load_state_dict(ckpt["state_dict"])
        model.eval()
        folds.append({
            "model": model,
            "materials": materials,
            "num_cols": num_cols,
            "scaler_mean": np.array(ckpt["scaler_mean"], dtype=np.float32),
            "scaler_scale": np.array(ckpt["scaler_scale"], dtype=np.float32),
        })
    if not folds:
        raise FileNotFoundError("No FT thinning checkpoints found in artifacts folder.")
    return folds
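
# LightGBM boosters are looked up both with and without a .txt extension.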
def _load_lgbm_folds(art_dir: str):
    boosters = []
    for fold in range(1, 11):
        p1 = os.path.join(art_dir, f"lgbm_thinning_fold{fold}.txt")
        p2 = os.path.join(art_dir, f"lgbm_thinning_fold{fold}")
        p = _first_existing(p1, p2)
        if p is None:
            if boosters:
                break
            continue
        boosters.append(lgb.Booster(model_file=p))
    if not boosters:
        raise FileNotFoundError("No LightGBM thinning model files found in artifacts folder.")
    return boosters

def _load_json_like(art_dir: str, basename: str) -> dict:
    p1 = os.path.join(art_dir, f"{basename}.json")
    p2 = os.path.join(art_dir, basename)
    p = _first_existing(p1, p2)
    if p is None:
        raise FileNotFoundError(f"Missing {basename}(.json) in {art_dir}")
    with open(p, "r", encoding="utf-8") as f:
        return json.load(f)

def _load_materials(art_dir: str, folds_ft):
    try:
        return _load_json_like(art_dir, "materials")["materials"]
    except FileNotFoundError:
        return folds_ft[0]["materials"]

def _load_best_alpha(art_dir: str) -> float:
    return float(_load_json_like(art_dir, "blend_alpha_thinning")["best_alpha"])

# =========================
# Predictor
# =========================
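# Wraps both model families behind one interface:
#   predict_ft    -> mean over FT-Transformer fold predictions
#   predict_lgbm  -> mean over LightGBM fold predictions
#   predict_blend -> alpha * predict_ft + (1 - alpha) * predict_lgbm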
class BlendPredictor:
    def __init__(self, art_dir: str = ART_DIR, unknown_policy: str = "error"):
        self.art_dir = art_dir
        self.folds_ft = _load_ft_folds(art_dir)
        self.boosters = _load_lgbm_folds(art_dir)
        self.materials = _load_materials(art_dir, self.folds_ft)
        self.best_alpha = _load_best_alpha(art_dir)
        # Column metadata (used if present)
        meta = _load_columns_meta(art_dir)
        if meta:
            self.cat_col = meta.get("cat_col", CAT_COL)
            self.num_cols = meta.get("num_cols", NUM_COLS)
            self.target = meta.get("target", "thinning")
        else:
            self.cat_col = CAT_COL
            self.num_cols = NUM_COLS
            self.target = "thinning"
        self.materials_canon = _canonize_list(self.materials)
        self.alias2canon = _build_alias2canon(self.materials_canon)
        self.mat2id = {m: i for i, m in enumerate(self.materials_canon)}
        self.unknown_policy = unknown_policy

    def _prep_df(self, df_new: pd.DataFrame) -> pd.DataFrame:
        df = df_new.copy()
        need = [self.cat_col] + self.num_cols
        missing = [c for c in need if c not in df.columns]
        if missing:
            raise ValueError(f"Missing columns in input: {missing}")
        df[self.cat_col] = df[self.cat_col].astype(str).str.strip()
        df["_mat_canon"] = df[self.cat_col].map(self.alias2canon)
        if self.unknown_policy == "error":
            unknown = df.loc[df["_mat_canon"].isna(), self.cat_col].unique().tolist()
            if unknown:
                raise ValueError(
                    f"Unknown materials in input {unknown}. "
                    f"Known materials: {self.materials_canon[:10]}{' ...' if len(self.materials_canon) > 10 else ''}"
                )
            df["_mat_id"] = df["_mat_canon"].map(self.mat2id).astype(int)
        else:
            df["_mat_canon"] = df["_mat_canon"].fillna(self.materials_canon[0])
            df["_mat_id"] = df["_mat_canon"].map(self.mat2id).astype(int)
        df[self.num_cols] = df[self.num_cols].apply(pd.to_numeric, errors="coerce")
        if df[self.num_cols].isnull().any().any():
            bad = df[self.num_cols].columns[df[self.num_cols].isnull().any()].tolist()
            raise ValueError(f"Non-numeric values detected in columns: {bad}")
        return df
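
    # Each fold is scored with its own saved column order and scaler statistics;
    # the per-fold predictions are then averaged (a simple fold ensemble).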
    def predict_ft(self, df_new: pd.DataFrame) -> np.ndarray:
        df = self._prep_df(df_new)
        mids = torch.tensor(df["_mat_id"].values, dtype=torch.long)
        preds = []
        for f in self.folds_ft:
            # Use the num_cols order saved by each fold as-is
            fold_num_cols = f["num_cols"]
            Xn = df[fold_num_cols].values.astype(np.float32)
            x_scaled = _scale_like_fold(Xn, f["scaler_mean"], f["scaler_scale"])
            x_t = torch.tensor(x_scaled, dtype=torch.float32)
            with torch.no_grad():
                p = f["model"](mids, x_t).cpu().numpy().ravel()
            preds.append(p)
        return np.mean(preds, axis=0)
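
    # The material column is cast to a pandas Categorical using the training
    # category list, so LightGBM sees the same category codes as in training.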
    def predict_lgbm(self, df_new: pd.DataFrame) -> np.ndarray:
        df = self._prep_df(df_new)
        X = df[[self.cat_col] + self.num_cols].copy()
        X[self.cat_col] = pd.Categorical(df["_mat_canon"], categories=self.materials_canon)
        preds = [bst.predict(X, num_iteration=getattr(bst, "best_iteration", None))
                 for bst in self.boosters]
        return np.mean(preds, axis=0)

    def predict_blend(self, df_new: pd.DataFrame, alpha: float | None = None) -> np.ndarray:
        if alpha is None:
            alpha = self.best_alpha
        p_dl = self.predict_ft(df_new)
        p_lgb = self.predict_lgbm(df_new)
        return alpha * p_dl + (1 - alpha) * p_lgb
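
# Lightweight pre-flight checks: raises only for missing columns, otherwise
# prints warnings without mutating the input frame.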
def debug_validate(df: pd.DataFrame, predictor: BlendPredictor):
    # 1) Required-column check
    need = [predictor.cat_col] + predictor.num_cols
    missing = [c for c in need if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns in input DataFrame: {missing}")
    # 2) Material-mapping check
    mats = df[predictor.cat_col].astype(str).str.strip()
    unknown = sorted(set(mats) - set(predictor.alias2canon.keys()))
    if unknown:
        print(f"[WARN] Material aliases not in the training labels -> error unless unknown_policy='fallback0': {unknown}")
    # 3) Numeric-dtype check
    bad_cols = []
    for c in predictor.num_cols:
        if not np.issubdtype(df[c].dtype, np.number):
            bad_cols.append(c)
    if bad_cols:
        print(f"[WARN] Non-numeric columns found -> automatic conversion will be attempted: {bad_cols}")

if __name__ == "__main__":
    # Usage example:
    # filtered = pd.read_excel("your_inputs.xlsx")
    predictor = BlendPredictor(ART_DIR, unknown_policy="fallback0")  # change to "error" if needed
    print("materials (trained):", predictor.materials_canon[:10])
    print("best_alpha (thinning):", predictor.best_alpha)
    # Validate the input
    # debug_validate(filtered, predictor)
    # Run predictions
    # Blend_y_pred = predictor.predict_blend(filtered)       # blend with the best alpha
    # LGBM_pred    = predictor.predict_blend(filtered, 0.0)  # LGBM only
    # DL_pred      = predictor.predict_blend(filtered, 1.0)  # DL only
    # filtered = filtered.copy()
    # filtered["Blend_thinning_pred"] = Blend_y_pred
    # filtered["LGBM_thinning_pred"] = LGBM_pred
    # filtered["DL_thinning_pred"] = DL_pred
    # Save example:
    # filtered.to_excel("predicted_thinning.xlsx", index=False)
    # print("saved: predicted_thinning.xlsx")
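
    # Minimal synthetic input (column values are illustrative only):
    # demo = pd.DataFrame([{
    #     "material": predictor.materials_canon[0],
    #     "thickness": 1.0, "diameter": 50.0, "degree": 30.0,
    #     "upper_radius": 5.0, "lower_radius": 5.0, "LB": 10.0, "RB": 10.0,
    # }])
    # print("demo blend prediction:", predictor.predict_blend(demo))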