| """Market Data Pipeline for AlphaForge.""" |
| import numpy as np |
| import pandas as pd |
| import yfinance as yf |
| from typing import Dict, List, Optional, Tuple |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class MarketDataPipeline:
    """Fetch OHLCV market data and turn it into model-ready feature matrices.

    Workflow:
        1. ``fetch_data()``            -- download OHLCV history per ticker.
        2. ``create_feature_matrix()`` -- technical + cross-asset features,
           rolling z-score normalized per ticker.
        3. ``create_sequences()``      -- (X, y, tickers, dates) arrays for
           sequence models.
    """

    def __init__(self, tickers: List[str], start_date: str, end_date: str):
        """Store the universe and date range; no network I/O happens here.

        Args:
            tickers: Ticker symbols to fetch (e.g. ["SPY", "XLF"]).
            start_date: Start date string ("YYYY-MM-DD").
            end_date: End date string ("YYYY-MM-DD").
        """
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        # ticker -> normalized OHLCV DataFrame, populated by fetch_data()
        self.data: Dict[str, pd.DataFrame] = {}

    @staticmethod
    def _as_1d(col) -> np.ndarray:
        """Return a DataFrame column as a flat 1-D float ndarray."""
        return np.asarray(col, dtype=float).reshape(-1)

    @staticmethod
    def _lagged_log_return(values: np.ndarray, lag: int) -> np.ndarray:
        """Log return over ``lag`` periods, NaN-padded at the start.

        Replaces the original ``np.roll`` formulation, which wrapped the
        series so the first ``lag`` rows were computed against the *last*
        ``lag`` observations (look-ahead bias, and garbage values that
        survived ``fillna`` because they were not NaN). Here the first
        ``lag`` entries are NaN and are later zero-filled by the callers.
        """
        out = np.full(len(values), np.nan)
        if 0 < lag < len(values):
            out[lag:] = np.log(values[lag:] / values[:-lag])
        return out

    @staticmethod
    def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Coerce downloaded columns to canonical OHLCV names.

        Ensures Open/High/Low/Close/Volume all exist (missing ones are
        added as NaN) and resolves duplicate targets (e.g. 'Adj Close'
        and 'Close' both matching 'Close') by keeping the first column.
        """
        if isinstance(df.columns, pd.MultiIndex):
            # yfinance can return (field, ticker) MultiIndex columns.
            df.columns = df.columns.get_level_values(0)
        df.columns = [c.title() if isinstance(c, str) else c for c in df.columns]

        col_map = {}
        for c in df.columns:
            sc = str(c).upper()
            if 'OPEN' in sc:
                col_map[c] = 'Open'
            elif 'HIGH' in sc:
                col_map[c] = 'High'
            elif 'LOW' in sc:
                col_map[c] = 'Low'
            elif 'CLOSE' in sc:
                col_map[c] = 'Close'
            elif 'VOLUME' in sc or 'VOL' in sc:
                col_map[c] = 'Volume'
        if col_map:
            df = df.rename(columns=col_map)
            # 'Adj Close' and 'Close' both map to 'Close'; keep the first
            # occurrence instead of carrying duplicate column labels.
            df = df.loc[:, ~df.columns.duplicated()]
        for req in ['Open', 'High', 'Low', 'Close', 'Volume']:
            if req not in df.columns:
                df[req] = np.nan
        return df

    def fetch_data(self) -> Dict[str, pd.DataFrame]:
        """Fetch OHLCV data for all tickers into ``self.data``.

        Tickers with 100 rows or fewer, or whose download raises, are
        skipped (best effort: one bad ticker must not abort the batch).

        Returns:
            Mapping of ticker -> normalized OHLCV DataFrame.
        """
        print(f"Fetching data for {len(self.tickers)} tickers...")
        for ticker in self.tickers:
            try:
                df = yf.download(ticker, start=self.start_date, end=self.end_date, progress=False)
                if len(df) > 100:
                    self.data[ticker] = self._normalize_columns(df)
            except Exception as e:
                print(f"Error fetching {ticker}: {e}")
        print(f"Successfully fetched {len(self.data)} tickers")
        return self.data

    def compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute per-asset technical indicator features.

        Args:
            df: OHLCV DataFrame with Open/High/Low/Close/Volume columns.

        Returns:
            DataFrame on ``df``'s index with indicator columns; NaN and
            +/-inf values are replaced with 0.
        """
        features = pd.DataFrame(index=df.index)
        close = self._as_1d(df['Close'])
        high = self._as_1d(df['High'])
        low = self._as_1d(df['Low'])
        volume = self._as_1d(df['Volume'])

        # Multi-horizon log returns (NaN-padded; no wrap-around lookahead).
        for d in [1, 5, 10, 21, 63]:
            features[f'return_{d}d'] = self._lagged_log_return(close, d)

        # Annualized realized volatility over several windows.
        log_ret = self._lagged_log_return(close, 1)
        for d in [5, 21, 63]:
            rvol = pd.Series(log_ret).rolling(d).apply(
                lambda x, w=d: np.sqrt(252 / w * np.sum(x ** 2)))
            features[f'rvol_{d}d'] = rvol.values

        # Relative distance of the SMA from the current price.
        for d in [5, 10, 20, 50, 200]:
            sma = pd.Series(close).rolling(d).mean()
            features[f'sma_{d}d'] = sma.values / close - 1

        # RSI(14) using simple (not Wilder-smoothed) averages of gains/losses.
        delta = pd.Series(close).diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        avg_gain = gain.rolling(14).mean()
        avg_loss = loss.rolling(14).mean()
        rs = avg_gain / avg_loss
        features['rsi_14'] = (100 - 100 / (1 + rs)).values

        # MACD normalized by price, plus its 9-period signal line.
        ema12 = pd.Series(close).ewm(span=12).mean()
        ema26 = pd.Series(close).ewm(span=26).mean()
        features['macd'] = (ema12 - ema26).values / close
        features['macd_signal'] = features['macd'].ewm(span=9).mean().values

        # Bollinger position: offset from SMA20 in units of 2 stddevs.
        # (The original subtracted a Series from an ndarray and called
        # .flatten() on the Series result, which raised AttributeError.)
        sma20 = pd.Series(close).rolling(20).mean()
        std20 = pd.Series(close).rolling(20).std()
        features['bb_position'] = (close - sma20.values) / (2 * std20.values)

        # Volume activity vs its 20-day average, and log volume change.
        features['volume_sma_ratio'] = volume / pd.Series(volume).rolling(20).mean().values
        features['volume_change'] = self._lagged_log_return(volume, 1)

        # Intraday range and close-to-close change, both scaled by price.
        features['intraday_range'] = (high - low) / close
        prev_close = np.full(len(close), np.nan)
        prev_close[1:] = close[:-1]
        # NOTE(review): named 'open_gap' but computed close-to-close, as in
        # the original; name kept for downstream column compatibility.
        features['open_gap'] = (close - prev_close) / prev_close

        return features.replace([np.inf, -np.inf], np.nan).fillna(0)

    def compute_cross_asset_features(self) -> pd.DataFrame:
        """Compute cross-asset correlation and spread features.

        Returns:
            Date-indexed DataFrame with rolling betas vs SPY (when SPY is
            loaded), the average pairwise rolling correlation, and sector
            spreads (when XLF/XLK/XLE are loaded). Empty if no data.
        """
        if not self.data:
            # Nothing fetched: return an empty frame instead of crashing
            # on list(self.data.values())[0] below.
            return pd.DataFrame()

        returns = {}
        for ticker, df in self.data.items():
            close = self._as_1d(df['Close'])
            returns[ticker] = self._lagged_log_return(close, 1)

        # NOTE(review): assumes every ticker shares the first ticker's
        # calendar; a length mismatch here would raise. Verify alignment.
        base_index = list(self.data.values())[0].index
        returns_df = pd.DataFrame(returns, index=base_index).fillna(0)

        features = pd.DataFrame(index=returns_df.index)

        # Rolling 63-day market beta of each non-SPY asset vs SPY.
        if 'SPY' in returns_df.columns:
            spy = returns_df['SPY']
            spy_var = spy.rolling(63).var()
            for ticker in returns_df.columns:
                if ticker != 'SPY':
                    beta = returns_df[ticker].rolling(63).cov(spy) / spy_var
                    features[f'{ticker}_beta'] = beta.values

        # Average pairwise rolling correlation across the universe
        # (includes the diagonal self-correlations of 1).
        corr_window = returns_df.rolling(63).corr()
        features['avg_correlation'] = corr_window.groupby(level=0).mean().mean(axis=1).values

        # Sector spreads: 21-day cumulative return differentials.
        if all(x in returns_df.columns for x in ['XLF', 'XLK', 'XLE']):
            features['fin_vs_tech'] = (returns_df['XLF'] - returns_df['XLK']).rolling(21).sum().values
            # Guard: the original indexed returns_df['SPY'] unconditionally
            # here and raised KeyError when SPY was not in the universe.
            if 'SPY' in returns_df.columns:
                features['energy_vs_market'] = (returns_df['XLE'] - returns_df['SPY']).rolling(21).sum().values

        return features.fillna(0)

    def create_feature_matrix(self) -> pd.DataFrame:
        """Create the unified per-(date, ticker) feature matrix.

        Stacks technical indicators for every ticker, joins the shared
        cross-asset features by date, then applies a rolling 21-day
        z-score normalization per ticker. The raw 'close' column is left
        unnormalized so create_sequences() can build return targets.

        Returns:
            Date-indexed DataFrame with 'ticker', 'close' and normalized
            feature columns; NaN/inf replaced with 0.
        """
        all_features = []
        for ticker, df in self.data.items():
            tech_features = self.compute_technical_indicators(df)
            tech_features['ticker'] = ticker
            tech_features['close'] = self._as_1d(df['Close'])
            all_features.append(tech_features)

        features_df = pd.concat(all_features, ignore_index=False)

        # Broadcast the date-indexed cross-asset features onto the stacked
        # (date, ticker) rows via an index lookup instead of the original
        # O(rows x dates) per-cell loop; unmatched dates become NaN -> 0.
        cross_features = self.compute_cross_asset_features()
        for col in cross_features.columns:
            features_df[col] = features_df.index.map(cross_features[col])

        features_df = features_df.fillna(0)

        # Rolling z-score per ticker; zero stds are replaced with 1 to
        # avoid division blow-ups on flat windows.
        numeric_cols = [c for c in features_df.columns if c not in ['ticker', 'close']]
        normalized = features_df.copy()
        for ticker in normalized['ticker'].unique():
            mask = normalized['ticker'] == ticker
            for col in numeric_cols:
                series = normalized.loc[mask, col]
                mu = series.rolling(21).mean()
                sd = series.rolling(21).std().replace(0, 1)
                normalized.loc[mask, col] = (series - mu) / sd

        return normalized.replace([np.inf, -np.inf], 0).fillna(0)

    def create_sequences(self, features_df: pd.DataFrame, lookback: int = 60,
                         forecast_horizon: int = 5) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Slice the feature matrix into supervised learning sequences.

        Args:
            features_df: Output of create_feature_matrix().
            lookback: Number of past timesteps per input window.
            forecast_horizon: Days ahead for the log-return target.

        Returns:
            Tuple (X, y, tickers, dates): X has shape
            (samples, lookback, n_features); y is the forward log return;
            tickers/dates identify each sample's asset and as-of date.
        """
        X_list, y_list, tickers_list, dates_list = [], [], [], []
        feature_cols = [c for c in features_df.columns if c not in ['ticker', 'close']]

        for ticker in features_df['ticker'].unique():
            ticker_df = features_df[features_df['ticker'] == ticker].sort_index()

            # Require one full window plus the 21-day normalization warm-up.
            if len(ticker_df) < lookback + forecast_horizon + 21:
                continue

            values = ticker_df[feature_cols].values
            closes = ticker_df['close'].values

            for i in range(lookback, len(values) - forecast_horizon):
                X_list.append(values[i - lookback:i])
                # Target: forward log return over the forecast horizon.
                y_list.append(np.log(closes[i + forecast_horizon] / closes[i]))
                tickers_list.append(ticker)
                dates_list.append(ticker_df.index[i])

        return (np.array(X_list), np.array(y_list),
                np.array(tickers_list), np.array(dates_list))
|
|