"""
LightGBM Ensemble Predictor for Pattern-Based Trading Signals.
Combines candlestick patterns + advanced math features + standard technicals
into a single LightGBM model for multi-class prediction:
- Class 0: Strong Down (< -1% expected return)
- Class 1: Neutral (-1% to +1%)
- Class 2: Strong Up (> +1%)
Design:
- Uses LightGBM (fastest boosting library, 10x lighter than XGBoost)
- Walk-forward validation (no lookahead bias)
- Feature importance via built-in gain importance + optional SHAP
- In-memory model cache with 8-hour TTL
- Supports all markets: equities, crypto, forex, commodities
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
logger = logging.getLogger(__name__)
# ── Cache ────────────────────────────────────────────────────────────────
CACHE_TTL = 8 * 3600 # 8 hours
@dataclass
class CachedPredictor:
model: Any
feature_names: List[str]
metrics: Dict[str, float]
trained_at: float = field(default_factory=time.time)
@property
def is_stale(self) -> bool:
return (time.time() - self.trained_at) > CACHE_TTL
_predictor_cache: Dict[str, CachedPredictor] = {}
# ── Feature Assembly ─────────────────────────────────────────────────────
def _assemble_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Assemble the complete feature matrix for one ticker.

    Combines three feature families into one DataFrame:
    1. Standard technical indicators (from the feature_engineering pipeline)
    2. Candlestick-pattern scores, serialized into numeric columns
    3. Advanced mathematical features (Hurst, fractal dimension, entropy, ...)
    plus lagged returns, rolling volatility, MA distances, volume statistics
    and day-of-week.

    Args:
        df: OHLCV price frame (expects at least "Close"; "Volume" and a
            datetime-like index are optional).

    Returns:
        Feature DataFrame; rows may contain NaNs — the caller is expected
        to drop them before training.
    """
    # Imported lazily so merely importing this module stays cheap.
    from app.services.feature_engineering.pipeline import feature_pipeline
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector
    from app.services.ml.pattern_recognition.advanced_features import advanced_feature_engine

    # 1. Standard technicals
    featured = feature_pipeline.compute_all_features(df)

    # 2. Pattern features (serialize as numeric). Only the final 30 bars are
    # scanned — detect_all runs per bar, so a full-history scan would be
    # quadratic; earlier rows keep all-zero pattern features.
    n = len(featured)
    bullish_count = np.zeros(n)
    bearish_count = np.zeros(n)
    max_reliability = np.zeros(n)
    pattern_signal = np.zeros(n)  # +1 bullish, -1 bearish, weighted by reliability
    # NOTE(review): index i addresses both `featured` (length n) and `df` —
    # assumes compute_all_features preserves row count/alignment; confirm.
    for i in range(max(0, n - 30), n):
        if i < 5:
            continue
        # Detect on a small trailing window ending at bar i.
        slice_df = df.iloc[max(0, i-20):i+1].copy()
        if len(slice_df) < 5:
            continue
        patterns = pattern_detector.detect_all(slice_df, lookback=3)
        for p in patterns:
            rel = p.get("reliability", 0.5)
            direction = p.get("direction", "neutral")
            if direction == "bullish":
                bullish_count[i] += 1
                pattern_signal[i] += rel
            elif direction == "bearish":
                bearish_count[i] += 1
                pattern_signal[i] -= rel
            # Track the single most reliable pattern seen at this bar.
            max_reliability[i] = max(max_reliability[i], rel)
    featured["pattern_bullish_count"] = bullish_count
    featured["pattern_bearish_count"] = bearish_count
    featured["pattern_max_reliability"] = max_reliability
    featured["pattern_signal"] = pattern_signal

    # 3. Advanced math features (rolling, 20-bar window); copy only the
    # columns the engine actually produced.
    adv = advanced_feature_engine.compute_feature_series(df, window=20)
    for col in ["hurst_exponent", "fractal_dimension", "entropy",
                "price_efficiency", "trend_strength",
                "return_skew_20", "return_kurtosis_20"]:
        if col in adv.columns:
            featured[col] = adv[col]

    # 4. Additional return-based features
    close = featured["Close"]
    for lag in [1, 2, 3, 5, 10]:
        featured[f"return_{lag}d"] = close.pct_change(lag)
    log_ret = np.log(close / close.shift(1))
    for w in [5, 10, 20]:
        # Rolling volatility, annualized with sqrt(252 trading days).
        featured[f"vol_{w}d"] = log_ret.rolling(w).std() * np.sqrt(252)
    for ma_col in ["sma_20", "sma_50", "sma_200"]:
        if ma_col in featured.columns:
            # Relative distance from the moving average (epsilon avoids /0).
            featured[f"price_vs_{ma_col}"] = (close - featured[ma_col]) / (featured[ma_col] + 1e-10)
    if "Volume" in featured.columns:
        featured["volume_change"] = featured["Volume"].pct_change()
        # Volume z-score vs its own 20-bar rolling distribution.
        featured["volume_zscore"] = (
            featured["Volume"] - featured["Volume"].rolling(20).mean()
        ) / (featured["Volume"].rolling(20).std() + 1e-10)
    # Day of week — only when the index is datetime-like.
    if hasattr(featured.index, "dayofweek"):
        featured["day_of_week"] = featured.index.dayofweek
    return featured
def _create_target(df: pd.DataFrame, horizon: int = 5, threshold: float = 0.01) -> pd.Series:
"""
Multi-class target:
0 = strong down (return < -threshold)
1 = neutral
2 = strong up (return > threshold)
"""
future_return = df["Close"].shift(-horizon) / df["Close"] - 1
target = pd.Series(1, index=df.index) # neutral by default
target[future_return > threshold] = 2 # strong up
target[future_return < -threshold] = 0 # strong down
return target
# ── Training ─────────────────────────────────────────────────────────────
def _train_model(
    df: pd.DataFrame,
    horizon: int = 5,
    threshold: float = 0.01,
) -> Tuple[Any, List[str], Dict[str, float]]:
    """
    Train a LightGBM multi-class classifier with walk-forward validation.

    The first 80% of rows (chronological order) train the model; the final
    20% validate it, so no future bar informs a past prediction.

    Args:
        df: OHLCV history for one ticker.
        horizon: Forward-return window (bars) for the target.
        threshold: Return magnitude separating "strong" moves from neutral.

    Returns:
        Tuple of (trained booster, feature column names, validation metrics).

    Raises:
        ValueError: if fewer than 100 clean rows remain after feature
            assembly and NaN dropping.
    """
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
    import lightgbm as lgb

    featured = _assemble_features(df)
    featured["target"] = _create_target(featured, horizon=horizon, threshold=threshold)
    featured = featured.dropna()
    if len(featured) < 100:
        raise ValueError(f"Insufficient data: {len(featured)} rows (need 100+)")

    # Feature columns: numeric columns only, excluding raw OHLCV + target.
    exclude = {"Open", "High", "Low", "Close", "Volume", "Adj Close", "target"}
    feature_cols = [
        c for c in featured.columns
        if c not in exclude and featured[c].dtype in [np.float64, np.int64, np.float32, np.int32]
    ]
    X = featured[feature_cols].values
    y = featured["target"].values

    # Walk-forward split — no shuffling, order is time.
    split = int(len(X) * 0.8)
    X_train, X_val = X[:split], X[split:]
    y_train, y_val = y[:split], y[split:]
    X_train = np.nan_to_num(X_train, nan=0.0, posinf=0.0, neginf=0.0)
    X_val = np.nan_to_num(X_val, nan=0.0, posinf=0.0, neginf=0.0)

    # Class weights for imbalanced data (inverse-frequency scheme).
    classes, counts = np.unique(y_train, return_counts=True)
    total = len(y_train)
    class_weights = {int(c): total / (len(classes) * cnt) for c, cnt in zip(classes, counts)}
    # Bug fix: these weights were previously computed but never passed to
    # LightGBM. Apply them as per-sample weights so minority classes
    # actually influence the loss.
    sample_weight = np.array([class_weights[int(c)] for c in y_train])

    train_data = lgb.Dataset(X_train, label=y_train, weight=sample_weight)
    val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
    params = {
        "objective": "multiclass",
        "num_class": 3,
        "metric": "multi_logloss",
        "boosting_type": "gbdt",
        "num_leaves": 63,
        "learning_rate": 0.05,
        "feature_fraction": 0.8,
        "bagging_fraction": 0.8,
        "bagging_freq": 5,
        "lambda_l1": 0.1,
        "lambda_l2": 1.0,
        "min_child_samples": 20,
        "verbose": -1,
        "n_jobs": -1,
        "seed": 42,
    }
    callbacks = [
        lgb.early_stopping(stopping_rounds=20),
        lgb.log_evaluation(period=0),  # silence per-round logging
    ]
    model = lgb.train(
        params,
        train_data,
        num_boost_round=300,
        valid_sets=[val_data],
        callbacks=callbacks,
    )

    # Validation metrics (weighted averages account for class imbalance).
    y_pred_proba = model.predict(X_val)
    y_pred = np.argmax(y_pred_proba, axis=1)
    metrics = {
        "accuracy": round(float(accuracy_score(y_val, y_pred)), 4),
        "precision": round(float(precision_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "recall": round(float(recall_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "f1": round(float(f1_score(y_val, y_pred, average="weighted", zero_division=0)), 4),
        "train_samples": len(X_train),
        "val_samples": len(X_val),
    }
    logger.info(
        "LightGBM trained: %d samples, accuracy=%.1f%%, f1=%.4f",
        len(X_train) + len(X_val), metrics["accuracy"] * 100, metrics["f1"],
    )
    return model, feature_cols, metrics
# ── Prediction ───────────────────────────────────────────────────────────
# Class index (model output) -> machine-readable direction key.
DIRECTION_MAP = {0: "strong_down", 1: "neutral", 2: "strong_up"}
# Direction key -> human-readable label for API responses.
DIRECTION_LABELS = {
    "strong_down": "Strong Bearish",
    "neutral": "Neutral",
    "strong_up": "Strong Bullish",
}
async def predict_with_patterns(
    ticker: str,
    period: str = "2y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """
    Full prediction pipeline: patterns + advanced features + LightGBM.

    Fetches prices (with retries), trains or reuses a cached model, scores
    the most recent bar, and assembles a response with direction,
    confidence, probabilities, feature importances, detected patterns,
    a hedge recommendation, and model metrics.

    Args:
        ticker: Symbol to analyze.
        period: History window passed to the data adapter (e.g. "2y").
        horizon: Forward-return horizon in bars for the prediction target.

    Returns:
        Dict payload (see the return statement for the exact schema).

    Raises:
        ValueError: if price data is missing/too short, or no feature row
            survives NaN filtering.
    """
    import asyncio
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector
    from app.services.ml.pattern_recognition.advanced_features import advanced_feature_engine
    # Cache key is per (ticker, period, horizon) combination.
    cache_key = f"pattern_{ticker}_{period}_{horizon}"
    # Fetch data — up to 3 attempts with a 1s pause between failures.
    df = pd.DataFrame()
    for attempt in range(3):
        try:
            df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
            if not df.empty:
                break
        except Exception as e:
            logger.warning("Fetch attempt %d for %s: %s", attempt + 1, ticker, e)
            if attempt < 2:
                await asyncio.sleep(1)
    if df.empty or len(df) < 100:
        raise ValueError(f"Insufficient price data for {ticker}")
    # Check cache; retrain only when absent or older than the TTL.
    cached = _predictor_cache.get(cache_key)
    from_cache = False
    if cached and not cached.is_stale:
        model = cached.model
        feature_names = cached.feature_names
        metrics = cached.metrics
        from_cache = True
    else:
        model, feature_names, metrics = _train_model(df, horizon=horizon)
        _predictor_cache[cache_key] = CachedPredictor(
            model=model, feature_names=feature_names, metrics=metrics,
        )
    # Build features for prediction; drop rows missing any known feature.
    featured = _assemble_features(df)
    featured = featured.dropna(subset=[c for c in feature_names if c in featured.columns])
    if featured.empty:
        raise ValueError(f"No valid features for {ticker}")
    # Latest prediction: score only the most recent bar.
    X_latest = featured[feature_names].iloc[[-1]].values
    X_latest = np.nan_to_num(X_latest, nan=0.0, posinf=0.0, neginf=0.0)
    proba = model.predict(X_latest)[0]
    pred_class = int(np.argmax(proba))
    confidence = float(proba[pred_class])
    direction = DIRECTION_MAP.get(pred_class, "neutral")
    # Feature importance (top 15, by LightGBM gain).
    importances = model.feature_importance(importance_type="gain")
    importance_pairs = sorted(
        zip(feature_names, importances.tolist()),
        key=lambda x: x[1], reverse=True,
    )[:15]
    # Detect current patterns over the recent bars.
    patterns = pattern_detector.detect_all(df, lookback=5)
    # Advanced features snapshot for the response payload.
    adv_features = advanced_feature_engine.compute_all(df)
    # Expected return estimate: mean of historical horizon-returns that fall
    # in the predicted direction's bin (+/-1% fixed cutoffs).
    recent = df["Close"].pct_change(horizon).dropna()
    bins = {"strong_up": recent[recent > 0.01], "neutral": recent[abs(recent) <= 0.01], "strong_down": recent[recent < -0.01]}
    expected_return = float(bins.get(direction, recent).mean()) if len(bins.get(direction, recent)) > 0 else 0
    # Hedge recommendation (aggregate from ML + patterns).
    hedge_signals = [p.get("hedge_signal", "hold_hedge") for p in patterns[:10]]
    bearish_signals = sum(1 for s in hedge_signals if s in ("hedge_now", "increase_hedge"))
    bullish_signals = sum(1 for s in hedge_signals if s == "reduce_hedge")
    pattern_consensus = "bearish" if bearish_signals > bullish_signals else "bullish" if bullish_signals > bearish_signals else "neutral"
    # Heuristic tiers: confident bearish ML -> hedge now; any bearish signal
    # -> increase; confident bullish -> reduce; otherwise hold.
    if direction == "strong_down" and confidence > 0.55:
        hedge_action = "hedge_now"
        hedge_pct = min(50, int(confidence * 65))
        hedge_urgency = "high"
    elif direction == "strong_down" or pattern_consensus == "bearish":
        hedge_action = "increase_hedge"
        hedge_pct = min(35, int(confidence * 45))
        hedge_urgency = "medium"
    elif direction == "strong_up" and confidence > 0.55:
        hedge_action = "reduce_hedge"
        hedge_pct = max(5, int((1 - confidence) * 20))
        hedge_urgency = "low"
    else:
        hedge_action = "hold_hedge"
        hedge_pct = 15
        hedge_urgency = "none"
    hedge_recommendation = {
        "action": hedge_action,
        "suggested_hedge_pct": hedge_pct,
        "urgency": hedge_urgency,
        "pattern_consensus": pattern_consensus,
        "bearish_pattern_count": bearish_signals,
        "bullish_pattern_count": bullish_signals,
        "rationale": {
            "hedge_now": f"ML predicts {DIRECTION_LABELS.get(direction, direction)} ({confidence:.0%} confidence) with {bearish_signals} bearish pattern(s). Recommend hedging {hedge_pct}% of portfolio via inverse ETFs or protective puts.",
            "increase_hedge": f"Moderate downside risk detected. ML confidence: {confidence:.0%}. Consider increasing hedge exposure to {hedge_pct}%.",
            "reduce_hedge": f"Bullish outlook ({confidence:.0%} confidence). Consider reducing hedge to {hedge_pct}% maintenance level to capture upside.",
            "hold_hedge": f"Mixed or neutral signals. Maintain current hedge allocation (~{hedge_pct}%).",
        }.get(hedge_action, ""),
        "instruments": {
            "hedge_now": ["SH (ProShares Short S&P500)", "SQQQ (ProShares Ultra Short QQQ)", "VIX Calls"],
            "increase_hedge": ["SH (Short S&P500)", "GLD (Gold ETF)", "TLT (Long-Term Treasury)"],
            "reduce_hedge": [],
            "hold_hedge": ["GLD (Gold ETF)"],
        }.get(hedge_action, []),
    }
    return {
        "ticker": ticker,
        "prediction": direction,
        "prediction_label": DIRECTION_LABELS.get(direction, direction),
        "confidence": round(confidence, 4),
        "probabilities": {
            "strong_down": round(float(proba[0]), 4),
            "neutral": round(float(proba[1]), 4),
            "strong_up": round(float(proba[2]), 4),
        },
        "expected_return_pct": round(expected_return * 100, 2),
        "horizon_days": horizon,
        "confidence_level": (
            "high" if confidence > 0.65
            else "medium" if confidence > 0.45
            else "low"
        ),
        "hedge_recommendation": hedge_recommendation,
        "detected_patterns": patterns[:10],
        "top_features": [
            {"name": name, "importance": round(imp, 4)}
            for name, imp in importance_pairs
        ],
        "advanced_features": {
            k: v for k, v in adv_features.items()
            if k != "error" and not isinstance(v, (list, dict))
        },
        "model_metrics": metrics,
        # NOTE(review): this is the raw bar count, not the post-dropna
        # training row count reported in model_metrics — confirm intent.
        "training_samples": len(df),
        "current_price": round(float(df["Close"].iloc[-1]), 2),
    }
async def analyze_multiple(
    tickers: List[str],
    period: str = "2y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """Run the full pattern prediction for each ticker (capped at 10).

    Failures are captured per ticker as {"error": ...} rather than
    aborting the whole batch.
    """
    combined: Dict[str, Any] = {}
    for symbol in tickers[:10]:  # hard cap to bound total runtime
        try:
            combined[symbol] = await predict_with_patterns(symbol, period, horizon)
        except Exception as exc:
            logger.warning("Analysis failed for %s: %s", symbol, exc)
            combined[symbol] = {"error": str(exc)}
    return combined
async def backtest_pattern_accuracy(
    ticker: str,
    period: str = "5y",
    horizon: int = 5,
) -> Dict[str, Any]:
    """
    Backtest pattern detection accuracy on historical data.

    Walks bar-by-bar, detects patterns on a 21-bar trailing window, and
    scores each pattern type by realized win rate and average forward
    return over `horizon` bars.
    """
    from app.services.data_ingestion.yahoo import yahoo_adapter
    from app.services.ml.pattern_recognition.pattern_detector import pattern_detector

    df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
    if df.empty or len(df) < 100:
        raise ValueError(f"Insufficient data for {ticker}")

    closes = df["Close"].values
    total_bars = len(df)
    stats_by_name: Dict[str, Dict[str, Any]] = {}
    for bar in range(30, total_bars - horizon):
        window = df.iloc[max(0, bar - 20):bar + 1].copy()
        detected = pattern_detector.detect_all(window, lookback=3)
        fwd = (closes[bar + horizon] - closes[bar]) / closes[bar] if closes[bar] > 0 else 0
        for p in detected:
            name = p["name"]
            direction = p["direction"]
            entry = stats_by_name.get(name)
            if entry is None:
                # First sighting: seed stats with this pattern's metadata.
                entry = {
                    "occurrences": 0,
                    "correct": 0,
                    "returns": [],
                    "direction": direction,
                    "reliability": p["reliability"],
                }
                stats_by_name[name] = entry
            entry["occurrences"] += 1
            entry["returns"].append(fwd)
            # A "hit" = pattern direction agrees with the forward return sign.
            bullish_hit = direction == "bullish" and fwd > 0
            bearish_hit = direction == "bearish" and fwd < 0
            if bullish_hit or bearish_hit:
                entry["correct"] += 1

    # Compile results, skipping patterns seen fewer than 3 times.
    accuracy_report = []
    for name, entry in stats_by_name.items():
        occurrences = entry["occurrences"]
        if occurrences < 3:
            continue
        win_rate = entry["correct"] / occurrences
        accuracy_report.append({
            "pattern": name,
            "direction": entry["direction"],
            "occurrences": occurrences,
            "win_rate": round(win_rate, 4),
            "avg_return_pct": round(float(np.mean(entry["returns"])) * 100, 4),
            "theoretical_reliability": entry["reliability"],
            "actual_vs_theoretical": round(win_rate - entry["reliability"], 4),
        })
    accuracy_report.sort(key=lambda row: row["win_rate"], reverse=True)
    return {
        "ticker": ticker,
        "period": period,
        "horizon_days": horizon,
        "total_bars_analyzed": total_bars - 30 - horizon,
        "patterns_found": len(accuracy_report),
        "accuracy_report": accuracy_report,
    }
def clear_cache(ticker: Optional[str] = None) -> int:
    """Evict cached predictors and return how many entries were removed.

    With a falsy ticker the entire cache is emptied; otherwise only keys
    containing the ticker substring are evicted.
    """
    if not ticker:
        removed = len(_predictor_cache)
        _predictor_cache.clear()
        return removed
    matching = [key for key in _predictor_cache if ticker in key]
    for key in matching:
        _predictor_cache.pop(key)
    return len(matching)