import streamlit as st
import yfinance as yf
import numpy as np
import pandas as pd
from datetime import datetime
from scipy.optimize import brentq
from scipy.stats import norm, gaussian_kde
from scipy.interpolate import splrep, BSpline
from scipy.integrate import simps
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# Page chrome: wide layout, browser-tab title, and the two headline lines.
st.set_page_config(layout="wide", page_title="Forward-Looking Probability")
st.markdown("## Forward-Looking Market-Implied Probability Distribution")
st.markdown("#### Option-Based Price Forecasting Using Implied Volatility")

# User-facing summary of what the app computes.
_intro_text = (
    "This tool analyzes the implied probability distribution of a stock's future price using call option data. "
    "It calculates implied volatilities via the Black-Scholes model, derives a risk-neutral probability density function using "
    "the Breeden-Litzenberger formula, and then smooths the result with Kernel Density Estimation (KDE). "
    "A unified strike grid is used for the 3D surface, while 2D analysis focuses on individual expiration dates."
)
st.write(_intro_text)

# Collapsed-by-default explainer; elements are added through the expander
# object itself, which places them inside it.
_how_it_works = st.expander("How It Works", expanded=False)
_how_it_works.write("The analysis is based on the Black-Scholes model for European call options:")
_how_it_works.latex(r"C(S,K,T,r,\sigma)=S\Phi(d_1)-Ke^{-rT}\Phi(d_2)")
_how_it_works.latex(r"d_1=\frac{\ln\left(\frac{S}{K}\right)+(r+0.5\sigma^2)T}{\sigma\sqrt{T}}")
_how_it_works.latex(r"d_2=d_1-\sigma\sqrt{T}")
_how_it_works.write("The risk-neutral probability density function (PDF) is derived using the Breeden-Litzenberger formula:")
_how_it_works.latex(r"\text{PDF}(K)=e^{rT}\frac{\partial^2C}{\partial K^2}")
_how_it_works.write("The resulting PDF is then smoothed using Kernel Density Estimation (KDE).")
# =============================================================================
# SIDEBAR - General Settings
# =============================================================================
st.sidebar.title("Parameters")

# General settings: the ticker plus the +/- percentage band that the 2D
# threshold plot turns into lower/upper price levels.
with st.sidebar.expander("General Settings", expanded=True):
    ticker_input = st.text_input("Ticker Symbol", value="NVDA")
    lower_pct = st.number_input(
        "Price % Decrease", value=10, min_value=1, max_value=100, step=1,
        help="For 2D plots: lower threshold = current price * (1 - percentage/100)"
    )
    upper_pct = st.number_input(
        "Price % Increase", value=10, min_value=1, max_value=100, step=1,
        help="For 2D plots: upper threshold = current price * (1 + percentage/100)"
    )

# Advanced settings: pricing rate and the liquidity filter thresholds.
with st.sidebar.expander("Advanced Settings", expanded=True):
    # The rate is left unbounded; negative rates are a legitimate input.
    risk_free = st.number_input(
        "Risk-Free Rate", value=0.04, step=0.01, format="%.2f",
        help="The annualized risk-free rate used in option pricing."
    )
    # A negative volume or spread ratio is meaningless for the liquidity
    # filter, so both inputs are bounded below by zero.
    min_volume = st.number_input(
        "Minimum Volume", value=20, min_value=0, step=1,
        help="Minimum trading volume required for an option to be considered liquid."
    )
    max_spread_ratio = st.number_input(
        "Max Spread Ratio", value=0.2, min_value=0.0, step=0.01, format="%.2f",
        help="Maximum acceptable ratio of bid-ask spread to ask price. Options exceeding this will be excluded."
    )

# The button lives outside the expanders so it stays visible when they are
# collapsed.
run_analysis = st.sidebar.button("Run Analysis")
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def call_bs_price(S, K, T, r, sigma):
    """Return the Black-Scholes price of a European call option.

    Args:
        S: Spot price of the underlying.
        K: Strike price.
        T: Time to expiry (callers in this file pass days / 365).
        r: Continuously compounded risk-free rate over the unit of ``T``.
        sigma: Volatility over the unit of ``T``.

    Returns:
        The call price. For ``T <= 0`` this is the intrinsic value
        ``max(S - K, 0)``; for ``sigma <= 0`` it is the zero-volatility
        limit ``max(S - K * exp(-r * T), 0)``.
    """
    if T <= 0:
        return max(S - K, 0)
    if sigma <= 0:
        # Without volatility the payoff is deterministic. Evaluating the
        # closed form here would divide by zero and yield NaN exactly at the
        # forward (e.g. S == K with r == 0).
        return max(S - K * np.exp(-r * T), 0.0)
    vol_sqrt_t = sigma * np.sqrt(T)
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
def implied_vol_call(price, S, K, T, r):
    """Solve for the Black-Scholes implied volatility of a call.

    Searches the bracket ``[1e-9, 5.0]`` with Brent's method for the
    volatility at which ``call_bs_price`` reproduces ``price``.

    Args:
        price: Observed call price.
        S: Spot price of the underlying.
        K: Strike price.
        T: Time to expiry, in the unit expected by ``call_bs_price``.
        r: Risk-free rate.

    Returns:
        The implied volatility, or ``np.nan`` when ``T <= 0`` or when no
        root can be found in the bracket.
    """
    if T <= 0:
        return np.nan

    def pricing_error(iv):
        return call_bs_price(S, K, T, r, iv) - price

    try:
        return brentq(pricing_error, 1e-9, 5.0)
    except (ValueError, RuntimeError, ArithmeticError):
        # ValueError: no sign change over the bracket (or a NaN function
        # value); RuntimeError: no convergence; ArithmeticError: degenerate
        # inputs such as a zero strike. Anything else (including
        # KeyboardInterrupt/SystemExit) is deliberately not swallowed.
        return np.nan
def build_pdf(K_grid, iv_spline_tck, S, T, r):
    """Derive a non-negative implied density on ``K_grid`` from an IV spline.

    The spline gives an implied volatility per strike, each strike is
    re-priced with ``call_bs_price``, and the density is
    ``exp(r * T) * d2C/dK2`` computed with two passes of ``np.gradient``.

    Args:
        K_grid: Increasing 1-D array of strikes to evaluate on.
        iv_spline_tck: ``(t, c, k)`` spline representation of implied
            volatility versus strike, as returned by ``splrep``.
        S: Spot price of the underlying.
        T: Time to expiry, in the unit expected by ``call_bs_price``.
        r: Risk-free rate.

    Returns:
        Array of the same length as ``K_grid`` with negative values clipped
        to zero. The result is not normalized.
    """
    K_grid = np.asarray(K_grid, dtype=float)
    knots, degree = iv_spline_tck[0], iv_spline_tck[2]
    iv_curve = BSpline(*iv_spline_tck)

    # Outside the fitted strike range a cubic spline extrapolates as a
    # polynomial, which can produce huge or negative volatilities. Holding
    # the boundary volatility flat keeps the repricing meaningful there.
    # knots[degree] and knots[-degree - 1] bound the spline's base interval.
    K_eval = np.clip(K_grid, knots[degree], knots[-degree - 1])
    iv_vals = iv_curve(K_eval)

    call_prices = np.array(
        [call_bs_price(S, K, T, r, iv) for K, iv in zip(K_grid, iv_vals)]
    )
    dC_dK = np.gradient(call_prices, K_grid)
    d2C_dK2 = np.gradient(dC_dK, K_grid)
    pdf_raw = np.exp(r * T) * d2C_dK2
    # Numerical differentiation can dip below zero; a density cannot.
    return np.clip(pdf_raw, 0, None)
def build_cdf(K_grid, pdf_vals):
    """Integrate a sampled density into a cumulative distribution.

    Each grid interval contributes its trapezoid area; the running total is
    then rescaled so the last value is 1 whenever the total area is positive.

    Args:
        K_grid: 1-D array of strikes, in increasing order.
        pdf_vals: Density values sampled at ``K_grid`` (same length).

    Returns:
        Array of the same length as ``K_grid`` starting at 0. It is all
        zeros when the density carries no mass, and empty for empty input.
    """
    K_grid = np.asarray(K_grid, dtype=float)
    pdf_vals = np.asarray(pdf_vals, dtype=float)
    if K_grid.size == 0:
        return np.zeros(0)

    # Two-point Simpson integration reduces to the trapezoid rule, so plain
    # NumPy is used here instead of scipy.integrate.simps, which was removed
    # in SciPy 1.14.
    panel_areas = 0.5 * (pdf_vals[1:] + pdf_vals[:-1]) * np.diff(K_grid)
    cdf_vals = np.concatenate(([0.0], np.cumsum(panel_areas)))

    if cdf_vals[-1] > 0:
        cdf_vals /= cdf_vals[-1]
    return cdf_vals
# Function to filter illiquid options
def filter_liquid_options(df, min_volume=20, max_spread_ratio=0.2):
    """Return the rows of ``df`` that pass the liquidity screen.

    A row is kept when its bid is positive, its volume is at least
    ``min_volume``, and its bid-ask spread divided by the ask is strictly
    below ``max_spread_ratio``.

    Args:
        df: DataFrame with ``bid``, ``ask`` and ``volume`` columns.
        min_volume: Smallest volume that still counts as liquid.
        max_spread_ratio: Exclusive upper bound on ``(ask - bid) / ask``.

    Returns:
        The filtered DataFrame, with the original index preserved.
    """
    relative_spread = (df["ask"] - df["bid"]) / df["ask"]
    tight_enough = relative_spread < max_spread_ratio
    has_bid = df["bid"] > 0
    traded_enough = df["volume"] >= min_volume
    return df[tight_enough & has_bid & traded_enough]
# =============================================================================
# 3D ANALYSIS FUNCTION (CALLS ONLY)
# =============================================================================
def compute_3d_pdf(data_ticker, current_price, r, min_volume, max_spread_ratio):
    """Build a 3D surface of the smoothed implied PDF across expiries.

    For every expiry with at least one liquid call, implied volatilities are
    solved from last prices, smoothed with a cubic spline, turned into a
    density with ``build_pdf`` on a shared strike grid (0.25x to 3x the
    current price, 300 points), and smoothed with a weighted Gaussian KDE.

    Args:
        data_ticker: Object exposing ``options`` (expiry strings in
            ``YYYY-MM-DD`` form) and ``option_chain(exp_date)``; this script
            passes a ``yfinance.Ticker``.
        current_price: Spot price of the underlying.
        r: Risk-free rate.
        min_volume: Minimum volume for the liquidity filter.
        max_spread_ratio: Maximum relative bid-ask spread for the filter.

    Returns:
        ``(fig, valid_expiries)``: the Plotly surface figure and the list of
        expiry strings it covers, one surface row per expiry.

    Raises:
        ValueError: If no expiry yields usable call data.
    """
    # np.trapz is deprecated since NumPy 2.0 in favor of np.trapezoid; use
    # whichever this NumPy provides.
    trapezoid = getattr(np, "trapezoid", None) or np.trapz
    # Sample the clock once so the day count on the axis and the T used for
    # pricing always agree.
    now = datetime.now()

    valid_expiries = []
    days_list = []
    calls_by_expiry = {}

    # First pass: collect liquid call data for every future expiry.
    for exp_date in data_ticker.options:
        try:
            expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d")
        except (ValueError, TypeError):
            continue
        days_forward = (expiry_dt - now).days
        if days_forward < 1:
            continue
        try:
            chain = data_ticker.option_chain(exp_date)
        except Exception:
            # Best effort: an expiry whose chain cannot be fetched is skipped.
            continue
        calls_df = chain.calls[['strike', 'lastPrice', 'bid', 'ask', 'volume']].dropna().copy()
        calls_df = filter_liquid_options(calls_df, min_volume, max_spread_ratio)
        calls_df = calls_df[calls_df['lastPrice'] > 0].sort_values('strike')
        if calls_df.empty:
            continue
        valid_expiries.append(exp_date)
        days_list.append(days_forward)
        calls_by_expiry[exp_date] = calls_df

    if not valid_expiries:
        raise ValueError("No valid expiries with call data.")

    K_grid_3d = np.linspace(current_price * 0.25, current_price * 3, 300)
    pdf_list = []

    # Second pass: compute a smoothed PDF per expiry. An expiry that cannot
    # be processed contributes an all-zero row so rows stay aligned with
    # days_list.
    for exp_date, days_forward in zip(valid_expiries, days_list):
        T_val = days_forward / 365.0
        calls_df = calls_by_expiry[exp_date]
        iv_vals = [
            implied_vol_call(last_price, current_price, strike, T_val, r)
            for last_price, strike in zip(
                calls_df['lastPrice'].to_numpy(), calls_df['strike'].to_numpy()
            )
        ]
        calls_df = calls_df.assign(iv=iv_vals).dropna(subset=['iv'])
        if calls_df.empty:
            pdf_list.append(np.zeros_like(K_grid_3d))
            continue

        strikes = calls_df['strike'].values
        ivs = calls_df['iv'].values
        try:
            iv_spline_tck = splrep(strikes, ivs, s=10, k=3)
        except Exception:
            # Best effort: e.g. too few points for a cubic spline.
            pdf_list.append(np.zeros_like(K_grid_3d))
            continue

        pdf_raw = build_pdf(K_grid_3d, iv_spline_tck, current_price, T_val, r)
        try:
            kde = gaussian_kde(K_grid_3d, weights=pdf_raw)
            pdf_smooth = kde(K_grid_3d)
            area = trapezoid(pdf_smooth, K_grid_3d)
            if area > 0:
                pdf_smooth = pdf_smooth / area
        except Exception:
            # Best effort: fall back to the unsmoothed density when the KDE
            # cannot be built (e.g. all weights are zero).
            pdf_smooth = pdf_raw
        pdf_list.append(pdf_smooth)

    pdf_matrix = np.array(pdf_list)
    days_array = np.array(days_list)
    # indexing='ij' gives (expiry, strike) shaped grids matching pdf_matrix.
    TT, KK = np.meshgrid(days_array, K_grid_3d, indexing='ij')

    fig = go.Figure(data=[go.Surface(
        x=KK,
        y=TT,
        z=pdf_matrix,
        colorscale='Viridis',
        opacity=0.8
    )])
    fig.update_layout(
        scene=dict(
            xaxis_title='Strike',
            yaxis_title='Days to Expiry',
            zaxis_title='PDF'
        ),
        title="3D Smoothed Implied PDF Across Expiries",
        width=900,
        height=700
    )
    fig.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        bordercolor="black",
        borderwidth=1,
        opacity=0.8
    )
    return fig, valid_expiries
# =============================================================================
# 2D ANALYSIS FUNCTION (CALLS ONLY)
# =============================================================================
def compute_2d_pdf(exp_date, data_ticker, current_price, r, lower_pct, upper_pct, min_volume, max_spread_ratio):
    """Compute the implied PDF/CDF and threshold probabilities for one expiry.

    Liquid calls for ``exp_date`` are converted to implied volatilities,
    smoothed with a cubic spline, turned into a density with ``build_pdf``
    on a 300-point grid spanning the usable strikes, and smoothed with a
    weighted Gaussian KDE. Two Plotly figures are built from the result.

    Args:
        exp_date: Expiry as a ``YYYY-MM-DD`` string.
        data_ticker: Object exposing ``option_chain(exp_date)``; this script
            passes a ``yfinance.Ticker``.
        current_price: Spot price of the underlying.
        r: Risk-free rate.
        lower_pct: Percentage below the current price for the lower threshold.
        upper_pct: Percentage above the current price for the upper threshold.
        min_volume: Minimum volume for the liquidity filter.
        max_spread_ratio: Maximum relative bid-ask spread for the filter.

    Returns:
        A dict with keys ``K_grid_2d``, ``pdf_smooth``, ``cdf``,
        ``lower_thresh``, ``upper_thresh``, ``p_below``, ``p_between``,
        ``p_above``, ``fig_pdf_cdf``, ``fig_threshold`` and ``days_to_exp``,
        or ``None`` when the expiry cannot be analysed (bad date, expiry less
        than one day away, no chain, no liquid calls, no solvable implied
        vols, or a failed spline fit).
    """
    # np.trapz is deprecated since NumPy 2.0 in favor of np.trapezoid; use
    # whichever this NumPy provides.
    trapezoid = getattr(np, "trapezoid", None) or np.trapz

    try:
        expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d")
    except (ValueError, TypeError):
        return None
    days_forward = (expiry_dt - datetime.now()).days
    if days_forward < 1:
        return None
    T_val = days_forward / 365.0

    try:
        chain = data_ticker.option_chain(exp_date)
    except Exception:
        # Best effort: an expiry whose chain cannot be fetched is skipped.
        return None

    calls_df = chain.calls[['strike', 'lastPrice', 'bid', 'ask', 'volume']].dropna().copy()
    calls_df = filter_liquid_options(calls_df, min_volume, max_spread_ratio)
    calls_df = calls_df[calls_df['lastPrice'] > 0].sort_values('strike')
    if calls_df.empty:
        return None

    iv_list = [
        implied_vol_call(last_price, current_price, strike, T_val, r)
        for last_price, strike in zip(
            calls_df['lastPrice'].to_numpy(), calls_df['strike'].to_numpy()
        )
    ]
    calls_df = calls_df.assign(iv=iv_list).dropna(subset=['iv'])
    if calls_df.empty:
        return None

    strikes = calls_df['strike'].values
    ivs = calls_df['iv'].values
    try:
        iv_spline_tck = splrep(strikes, ivs, s=10, k=3)
    except Exception:
        # Best effort: e.g. too few points for a cubic spline.
        return None

    K_grid_2d = np.linspace(strikes.min(), strikes.max(), 300)
    pdf_raw = build_pdf(K_grid_2d, iv_spline_tck, current_price, T_val, r)
    try:
        kde = gaussian_kde(K_grid_2d, weights=pdf_raw)
        pdf_smooth = kde(K_grid_2d)
        area = trapezoid(pdf_smooth, K_grid_2d)
        if area > 0:
            pdf_smooth = pdf_smooth / area
    except Exception:
        # Best effort: fall back to the unsmoothed density when the KDE
        # cannot be built (e.g. all weights are zero).
        pdf_smooth = pdf_raw
    cdf = build_cdf(K_grid_2d, pdf_smooth)

    lower_thresh = current_price * (1 - lower_pct / 100)
    upper_thresh = current_price * (1 + upper_pct / 100)
    # The masks are only used to shade the three regions of the plot.
    mask_below = K_grid_2d < lower_thresh
    mask_between = (K_grid_2d >= lower_thresh) & (K_grid_2d <= upper_thresh)
    mask_above = K_grid_2d > upper_thresh

    # Read the probabilities off the CDF. Integrating each masked slice on
    # its own drops the grid intervals that straddle a threshold, so the
    # three numbers would not add up to the total mass. np.interp clamps to
    # the end values when a threshold lies outside the strike grid.
    cdf_at_lower = np.interp(lower_thresh, K_grid_2d, cdf)
    cdf_at_upper = np.interp(upper_thresh, K_grid_2d, cdf)
    p_below = cdf_at_lower
    p_between = cdf_at_upper - cdf_at_lower
    p_above = cdf[-1] - cdf_at_upper

    # Figure 1: PDF and CDF side by side, current price marked on both.
    fig_pdf_cdf = make_subplots(rows=1, cols=2, subplot_titles=("Smoothed PDF", "Smoothed CDF"))
    fig_pdf_cdf.add_trace(go.Scatter(
        x=K_grid_2d, y=pdf_smooth, mode='lines', name='PDF', line=dict(color='blue')
    ), row=1, col=1)
    fig_pdf_cdf.add_vline(x=current_price, line=dict(color='red', dash='dash'), row=1, col=1)
    fig_pdf_cdf.update_xaxes(title_text="Strike", row=1, col=1)
    fig_pdf_cdf.update_yaxes(title_text="PDF", row=1, col=1)
    fig_pdf_cdf.add_trace(go.Scatter(
        x=K_grid_2d, y=cdf, mode='lines', name='CDF', line=dict(color='blue')
    ), row=1, col=2)
    fig_pdf_cdf.add_vline(x=current_price, line=dict(color='red', dash='dash'), row=1, col=2)
    fig_pdf_cdf.update_xaxes(title_text="Strike", row=1, col=2)
    fig_pdf_cdf.update_yaxes(title_text="CDF", row=1, col=2)
    fig_pdf_cdf.update_layout(title_text="2D Analysis: PDF and CDF")
    fig_pdf_cdf.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        borderwidth=1,
        opacity=0.8
    )

    # Figure 2: PDF with the threshold lines and the three shaded regions.
    fig_threshold = go.Figure()
    fig_threshold.add_trace(go.Scatter(
        x=K_grid_2d, y=pdf_smooth, mode='lines', name='PDF', line=dict(color='blue')
    ))
    fig_threshold.add_vline(
        x=lower_thresh,
        line=dict(color='orange', dash='dash'),
        annotation_text=f'Lower: {lower_thresh:.2f}',
        annotation_position="top left",
        annotation_xshift=10,
        annotation_yshift=-10
    )
    fig_threshold.add_vline(
        x=upper_thresh,
        line=dict(color='purple', dash='dash'),
        annotation_text=f'Upper: {upper_thresh:.2f}',
        annotation_position="top right",
        annotation_xshift=-10,
        annotation_yshift=-10
    )
    fig_threshold.add_vline(
        x=current_price,
        line=dict(color='red', dash='dash'),
        annotation_text=f'Current: {current_price:.2f}',
        annotation_position="bottom right",
        annotation_xshift=-10,
        annotation_yshift=10
    )
    fig_threshold.add_trace(go.Scatter(
        x=K_grid_2d[mask_below], y=pdf_smooth[mask_below], mode='lines',
        fill='tozeroy', line=dict(color='lightblue'), showlegend=False
    ))
    fig_threshold.add_trace(go.Scatter(
        x=K_grid_2d[mask_between], y=pdf_smooth[mask_between], mode='lines',
        fill='tozeroy', line=dict(color='lightgrey'), showlegend=False
    ))
    fig_threshold.add_trace(go.Scatter(
        x=K_grid_2d[mask_above], y=pdf_smooth[mask_above], mode='lines',
        fill='tozeroy', line=dict(color='lightcoral'), showlegend=False
    ))
    fig_threshold.update_layout(
        title="Threshold Probability Plot",
        xaxis_title="Strike",
        yaxis_title="PDF"
    )
    # Plotly annotation text breaks lines on <br>, not on newline characters.
    annotation_text = (
        f"Probability below {lower_thresh:.2f} is {p_below:.2%}<br>"
        f"Probability between {lower_thresh:.2f} and {upper_thresh:.2f} is {p_between:.2%}<br>"
        f"Probability above {upper_thresh:.2f} is {p_above:.2%}"
    )
    fig_threshold.add_annotation(
        x=0.75, y=0.5, xref="paper", yref="paper",
        text=annotation_text,
        showarrow=False,
        align="left",
        font=dict(size=12),
        borderwidth=1,
        opacity=0.8
    )
    fig_threshold.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        bordercolor="black",
        borderwidth=1,
        opacity=0.8
    )

    return {
        "K_grid_2d": K_grid_2d,
        "pdf_smooth": pdf_smooth,
        "cdf": cdf,
        "lower_thresh": lower_thresh,
        "upper_thresh": upper_thresh,
        "p_below": p_below,
        "p_between": p_between,
        "p_above": p_above,
        "fig_pdf_cdf": fig_pdf_cdf,
        "fig_threshold": fig_threshold,
        "days_to_exp": days_forward
    }
# =============================================================================
# MAIN RUN (only run when the button is clicked)
# =============================================================================
if run_analysis:
    with st.spinner("Running analysis, please wait..."):
        # Pasted tickers often carry stray whitespace; reject an empty
        # symbol up front instead of sending it to the data provider.
        ticker_symbol = ticker_input.strip()
        if not ticker_symbol:
            st.error("Please enter a ticker symbol.")
            st.stop()

        # The latest daily close is used as the current price.
        try:
            data_ticker = yf.Ticker(ticker_symbol)
            hist_data = data_ticker.history(period="1d")
            if hist_data.empty:
                st.error("No price data found.")
                st.stop()
            current_price = hist_data["Close"].iloc[-1]
        except Exception as e:
            st.error(f"Error fetching data: {e}")
            st.stop()
        st.write(f"Current Price: {round(current_price, 2)}")
        r = risk_free

        # 3D surface across all usable expiries.
        try:
            fig3d, valid_expiries_3d = compute_3d_pdf(data_ticker, current_price, r, min_volume, max_spread_ratio)
        except Exception as e:
            st.error(f"3D analysis error: {e}")
            st.stop()

        # Per-expiry 2D analysis; expiries that return None are skipped.
        results_2d = {}
        for exp_date in data_ticker.options:
            res = compute_2d_pdf(exp_date, data_ticker, current_price, r, lower_pct, upper_pct, min_volume, max_spread_ratio)
            if res is not None:
                results_2d[exp_date] = res
        if not results_2d:
            st.error("No valid expirations for 2D analysis.")
            st.stop()

        # Persist the results so they survive the reruns triggered by
        # widgets such as the expiry selectbox.
        st.session_state.analysis_data = {
            "current_price": current_price,
            "expirations": list(results_2d.keys()),
            "results": results_2d,
            "fig3d": fig3d
        }
# =============================================================================
# DISPLAY RESULTS (if analysis data exists)
# =============================================================================
# Nothing stored yet: prompt the user. Otherwise render the saved analysis.
if "analysis_data" not in st.session_state:
    st.info("Click 'Run Analysis' to start.")
else:
    ad = st.session_state.analysis_data
    shown_price = round(ad['current_price'], 2)
    st.write(f"**Current Price:** {shown_price}")

    st.markdown("## 3D Probability Surface")
    st.plotly_chart(ad["fig3d"], use_container_width=True)

    st.markdown("## 2D Plots for Selected Expiration Date")
    chosen = st.selectbox("Choose expiration date:", options=ad["expirations"])
    res2d = ad["results"][chosen]
    # Both figures for the selected expiry, in display order.
    for figure_key in ("fig_pdf_cdf", "fig_threshold"):
        st.plotly_chart(res2d[figure_key], use_container_width=True)
    st.write("The 2D plots use calls data only. The 3D surface uses a unified strike grid.")

# Markup injected verbatim into the page; currently just a newline.
hide_streamlit_style = """
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)