| import numpy as np |
| import pandas as pd |
| from pathlib import Path |
| from sklearn.cluster import KMeans |
| from catboost import CatBoostRegressor |
| |
| from sklearn.metrics import mean_squared_error, mean_absolute_error |
| from scipy import optimize |
| from scipy.special import gammaln |
|
|
# Dataset files are read from DATA_DIR by load(); IMG_DIR is created on import
# (presumably for saved figures -- nothing in this section writes to it).
DATA_DIR = Path("data")
IMG_DIR = Path("images")
IMG_DIR.mkdir(exist_ok=True)  # no-op when the directory already exists

# Column schema of the whitespace-separated data files:
# unit id, cycle index, three operating settings, then 21 sensor channels.
ALL_SENSORS = [f"s_{k}" for k in range(1, 22)]
COLS = ["unit", "cycle", *(f"op_{k}" for k in range(1, 4)), *ALL_SENSORS]

WINDOW = 15    # rolling-window length (rows per unit) used by build_features
RUL_CAP = 125  # upper clip applied to the training RUL target
RNG = np.random.default_rng(0)  # shared generator; drives the validation-unit draw
|
|
def load(ds):
    """Read the train, test and RUL files of dataset *ds* from ``DATA_DIR``.

    Returns a ``(train, test, rul)`` tuple: two DataFrames with the ``COLS``
    schema and the contents of ``RUL_{ds}.txt`` as a 1-D NumPy array.
    """

    def _read_table(prefix):
        # Whitespace-separated, header-less table with the fixed COLS schema.
        return pd.read_csv(
            DATA_DIR / f"{prefix}_{ds}.txt",
            sep=r"\s+",
            header=None,
            names=COLS,
            engine="python",
        )

    train = _read_table("train")
    test = _read_table("test")
    rul_frame = pd.read_csv(DATA_DIR / f"RUL_{ds}.txt", header=None, names=["RUL"])
    # Single-column frame -> Series -> plain array.
    return train, test, rul_frame.squeeze("columns").values
| |
| |
def nasa_score(y_true, y_pred):
    """Return the asymmetric exponential scoring function summed over samples.

    With ``d = y_pred - y_true`` each sample contributes ``exp(-d / 13) - 1``
    when ``d < 0`` (prediction below the truth) and ``exp(d / 10) - 1``
    otherwise, so over-estimates are penalised more steeply.

    Both inputs may be any array-like (lists, tuples, arrays, Series); they
    are converted to float arrays and compared position by position, matching
    how the sklearn metrics used next to this function treat their inputs.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    d = y_pred - y_true
    # np.where evaluates both branches, but the unselected one always has a
    # non-positive exponent, so it cannot overflow.
    return np.where(d < 0, np.exp(-d / 13) - 1, np.exp(d / 10) - 1).sum()
| |
| |
def metrics(y_true, y_pred):
    """Bundle RMSE, MAE and the NASA score of *y_pred* against *y_true*.

    Returns a dict with keys ``"RMSE"``, ``"MAE"`` and ``"NASA_score"``, each
    converted to a plain Python float.
    """
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    score = nasa_score(y_true, y_pred)
    return {"RMSE": float(rmse), "MAE": float(mae), "NASA_score": float(score)}
| |
| |
def assign_regime(df, kmeans):
    """Return one regime label per row of *df*.

    With a fitted *kmeans* the labels come from ``kmeans.predict`` on the
    ``op_1``/``op_2``/``op_3`` columns; with ``kmeans is None`` every row gets
    regime 0.
    """
    if kmeans is not None:
        op_settings = df[["op_1", "op_2", "op_3"]].values
        return kmeans.predict(op_settings)
    # Single-regime case: no clustering model, everything is regime 0.
    return np.zeros(len(df), dtype=int)
| |
| |
def select_informative_sensors(train_df, regime_col, thr=1e-3):
    """Return the sensors whose within-regime std exceeds *thr* in some regime.

    The standard deviation of every sensor in ``ALL_SENSORS`` is computed per
    group of *regime_col*; undefined values (NaN) count as 0. The result keeps
    the order of ``ALL_SENSORS``.
    """
    # One grouped pass: rows are regimes, columns are sensors.
    per_regime_std = train_df.groupby(regime_col)[ALL_SENSORS].std().fillna(0)
    largest_std = per_regime_std.max()
    return [sensor for sensor in ALL_SENSORS if largest_std[sensor] > thr]
| |
| |
def fit_regime_stats(train_df, sensors, regime_col):
    """Return ``{regime: (mean_vec, std_vec)}`` computed from *train_df* only.

    Both vectors follow the order of *sensors*. A standard deviation that is
    zero (constant sensor within the regime) or undefined (the regime has a
    single row, so the sample std is NaN) is replaced by 1 so that dividing by
    it during normalisation never produces inf/NaN.
    """
    stats = {}
    for regime, group in train_df.groupby(regime_col):
        values = group[sensors]
        mu = values.mean().values
        sd = values.std().values.astype(float)
        # Guard the divisor: NaN/inf or exactly 0 would poison every
        # normalised value of this regime.
        unusable = ~np.isfinite(sd) | (sd == 0)
        stats[regime] = (mu, np.where(unusable, 1.0, sd))
    return stats
| |
| |
def normalize_by_regime(df, sensors, regime_col, stats):
    """Z-score the *sensors* columns of *df* with per-regime statistics.

    *stats* maps a regime label to ``(mean_vec, std_vec)`` in the order of
    *sensors*. Rows whose regime has no entry in *stats* are returned
    unscaled. The result is a new DataFrame with the index of *df* and
    columns named ``"<sensor>_n"``.
    """
    scaled = np.array(df[sensors].values, dtype=float)  # fresh float copy
    regimes = df[regime_col].values
    for regime, (center, scale) in stats.items():
        rows = regimes == regime
        if not rows.any():
            continue
        scaled[rows] = (scaled[rows] - center) / scale
    names = [f"{s}_n" for s in sensors]
    return pd.DataFrame(scaled, columns=names, index=df.index)
| |
| |
def build_features(df, sensors, normalized=True, stats=None, regime_col="regime", window=None):
    """Build per-row rolling features for every unit in *df*.

    For each sensor (regime-normalised and renamed ``"<s>_n"`` when
    *normalized* is true, raw otherwise) the output holds the value itself
    plus three features computed within each ``unit``:

    * ``_m``  -- rolling mean over the last *window* rows (shorter at the start),
    * ``_sd`` -- rolling sample std over the same window (0 where undefined),
    * ``_sl`` -- ``(value - value[window - 1 rows earlier]) / (window - 1)``,
      0 until that many earlier rows exist.

    Parameters
    ----------
    df : DataFrame with ``unit``, ``cycle``, *regime_col* and *sensors* columns.
        Rows of a unit are processed in the order they appear (presumably
        ascending cycle -- this function does not sort them).
    sensors : list of sensor column names.
    normalized : when true, sensors are z-scored per regime using *stats*.
    stats : ``{regime: (mean_vec, std_vec)}``; required when *normalized*.
    regime_col : name of the regime column.
    window : rolling-window length; defaults to the module constant ``WINDOW``.

    Returns
    -------
    DataFrame with a fresh 0..n-1 index, rows in the same order as *df*, and
    columns ``unit, cycle, regime_col, <sensors>, *_m, *_sd, *_sl``.

    Raises
    ------
    ValueError
        If *normalized* is true but *stats* is None, or *window* < 1.
    """
    if normalized and stats is None:
        raise ValueError("stats (from fit_regime_stats) is required when normalized=True")
    if window is None:
        window = WINDOW
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    id_cols = ["unit", "cycle", regime_col]
    if normalized:
        norm = normalize_by_regime(df, sensors, regime_col, stats)
        feat_sensors = [f"{s}_n" for s in sensors]
        base = pd.concat(
            [df[id_cols].reset_index(drop=True), norm.reset_index(drop=True)], axis=1
        )
    else:
        feat_sensors = list(sensors)
        base = df[id_cols + feat_sensors].reset_index(drop=True)

    per_unit = base.groupby("unit")[feat_sensors]
    rolling = per_unit.rolling(window, min_periods=1)

    def _realign(frame, suffix):
        # groupby-rolling returns rows grouped by unit with a (unit, row) index;
        # drop the unit level and put the rows back in the order of `base` so
        # the features line up even when units are interleaved in the input.
        return frame.reset_index(level=0, drop=True).reindex(base.index).add_suffix(suffix)

    roll_mean = _realign(rolling.mean(), "_m")
    roll_std = _realign(rolling.std().fillna(0), "_sd")

    # Slope across the window; groupby-shift keeps the row index, so plain
    # index-aligned subtraction is already correct here.
    lag = window - 1
    lagged = per_unit.shift(lag)
    slope = ((base[feat_sensors] - lagged) / max(lag, 1)).fillna(0).add_suffix("_sl")

    return pd.concat([base, roll_mean, roll_std, slope], axis=1)
| |
| |
def run_experiment(ds, n_regimes, normalized=True, use_regime_feature=True, verbose=True):
    """Train a CatBoost RUL regressor on dataset *ds* and score it on the test set.

    Steps: load the files; label operating regimes (KMeans on the three
    ``op_*`` columns, or a constant regime 0 when ``n_regimes == 1``); keep the
    informative sensors; optionally z-score them per regime with statistics
    fitted on the training data; build rolling features; fit CatBoost with
    early stopping on a held-out set of training units; predict from the last
    row of every test unit.

    Parameters
    ----------
    ds : dataset suffix used in the file names (``train_{ds}.txt`` etc.).
    n_regimes : number of KMeans clusters; 1 disables clustering.
    normalized : z-score sensors per regime before feature building.
    use_regime_feature : feed ``regime`` to the model as a categorical feature.
    verbose : print a one-line summary of regimes and kept sensors.

    Returns
    -------
    dict with keys ``model``, ``metrics``, ``preds``, ``y_true``,
    ``n_features``, ``n_sensors``, ``feature_cols`` and ``best_iter``.

    Raises
    ------
    ValueError
        If the training data contains fewer than two units, so no
        train/validation split is possible.
    """
    train_raw, test_raw, rul_true = load(ds)

    # --- operating regimes -------------------------------------------------
    if n_regimes == 1:
        km = None
        train_raw = train_raw.assign(regime=0)
        test_raw = test_raw.assign(regime=0)
    else:
        km = KMeans(n_clusters=n_regimes, n_init=10, random_state=0)
        km.fit(train_raw[["op_1", "op_2", "op_3"]].values)
        train_raw = train_raw.assign(regime=km.labels_)
        test_raw = test_raw.assign(regime=assign_regime(test_raw, km))

    sensors = select_informative_sensors(train_raw, "regime")
    if verbose:
        print(f" [{ds}] {n_regimes} regime(s), {len(sensors)} informative sensors")

    # --- training features and target --------------------------------------
    stats = fit_regime_stats(train_raw, sensors, "regime") if normalized else None
    X_tr_df = build_features(train_raw, sensors, normalized=normalized, stats=stats)
    # RUL = cycles left until the unit's last recorded cycle, capped at RUL_CAP.
    # Assigned by position: build_features keeps the row order of train_raw.
    cycles_left = train_raw.groupby("unit").cycle.transform("max") - train_raw.cycle
    X_tr_df["RUL"] = cycles_left.clip(upper=RUL_CAP).values

    exclude = ["unit", "cycle", "RUL"]
    cat_features_arg = ["regime"]
    if not use_regime_feature:
        exclude.append("regime")
        cat_features_arg = None
    feature_cols = [c for c in X_tr_df.columns if c not in exclude]

    # --- unit-level validation split ---------------------------------------
    all_units = X_tr_df["unit"].unique()
    n_units = len(all_units)
    if n_units < 2:
        raise ValueError(
            f"need at least 2 training units for a train/validation split, got {n_units}"
        )
    # At least 20 units or 20% of them, but always leave one unit to train on;
    # for 21+ units this equals the uncapped max(20, n_units // 5).
    n_val = min(max(20, n_units // 5), n_units - 1)
    # NOTE: draws from the shared module-level RNG, so the chosen units depend
    # on how many draws were made before this call.
    val_u = RNG.choice(all_units, size=n_val, replace=False)
    is_val = X_tr_df["unit"].isin(val_u).values

    X_tr_df["regime"] = X_tr_df["regime"].astype(int)
    X_tr = X_tr_df.loc[~is_val, feature_cols]
    y_tr = X_tr_df.loc[~is_val, "RUL"].values
    X_val = X_tr_df.loc[is_val, feature_cols]
    y_val = X_tr_df.loc[is_val, "RUL"].values

    model = CatBoostRegressor(
        iterations=1000, learning_rate=0.08, depth=5,
        loss_function="RMSE", eval_metric="RMSE",
        early_stopping_rounds=40, random_seed=42, verbose=False,
        cat_features=cat_features_arg,
    )
    model.fit(X_tr, y_tr, eval_set=(X_val, y_val))

    # --- test-set prediction -----------------------------------------------
    X_te_df = build_features(test_raw, sensors, normalized=normalized, stats=stats)
    X_te_df["regime"] = X_te_df["regime"].astype(int)
    # One prediction per unit, from its last row, ordered by unit id so it
    # lines up with the RUL file (assumed to list units in ascending order).
    last = X_te_df.groupby("unit").tail(1).sort_values("unit")
    X_te = last[feature_cols]
    preds = np.clip(model.predict(X_te), 0, None)  # RUL cannot be negative

    return {
        "model": model,
        # Test labels are used as loaded (not capped at RUL_CAP).
        "metrics": metrics(rul_true, preds),
        "preds": preds,
        "y_true": rul_true,
        "n_features": len(feature_cols),
        "n_sensors": len(sensors),
        "feature_cols": feature_cols,
        "best_iter": model.get_best_iteration(),
    }
| |
|
|
def weibull_pdf(t, beta, eta):
    """Evaluate the two-parameter Weibull density at *t*.

    *beta* is the shape and *eta* the scale:
    ``(beta / eta) * (t / eta)**(beta - 1) * exp(-(t / eta)**beta)``.
    Arguments broadcast like ordinary NumPy operands.
    """
    z = t / eta
    return (beta / eta) * z ** (beta - 1) * np.exp(-(z ** beta))
| |
def _weibull_mixture_terms(x, betas, etas, pis):
    """Return the (n, K) array whose column k is ``pis[k] * f(x; betas[k], etas[k])``."""
    z = x[:, None] / etas
    return pis * ((betas / etas) * z ** (betas - 1) * np.exp(-(z ** betas)))


def _weibull_shape_score(b, x, log_x, w, mean_log):
    """Weighted Weibull profile-likelihood score for shape *b* (its root is the MLE)."""
    xb = x ** b
    return (w * xb * log_x).sum() / (w * xb).sum() - mean_log - 1 / b


def em_weibull_mixture(x, K=2, n_iter=200, tol=1e-6, seed=0):
    """Fit a K-component Weibull mixture to positive data with EM.

    Initialisation is deterministic: the data are split into K quantile bins
    (for ``K=2`` this is the split at the median), every shape starts at 3,
    each scale is chosen so the component mean equals its bin mean, and the
    weights are the bin fractions. Each M-step solves the weighted shape
    equation with Brent's method on [0.3, 30]; when no root is bracketed the
    previous shape is kept. Iteration stops after *n_iter* steps or once the
    log-likelihood changes by less than *tol*.

    Parameters
    ----------
    x : 1-D array-like of strictly positive, finite observations.
    K : number of mixture components (>= 1, at most ``len(x)``).
    n_iter : maximum number of EM iterations (>= 1).
    tol : absolute log-likelihood change used as the stopping criterion.
    seed : accepted for interface compatibility; the procedure uses no random
        numbers, so it has no effect.

    Returns
    -------
    dict with ``beta``, ``eta``, ``pi`` (arrays of length K, sorted by
    ascending ``eta``), ``logL`` (final log-likelihood), ``n_iter``
    (iterations performed) and ``gamma`` (n x K responsibilities evaluated at
    the returned parameters, columns in the same order).

    Raises
    ------
    ValueError
        On K < 1, n_iter < 1, fewer observations than components, or
        non-positive / non-finite data.
    """
    x = np.asarray(x, dtype=float).ravel()
    n = x.size
    if K < 1:
        raise ValueError(f"K must be >= 1, got {K}")
    if n_iter < 1:
        raise ValueError(f"n_iter must be >= 1, got {n_iter}")
    if n < K:
        raise ValueError(f"need at least K={K} observations, got {n}")
    if not np.all(np.isfinite(x)) or np.any(x <= 0):
        raise ValueError("x must contain only finite, strictly positive values")

    # --- initialisation: K quantile bins -----------------------------------
    if K > 1:
        cuts = np.quantile(x, np.arange(1, K) / K)
        labels = np.searchsorted(cuts, x, side="right")  # x < cut -> lower bin
    else:
        labels = np.zeros(n, dtype=int)
    counts = np.bincount(labels, minlength=K)
    if counts.min() == 0:
        # Heavy ties left a bin empty; fall back to equal-size rank chunks so
        # every component starts with data.
        ranked = np.argsort(x, kind="stable")
        for k, idx in enumerate(np.array_split(ranked, K)):
            labels[idx] = k
        counts = np.bincount(labels, minlength=K)

    init_beta = 3.0
    # Weibull mean = eta * Gamma(1 + 1/beta)  =>  eta = mean / Gamma(1 + 1/beta).
    mean_to_scale = np.exp(gammaln(1 + 1 / init_beta))
    betas = np.full(K, init_beta)
    etas = np.array([x[labels == k].mean() for k in range(K)]) / mean_to_scale
    pis = counts / n

    log_x = np.log(x)
    floor = 1e-300  # keeps responsibilities and log-likelihood finite on underflow
    terms = np.clip(_weibull_mixture_terms(x, betas, etas, pis), floor, None)

    prev_ll = -np.inf
    for it in range(n_iter):
        # E-step: responsibilities under the current parameters.
        gamma = terms / terms.sum(axis=1, keepdims=True)

        # M-step.
        new_pis = gamma.mean(axis=0)
        new_betas = betas.copy()
        new_etas = etas.copy()
        for k in range(K):
            w = gamma[:, k]
            ws = w.sum()
            if ws < 1e-12:
                continue  # collapsed component: keep its previous shape/scale
            mean_log = (w * log_x).sum() / ws
            try:
                b_new = optimize.brentq(
                    _weibull_shape_score, 0.3, 30.0, args=(x, log_x, w, mean_log)
                )
            except ValueError:
                b_new = betas[k]  # no sign change on the bracket
            new_betas[k] = b_new
            new_etas[k] = ((w * x ** b_new).sum() / ws) ** (1 / b_new)
        betas, etas, pis = new_betas, new_etas, new_pis

        # One density evaluation serves both the log-likelihood and the next E-step.
        terms = np.clip(_weibull_mixture_terms(x, betas, etas, pis), floor, None)
        ll = np.log(terms.sum(axis=1)).sum()
        if abs(ll - prev_ll) < tol:
            break
        prev_ll = ll

    # Responsibilities consistent with the parameters being returned.
    gamma = terms / terms.sum(axis=1, keepdims=True)

    order = np.argsort(etas)
    return {"beta": betas[order], "eta": etas[order], "pi": pis[order],
            "logL": ll, "n_iter": it + 1, "gamma": gamma[:, order]}