# portfolio_opt / alternative_data.py
# engineportf's picture
# Add Quant Terminal Easter Egg
# aceffd1
# Raw
# History Blame Contribute Delete
# 7.98 kB
import yfinance as yf
import pandas as pd
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import logger, Color
import warnings
import requests
import urllib.parse
import xml.etree.ElementTree as ET
def _fetch_single_news_sentiment(ticker: str, hf_token: str) -> dict:
    """Score recent news headlines for one ticker with the hosted FinBERT model.

    Headlines are gathered best-effort from ``yfinance`` and from the Google
    News RSS search feed, de-duplicated in arrival order, capped at five, and
    sent in one request to the Hugging Face Inference API.

    Args:
        ticker: Symbol used for the yfinance lookup and the news search query.
        hf_token: Hugging Face API token. When falsy, nothing is fetched.

    Returns:
        ``{'sentiment': float}``. The value is the mean, over all returned
        predictions, of the top label's probability signed by label
        (positive adds, negative subtracts, anything else contributes zero
        but still counts). It stays ``0.0`` when there is no token, no
        headline could be collected, or any request/parsing step fails.
    """
    result = {'sentiment': 0.0}
    # Without a token the headlines can never be scored, so skip both
    # network round-trips instead of fetching and then discarding them.
    if not hf_token:
        return result
    try:
        headlines = []

        # Source 1: yfinance news (best-effort).
        try:
            tk = yf.Ticker(ticker)
            for item in (tk.news or [])[:5]:
                # 'title' is read from the top level first; some yfinance
                # releases appear to nest it under 'content' instead
                # (NOTE(review): confirm against the installed version).
                title = item.get('title') or (item.get('content') or {}).get('title')
                if title:
                    headlines.append(title)
        except Exception as e:
            logger.debug(f"yfinance news lookup failed for {ticker}: {e}")

        # Source 2: Google News RSS search (best-effort).
        try:
            q = urllib.parse.quote(f"{ticker} stock")
            url = f"https://news.google.com/rss/search?q={q}&hl=en-US&gl=US&ceid=US:en"
            resp = requests.get(url, timeout=5)
            if resp.status_code == 200:
                # NOTE(review): this parses remote XML with the stdlib parser;
                # consider a hardened parser if the feed is not trusted.
                root = ET.fromstring(resp.content)
                for item in root.findall('./channel/item')[:5]:
                    # findtext() yields None/'' for a missing or empty title,
                    # so one malformed item no longer aborts the whole loop.
                    title = item.findtext('title')
                    if title:
                        headlines.append(title)
        except Exception as e:
            logger.debug(f"Google News RSS lookup failed for {ticker}: {e}")

        # Order-preserving de-duplication: the same five headlines are chosen
        # on every run (a set would make the selection hash-order dependent).
        headlines = list(dict.fromkeys(headlines))[:5]
        if not headlines:
            return result

        headers = {"Authorization": f"Bearer {hf_token}"}
        API_URL = "https://api-inference.huggingface.co/models/ProsusAI/finbert"
        response = requests.post(API_URL, headers=headers, json={"inputs": headlines}, timeout=10)
        if response.status_code != 200:
            logger.debug(f"HF API returned {response.status_code}: {response.text}")
            return result

        preds = response.json()
        score_sum = 0.0
        count = 0
        for pred in preds:
            # A prediction may be a list of {label, score} candidates (take
            # the highest-scoring one) or a single {label, score} mapping.
            top = max(pred, key=lambda x: x['score']) if isinstance(pred, list) else pred
            label = top.get('label', 'neutral').lower()
            prob = top.get('score', 0)
            if label == 'positive':
                score_sum += prob
            elif label == 'negative':
                score_sum -= prob
            count += 1
        if count > 0:
            result['sentiment'] = score_sum / count
    except Exception as e:
        logger.debug(f"Failed to fetch AI sentiment for {ticker}: {e}")
    return result
def fetch_ai_news_sentiment(tickers: list, hf_token: str, silent: bool = False) -> dict:
    """Fetch FinBERT news sentiment for several tickers concurrently.

    Args:
        tickers: Ticker symbols to score. Duplicates are fetched only once.
        hf_token: Hugging Face API token. When falsy, no work is done and
            every ticker maps to a neutral score.
        silent: When true, the progress text printed to stdout is suppressed.

    Returns:
        Mapping ``ticker -> {'sentiment': float}`` with an entry for every
        requested ticker. Tickers whose worker raised, or that had not
        finished within the 60-second overall deadline, map to
        ``{'sentiment': 0.0}``.
    """
    # concurrent.futures.TimeoutError is only an alias of the builtin
    # TimeoutError on newer Python versions, so import it explicitly.
    from concurrent.futures import TimeoutError as FuturesTimeoutError

    if not hf_token:
        return {t: {'sentiment': 0.0} for t in tickers}
    if not silent:
        print(f" {Color.CYAN}[INFO] Fetching AI News Sentiment (FinBERT) for {len(tickers)} assets...{Color.RESET}", end="", flush=True)

    # Order-preserving de-duplication so a repeated ticker is not fetched twice.
    unique_tickers = list(dict.fromkeys(tickers))
    results = {}
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        executor = ThreadPoolExecutor(max_workers=max(1, min(5, len(unique_tickers))))
        future_to_ticker = {}
        try:
            future_to_ticker = {
                executor.submit(_fetch_single_news_sentiment, t, hf_token): t
                for t in unique_tickers
            }
            for future in as_completed(future_to_ticker, timeout=60):
                t = future_to_ticker[future]
                try:
                    results[t] = future.result()
                except Exception as e:
                    logger.debug(f"News sentiment worker failed for {t}: {e}")
                    results[t] = {'sentiment': 0.0}
        except FuturesTimeoutError:
            # Only a genuine deadline expiry is reported as a timeout.
            logger.warning("News fetch timed out.")
        finally:
            # Drop work that has not started yet (a no-op for finished
            # futures) and release the pool without blocking on stragglers.
            for future in future_to_ticker:
                future.cancel()
            executor.shutdown(wait=False)

    # Fill remaining with default in case of timeout
    for t in unique_tickers:
        if t not in results:
            results[t] = {'sentiment': 0.0}
    if not silent:
        print(f" {Color.GREEN}done.{Color.RESET}")
    return results
import datetime
from database import AssetFundamentals
from sqlalchemy.orm import Session
import time
def _fetch_single_fundamental(ticker: str) -> dict:
    """Fetch basic fundamentals and an implied-volatility proxy for one ticker.

    Both lookups go through ``yfinance`` and are best-effort: a failure in one
    is logged at debug level and does not prevent the other from running.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.

    Returns:
        Dict with keys ``'pe_ratio'``, ``'fcf_yield'``, ``'operating_margin'``
        and ``'implied_volatility'``. Any value that could not be obtained is
        ``None``.
    """
    res = {'pe_ratio': None, 'fcf_yield': None, 'operating_margin': None, 'implied_volatility': None}
    try:
        tk = yf.Ticker(ticker)
    except Exception as e:
        logger.debug(f"Failed fundamental fetch for {ticker}: {e}")
        return res

    # Fundamentals from the info mapping.
    try:
        info = tk.info
        if info:
            # Trailing P/E is preferred; forward P/E is the fallback when the
            # trailing figure is missing (or falsy).
            res['pe_ratio'] = info.get('trailingPE') or info.get('forwardPE')
            fcf = info.get('freeCashflow')
            mc = info.get('marketCap')
            if fcf and mc and mc > 0:
                res['fcf_yield'] = fcf / mc
            res['operating_margin'] = info.get('operatingMargins')
    except Exception as e:
        logger.debug(f"Failed fundamental fetch for {ticker}: {e}")

    # Implied-volatility proxy: average of the median call IV and the median
    # put IV over every strike of the first listed expiry (not restricted to
    # at-the-money contracts).
    try:
        options = tk.options
        if options:
            chain = tk.option_chain(options[0])
            if not chain.calls.empty and not chain.puts.empty:
                calls_iv = chain.calls['impliedVolatility'].median()
                puts_iv = chain.puts['impliedVolatility'].median()
                iv = (calls_iv + puts_iv) / 2.0
                # A median over all-missing values is NaN; report that as
                # "unknown" instead of handing NaN to callers.
                if pd.notna(iv):
                    res['implied_volatility'] = float(iv)
    except Exception as e:
        logger.debug(f"Failed options fetch for {ticker}: {e}")
    return res
def fetch_fundamentals_and_options(tickers: list, session: Session, silent: bool = False) -> dict:
    """Return fundamentals and implied volatility per ticker, using a 7-day DB cache.

    Tickers with an ``AssetFundamentals`` row updated less than 7 days ago are
    served from that row. The rest are fetched with
    ``_fetch_single_fundamental`` on a two-worker thread pool, under a
    60-second overall deadline, and the fetched values are written back to
    the cache.

    Args:
        tickers: Ticker symbols to look up. Duplicates are processed once.
        session: SQLAlchemy session used to read and update the cache. It is
            committed at the end, and rolled back if that commit fails.
        silent: When true, the informational log messages are suppressed.

    Returns:
        Mapping ``ticker -> dict`` with keys ``'pe_ratio'``, ``'fcf_yield'``,
        ``'operating_margin'`` and ``'implied_volatility'``. Values are
        ``None`` when unavailable, when the worker failed, or when the
        deadline expired before the ticker was fetched.
    """
    # concurrent.futures.TimeoutError is only an alias of the builtin
    # TimeoutError on newer Python versions, so import it explicitly.
    from concurrent.futures import TimeoutError as FuturesTimeoutError

    fields = ('pe_ratio', 'fcf_yield', 'operating_margin', 'implied_volatility')
    results = {}
    to_fetch = []

    # 1. Check Cache
    # Same value utcnow() produced, without the deprecated call. It is kept
    # naive so it can be subtracted from the stored updated_at values
    # (assumed to be naive UTC, since this function writes them).
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    # De-duplicate so a repeated ticker cannot add two new rows for the same
    # symbol to the session.
    for t in dict.fromkeys(tickers):
        cached = session.query(AssetFundamentals).filter_by(ticker=t).first()
        if cached and cached.updated_at and (now - cached.updated_at).days < 7:
            results[t] = {name: getattr(cached, name) for name in fields}
        else:
            to_fetch.append((t, cached))

    if not to_fetch:
        return results

    if not silent:
        logger.info(f"Fetching fundamentals & options for {len(to_fetch)} assets (ETA: ~{len(to_fetch)}s)...")

    # 2. Fetch missing with rate limiting
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # Use only 2 workers to avoid Yahoo Finance rate limits
        executor = ThreadPoolExecutor(max_workers=2)
        future_to_ticker = {}
        try:
            future_to_ticker = {
                executor.submit(_fetch_single_fundamental, t): (t, cached_obj)
                for t, cached_obj in to_fetch
            }
            for future in as_completed(future_to_ticker, timeout=60):
                t, cached_obj = future_to_ticker[future]
                try:
                    data = future.result()
                except Exception as e:
                    logger.debug(f"Fundamentals worker failed for {t}: {e}")
                    results[t] = dict.fromkeys(fields)
                    continue
                results[t] = data

                # A result with no data at all (e.g. a rate-limited request)
                # is not cached; otherwise the empty row would be served for
                # the next 7 days and overwrite any older values.
                if all(data.get(name) is None for name in fields):
                    continue

                # Update DB. A failure here is logged but must not discard
                # the data that was fetched successfully.
                try:
                    if not cached_obj:
                        cached_obj = AssetFundamentals(ticker=t)
                        session.add(cached_obj)
                    for name in fields:
                        setattr(cached_obj, name, data.get(name))
                    cached_obj.updated_at = now
                except Exception as e:
                    logger.debug(f"Failed to stage fundamental cache for {t}: {e}")
        except FuturesTimeoutError:
            # Only a genuine deadline expiry is reported as a timeout.
            logger.warning("Fundamentals fetch timed out.")
        finally:
            # Drop work that has not started yet (a no-op for finished
            # futures) and release the pool without blocking on stragglers.
            for future in future_to_ticker:
                future.cancel()
            executor.shutdown(wait=False)

    # Fill remaining with default in case of timeout
    for t, _ in to_fetch:
        if t not in results:
            results[t] = dict.fromkeys(fields)

    try:
        session.commit()
    except Exception as e:
        session.rollback()
        logger.error(f"Failed to commit fundamental cache: {e}")

    if not silent:
        logger.info("Fundamental data fetch complete.")
    return results