from datetime import datetime, timedelta
from math import ceil

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
import yfinance as yf
from pandas.tseries.offsets import BDay
from plotly.subplots import make_subplots

st.set_page_config(page_title="Autocorrelation Periodogram", layout="wide")

# Heatmap colours: -1 maps to red, 0 to yellow, +1 to green.
_CORR_COLORSCALE = [[0.0, 'red'], [0.5, 'yellow'], [1.0, 'green']]

# Trace names for the single-line top panel of the non-price views.
_TOP_PANEL_LABELS = {"returns": "Log Returns", "volatility": "Rolling Volatility"}

_NO_DATA_MESSAGE = "No data available for the given inputs."


def _validate_inputs(length, lags_per_plot, plot_start_lag, plot_end_lag):
    """Return an error message for unusable parameters, or None when they are fine."""
    if length < 1:
        return "Window Size must be at least 1."
    if lags_per_plot < 1:
        return "Lags per Plot must be at least 1."
    if plot_start_lag < 0:
        return "Plot Start Lag must not be negative."
    if plot_end_lag < plot_start_lag:
        return "Plot End Lag must be greater than or equal to Plot Start Lag."
    return None


def _ultimate_smoother(src, period):
    """Smooth `src` with the two-pole recursive filter used for the Prices view.

    The first three samples are passed through unchanged; every later sample
    is a fixed linear combination of the last three inputs and the last two
    outputs, with coefficients derived from `period`.
    """
    a1 = np.exp(-1.414 * np.pi / period)
    c2 = 2.0 * a1 * np.cos(1.414 * np.pi / period)
    c3 = -a1 * a1
    c1 = (1.0 + c2 - c3) / 4.0
    out = np.copy(src).astype(float)
    for i in range(3, len(src)):
        out[i] = ((1.0 - c1) * src[i]
                  + (2.0 * c1 - c2) * src[i - 1]
                  - (c1 + c3) * src[i - 2]
                  + c2 * out[i - 1]
                  + c3 * out[i - 2])
    return out


def _log_returns(close):
    """Return one-bar log returns of `close`, with the undefined first value set to 0."""
    returns = np.diff(np.log(close), prepend=np.nan)
    returns[0] = 0.0
    return returns


def _build_series(close, data_type, length):
    """Derive the analysed series from closing prices; None for an unknown `data_type`."""
    if data_type == "prices":
        return _ultimate_smoother(close, length)
    if data_type == "returns":
        return _log_returns(close)
    if data_type == "volatility":
        # copy=True: the array is written to below, and to_numpy() may hand
        # back a read-only view when pandas Copy-on-Write is active.
        vol = pd.Series(_log_returns(close)).rolling(window=length).std().to_numpy(copy=True)
        # The warm-up bars have no full window yet; zero them instead of NaN.
        vol[:length - 1] = 0.0
        return vol
    return None


def _rolling_autocorrelation(series, window_length, max_lag):
    """Correlate each trailing window with the same-sized window `lag` bars earlier.

    Returns an array of shape (len(series), max_lag + 1). Entry [i, lag] is the
    Pearson correlation between series[i-window_length+1 : i+1] and that window
    shifted back by `lag` bars. It stays NaN when the lagged window would start
    before the series does, or when either window has no positive variance term.
    """
    n = len(series)
    corrs = np.full((n, max_lag + 1), np.nan, dtype=float)
    if n < window_length:
        return corrs

    # Per-window sums, indexed by the window's last bar. The lagged window for
    # (i, lag) is exactly the current window of bar i - lag, so these are
    # computed once and reused for every lag.
    sums = np.full(n, np.nan, dtype=float)
    sq_sums = np.full(n, np.nan, dtype=float)
    for end in range(window_length - 1, n):
        window = series[end - window_length + 1: end + 1]
        sums[end] = np.sum(window)
        sq_sums[end] = np.sum(window * window)
    spreads = window_length * sq_sums - sums * sums

    for i in range(window_length - 1, n):
        # `not (x > 0)` also rejects NaN, matching a plain `x > 0` guard.
        if not spreads[i] > 0:
            continue
        window = series[i - window_length + 1: i + 1]
        # Deepest lag whose window still starts at index >= 0.
        deepest = min(max_lag, i - window_length + 1)
        for lag in range(deepest + 1):
            j = i - lag
            if not spreads[j] > 0:
                continue
            lagged = series[j - window_length + 1: j + 1]
            numer = window_length * np.sum(window * lagged) - sums[i] * sums[j]
            corrs[i, lag] = numer / np.sqrt(spreads[i] * spreads[j])
    return corrs


def _lag_buckets(corrs, plot_start_lag, plot_end_lag, lags_per_plot):
    """Split the plotted lag range into (first_lag, last_lag, lags-by-time matrix) panels."""
    n_lags = plot_end_lag - plot_start_lag + 1
    buckets = []
    for k in range(ceil(n_lags / lags_per_plot)):
        first = plot_start_lag + k * lags_per_plot
        last = min(first + lags_per_plot - 1, plot_end_lag)
        # Transposed so that rows are lags and columns are dates.
        buckets.append((first, last, corrs[:, first: last + 1].T))
    return buckets


def _build_figure(df, dates, data_series, bucket_slices, ticker, data_type):
    """Assemble the figure: the series panel on top, one heatmap row per lag bucket."""
    fig = make_subplots(
        rows=1 + len(bucket_slices),
        cols=1,
        shared_xaxes=True,
        row_heights=[2] + [1] * len(bucket_slices),
        vertical_spacing=0.03,
        subplot_titles=[""] + [f"ACI {ls}–{le}" for ls, le, _ in bucket_slices],
    )

    if data_type == "prices":
        fig.add_trace(
            go.Scatter(x=dates, y=df["Close"], mode='lines',
                       line=dict(width=1.2), name="Close Price"),
            row=1, col=1,
        )
        fig.add_trace(
            go.Scatter(x=dates, y=data_series, mode='lines',
                       line=dict(width=1.2), name="Smoothed Price"),
            row=1, col=1,
        )
    elif data_type in _TOP_PANEL_LABELS:
        fig.add_trace(
            go.Scatter(x=dates, y=data_series, mode='lines',
                       line=dict(width=1.2), name=_TOP_PANEL_LABELS[data_type]),
            row=1, col=1,
        )

    latest_date = pd.Timestamp(df.index[-1])
    last_bucket = len(bucket_slices) - 1
    for idx, (ls, le, subset) in enumerate(bucket_slices):
        row = idx + 2  # row 1 is the series panel
        lags = list(range(ls, le + 1))
        show_colorbar = idx == last_bucket  # one shared colour bar, on the last heatmap
        fig.add_trace(
            go.Heatmap(
                x=dates,
                y=lags,
                z=subset,
                colorscale=_CORR_COLORSCALE,
                zmin=-1,
                zmax=1,
                showscale=show_colorbar,
                colorbar=dict(title="Correlation") if show_colorbar else None,
            ),
            row=row, col=1,
        )
        # Label each lag with the date that many business days before the last bar.
        fig.update_yaxes(
            tickmode='array',
            tickvals=lags,
            ticktext=[f"{lag} ({(latest_date - BDay(lag)).strftime('%Y-%m-%d')})"
                      for lag in lags],
            tickfont=dict(size=8),
            row=row,
            col=1,
        )

    fig.update_layout(
        template="plotly_dark",
        title=dict(text=f"Autocorrelation Indicator - {ticker} - {data_type.capitalize()}"),
        height=800 + 200 * len(bucket_slices),
        width=1600,
        legend=dict(orientation="h", yanchor="bottom", y=1.05, xanchor="center", x=0.5),
    )
    fig.update_xaxes(type="date", tickangle=45, tickformat="%Y-%m-%d")
    return fig


@st.cache_data(show_spinner=False)
def run_analysis(ticker, start_date, end_date, length, max_lag, lags_per_plot,
                 plot_start_lag, plot_end_lag, data_type):
    """Download daily data and build the rolling-autocorrelation figure.

    Args:
        ticker: Symbol passed to yfinance.
        start_date, end_date: Download range for the daily bars.
        length: Correlation window size; also the smoothing period ("prices")
            and the rolling window ("volatility").
        max_lag: Highest lag to compute. Raised to `plot_end_lag` if smaller,
            so every plotted lag has a computed column.
        lags_per_plot: Number of lag rows per heatmap panel.
        plot_start_lag, plot_end_lag: Inclusive lag range to plot.
        data_type: One of "prices", "returns", "volatility".

    Returns:
        (results, None) on success, where results is a dict with keys "df",
        "data_series", "corrs", "dates", "bucket_slices" and "fig";
        (None, message) when the inputs are invalid or no data is available.
    """
    problem = _validate_inputs(length, lags_per_plot, plot_start_lag, plot_end_lag)
    if problem:
        return None, problem

    df = yf.download(ticker, start=start_date, end=end_date, interval="1d", auto_adjust=True)
    if df.empty:
        return None, _NO_DATA_MESSAGE

    # Flatten the column labels down to the plain field names ("Close", ...).
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    else:
        df.columns = [c.split("_")[0] for c in df.columns]

    if "Close" not in df.columns:
        return None, _NO_DATA_MESSAGE
    # A single NaN close would propagate through the recursive smoother to
    # every later bar, so drop such rows up front.
    df = df.dropna(subset=["Close"])
    if df.empty:
        return None, _NO_DATA_MESSAGE

    data_series = _build_series(df["Close"].to_numpy(dtype=float), data_type, length)
    if data_series is None:
        return None, "Invalid data type."

    corrs = _rolling_autocorrelation(data_series, length, max(max_lag, plot_end_lag))
    dates = df.index.to_pydatetime()
    bucket_slices = _lag_buckets(corrs, plot_start_lag, plot_end_lag, lags_per_plot)
    fig = _build_figure(df, dates, data_series, bucket_slices, ticker, data_type)

    return {"df": df, "data_series": data_series, "corrs": corrs, "dates": dates,
            "bucket_slices": bucket_slices, "fig": fig}, None


# Initialize session state for results.
if "results" not in st.session_state:
    st.session_state.results = {}

# Top radio for page selection.
current_page = st.sidebar.radio(
    "Select Page",
    options=["Prices", "Returns", "Volatility"],
    help="Choose analysis type.",
)

st.sidebar.header("User Inputs")

with st.sidebar.expander("Data Inputs", expanded=True):
    # Surrounding whitespace is stripped so a stray space does not turn into
    # a failed symbol lookup.
    ticker = st.text_input("Ticker", value="SPY", help="Enter the ticker symbol.").strip()
    start_date = st.date_input(
        "Start Date",
        value=datetime(2020, 1, 1),
        help="Set the start date for daily data.",
    )
    default_end_date = datetime.today() + timedelta(days=1)
    end_date = st.date_input(
        "End Date",
        value=default_end_date,
        help="Set the end date for daily data.",
    )

with st.sidebar.expander("Methodology Parameters", expanded=True):
    length = st.number_input(
        "Window Size",
        value=20,
        min_value=1,
        help="Controls how many days are used when comparing current vs past segments. Also used for smoothing (Prices) and rolling window in volatility."
    )
    lags_per_plot = st.number_input(
        "Lags per Plot",
        value=32,
        min_value=1,
        help="How many lag rows to include in each heatmap panel."
    )
    plot_start_lag = st.number_input(
        "Plot Start Lag",
        value=30,
        min_value=0,
        help="Lower bound of lag range to visualize. Set this to skip very short lags."
    )
    plot_end_lag = st.number_input(
        "Plot End Lag",
        value=120,
        min_value=0,
        help="Upper bound of lag range to visualize. The tool will measure similarity with up to this many days in the past."
    )

# There is no separate "max lag" widget: lags are computed up to the last plotted one.
max_lag = plot_end_lag

# Parameters of the most recent run of each page, keyed like `results`. The
# summary shown next to a stored figure must describe the run that produced
# it, not whichever page was run last.
if "run_params" not in st.session_state:
    st.session_state.run_params = {}

# Run Analysis button.
if st.sidebar.button("Run Analysis"):
    st.session_state.ticker = ticker
    st.session_state.start_date = start_date
    st.session_state.end_date = end_date
    st.session_state.length = length
    st.session_state.max_lag = max_lag
    st.session_state.lags_per_plot = lags_per_plot
    st.session_state.plot_start_lag = plot_start_lag
    st.session_state.plot_end_lag = plot_end_lag
    st.session_state.page = current_page

    if plot_end_lag < plot_start_lag:
        # A reversed range has nothing to plot; report it without running the analysis.
        results, error = None, "Plot End Lag must be greater than or equal to Plot Start Lag."
    else:
        with st.spinner("Running analysis..."):
            results, error = run_analysis(
                ticker,
                start_date,
                end_date,
                length,
                max_lag,
                lags_per_plot,
                plot_start_lag,
                plot_end_lag,
                current_page.lower()
            )
    st.session_state.results[current_page] = (results, error)
    st.session_state.run_params[current_page] = {
        "length": length,
        "lags_per_plot": lags_per_plot,
        "plot_start_lag": plot_start_lag,
        "plot_end_lag": plot_end_lag,
    }

# Always show the main title and intro
st.title("Autocorrelation Periodogram")
st.markdown(
    "This tool visualizes how market structure repeats across time by computing rolling autocorrelations over many lags.\n\n"
    "You can analyze **Prices**, **Returns**, or **Volatility**. The heatmaps show how much today’s behavior resembles the past at different time horizons."
)

# Methodology expander with math
with st.expander("Methodology", expanded=False):
    st.markdown("""
**Purpose**

Measure how similar the current behavior is to past behavior over multiple lags to detect persistence or reversion in structure.

**Autocorrelation formula**:
""")
    st.latex(r"""
    \rho_{t, L} = \frac{\sum_{i=0}^{N-1}(x_{t-i} - \bar{x})(x_{t-L-i} - \bar{y})}
    {\sqrt{\sum_{i=0}^{N-1}(x_{t-i} - \bar{x})^2} \cdot \sqrt{\sum_{i=0}^{N-1}(x_{t-L-i} - \bar{y})^2}}
    """)
    # Inline math uses $...$: Streamlit Markdown renders that as LaTeX, whereas
    # "\(" is only an escaped parenthesis (and an invalid Python string escape).
    st.markdown("""
- $x$: current window
- $y$: lagged window shifted by $L$ days
- $N$: window size (set via **Window Size**)
- $L$: lag (from 0 to **Max Lag**)

**Inputs** (configured in sidebar):
- **Window Size**: used for autocorrelation and volatility. Also used for smoothing in *Prices* mode.
- **Max Lag**: upper bound on lag values to compute (always equal to **Plot End Lag**).
- **Lags per Plot**: number of lag rows per heatmap.
- **Plot Start / End Lag**: limits for lags to visualize.

**Output**

The app displays:
- A top panel with the selected series.
- One or more heatmaps below showing autocorrelation across lag ranges.
- Color scale: green = positive correlation (momentum), red = negative correlation (mean reversion), yellow = no structure.
""")

# Per-page wording for the results summary:
# (input type, top panel description, what the heatmaps correlate).
_PAGE_SUMMARIES = {
    "prices": (
        "Closing prices (smoothed with Ehlers' filter)",
        "Raw close vs smoothed price",
        "smoothed prices",
    ),
    "returns": ("Log returns", "Daily log returns", "returns"),
    "volatility": (
        "Rolling standard deviation of log returns",
        "Rolling volatility",
        "volatility",
    ),
}

# Show analysis results (if any)
if current_page in st.session_state.results:
    results, error = st.session_state.results[current_page]
    st.markdown(f"### {current_page} Analysis")
    if error:
        st.error(error)
    else:
        params = st.session_state.run_params.get(current_page)
        if params is None:
            # Result stored without a parameter snapshot (e.g. a session that
            # predates `run_params`): fall back to the last-run values.
            params = {
                "length": st.session_state.length,
                "lags_per_plot": st.session_state.lags_per_plot,
                "plot_start_lag": st.session_state.plot_start_lag,
                "plot_end_lag": st.session_state.plot_end_lag,
            }
        lag_start = params["plot_start_lag"]
        lag_end = params["plot_end_lag"]
        n_panels = max(ceil((lag_end - lag_start + 1) / params["lags_per_plot"]), 0)

        input_type, top_panel, subject = _PAGE_SUMMARIES[current_page.lower()]
        # Two trailing spaces before the newline force a Markdown line break.
        st.markdown("  \n".join([
            f"**Input type**: {input_type}",
            f"**Top panel**: {top_panel}",
            f"**Lower panels**: Autocorrelation of {subject} across {n_panels} lag bands",
            f"**Lag range**: {lag_start} to {lag_end}",
            f"**Window size**: {params['length']}",
        ]))

        st.plotly_chart(results["fig"], use_container_width=True)
else:
    st.info("Use the sidebar to set parameters and click **Run Analysis** to display results here.")

# Hide default Streamlit style
# NOTE(review): the markup string below is blank in this file, so this call
# currently hides nothing — confirm whether a <style> payload was lost.
st.markdown(
    """
    """,
    unsafe_allow_html=True
)