Space78 / app.py
QuantumLearner's picture
Update app.py
1d1b2a6 verified
# app.py — Market Breadth & Momentum (sticky results, no nested expanders)
import os
import io
import time
import math
import random
import threading
import concurrent.futures as cf
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import requests
import streamlit as st
from plotly.subplots import make_subplots
import plotly.graph_objects as go
# ----------------------------- Helpers & Caching -----------------------------
API_KEY = os.getenv("FMP_API_KEY")
MAX_WORKERS = 32
RATE_BACKOFF_MAX = 300
JITTER_SEC = 0.2
# ----------------------------- Page config -----------------------------
st.set_page_config(page_title="Market Breadth & Momentum", layout="wide")
st.title("Market Breadth & Momentum")
st.markdown(
"Tracks index trend, participation, and momentum across constituents. "
"Shows breadth strength (% above key averages), advance–decline behavior, new highs/lows, "
"the McClellan Oscillator, and cross-section momentum heatmaps."
)
# ---------- session state for sticky results ----------
if "run_id" not in st.session_state:
st.session_state.run_id = None
if "last_params" not in st.session_state:
st.session_state.last_params = None
# ----------------------------- Sidebar -----------------------------
with st.sidebar:
st.header("Parameters")
# Each expander is independent (no nesting).
with st.expander("Data Window", expanded=False):
default_start = datetime(2015, 1, 1).date()
default_end = (datetime.today().date() + timedelta(days=1))
start_date = st.date_input(
"Start date",
value=default_start,
min_value=datetime(2000, 1, 1).date(),
max_value=default_end,
help="Earlier start = more history but slower load. Later start = faster."
)
end_date = st.date_input(
"End date",
value=default_end,
min_value=default_start,
max_value=default_end,
help="End date is set to today + 1 by default to include the latest close."
)
with st.expander("Breadth Settings", expanded=False):
sma_fast = st.number_input(
"Fast MA (days)",
value=50, min_value=20, max_value=200, step=5,
help="Used for % above fast MA and index fast MA. Higher = slower, fewer flips."
)
sma_slow = st.number_input(
"Slow MA (days)",
value=200, min_value=100, max_value=400, step=10,
help="Used for % above slow MA and index slow MA. Higher = slower, longer trend focus."
)
vwap_weeks = st.number_input(
"VWAP lookback (weeks)",
value=200, min_value=52, max_value=520, step=4,
help="Anchored weekly VWAP for the index. Higher = more inertia."
)
ad_smooth = st.number_input(
"Adv/Decl smoothing (days)",
value=30, min_value=5, max_value=90, step=5,
help="Smooths advancing/declining counts. Higher = steadier lines."
)
mo_span_fast = st.number_input(
"McClellan fast EMA (days)",
value=19, min_value=5, max_value=30, step=1,
help="Fast EMA for McClellan Oscillator. Smaller = more sensitive."
)
mo_span_slow = st.number_input(
"McClellan slow EMA (days)",
value=39, min_value=10, max_value=60, step=1,
help="Slow EMA for McClellan Oscillator. Larger = smoother baseline."
)
mo_signal_span = st.number_input(
"McClellan signal EMA (days)",
value=9, min_value=3, max_value=20, step=1,
help="Signal line for the oscillator. Crosses indicate momentum turns."
)
with st.expander("Rebased Comparison", expanded=False):
rebase_days = st.number_input(
"Window (trading days)",
value=365, min_value=60, max_value=1000, step=5,
help="Back window for rebased comparison. Longer = more context, smaller features."
)
rebase_base = st.number_input(
"Base level",
value=100, min_value=1, max_value=1000, step=1,
help="Starting level for rebased lines."
)
y_pad = st.slider(
"Y-range padding",
min_value=1, max_value=8, value=3, step=1,
help="Higher padding widens the log-scale y-range."
)
with st.expander("Heatmaps", expanded=False):
heat_last_days = st.number_input(
"Daily return heatmap window (days)",
value=60, min_value=20, max_value=252, step=5,
help="Number of recent sessions for the daily return heatmap."
)
mom_look = st.number_input(
"Momentum lookback (days)",
value=30, min_value=10, max_value=252, step=5,
help="Return horizon for the percentile momentum heatmap."
)
# Buttons: run persists, clear removes results
#colA, colB = st.columns(2)
#with colA:
# run_clicked = st.button("Run Analysis", type="primary", use_container_width=True)
#with colB:
# clear_clicked = st.button("Clear Results", type="secondary", use_container_width=True)
# Buttons: run persists, no clear button
run_clicked = st.button("Run Analysis", type="primary", use_container_width=True)
clear_clicked = False # <- keep the variable so the rest of the code doesn't break
if run_clicked:
# freeze a snapshot of params used for this run
st.session_state.last_params = dict(
start_date=start_date,
end_date=end_date,
sma_fast=int(sma_fast),
sma_slow=int(sma_slow),
vwap_weeks=int(vwap_weeks),
ad_smooth=int(ad_smooth),
mo_span_fast=int(mo_span_fast),
mo_span_slow=int(mo_span_slow),
mo_signal_span=int(mo_signal_span),
rebase_days=int(rebase_days),
rebase_base=float(rebase_base),
y_pad=int(y_pad),
heat_last_days=int(heat_last_days),
mom_look=int(mom_look),
)
st.session_state.run_id = f"{time.time():.0f}"
if clear_clicked:
st.session_state.run_id = None
st.session_state.last_params = None
# If there are no results yet, show a hint and stop rendering heavy stuff.
if not st.session_state.run_id:
st.info("Set your parameters and click **Run Analysis**. Results will persist until you press **Clear Results**.")
st.stop()
# Use the frozen parameters from the last run so the view doesn’t “shift” on rerun.
P = st.session_state.last_params or {}
start_date = P.get("start_date", datetime(2015, 1, 1).date())
end_date = P.get("end_date", (datetime.today().date() + timedelta(days=1)))
sma_fast = P.get("sma_fast", 50)
sma_slow = P.get("sma_slow", 200)
vwap_weeks = P.get("vwap_weeks", 200)
ad_smooth = P.get("ad_smooth", 30)
mo_span_fast = P.get("mo_span_fast", 19)
mo_span_slow = P.get("mo_span_slow", 39)
mo_signal_span = P.get("mo_signal_span", 9)
rebase_days = P.get("rebase_days", 365)
rebase_base = P.get("rebase_base", 100.0)
y_pad = P.get("y_pad", 3)
heat_last_days = P.get("heat_last_days", 60)
mom_look = P.get("mom_look", 30)
st.caption(
f"Showing results for **Start** {start_date} → **End** {end_date} | "
f"50/200 MAs = {sma_fast}/{sma_slow} | VWAP weeks = {vwap_weeks} | "
f"AD smooth = {ad_smooth} | MO = {mo_span_fast}/{mo_span_slow} (signal {mo_signal_span}) | "
f"Rebase {rebase_days}d @ {rebase_base} | Heatmap {heat_last_days}d | Momentum lookback {mom_look}d."
)
# ----------------------------- Networking helpers -----------------------------
def _to_vendor(sym: str) -> str:
return sym.replace("-", ".")
_thread_local = threading.local()
def _session():
s = getattr(_thread_local, "session", None)
if s is None:
s = requests.Session()
adapter = requests.adapters.HTTPAdapter(pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS)
s.mount("https://", adapter)
_thread_local.session = s
return s
def _get_json(url: str, params: dict, timeout=60, backoff=5):
sess = _session()
while True:
r = sess.get(url, params=params, timeout=timeout)
if r.status_code == 429:
time.sleep(backoff)
backoff = min(backoff * 2, RATE_BACKOFF_MAX)
continue
r.raise_for_status()
return r.json()
def _parse_hist_payload(payload):
item = payload if isinstance(payload, dict) else (payload[0] if payload else {})
sym = item.get("symbol")
hist = item.get("historical") or []
if not sym or not hist:
return None, None
dfh = pd.DataFrame(hist)
if "date" not in dfh or "adjClose" not in dfh:
return None, None
s = (
dfh[["date", "adjClose"]]
.dropna()
.assign(date=lambda x: pd.to_datetime(x["date"]))
.set_index("date")["adjClose"]
.rename(sym)
)
return sym, s
@st.cache_data(show_spinner=False)
def fetch_sp500_table():
url = "https://financialmodelingprep.com/api/v3/sp500_constituent"
params = {"apikey": API_KEY}
payload = _get_json(url, params)
tab = pd.DataFrame(payload)
tab = tab.rename(columns={"symbol": "Symbol", "name": "Security"})
return tab[["Symbol", "Security"]].dropna()
def _fetch_one(orig_ticker: str, start: str, end: str):
time.sleep(random.random() * JITTER_SEC)
t_vendor = _to_vendor(orig_ticker)
url = f"https://financialmodelingprep.com/api/v3/historical-price-full/{t_vendor}"
params = {"from": start, "to": end, "apikey": API_KEY}
try:
payload = _get_json(url, params)
sym, s = _parse_hist_payload(payload)
if s is None or s.empty:
return orig_ticker, None, "no data"
return orig_ticker, s.rename(t_vendor), None
except Exception as e:
return orig_ticker, None, str(e)
@st.cache_data(show_spinner=False)
def build_close_parallel(tickers: list[str], start: str, end: str, max_workers: int = MAX_WORKERS):
series_dict = {}
missing = {}
lock = threading.Lock()
def _task(t):
orig, s, err = _fetch_one(t, start, end)
with lock:
if err:
missing[orig] = err
else:
series_dict[s.name] = s
with cf.ThreadPoolExecutor(max_workers=max_workers) as ex:
futures = [ex.submit(_task, t) for t in tickers]
for _ in cf.as_completed(futures):
pass
if not series_dict:
return pd.DataFrame(), missing
df = pd.DataFrame(series_dict).sort_index()
df.index.name = "date"
f_to_o = {_to_vendor(t): t for t in tickers}
close = df.rename(columns=f_to_o)
close = close[[t for t in tickers if t in close.columns]]
return close, missing
@st.cache_data(show_spinner=False)
def fetch_index_ohlcv(start: str, end: str):
url = "https://financialmodelingprep.com/api/v3/historical-price-full/index/%5EGSPC"
params = {"from": start, "to": end, "apikey": API_KEY}
backoff = 5
while True:
r = requests.get(url, params=params, timeout=60)
if r.status_code == 429:
time.sleep(backoff)
backoff = min(backoff * 2, 300)
continue
r.raise_for_status()
payload = r.json()
if isinstance(payload, dict) and "historical" in payload:
hist = payload["historical"]
elif isinstance(payload, list) and payload and "historical" in payload[0]:
hist = payload[0]["historical"]
else:
hist = payload
idx_df = (
pd.DataFrame(hist)[["date", "close", "volume"]]
.assign(date=lambda x: pd.to_datetime(x["date"]))
.set_index("date")
.sort_index()
.rename(columns={"close": "Close", "volume": "Volume"})
)
return idx_df
def _safe_last(s):
s = s.dropna()
return s.iloc[-1] if len(s) else np.nan
# ----------------------------- Run (sticky) -----------------------------
with st.spinner("Loading tickers…"):
try:
spx_table = fetch_sp500_table()
except Exception:
st.error("Ticker table request failed. Try again later.")
st.stop()
tickers = spx_table["Symbol"].tolist()
st.caption(f"Constituents loaded: {len(tickers)}")
start_str = pd.to_datetime(start_date).strftime("%Y-%m-%d")
end_str = pd.to_datetime(end_date).strftime("%Y-%m-%d")
with st.spinner("Fetching historical prices (parallel)…"):
close, missing = build_close_parallel(tickers, start_str, end_str)
if close.empty:
st.error("No price data returned. Reduce the date range and retry.")
st.stop()
if missing:
st.warning(f"No data for {min(20, len(missing))} symbols (showing up to 20).")
clean_close = close.copy()
with st.spinner("Fetching index data…"):
try:
idx_df = fetch_index_ohlcv(
start=clean_close.index[0].strftime("%Y-%m-%d"),
end=end_str
)
except Exception:
st.error("Index data request failed. Try again later.")
st.stop()
idx = idx_df["Close"].reindex(clean_close.index).ffill()
idx_volume = idx_df["Volume"].reindex(clean_close.index).ffill()
# ===================== SECTION 1 — Breadth Dashboard =====================
st.header("Breadth Dashboard")
# Methodology (standalone expander)
with st.expander("Methodology", expanded=False):
# Overview
st.write("This panel tracks trend, participation, and momentum for a broad equity universe.")
st.write("Use it to judge trend quality, spot divergences, and gauge risk bias.")
# 1) Price trend (MAs, VWAP)
st.write("**Price trend**")
st.latex(r"\mathrm{SMA}_{n}(t)=\frac{1}{n}\sum_{k=0}^{n-1}P_{t-k}")
st.write("Approximate 200-week VWAP (using ~5 trading days per week):")
st.latex(r"\mathrm{VWAP}_{200w}(t)=\frac{\sum_{k=0}^{N-1}P_{t-k}V_{t-k}}{\sum_{k=0}^{N-1}V_{t-k}},\quad N\approx200\times5")
st.write("Price above both MAs and fast>slow = strong trend.")
st.write("Price below both MAs and fast<slow = weak trend.")
# 2) Participation breadth (% above MAs)
st.write("**Participation breadth**")
st.write("Share above n-day MA:")
st.latex(r"\%\,\text{Above}_n(t)=100\cdot\frac{\#\{i:\ P_{i,t}>\mathrm{SMA}_{n,i}(t)\}}{N}")
st.write("Zones: 0–20 weak, 20–50 neutral, 50–80 strong.")
st.write("Higher shares mean broad support for the trend.")
# 3) Advance–Decline line
st.write("**Advance–Decline (A/D) line**")
st.latex(r"A_t=\#\{i:\ P_{i,t}>P_{i,t-1}\},\quad D_t=\#\{i:\ P_{i,t}<P_{i,t-1}\}")
st.latex(r"\mathrm{ADLine}_t=\sum_{u\le t}(A_u-D_u)")
st.write("Rising A/D confirms uptrends. Falling A/D warns of narrow leadership.")
# 4) Net new 52-week highs
st.write("**Net new 52-week highs**")
st.latex(r"H_{i,t}^{52}=\max_{u\in[t-251,t]}P_{i,u},\quad L_{i,t}^{52}=\min_{u\in[t-251,t]}P_{i,u}")
st.latex(r"\text{NewHighs}_t=\sum_i \mathbf{1}\{P_{i,t}=H_{i,t}^{52}\},\quad \text{NewLows}_t=\sum_i \mathbf{1}\{P_{i,t}=L_{i,t}^{52}\}")
st.latex(r"\text{NetHighs}_t=\text{NewHighs}_t-\text{NewLows}_t")
st.write("Positive and persistent net highs support trend durability.")
# 5) Smoothed advancing vs declining counts
st.write("**Advancing vs declining (smoothed)**")
st.latex(r"\overline{A}_t=\frac{1}{w}\sum_{k=0}^{w-1}A_{t-k},\quad \overline{D}_t=\frac{1}{w}\sum_{k=0}^{w-1}D_{t-k}")
st.write("Advancers > decliners over the window = constructive breadth.")
# 6) McClellan Oscillator
st.write("**McClellan Oscillator (MO)**")
st.latex(r"E^{(n)}_t=\text{EMA}_n(A_t-D_t)")
st.latex(r"\mathrm{MO}_t=E^{(19)}_t-E^{(39)}_t")
st.write("Zero-line up-cross = improving momentum. Down-cross = fading momentum.")
st.write("A 9-day EMA of MO can act as a signal line.")
# Practical reads
st.write("**Practical use**")
st.write("- Broad strength: % above 200-day ≥ 50% supports trends.")
st.write("- Divergences: index near highs without A/D or MO confirmation = caution.")
st.write("- Breadth thrust: sharp rise in % above 50-day to ≥ 55% with a +20pt jump can mark regime turns.")
st.write("- MO near recent extremes flags stretched short-term conditions.")
# --- Compute indicators (respecting sidebar params) ---
sma_fast_idx = idx.rolling(int(sma_fast), min_periods=int(sma_fast)).mean()
sma_slow_idx = idx.rolling(int(sma_slow), min_periods=int(sma_slow)).mean()
vwap_days = int(vwap_weeks) * 5
vwap_idx = (idx * idx_volume).rolling(vwap_days, min_periods=vwap_days).sum() / \
idx_volume.rolling(vwap_days, min_periods=vwap_days).sum()
sma_fast_all = clean_close.rolling(int(sma_fast), min_periods=int(sma_fast)).mean()
sma_slow_all = clean_close.rolling(int(sma_slow), min_periods=int(sma_slow)).mean()
pct_above_fast = (clean_close > sma_fast_all).sum(axis=1) / clean_close.shape[1] * 100
pct_above_slow = (clean_close > sma_slow_all).sum(axis=1) / clean_close.shape[1] * 100
advances = (clean_close.diff() > 0).sum(axis=1)
declines = (clean_close.diff() < 0).sum(axis=1)
ad_line = (advances - declines).cumsum()
window = int(ad_smooth)
avg_adv = advances.rolling(window, min_periods=window).mean()
avg_decl = declines.rolling(window, min_periods=window).mean()
high52 = clean_close.rolling(252, min_periods=252).max()
low52 = clean_close.rolling(252, min_periods=252).min()
new_highs = (clean_close == high52).sum(axis=1)
new_lows = (clean_close == low52).sum(axis=1)
net_highs = new_highs - new_lows
sma10_net_hi = net_highs.rolling(10, min_periods=10).mean()
net_adv = (advances - declines).astype("float64")
ema_fast = net_adv.ewm(span=int(mo_span_fast), adjust=False).mean()
ema_slow = net_adv.ewm(span=int(mo_span_slow), adjust=False).mean()
mc_osc = (ema_fast - ema_slow).rename("MO")
mo_pos = mc_osc.clip(lower=0)
mo_neg = mc_osc.clip(upper=0)
bound = float(np.nanpercentile(np.abs(mc_osc.dropna()), 99)) if mc_osc.notna().sum() else 20.0
bound = max(20.0, math.ceil(bound / 10.0) * 10.0)
# --- Plot (6 rows) ---
fig = make_subplots(
rows=6, cols=1, shared_xaxes=True, vertical_spacing=0.03,
subplot_titles=(
"S&P 500 Price / Fast MA / Slow MA / Weekly VWAP",
f"% Above {int(sma_fast)}d & {int(sma_slow)}d",
"Advance–Decline Line",
"Net New 52-Week Highs (bar) + 10d SMA",
f"Advancing vs Declining ({int(window)}d MA)",
f"McClellan Oscillator ({int(mo_span_fast)},{int(mo_span_slow)})"
)
)
fig.update_layout(template="plotly_dark", font=dict(color="white"))
if hasattr(fig.layout, "annotations"):
for a in fig.layout.annotations:
a.font = dict(color="white", size=12)
# Row 1
fig.add_trace(go.Scatter(x=idx.index, y=idx, name="S&P 500"), row=1, col=1)
fig.add_trace(go.Scatter(x=sma_fast_idx.index, y=sma_fast_idx, name=f"{int(sma_fast)}-day MA"), row=1, col=1)
fig.add_trace(go.Scatter(x=sma_slow_idx.index, y=sma_slow_idx, name=f"{int(sma_slow)}-day MA"), row=1, col=1)
fig.add_trace(go.Scatter(x=vwap_idx.index, y=vwap_idx, name=f"{int(vwap_weeks)}-week VWAP"), row=1, col=1)
# Row 2
fig.add_hrect(y0=0, y1=20, line_width=0, fillcolor="red", opacity=0.3, row=2, col=1)
fig.add_hrect(y0=20, y1=50, line_width=0, fillcolor="yellow", opacity=0.3, row=2, col=1)
fig.add_hrect(y0=50, y1=80, line_width=0, fillcolor="green", opacity=0.3, row=2, col=1)
fig.add_trace(go.Scatter(x=pct_above_fast.index, y=pct_above_fast, name=f"% Above {int(sma_fast)}d"), row=2, col=1)
fig.add_trace(go.Scatter(x=pct_above_slow.index, y=pct_above_slow, name=f"% Above {int(sma_slow)}d"), row=2, col=1)
fig.add_annotation(x=0, xref="paper", y=10, yref="y2", text="Weak", showarrow=False, align="left", font=dict(color="white"))
fig.add_annotation(x=0, xref="paper", y=35, yref="y2", text="Neutral", showarrow=False, align="left", font=dict(color="white"))
fig.add_annotation(x=0, xref="paper", y=65, yref="y2", text="Strong", showarrow=False, align="left", font=dict(color="white"))
# Row 3
fig.add_trace(go.Scatter(x=ad_line.index, y=ad_line, name="A/D Line"), row=3, col=1)
# Row 4
fig.add_trace(go.Bar(x=net_highs.index, y=net_highs, name="Net New Highs", opacity=0.5), row=4, col=1)
fig.add_trace(go.Scatter(x=sma10_net_hi.index, y=sma10_net_hi, name="10-day SMA"), row=4, col=1)
# Row 5
fig.add_trace(go.Scatter(x=avg_adv.index, y=avg_adv, name=f"Adv ({int(window)}d MA)"), row=5, col=1)
fig.add_trace(go.Scatter(x=avg_decl.index, y=avg_decl, name=f"Dec ({int(window)}d MA)"), row=5, col=1)
# Row 6
fig.add_trace(
go.Bar(x=mo_pos.index, y=mo_pos, name="MO +",
marker=dict(color="#2ecc71", line=dict(width=0)),
hovertemplate="MO: %{y:.1f}<br>%{x|%Y-%m-%d}<extra></extra>",
showlegend=False),
row=6, col=1
)
fig.add_trace(
go.Bar(x=mo_neg.index, y=mo_neg, name="MO -",
marker=dict(color="#e74c3c", line=dict(width=0)),
hovertemplate="MO: %{y:.1f}<br>%{x|%Y-%m-%d}<extra></extra>",
showlegend=False),
row=6, col=1
)
fig.add_hline(y=0, line_width=1, line_dash="dash", line_color="rgba(180,180,180,0.8)", row=6, col=1)
fig.update_xaxes(
ticklabelmode="period",
tickformatstops=[
dict(dtickrange=[None, 24*3600*1000], value="%b %d\n%Y"),
dict(dtickrange=[24*3600*1000, 7*24*3600*1000], value="%b %d"),
dict(dtickrange=[7*24*3600*1000, "M1"], value="%b %d\n%Y"),
dict(dtickrange=["M1", "M6"], value="%b %Y"),
dict(dtickrange=["M6", None], value="%Y"),
],
tickangle=0,
tickfont=dict(color="white"),
title_font=dict(color="white"),
showgrid=True, gridcolor="rgba(160,160,160,0.2)",
showline=True, linecolor="rgba(255,255,255,0.4)",
rangeslider_visible=False
)
fig.update_yaxes(
tickfont=dict(color="white"),
title_font=dict(color="white"),
showgrid=True, gridcolor="rgba(160,160,160,0.2)",
showline=True, linecolor="rgba(255,255,255,0.4)"
)
fig.update_yaxes(title_text="Price", row=1, col=1)
fig.update_yaxes(title_text="Percent", row=2, col=1, range=[0, 100])
fig.update_yaxes(title_text="A/D", row=3, col=1)
fig.update_yaxes(title_text="Net", row=4, col=1)
fig.update_yaxes(title_text="Count", row=5, col=1)
fig.update_yaxes(title_text="MO", row=6, col=1, range=[-bound, bound], side="right")
fig.update_xaxes(title_text="Date", row=6, col=1)
fig.update_layout(
height=1350,
bargap=0.02,
barmode="relative",
legend=dict(
orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0,
font=dict(color="white")
),
margin=dict(l=60, r=20, t=40, b=40),
hovermode="x unified",
font=dict(color="white"),
title=dict(font=dict(color="white"))
)
st.plotly_chart(fig, use_container_width=True)
# --- Dynamic interpretation (standalone expander) ---
with st.expander("Dynamic Interpretation", expanded=False):
buf = io.StringIO()
def _last_val(s):
s = s.dropna()
return s.iloc[-1] if len(s) else np.nan
def _last_date(s):
s = s.dropna()
return s.index[-1] if len(s) else None
def _pct(a, b):
if not np.isfinite(a) or not np.isfinite(b) or b == 0:
return np.nan
return (a - b) / b * 100.0
def _fmt_pct(x):
return "n/a" if not np.isfinite(x) else f"{x:.1f}%"
def _fmt_num(x):
return "n/a" if not np.isfinite(x) else f"{x:,.2f}"
as_of = _last_date(idx)
px = _last_val(idx)
ma50 = _last_val(sma_fast_idx)
ma200 = _last_val(sma_slow_idx)
vwap200 = _last_val(vwap_idx)
p50 = float(_last_val(pct_above_fast))
p200 = float(_last_val(pct_above_slow))
ad_now = _last_val(ad_line)
nh_now = int(_last_val(new_highs)) if np.isfinite(_last_val(new_highs)) else 0
nh_sma = float(_last_val(sma10_net_hi))
avg_adv_last = float(_last_val(avg_adv))
avg_decl_last = float(_last_val(avg_decl))
_ema19 = net_adv.ewm(span=int(mo_span_fast), adjust=False).mean()
_ema39 = net_adv.ewm(span=int(mo_span_slow), adjust=False).mean()
mc_osc2 = (_ema19 - _ema39).rename("MO")
mc_signal = mc_osc2.ewm(span=int(mo_signal_span), adjust=False).mean().rename("Signal")
mo_last = float(_last_val(mc_osc2))
mo_prev = float(_last_val(mc_osc2.shift(1)))
mo_5ago = float(_last_val(mc_osc2.shift(5)))
mo_slope5 = mo_last - mo_5ago
mo_sig_last = float(_last_val(mc_signal))
mo_sig_prev = float(_last_val(mc_signal.shift(1)))
mo_roll = mc_osc2.rolling(252, min_periods=126)
mo_mean = mo_roll.mean()
mo_std = mo_roll.std()
mo_z = (mc_osc2 - mo_mean) / mo_std
mo_z_last = float(_last_val(mo_z))
mo_abs = np.abs(mc_osc2.dropna())
if len(mo_abs) >= 20:
mo_ext = float(np.nanpercentile(mo_abs.tail(252), 90))
else:
mo_ext = np.nan
look_fast = 10
look_mid = 20
look_div = 63
ma50_slope = _last_val(sma_fast_idx.diff(look_fast))
ma200_slope = _last_val(sma_slow_idx.diff(look_mid))
p50_chg = p50 - float(_last_val(pct_above_fast.shift(look_fast)))
p200_chg = p200 - float(_last_val(pct_above_slow.shift(look_fast)))
ad_mom = ad_now - float(_last_val(ad_line.shift(look_mid)))
d50 = _pct(px, ma50)
d200 = _pct(px, ma200)
dvw = _pct(px, vwap200)
h63 = float(_last_val(idx.rolling(look_div).max()))
dd63 = _pct(px, h63) if np.isfinite(h63) else np.nan
ad_63h = float(_last_val(ad_line.rolling(look_div).max()))
mo_63h = float(_last_val(mc_osc2.rolling(look_div).max()))
near_high_px = np.isfinite(h63) and np.isfinite(px) and px >= 0.995 * h63
near_high_ad = np.isfinite(ad_63h) and np.isfinite(ad_now) and ad_now >= 0.995 * ad_63h
near_high_mo = np.isfinite(mo_63h) and np.isfinite(mo_last) and mo_last >= 0.95 * mo_63h
breadth_thrust = (p50 >= 55) and (p50_chg >= 20)
score = 0
score += 1 if px > ma50 else 0
score += 1 if px > ma200 else 0
score += 1 if ma50 > ma200 else 0
score += 1 if ma50_slope > 0 else 0
score += 1 if p50 >= 50 else 0
score += 1 if p200 >= 50 else 0
score += 1 if ad_mom > 0 else 0
score += 1 if nh_now > 0 and nh_sma >= 0 else 0
score += 1 if avg_adv_last > avg_decl_last else 0
score += 1 if (mo_last > 0 and mo_slope5 > 0) else 0
if score >= 8:
regime = "Risk-on bias"
elif score >= 5:
regime = "Mixed bias"
else:
regime = "Risk-off bias"
print(f"=== Market breadth narrative — {as_of.date() if as_of is not None else 'N/A'} ===", file=buf)
# [Trend]
print("\n[Trend]", file=buf)
if np.isfinite(px) and np.isfinite(ma50) and np.isfinite(ma200):
print(
"The index is {px}, the 50-day is {ma50}, and the 200-day is {ma200}. "
"Price runs {d50} vs the 50-day and {d200} vs the 200-day. "
"The 50-day changed by {m50s} over {f} sessions and the 200-day changed by {m200s} over {m} sessions."
.format(
px=_fmt_num(px), ma50=_fmt_num(ma50), ma200=_fmt_num(ma200),
d50=_fmt_pct(d50), d200=_fmt_pct(d200),
m50s=f"{ma50_slope:+.2f}" if np.isfinite(ma50_slope) else "n/a",
m200s=f"{ma200_slope:+.2f}" if np.isfinite(ma200_slope) else "n/a",
f=look_fast, m=look_mid
), file=buf
)
if np.isfinite(vwap200):
print("The index is {dvw} versus the 200-week VWAP.".format(dvw=_fmt_pct(dvw)), file=buf)
if np.isfinite(dd63):
print("Distance from the 3-month high is {dd}.".format(dd=_fmt_pct(dd63)), file=buf)
if px > ma50 and ma50 > ma200:
print("Structure is bullish: price above both averages and the fast above the slow.", file=buf)
elif px < ma50 and ma50 < ma200:
print("Structure is bearish: price below both averages and the fast below the slow.", file=buf)
else:
print("Structure is mixed: levels are not aligned.", file=buf)
else:
print("Trend inputs are incomplete.", file=buf)
# [Participation]
print("\n[Participation]", file=buf)
if np.isfinite(p50) and np.isfinite(p200):
print(
"{p50} of members sit above the 50-day and {p200} above the 200-day. "
"The 50-day share moved {p50chg} over {f} sessions, and the 200-day share moved {p200chg}."
.format(
p50=f"{p50:.1f}%", p200=f"{p200:.1f}%",
p50chg=f"{p50_chg:+.1f} pts", p200chg=f"{p200_chg:+.1f} pts", f=look_fast
), file=buf
)
if p50 < 20 and p200 < 20:
print("Participation is very weak across both horizons.", file=buf)
elif p50 < 50 and p200 < 50:
print("Participation is weak; leadership is narrow.", file=buf)
elif p50 >= 50 and p200 < 50:
print("Short-term breadth improved, long-term base still soft.", file=buf)
elif p50 >= 50 and p200 >= 50:
print("Participation is broad and supportive.", file=buf)
if breadth_thrust:
print("The 50-day breadth jump qualifies as a breadth thrust.", file=buf)
else:
print("Breadth percentages are missing.", file=buf)
# [Advance–Decline]
print("\n[Advance–Decline]", file=buf)
if np.isfinite(ad_now):
print(
"A/D momentum over {m} sessions is {admom:+.0f}. "
"Price is {pxnear} a 3-month high and A/D is {adnear} the same mark."
.format(
m=look_mid, admom=ad_mom,
pxnear="near" if near_high_px else "not near",
adnear="near" if near_high_ad else "not near"
), file=buf
)
if near_high_px and not near_high_ad:
print("Price tested highs without A/D confirmation.", file=buf)
elif near_high_px and near_high_ad:
print("Price and A/D both near recent highs.", file=buf)
elif (not near_high_px) and near_high_ad:
print("A/D improved while price lagged.", file=buf)
else:
print("No short-term confirmation signal.", file=buf)
else:
print("A/D data is unavailable.", file=buf)
# [McClellan Oscillator]
print("\n[McClellan Oscillator]", file=buf)
if np.isfinite(mo_last):
zero_cross_up = (mo_prev < 0) and (mo_last >= 0)
zero_cross_down = (mo_prev > 0) and (mo_last <= 0)
sig_cross_up = (mo_prev <= mo_sig_prev) and (mo_last > mo_sig_last)
sig_cross_down = (mo_prev >= mo_sig_prev) and (mo_last < mo_sig_last)
near_extreme = np.isfinite(mo_ext) and (abs(mo_last) >= 0.9 * mo_ext)
print(
"MO prints {mo:+.1f} with a 9-day signal at {sig:+.1f}. "
"Five-day slope is {slope:+.1f}. Z-score over 1y is {z}."
.format(
mo=mo_last, sig=mo_sig_last, slope=mo_slope5,
z=f"{mo_z_last:.2f}" if np.isfinite(mo_z_last) else "n/a"
), file=buf
)
if zero_cross_up:
print("Bullish zero-line cross: momentum turned positive.", file=buf)
if zero_cross_down:
print("Bearish zero-line cross: momentum turned negative.", file=buf)
if sig_cross_up:
print("Bullish signal cross: MO moved above its 9-day signal.", file=buf)
if sig_cross_down:
print("Bearish signal cross: MO fell below its 9-day signal.", file=buf)
if near_extreme:
tag = "positive" if mo_last > 0 else "negative"
print(f"MO is near a recent {tag} extreme by distribution.", file=buf)
elif np.isfinite(mo_ext):
print(f"Recent absolute extreme band is about ±{mo_ext:.0f}.", file=buf)
if near_high_px and not near_high_mo:
print("Price near short-term highs without a matching MO high.", file=buf)
if (not near_high_px) and near_high_mo:
print("MO near a short-term high while price lags.", file=buf)
else:
print("MO series is unavailable.", file=buf)
# [New Highs vs Lows]
print("\n[New Highs vs Lows]", file=buf)
if np.isfinite(nh_sma):
if nh_now > 0 and nh_sma >= 0:
print("Net new highs are positive and the 10-day trend is non-negative.", file=buf)
elif nh_now < 0 and nh_sma <= 0:
print("Net new lows dominate and the 10-day trend is negative.", file=buf)
else:
print("Daily print and 10-day trend disagree; signal is mixed.", file=buf)
else:
print("High/low series is incomplete.", file=buf)
# [Advancing vs Declining]
print("\n[Advancing vs Declining]", file=buf)
if np.isfinite(avg_adv_last) and np.isfinite(avg_decl_last):
spread = avg_adv_last - avg_decl_last
print(
"On a {w}-day smoothing window, advancers average {adv:.0f} and decliners {dec:.0f}. Net spread is {spr:+.0f}."
.format(w=window, adv=avg_adv_last, dec=avg_decl_last, spr=spread), file=buf
)
if spread > 0:
print("The spread favors advancers.", file=buf)
elif spread < 0:
print("The spread favors decliners.", file=buf)
else:
print("Advancers and decliners are balanced.", file=buf)
else:
print("Smoothed A/D data is missing.", file=buf)
# [Aggregate]
print("\n[Aggregate]", file=buf)
print("Composite score is {score}/10 → {regime}.".format(score=score, regime=regime), file=buf)
if regime == "Risk-on bias":
if p200 >= 60 and ma200_slope > 0 and mo_last > 0:
print("Long-term breadth and MO agree; pullbacks above the 50-day tend to be buyable.", file=buf)
else:
print("Tone is supportive; watch the 200-day and MO zero-line for confirmation.", file=buf)
elif regime == "Mixed bias":
print("Signals diverge; manage size and tighten risk until MO and breadth align.", file=buf)
else:
if p200 <= 40 and ma200_slope < 0 and mo_last < 0:
print("Weak long-term breadth with negative MO argues for caution.", file=buf)
else:
print("Bias leans defensive until breadth steadies and MO turns up.", file=buf)
# [What to monitor]
print("\n[What to monitor]", file=buf)
print("Watch the 200-day breadth around 50% for confirmation of durable trends.", file=buf)
print("Track MO zero-line and signal crosses during price tests of resistance.", file=buf)
print("Look for steady positive net new highs over a 10-day window.", file=buf)
st.text(buf.getvalue())
# ===================== SECTION 2 — Rebased Comparison =====================
st.header("Rebased Comparison (Last N sessions)")
# Methodology (standalone expander)
with st.expander("Methodology", expanded=False):
st.write("Compares stock paths on a common scale and highlights leadership vs laggards.")
st.write("Use it to judge breadth, concentration, and dispersion over the selected window.")
st.write("**Rebasing (start = B)**")
st.latex(r"R_{i,t}= \frac{P_{i,t}}{P_{i,t_0}}\times B")
st.write("Each line shows cumulative performance since the window start.")
st.write("The index is rebased the same way for reference.")
st.write("**Log scale**")
st.write("We plot the y-axis in log scale so equal percent moves look equal.")
st.write("Y-range uses robust bounds (1st–99th percentiles) with padding.")
st.write("**Leaders and laggards**")
st.latex(r"\text{Perf}_{i}=R_{i,T}")
st.write("Leaders are highest Perf at T. Laggards are lowest.")
st.write("MAG7 are highlighted if present.")
st.write("**Equal-weight summaries**")
st.latex(r"\text{EWAvg}_T=\frac{1}{M}\sum_{i=1}^{M}R_{i,T}")
st.latex(r"\text{Median}_T=\operatorname{median}\{R_{i,T}\}")
st.latex(r"\%\text{Up}_T=100\cdot \frac{1}{M}\sum_{i=1}^{M}\mathbf{1}[R_{i,T}>B]")
st.latex(r"\%\text{BeatIdx}_T=100\cdot \frac{1}{M}\sum_{i=1}^{M}\mathbf{1}[R_{i,T}>R_{\text{idx},T}]")
st.write("These give a breadth read relative to the index and to flat (B).")
st.write("**Dispersion (cross-section)**")
st.latex(r"\sigma_T=\operatorname{stdev}\{R_{i,T}\},\quad \text{IQR}_T=Q_{0.75}-Q_{0.25}")
st.write("High dispersion means large performance spread across names.")
st.write("**Concentration (top N share of gains)**")
st.latex(r"\text{TopNShare}_T=\frac{\sum_{i\in \text{Top}N}(R_{i,T}-B)}{\sum_{j=1}^{M}(R_{j,T}-B)}\times 100")
st.write("Large TopNShare implies leadership is concentrated.")
st.write("**Correlation to index (optional diagnostic)**")
st.latex(r"\rho_i=\operatorname{corr}\big(\Delta \ln P_{i,t},\, \Delta \ln P_{\text{idx},t}\big)")
st.write("Lower median correlation favors stock picking. High correlation means beta drives moves.")
st.write("**Practical reads**")
st.write("- Broad advance: many lines above the index and %BeatIdx high.")
st.write("- Concentration risk: TopNShare large while most lines trail the index.")
st.write("- Rotation/dispersion: high cross-section std and lower median correlation.")
st.write("- Leadership quality: leaders holding gains on a log scale with limited drawdowns.")
n_days = int(rebase_days)
base = float(rebase_base)
recent = clean_close.iloc[-n_days:].dropna(axis=1, how="any")
if recent.empty:
st.warning("Not enough overlapping history for the rebased comparison window.")
else:
first = recent.iloc[0]
mask = (first > 0) & np.isfinite(first)
rebased = (recent.loc[:, mask] / first[mask]) * base
perf = rebased.iloc[-1].dropna()
mag7_all = ["AAPL","MSFT","AMZN","META","GOOGL","NVDA","TSLA"]
mag7 = [t for t in mag7_all if t in rebased.columns]
non_mag = perf.drop(index=mag7, errors="ignore")
top5 = non_mag.nlargest(min(5, len(non_mag))).index.tolist()
worst5 = non_mag.nsmallest(min(5, len(non_mag))).index.tolist()
mag_colors = {
"AAPL":"#00bfff","MSFT":"#3cb44b","AMZN":"#ffe119",
"META":"#4363d8","GOOGL":"#f58231","NVDA":"#911eb4","TSLA":"#46f0f0"
}
spx = idx.reindex(rebased.index).dropna()
spx_rebased = spx / spx.iloc[0] * base
def hover_tmpl(name: str) -> str:
return "%{y:.2f}<br>%{x|%Y-%m-%d}<extra>" + name + "</extra>"
fig2 = go.Figure()
for t in rebased.columns:
fig2.add_trace(go.Scatter(
x=rebased.index, y=rebased[t], name=t, mode="lines",
line=dict(width=1, color="rgba(160,160,160,0.4)"),
hovertemplate=hover_tmpl(t), showlegend=False
))
for t in mag7:
fig2.add_trace(go.Scatter(
x=rebased.index, y=rebased[t], name=t, mode="lines",
line=dict(width=2, color=mag_colors.get(t, "#ffffff")),
hovertemplate=hover_tmpl(t)
))
for t in top5:
fig2.add_trace(go.Scatter(
x=rebased.index, y=rebased[t], name=f"Top {t}", mode="lines",
line=dict(width=2, color="lime"),
hovertemplate=hover_tmpl(t), showlegend=False
))
for t in worst5:
fig2.add_trace(go.Scatter(
x=rebased.index, y=rebased[t], name=f"Worst {t}", mode="lines",
line=dict(width=2, color="red", dash="dash"),
hovertemplate=hover_tmpl(t), showlegend=False
))
fig2.add_trace(go.Scatter(
x=spx_rebased.index, y=spx_rebased.values, name="S&P 500 (rebased)", mode="lines",
line=dict(width=3, color="white"), hovertemplate=hover_tmpl("S&P 500")
))
vals = pd.concat([rebased.stack(), pd.Series(spx_rebased.values, index=spx_rebased.index)])
vals = vals.replace([np.inf, -np.inf], np.nan).dropna()
vals = vals[vals > 0]
y_range = None
if len(vals) > 10:
qlo, qhi = vals.quantile([0.01, 0.99])
y_min = max(1e-2, qlo / y_pad)
y_max = max(y_min * 1.1, qhi * y_pad)
y_range = [np.log10(y_min), np.log10(y_max)]
fig2.update_yaxes(type="log", range=y_range, title=f"Rebased Price (start = {int(base)})")
fig2.update_xaxes(title="Date")
fig2.update_layout(
template="plotly_dark",
height=700,
margin=dict(l=60, r=30, t=70, b=90),
title=f"Price Level Comparison (Rebased, Log Scale) — Last {n_days} Sessions",
legend=dict(orientation="h", y=-0.18, yanchor="top", x=0, xanchor="left"),
hovermode="closest",
font=dict(color="white")
)
st.plotly_chart(fig2, use_container_width=True)
# Dynamic Interpretation (standalone expander)
with st.expander("Dynamic Interpretation", expanded=False):
buf2 = io.StringIO()
def _fmt_pct(x):
return "n/a" if pd.isna(x) else f"{x:.1f}%"
def _fmt_num(x):
return "n/a" if pd.isna(x) else f"{x:,.2f}"
if rebased.empty or spx_rebased.empty:
print("No data for interpretation.", file=buf2)
else:
as_of = rebased.index[-1].date()
perf_last = rebased.iloc[-1].dropna()
spx_last = float(spx_rebased.iloc[-1])
n_names = len(perf_last)
eq_avg = float(perf_last.mean())
eq_med = float(perf_last.median())
pct_pos = float((perf_last > base).mean() * 100)
pct_beat = float((perf_last > spx_last).mean() * 100)
disp_std = float(perf_last.std(ddof=0))
iqr_lo, iqr_hi = float(perf_last.quantile(0.25)), float(perf_last.quantile(0.75))
iqr_w = iqr_hi - iqr_lo
mag7_in = [t for t in mag7 if t in perf_last.index]
rest_idx = perf_last.index.difference(mag7_in)
mag7_mean = float(perf_last[mag7_in].mean()) if len(mag7_in) else np.nan
rest_mean = float(perf_last[rest_idx].mean()) if len(rest_idx) else np.nan
mag7_beat = float((perf_last[mag7_in] > spx_last).mean() * 100) if len(mag7_in) else np.nan
gains_all = float((perf_last - base).sum())
topN = 10
top_contrib = np.nan
if abs(gains_all) > 1e-9:
top_contrib = float((perf_last.sort_values(ascending=False).head(topN) - base).sum() / gains_all * 100)
rets = rebased.pct_change().replace([np.inf, -np.inf], np.nan).dropna(how="all")
spx_r = pd.Series(spx_rebased, index=spx_rebased.index).pct_change()
corr_to_spx = rets.corrwith(spx_r, axis=0).dropna()
corr_med = float(corr_to_spx.median()) if len(corr_to_spx) else np.nan
low_corr_share = float((corr_to_spx < 0.3).mean() * 100) if len(corr_to_spx) else np.nan
spx_chg = spx_last - base
k = min(5, n_names)
leaders = perf_last.sort_values(ascending=False).head(k)
laggards = perf_last.sort_values(ascending=True).head(k)
print(f"=== Rebased performance read — {as_of} (window: {n_days} sessions) ===\n", file=buf2)
print("[Market]", file=buf2)
print(f"S&P 500 is {_fmt_pct(spx_chg)} over the window.", file=buf2)
print(f"Equal-weight average is {_fmt_pct(eq_avg - base)}, median is {_fmt_pct(eq_med - base)}.", file=buf2)
if np.isfinite(eq_avg) and np.isfinite(spx_last):
gap = (eq_avg - spx_last)
side = "above" if gap >= 0 else "below"
print(f"Equal-weight sits {_fmt_pct(abs(gap))} {side} the index.", file=buf2)
print("", file=buf2)
print("[Breadth]", file=buf2)
print(f"{_fmt_pct(pct_pos)} of names are up. {_fmt_pct(pct_beat)} beat the index.", file=buf2)
print(f"Dispersion std is {_fmt_num(disp_std)} points on the rebased scale.", file=buf2)
print(f"IQR width is {_fmt_num(iqr_w)} points ({_fmt_num(iqr_lo)} to {_fmt_num(iqr_hi)}).", file=buf2)
if pct_pos >= 70 and pct_beat >= 55:
print("Rally is broad. Leadership is shared across many names.", file=buf2)
elif pct_pos <= 35 and pct_beat <= 45:
print("Rally is narrow or absent. Leadership is concentrated.", file=buf2)
else:
print("Breadth is mixed. The tape can rotate quickly.", file=buf2)
print("", file=buf2)
print("[Concentration]", file=buf2)
if np.isfinite(top_contrib):
print(f"Top {topN} names explain {_fmt_pct(top_contrib)} of equal-weight gains.", file=buf2)
if len(mag7_in):
print(f"MAG7 equal-weight is {_fmt_pct(mag7_mean - base)}. Rest is {_fmt_pct(rest_mean - base)}.", file=buf2)
if np.isfinite(mag7_beat):
print(f"{_fmt_pct(mag7_beat)} of MAG7 beat the index.", file=buf2)
else:
print("MAG7 tickers are not all present in this window.", file=buf2)
print("", file=buf2)
print("[Correlation]", file=buf2)
if len(corr_to_spx):
print(f"Median correlation to the index is {_fmt_num(corr_med)}.", file=buf2)
print(f"{_fmt_pct(low_corr_share)} of names show low correlation (<0.30).", file=buf2)
if np.isfinite(corr_med) and corr_med < 0.5:
print("Factor dispersion is high. Stock picking matters more.", file=buf2)
elif np.isfinite(corr_med) and corr_med > 0.8:
print("Common beta dominates. Moves are index-driven.", file=buf2)
else:
print("Correlation sits in a middle zone. Rotation can continue.", file=buf2)
else:
print("Not enough data to compute correlations.", file=buf2)
print("", file=buf2)
print("[Leaders]", file=buf2)
for t, v in leaders.items():
print(f" {t}: {_fmt_pct(v - base)}", file=buf2)
print("\n[Laggards]", file=buf2)
for t, v in laggards.items():
print(f" {t}: {_fmt_pct(v - base)}", file=buf2)
print("\n[What to monitor]", file=buf2)
print("Watch the gap between equal-weight and index. A widening gap signals concentration risk.", file=buf2)
print("Track the share beating the index. Sustained readings above 55% support trend durability.", file=buf2)
print("Watch median correlation. Falling correlation favors dispersion and relative value setups.", file=buf2)
st.text(buf2.getvalue())
# ===================== SECTION 3 — Daily Return Heatmap =====================
st.header("Daily Return Heatmap")
# Methodology (standalone expander)
with st.expander("Methodology", expanded=False):
st.write("Shows daily % returns for all names over the selected window. Highlights broad up/down days, dispersion, and leadership.")
st.write("Use it to spot synchronized moves, stress days, and rotation across the universe.")
st.write("**Daily return (per name)**")
st.latex(r"r_{i,t}=\frac{P_{i,t}}{P_{i,t-1}}-1")
st.write("**Heatmap values**")
st.write("Cells display r_{i,t}. Tickers are sorted by the most recent day’s return so leaders/laggards are obvious.")
st.write("**Robust color scale (cap extremes)**")
st.latex(r"c=\operatorname{P95}\left(\left|r_{i,t}\right|\right)\ \text{over the window}")
st.latex(r"\text{color range}=[-c,\,+c],\quad \text{midpoint}=0")
st.write("Capping avoids a few outliers overpowering the color scale.")
st.write("**Breadth and dispersion (how to read)**")
st.latex(r"\text{Up share}_t=100\cdot \frac{1}{N}\sum_{i=1}^{N}\mathbf{1}[r_{i,t}>0]")
st.latex(r"\sigma_{\text{cs},t}=\operatorname{stdev}\{r_{i,t}\}_{i=1}^{N}")
st.write("- High up share with low dispersion = uniform risk-on.")
st.write("- Mixed colors with high dispersion = rotation and factor spread.")
st.write("- Clusters of red/green by industry often flag sector moves.")
st.write("**Large-move counts (quick context)**")
st.latex(r"\text{BigUp}_t=\sum_{i}\mathbf{1}[r_{i,t}\ge \tau],\quad \text{BigDn}_t=\sum_{i}\mathbf{1}[r_{i,t}\le -\tau]")
st.latex(r"\tau=2\% \ \text{(default)}")
st.write("A jump in BigUp/BigDn signals a thrust or a shock day.")
st.write("**Short-horizon follow-through**")
st.latex(r"\bar{r}_{i,t}^{(w)}=\frac{1}{w}\sum_{k=0}^{w-1} r_{i,t-k},\quad w=5")
st.write("A broad rise in 5-day averages supports continuation; a fade warns of stall.")
st.write("**Practical reads**")
st.write("- Many greens, low dispersion: beta tailwind; index setups work.")
st.write("- Greens + high dispersion: stock picking/sector tilts matter.")
st.write("- Reds concentrated in a few groups: rotate risk, not necessarily de-risk.")
st.write("- Extreme red breadth with spikes in dispersion: watch liquidity and reduce gross.")
# Daily returns last N days
ret_daily = clean_close.pct_change().iloc[1:]
ret_window = int(heat_last_days)
ret_last = ret_daily.iloc[-ret_window:]
if ret_last.empty:
st.warning("Not enough data for the daily return heatmap.")
else:
order = ret_last.iloc[-1].sort_values(ascending=True).index
ret_last = ret_last[order]
abs_max = np.nanpercentile(np.abs(ret_last.values), 95)
z = ret_last.T.values
x = ret_last.index
y = list(order)
n_dates = len(x)
step = max(1, n_dates // 10)
xtick_vals = x[::step]
xtick_texts = [ts.strftime("%Y-%m-%d") for ts in xtick_vals]
fig_hm = go.Figure(go.Heatmap(
z=z, x=x, y=y,
colorscale="RdYlGn",
zmin=-abs_max, zmax=abs_max, zmid=0,
colorbar=dict(title="Daily Return", tickformat=".0%"),
hovertemplate="%{y}<br>%{x|%Y-%m-%d}<br>%{z:.2%}<extra></extra>"
))
height = max(800, min(3200, 18 * len(y)))
fig_hm.update_layout(
template="plotly_dark",
title=f"Last {ret_window}-Day Daily Return Heatmap",
height=height,
margin=dict(l=100, r=40, t=60, b=60),
font=dict(color="white")
)
fig_hm.update_yaxes(title="Tickers (sorted by latest daily return)", tickfont=dict(size=8))
fig_hm.update_xaxes(title="Date", tickmode="array", tickvals=xtick_vals, ticktext=xtick_texts, tickangle=45)
st.plotly_chart(fig_hm, use_container_width=True)
# Dynamic Interpretation (standalone expander)
with st.expander("Dynamic Interpretation", expanded=False):
buf3 = io.StringIO()
def _pct(x):
return "n/a" if pd.isna(x) else f"{x*100:.1f}%"
def _pp(x):
return "n/a" if pd.isna(x) else f"{x*100:.2f}%"
if ret_last.empty:
print("No data for interpretation.", file=buf3)
else:
as_of = ret_last.index[-1].date()
last = ret_last.iloc[-1]
N = last.shape[0]
up = int((last > 0).sum())
dn = int((last < 0).sum())
flat = int(N - up - dn)
mean = float(last.mean()); med = float(last.median())
std = float(last.std(ddof=0))
q25 = float(last.quantile(0.25)); q75 = float(last.quantile(0.75))
iqr = q75 - q25
thr = 0.02
big_up = int((last >= thr).sum())
big_dn = int((last <= -thr).sum())
w = min(5, len(ret_last))
avg_w = ret_last.tail(w).mean()
pct_pos_w = float((avg_w > 0).mean())
cs_std = ret_last.std(axis=1, ddof=0)
today_std = float(cs_std.iloc[-1])
disp_pct = float((cs_std <= today_std).mean())
k = min(10, N)
leaders = last.sort_values(ascending=False).head(k)
laggards = last.sort_values(ascending=True ).head(k)
def _streak(s, max_look=20):
v = s.tail(max_look).to_numpy(dtype=float)
sign = np.sign(v); sign[np.isnan(sign)] = 0
if len(sign) == 0 or sign[-1] == 0:
return 0
tgt = sign[-1]; cnt = 0
for x in sign[::-1]:
if x == tgt: cnt += 1
else: break
return int(cnt if tgt > 0 else -cnt)
streaks = {t: _streak(ret_last[t]) for t in set(leaders.index).union(laggards.index)}
print(f"=== Daily return heatmap read — {as_of} (last {len(ret_last)} sessions) ===", file=buf3)
print("\n[Today]", file=buf3)
print(f"Up: {up}/{N} ({_pct(up/N)}). Down: {dn}/{N} ({_pct(dn/N)}). Flat: {flat}.", file=buf3)
print(f"Mean: {_pp(mean)}. Median: {_pp(med)}. Std: {_pp(std)}. IQR: {_pp(iqr)}.", file=buf3)
print(f"Moves ≥ {int(thr*100)}%: +{big_up}. Moves ≤ -{int(thr*100)}%: {big_dn}.", file=buf3)
print("\n[Recent breadth]", file=buf3)
print(f"{_pct(pct_pos_w)} of names have a positive average over the last {w} sessions.", file=buf3)
print("\n[Dispersion]", file=buf3)
print(f"Cross-section std today: {_pp(today_std)} (window percentile ~{disp_pct*100:.0f}th).", file=buf3)
print("\n[Leaders today]", file=buf3)
for t, v in leaders.items():
stv = streaks.get(t, 0)
lab = ("flat" if stv == 0 else (f"{stv}d up" if stv > 0 else f"{-stv}d down"))
print(f" {t}: {_pp(v)} ({lab})", file=buf3)
print("\n[Laggards today]", file=buf3)
for t, v in laggards.items():
stv = streaks.get(t, 0)
lab = ("flat" if stv == 0 else (f"{stv}d up" if stv > 0 else f"{-stv}d down"))
print(f" {t}: {_pp(v)} ({lab})", file=buf3)
print("\n[What to monitor]", file=buf3)
print("Watch big-move counts and the 5-day positive share for follow-through.", file=buf3)
print("Track dispersion; elevated dispersion favors relative moves over index moves.", file=buf3)
st.text(buf3.getvalue())
# ===================== SECTION 4 — Percentile Momentum Heatmap =====================
st.header("Percentile Momentum Heatmap")
# Methodology (standalone expander)
with st.expander("Methodology", expanded=False):
st.write("Ranks each stock’s medium-horizon return against the cross-section each day.")
st.write("Use it to spot broad momentum, rotation, and persistence.")
st.write("**n-day return (per name)**")
st.latex(r"r^{(n)}_{i,t}=\frac{P_{i,t}}{P_{i,t-n}}-1")
st.write("**Cross-sectional percentile (per day)**")
st.latex(r"p_{i,t}=\frac{\operatorname{rank}\!\left(r^{(n)}_{i,t}\right)}{N}")
st.write("0 means worst in the universe that day. 1 means best.")
st.write("The heatmap shows p_{i,t}. Rows are sorted by the latest percentile.")
st.write("**Breadth buckets (how to read)**")
st.latex(r"\text{Top\,20\%}_t=\frac{1}{N}\sum_{i}\mathbf{1}[p_{i,t}\ge 0.80]")
st.latex(r"\text{Bottom\,20\%}_t=\frac{1}{N}\sum_{i}\mathbf{1}[p_{i,t}\le 0.20]")
st.write("High Top-20% share signals broad upside momentum. High Bottom-20% share signals broad weakness.")
st.write("**Momentum shift vs a short lookback**")
st.latex(r"\Delta p_i=p_{i,T}-p_{i,T-w}")
st.write("Improving names: Δp_i > 0. Weakening names: Δp_i < 0.")
st.write("**Persistence (top/bottom quintile)**")
st.latex(r"\text{TopQ}_{i}=\sum_{k=0}^{w-1}\mathbf{1}[p_{i,T-k}\ge 0.80]")
st.latex(r"\text{BotQ}_{i}=\sum_{k=0}^{w-1}\mathbf{1}[p_{i,T-k}\le 0.20]")
st.write("Names with TopQ = w held leadership. BotQ = w stayed weak.")
st.write("**Practical reads**")
st.write("- Rising median percentile and high Top-20% share: trend has breadth.")
st.write("- Mixed median with both tails active: rotation/dispersion regime.")
st.write("- Persistent top-quintile list: candidates for follow-through.")
st.write("- Persistent bottom-quintile list: candidates for mean-reversion checks.")
look_days = int(mom_look)
ret_n = clean_close.pct_change(look_days)
ret_n = ret_n.iloc[look_days:]
if ret_n.empty:
st.warning("Not enough data for the momentum heatmap.")
else:
perc = ret_n.rank(axis=1, pct=True)
order2 = perc.iloc[-1].sort_values(ascending=True).index
perc = perc[order2]
z = perc.T.values
x = perc.index
y = list(order2)
n_dates = len(x)
step = max(1, n_dates // 10)
xtick_vals = x[::step]
xtick_texts = [ts.strftime("%Y-%m-%d") for ts in xtick_vals]
fig_pm = go.Figure(go.Heatmap(
z=z, x=x, y=y,
colorscale="Viridis",
zmin=0, zmax=1,
colorbar=dict(title="Return Percentile"),
hovertemplate="%{y}<br>%{x|%Y-%m-%d}<br>%{z:.0%}<extra></extra>"
))
height = max(800, min(3200, 18 * len(y)))
fig_pm.update_layout(
template="plotly_dark",
title=f"{look_days}-Day Return Percentile Heatmap",
height=height,
margin=dict(l=110, r=40, t=60, b=60),
font=dict(color="white")
)
fig_pm.update_yaxes(title="Tickers (sorted by latest %ile)", tickfont=dict(size=8))
fig_pm.update_xaxes(title="Date", tickmode="array", tickvals=xtick_vals, ticktext=xtick_texts, tickangle=45)
st.plotly_chart(fig_pm, use_container_width=True)
# Dynamic Interpretation (standalone expander)
with st.expander("Dynamic Interpretation", expanded=False):
buf4 = io.StringIO()
if perc.empty or ret_n.empty:
print("No data for interpretation.", file=buf4)
else:
as_of = perc.index[-1].date()
last_p = perc.iloc[-1].astype(float)
last_r = ret_n.iloc[-1].astype(float)
N = int(last_p.shape[0])
mean_p = float(last_p.mean()); med_p = float(last_p.median())
q25 = float(last_p.quantile(0.25)); q75 = float(last_p.quantile(0.75))
iqr_w = q75 - q25
top10 = float((last_p >= 0.90).mean() * 100)
top20 = float((last_p >= 0.80).mean() * 100)
mid40 = float(((last_p > 0.40) & (last_p < 0.60)).mean() * 100)
bot20 = float((last_p <= 0.20).mean() * 100)
bot10 = float((last_p <= 0.10).mean() * 100)
pct_up = float((last_r > 0).mean() * 100)
look = min(5, len(perc))
delta = (last_p - perc.iloc[-look].astype(float)).dropna()
improving = float((delta > 0).mean() * 100)
weakening = float((delta < 0).mean() * 100)
delta_med = float(delta.median())
k = min(10, N)
leaders = last_p.sort_values(ascending=False).head(k)
laggards = last_p.sort_values(ascending=True ).head(k)
window_p = 5
top_quint = (perc.tail(window_p) >= 0.80).sum()
bot_quint = (perc.tail(window_p) <= 0.20).sum()
persistent_up = top_quint[top_quint == window_p].index.tolist()
persistent_dn = bot_quint[bot_quint == window_p].index.tolist()
print(f"=== {look_days}-day momentum read — {as_of} ===", file=buf4)
print("\n[Snapshot]", file=buf4)
print(f"Names: {N}. Up on window: {pct_up:.1f}%.", file=buf4)
print(f"Mean percentile: {mean_p:.2f}. Median: {med_p:.2f}.", file=buf4)
print(f"IQR: {q25:.2f}{q75:.2f} (width {iqr_w:.2f}).", file=buf4)
print("\n[Breadth]", file=buf4)
print(f"Top 10%: {top10:.1f}%. Top 20%: {top20:.1f}%.", file=buf4)
print(f"Middle 40–60%: {mid40:.1f}%.", file=buf4)
print(f"Bottom 20%: {bot20:.1f}%. Bottom 10%: {bot10:.1f}%.", file=buf4)
print("\n[Shift]", file=buf4)
print(f"Improving vs {look} days ago: {improving:.1f}%. Weakening: {weakening:.1f}%.", file=buf4)
print(f"Median percentile change: {delta_med:+.2f}.", file=buf4)
print("\n[Leaders]", file=buf4)
for t, v in leaders.items():
print(f" {t}: {v:.2f}", file=buf4)
print("\n[Laggards]", file=buf4)
for t, v in laggards.items():
print(f" {t}: {v:.2f}", file=buf4)
print("\n[Persistence]", file=buf4)
if persistent_up:
up_list = ", ".join(persistent_up[:15]) + ("…" if len(persistent_up) > 15 else "")
print(f"Top-quintile {window_p} days: {up_list}", file=buf4)
else:
print("No names stayed in the top quintile.", file=buf4)
if persistent_dn:
dn_list = ", ".join(persistent_dn[:15]) + ("…" if len(persistent_dn) > 15 else "")
print(f"Bottom-quintile {window_p} days: {dn_list}", file=buf4)
else:
print("No names stayed in the bottom quintile.", file=buf4)
print("\n[Focus]", file=buf4)
print("Watch the top-quintile share. Rising share supports continuation.", file=buf4)
print("Track the median percentile. Sustained readings above 0.60 show broad momentum.", file=buf4)
print("Use persistence lists for follow-through and mean-reversion checks.", file=buf4)
st.text(buf4.getvalue())
# Hide default Streamlit style
st.markdown(
"""
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
""",
unsafe_allow_html=True
)