Spaces:
Sleeping
Sleeping
| """ | |
| Model Trainer — trains XGBoost + LightGBM ensemble per commodity. | |
| Designed to run on Kaggle free notebooks (GPU available there) but | |
| works on CPU locally. | |
| IMPORTANT: Run this on Kaggle for GPU acceleration, or locally with CPU. | |
| Saves trained models to models/ directory. | |
| Usage: | |
| python model/trainer.py # train all symbols | |
| python model/trainer.py --symbol GC=F # train one symbol | |
| python model/trainer.py --symbol ZW=F --horizon 7d | |
| """ | |
| import argparse | |
| import json | |
| import logging | |
| import pickle | |
| from datetime import date, timedelta | |
| import sys | |
| import warnings | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.calibration import CalibratedClassifierCV | |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix | |
| from sklearn.model_selection import TimeSeriesSplit | |
| from sklearn.preprocessing import StandardScaler | |
# Suppress every Python warning for the whole process (same effect as
# warnings.filterwarnings("ignore") with default arguments).
warnings.simplefilter("ignore")
# Put the project root (two levels above this file) first on the import path
# so the `model.*` and `signals.*` imports below resolve when run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))
| from model.feature_builder import build_training_data | |
| from signals.price_features import ALL_SYMBOLS | |
# Trained artifacts (pickled models/scalers, feature-name JSON, accuracy report)
# are written to <project root>/models; created on import if missing.
MODELS_DIR = Path(__file__).parent.parent.joinpath("models")
MODELS_DIR.mkdir(exist_ok=True)

# Process-wide logging: INFO and above, timestamped.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)
# Class encoding for XGBoost/LightGBM: raw direction -1/0/+1 -> class id 0/1/2.
LABEL_MAP = {-1: 0, 0: 1, 1: 2}
# Inverse of LABEL_MAP: class id -> raw direction.
LABEL_REVERSE = {class_id: direction for direction, class_id in LABEL_MAP.items()}
# Display name per class id.
LABEL_NAMES = dict(enumerate(("DOWN", "STABLE", "UP")))

# ── Phase 6: Booster 2 — commodity-specific feature weight multipliers ─────────
# Per symbol, a multiplier for selected features. At training time, rows whose
# |feature| is at or above that feature's 75th percentile get their SAMPLE
# weight multiplied by it (values < 1 down-weight those rows), so the model
# pays more attention to periods where the commodity's key drivers are active.
COMMODITY_FEATURE_WEIGHTS: dict[str, dict[str, float]] = {
    "CL=F": dict(risk_score_7d=1.5, risk_score_30d=1.5, days_to_opec_meeting=1.4,
                 drought_index=0.5),
    "NG=F": dict(days_to_opec_meeting=1.4, return_60d=1.3, atr_14=1.3),
    "GC=F": dict(dxy_proxy=1.8, risk_score_7d=1.3, sentiment_score_1d=1.2),
    "ZW=F": dict(drought_index=2.0, sentiment_score_1d=1.2, precip_anomaly_pct=1.5),
    "ZC=F": dict(harvest_season_flag=1.5, drought_index=1.8, precip_anomaly_pct=1.4),
    "ZS=F": dict(harvest_season_flag=1.5, drought_index=1.6, precip_anomaly_pct=1.3),
    "CT=F": dict(harvest_season_flag=1.6, heat_stress_days=2.0, precip_anomaly_pct=1.5),
    "SB=F": dict(harvest_season_flag=1.5, precip_anomaly_pct=1.4),
    "USDINR=X": dict(return_60d=1.4, momentum_score=1.3, macd_signal=1.2),
    "HG=F": dict(risk_score_7d=1.3, return_60d=1.4, momentum_score=1.2),
}

# ── model configs ──────────────────────────────────────────────────────────────
# 3-class soft-probability XGBoost. early_stopping_rounds must be a constructor
# argument in recent XGBoost releases and needs an eval_set at fit() time.
XGB_PARAMS = dict(
    n_estimators=500,
    max_depth=6,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.8,
    objective="multi:softprob",
    num_class=3,
    eval_metric="mlogloss",
    early_stopping_rounds=50,
    random_state=42,
    n_jobs=-1,
)
# 3-class LightGBM. feature_fraction / bagging_fraction / bagging_freq are the
# LightGBM-native aliases of colsample_bytree / subsample / subsample_freq.
LGBM_PARAMS = dict(
    n_estimators=500,
    num_leaves=31,
    learning_rate=0.05,
    feature_fraction=0.8,
    bagging_fraction=0.8,
    bagging_freq=5,
    objective="multiclass",
    num_class=3,
    metric="multi_logloss",
    verbose=-1,
    random_state=42,
    n_jobs=-1,
)
| # ── helpers ──────────────────────────────────────────────────────────────────── | |
def _encode_labels(y: pd.Series) -> np.ndarray:
    """Map raw direction labels (-1/0/1) to class ids (0/1/2) via LABEL_MAP.

    Labels not present in LABEL_MAP become NaN (standard ``Series.map``
    behaviour).
    """
    class_ids = y.map(LABEL_MAP)
    return class_ids.values
def _compute_sample_weights(y_encoded: np.ndarray) -> np.ndarray:
    """Return per-row sample weights that balance the classes.

    Uses scikit-learn's "balanced" inverse-frequency weighting when all three
    classes occur in ``y_encoded``; otherwise every row gets weight 1.0.
    """
    from sklearn.utils.class_weight import compute_sample_weight

    n_distinct = np.unique(y_encoded).size
    if n_distinct >= 3:
        return compute_sample_weight("balanced", y_encoded)
    return np.ones(len(y_encoded), dtype=float)
| def _select_top_features( | |
| X: pd.DataFrame, | |
| importances: np.ndarray, | |
| top_n: int = 20, | |
| min_importance: float = 0.01, | |
| ) -> list[str]: | |
| """Return top_n feature names by importance, filtering below min_importance.""" | |
| feat_imp = pd.Series(importances, index=X.columns).sort_values(ascending=False) | |
| selected = feat_imp[feat_imp >= min_importance].head(top_n).index.tolist() | |
| if len(selected) < 5: | |
| selected = feat_imp.head(top_n).index.tolist() | |
| return selected | |
| def _detect_regime(X: pd.DataFrame) -> np.ndarray: | |
| """ | |
| Booster 3 — Regime Detection. | |
| Returns per-row regime array: 0=RANGE_BOUND, 1=TRENDING, 2=VOLATILE. | |
| Uses ATR% and absolute 30-day return to classify market state. | |
| Only applied when X has enough rows to compute rolling stats (>60). | |
| """ | |
| if len(X) < 60 or "atr_pct" not in X.columns: | |
| return np.zeros(len(X), dtype=int) | |
| atr_pct = X["atr_pct"].fillna(0) | |
| ret_30d = X.get("return_30d", pd.Series(0, index=X.index)).abs().fillna(0) | |
| atr_mean = atr_pct.rolling(60, min_periods=20).mean().fillna(atr_pct.mean()) | |
| atr_std = atr_pct.rolling(60, min_periods=20).std().fillna(atr_pct.std()) | |
| atr_thresh_volatile = atr_mean + 1.5 * atr_std | |
| regime = np.zeros(len(X), dtype=int) | |
| regime[ret_30d.values > 10.0] = 1 # TRENDING | |
| regime[atr_pct.values > atr_thresh_volatile.values] = 2 # VOLATILE | |
| return regime | |
def _apply_commodity_weights(
    sample_weights: np.ndarray,
    X: pd.DataFrame,
    symbol: str,
    regime: np.ndarray,
) -> np.ndarray:
    """
    Booster 2+3 combined — fold commodity emphasis and market regime into the
    per-row sample weights.

    1. For each (feature, multiplier) configured for ``symbol`` in
       COMMODITY_FEATURE_WEIGHTS that exists in ``X``: rows whose |feature|
       (NaN treated as 0) is >= its 75th percentile are multiplied by the
       multiplier. Features whose 75th percentile is 0 are skipped.
    2. Regime 2 (VOLATILE) rows are scaled by 0.6 and regime 1 (TRENDING)
       rows by 1.2.
    3. If the total is positive, weights are rescaled to sum to the row count.

    Returns a new float array; ``sample_weights`` itself is not modified.
    """
    adjusted = sample_weights.astype(float, copy=True)

    emphasis = COMMODITY_FEATURE_WEIGHTS.get(symbol, {})
    for feature, factor in emphasis.items():
        if feature not in X.columns:
            continue
        magnitude = X[feature].abs().fillna(0)
        q75 = np.percentile(magnitude, 75)
        if q75 > 0:
            adjusted[(magnitude >= q75).values] *= factor

    # Volatile: trust less; trending: trust momentum-driven rows a bit more.
    for regime_id, factor in ((2, 0.6), (1, 1.2)):
        adjusted[regime == regime_id] *= factor

    total = adjusted.sum()
    if total > 0:
        adjusted = adjusted / total * len(adjusted)
    return adjusted
| def _directional_accuracy(y_true: np.ndarray, y_pred: np.ndarray) -> float: | |
| """Accuracy of predicting UP/DOWN/STABLE direction correctly.""" | |
| return float(np.mean(y_true == y_pred)) | |
def _sharpe_ratio(y_true_raw: pd.Series, y_pred_encoded: np.ndarray) -> float:
    """
    Naive Sharpe of a toy strategy: long on predicted UP, short on DOWN, flat on
    STABLE, with the true direction label (-1/0/+1) standing in for the return.

    Per-row P&L is (predicted direction) * (true direction). The ratio
    mean/std is annualised with sqrt(252), rounded to 3 decimals, and 0.0 is
    returned when the std is not positive (or NaN).

    NOTE(review): labels are multi-day forward directions, not daily returns,
    so the sqrt(252) annualisation likely overstates the ratio — treat it as a
    relative score only.
    """
    positions = pd.Series(y_pred_encoded).map(LABEL_REVERSE)
    pnl = positions * y_true_raw.values
    avg, spread = pnl.mean(), pnl.std()
    if spread > 0:
        annualised = float(avg / spread * np.sqrt(252))
    else:
        annualised = 0.0
    return round(annualised, 3)
| # ── training ─────────────────────────────────────────────────────────────────── | |
def _trim_to_last_days(
    X: pd.DataFrame,
    y_7d: pd.Series,
    y_30d: pd.Series,
    last_days: int,
) -> tuple[pd.DataFrame, pd.Series, pd.Series]:
    """Restrict features and both label series to the most recent ``last_days`` calendar days.

    Uses the ``date`` column when present. Otherwise it assumes rows are in
    chronological order and keeps the last ``int(last_days * 0.71)`` rows,
    roughly the number of trading days in that calendar window.
    """
    if "date" in X.columns:
        cutoff = date.today() - timedelta(days=last_days)
        keep = (pd.to_datetime(X["date"]).dt.date >= cutoff).values
    else:
        trading_days = int(last_days * 0.71)
        # Build the mask positionally: a slice such as ``iloc[-n:]`` would keep
        # EVERY row when n rounds down to 0 (``iloc[-0:]`` is the whole series).
        keep = np.arange(len(X)) >= len(X) - trading_days
    return (
        X[keep].reset_index(drop=True),
        y_7d[keep].reset_index(drop=True),
        y_30d[keep].reset_index(drop=True),
    )


def _add_interaction_features(X: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of X with the Booster-4 interaction and volatility-flag columns added."""
    X = X.copy()
    # Interaction: sentiment × momentum (large when both agree)
    if "sentiment_score_1d" in X.columns and "momentum_score" in X.columns:
        X["sentiment_x_momentum"] = X["sentiment_score_1d"] * X["momentum_score"]
    # Interaction: event direction × sign of recent price move
    if "direction_score_7d" in X.columns and "return_7d" in X.columns:
        X["event_x_momentum"] = X["direction_score_7d"] * np.sign(X["return_7d"].fillna(0))
    # Volatility regime flag: atr_pct above 1.5× its 60-row rolling mean
    if "atr_pct" in X.columns and len(X) >= 60:
        atr_mean = X["atr_pct"].rolling(60, min_periods=20).mean().fillna(X["atr_pct"].mean())
        X["high_volatility_flag"] = (X["atr_pct"] > atr_mean * 1.5).astype(int)
    return X


def train_symbol(
    symbol: str,
    horizon: str = "7d",
    add_lag_features: bool = True,
    last_days: int | None = None,
) -> dict:
    """
    Train the XGBoost + LightGBM ensemble for one commodity and horizon and
    save its artifacts.

    Args:
        symbol: Commodity ticker, e.g. "ZW=F".
        horizon: "7d" selects the 7-day labels; any other value selects the
            30-day labels (the CLI only passes "7d" or "30d").
        add_lag_features: Append the interaction / volatility-flag features
            (Booster 4) before training.
        last_days: If set, train only on the most recent N calendar days.
            Intended for when NLP signals cover only a short window, so that
            years of zero-padded sentiment rows do not dilute the model.

    Returns:
        On success, a dict with cv_accuracy, test_accuracy, sharpe_ratio,
        n_features, n_train_samples, n_test_samples, top10_features and
        classification_report. On an early exit, a dict with symbol, horizon
        and "error" set to "no_data", "extreme_class_imbalance" or
        "all_folds_skipped".

    Side effects:
        Pickles the (possibly calibrated) XGBoost model, the LightGBM model and
        the scaler, and writes the selected feature names as JSON, all under
        MODELS_DIR keyed by (symbol, horizon).
    """
    import lightgbm as lgb
    import xgboost as xgb

    log.info("Training %s | horizon=%s | window=%s",
             symbol, horizon, f"last {last_days}d" if last_days else "full")
    X, y_7d, y_30d = build_training_data(symbol)
    if X.empty:
        log.warning("%s: no training data, skipping", symbol)
        return {"symbol": symbol, "horizon": horizon, "error": "no_data"}

    # Optional short window (e.g. the period actually covered by NLP signals).
    if last_days is not None:
        X, y_7d, y_30d = _trim_to_last_days(X, y_7d, y_30d, last_days)
        log.info("%s: trimmed to %d rows (last %d days)", symbol, len(X), last_days)
        if X.empty:
            log.warning("%s: no rows inside the last %d days, skipping", symbol, last_days)
            return {"symbol": symbol, "horizon": horizon, "error": "no_data"}

    # A non-numeric ``date`` column is a row identifier, not a model input;
    # StandardScaler below cannot handle it.
    if "date" in X.columns and not pd.api.types.is_numeric_dtype(X["date"]):
        X = X.drop(columns="date")

    y = y_7d if horizon == "7d" else y_30d

    # Skip if one class dominates >95% — the model would just memorise the majority class
    class_counts = y.value_counts(normalize=True)
    if class_counts.max() > 0.95:
        log.warning("%s %s: dominant class %.0f%% — skipping (too imbalanced to learn from)",
                    symbol, horizon, class_counts.max() * 100)
        return {"symbol": symbol, "horizon": horizon, "error": "extreme_class_imbalance"}

    # ── Phase 6 Booster 4: lag + interaction features ──
    if add_lag_features:
        X = _add_interaction_features(X)

    y_enc = _encode_labels(y)
    sample_weights = _compute_sample_weights(y_enc)

    # Phase 6 Boosters 2+3: regime detection + commodity-specific weights
    if len(X) >= 60:
        regime = _detect_regime(X)
        sample_weights = _apply_commodity_weights(sample_weights, X, symbol, regime)
        trending_pct = (regime == 1).mean() * 100
        volatile_pct = (regime == 2).mean() * 100
        log.info("%s: regime — %.0f%% trending, %.0f%% volatile, %.0f%% range-bound",
                 symbol, trending_pct, volatile_pct, 100 - trending_pct - volatile_pct)

    # Short-window mode: fewer folds + lighter model to limit overfitting
    is_short_window = last_days is not None and len(X) < 200
    n_splits = 3 if is_short_window else 5
    xgb_params_cv = {**XGB_PARAMS, "n_estimators": 200, "max_depth": 3} if is_short_window else XGB_PARAMS
    tscv = TimeSeriesSplit(n_splits=n_splits)
    fold_accs: list[float] = []
    best_features: list[str] | None = None
    last_fold_idx = n_splits - 1
    required_classes = set(LABEL_MAP.values())

    # ── cross-validation to find a stable feature set ──
    for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
        y_train, y_val = y_enc[train_idx], y_enc[val_idx]
        # Skip folds with a tiny validation set, or whose training slice lacks a
        # class: XGBoost rejects label sets that are not exactly 0..k-1 (e.g.
        # {0, 2}), and a 3-class model cannot learn a class it never sees.
        if len(y_val) < 3 or set(np.unique(y_train).tolist()) != required_classes:
            continue

        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        scaler_fold = StandardScaler()
        X_tr_s = scaler_fold.fit_transform(X_train)
        X_vl_s = scaler_fold.transform(X_val)

        xgb_fold = xgb.XGBClassifier(**xgb_params_cv)
        xgb_fold.fit(
            X_tr_s, y_train,
            sample_weight=sample_weights[train_idx],
            eval_set=[(X_vl_s, y_val)],
            verbose=False,
        )
        fold_accs.append(accuracy_score(y_val, xgb_fold.predict(X_vl_s)))
        # Feature selection uses the last fold, which has the most training data.
        if fold == last_fold_idx:
            best_features = _select_top_features(X, xgb_fold.feature_importances_)

    if not fold_accs:
        log.warning("%s %s: every CV fold was skipped, not training", symbol, horizon)
        return {"symbol": symbol, "horizon": horizon, "error": "all_folds_skipped"}

    cv_accuracy = float(np.mean(fold_accs))
    log.info("%s %s: CV accuracy %.3f (folds: %s)",
             symbol, horizon, cv_accuracy, [f"{a:.3f}" for a in fold_accs])

    # Short window: lighter final model to avoid overfitting on small data
    if is_short_window:
        XGB_PARAMS_BOOSTED = {**XGB_PARAMS, "n_estimators": 300, "max_depth": 4, "learning_rate": 0.03}
        LGBM_PARAMS_BOOSTED = {**LGBM_PARAMS, "n_estimators": 300, "num_leaves": 15}
    elif cv_accuracy < 0.90 and add_lag_features:
        log.info("%s: below 90%%, boosting n_estimators to 1000", symbol)
        XGB_PARAMS_BOOSTED = {**XGB_PARAMS, "n_estimators": 1000}
        LGBM_PARAMS_BOOSTED = {**LGBM_PARAMS, "n_estimators": 1000}
    else:
        XGB_PARAMS_BOOSTED = XGB_PARAMS
        LGBM_PARAMS_BOOSTED = LGBM_PARAMS

    # ── final training on the chronological split using best_features ──
    X_selected = X[best_features] if best_features else X
    scaler = StandardScaler()
    X_s = scaler.fit_transform(X_selected)

    # Short window: 70/30 split to keep a meaningful test set; else 80/20
    test_frac = 0.30 if is_short_window else 0.20
    split = int(len(X_s) * (1 - test_frac))
    X_train_f, X_test_f = X_s[:split], X_s[split:]
    y_train_f, y_test_f = y_enc[:split], y_enc[split:]
    sw_f = sample_weights[:split]

    xgb_model = xgb.XGBClassifier(**XGB_PARAMS_BOOSTED)
    xgb_model.fit(
        X_train_f, y_train_f,
        sample_weight=sw_f,
        eval_set=[(X_test_f, y_test_f)],
        verbose=False,
    )

    lgbm_model = lgb.LGBMClassifier(**LGBM_PARAMS_BOOSTED)
    lgbm_model.fit(
        X_train_f, y_train_f,
        sample_weight=sw_f,
        eval_set=[(X_test_f, y_test_f)],
        callbacks=[lgb.early_stopping(50, verbose=False), lgb.log_evaluation(period=-1)],
    )

    # Phase 6 Booster 5 — isotonic calibration of the already-fitted XGBoost.
    # NOTE(review): the held-out tail is used for early stopping (both models),
    # for calibration AND for the reported test accuracy, so test_accuracy is
    # optimistically biased. A separate calibration slice would remove this.
    try:
        from sklearn.calibration import CalibratedClassifierCV
        xgb_calibrated = CalibratedClassifierCV(xgb_model, method="isotonic", cv="prefit")
        xgb_calibrated.fit(X_test_f, y_test_f)
    except Exception as exc:
        # Best-effort: fall back to the raw model, but make the fallback visible
        # (e.g. newer scikit-learn may reject cv="prefit").
        log.warning("%s %s: calibration failed (%s); using uncalibrated XGBoost",
                    symbol, horizon, exc)
        xgb_calibrated = xgb_model

    # Soft-voting ensemble on the test split (calibrated XGB + raw LGBM)
    xgb_proba = xgb_calibrated.predict_proba(X_test_f)
    lgbm_proba = lgbm_model.predict_proba(X_test_f)
    ensemble_proba = (xgb_proba + lgbm_proba) / 2
    ensemble_pred = ensemble_proba.argmax(axis=1)

    test_accuracy = _directional_accuracy(y_test_f, ensemble_pred)
    sharpe = _sharpe_ratio(y.iloc[split:].reset_index(drop=True), ensemble_pred)

    # Pin labels to all three classes so target_names always matches, even when
    # the test split or the predictions contain fewer than three classes.
    report = classification_report(
        y_test_f, ensemble_pred,
        labels=[0, 1, 2],
        target_names=["DOWN", "STABLE", "UP"],
        output_dict=True,
        zero_division=0,
    )

    # Feature importance (top 10 for the report)
    top10_features = (
        pd.Series(xgb_model.feature_importances_, index=X_selected.columns)
        .sort_values(ascending=False)
        .head(10)
        .to_dict()
    )

    log.info("%s %s: test accuracy=%.3f, Sharpe=%.2f", symbol, horizon, test_accuracy, sharpe)

    # ── save artifacts ──
    with open(MODELS_DIR / f"xgb_{symbol}_{horizon}.pkl", "wb") as f:
        pickle.dump(xgb_calibrated, f)
    with open(MODELS_DIR / f"lgbm_{symbol}_{horizon}.pkl", "wb") as f:
        pickle.dump(lgbm_model, f)
    with open(MODELS_DIR / f"scaler_{symbol}_{horizon}.pkl", "wb") as f:
        pickle.dump(scaler, f)
    with open(MODELS_DIR / f"feature_names_{symbol}_{horizon}.json", "w") as f:
        json.dump(X_selected.columns.tolist(), f)

    return {
        "symbol": symbol,
        "horizon": horizon,
        "cv_accuracy": round(cv_accuracy, 4),
        "test_accuracy": round(test_accuracy, 4),
        "sharpe_ratio": sharpe,
        "n_features": len(X_selected.columns),
        "n_train_samples": split,
        "n_test_samples": len(X_test_f),
        "top10_features": top10_features,
        "classification_report": report,
    }
def train_all(horizons: list[str] | None = None, last_days: int | None = None) -> dict:
    """
    Train every symbol in ALL_SYMBOLS for each requested horizon and write a
    combined accuracy report.

    Args:
        horizons: Horizons to train. Defaults to ["7d", "30d"].
        last_days: If set, train each symbol on only the most recent N days.

    Returns:
        Dict mapping symbol -> {horizon -> metrics dict from train_symbol, or
        {"error": message} if training raised}.

    Side effects:
        Writes MODELS_DIR / "accuracy_report.json" and prints a summary table.
    """
    if horizons is None:
        horizons = ["7d", "30d"]

    results: dict = {}
    for symbol in ALL_SYMBOLS:
        results[symbol] = {}
        for horizon in horizons:
            try:
                results[symbol][horizon] = train_symbol(symbol, horizon=horizon, last_days=last_days)
            except Exception as exc:
                # One failing symbol must not abort the batch; log.exception keeps the traceback.
                log.exception("Failed to train %s %s: %s", symbol, horizon, exc)
                results[symbol][horizon] = {"error": str(exc)}

    # Save combined accuracy report
    report_path = MODELS_DIR / "accuracy_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)
    log.info("Accuracy report saved to %s", report_path)

    def _cell(metrics: dict | None, key: str, spec: str) -> str:
        """Format one summary cell: 'n/a' if the horizon wasn't trained, 'ERR' if the metric is missing."""
        if metrics is None:
            return "n/a"
        return format(metrics[key], spec) if key in metrics else "ERR"

    # Print summary table
    print("\n" + "=" * 85)
    print(f"{'Commodity':<15} {'7d Accuracy':>12} {'30d Accuracy':>13} {'Sharpe (7d)':>12} {'Samples':>8}")
    print("=" * 85)
    for symbol, res in results.items():
        r7 = res.get("7d")
        r30 = res.get("30d")
        acc7 = _cell(r7, "test_accuracy", ".1%")
        acc30 = _cell(r30, "test_accuracy", ".1%")
        sh7 = _cell(r7, "sharpe_ratio", ".2f")
        n = (r7 or {}).get("n_train_samples", 0)
        print(f"{symbol:<15} {acc7:>12} {acc30:>13} {sh7:>12} {n:>8}")
    print("=" * 85)
    return results
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="CommodiSense model trainer")
    parser.add_argument("--symbol", default=None, help="Single symbol to train")
    parser.add_argument("--horizon", default="both", choices=["7d", "30d", "both"])
    parser.add_argument("--days", default=None, type=int,
                        help="Train on only the most recent N calendar days (short-window mode)")
    args = parser.parse_args()

    # Honour --horizon in both modes (previously ignored when training all symbols).
    horizons = ["7d", "30d"] if args.horizon == "both" else [args.horizon]

    if args.symbol:
        for h in horizons:
            result = train_symbol(args.symbol, horizon=h, last_days=args.days)
            # The full classification report is too verbose for the console.
            print(json.dumps({k: v for k, v in result.items()
                              if k != "classification_report"}, indent=2, default=str))
    else:
        train_all(horizons=horizons, last_days=args.days)