import pandas as pd
import httpx
from datetime import datetime, timedelta
from . import cache, config
import asyncpraw
import json
import os
from pathlib import Path
import pytz

# Local data files act as an offline fallback for the API-backed fetch methods below.
LOCAL_DATA_DIR = Path(os.getenv(
    "TRADE_LOCAL_DATA",
    "./local_data"  # Simplified default for portability
))
LOCAL_DATA_DIR.mkdir(parents=True, exist_ok=True)


def _process_finnhub_data(data: dict, symbol: str) -> pd.DataFrame:
    """Processes candle JSON from Finnhub into a clean OHLCV DataFrame.

    Kept for the /stock/candle endpoint, which is a premium feature; the
    free-tier quote flow below does not call it.
    """
    if not data or data.get('s') != 'ok' or 'c' not in data:
        print(f"No valid data received from Finnhub for {symbol}.")
        return pd.DataFrame()
    df = pd.DataFrame({
        'Open': data['o'],
        'High': data['h'],
        'Low': data['l'],
        'Close': data['c'],
        'Volume': data['v']
    })
    # Finnhub timestamps are UNIX timestamps
    df.index = pd.to_datetime(data['t'], unit='s', utc=True)
    df.dropna(inplace=True)
    return df
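
# For reference, the candle payload consumed above looks roughly like the
# sketch below. The field names come from the keys accessed in
# _process_finnhub_data; the values are illustrative only:
#
#   {
#       "s": "ok",                      # status ("no_data" when empty)
#       "o": [189.5, 190.1],            # opens
#       "h": [191.0, 192.3],            # highs
#       "l": [188.9, 189.7],            # lows
#       "c": [190.4, 191.8],            # closes
#       "v": [1200000, 980000],         # volumes
#       "t": [1700000000, 1700086400],  # UNIX timestamps (seconds)
#   }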


class UnifiedDataProvider:
    def __init__(self):
        # Create a single, reusable client for all API calls
        self.client = httpx.AsyncClient(timeout=20.0)
        self.reddit_instance = None
        self.local_data_cache = {}
        print("✅ UnifiedDataProvider initialized with Finnhub.")

    def _load_local_data(self, symbol: str) -> dict | None:
        """Loads symbol JSON from local files if present."""
        path = LOCAL_DATA_DIR / f"{symbol.upper()}_external_data.json"
        if path.exists():
            print(f"Loading local data from: {path}")
            with open(path, "r") as f:
                data = json.load(f)
            self.local_data_cache[symbol.upper()] = data
            return data
        return None
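
    # A fallback file is assumed to mirror the keys checked by the fetch
    # methods below, e.g. (an illustrative layout, not a fixed schema):
    #
    #   {
    #       "news_data": [...],    # same shape as the Finnhub company-news list
    #       "reddit_data": [...]   # same shape as the submission dicts built below
    #   }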

    async def fetch_multi_timeframe_stock_data(self, symbol: str) -> dict:
        """
        Free-tier implementation: fetches the current day's quote from
        Finnhub. Historical candle data is a premium feature, so only a
        single-row 'daily' frame is returned.
        """
        print(f"Fetching Finnhub REAL-TIME QUOTE for {symbol}...")
        dfs = {}
        base_url = "https://finnhub.io/api/v1/quote"
        params = {"symbol": symbol, "token": config.FINNHUB_KEY}
        try:
            res = await self.client.get(base_url, params=params)
            res.raise_for_status()
            data = res.json()
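            # An illustrative /quote payload (only the fields used below are
            # shown; Finnhub also returns previous-close and change fields):
            #   {"c": 190.4, "o": 189.5, "h": 191.0, "l": 188.9, "t": 1700000000}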
            if data and 'c' in data and data['c'] != 0:
                # Create a single-row DataFrame
                df = pd.DataFrame([{
                    'Open': data['o'],
                    'High': data['h'],
                    'Low': data['l'],
                    'Close': data['c'],
                    # Volume is not in the quote endpoint, so we put a placeholder
                    'Volume': 0
                }])
                # Use the current time for the index
                df.index = pd.to_datetime([datetime.now(tz=pytz.UTC)])
                # With a single row we can't build multiple timeframes, so the
                # one row is returned under the 'daily' key.
                dfs["daily"] = df
                print(f" - Fetched real-time quote for {symbol} from Finnhub.")
            else:
                dfs["daily"] = pd.DataFrame()
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            print(f"ERROR: Finnhub quote request failed for {symbol}: {e}")
            dfs["daily"] = pd.DataFrame()
        return dfs

    async def fetch_news(self, symbol: str, days: int = 3) -> tuple:
        """Fetches news from Finnhub, with local file fallback."""
        local_data = self._load_local_data(symbol)
        if local_data and 'news_data' in local_data:
            return local_data['news_data'], "local_file"
        cache_key = f"news_{symbol}_{days}"
        cached_data = cache.get(cache_key)
        if cached_data:
            return cached_data, "cache"
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        url = "https://finnhub.io/api/v1/company-news"
        params = {
            "symbol": symbol,
            "from": start_date.strftime('%Y-%m-%d'),
            "to": end_date.strftime('%Y-%m-%d'),
            "token": config.FINNHUB_KEY
        }
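        # Each company-news item is a dict; commonly present fields include
        # (an illustrative sketch, not the full Finnhub schema):
        #   {"headline": "...", "summary": "...", "source": "...",
        #    "url": "...", "datetime": 1700000000}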
        try:
            res = await self.client.get(url, params=params)
            res.raise_for_status()
            data = res.json()
            cache.put(cache_key, data)
            return data, "api"
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            print(f"ERROR: Finnhub news request failed for {symbol}: {e}")
            return [], "error"

    def _get_reddit_instance(self):
        """Lazily initializes the asyncpraw Reddit instance."""
        if self.reddit_instance is None:
            self.reddit_instance = asyncpraw.Reddit(
                client_id=config.REDDIT_CLIENT_ID,
                client_secret=config.REDDIT_CLIENT_SECRET,
                user_agent=config.REDDIT_USER_AGENT
            )
        return self.reddit_instance

    async def fetch_reddit_data(self, symbol: str, limit: int = 25) -> tuple:
        """Fetches Reddit data, with local file fallback."""
        local_data = self._load_local_data(symbol)
        if local_data and 'reddit_data' in local_data:
            return local_data['reddit_data'], "local_file"
        cache_key = f"reddit_{symbol}_{limit}"
        cached_data = cache.get(cache_key)
        if cached_data:
            return cached_data, "cache"
        reddit = self._get_reddit_instance()
        submissions_data = []
        query = f'"{symbol}" OR "${symbol}"'
        subreddits = ["stocks", "wallstreetbets", "options"]
        for sub_name in subreddits:
            try:
                subreddit = await reddit.subreddit(sub_name)
                async for submission in subreddit.search(query, limit=limit, sort='new'):
                    submissions_data.append({
                        'title': submission.title, 'score': submission.score,
                        'url': submission.url, 'created_utc': submission.created_utc,
                        'subreddit': sub_name
                    })
            except Exception as e:
                print(f"ERROR: Reddit fetch for {symbol} in {sub_name} failed: {e}")
        cache.put(cache_key, submissions_data)
        return submissions_data, "api"

    def get_alternative_data(self, symbol: str) -> dict:
        """
        Gets VIX and sector data from Finnhub.
        NOTE: Put/call ratio and IV rank are hardcoded, as they require a
        separate options data provider.
        """
        # Kept synchronous: Finnhub's free plan is rate-limited, and making
        # these two calls sequentially is more stable than parallel async calls.
        vix_level = 20.0
        sector = "Unknown"
        try:
            # A short-lived synchronous client is sufficient for two requests.
            with httpx.Client() as sync_client:
                # Get the VIX quote
                vix_url = f"https://finnhub.io/api/v1/quote?symbol=^VIX&token={config.FINNHUB_KEY}"
                vix_res = sync_client.get(vix_url)
                if vix_res.status_code == 200:
                    vix_level = vix_res.json().get('c', 20.0)
                # Get the company profile for the sector
                profile_url = f"https://finnhub.io/api/v1/stock/profile2?symbol={symbol}&token={config.FINNHUB_KEY}"
                profile_res = sync_client.get(profile_url)
                if profile_res.status_code == 200:
                    sector = profile_res.json().get('finnhubIndustry', 'Unknown')
        except Exception as e:
            print(f"ERROR fetching alternative data for {symbol}: {e}")
        return {
            "vix_level": round(vix_level, 2),
            "sector": sector,
            "put_call_ratio": 0.85,  # Hardcoded: requires an options data provider
            "iv_rank": 45.5          # Hardcoded: requires an options data provider
        }

    async def close(self):
        """Closes all persistent connections."""
        if self.reddit_instance:
            await self.reddit_instance.close()
            print("Reddit instance closed.")
        await self.client.aclose()
        print("HTTPX client closed.")