import streamlit as st import yfinance as yf import numpy as np import pandas as pd from datetime import datetime from scipy.optimize import brentq from scipy.stats import norm, gaussian_kde from scipy.interpolate import splrep, BSpline from scipy.integrate import simps import plotly.graph_objects as go from plotly.subplots import make_subplots st.set_page_config(layout="wide", page_title="Forward-Looking Probability") st.markdown("## Forward-Looking Market-Implied Probability Distribution") st.markdown("#### Option-Based Price Forecasting Using Implied Volatility") st.write( "This tool analyzes the implied probability distribution of a stock's future price using call option data. " "It calculates implied volatilities via the Black-Scholes model, derives a risk-neutral probability density function using " "the Breeden-Litzenberger formula, and then smooths the result with Kernel Density Estimation (KDE). " "A unified strike grid is used for the 3D surface, while 2D analysis focuses on individual expiration dates." ) with st.expander("How It Works", expanded=False): st.write("The analysis is based on the Black-Scholes model for European call options:") st.latex(r"C(S,K,T,r,\sigma)=S\Phi(d_1)-Ke^{-rT}\Phi(d_2)") st.latex(r"d_1=\frac{\ln\left(\frac{S}{K}\right)+(r+0.5\sigma^2)T}{\sigma\sqrt{T}}") st.latex(r"d_2=d_1-\sigma\sqrt{T}") st.write("The risk-neutral probability density function (PDF) is derived using the Breeden-Litzenberger formula:") st.latex(r"\text{PDF}(K)=e^{rT}\frac{\partial^2C}{\partial K^2}") st.write("The resulting PDF is then smoothed using Kernel Density Estimation (KDE).") # ============================================================================= # SIDEBAR - General Settings # ============================================================================= st.sidebar.title("Parameters") with st.sidebar.expander("General Settings", expanded=True): ticker_input = st.text_input("Ticker Symbol", value="NVDA") lower_pct = st.number_input( "Price % Decrease", value=10, min_value=1, max_value=100, step=1, help="For 2D plots: lower threshold = current price * (1 - percentage/100)" ) upper_pct = st.number_input( "Price % Increase", value=10, min_value=1, max_value=100, step=1, help="For 2D plots: upper threshold = current price * (1 + percentage/100)" ) # ============================================================================= # SIDEBAR - Advanced Settings # ============================================================================= with st.sidebar.expander("Advanced Settings", expanded=True): risk_free = st.number_input( "Risk-Free Rate", value=0.04, step=0.01, format="%.2f", help="The annualized risk-free rate used in option pricing." ) min_volume = st.number_input( "Minimum Volume", value=20, step=1, help="Minimum trading volume required for an option to be considered liquid." ) max_spread_ratio = st.number_input( "Max Spread Ratio", value=0.2, step=0.01, format="%.2f", help="Maximum acceptable ratio of bid-ask spread to ask price. Options exceeding this will be excluded." ) # ============================================================================= # Run Analysis Button (placed outside the expanders) # ============================================================================= run_analysis = st.sidebar.button("Run Analysis") # ============================================================================= # HELPER FUNCTIONS # ============================================================================= def call_bs_price(S, K, T, r, sigma): if T <= 0: return max(S - K, 0) d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T)) d2 = d1 - sigma * np.sqrt(T) return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2) def implied_vol_call(price, S, K, T, r): if T <= 0: return np.nan def f(iv): return call_bs_price(S, K, T, r, iv) - price try: return brentq(f, 1e-9, 5.0) except: return np.nan def build_pdf(K_grid, iv_spline_tck, S, T, r): iv_vals = BSpline(*iv_spline_tck)(K_grid) call_prices = [call_bs_price(S, K, T, r, iv) for K, iv in zip(K_grid, iv_vals)] dC_dK = np.gradient(call_prices, K_grid) d2C_dK2 = np.gradient(dC_dK, K_grid) pdf_raw = np.exp(r * T) * d2C_dK2 return np.clip(pdf_raw, 0, None) def build_cdf(K_grid, pdf_vals): cdf_vals = [] running = 0.0 for i in range(len(K_grid)): if i == 0: cdf_vals.append(0.0) else: area = simps(pdf_vals[i-1:i+1], K_grid[i-1:i+1]) running += area cdf_vals.append(running) cdf_vals = np.array(cdf_vals) if cdf_vals[-1] > 0: cdf_vals /= cdf_vals[-1] return cdf_vals # Function to filter illiquid options def filter_liquid_options(df, min_volume=20, max_spread_ratio=0.2): spread = df["ask"] - df["bid"] return df[(spread / df["ask"] < max_spread_ratio) & (df["bid"] > 0) & (df["volume"] >= min_volume)] # ============================================================================= # 3D ANALYSIS FUNCTION (CALLS ONLY) # ============================================================================= def compute_3d_pdf(data_ticker, current_price, r, min_volume, max_spread_ratio): all_expirations = data_ticker.options valid_expiries = [] days_list = [] calls_data_dict = {} # First pass: collect calls data from valid expiries. for exp_date in all_expirations: try: expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d") except: continue days_forward = (expiry_dt - datetime.now()).days if days_forward < 1: continue try: chain = data_ticker.option_chain(exp_date) except Exception: continue calls_df = chain.calls[['strike', 'lastPrice', 'bid', 'ask', 'volume']].dropna().copy() calls_df = filter_liquid_options(calls_df, min_volume, max_spread_ratio) calls_df = calls_df[calls_df['lastPrice'] > 0].sort_values('strike') if calls_df.empty: continue valid_expiries.append(exp_date) days_list.append(days_forward) calls_data_dict[exp_date] = calls_df if not valid_expiries: raise ValueError("No valid expiries with call data.") K_grid_3d = np.linspace(current_price * 0.25, current_price * 3, 300) pdf_list = [] # Second pass: compute smoothed PDF for each expiry. for exp_date in valid_expiries: expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d") T_val = (expiry_dt - datetime.now()).days / 365.0 calls_df = calls_data_dict[exp_date] iv_vals = [] for _, row in calls_df.iterrows(): vol = implied_vol_call(row['lastPrice'], current_price, row['strike'], T_val, r) iv_vals.append(vol) calls_df['iv'] = iv_vals calls_df.dropna(subset=['iv'], inplace=True) if calls_df.empty: pdf_list.append(np.zeros_like(K_grid_3d)) continue strikes = calls_df['strike'].values ivs = calls_df['iv'].values try: iv_spline_tck = splrep(strikes, ivs, s=10, k=3) except Exception: pdf_list.append(np.zeros_like(K_grid_3d)) continue pdf_raw = build_pdf(K_grid_3d, iv_spline_tck, current_price, T_val, r) try: kde = gaussian_kde(K_grid_3d, weights=pdf_raw) pdf_smooth = kde(K_grid_3d) area = np.trapz(pdf_smooth, K_grid_3d) if area > 0: pdf_smooth /= area except Exception: pdf_smooth = pdf_raw pdf_list.append(pdf_smooth) pdf_matrix = np.array(pdf_list) days_array = np.array(days_list) TT, KK = np.meshgrid(days_array, K_grid_3d, indexing='ij') fig = go.Figure(data=[go.Surface( x=KK, y=TT, z=pdf_matrix, colorscale='Viridis', opacity=0.8 )]) fig.update_layout( scene=dict( xaxis_title='Strike', yaxis_title='Days to Expiry', zaxis_title='PDF' ), title="3D Smoothed Implied PDF Across Expiries", width=900, height=700 ) fig.add_annotation( x=0.98, y=0.98, xref="paper", yref="paper", text=f"Current Price: {current_price:.2f}", showarrow=False, align="right", font=dict(size=12), bordercolor="black", borderwidth=1, #bgcolor="white", opacity=0.8 ) return fig, valid_expiries # ============================================================================= # 2D ANALYSIS FUNCTION (CALLS ONLY) # ============================================================================= def compute_2d_pdf(exp_date, data_ticker, current_price, r, lower_pct, upper_pct, min_volume, max_spread_ratio): try: expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d") except: return None days_forward = (expiry_dt - datetime.now()).days if days_forward < 1: return None T_val = days_forward / 365.0 try: chain = data_ticker.option_chain(exp_date) except: return None calls_df = chain.calls[['strike', 'lastPrice', 'bid', 'ask', 'volume']].dropna().copy() calls_df = filter_liquid_options(calls_df, min_volume, max_spread_ratio) calls_df = calls_df[calls_df['lastPrice'] > 0].sort_values('strike') if calls_df.empty: return None iv_list = [] for _, row in calls_df.iterrows(): vol = implied_vol_call(row['lastPrice'], current_price, row['strike'], T_val, r) iv_list.append(vol) calls_df['iv'] = iv_list calls_df.dropna(subset=['iv'], inplace=True) if calls_df.empty: return None strikes = calls_df['strike'].values ivs = calls_df['iv'].values try: iv_spline_tck = splrep(strikes, ivs, s=10, k=3) except: return None K_min = strikes.min() K_max = strikes.max() K_grid_2d = np.linspace(K_min, K_max, 300) pdf_raw = build_pdf(K_grid_2d, iv_spline_tck, current_price, T_val, r) try: kde = gaussian_kde(K_grid_2d, weights=pdf_raw) pdf_smooth = kde(K_grid_2d) area = np.trapz(pdf_smooth, K_grid_2d) if area > 0: pdf_smooth /= area except: pdf_smooth = pdf_raw cdf = build_cdf(K_grid_2d, pdf_smooth) lower_thresh = current_price * (1 - lower_pct / 100) upper_thresh = current_price * (1 + upper_pct / 100) mask_below = K_grid_2d < lower_thresh mask_between = (K_grid_2d >= lower_thresh) & (K_grid_2d <= upper_thresh) mask_above = K_grid_2d > upper_thresh p_below = np.trapz(pdf_smooth[mask_below], K_grid_2d[mask_below]) p_between = np.trapz(pdf_smooth[mask_between], K_grid_2d[mask_between]) p_above = np.trapz(pdf_smooth[mask_above], K_grid_2d[mask_above]) fig_pdf_cdf = make_subplots(rows=1, cols=2, subplot_titles=("Smoothed PDF", "Smoothed CDF")) fig_pdf_cdf.add_trace(go.Scatter( x=K_grid_2d, y=pdf_smooth, mode='lines', name='PDF', line=dict(color='blue') ), row=1, col=1) fig_pdf_cdf.add_vline(x=current_price, line=dict(color='red', dash='dash'), row=1, col=1) fig_pdf_cdf.update_xaxes(title_text="Strike", row=1, col=1) fig_pdf_cdf.update_yaxes(title_text="PDF", row=1, col=1) fig_pdf_cdf.add_trace(go.Scatter( x=K_grid_2d, y=cdf, mode='lines', name='CDF', line=dict(color='blue') ), row=1, col=2) fig_pdf_cdf.add_vline(x=current_price, line=dict(color='red', dash='dash'), row=1, col=2) fig_pdf_cdf.update_xaxes(title_text="Strike", row=1, col=2) fig_pdf_cdf.update_yaxes(title_text="CDF", row=1, col=2) fig_pdf_cdf.update_layout(title_text="2D Analysis: PDF and CDF") fig_pdf_cdf.add_annotation( x=0.98, y=0.98, xref="paper", yref="paper", text=f"Current Price: {current_price:.2f}", showarrow=False, align="right", font=dict(size=12), #bordercolor="white", borderwidth=1, opacity=0.8 ) fig_threshold = go.Figure() fig_threshold.add_trace(go.Scatter( x=K_grid_2d, y=pdf_smooth, mode='lines', name='PDF', line=dict(color='blue') )) fig_threshold.add_vline( x=lower_thresh, line=dict(color='orange', dash='dash'), annotation_text=f'Lower: {lower_thresh:.2f}', annotation_position="top left", annotation_xshift=10, annotation_yshift=-10 ) fig_threshold.add_vline( x=upper_thresh, line=dict(color='purple', dash='dash'), annotation_text=f'Upper: {upper_thresh:.2f}', annotation_position="top right", annotation_xshift=-10, annotation_yshift=-10 ) fig_threshold.add_vline( x=current_price, line=dict(color='red', dash='dash'), annotation_text=f'Current: {current_price:.2f}', annotation_position="bottom right", annotation_xshift=-10, annotation_yshift=10 ) fig_threshold.add_trace(go.Scatter( x=K_grid_2d[mask_below], y=pdf_smooth[mask_below], mode='lines', fill='tozeroy', line=dict(color='lightblue'), showlegend=False )) fig_threshold.add_trace(go.Scatter( x=K_grid_2d[mask_between], y=pdf_smooth[mask_between], mode='lines', fill='tozeroy', line=dict(color='lightgrey'), showlegend=False )) fig_threshold.add_trace(go.Scatter( x=K_grid_2d[mask_above], y=pdf_smooth[mask_above], mode='lines', fill='tozeroy', line=dict(color='lightcoral'), showlegend=False )) fig_threshold.update_layout( title="Threshold Probability Plot", xaxis_title="Strike", yaxis_title="PDF" ) annotation_text = ( f"Probability below {lower_thresh:.2f} is {p_below:.2%}
" f"Probability between {lower_thresh:.2f} and {upper_thresh:.2f} is {p_between:.2%}
" f"Probability above {upper_thresh:.2f} is {p_above:.2%}" ) fig_threshold.add_annotation( x=0.75, y=0.5, xref="paper", yref="paper", text=annotation_text, showarrow=False, align="left", font=dict(size=12), #bordercolor="white", borderwidth=1, opacity=0.8 ) fig_threshold.add_annotation( x=0.98, y=0.98, xref="paper", yref="paper", text=f"Current Price: {current_price:.2f}", showarrow=False, align="right", font=dict(size=12), bordercolor="black", borderwidth=1, #bgcolor="white", opacity=0.8 ) result = { "K_grid_2d": K_grid_2d, "pdf_smooth": pdf_smooth, "cdf": cdf, "lower_thresh": lower_thresh, "upper_thresh": upper_thresh, "p_below": p_below, "p_between": p_between, "p_above": p_above, "fig_pdf_cdf": fig_pdf_cdf, "fig_threshold": fig_threshold, "days_to_exp": days_forward } return result # ============================================================================= # MAIN RUN (only run when the button is clicked) # ============================================================================= if run_analysis: with st.spinner("Running analysis, please wait..."): try: data_ticker = yf.Ticker(ticker_input) hist_data = data_ticker.history(period="1d") if hist_data.empty: st.error("No price data found.") st.stop() current_price = hist_data["Close"].iloc[-1] except Exception as e: st.error(f"Error fetching data: {e}") st.stop() st.write(f"Current Price: {round(current_price, 2)}") r = risk_free try: fig3d, valid_expiries_3d = compute_3d_pdf(data_ticker, current_price, r, min_volume, max_spread_ratio) except Exception as e: st.error(f"3D analysis error: {e}") st.stop() results_2d = {} for exp_date in data_ticker.options: res = compute_2d_pdf(exp_date, data_ticker, current_price, r, lower_pct, upper_pct, min_volume, max_spread_ratio) if res is not None: results_2d[exp_date] = res if not results_2d: st.error("No valid expirations for 2D analysis.") st.stop() st.session_state.analysis_data = { "current_price": current_price, "expirations": list(results_2d.keys()), "results": results_2d, "fig3d": fig3d } # ============================================================================= # DISPLAY RESULTS (if analysis data exists) # ============================================================================= if "analysis_data" in st.session_state: ad = st.session_state.analysis_data st.write(f"**Current Price:** {round(ad['current_price'], 2)}") st.markdown("## 3D Probability Surface") st.plotly_chart(ad["fig3d"], use_container_width=True) st.markdown("## 2D Plots for Selected Expiration Date") chosen = st.selectbox("Choose expiration date:", options=ad["expirations"]) res2d = ad["results"][chosen] st.plotly_chart(res2d["fig_pdf_cdf"], use_container_width=True) st.plotly_chart(res2d["fig_threshold"], use_container_width=True) st.write("The 2D plots use calls data only. The 3D surface uses a unified strike grid.") else: st.info("Click 'Run Analysis' to start.") hide_streamlit_style = """ """ st.markdown(hide_streamlit_style, unsafe_allow_html=True)