# trade_analysis/data.py — unified market/news/social data provider
# (header residue from web UI removed: "ckharche — Update trade_analysis/data.py — edf18c6 verified")
import pandas as pd
import numpy as np
import httpx
from datetime import datetime, timedelta
from . import cache, config
import asyncpraw
import json
import os
from pathlib import Path
import pytz
# Local-file fallback: per-symbol JSON payloads live here for offline use
# or when API quotas are exhausted. Override location via TRADE_LOCAL_DATA.
LOCAL_DATA_DIR = Path(os.getenv(
    "TRADE_LOCAL_DATA",
    "./local_data"  # Simplified default for portability
))
# FIX: parents=True so a nested override path (e.g. /tmp/a/b/data) does not
# raise FileNotFoundError on first run; exist_ok keeps re-runs idempotent.
LOCAL_DATA_DIR.mkdir(parents=True, exist_ok=True)
def _process_finnhub_data(data: dict, symbol: str) -> pd.DataFrame:
"""Processes JSON from Finnhub into a clean DataFrame."""
if not data or data.get('s') != 'ok' or 'c' not in data:
print(f"No valid data received from Finnhub for {symbol}.")
return pd.DataFrame()
df = pd.DataFrame({
'Open': data['o'],
'High': data['h'],
'Low': data['l'],
'Close': data['c'],
'Volume': data['v']
})
# Finnhub timestamps are UNIX timestamps
df.index = pd.to_datetime(data['t'], unit='s', utc=True)
df.dropna(inplace=True)
return df
class UnifiedDataProvider:
    """Single async facade over Finnhub (quotes/news), Reddit (sentiment
    posts), and per-symbol local JSON files used as an offline fallback.

    Call :meth:`close` when finished to release the HTTP and Reddit clients.
    """

    def __init__(self):
        # Create a single, reusable client for all Finnhub API calls.
        self.client = httpx.AsyncClient(timeout=20.0)
        # Created lazily on first Reddit request (see _get_reddit_instance).
        self.reddit_instance = None
        # Memoized local-file payloads, keyed by upper-cased symbol.
        self.local_data_cache = {}
        print("✅ UnifiedDataProvider initialized with Finnhub.")

    def _load_local_data(self, symbol: str) -> dict | None:
        """Loads symbol JSON from local files if present.

        Returns the parsed payload, or None when no file exists. Results
        are memoized so each file is parsed from disk at most once.
        """
        key = symbol.upper()
        # FIX: local_data_cache was populated but never consulted, so every
        # call re-read and re-parsed the file from disk.
        if key in self.local_data_cache:
            return self.local_data_cache[key]
        path = LOCAL_DATA_DIR / f"{key}_external_data.json"
        if path.exists():
            print(f"Loading local data from: {path}")
            with open(path, "r") as f:
                data = json.load(f)
            self.local_data_cache[key] = data
            return data
        return None

    async def fetch_multi_timeframe_stock_data(self, symbol: str) -> dict:
        """
        REWRITTEN FOR FREE TIER: Fetches the current day's quote data from Finnhub.
        Historical candle data is a premium feature.

        Returns:
            Dict with a single 'daily' key holding a one-row OHLC DataFrame
            (Volume is 0 — the quote endpoint does not supply it), or an
            empty DataFrame on failure / unknown symbol.
        """
        print(f"Fetching Finnhub REAL-TIME QUOTE for {symbol}...")
        dfs = {}
        base_url = "https://finnhub.io/api/v1/quote"
        params = {"symbol": symbol, "token": config.FINNHUB_KEY}
        try:
            res = await self.client.get(base_url, params=params)
            res.raise_for_status()
            data = res.json()
            # Finnhub returns c == 0 for unknown symbols; treat as no data.
            if data and 'c' in data and data['c'] != 0:
                # Create a single-row DataFrame.
                df = pd.DataFrame([{
                    'Open': data['o'],
                    'High': data['h'],
                    'Low': data['l'],
                    'Close': data['c'],
                    # Volume is not in the quote endpoint, so we put a placeholder
                    'Volume': 0
                }])
                # Use the current time for the index. With only one row we
                # cannot build true multi-timeframe data, so only 'daily'
                # is populated.
                df.index = pd.to_datetime([datetime.now(tz=pytz.UTC)])
                dfs["daily"] = df
                print(f" - Fetched real-time quote for {symbol} from Finnhub.")
            else:
                dfs["daily"] = pd.DataFrame()
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            print(f"ERROR: Finnhub Quote request failed for {symbol}: {e}")
            dfs["daily"] = pd.DataFrame()
        return dfs

    async def fetch_news(self, symbol: str, days: int = 3) -> tuple:
        """Fetches news from Finnhub, with local file fallback.

        Lookup order: local file → cache → API. Returns (articles, source)
        where source is 'local_file' | 'cache' | 'api' | 'error'.
        """
        local_data = self._load_local_data(symbol)
        if local_data and 'news_data' in local_data:
            return local_data['news_data'], "local_file"
        cache_key = f"news_{symbol}_{days}"
        cached_data = cache.get(cache_key)
        if cached_data:
            return cached_data, "cache"
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        # FIX: was an f-string with no placeholders.
        url = "https://finnhub.io/api/v1/company-news"
        params = {
            "symbol": symbol,
            "from": start_date.strftime('%Y-%m-%d'),
            "to": end_date.strftime('%Y-%m-%d'),
            "token": config.FINNHUB_KEY
        }
        try:
            res = await self.client.get(url, params=params)
            res.raise_for_status()
            data = res.json()
            cache.put(cache_key, data)
            return data, "api"
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            print(f"ERROR: Finnhub news request failed for {symbol}: {e}")
            return [], "error"

    def _get_reddit_instance(self):
        """Initializes (once) and returns the asyncpraw Reddit instance."""
        if self.reddit_instance is None:
            self.reddit_instance = asyncpraw.Reddit(
                client_id=config.REDDIT_CLIENT_ID,
                client_secret=config.REDDIT_CLIENT_SECRET,
                user_agent=config.REDDIT_USER_AGENT
            )
        return self.reddit_instance

    async def fetch_reddit_data(self, symbol: str, limit: int = 25) -> tuple:
        """Fetches Reddit data, with local file fallback.

        Searches a fixed set of finance subreddits; a failure in one
        subreddit is logged and the others are still collected. Returns
        (posts, source) with the same source convention as fetch_news.
        """
        local_data = self._load_local_data(symbol)
        if local_data and 'reddit_data' in local_data:
            return local_data['reddit_data'], "local_file"
        cache_key = f"reddit_{symbol}_{limit}"
        cached_data = cache.get(cache_key)
        if cached_data:
            return cached_data, "cache"
        reddit = self._get_reddit_instance()
        submissions_data = []
        # Match both the bare symbol and the cashtag form ($SYM).
        query = f'"{symbol}" OR "${symbol}"'
        subreddits = ["stocks", "wallstreetbets", "options"]
        for sub_name in subreddits:
            try:
                subreddit = await reddit.subreddit(sub_name)
                async for submission in subreddit.search(query, limit=limit, sort='new'):
                    submissions_data.append({
                        'title': submission.title, 'score': submission.score,
                        'url': submission.url, 'created_utc': submission.created_utc,
                        'subreddit': sub_name
                    })
            except Exception as e:
                print(f"ERROR: Reddit fetch for {symbol} in {sub_name} failed: {e}")
        cache.put(cache_key, submissions_data)
        return submissions_data, "api"

    def get_alternative_data(self, symbol: str) -> dict:
        """
        Gets VIX and sector data from Finnhub.
        NOTE: Put/Call and IV are hardcoded as they require a separate options data provider.
        """
        # This function is not async because Finnhub's free plan is slow,
        # and running these sequentially is more stable than parallel async calls.
        vix_level = 20.0
        sector = "Unknown"
        try:
            # Finnhub uses .VIX for the index, but we use a synchronous httpx client here
            with httpx.Client() as sync_client:
                # Get VIX
                vix_url = f"https://finnhub.io/api/v1/quote?symbol=^VIX&token={config.FINNHUB_KEY}"
                vix_res = sync_client.get(vix_url)
                if vix_res.status_code == 200:
                    # FIX: .get('c', 20.0) passed None through to round() when
                    # the key existed with a null value (and let an impossible
                    # 0 VIX through); `or` falls back on any falsy value.
                    vix_level = vix_res.json().get('c') or 20.0
                # Get Company Profile for Sector
                profile_url = f"https://finnhub.io/api/v1/stock/profile2?symbol={symbol}&token={config.FINNHUB_KEY}"
                profile_res = sync_client.get(profile_url)
                if profile_res.status_code == 200:
                    # Same null-through guard as the VIX value above.
                    sector = profile_res.json().get('finnhubIndustry') or 'Unknown'
        except Exception as e:
            print(f"ERROR fetching alternative data for {symbol}: {e}")
        return {
            "vix_level": round(vix_level, 2),
            "sector": sector,
            "put_call_ratio": 0.85,  # Hardcoded: Requires options data provider
            "iv_rank": 45.5          # Hardcoded: Requires options data provider
        }

    async def close(self):
        """Closes all persistent connections (Reddit first, then HTTP)."""
        if self.reddit_instance:
            await self.reddit_instance.close()
            print("Reddit instance closed.")
        await self.client.aclose()
        print("HTTPX client closed.")