File size: 4,201 Bytes
da67450
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import logging
import os
from datetime import UTC, datetime, timedelta

import pandas as pd

from config import (
    ALPACA_SIP_DELAY_MINUTES,
    CALENDAR_DIR,
    CORPORATE_ACTIONS_DIR,
    METADATA_DIR,
)
from config import (
    DAILY_DATA as DATA_PATH,
)
from core.data_provider import AlpacaProvider, CorporateActionType
from core.utils.execution import ParallelExecutor
from core.utils.paths import ensure_dir

logger = logging.getLogger(__name__)


class DataSync:
    """Synchronize Nasdaq market data from Alpaca into local files.

    Covers four datasets: daily bars (Parquet), symbol metadata (CSV),
    the market calendar (CSV), and corporate actions (CSV, merged
    incrementally across runs).
    """

    def __init__(self):
        self.provider = AlpacaProvider()
        # Symbols per bars request — Alpaca supports multi-symbol queries,
        # so batching cuts the number of round-trips.
        self.batch_size = 200
        self.executor = ParallelExecutor(max_workers=10)

    def get_nasdaq_symbols(self):
        """Return the current list of Nasdaq symbols from the provider."""
        return self.provider.fetch_nasdaq_symbols()

    def sync_metadata(self):
        """Fetch Nasdaq symbol metadata and write it to METADATA_DIR as CSV."""
        logger.info("--- Syncing Symbol Metadata ---")
        df = self.provider.fetch_nasdaq_metadata()
        ensure_dir(os.path.dirname(METADATA_DIR))
        df.to_csv(METADATA_DIR, index=False)
        logger.info(f"Saved {len(df)} symbols to {METADATA_DIR}")

    def sync_bars(self, window_years=2):
        """Download daily bars for all Nasdaq symbols and save to DATA_PATH.

        Args:
            window_years: Trailing window length in (365-day) years.

        The window's end is pulled back by ALPACA_SIP_DELAY_MINUTES so the
        request stays within the SIP data-availability delay.
        """
        logger.info(f"--- Syncing Bars ({window_years} years) ---")
        symbols = self.get_nasdaq_symbols()
        logger.info(f"Found {len(symbols)} symbols from Nasdaq.")

        end_date = datetime.now(UTC) - timedelta(minutes=ALPACA_SIP_DELAY_MINUTES)
        start_date = end_date - timedelta(days=window_years * 365)

        batches = [symbols[i : i + self.batch_size] for i in range(0, len(symbols), self.batch_size)]

        def process_batch(batch):
            # Returns a flat (index-reset) DataFrame, or None when the batch
            # yields no data or fails.
            try:
                bars = self.provider.fetch_daily_bars(symbols=batch, start=start_date, end=end_date)
                if not bars.empty:
                    return bars.reset_index()
            except Exception as e:
                # "no data found" is expected for delisted/illiquid symbols;
                # only log genuine failures.
                if "no data found" not in str(e).lower():
                    logger.error(f"Error fetching batch: {e}")
            return None

        logger.info(f"--- Downloading Data ({window_years} years) ---")
        results = self.executor.map(process_batch, batches, desc="Syncing Batches")
        # BUG FIX: results is a non-empty list even when every batch returned
        # None, in which case pd.concat(results) would raise ValueError
        # ("All objects passed were None") instead of reaching the warning
        # below. Drop the None entries before deciding whether data exists.
        frames = [frame for frame in results if frame is not None]

        if frames:
            logger.info("--- Saving Data ---")
            df = pd.concat(frames)
            ensure_dir(os.path.dirname(DATA_PATH))
            # NOTE(review): tz_convert assumes the provider returns tz-aware
            # timestamps — confirm; naive timestamps would need tz_localize.
            df["timestamp"] = pd.to_datetime(df["timestamp"]).dt.tz_convert("UTC")
            df.to_parquet(DATA_PATH, index=False)
            logger.info(f"Saved {len(df)} rows to {DATA_PATH}")
        else:
            logger.warning("No data fetched.")

    def sync_calendar(self):
        """Fetch the market calendar and write it to CALENDAR_DIR as CSV."""
        logger.info("--- Syncing Market Calendar ---")
        df = self.provider.fetch_calendar()
        ensure_dir(os.path.dirname(CALENDAR_DIR))
        df.to_csv(CALENDAR_DIR, index=False)
        logger.info(f"Saved calendar to {CALENDAR_DIR}")

    def sync_corporate_actions(self, days=90):
        """Fetch recent corporate actions and merge them into CORPORATE_ACTIONS_DIR.

        Existing rows are kept and de-duplicated on corporate_action_id
        (newest wins), so the CSV accumulates history beyond the provider's
        fetch window.

        Args:
            days: Look-back window in days; Alpaca caps history at 90 days.
        """
        logger.info(f"--- Syncing Corporate Actions (last {days} days) ---")
        # Use UTC like the rest of this module so the look-back window does
        # not shift with the host machine's local timezone.
        today = datetime.now(UTC).date()
        since = today - timedelta(days=days)

        # Fetch fresh data (Alpaca limits to 90 days)
        df = self.provider.fetch_corporate_actions(
            ca_types=[
                CorporateActionType.SPLIT,
                CorporateActionType.MERGER,
                CorporateActionType.SPINOFF,
                CorporateActionType.DIVIDEND,
            ],
            since=since,
            until=today,
        )

        ensure_dir(os.path.dirname(CORPORATE_ACTIONS_DIR))

        # Merge with existing data if available
        if os.path.exists(CORPORATE_ACTIONS_DIR):
            existing_df = pd.read_csv(CORPORATE_ACTIONS_DIR)
            combined = pd.concat([existing_df, df], ignore_index=True)
            combined = combined.drop_duplicates(subset=["corporate_action_id"], keep="last")
            # Normalize ex_date to string for consistency
            combined["ex_date"] = pd.to_datetime(combined["ex_date"]).dt.strftime("%Y-%m-%d")
            combined.to_csv(CORPORATE_ACTIONS_DIR, index=False)
            logger.info(f"Merged and saved {len(combined)} corporate actions to {CORPORATE_ACTIONS_DIR}")
        else:
            # BUG FIX: previously ex_date was only normalized on the merge
            # path, so a first-run file had a different date format than a
            # merged one. Normalize here too (guarded: strftime on an empty
            # frame's column is a no-op but to_datetime needs the column).
            if not df.empty:
                df["ex_date"] = pd.to_datetime(df["ex_date"]).dt.strftime("%Y-%m-%d")
            df.to_csv(CORPORATE_ACTIONS_DIR, index=False)
            logger.info(f"Saved {len(df)} corporate actions to {CORPORATE_ACTIONS_DIR}")