Dakoro's picture
fix: typo data path
d95dfb7
Raw
History Blame Contribute Delete
8.44 kB
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.cluster import KMeans
from catboost import CatBoostRegressor
# from scipy.stats import weibull_min
from sklearn.metrics import mean_squared_error, mean_absolute_error
from scipy import optimize
from scipy.special import gammaln
# Input data directory and output image directory, relative to the working
# directory. The image directory is created on import if it is missing.
DATA_DIR = Path("data")
IMG_DIR = Path("images")
IMG_DIR.mkdir(exist_ok=True)

# Sensor column names s_1 .. s_21.
ALL_SENSORS = [f"s_{i}" for i in range(1, 22)]
# Full column layout of the raw tables: identifiers, three operating
# settings (op_1 .. op_3), then the sensors.
COLS = ["unit", "cycle", *(f"op_{i}" for i in range(1, 4)), *ALL_SENSORS]

WINDOW = 15  # rolling-window length (in rows per unit) used for features
RUL_CAP = 125  # upper clip applied to the training target
RNG = np.random.default_rng(0)  # shared generator for the validation split
def load(ds):
    """Read the train, test and RUL files of dataset ``ds`` from DATA_DIR.

    Returns a tuple ``(train, test, rul)``: two DataFrames with columns
    ``COLS`` parsed from whitespace-separated, header-less text files, and
    the single-column RUL file as a NumPy array.
    """

    def read_table(filename):
        # Whitespace-delimited table without a header row.
        return pd.read_csv(
            DATA_DIR / filename,
            sep=r"\s+",
            header=None,
            names=COLS,
            engine="python",
        )

    train = read_table(f"train_{ds}.txt")
    test = read_table(f"test_{ds}.txt")
    rul_frame = pd.read_csv(DATA_DIR / f"RUL_{ds}.txt", header=None, names=["RUL"])
    rul_values = rul_frame.squeeze("columns").values
    return train, test, rul_values
def nasa_score(y_true, y_pred):
    """Return the asymmetric exponential prediction penalty, summed over samples.

    With ``d = y_pred - y_true``, each sample contributes
    ``exp(-d / 13) - 1`` when ``d < 0`` and ``exp(d / 10) - 1`` otherwise,
    so an over-prediction costs more than an under-prediction of the same
    size. A perfect prediction scores 0.

    Both arguments may be any array-likes (lists, tuples, ndarrays, Series)
    that broadcast against each other; they are compared positionally.
    """
    # Coerce first so plain Python sequences work and the subtraction is
    # always element-wise by position.
    d = np.asarray(y_pred, dtype=float) - np.asarray(y_true, dtype=float)
    return np.where(d < 0, np.exp(-d / 13) - 1, np.exp(d / 10) - 1).sum()
def metrics(y_true, y_pred):
    """Return RMSE, MAE and the ``nasa_score`` penalty as plain floats.

    The result is a dict with the keys ``"RMSE"``, ``"MAE"`` and
    ``"NASA_score"``.
    """
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    score = nasa_score(y_true, y_pred)
    # Cast to built-in floats so the dict holds no NumPy scalars.
    return {"RMSE": float(rmse), "MAE": float(mae), "NASA_score": float(score)}
def assign_regime(df, kmeans):
    """Return one regime label per row of ``df``.

    When ``kmeans`` is ``None`` every row gets regime 0. Otherwise the
    labels are whatever ``kmeans.predict`` returns for the ``op_1``,
    ``op_2`` and ``op_3`` columns.
    """
    if kmeans is not None:
        op_settings = df[["op_1", "op_2", "op_3"]].values
        return kmeans.predict(op_settings)
    # No clustering model: a single regime for all rows.
    return np.zeros(len(df), dtype=int)
def select_informative_sensors(train_df, regime_col, thr=1e-3):
    """Return the sensors whose within-regime std exceeds ``thr`` somewhere.

    For each name in ``ALL_SENSORS`` the standard deviation is computed per
    ``regime_col`` group (undefined values count as 0); the sensor is kept
    when the largest of those values is strictly greater than ``thr``.
    The order of ``ALL_SENSORS`` is preserved.
    """
    by_regime = train_df.groupby(regime_col)

    def varies(sensor):
        spread = by_regime[sensor].std().fillna(0)
        return spread.max() > thr

    return [sensor for sensor in ALL_SENSORS if varies(sensor)]
def fit_regime_stats(train_df, sensors, regime_col):
    """Return dict regime -> (mean_vec, std_vec) from train data only.

    ``mean_vec`` and ``std_vec`` are NumPy arrays ordered like ``sensors``.
    A standard deviation that is zero or undefined (NaN, e.g. a regime
    with a single row) is replaced by 1 so that dividing by it leaves the
    centred value unchanged instead of producing inf or NaN.
    """
    stats = {}
    for regime, group in train_df.groupby(regime_col):
        mu = group[sensors].mean().values
        # fillna(0) folds the undefined case into the zero-spread case.
        sd = group[sensors].std().fillna(0).replace(0, 1).values
        stats[regime] = (mu, sd)
    return stats
def normalize_by_regime(df, sensors, regime_col, stats):
    """Standardise the sensor columns of ``df`` with per-regime statistics.

    ``stats`` maps a regime label to ``(mean_vec, std_vec)`` ordered like
    ``sensors``. Rows whose regime has no entry in ``stats`` are returned
    unscaled. The result is a float DataFrame with the index of ``df`` and
    columns named ``"<sensor>_n"``.
    """
    scaled = np.array(df[sensors].values, dtype=float)  # private float copy
    regimes = df[regime_col].values
    for label, (center, spread) in stats.items():
        rows = regimes == label
        if not rows.any():
            continue
        scaled[rows] = (scaled[rows] - center) / spread
    out_columns = [f"{s}_n" for s in sensors]
    return pd.DataFrame(scaled, columns=out_columns, index=df.index)
def build_features(df, sensors, normalized=True, stats=None, regime_col="regime"):
    """Build per-row features from the sensor columns of ``df``.

    Parameters
    ----------
    df : DataFrame with ``unit``, ``cycle``, ``regime_col`` and ``sensors``.
    sensors : list of sensor column names to use.
    normalized : when True, sensors are first standardised per regime via
        ``normalize_by_regime`` (columns become ``"<sensor>_n"``).
    stats : regime statistics passed to ``normalize_by_regime``; only used
        when ``normalized`` is True.
    regime_col : name of the regime column.

    Returns
    -------
    DataFrame with a fresh 0..n-1 index and rows in the same order as the
    input, containing ``unit``, ``cycle``, ``regime_col``, the (possibly
    normalised) sensor values, and for each of them a per-unit rolling mean
    (``_m``), rolling std (``_sd``) over ``WINDOW`` rows and an end-point
    slope (``_sl``).
    """
    # Use a positional index so the group-ordered rolling output can be put
    # back into input order below, whatever index the caller passed in.
    df = df.reset_index(drop=True)

    if normalized:
        norm = normalize_by_regime(df, sensors, regime_col, stats)
        df = pd.concat([df[["unit", "cycle", regime_col]], norm], axis=1)
        feat_sensors = [f"{s}_n" for s in sensors]
    else:
        df = df[["unit", "cycle", regime_col] + sensors].copy()
        feat_sensors = sensors

    by_unit = df.groupby("unit")[feat_sensors]
    roll = by_unit.rolling(WINDOW, min_periods=1)

    def _in_input_order(frame, suffix):
        # groupby().rolling() returns rows grouped by unit under a
        # (unit, row) MultiIndex; drop the unit level and sort by the row
        # label so each value lines up with the row it was computed for.
        return frame.reset_index(level=0, drop=True).sort_index().add_suffix(suffix)

    parts = [
        df[["unit", "cycle", regime_col] + feat_sensors],
        _in_input_order(roll.mean(), "_m"),
        # The std of a single observation is undefined; report 0 instead.
        _in_input_order(roll.std().fillna(0), "_sd"),
    ]

    # End-point slope: (x_t - x_{t-(WINDOW-1)}) / (WINDOW - 1), per unit.
    # This is a cheap stand-in for a least-squares slope over the window;
    # the two agree exactly only when the window is perfectly linear.
    # Rows with fewer than WINDOW observations so far get slope 0.
    shifted = by_unit.shift(WINDOW - 1)
    slope = ((df[feat_sensors] - shifted) / (WINDOW - 1)).fillna(0)
    parts.append(slope.add_suffix("_sl"))

    return pd.concat(parts, axis=1)
def run_experiment(ds, n_regimes, normalized=True, use_regime_feature=True, verbose=True):
    """Train a CatBoost regressor on dataset ``ds`` and score it on the test set.

    Parameters
    ----------
    ds : dataset identifier, forwarded to ``load``.
    n_regimes : number of operating regimes. 1 puts every row in regime 0;
        otherwise KMeans is fitted on the training ``op_1..op_3`` columns.
    normalized : standardise sensors per regime before feature building.
    use_regime_feature : feed the regime label to the model as a
        categorical feature.
    verbose : print a one-line summary of regimes and selected sensors.

    Returns
    -------
    dict with the fitted ``model``, test ``metrics``, ``preds``, ``y_true``,
    ``n_features``, ``n_sensors``, ``feature_cols`` and ``best_iter``.

    Raises
    ------
    ValueError
        If the training data contains fewer than two units, so no unit can
        be held out for validation.

    Notes
    -----
    The validation units are drawn from the module-level ``RNG``, so the
    split depends on how many draws were made before this call.
    """
    train_raw, test_raw, rul_true = load(ds)

    # ---- regime assignment ----
    if n_regimes == 1:
        train_raw = train_raw.assign(regime=0)
        test_raw = test_raw.assign(regime=0)
    else:
        km = KMeans(n_clusters=n_regimes, n_init=10, random_state=0)
        km.fit(train_raw[["op_1", "op_2", "op_3"]].values)
        train_raw = train_raw.assign(regime=km.labels_)
        test_raw = test_raw.assign(regime=assign_regime(test_raw, km))

    sensors = select_informative_sensors(train_raw, "regime")
    if verbose:
        print(f" [{ds}] {n_regimes} regime(s), {len(sensors)} informative sensors")

    # Normalisation statistics come from the training data only.
    stats = fit_regime_stats(train_raw, sensors, "regime") if normalized else None

    X_tr_df = build_features(train_raw, sensors, normalized=normalized, stats=stats)
    # Target: cycles left until the unit's last recorded cycle, capped.
    cycles_left = train_raw.groupby("unit").cycle.transform("max") - train_raw.cycle
    X_tr_df["RUL"] = cycles_left.clip(upper=RUL_CAP).values

    exclude = ["unit", "cycle", "RUL"]
    cat_features_arg = ["regime"]
    if not use_regime_feature:
        exclude.append("regime")
        cat_features_arg = None
    feature_cols = [c for c in X_tr_df.columns if c not in exclude]

    # Held-out validation units: 20% of the units but at least 20, capped
    # so that at least one unit is always left for training.
    all_units = X_tr_df["unit"].unique()
    if len(all_units) < 2:
        raise ValueError("need at least two training units to hold out a validation set")
    n_val = min(max(20, len(all_units) // 5), len(all_units) - 1)
    val_u = RNG.choice(all_units, size=n_val, replace=False)
    is_val = X_tr_df["unit"].isin(val_u).values

    X_tr_df["regime"] = X_tr_df["regime"].astype(int)
    X_tr = X_tr_df.loc[~is_val, feature_cols]
    y_tr = X_tr_df.loc[~is_val, "RUL"].values
    X_val = X_tr_df.loc[is_val, feature_cols]
    y_val = X_tr_df.loc[is_val, "RUL"].values

    model = CatBoostRegressor(
        iterations=1000, learning_rate=0.08, depth=5,
        loss_function="RMSE", eval_metric="RMSE",
        early_stopping_rounds=40, random_seed=42, verbose=False,
        cat_features=cat_features_arg,
    )
    model.fit(X_tr, y_tr, eval_set=(X_val, y_val))

    # Test set: one prediction per unit, taken at its last row.
    X_te_df = build_features(test_raw, sensors, normalized=normalized, stats=stats)
    X_te_df["regime"] = X_te_df["regime"].astype(int)
    last = X_te_df.groupby("unit").tail(1).sort_values("unit")
    X_te = last[feature_cols]
    preds = np.clip(model.predict(X_te), 0, None)  # no negative predictions

    return {
        "model": model,
        "metrics": metrics(rul_true, preds),
        "preds": preds,
        "y_true": rul_true,
        "n_features": len(feature_cols),
        "n_sensors": len(sensors),
        "feature_cols": feature_cols,
        "best_iter": model.get_best_iteration(),
    }
def weibull_pdf(t, beta, eta):
    """Return the two-parameter Weibull density at ``t``.

    ``beta`` is the shape and ``eta`` the scale:
    ``(beta / eta) * (t / eta) ** (beta - 1) * exp(-(t / eta) ** beta)``.
    Works element-wise on arrays.
    """
    return (beta / eta) * (t / eta) ** (beta - 1) * np.exp(-(t / eta) ** beta)


def _weibull_shape_score(b, x, log_x, w, ws):
    """Weighted Weibull shape equation; its root in ``b`` is the shape MLE."""
    xb = x ** b
    top = (w * xb * log_x).sum()
    bot = (w * xb).sum()
    return top / bot - (w * log_x).sum() / ws - 1 / b


def em_weibull_mixture(x, K=2, n_iter=200, tol=1e-6, seed=0):
    """Fit a K-component Weibull mixture to ``x`` by expectation-maximisation.

    Parameters
    ----------
    x : 1-D array-like of strictly positive observations.
    K : number of mixture components (default 2).
    n_iter : maximum number of EM iterations; must be at least 1.
    tol : stop when the log-likelihood changes by less than this amount.
    seed : unused; the fit is deterministic. Kept so existing calls that
        pass it keep working.

    Returns
    -------
    dict with ``beta`` (shapes), ``eta`` (scales) and ``pi`` (weights), all
    sorted by increasing ``eta``; ``logL`` (final log-likelihood);
    ``n_iter`` (iterations performed); and ``gamma``, the responsibilities
    from the last E step, one column per component in the same order.

    Raises
    ------
    ValueError
        If ``x`` is not a non-empty 1-D sequence of positive finite values,
        if ``K`` or ``n_iter`` is smaller than 1, or if the quantile split
        used for initialisation leaves a component without observations.
    """
    x = np.asarray(x, dtype=float)
    if x.ndim != 1 or x.size == 0:
        raise ValueError("x must be a non-empty 1-D sequence")
    if not np.all(np.isfinite(x) & (x > 0)):
        raise ValueError("x must contain only positive finite values")
    if K < 1:
        raise ValueError("K must be at least 1")
    if n_iter < 1:
        raise ValueError("n_iter must be at least 1")

    # Init: cut the sample at the K-quantiles (the median for K=2); a value
    # equal to a cut point goes to the upper group.
    cuts = np.quantile(x, np.arange(1, K) / K)
    labels = np.searchsorted(cuts, x, side="right")
    counts = np.bincount(labels, minlength=K)
    if (counts == 0).any():
        raise ValueError("cannot initialise: a component received no observations")

    # Shape starts at 3; scale is chosen so the Weibull mean (eta times
    # Gamma(1 + 1/beta)) matches the mean of the component's group.
    mean_factor = np.exp(gammaln(1 + 1 / 3))
    betas = np.full(K, 3.0)
    etas = np.array([x[labels == k].mean() / mean_factor for k in range(K)])
    pis = counts / x.size
    pis[-1] = 1 - pis[:-1].sum()

    log_x = np.log(x)  # loop-invariant; reused by every root search
    prev_ll = -np.inf
    for it in range(n_iter):
        # E step: responsibilities, floored to avoid 0 / 0 on underflow.
        comp = np.stack(
            [pis[k] * weibull_pdf(x, betas[k], etas[k]) for k in range(K)], axis=1
        )
        comp = np.clip(comp, 1e-300, None)
        gamma = comp / comp.sum(axis=1, keepdims=True)

        # M step: per component, weighted Weibull MLE via numerical root.
        new_betas = np.zeros(K)
        new_etas = np.zeros(K)
        new_pis = gamma.mean(axis=0)
        for k in range(K):
            w = gamma[:, k]
            ws = w.sum()
            try:
                b_new = optimize.brentq(
                    _weibull_shape_score, 0.3, 30.0, args=(x, log_x, w, ws)
                )
            except ValueError:
                # No sign change in [0.3, 30]: keep the previous shape.
                b_new = betas[k]
            new_betas[k] = b_new
            new_etas[k] = ((w * x ** b_new).sum() / ws) ** (1 / b_new)
        betas, etas, pis = new_betas, new_etas, new_pis

        mixture = np.stack(
            [pis[k] * weibull_pdf(x, betas[k], etas[k]) for k in range(K)], axis=1
        ).sum(axis=1)
        # Same floor as the E step, so an underflowed density cannot turn
        # the log-likelihood into -inf and break the convergence check.
        ll = np.log(np.clip(mixture, 1e-300, None)).sum()
        if abs(ll - prev_ll) < tol:
            break
        prev_ll = ll

    # Sort components by eta for reproducibility.
    order = np.argsort(etas)
    return {"beta": betas[order], "eta": etas[order], "pi": pis[order],
            "logL": ll, "n_iter": it + 1, "gamma": gamma[:, order]}