"""Market Data Pipeline for AlphaForge."""
import warnings
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

try:
    import yfinance as yf
except ImportError:
    # yfinance is only needed by fetch_data(); the feature-engineering methods
    # work on any OHLCV DataFrame, so a missing package must not break import.
    yf = None

# NOTE: this silences every warning process-wide, not just this module's.
warnings.filterwarnings('ignore')


class MarketDataPipeline:
    """Fetch OHLCV data and turn it into model-ready features.

    Typical flow: ``fetch_data()`` fills ``self.data`` (ticker -> OHLCV frame),
    ``create_feature_matrix()`` builds a per-ticker normalized feature table,
    and ``create_sequences()`` slices that table into lookback windows with
    forward-return targets.
    """

    # Canonical OHLCV column names every frame in ``self.data`` carries.
    _PRICE_FIELDS = ('Open', 'High', 'Low', 'Close', 'Volume')
    # Case-insensitive exact names mapped to their canonical form.
    _EXACT_NAMES = {
        'OPEN': 'Open',
        'HIGH': 'High',
        'LOW': 'Low',
        'CLOSE': 'Close',
        'VOLUME': 'Volume',
        'VOL': 'Volume',
    }
    # Substring fallbacks, tried in this order when no exact name matched.
    _SUBSTRING_NAMES = (
        ('OPEN', 'Open'),
        ('HIGH', 'High'),
        ('LOW', 'Low'),
        ('CLOSE', 'Close'),
        ('VOL', 'Volume'),
    )

    def __init__(self, tickers: List[str], start_date: str, end_date: str):
        """Store the download parameters; no I/O happens here.

        Args:
            tickers: Symbols to download.
            start_date: Passed to ``yf.download`` as ``start``.
            end_date: Passed to ``yf.download`` as ``end``.
        """
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        self.data = {}

    @staticmethod
    def _column_1d(df: pd.DataFrame, name: str) -> np.ndarray:
        """Return column *name* of *df* as a 1-D float array of len(df)."""
        column = df[name]
        if isinstance(column, pd.DataFrame):
            # Duplicate labels select a frame; flattening it would double the
            # length, so keep only the first matching column.
            column = column.iloc[:, 0]
        return np.asarray(column, dtype=float).ravel()

    @classmethod
    def _standardize_columns(cls, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of *df* with canonical OHLCV column names."""
        out = df.copy()
        if isinstance(out.columns, pd.MultiIndex):
            out.columns = out.columns.get_level_values(0)

        labels = list(out.columns)
        new_labels = [
            lab.title() if isinstance(lab, str) else lab for lab in labels
        ]

        claimed = {}  # canonical name -> column position
        # Pass 1: exact names win, so 'Adj Close' can never shadow 'Close'.
        for pos, lab in enumerate(labels):
            target = cls._EXACT_NAMES.get(str(lab).strip().upper())
            if target is not None and target not in claimed:
                claimed[target] = pos
        # Pass 2: substring fallback for names such as 'Close_SPY'.
        taken = set(claimed.values())
        for pos, lab in enumerate(labels):
            if pos in taken:
                continue
            key = str(lab).upper()
            if 'ADJ' in key:
                continue  # adjusted prices are a different series
            for token, target in cls._SUBSTRING_NAMES:
                if token in key:
                    if target not in claimed:
                        claimed[target] = pos
                        taken.add(pos)
                    break
        for target, pos in claimed.items():
            new_labels[pos] = target
        out.columns = new_labels

        for required in cls._PRICE_FIELDS:
            if required not in out.columns:
                out[required] = np.nan
        return out

    def fetch_data(self, min_rows: int = 100) -> Dict[str, pd.DataFrame]:
        """Fetch OHLCV data for all tickers.

        Each ticker is downloaded independently; a failure is printed and the
        remaining tickers are still processed. Frames are stored in
        ``self.data`` with canonical column names.

        Args:
            min_rows: A ticker is kept only if it has more than this many rows.

        Returns:
            ``self.data`` (ticker -> OHLCV DataFrame).

        Raises:
            ImportError: If yfinance is not installed.
        """
        if yf is None:
            raise ImportError(
                "yfinance is required for fetch_data(); "
                "install it with `pip install yfinance`"
            )

        print(f"Fetching data for {len(self.tickers)} tickers...")
        for ticker in self.tickers:
            # Deliberately broad: one bad ticker must not abort the batch.
            try:
                df = yf.download(ticker, start=self.start_date,
                                 end=self.end_date, progress=False)
                if len(df) > min_rows:
                    self.data[ticker] = self._standardize_columns(df)
                else:
                    print(f"Skipping {ticker}: only {len(df)} rows "
                          f"(need more than {min_rows})")
            except Exception as e:
                print(f"Error fetching {ticker}: {e}")
        print(f"Successfully fetched {len(self.data)} tickers")
        return self.data

    def compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute technical indicators for one ticker.

        Args:
            df: Frame with 'Close', 'High', 'Low' and 'Volume' columns.
                'Open' is used for ``open_gap`` when present.

        Returns:
            DataFrame indexed like *df*. Rows without enough history, and any
            infinite values, are set to 0.
        """
        close = pd.Series(self._column_1d(df, 'Close'))
        high = pd.Series(self._column_1d(df, 'High'))
        low = pd.Series(self._column_1d(df, 'Low'))
        volume = pd.Series(self._column_1d(df, 'Volume'))
        # shift() leaves NaN where there is no history; np.roll would wrap
        # around and compare the first rows against the last prices.
        prev_close = close.shift(1)

        features = {}

        # Returns
        for d in [1, 5, 10, 21, 63]:
            features[f'return_{d}d'] = np.log(close / close.shift(d))

        # Realized volatility (annualized root of summed squared log returns)
        squared_ret = np.log(close / prev_close) ** 2
        for d in [5, 21, 63]:
            # clip guards against tiny negative sums from float round-off
            window_sum = squared_ret.rolling(d).sum().clip(lower=0)
            features[f'rvol_{d}d'] = np.sqrt(252 / d * window_sum)

        # Moving averages, as relative distance from the current close
        for d in [5, 10, 20, 50, 200]:
            features[f'sma_{d}d'] = close.rolling(d).mean() / close - 1

        # RSI (simple 14-row averages of gains and losses)
        delta = close.diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        rs = gain.rolling(14).mean() / loss.rolling(14).mean()
        features['rsi_14'] = 100 - 100 / (1 + rs)

        # MACD, scaled by price
        ema12 = close.ewm(span=12).mean()
        ema26 = close.ewm(span=26).mean()
        macd = (ema12 - ema26) / close
        features['macd'] = macd
        features['macd_signal'] = macd.ewm(span=9).mean()

        # Bollinger Bands: position inside the 2-sigma band
        sma20 = close.rolling(20).mean()
        std20 = close.rolling(20).std()
        features['bb_position'] = (close - sma20) / (2 * std20)

        # Volume indicators
        features['volume_sma_ratio'] = volume / volume.rolling(20).mean()
        features['volume_change'] = np.log(volume / volume.shift(1))

        # Price-based
        features['intraday_range'] = (high - low) / close
        if 'Open' in df.columns:
            open_ = pd.Series(self._column_1d(df, 'Open'))
            features['open_gap'] = (open_ - prev_close) / prev_close
        else:
            # No open price available: fall back to the close-to-close move.
            features['open_gap'] = (close - prev_close) / prev_close

        # Every series shares a positional index, so attach df's index by
        # position rather than by label.
        result = pd.DataFrame(
            {name: np.asarray(col) for name, col in features.items()},
            index=df.index,
        )
        return result.replace([np.inf, -np.inf], np.nan).fillna(0)

    def compute_cross_asset_features(self) -> pd.DataFrame:
        """Compute cross-asset correlation and spread features.

        Log returns are aligned by date across ``self.data``; dates a ticker
        lacks count as a 0 return.

        Returns:
            DataFrame indexed by the combined dates of all tickers, with
            ``<ticker>_beta`` columns (only when 'SPY' is present),
            ``avg_correlation`` (mean of the 63-row rolling correlation
            matrix, self-correlations included) and the sector spreads when
            their tickers are present. Empty if ``self.data`` is empty.
        """
        if not self.data:
            return pd.DataFrame()

        returns = {}
        for ticker, df in self.data.items():
            close = pd.Series(self._column_1d(df, 'Close'), index=df.index)
            returns[ticker] = np.log(close / close.shift(1))
        # Building from Series aligns on dates, so histories may differ.
        returns_df = pd.DataFrame(returns)
        returns_df = returns_df.replace([np.inf, -np.inf], np.nan).fillna(0)
        features = pd.DataFrame(index=returns_df.index)

        # Market beta (vs SPY)
        if 'SPY' in returns_df.columns:
            spy = returns_df['SPY']
            spy_var = spy.rolling(63).var()
            for ticker in returns_df.columns:
                if ticker != 'SPY':
                    beta = returns_df[ticker].rolling(63).cov(spy) / spy_var
                    features[f'{ticker}_beta'] = beta.values

        # Average correlation
        corr_window = returns_df.rolling(63).corr()
        avg_corr = corr_window.groupby(level=0).mean().mean(axis=1)
        features['avg_correlation'] = (
            avg_corr.reindex(features.index).to_numpy()
        )

        # Sector momentum spreads
        if all(x in returns_df.columns for x in ['XLF', 'XLK', 'XLE']):
            features['fin_vs_tech'] = (
                (returns_df['XLF'] - returns_df['XLK']).rolling(21).sum().values
            )
            # The energy spread also needs SPY, which the check above omits.
            if 'SPY' in returns_df.columns:
                features['energy_vs_market'] = (
                    (returns_df['XLE'] - returns_df['SPY'])
                    .rolling(21).sum().values
                )

        return features.replace([np.inf, -np.inf], np.nan).fillna(0)

    def create_feature_matrix(self, norm_window: int = 21) -> pd.DataFrame:
        """Create unified feature matrix for all assets.

        For each ticker: technical indicators, plus the cross-asset features
        for the same dates, are z-scored over a rolling window.

        Args:
            norm_window: Rows in the rolling z-score window.

        Returns:
            Frames for all tickers stacked in ``self.data`` order, with the
            un-normalized 'ticker' and 'close' columns. NaN and infinite
            values are replaced by 0.

        Raises:
            ValueError: If ``self.data`` is empty.
        """
        if not self.data:
            raise ValueError("No data loaded; call fetch_data() first")

        cross_features = self.compute_cross_asset_features()

        frames = []
        for ticker, df in self.data.items():
            tech = self.compute_technical_indicators(df)
            tech['ticker'] = ticker
            tech['close'] = self._column_1d(df, 'Close')

            # One reindex replaces the per-date, per-column .loc loop.
            aligned = cross_features.reindex(tech.index)
            for col in cross_features.columns:
                tech[col] = aligned[col].to_numpy()
            tech = tech.fillna(0)

            # Sliding window z-score normalization per ticker
            numeric_cols = [
                c for c in tech.columns if c not in ['ticker', 'close']
            ]
            block = tech[numeric_cols]
            rolling = block.rolling(norm_window)
            # A zero std (constant window) would divide by zero; use 1.
            tech[numeric_cols] = (
                (block - rolling.mean()) / rolling.std().replace(0, 1)
            )
            frames.append(tech)

        features_df = pd.concat(frames, ignore_index=False)
        return features_df.replace([np.inf, -np.inf], 0).fillna(0)

    def create_sequences(self, features_df: pd.DataFrame, lookback: int = 60,
                         forecast_horizon: int = 5
                         ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Create sequences for time series models.

        For row ``i`` of a ticker, the sample is the *lookback* feature rows
        before ``i`` and the target is
        ``log(close[i + forecast_horizon] / close[i])``.

        Args:
            features_df: Output of ``create_feature_matrix``; needs 'ticker'
                and 'close' columns, all other columns are features.
            lookback: Rows per input window.
            forecast_horizon: Rows ahead for the target return.

        Returns:
            ``(X, y, tickers, dates)`` arrays with one entry per sample.
            Tickers with fewer than ``lookback + forecast_horizon + 21`` rows
            and samples with a non-finite target are skipped.
        """
        X_list, y_list, tickers_list, dates_list = [], [], [], []
        feature_cols = [
            c for c in features_df.columns if c not in ['ticker', 'close']
        ]

        for ticker in features_df['ticker'].unique():
            ticker_df = features_df[features_df['ticker'] == ticker].sort_index()
            if len(ticker_df) < lookback + forecast_horizon + 21:
                continue

            values = ticker_df[feature_cols].values
            closes = ticker_df['close'].values.astype(float)
            dates = ticker_df.index

            for i in range(lookback, len(values) - forecast_horizon):
                # Target: future return
                future_return = np.log(closes[i + forecast_horizon] / closes[i])
                if not np.isfinite(future_return):
                    # A zero or missing close gives an unusable target.
                    continue
                X_list.append(values[i - lookback:i])
                y_list.append(future_return)
                tickers_list.append(ticker)
                dates_list.append(dates[i])

        return (np.array(X_list), np.array(y_list),
                np.array(tickers_list), np.array(dates_list))