import logging
import os
from datetime import UTC, datetime, timedelta
import pandas as pd
from config import (
ALPACA_SIP_DELAY_MINUTES,
CALENDAR_DIR,
CORPORATE_ACTIONS_DIR,
METADATA_DIR,
)
from config import (
DAILY_DATA as DATA_PATH,
)
from core.data_provider import AlpacaProvider, CorporateActionType
from core.utils.execution import ParallelExecutor
from core.utils.paths import ensure_dir
logger = logging.getLogger(__name__)
class DataSync:
    """Synchronize Nasdaq market data from Alpaca into local files.

    Covers four datasets: symbol metadata (CSV), daily bars (Parquet),
    the market calendar (CSV), and corporate actions (CSV, merged
    incrementally because Alpaca only serves a trailing window).
    """

    def __init__(self):
        self.provider = AlpacaProvider()
        # Symbols per bars request; keeps multi-symbol request payloads manageable.
        self.batch_size = 200
        self.executor = ParallelExecutor(max_workers=10)

    def get_nasdaq_symbols(self):
        """Return the list of Nasdaq symbols from the provider."""
        return self.provider.fetch_nasdaq_symbols()

    def sync_metadata(self):
        """Fetch Nasdaq symbol metadata and write it as CSV to METADATA_DIR."""
        logger.info("--- Syncing Symbol Metadata ---")
        df = self.provider.fetch_nasdaq_metadata()
        # NOTE(review): METADATA_DIR is used as a *file* path despite the
        # "_DIR" name; ensure_dir creates its parent directory.
        ensure_dir(os.path.dirname(METADATA_DIR))
        df.to_csv(METADATA_DIR, index=False)
        logger.info(f"Saved {len(df)} symbols to {METADATA_DIR}")

    def sync_bars(self, window_years=2):
        """Download daily bars for all Nasdaq symbols and save to DATA_PATH.

        Args:
            window_years: Size of the trailing window to fetch, in years.

        The window's end is backed off by ALPACA_SIP_DELAY_MINUTES so we
        never request bars inside the SIP feed's embargo period. Batches
        are fetched in parallel; failed/empty batches are skipped.
        """
        logger.info(f"--- Syncing Bars ({window_years} years) ---")
        symbols = self.get_nasdaq_symbols()
        logger.info(f"Found {len(symbols)} symbols from Nasdaq.")
        end_date = datetime.now(UTC) - timedelta(minutes=ALPACA_SIP_DELAY_MINUTES)
        start_date = end_date - timedelta(days=window_years * 365)
        batches = [
            symbols[i : i + self.batch_size]
            for i in range(0, len(symbols), self.batch_size)
        ]

        def process_batch(batch):
            """Fetch one symbol batch; return a flat DataFrame or None."""
            try:
                bars = self.provider.fetch_daily_bars(
                    symbols=batch, start=start_date, end=end_date
                )
                if not bars.empty:
                    return bars.reset_index()
            except Exception as e:
                # "no data found" is expected for delisted/illiquid symbols.
                if "no data found" not in str(e).lower():
                    logger.error(f"Error fetching batch: {e}")
            return None

        logger.info(f"--- Downloading Data ({window_years} years) ---")
        results = self.executor.map(process_batch, batches, desc="Syncing Batches")
        # BUG FIX: drop None entries before concat. A non-empty list whose
        # elements are all None is truthy, and pd.concat raises
        # "ValueError: All objects passed were None" in that case.
        frames = [r for r in results if r is not None] if results else []
        if frames:
            logger.info("--- Saving Data ---")
            df = pd.concat(frames)
            ensure_dir(os.path.dirname(DATA_PATH))
            # Normalize timestamps to UTC; assumes provider returns tz-aware
            # values (tz_convert raises on naive input) — TODO confirm.
            df["timestamp"] = pd.to_datetime(df["timestamp"]).dt.tz_convert("UTC")
            df.to_parquet(DATA_PATH, index=False)
            logger.info(f"Saved {len(df)} rows to {DATA_PATH}")
        else:
            logger.warning("No data fetched.")

    def sync_calendar(self):
        """Fetch the market calendar and write it as CSV to CALENDAR_DIR."""
        logger.info("--- Syncing Market Calendar ---")
        df = self.provider.fetch_calendar()
        ensure_dir(os.path.dirname(CALENDAR_DIR))
        df.to_csv(CALENDAR_DIR, index=False)
        logger.info(f"Saved calendar to {CALENDAR_DIR}")

    def sync_corporate_actions(self, days=90):
        """Fetch recent corporate actions and merge into CORPORATE_ACTIONS_DIR.

        Args:
            days: Trailing window to fetch (Alpaca serves at most ~90 days).

        Existing rows are kept; duplicates are resolved by
        ``corporate_action_id`` with the freshest fetch winning.
        """
        logger.info(f"--- Syncing Corporate Actions (last {days} days) ---")
        # CONSISTENCY FIX: use UTC-aware "now" like sync_bars does; a naive
        # local date can differ from the UTC date around midnight.
        today = datetime.now(UTC).date()
        since = today - timedelta(days=days)
        # Fetch fresh data (Alpaca limits to 90 days)
        df = self.provider.fetch_corporate_actions(
            ca_types=[
                CorporateActionType.SPLIT,
                CorporateActionType.MERGER,
                CorporateActionType.SPINOFF,
                CorporateActionType.DIVIDEND,
            ],
            since=since,
            until=today,
        )
        ensure_dir(os.path.dirname(CORPORATE_ACTIONS_DIR))
        # Merge with existing data if available
        if os.path.exists(CORPORATE_ACTIONS_DIR):
            existing_df = pd.read_csv(CORPORATE_ACTIONS_DIR)
            combined = pd.concat([existing_df, df], ignore_index=True)
            combined = combined.drop_duplicates(
                subset=["corporate_action_id"], keep="last"
            )
            # Normalize ex_date to string for consistency
            combined["ex_date"] = pd.to_datetime(combined["ex_date"]).dt.strftime("%Y-%m-%d")
            combined.to_csv(CORPORATE_ACTIONS_DIR, index=False)
            logger.info(f"Merged and saved {len(combined)} corporate actions to {CORPORATE_ACTIONS_DIR}")
        else:
            df.to_csv(CORPORATE_ACTIONS_DIR, index=False)
            logger.info(f"Saved {len(df)} corporate actions to {CORPORATE_ACTIONS_DIR}")