alphaforge-quant-system / multi_factor_engine.py
Premchan369's picture
Add multi-factor scoring engine with 7 weighted factors
f282470 verified
"""Multi-Factor Scoring Engine v1.0 β€” Institutional-Grade Asset Scoring
Fuses 7 independent signals into a unified conviction score.
Based on: Gu et al. (2020) + Zuckerman (2021) Multi-Factor Equity Models
"""
import numpy as np
import pandas as pd
from typing import Dict, Optional
# Factor weights β€” calibrated for institutional portfolio allocation
# Weights sum to 1.0. Override for different strategies (value, momentum, etc.)
DEFAULT_WEIGHTS = {
'trend': 0.20, # Price momentum, SMA alignment
'momentum': 0.15, # RSI, MACD, stochastics
'volatility': 0.15, # Vol regime, vol-of-vol, realized vs implied
'fundamentals': 0.15, # Valuation, growth, quality metrics
'news': 0.15, # Sentiment, event risk, news flow
'options': 0.10, # Put/call ratio, gamma, IV skew
'macro': 0.10, # Dollar, rates, VIX, sector beta
}
# Category thresholds
CONVICTION_BANDS = {
'avoid': (0, 35), # Short / zero position
'neutral': (35, 55), # Watchlist / benchmark weight
'small': (55, 70), # 1/3 of target size
'moderate': (70, 85), # 2/3 of target size
'aggressive': (85, 100), # Full target size
}
class MultiFactorEngine:
"""Unified scoring from 7 orthogonal factors into a single conviction score."""
def __init__(self, weights: Optional[Dict[str, float]] = None):
self.weights = weights or dict(DEFAULT_WEIGHTS)
assert abs(sum(self.weights.values()) - 1.0) < 1e-6, "Weights must sum to 1.0"
# ── Factor 1: Trend ───────────────────────────────────────
@staticmethod
def trend_factor(price: float, sma20: float, sma50: float, sma200: float = None,
adx: float = None, high_52w: float = None, low_52w: float = None) -> float:
"""Score 0-100 based on price position vs moving averages.
Bullish when price > SMA20 > SMA50 with ADX > 25 confirming.
"""
score = 50.0
if price > sma20: score += 15
if price > sma50: score += 15
if sma20 > sma50: score += 10
if sma200 and price > sma200: score += 10
if high_52w and low_52w and high_52w > low_52w:
score += 10 * ((price - low_52w) / (high_52w - low_52w + 1e-10))
if adx and adx > 25:
score += 10
elif adx and adx > 40:
score += 15
return max(0, min(100, score))
# ── Factor 2: Momentum ────────────────────────────────────
@staticmethod
def momentum_factor(rsi: float, macd_hist: float, obv_slope: float = None,
rsi_divergence: float = None) -> float:
"""Score 0-100. Momentum peaking at RSI 50-70 range.
Bearish divergence (price up, RSI down) reduces score.
"""
score = 50.0
if rsi < 30: score += 15 # Oversold bounce
elif 30 <= rsi <= 45: score += 5
elif 45 < rsi <= 65: score += 10 # Healthy momentum
elif 65 < rsi <= 75: score += 3 # Still strong but caution
elif rsi > 75: score -= 15 # Overbought
if macd_hist > 0: score += 10
if macd_hist > 0.5: score += 10
if macd_hist < 0: score -= 10
if obv_slope and obv_slope > 0: score += 10
if obv_slope and obv_slope < 0: score -= 10
if rsi_divergence and rsi_divergence < 0: score -= 15 # Bearish div
if rsi_divergence and rsi_divergence > 0: score += 10 # Bullish div
return max(0, min(100, score))
# ── Factor 3: Volatility ─────────────────────────────────
@staticmethod
def volatility_factor(hv: float, iv: float = None, vol_regime: str = 'normal',
skew: float = None, vix: float = None) -> float:
"""Score 0-100. Low vol regime = bullish. High vol with declining trend = bullish.
IV < HV (cheap options) = bullish. Rising vol = bearish.
"""
score = 50.0
if hv < 0.15: score += 15
elif hv < 0.25: score += 5
elif hv > 0.40: score -= 15
elif hv > 0.35: score -= 5
if iv and hv and iv < hv:
score += 10 # Cheap implied vol
if iv and hv and iv > hv * 1.5:
score -= 15 # Expensive options, fear premium
if vol_regime == 'low': score += 15
if vol_regime == 'declining': score += 10
if vol_regime == 'spiking': score -= 20
if vol_regime == 'high': score -= 10
if skew and skew < 0: score += 5 # Put skew mild
if skew and skew < -0.5: score -= 10 # Extreme fear
if vix and vix < 15: score += 10
if vix and vix > 30: score -= 15
return max(0, min(100, score))
# ── Factor 4: Fundamentals ────────────────────────────────
@staticmethod
def fundamentals_factor(pe: float = None, peg: float = None, ps: float = None,
pb: float = None, roe: float = None, debt_equity: float = None,
fcf_yield: float = None, growth_5y: float = None,
sector_pe: float = None) -> float:
"""Score 0-100 based on valuation + quality. Lower PEG, higher ROE = better.
FCF yield > 5% is strong signal. Relative to sector PE.
"""
score = 50.0
# Valuation
if pe:
if pe < 12: score += 15
elif pe < 18: score += 10
elif pe < 25: score += 3
elif pe > 35: score -= 10
elif pe > 50: score -= 15
if sector_pe and pe < sector_pe * 0.8: score += 10
if peg:
if peg < 0.8: score += 15
elif peg < 1.0: score += 10
elif peg < 1.5: score += 3
elif peg > 2.0: score -= 15
if ps:
if ps < 1: score += 10
elif ps > 10: score -= 10
if pb:
if pb < 1.5: score += 10
elif pb > 5: score -= 10
# Quality
if roe and roe > 0.15: score += 10
if roe and roe > 0.25: score += 5
if roe and roe < 0.05: score -= 10
if debt_equity:
if debt_equity < 0.5: score += 10
elif debt_equity > 2.0: score -= 10
if fcf_yield:
if fcf_yield > 0.06: score += 15
elif fcf_yield > 0.03: score += 10
elif fcf_yield < 0: score -= 15
if growth_5y:
if growth_5y > 0.20: score += 10
elif growth_5y > 0.15: score += 5
elif growth_5y < 0: score -= 15
return max(0, min(100, score))
# ── Factor 5: News ────────────────────────────────────────
@staticmethod
def news_factor(sentiment_score: float, news_volume: float = None,
event_risk: str = 'none', event_date: str = None) -> float:
"""Score 0-100. Sentiment 0.0-1.0 (bearish-bullish). Event risk overrides.
sentiment_score: 0.0=very bearish, 0.5=neutral, 1.0=very bullish
news_volume: articles per day, higher = more signal confidence
event_risk: 'none', 'earnings_today', 'earnings_week', 'fed_today',
'macro_today', 'lawsuit', 'merger', 'dividend'
"""
score = 50.0
# Base sentiment signal
if sentiment_score < 0.2: score -= 25
elif sentiment_score < 0.35: score -= 15
elif sentiment_score < 0.45: score -= 5
elif 0.45 <= sentiment_score <= 0.55: score += 0
elif sentiment_score > 0.7: score += 25
elif sentiment_score > 0.55: score += 15
elif sentiment_score > 0.5: score += 5
# Volume confidence boost
if news_volume:
if news_volume > 50: score += 5
if news_volume > 200: score += 5 # High coverage = signal confidence
# Event risk override
event_override = {
'none': 0,
'earnings_week': -5, # Reduce size before earnings
'earnings_today': -20, # Major uncertainty
'fed_today': -15,
'macro_today': -10,
'lawsuit': -25,
'merger': 10, # Merger arb or acquisition premium
'dividend': 3,
'split': 5,
'buyback': 8,
}
if event_risk in event_override:
score += event_override[event_risk]
return max(0, min(100, score))
# ── Factor 6: Options Flow ───────────────────────────────
@staticmethod
def options_factor(pcr: float = None, unusual_volume: float = None,
gamma_exposure: float = None, iv_skew: float = None,
open_interest_change: float = None, max_pain: float = None,
current_price: float = None) -> float:
"""Score 0-100 based on options market microstructure signals.
pcr: put/call ratio < 0.5 bullish, > 1.2 bearish
unusual_volume: ratio vs 20d avg > 2 = significant
gamma_exposure: positive gamma = sticky price, negative = magnetic pin
iv_skew: steep put skew = fear, flat = complacent
open_interest_change: rising OI with rising price = conviction
max_pain: price tends toward max pain on expiry
"""
score = 50.0
if pcr:
if pcr < 0.5: score += 20 # Extreme call buying
elif pcr < 0.7: score += 10
elif pcr > 1.2: score -= 20 # Extreme put buying
elif pcr > 1.0: score -= 10
elif pcr > 0.85: score -= 3 # Mild put bias
if unusual_volume:
if unusual_volume > 5: score += 15 # Massive flow
elif unusual_volume > 2: score += 8 # Notable flow
elif unusual_volume < 0.5: score -= 5 # Dead options
if gamma_exposure:
if gamma_exposure > 0: score += 5 # Gamma long = volatility support
elif gamma_exposure < -5: score -= 15 # Gamma short = volatility risk
if iv_skew:
if iv_skew < -0.3: score -= 10 # Fear
if iv_skew < -0.5: score -= 15 # Extreme fear
if iv_skew > 0.1: score += 5 # Call skew (rare, bullish)
if open_interest_change and open_interest_change > 0.3:
score += 10 # Fresh conviction building
if max_pain and current_price:
if current_price < max_pain * 0.97: score += 5 # Room to max pain (up)
if current_price > max_pain * 1.03: score -= 5 # Above max pain (mean revert)
return max(0, min(100, score))
# ── Factor 7: Macro ───────────────────────────────────────
@staticmethod
def macro_factor(vix: float = None, dxy_change: float = None,
yield_10y: float = None, yield_2y: float = None,
sector_beta: float = None, cpi_surprise: float = None,
fed_meeting_days: int = None) -> float:
"""Score 0-100. Macro tailwinds or headwinds for risk assets.
vix: volatility index
dxy_change: 20d % change in dollar index
yield_10y / yield_2y: treasury yields
sector_beta: stock's beta to rates (tech > 1, utilities < 0.5)
cpi_surprise: actual - expected, > 0 = hawkish
fed_meeting_days: days until next FOMC
"""
score = 50.0
# VIX regime
if vix:
if vix < 15: score += 15
elif vix < 20: score += 5
elif vix > 30: score -= 20
elif vix > 25: score -= 10
elif vix > 20: score -= 5
# Dollar strength β€” bad for exporters, good for domestic
if dxy_change:
if dxy_change > 0.05: score -= 10 # Strong dollar
if dxy_change < -0.03: score += 5 # Weak dollar
# Yield curve
if yield_10y and yield_2y:
spread = yield_10y - yield_2y
if spread < -0.5: score -= 15 # Deep inversion = recession
elif spread < 0: score -= 10 # Inverted
elif spread > 1.0: score += 10 # Steep = healthy
# Rising rates hurt rate-sensitive stocks
if yield_10y and sector_beta:
if yield_10y > 0.05 and sector_beta > 1.0:
score -= 15 # High rates + high beta = double hurt
elif yield_10y < 0.03 and sector_beta > 1.0:
score += 10 # Low rates + high beta = double boost
# CPI surprise
if cpi_surprise:
if cpi_surprise > 0.5: score -= 15 # Hot inflation = hawkish Fed
if cpi_surprise < -0.3: score += 10 # Cold inflation = dovish
# Fed meeting proximity
if fed_meeting_days is not None:
if fed_meeting_days <= 3: score -= 10 # Imminent uncertainty
if fed_meeting_days == 0: score -= 20 # Meeting day
return max(0, min(100, score))
# ── Master Scoring ────────────────────────────────────────
def score(self, factors: Dict[str, float], verbose: bool = False) -> Dict:
"""Compute unified conviction score from individual factor scores.
Args:
factors: dict with keys matching self.weights
e.g. {'trend': 75, 'momentum': 45, 'volatility': 60,
'fundamentals': 80, 'news': 65, 'options': 70, 'macro': 55}
verbose: return full breakdown
Returns:
Dict with score, band, and optionally per-factor contributions
"""
total = 0.0
contributions = {}
for key, weight in self.weights.items():
val = factors.get(key, 50.0) # Default neutral if missing
contrib = val * weight
contributions[key] = {
'score': val,
'weight': weight,
'contribution': contrib,
}
total += contrib
total = max(0, min(100, total))
# Determine band
band = 'neutral'
for name, (lo, hi) in CONVICTION_BANDS.items():
if lo <= total < hi:
band = name
break
if total >= 85:
band = 'aggressive'
# Position sizing based on band
sizing = {
'avoid': 0.00,
'neutral': 0.00,
'small': 0.33,
'moderate': 0.67,
'aggressive': 1.00,
}
result = {
'conviction_score': round(total, 2),
'band': band,
'target_exposure': sizing.get(band, 0),
'direction': 'long' if total > 55 else 'short' if total < 35 else 'neutral',
}
if verbose:
result['contributions'] = contributions
result['weights'] = dict(self.weights)
return result
def score_from_dataframe(self, df: pd.DataFrame,
fundamentals: Optional[Dict] = None,
news: Optional[Dict] = None,
options: Optional[Dict] = None,
macro: Optional[Dict] = None,
verbose: bool = False) -> Dict:
"""Compute multi-factor score from price DataFrame + optional overlays."""
l = df.iloc[-1]
p = df.iloc[-2] if len(df) > 1 else l
# Trend
trend = self.trend_factor(
price=l['Close'], sma20=l.get('SMA20', l['Close']),
sma50=l.get('SMA50', l['Close']),
high_52w=df['High'].max(), low_52w=df['Low'].min()
)
# Momentum
macd_hist = l.get('MACD', 0) - l.get('MACDS', 0)
momentum = self.momentum_factor(rsi=l.get('RSI', 50), macd_hist=macd_hist)
# Volatility
hv = df['Ret'].dropna().std() * np.sqrt(252)
volatility = self.volatility_factor(hv=hv)
# Fundamentals
fund = 50.0
if fundamentals:
fund = self.fundamentals_factor(**fundamentals)
# News
news_score = 50.0
if news:
news_score = self.news_factor(**news)
# Options
opt = 50.0
if options:
opt = self.options_factor(**options)
# Macro
macro_score = 50.0
if macro:
macro_score = self.macro_factor(**macro)
factors = {
'trend': trend,
'momentum': momentum,
'volatility': volatility,
'fundamentals': fund,
'news': news_score,
'options': opt,
'macro': macro_score,
}
return self.score(factors, verbose=verbose)
if __name__ == '__main__':
# Example: Microsoft Corporation with user's actual metrics
engine = MultiFactorEngine()
factors = {
'trend': 85.0, # Price > SMA20 > SMA50
'momentum': 45.0, # RSI 48.9, MACD bearish crossover
'volatility': 30.0, # High vol (29.4%), negative Sharpe
'fundamentals': 80.0, # MSFT: great fundamentals
'news': 50.0, # Neutral sentiment
'options': 55.0, # Mild activity
'macro': 40.0, # Rising yields, neutral VIX
}
result = engine.score(factors, verbose=True)
print(f"Conviction Score: {result['conviction_score']}/100")
print(f"Band: {result['band'].upper()}")
print(f"Direction: {result['direction'].upper()}")
print(f"Target Exposure: {result['target_exposure']*100:.0f}%")
print("\nPer-factor breakdown:")
for k, v in result['contributions'].items():
print(f" {k:12s}: {v['score']:5.1f} Γ— {v['weight']:.2f} = {v['contribution']:.1f}")