# Space68 / app.py
# QuantumLearner's picture
# Update app.py
# cce7792 verified
import streamlit as st
import numpy as np
import pandas as pd
import yfinance as yf
import warnings
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import datetime
warnings.filterwarnings('ignore')
# Page chrome: wide layout, a title, and a short introduction to the app.
_APP_TITLE = "Spread Estimation"
_APP_INTRO = (
    "This application estimates the rolling bid-ask spread using a rolling window estimator on OHLC prices. "
    "Each chart below shows volume, close price, rolling volatility, and rolling spread. "
    "Use the expanders to see additional analysis for each interval. "
    "For further details on the methodology, please see "
    "[this article](https://entreprenerdly.com/estimating-bid-ask-spreads-using-ohlc-prices/)."
)

# set_page_config is issued before any other Streamlit element is drawn.
st.set_page_config(page_title=_APP_TITLE, layout="wide")
st.title(_APP_TITLE)
st.write(_APP_INTRO)
# Collapsible reference section describing the estimator implemented by
# `edge_rolling`. Inline math uses `$...$` because `st.markdown` renders LaTeX
# only inside `$`/`$$` delimiters; the text on the final spread value mirrors
# what `edge_rolling` actually returns (root of the absolute value).
with st.expander("Theory and Methodology ", expanded=False):
    st.markdown(r"""
#### Methodology for Estimating Rolling Bid-Ask Spreads
This function allows for rolling estimation of bid-ask spreads. It is suitable for analyzing transaction costs over time.
##### Bid-Ask Spread Definition
The **effective bid-ask spread** measures the deviation of observed transaction prices from the unobserved fundamental price. Formally, for a given trade:
$$
S = \frac{2D (P - P^*)}{P^*}
$$
where:
- $P$ is the observed transaction price.
- $P^*$ is the unobserved fundamental price.
- $D$ is a trade direction indicator (+1 for buy, -1 for sell).
Since $P^*$ is unobserved, various methods exist to estimate the spread using **low-frequency data (OHLC prices)**.
##### Improving Estimation with OHLC Prices
The function implements an estimator that:
1. Uses **open, high, low, and close** prices.
2. Corrects for **discretely observed prices** to avoid biases.
3. Uses the **Generalized Method of Moments (GMM)** to minimize estimation variance.
##### Moment Conditions
We define multiple log-returns:
- **Midpoint log-prices**:
$$
\eta_t = \frac{\log(H_t) + \log(L_t)}{2}
$$
- **Returns used for estimation**:
$$
r_1 = \eta_t - o_t,\quad r_2 = o_t - \eta_{t-1},\quad r_3 = \eta_t - c_{t-1},\quad r_4 = c_{t-1} - \eta_{t-1},\quad r_5 = o_t - c_{t-1}
$$
where $o_t$, $c_t$, $H_t$, and $L_t$ are the log-transformed **open, close, high, and low** prices.
A key step is defining **indicators for price variation**:
$$
\tau_t =
\begin{cases}
0, & \text{if } H_t = L_t = c_{t-1} \\\\
1, & \text{otherwise}
\end{cases}
$$
This handles cases when prices remain unchanged and prevents overestimation.
#### Estimator Calculation
Using the moment conditions, the estimator is:
$$
S^2 = -8\,\frac{E[(\eta_t - o_t)(o_t - c_{t-1})]}{P[o_t \neq H_t,\, \tau_t = 1] + P[o_t \neq L_t,\, \tau_t = 1]}
$$
Multiple estimators are computed and combined using GMM weighting:
$$
S^2_{\text{EDGE}} = w_1\,E[x_1] + w_2\,E[x_2]
$$
where $w_1$ and $w_2$ are chosen to minimize variance:
$$
w_1 = \frac{\operatorname{Var}[x_2]}{\operatorname{Var}[x_1] + \operatorname{Var}[x_2]},\quad
w_2 = \frac{\operatorname{Var}[x_1]}{\operatorname{Var}[x_1] + \operatorname{Var}[x_2]}
$$
Finally, the **spread estimate** reported here is the root of the absolute value:
$$
S = \sqrt{\lvert S^2 \rvert}
$$
##### Rolling Estimation
The estimator uses a **rolling window approach**:
- The window size is user-defined.
- The estimates update dynamically, allowing for time-varying spread analysis.
- A negative $S^2$ is reported as $\sqrt{\lvert S^2 \rvert}$; pass `sign=True` to keep its sign instead.
##### Implementation
The function `edge_rolling` computes **rolling bid-ask spread estimates** from OHLC prices. It accepts:
- `data`: A DataFrame with `open`, `high`, `low`, and `close` prices.
- `window`: The rolling window size.
- `sign`: Boolean to preserve the sign of the estimate.
- `kwargs`: Additional arguments for the Pandas rolling function.
This estimator improves accuracy in markets with varying trading frequencies.
For further details, see:
- [Ardia, D., Guidotti, E., & Kroencke, T. (2024). Efficient Estimation of Bid-Ask Spreads from OHLC Prices. Journal of Financial Economics.](https://doi.org/10.1016/j.jfineco.2024.103916)
""", unsafe_allow_html=True)
# Sidebar controls: ticker, date range, and the button that triggers a run.
# NOTE(review): the original indentation was lost; the button is assumed to sit
# in the sidebar directly below the "User Inputs" expander — confirm.
with st.sidebar:
    with st.expander("User Inputs", expanded=True):
        ticker = st.text_input(
            label="Ticker",
            value="CVNA",
            help="Enter the ticker symbol or cryptopair (e.g., 'AAPL', 'BTC-USD').",
        )
        start_date = st.date_input(
            label="Start Date",
            value=pd.to_datetime("2022-01-01"),
            help="Select the start date for the analysis.",
        )
        # The end date defaults to tomorrow (today + 1 day).
        one_day = datetime.timedelta(days=1)
        default_end_date = datetime.date.today() + one_day
        end_date = st.date_input(
            label="End Date",
            value=default_end_date,
            help="Select the end date for the analysis (default is tomorrow).",
        )
    run_analysis = st.button("Run Analysis")
# Function to compute the rolling spread estimate
def edge_rolling(data: pd.DataFrame, window: int, sign: bool = False, **kwargs) -> pd.Series:
    """Estimate rolling bid-ask spreads from OHLC prices.

    Two moment-based estimates of the squared spread (``E1`` and ``E2``) are
    computed from rolling means of products of log-returns and combined with
    weights inversely related to their rolling variances (``V1`` and ``V2``).

    Args:
        data: Frame with open/high/low/close columns; column names are
            lower-cased before use, so ``Open`` and ``open`` both work.
        window: Rolling window. An integer counts *price* rows; because every
            return needs the previous row, ``window - 1`` return rows are
            averaged. Non-integer windows are passed to pandas unchanged.
        sign: If true, the result carries the sign of the squared-spread
            estimate; otherwise the root of its absolute value is returned.
        **kwargs: Forwarded to ``DataFrame.rolling``. An integer
            ``min_periods`` is reduced by one for the same reason as ``window``.

    Returns:
        A Series indexed like ``data``. Entries are NaN where the window has
        too few observations, fewer than two rows with ``tau == 1``, or a zero
        open/close indicator probability.
    """
    df = data.rename(columns=str.lower, inplace=False)

    # Log-prices and the high/low midpoint.
    log_open = np.log(df['open'])
    log_high = np.log(df['high'])
    log_low = np.log(df['low'])
    log_close = np.log(df['close'])
    log_mid = (log_high + log_low) / 2.0

    # Previous-row values.
    log_high_prev = log_high.shift(1)
    log_low_prev = log_low.shift(1)
    log_close_prev = log_close.shift(1)
    log_mid_prev = log_mid.shift(1)

    # Log-returns between open, midpoint and previous close.
    r1 = log_mid - log_open
    r2 = log_open - log_mid_prev
    r3 = log_mid - log_close_prev
    r4 = log_close_prev - log_mid_prev
    r5 = log_open - log_close_prev

    # tau is 0 when high == low == previous close and 1 otherwise; NaN inputs
    # propagate as NaN so they drop out of the rolling means.
    tau = np.where(
        np.isnan(log_high) | np.isnan(log_low) | np.isnan(log_close_prev),
        np.nan,
        (log_high != log_low) | (log_low != log_close_prev)
    )
    # Indicators that the open (previous close) differs from the high/low
    # (previous high/low), restricted to rows with tau == 1.
    ind_o_h = tau * np.where(np.isnan(log_open) | np.isnan(log_high), np.nan, log_open != log_high)
    ind_o_l = tau * np.where(np.isnan(log_open) | np.isnan(log_low), np.nan, log_open != log_low)
    ind_c_h = tau * np.where(np.isnan(log_close_prev) | np.isnan(log_high_prev), np.nan, log_close_prev != log_high_prev)
    ind_c_l = tau * np.where(np.isnan(log_close_prev) | np.isnan(log_low_prev), np.nan, log_close_prev != log_low_prev)

    # Base products whose rolling means feed the estimator.
    prod_12 = r1 * r2
    prod_34 = r3 * r4
    prod_15 = r1 * r5
    prod_45 = r4 * r5
    tau_r1 = tau * r1
    tau_r2 = tau * r2
    tau_r4 = tau * r4
    tau_r5 = tau * r5

    vals = pd.DataFrame({
        'prod_12': prod_12,
        'prod_34': prod_34,
        'prod_15': prod_15,
        'prod_45': prod_45,
        'tau': tau,
        'r1': r1,
        'tau_r2': tau_r2,
        'r3': r3,
        'tau_r4': tau_r4,
        'r5': r5,
        'prod_12_sq': prod_12 ** 2,
        'prod_34_sq': prod_34 ** 2,
        'prod_15_sq': prod_15 ** 2,
        'prod_45_sq': prod_45 ** 2,
        'prod_12_34': prod_12 * prod_34,
        'prod_15_45': prod_15 * prod_45,
        'tau_r2_r2': tau_r2 * r2,
        'tau_r4_r4': tau_r4 * r4,
        'tau_r5_r5': tau_r5 * r5,
        'tau_r2_prod12': tau_r2 * prod_12,
        'tau_r4_prod34': tau_r4 * prod_34,
        'tau_r5_prod15': tau_r5 * prod_15,
        'tau_r4_prod45': tau_r4 * prod_45,
        'tau_r4_prod12': tau_r4 * prod_12,
        'tau_r2_prod34': tau_r2 * prod_34,
        'tau_r2_r4': tau_r2 * r4,
        'tau_r1_prod45': tau_r1 * prod_45,
        'tau_r5_prod45': tau_r5 * prod_45,
        'tau_r4_r5': tau_r4 * r5,
        'tau_r5_only': tau_r5,
        'ind_o_h': ind_o_h,
        'ind_o_l': ind_o_l,
        'ind_c_h': ind_c_h,
        'ind_c_l': ind_c_l
    }, index=df.index)

    # The first row has no previous prices. Mask it, and shrink integer
    # window/min_periods by one so they keep counting price rows.
    vals.iloc[0] = np.nan
    window_adj = max(0, window - 1) if isinstance(window, (int, np.integer)) else window
    if 'min_periods' in kwargs and isinstance(kwargs['min_periods'], (int, np.integer)):
        kwargs['min_periods'] = max(0, kwargs['min_periods'] - 1)

    roll_vals = vals.rolling(window=window_adj, **kwargs).mean()

    # Rolling probabilities of tau == 1 and of the open/close indicators.
    p_tau = roll_vals['tau']
    p_open = roll_vals['ind_o_h'] + roll_vals['ind_o_l']
    p_close = roll_vals['ind_c_h'] + roll_vals['ind_c_l']

    # Invalidate windows with fewer than two tau == 1 rows or a zero
    # probability (which would divide by zero below).
    count_tau = vals['tau'].rolling(window=window_adj, **kwargs).sum()
    roll_vals[(count_tau < 2) | (p_open == 0) | (p_close == 0)] = np.nan

    # Scaling factors and tau-normalised mean returns.
    a1 = -4.0 / p_open
    a2 = -4.0 / p_close
    a3 = roll_vals['r1'] / p_tau
    a4 = roll_vals['tau_r4'] / p_tau
    a5 = roll_vals['r3'] / p_tau
    a6 = roll_vals['r5'] / p_tau
    a12 = 2 * a1 * a2
    a11 = a1 ** 2
    a22 = a2 ** 2
    a33 = a3 ** 2
    a55 = a5 ** 2
    a66 = a6 ** 2

    # Means of the two moment estimators x1 and x2.
    E1 = a1 * (roll_vals['prod_12'] - a3 * roll_vals['tau_r2']) + \
        a2 * (roll_vals['prod_34'] - a4 * roll_vals['r3'])
    E2 = a1 * (roll_vals['prod_15'] - a3 * roll_vals['tau_r5_only']) + \
        a2 * (roll_vals['prod_45'] - a4 * roll_vals['r5'])

    # Variances of x1 and x2, expanded as E[x^2] - E[x]^2.
    V1 = - E1**2 + (
        a11 * (roll_vals['prod_12_sq'] - 2 * a3 * roll_vals['tau_r2_prod12'] + a33 * roll_vals['tau_r2_r2']) +
        a22 * (roll_vals['prod_34_sq'] - 2 * a5 * roll_vals['tau_r4_prod34'] + a55 * roll_vals['tau_r4_r4']) +
        a12 * (roll_vals['prod_12_34'] - a3 * roll_vals['tau_r2_prod34'] - a5 * roll_vals['tau_r4_prod12'] + a3 * a5 * roll_vals['tau_r2_r4'])
    )
    # Cross term: (r1*r5 - a3*tau*r5) * (r4*r5 - a6*tau*r4) expands to
    #   r15*r45 - a3*tau*r5*r45 - a6*tau*r1*r45 + a3*a6*tau*r4*r5,
    # so the -a6 factor multiplies the mean of tau*r1*r4*r5 ('tau_r1_prod45').
    V2 = - E2**2 + (
        a11 * (roll_vals['prod_15_sq'] - 2 * a3 * roll_vals['tau_r5_prod15'] + a33 * roll_vals['tau_r5_r5']) +
        a22 * (roll_vals['prod_45_sq'] - 2 * a6 * roll_vals['tau_r4_prod45'] + a66 * roll_vals['tau_r4_r4']) +
        a12 * (roll_vals['prod_15_45'] - a3 * roll_vals['tau_r5_prod45'] - a6 * roll_vals['tau_r1_prod45'] + a3 * a6 * roll_vals['tau_r4_r5'])
    )

    # Variance-weighted combination; plain average when the total variance is
    # not positive.
    tot_var = V1 + V2
    s2 = np.where(tot_var > 0, (V2 * E1 + V1 * E2) / tot_var, (E1 + E2) / 2.0)

    # Root of the absolute squared spread, optionally re-signed.
    spread = np.sqrt(np.abs(s2))
    if sign:
        spread *= np.sign(s2)
    return pd.Series(spread, index=df.index)
# Download data function supporting different intervals
def download_data(ticker, start, end, interval="1d"):
    """Fetch price data for *ticker* from yfinance at the given *interval*.

    For "1m", "5m" and "60m" the request uses a fixed look-back period and
    ignores *start*/*end*; every other interval is requested by explicit date
    range. Prices are always requested with ``auto_adjust=True``.
    """
    # Fixed look-back per intraday interval ("60m" was changed from 720 to 300).
    lookback = {"1m": "8d", "5m": "60d", "60m": "300d"}.get(interval)
    if lookback is not None:
        return yf.download(ticker, period=lookback, interval=interval, auto_adjust=True)
    return yf.download(ticker, start=start, end=end, interval=interval, auto_adjust=True)
# Shared dark-theme settings reused by every Plotly figure below.
_DARK_BG = '#0e1117'
_GRID_COLOR = "rgba(255,255,255,0.2)"


def _axis_style(**overrides):
    """Return the white-on-dark axis settings shared by the charts, plus *overrides*."""
    style = dict(
        showgrid=True,
        gridcolor=_GRID_COLOR,
        color="white",
        tickfont=dict(color="white"),
    )
    style.update(overrides)
    return style


def _date_axis():
    """Return the x-axis settings used by the time-series and correlation charts."""
    return _axis_style(title="Date", tickformat="%Y-%m-%d", nticks=20)


def _top_legend():
    """Return a horizontal legend placed above the top-right corner of the plot."""
    return dict(
        orientation="h", yanchor="bottom", y=1.02,
        xanchor="right", x=1, font=dict(color="white"),
    )


# Run analysis when button is clicked
# NOTE(review): the original indentation was lost; the nesting below (spinner
# wraps only the download, the expander wraps everything after the main chart)
# is the most plausible reading — confirm.
if run_analysis:
    start_date_str = pd.to_datetime(start_date).strftime("%Y-%m-%d")
    end_date_str = pd.to_datetime(end_date).strftime("%Y-%m-%d")
    intervals = ["1d", "60m", "5m", "1m"]
    for interval in intervals:
        st.markdown(f"## Spread Estimation at {interval} data")
        with st.spinner(f"Downloading {interval} data..."):
            data = download_data(ticker, start_date_str, end_date_str, interval=interval)
        if data is None or data.empty:
            st.error(f"No data available for the {interval} interval.")
            continue

        # Flatten a (field, ticker) column MultiIndex down to the field names.
        if isinstance(data.columns, pd.MultiIndex):
            data.columns = data.columns.get_level_values(0)

        # Every column used below is checked here so a missing one is reported
        # instead of crashing the app later.
        try:
            ohlc = data[['Open', 'High', 'Low', 'Close']]
            volume = data['Volume']
        except KeyError as exc:
            st.error(f"Error processing data columns: {exc}")
            continue

        # UI boundary: report the failure for this interval and move on.
        try:
            rolling_spreads = edge_rolling(ohlc, window=15, min_periods=10, sign=False)
        except Exception as exc:
            st.error(f"Error computing rolling spread: {exc}")
            continue

        data_with_spread = data.copy()
        data_with_spread["Spread"] = rolling_spreads
        returns = ohlc['Close'].pct_change()
        rolling_vol = returns.rolling(window=15).std()
        # Band of +/- half the estimated spread around the close.
        upper_band = ohlc['Close'] * (1 + rolling_spreads / 2)
        lower_band = ohlc['Close'] * (1 - rolling_spreads / 2)

        # Create the main Plotly chart
        fig_ts = go.Figure()
        fig_ts.add_trace(go.Bar(
            x=ohlc.index, y=volume,
            name="Volume",
            marker_color="gray",
            opacity=1,
            yaxis="y"
        ))
        fig_ts.add_trace(go.Scatter(
            x=ohlc.index, y=ohlc['Close'],
            mode="lines",
            name="Close Price",
            line=dict(color="lime"),
            yaxis="y2"
        ))
        # Invisible lower edge; the next trace fills down to it ("tonexty").
        fig_ts.add_trace(go.Scatter(
            x=ohlc.index, y=lower_band,
            mode="lines",
            name="Lower Band",
            line=dict(color="rgba(0,0,0,0)"),
            showlegend=False,
            yaxis="y2"
        ))
        fig_ts.add_trace(go.Scatter(
            x=ohlc.index, y=upper_band,
            mode="lines",
            name="Spread Band",
            line=dict(color="gray"),
            fill="tonexty",
            fillcolor="rgba(128,128,128,0.3)",
            yaxis="y2"
        ))
        fig_ts.add_trace(go.Scatter(
            x=ohlc.index, y=rolling_vol,
            mode="lines",
            name="Rolling Volatility",
            line=dict(color="orange", dash="dash"),
            yaxis="y3"
        ))
        fig_ts.add_trace(go.Scatter(
            x=ohlc.index, y=rolling_spreads,
            mode="lines",
            name="Rolling Spread",
            line=dict(color="blue"),
            yaxis="y4"
        ))
        # NOTE(review): Plotly documents `position` as effective only when the
        # axis has anchor="free"; confirm the intended placement of y2-y4.
        fig_ts.update_layout(
            template="plotly_dark",
            paper_bgcolor=_DARK_BG,
            plot_bgcolor=_DARK_BG,
            title=dict(
                text=f"Rolling Spread, Volume, Close Price, and Rolling Volatility ({interval} data)",
                font=dict(color="white"),
            ),
            xaxis=_date_axis(),
            yaxis=_axis_style(title="Volume", side="left"),
            yaxis2=_axis_style(title="Close Price", overlaying="y", side="left", position=0.05),
            yaxis3=_axis_style(title="Rolling Volatility", overlaying="y", side="right", position=0.95),
            yaxis4=_axis_style(title="Rolling Spread", overlaying="y", side="right", position=1),
            legend=_top_legend(),
            margin=dict(l=50, r=50, t=80, b=50)
        )
        st.plotly_chart(fig_ts, use_container_width=True)

        with st.expander(f"Additional Analysis for {interval}", expanded=False):
            st.write("This section shows a preview of the raw data, scatter plots of lagged relationships, and a rolling correlation chart.")
            st.subheader("Raw Data Preview")
            st.dataframe(data_with_spread, use_container_width=True)

            # Scatter plots of each series against a one-step-lagged series.
            lag_period = 1
            lagged_spreads = rolling_spreads.shift(lag_period)
            lagged_volume = volume.shift(lag_period)
            lagged_vol = rolling_vol.shift(lag_period)
            titles = [
                f"Returns vs Lagged Spreads ({lag_period})",
                f"Spreads vs Lagged Volume ({lag_period})",
                f"Returns vs Lagged Volume ({lag_period})",
                f"Volatility vs Lagged Spreads ({lag_period})",
                f"Lagged Volatility vs Current Spreads ({lag_period})"
            ]
            # (x, y) pairs, in the same order as `titles`.
            data_pairs = [
                (lagged_spreads, returns),
                (lagged_volume, rolling_spreads),
                (lagged_volume, returns),
                (lagged_spreads, rolling_vol),
                (lagged_vol, rolling_spreads)
            ]
            colors = ["blue", "red", "green", "orange", "magenta"]
            fig_scatter = make_subplots(
                rows=1, cols=5,
                shared_xaxes=False, horizontal_spacing=0.08,
                subplot_titles=titles
            )
            for col, ((x, y), title, color) in enumerate(zip(data_pairs, titles, colors), start=1):
                # Keep only rows where both series are present.
                mask = x.notna() & y.notna()
                fig_scatter.add_trace(go.Scatter(
                    x=x[mask], y=y[mask],
                    mode="markers",
                    marker=dict(color=color, size=5),
                    name=title
                ), row=1, col=col)
            fig_scatter.update_layout(
                template="plotly_dark",
                paper_bgcolor=_DARK_BG,
                plot_bgcolor=_DARK_BG,
                title=dict(text=f"Lagged Value Analysis ({lag_period})", font=dict(color="white")),
                legend=dict(font=dict(color="white"))
            )
            # Style the axes of all five subplots, not only the first y-axis.
            fig_scatter.update_xaxes(gridcolor=_GRID_COLOR, color="white", tickfont=dict(color="white"))
            fig_scatter.update_yaxes(gridcolor=_GRID_COLOR, color="white", tickfont=dict(color="white"))
            st.plotly_chart(fig_scatter, use_container_width=True)

            # Rolling correlation of the spread with lagged volume / volatility.
            window_corr = 60
            lag_period_corr = 1
            lagged_volume_corr = volume.shift(lag_period_corr)
            lagged_vol_corr = rolling_vol.shift(lag_period_corr)
            rolling_corr_volume_spread = lagged_volume_corr.rolling(window=window_corr).corr(rolling_spreads)
            rolling_corr_volatility_spread = lagged_vol_corr.rolling(window=window_corr).corr(rolling_spreads)
            fig_corr = go.Figure()
            fig_corr.add_trace(go.Scatter(
                x=rolling_corr_volume_spread.index, y=rolling_corr_volume_spread,
                mode="lines",
                name=f"Rolling Corr (Lagged Volume vs Spread, lag={lag_period_corr}, window={window_corr})",
                line=dict(color="red")
            ))
            fig_corr.add_trace(go.Scatter(
                x=rolling_corr_volatility_spread.index, y=rolling_corr_volatility_spread,
                mode="lines",
                name=f"Rolling Corr (Lagged Volatility vs Spread, lag={lag_period_corr}, window={window_corr})",
                line=dict(color="orange")
            ))
            fig_corr.update_layout(
                template="plotly_dark",
                paper_bgcolor=_DARK_BG,
                plot_bgcolor=_DARK_BG,
                title=dict(
                    text=f"Rolling Correlations (Window={window_corr}, Lag={lag_period_corr})",
                    font=dict(color="white"),
                ),
                xaxis=_date_axis(),
                yaxis=_axis_style(title="Correlation"),
                legend=_top_legend(),
                margin=dict(l=50, r=50, t=80, b=50)
            )
            st.plotly_chart(fig_corr, use_container_width=True)
# CSS injected into the page to hide Streamlit's main menu and footer.
_HIDE_STREAMLIT_CHROME = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""

# unsafe_allow_html is required for the <style> tag to be emitted as HTML.
st.markdown(_HIDE_STREAMLIT_CHROME, unsafe_allow_html=True)