"""Market Data Pipeline for AlphaForge."""
import numpy as np
import pandas as pd
import yfinance as yf
from typing import Dict, List, Optional, Tuple
import warnings
# NOTE(review): blanket, file-wide warning suppression — this also hides
# pandas/yfinance deprecation warnings; consider narrowing by category.
warnings.filterwarnings('ignore')
class MarketDataPipeline:
    """Fetch and preprocess market data into model-ready features.

    Stages:
      1. ``fetch_data``            -- download OHLCV per ticker via yfinance.
      2. ``create_feature_matrix`` -- technical + cross-asset features,
         21-day rolling z-score normalized per ticker.
      3. ``create_sequences``      -- sliding (lookback, n_features) windows
         with forward log-return targets for sequence models.
    """

    def __init__(self, tickers: List[str], start_date: str, end_date: str):
        """
        Args:
            tickers: Ticker symbols to fetch (e.g. ['SPY', 'AAPL']).
            start_date: Start date string, 'YYYY-MM-DD'.
            end_date: End date string, 'YYYY-MM-DD'.
        """
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        # ticker -> OHLCV DataFrame, populated by fetch_data()
        self.data: Dict[str, pd.DataFrame] = {}

    @staticmethod
    def _series(df: pd.DataFrame, col: str) -> np.ndarray:
        """Extract one column as a flat 1-D float array."""
        return np.asarray(df[col], dtype=float).ravel()

    def fetch_data(self) -> Dict[str, pd.DataFrame]:
        """Fetch OHLCV data for all tickers.

        Tickers with <= 100 rows or download errors are skipped (errors are
        printed, not raised). Columns are normalized to
        Open/High/Low/Close/Volume; any of those missing afterwards is
        filled with NaN.

        Returns:
            Dict mapping ticker -> OHLCV DataFrame (also stored on self.data).
        """
        print(f"Fetching data for {len(self.tickers)} tickers...")
        for ticker in self.tickers:
            try:
                df = yf.download(ticker, start=self.start_date, end=self.end_date, progress=False)
                if len(df) > 100:
                    # Flatten multi-index columns if present (yfinance >= 0.2
                    # returns (field, ticker) column tuples)
                    if isinstance(df.columns, pd.MultiIndex):
                        df.columns = df.columns.get_level_values(0)
                    df.columns = [c.title() if isinstance(c, str) else c for c in df.columns]
                    # Ensure standard column names
                    col_map = {}
                    for c in df.columns:
                        sc = str(c).upper()
                        if 'OPEN' in sc:
                            col_map[c] = 'Open'
                        elif 'HIGH' in sc:
                            col_map[c] = 'High'
                        elif 'LOW' in sc:
                            col_map[c] = 'Low'
                        # BUG FIX: 'ADJ CLOSE' also contains 'CLOSE'; mapping
                        # both it and 'Close' to 'Close' created duplicate
                        # columns, breaking df['Close'] downstream.
                        elif 'CLOSE' in sc and 'ADJ' not in sc:
                            col_map[c] = 'Close'
                        elif 'VOLUME' in sc or 'VOL' in sc:
                            col_map[c] = 'Volume'
                    if col_map:
                        df = df.rename(columns=col_map)
                    for req in ['Open', 'High', 'Low', 'Close', 'Volume']:
                        if req not in df.columns:
                            df[req] = np.nan
                    self.data[ticker] = df
            except Exception as e:
                # Best-effort: a single bad ticker must not abort the batch.
                print(f"Error fetching {ticker}: {e}")
        print(f"Successfully fetched {len(self.data)} tickers")
        return self.data

    def compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute per-asset technical indicator features.

        Args:
            df: OHLCV frame (date index, Open/High/Low/Close/Volume columns).

        Returns:
            DataFrame on the same index; warm-up rows and non-finite values
            are filled with 0.
        """
        features = pd.DataFrame(index=df.index)
        open_ = self._series(df, 'Open')
        close = self._series(df, 'Close')
        high = self._series(df, 'High')
        low = self._series(df, 'Low')
        volume = self._series(df, 'Volume')
        close_s = pd.Series(close, index=df.index)
        vol_s = pd.Series(volume, index=df.index)
        # Lagged log returns. BUG FIX: np.roll wraps around, so the first d
        # rows previously compared today's price against end-of-sample
        # prices (lookahead garbage); shift() leaves them NaN instead,
        # which the final fillna turns into 0.
        for d in [1, 5, 10, 21, 63]:
            features[f'return_{d}d'] = np.log(close_s / close_s.shift(d)).values
        # Realized volatility, annualized from d-day windows of 1d log returns
        log_ret = np.log(close_s / close_s.shift(1))
        for d in [5, 21, 63]:
            rvol = log_ret.rolling(d).apply(lambda x: np.sqrt(252 / d * np.sum(x ** 2)))
            features[f'rvol_{d}d'] = rvol.values
        # Moving-average distance: SMA relative to current price
        for d in [5, 10, 20, 50, 200]:
            sma = close_s.rolling(d).mean()
            features[f'sma_{d}d'] = sma.values / close - 1
        # RSI(14) using simple (not Wilder-smoothed) rolling means
        delta = close_s.diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        avg_gain = gain.rolling(14).mean()
        avg_loss = loss.rolling(14).mean()
        rs = avg_gain / avg_loss
        features['rsi_14'] = (100 - 100 / (1 + rs)).values
        # MACD normalized by price, plus its 9-span EMA signal line
        ema12 = close_s.ewm(span=12).mean()
        ema26 = close_s.ewm(span=26).mean()
        features['macd'] = (ema12 - ema26).values / close
        features['macd_signal'] = pd.Series(features['macd']).ewm(span=9).mean().values
        # Bollinger position: distance from the 20d SMA in units of 2 std.
        # BUG FIX: original called .flatten() on a pandas Series (no such
        # method), raising AttributeError on every call.
        sma20 = close_s.rolling(20).mean()
        std20 = close_s.rolling(20).std()
        features['bb_position'] = (close - sma20.values) / (2 * std20.values)
        # Volume indicators (same shift-not-roll fix as returns)
        features['volume_sma_ratio'] = volume / vol_s.rolling(20).mean().values
        features['volume_change'] = np.log(vol_s / vol_s.shift(1)).values
        # Price-based
        features['intraday_range'] = (high - low) / close
        # BUG FIX: open_gap previously duplicated return_1d (close vs prior
        # close) and never read Open; the overnight gap is today's open
        # relative to yesterday's close.
        prev_close = close_s.shift(1).values
        features['open_gap'] = (open_ - prev_close) / prev_close
        return features.replace([np.inf, -np.inf], np.nan).fillna(0)

    def compute_cross_asset_features(self) -> pd.DataFrame:
        """Compute cross-asset correlation, beta and spread features.

        Returns:
            DataFrame indexed like the first fetched ticker's frame.
            Empty DataFrame when no data has been fetched yet.
        """
        # BUG FIX: list(self.data.values())[0] raised IndexError when
        # fetch_data had not populated anything.
        if not self.data:
            return pd.DataFrame()
        returns = {}
        for ticker, df in self.data.items():
            close_s = pd.Series(self._series(df, 'Close'), index=df.index)
            # shift(), not np.roll: keep the first row NaN instead of a
            # wrapped-around value (fillna(0) below).
            returns[ticker] = np.log(close_s / close_s.shift(1)).values
        base_index = next(iter(self.data.values())).index
        returns_df = pd.DataFrame(returns, index=base_index).fillna(0)
        features = pd.DataFrame(index=returns_df.index)
        # Rolling 63d market beta vs SPY (skipped when SPY is absent)
        if 'SPY' in returns_df.columns:
            spy = returns_df['SPY']
            spy_var = spy.rolling(63).var()  # hoisted: invariant across tickers
            for ticker in returns_df.columns:
                if ticker != 'SPY':
                    beta = returns_df[ticker].rolling(63).cov(spy) / spy_var
                    features[f'{ticker}_beta'] = beta.values
        # Average rolling 63d correlation per date (diagonal included, so
        # values are biased toward 1 when few assets are present)
        corr_window = returns_df.rolling(63).corr()
        features['avg_correlation'] = corr_window.groupby(level=0).mean().mean(axis=1).values
        # Sector momentum spreads (21d cumulative return differences)
        if all(x in returns_df.columns for x in ['XLF', 'XLK', 'XLE']):
            features['fin_vs_tech'] = (returns_df['XLF'] - returns_df['XLK']).rolling(21).sum().values
            # BUG FIX: SPY was read without being part of the guard above,
            # raising KeyError when sector ETFs were fetched without SPY.
            if 'SPY' in returns_df.columns:
                features['energy_vs_market'] = (returns_df['XLE'] - returns_df['SPY']).rolling(21).sum().values
        return features.fillna(0)

    def create_feature_matrix(self) -> pd.DataFrame:
        """Create a unified, normalized feature matrix for all assets.

        Stacks per-ticker technical features (plus 'ticker' and raw 'close'
        columns), joins date-level cross-asset features, then applies a
        21-day rolling z-score per ticker to every numeric feature column.
        """
        all_features = []
        for ticker, df in self.data.items():
            tech_features = self.compute_technical_indicators(df)
            tech_features['ticker'] = ticker
            tech_features['close'] = self._series(df, 'Close')
            all_features.append(tech_features)
        features_df = pd.concat(all_features, ignore_index=False)
        # Join date-indexed cross-asset features onto the stacked frame.
        # PERF FIX: the previous per-date Python loop was O(rows^2);
        # Index.map performs the same date-keyed assignment vectorized
        # (dates missing from cross_features become NaN -> 0 below).
        cross_features = self.compute_cross_asset_features()
        for col in cross_features.columns:
            features_df[col] = features_df.index.map(cross_features[col])
        features_df = features_df.fillna(0)
        # 21-day rolling z-score per ticker; zero-variance windows use std=1
        # to avoid division by zero. 'close' is deliberately left raw so
        # create_sequences can compute true forward returns.
        numeric_cols = [c for c in features_df.columns if c not in ['ticker', 'close']]
        normalized = features_df.copy()
        for ticker in normalized['ticker'].unique():
            mask = normalized['ticker'] == ticker
            for col in numeric_cols:
                series = normalized.loc[mask, col]
                normalized.loc[mask, col] = (series - series.rolling(21).mean()) / series.rolling(21).std().replace(0, 1)
        return normalized.replace([np.inf, -np.inf], 0).fillna(0)

    def create_sequences(self, features_df: pd.DataFrame, lookback: int = 60,
                         forecast_horizon: int = 5) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Create sliding-window sequences for time-series models.

        Args:
            features_df: Output of create_feature_matrix; must contain
                'ticker' and 'close' columns, everything else is a feature.
            lookback: Window length fed to the model.
            forecast_horizon: Days ahead for the log-return target.

        Returns:
            Tuple (X, y, tickers, dates): X is
            (n_samples, lookback, n_features); y is the forward log return;
            tickers/dates identify each sample's asset and as-of date.
        """
        X_list, y_list, tickers_list, dates_list = [], [], [], []
        feature_cols = [c for c in features_df.columns if c not in ['ticker', 'close']]
        for ticker in features_df['ticker'].unique():
            ticker_df = features_df[features_df['ticker'] == ticker].copy()
            ticker_df = ticker_df.sort_index()
            # Need room for one window, one target and the z-score warm-up
            if len(ticker_df) < lookback + forecast_horizon + 21:
                continue
            values = ticker_df[feature_cols].values
            closes = ticker_df['close'].values
            for i in range(lookback, len(values) - forecast_horizon):
                X_list.append(values[i - lookback:i])
                # Target: forward log return over the forecast horizon
                future_return = np.log(closes[i + forecast_horizon] / closes[i])
                y_list.append(future_return)
                tickers_list.append(ticker)
                dates_list.append(ticker_df.index[i])
        return (np.array(X_list), np.array(y_list),
                np.array(tickers_list), np.array(dates_list))
|