Spaces:
Running
Running
| """ | |
| LightGBM Ensemble Predictor for Pattern-Based Trading Signals. | |
| Combines candlestick patterns + advanced math features + standard technicals | |
| into a single LightGBM model for multi-class prediction: | |
| - Class 0: Strong Down (< -1% expected return) | |
| - Class 1: Neutral (-1% to +1%) | |
| - Class 2: Strong Up (> +1%) | |
| Design: | |
| - Uses LightGBM (fastest boosting library, 10x lighter than XGBoost) | |
| - Walk-forward validation (no lookahead bias) | |
| - Feature importance via built-in gain importance + optional SHAP | |
| - In-memory model cache with 8-hour TTL | |
| - Supports all markets: equities, crypto, forex, commodities | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| logger = logging.getLogger(__name__) | |
# ── Cache ────────────────────────────────────────────────────────────────
CACHE_TTL = 8 * 3600  # seconds a cached model stays valid (8 hours)


@dataclass
class CachedPredictor:
    """A trained model plus the metadata needed to reuse it for prediction.

    Instances are built with keyword arguments, so the class must be a
    dataclass; ``trained_at`` defaults to the creation time.
    """

    # Trained booster as returned by ``_train_model``.
    model: Any
    # Feature column names, in the order the model was trained on.
    feature_names: List[str]
    # Validation metrics captured when the model was trained.
    metrics: Dict[str, float]
    # ``time.time()`` timestamp taken when the entry was created.
    trained_at: float = field(default_factory=time.time)

    @property
    def is_stale(self) -> bool:
        """Return True once the entry is older than ``CACHE_TTL`` seconds.

        Exposed as a property because callers read it as an attribute
        (``cached.is_stale``); as a plain method that expression would be a
        bound method, which is always truthy.
        """
        return (time.time() - self.trained_at) > CACHE_TTL


# Trained predictors keyed by "pattern_{ticker}_{period}_{horizon}".
_predictor_cache: Dict[str, CachedPredictor] = {}
# ── Feature Assembly ─────────────────────────────────────────────────────
def _assemble_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Build the full feature frame used for both training and prediction.

    Columns are added in this order on top of the feature pipeline's output:
    1. Pattern-detection aggregates (counts, max reliability, signed signal)
    2. Advanced mathematical features (rolling, window of 20)
    3. Lagged returns, annualised volatility, price-vs-moving-average,
       volume change / z-score and day of week
    """
    from app.services.feature_engineering.pipeline import feature_pipeline
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector
    from app.services.ml.pattern_recognition.advanced_features import advanced_feature_engine

    # Standard technical indicators form the base frame.
    frame = feature_pipeline.compute_all_features(df)
    row_count = len(frame)

    # Pattern aggregates, one slot per row; rows that are never scanned stay 0.
    bull_hits = np.zeros(row_count)
    bear_hits = np.zeros(row_count)
    best_reliability = np.zeros(row_count)
    net_signal = np.zeros(row_count)  # +reliability if bullish, -reliability if bearish

    # Only the most recent 30 rows are scanned, and never the first five.
    for idx in range(max(5, row_count - 30), row_count):
        window = df.iloc[max(0, idx - 20):idx + 1].copy()
        if len(window) < 5:
            continue
        for found in pattern_detector.detect_all(window, lookback=3):
            reliability = found.get("reliability", 0.5)
            side = found.get("direction", "neutral")
            if side == "bullish":
                bull_hits[idx] += 1
                net_signal[idx] += reliability
            elif side == "bearish":
                bear_hits[idx] += 1
                net_signal[idx] -= reliability
            best_reliability[idx] = max(best_reliability[idx], reliability)

    frame["pattern_bullish_count"] = bull_hits
    frame["pattern_bearish_count"] = bear_hits
    frame["pattern_max_reliability"] = best_reliability
    frame["pattern_signal"] = net_signal

    # Advanced rolling features: copy over whichever of these the engine produced.
    advanced = advanced_feature_engine.compute_feature_series(df, window=20)
    wanted_advanced = (
        "hurst_exponent", "fractal_dimension", "entropy",
        "price_efficiency", "trend_strength",
        "return_skew_20", "return_kurtosis_20",
    )
    for name in (c for c in wanted_advanced if c in advanced.columns):
        frame[name] = advanced[name]

    # Return-based features derived from the close.
    close = frame["Close"]
    for lag in (1, 2, 3, 5, 10):
        frame[f"return_{lag}d"] = close.pct_change(lag)

    log_returns = np.log(close / close.shift(1))
    for span in (5, 10, 20):
        frame[f"vol_{span}d"] = log_returns.rolling(span).std() * np.sqrt(252)

    # Relative distance from each moving average the pipeline provided.
    for ma_name in ("sma_20", "sma_50", "sma_200"):
        if ma_name not in frame.columns:
            continue
        moving_avg = frame[ma_name]
        frame[f"price_vs_{ma_name}"] = (close - moving_avg) / (moving_avg + 1e-10)

    if "Volume" in frame.columns:
        volume = frame["Volume"]
        frame["volume_change"] = volume.pct_change()
        rolling_volume = volume.rolling(20)
        frame["volume_zscore"] = (volume - rolling_volume.mean()) / (
            rolling_volume.std() + 1e-10
        )

    # Day of week, only when the index exposes it (e.g. a DatetimeIndex).
    if hasattr(frame.index, "dayofweek"):
        frame["day_of_week"] = frame.index.dayofweek

    return frame
| def _create_target(df: pd.DataFrame, horizon: int = 5, threshold: float = 0.01) -> pd.Series: | |
| """ | |
| Multi-class target: | |
| 0 = strong down (return < -threshold) | |
| 1 = neutral | |
| 2 = strong up (return > threshold) | |
| """ | |
| future_return = df["Close"].shift(-horizon) / df["Close"] - 1 | |
| target = pd.Series(1, index=df.index) # neutral by default | |
| target[future_return > threshold] = 2 # strong up | |
| target[future_return < -threshold] = 0 # strong down | |
| return target | |
# ── Training ─────────────────────────────────────────────────────────────
def _train_model(
    df: pd.DataFrame,
    horizon: int = 5,
    threshold: float = 0.01,
) -> Tuple[Any, List[str], Dict[str, float]]:
    """
    Train a LightGBM multi-class classifier on a chronological 80/20 split.

    Args:
        df: Price frame passed to ``_assemble_features``.
        horizon: Forward-return horizon used for the target.
        threshold: Return threshold separating neutral from strong moves.

    Returns:
        ``(model, feature_cols, metrics)``: the trained booster, the feature
        column names in training order, and validation metrics (accuracy,
        weighted precision/recall/F1, and sample counts).

    Raises:
        ValueError: If fewer than 100 rows remain after dropping NaNs.
    """
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
    import lightgbm as lgb

    featured = _assemble_features(df)
    featured["target"] = _create_target(featured, horizon=horizon, threshold=threshold)
    featured = featured.dropna()
    if len(featured) < 100:
        raise ValueError(f"Insufficient data: {len(featured)} rows (need 100+)")

    # Feature columns: every numeric column except raw prices and the target.
    exclude = {"Open", "High", "Low", "Close", "Volume", "Adj Close", "target"}
    feature_cols = [
        c for c in featured.columns
        if c not in exclude and featured[c].dtype in [np.float64, np.int64, np.float32, np.int32]
    ]
    X = featured[feature_cols].values
    y = featured["target"].values

    # Chronological split: the first 80% trains, the last 20% validates.
    # NOTE(review): there is no gap between the two parts, so the labels of the
    # last `horizon` training rows are built from closes inside the validation
    # range - consider an embargo of `horizon` rows.
    split = int(len(X) * 0.8)
    X_train, X_val = X[:split], X[split:]
    y_train, y_val = y[:split], y[split:]
    # dropna() does not remove infinities (e.g. from pct_change), so zero them.
    X_train = np.nan_to_num(X_train, nan=0.0, posinf=0.0, neginf=0.0)
    X_val = np.nan_to_num(X_val, nan=0.0, posinf=0.0, neginf=0.0)

    # Balanced class weights, total / (n_classes * class_count), expanded to one
    # weight per training row so LightGBM actually uses them.
    classes, inverse, counts = np.unique(y_train, return_inverse=True, return_counts=True)
    per_class_weight = len(y_train) / (len(classes) * counts)
    sample_weight = per_class_weight[inverse.ravel()]

    train_data = lgb.Dataset(X_train, label=y_train, weight=sample_weight)
    val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
    params = {
        "objective": "multiclass",
        "num_class": 3,
        "metric": "multi_logloss",
        "boosting_type": "gbdt",
        "num_leaves": 63,
        "learning_rate": 0.05,
        "feature_fraction": 0.8,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        "lambda_l1": 0.1,
        "lambda_l2": 1.0,
        "min_child_samples": 20,
        "verbose": -1,
        "n_jobs": -1,
        "seed": 42,
    }
    callbacks = [
        lgb.early_stopping(stopping_rounds=20),
        lgb.log_evaluation(period=0),
    ]
    model = lgb.train(
        params,
        train_data,
        num_boost_round=300,
        valid_sets=[val_data],
        callbacks=callbacks,
    )

    # Validation metrics (the same split also drives early stopping).
    y_pred_proba = model.predict(X_val)
    y_pred = np.argmax(y_pred_proba, axis=1)
    metrics = {
        "accuracy": round(float(accuracy_score(y_val, y_pred)), 4),
        "precision": round(float(precision_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "recall": round(float(recall_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "f1": round(float(f1_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "train_samples": len(X_train),
        "val_samples": len(X_val),
    }
    logger.info(
        "LightGBM trained: %d samples, accuracy=%.1f%%, f1=%.4f",
        len(X_train) + len(X_val), metrics["accuracy"] * 100, metrics["f1"],
    )
    return model, feature_cols, metrics
# ── Prediction ───────────────────────────────────────────────────────────
# Predicted class index (0, 1, 2) -> internal direction key.
DIRECTION_MAP = dict(enumerate(("strong_down", "neutral", "strong_up")))

# Internal direction key -> label shown in responses.
DIRECTION_LABELS = dict(
    strong_down="Strong Bearish",
    neutral="Neutral",
    strong_up="Strong Bullish",
)
def _build_hedge_recommendation(
    direction: str,
    confidence: float,
    patterns: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Combine the model's call with pattern hedge signals into one recommendation."""
    # Only the first ten detected patterns vote on the consensus.
    hedge_signals = [p.get("hedge_signal", "hold_hedge") for p in patterns[:10]]
    bearish_signals = sum(1 for s in hedge_signals if s in ("hedge_now", "increase_hedge"))
    bullish_signals = sum(1 for s in hedge_signals if s == "reduce_hedge")
    if bearish_signals > bullish_signals:
        pattern_consensus = "bearish"
    elif bullish_signals > bearish_signals:
        pattern_consensus = "bullish"
    else:
        pattern_consensus = "neutral"

    # Branch order matters: a bearish pattern consensus is checked before a
    # confident bullish model call.
    if direction == "strong_down" and confidence > 0.55:
        hedge_action = "hedge_now"
        hedge_pct = min(50, int(confidence * 65))
        hedge_urgency = "high"
    elif direction == "strong_down" or pattern_consensus == "bearish":
        hedge_action = "increase_hedge"
        hedge_pct = min(35, int(confidence * 45))
        hedge_urgency = "medium"
    elif direction == "strong_up" and confidence > 0.55:
        hedge_action = "reduce_hedge"
        hedge_pct = max(5, int((1 - confidence) * 20))
        hedge_urgency = "low"
    else:
        hedge_action = "hold_hedge"
        hedge_pct = 15
        hedge_urgency = "none"

    return {
        "action": hedge_action,
        "suggested_hedge_pct": hedge_pct,
        "urgency": hedge_urgency,
        "pattern_consensus": pattern_consensus,
        "bearish_pattern_count": bearish_signals,
        "bullish_pattern_count": bullish_signals,
        "rationale": {
            "hedge_now": f"ML predicts {DIRECTION_LABELS.get(direction, direction)} ({confidence:.0%} confidence) with {bearish_signals} bearish pattern(s). Recommend hedging {hedge_pct}% of portfolio via inverse ETFs or protective puts.",
            "increase_hedge": f"Moderate downside risk detected. ML confidence: {confidence:.0%}. Consider increasing hedge exposure to {hedge_pct}%.",
            "reduce_hedge": f"Bullish outlook ({confidence:.0%} confidence). Consider reducing hedge to {hedge_pct}% maintenance level to capture upside.",
            "hold_hedge": f"Mixed or neutral signals. Maintain current hedge allocation (~{hedge_pct}%).",
        }.get(hedge_action, ""),
        "instruments": {
            "hedge_now": ["SH (ProShares Short S&P500)", "SQQQ (ProShares Ultra Short QQQ)", "VIX Calls"],
            "increase_hedge": ["SH (Short S&P500)", "GLD (Gold ETF)", "TLT (Long-Term Treasury)"],
            "reduce_hedge": [],
            "hold_hedge": ["GLD (Gold ETF)"],
        }.get(hedge_action, []),
    }


async def predict_with_patterns(
    ticker: str,
    period: str = "2y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """
    Full prediction pipeline: patterns + advanced features + LightGBM.

    Fetches prices (up to three attempts), reuses a cached model when one is
    present and not stale (training a new one otherwise), and predicts on the
    most recent row with complete features.

    Args:
        ticker: Symbol passed to the price adapter.
        period: History period passed to the price adapter.
        horizon: Forward horizon used for training and the return estimate.

    Returns:
        A dict with the predicted direction, confidence and class
        probabilities, an expected-return estimate, a hedge recommendation,
        detected patterns, top feature importances, an advanced-feature
        snapshot and the model's validation metrics.

    Raises:
        ValueError: If fewer than 100 price rows are available or no row has
            complete features.
    """
    import asyncio
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector
    from app.services.ml.pattern_recognition.advanced_features import advanced_feature_engine

    cache_key = f"pattern_{ticker}_{period}_{horizon}"

    # Fetch data, retrying after a failure or an empty frame.
    df = pd.DataFrame()
    for attempt in range(3):
        try:
            df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
            if not df.empty:
                break
        except Exception as e:
            logger.warning("Fetch attempt %d for %s: %s", attempt + 1, ticker, e)
        if attempt < 2:
            await asyncio.sleep(1)
    if df.empty or len(df) < 100:
        raise ValueError(f"Insufficient price data for {ticker}")

    # Reuse a cached model unless it is missing or stale.
    cached = _predictor_cache.get(cache_key)
    from_cache = False
    if cached is not None and not cached.is_stale:
        model = cached.model
        feature_names = cached.feature_names
        metrics = cached.metrics
        from_cache = True
    else:
        # NOTE(review): _train_model is a plain synchronous call, so the event
        # loop is blocked while the model trains.
        model, feature_names, metrics = _train_model(df, horizon=horizon)
        _predictor_cache[cache_key] = CachedPredictor(
            model=model, feature_names=feature_names, metrics=metrics,
        )

    # Build features for prediction. reindex() keeps the training column order
    # and turns any column the model knows but this frame lacks into NaN (later
    # zeroed), instead of raising KeyError on the column lookup.
    featured = _assemble_features(df)
    present = [c for c in feature_names if c in featured.columns]
    feature_frame = featured.reindex(columns=feature_names).dropna(subset=present)
    if feature_frame.empty:
        raise ValueError(f"No valid features for {ticker}")

    # Latest prediction
    X_latest = feature_frame.iloc[[-1]].to_numpy(dtype=float)
    X_latest = np.nan_to_num(X_latest, nan=0.0, posinf=0.0, neginf=0.0)
    proba = model.predict(X_latest)[0]
    pred_class = int(np.argmax(proba))
    confidence = float(proba[pred_class])
    direction = DIRECTION_MAP.get(pred_class, "neutral")

    # Feature importance (top 15 by gain)
    importances = model.feature_importance(importance_type="gain")
    importance_pairs = sorted(
        zip(feature_names, importances.tolist()),
        key=lambda x: x[1], reverse=True,
    )[:15]

    # Detect current patterns
    patterns = pattern_detector.detect_all(df, lookback=5)

    # Advanced features snapshot
    adv_features = advanced_feature_engine.compute_all(df)

    # Expected return: mean historical `horizon`-row return within the bucket
    # matching the predicted direction.
    recent = df["Close"].pct_change(horizon).dropna()
    bins = {
        "strong_up": recent[recent > 0.01],
        "neutral": recent[abs(recent) <= 0.01],
        "strong_down": recent[recent < -0.01],
    }
    bucket = bins.get(direction, recent)
    expected_return = float(bucket.mean()) if len(bucket) > 0 else 0.0

    # Hedge recommendation (aggregate from ML + patterns)
    hedge_recommendation = _build_hedge_recommendation(direction, confidence, patterns)

    return {
        "ticker": ticker,
        "prediction": direction,
        "prediction_label": DIRECTION_LABELS.get(direction, direction),
        "confidence": round(confidence, 4),
        "probabilities": {
            "strong_down": round(float(proba[0]), 4),
            "neutral": round(float(proba[1]), 4),
            "strong_up": round(float(proba[2]), 4),
        },
        "expected_return_pct": round(expected_return * 100, 2),
        "horizon_days": horizon,
        "confidence_level": (
            "high" if confidence > 0.65
            else "medium" if confidence > 0.45
            else "low"
        ),
        "hedge_recommendation": hedge_recommendation,
        "detected_patterns": patterns[:10],
        "top_features": [
            {"name": name, "importance": round(imp, 4)}
            for name, imp in importance_pairs
        ],
        "advanced_features": {
            k: v for k, v in adv_features.items()
            if k != "error" and not isinstance(v, (list, dict))
        },
        "model_metrics": metrics,
        "from_cache": from_cache,
        # Number of fetched price rows (not the post-dropna training count).
        "training_samples": len(df),
        "current_price": round(float(df["Close"].iloc[-1]), 2),
    }
async def analyze_multiple(
    tickers: List[str],
    period: str = "2y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """Run ``predict_with_patterns`` for up to ten tickers, one after another.

    Returns a dict keyed by ticker. A ticker whose analysis raises maps to
    ``{"error": <message>}`` instead of aborting the whole batch.
    """
    outcome: Dict[str, Any] = {}
    # Only the first ten tickers are analysed.
    for symbol in tickers[:10]:
        try:
            analysis = await predict_with_patterns(symbol, period, horizon)
        except Exception as exc:
            logger.warning("Analysis failed for %s: %s", symbol, exc)
            analysis = {"error": str(exc)}
        outcome[symbol] = analysis
    return outcome
async def backtest_pattern_accuracy(
    ticker: str,
    period: str = "5y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """
    Backtest pattern detection accuracy on historical data.

    For every bar from index 30 up to ``len(df) - horizon``, patterns are
    detected on the trailing window of up to 21 bars and scored against the
    return ``horizon`` bars later. Per pattern name the report gives the
    occurrence count, win rate and average forward return; patterns seen
    fewer than three times are left out.

    Args:
        ticker: Symbol passed to the price adapter.
        period: History period passed to the price adapter.
        horizon: Number of bars ahead used for the forward return.

    Returns:
        A dict with the ticker, period, horizon, number of bars analysed and
        an ``accuracy_report`` list sorted by win rate, best first.

    Raises:
        ValueError: If fewer than 100 price rows are available.
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector

    df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
    if df.empty or len(df) < 100:
        raise ValueError(f"Insufficient data for {ticker}")

    close = df["Close"].values
    n = len(df)
    pattern_stats: Dict[str, Dict[str, Any]] = {}
    for i in range(30, n - horizon):
        slice_df = df.iloc[max(0, i - 20):i + 1].copy()
        patterns = pattern_detector.detect_all(slice_df, lookback=3)
        future_return = (close[i + horizon] - close[i]) / close[i] if close[i] > 0 else 0
        for p in patterns:
            name = p["name"]
            # Same defaults as feature assembly, so a pattern without these
            # optional keys is counted instead of raising KeyError.
            direction = p.get("direction", "neutral")
            if name not in pattern_stats:
                # Direction and reliability are taken from the first occurrence.
                pattern_stats[name] = {
                    "occurrences": 0,
                    "correct": 0,
                    "returns": [],
                    "direction": direction,
                    "reliability": p.get("reliability", 0.5),
                }
            stats = pattern_stats[name]
            stats["occurrences"] += 1
            stats["returns"].append(future_return)
            # Correct if: bullish + positive return, or bearish + negative return
            if (direction == "bullish" and future_return > 0) or \
                    (direction == "bearish" and future_return < 0):
                stats["correct"] += 1

    # Compile results
    accuracy_report = []
    for name, stats in pattern_stats.items():
        occ = stats["occurrences"]
        if occ < 3:
            continue
        avg_return = float(np.mean(stats["returns"])) * 100
        win_rate = stats["correct"] / occ
        accuracy_report.append({
            "pattern": name,
            "direction": stats["direction"],
            "occurrences": occ,
            "win_rate": round(win_rate, 4),
            "avg_return_pct": round(avg_return, 4),
            "theoretical_reliability": stats["reliability"],
            "actual_vs_theoretical": round(win_rate - stats["reliability"], 4),
        })
    accuracy_report.sort(key=lambda x: x["win_rate"], reverse=True)

    return {
        "ticker": ticker,
        "period": period,
        "horizon_days": horizon,
        # The loop above is empty when horizon >= n - 30; report 0, not a
        # negative count.
        "total_bars_analyzed": max(0, n - 30 - horizon),
        "patterns_found": len(accuracy_report),
        "accuracy_report": accuracy_report,
    }
def clear_cache(ticker: Optional[str] = None) -> int:
    """Clear the predictor cache and return the number of entries removed.

    Args:
        ticker: When given (and non-empty), only entries for exactly this
            ticker are removed; otherwise the whole cache is cleared.

    Returns:
        The number of cache entries that were deleted.
    """
    if not ticker:
        count = len(_predictor_cache)
        _predictor_cache.clear()
        return count

    # Keys look like "pattern_{ticker}_{period}_{horizon}". Compare the ticker
    # segment exactly: a substring test would let "A" evict "AAPL", and any
    # letter of "pattern" evict every entry. Splitting from the right keeps
    # tickers that contain underscores intact (assumes period and horizon
    # contain none - verify against callers).
    prefix = "pattern_"
    keys = [
        k for k in _predictor_cache
        if k.startswith(prefix) and k[len(prefix):].rsplit("_", 2)[0] == ticker
    ]
    for k in keys:
        del _predictor_cache[k]
    return len(keys)