"""stocks/core/data_sync.py — synchronize market data from Alpaca to local files."""
import logging
import os
from datetime import UTC, datetime, timedelta
import pandas as pd
from config import (
ALPACA_SIP_DELAY_MINUTES,
CALENDAR_DIR,
CORPORATE_ACTIONS_DIR,
METADATA_DIR,
)
from config import (
DAILY_DATA as DATA_PATH,
)
from core.data_provider import AlpacaProvider, CorporateActionType
from core.utils.execution import ParallelExecutor
from core.utils.paths import ensure_dir
logger = logging.getLogger(__name__)
class DataSync:
    """Synchronize market data from the Alpaca provider to local files.

    Covers four datasets: Nasdaq symbol metadata, daily bars, the market
    calendar, and recent corporate actions. Each ``sync_*`` method fetches
    fresh data and writes it to the path configured in ``config``.
    """

    def __init__(self):
        self.provider = AlpacaProvider()
        # Symbols per bar-fetch request; keeps each provider call bounded.
        self.batch_size = 200
        self.executor = ParallelExecutor(max_workers=10)

    def get_nasdaq_symbols(self):
        """Return the current list of Nasdaq symbols from the provider."""
        return self.provider.fetch_nasdaq_symbols()

    def sync_metadata(self):
        """Fetch Nasdaq symbol metadata and write it as CSV to METADATA_DIR.

        NOTE(review): despite its name, METADATA_DIR is used as a *file*
        path here (we write a CSV to it) — confirm the config naming.
        """
        logger.info("--- Syncing Symbol Metadata ---")
        df = self.provider.fetch_nasdaq_metadata()
        ensure_dir(os.path.dirname(METADATA_DIR))
        df.to_csv(METADATA_DIR, index=False)
        logger.info(f"Saved {len(df)} symbols to {METADATA_DIR}")

    def sync_bars(self, window_years=2):
        """Download daily bars for all Nasdaq symbols and save as parquet.

        Args:
            window_years: Lookback window in (approximate 365-day) years.
        """
        logger.info(f"--- Syncing Bars ({window_years} years) ---")
        symbols = self.get_nasdaq_symbols()
        logger.info(f"Found {len(symbols)} symbols from Nasdaq.")
        # Stay behind the SIP feed delay so we never request bars that are
        # not yet available to this account's data subscription.
        end_date = datetime.now(UTC) - timedelta(minutes=ALPACA_SIP_DELAY_MINUTES)
        start_date = end_date - timedelta(days=window_years * 365)
        batches = [symbols[i : i + self.batch_size] for i in range(0, len(symbols), self.batch_size)]

        def process_batch(batch):
            """Fetch one batch of symbols; return a flat DataFrame or None."""
            try:
                bars = self.provider.fetch_daily_bars(symbols=batch, start=start_date, end=end_date)
                if not bars.empty:
                    return bars.reset_index()
            except Exception as e:
                # "no data found" is expected for delisted/illiquid symbols;
                # anything else is a real fetch failure worth logging.
                if "no data found" not in str(e).lower():
                    logger.error(f"Error fetching batch: {e}")
            return None

        logger.info(f"--- Downloading Data ({window_years} years) ---")
        results = self.executor.map(process_batch, batches, desc="Syncing Batches")
        # Bug fix: empty/failed batches yield None, and pd.concat raises a
        # TypeError on None entries — drop them before concatenating.
        frames = [frame for frame in results if frame is not None]
        if frames:
            logger.info("--- Saving Data ---")
            df = pd.concat(frames)
            ensure_dir(os.path.dirname(DATA_PATH))
            df["timestamp"] = pd.to_datetime(df["timestamp"]).dt.tz_convert("UTC")
            df.to_parquet(DATA_PATH, index=False)
            logger.info(f"Saved {len(df)} rows to {DATA_PATH}")
        else:
            logger.warning("No data fetched.")

    def sync_calendar(self):
        """Fetch the market calendar and write it as CSV to CALENDAR_DIR."""
        logger.info("--- Syncing Market Calendar ---")
        df = self.provider.fetch_calendar()
        ensure_dir(os.path.dirname(CALENDAR_DIR))
        df.to_csv(CALENDAR_DIR, index=False)
        logger.info(f"Saved calendar to {CALENDAR_DIR}")

    def sync_corporate_actions(self, days=90):
        """Fetch recent corporate actions and merge them into the local CSV.

        Args:
            days: Lookback window; Alpaca limits history to 90 days, so a
                local file is kept and new rows are merged into it.
        """
        logger.info(f"--- Syncing Corporate Actions (last {days} days) ---")
        # Consistency fix: use timezone-aware UTC like sync_bars (the original
        # mixed naive datetime.now() in here), and compute "today" once so
        # since/until derive from the same instant.
        today = datetime.now(UTC).date()
        since = today - timedelta(days=days)
        # Fetch fresh data (Alpaca limits to 90 days)
        df = self.provider.fetch_corporate_actions(
            ca_types=[
                CorporateActionType.SPLIT,
                CorporateActionType.MERGER,
                CorporateActionType.SPINOFF,
                CorporateActionType.DIVIDEND,
            ],
            since=since,
            until=today,
        )
        ensure_dir(os.path.dirname(CORPORATE_ACTIONS_DIR))
        # Merge with existing data if available
        if os.path.exists(CORPORATE_ACTIONS_DIR):
            existing_df = pd.read_csv(CORPORATE_ACTIONS_DIR)
            combined = pd.concat([existing_df, df], ignore_index=True)
            # keep="last" prefers the freshly fetched version of a record.
            combined = combined.drop_duplicates(subset=["corporate_action_id"], keep="last")
        else:
            combined = df
        # Normalize ex_date to string for consistency (original only did this
        # on the merge path, leaving the first-ever save unnormalized).
        combined["ex_date"] = pd.to_datetime(combined["ex_date"]).dt.strftime("%Y-%m-%d")
        combined.to_csv(CORPORATE_ACTIONS_DIR, index=False)
        logger.info(f"Merged and saved {len(combined)} corporate actions to {CORPORATE_ACTIONS_DIR}")