# Space53 / app.py
# QuantumLearner's picture
# Update app.py
# 64c0582 verified
import streamlit as st
import yfinance as yf
import numpy as np
import pandas as pd
from datetime import datetime
from scipy.optimize import brentq
from scipy.stats import norm, gaussian_kde
from scipy.interpolate import splrep, BSpline
from scipy.integrate import simps
import plotly.graph_objects as go
from plotly.subplots import make_subplots
# -----------------------------------------------------------------------------
# Page chrome: wide layout, headings, an introductory blurb and a collapsible
# "How It Works" section showing the formulas the analysis relies on.
# -----------------------------------------------------------------------------
st.set_page_config(layout="wide", page_title="Forward-Looking Probability")

for _heading in (
    "## Forward-Looking Market-Implied Probability Distribution",
    "#### Option-Based Price Forecasting Using Implied Volatility",
):
    st.markdown(_heading)

st.write(
    "This tool analyzes the implied probability distribution of a stock's future price using call option data. "
    "It calculates implied volatilities via the Black-Scholes model, derives a risk-neutral probability density function using "
    "the Breeden-Litzenberger formula, and then smooths the result with Kernel Density Estimation (KDE). "
    "A unified strike grid is used for the 3D surface, while 2D analysis focuses on individual expiration dates."
)

with st.expander("How It Works", expanded=False):
    # Black-Scholes call price together with its d1 / d2 terms.
    st.write("The analysis is based on the Black-Scholes model for European call options:")
    for _formula in (
        r"C(S,K,T,r,\sigma)=S\Phi(d_1)-Ke^{-rT}\Phi(d_2)",
        r"d_1=\frac{\ln\left(\frac{S}{K}\right)+(r+0.5\sigma^2)T}{\sigma\sqrt{T}}",
        r"d_2=d_1-\sigma\sqrt{T}",
    ):
        st.latex(_formula)

    # Breeden-Litzenberger: density from the second strike-derivative of C.
    st.write("The risk-neutral probability density function (PDF) is derived using the Breeden-Litzenberger formula:")
    st.latex(r"\text{PDF}(K)=e^{rT}\frac{\partial^2C}{\partial K^2}")

    st.write("The resulting PDF is then smoothed using Kernel Density Estimation (KDE).")
# =============================================================================
# SIDEBAR - General Settings
# =============================================================================
st.sidebar.title("Parameters")

# The two percentage inputs share the same default value, bounds and step.
_pct_input_opts = dict(value=10, min_value=1, max_value=100, step=1)

# -----------------------------------------------------------------------------
# SIDEBAR - General Settings: ticker and the +/- thresholds of the 2D plots
# -----------------------------------------------------------------------------
with st.sidebar.expander("General Settings", expanded=True):
    ticker_input = st.text_input("Ticker Symbol", value="NVDA")
    lower_pct = st.number_input(
        "Price % Decrease",
        help="For 2D plots: lower threshold = current price * (1 - percentage/100)",
        **_pct_input_opts,
    )
    upper_pct = st.number_input(
        "Price % Increase",
        help="For 2D plots: upper threshold = current price * (1 + percentage/100)",
        **_pct_input_opts,
    )

# -----------------------------------------------------------------------------
# SIDEBAR - Advanced Settings: pricing rate and liquidity filters
# -----------------------------------------------------------------------------
with st.sidebar.expander("Advanced Settings", expanded=True):
    risk_free = st.number_input(
        "Risk-Free Rate",
        value=0.04,
        step=0.01,
        format="%.2f",
        help="The annualized risk-free rate used in option pricing.",
    )
    min_volume = st.number_input(
        "Minimum Volume",
        value=20,
        step=1,
        help="Minimum trading volume required for an option to be considered liquid.",
    )
    max_spread_ratio = st.number_input(
        "Max Spread Ratio",
        value=0.2,
        step=0.01,
        format="%.2f",
        help="Maximum acceptable ratio of bid-ask spread to ask price. Options exceeding this will be excluded.",
    )

# -----------------------------------------------------------------------------
# Run Analysis button - deliberately placed outside the expanders
# -----------------------------------------------------------------------------
run_analysis = st.sidebar.button("Run Analysis")
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def call_bs_price(S, K, T, r, sigma):
    """Return the Black-Scholes price of a European call.

    Args:
        S: Spot price of the underlying.
        K: Strike price.
        T: Time to expiry in years.
        r: Annualised, continuously compounded risk-free rate.
        sigma: Annualised volatility.

    Returns:
        The call price. Two degenerate cases are handled explicitly:

        * ``T <= 0``: the undiscounted intrinsic value ``max(S - K, 0)``.
        * ``sigma <= 0``: the zero-volatility limit
          ``max(S - K * exp(-r * T), 0)``. The closed-form expression divides
          by ``sigma * sqrt(T)`` and is meaningless for non-positive
          volatility, so it is not evaluated there.

        A NaN ``sigma`` is not special-cased and propagates to the result.
    """
    if T <= 0:
        return max(S - K, 0)
    if sigma <= 0:
        # Deterministic limit: the option is worth its discounted intrinsic value.
        return max(S - K * np.exp(-r * T), 0.0)

    vol_sqrt_t = sigma * np.sqrt(T)
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
def implied_vol_call(price, S, K, T, r):
    """Solve for the Black-Scholes implied volatility of a European call.

    The volatility is searched with Brent's method on the bracket
    ``[1e-9, 5.0]`` so that ``call_bs_price(S, K, T, r, iv)`` matches ``price``.

    Args:
        price: Observed call price.
        S: Spot price of the underlying.
        K: Strike price.
        T: Time to expiry in years.
        r: Annualised risk-free rate.

    Returns:
        The implied volatility, or ``np.nan`` when ``T <= 0``, when ``price``
        is not finite, or when no root exists inside the bracket (for example
        a price below intrinsic value) or the solver fails.
    """
    if T <= 0 or not np.isfinite(price):
        return np.nan

    def pricing_error(iv):
        return call_bs_price(S, K, T, r, iv) - price

    try:
        return brentq(pricing_error, 1e-9, 5.0)
    except (ValueError, RuntimeError, ArithmeticError):
        # ValueError: no sign change on the bracket; RuntimeError: no
        # convergence; ArithmeticError: e.g. division by a zero strike.
        # A bare ``except`` would also trap KeyboardInterrupt/SystemExit.
        return np.nan
def build_pdf(K_grid, iv_spline_tck, S, T, r, min_iv=1e-6):
    """Build a risk-neutral density on ``K_grid`` via Breeden-Litzenberger.

    The implied-volatility spline is evaluated on the grid, converted to call
    prices with ``call_bs_price``, and differentiated twice with respect to
    strike: ``pdf(K) = exp(r * T) * d2C/dK2``.

    Args:
        K_grid: Increasing array of strikes on which to evaluate the density.
        iv_spline_tck: ``(t, c, k)`` spline representation of implied
            volatility versus strike, as returned by ``splrep``.
        S: Spot price of the underlying.
        T: Time to expiry in years.
        r: Annualised risk-free rate.
        min_iv: Floor applied to the spline's volatility. The spline can
            return zero or negative values on the grid (the 3D grid extends
            well beyond 0.25x-3x spot), and those make the pricing formula
            meaningless.

    Returns:
        Array of non-negative density values, one per grid point. The result
        is not normalised.
    """
    K_grid = np.asarray(K_grid, dtype=float)

    # NOTE(review): BSpline extrapolates outside the fitted strikes; the floor
    # only guards against non-positive volatility, not against implausibly
    # large extrapolated values.
    iv_vals = np.maximum(BSpline(*iv_spline_tck)(K_grid), min_iv)

    call_prices = np.array(
        [call_bs_price(S, K, T, r, iv) for K, iv in zip(K_grid, iv_vals)]
    )
    dC_dK = np.gradient(call_prices, K_grid)
    d2C_dK2 = np.gradient(dC_dK, K_grid)
    pdf_raw = np.exp(r * T) * d2C_dK2

    # Numerical differentiation can produce small negative values; a density
    # cannot be negative, so clip them to zero.
    return np.clip(pdf_raw, 0, None)
def build_cdf(K_grid, pdf_vals):
    """Integrate a sampled density into a cumulative distribution.

    Each grid interval contributes its trapezoid area, which is exactly what
    Simpson's rule reduces to on a two-point slice. Using plain NumPy avoids
    depending on ``scipy.integrate.simps``, which recent SciPy releases no
    longer provide.

    Args:
        K_grid: Increasing array of strikes.
        pdf_vals: Density values sampled at ``K_grid`` (same length).

    Returns:
        Array of the same length as ``K_grid`` that starts at 0. When the
        total area is positive the values are rescaled so the last entry is
        1; otherwise they are returned unscaled (all zeros for a zero
        density). An empty input yields an empty array.
    """
    K_grid = np.asarray(K_grid, dtype=float)
    pdf_vals = np.asarray(pdf_vals, dtype=float)
    if K_grid.size == 0:
        return np.zeros(0)

    segment_areas = 0.5 * (pdf_vals[1:] + pdf_vals[:-1]) * np.diff(K_grid)
    cdf_vals = np.concatenate(([0.0], np.cumsum(segment_areas)))

    total_area = cdf_vals[-1]
    if total_area > 0:
        cdf_vals /= total_area
    return cdf_vals
# Function to filter illiquid options
def filter_liquid_options(df, min_volume=20, max_spread_ratio=0.2):
    """Keep only option quotes that look liquid enough to trust.

    A row is kept when all of the following hold:

    * bid and ask are both positive and the quote is not crossed
      (``ask >= bid``); a crossed quote has a negative spread and would
      otherwise slip through the spread test;
    * the bid-ask spread is less than ``max_spread_ratio`` times the ask;
    * volume is at least ``min_volume``.

    Args:
        df: DataFrame with at least ``bid``, ``ask`` and ``volume`` columns.
        min_volume: Minimum traded volume.
        max_spread_ratio: Maximum allowed ``(ask - bid) / ask``.

    Returns:
        A new DataFrame (a copy, so callers can add columns safely) holding
        the rows that pass every test.
    """
    bid = df["bid"]
    ask = df["ask"]
    spread = ask - bid

    valid_quote = (bid > 0) & (ask > 0) & (spread >= 0)
    # Equivalent to spread / ask < max_spread_ratio for ask > 0, but without
    # dividing by a zero ask.
    tight_spread = spread < max_spread_ratio * ask
    enough_volume = df["volume"] >= min_volume

    return df[valid_quote & tight_spread & enough_volume].copy()
# =============================================================================
# 3D ANALYSIS FUNCTION (CALLS ONLY)
# =============================================================================
def _expiry_pdf_on_grid(calls_df, K_grid, current_price, T_val, r, trapezoid):
    """Return one expiry's KDE-smoothed implied PDF on K_grid (zeros if it cannot be built)."""
    ivs = [
        implied_vol_call(last_price, current_price, strike, T_val, r)
        for last_price, strike in zip(
            calls_df['lastPrice'].to_numpy(), calls_df['strike'].to_numpy()
        )
    ]
    # assign() builds a new frame instead of mutating the filtered one in place.
    quotes = calls_df.assign(iv=ivs).dropna(subset=['iv'])
    if quotes.empty:
        return np.zeros_like(K_grid)

    try:
        iv_spline_tck = splrep(quotes['strike'].values, quotes['iv'].values, s=10, k=3)
    except Exception:
        # Best effort: e.g. too few strikes for a cubic spline.
        return np.zeros_like(K_grid)

    pdf_raw = build_pdf(K_grid, iv_spline_tck, current_price, T_val, r)
    try:
        kde = gaussian_kde(K_grid, weights=pdf_raw)
        pdf_smooth = kde(K_grid)
        if not np.all(np.isfinite(pdf_smooth)):
            raise ValueError("KDE produced non-finite values")
        area = trapezoid(pdf_smooth, K_grid)
        if area > 0:
            pdf_smooth /= area
    except Exception:
        # Best effort: fall back to the unsmoothed density when KDE fails
        # (for instance when every weight is zero).
        pdf_smooth = pdf_raw
    return pdf_smooth


def compute_3d_pdf(data_ticker, current_price, r, min_volume, max_spread_ratio):
    """Build the 3D surface of call-implied densities across expiries.

    For every expiry listed by ``data_ticker.options`` that is at least one
    full day away and has liquid call quotes, the implied-volatility smile is
    fitted with a smoothing spline, converted to a density on a shared strike
    grid (0.25x to 3x the current price, 300 points) and smoothed with a
    weighted Gaussian KDE.

    Args:
        data_ticker: Ticker-like object exposing ``options`` and
            ``option_chain(exp_date)`` (a ``yfinance.Ticker`` in this app).
        current_price: Spot price of the underlying.
        r: Annualised risk-free rate.
        min_volume: Minimum volume passed to ``filter_liquid_options``.
        max_spread_ratio: Maximum spread ratio passed to
            ``filter_liquid_options``.

    Returns:
        ``(fig, valid_expiries)``: the Plotly surface figure and the list of
        expiry strings (one surface row each, in the order processed).

    Raises:
        ValueError: If no expiry has usable call data.
    """
    # NumPy 2.0 renamed trapz to trapezoid; use whichever is available.
    trapezoid = getattr(np, "trapezoid", None) or np.trapz

    # One clock reading for the whole run, so days-to-expiry and T always agree.
    now = datetime.now()
    K_grid_3d = np.linspace(current_price * 0.25, current_price * 3, 300)

    valid_expiries = []
    days_list = []
    pdf_list = []
    for exp_date in data_ticker.options:
        try:
            expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d")
        except (TypeError, ValueError):
            continue
        days_forward = (expiry_dt - now).days
        if days_forward < 1:
            continue
        try:
            chain = data_ticker.option_chain(exp_date)
        except Exception:
            # Network / provider failure for this expiry only; skip it.
            continue

        calls_df = chain.calls[['strike', 'lastPrice', 'bid', 'ask', 'volume']].dropna().copy()
        calls_df = filter_liquid_options(calls_df, min_volume, max_spread_ratio)
        calls_df = calls_df[calls_df['lastPrice'] > 0].sort_values('strike')
        if calls_df.empty:
            continue

        valid_expiries.append(exp_date)
        days_list.append(days_forward)
        pdf_list.append(
            _expiry_pdf_on_grid(
                calls_df, K_grid_3d, current_price, days_forward / 365.0, r, trapezoid
            )
        )

    if not valid_expiries:
        raise ValueError("No valid expiries with call data.")

    pdf_matrix = np.array(pdf_list)
    days_array = np.array(days_list)
    # indexing='ij' gives (expiry, strike) shaped grids matching pdf_matrix.
    TT, KK = np.meshgrid(days_array, K_grid_3d, indexing='ij')

    fig = go.Figure(data=[go.Surface(
        x=KK,
        y=TT,
        z=pdf_matrix,
        colorscale='Viridis',
        opacity=0.8
    )])
    fig.update_layout(
        scene=dict(
            xaxis_title='Strike',
            yaxis_title='Days to Expiry',
            zaxis_title='PDF'
        ),
        title="3D Smoothed Implied PDF Across Expiries",
        width=900,
        height=700
    )
    fig.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        bordercolor="black",
        borderwidth=1,
        opacity=0.8
    )
    return fig, valid_expiries
# =============================================================================
# 2D ANALYSIS FUNCTION (CALLS ONLY)
# =============================================================================
def _pdf_cdf_figure(K_grid, pdf, cdf, current_price):
    """Build the side-by-side smoothed PDF / CDF figure for one expiry."""
    fig = make_subplots(rows=1, cols=2, subplot_titles=("Smoothed PDF", "Smoothed CDF"))
    panels = (
        (1, pdf, 'PDF', "PDF"),
        (2, cdf, 'CDF', "CDF"),
    )
    for col, values, trace_name, y_title in panels:
        fig.add_trace(go.Scatter(
            x=K_grid, y=values, mode='lines', name=trace_name, line=dict(color='blue')
        ), row=1, col=col)
        fig.add_vline(x=current_price, line=dict(color='red', dash='dash'), row=1, col=col)
        fig.update_xaxes(title_text="Strike", row=1, col=col)
        fig.update_yaxes(title_text=y_title, row=1, col=col)
    fig.update_layout(title_text="2D Analysis: PDF and CDF")
    fig.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        borderwidth=1,
        opacity=0.8
    )
    return fig


def _threshold_figure(K_grid, pdf, current_price, lower_thresh, upper_thresh,
                      p_below, p_between, p_above):
    """Build the PDF plot shaded below / between / above the two thresholds."""
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=K_grid, y=pdf, mode='lines', name='PDF', line=dict(color='blue')
    ))
    fig.add_vline(
        x=lower_thresh,
        line=dict(color='orange', dash='dash'),
        annotation_text=f'Lower: {lower_thresh:.2f}',
        annotation_position="top left",
        annotation_xshift=10,
        annotation_yshift=-10
    )
    fig.add_vline(
        x=upper_thresh,
        line=dict(color='purple', dash='dash'),
        annotation_text=f'Upper: {upper_thresh:.2f}',
        annotation_position="top right",
        annotation_xshift=-10,
        annotation_yshift=-10
    )
    fig.add_vline(
        x=current_price,
        line=dict(color='red', dash='dash'),
        annotation_text=f'Current: {current_price:.2f}',
        annotation_position="bottom right",
        annotation_xshift=-10,
        annotation_yshift=10
    )

    shaded_regions = (
        (K_grid < lower_thresh, 'lightblue'),
        ((K_grid >= lower_thresh) & (K_grid <= upper_thresh), 'lightgrey'),
        (K_grid > upper_thresh, 'lightcoral'),
    )
    for mask, colour in shaded_regions:
        fig.add_trace(go.Scatter(
            x=K_grid[mask], y=pdf[mask], mode='lines',
            fill='tozeroy', line=dict(color=colour), showlegend=False
        ))

    fig.update_layout(
        title="Threshold Probability Plot",
        xaxis_title="Strike",
        yaxis_title="PDF"
    )
    annotation_text = (
        f"Probability below {lower_thresh:.2f} is {p_below:.2%}<br>"
        f"Probability between {lower_thresh:.2f} and {upper_thresh:.2f} is {p_between:.2%}<br>"
        f"Probability above {upper_thresh:.2f} is {p_above:.2%}"
    )
    fig.add_annotation(
        x=0.75, y=0.5, xref="paper", yref="paper",
        text=annotation_text,
        showarrow=False,
        align="left",
        font=dict(size=12),
        borderwidth=1,
        opacity=0.8
    )
    fig.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        bordercolor="black",
        borderwidth=1,
        opacity=0.8
    )
    return fig


def compute_2d_pdf(exp_date, data_ticker, current_price, r, lower_pct, upper_pct, min_volume, max_spread_ratio):
    """Compute the call-implied PDF/CDF and threshold probabilities for one expiry.

    Args:
        exp_date: Expiry as a ``YYYY-MM-DD`` string.
        data_ticker: Ticker-like object exposing ``option_chain(exp_date)``
            (a ``yfinance.Ticker`` in this app).
        current_price: Spot price of the underlying.
        r: Annualised risk-free rate.
        lower_pct: Percentage drop defining the lower threshold.
        upper_pct: Percentage rise defining the upper threshold.
        min_volume: Minimum volume passed to ``filter_liquid_options``.
        max_spread_ratio: Maximum spread ratio passed to
            ``filter_liquid_options``.

    Returns:
        ``None`` when the expiry cannot be analysed (unparsable date, less
        than one full day to expiry, chain download failure, no liquid calls,
        no solvable implied volatility, or spline fit failure). Otherwise a
        dict with keys ``K_grid_2d``, ``pdf_smooth``, ``cdf``,
        ``lower_thresh``, ``upper_thresh``, ``p_below``, ``p_between``,
        ``p_above``, ``fig_pdf_cdf``, ``fig_threshold`` and ``days_to_exp``.
        The density only covers the range of liquid strikes, so the three
        probabilities describe mass inside that range.
    """
    # NumPy 2.0 renamed trapz to trapezoid; use whichever is available.
    trapezoid = getattr(np, "trapezoid", None) or np.trapz

    try:
        expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d")
    except (TypeError, ValueError):
        return None
    days_forward = (expiry_dt - datetime.now()).days
    if days_forward < 1:
        return None
    T_val = days_forward / 365.0

    try:
        chain = data_ticker.option_chain(exp_date)
    except Exception:
        # Network / provider failure for this expiry; treat it as unavailable.
        return None

    calls_df = chain.calls[['strike', 'lastPrice', 'bid', 'ask', 'volume']].dropna().copy()
    calls_df = filter_liquid_options(calls_df, min_volume, max_spread_ratio)
    calls_df = calls_df[calls_df['lastPrice'] > 0].sort_values('strike')
    if calls_df.empty:
        return None

    iv_list = [
        implied_vol_call(last_price, current_price, strike, T_val, r)
        for last_price, strike in zip(
            calls_df['lastPrice'].to_numpy(), calls_df['strike'].to_numpy()
        )
    ]
    # assign() builds a new frame instead of mutating the filtered one in place.
    calls_df = calls_df.assign(iv=iv_list).dropna(subset=['iv'])
    if calls_df.empty:
        return None

    strikes = calls_df['strike'].values
    ivs = calls_df['iv'].values
    try:
        iv_spline_tck = splrep(strikes, ivs, s=10, k=3)
    except Exception:
        # Best effort: e.g. too few strikes for a cubic spline.
        return None

    K_grid_2d = np.linspace(strikes.min(), strikes.max(), 300)
    pdf_raw = build_pdf(K_grid_2d, iv_spline_tck, current_price, T_val, r)
    try:
        kde = gaussian_kde(K_grid_2d, weights=pdf_raw)
        pdf_smooth = kde(K_grid_2d)
        area = trapezoid(pdf_smooth, K_grid_2d)
        if area > 0:
            pdf_smooth /= area
    except Exception:
        # Best effort: fall back to the unsmoothed density when KDE fails.
        pdf_smooth = pdf_raw

    cdf = build_cdf(K_grid_2d, pdf_smooth)
    lower_thresh = current_price * (1 - lower_pct / 100)
    upper_thresh = current_price * (1 + upper_pct / 100)

    # Read the probabilities off the CDF. Integrating the masked grid points
    # separately drops the cells that straddle each threshold, so the three
    # values would not add up to the total mass.
    cdf_at_lower = float(np.interp(lower_thresh, K_grid_2d, cdf))
    cdf_at_upper = float(np.interp(upper_thresh, K_grid_2d, cdf))
    total_mass = float(cdf[-1])  # 1.0 normally, 0.0 for a degenerate zero density
    p_below = cdf_at_lower
    p_between = cdf_at_upper - cdf_at_lower
    p_above = total_mass - cdf_at_upper

    fig_pdf_cdf = _pdf_cdf_figure(K_grid_2d, pdf_smooth, cdf, current_price)
    fig_threshold = _threshold_figure(
        K_grid_2d, pdf_smooth, current_price, lower_thresh, upper_thresh,
        p_below, p_between, p_above,
    )

    return {
        "K_grid_2d": K_grid_2d,
        "pdf_smooth": pdf_smooth,
        "cdf": cdf,
        "lower_thresh": lower_thresh,
        "upper_thresh": upper_thresh,
        "p_below": p_below,
        "p_between": p_between,
        "p_above": p_above,
        "fig_pdf_cdf": fig_pdf_cdf,
        "fig_threshold": fig_threshold,
        "days_to_exp": days_forward
    }
# =============================================================================
# MAIN RUN (only run when the button is clicked)
# =============================================================================
if run_analysis:
    with st.spinner("Running analysis, please wait..."):
        # Stray whitespace around the symbol would make the lookup fail.
        symbol = ticker_input.strip()
        if not symbol:
            st.error("Please enter a ticker symbol.")
            st.stop()

        try:
            data_ticker = yf.Ticker(symbol)
            hist_data = data_ticker.history(period="1d")
            current_price = None if hist_data.empty else hist_data["Close"].iloc[-1]
        except Exception as e:
            st.error(f"Error fetching data: {e}")
            st.stop()
        # A missing or NaN close cannot be used for pricing.
        if current_price is None or not np.isfinite(current_price):
            st.error("No price data found.")
            st.stop()

        st.write(f"Current Price: {round(current_price, 2)}")

        try:
            fig3d, valid_expiries_3d = compute_3d_pdf(
                data_ticker, current_price, risk_free, min_volume, max_spread_ratio
            )
        except Exception as e:
            st.error(f"3D analysis error: {e}")
            st.stop()

        # One expiry failing unexpectedly should not discard all the others.
        results_2d = {}
        failed_expiries = []
        for exp_date in data_ticker.options:
            try:
                res = compute_2d_pdf(
                    exp_date, data_ticker, current_price, risk_free,
                    lower_pct, upper_pct, min_volume, max_spread_ratio,
                )
            except Exception as e:
                failed_expiries.append(f"{exp_date} ({e})")
                continue
            if res is not None:
                results_2d[exp_date] = res
        if failed_expiries:
            st.warning("Skipped expirations that failed to compute: " + ", ".join(failed_expiries))

        if not results_2d:
            st.error("No valid expirations for 2D analysis.")
            st.stop()

        # Persist the results so they survive the rerun triggered by the
        # expiration selectbox below.
        st.session_state.analysis_data = {
            "current_price": current_price,
            "expirations": list(results_2d.keys()),
            "results": results_2d,
            "fig3d": fig3d
        }
# =============================================================================
# DISPLAY RESULTS (if analysis data exists)
# =============================================================================
# Render the stored analysis if there is one; otherwise prompt the user.
if "analysis_data" not in st.session_state:
    st.info("Click 'Run Analysis' to start.")
else:
    analysis = st.session_state.analysis_data
    shown_price = round(analysis['current_price'], 2)
    st.write(f"**Current Price:** {shown_price}")

    st.markdown("## 3D Probability Surface")
    st.plotly_chart(analysis["fig3d"], use_container_width=True)

    st.markdown("## 2D Plots for Selected Expiration Date")
    selected_expiry = st.selectbox("Choose expiration date:", options=analysis["expirations"])
    expiry_result = analysis["results"][selected_expiry]
    # PDF/CDF pair first, then the threshold-probability plot.
    for figure_key in ("fig_pdf_cdf", "fig_threshold"):
        st.plotly_chart(expiry_result[figure_key], use_container_width=True)

    st.write("The 2D plots use calls data only. The 3D surface uses a unified strike grid.")
# CSS injected into the page to hide the elements matched by `#MainMenu` and
# `footer`. unsafe_allow_html=True is needed so the <style> tag is emitted as
# HTML rather than shown as text.
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)