"""Multi-Factor Scoring Engine v1.0 — Institutional-Grade Asset Scoring
Fuses 7 independent signals into a unified conviction score.
Based on: Gu et al. (2020) + Zuckerman (2021) Multi-Factor Equity Models
"""
import math
from typing import Dict, Optional

import numpy as np
import pandas as pd
# Default weight of each factor in the blended conviction score.
# MultiFactorEngine.__init__ requires the weights it uses to sum to 1.0.
DEFAULT_WEIGHTS = {
    'trend': 0.20,
    'momentum': 0.15,
    'volatility': 0.15,
    'fundamentals': 0.15,
    'news': 0.15,
    'options': 0.10,
    'macro': 0.10,
}
# Conviction band name -> (lower, upper) bounds on the 0-100 conviction score.
# MultiFactorEngine.score treats each band as half-open, lower <= score < upper,
# and assigns a score of exactly 100 to 'aggressive' separately.
CONVICTION_BANDS = {
    'avoid': (0, 35),
    'neutral': (35, 55),
    'small': (55, 70),
    'moderate': (70, 85),
    'aggressive': (85, 100),
}
class MultiFactorEngine:
    """Weighted blend of seven 0-100 factor scores into one conviction score.

    Every ``*_factor`` static method starts at a neutral 50, applies fixed
    point adjustments for the inputs it receives, and clamps the result to
    0-100. ``score`` multiplies each factor score by its weight, sums the
    products, and maps the total to a conviction band, a target exposure and
    a direction.
    """

    def __init__(self, weights: Optional[Dict[str, float]] = None):
        """Store the factor weights used by ``score``.

        Args:
            weights: Mapping of factor name to weight. ``None`` or an empty
                mapping selects a copy of ``DEFAULT_WEIGHTS``.

        Raises:
            AssertionError: If the weights do not sum to 1.0 within 1e-6.
        """
        self.weights = weights or dict(DEFAULT_WEIGHTS)
        # NOTE(review): ``assert`` is skipped under ``python -O``. It is kept
        # so the exception type seen by existing callers does not change.
        assert abs(sum(self.weights.values()) - 1.0) < 1e-6, "Weights must sum to 1.0"

    @staticmethod
    def trend_factor(price: float, sma20: float, sma50: float,
                     sma200: Optional[float] = None,
                     adx: Optional[float] = None,
                     high_52w: Optional[float] = None,
                     low_52w: Optional[float] = None) -> float:
        """Score price position against moving averages, range and ADX.

        Adjustments to the neutral 50:
            * +15 if price > SMA20, +15 if price > SMA50, +10 if SMA20 > SMA50.
            * +10 if ``sma200`` is given and price > SMA200.
            * ``10 * (price - low_52w) / (high_52w - low_52w)`` when both
              bounds are given and high > low.
            * +15 if ADX > 40, otherwise +10 if ADX > 25.

        Optional inputs that are ``None`` or 0 are skipped.

        Returns:
            The score clamped to 0-100.
        """
        score = 50.0
        if price > sma20:
            score += 15
        if price > sma50:
            score += 15
        if sma20 > sma50:
            score += 10
        if sma200 and price > sma200:
            score += 10
        if high_52w and low_52w and high_52w > low_52w:
            position = (price - low_52w) / (high_52w - low_52w + 1e-10)
            # A NaN/inf price would make the score non-finite, and the final
            # max/min clamp would then report 100. Skip the term instead.
            if math.isfinite(position):
                score += 10 * position
        # Test the stronger tier first; checking > 25 first made the > 40
        # branch unreachable.
        if adx and adx > 40:
            score += 15
        elif adx and adx > 25:
            score += 10
        return max(0, min(100, score))

    @staticmethod
    def momentum_factor(rsi: float, macd_hist: float,
                        obv_slope: Optional[float] = None,
                        rsi_divergence: Optional[float] = None) -> float:
        """Score momentum from RSI, MACD histogram, OBV slope and divergence.

        Adjustments to the neutral 50:
            * RSI: < 30 +15, 30-45 +5, 45-65 +10, 65-75 +3, > 75 -15.
            * MACD histogram: > 0 +10, > 0.5 a further +10, < 0 -10.
            * OBV slope: > 0 +10, < 0 -10.
            * RSI divergence: < 0 -15, > 0 +10.

        ``obv_slope`` and ``rsi_divergence`` are skipped when ``None`` or 0.

        Returns:
            The score clamped to 0-100.
        """
        score = 50.0
        if rsi < 30:
            score += 15
        elif 30 <= rsi <= 45:
            score += 5
        elif 45 < rsi <= 65:
            score += 10
        elif 65 < rsi <= 75:
            score += 3
        elif rsi > 75:
            score -= 15

        if macd_hist > 0:
            score += 10
        if macd_hist > 0.5:
            score += 10
        if macd_hist < 0:
            score -= 10

        if obv_slope and obv_slope > 0:
            score += 10
        if obv_slope and obv_slope < 0:
            score -= 10

        if rsi_divergence and rsi_divergence < 0:
            score -= 15
        if rsi_divergence and rsi_divergence > 0:
            score += 10

        return max(0, min(100, score))

    @staticmethod
    def volatility_factor(hv: float, iv: Optional[float] = None,
                          vol_regime: str = 'normal',
                          skew: Optional[float] = None,
                          vix: Optional[float] = None) -> float:
        """Score the volatility backdrop; calmer conditions score higher.

        Adjustments to the neutral 50:
            * ``hv``: < 0.15 +15, < 0.25 +5, > 0.40 -15, > 0.35 -5.
            * ``iv`` vs ``hv``: iv < hv +10; iv > 1.5 * hv -15.
            * ``vol_regime``: 'low' +15, 'declining' +10, 'spiking' -20,
              'high' -10; any other value has no effect.
            * ``skew``: < 0 +5 and, cumulatively, < -0.5 -10 (net -5).
            * ``vix``: < 15 +10, > 30 -15.

        ``score_from_dataframe`` passes ``hv`` as the standard deviation of
        returns times sqrt(252). Optional inputs that are ``None`` or 0 are
        skipped.

        Returns:
            The score clamped to 0-100.
        """
        score = 50.0
        if hv < 0.15:
            score += 15
        elif hv < 0.25:
            score += 5
        elif hv > 0.40:
            score -= 15
        elif hv > 0.35:
            score -= 5

        if iv and hv and iv < hv:
            score += 10
        if iv and hv and iv > hv * 1.5:
            score -= 15

        if vol_regime == 'low':
            score += 15
        if vol_regime == 'declining':
            score += 10
        if vol_regime == 'spiking':
            score -= 20
        if vol_regime == 'high':
            score -= 10

        if skew and skew < 0:
            score += 5
        if skew and skew < -0.5:
            score -= 10

        if vix and vix < 15:
            score += 10
        if vix and vix > 30:
            score -= 15

        return max(0, min(100, score))

    @staticmethod
    def fundamentals_factor(pe: Optional[float] = None,
                            peg: Optional[float] = None,
                            ps: Optional[float] = None,
                            pb: Optional[float] = None,
                            roe: Optional[float] = None,
                            debt_equity: Optional[float] = None,
                            fcf_yield: Optional[float] = None,
                            growth_5y: Optional[float] = None,
                            sector_pe: Optional[float] = None) -> float:
        """Score valuation and quality metrics.

        Adjustments to the neutral 50 (each metric skipped when ``None``/0):
            * ``pe``: < 12 +15, < 18 +10, < 25 +3, > 50 -15, > 35 -10;
              a further +10 if pe < 0.8 * ``sector_pe``.
            * ``peg``: < 0.8 +15, < 1.0 +10, < 1.5 +3, > 2.0 -15.
            * ``ps``: < 1 +10, > 10 -10.
            * ``pb``: < 1.5 +10, > 5 -10.
            * ``roe``: > 0.15 +10, > 0.25 a further +5, < 0.05 -10.
            * ``debt_equity``: < 0.5 +10, > 2.0 -10.
            * ``fcf_yield``: > 0.06 +15, > 0.03 +10, < 0 -15.
            * ``growth_5y``: > 0.20 +10, > 0.15 +5, < 0 -15.

        Returns:
            The score clamped to 0-100.
        """
        score = 50.0

        if pe:
            # NOTE(review): a negative P/E satisfies ``pe < 12`` and earns the
            # top valuation bonus; confirm whether that is intended.
            if pe < 12:
                score += 15
            elif pe < 18:
                score += 10
            elif pe < 25:
                score += 3
            # Test the more extreme tier first; checking > 35 first made the
            # > 50 branch unreachable.
            elif pe > 50:
                score -= 15
            elif pe > 35:
                score -= 10
            if sector_pe and pe < sector_pe * 0.8:
                score += 10

        if peg:
            if peg < 0.8:
                score += 15
            elif peg < 1.0:
                score += 10
            elif peg < 1.5:
                score += 3
            elif peg > 2.0:
                score -= 15

        if ps:
            if ps < 1:
                score += 10
            elif ps > 10:
                score -= 10

        if pb:
            if pb < 1.5:
                score += 10
            elif pb > 5:
                score -= 10

        if roe and roe > 0.15:
            score += 10
        if roe and roe > 0.25:
            score += 5
        if roe and roe < 0.05:
            score -= 10

        if debt_equity:
            if debt_equity < 0.5:
                score += 10
            elif debt_equity > 2.0:
                score -= 10

        if fcf_yield:
            if fcf_yield > 0.06:
                score += 15
            elif fcf_yield > 0.03:
                score += 10
            elif fcf_yield < 0:
                score -= 15

        if growth_5y:
            if growth_5y > 0.20:
                score += 10
            elif growth_5y > 0.15:
                score += 5
            elif growth_5y < 0:
                score -= 15

        return max(0, min(100, score))

    @staticmethod
    def news_factor(sentiment_score: float, news_volume: Optional[float] = None,
                    event_risk: str = 'none',
                    event_date: Optional[str] = None) -> float:
        """Score news sentiment, news volume and scheduled-event risk.

        Args:
            sentiment_score: 0.0 = very bearish, 0.5 = neutral, 1.0 = very
                bullish. Adjustment: < 0.2 -25, < 0.35 -15, < 0.45 -5,
                0.45-0.55 none, > 0.55 +15, > 0.7 +25.
            news_volume: Articles per day; > 50 adds +5 and > 200 a further
                +5. Skipped when ``None`` or 0.
            event_risk: One of 'none' (0), 'earnings_week' (-5),
                'earnings_today' (-20), 'fed_today' (-15), 'macro_today'
                (-10), 'lawsuit' (-25), 'merger' (+10), 'dividend' (+3),
                'split' (+5), 'buyback' (+8). Unknown values have no effect.
            event_date: Accepted for interface compatibility; not used.

        Returns:
            The score clamped to 0-100.
        """
        score = 50.0

        # 0.45-0.55 is the neutral zone and falls through unchanged. The
        # original chain also had a ``> 0.5`` branch that could never fire
        # because the neutral and ``> 0.55`` tests already cover that range.
        if sentiment_score < 0.2:
            score -= 25
        elif sentiment_score < 0.35:
            score -= 15
        elif sentiment_score < 0.45:
            score -= 5
        elif sentiment_score > 0.7:
            score += 25
        elif sentiment_score > 0.55:
            score += 15

        if news_volume:
            if news_volume > 50:
                score += 5
            if news_volume > 200:
                score += 5

        event_override = {
            'none': 0,
            'earnings_week': -5,
            'earnings_today': -20,
            'fed_today': -15,
            'macro_today': -10,
            'lawsuit': -25,
            'merger': 10,
            'dividend': 3,
            'split': 5,
            'buyback': 8,
        }
        if event_risk in event_override:
            score += event_override[event_risk]

        return max(0, min(100, score))

    @staticmethod
    def options_factor(pcr: Optional[float] = None,
                       unusual_volume: Optional[float] = None,
                       gamma_exposure: Optional[float] = None,
                       iv_skew: Optional[float] = None,
                       open_interest_change: Optional[float] = None,
                       max_pain: Optional[float] = None,
                       current_price: Optional[float] = None) -> float:
        """Score options-market signals.

        Adjustments to the neutral 50 (each input skipped when ``None``/0):
            * ``pcr`` (put/call ratio): < 0.5 +20, < 0.7 +10, > 1.2 -20,
              > 1.0 -10, > 0.85 -3.
            * ``unusual_volume`` (ratio vs 20d average): > 5 +15, > 2 +8,
              < 0.5 -5.
            * ``gamma_exposure``: > 0 +5, < -5 -15.
            * ``iv_skew``: < -0.3 -10 and, cumulatively, < -0.5 -15;
              > 0.1 +5.
            * ``open_interest_change``: > 0.3 +10.
            * ``max_pain`` with ``current_price``: price more than 3% below
              max pain +5, more than 3% above -5.

        Returns:
            The score clamped to 0-100.
        """
        score = 50.0
        if pcr:
            if pcr < 0.5:
                score += 20
            elif pcr < 0.7:
                score += 10
            elif pcr > 1.2:
                score -= 20
            elif pcr > 1.0:
                score -= 10
            elif pcr > 0.85:
                score -= 3

        if unusual_volume:
            if unusual_volume > 5:
                score += 15
            elif unusual_volume > 2:
                score += 8
            elif unusual_volume < 0.5:
                score -= 5

        if gamma_exposure:
            if gamma_exposure > 0:
                score += 5
            elif gamma_exposure < -5:
                score -= 15

        if iv_skew:
            if iv_skew < -0.3:
                score -= 10
            if iv_skew < -0.5:
                score -= 15
            if iv_skew > 0.1:
                score += 5

        if open_interest_change and open_interest_change > 0.3:
            score += 10

        if max_pain and current_price:
            if current_price < max_pain * 0.97:
                score += 5
            if current_price > max_pain * 1.03:
                score -= 5

        return max(0, min(100, score))

    @staticmethod
    def macro_factor(vix: Optional[float] = None,
                     dxy_change: Optional[float] = None,
                     yield_10y: Optional[float] = None,
                     yield_2y: Optional[float] = None,
                     sector_beta: Optional[float] = None,
                     cpi_surprise: Optional[float] = None,
                     fed_meeting_days: Optional[int] = None) -> float:
        """Score macro tailwinds or headwinds.

        Adjustments to the neutral 50 (inputs skipped when ``None``/0 unless
        noted):
            * ``vix``: < 15 +15, < 20 +5, > 30 -20, > 25 -10, > 20 -5.
            * ``dxy_change`` (20d change in dollar index): > 0.05 -10,
              < -0.03 +5.
            * ``yield_10y - yield_2y``: < -0.5 -15, < 0 -10, > 1.0 +10.
            * ``yield_10y`` with ``sector_beta`` > 1.0: yield > 0.05 -15,
              yield < 0.03 +10.
            * ``cpi_surprise`` (actual - expected): > 0.5 -15, < -0.3 +10.
            * ``fed_meeting_days`` (days until next FOMC; 0 is honoured):
              <= 3 -10 and, cumulatively, == 0 -20.

        Returns:
            The score clamped to 0-100.
        """
        score = 50.0

        if vix:
            if vix < 15:
                score += 15
            elif vix < 20:
                score += 5
            elif vix > 30:
                score -= 20
            elif vix > 25:
                score -= 10
            elif vix > 20:
                score -= 5

        if dxy_change:
            if dxy_change > 0.05:
                score -= 10
            if dxy_change < -0.03:
                score += 5

        # NOTE(review): the spread thresholds (-0.5, 1.0) look like
        # percentage points while the level thresholds below (0.05, 0.03)
        # look like decimals — confirm the unit callers pass for yields.
        if yield_10y and yield_2y:
            spread = yield_10y - yield_2y
            if spread < -0.5:
                score -= 15
            elif spread < 0:
                score -= 10
            elif spread > 1.0:
                score += 10

        if yield_10y and sector_beta:
            if yield_10y > 0.05 and sector_beta > 1.0:
                score -= 15
            elif yield_10y < 0.03 and sector_beta > 1.0:
                score += 10

        if cpi_surprise:
            if cpi_surprise > 0.5:
                score -= 15
            if cpi_surprise < -0.3:
                score += 10

        if fed_meeting_days is not None:
            if fed_meeting_days <= 3:
                score -= 10
            if fed_meeting_days == 0:
                score -= 20

        return max(0, min(100, score))

    def score(self, factors: Dict[str, float], verbose: bool = False) -> Dict:
        """Compute the unified conviction score from individual factor scores.

        Args:
            factors: Factor name -> 0-100 score, keyed like ``self.weights``,
                e.g. ``{'trend': 75, 'momentum': 45, 'volatility': 60,
                'fundamentals': 80, 'news': 65, 'options': 70, 'macro': 55}``.
                A factor that is missing, ``None`` or non-finite counts as a
                neutral 50. Keys not present in ``self.weights`` are ignored.
            verbose: When true, include the per-factor breakdown and a copy
                of the weights in the result.

        Returns:
            Dict with ``conviction_score`` (rounded to 2 decimals), ``band``,
            ``target_exposure`` and ``direction``; plus ``contributions`` and
            ``weights`` when ``verbose`` is true.
        """
        total = 0.0
        contributions = {}
        for key, weight in self.weights.items():
            val = factors.get(key, 50.0)
            # A NaN would poison the sum, and the max/min clamp below would
            # then report 100 (full exposure). Treat unusable values like a
            # missing factor.
            if val is None or not math.isfinite(val):
                val = 50.0
            contrib = val * weight
            contributions[key] = {
                'score': val,
                'weight': weight,
                'contribution': contrib,
            }
            total += contrib

        total = max(0, min(100, total))

        band = 'neutral'
        for name, (lo, hi) in CONVICTION_BANDS.items():
            if lo <= total < hi:
                band = name
                break
        # Band upper bounds are exclusive, so a total of exactly 100 is not
        # matched by the loop above.
        if total >= 85:
            band = 'aggressive'

        sizing = {
            'avoid': 0.00,
            'neutral': 0.00,
            'small': 0.33,
            'moderate': 0.67,
            'aggressive': 1.00,
        }

        # ``>= 55`` keeps direction in step with the 'small' band, which
        # starts at 55 inclusive and carries a non-zero exposure.
        if total >= 55:
            direction = 'long'
        elif total < 35:
            direction = 'short'
        else:
            direction = 'neutral'

        result = {
            'conviction_score': round(total, 2),
            'band': band,
            'target_exposure': sizing.get(band, 0),
            'direction': direction,
        }
        if verbose:
            result['contributions'] = contributions
            result['weights'] = dict(self.weights)
        return result

    def score_from_dataframe(self, df: pd.DataFrame,
                             fundamentals: Optional[Dict] = None,
                             news: Optional[Dict] = None,
                             options: Optional[Dict] = None,
                             macro: Optional[Dict] = None,
                             verbose: bool = False) -> Dict:
        """Compute the multi-factor score from a price frame plus overlays.

        Trend, momentum and volatility come from ``df``; the other four
        factors come from the optional overlay dicts and default to a neutral
        50 when an overlay is ``None`` or empty.

        Args:
            df: Price data. Requires ``Close``, ``High`` and ``Low`` columns.
                The last row may also supply ``SMA20``, ``SMA50``, ``MACD``,
                ``MACDS`` and ``RSI``. Returns are read from ``Ret`` when
                present, otherwise derived from ``Close``.
            fundamentals: Keyword arguments for ``fundamentals_factor``.
            news: Keyword arguments for ``news_factor``.
            options: Keyword arguments for ``options_factor``.
            macro: Keyword arguments for ``macro_factor``.
            verbose: Passed through to ``score``.

        Returns:
            The dict produced by ``score``.
        """
        last = df.iloc[-1]
        close = last['Close']

        # High/low are taken over every row of ``df``.
        # NOTE(review): presumably callers pass about one year of data, given
        # the ``high_52w``/``low_52w`` parameter names — confirm.
        trend = self.trend_factor(
            price=close,
            sma20=last.get('SMA20', close),
            sma50=last.get('SMA50', close),
            high_52w=df['High'].max(),
            low_52w=df['Low'].min(),
        )

        macd_hist = last.get('MACD', 0) - last.get('MACDS', 0)
        momentum = self.momentum_factor(rsi=last.get('RSI', 50), macd_hist=macd_hist)

        # Fall back to simple returns so a frame without a precomputed 'Ret'
        # column no longer raises KeyError.
        returns = df['Ret'] if 'Ret' in df.columns else df['Close'].pct_change()
        # NOTE(review): sqrt(252) annualisation assumes daily bars — confirm.
        hv = returns.dropna().std() * np.sqrt(252)
        volatility = self.volatility_factor(hv=hv)

        fund = self.fundamentals_factor(**fundamentals) if fundamentals else 50.0
        news_score = self.news_factor(**news) if news else 50.0
        opt = self.options_factor(**options) if options else 50.0
        macro_score = self.macro_factor(**macro) if macro else 50.0

        factors = {
            'trend': trend,
            'momentum': momentum,
            'volatility': volatility,
            'fundamentals': fund,
            'news': news_score,
            'options': opt,
            'macro': macro_score,
        }

        return self.score(factors, verbose=verbose)
if __name__ == '__main__':
    # Demo: score one hand-written set of factor values with the default
    # weights and print the result with its per-factor breakdown.
    engine = MultiFactorEngine()

    factors = {
        'trend': 85.0,
        'momentum': 45.0,
        'volatility': 30.0,
        'fundamentals': 80.0,
        'news': 50.0,
        'options': 55.0,
        'macro': 40.0,
    }

    result = engine.score(factors, verbose=True)
    print(f"Conviction Score: {result['conviction_score']}/100")
    print(f"Band: {result['band'].upper()}")
    print(f"Direction: {result['direction'].upper()}")
    print(f"Target Exposure: {result['target_exposure']*100:.0f}%")
    print("\nPer-factor breakdown:")
    for k, v in result['contributions'].items():
        # The multiplication sign was mis-decoded as a Greek capital gamma.
        print(f" {k:12s}: {v['score']:5.1f} × {v['weight']:.2f} = {v['contribution']:.1f}")