File size: 9,201 Bytes
c522dca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
"""Market Data Pipeline for AlphaForge."""
import numpy as np
import pandas as pd
import yfinance as yf
from typing import Dict, List, Optional, Tuple
import warnings
warnings.filterwarnings('ignore')


class MarketDataPipeline:
    """Fetch and preprocess market data into model-ready feature matrices.

    Pipeline stages:
        1. ``fetch_data``                   -- download daily OHLCV bars via yfinance.
        2. ``compute_technical_indicators`` -- per-asset technical features.
        3. ``compute_cross_asset_features`` -- beta / correlation / spread features.
        4. ``create_feature_matrix``        -- merged, rolling z-scored long matrix.
        5. ``create_sequences``             -- (lookback, n_features) windows plus
                                               forward log-return targets.
    """

    # Canonical OHLCV column names every downloaded frame is normalized to.
    _STANDARD_COLS = ('Open', 'High', 'Low', 'Close', 'Volume')

    def __init__(self, tickers: List[str], start_date: str, end_date: str):
        """
        Args:
            tickers: symbols to download (e.g. ['SPY', 'XLF']).
            start_date: inclusive start date, ISO format ('YYYY-MM-DD').
            end_date: end date, ISO format (passed straight to yfinance).
        """
        self.tickers = tickers
        self.start_date = start_date
        self.end_date = end_date
        # ticker -> normalized OHLCV DataFrame, populated by fetch_data()
        self.data: Dict[str, pd.DataFrame] = {}

    @staticmethod
    def _to_1d(col) -> np.ndarray:
        """Flatten a Series / single-column frame to a 1-D float array."""
        return np.asarray(col, dtype=float).flatten()

    @classmethod
    def _normalize_columns(cls, df: pd.DataFrame) -> pd.DataFrame:
        """Map vendor column names onto the canonical OHLCV names.

        Exact (case-insensitive) matches are claimed first so that a fuzzy
        match such as 'Adj Close' can never shadow the real 'Close' column.
        (The previous substring-only mapping renamed BOTH to 'Close',
        producing duplicate columns that corrupted downstream flattening.)
        Missing canonical columns are added as all-NaN.
        """
        # Flatten multi-index columns if present (yfinance group_by output).
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)

        col_map = {}
        claimed = set()
        # Pass 1: exact matches win.
        for c in df.columns:
            name = str(c).strip().title()
            if name in cls._STANDARD_COLS and name not in claimed:
                col_map[c] = name
                claimed.add(name)
        # Pass 2: substring matches fill whatever is still unclaimed.
        for c in df.columns:
            if c in col_map:
                continue
            sc = str(c).upper()
            for name in cls._STANDARD_COLS:
                # 'VOL' also catches abbreviated volume column names.
                key = 'VOL' if name == 'Volume' else name.upper()
                if key in sc and name not in claimed:
                    col_map[c] = name
                    claimed.add(name)
                    break
        if col_map:
            df = df.rename(columns=col_map)
        # Guarantee the full canonical schema for downstream code.
        for req in cls._STANDARD_COLS:
            if req not in df.columns:
                df[req] = np.nan
        return df

    def fetch_data(self) -> Dict[str, pd.DataFrame]:
        """Fetch OHLCV data for all tickers.

        Tickers that fail to download or return <= 100 rows are skipped so a
        single bad symbol cannot abort the whole batch.

        Returns:
            Mapping of ticker -> normalized OHLCV DataFrame (also stored on
            ``self.data``).
        """
        print(f"Fetching data for {len(self.tickers)} tickers...")
        for ticker in self.tickers:
            try:
                df = yf.download(ticker, start=self.start_date, end=self.end_date, progress=False)
                if df is not None and len(df) > 100:
                    self.data[ticker] = self._normalize_columns(df)
            except Exception as e:
                # Best-effort by design: log and continue with the rest.
                print(f"Error fetching {ticker}: {e}")
        print(f"Successfully fetched {len(self.data)} tickers")
        return self.data

    def compute_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """Compute per-asset technical indicator features.

        All lag-based features use pandas ``shift`` instead of ``np.roll``:
        ``np.roll`` wraps around, so the first ``d`` rows of every lagged
        feature previously leaked prices from the *end* of the series into
        the start. With ``shift`` those warm-up rows are NaN and become 0.

        Args:
            df: frame with 'Open','High','Low','Close','Volume' columns.

        Returns:
            DataFrame indexed like ``df``; infinities and NaNs replaced by 0.
        """
        features = pd.DataFrame(index=df.index)
        close = pd.Series(self._to_1d(df['Close']), index=df.index)
        high = pd.Series(self._to_1d(df['High']), index=df.index)
        low = pd.Series(self._to_1d(df['Low']), index=df.index)
        volume = pd.Series(self._to_1d(df['Volume']), index=df.index)

        # Log returns over multiple horizons.
        for d in [1, 5, 10, 21, 63]:
            features[f'return_{d}d'] = np.log(close / close.shift(d))

        # Annualized realized volatility from daily log returns.
        log_ret = np.log(close / close.shift(1))
        for d in [5, 21, 63]:
            # w=d binds the horizon eagerly (defensive against late binding).
            features[f'rvol_{d}d'] = log_ret.rolling(d).apply(
                lambda x, w=d: np.sqrt(252 / w * np.sum(x ** 2)))

        # Relative distance of price from each SMA.
        for d in [5, 10, 20, 50, 200]:
            features[f'sma_{d}d'] = close.rolling(d).mean() / close - 1

        # RSI(14), simple-moving-average variant.
        delta = close.diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        rs = gain.rolling(14).mean() / loss.rolling(14).mean()
        features['rsi_14'] = 100 - 100 / (1 + rs)

        # MACD normalized by price, plus its 9-span signal line.
        ema12 = close.ewm(span=12).mean()
        ema26 = close.ewm(span=26).mean()
        features['macd'] = (ema12 - ema26) / close
        features['macd_signal'] = features['macd'].ewm(span=9).mean()

        # Bollinger band position: signed distance from SMA20 in 2-sigma
        # units. (The old code called .flatten() on a Series -> AttributeError.)
        sma20 = close.rolling(20).mean()
        std20 = close.rolling(20).std()
        features['bb_position'] = (close - sma20) / (2 * std20)

        # Volume indicators.
        features['volume_sma_ratio'] = volume / volume.rolling(20).mean()
        features['volume_change'] = np.log(volume / volume.shift(1))

        # Price-based features. NOTE(review): 'open_gap' is actually the
        # close-to-close return (it never reads 'Open'); the column name is
        # kept for backward compatibility with downstream consumers.
        features['intraday_range'] = (high - low) / close
        features['open_gap'] = close.pct_change()

        return features.replace([np.inf, -np.inf], np.nan).fillna(0)

    def compute_cross_asset_features(self) -> pd.DataFrame:
        """Compute cross-asset correlation and spread features.

        Returns:
            DataFrame indexed by the union of all tickers' dates; empty
            frame if no data has been fetched yet.
        """
        if not self.data:
            return pd.DataFrame()

        # Daily log returns per ticker, kept as Series so tickers with
        # different trading histories align on the union of dates instead of
        # crashing on a length mismatch; missing dates become 0.
        returns = {}
        for ticker, df in self.data.items():
            close = pd.Series(self._to_1d(df['Close']), index=df.index)
            returns[ticker] = np.log(close / close.shift(1))
        returns_df = pd.DataFrame(returns).fillna(0)

        features = pd.DataFrame(index=returns_df.index)

        # Rolling 63-day beta of each asset vs. the SPY market proxy.
        if 'SPY' in returns_df.columns:
            spy = returns_df['SPY']
            spy_var = spy.rolling(63).var()
            for ticker in returns_df.columns:
                if ticker != 'SPY':
                    features[f'{ticker}_beta'] = (
                        returns_df[ticker].rolling(63).cov(spy) / spy_var)

        # Average pairwise rolling correlation across the universe.
        corr_window = returns_df.rolling(63).corr()
        features['avg_correlation'] = (
            corr_window.groupby(level=0).mean().mean(axis=1).values)

        # Sector momentum spreads (21-day cumulative return differences).
        if all(x in returns_df.columns for x in ['XLF', 'XLK', 'XLE']):
            features['fin_vs_tech'] = (
                (returns_df['XLF'] - returns_df['XLK']).rolling(21).sum())
            # BUG FIX: the original indexed returns_df['SPY'] here without
            # checking it exists, raising KeyError when SPY was not fetched.
            if 'SPY' in returns_df.columns:
                features['energy_vs_market'] = (
                    (returns_df['XLE'] - returns_df['SPY']).rolling(21).sum())

        return features.fillna(0)

    def create_feature_matrix(self) -> pd.DataFrame:
        """Create the unified, normalized feature matrix for all assets.

        Stacks per-ticker technical features (adding 'ticker' and 'close'
        columns), broadcasts cross-asset features onto every ticker's rows by
        date, then applies a per-ticker rolling 21-day z-score to every
        numeric column.

        Returns:
            Long-format DataFrame indexed by date (dates duplicated across
            tickers); all NaN/inf replaced by 0.
        """
        all_features = []
        for ticker, df in self.data.items():
            tech = self.compute_technical_indicators(df)
            tech['ticker'] = ticker
            tech['close'] = self._to_1d(df['Close'])
            all_features.append(tech)

        features_df = pd.concat(all_features, ignore_index=False)

        # Broadcast cross-asset features onto every row sharing the same
        # date. reindex() is O(n) and replaces the original per-date Python
        # loop (O(rows * dates)); dates absent from cross_features become
        # NaN, exactly as before, and are zero-filled below.
        cross_features = self.compute_cross_asset_features()
        for col in cross_features.columns:
            features_df[col] = cross_features[col].reindex(features_df.index).values

        features_df = features_df.fillna(0)

        # Rolling 21-day z-score per ticker; a zero std maps to 1 so constant
        # windows normalize to 0 instead of dividing by zero.
        numeric_cols = [c for c in features_df.columns if c not in ('ticker', 'close')]
        normalized = features_df.copy()
        for ticker in normalized['ticker'].unique():
            mask = normalized['ticker'] == ticker
            block = normalized.loc[mask, numeric_cols]
            mean = block.rolling(21).mean()
            std = block.rolling(21).std().replace(0, 1)
            normalized.loc[mask, numeric_cols] = (block - mean) / std

        return normalized.replace([np.inf, -np.inf], 0).fillna(0)

    def create_sequences(self, features_df: pd.DataFrame, lookback: int = 60,
                         forecast_horizon: int = 5) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Create (lookback, n_features) windows with forward-return targets.

        Args:
            features_df: output of ``create_feature_matrix``.
            lookback: number of time steps per input window.
            forecast_horizon: target is the log return over this many steps.

        Returns:
            Tuple ``(X, y, tickers, dates)``: X has shape
            (n_samples, lookback, n_features); y is the forward log return;
            tickers/dates identify each sample's asset and as-of date.
        """
        X_list, y_list, tickers_list, dates_list = [], [], [], []
        feature_cols = [c for c in features_df.columns if c not in ('ticker', 'close')]

        for ticker in features_df['ticker'].unique():
            ticker_df = features_df[features_df['ticker'] == ticker].sort_index()

            # Require enough history for one window, the target horizon, and
            # the 21-day normalization warm-up.
            if len(ticker_df) < lookback + forecast_horizon + 21:
                continue

            values = ticker_df[feature_cols].values
            closes = ticker_df['close'].values

            for i in range(lookback, len(values) - forecast_horizon):
                X_list.append(values[i - lookback:i])
                # Target: forward log return from t to t+horizon.
                y_list.append(np.log(closes[i + forecast_horizon] / closes[i]))
                tickers_list.append(ticker)
                dates_list.append(ticker_df.index[i])

        return (np.array(X_list), np.array(y_list),
                np.array(tickers_list), np.array(dates_list))