commodisense / model /trainer.py
Yash1178's picture
Deploy CommodiSense v1.0
2c3c5f5
"""
Model Trainer — trains an XGBoost + LightGBM ensemble per commodity.
Runs on Kaggle free notebooks or locally. Note that XGB_PARAMS and
LGBM_PARAMS below do not request a GPU device, so training runs on the
CPU even where a GPU is available unless those configs are changed.
Saves trained models to the models/ directory.
Usage:
python model/trainer.py # train all symbols
python model/trainer.py --symbol GC=F # train one symbol
python model/trainer.py --symbol ZW=F --horizon 7d
python model/trainer.py --days 90 # train on only the most recent 90 calendar days
"""
import argparse
import json
import logging
import pickle
from datetime import date, timedelta
import sys
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
# NOTE(review): this silences *every* warning for the whole process, including
# deprecation warnings from scikit-learn / XGBoost / LightGBM; consider
# narrowing it to specific categories or modules.
warnings.filterwarnings("ignore")
# Put the parent of this file's directory on sys.path so the project-local
# imports that follow (model.feature_builder, signals.price_features) resolve
# when this file is run directly, e.g. `python model/trainer.py`. Must stay
# before those imports.
sys.path.insert(0, str(Path(__file__).parent.parent))
from model.feature_builder import build_training_data
from signals.price_features import ALL_SYMBOLS
# Trained artifacts (pickled models and scalers, feature-name lists and the
# accuracy report) go to a "models" folder next to this file's directory;
# it is created at import time if it does not exist yet.
MODELS_DIR = Path(__file__).parent.parent / "models"
MODELS_DIR.mkdir(exist_ok=True)

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)

# Raw direction labels -1 (DOWN) / 0 (STABLE) / 1 (UP) are re-encoded as the
# class ids 0 / 1 / 2 for XGBoost; LABEL_REVERSE undoes that mapping and
# LABEL_NAMES gives a display name per class id.
LABEL_MAP = {-1: 0, 0: 1, 1: 2}
LABEL_REVERSE = {class_id: raw for raw, class_id in LABEL_MAP.items()}
LABEL_NAMES = dict(enumerate(("DOWN", "STABLE", "UP")))
# ── Phase 6: Booster 2 — commodity-specific feature weight multipliers ─────────
# symbol -> {feature name: multiplier}. _apply_commodity_weights multiplies the
# sample weight of every training row whose |feature| is at or above that
# feature's 75th percentile by the multiplier, so the model pays more attention
# to rows where a commodity's key signal is strong. A multiplier below 1
# (e.g. CL=F's drought_index) down-weights such rows instead.
COMMODITY_FEATURE_WEIGHTS: dict[str, dict[str, float]] = {
    "CL=F": dict(
        risk_score_7d=1.5,
        risk_score_30d=1.5,
        days_to_opec_meeting=1.4,
        drought_index=0.5,
    ),
    "NG=F": dict(
        days_to_opec_meeting=1.4,
        return_60d=1.3,
        atr_14=1.3,
    ),
    "GC=F": dict(
        dxy_proxy=1.8,
        risk_score_7d=1.3,
        sentiment_score_1d=1.2,
    ),
    "ZW=F": dict(
        drought_index=2.0,
        sentiment_score_1d=1.2,
        precip_anomaly_pct=1.5,
    ),
    "ZC=F": dict(
        harvest_season_flag=1.5,
        drought_index=1.8,
        precip_anomaly_pct=1.4,
    ),
    "ZS=F": dict(
        harvest_season_flag=1.5,
        drought_index=1.6,
        precip_anomaly_pct=1.3,
    ),
    "CT=F": dict(
        harvest_season_flag=1.6,
        heat_stress_days=2.0,
        precip_anomaly_pct=1.5,
    ),
    "SB=F": dict(
        harvest_season_flag=1.5,
        precip_anomaly_pct=1.4,
    ),
    "USDINR=X": dict(
        return_60d=1.4,
        momentum_score=1.3,
        macd_signal=1.2,
    ),
    "HG=F": dict(
        risk_score_7d=1.3,
        return_60d=1.4,
        momentum_score=1.2,
    ),
}
# ── model configs ──────────────────────────────────────────────────────────────
# Constructor kwargs for xgboost.XGBClassifier (3-class soft-probability model).
XGB_PARAMS = dict(
    n_estimators=500,
    max_depth=6,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.8,
    objective="multi:softprob",
    num_class=3,
    eval_metric="mlogloss",
    early_stopping_rounds=50,  # constructor param in XGBoost 3.x
    random_state=42,
    n_jobs=-1,
)
# Constructor kwargs for lightgbm.LGBMClassifier. feature_fraction /
# bagging_fraction / bagging_freq are LightGBM-native parameter names that the
# sklearn wrapper passes through to the booster.
LGBM_PARAMS = dict(
    n_estimators=500,
    num_leaves=31,
    learning_rate=0.05,
    feature_fraction=0.8,
    bagging_fraction=0.8,
    bagging_freq=5,
    objective="multiclass",
    num_class=3,
    metric="multi_logloss",
    verbose=-1,
    random_state=42,
    n_jobs=-1,
)
# ── helpers ────────────────────────────────────────────────────────────────────
def _encode_labels(y: pd.Series) -> np.ndarray:
    """Translate raw direction labels (-1/0/1) to class ids (0/1/2) via LABEL_MAP.

    Any value that is not a LABEL_MAP key becomes NaN in the returned array.
    """
    encoded = y.map(LABEL_MAP)
    return encoded.values
def _compute_sample_weights(y_encoded: np.ndarray) -> np.ndarray:
    """Return per-row "balanced" (inverse class frequency) sample weights.

    When ``y_encoded`` has fewer than three distinct values, every row gets
    weight 1.0 instead.
    """
    from sklearn.utils.class_weight import compute_sample_weight

    if np.unique(y_encoded).size >= 3:
        return compute_sample_weight("balanced", y_encoded)
    return np.ones(len(y_encoded), dtype=float)
def _select_top_features(
X: pd.DataFrame,
importances: np.ndarray,
top_n: int = 20,
min_importance: float = 0.01,
) -> list[str]:
"""Return top_n feature names by importance, filtering below min_importance."""
feat_imp = pd.Series(importances, index=X.columns).sort_values(ascending=False)
selected = feat_imp[feat_imp >= min_importance].head(top_n).index.tolist()
if len(selected) < 5:
selected = feat_imp.head(top_n).index.tolist()
return selected
def _detect_regime(X: pd.DataFrame) -> np.ndarray:
"""
Booster 3 — Regime Detection.
Returns per-row regime array: 0=RANGE_BOUND, 1=TRENDING, 2=VOLATILE.
Uses ATR% and absolute 30-day return to classify market state.
Only applied when X has enough rows to compute rolling stats (>60).
"""
if len(X) < 60 or "atr_pct" not in X.columns:
return np.zeros(len(X), dtype=int)
atr_pct = X["atr_pct"].fillna(0)
ret_30d = X.get("return_30d", pd.Series(0, index=X.index)).abs().fillna(0)
atr_mean = atr_pct.rolling(60, min_periods=20).mean().fillna(atr_pct.mean())
atr_std = atr_pct.rolling(60, min_periods=20).std().fillna(atr_pct.std())
atr_thresh_volatile = atr_mean + 1.5 * atr_std
regime = np.zeros(len(X), dtype=int)
regime[ret_30d.values > 10.0] = 1 # TRENDING
regime[atr_pct.values > atr_thresh_volatile.values] = 2 # VOLATILE
return regime
def _apply_commodity_weights(
    sample_weights: np.ndarray,
    X: pd.DataFrame,
    symbol: str,
    regime: np.ndarray,
) -> np.ndarray:
    """
    Booster 2+3 combined — reweight training rows by commodity signal strength
    and by market regime.

    For every (feature, multiplier) pair configured for ``symbol`` in
    COMMODITY_FEATURE_WEIGHTS that is a column of ``X``, rows whose absolute
    feature value (NaN treated as 0) is at or above that column's 75th
    percentile are multiplied by the multiplier; a feature whose 75th
    percentile is not positive is skipped. Then VOLATILE rows (regime 2) are
    scaled by 0.6 ("trust nothing when volatile") and TRENDING rows (regime 1)
    by 1.2. Finally the weights are rescaled to sum to ``len(sample_weights)``
    (skipped when the total is not positive). ``sample_weights`` itself is
    not modified.
    """
    weights = sample_weights.astype(float)  # astype returns a new array

    for feature, factor in COMMODITY_FEATURE_WEIGHTS.get(symbol, {}).items():
        if feature not in X.columns:
            continue
        magnitude = X[feature].abs().fillna(0)
        q75 = np.percentile(magnitude, 75)
        if q75 > 0:
            weights[(magnitude >= q75).values] *= factor

    # Regime masks are disjoint, so the order of these two scalings is irrelevant.
    weights[regime == 1] *= 1.2
    weights[regime == 2] *= 0.6

    total = weights.sum()
    return weights / total * len(weights) if total > 0 else weights
def _directional_accuracy(y_true: np.ndarray, y_pred: np.ndarray) -> float:
"""Accuracy of predicting UP/DOWN/STABLE direction correctly."""
return float(np.mean(y_true == y_pred))
def _sharpe_ratio(y_true_raw: pd.Series, y_pred_encoded: np.ndarray) -> float:
    """
    Naive Sharpe: long when model predicts UP, short when DOWN, flat when STABLE.

    Each row's "return" is (predicted direction) x (true direction, -1/0/1):
    +1 when a long/short call is right, -1 when it is wrong, 0 otherwise. The
    result is mean / std * sqrt(252), rounded to 3 decimals, or 0.0 when the
    std is not positive (including NaN for a single row).

    NOTE(review): labels are 7d/30d directions, so the daily sqrt(252)
    annualisation is only a rough proxy.
    """
    positions = pd.Series(y_pred_encoded).map(LABEL_REVERSE)
    pnl = positions * y_true_raw.values
    spread = pnl.std()
    if not spread > 0:
        return round(0.0, 3)
    return round(float(pnl.mean() / spread * np.sqrt(252)), 3)
# ── training ───────────────────────────────────────────────────────────────────
def _trim_to_recent_window(
    X: pd.DataFrame,
    y_7d: pd.Series,
    y_30d: pd.Series,
    last_days: int,
) -> tuple[pd.DataFrame, pd.Series, pd.Series]:
    """Keep only the rows from the most recent ``last_days`` calendar days.

    Row dates come from a ``date`` column if present, otherwise from a
    ``DatetimeIndex``. Without either, rows are assumed to be in chronological
    order and the trailing ``int(last_days * 0.71)`` rows are kept (~252
    trading days per 365 calendar days). All outputs get a fresh RangeIndex.
    """
    cutoff = date.today() - timedelta(days=last_days)
    if "date" in X.columns:
        keep = (pd.to_datetime(X["date"]).dt.date >= cutoff).to_numpy()
    elif isinstance(X.index, pd.DatetimeIndex):
        keep = np.asarray(X.index.date >= cutoff, dtype=bool)
    else:
        trading_days = int(last_days * 0.71)
        keep = np.zeros(len(X), dtype=bool)
        # Guard: keep[-0:] would select *every* row instead of none.
        if trading_days > 0:
            keep[-trading_days:] = True
    return (
        X[keep].reset_index(drop=True),
        y_7d[keep].reset_index(drop=True),
        y_30d[keep].reset_index(drop=True),
    )


def _add_interaction_features(X: pd.DataFrame) -> pd.DataFrame:
    """Phase 6 Booster 4 — return a copy of ``X`` with interaction features added.

    Each feature is added only when its source columns exist:
    sentiment_x_momentum, event_x_momentum and (with >= 60 rows)
    high_volatility_flag.
    """
    X = X.copy()
    cols = set(X.columns)
    # Interaction: sentiment × momentum (strong when both agree)
    if {"sentiment_score_1d", "momentum_score"} <= cols:
        X["sentiment_x_momentum"] = X["sentiment_score_1d"] * X["momentum_score"]
    # Interaction: event direction × price momentum
    if {"direction_score_7d", "return_7d"} <= cols:
        X["event_x_momentum"] = X["direction_score_7d"] * np.sign(X["return_7d"].fillna(0))
    # Volatility regime flag (standalone feature for the model)
    if "atr_pct" in cols and len(X) >= 60:
        atr_mean = X["atr_pct"].rolling(60, min_periods=20).mean().fillna(X["atr_pct"].mean())
        X["high_volatility_flag"] = (X["atr_pct"] > atr_mean * 1.5).astype(int)
    return X


def _save_artifacts(
    symbol: str,
    horizon: str,
    xgb_model,
    lgbm_model,
    scaler: StandardScaler,
    feature_names: list[str],
) -> None:
    """Pickle both models and the scaler, and write the feature list, to MODELS_DIR."""
    for prefix, artifact in (("xgb", xgb_model), ("lgbm", lgbm_model), ("scaler", scaler)):
        with open(MODELS_DIR / f"{prefix}_{symbol}_{horizon}.pkl", "wb") as f:
            pickle.dump(artifact, f)
    with open(MODELS_DIR / f"feature_names_{symbol}_{horizon}.json", "w") as f:
        json.dump(feature_names, f)


def train_symbol(
    symbol: str,
    horizon: str = "7d",
    add_lag_features: bool = True,
    last_days: int = None,
) -> dict:
    """
    Train XGBoost + LightGBM ensemble for a single commodity and horizon.

    After cross-validation, the rows (kept in time order) are split into three
    consecutive segments:
      * fit rows,
      * a calibration segment (the last ~15% of the training period), used for
        early stopping and isotonic calibration,
      * a test segment (the last 20%, or 30% in short-window mode), used only
        to compute the reported metrics.

    Args:
        symbol: Commodity ticker, e.g. "ZW=F"
        horizon: "7d" or "30d"
        add_lag_features: Add interaction features (accuracy booster)
        last_days: If set, train only on the most recent N calendar days.
                   Use this when NLP signals only cover a short window —
                   avoids 4+ years of zero-padded sentiment rows diluting the model.
    Returns:
        Dict with accuracy metrics for this symbol/horizon. When training is
        skipped, a dict with "symbol", "horizon" and an "error" key instead
        ("no_data", "extreme_class_imbalance", "insufficient_data" or
        "all_folds_skipped").
    Raises:
        ValueError: If ``horizon`` is not "7d" or "30d".

    Side effects: writes xgb_/lgbm_/scaler_ pickles and a feature_names_ JSON
    for this symbol/horizon into MODELS_DIR.
    """
    # A typo here would otherwise silently train on the 30d labels.
    if horizon not in ("7d", "30d"):
        raise ValueError(f"horizon must be '7d' or '30d', got {horizon!r}")

    log.info("Training %s | horizon=%s | window=%s",
             symbol, horizon, f"last {last_days}d" if last_days else "full")
    X, y_7d, y_30d = build_training_data(symbol)
    if X.empty:
        log.warning("%s: no training data, skipping", symbol)
        return {"symbol": symbol, "horizon": horizon, "error": "no_data"}

    # Trim to short window — keeps only rows where NLP signals are non-zero
    if last_days is not None:
        X, y_7d, y_30d = _trim_to_recent_window(X, y_7d, y_30d, last_days)
        log.info("%s: trimmed to %d rows (last %d days)", symbol, len(X), last_days)
        if X.empty:
            log.warning("%s: no rows within the last %d days, skipping", symbol, last_days)
            return {"symbol": symbol, "horizon": horizon, "error": "no_data"}

    y = y_7d if horizon == "7d" else y_30d

    # Skip if one class dominates >95% — model would just memorise the majority class
    class_counts = y.value_counts(normalize=True)
    if class_counts.max() > 0.95:
        log.warning("%s %s: dominant class %.0f%% — skipping (too imbalanced to learn from)",
                    symbol, horizon, class_counts.max() * 100)
        return {"symbol": symbol, "horizon": horizon, "error": "extreme_class_imbalance"}

    # ── Phase 6 Booster 4: lag + interaction features ──
    if add_lag_features:
        X = _add_interaction_features(X)

    y_enc = _encode_labels(y)
    sample_weights = _compute_sample_weights(y_enc)

    # Phase 6 Boosters 2+3: regime detection + commodity-specific weights
    if len(X) >= 60:
        regime = _detect_regime(X)
        sample_weights = _apply_commodity_weights(sample_weights, X, symbol, regime)
        trending_pct = (regime == 1).mean() * 100
        volatile_pct = (regime == 2).mean() * 100
        log.info("%s: regime — %.0f%% trending, %.0f%% volatile, %.0f%% range-bound",
                 symbol, trending_pct, volatile_pct, 100 - trending_pct - volatile_pct)

    # Short-window mode: use fewer folds + lighter model to avoid overfitting
    is_short_window = last_days is not None and len(X) < 200
    n_splits = 3 if is_short_window else 5
    # TimeSeriesSplit raises when there are fewer rows than n_splits + 1.
    if len(X) <= n_splits:
        log.warning("%s %s: only %d rows, too few for %d CV folds — skipping",
                    symbol, horizon, len(X), n_splits)
        return {"symbol": symbol, "horizon": horizon, "error": "insufficient_data"}
    xgb_params_cv = (
        {**XGB_PARAMS, "n_estimators": 200, "max_depth": 3} if is_short_window else XGB_PARAMS
    )

    import xgboost as xgb

    # ── cross-validation to estimate accuracy and find a stable feature set ──
    tscv = TimeSeriesSplit(n_splits=n_splits)
    fold_accs: list[float] = []
    best_features: list[str] | None = None
    last_fold_idx = n_splits - 1
    for fold, (train_idx, val_idx) in enumerate(tscv.split(X)):
        # Too few validation rows for a meaningful accuracy estimate
        if len(val_idx) < 3:
            continue
        y_train, y_val = y_enc[train_idx], y_enc[val_idx]
        scaler_fold = StandardScaler()
        X_tr_s = scaler_fold.fit_transform(X.iloc[train_idx])
        X_vl_s = scaler_fold.transform(X.iloc[val_idx])
        xgb_fold = xgb.XGBClassifier(**xgb_params_cv)
        # NOTE(review): the validation fold both drives early stopping and is
        # scored, so fold accuracies are somewhat optimistic.
        xgb_fold.fit(
            X_tr_s, y_train,
            sample_weight=sample_weights[train_idx],
            eval_set=[(X_vl_s, y_val)],
            verbose=False,
        )
        fold_accs.append(accuracy_score(y_val, xgb_fold.predict(X_vl_s)))
        # Feature ranking comes from the last (largest-training-set) fold
        if fold == last_fold_idx:
            best_features = _select_top_features(X, xgb_fold.feature_importances_)

    if not fold_accs:
        return {"symbol": symbol, "horizon": horizon, "error": "all_folds_skipped"}
    cv_accuracy = float(np.mean(fold_accs))
    log.info("%s %s: CV accuracy %.3f (folds: %s)",
             symbol, horizon, cv_accuracy, [f"{a:.3f}" for a in fold_accs])

    # Short window: use lighter final model to avoid overfitting on small data
    if is_short_window:
        xgb_params_final = {**XGB_PARAMS, "n_estimators": 300, "max_depth": 4, "learning_rate": 0.03}
        lgbm_params_final = {**LGBM_PARAMS, "n_estimators": 300, "num_leaves": 15}
    elif cv_accuracy < 0.90 and add_lag_features:
        log.info("%s: below 90%%, boosting n_estimators to 1000", symbol)
        xgb_params_final = {**XGB_PARAMS, "n_estimators": 1000}
        lgbm_params_final = {**LGBM_PARAMS, "n_estimators": 1000}
    else:
        xgb_params_final = XGB_PARAMS
        lgbm_params_final = LGBM_PARAMS

    # ── final training: chronological fit / calibration / test segments ──
    X_selected = X[best_features] if best_features else X
    # Short window: 70/30 train/test to keep a meaningful test set; else 80/20
    test_frac = 0.30 if is_short_window else 0.20
    split = int(len(X_selected) * (1 - test_frac))  # first test row
    # Hold out the tail of the training period for early stopping and
    # calibration, so the test rows are only ever used for evaluation.
    # (A non-empty fold_accs implies >= 12 rows, so fit_end >= 1.)
    n_cal = max(1, int(split * 0.15))
    fit_end = split - n_cal  # first calibration row

    # Fit the scaler on the fit rows only, so later rows don't leak into it
    scaler = StandardScaler()
    scaler.fit(X_selected.iloc[:fit_end])
    X_s = scaler.transform(X_selected)

    X_fit, y_fit, sw_fit = X_s[:fit_end], y_enc[:fit_end], sample_weights[:fit_end]
    X_cal, y_cal = X_s[fit_end:split], y_enc[fit_end:split]
    X_test_f, y_test_f = X_s[split:], y_enc[split:]

    import lightgbm as lgb

    xgb_model = xgb.XGBClassifier(**xgb_params_final)
    xgb_model.fit(
        X_fit, y_fit,
        sample_weight=sw_fit,
        eval_set=[(X_cal, y_cal)],
        verbose=False,
    )
    lgbm_model = lgb.LGBMClassifier(**lgbm_params_final)
    lgbm_model.fit(
        X_fit, y_fit,
        sample_weight=sw_fit,
        eval_set=[(X_cal, y_cal)],
        callbacks=[lgb.early_stopping(50, verbose=False), lgb.log_evaluation(period=-1)],
    )

    # Phase 6 Booster 5 — isotonic calibration of the already-fitted XGBoost
    # model on the calibration segment (cv="prefit" does not refit it).
    try:
        xgb_calibrated = CalibratedClassifierCV(xgb_model, method="isotonic", cv="prefit")
        xgb_calibrated.fit(X_cal, y_cal)
    except Exception as exc:
        # Best effort (e.g. a tiny or single-class calibration segment): keep
        # the raw model, but report it rather than failing silently.
        # NOTE(review): cv="prefit" is deprecated in recent scikit-learn —
        # confirm against the pinned version.
        log.warning("%s %s: calibration failed (%s) — using uncalibrated XGBoost",
                    symbol, horizon, exc)
        xgb_calibrated = xgb_model

    # Soft-voting ensemble on the untouched test segment (calibrated XGB + raw LGBM)
    xgb_proba = xgb_calibrated.predict_proba(X_test_f)
    lgbm_proba = lgbm_model.predict_proba(X_test_f)
    ensemble_pred = ((xgb_proba + lgbm_proba) / 2).argmax(axis=1)
    test_accuracy = _directional_accuracy(y_test_f, ensemble_pred)
    sharpe = _sharpe_ratio(y.iloc[split:].reset_index(drop=True), ensemble_pred)

    # Passing labels= keeps the report from raising when a class is absent
    # from both the test labels and the predictions.
    report = classification_report(
        y_test_f, ensemble_pred,
        labels=list(LABEL_NAMES),
        target_names=list(LABEL_NAMES.values()),
        output_dict=True,
        zero_division=0,
    )

    # Feature importance (top 10 for report)
    top10_features = (
        pd.Series(xgb_model.feature_importances_, index=X_selected.columns)
        .sort_values(ascending=False)
        .head(10)
        .to_dict()
    )
    log.info("%s %s: test accuracy=%.3f, Sharpe=%.2f", symbol, horizon, test_accuracy, sharpe)

    _save_artifacts(symbol, horizon, xgb_calibrated, lgbm_model, scaler,
                    X_selected.columns.tolist())

    return {
        "symbol": symbol,
        "horizon": horizon,
        "cv_accuracy": round(cv_accuracy, 4),
        "test_accuracy": round(test_accuracy, 4),
        "sharpe_ratio": sharpe,
        "n_features": len(X_selected.columns),
        "n_train_samples": fit_end,
        "n_calibration_samples": n_cal,
        "n_test_samples": len(X_test_f),
        "top10_features": top10_features,
        "classification_report": report,
    }
def _format_summary_cell(res: dict, horizon: str, key: str, spec: str) -> str:
    """Format one cell of the train_all summary table.

    Returns the metric formatted with ``spec``. Returns "ERR" when the horizon
    was attempted but has no such metric (it failed or was skipped), and "-"
    when the horizon was not trained at all.
    """
    if horizon not in res:
        return "-"
    metrics = res[horizon]
    return format(metrics[key], spec) if key in metrics else "ERR"


def train_all(horizons: list[str] = None, last_days: int = None) -> dict:
    """
    Train models for every symbol in ALL_SYMBOLS and save an accuracy report.

    A failure for one symbol/horizon is logged with its traceback and recorded
    as {"error": <message>}; training continues with the remaining ones.

    Args:
        horizons: List of horizons to train. Default: ["7d", "30d"]
        last_days: If set, train each symbol on only the most recent N days.
    Returns:
        Dict mapping symbol → {horizon → metrics (or error) dict}.

    Side effects: writes MODELS_DIR / "accuracy_report.json" and prints a
    summary table to stdout.
    """
    if horizons is None:
        horizons = ["7d", "30d"]
    results: dict = {}
    for symbol in ALL_SYMBOLS:
        results[symbol] = {}
        for horizon in horizons:
            try:
                results[symbol][horizon] = train_symbol(symbol, horizon=horizon, last_days=last_days)
            except Exception as exc:
                # Keep going with the other symbols, but keep the traceback.
                log.exception("Failed to train %s %s: %s", symbol, horizon, exc)
                results[symbol][horizon] = {"error": str(exc)}

    # Save combined accuracy report
    report_path = MODELS_DIR / "accuracy_report.json"
    with open(report_path, "w") as f:
        json.dump(results, f, indent=2, default=str)
    log.info("Accuracy report saved to %s", report_path)

    # Print summary table
    print("\n" + "=" * 85)
    print(f"{'Commodity':<15} {'7d Accuracy':>12} {'30d Accuracy':>13} {'Sharpe (7d)':>12} {'Samples':>8}")
    print("=" * 85)
    for symbol, res in results.items():
        acc7 = _format_summary_cell(res, "7d", "test_accuracy", ".1%")
        acc30 = _format_summary_cell(res, "30d", "test_accuracy", ".1%")
        sh7 = _format_summary_cell(res, "7d", "sharpe_ratio", ".2f")
        # Sample count from the 7d run, falling back to the 30d run
        n = res.get("7d", {}).get("n_train_samples", res.get("30d", {}).get("n_train_samples", 0))
        print(f"{symbol:<15} {acc7:>12} {acc30:>13} {sh7:>12} {n:>8}")
    print("=" * 85)
    return results
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="CommodiSense model trainer")
    parser.add_argument("--symbol", default=None, help="Single symbol to train")
    parser.add_argument("--horizon", default="both", choices=["7d", "30d", "both"])
    parser.add_argument("--days", default=None, type=int,
                        help="Train on only the most recent N calendar days (short-window mode)")
    args = parser.parse_args()
    # Resolve --horizon once so it applies to single-symbol and all-symbol runs alike.
    horizons = ["7d", "30d"] if args.horizon == "both" else [args.horizon]
    if args.symbol:
        for h in horizons:
            result = train_symbol(args.symbol, horizon=h, last_days=args.days)
            # The full classification report is omitted to keep console output short.
            print(json.dumps({k: v for k, v in result.items()
                              if k != "classification_report"}, indent=2, default=str))
    else:
        train_all(horizons=horizons, last_days=args.days)