import numpy as np import pandas as pd from pathlib import Path from sklearn.cluster import KMeans from catboost import CatBoostRegressor # from scipy.stats import weibull_min from sklearn.metrics import mean_squared_error, mean_absolute_error from scipy import optimize from scipy.special import gammaln DATA_DIR = Path("data") IMG_DIR = Path("images") IMG_DIR.mkdir(exist_ok=True) COLS = ["unit", "cycle"] + [f"op_{i}" for i in range(1, 4)] + [f"s_{i}" for i in range(1, 22)] ALL_SENSORS = [f"s_{i}" for i in range(1, 22)] WINDOW = 15 RUL_CAP = 125 RNG = np.random.default_rng(0) def load(ds): tr = pd.read_csv(DATA_DIR / f"train_{ds}.txt", sep=r"\s+", header=None, names=COLS, engine="python") te = pd.read_csv(DATA_DIR / f"test_{ds}.txt", sep=r"\s+", header=None, names=COLS, engine="python") rul = pd.read_csv(DATA_DIR / f"RUL_{ds}.txt", header=None, names=["RUL"]).squeeze("columns").values return tr, te, rul def nasa_score(y_true, y_pred): d = y_pred - y_true return np.where(d < 0, np.exp(-d / 13) - 1, np.exp(d / 10) - 1).sum() def metrics(y_true, y_pred): return { "RMSE": float(np.sqrt(mean_squared_error(y_true, y_pred))), "MAE": float(mean_absolute_error(y_true, y_pred)), "NASA_score": float(nasa_score(y_true, y_pred)), } def assign_regime(df, kmeans): if kmeans is None: return np.zeros(len(df), dtype=int) return kmeans.predict(df[["op_1", "op_2", "op_3"]].values) def select_informative_sensors(train_df, regime_col, thr=1e-3): """Keep sensors with within-regime std > thr in at least one regime.""" kept = [] for s in ALL_SENSORS: within = train_df.groupby(regime_col)[s].std().fillna(0) if within.max() > thr: kept.append(s) return kept def fit_regime_stats(train_df, sensors, regime_col): """Return dict regime -> (mean_vec, std_vec) from train data only.""" stats = {} for r, g in train_df.groupby(regime_col): mu = g[sensors].mean().values sd = g[sensors].std().replace(0, 1).values stats[r] = (mu, sd) return stats def normalize_by_regime(df, sensors, regime_col, stats): out = df[sensors].values.astype(float).copy() r = df[regime_col].values for reg, (mu, sd) in stats.items(): mask = r == reg if mask.any(): out[mask] = (out[mask] - mu) / sd return pd.DataFrame(out, columns=[f"{s}_n" for s in sensors], index=df.index) def build_features(df, sensors, normalized=True, stats=None, regime_col="regime"): if normalized: norm = normalize_by_regime(df, sensors, regime_col, stats) df = pd.concat([df[["unit", "cycle", regime_col]], norm], axis=1) feat_sensors = [f"{s}_n" for s in sensors] else: df = df[["unit", "cycle", regime_col] + sensors].copy() feat_sensors = sensors g = df.groupby("unit")[feat_sensors] roll = g.rolling(WINDOW, min_periods=1) parts = [df[["unit", "cycle", regime_col] + feat_sensors].reset_index(drop=True)] parts.append(roll.mean().reset_index(level=0, drop=True).add_suffix("_m").reset_index(drop=True)) parts.append(roll.std().fillna(0).reset_index(level=0, drop=True).add_suffix("_sd").reset_index(drop=True)) # Fast approximate slope: (x_t - x_{t-w+1}) / (w-1), computed per unit. # Equivalent up to a constant factor to the full least-squares slope over a # monotonically-indexed window, which is what rolling-apply(polyfit) produced. shifted = df.groupby("unit")[feat_sensors].shift(WINDOW - 1) sl = ((df[feat_sensors] - shifted) / (WINDOW - 1)).fillna(0).add_suffix("_sl").reset_index(drop=True) parts.append(sl) return pd.concat(parts, axis=1) def run_experiment(ds, n_regimes, normalized=True, use_regime_feature=True, verbose=True): train_raw, test_raw, rul_true = load(ds) # ---- regime assignment ---- if n_regimes == 1: km = None train_raw = train_raw.assign(regime=0) test_raw = test_raw.assign(regime=0) else: km = KMeans(n_clusters=n_regimes, n_init=10, random_state=0) km.fit(train_raw[["op_1", "op_2", "op_3"]].values) train_raw = train_raw.assign(regime=km.labels_) test_raw = test_raw.assign(regime=assign_regime(test_raw, km)) sensors = select_informative_sensors(train_raw, "regime") if verbose: print(f" [{ds}] {n_regimes} regime(s), {len(sensors)} informative sensors") stats = fit_regime_stats(train_raw, sensors, "regime") if normalized else None X_tr_df = build_features(train_raw, sensors, normalized=normalized, stats=stats) X_tr_df["RUL"] = (train_raw.groupby("unit").cycle.transform("max") - train_raw.cycle).clip(upper=RUL_CAP).values exclude = ["unit", "cycle", "RUL"] cat_features_arg = ["regime"] if not use_regime_feature: exclude.append("regime") cat_features_arg = None feature_cols = [c for c in X_tr_df.columns if c not in exclude] # 20%-of-units held-out val all_units = X_tr_df["unit"].unique() val_u = RNG.choice(all_units, size=max(20, len(all_units) // 5), replace=False) is_val = X_tr_df["unit"].isin(val_u).values X_tr_df["regime"] = X_tr_df["regime"].astype(int) X_tr = X_tr_df.loc[~is_val, feature_cols] y_tr = X_tr_df.loc[~is_val, "RUL"].values X_val = X_tr_df.loc[is_val, feature_cols] y_val = X_tr_df.loc[is_val, "RUL"].values model = CatBoostRegressor( iterations=1000, learning_rate=0.08, depth=5, loss_function="RMSE", eval_metric="RMSE", early_stopping_rounds=40, random_seed=42, verbose=False, cat_features=cat_features_arg, ) model.fit(X_tr, y_tr, eval_set=(X_val, y_val)) # Official test set: last observation per unit X_te_df = build_features(test_raw, sensors, normalized=normalized, stats=stats) X_te_df["regime"] = X_te_df["regime"].astype(int) last = X_te_df.groupby("unit").tail(1).sort_values("unit") X_te = last[feature_cols] preds = np.clip(model.predict(X_te), 0, None) return { "model": model, "metrics": metrics(rul_true, preds), "preds": preds, "y_true": rul_true, "n_features": len(feature_cols), "n_sensors": len(sensors), "feature_cols": feature_cols, "best_iter": model.get_best_iteration(), } def weibull_pdf(t, beta, eta): return (beta / eta) * (t / eta) ** (beta - 1) * np.exp(-(t / eta) ** beta) def em_weibull_mixture(x, K=2, n_iter=200, tol=1e-6, seed=0): rng = np.random.default_rng(seed) n = len(x) # Init by splitting at median med = np.median(x) init_mask = x < med betas = np.array([3.0, 3.0]) etas = np.array([np.mean(x[init_mask]) / np.exp(gammaln(1 + 1/3)), np.mean(x[~init_mask]) / np.exp(gammaln(1 + 1/3))]) pis = np.array([init_mask.mean(), 1 - init_mask.mean()]) prev_ll = -np.inf for it in range(n_iter): # E step comp = np.stack([pis[k] * weibull_pdf(x, betas[k], etas[k]) for k in range(K)], axis=1) comp = np.clip(comp, 1e-300, None) gamma = comp / comp.sum(axis=1, keepdims=True) # M step: per component, weighted Weibull MLE via numerical root new_betas = np.zeros(K) new_etas = np.zeros(K) new_pis = gamma.mean(axis=0) for k in range(K): w = gamma[:, k] ws = w.sum() def score(b): xb = x ** b top = (w * xb * np.log(x)).sum() bot = (w * xb).sum() return top / bot - (w * np.log(x)).sum() / ws - 1 / b try: b_new = optimize.brentq(score, 0.3, 30.0) except ValueError: b_new = betas[k] eta_new = ((w * x ** b_new).sum() / ws) ** (1 / b_new) new_betas[k] = b_new new_etas[k] = eta_new betas, etas, pis = new_betas, new_etas, new_pis ll = np.log(np.stack([pis[k] * weibull_pdf(x, betas[k], etas[k]) for k in range(K)], axis=1).sum(axis=1)).sum() if abs(ll - prev_ll) < tol: break prev_ll = ll # Sort components by eta for reproducibility order = np.argsort(etas) return {"beta": betas[order], "eta": etas[order], "pi": pis[order], "logL": ll, "n_iter": it + 1, "gamma": gamma[:, order]}