# train_blend_ftt_lgbm.py
# FT-Transformer (weighted MAE + 5-Fold OOF) + LightGBM (5-Fold OOF) + OOF blending
# pip install pandas numpy scikit-learn torch lightgbm openpyxl
import os
import math
import json
import random
import pathlib
from typing import List, Tuple

import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error
import lightgbm as lgb
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# =========================
# Config
# =========================
SEED = 42
DATA_PATH = r"C:\Users\KDT10\OneDrive\바탕 화면\AutoForm\데이터통합.xlsx"  # .xlsx or .csv
TARGET = "max_failure"
CAT_COL = "material"  # categorical feature
# Add material properties / derived features here when needed.
NUM_COLS = ["thickness", "diameter", "degree", "upper_radius", "lower_radius", "LB", "RB"]
N_SPLITS = 5

# FT-Transformer hyperparameters (tuned proposal)
D_MODEL = 256
NHEAD = 8
LAYERS = 6
DIM_FF = 1024
DROPOUT = 0.25
EPOCHS = 500
PATIENCE = 50
LR = 5e-4
WEIGHT_DECAY = 2e-4
BATCH_TRAIN = 256
BATCH_VAL = 512

# LightGBM hyperparameters
LGB_PARAMS = {
    "objective": "mae",
    "metric": "mae",
    "learning_rate": 0.05,
    "num_leaves": 31,
    "feature_fraction": 0.9,
    "bagging_fraction": 0.9,
    "bagging_freq": 1,
    "min_data_in_leaf": 20,
    "verbosity": -1,
    "seed": SEED,
}
NUM_BOOST_ROUND = 8000
EARLY_STOP = 400

ART_DIR = "artifacts_blend"
os.makedirs(ART_DIR, exist_ok=True)


# =========================
# Utils
# =========================
def get_safe_device():
    """Return a usable torch device.

    CUDA is probed by actually allocating a tensor and synchronizing; if that
    fails (or CUDA is not available) the CPU device is returned instead.
    """
    if torch.cuda.is_available():
        try:
            _ = torch.zeros(1, device="cuda")
            torch.cuda.synchronize()
            print("[INFO] Using CUDA")
            return torch.device("cuda")
        except Exception as e:
            print(f"[WARN] CUDA available but failed to initialize: {e}")
    print("[INFO] Using CPU")
    return torch.device("cpu")


def set_seed(seed: int, device: torch.device):
    """Seed Python, NumPy and torch RNGs (and CUDA RNGs when on a CUDA device)."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if device.type == "cuda":
        try:
            torch.cuda.manual_seed_all(seed)
        except Exception as e:
            print(f"[WARN] torch.cuda.manual_seed_all failed: {e}")


def read_table(path: str) -> pd.DataFrame:
    """Load a table from ``path``: Excel for .xlsx/.xls, CSV for anything else."""
    p = pathlib.Path(path)
    if p.suffix.lower() in (".xlsx", ".xls"):
        return pd.read_excel(p)  # requires openpyxl
    return pd.read_csv(p)


def ensure_categorical(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Return a copy of ``df`` with ``col`` cast to str and then to category."""
    df = df.copy()
    df[col] = df[col].astype(str).astype("category")
    return df


def tukey_biweight_weights_by_group(df, target=TARGET, group=CAT_COL, c=4.685, eps=1e-9):
    """Compute per-row Tukey biweight weights within each group.

    For every group, residuals from the group median are scaled by
    ``c * IQR`` (IQR floored at ``eps``); rows with a scaled residual below 1
    in magnitude get ``(1 - u**2)**2`` and the rest get 0. The result is
    clipped to [0.05, 1.0].

    Args:
        df: Input frame; it is not modified.
        target: Name of the numeric column the weights are derived from.
        group: Name of the grouping column.
        c: Tukey tuning constant.
        eps: Lower bound for the IQR to avoid division by zero.

    Returns:
        float32 array of length ``len(df)``, aligned with the row order of ``df``.
    """
    y_all = df[target].to_numpy(dtype=float)
    w = np.ones(len(df), dtype=np.float32)
    # `.indices` yields positional row numbers, so the assignment below is
    # correct for any DataFrame index (index labels would only be valid
    # positions for a default RangeIndex).
    for pos in df.groupby(group, observed=True).indices.values():
        if len(pos) == 0:
            continue  # unobserved category: nothing to weight
        y = y_all[pos]
        med = np.median(y)
        q1, q3 = np.percentile(y, [25, 75])
        iqr = max(q3 - q1, eps)
        u = (y - med) / (c * iqr)
        w[pos] = np.where(np.abs(u) < 1, (1 - u**2) ** 2, 0.0)
    return np.clip(w, 0.05, 1.0).astype(np.float32)


def search_best_alpha(oof_a: np.ndarray, oof_b: np.ndarray, y_true: np.ndarray):
    """Grid-search the blend weight that minimizes MAE.

    Evaluates ``a * oof_a + (1 - a) * oof_b`` for 1001 evenly spaced values of
    ``a`` in [0, 1] (step 0.001).

    Returns:
        ``(best_alpha, best_mae)``; the first alpha reaching the minimum wins.
    """
    alphas = np.linspace(0.0, 1.0, 1001)  # step of 0.001
    best_a, best_mae = None, 1e9
    for a in alphas:
        blend = a * oof_a + (1 - a) * oof_b
        mae = mean_absolute_error(y_true, blend)
        if mae < best_mae:
            best_a, best_mae = a, mae
    return best_a, best_mae


# =========================
# Dataset / Model
# =========================
class TabDataset(Dataset):
    """Tensor-backed tabular dataset.

    Items are ``(mat_id, num_feats)``, plus ``target`` when targets were
    given, plus ``weight`` when weights were given as well.
    """

    def __init__(self, mat_ids, num_feats, target=None, weights=None):
        self.mat_ids = torch.tensor(mat_ids, dtype=torch.long)
        self.num_feats = torch.tensor(num_feats, dtype=torch.float32)
        # Targets and weights are stored as column vectors (N, 1).
        self.target = None if target is None else torch.tensor(target, dtype=torch.float32).view(-1, 1)
        self.weights = None if weights is None else torch.tensor(weights, dtype=torch.float32).view(-1, 1)

    def __len__(self):
        return len(self.mat_ids)

    def __getitem__(self, i):
        if self.target is None:
            return self.mat_ids[i], self.num_feats[i]
        if self.weights is None:
            return self.mat_ids[i], self.num_feats[i], self.target[i]
        return self.mat_ids[i], self.num_feats[i], self.target[i], self.weights[i]


class FTTransformer(nn.Module):
    """Transformer regressor over per-feature tokens.

    The material id is embedded, each numeric feature gets its own
    ``Linear(1, d_model)`` tokenizer, and a learned CLS token is prepended.
    The encoder output at the CLS position is fed to an MLP head that
    produces one value per row.
    """

    def __init__(self, n_materials: int, n_num: int, d_model: int = 128, nhead: int = 8,
                 num_layers: int = 4, dim_ff: int = 256, dropout: float = 0.2):
        super().__init__()
        self.mat_emb = nn.Embedding(n_materials, d_model)
        self.num_linears = nn.ModuleList([nn.Linear(1, d_model) for _ in range(n_num)])
        self.cls = nn.Parameter(torch.zeros(1, 1, d_model))
        nn.init.trunc_normal_(self.cls, std=0.02)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_ff,
            dropout=dropout,
            batch_first=True,
            activation='gelu',
            norm_first=True,
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, d_model),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, 1),
        )

    def forward(self, mat_ids: torch.LongTensor, x_num: torch.FloatTensor):
        B = x_num.size(0)
        mat_tok = self.mat_emb(mat_ids).unsqueeze(1)  # (B,1,d)
        # One token per numeric column, each from its own linear layer.
        num_tok = torch.cat(
            [lin(x_num[:, i:i + 1]).unsqueeze(1) for i, lin in enumerate(self.num_linears)],
            dim=1,
        )
        tokens = torch.cat([self.cls.expand(B, -1, -1), mat_tok, num_tok], dim=1)
        h = self.encoder(tokens)
        return self.head(h[:, 0, :])  # (B,1)


def weighted_l1_loss(pred, y, w):
    """Weighted mean absolute error: sum(w * |pred - y|) / (sum(w) + 1e-9)."""
    return (w * (pred - y).abs()).sum() / (w.sum() + 1e-9)


def val_mae(model, loader, device):
    """Return the unweighted MAE of ``model`` over ``loader``.

    Batches may be 3-tuples ``(m, x, y)`` or 4-tuples with a trailing weight,
    which is ignored. Returns NaN when the loader yields no rows.
    """
    model.eval()
    mae, n = 0.0, 0
    with torch.no_grad():
        for batch in loader:
            if len(batch) == 4:
                m, x, y, _ = batch
            else:
                m, x, y = batch
            m, x, y = m.to(device), x.to(device), y.to(device)
            p = model(m, x)
            mae += (p - y).abs().sum().item()
            n += y.size(0)
    return mae / n if n else float("nan")


# =========================
# Main
# =========================
def main():
    """Train both model families with K-fold OOF, blend them, save artifacts.

    Steps: load and validate the data, build sample weights, train the
    FT-Transformer and LightGBM per fold, search the blend weight on the OOF
    predictions, and write models / OOF CSVs / JSON metadata into ``ART_DIR``.

    Returns:
        dict with ``predict_dl_ensemble`` and ``predict_lgbm_ensemble``
        (callables taking a DataFrame and returning fold-averaged predictions)
        and ``best_alpha`` (weight of the DL prediction in the blend).

    Raises:
        RuntimeError: if a required column is missing from the input data.
    """
    # Pick a safe device first, then seed against that device.
    device = get_safe_device()
    set_seed(SEED, device)

    # ----- Load -----
    df = read_table(DATA_PATH).copy()
    need = [CAT_COL] + NUM_COLS + [TARGET]
    missing = [c for c in need if c not in df.columns]
    if missing:
        raise RuntimeError(f"입력 데이터에 없는 컬럼: {missing}")
    df = df.dropna(subset=[TARGET]).reset_index(drop=True)
    df = ensure_categorical(df, CAT_COL)

    # Sample weights (robust weights are generated when the column is absent).
    if "sample_weight" in df.columns:
        df["sample_weight"] = df["sample_weight"].astype(np.float32)
    else:
        df["sample_weight"] = tukey_biweight_weights_by_group(df, target=TARGET, group=CAT_COL, c=4.685)

    # material -> id
    materials = sorted(df[CAT_COL].astype(str).unique())
    mat2id = {m: i for i, m in enumerate(materials)}
    df["_mat_id"] = df[CAT_COL].astype(str).map(mat2id).astype(int)

    # Arrays shared by both model families.
    X_num_full = df[NUM_COLS].values.astype(np.float32)
    y_full = df[TARGET].values.astype(np.float32)
    m_full = df["_mat_id"].values
    w_full = df["sample_weight"].values.astype(np.float32)

    # =========================
    # 1) FT-Transformer 5-Fold OOF
    # =========================
    kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)
    oof_dl = np.zeros(len(df), dtype=np.float32)
    dl_models, dl_scalers = [], []
    fold_summ_dl = []

    for fold, (tr_idx, va_idx) in enumerate(kf.split(X_num_full), 1):
        print(f"\n========== [DL] FOLD {fold}/{N_SPLITS} ==========")

        # Fit the scaler on the training fold only to avoid leakage.
        scaler = StandardScaler()
        X_tr = scaler.fit_transform(X_num_full[tr_idx]).astype(np.float32)
        X_va = scaler.transform(X_num_full[va_idx]).astype(np.float32)
        y_tr, y_va = y_full[tr_idx], y_full[va_idx]
        m_tr, m_va = m_full[tr_idx], m_full[va_idx]
        w_tr, w_va = w_full[tr_idx], w_full[va_idx]

        train_ds = TabDataset(m_tr, X_tr, y_tr, w_tr)
        val_ds = TabDataset(m_va, X_va, y_va, w_va)
        train_dl = DataLoader(train_ds, batch_size=BATCH_TRAIN, shuffle=True, num_workers=0)
        val_dl = DataLoader(val_ds, batch_size=BATCH_VAL, shuffle=False, num_workers=0)

        model = FTTransformer(
            n_materials=len(materials),
            n_num=len(NUM_COLS),
            d_model=D_MODEL,
            nhead=NHEAD,
            num_layers=LAYERS,
            dim_ff=DIM_FF,
            dropout=DROPOUT,
        )
        # Fall back to CPU if moving the model to the device fails.
        try:
            model = model.to(device)
        except Exception as e:
            print(f"[WARN] model.to({device}) failed: {e}. Falling back to CPU.")
            device = torch.device("cpu")
            model = model.to(device)

        optim = torch.optim.AdamW(model.parameters(), lr=LR, weight_decay=WEIGHT_DECAY)
        sched = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(optim, T_0=10)

        best_mae, best_state, wait = 1e9, None, 0
        for epoch in range(1, EPOCHS + 1):
            model.train()
            for m, x, y, w in train_dl:
                m, x, y, w = m.to(device), x.to(device), y.to(device), w.to(device)
                optim.zero_grad(set_to_none=True)
                pred = model(m, x)
                loss = weighted_l1_loss(pred, y, w)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), 2.0)
                optim.step()
            sched.step(epoch)

            mae = val_mae(model, val_dl, device)
            print(f"[DL {epoch:03d}] VAL MAE={mae:.4f}")
            if mae < best_mae - 1e-4:
                best_mae, wait = mae, 0
                # Keep a CPU copy so the snapshot does not alias live parameters.
                best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            else:
                wait += 1
                if wait >= PATIENCE:
                    print("Early stopping.")
                    break

        # Restore the best checkpoint and save this fold.
        if best_state is not None:
            model.load_state_dict(best_state)
        torch.save({
            "state_dict": model.state_dict(),
            "materials": materials,
            "num_cols": NUM_COLS,
            "scaler_mean": scaler.mean_,
            "scaler_scale": scaler.scale_,
        }, os.path.join(ART_DIR, f"ftt_fold{fold}.pt"))
        fold_summ_dl.append(best_mae)
        print(f"[DL FOLD {fold}] best VAL MAE={best_mae:.4f}")

        # -- Fill OOF (model and tensors on the same device).
        # NOTE(review): the checkpoint is selected on this same validation
        # fold, so the OOF MAE is likely optimistic.
        try:
            model = model.to(device)
        except Exception as e:
            print(f"[WARN] model.to({device}) failed during OOF: {e}. Falling back to CPU.")
            device = torch.device("cpu")
            model = model.to(device)
        model.eval()
        preds = []
        with torch.no_grad():
            # val_dl is unshuffled, so predictions line up with va_idx.
            for batch in val_dl:
                if len(batch) == 4:
                    m, x, y, _ = batch
                else:
                    m, x, y = batch
                m, x = m.to(device), x.to(device)
                p = model(m, x).cpu().numpy().ravel()
                preds.append(p)
        oof_dl[va_idx] = np.concatenate(preds).astype(np.float32)

        # -- Keep the finished fold model on CPU.
        dl_models.append(model.cpu())
        dl_scalers.append(scaler)

    oof_mae_dl = mean_absolute_error(y_full, oof_dl)
    print("\n[DL] Fold best MAEs:", [f"{m:.4f}" for m in fold_summ_dl])
    print(f"[DL] OOF MAE : {oof_mae_dl:.4f}")
    pd.DataFrame({"y_true": y_full, "y_oof_dl": oof_dl}).to_csv(
        os.path.join(ART_DIR, "oof_dl.csv"), index=False
    )

    # =========================
    # 2) LightGBM 5-Fold OOF (early stopping / logging via callbacks)
    # =========================
    df = ensure_categorical(df, CAT_COL)
    FEATS_GBDT = [CAT_COL] + NUM_COLS
    X_gbdt = df[FEATS_GBDT].copy()
    y = y_full
    w = w_full

    kf2 = KFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)
    oof_lgbm = np.zeros(len(df), dtype=np.float32)
    lgbm_models = []
    fold_summ_lgb = []

    for fold, (tr_idx, va_idx) in enumerate(kf2.split(X_gbdt), 1):
        print(f"\n========== [LGBM] FOLD {fold}/{N_SPLITS} ==========")
        X_tr, X_va = X_gbdt.iloc[tr_idx], X_gbdt.iloc[va_idx]
        y_tr, y_va = y[tr_idx], y[va_idx]
        w_tr, w_va = w[tr_idx], w[va_idx]

        dtr = lgb.Dataset(X_tr, label=y_tr, weight=w_tr,
                          categorical_feature=[CAT_COL], free_raw_data=False)
        dva = lgb.Dataset(X_va, label=y_va, weight=w_va,
                          categorical_feature=[CAT_COL], reference=dtr, free_raw_data=False)

        callbacks = [
            lgb.early_stopping(EARLY_STOP, verbose=False),
            lgb.log_evaluation(100),
        ]
        model = lgb.train(
            LGB_PARAMS,
            dtr,
            num_boost_round=NUM_BOOST_ROUND,
            valid_sets=[dtr, dva],
            valid_names=["train", "valid"],
            callbacks=callbacks,
        )

        pred_va = model.predict(X_va, num_iteration=model.best_iteration)
        oof_lgbm[va_idx] = pred_va.astype(np.float32)
        mae = mean_absolute_error(y_va, pred_va)
        fold_summ_lgb.append(mae)
        print(f"[LGBM FOLD {fold}] VAL MAE={mae:.4f}")

        model.save_model(os.path.join(ART_DIR, f"lgbm_fold{fold}.txt"),
                         num_iteration=model.best_iteration)
        lgbm_models.append(model)

    oof_mae_lgb = mean_absolute_error(y, oof_lgbm)
    print("\n[LGBM] Fold MAEs:", [f"{m:.4f}" for m in fold_summ_lgb])
    print(f"[LGBM] OOF MAE : {oof_mae_lgb:.4f}")
    pd.DataFrame({"y_true": y, "y_oof_lgbm": oof_lgbm}).to_csv(
        os.path.join(ART_DIR, "oof_lgbm.csv"), index=False
    )

    # =========================
    # 3) OOF Blending (DL + LGBM)
    # =========================
    best_alpha, best_mae = search_best_alpha(oof_dl, oof_lgbm, y_full)
    print(f"\n[BLEND] best α={best_alpha:.3f}, blended OOF MAE={best_mae:.4f}")
    with open(os.path.join(ART_DIR, "blend_alpha.json"), "w", encoding="utf-8") as f:
        json.dump({"best_alpha": float(best_alpha),
                   "oof_mae_blend": float(best_mae),
                   "oof_mae_dl": float(oof_mae_dl),
                   "oof_mae_lgbm": float(oof_mae_lgb)}, f, indent=2)

    # =========================
    # 4) Inference helpers (example)
    # =========================
    def predict_dl_ensemble(df_new: pd.DataFrame) -> np.ndarray:
        """Average the fold FT-Transformers' predictions for ``df_new``.

        Materials not seen in training fall back to id 0; a warning is
        printed so the substitution is not silent.
        """
        df_new = df_new.copy()
        mat_str = df_new[CAT_COL].astype(str)
        mapped = mat_str.map(mat2id)
        unseen = sorted(mat_str[mapped.isna()].unique())
        if unseen:
            print(f"[WARN] Unseen materials mapped to id 0 ({materials[0]}): {unseen}")
        df_new["_mat_id"] = mapped.fillna(0).astype(int)
        Xn = df_new[NUM_COLS].values.astype(np.float32)
        # The id tensor is identical for every fold model; build it once.
        m_ids = torch.tensor(df_new["_mat_id"].values, dtype=torch.long)
        preds = []
        for mdl, sc in zip(dl_models, dl_scalers):
            x = sc.transform(Xn).astype(np.float32)
            mdl.eval()
            with torch.no_grad():
                x_t = torch.tensor(x, dtype=torch.float32)
                p = mdl(m_ids, x_t).cpu().numpy().ravel()
            preds.append(p)
        return np.mean(preds, axis=0)

    def predict_lgbm_ensemble(df_new: pd.DataFrame) -> np.ndarray:
        """Average the fold LightGBM models' predictions for ``df_new``."""
        Xn = df_new[[CAT_COL] + NUM_COLS].copy()
        Xn[CAT_COL] = Xn[CAT_COL].astype(str).astype("category")
        preds = [mdl.predict(Xn, num_iteration=mdl.best_iteration) for mdl in lgbm_models]
        return np.mean(preds, axis=0)

    with open(os.path.join(ART_DIR, "materials.json"), "w", encoding="utf-8") as f:
        json.dump({"materials": materials}, f, ensure_ascii=False, indent=2)
    with open(os.path.join(ART_DIR, "columns.json"), "w", encoding="utf-8") as f:
        json.dump({"num_cols": NUM_COLS, "cat_col": CAT_COL, "target": TARGET},
                  f, ensure_ascii=False, indent=2)

    print(f"\nArtifacts saved in: {ART_DIR}")
    print("Use predict_dl_ensemble / predict_lgbm_ensemble, and blend with best_alpha for new data.")

    # Hand the closures back; otherwise they are unreachable once main() returns.
    return {
        "predict_dl_ensemble": predict_dl_ensemble,
        "predict_lgbm_ensemble": predict_lgbm_ensemble,
        "best_alpha": float(best_alpha),
    }


if __name__ == "__main__":
    # main() selects and seeds the device itself.
    main()