# Geopolitics-Risk-Analysis / feature_engineering.py
# JayLacoma's picture
# Update feature_engineering.py
# fbba68f verified
"""
Professional Market Regime Detection - Empirically Validated Feature Engineering
Based on verified historical signals from 1970s-2025 economic cycles.
Key Principle: Use only historically validated cross-asset patterns with 6-18 month lead times.
All thresholds and weights are derived from documented historical episodes.
Usage:
python feature_engineering.py --input unified_market_data.csv --output features.csv
"""
import pandas as pd
import numpy as np
from typing import Dict, Tuple
import warnings
warnings.filterwarnings('ignore')
class MarketRegimeDetector:
    """
    Build cross-asset regime-detection features from a market data frame.

    The input frame must contain the columns ``SP500``, ``DGS10``, ``Gold``,
    ``VIX``, ``CPIAUCSL`` and ``UNRATE``; every other column is optional and a
    missing optional input yields neutral (non-triggering) features.

    Market windows (5/10/21/63/126/252 rows) are expressed in rows and assume
    one row per trading day.  Macro windows (CPI, unemployment) are expressed
    in months and converted to rows from the spacing of a ``DatetimeIndex``;
    with any other index type they fall back to one row per month.

    Every feature method writes columns into ``self.features`` and returns
    ``self`` so calls can be chained.
    """

    def __init__(self, df: pd.DataFrame):
        """Copy *df*, create the empty feature frame and validate the inputs.

        Raises:
            ValueError: if any critical column is absent from *df*.
        """
        self.df = df.copy()
        self.features = pd.DataFrame(index=df.index)
        self._validate_required_data()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _validate_required_data(self):
        """Raise ``ValueError`` when a critical input column is missing."""
        critical = {'SP500', 'DGS10', 'Gold', 'VIX', 'CPIAUCSL', 'UNRATE'}
        missing = critical - set(self.df.columns)
        if missing:
            raise ValueError(f"Missing critical data: {missing}")

    def _safe_get(self, col: str, default: float = 0) -> pd.Series:
        """Return a copy of column *col*, or a series of *default* if absent.

        *default* may also be a Series, in which case it is re-indexed onto
        the input frame's index (used for the RUSSELL -> SP500 fallback).
        """
        if col in self.df.columns:
            return self.df[col].copy()
        return pd.Series(default, index=self.df.index)

    def _safe_ratio(self, numerator: pd.Series, denominator: pd.Series,
                    fill: float = 0) -> pd.Series:
        """Divide element-wise, mapping zero denominators and inf/NaN to *fill*.

        Zero denominators are masked before dividing; adding a tiny epsilon
        instead would turn ``x / 0`` into a huge *finite* number that slips
        past the inf check.  Pass ``fill=np.nan`` to keep undefined results
        as NaN.
        """
        nonzero_denominator = denominator.where(denominator != 0)
        quotient = (numerator / nonzero_denominator).replace(
            [np.inf, -np.inf], np.nan
        )
        if pd.isna(fill):
            return quotient
        return quotient.fillna(fill)

    def _normalize(self, series: pd.Series, window: int = 252,
                   clip: Tuple[float, float] = (-3, 3)) -> pd.Series:
        """Rolling z-score over *window* rows, clipped to *clip*, NaN -> 0."""
        rolling = series.rolling(window, min_periods=30)
        zscore = (series - rolling.mean()) / (rolling.std() + 1e-10)
        return zscore.clip(*clip).fillna(0)

    @staticmethod
    def _pct_change(series: pd.Series, periods: int = 1) -> pd.Series:
        """Percent change over *periods* rows with explicit forward fill.

        The fill is done explicitly because relying on ``pct_change``'s
        implicit pad-fill is deprecated in recent pandas releases.
        """
        return series.ffill().pct_change(periods, fill_method=None)

    def _relative_return(self, asset: pd.Series, benchmark: pd.Series,
                         periods: int) -> pd.Series:
        """Return of *asset* relative to *benchmark* over *periods* rows.

        Computed as ``(1 + r_asset) / (1 + r_benchmark) - 1``.  Dividing the
        raw returns instead flips sign whenever the benchmark return is
        negative and explodes when it is near zero.  Rows where either return
        is unknown are reported as 0 (neutral) so that missing data can never
        look like under-performance.
        """
        asset_growth = 1 + self._pct_change(asset, periods)
        benchmark_growth = 1 + self._pct_change(benchmark, periods)
        relative = self._safe_ratio(
            asset_growth, benchmark_growth, fill=np.nan
        ) - 1
        return relative.fillna(0.0)

    def _macro_periods(self, months: int) -> int:
        """Number of rows that span *months* calendar months.

        For a ``DatetimeIndex`` the average number of rows per year is
        estimated from the index span; otherwise one row per month is
        assumed (i.e. *months* is returned unchanged).
        """
        index = self.df.index
        rows_per_year = 12.0
        if isinstance(index, pd.DatetimeIndex) and len(index) > 1:
            span_days = (index.max() - index.min()) / pd.Timedelta(days=1)
            if span_days > 0:
                rows_per_year = (len(index) - 1) / (span_days / 365.25)
        return max(1, int(round(rows_per_year * months / 12)))

    def _feature(self, name: str) -> pd.Series:
        """Return feature *name* with NaN -> 0, or all zeros if not built yet."""
        if name in self.features.columns:
            return self.features[name].fillna(0.0)
        return pd.Series(0.0, index=self.features.index)

    # =====================================================================
    # CATEGORY 1: LEADING INDICATORS (6-18 Month Lead Time)
    # =====================================================================
    def yield_curve_signals(self):
        """
        10y-2y Treasury spread features (``DGS10`` minus optional ``DGS2``).

        Writes: yield_curve_spread, yield_curve_inverted (spread < -0.15),
        inversion_severity (-spread clipped to [0, 3]) and
        inversion_duration (consecutive inverted rows).

        Author's notes: inversions preceded the 2001, 2008 and 2020
        recessions; the 2022+ inversion is cited as the longest on record.
        """
        long_rate = self._safe_get('DGS10')
        # A missing 2y series must leave the spread undefined rather than
        # silently turning it into the 10y yield itself.
        short_rate = self._safe_get('DGS2', np.nan)

        spread = long_rate - short_rate
        is_inverted = spread < -0.15  # NaN compares False -> not inverted

        self.features['yield_curve_spread'] = spread
        self.features['yield_curve_inverted'] = is_inverted.astype(float)
        self.features['inversion_severity'] = (-spread).clip(0, 3)

        # Label each run of equal flags, then count within the run: inverted
        # runs count 1, 2, 3, ... and non-inverted runs stay at 0.
        flag = is_inverted.astype(int)
        run_id = (flag != flag.shift()).cumsum()
        self.features['inversion_duration'] = flag.groupby(run_id).cumsum()
        return self

    def credit_stress_indicators(self):
        """
        High-yield vs. safe-bond return spread from HYG/JNK and TLT/LQD.

        Writes: credit_spread_proxy (21-row safe return minus high-yield
        return), credit_stress (proxy > 0.05) and credit_volatility
        (21-row std of daily high-yield returns, in percent).

        Author's notes: cited episodes are the 2015 energy bust and
        March 2020.
        """
        high_yield = (self._safe_get('HYG') + self._safe_get('JNK')) / 2
        safe_bonds = (self._safe_get('TLT') + self._safe_get('LQD')) / 2

        # Widening proxy: safe bonds outperforming high yield over one month.
        proxy = self._pct_change(safe_bonds, 21) - self._pct_change(high_yield, 21)

        self.features['credit_spread_proxy'] = proxy
        self.features['credit_stress'] = (proxy > 0.05).astype(float)
        self.features['credit_volatility'] = (
            self._pct_change(high_yield).rolling(21).std() * 100
        )
        return self

    def copper_gold_ratio(self):
        """
        Copper/Gold price ratio as a growth proxy.

        Writes: copper_gold_ratio (0 where undefined), copper_gold_zscore,
        copper_gold_crisis (ratio < 0.002) and copper_gold_momentum
        (63-row percent change of the ratio).

        NOTE(review): the 0.002 threshold presumably assumes copper in
        USD/lb and gold in USD/oz -- confirm against the data source.
        """
        copper = self._safe_get('Copper', np.nan)
        gold = self._safe_get('Gold', np.nan)

        # Keep undefined rows as NaN internally so that missing copper data
        # cannot masquerade as a crisis-level (near-zero) ratio.
        ratio = self._safe_ratio(copper, gold, fill=np.nan)

        self.features['copper_gold_ratio'] = ratio.fillna(0)
        self.features['copper_gold_zscore'] = self._normalize(ratio, window=252)
        self.features['copper_gold_crisis'] = (ratio < 0.002).astype(float)
        self.features['copper_gold_momentum'] = self._pct_change(
            ratio, 63
        ).replace([np.inf, -np.inf], np.nan)
        return self

    def consumer_rotation_signal(self):
        """
        Consumer discretionary / staples price ratio (XLY/XLP style).

        Writes: consumer_rotation_ratio (0 where undefined),
        consumer_defensive_mode (ratio < 1.5), consumer_risk_on
        (ratio > 2.0), consumer_rotation_velocity (21-row percent change)
        and consumer_confidence_zscore.
        """
        discretionary = self._safe_get('Consumer_Discretionary', np.nan)
        staples = self._safe_get('Consumer_Staples', np.nan)

        # NaN (not 0) for undefined rows: a zero ratio would wrongly fall
        # below the 1.5 "defensive" threshold.
        ratio = self._safe_ratio(discretionary, staples, fill=np.nan)

        self.features['consumer_rotation_ratio'] = ratio.fillna(0)
        self.features['consumer_defensive_mode'] = (ratio < 1.5).astype(float)
        self.features['consumer_risk_on'] = (ratio > 2.0).astype(float)
        self.features['consumer_rotation_velocity'] = self._pct_change(
            ratio, 21
        ).replace([np.inf, -np.inf], np.nan)
        self.features['consumer_confidence_zscore'] = self._normalize(ratio)
        return self

    # =====================================================================
    # CATEGORY 2: COINCIDENT INDICATORS (Real-Time Confirmation)
    # =====================================================================
    def equity_market_health(self):
        """
        S&P 500 returns, relative strength of NASDAQ / Russell, and drawdown.

        Writes: sp500_return_1m/3m/6m (21/63/126 rows), tech_leadership and
        small_cap_relative (63-row return relative to the S&P 500) and
        sp500_drawdown (percent below the trailing 252-row high).
        """
        sp500 = self._safe_get('SP500')
        nasdaq = self._safe_get('NASDAQ')
        russell = self._safe_get('RUSSELL', sp500)  # fall back to SP500

        for label, window in (('1m', 21), ('3m', 63), ('6m', 126)):
            self.features[f'sp500_return_{label}'] = self._pct_change(sp500, window)

        self.features['tech_leadership'] = self._relative_return(nasdaq, sp500, 63)
        self.features['small_cap_relative'] = self._relative_return(russell, sp500, 63)

        trailing_high = sp500.rolling(252, min_periods=1).max()
        self.features['sp500_drawdown'] = (sp500 / trailing_high - 1) * 100
        return self

    def volatility_regime(self):
        """
        VIX level, threshold flags and VIX/S&P divergence.

        Writes: vix_level, vix_panic (> 30), vix_extreme (> 40), vix_spike
        (5-row percent change) and vix_sp500_divergence (VIX up > 20% over
        21 rows while the S&P 500 moved less than 5% in absolute terms).
        """
        vix = self._safe_get('VIX')
        sp500 = self._safe_get('SP500')

        self.features['vix_level'] = vix
        self.features['vix_panic'] = (vix > 30).astype(float)
        self.features['vix_extreme'] = (vix > 40).astype(float)
        self.features['vix_spike'] = self._pct_change(vix, 5)

        fear_rising = self._pct_change(vix, 21) > 0.2
        market_flat = self._pct_change(sp500, 21).abs() < 0.05
        self.features['vix_sp500_divergence'] = (fear_rising & market_flat).astype(float)
        return self

    def commodity_inflation_signals(self):
        """
        Oil, gold and copper return features.

        Writes: oil_return_3m, oil_volatility, gold_return_3m,
        gold_momentum, copper_return_3m and stagflation_commodity_signal
        (oil up > 10% while copper is down over 63 rows).
        """
        oil = self._safe_get('Oil')
        gold = self._safe_get('Gold')
        copper = self._safe_get('Copper')

        oil_3m = self._pct_change(oil, 63)
        copper_3m = self._pct_change(copper, 63)

        self.features['oil_return_3m'] = oil_3m
        self.features['oil_volatility'] = self._pct_change(oil).rolling(21).std() * 100
        self.features['gold_return_3m'] = self._pct_change(gold, 63)
        self.features['gold_momentum'] = self._pct_change(gold, 21)
        self.features['copper_return_3m'] = copper_3m
        self.features['stagflation_commodity_signal'] = (
            (oil_3m > 0.1) & (copper_3m < 0)
        ).astype(float)
        return self

    def dollar_strength_regime(self):
        """
        Dollar index (DXY) level and momentum features.

        Writes: dollar_strength, dollar_return_1m, dollar_return_3m,
        dollar_surge (level > 105) and dollar_velocity (10-row change).
        """
        dxy = self._safe_get('DXY')

        self.features['dollar_strength'] = dxy
        self.features['dollar_return_1m'] = self._pct_change(dxy, 21)
        self.features['dollar_return_3m'] = self._pct_change(dxy, 63)
        self.features['dollar_surge'] = (dxy > 105).astype(float)
        self.features['dollar_velocity'] = self._pct_change(dxy, 10)
        return self

    # =====================================================================
    # CATEGORY 3: LAGGING INDICATORS (Confirmation & Validation)
    # =====================================================================
    def inflation_regime(self):
        """
        CPI-based inflation features.

        Writes: inflation_yoy (12-month percent change, in percent),
        high_inflation (> 3), very_high_inflation (> 5) and
        inflation_accelerating (YoY rate up more than 0.5pp in 3 months).

        Month counts are converted to rows via ``_macro_periods`` so the
        12-month change is a true year on a daily index as well.
        """
        cpi = self._safe_get('CPIAUCSL')
        cpi_yoy = self._pct_change(cpi, self._macro_periods(12)) * 100

        self.features['inflation_yoy'] = cpi_yoy
        self.features['high_inflation'] = (cpi_yoy > 3.0).astype(float)
        self.features['very_high_inflation'] = (cpi_yoy > 5.0).astype(float)
        self.features['inflation_accelerating'] = (
            cpi_yoy.diff(self._macro_periods(3)) > 0.5
        ).astype(float)
        return self

    def labor_market_health(self):
        """
        Unemployment-rate features.

        Writes: unemployment_rate, unemployment_change_3m,
        sahm_rule_trigger (3-month rise > 0.5pp; a simplified proxy, not
        the official moving-average Sahm rule) and labor_weakening
        (1-month rise > 0.1pp).
        """
        unrate = self._safe_get('UNRATE')
        # Monthly releases may be sparse on a daily index; difference the
        # forward-filled level so gaps do not blank out the changes.
        level = unrate.ffill()
        change_3m = level - level.shift(self._macro_periods(3))

        self.features['unemployment_rate'] = unrate
        self.features['unemployment_change_3m'] = change_3m
        self.features['sahm_rule_trigger'] = (change_3m > 0.5).astype(float)
        self.features['labor_weakening'] = (
            level.diff(self._macro_periods(1)) > 0.1
        ).astype(float)
        return self

    # =====================================================================
    # CATEGORY 4: SECTOR & GEOGRAPHIC ROTATION SIGNALS
    # =====================================================================
    def sector_rotation_analysis(self):
        """
        63-row relative returns of sector baskets versus the S&P 500.

        Writes: defensive_outperformance (utilities, staples, healthcare),
        cyclical_outperformance (industrials, materials, discretionary),
        tech_outperformance, energy_outperformance and
        financial_outperformance.
        """
        sp500 = self._safe_get('SP500', 1)

        defensive_basket = (
            self._safe_get('Utilities')
            + self._safe_get('Consumer_Staples')
            + self._safe_get('Healthcare')
        ) / 3
        cyclical_basket = (
            self._safe_get('Industrials')
            + self._safe_get('Materials')
            + self._safe_get('Consumer_Discretionary')
        ) / 3

        baskets = {
            'defensive_outperformance': defensive_basket,
            'cyclical_outperformance': cyclical_basket,
            'tech_outperformance': self._safe_get('Technology'),
            'energy_outperformance': self._safe_get('Energy'),
            'financial_outperformance': self._safe_get('Financials'),
        }
        for feature_name, prices in baskets.items():
            self.features[feature_name] = self._relative_return(prices, sp500, 63)
        return self

    def regional_banking_stress(self):
        """
        Regional banks versus broad financials over 21 rows.

        Writes: regional_bank_stress (relative return, negative means
        regional banks are lagging) and banking_crisis_signal
        (relative return below -0.2).
        """
        regional = self._safe_get('Regional_Banks')
        financials = self._safe_get('Financials', 1)

        stress = self._relative_return(regional, financials, 21)
        self.features['regional_bank_stress'] = stress
        self.features['banking_crisis_signal'] = (stress < -0.2).astype(float)
        return self

    def emerging_market_flows(self):
        """
        Emerging-market equities versus the S&P 500, combined with the dollar.

        Writes: em_relative_performance (63-row relative return) and
        em_stress (relative return below -0.1 while DXY rose more than 5%
        over 63 rows).
        """
        emerging = self._safe_get('Emerging_Markets')
        sp500 = self._safe_get('SP500', 1)
        dxy = self._safe_get('DXY')

        relative = self._relative_return(emerging, sp500, 63)
        self.features['em_relative_performance'] = relative

        em_weak = relative < -0.1
        dollar_strong = self._pct_change(dxy, 63) > 0.05
        self.features['em_stress'] = (em_weak & dollar_strong).astype(float)
        return self

    # =====================================================================
    # CATEGORY 5: COMPOSITE REGIME CLASSIFICATION
    # =====================================================================
    def calculate_composite_scores(self):
        """
        Combine individual features into four weighted scores in [0, 1].

        Writes: recession_probability, financial_crisis_risk,
        stagflation_risk and expansion_probability.  A feature that has not
        been built, or is NaN for a row, contributes 0 for that row so one
        unavailable input cannot blank out the whole score.
        """
        feat = self._feature

        # === RECESSION PROBABILITY ===
        recession = (
            feat('yield_curve_inverted') * 0.30
            + feat('credit_stress') * 0.25
            + feat('consumer_defensive_mode') * 0.20
            + feat('sahm_rule_trigger') * 0.15
            + feat('copper_gold_crisis') * 0.10
        )
        self.features['recession_probability'] = recession.clip(0, 1)

        # === FINANCIAL CRISIS RISK ===
        crisis = (
            feat('credit_spread_proxy').clip(0, 0.2) / 0.2 * 0.30
            + feat('banking_crisis_signal') * 0.25
            + feat('vix_extreme') * 0.20
            + feat('inversion_severity').clip(0, 1) * 0.15
            + feat('dollar_surge') * 0.10
        )
        self.features['financial_crisis_risk'] = crisis.clip(0, 1)

        # === STAGFLATION RISK ===
        stagflation = (
            feat('stagflation_commodity_signal') * 0.30
            + feat('high_inflation') * 0.25
            + feat('labor_weakening') * 0.20
            + feat('energy_outperformance').clip(0, 0.5) / 0.5 * 0.15
            + feat('em_stress') * 0.10
        )
        self.features['stagflation_risk'] = stagflation.clip(0, 1)

        # === EXPANSION/BOOM PROBABILITY ===
        expansion = (
            feat('consumer_risk_on') * 0.25
            + feat('cyclical_outperformance').clip(-0.2, 0.3) / 0.3 * 0.25
            + feat('tech_outperformance').clip(0, 0.5) / 0.5 * 0.20
            + (1 - feat('yield_curve_inverted')) * 0.15
            + feat('copper_gold_momentum').clip(0, 0.2) / 0.2 * 0.15
        )
        self.features['expansion_probability'] = expansion.clip(0, 1)
        return self

    def classify_regime(self):
        """
        Assign a regime label per row from the composite scores.

        Precedence is crisis (> 0.6), then recession, stagflation and
        expansion (each > 0.5); rows matching none are ``TRANSITION``.
        Writes: regime and regime_confidence (largest of the four scores).
        """
        crisis = self._feature('financial_crisis_risk')
        recession = self._feature('recession_probability')
        stagflation = self._feature('stagflation_risk')
        expansion = self._feature('expansion_probability')

        # np.select takes the first matching condition, which encodes the
        # severity ordering.
        conditions = [
            (crisis > 0.6).to_numpy(),
            (recession > 0.5).to_numpy(),
            (stagflation > 0.5).to_numpy(),
            (expansion > 0.5).to_numpy(),
        ]
        labels = [
            'FINANCIAL_CRISIS',
            'RECESSION_WARNING',
            'STAGFLATION',
            'EXPANSION',
        ]
        self.features['regime'] = np.select(conditions, labels, default='TRANSITION')
        self.features['regime_confidence'] = pd.concat(
            [crisis, recession, stagflation, expansion], axis=1
        ).max(axis=1)
        return self

    # =====================================================================
    # MASTER BUILD FUNCTION
    # =====================================================================
    def build_all_features(self) -> pd.DataFrame:
        """
        Run every feature step in dependency order and return the features.

        Composite scores and the regime label are computed last because
        they read the columns written by the earlier steps.  Progress is
        printed to stdout.
        """
        pipeline = (
            # Leading indicators
            ("✓ Yield curve signals (recession predictor)", self.yield_curve_signals),
            ("✓ Credit stress indicators (crisis early warning)", self.credit_stress_indicators),
            ("✓ Copper/Gold ratio (growth proxy)", self.copper_gold_ratio),
            ("✓ Consumer rotation (confidence gauge)", self.consumer_rotation_signal),
            # Coincident indicators
            ("✓ Equity market health", self.equity_market_health),
            ("✓ Volatility regime", self.volatility_regime),
            ("✓ Commodity inflation signals", self.commodity_inflation_signals),
            ("✓ Dollar strength regime", self.dollar_strength_regime),
            # Lagging indicators
            ("✓ Inflation regime", self.inflation_regime),
            ("✓ Labor market health", self.labor_market_health),
            # Rotation analysis
            ("✓ Sector rotation analysis", self.sector_rotation_analysis),
            ("✓ Regional banking stress", self.regional_banking_stress),
            ("✓ Emerging market flows", self.emerging_market_flows),
            # Composite scores
            ("✓ Calculating composite regime scores", self.calculate_composite_scores),
            ("✓ Final regime classification", self.classify_regime),
        )

        print("Building professional market regime features...")
        print("=" * 70)
        for message, step in pipeline:
            print(message)
            step()
        print("=" * 70)
        print(f"✅ Generated {len(self.features.columns)} features")
        return self.features
def main(argv=None):
    """Command-line entry point: load a CSV, build features, save and summarize.

    Args:
        argv: Optional list of command-line arguments.  ``None`` (the
            default) makes argparse read ``sys.argv`` as before; passing a
            list allows the function to be driven programmatically.

    The input CSV is read with its first column as a parsed date index.
    The full feature frame is written to ``--output`` and a short regime
    summary for the last 252 rows is printed.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Professional Market Regime Detection - Empirically Validated'
    )
    parser.add_argument('--input', default='unified_market_data.csv',
                        help='Input CSV file with market data')
    parser.add_argument('--output', default='regime_features.csv',
                        help='Output CSV file for features')
    args = parser.parse_args(argv)

    print(f"\nLoading data from: {args.input}")
    df = pd.read_csv(args.input, index_col=0, parse_dates=True)
    print(f"Data shape: {df.shape}")
    print(f"Date range: {df.index.min()} to {df.index.max()}\n")

    # Build features
    detector = MarketRegimeDetector(df)
    features = detector.build_all_features()

    # Save
    features.to_csv(args.output)
    print(f"\n💾 Features saved to: {args.output}")

    # Summary statistics
    print("\n" + "=" * 70)
    print("REGIME DISTRIBUTION (Last 252 days):")
    print("=" * 70)
    recent = features.tail(252)
    # Guard against an empty input: iloc[-1] on an empty frame would raise
    # IndexError after the output file has already been written.
    if 'regime' in recent.columns and not recent.empty:
        print(recent['regime'].value_counts())
        print(f"\nCurrent Regime: {features['regime'].iloc[-1]}")
        print(f"Confidence: {features['regime_confidence'].iloc[-1]:.1%}")


if __name__ == "__main__":
    main()