# Commit 34072a0 (verified): Add options flow analysis with put/call ratio,
# IV skew, unusual volume, max pain from real yfinance chains
"""Options Flow v1.0 — Real Options Chain Intelligence

Analyzes put/call ratio, implied volatility skew, unusual volume,
open interest patterns, and max pain from yfinance options data.
Falls back to heuristic estimates when the chain is unavailable.
"""
import logging
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple, List

import numpy as np
import pandas as pd
import yfinance as yf
class OptionsFlow:
    """Options market microstructure intelligence for alpha generation.

    Each public method returns a plain ``dict`` that carries a ``'score'`` on
    a 0-100 scale (50 is neutral) and a ``'source'`` tag:

    * ``'chain'``       - computed from a fetched option chain,
    * ``'heuristic'``   - put/call ratio estimated from recent price action,
    * ``'default'``     - put/call ratio hard-coded neutral fallback,
    * ``'unavailable'`` / ``'error'`` - nothing could be computed.

    Data problems are handled by degrading to a neutral result rather than
    by raising; the underlying exception is logged at DEBUG level.
    """

    _log = logging.getLogger(__name__)

    # Implied volatilities below this are treated as missing.
    # NOTE(review): Yahoo appears to report a tiny placeholder IV (~1e-5) for
    # unquoted contracts - confirm against live data before tuning this.
    _MIN_VALID_IV = 1e-4

    def __init__(self):
        # "<ticker>_<days_out>" -> (calls_df, puts_df, expiry_str).
        # Entries are never evicted, so a long-lived instance keeps serving
        # the chain it fetched first.
        self._cache = {}

    # ------------------------------------------------------------------
    # Data access
    # ------------------------------------------------------------------
    def _fetch_chain(self, ticker: str, days_out: int = 30) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[str]]:
        """Fetch one option chain from yfinance.

        The expiry chosen is the earliest one on or after ``today + days_out``.
        When every listed expiry is earlier than that target, the latest
        listed expiry is used because it is the closest to the target.

        Args:
            ticker: Symbol passed to ``yf.Ticker``.
            days_out: Desired minimum number of calendar days to expiry.

        Returns:
            ``(calls_df, puts_df, expiry_str)``, or ``(None, None, None)``
            when no expiries are listed or the download fails.  Successful
            results are cached per ``(ticker, days_out)``; failures are not.
        """
        cache_key = f"{ticker}_{days_out}"
        if cache_key in self._cache:
            return self._cache[cache_key]

        try:
            handle = yf.Ticker(ticker)
            expiries = handle.options
            if not expiries:
                return None, None, None

            # Assumes expiry labels are ISO 'YYYY-MM-DD' strings, so plain
            # string comparison orders them chronologically.
            target = (datetime.now() + timedelta(days=days_out)).strftime('%Y-%m-%d')
            on_or_after = [e for e in expiries if e >= target]
            selected = min(on_or_after) if on_or_after else max(expiries)

            chain = handle.option_chain(selected)
            result = (chain.calls, chain.puts, selected)
        except Exception:
            # Best-effort boundary: callers treat a missing chain as "no data".
            self._log.debug("Option chain fetch failed for %s", ticker, exc_info=True)
            return None, None, None

        self._cache[cache_key] = result
        return result

    @staticmethod
    def _last_close(ticker: str) -> float:
        """Return the most recent daily close for *ticker*.

        Raises:
            ValueError: if the close is non-finite or not positive.
            Exception: whatever yfinance/pandas raise when no history exists.
        """
        price = float(yf.Ticker(ticker).history(period='1d')['Close'].iloc[-1])
        if not np.isfinite(price) or price <= 0:
            raise ValueError(f"No usable closing price for {ticker!r}")
        return price

    @staticmethod
    def _activity_column(calls: pd.DataFrame, puts: pd.DataFrame,
                         prefer_open_interest: bool) -> Optional[str]:
        """Pick one activity column that exists on BOTH sides of the chain.

        Using the same column for calls and puts keeps ratios meaningful;
        open interest and volume are not comparable quantities.

        Returns:
            ``'openInterest'``, ``'volume'``, or ``None`` when neither side
            pair shares a usable column.
        """
        candidates = ('openInterest', 'volume') if prefer_open_interest else ('volume',)
        for column in candidates:
            if column in calls and column in puts:
                return column
        return None

    # ------------------------------------------------------------------
    # Put/call ratio
    # ------------------------------------------------------------------
    @staticmethod
    def _pcr_score(pcr: float) -> float:
        """Map a put/call ratio to 0-100 (low ratio = bullish = high score)."""
        return max(0, min(100, 100 - pcr * 40))

    def put_call_ratio(self, ticker: str, days_out: int = 30,
                       oi_weighted: bool = True) -> Dict:
        """Compute the put/call ratio from the option chain.

        Args:
            ticker: Symbol to analyse.
            days_out: Target days to expiry used to select the chain.
            oi_weighted: Use open interest (longer-term positioning) instead
                of volume.  Falls back to volume when open interest is not
                present on both sides; ``'weighted_by'`` reports the column
                actually used.

        Returns:
            Dict with ``pcr``, ``sentiment``, ``score``, ``call_value``,
            ``put_value``, ``expiry``, ``source`` and ``weighted_by``.  When
            the chain is missing, lacks the needed columns, or shows no
            activity at all, the price-action heuristic result is returned
            instead (an empty chain carries no sentiment information).
        """
        calls, puts, expiry = self._fetch_chain(ticker, days_out)
        if calls is None or puts is None:
            return self._heuristic_pcr(ticker)

        column = self._activity_column(calls, puts, oi_weighted)
        if column is None:
            return self._heuristic_pcr(ticker)

        # pandas skips NaN when summing, so missing cells count as zero.
        call_val = float(calls[column].sum())
        put_val = float(puts[column].sum())
        if call_val + put_val <= 0:
            return self._heuristic_pcr(ticker)

        # Epsilon keeps the ratio finite when only puts have traded.
        pcr = put_val / (call_val + 1e-10)

        if pcr < 0.5:
            sentiment = 'extreme_bullish'
        elif pcr < 0.7:
            sentiment = 'bullish'
        elif pcr > 1.5:
            sentiment = 'extreme_bearish'
        elif pcr > 1.0:
            sentiment = 'bearish'
        else:
            sentiment = 'neutral'

        return {
            'pcr': round(float(pcr), 3),
            'sentiment': sentiment,
            'score': round(float(self._pcr_score(pcr)), 1),
            'call_value': int(call_val),
            'put_value': int(put_val),
            'expiry': expiry,
            'source': 'chain',
            'weighted_by': 'open_interest' if column == 'openInterest' else 'volume',
        }

    def _heuristic_pcr(self, ticker: str) -> Dict:
        """Estimate the put/call ratio when the option chain is unusable.

        The estimate is ``0.7 - 5 * trailing_return`` clamped to [0.3, 2.0],
        where the trailing return spans up to 20 trading rows of the last
        month of closes (fewer when the month holds fewer rows).  When no
        finite return can be computed the hard-coded neutral default is
        returned.
        """
        default = {
            'pcr': 0.70,
            'sentiment': 'neutral',
            'score': 50.0,
            'source': 'default',
            'note': 'Options data unavailable — default neutral',
        }

        try:
            closes = yf.Ticker(ticker).history(period='1mo')['Close'].dropna()
        except Exception:
            self._log.debug("Price history fetch failed for %s", ticker, exc_info=True)
            return default

        if len(closes) < 2:
            return default

        # A one-month window can hold fewer than 21 rows; use what is there
        # rather than letting a NaN return clamp to the bearish extreme.
        lookback = min(20, len(closes) - 1)
        trailing_return = float(closes.iloc[-1] / closes.iloc[-1 - lookback] - 1)
        if not np.isfinite(trailing_return):
            return default

        # Rising stocks tend to have lower PCR (bullish) - rough heuristic.
        base_pcr = max(0.3, min(2.0, 0.7 - trailing_return * 5))

        if base_pcr < 0.5:
            sentiment = 'bullish'
        elif base_pcr > 1.0:
            sentiment = 'bearish'
        else:
            sentiment = 'neutral'

        return {
            'pcr': round(float(base_pcr), 3),
            'sentiment': sentiment,
            'score': round(float(self._pcr_score(base_pcr)), 1),
            'source': 'heuristic',
            'note': 'Options chain unavailable — estimated from price action',
        }

    # ------------------------------------------------------------------
    # Implied-volatility skew
    # ------------------------------------------------------------------
    @classmethod
    def _skew_from_chain(cls, calls: pd.DataFrame, puts: pd.DataFrame,
                         price: float) -> Dict:
        """Compute skew metrics from a chain and a reference price.

        ATM IV comes from the call whose strike is nearest *price*.  The OTM
        put is the highest strike at or below 95% of price (lowest strike if
        none); the OTM call is the lowest strike at or above 105% of price
        (highest strike if none).

        Returns:
            Dict with ``skew``, ``atm_iv``, ``otm_put_iv``, ``otm_call_iv``,
            ``sentiment`` and ``score``.

        Raises:
            ValueError: if the IV column is missing or any of the three IVs
                is non-finite or below ``_MIN_VALID_IV``.
            Exception: pandas/numpy errors on empty or malformed frames.
        """
        if 'impliedVolatility' not in calls or 'impliedVolatility' not in puts:
            raise ValueError("Chain has no impliedVolatility column")

        # Sort so the wing selection below does not depend on row order.
        calls = calls.sort_values('strike')
        puts = puts.sort_values('strike')

        call_strikes = calls['strike'].to_numpy(dtype=float)
        atm_pos = int(np.argmin(np.abs(call_strikes - price)))
        atm_iv = float(calls['impliedVolatility'].iloc[atm_pos])

        put_wing = puts[puts['strike'] <= price * 0.95]
        otm_put_row = put_wing.iloc[-1] if len(put_wing) else puts.iloc[0]
        otm_put_iv = float(otm_put_row['impliedVolatility'])

        call_wing = calls[calls['strike'] >= price * 1.05]
        otm_call_row = call_wing.iloc[0] if len(call_wing) else calls.iloc[-1]
        otm_call_iv = float(otm_call_row['impliedVolatility'])

        for iv in (atm_iv, otm_put_iv, otm_call_iv):
            if not np.isfinite(iv) or iv < cls._MIN_VALID_IV:
                raise ValueError("Implied volatility missing or placeholder")

        # Each wing is expressed relative to ATM IV.
        put_skew = otm_put_iv / atm_iv - 1
        call_skew = otm_call_iv / atm_iv - 1
        # Positive = puts expensive (fear); negative = calls expensive (greed).
        net_skew = put_skew - call_skew

        # The score is directional, not contrarian: fear lowers it, greed
        # raises it.
        if net_skew > 0.3:
            sentiment, score = 'extreme_fear', 15
        elif net_skew > 0.15:
            sentiment, score = 'fear', 30
        elif net_skew > 0.05:
            sentiment, score = 'mild_fear', 45
        elif net_skew < -0.15:
            sentiment, score = 'extreme_greed', 85
        elif net_skew < -0.05:
            sentiment, score = 'greed', 70
        else:
            sentiment, score = 'neutral', 50

        return {
            'skew': round(float(net_skew), 4),
            'atm_iv': round(float(atm_iv), 4),
            'otm_put_iv': round(float(otm_put_iv), 4),
            'otm_call_iv': round(float(otm_call_iv), 4),
            'sentiment': sentiment,
            'score': float(score),
        }

    def iv_skew(self, ticker: str, days_out: int = 30) -> Dict:
        """Analyze implied volatility skew from the option chain.

        Steep put skew (puts expensive) = fear premium.
        Flat skew = complacency.
        Reverse skew (calls expensive) = extreme bullishness.

        Returns:
            On success the skew metrics plus ``source='chain'`` and
            ``expiry``.  Otherwise a neutral dict with ``skew=None`` and
            ``source`` set to ``'unavailable'`` (no chain) or ``'error'``
            (price lookup or skew computation failed).
        """
        calls, puts, expiry = self._fetch_chain(ticker, days_out)
        if calls is None or puts is None:
            return {
                'skew': None,
                'score': 50.0,
                'interpretation': 'No options data available',
                'source': 'unavailable',
            }

        try:
            price = self._last_close(ticker)
            result = self._skew_from_chain(calls, puts, price)
        except Exception:
            self._log.debug("IV skew failed for %s", ticker, exc_info=True)
            return {
                'skew': None,
                'score': 50.0,
                'interpretation': 'Error computing skew',
                'source': 'error',
            }

        result['source'] = 'chain'
        result['expiry'] = expiry
        return result

    # ------------------------------------------------------------------
    # Unusual volume
    # ------------------------------------------------------------------
    @staticmethod
    def _volume_score(ratio: float) -> int:
        """Map a volume ratio to a score; never below the neutral 50."""
        if ratio > 5:
            return 90
        if ratio > 3:
            return 75
        if ratio > 2:
            return 60
        if ratio > 1.5:
            return 55
        return 50

    def unusual_volume(self, ticker: str, days_out: int = 30,
                       threshold_mult: float = 2.0) -> Dict:
        """Flag option volume that is high relative to neighbouring expiries.

        The baseline is the mean of today's total volume across the first
        three listed expiries - a cross-sectional comparison, not a
        historical one (no volume history is fetched).

        Args:
            ticker: Symbol to analyse.
            days_out: Target days to expiry used to select the chain.
            threshold_mult: ``is_unusual`` is true when the selected expiry's
                volume exceeds the baseline by more than this multiple.

        Returns:
            Dict with ``is_unusual``, ``volume_ratio``, ``total_volume``,
            ``avg_volume``, ``score``, ``source`` and ``expiry``; or a neutral
            dict with ``source='unavailable'`` when data cannot be obtained.
        """
        calls, puts, expiry = self._fetch_chain(ticker, days_out)
        if calls is None or puts is None:
            return {
                'is_unusual': False,
                'score': 50.0,
                'source': 'unavailable',
            }

        try:
            total_volume = float(calls['volume'].sum() + puts['volume'].sum())

            handle = yf.Ticker(ticker)
            baseline_volumes = []
            for listed in handle.options[:3]:
                if listed == expiry:
                    # Already have this chain; avoid a second download.
                    baseline_volumes.append(total_volume)
                    continue
                try:
                    other = handle.option_chain(listed)
                except Exception:
                    # One bad expiry should not discard the others.
                    continue
                baseline_volumes.append(
                    float(other.calls['volume'].sum() + other.puts['volume'].sum())
                )

            avg_volume = np.mean(baseline_volumes) if baseline_volumes else total_volume
            ratio = total_volume / (avg_volume + 1e-10)
        except Exception:
            self._log.debug("Unusual-volume check failed for %s", ticker, exc_info=True)
            return {
                'is_unusual': False,
                'score': 50.0,
                'source': 'unavailable',
            }

        return {
            'is_unusual': bool(ratio > threshold_mult),
            'volume_ratio': round(float(ratio), 2),
            'total_volume': int(total_volume),
            'avg_volume': int(avg_volume),
            'score': float(self._volume_score(ratio)),
            'source': 'chain',
            'expiry': expiry,
        }

    # ------------------------------------------------------------------
    # Max pain
    # ------------------------------------------------------------------
    @staticmethod
    def _max_pain_strike(calls: pd.DataFrame, puts: pd.DataFrame) -> Optional[float]:
        """Return the strike that minimises total intrinsic payout.

        For every listed strike taken as a hypothetical settlement price the
        payout owed to holders is summed as
        ``max(settle - K, 0) * OI`` over calls plus
        ``max(K - settle, 0) * OI`` over puts.  Ties resolve to the lowest
        strike.

        Returns:
            The minimising strike, or ``None`` when the chain carries no open
            interest (every strike would tie at zero).
        """
        call_strikes = calls['strike'].to_numpy(dtype=float)
        put_strikes = puts['strike'].to_numpy(dtype=float)
        # Missing open interest contributes nothing, as with a skip-NaN sum.
        call_oi = calls['openInterest'].fillna(0).to_numpy(dtype=float)
        put_oi = puts['openInterest'].fillna(0).to_numpy(dtype=float)

        if call_oi.sum() + put_oi.sum() <= 0:
            return None

        # np.unique returns the candidates sorted ascending.
        settles = np.unique(np.concatenate([call_strikes, put_strikes]))
        call_payout = (np.clip(settles[:, None] - call_strikes[None, :], 0, None) * call_oi).sum(axis=1)
        put_payout = (np.clip(put_strikes[None, :] - settles[:, None], 0, None) * put_oi).sum(axis=1)

        return float(settles[int(np.argmin(call_payout + put_payout))])

    def max_pain(self, ticker: str, days_out: int = 30) -> Dict:
        """Calculate options max pain - the price where option holders lose most.

        Tends to act as a magnet near expiration.

        Returns:
            Dict with ``max_pain``, ``current_price``, ``distance_pct`` (in
            percent), ``score``, ``source`` and ``expiry``.  The score is 50
            within 2% of max pain, 60 when price is below it and 40 when
            above.  A neutral dict with ``max_pain=None`` is returned when the
            chain is missing or has no open interest (``'unavailable'``) or
            when the computation fails (``'error'``).
        """
        calls, puts, expiry = self._fetch_chain(ticker, days_out)
        if calls is None or puts is None:
            return {
                'max_pain': None,
                'score': 50.0,
                'source': 'unavailable',
            }

        try:
            price = self._last_close(ticker)
            max_pain_strike = self._max_pain_strike(calls, puts)
        except Exception:
            self._log.debug("Max pain failed for %s", ticker, exc_info=True)
            return {
                'max_pain': None,
                'score': 50.0,
                'source': 'error',
            }

        if max_pain_strike is None:
            return {'max_pain': None, 'score': 50.0, 'source': 'unavailable'}

        distance_pct = abs(price - max_pain_strike) / price
        if distance_pct < 0.02:
            score = 50  # Near max pain = balanced
        elif price < max_pain_strike:
            score = 60  # Below max pain = potential upside
        else:
            score = 40  # Above max pain = potential downside

        return {
            'max_pain': round(float(max_pain_strike), 2),
            'current_price': round(float(price), 2),
            'distance_pct': round(float(distance_pct) * 100, 2),
            'score': float(score),
            'source': 'chain',
            'expiry': expiry,
        }

    # ------------------------------------------------------------------
    # Gamma proxy
    # ------------------------------------------------------------------
    @classmethod
    def _gamma_from_chain(cls, calls: pd.DataFrame, puts: pd.DataFrame,
                          price: float) -> Dict:
        """Compute the near-the-money call/put imbalance used as a gamma proxy.

        No option gamma is calculated.  The figure is
        ``(call_oi - put_oi) / (call_oi + put_oi)`` over strikes strictly
        within 5% of *price*, using open interest when both sides have it and
        volume otherwise.

        Raises:
            ValueError: if neither open interest nor volume exists on both
                sides of the chain.
        """
        column = cls._activity_column(calls, puts, True)
        if column is None:
            raise ValueError("Chain has neither open interest nor volume")

        band = price * 0.05
        call_oi = float(calls.loc[(calls['strike'] - price).abs() < band, column].sum())
        put_oi = float(puts.loc[(puts['strike'] - price).abs() < band, column].sum())

        # Epsilon yields a neutral 0 when nothing sits inside the band.
        gamma_ratio = (call_oi - put_oi) / (call_oi + put_oi + 1e-10)

        if gamma_ratio > 0.3:
            gamma_sign, score = 'strong_positive', 70  # Stabilizing
        elif gamma_ratio > 0.1:
            gamma_sign, score = 'positive', 60
        elif gamma_ratio < -0.3:
            gamma_sign, score = 'strong_negative', 30  # Destabilizing
        elif gamma_ratio < -0.1:
            gamma_sign, score = 'negative', 40
        else:
            gamma_sign, score = 'neutral', 50

        return {
            'gamma_ratio': round(float(gamma_ratio), 3),
            'gamma_sign': gamma_sign,
            'score': float(score),
            'call_oi': int(call_oi),
            'put_oi': int(put_oi),
        }

    def gamma_exposure(self, ticker: str, days_out: int = 30) -> Dict:
        """Estimate the sign of aggregate gamma exposure from the chain.

        Positive gamma = MM hedging stabilizes price.
        Negative gamma = MM hedging amplifies moves.

        This is a coarse proxy built from the call/put open-interest
        imbalance near the money, not a sum of per-contract gammas.

        Returns:
            Dict with ``gamma_ratio``, ``gamma_sign``, ``score``, ``call_oi``,
            ``put_oi``, ``source`` and ``expiry``; or a neutral dict with
            ``gamma_sign='unknown'`` when data is missing or the computation
            fails.
        """
        calls, puts, expiry = self._fetch_chain(ticker, days_out)
        if calls is None or puts is None:
            return {
                'gamma_sign': 'unknown',
                'score': 50.0,
                'source': 'unavailable',
            }

        try:
            price = self._last_close(ticker)
            result = self._gamma_from_chain(calls, puts, price)
        except Exception:
            self._log.debug("Gamma proxy failed for %s", ticker, exc_info=True)
            return {
                'gamma_sign': 'unknown',
                'score': 50.0,
                'source': 'error',
            }

        result['source'] = 'chain'
        result['expiry'] = expiry
        return result

    # ------------------------------------------------------------------
    # Composite
    # ------------------------------------------------------------------
    @staticmethod
    def _interpret(composite: float) -> str:
        """Translate the composite score into a human-readable label."""
        if composite > 75:
            return 'Options flow strongly bullish'
        if composite > 60:
            return 'Options flow bullish'
        if composite > 40:
            return 'Options flow neutral'
        if composite > 25:
            return 'Options flow bearish'
        return 'Options flow strongly bearish'

    def full_analysis(self, ticker: str, days_out: int = 30) -> Dict:
        """Run every analysis and blend the scores into one composite.

        Weights: put/call ratio 0.30, IV skew 0.25, unusual volume 0.20,
        max pain 0.15, gamma proxy 0.10.  Components that could not be
        computed contribute their neutral 50.  The unusual-volume score never
        drops below 50, so elevated volume can only lift the composite.

        Args:
            ticker: Symbol to analyse.
            days_out: Target days to expiry, forwarded to every component.

        Returns:
            Dict with ``ticker``, ``composite_score``, ``interpretation``,
            the five component dicts and an ISO ``timestamp`` (local time).
        """
        pcr = self.put_call_ratio(ticker, days_out)
        skew = self.iv_skew(ticker, days_out)
        volume = self.unusual_volume(ticker, days_out)
        pain = self.max_pain(ticker, days_out)
        gamma = self.gamma_exposure(ticker, days_out)

        components = (pcr, skew, volume, pain, gamma)
        scores = [part.get('score', 50) for part in components]
        weights = [0.30, 0.25, 0.20, 0.15, 0.10]
        composite = np.average(scores, weights=weights)

        return {
            'ticker': ticker,
            'composite_score': round(float(composite), 1),
            'interpretation': self._interpret(composite),
            'put_call_ratio': pcr,
            'iv_skew': skew,
            'unusual_volume': volume,
            'max_pain': pain,
            'gamma': gamma,
            'timestamp': datetime.now().isoformat(),
        }
if __name__ == '__main__':
    # Demo run: analyse one symbol and print a short human-readable summary.
    report = OptionsFlow().full_analysis('AAPL')
    summary_lines = (
        f"Options Composite: {report['composite_score']:.1f}/100",
        f"Interpretation: {report['interpretation']}",
        f"PCR: {report['put_call_ratio'].get('pcr', 'N/A')}",
        f"Skew: {report['iv_skew'].get('skew', 'N/A')}",
        f"Unusual Volume: {report['unusual_volume'].get('is_unusual', False)}",
        f"Max Pain: {report['max_pain'].get('max_pain', 'N/A')}",
    )
    for line in summary_lines:
        print(line)