""" Integrated Market Theory - Feature Engineering Pipeline Combines all tickers from geo_macro.py into unified theory indicators Usage: python feature_engineering.py --input unified_market_data.csv --output enhanced_features.csv """ import pandas as pd import numpy as np from sklearn.decomposition import PCA from sklearn.preprocessing import StandardScaler import warnings warnings.filterwarnings('ignore') def safe_zscore(series, window=252, min_obs=30): """Rolling z-score with fallback to 0 for unstable windows""" mean = series.rolling(window, min_periods=min_obs).mean() std = series.rolling(window, min_periods=min_obs).std() z = (series - mean) / std return z.fillna(0).clip(-3, 3) class IntegratedTheoryFeatures: """ Transforms raw market data into theory-driven features combining: - Dalio's 5 Forces - Stevenson's Inequality Metrics - Thiel's Monopoly Indicators - Gundlach's Reckoning Signals """ def __init__(self, df): # Validate critical columns required = {'SP500', 'DGS10', 'Gold', 'VIX', 'UNRATE', 'CPIAUCSL'} missing = required - set(df.columns) if missing: raise ValueError(f"Critical data missing: {missing}") self.df = df.copy() self.features = pd.DataFrame(index=df.index) def calculate_returns_volatility(self, windows=[21, 63, 252]): """Calculate multi-timeframe returns and volatility for all tickers""" print("Calculating returns and volatility...") for col in self.df.columns: for window in windows: # Returns self.df[f'{col}_ret{window}'] = self.df[col].pct_change(window) # Volatility self.df[f'{col}_vol{window}'] = self.df[col].pct_change().rolling(window).std() # Momentum self.df[f'{col}_mom{window}'] = ( self.df[col].pct_change(window) - self.df[col].pct_change(window).shift(window) ) return self def dalio_forces(self): """Ray Dalio's 5 Forces Composite Indicators""" print("Building Dalio's 5 Forces...") # Force 1: Debt/Economic Cycle yield_curve = self.df.get('DGS10', 0) - self.df.get('DGS2', 0) inflation_mom = self.df.get('CPIAUCSL', pd.Series(0)).pct_change(12) * 100 hy_spread = self.df.get('BAMLH0A0HYM2', pd.Series(0)) / 100 self.features['dalio_debt_cycle'] = ( yield_curve * 0.3 + inflation_mom * 0.4 + hy_spread * 0.3 ) # Force 2: Internal Conflict consumer_weakness = (self.df.get('Consumer_Discretionary', 0) / self.df.get('Consumer_Staples', 1)).pct_change(63) * -1 unemployment_stress = self.df.get('UNRATE', pd.Series(0)).diff() * 2 small_large_gap = (self.df.get('Small_Cap_Value', 0) / self.df.get('SP500', 1)).pct_change(63) * -1 self.features['dalio_internal_conflict'] = ( consumer_weakness * 0.4 + unemployment_stress * 0.3 + small_large_gap * 0.3 ) # Force 3: External Conflict defense_momentum = self.df.get('Defense_Stocks', pd.Series(0)).pct_change(21) dollar_anomaly = self._calculate_dollar_anomaly() china_taiwan_tension = self._calculate_asia_tension() self.features['dalio_external_conflict'] = ( defense_momentum * 0.4 + dollar_anomaly * 0.3 + china_taiwan_tension * 0.3 ) # Force 4: Acts of Nature water_stress = self.df.get('Water', pd.Series(0)).pct_change(63) ag_volatility = self.df.get('Agricultural', pd.Series(0)).pct_change().rolling(63).std() * 100 self.features['dalio_nature_force'] = ( water_stress * 0.6 + ag_volatility * 0.4 ) # Force 5: Technology/Inventiveness tech_outperform = (self.df.get('Technology', 0) / self.df.get('SP500', 1)).pct_change(21) cloud_momentum = self.df.get('Cloud_Computing', pd.Series(0)).pct_change(63) ai_momentum = self.df.get('Robotics_AI', pd.Series(0)).pct_change(63) self.features['dalio_tech_force'] = ( tech_outperform * 0.4 + cloud_momentum * 0.3 + ai_momentum * 0.3 ) # Master Composite dalio_components = [ self.features['dalio_debt_cycle'] * 0.35, self.features['dalio_internal_conflict'] * 0.25, self.features['dalio_external_conflict'] * 0.20, self.features['dalio_tech_force'] * 0.15, self.features['dalio_nature_force'] * 0.05 ] self.features['dalio_composite'] = pd.concat(dalio_components, axis=1).sum(axis=1) self.features['dalio_composite_norm'] = self._normalize(self.features['dalio_composite']) return self def stevenson_inequality(self): """Gary Stevenson's Inequality Amplification Metrics""" print("Building Stevenson's inequality indicators...") asset_rich = (self.df.get('Gold', 0) + self.df.get('Real_Estate', 0) + self.df.get('Growth_Stocks', 0)) / 3 middle_class = (self.df.get('Consumer_Staples', 0) + self.df.get('Regional_Banks', 0) + self.df.get('Small_Cap_Value', 0)) / 3 self.features['inequality_wealth_flow'] = ( asset_rich.pct_change(63) - middle_class.pct_change(63) ) luxury = self.df.get('Retail_Luxury', pd.Series(0)).pct_change(21) mass = (self.df.get('Restaurants', 0) + self.df.get('Retail', 0)) / 2 mass = mass.pct_change(21) self.features['inequality_consumption_gap'] = luxury - mass quality_credit = (self.df.get('Investment_Grade_Spread', 0) + self.df.get('Preferred_Stock', 0)) / 2 junk_credit = (self.df.get('HYG', 0) + self.df.get('JNK', 0) + self.df.get('Emerging_Market_Debt', 0)) / 3 self.features['inequality_credit_access'] = ( quality_credit.pct_change(63) - junk_credit.pct_change(63) ) self.features['stevenson_inequality'] = ( self.features['inequality_wealth_flow'] * 0.4 + self.features['inequality_consumption_gap'] * 0.3 + self.features['inequality_credit_access'] * 0.3 ) self.features['stevenson_inequality_norm'] = self._normalize(self.features['stevenson_inequality']) asset_inflation = (self.df.get('Gold', 0) + self.df.get('Real_Estate', 0)).pct_change(21) wage_proxy = self.df.get('Staffing', pd.Series(0)).pct_change(21) self.features['inequality_transmission'] = asset_inflation - wage_proxy return self def thiel_monopoly(self): """Peter Thiel's Monopoly vs Competition Indicators""" print("Building Thiel's monopoly indicators...") tech_strength = self.df.get('Technology', 0) finance_strength = self.df.get('Financials', 1) self.features['monopoly_cash_moat'] = ( tech_strength.pct_change(63) - finance_strength.pct_change(63) ) network_sectors = (self.df.get('Cloud_Computing', 0) * 0.4 + self.df.get('Communication_Services', 0) * 0.3 + self.df.get('Fintech', 0) * 0.3) self.features['monopoly_network_effects'] = network_sectors.pct_change(63) tech_volatility = self.df.get('Technology', pd.Series(1)).pct_change().rolling(63).std() chip_strength = self.df.get('Semiconductors', pd.Series(0)).pct_change(63) self.features['monopoly_defensibility'] = ( (1 / (tech_volatility + 0.001)) * 0.01 + chip_strength * 0.5 ) self.features['thiel_monopoly'] = ( self.features['monopoly_cash_moat'] * 0.35 + self.features['monopoly_network_effects'] * 0.35 + self.features['monopoly_defensibility'] * 0.30 ) self.features['thiel_monopoly_norm'] = self._normalize(self.features['thiel_monopoly']) tech_return = self.df.get('Technology', pd.Series(0)).pct_change(21) rate_change = self.df.get('DGS10', pd.Series(0)).diff() * -1 self.features['monopoly_immunity'] = tech_return / (rate_change.abs() + 0.001) specialized = (self.df.get('Semiconductors', 0) + self.df.get('Cloud_Computing', 0) + self.df.get('Robotics_AI', 0)) / 3 broad_tech = self.df.get('Technology', 1) self.features['tech_concentration'] = specialized / broad_tech return self def gundlach_reckoning(self): """Jeffrey Gundlach's Debt Reckoning and Paradigm Shift Signals""" print("Building Gundlach's reckoning indicators...") fed_proxy = self.df.get('DGS3MO', pd.Series(0)) long_yield = self.df.get('DGS10', pd.Series(0)) fed_cutting = fed_proxy.diff() < -0.05 yield_rising = long_yield.diff() > 0 self.features['gundlach_yield_anomaly'] = ( (fed_cutting & yield_rising).astype(float) + (long_yield - fed_proxy) ) gold_return = self.df.get('Gold', pd.Series(0)).pct_change(21) treasury_return = self.df.get('US_Treasuries_Long', pd.Series(1)).pct_change(21) self.features['gundlach_flight_shift'] = gold_return / (treasury_return + 0.001) dollar_weak = self.df.get('DXY', pd.Series(0)).pct_change(21) * -1 em_outperform = (self.df.get('Emerging_Markets', 0) + self.df.get('Europe', 0)) / 2 em_outperform = em_outperform.pct_change(21) sp_return = self.df.get('SP500', pd.Series(0)).pct_change(21) self.features['gundlach_capital_reversal'] = ( dollar_weak * 0.5 + (em_outperform - sp_return) * 0.5 ) regional_stress = (self.df.get('Regional_Banks', 0) / self.df.get('Financials', 1)).pct_change(21) mortgage_reit_stress = self.df.get('Mortgage_REITs', pd.Series(0)).pct_change(21) real_estate_vol = self.df.get('Real_Estate', pd.Series(1)).pct_change().rolling(21).std() * 100 self.features['gundlach_private_credit_risk'] = ( regional_stress * -0.4 + mortgage_reit_stress * -0.3 + real_estate_vol * 0.3 ) self.features['gundlach_reckoning'] = ( self.features['gundlach_yield_anomaly'] * 0.30 + self.features['gundlach_flight_shift'] * 0.25 + self.features['gundlach_capital_reversal'] * 0.25 + self.features['gundlach_private_credit_risk'] * 0.20 ) self.features['gundlach_reckoning_norm'] = self._normalize(self.features['gundlach_reckoning']) return self def geopolitical_indicators(self): """Regional conflict and energy transition signals""" print("Building geopolitical indicators...") oil_volatility = self.df.get('Oil', pd.Series(1)).pct_change().rolling(3).std() * 100 defense_spike = self.df.get('Defense_Stocks', pd.Series(0)).pct_change(5) gold_haven = self.df.get('Gold_Safe_Haven', pd.Series(0)).pct_change(5) self.features['middle_east_risk'] = ( oil_volatility * 0.4 + defense_spike * 0.3 + gold_haven * 0.3 ) gas_volatility = self.df.get('NaturalGas', pd.Series(1)).pct_change().rolling(5).std() * 100 europe_decline = self.df.get('Europe', pd.Series(0)).pct_change(21) * -1 swiss_franc_strength = self.df.get('Swiss_Franc', pd.Series(0)).pct_change(21) * -1 self.features['europe_risk'] = ( gas_volatility * 0.5 + europe_decline * 0.3 + swiss_franc_strength * 0.2 ) chip_stress = self.df.get('Semiconductors', pd.Series(1)).pct_change().rolling(21).std() * 100 taiwan_korea = (self.df.get('Taiwan', 0) + self.df.get('South_Korea', 0)) / 2 china_diverge = taiwan_korea.pct_change(21) - self.df.get('China', pd.Series(0)).pct_change(21) rare_earth = self.df.get('Rare_Earth', pd.Series(0)).pct_change(21) self.features['asia_risk'] = ( chip_stress * 0.4 + china_diverge * 0.3 + rare_earth * 0.3 ) self.features['geopolitical_risk'] = ( self.features['middle_east_risk'] * 0.4 + self.features['europe_risk'] * 0.3 + self.features['asia_risk'] * 0.3 ) self.features['geopolitical_risk_norm'] = self._normalize(self.features['geopolitical_risk']) uranium_momentum = self.df.get('Uranium', pd.Series(0)).pct_change(63) clean_momentum = self.df.get('Clean_Energy', pd.Series(0)).pct_change(63) oil_decline = self.df.get('Oil', pd.Series(0)).pct_change(252) * -1 self.features['energy_transition'] = ( uranium_momentum * 0.5 + clean_momentum * 0.3 + oil_decline * 0.2 ) return self def cross_asset_features(self): """Advanced cross-asset relationships""" print("Building cross-asset features...") defensive = (self.df.get('Gold', 0) + self.df.get('Utilities', 0) + self.df.get('Healthcare', 0)) / 3 risk_on = (self.df.get('Technology', 0) + self.df.get('Consumer_Discretionary', 0) + self.df.get('Real_Estate', 0)) / 3 self.features['flight_ratio'] = defensive / (risk_on + 0.001) regional_vs_broad = (self.df.get('Regional_Banks', 0) - self.df.get('Financials', 0)) mortgage_vs_reit = (self.df.get('Mortgage_REITs', 0) - self.df.get('REITs', 0)) em_vs_ig = (self.df.get('Emerging_Market_Debt', 0) - self.df.get('Investment_Grade_Spread', 0)) self.features['credit_contagion'] = ( regional_vs_broad.pct_change(21) + mortgage_vs_reit.pct_change(21) + em_vs_ig.pct_change(21) ) / 3 vix = self.df.get('VIX', pd.Series(20)) vix_historical_avg = vix.rolling(252).mean() geo_max = self.features[['middle_east_risk', 'europe_risk', 'asia_risk']].max(axis=1) self.features['geo_amplification'] = geo_max * (vix / vix_historical_avg) return self def scenario_probabilities(self): """Dynamic probability weights for future scenarios""" print("Calculating scenario probabilities...") # Scenario 1: Credit Collapse self.features['prob_credit_collapse'] = ( self.features['gundlach_reckoning_norm'] * 0.4 + safe_zscore(self.features['gundlach_private_credit_risk']) * 0.03 + safe_zscore(self.features['dalio_debt_cycle']) * 0.03 ) self.features['prob_credit_collapse'] = np.clip(self.features['prob_credit_collapse'], 0, 1) # Scenario 2: Stagflation inflation_high = (self.df.get('CPIAUCSL', pd.Series(0)).pct_change(12) * 100 > 2.5).astype(float) unemployment_rising = (self.df.get('UNRATE', pd.Series(0)).diff() > 0).astype(float) self.features['prob_stagflation'] = ( (inflation_high * unemployment_rising) * 0.3 + safe_zscore(self.features['dalio_external_conflict']) * 0.03 + safe_zscore(self.features['gundlach_capital_reversal']) * 0.02 + self.features['stevenson_inequality_norm'] * 0.2 ) self.features['prob_stagflation'] = np.clip(self.features['prob_stagflation'], 0, 1) # Scenario 3: Tech Monopoly Boom self.features['prob_tech_boom'] = ( self.features['thiel_monopoly_norm'] * 0.4 + safe_zscore(self.features['dalio_tech_force'] - self.features['dalio_debt_cycle']) * 0.03 + safe_zscore(self.features['energy_transition']) * 0.02 + (self.df.get('China_Tech', pd.Series(0)).pct_change(63) < self.df.get('Technology', pd.Series(0)).pct_change(63)).astype(float) * 0.1 ) self.features['prob_tech_boom'] = np.clip(self.features['prob_tech_boom'], 0, 1) self.features['prob_controlled_reset'] = 0.05 return self def regime_detection(self): """Classify current market regime""" print("Detecting market regimes...") def classify_regime(row): if (row['gundlach_reckoning_norm'] > 0.6 and row['prob_credit_collapse'] > 0.5): return 'CRISIS' elif row['thiel_monopoly_norm'] > 0.7: return 'TECH_MONOPOLY' elif (row['stevenson_inequality_norm'] > 0.6 and row['prob_stagflation'] > 0.4): return 'INEQUALITY_TRAP' elif row['geopolitical_risk_norm'] > 0.7: return 'GEOPOLITICAL_SHOCK' else: return 'TRANSITION' self.features['regime'] = self.features.apply(classify_regime, axis=1) return self def dimensionality_reduction(self): """Apply PCA to reduce feature space""" print("Applying dimensionality reduction...") debt_cols = [c for c in self.features.columns if 'dalio_debt' in c or 'gundlach' in c] inequality_cols = [c for c in self.features.columns if 'inequality' in c or 'stevenson' in c] geo_cols = [c for c in self.features.columns if 'risk' in c or 'middle_east' in c or 'europe' in c or 'asia' in c] tech_cols = [c for c in self.features.columns if 'monopoly' in c or 'thiel' in c or 'tech' in c] for name, cols in [('debt', debt_cols), ('inequality', inequality_cols), ('geo', geo_cols), ('tech', tech_cols)]: if len(cols) > 0: data = self.features[cols].dropna() if len(data) > 10: scaler = StandardScaler() data_scaled = scaler.fit_transform(data) pca = PCA(n_components=min(2, len(cols))) pcs = pca.fit_transform(data_scaled) for i in range(pcs.shape[1]): self.features.loc[data.index, f'{name}_PC{i+1}'] = pcs[:, i] return self def _calculate_dollar_anomaly(self): sp_correction = self.df.get('SP500', pd.Series(0)).pct_change(5) < -0.05 dollar_weakness = self.df.get('DXY', pd.Series(0)).pct_change(5) < 0 return (sp_correction & dollar_weakness).astype(float) def _calculate_asia_tension(self): taiwan = self.df.get('Taiwan', pd.Series(0)) china = self.df.get('China', pd.Series(0)) return (taiwan.pct_change(21) - china.pct_change(21)).fillna(0) def _normalize(self, series, window=252): rolling_mean = series.rolling(window, min_periods=20).mean() rolling_std = series.rolling(window, min_periods=20).std() return ((series - rolling_mean) / (rolling_std + 0.001)).clip(-3, 3) / 3 def build_all_features(self): print("\n" + "="*80) print("INTEGRATED THEORY FEATURE ENGINEERING") print("="*80 + "\n") self.calculate_returns_volatility() self.dalio_forces() self.stevenson_inequality() self.thiel_monopoly() self.gundlach_reckoning() self.geopolitical_indicators() self.cross_asset_features() self.scenario_probabilities() self.regime_detection() self.dimensionality_reduction() print("\n" + "="*80) print("FEATURE ENGINEERING COMPLETE") print("="*80) print(f"Total features created: {len(self.features.columns)}") print(f"Regimes detected: {self.features['regime'].value_counts().to_dict()}") print(f"\nCurrent state (latest):") print(f" - Dalio Composite: {self.features['dalio_composite_norm'].iloc[-1]:.3f}") print(f" - Stevenson Inequality: {self.features['stevenson_inequality_norm'].iloc[-1]:.3f}") print(f" - Thiel Monopoly: {self.features['thiel_monopoly_norm'].iloc[-1]:.3f}") print(f" - Gundlach Reckoning: {self.features['gundlach_reckoning_norm'].iloc[-1]:.3f}") print(f" - Regime: {self.features['regime'].iloc[-1]}") print(f"\nScenario Probabilities:") print(f" - Credit Collapse: {self.features['prob_credit_collapse'].iloc[-1]:.1%}") print(f" - Stagflation: {self.features['prob_stagflation'].iloc[-1]:.1%}") print(f" - Tech Boom: {self.features['prob_tech_boom'].iloc[-1]:.1%}") return self.features def main(): import argparse parser = argparse.ArgumentParser(description='Integrated Market Theory Feature Engineering') parser.add_argument('--input', default='unified_market_data.csv', help='Input CSV file from geo_macro.py') parser.add_argument('--output', default='enhanced_market_features.csv', help='Output CSV file with engineered features') args = parser.parse_args() print(f"Loading data from {args.input}...") df = pd.read_csv(args.input, index_col=0, parse_dates=True) print(f"Loaded {len(df)} rows, {len(df.columns)} columns") print(f"Date range: {df.index.min()} to {df.index.max()}") engine = IntegratedTheoryFeatures(df) features = engine.build_all_features() features.to_csv(args.output) # ✅ FIXED: added missing parenthesis if __name__ == "__main__": main()