Spaces:
Running
Running
| import logging | |
| import os | |
| from datetime import UTC, datetime, timedelta | |
| import pandas as pd | |
| from config import ( | |
| ALPACA_SIP_DELAY_MINUTES, | |
| CALENDAR_DIR, | |
| CORPORATE_ACTIONS_DIR, | |
| METADATA_DIR, | |
| ) | |
| from config import ( | |
| DAILY_DATA as DATA_PATH, | |
| ) | |
| from core.data_provider import AlpacaProvider, CorporateActionType | |
| from core.utils.execution import ParallelExecutor | |
| from core.utils.paths import ensure_dir | |
# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
class DataSync:
    """Synchronize market data from the provider into local files.

    Covers four datasets: symbol metadata (CSV), daily bars (parquet),
    the market calendar (CSV), and corporate actions (CSV, merged
    incrementally with previously saved rows).
    """

    def __init__(self):
        self.provider = AlpacaProvider()
        # Number of symbols requested per daily-bars API call.
        self.batch_size = 200
        self.executor = ParallelExecutor(max_workers=10)

    def get_nasdaq_symbols(self):
        """Return the current list of Nasdaq symbols from the provider."""
        return self.provider.fetch_nasdaq_symbols()

    def sync_metadata(self):
        """Fetch Nasdaq symbol metadata and write it as CSV to METADATA_DIR."""
        logger.info("--- Syncing Symbol Metadata ---")
        df = self.provider.fetch_nasdaq_metadata()
        ensure_dir(os.path.dirname(METADATA_DIR))
        df.to_csv(METADATA_DIR, index=False)
        logger.info(f"Saved {len(df)} symbols to {METADATA_DIR}")

    def sync_bars(self, window_years=2):
        """Download daily bars for all Nasdaq symbols and save to DATA_PATH.

        Args:
            window_years: Trailing lookback window, in (365-day) years.
        """
        logger.info(f"--- Syncing Bars ({window_years} years) ---")
        symbols = self.get_nasdaq_symbols()
        logger.info(f"Found {len(symbols)} symbols from Nasdaq.")
        # End the window behind the SIP delay so the feed is complete.
        end_date = datetime.now(UTC) - timedelta(minutes=ALPACA_SIP_DELAY_MINUTES)
        start_date = end_date - timedelta(days=window_years * 365)
        batches = [
            symbols[i : i + self.batch_size]
            for i in range(0, len(symbols), self.batch_size)
        ]

        def process_batch(batch):
            """Fetch one symbol batch; return a DataFrame or None on no data/error."""
            try:
                bars = self.provider.fetch_daily_bars(symbols=batch, start=start_date, end=end_date)
                if not bars.empty:
                    return bars.reset_index()
            except Exception as e:
                # "no data found" is routine (delisted/illiquid symbols); only
                # log unexpected failures.
                if "no data found" not in str(e).lower():
                    logger.error(f"Error fetching batch: {e}")
            return None

        logger.info(f"--- Downloading Data ({window_years} years) ---")
        results = self.executor.map(process_batch, batches, desc="Syncing Batches")
        # BUGFIX: process_batch yields None for empty/errored batches. A list
        # of only None entries is truthy, and pd.concat raises ValueError when
        # all objects are None — so filter before testing and concatenating.
        frames = [r for r in results if r is not None] if results else []
        if frames:
            logger.info("--- Saving Data ---")
            df = pd.concat(frames, ignore_index=True)
            ensure_dir(os.path.dirname(DATA_PATH))
            # Normalize timestamps to UTC.
            # NOTE(review): tz_convert assumes tz-aware input from the
            # provider — confirm; tz-naive timestamps would raise here.
            df["timestamp"] = pd.to_datetime(df["timestamp"]).dt.tz_convert("UTC")
            df.to_parquet(DATA_PATH, index=False)
            logger.info(f"Saved {len(df)} rows to {DATA_PATH}")
        else:
            logger.warning("No data fetched.")

    def sync_calendar(self):
        """Fetch the market calendar and write it as CSV to CALENDAR_DIR."""
        logger.info("--- Syncing Market Calendar ---")
        df = self.provider.fetch_calendar()
        ensure_dir(os.path.dirname(CALENDAR_DIR))
        df.to_csv(CALENDAR_DIR, index=False)
        logger.info(f"Saved calendar to {CALENDAR_DIR}")

    def sync_corporate_actions(self, days=90):
        """Fetch recent corporate actions and merge into the local CSV.

        Args:
            days: Lookback window in days (Alpaca caps history at 90 days).
        """
        logger.info(f"--- Syncing Corporate Actions (last {days} days) ---")
        # Compute "today" once so since/until stay consistent across a
        # midnight boundary (the original called datetime.now() twice).
        # NOTE(review): this is local-time date while sync_bars uses UTC —
        # confirm the intended timezone for the CA window.
        today = datetime.now().date()
        since = today - timedelta(days=days)
        # Fetch fresh data (Alpaca limits to 90 days)
        df = self.provider.fetch_corporate_actions(
            ca_types=[
                CorporateActionType.SPLIT,
                CorporateActionType.MERGER,
                CorporateActionType.SPINOFF,
                CorporateActionType.DIVIDEND,
            ],
            since=since,
            until=today,
        )
        ensure_dir(os.path.dirname(CORPORATE_ACTIONS_DIR))
        # Merge with existing data if available
        if os.path.exists(CORPORATE_ACTIONS_DIR):
            existing_df = pd.read_csv(CORPORATE_ACTIONS_DIR)
            combined = pd.concat([existing_df, df], ignore_index=True)
            # keep="last" prefers the freshly fetched row over the cached one.
            combined = combined.drop_duplicates(subset=["corporate_action_id"], keep="last")
            # Normalize ex_date to string for consistency
            combined["ex_date"] = pd.to_datetime(combined["ex_date"]).dt.strftime("%Y-%m-%d")
            combined.to_csv(CORPORATE_ACTIONS_DIR, index=False)
            logger.info(f"Merged and saved {len(combined)} corporate actions to {CORPORATE_ACTIONS_DIR}")
        else:
            df.to_csv(CORPORATE_ACTIONS_DIR, index=False)
            logger.info(f"Saved {len(df)} corporate actions to {CORPORATE_ACTIONS_DIR}")