Spaces:
Sleeping
Sleeping
| import os | |
| from pathlib import Path | |
| WRITABLE_BASE = Path(os.environ.get("VIX_CACHE_DIR", "/tmp")).resolve() | |
| # make Path.home() point to /tmp (or your provided VIX_CACHE_DIR) | |
| os.environ.setdefault("HOME", str(WRITABLE_BASE)) | |
| # standard XDG dirs under the writable base | |
| os.environ.setdefault("XDG_DATA_HOME", str(WRITABLE_BASE / ".local" / "share")) | |
| os.environ.setdefault("XDG_CACHE_HOME", str(WRITABLE_BASE / ".cache")) | |
| os.environ.setdefault("XDG_STATE_HOME", str(WRITABLE_BASE / ".local" / "state")) | |
| # ensure they exist | |
| for p in ( | |
| Path(os.environ["XDG_DATA_HOME"]), | |
| Path(os.environ["XDG_CACHE_HOME"]), | |
| Path(os.environ["XDG_STATE_HOME"]), | |
| ): | |
| p.mkdir(parents=True, exist_ok=True) | |
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| from hmmlearn.hmm import GaussianHMM | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.decomposition import PCA | |
| import warnings | |
| import nest_asyncio | |
| import asyncio | |
| from vix_utils import async_load_vix_term_structure | |
| from datetime import datetime, timedelta | |
| import traceback | |
| # ---------- App config ---------- | |
| st.set_page_config(layout="wide", page_title="VIX Regime Detection") | |
| nest_asyncio.apply() | |
| warnings.filterwarnings("ignore", category=FutureWarning, module="vix_utils") | |
| # ---------- Helpers ---------- | |
| def fetch_term_structure_cached(): | |
| """ | |
| Fresh event loop for hosted envs; closed after use to avoid leaks. | |
| Cached to keep memory + network stable across reruns. | |
| """ | |
| loop = asyncio.new_event_loop() | |
| try: | |
| asyncio.set_event_loop(loop) | |
| return loop.run_until_complete(async_load_vix_term_structure()) | |
| finally: | |
| try: | |
| loop.run_until_complete(loop.shutdown_asyncgens()) | |
| except Exception: | |
| pass | |
| loop.close() | |
| def limit_df_rows_for_display(df: pd.DataFrame, n: int) -> pd.DataFrame: | |
| if len(df) <= n: | |
| return df | |
| return df.tail(n) | |
| BG = "#0e1117" | |
| # ---------- Title ---------- | |
| st.title("VIX Regime Detection") | |
| st.write( | |
| "This tool tracks the VIX term structure and identifies regimes: contango, backwardation, or cautious. It reports carry sign, curve slope, and changes over time and shows regime persistence and transition probabilities." | |
| "For more details, see [this article](https://entreprenerdly.com/detecting-vix-term-structure-regimes/)." | |
| ) | |
| # ---------- Sidebar ---------- | |
| with st.sidebar: | |
| st.title("Parameters") | |
| # Data Range expander | |
| with st.expander("Data Range", expanded=False): | |
| start_date = st.date_input( | |
| "Start Date", | |
| value=datetime(2020, 1, 1), | |
| min_value=datetime(2000, 1, 1), | |
| max_value=datetime.today(), | |
| help=( | |
| "Start date for the sample. " | |
| "Earlier date = more history and slower load. " | |
| "Later date = less history and faster load." | |
| ) | |
| ) | |
| # Slope Time Series | |
| with st.expander("Slope Time Series Parameters", expanded=False): | |
| slope_thr = st.number_input( | |
| "Regime Threshold (thr)", | |
| value=0.5, | |
| step=0.1, | |
| help=( | |
| "Threshold for classifying slope regimes. " | |
| "Increase = fewer CONTANGO/BACKWARDATION labels; more CAUTIOUS. " | |
| "Decrease = more sensitive regime switches." | |
| ) | |
| ) | |
| # HMM Regime | |
| with st.expander("HMM Regime Parameters", expanded=False): | |
| hmm_n_components = st.number_input( | |
| "Number of Components", | |
| value=3, | |
| min_value=2, | |
| max_value=5, | |
| step=1, | |
| help=( | |
| "Hidden states in the HMM. " | |
| "Higher = more regimes (may overfit). " | |
| "Lower = simpler states (may merge regimes)." | |
| ) | |
| ) | |
| hmm_n_iter = st.number_input( | |
| "Max Iterations", | |
| value=500, | |
| min_value=100, | |
| step=100, | |
| help=( | |
| "Training iterations for HMM. " | |
| "Increase if the model does not converge. " | |
| "Higher = slower." | |
| ) | |
| ) | |
| # Carry Spreads | |
| with st.expander("Carry Spread Parameters", expanded=False): | |
| carry_short_leg = st.number_input( | |
| "Short Leg Tenor (Monthly)", | |
| value=1.0, | |
| step=1.0, | |
| help=( | |
| "Short leg used in spreads (long − short). " | |
| "1.0 = VX1. " | |
| "Raising this uses further-out as the short base." | |
| ) | |
| ) | |
| carry_long_legs = st.text_input( | |
| "Long Legs Tenors (Comma-separated)", | |
| value="2.0,6.0", | |
| help=( | |
| "Comma list of long legs for spreads (e.g., 2.0,6.0). " | |
| "Each adds a line: VXk − VXshort. " | |
| "Add more legs to compare more horizons." | |
| ) | |
| ) | |
| # PCA | |
| with st.expander("PCA Decomposition Parameters", expanded=False): | |
| pca_n_components = st.number_input( | |
| "Number of Components", | |
| value=3, | |
| min_value=2, | |
| max_value=5, | |
| step=1, | |
| help=( | |
| "Number of principal components. " | |
| "Higher = more variance captured (may add noise). " | |
| "Lower = focuses on main factors." | |
| ) | |
| ) | |
| pca_tenors = st.text_input( | |
| "Tenors (Comma-separated)", | |
| value="1.0,2.0,3.0,4.0,5.0,6.0", | |
| help=( | |
| "Monthly tenors to include (e.g., 1.0,2.0,...). " | |
| "Rows with any missing values drop. " | |
| "Fewer tenors keep more dates." | |
| ) | |
| ) | |
| # Constant-Maturity Index | |
| with st.expander("Constant-Maturity Parameters", expanded=False): | |
| cm_target = st.number_input( | |
| "Target Days to Maturity", | |
| value=30, | |
| min_value=10, | |
| step=5, | |
| help=( | |
| "Target maturity for the blend in days. " | |
| "Increase = tilt toward longer contracts. " | |
| "Decrease = tilt toward front-month." | |
| ) | |
| ) | |
| cm_start = st.number_input( | |
| "Starting Index Level", | |
| value=1.0, | |
| step=0.1, | |
| help=( | |
| "Initial index level for charting. " | |
| "Scale factor only. " | |
| "Changing this does not change returns." | |
| ) | |
| ) | |
| # Stability toggle for Spaces (caps heavy charts & samples) | |
| SAFE_MODE = st.checkbox( | |
| "Safe mode (limit heavy charts)", | |
| value=True, | |
| help="Caps figure sizes & sampling to avoid out-of-memory on hosted environments.", | |
| ) | |
| # Run button | |
| run_analysis = st.button("Run Analysis") | |
| # ---------- Main ---------- | |
| if run_analysis: | |
| with st.spinner("Loading data..."): | |
| try: | |
| # Fixed end date (kept, though not used further) | |
| end_date = datetime.today() + timedelta(days=1) | |
| # Async data load (cached, fresh loop) | |
| df = fetch_term_structure_cached() | |
| # Keep only columns we actually use to cut memory | |
| keep_cols = [ | |
| "Trade Date", "Expiry", "Tenor_Days", "Tenor_Monthly", | |
| "Weekly", "Expired", "Settle" | |
| ] | |
| df = df[keep_cols].copy() | |
| # Parse Trade Date, sort | |
| df["Trade Date"] = pd.to_datetime(df["Trade Date"]) | |
| df.sort_values(["Trade Date", "Tenor_Days"], inplace=True) | |
| # Drop exact duplicates early to avoid bloat | |
| dup_count = int(df.duplicated(subset=keep_cols).sum()) | |
| if dup_count: | |
| df = df.drop_duplicates(subset=keep_cols) | |
| st.warning(f"Removed {dup_count:,} duplicate rows detected in the source feed.") | |
| # Filter by start date | |
| df = df[df["Trade Date"] >= pd.to_datetime(start_date)] | |
| if df.empty: | |
| st.error("No data available for the selected date range.") | |
| st.stop() | |
| # ---------- Section 1: Raw Dataframe ---------- | |
| st.header("VIX Term Structure Dataset") | |
| st.write("The raw VIX term structure data loaded for analysis.") | |
| with st.expander("1. Raw Dataset", expanded=False): | |
| max_show = 2_000 if SAFE_MODE else 10_000 | |
| st.caption(f"Showing last {min(max_show, len(df)):,} rows (total {len(df):,}). Use download to get all.") | |
| st.dataframe(limit_df_rows_for_display(df, max_show), use_container_width=True, height=420) | |
| st.download_button( | |
| "Download full CSV", | |
| df.to_csv(index=False).encode(), | |
| file_name="vix_term_structure.csv", | |
| mime="text/csv", | |
| ) | |
| # ---------- Section 2: Slope Time Series Analysis ---------- | |
| st.header("Slope Time Series Across Time") | |
| st.write("Visualizes the VIX term structure slopes over time with regime classifications.") | |
| with st.expander("Methodology", expanded=False): | |
| st.write(""" | |
| This analysis filters the VIX futures data to include only dates from the specified start date onward. For each trade date, the data is grouped, and only groups with at least two tenors are considered. The term structure is sorted by monthly tenor. | |
| The slope for each date is calculated as the difference between the settle price of the last tenor and the first tenor: | |
| """) | |
| st.latex(r"\text{Slope} = \text{Settle}_{\text{last}} - \text{Settle}_{\text{first}}") | |
| st.write(""" | |
| Regimes are classified based on this slope and a user-defined threshold θ (default 0.5): | |
| - If Slope > θ, the regime is CONTANGO (futures prices increase with maturity, indicating normal market conditions where longer-term volatility is expected to be higher). | |
| """) | |
| st.latex(r"\text{Slope} > \theta") | |
| st.write(""" | |
| - If Slope < -θ, the regime is BACKWARDATION (futures prices decrease with maturity, often signaling market stress or imminent volatility spikes). | |
| """) | |
| st.latex(r"\text{Slope} < -\theta") | |
| st.write(""" | |
| - Otherwise, the regime is CAUTIOUS (flat or near-flat curve, indicating uncertainty or transition between regimes). | |
| A Plotly figure with a slider is created to navigate through each trade date's term structure curve (settle prices vs expiry dates). Each curve is plotted with lines and markers, visible one at a time via the slider. The title dynamically updates to show the date and regime. | |
| This visualization allows investors to observe how the term structure evolves over time and how regimes shift. This gives insights into market sentiment and potential volatility dynamics. | |
| """) | |
| df_sub = df.copy() | |
| df_sub.sort_values("Trade Date", inplace=True) | |
| groups = [ | |
| (dt, grp.sort_values("Tenor_Monthly")) | |
| for dt, grp in df_sub.groupby("Trade Date") | |
| if len(grp) > 1 | |
| ] | |
| # Cap the number of slider dates to avoid thousands of traces | |
| MAX_SLIDER_DATES = 240 if SAFE_MODE else 800 | |
| if len(groups) > MAX_SLIDER_DATES: | |
| groups = groups[-MAX_SLIDER_DATES:] | |
| regime_map = {} | |
| for dt, grp in groups: | |
| slope = grp["Settle"].iloc[-1] - grp["Settle"].iloc[0] | |
| if slope > slope_thr: | |
| regime = "CONTANGO" | |
| elif slope < -slope_thr: | |
| regime = "BACKWARDATION" | |
| else: | |
| regime = "CAUTIOUS" | |
| regime_map[str(pd.to_datetime(dt).date())] = regime | |
| if len(groups) == 0: | |
| st.info("Not enough data to render the slider plot.") | |
| else: | |
| fig = go.Figure() | |
| dates = [] | |
| for i, (dt, grp) in enumerate(groups): | |
| date_str = str(pd.to_datetime(dt).date()) | |
| dates.append(date_str) | |
| fig.add_trace( | |
| go.Scatter( | |
| x=grp["Expiry"], | |
| y=grp["Settle"], | |
| mode="lines+markers", | |
| name=date_str, | |
| visible=(i == len(groups) - 1), | |
| line=dict(width=2), | |
| marker=dict(size=6), | |
| ) | |
| ) | |
| steps = [] | |
| for i, d in enumerate(dates): | |
| title = f"VIX Term Structure — {d} — {regime_map[d]}" | |
| steps.append({ | |
| "method": "update", | |
| "args": [ | |
| {"visible": [j == i for j in range(len(dates))]}, | |
| {"title": title}, | |
| ], | |
| "label": d, | |
| }) | |
| slider = { | |
| "active": len(dates) - 1, | |
| "currentvalue": {"prefix": "Trade Date: ", "font": {"size": 14}}, | |
| "pad": {"t": 16, "b": 0}, | |
| "x": 0.0, | |
| "y": 0.0015, | |
| "len": 1.0, | |
| "steps": steps, | |
| } | |
| fig.update_layout( | |
| sliders=[slider], | |
| title=f"VIX Term Structure — {dates[-1]} — {regime_map[dates[-1]]}", | |
| xaxis_title="Futures Expiry", | |
| yaxis_title="VIX Futures Price", | |
| height=500, | |
| margin=dict(l=60, r=20, t=60, b=90), | |
| template="plotly_dark", | |
| paper_bgcolor=BG, #"rgba(0,0,0,1)", | |
| plot_bgcolor=BG,#"rgba(0,0,0,1)", | |
| title_font_color="white", | |
| font=dict(color="white"), | |
| ) | |
| fig.update_xaxes(gridcolor="rgba(255,255,255,0.08)") | |
| fig.update_yaxes(gridcolor="rgba(255,255,255,0.08)") | |
| st.plotly_chart(fig, use_container_width=True) | |
| with st.expander("Dynamic Interpretation", expanded=False): | |
| # Build daily and interpretations (unchanged logic) | |
| daily_rows = [] | |
| grp_map = {pd.to_datetime(dt): g.sort_values('Tenor_Monthly') for dt, g in groups} | |
| for dt, grp in grp_map.items(): | |
| g = grp.dropna(subset=['Tenor_Monthly', 'Settle']).copy() | |
| settle_by_m = g.groupby('Tenor_Monthly', as_index=True)['Settle'].last() | |
| if settle_by_m.size < 2: | |
| continue | |
| first_tenor = settle_by_m.index.min() | |
| last_tenor = settle_by_m.index.max() | |
| front = float(settle_by_m.loc[first_tenor]) | |
| back = float(settle_by_m.loc[last_tenor]) | |
| slope = back - front | |
| curve_width = float(settle_by_m.max() - settle_by_m.min()) | |
| n_tenors = int(settle_by_m.size) | |
| vx1 = float(settle_by_m.loc[1.0]) if 1.0 in settle_by_m.index else np.nan | |
| vx2 = float(settle_by_m.loc[2.0]) if 2.0 in settle_by_m.index else np.nan | |
| vx6 = float(settle_by_m.loc[6.0]) if 6.0 in settle_by_m.index else np.nan | |
| c12 = (vx2 - vx1) if np.isfinite(vx1) and np.isfinite(vx2) else np.nan | |
| c61 = (vx6 - vx1) if np.isfinite(vx1) and np.isfinite(vx6) else np.nan | |
| dstr = str(pd.to_datetime(dt).date()) | |
| daily_rows.append({ | |
| 'Trade Date': pd.to_datetime(dt), | |
| 'Slope': slope, | |
| 'Front': front, | |
| 'Back': back, | |
| 'CurveWidth': curve_width, | |
| 'NumTenors': n_tenors, | |
| 'VX2_VX1': c12, | |
| 'VX6_VX1': c61, | |
| 'Regime': regime_map.get(dstr, 'UNKNOWN') | |
| }) | |
| daily = pd.DataFrame(daily_rows).sort_values('Trade Date').reset_index(drop=True) | |
| if not daily.empty: | |
| for w in (5, 20, 60, 120): | |
| mp_mean = min(3, w) | |
| mp_std = min(10, w) | |
| daily[f'Slope_MA_{w}'] = daily['Slope'].rolling(window=w, min_periods=mp_mean).mean() | |
| daily[f'Slope_STD_{w}'] = daily['Slope'].rolling(window=w, min_periods=mp_std).std() | |
| daily['Slope_Z_120'] = np.where( | |
| daily['Slope_STD_120'].fillna(0) > 0, | |
| (daily['Slope'] - daily['Slope_MA_120']) / daily['Slope_STD_120'], | |
| np.nan | |
| ) | |
| def _streak(vals): | |
| out = np.ones(len(vals), dtype=int) | |
| for i in range(1, len(vals)): | |
| out[i] = out[i-1] + 1 if vals[i] == vals[i-1] else 1 | |
| return out | |
| daily['Regime_Streak'] = _streak(daily['Regime'].to_numpy()) | |
| def _trend_tag(val, ref): | |
| if pd.isna(ref): return "n/a" | |
| return "above" if val > ref else ("below" if val < ref else "equal") | |
| def _trend_word(vs_ma): | |
| if vs_ma == "above": return "steeper" | |
| if vs_ma == "below": return "flatter" | |
| return "unchanged" | |
| def _carry_word(x): | |
| if pd.isna(x): return "n/a" | |
| return "positive" if x >= 0 else "negative" | |
| def _dominant_regime(comp): | |
| if not comp: return "n/a" | |
| k = max(comp, key=comp.get) | |
| return f"{k.lower()} ({comp[k]:.1f}%)" | |
| def _safe_pct_rank(series, value): | |
| s = pd.to_numeric(series, errors='coerce').dropna() | |
| if s.empty or not np.isfinite(value): return np.nan | |
| return float((s < value).mean() * 100.0) | |
| def _qbin(series, value, q=(0.1,0.25,0.5,0.75,0.9)): | |
| s = pd.to_numeric(series, errors='coerce').dropna() | |
| if s.empty or not np.isfinite(value): return "n/a" | |
| qs = s.quantile(list(q)).to_dict() | |
| if value <= qs[q[0]]: return f"≤{int(q[0]*100)}th" | |
| if value <= qs[q[1]]: return f"{int(q[0]*100)}–{int(q[1]*100)}th" | |
| if value <= qs[q[2]]: return f"{int(q[1]*100)}–{int(q[2]*100)}th" | |
| if value <= qs[q[3]]: return f"{int(q[2]*100)}–{int(q[3]*100)}th" | |
| if value <= qs[q[4]]: return f"{int(q[3]*100)}–{int(q[4]*100)}th" | |
| return f">{int(q[4]*100)}th" | |
| start, end = daily['Trade Date'].min().date(), daily['Trade Date'].max().date() | |
| days = len(daily) | |
| avg_tenors = daily['NumTenors'].mean() | |
| last = daily.iloc[-1] | |
| st.write("— Snapshot —") | |
| st.write(f"Sample {start} to {end} ({days} days).") | |
| st.write(f"Average tenors per day {avg_tenors:.1f}.") | |
| st.write(f"Today {last['Regime'].lower()} with slope {last['Slope']:.2f} pts.") | |
| st.write(f"Curve width {last['CurveWidth']:.2f} pts across {last['NumTenors']} tenors.") | |
| if not pd.isna(last['VX2_VX1']): | |
| st.write(f"Front carry VX2−VX1 {last['VX2_VX1']:.2f} pts ({_carry_word(last['VX2_VX1'])}).") | |
| if not pd.isna(last['VX6_VX1']): st.write(f"Term carry VX6−VX1 {last['VX6_VX1']:.2f} pts ({_carry_word(last['VX6_VX1'])}).") | |
| tag5 = _trend_tag(last['Slope'], last.get('Slope_MA_5')) | |
| tag20 = _trend_tag(last['Slope'], last.get('Slope_MA_20')) | |
| if tag5 != "n/a": st.write(f"Slope is {_trend_word(tag5)} than 5-day average.") | |
| if tag20 != "n/a": st.write(f"Slope is {_trend_word(tag20)} than 20-day average.") | |
| z120 = last.get('Slope_Z_120', np.nan) | |
| if not pd.isna(z120): | |
| if z120 >= 2: st.write(f"Slope high vs 120-day history (z={z120:.2f}).") | |
| elif z120 <= -2: st.write(f"Slope low vs 120-day history (z={z120:.2f}).") | |
| else: st.write(f"Slope within 120-day normal (z={z120:.2f}).") | |
| arr = pd.to_numeric(daily['Slope'], errors='coerce').dropna().to_numpy() | |
| if arr.size and np.isfinite(last['Slope']): | |
| pct = float((arr < last['Slope']).mean() * 100.0) | |
| st.write(f"Slope at {pct:.1f} percentile of sample.") | |
| for window in (30, 90): | |
| sub = daily.tail(window) | |
| if sub.empty: continue | |
| comp = (sub['Regime'].value_counts(normalize=True) * 100).to_dict() | |
| dom = _dominant_regime(comp) | |
| st.write(f"Last {window} days dominant regime {dom}.") | |
| streak = int(last['Regime_Streak']) | |
| if len(daily) >= 2: | |
| changed = daily['Regime'].to_numpy() != daily['Regime'].shift(1).to_numpy() | |
| if changed.any(): | |
| last_change_idx = np.where(changed)[0][-1] | |
| last_change_day = daily.iloc[last_change_idx]['Trade Date'].date() | |
| st.write(f"Current {last['Regime'].lower()} streak {streak} days since {last_change_day}.") | |
| else: | |
| st.write(f"Current {last['Regime'].lower()} streak {streak} days.") | |
| if len(daily) >= 3: | |
| hi = daily.nlargest(1, 'Slope').iloc[0] | |
| lo = daily.nsmallest(1, 'Slope').iloc[0] | |
| st.write(f"Max slope {hi['Slope']:.2f} on {hi['Trade Date'].date()} ({hi['Regime']}).") | |
| st.write(f"Min slope {lo['Slope']:.2f} on {lo['Trade Date'].date()} ({lo['Regime']}).") | |
| anoms = daily[daily['Slope_Z_120'].abs() >= 3] | |
| if len(anoms) > 0: | |
| last_a = anoms.iloc[-1] | |
| st.write(f"Recent anomaly {last_a['Trade Date'].date()} (|z120|={abs(last_a['Slope_Z_120']):.2f}).") | |
| sparse = daily[daily['NumTenors'] < 3] | |
| if len(sparse) > 0: | |
| st.write(f"{len(sparse)} sparse days (<3 tenors). Treat slopes carefully.") | |
| st.write("— History context —") | |
| today = last['Trade Date'].date() | |
| reg = last['Regime'] | |
| slope_pct = _safe_pct_rank(daily['Slope'], last['Slope']) | |
| width_pct = _safe_pct_rank(daily['CurveWidth'], last['CurveWidth']) | |
| c12_pct = _safe_pct_rank(daily['VX2_VX1'], last['VX2_VX1']) if pd.notna(last['VX2_VX1']) else np.nan | |
| c61_pct = _safe_pct_rank(daily['VX6_VX1'], last['VX6_VX1']) if pd.notna(last['VX6_VX1']) else np.nan | |
| if pd.notna(slope_pct): st.write(f"{today}: slope percentile vs sample {slope_pct:.1f}%.") | |
| if pd.notna(width_pct): st.write(f"{today}: width percentile vs sample {width_pct:.1f}%.") | |
| if pd.notna(c12_pct): st.write(f"{today}: VX2−VX1 percentile vs sample {c12_pct:.1f}%.") | |
| if pd.notna(c61_pct): st.write(f"{today}: VX6−VX1 percentile vs sample {c61_pct:.1f}%.") | |
| sub_reg = daily[daily['Regime'] == reg] | |
| if not sub_reg.empty: | |
| slope_reg_pct = _safe_pct_rank(sub_reg['Slope'], last['Slope']) | |
| width_reg_pct = _safe_pct_rank(sub_reg['CurveWidth'], last['CurveWidth']) | |
| if pd.notna(slope_reg_pct): st.write(f"{today}: slope percentile within {reg.lower()} {slope_reg_pct:.1f}%.") | |
| if pd.notna(width_reg_pct): st.write(f"{today}: width percentile within {reg.lower()} {width_reg_pct:.1f}%.") | |
| slope_med = sub_reg['Slope'].median() | |
| slope_diff = last['Slope'] - slope_med | |
| st.write(f"{today}: slope vs {reg.lower()} median {slope_diff:+.2f} pts (median {slope_med:.2f}).") | |
| else: | |
| st.write(f"{today}: no history for regime {reg}.") | |
| spells = [] | |
| start_idx = 0 | |
| vals = daily['Regime'].to_numpy() | |
| for i in range(1, len(vals)+1): | |
| if i == len(vals) or vals[i] != vals[i-1]: | |
| r = vals[i-1] | |
| length = i - start_idx | |
| spells.append({'Regime': r, 'Length': length, 'EndIndex': i-1}) | |
| start_idx = i | |
| spells = pd.DataFrame(spells) | |
| if not spells.empty: | |
| cur_len = int(spells.iloc[-1]['Length']) | |
| reg_spells = spells[spells['Regime'] == reg]['Length'] | |
| mean_len = reg_spells.mean() if not reg_spells.empty else np.nan | |
| p75_len = reg_spells.quantile(0.75) if not reg_spells.empty else np.nan | |
| st.write(f"{today}: current {reg.lower()} spell {cur_len} days.") | |
| if pd.notna(mean_len): | |
| tag = "longer" if cur_len > mean_len else ("shorter" if cur_len < mean_len else "near mean") | |
| st.write(f"{today}: spell is {tag} than mean ({mean_len:.1f} days).") | |
| if pd.notna(p75_len): | |
| st.write(f"{today}: spell {'≥' if cur_len >= p75_len else '<'} 75th percentile ({p75_len:.0f} days).") | |
| trans = ( | |
| daily[['Regime']] | |
| .assign(Prev=lambda x: x['Regime'].shift(1)) | |
| .dropna() | |
| .value_counts() | |
| .rename('Count') | |
| .reset_index() | |
| ) | |
| if not trans.empty: | |
| denom = trans.groupby('Prev')['Count'].sum() | |
| trans['Prob'] = trans.apply(lambda r: r['Count'] / denom.loc[r['Prev']], axis=1) | |
| stay_row = trans[(trans['Prev']==reg) & (trans['Regime']==reg)] | |
| if not stay_row.empty: | |
| p_stay = float(stay_row['Prob'].iloc[0]) | |
| st.write(f"{today}: one-day stay probability in {reg.lower()} {p_stay:.2f}.") | |
| daily['Month'] = daily['Trade Date'].dt.to_period('M') | |
| cur_month = daily['Month'].iloc[-1] | |
| mtd = daily[daily['Month'] == cur_month] | |
| if not mtd.empty: | |
| mtd_chg = (mtd['Regime'] != mtd['Regime'].shift(1)).sum() | |
| mtd_changes = max(0, mtd_chg - 1) | |
| by_month = ( | |
| daily | |
| .assign(chg=lambda x: x['Regime'] != x['Regime'].shift(1)) | |
| .groupby('Month')['chg'].sum() | |
| .clip(lower=0) - 1 | |
| ).clip(lower=0) | |
| typical = by_month.median() if not by_month.dropna().empty else np.nan | |
| st.write(f"{today}: regime changes month-to-date {int(mtd_changes)}.") | |
| if pd.notna(typical): | |
| comp = "above" if mtd_changes > typical else ("below" if mtd_changes < typical else "in line") | |
| st.write(f"{today}: MTD regime churn {comp} median month ({typical:.0f}).") | |
| moy = mtd['Trade Date'].dt.month.iloc[-1] | |
| same_moy = daily[daily['Trade Date'].dt.month == moy]['Slope'] | |
| if not same_moy.dropna().empty: | |
| moy_pct = _safe_pct_rank(same_moy, last['Slope']) | |
| st.write(f"{today}: slope percentile vs historical {pd.Timestamp(today).strftime('%B')} {moy_pct:.1f}%.") | |
| slope_bin = _qbin(daily['Slope'], last['Slope']) | |
| width_bin = _qbin(daily['CurveWidth'], last['CurveWidth']) | |
| st.write(f"{today}: slope bin {slope_bin}.") | |
| st.write(f"{today}: width bin {width_bin}.") | |
| s_all = pd.to_numeric(daily['Slope'], errors='coerce').dropna() | |
| if not s_all.empty and np.isfinite(last['Slope']): | |
| tail_hi = (s_all >= last['Slope']).mean()*100.0 | |
| tail_lo = (s_all <= last['Slope']).mean()*100.0 | |
| tail = min(tail_hi, tail_lo) | |
| st.write(f"{today}: tail frequency at this slope level {tail:.1f}%.") | |
| band = max(0.25, s_all.std()*0.1) if not s_all.empty else 0.25 | |
| recent_sim = daily[(daily['Regime'] == reg) & | |
| (daily['Slope'].between(last['Slope']-band, last['Slope']+band))] | |
| if len(recent_sim) >= 2: | |
| prev = recent_sim.iloc[-2]['Trade Date'].date() | |
| days_since = (pd.Timestamp(today) - pd.Timestamp(prev)).days | |
| st.write(f"{today}: last similar day was {prev} ({days_since} days ago).") | |
| def _stab(row): | |
| c1 = abs(row['Slope'] - row.get('Slope_MA_20', np.nan)) | |
| c2 = abs(row.get('Slope_Z_120', np.nan)) | |
| parts = [] | |
| if np.isfinite(c1): parts.append(1.0 / (1.0 + c1)) | |
| if np.isfinite(c2): parts.append(1.0 / (1.0 + c2)) | |
| return np.mean(parts) if parts else np.nan | |
| daily['Stability'] = daily.apply(_stab, axis=1) | |
| stab_pct = _safe_pct_rank(daily['Stability'], daily['Stability'].iloc[-1]) | |
| if pd.notna(stab_pct): | |
| st.write(f"{today}: stability percentile {stab_pct:.1f}% (higher means steadier slope).") | |
| # ---------- Section 3: 3D Term-Structure Visualization ---------- | |
| st.header("Term-Structure Surface") | |
| st.write("A 3D scatter plot showing the VIX term structure over time with trade date, days to expiration, and settle price.") | |
| with st.expander("Methodology", expanded=False): | |
| st.write(""" | |
| This visualization filters the data to include only monthly (non-weekly) and non-expired VIX futures contracts. Dates with fewer than two tenors are excluded to ensure meaningful term structures. | |
| The data is grouped by trade date, and a 3D scatter plot is created using Plotly Express, with axes: | |
| - X: Trade Date | |
| - Y: Tenor_Days (days to expiration) | |
| - Z: Settle (settlement price) | |
| - Color: Settle price for visual distinction | |
| This plot provides view of how the term structure (settle prices across different maturities) evolves over time. It allows observation of patterns such as persistent contango or backwardation, level shifts, and curvature changes in a multi-dimensional space. | |
| Key computations per date (used in interpretation but not directly in plot): | |
| - Level: Mean settle price across tenors | |
| """) | |
| st.latex(r"\bar{S} = \frac{1}{n} \sum_{i=1}^{n} S_i") | |
| st.write(""" | |
| - Width: Max - Min settle price | |
| """) | |
| st.latex(r"W = \max(S) - \min(S)") | |
| st.write(""" | |
| - Slope (pts/day): Linear regression coefficient of settle vs tenor_days | |
| """) | |
| st.latex(r"S = \beta_0 + \beta_1 \cdot D + \epsilon") | |
| st.write(""" | |
| where β1 is the slope | |
| """) | |
| st.write(""" | |
| - Curvature: Quadratic regression coefficient if ≥3 tenors | |
| """) | |
| st.latex(r"S = \gamma_0 + \gamma_1 \cdot D + \gamma_2 \cdot D^2 + \epsilon") | |
| st.write(""" | |
| where γ2 is the curvature | |
| """) | |
| st.write(""" | |
| - Carry spreads: Differences like VX2 - VX1 if tenors available | |
| The Surface helps identify clusters, trends, and anomalies in the term structure surface. | |
| """) | |
| monthly_df = df[(df["Weekly"] == False) & (df["Expired"] == False)].copy() | |
| valid_dates = monthly_df['Trade Date'].value_counts() | |
| valid_dates = valid_dates[valid_dates > 1].index | |
| monthly_df_filtered = monthly_df[monthly_df['Trade Date'].isin(valid_dates)].copy() | |
| # Downsample for hosted stability | |
| if SAFE_MODE: | |
| cutoff = pd.Timestamp.today().normalize() - pd.DateOffset(months=24) | |
| monthly_df_filtered = monthly_df_filtered[monthly_df_filtered["Trade Date"] >= cutoff] | |
| unique_dates = monthly_df_filtered["Trade Date"].drop_duplicates().sort_values() | |
| step = max(1, len(unique_dates) // 150) if len(unique_dates) else 1 | |
| keep_dates = set(unique_dates.iloc[::step]) | |
| monthly_df_filtered = monthly_df_filtered[monthly_df_filtered["Trade Date"].isin(keep_dates)] | |
| MAX_ROWS_3D = 15_000 if SAFE_MODE else 50_000 | |
| if len(monthly_df_filtered) == 0: | |
| st.info("Not enough data for the 3D surface after filtering.") | |
| elif len(monthly_df_filtered) > MAX_ROWS_3D: | |
| st.warning("3D surface skipped to protect memory (too many points). Disable Safe mode or narrow the date range.") | |
| else: | |
| cmin = float(monthly_df_filtered["Settle"].min()) | |
| cmax = float(monthly_df_filtered["Settle"].max()) | |
| fig = px.scatter_3d( | |
| monthly_df_filtered, | |
| x="Trade Date", | |
| y="Tenor_Days", | |
| z="Settle", | |
| color="Settle", | |
| color_continuous_scale="Viridis", | |
| range_color=(cmin, cmax), | |
| title="VIX Futures Term Structure over Time" | |
| ) | |
| fig.update_traces( | |
| marker=dict(size=2 if SAFE_MODE else 3, opacity=0.9, | |
| colorbar=dict( | |
| title="Settle", | |
| thickness=12, | |
| tickcolor="white", | |
| titlefont=dict(color="white"), | |
| tickfont=dict(color="white") | |
| )) | |
| ) | |
| fig.update_layout( | |
| scene=dict( | |
| xaxis_title='Trade Date', | |
| yaxis_title='Days to Expiration', | |
| zaxis_title='Settle Price', | |
| xaxis=dict(gridcolor="rgba(255,255,255,0.08)", tickcolor="white"), | |
| yaxis=dict(gridcolor="rgba(255,255,255,0.08)", tickcolor="white"), | |
| zaxis=dict(gridcolor="rgba(255,255,255,0.08)", tickcolor="white"), | |
| bgcolor=BG,#"rgba(0,0,0,1)" | |
| ), | |
| template="plotly_dark", | |
| paper_bgcolor=BG, #"rgba(0,0,0,1)", | |
| plot_bgcolor=BG,#"rgba(0,0,0,1)", | |
| title_font_color="white", | |
| font=dict(color="white"), | |
| margin=dict(l=0, r=0, t=60, b=0) | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| with st.expander("Dynamic Interpretation", expanded=False): | |
| grouped = monthly_df_filtered.groupby("Trade Date") | |
| rows = [] | |
| for dt, g in grouped: | |
| g = g.dropna(subset=["Tenor_Days", "Settle"]).sort_values("Tenor_Days") | |
| if g["Tenor_Days"].nunique() < 2: | |
| continue | |
| level = float(g["Settle"].mean()) | |
| width = float(g["Settle"].max() - g["Settle"].min()) | |
| front = float(g.iloc[0]["Settle"]) | |
| back = float(g.iloc[-1]["Settle"]) | |
| nten = int(g["Tenor_Days"].nunique()) | |
| x = g["Tenor_Days"].to_numpy(dtype=float) | |
| y = g["Settle"].to_numpy(dtype=float) | |
| lin = np.polyfit(x, y, 1) | |
| slope = float(lin[0]) | |
| curv = np.nan | |
| if nten >= 3: | |
| quad = np.polyfit(x, y, 2) | |
| curv = float(quad[0]) | |
| by_m = (g.dropna(subset=["Tenor_Monthly"]) | |
| .drop_duplicates("Tenor_Monthly") | |
| .set_index("Tenor_Monthly")["Settle"]) | |
| vx1 = float(by_m.loc[1.0]) if 1.0 in by_m.index else np.nan | |
| vx2 = float(by_m.loc[2.0]) if 2.0 in by_m.index else np.nan | |
| vx6 = float(by_m.loc[6.0]) if 6.0 in by_m.index else np.nan | |
| c12 = (vx2 - vx1) if np.isfinite(vx1) and np.isfinite(vx2) else np.nan | |
| c61 = (vx6 - vx1) if np.isfinite(vx1) and np.isfinite(vx6) else np.nan | |
| rows.append({ | |
| "Trade Date": pd.to_datetime(dt), | |
| "Level": level, | |
| "Width": width, | |
| "Front": front, | |
| "Back": back, | |
| "Slope_pd": slope, | |
| "Curvature": curv, | |
| "NumTenors": nten, | |
| "VX2_VX1": c12, | |
| "VX6_VX1": c61 | |
| }) | |
| surf = pd.DataFrame(rows).sort_values("Trade Date").reset_index(drop=True) | |
| if not surf.empty: | |
| for w in (5, 20, 60, 120): | |
| mp_mean = min(3, w) | |
| mp_std = min(10, w) | |
| surf[f"SlopeMA_{w}"] = surf["Slope_pd"].rolling(w, min_periods=mp_mean).mean() | |
| surf[f"SlopeSTD_{w}"] = surf["Slope_pd"].rolling(w, min_periods=mp_std).std() | |
| surf[f"LevelMA_{w}"] = surf["Level"].rolling(w, min_periods=mp_mean).mean() | |
| surf["SlopeZ_120"] = np.where( | |
| surf["SlopeSTD_120"].fillna(0) > 0, | |
| (surf["Slope_pd"] - surf["SlopeMA_120"]) / surf["SlopeSTD_120"], | |
| np.nan | |
| ) | |
| def pct_rank(series, value): | |
| s = pd.to_numeric(series, errors="coerce").dropna() | |
| if s.empty or not np.isfinite(value): | |
| return np.nan | |
| return float((s < value).mean() * 100.0) | |
| def explain_percentile(label, pct): | |
| if pd.isna(pct): | |
| st.write(f"{label}: n/a. Not enough history.") | |
| else: | |
| higher = 100.0 - pct | |
| st.write(f"{label}: {pct:.1f}% of days were lower. {higher:.1f}% were higher.") | |
| def trend_tag(val, ref): | |
| if pd.isna(ref): return "n/a" | |
| return "above" if val > ref else ("below" if val < ref else "equal") | |
| def carry_word(x): | |
| if pd.isna(x): return "n/a" | |
| return "positive" if x >= 0 else "negative" | |
| last = surf.iloc[-1] | |
| start, end = surf["Trade Date"].min().date(), surf["Trade Date"].max().date() | |
| st.write("— Term-structure surface snapshot —") | |
| st.write(f"Sample {start} to {end} ({len(surf)} days).") | |
| st.write(f"Level {last['Level']:.2f}. Width {last['Width']:.2f}.") | |
| st.write(f"Slope {last['Slope_pd']:.4f} pts/day. Curvature {last['Curvature']:.6f}.") | |
| if not pd.isna(last["VX2_VX1"]): | |
| st.write(f"Front carry VX2−VX1 {last['VX2_VX1']:.2f} ({carry_word(last['VX2_VX1'])}).") | |
| if not pd.isna(last["VX6_VX1"]): | |
| st.write(f"Term carry VX6−VX1 {last['VX6_VX1']:.2f} ({carry_word(last['VX6_VX1'])}).") | |
| t5 = trend_tag(last["Slope_pd"], last.get("SlopeMA_5")) | |
| t20 = trend_tag(last["Slope_pd"], last.get("SlopeMA_20")) | |
| if t5 != "n/a": st.write(f"Slope is {t5} the 5-day mean.") | |
| if t20 != "n/a": st.write(f"Slope is {t20} the 20-day mean.") | |
| z = last.get("SlopeZ_120", np.nan) | |
| if pd.notna(z): | |
| if z >= 2: st.write(f"Slope is high vs 120-day history (z={z:.2f}).") | |
| elif z <= -2: st.write(f"Slope is low vs 120-day history (z={z:.2f}).") | |
| else: st.write(f"Slope is within 120-day range (z={z:.2f}).") | |
| st.write("— How today compares to history —") | |
| slope_pct = pct_rank(surf["Slope_pd"], last["Slope_pd"]) | |
| width_pct = pct_rank(surf["Width"], last["Width"]) | |
| level_pct = pct_rank(surf["Level"], last["Level"]) | |
| explain_percentile("Slope percentile", slope_pct) | |
| explain_percentile("Width percentile", width_pct) | |
| explain_percentile("Level percentile", level_pct) | |
| if pd.notna(last["VX2_VX1"]): | |
| c12_pct = pct_rank(surf["VX2_VX1"], last["VX2_VX1"]) | |
| explain_percentile("VX2−VX1 percentile", c12_pct) | |
| if pd.notna(last["VX6_VX1"]): | |
| c61_pct = pct_rank(surf["VX6_VX1"], last["VX6_VX1"]) | |
| explain_percentile("VX6−VX1 percentile", c61_pct) | |
| hi = surf.nlargest(1, "Slope_pd").iloc[0] | |
| lo = surf.nsmallest(1, "Slope_pd").iloc[0] | |
| st.write(f"Steepest day {hi['Trade Date'].date()} with {hi['Slope_pd']:.4f} pts/day.") | |
| st.write(f"Flattest day {lo['Trade Date'].date()} with {lo['Slope_pd']:.4f} pts/day.") | |
| surf["Month"] = surf["Trade Date"].dt.to_period("M") | |
| cur_m = surf["Month"].iloc[-1] | |
| mtd = surf[surf["Month"] == cur_m] | |
| if not mtd.empty and len(mtd) >= 5: | |
| mtd_slope_std = float(mtd["Slope_pd"].std()) | |
| mtd_level_std = float(mtd["Level"].std()) | |
| st.write(f"MTD slope std {mtd_slope_std:.4f} pts/day. MTD level std {mtd_level_std:.2f}.") | |
| sparse = surf[surf["NumTenors"] < 3] | |
| if len(sparse) > 0: | |
| st.write(f"{len(sparse)} days have <3 tenors. Interpret slope and curvature carefully.") | |
| # ---------- Section 4: HMM Regime Classification ---------- | |
| st.header("HMM Regime Classification") | |
| st.write("Classifies VIX regimes using Hidden Markov Model on slope time series.") | |
| with st.expander("Methodology", expanded=False): | |
| st.write(""" | |
| This analysis focuses on monthly VIX futures contracts. For each trade date with at least two tenors, the daily slope is computed as the linear regression coefficient of settle prices against days to expiration: | |
| """) | |
| st.latex(r"S = \beta_0 + \beta_1 \cdot D + \epsilon, \quad \text{slope} = \beta_1") | |
| st.write(""" | |
| where S is settle price, D is tenor_days. | |
| The slopes are standardized: | |
| """) | |
| st.latex(r"Z = \frac{\beta_1 - \mu}{\sigma}") | |
| st.write(""" | |
| A Gaussian Hidden Markov Model (HMM) is fitted to the standardized slopes with user-specified number of components (states, default 3), full covariance, and max iterations (default 500). The HMM assumes the observed slopes are emissions from hidden regimes, modeled as Gaussian distributions. | |
| The HMM parameters include: | |
| - Means and covariances for each state's emission probability: | |
| """) | |
| st.latex(r"p(x_t \mid z_t = k) = \mathcal{N}(x_t \mid \mu_k, \Sigma_k)") | |
| st.write(""" | |
| - Transition matrix | |
| """) | |
| st.latex(r"A_{jk} = p(z_t = k \mid z_{t-1} = j)") | |
| st.write(""" | |
| - Initial state probabilities | |
| """) | |
| st.latex(r"\pi_k = p(z_1 = k)") | |
| st.write(""" | |
| States are mapped to regimes based on sorted mean slopes: lowest to BACKWARDATION, middle to CAUTIOUS, highest to CONTANGO (for 3 states). | |
| The Viterbi algorithm predicts the most likely sequence of hidden states (regimes). | |
| The plot shows slopes over time, colored by regime, with a black line connecting the slopes and a dashed horizontal at 0 for reference. | |
| """) | |
| base = df[~df['Weekly']].copy() | |
| rows = [] | |
| for d, g in base.groupby('Trade Date'): | |
| g = g.sort_values('Tenor_Days') | |
| if len(g) < 2: | |
| continue | |
| slope = np.polyfit(g['Tenor_Days'], g['Settle'], 1)[0] | |
| rows.append({'Trade Date': d, 'Slope': slope}) | |
| slope_df = pd.DataFrame(rows).sort_values('Trade Date') | |
| # Cap HMM sample size in SAFE_MODE | |
| MAX_HMM_OBS = 800 if SAFE_MODE else 3000 | |
| if len(slope_df) > MAX_HMM_OBS: | |
| slope_df = slope_df.tail(MAX_HMM_OBS) | |
| X = StandardScaler().fit_transform(slope_df[['Slope']]) | |
| # Trim iterations in safe mode (respect user's input otherwise) | |
| hmm_iter = min(int(hmm_n_iter), 300) if SAFE_MODE else int(hmm_n_iter) | |
| hmm = GaussianHMM( | |
| n_components=int(hmm_n_components), | |
| covariance_type='full', | |
| n_iter=hmm_iter, | |
| random_state=1 | |
| ).fit(X) | |
| hidden = hmm.predict(X) | |
| state_mean = pd.Series(hmm.means_.flatten(), index=range(hmm.n_components)) | |
| order = state_mean.sort_values().index | |
| label_map = {order[i]: ['BACKWARDATION', 'CAUTIOUS', 'CONTANGO'][i] for i in range(min(3, hmm.n_components))} | |
| slope_df['Regime'] = [label_map.get(s, 'UNKNOWN') for s in hidden] | |
| cat_order = ['BACKWARDATION', 'CAUTIOUS', 'CONTANGO', 'UNKNOWN'] | |
| color_map = { | |
| 'BACKWARDATION': '#d62728', | |
| 'CAUTIOUS': '#7f7f7f', | |
| 'CONTANGO': '#2ca02c', | |
| 'UNKNOWN': '#1f77b4' | |
| } | |
| fig = px.scatter( | |
| slope_df, | |
| x='Trade Date', | |
| y='Slope', | |
| color='Regime', | |
| category_orders={'Regime': cat_order}, | |
| color_discrete_map=color_map, | |
| opacity=0.6, | |
| title='Daily VIX Curve Slope with Regime States (HMM)' | |
| ) | |
| fig.add_trace( | |
| go.Scatter( | |
| x=slope_df['Trade Date'], | |
| y=slope_df['Slope'], | |
| mode='lines', | |
| line=dict(color='white', width=1), | |
| name='Slope (line)' | |
| ) | |
| ) | |
| fig.add_hline(y=0, line_dash='dash', line_color='rgba(255,255,255,0.6)') | |
| fig.update_layout( | |
| xaxis_title='Trade Date', | |
| yaxis_title='Slope (pts / day)', | |
| template="plotly_dark", | |
| paper_bgcolor=BG, #"rgba(0,0,0,1)", | |
| plot_bgcolor=BG,# "rgba(0,0,0,1)", | |
| title_font_color="white", | |
| font=dict(color="white"), | |
| legend=dict( | |
| bgcolor="rgba(0,0,0,0)", | |
| font=dict(color="white"), | |
| title_font=dict(color="white") | |
| ), | |
| margin=dict(l=60, r=20, t=60, b=40) | |
| ) | |
| fig.update_xaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| fig.update_yaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| with st.expander("Dynamic Interpretation", expanded=False): | |
| trans = pd.DataFrame( | |
| hmm.transmat_, | |
| index=[label_map.get(i, f"S{i}") for i in range(hmm.n_components)], | |
| columns=[label_map.get(i, f"S{i}") for i in range(hmm.n_components)] | |
| ) | |
| st.write("\nTransition probabilities\n") | |
| st.dataframe(trans.round(3)) | |
| def pct_rank(series, value): | |
| s = pd.to_numeric(series, errors="coerce").dropna() | |
| if s.empty or not np.isfinite(value): | |
| return np.nan | |
| return float((s < value).mean() * 100.0) | |
| def exp_duration(pii): | |
| if np.isclose(pii, 1.0): | |
| return np.inf | |
| return 1.0 / max(1e-12, (1.0 - pii)) | |
| def note_regime(name): | |
| if name == "CONTANGO": | |
| return "term structure slopes up. carry tends to be positive." | |
| if name == "BACKWARDATION": | |
| return "term structure slopes down. stress is more likely." | |
| return "term structure is near flat. signals are mixed." | |
| def risk_bias_for_transition(src, dst): | |
| if src == "CONTANGO" and dst == "CAUTIOUS": | |
| return "carry tailwind may fade." | |
| if src == "CONTANGO" and dst == "BACKWARDATION": | |
| return "risk-off jump risk is present." | |
| if src == "CAUTIOUS" and dst == "CONTANGO": | |
| return "carry tailwind may rebuild." | |
| if src == "CAUTIOUS" and dst == "BACKWARDATION": | |
| return "stress risk increases." | |
| if src == "BACKWARDATION" and dst == "CAUTIOUS": | |
| return "stress may ease." | |
| if src == "BACKWARDATION" and dst == "CONTANGO": | |
| return "stress may unwind fast." | |
| return "no clear tilt." | |
| def entropy_row(p): | |
| p = np.asarray(p, float) | |
| p = p[p > 0] | |
| return -np.sum(p * np.log2(p)) if p.size else np.nan | |
| _, post = hmm.score_samples(X) | |
| today = slope_df['Trade Date'].iloc[-1].date() | |
| cur_state = hidden[-1] | |
| cur_regime = label_map.get(cur_state, f"S{cur_state}") | |
| cur_probs = {label_map.get(i, f"S{i}"): float(post[-1, i]) for i in range(hmm.n_components)} | |
| cur_prob = cur_probs[cur_regime] | |
| stay_prob = float(trans.loc[cur_regime, cur_regime]) | |
| edur = exp_duration(stay_prob) | |
| st.write("— Interpretation —") | |
| st.write(f"Date {today}. Model labels today as {cur_regime} (prob {cur_prob:.2f}).") | |
| st.write(f"This means {note_regime(cur_regime)}") | |
| if cur_prob >= 0.8: | |
| st.write("Confidence is high. The label is stable.") | |
| elif cur_prob >= 0.6: | |
| st.write("Confidence is moderate. Treat it as useful, not certain.") | |
| else: | |
| st.write("Confidence is low. Be cautious using this label.") | |
| if stay_prob >= 0.85: | |
| st.write("Day-to-day persistence is high. Expect the same regime tomorrow.") | |
| elif stay_prob >= 0.65: | |
| st.write("Day-to-day persistence is moderate. A hold is slightly more likely.") | |
| else: | |
| st.write("Day-to-day persistence is low. A switch is common.") | |
| if np.isinf(edur): | |
| st.write("Spells in this regime can run very long in this model.") | |
| elif edur >= 10: | |
| st.write(f"Typical spell length is long (~{edur:.0f} days).") | |
| elif edur >= 5: | |
| st.write(f"Typical spell length is medium (~{edur:.0f} days).") | |
| else: | |
| st.write(f"Typical spell length is short (~{edur:.0f} days).") | |
| streak = 1 | |
| for i in range(len(hidden) - 2, -1, -1): | |
| if hidden[i] == cur_state: | |
| streak += 1 | |
| else: | |
| break | |
| st.write(f"Current streak is {streak} days.") | |
| if np.isfinite(edur): | |
| ratio = streak / max(edur, 1e-9) | |
| if ratio >= 1.2: | |
| st.write("Streak is long vs average. Switch risk rises.") | |
| elif ratio >= 0.6: | |
| st.write("Streak is mid to late stage.") | |
| else: | |
| st.write("Streak is early stage.") | |
| row_sorted = trans.loc[cur_regime].sort_values(ascending=False) | |
| exit_target = row_sorted.drop(index=cur_regime).idxmax() | |
| exit_p = float(row_sorted.drop(index=cur_regime).max()) | |
| back_p = float(trans.loc[exit_target, cur_regime]) | |
| asym = exit_p - back_p | |
| st.write(f"Most likely exit is to {exit_target} at {exit_p:.2f}.") | |
| st.write(f"If that happens: {risk_bias_for_transition(cur_regime, exit_target)}") | |
| if abs(asym) >= 0.10: | |
| tilt = "outbound-biased" if asym > 0 else "inbound-biased" | |
| st.write(f"Flow between {cur_regime} and {exit_target} is {tilt} ({asym:+.2f}).") | |
| else: | |
| st.write("Two-way flow between these regimes is roughly balanced.") | |
| h_bits = entropy_row(trans.loc[cur_regime].values) | |
| if h_bits <= 0.6: | |
| st.write("Next-state outcomes are concentrated. Path is predictable.") | |
| elif h_bits <= 1.2: | |
| st.write("Next-state outcomes cluster in a few paths.") | |
| else: | |
| st.write("Next-state outcomes are diffuse. Path is uncertain.") | |
| T = trans.values | |
| name_to_idx = {n:i for i, n in enumerate(trans.index)} | |
| i0 = name_to_idx[cur_regime] | |
| def kstep(T, i, k): | |
| Tk = np.linalg.matrix_power(T, k) | |
| return pd.Series(Tk[i], index=trans.columns) | |
| d5 = kstep(T, i0, 5) | |
| p5_stay = float(d5[cur_regime]) | |
| if p5_stay >= 0.60: | |
| st.write("Five-day view: staying in the same regime is the base case.") | |
| elif p5_stay >= 0.35: | |
| st.write("Five-day view: staying is plausible but not dominant.") | |
| else: | |
| st.write("Five-day view: a different regime is more likely.") | |
| eigvals, eigvecs = np.linalg.eig(T.T) | |
| idx = np.argmin(np.abs(eigvals - 1)) | |
| pi = np.real(eigvecs[:, idx]); pi = pi / pi.sum() | |
| pi_series = pd.Series(pi, index=trans.index).sort_values(ascending=False) | |
| st.write("If the process is stable, time splits like this:") | |
| for k, v in pi_series.items(): | |
| st.write(f" {k}: {v:.2f}") | |
| weighted_stay = float(np.dot(pi, np.diag(T))) | |
| churn = 1.0 - weighted_stay | |
| if churn >= 0.30: | |
| st.write("Long-run: regimes churn often.") | |
| elif churn <= 0.10: | |
| st.write("Long-run: regimes are sticky.") | |
| else: | |
| st.write("Long-run: regimes churn at a moderate pace.") | |
| cur_slope = float(slope_df['Slope'].iloc[-1]) | |
| pct_full = pct_rank(slope_df['Slope'], cur_slope) | |
| st.write(f"Current slope is {cur_slope:.4f} pts/day.") | |
| if pd.notna(pct_full): | |
| higher = 100.0 - pct_full | |
| if pct_full >= 90: | |
| st.write("Slope is high vs history. The curve is unusually steep upward.") | |
| elif pct_full <= 10: | |
| st.write("Slope is low vs history. The curve is unusually steep downward.") | |
| else: | |
| band = "upper" if pct_full >= 60 else ("lower" if pct_full <= 40 else "middle") | |
| st.write(f"Slope sits in the {band} part of its range " | |
| f"({pct_full:.1f}% of days were lower; {higher:.1f}% higher).") | |
| means = hmm.means_.ravel() | |
| if hmm.covariance_type == "full": | |
| stds = np.sqrt(np.array([c[0,0] for c in hmm.covars_])) | |
| elif hmm.covariance_type == "diag": | |
| stds = np.sqrt(hmm.covars_.ravel()) | |
| else: | |
| stds = np.sqrt(hmm.covars_) | |
| sep = np.mean([ | |
| abs(means[i] - means[j]) / max(1e-9, 0.5*(stds[i] + stds[j])) | |
| for i in range(len(means)) for j in range(i+1, len(means)) | |
| ]) | |
| if sep >= 2.0: | |
| st.write("States separate well. Labels should be stable.") | |
| elif sep >= 1.0: | |
| st.write("States have moderate overlap. Expect some flips.") | |
| else: | |
| st.write("States overlap a lot. Treat labels with care.") | |
| if hasattr(hmm, "monitor_"): | |
| conv = hmm.monitor_.converged | |
| n_iter = hmm.monitor_.iter | |
| if not conv: | |
| st.write(f"Training did not fully converge in {n_iter} iterations. Use caution.") | |
| # ---------- Section 5: Carry Spread Analysis ---------- | |
| st.header("Carry Spread Analysis") | |
| st.write("Analyzes carry spreads between short and long term VIX futures expectations.") | |
| with st.expander("Methodology", expanded=False): | |
| st.write(""" | |
| This analysis uses monthly VIX futures data. The settle prices are pivoted into a wide format with rows as trade dates and columns as monthly tenors. | |
| Carry spreads are computed as the difference between settle prices of longer tenors and the short leg (default VX1): | |
| """) | |
| st.latex(r"\text{Carry Spread}_{k} = \text{Settle}_{k} - \text{Settle}_{\text{short}}, \quad k > \text{short}") | |
| st.write(""" | |
| where k are user-specified long legs (default 2,6). Only dates where both legs exist are included. | |
| The spreads are melted into long format for plotting and plotted as lines over time, colored by leg, with markers. A dashed horizontal at 0 distinguishes positive (contango) from negative (backwardation) carry. | |
| Positive carry indicates potential roll-down benefits for long positions, while negative carry suggests cost for holding. This helps assess the economic incentive for carrying futures positions across maturities. | |
| """) | |
| monthly_df_full = df[~df['Weekly']].copy() | |
| monthly_df_full = monthly_df_full.sort_values('Trade Date') | |
| pivot = ( | |
| monthly_df_full | |
| .pivot(index='Trade Date', columns='Tenor_Monthly', values='Settle') | |
| .sort_index() | |
| ) | |
| spreads = pd.DataFrame(index=pivot.index) | |
| long_legs = [float(l.strip()) for l in carry_long_legs.split(',') if l.strip()] | |
| for long_leg in long_legs: | |
| if {carry_short_leg, long_leg}.issubset(pivot.columns): | |
| label = f'VX{int(long_leg) if float(long_leg).is_integer() else long_leg}-VX{int(carry_short_leg) if float(carry_short_leg).is_integer() else carry_short_leg}' | |
| spreads[label] = pivot[long_leg] - pivot[carry_short_leg] | |
| spreads = spreads.dropna(how='all') | |
| spreads_long = spreads.reset_index().melt( | |
| id_vars='Trade Date', value_name='Spread', var_name='Leg' | |
| ) | |
| fig = px.line( | |
| spreads_long, | |
| x='Trade Date', | |
| y='Spread', | |
| color='Leg', | |
| title='VIX Carry Spreads (Front ↔ 2nd & 6th Month)', | |
| markers=True, | |
| color_discrete_sequence=px.colors.qualitative.Plotly | |
| ) | |
| fig.update_traces(marker=dict(size=5), line=dict(width=2)) | |
| fig.add_hline(y=0, line_dash='dash', line_color='rgba(255,255,255,0.6)') | |
| fig.update_layout( | |
| xaxis_title='Trade Date', | |
| yaxis_title='Spread (points)', | |
| template="plotly_dark", | |
| paper_bgcolor=BG, #"rgba(0,0,0,1)", | |
| plot_bgcolor=BG,#"rgba(0,0,0,1)", | |
| title_font_color="white", | |
| font=dict(color="white"), | |
| legend=dict( | |
| bgcolor="rgba(0,0,0,0)", | |
| font=dict(color="white"), | |
| title_font=dict(color="white") | |
| ), | |
| margin=dict(l=60, r=20, t=60, b=40) | |
| ) | |
| fig.update_xaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| fig.update_yaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| with st.expander("Dynamic Interpretation", expanded=False): | |
| if spreads.empty: | |
| st.write("No spreads could be computed because required tenors are missing in the dataset.") | |
| else: | |
| latest = spreads.iloc[-1] | |
| date = spreads.index[-1].date() | |
| st.write(f"Latest trade date in sample: {date}") | |
| for col in spreads.columns: | |
| series = spreads[col].dropna() | |
| if series.empty: | |
| continue | |
| val = latest[col] | |
| mean = series.mean() | |
| pct = (series.rank(pct=True).iloc[-1] * 100).round(1) | |
| st.write(f"\nSpread: {col}") | |
| st.write(f" Current value: {val:.2f} points") | |
| st.write(f" Historical mean: {mean:.2f} points") | |
| st.write(f" Current percentile vs history: {pct}%") | |
| if val > 0: | |
| st.write(" Interpretation: Futures curve is in CONTANGO for this leg " | |
| f"(longer maturity higher than front).") | |
| elif val < 0: | |
| st.write(" Interpretation: Futures curve is in BACKWARDATION for this leg " | |
| f"(front contract richer than longer maturity).") | |
| else: | |
| st.write(" Interpretation: Spread is flat, indicating balance between front and further contracts.") | |
| if val > mean: | |
| st.write(" Compared to history: Current spread is ABOVE average, " | |
| "suggesting stronger than typical contango/backwardation.") | |
| elif val < mean: | |
| st.write(" Compared to history: Current spread is BELOW average, " | |
| "suggesting weaker structure than typical.") | |
| else: | |
| st.write(" Compared to history: Current spread is close to historical mean.") | |
| st.write("\nNote: Percentiles show how extreme today’s spread is compared to the full sample. " | |
| "For example, a 90% percentile means the spread is higher than 90% of past values, " | |
| "indicating an unusually strong curve slope.") | |
| # ---------- Section 6: PCA Decomposition of the Curve ---------- | |
| st.header("PCA Decomposition of the Curve") | |
| st.write("Decomposes the VIX curve into principal components like level, slope, and curvature.") | |
| with st.expander("Methodology", expanded=False): | |
| st.write(""" | |
| This analysis uses monthly VIX futures, pivoting settle prices by trade date and user-specified tenors (default first 6 months). Rows with missing values are dropped. | |
| The matrix is standardized column-wise: | |
| """) | |
| st.latex(r"Z_{ij} = \frac{S_{ij} - \mu_j}{\sigma_j}") | |
| st.write(""" | |
| Principal Component Analysis (PCA) is applied to extract user-specified components (default 3): | |
| - The covariance matrix is eigendecomposed. | |
| """) | |
| st.latex(r"\Sigma = \frac{1}{n-1} Z^\top Z") | |
| st.write(""" | |
| - Components are the eigenvectors sorted by eigenvalues (variance explained). | |
| Scores are projections: | |
| """) | |
| st.latex(r"\text{Scores} = Z \cdot V") | |
| st.write(""" | |
| where V is the loading matrix. | |
| Labels: PC1 as Level (parallel shifts), PC2 as Slope (tilt), PC3 as Curvature (bend). | |
| Plots: | |
| - Line plot of scores over time. | |
| - Bar plot of explained variance ratios: | |
| """) | |
| st.latex(r"\lambda_k \big/ \sum \lambda") | |
| st.write(""" | |
| PCA reduces dimensionality, capturing main modes of variation in the term structure: level (overall volatility), slope (carry/roll), curvature (mid-term premiums). | |
| """) | |
| pca_df = df[~df['Weekly']].copy() | |
| pivot = ( | |
| pca_df | |
| .pivot(index='Trade Date', columns='Tenor_Monthly', values='Settle') | |
| .sort_index() | |
| ) | |
| tenors_list = [float(t.strip()) for t in pca_tenors.split(',') if t.strip()] | |
| wide = pivot[tenors_list].dropna() | |
| # Cap PCA rows for hosted stability | |
| MAX_PCA_ROWS = 1200 if SAFE_MODE else 4000 | |
| if len(wide) > MAX_PCA_ROWS: | |
| wide = wide.tail(MAX_PCA_ROWS) | |
| X = StandardScaler().fit_transform(wide.values) | |
| pca = PCA(n_components=int(pca_n_components)).fit(X) | |
| labels = ['Level (PC1)', 'Slope (PC2)', 'Curvature (PC3)', 'PC4', 'PC5'][:int(pca_n_components)] | |
| pc_scores = pd.DataFrame(pca.transform(X), index=wide.index, columns=labels) | |
| fig_scores = px.line( | |
| pc_scores, | |
| x=pc_scores.index, | |
| y=labels, | |
| title='VIX Term-Structure PCA Scores', | |
| template='plotly_dark', | |
| color_discrete_sequence=px.colors.qualitative.Plotly | |
| ) | |
| fig_scores.update_traces(line=dict(width=2)) | |
| fig_scores.update_layout( | |
| xaxis_title='Trade Date', | |
| yaxis_title='Score (z-scaled)', | |
| paper_bgcolor=BG,#'rgba(0,0,0,1)', | |
| plot_bgcolor=BG,#'rgba(0,0,0,1)', | |
| title_font_color='white', | |
| font=dict(color='white'), | |
| legend=dict( | |
| bgcolor="rgba(0,0,0,0)", | |
| font=dict(color="white"), | |
| title_font=dict(color="white") | |
| ), | |
| margin=dict(l=60, r=20, t=60, b=40) | |
| ) | |
| fig_scores.update_xaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| fig_scores.update_yaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| fig_var = px.bar( | |
| x=labels, | |
| y=pca.explained_variance_ratio_, | |
| title='Explained Variance by Component', | |
| template='plotly_dark' | |
| ) | |
| fig_var.update_traces(marker=dict(line=dict(width=0))) | |
| fig_var.update_layout( | |
| xaxis_title='Component', | |
| yaxis_title='Variance Share', | |
| paper_bgcolor=BG,#'rgba(0,0,0,1)', | |
| plot_bgcolor=BG,#'rgba(0,0,0,1)', | |
| title_font_color='white', | |
| font=dict(color='white'), | |
| margin=dict(l=60, r=20, t=60, b=40) | |
| ) | |
| fig_var.update_xaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| fig_var.update_yaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| st.plotly_chart(fig_scores, use_container_width=True) | |
| st.plotly_chart(fig_var, use_container_width=True) | |
| with st.expander("Dynamic Interpretation", expanded=False): | |
| def pct_rank(series, value): | |
| s = pd.to_numeric(series, errors="coerce").dropna() | |
| if s.empty or not np.isfinite(value): | |
| return np.nan | |
| return float((s < value).mean() * 100.0) | |
| def band_from_pct(p): | |
| if pd.isna(p): return "n/a" | |
| if p >= 90: return "extreme high (top 10%)" | |
| if p >= 75: return "high (top quartile)" | |
| if p <= 10: return "extreme low (bottom 10%)" | |
| if p <= 25: return "low (bottom quartile)" | |
| return "middle range" | |
| def delta_tag(x, pos, neg, neutral="unchanged"): | |
| if pd.isna(x): return neutral | |
| if x > 0: return pos | |
| if x < 0: return neg | |
| return neutral | |
| st.write("\n— PCA components and what they mean —") | |
| st.write("PC1: Level. Parallel moves of the whole curve. High means futures are broadly high. Low means broadly low.") | |
| st.write("PC2: Slope. Steepness front to back. Positive means contango (back > front). Negative means backwardation (front > back).") | |
| st.write("PC3: Curvature. Shape in the middle. Positive means a hump in mid tenors. Negative means a dip in mid tenors.") | |
| var_share = pca.explained_variance_ratio_ | |
| total_var = var_share.sum() | |
| st.write("\n— Variance explained —") | |
| for i, v in enumerate(var_share): | |
| st.write(f"PC{i+1} accounts for {v*100:.1f}% of curve changes.") | |
| st.write(f"Together they cover {total_var*100:.1f}% of the variation. The rest is noise or higher order shape.") | |
| latest_date = pc_scores.index[-1].date() | |
| row = pc_scores.iloc[-1] | |
| lvl = float(row[labels[0]]) if len(labels) > 0 else np.nan | |
| slp = float(row[labels[1]]) if len(labels) > 1 else np.nan | |
| cur = float(row[labels[2]]) if len(labels) > 2 else np.nan | |
| lvl_pct = pct_rank(pc_scores[labels[0]], lvl) if len(labels) > 0 else np.nan | |
| slp_pct = pct_rank(pc_scores[labels[1]], slp) if len(labels) > 1 else np.nan | |
| cur_pct = pct_rank(pc_scores[labels[2]], cur) if len(labels) > 2 else np.nan | |
| lvl_band = band_from_pct(lvl_pct) | |
| slp_band = band_from_pct(slp_pct) | |
| cur_band = band_from_pct(cur_pct) | |
| lvl_d5 = pc_scores[labels[0]].diff(5).iloc[-1] if len(labels) > 0 else np.nan | |
| slp_d5 = pc_scores[labels[1]].diff(5).iloc[-1] if len(labels) > 1 else np.nan | |
| cur_d5 = pc_scores[labels[2]].diff(5).iloc[-1] if len(labels) > 2 else np.nan | |
| lvl_d20 = pc_scores[labels[0]].diff(20).iloc[-1] if len(labels) > 0 else np.nan | |
| slp_d20 = pc_scores[labels[1]].diff(20).iloc[-1] if len(labels) > 1 else np.nan | |
| cur_d20 = pc_scores[labels[2]].diff(20).iloc[-1] if len(labels) > 2 else np.nan | |
| st.write(f"\n— Latest observation: {latest_date} —") | |
| if len(labels) > 0: | |
| st.write("\nLevel (PC1):") | |
| st.write(f"Position vs history: {lvl_band}. This gauges the overall price of variance along the strip.") | |
| if "high" in lvl_band: | |
| st.write("Implication: options and variance products tend to be rich across expiries.") | |
| elif "low" in lvl_band: | |
| st.write("Implication: options and variance products tend to be cheap across expiries.") | |
| else: | |
| st.write("Implication: overall level is near its long-run zone.") | |
| st.write(f"Recent move: {delta_tag(lvl_d5,'up over 1 week','down over 1 week')}; " | |
| f"{delta_tag(lvl_d20,'up over 1 month','down over 1 month')}.") | |
| st.write("Use case: compare with slope. High level with negative slope often marks stress. " | |
| "High level with positive slope often marks calm but pricey carry.") | |
| if len(labels) > 1: | |
| st.write("\nSlope (PC2):") | |
| st.write(f"Position vs history: {slp_band}. This is the carry signal.") | |
| if "high" in slp_band: | |
| st.write("Implication: contango is strong. Front tends to roll down. Short-vol carry is favorable but watch for shocks.") | |
| elif "low" in slp_band: | |
| st.write("Implication: backwardation or near inversion. Hedging demand is high. Carry is hostile for short front exposure.") | |
| else: | |
| st.write("Implication: slope is near normal. Carry is modest.") | |
| st.write(f"Recent move: {delta_tag(slp_d5,'steepening over 1 week','flattening over 1 week')}; " | |
| f"{delta_tag(slp_d20,'steepening over 1 month','flattening over 1 month')}.") | |
| st.write("Risk note: fast drops in slope from a high zone often precede drawdowns in carry trades.") | |
| if len(labels) > 2: | |
| st.write("\nCurvature (PC3):") | |
| st.write(f"Position vs history: {cur_band}. This shows where risk concentrates on the term structure.") | |
| if "high" in cur_band: | |
| st.write("Implication: mid tenors are favored vs the ends. Market assigns more risk to the 2–4 month window.") | |
| elif "low" in cur_band: | |
| st.write("Implication: mid tenors are discounted vs the ends. Risk focus sits in very short or long expiries.") | |
| else: | |
| st.write("Implication: shape is ordinary. No special mid-curve premium or discount.") | |
| st.write(f"Recent move: {delta_tag(cur_d5,'higher over 1 week','lower over 1 week')}; " | |
| f"{delta_tag(cur_d20,'higher over 1 month','lower over 1 month')}.") | |
| st.write("Use case: aligns hedges to the horizon that the market prices most.") | |
| st.write("\n— Joint reading and practical takeaways —") | |
| if len(labels) > 1: | |
| calm_contango = (("high" in slp_band or "extreme high" in slp_band) and "middle" in lvl_band) | |
| expensive_calm = (("high" in slp_band or "extreme high" in slp_band) and ("high" in lvl_band or "extreme high" in lvl_band)) | |
| stress_state = (("low" in slp_band or "extreme low" in slp_band) and ("high" in lvl_band or "extreme high" in lvl_band)) | |
| flat_transition = ("middle" in slp_band and "middle" in lvl_band) | |
| if stress_state: | |
| st.write("Stress signal: high level with backwardation. Hedging flows dominate. Carry is negative at the front.") | |
| elif expensive_calm: | |
| st.write("Calm but expensive: strong contango with high level. Carry is favorable, but risk premium is rich.") | |
| elif calm_contango: | |
| st.write("Benign carry: contango with normal level. Classic roll-down conditions.") | |
| elif flat_transition: | |
| st.write("Transition zone: level and slope near normal. Wait for a break in slope momentum.") | |
| else: | |
| st.write("Mixed signals: cross-currents across level and slope. Reduce leverage and watch slope momentum.") | |
| if len(labels) > 2: | |
| if "high" in cur_band or "extreme high" in cur_band: | |
| st.write("Horizon bias: risk priced in mid tenors. Size hedges in the 2–4 month area.") | |
| elif "low" in cur_band or "extreme low" in cur_band: | |
| st.write("Horizon bias: risk priced at the tails. Favor very short or long expiries for hedges.") | |
| warn = [] | |
| if ("high" in slp_band or "extreme high" in slp_band) and slp_d5 < 0: | |
| warn.append("slope is rolling over from a high zone") | |
| if ("low" in slp_band or "extreme low" in slp_band) and slp_d5 > 0: | |
| warn.append("slope is rebounding from a low zone") | |
| if ("high" in lvl_band or "extreme high" in lvl_band) and lvl_d5 > 0: | |
| warn.append("level keeps rising; shock risk remains") | |
| if warn: | |
| st.write("Watchlist: " + "; ".join(warn) + ".") | |
| st.write("\n— Recap —") | |
| if len(labels) > 0: | |
| if "high" in lvl_band or "extreme high" in lvl_band: | |
| st.write("The whole curve is pricey. Protection costs more than usual.") | |
| elif "low" in lvl_band or "extreme low" in lvl_band: | |
| st.write("The whole curve is cheap. Protection costs less than usual.") | |
| else: | |
| st.write("The whole curve is fairly priced vs its own history.") | |
| if len(labels) > 1: | |
| if "high" in slp_band or "extreme high" in slp_band: | |
| st.write("Carry is supportive right now. It helps short front exposure, unless a shock hits.") | |
| elif "low" in slp_band or "extreme low" in slp_band: | |
| st.write("Carry is hostile right now. It punishes short front exposure.") | |
| else: | |
| st.write("Carry is modest. No strong tilt from slope.") | |
| if len(labels) > 2: | |
| if "high" in cur_band or "extreme high" in cur_band: | |
| st.write("Risk is concentrated in the middle of the term structure.") | |
| elif "low" in cur_band or "extreme low" in cur_band: | |
| st.write("Risk is concentrated at the very short or very long end.") | |
| else: | |
| st.write("Risk is spread evenly across the curve.") | |
| st.write("These readings are in-sample. Use them as context, not a forecast.") | |
| # ---------- Section 7: Constant-Maturity 30-Day Futures Index ---------- | |
| st.header("Constant-Maturity 30-Day Futures Index") | |
| st.write("Constructs an unlevered index simulating constant 30-day maturity VIX futures exposure.") | |
| with st.expander("Methodology", expanded=False): | |
| st.write(""" | |
| This constructs a synthetic constant-maturity VIX futures price by interpolating between the nearest contracts bracketing the target maturity (default 30 days). | |
| For each trade date, identify the contract with tenor d1 ≤ target (closest below) and d2 ≥ target (closest above). | |
| If only one available, use its settle. Else, linear interpolation: | |
| """) | |
| st.latex(r"\text{Blend} = S_1 + w \cdot (S_2 - S_1), \quad w = \frac{\text{target} - d_1}{d_2 - d_1}") | |
| st.write(""" | |
| where S1, S2 are settle prices. | |
| Daily returns: | |
| """) | |
| st.latex(r"r_t = \frac{\text{Blend}_t}{\text{Blend}_{t-1}} - 1") | |
| st.write(""" | |
| Index level: | |
| """) | |
| st.latex(r"I_t = I_0 \prod_{k=1}^{t} (1 + r_k)") | |
| st.write(""" | |
| starting at user-defined I0 (default 1). | |
| Plot the index over time with a dashed horizontal at starting level. | |
| This index proxies the performance of continuously rolling to maintain constant exposure to 30-day volatility, capturing roll yield and spot moves without leverage. | |
| """) | |
| roll_df = df.copy() | |
| roll_df = roll_df[roll_df['Settle'] > 0] | |
| roll_df = roll_df.sort_values(['Trade Date', 'Tenor_Days']) | |
| records = [] | |
| for trade_date, g in roll_df.groupby('Trade Date'): | |
| lo = g[g['Tenor_Days'] <= cm_target].tail(1) | |
| hi = g[g['Tenor_Days'] >= cm_target].head(1) | |
| if lo.empty and hi.empty: | |
| continue | |
| if hi.empty or lo.empty: | |
| blend = (hi if not hi.empty else lo)['Settle'].iloc[0] | |
| else: | |
| d1, p1 = lo.iloc[0][['Tenor_Days', 'Settle']] | |
| d2, p2 = hi.iloc[0][['Tenor_Days', 'Settle']] | |
| if d2 == d1: | |
| blend = p1 | |
| else: | |
| w2 = (cm_target - d1) / (d2 - d1) | |
| blend = p1 + w2 * (p2 - p1) | |
| if blend > 0: | |
| records.append({'Trade Date': trade_date, 'Blend': blend}) | |
| idx = ( | |
| pd.DataFrame(records) | |
| .sort_values('Trade Date') | |
| .assign(Return=lambda x: x['Blend'].pct_change(), | |
| Index=lambda x: cm_start * (1 + x['Return'].fillna(0)).cumprod()) | |
| ) | |
| fig = px.line( | |
| idx, | |
| x='Trade Date', | |
| y='Index', | |
| title='Constant-Maturity 30-Day VIX Futures Index (unlevered)', | |
| template='plotly_dark' | |
| ) | |
| fig.update_traces(line=dict(width=2)) | |
| fig.add_hline(y=cm_start, line_dash='dash', line_color='rgba(255,255,255,0.6)') | |
| fig.update_layout( | |
| xaxis_title='Trade Date', | |
| yaxis_title='Index level', | |
| paper_bgcolor=BG,#'rgba(0,0,0,1)', | |
| plot_bgcolor=BG,#'rgba(0,0,0,1)', | |
| title_font_color='white', | |
| font=dict(color='white'), | |
| showlegend=False, | |
| margin=dict(l=60, r=20, t=60, b=40) | |
| ) | |
| fig.update_xaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| fig.update_yaxes( | |
| title_font=dict(color="white"), | |
| tickfont=dict(color="white"), | |
| tickcolor="white", | |
| gridcolor="rgba(255,255,255,0.10)", | |
| zerolinecolor="rgba(255,255,255,0.15)", | |
| linecolor="rgba(255,255,255,0.15)", | |
| ticks="outside" | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| with st.expander("Dynamic Interpretation", expanded=False): | |
| if idx.empty: | |
| st.write("No observations available for interpretation.") | |
| else: | |
| ts = idx.copy().reset_index(drop=True) | |
| ts['Trade Date'] = pd.to_datetime(ts['Trade Date']) | |
| ts = ts.sort_values('Trade Date') | |
| def pct_rank(series, value): | |
| s = pd.to_numeric(series, errors="coerce").dropna() | |
| if s.empty or not np.isfinite(value): | |
| return np.nan | |
| return float((s < value).mean() * 100.0) | |
| def streak_updown(x): | |
| s = np.sign(x.fillna(0).to_numpy()) | |
| streak = 0 | |
| for v in s[::-1]: | |
| if v > 0: streak = streak + 1 if streak >= 0 else 1 | |
| elif v < 0: streak = streak - 1 if streak <= 0 else -1 | |
| else: break | |
| return streak | |
| for w in (5, 20, 60, 120): | |
| mp = min(3, w) | |
| ts[f'Ret_MA_{w}'] = ts['Return'].rolling(w, min_periods=mp).mean() | |
| ts[f'Ret_STD_{w}'] = ts['Return'].rolling(w, min_periods=mp).std(ddof=0) | |
| ts['Vol20'] = ts['Ret_STD_20'] * np.sqrt(252) | |
| ts['Vol60'] = ts['Ret_STD_60'] * np.sqrt(252) | |
| ts['Vol120'] = ts['Ret_STD_120'] * np.sqrt(252) | |
| for w in (20, 60, 120, 252): | |
| mp = min(5, w) | |
| ts[f'Idx_MA_{w}'] = ts['Index'].rolling(w, min_periods=mp).mean() | |
| for w in (60, 120, 252): | |
| mu = ts['Blend'].rolling(w, min_periods=min(20, w)).mean() | |
| sd = ts['Blend'].rolling(w, min_periods=min(20, w)).std(ddof=0) | |
| ts[f'Blend_Z_{w}'] = np.where(sd > 0, (ts['Blend'] - mu) / sd, np.nan) | |
| cummax = ts['Index'].cummax() | |
| ts['Drawdown'] = ts['Index'] / cummax - 1.0 | |
| max_dd = float(ts['Drawdown'].min()) if len(ts) else np.nan | |
| dd_now = float(ts['Drawdown'].iloc[-1]) | |
| peak_date = ts.loc[ts['Index'].idxmax(), 'Trade Date'].date() | |
| r = ts['Return'].dropna() | |
| r_mu = r.mean() | |
| r_sd = r.std(ddof=0) | |
| if np.isfinite(r_sd) and r_sd > 0: | |
| tail_2s = float((np.abs(r - r_mu) >= 2*r_sd).mean() * 100.0) | |
| last_tail = not r.empty and (abs(r.iloc[-1] - r_mu) >= 2*r_sd) | |
| else: | |
| tail_2s, last_tail = np.nan, False | |
| last = ts.iloc[-1] | |
| end_date = last['Trade Date'].date() | |
| def window_ret(days): | |
| if len(ts) < days+1: return np.nan | |
| a = ts['Index'].iloc[-1] / ts['Index'].iloc[-(days+1)] - 1.0 | |
| return float(a) | |
| ret_5d = window_ret(5) | |
| ret_20d = window_ret(20) | |
| ret_60d = window_ret(60) | |
| updown_streak = streak_updown(ts['Return']) | |
| idx_pct = pct_rank(ts['Index'], last['Index']) | |
| blend_pct = pct_rank(ts['Blend'], last['Blend']) | |
| def pos(val, ref): | |
| if pd.isna(ref): return "n/a" | |
| return "above" if val > ref else ("below" if val < ref else "at") | |
| st20 = pos(last['Index'], last.get('Idx_MA_20')) | |
| st60 = pos(last['Index'], last.get('Idx_MA_60')) | |
| st120 = pos(last['Index'], last.get('Idx_MA_120')) | |
| ma20 = ts['Idx_MA_20'] | |
| ma20_slope = np.nan | |
| if ma20.notna().sum() >= 5: | |
| y = ma20.dropna().tail(20).to_numpy() | |
| x = np.arange(len(y), dtype=float) | |
| if len(y) >= 5: | |
| b1 = np.polyfit(x, y, 1)[0] | |
| ma20_slope = float(b1) | |
| ts['Month'] = ts['Trade Date'].dt.to_period('M') | |
| cur_month = ts['Month'].iloc[-1] | |
| mtd = ts[ts['Month'] == cur_month] | |
| if len(mtd) >= 2: | |
| mtd_ret = float(mtd['Index'].iloc[-1] / mtd['Index'].iloc[0] - 1.0) | |
| else: | |
| mtd_ret = np.nan | |
| by_month = ( | |
| ts.assign(mret=lambda x: x.groupby('Month')['Index'].transform(lambda z: z.iloc[-1]/z.iloc[0]-1.0)) | |
| .drop_duplicates('Month')[['Month','mret']] | |
| .dropna() | |
| ) | |
| med_m = float(by_month['mret'].median()) if not by_month.empty else np.nan | |
| st.write("\n— 30d Constant-Maturity VIX Futures Index: interpretation —") | |
| st.write(f"Date: {end_date}") | |
| if pd.notna(idx_pct): | |
| if idx_pct >= 90: | |
| st.write("The index level sits in the top decile of its history. Vol risk is priced high.") | |
| elif idx_pct <= 10: | |
| st.write("The index level sits in the bottom decile. Vol risk is priced low.") | |
| else: | |
| zone = "upper" if idx_pct >= 60 else ("lower" if idx_pct <= 40 else "middle") | |
| st.write(f"The index level is in the {zone} part of its historical range.") | |
| st.write(f"Trend check: index is {st20} the 20d average, {st60} the 60d, {st120} the 120d.") | |
| if np.isfinite(ma20_slope): | |
| if ma20_slope > 0: | |
| st.write("Short-term trend is rising. The 20d average is pointing up.") | |
| elif ma20_slope < 0: | |
| st.write("Short-term trend is falling. The 20d average is pointing down.") | |
| else: | |
| st.write("Short-term trend is flat.") | |
| def fmt_pct(x): | |
| return "n/a" if pd.isna(x) else f"{x*100:.1f}%" | |
| st.write(f"Recent performance: 1w {fmt_pct(ret_5d)}, 1m {fmt_pct(ret_20d)}, 3m {fmt_pct(ret_60d)}.") | |
| if pd.notna(max_dd): | |
| if dd_now < -0.05: | |
| st.write(f"Current drawdown: {dd_now*100:.1f}%. The index is below its peak from {peak_date}.") | |
| elif dd_now > -0.01: | |
| st.write("No material drawdown vs the most recent peak.") | |
| else: | |
| st.write(f"Modest drawdown: {dd_now*100:.1f}% vs peak on {peak_date}.") | |
| st.write(f"Worst drawdown in sample: {max_dd*100:.1f}%.") | |
| v20, v60, v120 = last.get('Vol20'), last.get('Vol60'), last.get('Vol120') | |
| if pd.notna(v20): | |
| st.write(f"Annualized return volatility: 20d {v20*100:.1f}%, 60d {v60*100:.1f}%, 120d {v120*100:.1f}%.") | |
| if pd.notna(tail_2s): | |
| st.write(f"Tail frequency: {tail_2s:.1f}% of days move more than 2σ from the mean.") | |
| if last_tail: | |
| st.write("Today’s move was a tail event relative to recent history.") | |
| b_pct = blend_pct | |
| b_z120 = last.get('Blend_Z_120') | |
| b_z252 = last.get('Blend_Z_252') if 'Blend_Z_252' in ts.columns else np.nan | |
| if pd.notna(b_pct): | |
| if b_pct >= 90: | |
| st.write("The 30d blend price is in its top decile vs history.") | |
| elif b_pct <= 10: | |
| st.write("The 30d blend price is in its bottom decile vs history.") | |
| else: | |
| st.write("The 30d blend price is within its typical range.") | |
| if pd.notna(b_z120): | |
| if b_z120 >= 2: | |
| st.write(f"Relative to the last ~6 months, the 30d blend price is unusually high (z={b_z120:.2f}).") | |
| elif b_z120 <= -2: | |
| st.write(f"Relative to the last ~6 months, the 30d blend price is unusually low (z={b_z120:.2f}).") | |
| else: | |
| st.write(f"Relative to the last ~6 months, the 30d blend price is normal (z={b_z120:.2f}).") | |
| if updown_streak > 0: | |
| st.write(f"Up streak: {updown_streak} days of gains.") | |
| elif updown_streak < 0: | |
| st.write(f"Down streak: {abs(updown_streak)} days of losses.") | |
| else: | |
| st.write("No up/down streak today.") | |
| if pd.notna(mtd_ret): | |
| st.write(f"Month-to-date return: {mtd_ret*100:.1f}%.") | |
| if pd.notna(med_m): | |
| if mtd_ret > med_m: | |
| st.write("This is above the median month in the sample.") | |
| elif mtd_ret < med_m: | |
| st.write("This is below the median month in the sample.") | |
| else: | |
| st.write("This is in line with a typical month.") | |
| notes = [] | |
| if pd.notna(v20) and v20 > v60: | |
| notes.append("short-term volatility is elevated vs medium term") | |
| if pd.notna(ret_5d) and pd.notna(ret_20d) and (np.sign(ret_5d) != np.sign(ret_20d)): | |
| notes.append("1w vs 1m momentum conflict") | |
| if pd.notna(dd_now) and dd_now < -0.10: | |
| notes.append("index is in a deep drawdown") | |
| if pd.notna(b_z120) and abs(b_z120) >= 2: | |
| notes.append("30d blend price is an outlier vs 6m history") | |
| if notes: | |
| st.write("Risk notes: " + "; ".join(notes) + ".") | |
| st.write("\n— Recap —") | |
| if pd.notna(idx_pct): | |
| loc = "high" if idx_pct >= 60 else ("low" if idx_pct <= 40 else "mid") | |
| st.write(f"Index level location: {loc} range vs history.") | |
| st.write(f"Trend stance: {st20} 20d MA; {st60} 60d MA; {st120} 120d MA.") | |
| if pd.notna(dd_now): | |
| st.write(f"Drawdown now: {dd_now*100:.1f}%; max drawdown: {max_dd*100:.1f}%.") | |
| if pd.notna(v20): | |
| st.write(f"Return vol (20d): {v20*100:.1f}%.") | |
| st.write("Use this as context, not a forecast.") | |
| except Exception as e: | |
| st.error("An error occurred during analysis. Please check your inputs and try again.") | |
| st.write(traceback.format_exc()) | |
| # ---------- Hide default Streamlit style ---------- | |
| st.markdown( | |
| """ | |
| <style> | |
| #MainMenu {visibility: hidden;} | |
| footer {visibility: hidden;} | |
| </style> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |