Spaces:
Sleeping
Sleeping
| import warnings | |
| warnings.filterwarnings("ignore", category=FutureWarning) | |
| warnings.filterwarnings("ignore", category=UserWarning) | |
| import numpy as np | |
| import pandas as pd | |
| import optuna | |
| import shap | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.model_selection import StratifiedKFold | |
| from sklearn.metrics import ( | |
| classification_report, confusion_matrix, f1_score, recall_score, | |
| ) | |
| from sklearn.utils.class_weight import compute_sample_weight | |
| from xgboost import XGBClassifier | |
| from lightgbm import LGBMClassifier | |
| from catboost import CatBoostClassifier | |
| from src.config import ( | |
| RANDOM_STATE, OPTUNA_TRIALS, CV_FOLDS, IF_CONTAMINATION, | |
| CRITICAL_THRESHOLD, MEDIUM_THRESHOLD, RISK_LABELS, OUTPUT_PATH, | |
| ) | |
| optuna.logging.set_verbosity(optuna.logging.WARNING) | |
def prepare_features(X_train, X_test):
    """Drop 'Trade_'-named columns from both splits and report final shapes.

    Returns the (possibly reduced) ``(X_train, X_test)`` pair; the inputs
    are not mutated when columns are dropped (``drop`` returns copies).
    """
    to_drop = [col for col in X_train.columns if "Trade_" in col]
    if to_drop:
        # Remove the same columns from both frames so they stay aligned.
        X_train, X_test = (frame.drop(columns=to_drop) for frame in (X_train, X_test))
        print(f"[Model] Dropped zero-variance columns: {to_drop}")
    print(f"[Model] X_train {X_train.shape} X_test {X_test.shape}")
    return X_train, X_test
def _inject_anomaly(Xtr, Xother):
    """Fit IsolationForest on ``Xtr`` and append a 0-100 'Anomaly_Score'
    column to copies of both frames.

    The score is the negated decision function, min-max scaled using the
    *training* frame's range only (so ``Xother`` never leaks into the
    scaling), then clipped to [0, 100].
    """
    forest = IsolationForest(
        contamination=IF_CONTAMINATION, random_state=RANDOM_STATE, n_jobs=-1,
    )
    forest.fit(Xtr)
    # Higher value == more anomalous (decision_function is negated).
    score_tr = -forest.decision_function(Xtr)
    score_other = -forest.decision_function(Xother)
    lo, hi = score_tr.min(), score_tr.max()
    span = hi - lo if hi != lo else 1.0  # guard against zero division
    out_tr, out_other = Xtr.copy(), Xother.copy()
    out_tr["Anomaly_Score"] = np.clip((score_tr - lo) / span * 100, 0, 100)
    out_other["Anomaly_Score"] = np.clip((score_other - lo) / span * 100, 0, 100)
    return out_tr, out_other
def compute_weights(y_train):
    """Return class-balanced per-sample weights for the training labels."""
    w = compute_sample_weight(class_weight="balanced", y=y_train)
    summary = (
        f"[Phase 1] Sample weights — "
        f"min {w.min():.4f} max {w.max():.4f} "
        f"mean {w.mean():.4f}"
    )
    print(summary)
    return w
def optimise_hyperparams(X_train, y_train, sample_weights):
    """Tune XGBoost hyperparameters with Optuna, maximising mean CV macro-F1.

    Args:
        X_train: training feature frame (before anomaly-score injection).
        y_train: integer class labels (0/1/2) aligned with X_train.
        sample_weights: per-row weights, indexed by positional fold indices.

    Returns:
        dict with the best trial's sampled parameters
        (max_depth, learning_rate, subsample, colsample_bytree).
    """
    def objective(trial):
        # Fixed multiclass setup plus the four searched parameters.
        params = {
            "objective": "multi:softprob",
            "num_class": 3,
            "eval_metric": "mlogloss",
            # NOTE(review): use_label_encoder is deprecated/ignored in recent
            # XGBoost releases — confirm against the pinned version.
            "use_label_encoder": False,
            "tree_method": "hist",
            "random_state": RANDOM_STATE,
            "n_estimators": 500,
            # Early stopping monitors the eval_set supplied in fit() below.
            "early_stopping_rounds": 30,
            "max_depth": trial.suggest_int("max_depth", 3, 9),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2, log=True),
            "subsample": trial.suggest_float("subsample", 0.6, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.6, 1.0),
        }
        skf = StratifiedKFold(
            n_splits=CV_FOLDS, shuffle=True, random_state=RANDOM_STATE
        )
        scores = []
        for train_idx, val_idx in skf.split(X_train, y_train):
            Xtr_raw = X_train.iloc[train_idx]
            Xval_raw = X_train.iloc[val_idx]
            ytr, yval = y_train.iloc[train_idx], y_train.iloc[val_idx]
            wtr = sample_weights[train_idx]
            # Anomaly score is fitted per fold on that fold's train rows
            # only, so the validation rows never leak into the scaling.
            Xtr, Xval = _inject_anomaly(Xtr_raw, Xval_raw)
            model = XGBClassifier(**params)
            model.fit(
                Xtr, ytr,
                sample_weight=wtr,
                eval_set=[(Xval, yval)],
                verbose=False,
            )
            preds = model.predict(Xval)
            scores.append(f1_score(yval, preds, average="macro"))
        # Optuna maximises this mean macro-F1 across folds.
        return np.mean(scores)
    study = optuna.create_study(
        direction="maximize",
        sampler=optuna.samplers.TPESampler(seed=RANDOM_STATE),
    )
    study.optimize(objective, n_trials=OPTUNA_TRIALS, show_progress_bar=True)
    best = study.best_params
    print(f"[Phase 2] Best macro-F1 = {study.best_value:.4f}")
    print(f" Params: {best}")
    return best
def _threshold_predictions(proba):
    """Map blended class probabilities to labels via the configured cutoffs.

    Critical (2) wins when P(critical) clears CRITICAL_THRESHOLD; otherwise
    Medium (1) when P(medium) clears MEDIUM_THRESHOLD; else Low (0).
    """
    predictions = np.zeros(len(proba), dtype=int)
    critical_mask = proba[:, 2] > CRITICAL_THRESHOLD
    medium_mask = (~critical_mask) & (proba[:, 1] > MEDIUM_THRESHOLD)
    predictions[critical_mask] = 2
    predictions[medium_mask] = 1
    return predictions


def _tiered_risk_scores(proba, predictions):
    """Rank-normalise a raw risk score into disjoint per-class bands.

    Raw score = 50*P(Medium) + 100*P(Critical). Within each predicted class
    the rows are ranked and spread linearly across that class's band
    (Low 0-33, Medium 34-66, Critical 67-100); a singleton class gets the
    band midpoint. Uses np.zeros (not np.empty) so no row can ever carry
    uninitialised memory.
    """
    raw_score = (proba[:, 1] * 50) + (proba[:, 2] * 100)
    tiers = {0: (0.0, 33.0), 1: (34.0, 66.0), 2: (67.0, 100.0)}
    risk_scores = np.zeros(len(predictions), dtype=float)
    for cls, (lo, hi) in tiers.items():
        mask = predictions == cls
        n = int(mask.sum())
        if n == 0:
            continue
        if n == 1:
            risk_scores[mask] = (lo + hi) / 2
        else:
            # Double argsort yields each row's rank within its class.
            ranks = raw_score[mask].argsort().argsort()
            risk_scores[mask] = lo + ranks / (n - 1) * (hi - lo)
    return risk_scores


def train_and_predict(X_train, y_train, X_test, sample_weights, best_params):
    """Train the XGB/LGBM/CatBoost ensemble and score the test split.

    Injects the IsolationForest 'Anomaly_Score' feature (fitted on train
    only), trains the three models, blends their probabilities 0.5/0.3/0.2,
    thresholds them into classes and rank-scores each class band.

    Returns:
        (xgb_model, lgb_model, cat_model, iso, iso_rmin, iso_rmax,
         X_train, X_test, proba, predictions, risk_scores) — iso and its
        min/max are kept so inference can reproduce the same scaling.
    """
    # Fit the anomaly detector on train only; keep min/max so inference
    # can reproduce the identical 0-100 scaling later.
    iso = IsolationForest(
        contamination=IF_CONTAMINATION, random_state=RANDOM_STATE, n_jobs=-1,
    )
    iso.fit(X_train)
    raw_tr = -iso.decision_function(X_train)
    raw_te = -iso.decision_function(X_test)
    iso_rmin, iso_rmax = float(raw_tr.min()), float(raw_tr.max())
    _denom = iso_rmax - iso_rmin if iso_rmax != iso_rmin else 1.0
    X_train = X_train.copy()
    X_test = X_test.copy()
    X_train["Anomaly_Score"] = np.clip((raw_tr - iso_rmin) / _denom * 100, 0, 100)
    X_test["Anomaly_Score"] = np.clip((raw_te - iso_rmin) / _denom * 100, 0, 100)
    print(f"[Phase 3] Anomaly_Score injected — "
          f"train mean {X_train['Anomaly_Score'].mean():.2f}, "
          f"test mean {X_test['Anomaly_Score'].mean():.2f}")
    # XGBoost carries the tuned Optuna params; the other two use balanced
    # class weighting instead of explicit sample weights.
    xgb_model = XGBClassifier(
        objective="multi:softprob",
        num_class=3,
        eval_metric="mlogloss",
        use_label_encoder=False,
        tree_method="hist",
        random_state=RANDOM_STATE,
        n_estimators=800,
        **best_params,
    )
    xgb_model.fit(X_train, y_train, sample_weight=sample_weights, verbose=False)
    print(f"[Phase 3] XGBoost trained on {X_train.shape}")
    lgb_model = LGBMClassifier(
        n_estimators=800,
        num_class=3,
        class_weight="balanced",
        random_state=RANDOM_STATE,
        verbose=-1,
    )
    lgb_model.fit(X_train, y_train)
    print(f"[Phase 3] LightGBM trained")
    cat_model = CatBoostClassifier(
        iterations=800,
        auto_class_weights="Balanced",
        random_seed=RANDOM_STATE,
        verbose=False,
    )
    cat_model.fit(X_train, y_train)
    print(f"[Phase 3] CatBoost trained")
    xgb_proba = xgb_model.predict_proba(X_test)
    lgb_proba = lgb_model.predict_proba(X_test)
    cat_proba = cat_model.predict_proba(X_test)
    proba = (0.5*xgb_proba) + (0.3*lgb_proba) + (0.2*cat_proba)
    print(f"[Phase 3] Ensemble probabilities blended (XGB + LGB + CAT)")
    predictions = _threshold_predictions(proba)
    risk_scores = _tiered_risk_scores(proba, predictions)
    print(f"[Phase 3] Predictions — "
          f"Low: {(predictions==0).sum()}, "
          f"Medium: {(predictions==1).sum()}, "
          f"Critical: {(predictions==2).sum()}")
    return (
        xgb_model, lgb_model, cat_model,
        iso, iso_rmin, iso_rmax,
        X_train, X_test, proba, predictions, risk_scores,
    )
def inference_predict(
    xgb_model, lgb_model, cat_model,
    iso, iso_rmin, iso_rmax,
    X_test_pure,
):
    """Score unseen rows with the already-trained ensemble.

    Re-creates the train-time 'Anomaly_Score' feature using the stored
    IsolationForest and its train-set min/max, blends the three models'
    probabilities 0.5/0.3/0.2, thresholds them into classes and
    rank-scores each class band.

    Returns:
        (X_test with Anomaly_Score, blended proba, predictions, risk_scores)
    """
    X_test = X_test_pure.copy().reset_index(drop=True)
    # Same scaling as training: negated decision function, train min/max.
    anomaly_raw = -iso.decision_function(X_test)
    scale = iso_rmax - iso_rmin
    if scale == 0:
        scale = 1.0
    X_test["Anomaly_Score"] = np.clip((anomaly_raw - iso_rmin) / scale * 100, 0, 100)
    proba = sum(
        weight * estimator.predict_proba(X_test)
        for weight, estimator in (
            (0.5, xgb_model), (0.3, lgb_model), (0.2, cat_model),
        )
    )
    # Critical wins first; Medium only where Critical did not fire.
    critical = proba[:, 2] > CRITICAL_THRESHOLD
    medium = (~critical) & (proba[:, 1] > MEDIUM_THRESHOLD)
    predictions = np.where(critical, 2, np.where(medium, 1, 0)).astype(int)
    raw = proba[:, 1] * 50 + proba[:, 2] * 100
    risk_scores = np.empty(len(predictions), dtype=float)
    for cls, lo, hi in ((0, 0.0, 33.0), (1, 34.0, 66.0), (2, 67.0, 100.0)):
        idx = np.flatnonzero(predictions == cls)
        if idx.size == 0:
            continue
        if idx.size == 1:
            risk_scores[idx] = (lo + hi) / 2
        else:
            # Double argsort: rank of each row's raw score within its class.
            order = raw[idx].argsort().argsort()
            risk_scores[idx] = lo + order / (idx.size - 1) * (hi - lo)
    return X_test, proba, predictions, risk_scores
def explain_and_save(model, X_test, test_ids, predictions, risk_scores):
    """Build SHAP-based plain-text explanations and write the output CSV.

    Args:
        model: fitted tree model (passed to shap.TreeExplainer).
        X_test: feature frame the predictions were made on.
        test_ids: pandas Series of container identifiers (read via .values).
        predictions: int class per row (0 Low / 1 Medium / 2 Critical).
        risk_scores: float risk score per row.

    Side effects:
        Writes the result frame to OUTPUT_PATH and prints a summary.

    Returns:
        The output DataFrame that was saved.
    """
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_test)
    # Normalise access: older shap returns a list of per-class arrays,
    # newer versions return a single multi-dimensional array.
    if isinstance(shap_values, list):
        def get_row_shap(sample_idx, class_idx):
            return shap_values[class_idx][sample_idx]
    else:
        def get_row_shap(sample_idx, class_idx):
            # assumes axes are (sample, feature, class) — TODO confirm
            # against the installed shap version.
            return shap_values[sample_idx, :, class_idx]
    feature_names = X_test.columns.tolist()
    n_features = len(feature_names)
    n = len(X_test)
    top_k = min(3, n_features)
    explanations = []
    for i in range(n):
        pred_class = predictions[i]
        row_shap = get_row_shap(i, pred_class)
        # Largest *signed* SHAP values, descending.
        # NOTE(review): only features pushing the prediction up are
        # surfaced — confirm abs() ranking is not intended instead.
        top_indices = np.argsort(row_shap)[-top_k:][::-1]
        top_feats = [feature_names[idx] for idx in top_indices]
        feat_str = ", ".join(
            f"{rank}. {name}" for rank, name in enumerate(top_feats, 1)
        )
        score = risk_scores[i]
        # One template per risk tier.
        if pred_class == 2:
            explanations.append(
                f"Flagged as Critical (score {score:.1f}) driven by "
                f"unusual patterns in {feat_str}."
            )
        elif pred_class == 1:
            explanations.append(
                f"Elevated to Medium risk (score {score:.1f}). "
                f"Top drivers: {feat_str}."
            )
        else:
            explanations.append(
                f"Classified as Low risk (score {score:.1f}). "
                f"Primary factors: {feat_str}."
            )
    output = pd.DataFrame({
        "Container_ID": test_ids.values,
        "Risk_Score": np.round(risk_scores, 2),
        "Risk_Level": [RISK_LABELS[p] for p in predictions],
        "Explanation_Summary": explanations,
    })
    output.to_csv(OUTPUT_PATH, index=False)
    print(f"\n[Phase 4] Saved final_predictions.csv ({output.shape[0]} rows)")
    print(f"\nRisk Level distribution:")
    print(output["Risk_Level"].value_counts())
    print(f"\nSample output:\n{output.head(10).to_string()}")
    return output
def evaluate_on_train_cv(X_train_pure, y_train, sample_weights, best_params):
    """Report out-of-fold XGBoost metrics from stratified CV on train only."""
    splitter = StratifiedKFold(
        n_splits=CV_FOLDS, shuffle=True, random_state=RANDOM_STATE
    )
    # -1 marks rows not yet predicted; every row is filled exactly once.
    oof_preds = np.full(len(y_train), -1, dtype=int)
    for fold_no, (fit_idx, hold_idx) in enumerate(
        splitter.split(X_train_pure, y_train), 1
    ):
        # Anomaly feature fitted on this fold's train rows only.
        fit_X, hold_X = _inject_anomaly(
            X_train_pure.iloc[fit_idx], X_train_pure.iloc[hold_idx]
        )
        fit_y = y_train.iloc[fit_idx]
        fit_w = sample_weights[fit_idx]
        estimator = XGBClassifier(
            objective="multi:softprob",
            num_class=3,
            eval_metric="mlogloss",
            use_label_encoder=False,
            tree_method="hist",
            random_state=RANDOM_STATE,
            n_estimators=800,
            **best_params,
        )
        estimator.fit(fit_X, fit_y, sample_weight=fit_w, verbose=False)
        oof_preds[hold_idx] = estimator.predict(hold_X)
    _print_metrics(
        "MODEL EVALUATION (3-Fold Stratified CV · Train Only)",
        y_train,
        oof_preds,
    )
def evaluate_on_test(predictions, y_test_true):
    """Print held-out test metrics against the ground-truth labels."""
    heading = "TEST SET EVALUATION (Unseen Data · Ground Truth from CSV)"
    _print_metrics(heading, y_test_true, predictions)
def _print_metrics(title, y_true, y_pred):
    """Pretty-print the F1 summary, confusion matrix and full report."""
    class_names = ["Low (0)", "Medium (1)", "Critical (2)"]
    rule = "=" * 64
    macro = f1_score(y_true, y_pred, average="macro")
    weighted = f1_score(y_true, y_pred, average="weighted")
    f1_by_class = f1_score(y_true, y_pred, average=None)
    critical_recall = recall_score(y_true, y_pred, labels=[2], average=None)[0]
    matrix = pd.DataFrame(
        confusion_matrix(y_true, y_pred),
        index=class_names,
        columns=class_names,
    )
    print("\n" + rule)
    print(f" {title}")
    print(rule)
    print(f"\n ▸ Macro F1 (PRIMARY) : {macro:.4f}")
    print(f" ▸ Weighted F1 : {weighted:.4f}")
    print(f" ▸ F1 Critical (class 2) : {f1_by_class[2]:.4f}")
    print(f" ▸ Recall Critical : {critical_recall:.4f}")
    print(f" ▸ F1 Medium (class 1) : {f1_by_class[1]:.4f}")
    print(f" ▸ F1 Low (class 0) : {f1_by_class[0]:.4f}")
    print(f"\n── Confusion Matrix ──")
    print(matrix.to_string())
    print(f"\n── Full Classification Report ──")
    print(classification_report(
        y_true, y_pred, target_names=class_names, digits=4
    ))
    print(rule)