File size: 8,794 Bytes
6224b16
 
 
29e2de9
a1f9c4e
 
 
 
 
 
6224b16
 
 
 
a1f9c4e
6224b16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a1f9c4e
52ce084
 
6224b16
 
a1f9c4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6224b16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a1f9c4e
 
 
 
 
 
 
 
 
6224b16
 
 
 
 
 
 
 
a1f9c4e
 
6224b16
a1f9c4e
 
 
6224b16
 
 
 
a1f9c4e
 
 
 
 
6224b16
a1f9c4e
 
6224b16
a1f9c4e
6224b16
a1f9c4e
 
 
 
 
 
 
 
 
6224b16
a1f9c4e
 
 
6224b16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a1f9c4e
6224b16
 
 
 
 
 
 
 
 
 
 
a1f9c4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# predict_blend.py
import os, json, numpy as np, pandas as pd, torch, lightgbm as lgb
import torch.nn as nn

# =========================
# Config
# =========================
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent
ART_DIR = str((BASE_DIR / "artifacts_blend").resolve())
CAT_COL = "material"
NUM_COLS = ["thickness","diameter","degree","upper_radius","lower_radius","LB","RB"]

# =========================
# FT-Transformer
# =========================
class FTTransformer(nn.Module):
    def __init__(self, n_materials:int, n_num:int, d_model:int=192, nhead:int=8,
                 num_layers:int=4, dim_ff:int=768, dropout:float=0.15):
        super().__init__()
        self.mat_emb = nn.Embedding(n_materials, d_model)
        self.num_linears = nn.ModuleList([nn.Linear(1, d_model) for _ in range(n_num)])
        self.cls = nn.Parameter(torch.zeros(1, 1, d_model))
        nn.init.trunc_normal_(self.cls, std=0.02)
        enc_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_ff,
            dropout=dropout, batch_first=True, activation='gelu', norm_first=True
        )
        self.encoder = nn.TransformerEncoder(enc_layer, num_layers=num_layers)
        self.head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, d_model),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_model, 1)
        )

    def forward(self, mat_ids, x_num):
        B = x_num.size(0)
        mat_tok = self.mat_emb(mat_ids).unsqueeze(1)
        num_tok = torch.cat(
            [lin(x_num[:, i:i+1]).unsqueeze(1) for i, lin in enumerate(self.num_linears)],
            dim=1
        )
        tokens = torch.cat([self.cls.expand(B, -1, -1), mat_tok, num_tok], dim=1)
        h = self.encoder(tokens)
        return self.head(h[:, 0, :])


def _scale_like_fold(X_num: np.ndarray, mean: np.ndarray, scale: np.ndarray) -> np.ndarray:
    return ((X_num - mean) / scale).astype(np.float32)

# =========================
# Material label helpers
# =========================
def _canonize_list(materials):
    return [str(m).strip() for m in materials]

def _build_alias2canon(canon_list):
    alias2canon = {}
    for c in canon_list:
        alias2canon[c] = c
        s = c.strip()
        alias2canon[s] = c
        if "." in s:
            alias2canon[s.rstrip("0").rstrip(".")] = c
        try:
            v = float(s)
            alias2canon[str(v)] = c
            if v.is_integer():
                alias2canon[str(int(v))] = c
        except:
            pass
    return alias2canon

# =========================
# Loader helpers
# =========================
def _first_existing(*paths):
    for p in paths:
        if os.path.exists(p):
            return p
    return None

def _load_ft_folds(art_dir: str):
    folds = []
    for fold in range(1, 11):
        p = os.path.join(art_dir, f"ftt_fold{fold}.pt")
        if not os.path.exists(p):
            if folds: break
            continue
        ckpt = torch.load(p, map_location="cpu", weights_only=False)
        materials = ckpt["materials"]
        num_cols = ckpt["num_cols"]
        model = FTTransformer(len(materials), len(num_cols))
        model.load_state_dict(ckpt["state_dict"])
        model.eval()
        folds.append({
            "model": model,
            "materials": materials,
            "num_cols": num_cols,
            "scaler_mean": np.array(ckpt["scaler_mean"], dtype=np.float32),
            "scaler_scale": np.array(ckpt["scaler_scale"], dtype=np.float32),
        })
    if not folds:
        raise FileNotFoundError("No FT checkpoints found in artifacts folder.")
    return folds

def _load_lgbm_folds(art_dir: str):
    boosters = []
    for fold in range(1, 11):
        p1 = os.path.join(art_dir, f"lgbm_fold{fold}.txt")
        p2 = os.path.join(art_dir, f"lgbm_fold{fold}")
        p = _first_existing(p1, p2)
        if p is None:
            if boosters: break
            continue
        boosters.append(lgb.Booster(model_file=p))
    if not boosters:
        raise FileNotFoundError("No LightGBM model files found in artifacts folder.")
    return boosters

def _load_json_like(art_dir: str, basename: str) -> dict:
    p1 = os.path.join(art_dir, f"{basename}.json")
    p2 = os.path.join(art_dir, basename)
    p = _first_existing(p1, p2)
    if p is None:
        raise FileNotFoundError(f"Missing {basename}(.json) in {art_dir}")
    with open(p, "r", encoding="utf-8") as f:
        return json.load(f)

def _load_materials(art_dir: str, folds_ft):
    try:
        return _load_json_like(art_dir, "materials")["materials"]
    except FileNotFoundError:
        return folds_ft[0]["materials"]

def _load_best_alpha(art_dir: str) -> float:
    return float(_load_json_like(art_dir, "blend_alpha")["best_alpha"])

# =========================
# Predictor
# =========================
class BlendPredictor:
    def __init__(self, art_dir: str = ART_DIR, unknown_policy: str = "error"):
        self.art_dir = art_dir
        self.folds_ft = _load_ft_folds(art_dir)
        self.boosters = _load_lgbm_folds(art_dir)
        self.materials = _load_materials(art_dir, self.folds_ft)
        self.best_alpha = _load_best_alpha(art_dir)

        self.materials_canon = _canonize_list(self.materials)
        self.alias2canon = _build_alias2canon(self.materials_canon)
        self.mat2id = {m: i for i, m in enumerate(self.materials_canon)}
        self.unknown_policy = unknown_policy

    def _prep_df(self, df_new: pd.DataFrame) -> pd.DataFrame:
        df = df_new.copy()
        need = [CAT_COL] + NUM_COLS
        missing = [c for c in need if c not in df.columns]
        if missing:
            raise ValueError(f"Missing columns in input: {missing}")

        df[CAT_COL] = df[CAT_COL].astype(str).str.strip()
        df["_mat_canon"] = df[CAT_COL].map(self.alias2canon)

        if self.unknown_policy == "error":
            unknown = df.loc[df["_mat_canon"].isna(), CAT_COL].unique().tolist()
            if unknown:
                raise ValueError(
                    f"Unknown materials in input {unknown}. "
                    f"Known materials: {self.materials_canon[:10]}{' ...' if len(self.materials_canon)>10 else ''}"
                )
            df["_mat_id"] = df["_mat_canon"].map(self.mat2id).astype(int)
        else:
            df["_mat_canon"] = df["_mat_canon"].fillna(self.materials_canon[0])
            df["_mat_id"] = df["_mat_canon"].map(self.mat2id).astype(int)

        df[NUM_COLS] = df[NUM_COLS].apply(pd.to_numeric, errors="coerce")
        if df[NUM_COLS].isnull().any().any():
            bad = df[NUM_COLS].columns[df[NUM_COLS].isnull().any()].tolist()
            raise ValueError(f"Non-numeric values detected in columns: {bad}")
        return df

    def predict_ft(self, df_new: pd.DataFrame) -> np.ndarray:
        df = self._prep_df(df_new)
        Xn = df[NUM_COLS].values.astype(np.float32)
        mids = torch.tensor(df["_mat_id"].values, dtype=torch.long)
        preds = []
        for f in self.folds_ft:
            x_scaled = _scale_like_fold(Xn, f["scaler_mean"], f["scaler_scale"])
            x_t = torch.tensor(x_scaled, dtype=torch.float32)
            with torch.no_grad():
                p = f["model"](mids, x_t).cpu().numpy().ravel()
            preds.append(p)
        return np.mean(preds, axis=0)

    def predict_lgbm(self, df_new: pd.DataFrame) -> np.ndarray:
        df = self._prep_df(df_new)
        X = df[[CAT_COL] + NUM_COLS].copy()
        X[CAT_COL] = pd.Categorical(df["_mat_canon"], categories=self.materials_canon)
        preds = [bst.predict(X, num_iteration=getattr(bst, "best_iteration", None))
                 for bst in self.boosters]
        return np.mean(preds, axis=0)

    def predict_blend(self, df_new: pd.DataFrame, alpha: float = None) -> np.ndarray:
        if alpha is None:
            alpha = self.best_alpha
        p_dl = self.predict_ft(df_new)
        p_lgb = self.predict_lgbm(df_new)
        return alpha * p_dl + (1 - alpha) * p_lgb

# =========================
# Example run
# =========================
if __name__ == "__main__":
    base = {
        "thickness": 1, "diameter": 20, "degree": 73,
        "upper_radius": 3, "lower_radius": 2,
        "LB": 0, "RB": 1,
    }
    df_new = pd.DataFrame([
        {**base, "material": "590"},
        {**base, "material": "440"},
    ])

    predictor = BlendPredictor(ART_DIR, unknown_policy="error")
    print("materials (trained):", predictor.materials_canon[:10])
    print("best_alpha:", predictor.best_alpha)

    print("\nDL only :", predictor.predict_blend(df_new, alpha=1.0))
    print("LGBM only:", predictor.predict_blend(df_new, alpha=0.0))
    print("Blend    :", predictor.predict_blend(df_new))