import yfinance as yf import pandas as pd import numpy as np from concurrent.futures import ThreadPoolExecutor, as_completed from config import logger, Color import warnings import requests import urllib.parse import xml.etree.ElementTree as ET def _fetch_single_news_sentiment(ticker: str, hf_token: str) -> dict: result = {'sentiment': 0.0} try: headlines = [] try: tk = yf.Ticker(ticker) for item in tk.news[:5]: if 'title' in item: headlines.append(item['title']) except Exception: pass try: q = urllib.parse.quote(f"{ticker} stock") url = f"https://news.google.com/rss/search?q={q}&hl=en-US&gl=US&ceid=US:en" resp = requests.get(url, timeout=5) if resp.status_code == 200: root = ET.fromstring(resp.content) for item in root.findall('./channel/item')[:5]: headlines.append(item.find('title').text) except Exception: pass if not headlines or not hf_token: return result headers = {"Authorization": f"Bearer {hf_token}"} headlines = list(set(headlines))[:5] API_URL = "https://api-inference.huggingface.co/models/ProsusAI/finbert" response = requests.post(API_URL, headers=headers, json={"inputs": headlines}, timeout=10) if response.status_code == 200: preds = response.json() score_sum = 0 count = 0 for pred in preds: if isinstance(pred, list): top = max(pred, key=lambda x: x['score']) else: top = pred label = top.get('label', 'neutral').lower() prob = top.get('score', 0) if label == 'positive': score_sum += prob elif label == 'negative': score_sum -= prob count += 1 if count > 0: result['sentiment'] = score_sum / count else: logger.debug(f"HF API returned {response.status_code}: {response.text}") except Exception as e: logger.debug(f"Failed to fetch AI sentiment for {ticker}: {e}") return result def fetch_ai_news_sentiment(tickers: list, hf_token: str, silent: bool = False) -> dict: if not hf_token: return {t: {'sentiment': 0.0} for t in tickers} if not silent: print(f" {Color.CYAN}[INFO] Fetching AI News Sentiment (FinBERT) for {len(tickers)} assets...{Color.RESET}", end="", flush=True) results = {} with warnings.catch_warnings(): warnings.simplefilter("ignore") executor = ThreadPoolExecutor(max_workers=min(5, len(tickers) if tickers else 1)) future_to_ticker = { executor.submit(_fetch_single_news_sentiment, t, hf_token): t for t in tickers } try: for future in as_completed(future_to_ticker, timeout=60): t = future_to_ticker[future] try: results[t] = future.result() except Exception as e: results[t] = {'sentiment': 0.0} except Exception as e: logger.warning("News fetch timed out.") # Fill remaining with default in case of timeout for t in tickers: if t not in results: results[t] = {'sentiment': 0.0} executor.shutdown(wait=False) if not silent: print(f" {Color.GREEN}done.{Color.RESET}") return results import datetime from database import AssetFundamentals from sqlalchemy.orm import Session import time def _fetch_single_fundamental(ticker: str) -> dict: res = {'pe_ratio': None, 'fcf_yield': None, 'operating_margin': None, 'implied_volatility': None} try: tk = yf.Ticker(ticker) info = tk.info if info: res['pe_ratio'] = info.get('trailingPE') or info.get('forwardPE') fcf = info.get('freeCashflow') mc = info.get('marketCap') if fcf and mc and mc > 0: res['fcf_yield'] = fcf / mc res['operating_margin'] = info.get('operatingMargins') try: options = tk.options if options: chain = tk.option_chain(options[0]) if not chain.calls.empty and not chain.puts.empty: # Very simple ATM implied volatility proxy calls_iv = chain.calls['impliedVolatility'].median() puts_iv = chain.puts['impliedVolatility'].median() res['implied_volatility'] = (calls_iv + puts_iv) / 2.0 except Exception: pass except Exception as e: logger.debug(f"Failed fundamental fetch for {ticker}: {e}") return res def fetch_fundamentals_and_options(tickers: list, session: Session, silent: bool = False) -> dict: results = {} to_fetch = [] # 1. Check Cache now = datetime.datetime.utcnow() for t in tickers: cached = session.query(AssetFundamentals).filter_by(ticker=t).first() if cached and cached.updated_at and (now - cached.updated_at).days < 7: results[t] = { 'pe_ratio': cached.pe_ratio, 'fcf_yield': cached.fcf_yield, 'operating_margin': cached.operating_margin, 'implied_volatility': cached.implied_volatility } else: to_fetch.append((t, cached)) if not to_fetch: return results if not silent: logger.info(f"Fetching fundamentals & options for {len(to_fetch)} assets (ETA: ~{len(to_fetch)}s)...") # 2. Fetch missing with rate limiting with warnings.catch_warnings(): warnings.simplefilter("ignore") # Use only 2 workers to avoid Yahoo Finance rate limits executor = ThreadPoolExecutor(max_workers=2) future_to_ticker = { executor.submit(_fetch_single_fundamental, t): (t, cached_obj) for t, cached_obj in to_fetch } try: for future in as_completed(future_to_ticker, timeout=60): t, cached_obj = future_to_ticker[future] try: data = future.result() results[t] = data # Update DB if not cached_obj: cached_obj = AssetFundamentals(ticker=t) session.add(cached_obj) cached_obj.pe_ratio = data['pe_ratio'] cached_obj.fcf_yield = data['fcf_yield'] cached_obj.operating_margin = data['operating_margin'] cached_obj.implied_volatility = data['implied_volatility'] cached_obj.updated_at = now except Exception as e: results[t] = {'pe_ratio': None, 'fcf_yield': None, 'operating_margin': None, 'implied_volatility': None} except Exception as e: logger.warning("Fundamentals fetch timed out.") # Fill remaining with default in case of timeout for t, _ in to_fetch: if t not in results: results[t] = {'pe_ratio': None, 'fcf_yield': None, 'operating_margin': None, 'implied_volatility': None} executor.shutdown(wait=False) try: session.commit() except Exception as e: session.rollback() logger.error(f"Failed to commit fundamental cache: {e}") if not silent: logger.info("Fundamental data fetch complete.") return results