# Geopolitics-Risk-Analysis / feature_engineering.py
# Repository web-viewer header retained as a comment:
#   uploader: JayLacoma
#   commit:   b28248f ("Update feature_engineering.py", verified)
#   size:     22.2 kB
"""
Integrated Market Theory - Feature Engineering Pipeline
Combines all tickers from geo_macro.py into unified theory indicators
Usage:
python feature_engineering.py --input unified_market_data.csv --output enhanced_features.csv
"""
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings('ignore')
def safe_zscore(series, window=252, min_obs=30):
    """Rolling z-score with fallback to 0 for unstable windows.

    Args:
        series: pandas Series to standardise.
        window: Number of trailing observations used for the rolling
            mean and standard deviation.
        min_obs: Minimum observations required before a z-score is
            produced. Capped at ``window`` so that a short window does not
            make ``rolling`` raise.

    Returns:
        Series on the same index as ``series``, clipped to [-3, 3]. Rows
        where the score is undefined (warm-up period, missing input, or a
        zero standard deviation) are 0.
    """
    # rolling() rejects min_periods > window, so never ask for more
    # observations than the window can hold.
    min_periods = min(min_obs, window)
    roller = series.rolling(window, min_periods=min_periods)
    mean = roller.mean()
    # A zero spread makes the score undefined; mark it NaN so it falls back
    # to 0 below instead of becoming +/-inf and being clipped to +/-3.
    std = roller.std().replace(0, np.nan)
    z = (series - mean) / std
    z = z.replace([np.inf, -np.inf], np.nan)
    return z.fillna(0).clip(-3, 3)
class IntegratedTheoryFeatures:
    """
    Transforms raw market data into theory-driven features combining:
    - Dalio's 5 Forces
    - Stevenson's Inequality Metrics
    - Thiel's Monopoly Indicators
    - Gundlach's Reckoning Signals

    The input frame is copied into ``self.df``; every engineered column is
    written to ``self.features``, which shares the input's index. Each public
    builder returns ``self`` so calls can be chained. ``build_all_features``
    runs them in the order they depend on each other.

    Only six columns are mandatory (see ``__init__``). Every other input is
    optional and is read through ``_col``, which substitutes an index-aligned
    constant when the column is absent. A term whose percentage change is
    undefined for that constant (for example a series of zeros) is NaN and
    propagates into the sub-feature that uses it.
    """

    def __init__(self, df):
        """Validate and copy the raw market frame.

        Args:
            df: DataFrame with one column per ticker/indicator.

        Raises:
            ValueError: If any of SP500, DGS10, Gold, VIX, UNRATE or CPIAUCSL
                is missing from ``df``.
        """
        # Validate critical columns
        required = {'SP500', 'DGS10', 'Gold', 'VIX', 'UNRATE', 'CPIAUCSL'}
        missing = required - set(df.columns)
        if missing:
            raise ValueError(f"Critical data missing: {missing}")
        self.df = df.copy()
        self.features = pd.DataFrame(index=df.index)

    def _col(self, name, default=0.0):
        """Return column ``name``, or a constant Series when it is absent.

        The fallback is built on ``self.df.index`` so that arithmetic,
        comparisons and Series methods behave the same whether or not the
        optional column was supplied. A bare scalar has no ``pct_change``,
        and ``pd.Series(0)`` carries a foreign index ``[0]``.
        """
        if name in self.df.columns:
            return self.df[name]
        return pd.Series(float(default), index=self.df.index, name=name)

    def calculate_returns_volatility(self, windows=(21, 63, 252)):
        """Calculate multi-timeframe returns and volatility for all tickers.

        For every numeric column ``c`` of ``self.df`` and every window ``w``
        this adds ``c_ret{w}`` (w-period percentage change), ``c_vol{w}``
        (rolling std of 1-period percentage changes) and ``c_mom{w}`` (the
        w-period return minus the same return w periods earlier).

        Args:
            windows: Iterable of window lengths in rows.

        Returns:
            self
        """
        print("Calculating returns and volatility...")
        # Snapshot the numeric source columns first so the derived columns
        # never feed back into this loop.
        source_cols = list(self.df.select_dtypes(include='number').columns)
        derived = {}
        for col in source_cols:
            series = self.df[col]
            one_period = series.pct_change()
            for window in windows:
                ret = series.pct_change(window)
                # Returns
                derived[f'{col}_ret{window}'] = ret
                # Volatility
                derived[f'{col}_vol{window}'] = one_period.rolling(window).std()
                # Momentum
                derived[f'{col}_mom{window}'] = ret - ret.shift(window)
        if derived:
            # Attach everything in one concat instead of one insert per
            # column. Columns that already exist are replaced, as a plain
            # assignment would do.
            clashes = [c for c in derived if c in self.df.columns]
            block = pd.DataFrame(derived, index=self.df.index)
            self.df = pd.concat([self.df.drop(columns=clashes), block], axis=1)
        return self

    def dalio_forces(self):
        """Ray Dalio's 5 Forces Composite Indicators.

        Writes ``dalio_debt_cycle``, ``dalio_internal_conflict``,
        ``dalio_external_conflict``, ``dalio_nature_force``,
        ``dalio_tech_force``, their weighted sum ``dalio_composite`` and its
        rolling-normalised form ``dalio_composite_norm``.

        Returns:
            self
        """
        print("Building Dalio's 5 Forces...")
        # Force 1: Debt/Economic Cycle
        yield_curve = self._col('DGS10') - self._col('DGS2')
        # NOTE(review): a 12-row change reads like a year of monthly data,
        # while the 21/63/252 windows elsewhere look like trading-day counts.
        # Confirm the sampling frequency of the input.
        inflation_mom = self._col('CPIAUCSL').pct_change(12) * 100
        hy_spread = self._col('BAMLH0A0HYM2') / 100
        self.features['dalio_debt_cycle'] = (
            yield_curve * 0.3 +
            inflation_mom * 0.4 +
            hy_spread * 0.3
        )
        # Force 2: Internal Conflict
        consumer_weakness = (
            self._col('Consumer_Discretionary') / self._col('Consumer_Staples', 1)
        ).pct_change(63) * -1
        unemployment_stress = self._col('UNRATE').diff() * 2
        small_large_gap = (
            self._col('Small_Cap_Value') / self._col('SP500', 1)
        ).pct_change(63) * -1
        self.features['dalio_internal_conflict'] = (
            consumer_weakness * 0.4 +
            unemployment_stress * 0.3 +
            small_large_gap * 0.3
        )
        # Force 3: External Conflict
        defense_momentum = self._col('Defense_Stocks').pct_change(21)
        dollar_anomaly = self._calculate_dollar_anomaly()
        china_taiwan_tension = self._calculate_asia_tension()
        self.features['dalio_external_conflict'] = (
            defense_momentum * 0.4 +
            dollar_anomaly * 0.3 +
            china_taiwan_tension * 0.3
        )
        # Force 4: Acts of Nature
        water_stress = self._col('Water').pct_change(63)
        ag_volatility = self._col('Agricultural').pct_change().rolling(63).std() * 100
        self.features['dalio_nature_force'] = (
            water_stress * 0.6 +
            ag_volatility * 0.4
        )
        # Force 5: Technology/Inventiveness
        tech_outperform = (
            self._col('Technology') / self._col('SP500', 1)
        ).pct_change(21)
        cloud_momentum = self._col('Cloud_Computing').pct_change(63)
        ai_momentum = self._col('Robotics_AI').pct_change(63)
        self.features['dalio_tech_force'] = (
            tech_outperform * 0.4 +
            cloud_momentum * 0.3 +
            ai_momentum * 0.3
        )
        # Master Composite
        dalio_components = [
            self.features['dalio_debt_cycle'] * 0.35,
            self.features['dalio_internal_conflict'] * 0.25,
            self.features['dalio_external_conflict'] * 0.20,
            self.features['dalio_tech_force'] * 0.15,
            self.features['dalio_nature_force'] * 0.05
        ]
        # sum(axis=1) skips NaN, so an undefined force drops out of the
        # composite rather than blanking it.
        self.features['dalio_composite'] = pd.concat(dalio_components, axis=1).sum(axis=1)
        self.features['dalio_composite_norm'] = self._normalize(self.features['dalio_composite'])
        return self

    def stevenson_inequality(self):
        """Gary Stevenson's Inequality Amplification Metrics.

        Writes ``inequality_wealth_flow``, ``inequality_consumption_gap``,
        ``inequality_credit_access``, the weighted ``stevenson_inequality``
        with its ``_norm`` variant, and ``inequality_transmission``.

        Returns:
            self
        """
        print("Building Stevenson's inequality indicators...")
        asset_rich = (self._col('Gold') +
                      self._col('Real_Estate') +
                      self._col('Growth_Stocks')) / 3
        middle_class = (self._col('Consumer_Staples') +
                        self._col('Regional_Banks') +
                        self._col('Small_Cap_Value')) / 3
        self.features['inequality_wealth_flow'] = (
            asset_rich.pct_change(63) - middle_class.pct_change(63)
        )
        luxury = self._col('Retail_Luxury').pct_change(21)
        mass = (self._col('Restaurants') + self._col('Retail')) / 2
        mass = mass.pct_change(21)
        self.features['inequality_consumption_gap'] = luxury - mass
        quality_credit = (self._col('Investment_Grade_Spread') +
                          self._col('Preferred_Stock')) / 2
        junk_credit = (self._col('HYG') +
                       self._col('JNK') +
                       self._col('Emerging_Market_Debt')) / 3
        self.features['inequality_credit_access'] = (
            quality_credit.pct_change(63) - junk_credit.pct_change(63)
        )
        self.features['stevenson_inequality'] = (
            self.features['inequality_wealth_flow'] * 0.4 +
            self.features['inequality_consumption_gap'] * 0.3 +
            self.features['inequality_credit_access'] * 0.3
        )
        self.features['stevenson_inequality_norm'] = self._normalize(
            self.features['stevenson_inequality']
        )
        asset_inflation = (self._col('Gold') + self._col('Real_Estate')).pct_change(21)
        wage_proxy = self._col('Staffing').pct_change(21)
        self.features['inequality_transmission'] = asset_inflation - wage_proxy
        return self

    def thiel_monopoly(self):
        """Peter Thiel's Monopoly vs Competition Indicators.

        Writes ``monopoly_cash_moat``, ``monopoly_network_effects``,
        ``monopoly_defensibility``, the weighted ``thiel_monopoly`` with its
        ``_norm`` variant, ``monopoly_immunity`` and ``tech_concentration``.

        Returns:
            self
        """
        print("Building Thiel's monopoly indicators...")
        tech_strength = self._col('Technology')
        finance_strength = self._col('Financials', 1)
        self.features['monopoly_cash_moat'] = (
            tech_strength.pct_change(63) - finance_strength.pct_change(63)
        )
        network_sectors = (self._col('Cloud_Computing') * 0.4 +
                           self._col('Communication_Services') * 0.3 +
                           self._col('Fintech') * 0.3)
        self.features['monopoly_network_effects'] = network_sectors.pct_change(63)
        tech_volatility = self._col('Technology', 1).pct_change().rolling(63).std()
        chip_strength = self._col('Semiconductors').pct_change(63)
        # Inverse volatility: the 0.001 floor keeps the division finite.
        self.features['monopoly_defensibility'] = (
            (1 / (tech_volatility + 0.001)) * 0.01 +
            chip_strength * 0.5
        )
        self.features['thiel_monopoly'] = (
            self.features['monopoly_cash_moat'] * 0.35 +
            self.features['monopoly_network_effects'] * 0.35 +
            self.features['monopoly_defensibility'] * 0.30
        )
        self.features['thiel_monopoly_norm'] = self._normalize(self.features['thiel_monopoly'])
        tech_return = self._col('Technology').pct_change(21)
        rate_change = self._col('DGS10').diff() * -1
        self.features['monopoly_immunity'] = tech_return / (rate_change.abs() + 0.001)
        specialized = (self._col('Semiconductors') +
                       self._col('Cloud_Computing') +
                       self._col('Robotics_AI')) / 3
        broad_tech = self._col('Technology', 1)
        self.features['tech_concentration'] = specialized / broad_tech
        return self

    def gundlach_reckoning(self):
        """Jeffrey Gundlach's Debt Reckoning and Paradigm Shift Signals.

        Writes ``gundlach_yield_anomaly``, ``gundlach_flight_shift``,
        ``gundlach_capital_reversal``, ``gundlach_private_credit_risk`` and
        the weighted ``gundlach_reckoning`` with its ``_norm`` variant.

        Returns:
            self
        """
        print("Building Gundlach's reckoning indicators...")
        fed_proxy = self._col('DGS3MO')
        long_yield = self._col('DGS10')
        fed_cutting = fed_proxy.diff() < -0.05
        yield_rising = long_yield.diff() > 0
        self.features['gundlach_yield_anomaly'] = (
            (fed_cutting & yield_rising).astype(float) +
            (long_yield - fed_proxy)
        )
        gold_return = self._col('Gold').pct_change(21)
        treasury_return = self._col('US_Treasuries_Long', 1).pct_change(21)
        # NOTE(review): the +0.001 offset only guards a zero denominator; a
        # treasury return near -0.001 still makes this ratio explode.
        self.features['gundlach_flight_shift'] = gold_return / (treasury_return + 0.001)
        dollar_weak = self._col('DXY').pct_change(21) * -1
        em_outperform = (self._col('Emerging_Markets') + self._col('Europe')) / 2
        em_outperform = em_outperform.pct_change(21)
        sp_return = self._col('SP500').pct_change(21)
        self.features['gundlach_capital_reversal'] = (
            dollar_weak * 0.5 +
            (em_outperform - sp_return) * 0.5
        )
        regional_stress = (
            self._col('Regional_Banks') / self._col('Financials', 1)
        ).pct_change(21)
        mortgage_reit_stress = self._col('Mortgage_REITs').pct_change(21)
        real_estate_vol = self._col('Real_Estate', 1).pct_change().rolling(21).std() * 100
        self.features['gundlach_private_credit_risk'] = (
            regional_stress * -0.4 +
            mortgage_reit_stress * -0.3 +
            real_estate_vol * 0.3
        )
        self.features['gundlach_reckoning'] = (
            self.features['gundlach_yield_anomaly'] * 0.30 +
            self.features['gundlach_flight_shift'] * 0.25 +
            self.features['gundlach_capital_reversal'] * 0.25 +
            self.features['gundlach_private_credit_risk'] * 0.20
        )
        self.features['gundlach_reckoning_norm'] = self._normalize(
            self.features['gundlach_reckoning']
        )
        return self

    def geopolitical_indicators(self):
        """Regional conflict and energy transition signals.

        Writes ``middle_east_risk``, ``europe_risk``, ``asia_risk``, the
        weighted ``geopolitical_risk`` with its ``_norm`` variant, and
        ``energy_transition``.

        Returns:
            self
        """
        print("Building geopolitical indicators...")
        oil_volatility = self._col('Oil', 1).pct_change().rolling(3).std() * 100
        defense_spike = self._col('Defense_Stocks').pct_change(5)
        gold_haven = self._col('Gold_Safe_Haven').pct_change(5)
        self.features['middle_east_risk'] = (
            oil_volatility * 0.4 +
            defense_spike * 0.3 +
            gold_haven * 0.3
        )
        gas_volatility = self._col('NaturalGas', 1).pct_change().rolling(5).std() * 100
        europe_decline = self._col('Europe').pct_change(21) * -1
        swiss_franc_strength = self._col('Swiss_Franc').pct_change(21) * -1
        self.features['europe_risk'] = (
            gas_volatility * 0.5 +
            europe_decline * 0.3 +
            swiss_franc_strength * 0.2
        )
        chip_stress = self._col('Semiconductors', 1).pct_change().rolling(21).std() * 100
        taiwan_korea = (self._col('Taiwan') + self._col('South_Korea')) / 2
        china_diverge = taiwan_korea.pct_change(21) - self._col('China').pct_change(21)
        rare_earth = self._col('Rare_Earth').pct_change(21)
        self.features['asia_risk'] = (
            chip_stress * 0.4 +
            china_diverge * 0.3 +
            rare_earth * 0.3
        )
        self.features['geopolitical_risk'] = (
            self.features['middle_east_risk'] * 0.4 +
            self.features['europe_risk'] * 0.3 +
            self.features['asia_risk'] * 0.3
        )
        self.features['geopolitical_risk_norm'] = self._normalize(
            self.features['geopolitical_risk']
        )
        uranium_momentum = self._col('Uranium').pct_change(63)
        clean_momentum = self._col('Clean_Energy').pct_change(63)
        oil_decline = self._col('Oil').pct_change(252) * -1
        self.features['energy_transition'] = (
            uranium_momentum * 0.5 +
            clean_momentum * 0.3 +
            oil_decline * 0.2
        )
        return self

    def cross_asset_features(self):
        """Advanced cross-asset relationships.

        Writes ``flight_ratio``, ``credit_contagion`` and
        ``geo_amplification``. Reads the three regional risk columns, so
        ``geopolitical_indicators`` must have run first.

        Returns:
            self
        """
        print("Building cross-asset features...")
        defensive = (self._col('Gold') +
                     self._col('Utilities') +
                     self._col('Healthcare')) / 3
        risk_on = (self._col('Technology') +
                   self._col('Consumer_Discretionary') +
                   self._col('Real_Estate')) / 3
        self.features['flight_ratio'] = defensive / (risk_on + 0.001)
        regional_vs_broad = self._col('Regional_Banks') - self._col('Financials')
        mortgage_vs_reit = self._col('Mortgage_REITs') - self._col('REITs')
        em_vs_ig = self._col('Emerging_Market_Debt') - self._col('Investment_Grade_Spread')
        # NOTE(review): these are percentage changes of level differences,
        # which become unstable whenever a spread passes through zero.
        self.features['credit_contagion'] = (
            regional_vs_broad.pct_change(21) +
            mortgage_vs_reit.pct_change(21) +
            em_vs_ig.pct_change(21)
        ) / 3
        vix = self._col('VIX', 20)
        vix_historical_avg = vix.rolling(252).mean()
        geo_max = self.features[['middle_east_risk', 'europe_risk', 'asia_risk']].max(axis=1)
        self.features['geo_amplification'] = geo_max * (vix / vix_historical_avg)
        return self

    def scenario_probabilities(self):
        """Dynamic probability weights for future scenarios.

        Writes ``prob_credit_collapse``, ``prob_stagflation`` and
        ``prob_tech_boom`` (each clipped to [0, 1]) and the constant
        ``prob_controlled_reset``. Reads columns produced by the Dalio,
        Stevenson, Thiel, Gundlach and geopolitical builders.

        Returns:
            self
        """
        print("Calculating scenario probabilities...")
        # Scenario 1: Credit Collapse
        self.features['prob_credit_collapse'] = (
            self.features['gundlach_reckoning_norm'] * 0.4 +
            safe_zscore(self.features['gundlach_private_credit_risk']) * 0.03 +
            safe_zscore(self.features['dalio_debt_cycle']) * 0.03
        )
        self.features['prob_credit_collapse'] = np.clip(
            self.features['prob_credit_collapse'], 0, 1
        )
        # Scenario 2: Stagflation
        inflation_high = (self._col('CPIAUCSL').pct_change(12) * 100 > 2.5).astype(float)
        unemployment_rising = (self._col('UNRATE').diff() > 0).astype(float)
        self.features['prob_stagflation'] = (
            (inflation_high * unemployment_rising) * 0.3 +
            safe_zscore(self.features['dalio_external_conflict']) * 0.03 +
            safe_zscore(self.features['gundlach_capital_reversal']) * 0.02 +
            self.features['stevenson_inequality_norm'] * 0.2
        )
        self.features['prob_stagflation'] = np.clip(self.features['prob_stagflation'], 0, 1)
        # Scenario 3: Tech Monopoly Boom
        # Both sides come from _col, so they share the frame's index.
        # Comparing differently-labelled Series would raise.
        china_tech_lags = (
            self._col('China_Tech').pct_change(63) <
            self._col('Technology').pct_change(63)
        ).astype(float)
        self.features['prob_tech_boom'] = (
            self.features['thiel_monopoly_norm'] * 0.4 +
            safe_zscore(self.features['dalio_tech_force'] - self.features['dalio_debt_cycle']) * 0.03 +
            safe_zscore(self.features['energy_transition']) * 0.02 +
            china_tech_lags * 0.1
        )
        self.features['prob_tech_boom'] = np.clip(self.features['prob_tech_boom'], 0, 1)
        self.features['prob_controlled_reset'] = 0.05
        return self

    def regime_detection(self):
        """Classify current market regime.

        Writes the ``regime`` label per row. Rules are tested in this order
        and the first match wins: CRISIS, TECH_MONOPOLY, INEQUALITY_TRAP,
        GEOPOLITICAL_SHOCK; anything else is TRANSITION. Comparisons against
        NaN are False, so rows with missing inputs fall through to
        TRANSITION.

        Returns:
            self
        """
        print("Detecting market regimes...")
        feats = self.features
        # np.select takes the first true condition per row, which matches an
        # if/elif chain without a Python-level call per row.
        conditions = [
            ((feats['gundlach_reckoning_norm'] > 0.6) &
             (feats['prob_credit_collapse'] > 0.5)).to_numpy(),
            (feats['thiel_monopoly_norm'] > 0.7).to_numpy(),
            ((feats['stevenson_inequality_norm'] > 0.6) &
             (feats['prob_stagflation'] > 0.4)).to_numpy(),
            (feats['geopolitical_risk_norm'] > 0.7).to_numpy(),
        ]
        labels = ['CRISIS', 'TECH_MONOPOLY', 'INEQUALITY_TRAP', 'GEOPOLITICAL_SHOCK']
        self.features['regime'] = np.select(conditions, labels, default='TRANSITION')
        return self

    def dimensionality_reduction(self):
        """Apply PCA to reduce feature space.

        Feature columns are grouped by substring match on their names
        (debt, inequality, geo, tech). For each group with more than 10
        complete rows, the standardised columns are projected onto up to two
        principal components, stored as ``{group}_PC1`` / ``{group}_PC2`` on
        the rows that were complete.

        Returns:
            self
        """
        print("Applying dimensionality reduction...")
        names = list(self.features.columns)
        groups = [
            ('debt', [c for c in names if 'dalio_debt' in c or 'gundlach' in c]),
            ('inequality', [c for c in names if 'inequality' in c or 'stevenson' in c]),
            ('geo', [c for c in names
                     if 'risk' in c or 'middle_east' in c or 'europe' in c or 'asia' in c]),
            ('tech', [c for c in names if 'monopoly' in c or 'thiel' in c or 'tech' in c]),
        ]
        for name, cols in groups:
            if not cols:
                continue
            # dropna() alone keeps +/-inf rows, which the scaler rejects, so
            # treat them as missing as well.
            data = self.features[cols].replace([np.inf, -np.inf], np.nan).dropna()
            if len(data) <= 10:
                continue
            data_scaled = StandardScaler().fit_transform(data)
            pcs = PCA(n_components=min(2, len(cols))).fit_transform(data_scaled)
            for i in range(pcs.shape[1]):
                self.features.loc[data.index, f'{name}_PC{i+1}'] = pcs[:, i]
        return self

    def _calculate_dollar_anomaly(self):
        """Return 1.0 on rows where SP500 fell over 5% in 5 rows while DXY also fell."""
        sp_correction = self._col('SP500').pct_change(5) < -0.05
        dollar_weakness = self._col('DXY').pct_change(5) < 0
        return (sp_correction & dollar_weakness).astype(float)

    def _calculate_asia_tension(self):
        """Return the 21-row return gap between Taiwan and China, with NaN as 0."""
        taiwan = self._col('Taiwan')
        china = self._col('China')
        return (taiwan.pct_change(21) - china.pct_change(21)).fillna(0)

    def _normalize(self, series, window=252):
        """Return a rolling z-score clipped to [-3, 3] and scaled to [-1, 1].

        At least 20 observations are required; the 0.001 added to the
        standard deviation prevents division by zero.
        """
        rolling_mean = series.rolling(window, min_periods=20).mean()
        rolling_std = series.rolling(window, min_periods=20).std()
        return ((series - rolling_mean) / (rolling_std + 0.001)).clip(-3, 3) / 3

    def build_all_features(self):
        """Run every builder in dependency order and print a summary.

        Returns:
            The ``self.features`` DataFrame.
        """
        print("\n" + "="*80)
        print("INTEGRATED THEORY FEATURE ENGINEERING")
        print("="*80 + "\n")
        self.calculate_returns_volatility()
        self.dalio_forces()
        self.stevenson_inequality()
        self.thiel_monopoly()
        self.gundlach_reckoning()
        self.geopolitical_indicators()
        self.cross_asset_features()
        self.scenario_probabilities()
        self.regime_detection()
        self.dimensionality_reduction()
        print("\n" + "="*80)
        print("FEATURE ENGINEERING COMPLETE")
        print("="*80)
        print(f"Total features created: {len(self.features.columns)}")
        print(f"Regimes detected: {self.features['regime'].value_counts().to_dict()}")
        print(f"\nCurrent state (latest):")
        print(f" - Dalio Composite: {self.features['dalio_composite_norm'].iloc[-1]:.3f}")
        print(f" - Stevenson Inequality: {self.features['stevenson_inequality_norm'].iloc[-1]:.3f}")
        print(f" - Thiel Monopoly: {self.features['thiel_monopoly_norm'].iloc[-1]:.3f}")
        print(f" - Gundlach Reckoning: {self.features['gundlach_reckoning_norm'].iloc[-1]:.3f}")
        print(f" - Regime: {self.features['regime'].iloc[-1]}")
        print(f"\nScenario Probabilities:")
        print(f" - Credit Collapse: {self.features['prob_credit_collapse'].iloc[-1]:.1%}")
        print(f" - Stagflation: {self.features['prob_stagflation'].iloc[-1]:.1%}")
        print(f" - Tech Boom: {self.features['prob_tech_boom'].iloc[-1]:.1%}")
        return self.features
def main():
    """Command-line entry point: load the raw CSV, build features, write them out.

    ``--input`` is read with its first column as a parsed date index;
    ``--output`` receives the engineered feature frame as CSV.
    """
    import argparse

    cli = argparse.ArgumentParser(description='Integrated Market Theory Feature Engineering')
    # (flag, default, help) for each supported option.
    option_specs = (
        ('--input', 'unified_market_data.csv', 'Input CSV file from geo_macro.py'),
        ('--output', 'enhanced_market_features.csv', 'Output CSV file with engineered features'),
    )
    for flag, default_value, help_text in option_specs:
        cli.add_argument(flag, default=default_value, help=help_text)
    args = cli.parse_args()

    print(f"Loading data from {args.input}...")
    raw = pd.read_csv(args.input, index_col=0, parse_dates=True)
    print(f"Loaded {len(raw)} rows, {len(raw.columns)} columns")
    print(f"Date range: {raw.index.min()} to {raw.index.max()}")

    IntegratedTheoryFeatures(raw).build_all_features().to_csv(args.output)


# Run the pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()