# alphaforge-quant-system / market_data.py
# Uploaded by Premchan369 (commit c522dca, verified)
"""Market Data Pipeline for AlphaForge."""
import numpy as np
import pandas as pd
import yfinance as yf
from typing import Dict, List, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')
class MarketDataPipeline:
    """Fetch OHLCV market data and build model-ready feature matrices.

    Typical usage::

        pipeline = MarketDataPipeline(['SPY', 'QQQ'], '2015-01-01', '2024-01-01')
        pipeline.fetch_data()
        features = pipeline.create_feature_matrix()
        X, y, tickers, dates = pipeline.create_sequences(features)
    """

    def __init__(self, tickers: List[str], start_date: str, end_date: str):
        """Store the ticker universe and download date range.

        Args:
            tickers: Ticker symbols understood by Yahoo Finance.
            start_date: Inclusive start date, 'YYYY-MM-DD'.
            end_date: Exclusive end date, 'YYYY-MM-DD'.
        """
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        # ticker -> normalized OHLCV DataFrame, populated by fetch_data()
        self.data: Dict[str, pd.DataFrame] = {}

    @staticmethod
    def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Flatten MultiIndex columns and enforce standard OHLCV names.

        Guarantees the columns Open/High/Low/Close/Volume exist (NaN-filled
        if the source lacked them) so downstream code can index them safely.
        """
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        df.columns = [c.title() if isinstance(c, str) else c for c in df.columns]
        col_map = {}
        for c in df.columns:
            sc = str(c).upper()
            # 'Adj Close' is deliberately NOT mapped to 'Close': mapping both
            # would create duplicate 'Close' columns after the rename.
            if 'ADJ' in sc:
                continue
            if 'OPEN' in sc:
                col_map[c] = 'Open'
            elif 'HIGH' in sc:
                col_map[c] = 'High'
            elif 'LOW' in sc:
                col_map[c] = 'Low'
            elif 'CLOSE' in sc:
                col_map[c] = 'Close'
            elif 'VOLUME' in sc or 'VOL' in sc:
                col_map[c] = 'Volume'
        if col_map:
            df = df.rename(columns=col_map)
        for req in ['Open', 'High', 'Low', 'Close', 'Volume']:
            if req not in df.columns:
                df[req] = np.nan
        return df

    @staticmethod
    def _col(df: pd.DataFrame, name: str) -> np.ndarray:
        """Return column ``name`` as a flat 1-D float array."""
        return np.asarray(df[name]).ravel().astype(float)

    def fetch_data(self, min_rows: int = 100) -> Dict[str, pd.DataFrame]:
        """Download OHLCV history for every ticker via yfinance.

        Tickers whose history has ``min_rows`` rows or fewer, or whose
        download raises, are skipped (best-effort: one bad ticker must not
        abort the whole run).

        Args:
            min_rows: Minimum row count required to keep a ticker.

        Returns:
            Mapping of ticker -> normalized OHLCV DataFrame
            (also stored on ``self.data``).
        """
        print(f"Fetching data for {len(self.tickers)} tickers...")
        for ticker in self.tickers:
            try:
                df = yf.download(ticker, start=self.start_date,
                                 end=self.end_date, progress=False)
                if len(df) > min_rows:
                    self.data[ticker] = self._normalize_columns(df)
            except Exception as e:
                print(f"Error fetching {ticker}: {e}")
        print(f"Successfully fetched {len(self.data)} tickers")
        return self.data

    def compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute per-asset technical indicator features.

        All lagged features use backward-looking ``shift`` (NOT ``np.roll``,
        which wraps end-of-series data into the head and leaks the future);
        warm-up NaNs are zero-filled at the end.

        Args:
            df: OHLCV DataFrame with Open/High/Low/Close/Volume columns.

        Returns:
            DataFrame of features aligned on ``df.index``; no NaN/inf.
        """
        features = pd.DataFrame(index=df.index)
        close = pd.Series(self._col(df, 'Close'), index=df.index)
        high = self._col(df, 'High')
        low = self._col(df, 'Low')
        volume = pd.Series(self._col(df, 'Volume'), index=df.index)
        # Log returns over several horizons.
        for d in [1, 5, 10, 21, 63]:
            features[f'return_{d}d'] = np.log(close / close.shift(d))
        # Realized volatility: annualized rolling root-sum-of-squared returns.
        log_ret = np.log(close / close.shift(1))
        for d in [5, 21, 63]:
            features[f'rvol_{d}d'] = log_ret.rolling(d).apply(
                lambda x, d=d: np.sqrt(252 / d * np.sum(x ** 2)))
        # SMA distance: relative gap between the moving average and price.
        for d in [5, 10, 20, 50, 200]:
            features[f'sma_{d}d'] = close.rolling(d).mean() / close - 1
        # RSI(14) using simple (SMA) averaging of gains and losses.
        delta = close.diff()
        avg_gain = delta.where(delta > 0, 0).rolling(14).mean()
        avg_loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        features['rsi_14'] = 100 - 100 / (1 + avg_gain / avg_loss)
        # MACD normalized by price, plus its 9-period signal line.
        ema12 = close.ewm(span=12).mean()
        ema26 = close.ewm(span=26).mean()
        features['macd'] = (ema12 - ema26) / close
        features['macd_signal'] = features['macd'].ewm(span=9).mean()
        # Bollinger position: signed distance from the 20d mean in 2-sigma
        # units. (Original called .flatten() on a Series -> AttributeError.)
        sma20 = close.rolling(20).mean()
        std20 = close.rolling(20).std()
        features['bb_position'] = (close - sma20) / (2 * std20)
        # Volume features.
        features['volume_sma_ratio'] = volume / volume.rolling(20).mean()
        features['volume_change'] = np.log(volume / volume.shift(1))
        # Intraday range and close-to-close gap.
        features['intraday_range'] = (high - low) / close.values
        features['open_gap'] = close.pct_change()
        return features.replace([np.inf, -np.inf], np.nan).fillna(0)

    def compute_cross_asset_features(self) -> pd.DataFrame:
        """Compute cross-asset beta, correlation, and sector-spread features.

        Returns:
            DataFrame indexed by the union of all tickers' dates; empty if
            no data has been fetched.
        """
        if not self.data:
            return pd.DataFrame()
        # Per-ticker daily log returns kept on each ticker's OWN index, so
        # assets with different history lengths align by date instead of
        # being forced onto the first ticker's index.
        returns = {}
        for ticker, df in self.data.items():
            close = pd.Series(self._col(df, 'Close'), index=df.index)
            returns[ticker] = np.log(close / close.shift(1))
        returns_df = pd.DataFrame(returns).fillna(0)
        features = pd.DataFrame(index=returns_df.index)
        # Rolling 63-day beta of each asset versus SPY.
        if 'SPY' in returns_df.columns:
            spy = returns_df['SPY']
            spy_var = spy.rolling(63).var()
            for ticker in returns_df.columns:
                if ticker != 'SPY':
                    features[f'{ticker}_beta'] = (
                        returns_df[ticker].rolling(63).cov(spy) / spy_var)
        # Average pairwise rolling correlation across the universe.
        corr_window = returns_df.rolling(63).corr()
        features['avg_correlation'] = (
            corr_window.groupby(level=0).mean().mean(axis=1))
        # Sector momentum spreads (21-day cumulative return differences).
        # SPY is included in the guard: the original indexed returns_df['SPY']
        # here without checking it exists -> KeyError.
        if all(x in returns_df.columns for x in ['XLF', 'XLK', 'XLE', 'SPY']):
            features['fin_vs_tech'] = (
                returns_df['XLF'] - returns_df['XLK']).rolling(21).sum()
            features['energy_vs_market'] = (
                returns_df['XLE'] - returns_df['SPY']).rolling(21).sum()
        return features.fillna(0)

    def create_feature_matrix(self) -> pd.DataFrame:
        """Build the combined, normalized feature matrix for all assets.

        Rows are stacked per ticker (index = dates, repeated across tickers);
        columns are technical + cross-asset features plus the bookkeeping
        columns 'ticker' and 'close'.

        Raises:
            ValueError: If fetch_data() has not populated any data.
        """
        if not self.data:
            raise ValueError("No market data available; call fetch_data() first")
        per_ticker = []
        for ticker, df in self.data.items():
            tech = self.compute_technical_indicators(df)
            tech['ticker'] = ticker
            tech['close'] = self._col(df, 'Close')
            per_ticker.append(tech)
        features_df = pd.concat(per_ticker, ignore_index=False)
        # Attach cross-asset features by date: vectorized index->value map
        # instead of the original O(rows * cols) per-index Python loop.
        cross = self.compute_cross_asset_features()
        for col in cross.columns:
            features_df[col] = features_df.index.map(cross[col])
        features_df = features_df.fillna(0)
        # Rolling 21-day z-score per ticker so features are comparable
        # across assets and roughly stationary through time.
        numeric_cols = [c for c in features_df.columns if c not in ('ticker', 'close')]
        normalized = features_df.copy()
        for ticker in normalized['ticker'].unique():
            mask = normalized['ticker'] == ticker
            block = normalized.loc[mask, numeric_cols]
            mean = block.rolling(21).mean()
            # Guard: zero rolling std (constant feature) would divide by 0.
            std = block.rolling(21).std().replace(0, 1)
            normalized.loc[mask, numeric_cols] = (block - mean) / std
        return normalized.replace([np.inf, -np.inf], 0).fillna(0)

    def create_sequences(self, features_df: pd.DataFrame, lookback: int = 60,
                         forecast_horizon: int = 5) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Slice the feature matrix into overlapping training sequences.

        Args:
            features_df: Output of create_feature_matrix().
            lookback: Number of past timesteps per input window.
            forecast_horizon: Days ahead for the log-return target.

        Returns:
            Tuple ``(X, y, tickers, dates)`` where X has shape
            (num_samples, lookback, num_features), y is the forward log
            return, and tickers/dates identify each sample's origin.
        """
        X_list, y_list, ticker_list, date_list = [], [], [], []
        feature_cols = [c for c in features_df.columns if c not in ('ticker', 'close')]
        for ticker in features_df['ticker'].unique():
            ticker_df = features_df[features_df['ticker'] == ticker].sort_index()
            # Need room for one window, its target, and the 21-day
            # normalization warm-up.
            if len(ticker_df) < lookback + forecast_horizon + 21:
                continue
            values = ticker_df[feature_cols].values
            closes = ticker_df['close'].values
            for i in range(lookback, len(values) - forecast_horizon):
                X_list.append(values[i - lookback:i])
                # Target: forward log return over the forecast horizon.
                y_list.append(np.log(closes[i + forecast_horizon] / closes[i]))
                ticker_list.append(ticker)
                date_list.append(ticker_df.index[i])
        return (np.array(X_list), np.array(y_list),
                np.array(ticker_list), np.array(date_list))