import logging
import os
from datetime import UTC, datetime, timedelta

import pandas as pd

from config import (
    ALPACA_SIP_DELAY_MINUTES,
    CALENDAR_DIR,
    CORPORATE_ACTIONS_DIR,
    DAILY_DATA as DATA_PATH,
    METADATA_DIR,
)
from core.data_provider import AlpacaProvider, CorporateActionType
from core.utils.execution import ParallelExecutor
from core.utils.paths import ensure_dir

logger = logging.getLogger(__name__)


class DataSync:
    """Syncs Nasdaq symbol metadata, daily bars, the market calendar,
    and corporate actions from Alpaca into local files."""

    def __init__(self):
        self.provider = AlpacaProvider()
        self.batch_size = 200  # symbols per bars request
        self.executor = ParallelExecutor(max_workers=10)

    def get_nasdaq_symbols(self):
        return self.provider.fetch_nasdaq_symbols()

    def sync_metadata(self):
        """Fetch Nasdaq symbol metadata and write it to METADATA_DIR (a CSV path)."""
        logger.info("--- Syncing Symbol Metadata ---")
        df = self.provider.fetch_nasdaq_metadata()
        ensure_dir(os.path.dirname(METADATA_DIR))
        df.to_csv(METADATA_DIR, index=False)
        logger.info(f"Saved {len(df)} symbols to {METADATA_DIR}")

    def sync_bars(self, window_years=2):
        """Download daily bars for all Nasdaq symbols over the given window
        and write them to DATA_PATH as Parquet."""
        logger.info(f"--- Syncing Bars ({window_years} years) ---")
        symbols = self.get_nasdaq_symbols()
        logger.info(f"Found {len(symbols)} symbols from Nasdaq.")

        # Back off from "now" to stay inside the SIP data delay window.
        end_date = datetime.now(UTC) - timedelta(minutes=ALPACA_SIP_DELAY_MINUTES)
        start_date = end_date - timedelta(days=window_years * 365)

        batches = [
            symbols[i : i + self.batch_size]
            for i in range(0, len(symbols), self.batch_size)
        ]

        def process_batch(batch):
            try:
                bars = self.provider.fetch_daily_bars(
                    symbols=batch, start=start_date, end=end_date
                )
                if not bars.empty:
                    return bars.reset_index()
            except Exception as e:
                # "no data found" just means an empty batch; anything else is a real error.
                if "no data found" not in str(e).lower():
                    logger.error(f"Error fetching batch: {e}")
            return None

        logger.info(f"--- Downloading Data ({window_years} years) ---")
        results = self.executor.map(process_batch, batches, desc="Syncing Batches")
        # Drop failed/empty batches so pd.concat never sees an all-None list,
        # which would raise a ValueError.
        results = [r for r in results if r is not None]
        if results:
            logger.info("--- Saving Data ---")
            df = pd.concat(results)
            ensure_dir(os.path.dirname(DATA_PATH))
            # utc=True normalizes both tz-aware and tz-naive timestamps to UTC.
            df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
            df.to_parquet(DATA_PATH, index=False)
            logger.info(f"Saved {len(df)} rows to {DATA_PATH}")
        else:
            logger.warning("No data fetched.")

    def sync_calendar(self):
        """Fetch the market calendar and write it to CALENDAR_DIR (a CSV path)."""
        logger.info("--- Syncing Market Calendar ---")
        df = self.provider.fetch_calendar()
        ensure_dir(os.path.dirname(CALENDAR_DIR))
        df.to_csv(CALENDAR_DIR, index=False)
        logger.info(f"Saved calendar to {CALENDAR_DIR}")

    def sync_corporate_actions(self, days=90):
        """Fetch recent corporate actions and merge them into the existing CSV,
        deduplicating on corporate_action_id."""
        logger.info(f"--- Syncing Corporate Actions (last {days} days) ---")
        today = datetime.now(UTC).date()
        since = today - timedelta(days=days)

        # Fetch fresh data (Alpaca limits the lookback to 90 days)
        df = self.provider.fetch_corporate_actions(
            ca_types=[
                CorporateActionType.SPLIT,
                CorporateActionType.MERGER,
                CorporateActionType.SPINOFF,
                CorporateActionType.DIVIDEND,
            ],
            since=since,
            until=today,
        )
        ensure_dir(os.path.dirname(CORPORATE_ACTIONS_DIR))

        # Merge with existing data if available; keep="last" prefers the fresh rows
        if os.path.exists(CORPORATE_ACTIONS_DIR):
            existing_df = pd.read_csv(CORPORATE_ACTIONS_DIR)
            combined = pd.concat([existing_df, df], ignore_index=True)
            combined = combined.drop_duplicates(subset=["corporate_action_id"], keep="last")
        else:
            combined = df

        # Normalize ex_date to a YYYY-MM-DD string for consistency across runs
        combined["ex_date"] = pd.to_datetime(combined["ex_date"]).dt.strftime("%Y-%m-%d")
        combined.to_csv(CORPORATE_ACTIONS_DIR, index=False)
        logger.info(f"Saved {len(combined)} corporate actions to {CORPORATE_ACTIONS_DIR}")
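

# Hypothetical usage sketch (not part of the original module): runs the four
# sync steps in a sensible order under a __main__ guard. Assumes the config
# constants and Alpaca credentials are already set up; the logging format
# below is illustrative only.
if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    sync = DataSync()
    sync.sync_metadata()        # symbol universe first, so later steps can use it
    sync.sync_calendar()        # trading sessions for downstream alignment
    sync.sync_bars(window_years=2)
    sync.sync_corporate_actions(days=90)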