File size: 11,410 Bytes
cafdd88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28bc1a7
 
cafdd88
28bc1a7
 
 
 
 
cafdd88
28bc1a7
 
 
 
5d6b190
 
 
cafdd88
5d6b190
 
 
 
 
 
 
 
 
 
 
 
 
 
28bc1a7
5d6b190
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b62855
5d6b190
 
 
 
 
 
 
 
28bc1a7
cafdd88
2b62855
 
28bc1a7
 
 
 
cafdd88
28bc1a7
 
 
 
 
 
 
 
 
 
 
 
 
 
2b62855
 
28bc1a7
2b62855
 
 
4aac777
2b62855
 
e3682e7
cafdd88
28bc1a7
 
e3682e7
 
 
 
 
28bc1a7
 
 
e3682e7
 
cafdd88
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44f08fc
 
 
aa68b93
 
44f08fc
 
 
 
 
 
 
 
 
 
 
aa68b93
 
44f08fc
aa68b93
44f08fc
2b62855
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import yfinance as yf
import pandas as pd
import numpy as np
import json
import os
import logging
from typing import List, Dict, Optional
from core.schema import TickerData
from config import settings

logging.basicConfig(level=settings.LOG_LEVEL)
logger = logging.getLogger(__name__)

class SectorCache:
    """
    Manages a local cache of Ticker -> Sector mappings to avoid 
    yfinance API throttling and improve speed.
    """
    def __init__(self, cache_file: str = settings.SECTOR_MAP_FILE):
        self.cache_file = cache_file
        self.sector_map = self._load_cache()

    def _load_cache(self) -> Dict[str, str]:
        if os.path.exists(self.cache_file):
            try:
                with open(self.cache_file, 'r') as f:
                    return json.load(f)
            except Exception as e:
                logger.error(f"Failed to load sector cache: {e}")
                return {}
        return {}

    def save_cache(self):
        os.makedirs(os.path.dirname(self.cache_file), exist_ok=True)
        with open(self.cache_file, 'w') as f:
            json.dump(self.sector_map, f, indent=2)

    def get_sector(self, ticker: str) -> Optional[str]:
        return self.sector_map.get(ticker)

    def update_sector(self, ticker: str, sector: str):
        self.sector_map[ticker] = sector

class MarketDataEngine:
    """
    Handles robust data ingestion from diverse sources (Wikipedia, yfinance).
    Implements data cleaning and validation policies.
    """
    def __init__(self):
        self.sector_cache = SectorCache()
        
    def fetch_sp500_tickers(self) -> List[str]:
        """
        Loads S&P 500 components from a static JSON file (Production Mode).
        Eliminates dependency on Wikipedia scraping.
        """
        try:
            universe_file = os.path.join(os.path.dirname(__file__), 'sp500_universe.json')
            
            # If we happen to not have the file, use the fallback list
            if not os.path.exists(universe_file):
                logger.warning("Universe file not found. Using fallback.")
                return self._get_fallback_tickers()
                
            with open(universe_file, 'r') as f:
                universe_data = json.load(f)
                
            tickers = []
            for item in universe_data:
                ticker = item['ticker']
                sector = item['sector']
                tickers.append(ticker)
                self.sector_cache.update_sector(ticker, sector)
                
            self.sector_cache.save_cache()
            logger.info(f"Successfully loaded {len(tickers)} tickers from static universe.")
            return tickers

        except Exception as e:
            logger.error(f"Error loading universe: {e}")
            return self._get_fallback_tickers()

    def _get_fallback_tickers(self) -> List[str]:
        # Fallback for Demo Reliability
        fallback_map = {
            "AAPL": "Information Technology", "MSFT": "Information Technology", "GOOGL": "Communication Services",
            "AMZN": "Consumer Discretionary", "NVDA": "Information Technology", "META": "Communication Services",
            "TSLA": "Consumer Discretionary", "BRK-B": "Financials", "V": "Financials", "UNH": "Health Care",
            "XOM": "Energy", "JNJ": "Health Care", "JPM": "Financials", "PG": "Consumer Staples",
            "LLY": "Health Care", "MA": "Financials", "CVX": "Energy", "MRK": "Health Care",
            "HD": "Consumer Discretionary", "PEP": "Consumer Staples", "COST": "Consumer Staples"
        }
        for t, s in fallback_map.items():
            self.sector_cache.update_sector(t, s)
        return list(fallback_map.keys())

    def fetch_market_data(self, tickers: List[str], start_date: str = "2023-01-01") -> pd.DataFrame:
        """
        Fetches adjusted close prices for a list of tickers using REAL data logic.
        Uses sequential fetching (threads=False) and retries to handle rate limits.
        """
        import time
        
        # Clean tickers
        valid_tickers = [t.strip().upper() for t in tickers if t]
        if not valid_tickers:
            return pd.DataFrame()
            
        logger.info(f"Downloading prices for {len(valid_tickers)} tickers (Real Data Mode)...")
        
        data = pd.DataFrame()
        # Chunked Download Strategy to avoid timeouts/rate-limits
        chunk_size = 20
        all_data = []
        
        for i in range(0, len(valid_tickers), chunk_size):
            chunk = valid_tickers[i:i+chunk_size]
            logger.info(f"Downloading chunk {i//chunk_size + 1}: {chunk[:3]}...")
            
            chunk_data = pd.DataFrame()
            # Retry logic per chunk
            for attempt in range(3):
                try:
                    # Ticker-by-Ticker usually more reliable for small batches than bulk download if bulk is failing
                    # But let's stick to download() for speed, just smaller batches.
                    
                    # Note: threads=True might actually be better for speed if we are chunking, 
                    # but threads=False is safer for rate limits. Let's try threads=False but small chunks.
                    temp = yf.download(chunk, start=start_date, group_by='ticker', threads=False, progress=False, timeout=20)
                    
                    if not temp.empty:
                        chunk_data = temp
                        break
                    time.sleep(1)
                except Exception as e:
                    logger.warning(f"Chunk failed: {e}")
                    time.sleep(1)
            
            if not chunk_data.empty:
                all_data.append(chunk_data)
        
        if not all_data:
             logger.error("All chunks failed.")
             # If user insists on live data, we might return empty here? 
             # But let's keep the fallback but make it less likely to be needed.
             pass # Will fall through to empty check

        # Concatenate
        try:
            if all_data:
                data = pd.concat(all_data, axis=1)
            else:
                data = pd.DataFrame()
        except:
            data = pd.DataFrame()

        if data.empty:
            logger.error("All download attempts failed. Switching to SYNTHETIC data.")
            return self._generate_synthetic_data(valid_tickers, start_date)

        try:
            # Handle MultiIndex
            df_close = pd.DataFrame()
            
            if len(valid_tickers) == 1:
                t = valid_tickers[0]
                if 'Adj Close' in data.columns:
                    df_close[t] = data['Adj Close']
                elif 'Close' in data.columns:
                     df_close[t] = data['Close']
            else:
                try:
                    df_close = data['Adj Close']
                except KeyError:
                    try:
                        df_close = data.xs('Adj Close', level=0, axis=1)
                    except:
                        try:
                            # Fix for group_by='ticker' (Adj Close is at Level 1)
                            df_close = data.xs('Adj Close', level=1, axis=1)
                        except:
                            try:
                                df_close = data['Close']
                            except:
                                try:
                                    df_close = data.xs('Close', level=1, axis=1)
                                except:
                                    pass

            # Drop columns with all NaNs
            df_close.dropna(axis=1, how='all', inplace=True)
            
            if df_close.empty:
                logger.warning("Extraction resulted in empty DataFrame. Switching to SYNTHETIC data.")
                return self._generate_synthetic_data(valid_tickers, start_date)
                
            return df_close
            
        except Exception as e:
            logger.error(f"Error processing market data: {e}. Switching to SYNTHETIC data.")
            return self._generate_synthetic_data(valid_tickers, start_date)

    def _clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Applies data quality rules:
        1. Drop columns with > 10% missing data.
        2. Forward fill then Backward fill remaining NaNs.
        """
        initial_count = len(df.columns)
        
        # Rule 1: Drop > 10% missing
        missing_frac = df.isnull().mean()
        drop_cols = missing_frac[missing_frac > 0.10].index.tolist()
        df_clean = df.drop(columns=drop_cols)
        
        dropped_count = len(drop_cols)
        if dropped_count > 0:
            logger.warning(f"Dropped {dropped_count} tickers due to >10% missing data: {drop_cols[:5]}...")
            
        # Rule 2: Fill NaNs
        df_clean = df_clean.ffill().bfill()
        
        logger.info(f"Data cleaning complete. Retained {len(df_clean.columns)}/{initial_count} tickers.")
        return df_clean

    def get_sector_map(self) -> Dict[str, str]:
        return self.sector_cache.sector_map

    def fetch_market_caps(self, tickers: List[str]) -> Dict[str, float]:
        """
        Returns market caps from local static cache.
        Does NOT fetch live to avoid timeouts/rate-limits on HF Spaces.
        """
        cache_file = os.path.join(settings.DATA_DIR, "market_cap_cache.json")
        caps = {}
        
        # Load Cache
        if os.path.exists(cache_file):
            try:
                with open(cache_file, 'r') as f:
                    caps = json.load(f)
            except Exception as e:
                logger.error(f"Failed to load cap cache: {e}")
        else:
            logger.warning("Market Cap Cache file not found! 'Smallest/Largest' strategies may fail.")
            
        # Return requested
        return {t: caps.get(t, 0) for t in tickers}

    def _generate_synthetic_data(self, tickers: List[str], start_date: str) -> pd.DataFrame:
        """
        Generates realistic-looking random walk data for tickers
        to ensure the app runs even if Yahoo Finance is down.
        """
        logger.warning(f"Generating SYNTHETIC market data for {len(tickers)} tickers (Demo Mode).")
        try:
            dates = pd.date_range(start=start_date, end=pd.Timestamp.now(), freq='B')
            df = pd.DataFrame(index=dates)
            
            # Consistent random seed so the "demo" looks stable between refreshes
            np.random.seed(42)
            
            for ticker in tickers:
                # Start price between 50 and 200
                start_price = np.random.uniform(50, 200)
                
                # Generate returns: Drift + Volatility
                # Annual Drift ~ 10%, Annual Vol ~ 20%
                # Daily Drift ~ 10%/252, Daily Vol ~ 20%/sqrt(252)
                mu = 0.10 / 252
                sigma = 0.20 / np.sqrt(252)
                
                returns = np.random.normal(mu, sigma, len(dates))
                
                # Path
                price_path = start_price * (1 + returns).cumprod()
                df[ticker] = price_path
                
            return df
            
        except Exception as e:
            logger.error(f"Error generating synthetic data: {e}")
            return pd.DataFrame()