"""
LightGBM Ensemble Predictor for Pattern-Based Trading Signals.

Combines candlestick patterns + advanced math features + standard technicals
into a single LightGBM model for multi-class prediction:
  - Class 0: Strong Down (< -1% expected return)
  - Class 1: Neutral (-1% to +1%)
  - Class 2: Strong Up (> +1%)

Design:
  - Uses LightGBM (fastest boosting library, 10x lighter than XGBoost)
  - Walk-forward validation (no lookahead bias)
  - Feature importance via built-in gain importance + optional SHAP
  - In-memory model cache with 8-hour TTL
  - Supports all markets: equities, crypto, forex, commodities
"""

from __future__ import annotations

import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)

# ── Cache ────────────────────────────────────────────────────────────────

CACHE_TTL = 8 * 3600  # 8 hours

# Return threshold separating "strong" moves from neutral ones. Shared by the
# training labels and the expected-return buckets so the two cannot drift.
_DEFAULT_THRESHOLD = 0.01

_CACHE_KEY_PREFIX = "pattern"


@dataclass
class CachedPredictor:
    """A trained model together with the metadata needed to reuse it."""

    model: Any
    feature_names: List[str]
    metrics: Dict[str, float]
    trained_at: float = field(default_factory=time.time)

    @property
    def is_stale(self) -> bool:
        """True once the entry is older than ``CACHE_TTL`` seconds."""
        return (time.time() - self.trained_at) > CACHE_TTL


_predictor_cache: Dict[str, CachedPredictor] = {}


def _cache_key(ticker: str, period: str, horizon: int) -> str:
    """Build the cache key for one (ticker, period, horizon) model."""
    return f"{_CACHE_KEY_PREFIX}_{ticker}_{period}_{horizon}"


def _cache_key_matches_ticker(key: str, ticker: str) -> bool:
    """Return True when ``key`` was built by ``_cache_key`` for exactly ``ticker``.

    The key is split from the right so tickers that themselves contain an
    underscore are still compared as a whole.
    """
    return key.rsplit("_", 2)[0] == f"{_CACHE_KEY_PREFIX}_{ticker}"


# ── Feature Assembly ─────────────────────────────────────────────────────

def _assemble_features(
    df: pd.DataFrame,
    pattern_window: Optional[int] = 30,
) -> pd.DataFrame:
    """
    Assemble the complete feature matrix from:
    1. Standard technical indicators (from feature_engineering pipeline)
    2. Pattern detection scores
    3. Advanced mathematical features
    4. Return / volatility / volume / calendar features

    Args:
        df: Price frame; the code reads ``Close`` and, when present,
            ``Volume``, ``sma_20``/``sma_50``/``sma_200`` from the pipeline
            output.
        pattern_window: Number of trailing rows for which pattern features
            are computed (other rows stay 0). ``None`` computes them for
            every row, which is what training needs; the default keeps the
            cheaper behaviour that is sufficient for a latest-row prediction.

    Returns:
        The pipeline's feature frame with the extra columns added.
    """
    from app.services.feature_engineering.pipeline import feature_pipeline
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector
    from app.services.ml.pattern_recognition.advanced_features import advanced_feature_engine

    # 1. Standard technicals
    featured = feature_pipeline.compute_all_features(df)

    # 2. Pattern features (serialize as numeric)
    n = len(featured)
    bullish_count = np.zeros(n)
    bearish_count = np.zeros(n)
    max_reliability = np.zeros(n)
    pattern_signal = np.zeros(n)  # +1 bullish, -1 bearish, weighted by reliability

    first_row = 0 if pattern_window is None else max(0, n - pattern_window)
    # Rows before index 5 never have enough history for detection.
    for i in range(max(first_row, 5), n):
        # Detect on a small slice
        slice_df = df.iloc[max(0, i - 20):i + 1].copy()
        if len(slice_df) < 5:
            continue
        patterns = pattern_detector.detect_all(slice_df, lookback=3)
        for p in patterns:
            rel = p.get("reliability", 0.5)
            direction = p.get("direction", "neutral")
            if direction == "bullish":
                bullish_count[i] += 1
                pattern_signal[i] += rel
            elif direction == "bearish":
                bearish_count[i] += 1
                pattern_signal[i] -= rel
            max_reliability[i] = max(max_reliability[i], rel)

    featured["pattern_bullish_count"] = bullish_count
    featured["pattern_bearish_count"] = bearish_count
    featured["pattern_max_reliability"] = max_reliability
    featured["pattern_signal"] = pattern_signal

    # 3. Advanced math features (rolling)
    adv = advanced_feature_engine.compute_feature_series(df, window=20)
    for col in [
        "hurst_exponent",
        "fractal_dimension",
        "entropy",
        "price_efficiency",
        "trend_strength",
        "return_skew_20",
        "return_kurtosis_20",
    ]:
        if col in adv.columns:
            featured[col] = adv[col]

    # 4. Additional return-based features
    close = featured["Close"]
    for lag in [1, 2, 3, 5, 10]:
        featured[f"return_{lag}d"] = close.pct_change(lag)

    log_ret = np.log(close / close.shift(1))
    for w in [5, 10, 20]:
        featured[f"vol_{w}d"] = log_ret.rolling(w).std() * np.sqrt(252)

    for ma_col in ["sma_20", "sma_50", "sma_200"]:
        if ma_col in featured.columns:
            # The epsilon guards against division by a zero moving average.
            featured[f"price_vs_{ma_col}"] = (close - featured[ma_col]) / (featured[ma_col] + 1e-10)

    if "Volume" in featured.columns:
        volume = featured["Volume"]
        featured["volume_change"] = volume.pct_change()
        featured["volume_zscore"] = (
            volume - volume.rolling(20).mean()
        ) / (volume.rolling(20).std() + 1e-10)

    # Day of week
    if hasattr(featured.index, "dayofweek"):
        featured["day_of_week"] = featured.index.dayofweek

    return featured


def _create_target(
    df: pd.DataFrame,
    horizon: int = 5,
    threshold: float = _DEFAULT_THRESHOLD,
    mask_unlabeled: bool = False,
) -> pd.Series:
    """
    Multi-class target:
      0 = strong down (return < -threshold)
      1 = neutral
      2 = strong up (return > threshold)

    Args:
        df: Frame with a ``Close`` column.
        horizon: Number of rows ahead used for the forward return.
        threshold: Absolute forward return separating strong from neutral.
        mask_unlabeled: When True, rows whose forward return is unknown (the
            trailing ``horizon`` rows, or rows with a missing close) are
            returned as NaN instead of being labelled neutral, so the caller
            can drop them. The default preserves the historical output.

    Returns:
        Integer Series of class labels, or a float Series containing NaN
        when ``mask_unlabeled`` is True.
    """
    future_return = df["Close"].shift(-horizon) / df["Close"] - 1
    target = pd.Series(1, index=df.index)  # neutral by default
    target[future_return > threshold] = 2  # strong up
    target[future_return < -threshold] = 0  # strong down
    if mask_unlabeled:
        # NaN comparisons are False, so without this the unknown-future rows
        # would silently keep the neutral default.
        target = target.where(future_return.notna())
    return target


# ── Training ─────────────────────────────────────────────────────────────

def _train_model(
    df: pd.DataFrame,
    horizon: int = 5,
    threshold: float = _DEFAULT_THRESHOLD,
) -> Tuple[Any, List[str], Dict[str, float]]:
    """
    Train LightGBM classifier with walk-forward validation.

    The first 80% of usable rows (chronological order) train the model and
    the remaining 20% drive early stopping and the reported metrics.

    Args:
        df: Raw price frame passed to ``_assemble_features``.
        horizon: Forward-return horizon in rows for the target.
        threshold: Absolute return threshold for the strong classes.

    Returns:
        ``(model, feature_cols, metrics)`` where ``model`` is the trained
        LightGBM booster, ``feature_cols`` the ordered feature names and
        ``metrics`` the validation scores plus sample counts.

    Raises:
        ValueError: Fewer than 100 usable rows remain after dropping NaNs.
    """
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
    import lightgbm as lgb

    # Pattern features must exist on every row here; with the trailing-window
    # default they would be all-zero across the training slice.
    featured = _assemble_features(df, pattern_window=None)
    featured["target"] = _create_target(
        featured, horizon=horizon, threshold=threshold, mask_unlabeled=True
    )
    # Also removes the trailing rows whose future return is not yet known.
    featured = featured.dropna()

    if len(featured) < 100:
        raise ValueError(f"Insufficient data: {len(featured)} rows (need 100+)")

    # Feature columns
    exclude = {"Open", "High", "Low", "Close", "Volume", "Adj Close", "target"}
    feature_cols = [
        c for c in featured.columns
        if c not in exclude
        and featured[c].dtype in [np.float64, np.int64, np.float32, np.int32]
    ]

    X = featured[feature_cols].values
    y = featured["target"].to_numpy().astype(np.int32)

    # Walk-forward split
    split = int(len(X) * 0.8)
    X_train, X_val = X[:split], X[split:]
    y_train, y_val = y[:split], y[split:]

    X_train = np.nan_to_num(X_train, nan=0.0, posinf=0.0, neginf=0.0)
    X_val = np.nan_to_num(X_val, nan=0.0, posinf=0.0, neginf=0.0)

    # Class weights for imbalanced data ("balanced" heuristic), applied as
    # per-sample weights on the training set.
    classes, counts = np.unique(y_train, return_counts=True)
    total = len(y_train)
    class_weights = {int(c): total / (len(classes) * cnt) for c, cnt in zip(classes, counts)}
    sample_weight = np.array([class_weights[int(label)] for label in y_train], dtype=float)

    train_data = lgb.Dataset(X_train, label=y_train, weight=sample_weight)
    val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)

    params = {
        "objective": "multiclass",
        "num_class": 3,
        "metric": "multi_logloss",
        "boosting_type": "gbdt",
        "num_leaves": 63,
        "learning_rate": 0.05,
        "feature_fraction": 0.8,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        "lambda_l1": 0.1,
        "lambda_l2": 1.0,
        "min_child_samples": 20,
        "verbose": -1,
        "n_jobs": -1,
        "seed": 42,
    }

    callbacks = [
        lgb.early_stopping(stopping_rounds=20),
        lgb.log_evaluation(period=0),
    ]

    model = lgb.train(
        params,
        train_data,
        num_boost_round=300,
        valid_sets=[val_data],
        callbacks=callbacks,
    )

    # Validation metrics
    y_pred_proba = model.predict(X_val)
    y_pred = np.argmax(y_pred_proba, axis=1)

    metrics = {
        "accuracy": round(float(accuracy_score(y_val, y_pred)), 4),
        "precision": round(float(precision_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "recall": round(float(recall_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "f1": round(float(f1_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "train_samples": len(X_train),
        "val_samples": len(X_val),
    }

    logger.info(
        "LightGBM trained: %d samples, accuracy=%.1f%%, f1=%.4f",
        len(X_train) + len(X_val),
        metrics["accuracy"] * 100,
        metrics["f1"],
    )

    return model, feature_cols, metrics


# ── Prediction ───────────────────────────────────────────────────────────

DIRECTION_MAP = {0: "strong_down", 1: "neutral", 2: "strong_up"}
DIRECTION_LABELS = {
    "strong_down": "Strong Bearish",
    "neutral": "Neutral",
    "strong_up": "Strong Bullish",
}


async def predict_with_patterns(
    ticker: str,
    period: str = "2y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """
    Full prediction pipeline: patterns + advanced features + LightGBM.

    Returns prediction direction, confidence, feature importances,
    detected patterns, and model metrics.

    Args:
        ticker: Symbol passed to the Yahoo adapter.
        period: History period passed to the Yahoo adapter.
        horizon: Forward-return horizon in rows.

    Raises:
        ValueError: Fewer than 100 price rows could be fetched, or no row
            has a complete feature set.
    """
    import asyncio
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector
    from app.services.ml.pattern_recognition.advanced_features import advanced_feature_engine

    cache_key = _cache_key(ticker, period, horizon)

    # Fetch data (up to 3 attempts, 1s pause between them)
    df = pd.DataFrame()
    for attempt in range(3):
        try:
            df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
            if not df.empty:
                break
        except Exception as e:
            logger.warning("Fetch attempt %d for %s: %s", attempt + 1, ticker, e)
        if attempt < 2:
            await asyncio.sleep(1)

    if df.empty or len(df) < 100:
        raise ValueError(f"Insufficient price data for {ticker}")

    # Check cache
    cached = _predictor_cache.get(cache_key)
    from_cache = False
    if cached and not cached.is_stale:
        model = cached.model
        feature_names = cached.feature_names
        metrics = cached.metrics
        from_cache = True
    else:
        model, feature_names, metrics = _train_model(df, horizon=horizon)
        _predictor_cache[cache_key] = CachedPredictor(
            model=model,
            feature_names=feature_names,
            metrics=metrics,
        )

    # Build features for prediction
    featured = _assemble_features(df)
    featured = featured.dropna(subset=[c for c in feature_names if c in featured.columns])
    if featured.empty:
        raise ValueError(f"No valid features for {ticker}")

    # Latest prediction. reindex keeps the model's column order and turns a
    # column missing from this frame into NaN (zeroed below) rather than a
    # KeyError.
    X_latest = featured.reindex(columns=feature_names).iloc[[-1]].to_numpy(dtype=float)
    X_latest = np.nan_to_num(X_latest, nan=0.0, posinf=0.0, neginf=0.0)
    proba = model.predict(X_latest)[0]
    pred_class = int(np.argmax(proba))
    confidence = float(proba[pred_class])
    direction = DIRECTION_MAP.get(pred_class, "neutral")

    # Feature importance (top 15)
    importances = model.feature_importance(importance_type="gain")
    importance_pairs = sorted(
        zip(feature_names, importances.tolist()),
        key=lambda x: x[1],
        reverse=True,
    )[:15]

    # Detect current patterns
    patterns = pattern_detector.detect_all(df, lookback=5)

    # Advanced features snapshot
    adv_features = advanced_feature_engine.compute_all(df)

    # Expected return estimate: mean historical horizon-return of the bucket
    # matching the predicted class.
    recent = df["Close"].pct_change(horizon).dropna()
    bins = {
        "strong_up": recent[recent > _DEFAULT_THRESHOLD],
        "neutral": recent[recent.abs() <= _DEFAULT_THRESHOLD],
        "strong_down": recent[recent < -_DEFAULT_THRESHOLD],
    }
    bucket = bins.get(direction, recent)
    expected_return = float(bucket.mean()) if len(bucket) > 0 else 0.0

    # Hedge recommendation (aggregate from ML + patterns)
    hedge_signals = [p.get("hedge_signal", "hold_hedge") for p in patterns[:10]]
    bearish_signals = sum(1 for s in hedge_signals if s in ("hedge_now", "increase_hedge"))
    bullish_signals = sum(1 for s in hedge_signals if s == "reduce_hedge")
    if bearish_signals > bullish_signals:
        pattern_consensus = "bearish"
    elif bullish_signals > bearish_signals:
        pattern_consensus = "bullish"
    else:
        pattern_consensus = "neutral"

    if direction == "strong_down" and confidence > 0.55:
        hedge_action = "hedge_now"
        hedge_pct = min(50, int(confidence * 65))
        hedge_urgency = "high"
    elif direction == "strong_down" or pattern_consensus == "bearish":
        hedge_action = "increase_hedge"
        hedge_pct = min(35, int(confidence * 45))
        hedge_urgency = "medium"
    elif direction == "strong_up" and confidence > 0.55:
        hedge_action = "reduce_hedge"
        hedge_pct = max(5, int((1 - confidence) * 20))
        hedge_urgency = "low"
    else:
        hedge_action = "hold_hedge"
        hedge_pct = 15
        hedge_urgency = "none"

    rationales = {
        "hedge_now": (
            f"ML predicts {DIRECTION_LABELS.get(direction, direction)} "
            f"({confidence:.0%} confidence) with {bearish_signals} bearish pattern(s). "
            f"Recommend hedging {hedge_pct}% of portfolio via inverse ETFs or protective puts."
        ),
        "increase_hedge": (
            f"Moderate downside risk detected. ML confidence: {confidence:.0%}. "
            f"Consider increasing hedge exposure to {hedge_pct}%."
        ),
        "reduce_hedge": (
            f"Bullish outlook ({confidence:.0%} confidence). "
            f"Consider reducing hedge to {hedge_pct}% maintenance level to capture upside."
        ),
        "hold_hedge": (
            f"Mixed or neutral signals. Maintain current hedge allocation (~{hedge_pct}%)."
        ),
    }
    instruments = {
        "hedge_now": ["SH (ProShares Short S&P500)", "SQQQ (ProShares Ultra Short QQQ)", "VIX Calls"],
        "increase_hedge": ["SH (Short S&P500)", "GLD (Gold ETF)", "TLT (Long-Term Treasury)"],
        "reduce_hedge": [],
        "hold_hedge": ["GLD (Gold ETF)"],
    }

    hedge_recommendation = {
        "action": hedge_action,
        "suggested_hedge_pct": hedge_pct,
        "urgency": hedge_urgency,
        "pattern_consensus": pattern_consensus,
        "bearish_pattern_count": bearish_signals,
        "bullish_pattern_count": bullish_signals,
        "rationale": rationales.get(hedge_action, ""),
        "instruments": instruments.get(hedge_action, []),
    }

    return {
        "ticker": ticker,
        "prediction": direction,
        "prediction_label": DIRECTION_LABELS.get(direction, direction),
        "confidence": round(confidence, 4),
        "probabilities": {
            "strong_down": round(float(proba[0]), 4),
            "neutral": round(float(proba[1]), 4),
            "strong_up": round(float(proba[2]), 4),
        },
        "expected_return_pct": round(expected_return * 100, 2),
        "horizon_days": horizon,
        "confidence_level": (
            "high" if confidence > 0.65
            else "medium" if confidence > 0.45
            else "low"
        ),
        "hedge_recommendation": hedge_recommendation,
        "detected_patterns": patterns[:10],
        "top_features": [
            {"name": name, "importance": round(imp, 4)}
            for name, imp in importance_pairs
        ],
        "advanced_features": {
            k: v for k, v in adv_features.items()
            if k != "error" and not isinstance(v, (list, dict))
        },
        "model_metrics": metrics,
        "from_cache": from_cache,
        # Number of fetched price rows (not the post-dropna training count,
        # which is reported in model_metrics["train_samples"]).
        "training_samples": len(df),
        "current_price": round(float(df["Close"].iloc[-1]), 2),
    }


async def analyze_multiple(
    tickers: List[str],
    period: str = "2y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """Analyze multiple tickers and return comparative results.

    Only the first 10 tickers are processed. A failure for one ticker is
    logged and reported as ``{"error": ...}`` without aborting the rest.
    """
    results: Dict[str, Any] = {}
    for ticker in tickers[:10]:  # cap at 10
        try:
            results[ticker] = await predict_with_patterns(ticker, period, horizon)
        except Exception as e:
            logger.warning("Analysis failed for %s: %s", ticker, e)
            results[ticker] = {"error": str(e)}
    return results


async def backtest_pattern_accuracy(
    ticker: str,
    period: str = "5y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """
    Backtest pattern detection accuracy on historical data.

    For each pattern type, compute win rate and average return.
    Patterns seen fewer than 3 times are omitted from the report.

    Raises:
        ValueError: Fewer than 100 price rows could be fetched.
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector

    df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
    if df.empty or len(df) < 100:
        raise ValueError(f"Insufficient data for {ticker}")

    close = df["Close"].values
    n = len(df)

    pattern_stats: Dict[str, Dict[str, Any]] = {}

    for i in range(30, n - horizon):
        slice_df = df.iloc[max(0, i - 20):i + 1].copy()
        patterns = pattern_detector.detect_all(slice_df, lookback=3)

        future_return = (close[i + horizon] - close[i]) / close[i] if close[i] > 0 else 0

        for p in patterns:
            name = p["name"]
            # Same fallbacks as the feature-assembly loop.
            direction = p.get("direction", "neutral")

            if name not in pattern_stats:
                pattern_stats[name] = {
                    "occurrences": 0,
                    "correct": 0,
                    "returns": [],
                    "direction": direction,
                    "reliability": p.get("reliability", 0.5),
                }

            stats = pattern_stats[name]
            stats["occurrences"] += 1
            stats["returns"].append(future_return)

            # Correct if: bullish + positive return, or bearish + negative return
            if (direction == "bullish" and future_return > 0) or \
               (direction == "bearish" and future_return < 0):
                stats["correct"] += 1

    # Compile results
    accuracy_report = []
    for name, stats in pattern_stats.items():
        occ = stats["occurrences"]
        if occ < 3:
            continue
        avg_return = float(np.mean(stats["returns"])) * 100
        win_rate = stats["correct"] / occ

        accuracy_report.append({
            "pattern": name,
            "direction": stats["direction"],
            "occurrences": occ,
            "win_rate": round(win_rate, 4),
            "avg_return_pct": round(avg_return, 4),
            "theoretical_reliability": stats["reliability"],
            "actual_vs_theoretical": round(win_rate - stats["reliability"], 4),
        })

    accuracy_report.sort(key=lambda x: x["win_rate"], reverse=True)

    return {
        "ticker": ticker,
        "period": period,
        "horizon_days": horizon,
        # Clamped so a very large horizon cannot report a negative count.
        "total_bars_analyzed": max(0, n - 30 - horizon),
        "patterns_found": len(accuracy_report),
        "accuracy_report": accuracy_report,
    }


def clear_cache(ticker: Optional[str] = None) -> int:
    """Clear predictor cache.

    Args:
        ticker: When given, only entries for exactly this ticker are removed
            (all periods and horizons). When omitted or empty, the whole
            cache is cleared.

    Returns:
        Number of cache entries removed.
    """
    if ticker:
        # Exact ticker match: a substring test would also evict e.g. "AAPL"
        # when asked to clear "A".
        keys = [k for k in _predictor_cache if _cache_key_matches_ticker(k, ticker)]
        for k in keys:
            del _predictor_cache[k]
        return len(keys)
    count = len(_predictor_cache)
    _predictor_cache.clear()
    return count