"""Multi-Factor Scoring Engine v1.0 — Institutional-Grade Asset Scoring Fuses 7 independent signals into a unified conviction score. Based on: Gu et al. (2020) + Zuckerman (2021) Multi-Factor Equity Models """ import numpy as np import pandas as pd from typing import Dict, Optional # Factor weights — calibrated for institutional portfolio allocation # Weights sum to 1.0. Override for different strategies (value, momentum, etc.) DEFAULT_WEIGHTS = { 'trend': 0.20, # Price momentum, SMA alignment 'momentum': 0.15, # RSI, MACD, stochastics 'volatility': 0.15, # Vol regime, vol-of-vol, realized vs implied 'fundamentals': 0.15, # Valuation, growth, quality metrics 'news': 0.15, # Sentiment, event risk, news flow 'options': 0.10, # Put/call ratio, gamma, IV skew 'macro': 0.10, # Dollar, rates, VIX, sector beta } # Category thresholds CONVICTION_BANDS = { 'avoid': (0, 35), # Short / zero position 'neutral': (35, 55), # Watchlist / benchmark weight 'small': (55, 70), # 1/3 of target size 'moderate': (70, 85), # 2/3 of target size 'aggressive': (85, 100), # Full target size } class MultiFactorEngine: """Unified scoring from 7 orthogonal factors into a single conviction score.""" def __init__(self, weights: Optional[Dict[str, float]] = None): self.weights = weights or dict(DEFAULT_WEIGHTS) assert abs(sum(self.weights.values()) - 1.0) < 1e-6, "Weights must sum to 1.0" # ── Factor 1: Trend ─────────────────────────────────────── @staticmethod def trend_factor(price: float, sma20: float, sma50: float, sma200: float = None, adx: float = None, high_52w: float = None, low_52w: float = None) -> float: """Score 0-100 based on price position vs moving averages. Bullish when price > SMA20 > SMA50 with ADX > 25 confirming. """ score = 50.0 if price > sma20: score += 15 if price > sma50: score += 15 if sma20 > sma50: score += 10 if sma200 and price > sma200: score += 10 if high_52w and low_52w and high_52w > low_52w: score += 10 * ((price - low_52w) / (high_52w - low_52w + 1e-10)) if adx and adx > 25: score += 10 elif adx and adx > 40: score += 15 return max(0, min(100, score)) # ── Factor 2: Momentum ──────────────────────────────────── @staticmethod def momentum_factor(rsi: float, macd_hist: float, obv_slope: float = None, rsi_divergence: float = None) -> float: """Score 0-100. Momentum peaking at RSI 50-70 range. Bearish divergence (price up, RSI down) reduces score. """ score = 50.0 if rsi < 30: score += 15 # Oversold bounce elif 30 <= rsi <= 45: score += 5 elif 45 < rsi <= 65: score += 10 # Healthy momentum elif 65 < rsi <= 75: score += 3 # Still strong but caution elif rsi > 75: score -= 15 # Overbought if macd_hist > 0: score += 10 if macd_hist > 0.5: score += 10 if macd_hist < 0: score -= 10 if obv_slope and obv_slope > 0: score += 10 if obv_slope and obv_slope < 0: score -= 10 if rsi_divergence and rsi_divergence < 0: score -= 15 # Bearish div if rsi_divergence and rsi_divergence > 0: score += 10 # Bullish div return max(0, min(100, score)) # ── Factor 3: Volatility ───────────────────────────────── @staticmethod def volatility_factor(hv: float, iv: float = None, vol_regime: str = 'normal', skew: float = None, vix: float = None) -> float: """Score 0-100. Low vol regime = bullish. High vol with declining trend = bullish. IV < HV (cheap options) = bullish. Rising vol = bearish. """ score = 50.0 if hv < 0.15: score += 15 elif hv < 0.25: score += 5 elif hv > 0.40: score -= 15 elif hv > 0.35: score -= 5 if iv and hv and iv < hv: score += 10 # Cheap implied vol if iv and hv and iv > hv * 1.5: score -= 15 # Expensive options, fear premium if vol_regime == 'low': score += 15 if vol_regime == 'declining': score += 10 if vol_regime == 'spiking': score -= 20 if vol_regime == 'high': score -= 10 if skew and skew < 0: score += 5 # Put skew mild if skew and skew < -0.5: score -= 10 # Extreme fear if vix and vix < 15: score += 10 if vix and vix > 30: score -= 15 return max(0, min(100, score)) # ── Factor 4: Fundamentals ──────────────────────────────── @staticmethod def fundamentals_factor(pe: float = None, peg: float = None, ps: float = None, pb: float = None, roe: float = None, debt_equity: float = None, fcf_yield: float = None, growth_5y: float = None, sector_pe: float = None) -> float: """Score 0-100 based on valuation + quality. Lower PEG, higher ROE = better. FCF yield > 5% is strong signal. Relative to sector PE. """ score = 50.0 # Valuation if pe: if pe < 12: score += 15 elif pe < 18: score += 10 elif pe < 25: score += 3 elif pe > 35: score -= 10 elif pe > 50: score -= 15 if sector_pe and pe < sector_pe * 0.8: score += 10 if peg: if peg < 0.8: score += 15 elif peg < 1.0: score += 10 elif peg < 1.5: score += 3 elif peg > 2.0: score -= 15 if ps: if ps < 1: score += 10 elif ps > 10: score -= 10 if pb: if pb < 1.5: score += 10 elif pb > 5: score -= 10 # Quality if roe and roe > 0.15: score += 10 if roe and roe > 0.25: score += 5 if roe and roe < 0.05: score -= 10 if debt_equity: if debt_equity < 0.5: score += 10 elif debt_equity > 2.0: score -= 10 if fcf_yield: if fcf_yield > 0.06: score += 15 elif fcf_yield > 0.03: score += 10 elif fcf_yield < 0: score -= 15 if growth_5y: if growth_5y > 0.20: score += 10 elif growth_5y > 0.15: score += 5 elif growth_5y < 0: score -= 15 return max(0, min(100, score)) # ── Factor 5: News ──────────────────────────────────────── @staticmethod def news_factor(sentiment_score: float, news_volume: float = None, event_risk: str = 'none', event_date: str = None) -> float: """Score 0-100. Sentiment 0.0-1.0 (bearish-bullish). Event risk overrides. sentiment_score: 0.0=very bearish, 0.5=neutral, 1.0=very bullish news_volume: articles per day, higher = more signal confidence event_risk: 'none', 'earnings_today', 'earnings_week', 'fed_today', 'macro_today', 'lawsuit', 'merger', 'dividend' """ score = 50.0 # Base sentiment signal if sentiment_score < 0.2: score -= 25 elif sentiment_score < 0.35: score -= 15 elif sentiment_score < 0.45: score -= 5 elif 0.45 <= sentiment_score <= 0.55: score += 0 elif sentiment_score > 0.7: score += 25 elif sentiment_score > 0.55: score += 15 elif sentiment_score > 0.5: score += 5 # Volume confidence boost if news_volume: if news_volume > 50: score += 5 if news_volume > 200: score += 5 # High coverage = signal confidence # Event risk override event_override = { 'none': 0, 'earnings_week': -5, # Reduce size before earnings 'earnings_today': -20, # Major uncertainty 'fed_today': -15, 'macro_today': -10, 'lawsuit': -25, 'merger': 10, # Merger arb or acquisition premium 'dividend': 3, 'split': 5, 'buyback': 8, } if event_risk in event_override: score += event_override[event_risk] return max(0, min(100, score)) # ── Factor 6: Options Flow ─────────────────────────────── @staticmethod def options_factor(pcr: float = None, unusual_volume: float = None, gamma_exposure: float = None, iv_skew: float = None, open_interest_change: float = None, max_pain: float = None, current_price: float = None) -> float: """Score 0-100 based on options market microstructure signals. pcr: put/call ratio < 0.5 bullish, > 1.2 bearish unusual_volume: ratio vs 20d avg > 2 = significant gamma_exposure: positive gamma = sticky price, negative = magnetic pin iv_skew: steep put skew = fear, flat = complacent open_interest_change: rising OI with rising price = conviction max_pain: price tends toward max pain on expiry """ score = 50.0 if pcr: if pcr < 0.5: score += 20 # Extreme call buying elif pcr < 0.7: score += 10 elif pcr > 1.2: score -= 20 # Extreme put buying elif pcr > 1.0: score -= 10 elif pcr > 0.85: score -= 3 # Mild put bias if unusual_volume: if unusual_volume > 5: score += 15 # Massive flow elif unusual_volume > 2: score += 8 # Notable flow elif unusual_volume < 0.5: score -= 5 # Dead options if gamma_exposure: if gamma_exposure > 0: score += 5 # Gamma long = volatility support elif gamma_exposure < -5: score -= 15 # Gamma short = volatility risk if iv_skew: if iv_skew < -0.3: score -= 10 # Fear if iv_skew < -0.5: score -= 15 # Extreme fear if iv_skew > 0.1: score += 5 # Call skew (rare, bullish) if open_interest_change and open_interest_change > 0.3: score += 10 # Fresh conviction building if max_pain and current_price: if current_price < max_pain * 0.97: score += 5 # Room to max pain (up) if current_price > max_pain * 1.03: score -= 5 # Above max pain (mean revert) return max(0, min(100, score)) # ── Factor 7: Macro ─────────────────────────────────────── @staticmethod def macro_factor(vix: float = None, dxy_change: float = None, yield_10y: float = None, yield_2y: float = None, sector_beta: float = None, cpi_surprise: float = None, fed_meeting_days: int = None) -> float: """Score 0-100. Macro tailwinds or headwinds for risk assets. vix: volatility index dxy_change: 20d % change in dollar index yield_10y / yield_2y: treasury yields sector_beta: stock's beta to rates (tech > 1, utilities < 0.5) cpi_surprise: actual - expected, > 0 = hawkish fed_meeting_days: days until next FOMC """ score = 50.0 # VIX regime if vix: if vix < 15: score += 15 elif vix < 20: score += 5 elif vix > 30: score -= 20 elif vix > 25: score -= 10 elif vix > 20: score -= 5 # Dollar strength — bad for exporters, good for domestic if dxy_change: if dxy_change > 0.05: score -= 10 # Strong dollar if dxy_change < -0.03: score += 5 # Weak dollar # Yield curve if yield_10y and yield_2y: spread = yield_10y - yield_2y if spread < -0.5: score -= 15 # Deep inversion = recession elif spread < 0: score -= 10 # Inverted elif spread > 1.0: score += 10 # Steep = healthy # Rising rates hurt rate-sensitive stocks if yield_10y and sector_beta: if yield_10y > 0.05 and sector_beta > 1.0: score -= 15 # High rates + high beta = double hurt elif yield_10y < 0.03 and sector_beta > 1.0: score += 10 # Low rates + high beta = double boost # CPI surprise if cpi_surprise: if cpi_surprise > 0.5: score -= 15 # Hot inflation = hawkish Fed if cpi_surprise < -0.3: score += 10 # Cold inflation = dovish # Fed meeting proximity if fed_meeting_days is not None: if fed_meeting_days <= 3: score -= 10 # Imminent uncertainty if fed_meeting_days == 0: score -= 20 # Meeting day return max(0, min(100, score)) # ── Master Scoring ──────────────────────────────────────── def score(self, factors: Dict[str, float], verbose: bool = False) -> Dict: """Compute unified conviction score from individual factor scores. Args: factors: dict with keys matching self.weights e.g. {'trend': 75, 'momentum': 45, 'volatility': 60, 'fundamentals': 80, 'news': 65, 'options': 70, 'macro': 55} verbose: return full breakdown Returns: Dict with score, band, and optionally per-factor contributions """ total = 0.0 contributions = {} for key, weight in self.weights.items(): val = factors.get(key, 50.0) # Default neutral if missing contrib = val * weight contributions[key] = { 'score': val, 'weight': weight, 'contribution': contrib, } total += contrib total = max(0, min(100, total)) # Determine band band = 'neutral' for name, (lo, hi) in CONVICTION_BANDS.items(): if lo <= total < hi: band = name break if total >= 85: band = 'aggressive' # Position sizing based on band sizing = { 'avoid': 0.00, 'neutral': 0.00, 'small': 0.33, 'moderate': 0.67, 'aggressive': 1.00, } result = { 'conviction_score': round(total, 2), 'band': band, 'target_exposure': sizing.get(band, 0), 'direction': 'long' if total > 55 else 'short' if total < 35 else 'neutral', } if verbose: result['contributions'] = contributions result['weights'] = dict(self.weights) return result def score_from_dataframe(self, df: pd.DataFrame, fundamentals: Optional[Dict] = None, news: Optional[Dict] = None, options: Optional[Dict] = None, macro: Optional[Dict] = None, verbose: bool = False) -> Dict: """Compute multi-factor score from price DataFrame + optional overlays.""" l = df.iloc[-1] p = df.iloc[-2] if len(df) > 1 else l # Trend trend = self.trend_factor( price=l['Close'], sma20=l.get('SMA20', l['Close']), sma50=l.get('SMA50', l['Close']), high_52w=df['High'].max(), low_52w=df['Low'].min() ) # Momentum macd_hist = l.get('MACD', 0) - l.get('MACDS', 0) momentum = self.momentum_factor(rsi=l.get('RSI', 50), macd_hist=macd_hist) # Volatility hv = df['Ret'].dropna().std() * np.sqrt(252) volatility = self.volatility_factor(hv=hv) # Fundamentals fund = 50.0 if fundamentals: fund = self.fundamentals_factor(**fundamentals) # News news_score = 50.0 if news: news_score = self.news_factor(**news) # Options opt = 50.0 if options: opt = self.options_factor(**options) # Macro macro_score = 50.0 if macro: macro_score = self.macro_factor(**macro) factors = { 'trend': trend, 'momentum': momentum, 'volatility': volatility, 'fundamentals': fund, 'news': news_score, 'options': opt, 'macro': macro_score, } return self.score(factors, verbose=verbose) if __name__ == '__main__': # Example: Microsoft Corporation with user's actual metrics engine = MultiFactorEngine() factors = { 'trend': 85.0, # Price > SMA20 > SMA50 'momentum': 45.0, # RSI 48.9, MACD bearish crossover 'volatility': 30.0, # High vol (29.4%), negative Sharpe 'fundamentals': 80.0, # MSFT: great fundamentals 'news': 50.0, # Neutral sentiment 'options': 55.0, # Mild activity 'macro': 40.0, # Rising yields, neutral VIX } result = engine.score(factors, verbose=True) print(f"Conviction Score: {result['conviction_score']}/100") print(f"Band: {result['band'].upper()}") print(f"Direction: {result['direction'].upper()}") print(f"Target Exposure: {result['target_exposure']*100:.0f}%") print("\nPer-factor breakdown:") for k, v in result['contributions'].items(): print(f" {k:12s}: {v['score']:5.1f} × {v['weight']:.2f} = {v['contribution']:.1f}")