import yfinance as yf import pandas as pd import numpy as np from concurrent.futures import ThreadPoolExecutor, as_completed from config import logger, Color import warnings def _fetch_single_option_sentiment(ticker: str) -> dict: """ Fetches the near-term options chain for a single ticker to calculate put/call ratio and an implied volatility skew proxy. """ result = {'put_call_ratio': 1.0, 'iv_skew': 0.0} try: tk = yf.Ticker(ticker) expirations = tk.options if not expirations: return result # Get the nearest expiration to capture current speculative sentiment opt = tk.option_chain(expirations[0]) calls = opt.calls puts = opt.puts # Calculate Volume-based Put/Call Ratio (fallback to Open Interest if volume is 0/NaN) call_vol = calls['volume'].sum() if 'volume' in calls else 0 put_vol = puts['volume'].sum() if 'volume' in puts else 0 if call_vol == 0 and put_vol == 0: call_vol = calls['openInterest'].sum() if 'openInterest' in calls else 0 put_vol = puts['openInterest'].sum() if 'openInterest' in puts else 0 if call_vol > 0: result['put_call_ratio'] = float(put_vol / call_vol) elif put_vol > 0: result['put_call_ratio'] = 5.0 # Arbitrary cap for extremely bearish flow # Calculate a simple Implied Volatility Skew proxy (Average Put IV - Average Call IV) # In a real system, you would interpolate exact OTM strikes (e.g. 25-delta puts vs 25-delta calls) if 'impliedVolatility' in calls and 'impliedVolatility' in puts: call_iv = calls['impliedVolatility'].replace(0.0, np.nan).mean() put_iv = puts['impliedVolatility'].replace(0.0, np.nan).mean() if pd.notna(call_iv) and pd.notna(put_iv): result['iv_skew'] = float(put_iv - call_iv) except Exception as e: logger.debug(f"Failed to fetch options sentiment for {ticker}: {e}") return result def fetch_options_sentiment(tickers: list, silent: bool = False) -> dict: """ Parallelized fetcher for options market sentiment. Returns a dictionary mapping tickers to their sentiment features. """ if not silent: print(f" {Color.CYAN}ℹ Fetching alternative data (options flow) for {len(tickers)} assets...{Color.RESET}", end="", flush=True) results = {} with warnings.catch_warnings(): warnings.simplefilter("ignore") with ThreadPoolExecutor(max_workers=min(10, len(tickers) if tickers else 1)) as executor: future_to_ticker = { executor.submit(_fetch_single_option_sentiment, t): t for t in tickers } for future in as_completed(future_to_ticker): t = future_to_ticker[future] try: sentiment = future.result() results[t] = sentiment except Exception as e: logger.error(f"Error processing options for {t}: {e}") results[t] = {'put_call_ratio': 1.0, 'iv_skew': 0.0} if not silent: print(f" {Color.GREEN}done.{Color.RESET}") return results