Spaces:
Sleeping
Sleeping
| """ | |
| Data Manager - Orchestrates all data sources and provides unified interface. | |
| This module is the main entry point for data collection. It: | |
| - Initializes all data sources | |
| - Handles fallbacks between sources | |
| - Merges data into unified datasets | |
| - Manages caching at the dataset level | |
| """ | |
| import os | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, List | |
| import pandas as pd | |
| import numpy as np | |
| import yaml | |
| import logging | |
| from dotenv import load_dotenv | |
| from src.data.base import BaseDataSource, DataFetchError | |
| from src.data.fred import FREDDataSource | |
| from src.data.cboe import CBOEDataSource | |
| from src.data.yfinance_source import YFinanceDataSource | |
| from src.data.alpha_vantage import AlphaVantageDataSource | |
| # Load environment variables | |
| load_dotenv() | |
| logger = logging.getLogger(__name__) | |
class DataManager:
    """
    Orchestrates data collection from multiple sources.

    This class provides a unified interface for collecting all data
    needed for the volatility regime prediction project.

    Each data source is created in ``_init_sources``; a source that fails
    to initialize is stored as ``None`` and the accessor methods raise
    ``DataFetchError`` when the source they depend on is missing.

    Example:
        dm = DataManager()
        dataset = dm.collect_all()

        # Or collect specific data
        vix = dm.get_vix_data()
        futures = dm.get_vix_futures()
    """

    # Start date used when the configuration does not provide one
    # (same value as the built-in default configuration).
    _DEFAULT_START_DATE = '2006-01-01'

    def __init__(
        self,
        config_path: Optional[Path] = None,
        cache_dir: Optional[Path] = None
    ):
        """
        Initialize DataManager.

        Creates the cache/processed/interim directories on disk, builds the
        data sources and resolves the default date range from the config.

        Args:
            config_path: Path to config.yaml. Defaults to config/config.yaml.
            cache_dir: Base directory for caching. Defaults to data/raw.

        Raises:
            ValueError: If a configured date is neither a ``YYYY-MM-DD``
                string nor a date/datetime object.
        """
        # Load configuration
        if config_path is None:
            config_path = Path("config/config.yaml")
        self.config = self._load_config(Path(config_path))

        # Set up directories
        self.cache_dir = Path(cache_dir) if cache_dir else Path("data/raw")
        self.processed_dir = Path("data/processed")
        self.interim_dir = Path("data/interim")
        for dir_path in [self.cache_dir, self.processed_dir, self.interim_dir]:
            dir_path.mkdir(parents=True, exist_ok=True)

        # Initialize data sources
        self._init_sources()

        # Date range from config
        data_cfg = self._data_config()
        raw_start = data_cfg.get('start_date')
        if raw_start is None:
            raw_start = self._DEFAULT_START_DATE
        raw_end = data_cfg.get('end_date')

        self.start_date = self._coerce_datetime(raw_start)
        # A missing/null end date means "up to now".
        self.end_date = (
            datetime.now() if raw_end is None else self._coerce_datetime(raw_end)
        )
        logger.info(f"DataManager initialized: {self.start_date} to {self.end_date}")

    @staticmethod
    def _coerce_datetime(value: Any) -> datetime:
        """
        Convert a configured date value into a ``datetime``.

        YAML loaders turn unquoted ISO dates (``start_date: 2006-01-01``)
        into ``datetime.date`` objects rather than strings, so both forms
        have to be accepted.

        Args:
            value: ``YYYY-MM-DD`` string, ``date`` or ``datetime``.

        Returns:
            The equivalent ``datetime`` (midnight for plain dates).

        Raises:
            ValueError: If the value has an unsupported type or format.
        """
        # Local import: only ``datetime`` is imported at module level.
        from datetime import date

        if isinstance(value, datetime):
            return value
        if isinstance(value, date):
            return datetime(value.year, value.month, value.day)
        if isinstance(value, str):
            return datetime.strptime(value, '%Y-%m-%d')
        raise ValueError(f"Unsupported date value in config: {value!r}")

    @staticmethod
    def _default_config() -> Dict[str, Any]:
        """Return the built-in configuration used when no usable file exists."""
        return {
            'data': {
                'start_date': '2006-01-01',
                'end_date': None,
                'cache': {
                    'enabled': True,
                    'expiry_days': 1
                }
            }
        }

    def _load_config(self, config_path: Path) -> Dict[str, Any]:
        """
        Load configuration from YAML file.

        Falls back to the built-in defaults when the file is missing, empty
        or does not contain a mapping at the top level.

        Args:
            config_path: Location of the YAML configuration file.

        Returns:
            The configuration dictionary.
        """
        if not config_path.exists():
            logger.warning(f"Config file not found: {config_path}, using defaults")
            return self._default_config()

        with open(config_path, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)

        # safe_load returns None for an empty document; anything that is not
        # a mapping cannot be used by the rest of the class either.
        if not isinstance(loaded, dict):
            logger.warning(
                f"Config file {config_path} is empty or not a mapping, using defaults"
            )
            return self._default_config()
        return loaded

    def _data_config(self) -> Dict[str, Any]:
        """Return the ``data`` section of the config, or ``{}`` if absent/null."""
        section = self.config.get('data') if isinstance(self.config, dict) else None
        return section if isinstance(section, dict) else {}

    def _alpha_vantage_config(self) -> Dict[str, Any]:
        """Return ``data.sources.alpha_vantage`` from the config, or ``{}``."""
        sources = self._data_config().get('sources') or {}
        return sources.get('alpha_vantage') or {}

    def _resolve_range(
        self,
        start_date: Optional[datetime],
        end_date: Optional[datetime]
    ) -> tuple:
        """Fill in missing range bounds with the configured defaults."""
        return (start_date or self.start_date, end_date or self.end_date)

    def _make_source(self, label: str, factory: Any, subdir: str, **kwargs: Any) -> Any:
        """
        Build one data source, returning ``None`` if construction fails.

        Args:
            label: Human-readable source name used in log messages.
            factory: Data source class to instantiate.
            subdir: Sub-directory of ``self.cache_dir`` used for its cache.
            **kwargs: Extra keyword arguments forwarded to the constructor.

        Returns:
            The source instance, or ``None`` when the constructor raised.
        """
        try:
            source = factory(cache_dir=self.cache_dir / subdir, **kwargs)
        except Exception as e:
            # Best effort: a broken source must not prevent using the others.
            logger.warning(f"Could not initialize {label} source: {e}")
            return None
        logger.info(f"{label} data source initialized")
        return source

    def _init_sources(self) -> None:
        """Initialize all data sources (each becomes ``None`` on failure)."""
        cache_config = self._data_config().get('cache') or {}
        cache_kwargs = {
            'cache_enabled': cache_config.get('enabled', True),
            'cache_expiry_days': cache_config.get('expiry_days', 1),
        }

        self.fred = self._make_source('FRED', FREDDataSource, 'fred', **cache_kwargs)
        self.cboe = self._make_source('CBOE', CBOEDataSource, 'cboe', **cache_kwargs)
        self.yfinance = self._make_source(
            'Yahoo Finance', YFinanceDataSource, 'yfinance', **cache_kwargs
        )

        # Alpha Vantage (premium) options/intraday source - opt-in via config
        alpha_cfg = self._alpha_vantage_config()
        if alpha_cfg.get('enabled', False):
            self.alpha_vantage = self._make_source(
                'Alpha Vantage',
                AlphaVantageDataSource,
                'alpha_vantage',
                api_key=alpha_cfg.get('api_key'),
                **cache_kwargs
            )
        else:
            self.alpha_vantage = None

    def get_vix_data(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        source: str = 'cboe'
    ) -> pd.DataFrame:
        """
        Get VIX index data.

        CBOE is tried first when requested and available; if it raises
        ``DataFetchError`` (or another source was requested) FRED is used.

        Args:
            start_date: Start date (defaults to config).
            end_date: End date (defaults to config).
            source: 'cboe' or 'fred'.

        Returns:
            DataFrame with VIX data.

        Raises:
            DataFetchError: If neither source can be used.
        """
        start, end = self._resolve_range(start_date, end_date)

        if source == 'cboe' and self.cboe:
            try:
                return self.cboe.fetch_vix_index(start, end)
            except DataFetchError as e:
                logger.warning(f"CBOE fetch failed, trying FRED: {e}")

        if self.fred:
            return self.fred.fetch_vix(start, end)

        raise DataFetchError("No available source for VIX data")

    def get_vix_indices(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        include_skew: bool = True
    ) -> pd.DataFrame:
        """
        Get all VIX-related indices (VIX, VVIX, VIX9D, SKEW, etc.).

        Args:
            start_date: Start date.
            end_date: End date.
            include_skew: Whether to include SKEW index.

        Returns:
            DataFrame with all VIX indices.

        Raises:
            DataFetchError: If the CBOE source is not available.
        """
        start, end = self._resolve_range(start_date, end_date)
        if not self.cboe:
            raise DataFetchError("CBOE source not available")

        # Fetch VIX indices
        series = ['VIX', 'VVIX', 'VIX9D', 'VIX3M', 'VIX6M']
        if include_skew:
            series.append('SKEW')
        return self.cboe.fetch_with_cache(start, end, series=series)

    def get_putcall_ratios(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> pd.DataFrame:
        """
        Get put/call ratio and volume data from CBOE.

        Args:
            start_date: Start date.
            end_date: End date.

        Returns:
            DataFrame with total, index, equity, and VIX put/call data.

        Raises:
            DataFetchError: If the CBOE source is not available.
        """
        start, end = self._resolve_range(start_date, end_date)
        if not self.cboe:
            raise DataFetchError("CBOE source not available")

        return self.cboe.fetch_with_cache(
            start, end,
            series=['TOTAL_PC', 'INDEX_PC', 'EQUITY_PC', 'VIX_PC']
        )

    def get_vix_futures(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> pd.DataFrame:
        """
        Get VIX futures term structure data.

        Args:
            start_date: Start date.
            end_date: End date.

        Returns:
            DataFrame with VX1-VX9 and term structure metrics.

        Raises:
            DataFetchError: If the CBOE source is not available.
        """
        start, end = self._resolve_range(start_date, end_date)
        if not self.cboe:
            raise DataFetchError("CBOE source not available")
        return self.cboe.fetch_vix_futures(start, end)

    def get_options_data(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        frequency: str = "W-FRI",
        symbol: str = "SPY",  # SPY has IV/Greeks; SPX does not in API
        require_greeks: bool = True,
        use_cached: bool = True,
    ) -> pd.DataFrame:
        """
        Fetch summarized historical options data (Alpha Vantage premium).

        Note: Use SPY (S&P 500 ETF) not SPX - the API returns IV/Greeks
        only for equity options, not index options.

        When ``use_cached`` is True and the batch-collected parquet file for
        ``symbol`` exists, it is returned (filtered to the date range) and
        ``frequency``/``require_greeks`` are not applied.

        Args:
            start_date: Start date.
            end_date: End date.
            frequency: Pandas offset alias to subsample requests (default weekly Friday).
            symbol: Underlying symbol (SPY recommended for IV/Greeks).
            require_greeks: Request greeks/IV fields.
            use_cached: If True, try to load from cached parquet file first.

        Returns:
            DataFrame with 26 options features per date (IV, skew, term structure, Greeks).

        Raises:
            DataFetchError: If Alpha Vantage is unavailable or every
                per-date request failed.
        """
        if not self.alpha_vantage:
            raise DataFetchError("Alpha Vantage source not available or disabled")

        range_start, range_end = self._resolve_range(start_date, end_date)

        # Try loading from batch-collected data first
        if use_cached:
            cache_file = self.cache_dir / 'alpha_vantage' / f'{symbol}_options_history.parquet'
            if cache_file.exists():
                logger.info(f"Loading cached options data from {cache_file}")
                cached = pd.read_parquet(cache_file)
                # Label slicing needs a sorted index; sort before filtering
                # to the requested date range.
                return cached.sort_index().loc[range_start:range_end]

        dates = pd.date_range(
            start=range_start.date(), end=range_end.date(), freq=frequency
        )
        logger.info(
            f"Fetching Alpha Vantage options for {symbol}: {len(dates)} trading snapshots"
        )

        frames = []
        for dt in dates:
            try:
                frame = self.alpha_vantage.fetch_historical_options(
                    date=dt.to_pydatetime(),
                    symbol=symbol,
                    require_greeks=require_greeks,
                )
                frames.append(frame)
            except Exception as e:
                # Best effort: skip dates that fail and keep the rest.
                logger.warning(f"Options fetch failed for {dt.date()}: {e}")

        if not frames:
            raise DataFetchError("No options data retrieved from Alpha Vantage")

        return pd.concat(frames).sort_index()

    def get_spx_prices(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> pd.DataFrame:
        """
        Get S&P 500 price data.

        Args:
            start_date: Start date.
            end_date: End date.

        Returns:
            DataFrame with S&P 500 OHLCV data.

        Raises:
            DataFetchError: If the Yahoo Finance source is not available.
        """
        start, end = self._resolve_range(start_date, end_date)
        if not self.yfinance:
            raise DataFetchError("Yahoo Finance source not available")
        return self.yfinance.fetch_spx(start, end)

    def get_economic_data(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> pd.DataFrame:
        """
        Get economic indicators from FRED.

        Each category is fetched independently; a failing category is
        logged and skipped.

        Args:
            start_date: Start date.
            end_date: End date.

        Returns:
            DataFrame with interest rates, spreads, and conditions.

        Raises:
            DataFetchError: If FRED is unavailable or every category failed.
        """
        start, end = self._resolve_range(start_date, end_date)
        if not self.fred:
            raise DataFetchError("FRED source not available")

        # (log label, FRED source method) for each category
        categories = [
            ('interest rates', 'fetch_interest_rates'),
            ('credit spreads', 'fetch_credit_spreads'),
            ('financial conditions', 'fetch_financial_conditions'),
        ]

        dfs = []
        for label, method_name in categories:
            try:
                dfs.append(getattr(self.fred, method_name)(start, end))
            except Exception as e:
                logger.warning(f"Failed to fetch {label}: {e}")

        if not dfs:
            raise DataFetchError("No economic data retrieved")

        return pd.concat(dfs, axis=1)

    def collect_all(
        self,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        include_futures: bool = True,
        include_economic: bool = True,
        include_putcall: bool = True,
        include_options: bool = False
    ) -> pd.DataFrame:
        """
        Collect all available data and merge into unified dataset.

        Every dataset is fetched on a best-effort basis: failures are logged
        and the remaining datasets are still merged. The merged frame is
        written to ``<interim_dir>/raw_merged.parquet`` before returning.

        Args:
            start_date: Start date.
            end_date: End date.
            include_futures: Whether to include VIX futures (slower).
            include_economic: Whether to include economic indicators.
            include_putcall: Whether to include put/call ratios.
            include_options: Whether to include Alpha Vantage options data
                (premium source, disabled by default).

        Returns:
            Merged DataFrame with all data.

        Raises:
            DataFetchError: If no dataset at all could be collected.
        """
        start, end = self._resolve_range(start_date, end_date)
        logger.info(f"Collecting all data: {start} to {end}")

        datasets = []

        def attempt(name: str, failure_label: str, fetch: Any) -> None:
            """Run one fetch; on failure log it and carry on."""
            try:
                datasets.append((name, fetch()))
            except Exception as e:
                logger.error(f"Failed to fetch {failure_label}: {e}")

        # VIX indices (now includes SKEW)
        logger.info("Fetching VIX indices and SKEW...")
        attempt(
            'vix_indices', 'VIX indices',
            lambda: self.get_vix_indices(start, end, include_skew=True)
        )

        # Put/Call ratios
        if include_putcall:
            logger.info("Fetching put/call ratios...")
            attempt(
                'putcall', 'put/call ratios',
                lambda: self.get_putcall_ratios(start, end)
            )

        # VIX futures
        if include_futures:
            logger.info("Fetching VIX futures (this may take a while)...")
            attempt(
                'vix_futures', 'VIX futures',
                lambda: self.get_vix_futures(start, end)
            )

        # S&P 500 prices
        logger.info("Fetching S&P 500 prices...")
        attempt('spx', 'S&P 500', lambda: self.get_spx_prices(start, end))

        # Alpha Vantage options (premium)
        if include_options:
            opts_cfg = self._alpha_vantage_config()
            freq = opts_cfg.get('frequency', 'W-FRI')
            # Default to SPY: the API only returns IV/Greeks for equity
            # options, which is also the default of get_options_data.
            symbol = opts_cfg.get('options_symbol', 'SPY')
            require_greeks = opts_cfg.get('require_greeks', True)
            logger.info(f"Fetching Alpha Vantage options for {symbol} (freq={freq})...")
            attempt(
                'options', 'Alpha Vantage options',
                lambda: self.get_options_data(
                    start, end,
                    frequency=freq,
                    symbol=symbol,
                    require_greeks=require_greeks
                )
            )

        # Economic data
        if include_economic:
            logger.info("Fetching economic data from FRED...")
            attempt(
                'economic', 'economic data',
                lambda: self.get_economic_data(start, end)
            )

        if not datasets:
            raise DataFetchError("No data collected from any source")

        # Merge all datasets
        logger.info("Merging datasets...")
        merged = self._merge_datasets(datasets)

        # Filter to trading days only (where we have VIX data)
        if 'VIX_CLOSE' in merged.columns:
            merged = merged[merged['VIX_CLOSE'].notna()]
            logger.info(f"Filtered to {len(merged)} trading days")

        # Save to interim storage
        interim_path = self.interim_dir / 'raw_merged.parquet'
        merged.to_parquet(interim_path)
        logger.info(f"Saved raw merged data to {interim_path}")

        return merged

    def _merge_datasets(
        self,
        datasets: List[tuple]
    ) -> pd.DataFrame:
        """
        Merge multiple datasets on date index.

        Datasets are outer-joined on their index in the order given. When a
        dataset brings a column name that is already present, its copy is
        suffixed with ``_<dataset name>`` instead of aborting the merge.

        Args:
            datasets: List of (name, DataFrame) tuples.

        Returns:
            Merged DataFrame, sorted by index (empty if no datasets).
        """
        if not datasets:
            return pd.DataFrame()

        # Start with first dataset
        first_name, merged = datasets[0]
        logger.info(f"Starting merge with {first_name}: {merged.shape}")

        # Merge remaining datasets
        for name, df in datasets[1:]:
            logger.info(f"Merging {name}: {df.shape}")

            overlap = merged.columns.intersection(df.columns)
            if len(overlap) > 0:
                logger.warning(
                    f"Columns {overlap.tolist()} from {name} already exist; "
                    f"suffixing with _{name}"
                )

            # Align on index; rsuffix only affects overlapping column names.
            merged = merged.join(df, how='outer', rsuffix=f"_{name}")

        # Sort by date
        merged = merged.sort_index()
        logger.info(f"Final merged shape: {merged.shape}")

        return merged

    def get_data_summary(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Generate summary statistics for a dataset.

        Args:
            df: DataFrame to summarize.

        Returns:
            Dictionary with the keys ``shape``, ``date_range`` (start/end of
            the index and row count), ``columns``, ``missing_data`` (NaN
            count and percentage per column) and ``statistics``
            (``DataFrame.describe`` output, ``{}`` for a column-less frame).
        """
        def format_bound(value: Any) -> str:
            # Datetime-like index values expose isoformat(); fall back to
            # str() for any other index type.
            return value.isoformat() if hasattr(value, 'isoformat') else str(value)

        has_rows = len(df) > 0
        has_columns = len(df.columns) > 0

        missing_data = {}
        for col in df.columns:
            is_missing = df[col].isna()
            missing_data[col] = {
                'count': int(is_missing.sum()),
                'percent': float(is_missing.mean() * 100)
            }

        return {
            'shape': df.shape,
            'date_range': {
                'start': format_bound(df.index.min()) if has_rows else None,
                'end': format_bound(df.index.max()) if has_rows else None,
                'trading_days': len(df)
            },
            'columns': {
                'total': len(df.columns),
                'list': df.columns.tolist()
            },
            'missing_data': missing_data,
            # describe() raises on a DataFrame without columns.
            'statistics': df.describe().to_dict() if has_columns else {}
        }

    def clear_all_caches(self) -> None:
        """Clear caches for all data sources that were initialized."""
        for attr in ('fred', 'cboe', 'yfinance', 'alpha_vantage'):
            source = getattr(self, attr, None)
            if source:
                source.clear_cache()
        logger.info("Cleared all caches")
if __name__ == "__main__":
    # Manual smoke test: build a DataManager with the default configuration
    # and collect one calendar year of data, printing a short overview.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=logging.INFO, format=log_format)

    manager = DataManager()

    # Collect a small sample
    print("\nCollecting data (small sample for testing)...")
    sample_start = datetime(2023, 1, 1)
    sample_end = datetime(2023, 12, 31)
    df = manager.collect_all(
        start_date=sample_start,
        end_date=sample_end,
        include_futures=False,  # Faster for testing
    )

    print(f"\nDataset shape: {df.shape}")
    print(f"Columns: {df.columns.tolist()}")
    print(f"\nSample:\n{df.head()}")