import warnings
warnings.filterwarnings('ignore')  # Ignore warnings for cleaner output (set before the heavy imports below)
import yfinance as yf
import pandas as pd
import numpy as np
import streamlit as st
import plotly.graph_objects as go
import seaborn as sns
from itertools import combinations
from statsmodels.tsa.api import VECM
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.stattools import grangercausalitytests, coint
from statsmodels.regression.linear_model import OLS
import plotly.express as px

# Parameters
START_DATE = '2021-01-01'
END_DATE = pd.to_datetime('today') + pd.Timedelta(days=1)
P_VALUE_THRESHOLD = 0.05
ROLLING_WINDOW_SIZE = 252
CONSISTENT_COINTEGRATION_THRESHOLD = 0.8

# List of tickers for cryptocurrencies, bank stocks, and global indexes
default_tickers = ['BTC-USD', 'ETH-USD', 'BNB-USD', 'JPM', 'BAC', 'WFC', 'C']


def load_ticker_ts_df(ticker, start, end):
    """Download the 'Adj Close' price series for a single ticker.

    Args:
        ticker: Ticker symbol passed to ``yf.download``.
        start: Start of the requested date range.
        end: End of the requested date range.

    Returns:
        The 'Adj Close' column of the downloaded frame.

    Raises:
        ValueError: If the download produced no rows.
    """
    # auto_adjust=False is requested so the frame still carries a separate
    # 'Adj Close' column next to the raw 'Close'.
    data = yf.download(ticker, start=start, end=end, auto_adjust=False)
    if isinstance(data.columns, pd.MultiIndex):
        # Flatten the multi-index so columns can be addressed by field name.
        data.columns = data.columns.get_level_values(0)
    if data.empty:
        raise ValueError(f"No data found for {ticker}")
    return data['Adj Close']


def cross_correlation(series1, series2, lag):
    """Return the Pearson correlation of two series at a given lag.

    A positive ``lag`` pairs ``series1[t]`` with ``series2[t + lag]``; a
    negative ``lag`` pairs ``series1[t + |lag|]`` with ``series2[t]``; zero
    correlates the series as they are.

    Args:
        series1: First sequence of observations.
        series2: Second sequence, the same length as ``series1``.
        lag: Integer offset, in observations.

    Returns:
        The off-diagonal entry of ``np.corrcoef`` for the overlapping part.
    """
    # Work on plain arrays so slicing is positional regardless of the index
    # type carried by the inputs.
    first = np.asarray(series1)
    second = np.asarray(series2)
    if lag > 0:
        first, second = first[:-lag], second[lag:]
    elif lag < 0:
        first, second = first[-lag:], second[:lag]
    return np.corrcoef(first, second)[0, 1]


def granger_test_with_shift(data, target, predictor, shift):
    """Run a lag-1 Granger causality test after shifting the predictor.

    The test frame is ordered ``[target, predictor]``, so the reported
    p-value refers to the predictor (second column) helping to forecast the
    target (first column).

    Args:
        data: DataFrame containing both columns.
        target: Name of the column being explained.
        predictor: Name of the column that is shifted and tested as a cause.
        shift: Number of rows passed to ``Series.shift`` for the predictor.

    Returns:
        The p-value of the 'ssr_ftest' statistic at lag 1.
    """
    # Restrict to the two columns under test before dropping NaNs, so gaps in
    # unrelated columns of `data` cannot discard usable rows.
    pair = data[[target, predictor]].copy()
    pair[predictor] = pair[predictor].shift(shift)
    pair = pair.dropna()
    granger_test_result = grangercausalitytests(pair, maxlag=1, verbose=False)
    return granger_test_result[1][0]['ssr_ftest'][1]


def calculate_cumulative_profit(aligned_data, z_scores, buy_threshold, sell_threshold, exit_threshold=0.5):
    """Simulate the z-score pairs strategy and track the realised profit.

    The first two columns of ``aligned_data`` are treated as the pair
    (first asset, second asset). One position is held at a time:

    * ``z > buy_threshold`` opens "sell second asset / buy first asset";
    * ``z < sell_threshold`` opens "buy second asset / sell first asset";
    * an open position is closed once ``abs(z) < exit_threshold``.

    Args:
        aligned_data: DataFrame of prices whose index contains every date of
            ``z_scores`` and whose first two columns are the traded pair.
        z_scores: Series of z-scores indexed by date.
        buy_threshold: Upper z-score level that opens a position.
        sell_threshold: Lower z-score level that opens a position.
        exit_threshold: Absolute z-score below which an open position is
            closed (defaults to 0.5).

    Returns:
        ``(cumulative_profits, positions)``: the running realised profit, one
        entry per z-score, and the list of still-open entries (at most one)
        as tuples ``(date, action, second_name, second_price, action,
        first_name, first_price)``.
    """
    # Take the pair from the frame itself instead of relying on module-level
    # ticker names being defined by the calling script.
    first_name, second_name = aligned_data.columns[0], aligned_data.columns[1]
    first_prices = aligned_data.iloc[:, 0]
    second_prices = aligned_data.iloc[:, 1]

    positions = []
    profit = 0
    cumulative_profits = []
    position_open = False

    for date, z in z_scores.items():
        if not position_open:
            if z > buy_threshold:
                positions.append((date, 'sell', second_name, second_prices.loc[date],
                                  'buy', first_name, first_prices.loc[date]))
                position_open = True
            elif z < sell_threshold:
                positions.append((date, 'buy', second_name, second_prices.loc[date],
                                  'sell', first_name, first_prices.loc[date]))
                position_open = True
        elif abs(z) < exit_threshold:
            _, second_action, _, second_entry, _, _, first_entry = positions.pop()
            first_exit = first_prices.loc[date]
            second_exit = second_prices.loc[date]
            if second_action == 'sell':
                # Short the second asset, long the first.
                profit += (second_entry - second_exit) + (first_exit - first_entry)
            else:
                # Long the second asset, short the first.
                profit += (second_exit - second_entry) + (first_entry - first_exit)
            position_open = False
        cumulative_profits.append(profit)

    return cumulative_profits, positions


def sanitize_data(data_map, start=None, end=None):
    """Put every usable series on a common daily calendar without gaps.

    Series that are ``None`` or shorter than half the number of calendar days
    in the range are dropped. The rest are trimmed to at most that many
    observations, reindexed to a daily range, and filled by linear
    interpolation followed by forward and backward fill.

    Args:
        data_map: Mapping of ticker -> price Series.
        start: Start of the calendar; defaults to ``START_DATE``.
        end: End of the calendar; defaults to ``END_DATE``.

    Returns:
        Dict of ticker -> cleaned Series, all sharing the same daily index.

    Raises:
        ValueError: If a series still contains NaN or infinite values after
            cleaning (for example when it holds no finite value at all).
    """
    # Normalise to midnight so the calendar lines up with date-only indexes
    # even though END_DATE carries a time-of-day component.
    range_start = pd.to_datetime(START_DATE if start is None else start).normalize()
    range_end = pd.to_datetime(END_DATE if end is None else end).normalize()
    ts_days_length = (range_end - range_start).days
    date_range = pd.date_range(start=range_start, end=range_end, freq='D')

    data_sanitized = {}
    for ticker, data in data_map.items():
        if data is None or len(data) < ts_days_length / 2:
            continue
        if len(data) > ts_days_length:
            data = data.iloc[-ts_days_length:]
        # ffill/bfill replace the deprecated fillna(method=...) spelling.
        data = (
            data.reindex(date_range)
            .replace([np.inf, -np.inf], np.nan)
            .interpolate(method='linear')
            .ffill()
            .bfill()
        )
        # Raise instead of assert: asserts are stripped under `python -O`.
        if not np.isfinite(data.to_numpy(dtype=float)).all():
            raise ValueError(f"Could not sanitize data for {ticker}: no usable values remain after cleaning")
        data_sanitized[ticker] = data
    return data_sanitized


def find_cointegrated_pairs(tickers_ts_map, p_value_threshold):
    """Find cointegrated pairs using the Engle-Granger method.

    Args:
        tickers_ts_map: Mapping of ticker -> price Series of equal length.
        p_value_threshold: Pairs with a p-value below this are reported.

    Returns:
        ``(pvalue_matrix, pairs)``: a symmetric n x n matrix of p-values with
        the diagonal set to 0, and a list of ``(ticker_a, ticker_b, p_value)``
        tuples for the pairs below the threshold.
    """
    tickers = list(tickers_ts_map)
    n = len(tickers)
    values = [tickers_ts_map[ticker].values for ticker in tickers]

    pvalue_matrix = np.ones((n, n))
    pairs = []
    for i, j in combinations(range(n), 2):
        p_value = coint(values[i], values[j])[1]
        pvalue_matrix[i, j] = p_value
        pvalue_matrix[j, i] = p_value
        if p_value < p_value_threshold:
            pairs.append((tickers[i], tickers[j], p_value))
    if n:
        np.fill_diagonal(pvalue_matrix, 0)
    return pvalue_matrix, pairs


def johansen_test(data, det_order=0, k_ar_diff=1):
    """Run the Johansen test and return the trace statistics and their critical values.

    Args:
        data: Two-dimensional data (observations x series).
        det_order: Deterministic term order passed to ``coint_johansen``.
        k_ar_diff: Number of lagged differences passed to ``coint_johansen``.

    Returns:
        ``(lr1, cvt)`` as exposed by the statsmodels result object.
    """
    result = coint_johansen(data, det_order, k_ar_diff)
    return result.lr1, result.cvt


def find_cointegrated_pairs_rolling(tickers_ts_map, p_value_threshold, window_size, consistency_threshold):
    """Find pairs that are cointegrated consistently across rolling windows (Johansen).

    Every window of ``window_size`` observations is tested. A window counts
    as cointegrated when the trace statistic for "no cointegration" exceeds
    its 95% critical value; it is then recorded as pseudo p-value 0.01,
    otherwise as 1. A pair is reported when the share of windows below
    ``p_value_threshold`` reaches ``consistency_threshold``.

    Args:
        tickers_ts_map: Mapping of ticker -> price Series.
        p_value_threshold: Threshold applied to the pseudo p-values.
        window_size: Number of observations per window.
        consistency_threshold: Minimum share of cointegrated windows.

    Returns:
        ``(pvalue_matrix, consistent_pairs)``: a symmetric matrix holding the
        mean pseudo p-value for consistent pairs and 1 elsewhere, and a list
        of ``(ticker_a, ticker_b, mean_pseudo_p_value)`` tuples.
    """
    tickers = list(tickers_ts_map)
    n = len(tickers)
    pvalue_matrix = np.ones((n, n))
    consistent_pairs = []

    for i, j in combinations(range(n), 2):
        series_i = tickers_ts_map[tickers[i]]
        series_j = tickers_ts_map[tickers[j]]
        pvalues = []
        for start in range(len(series_i) - window_size + 1):
            end = start + window_size
            window_data = pd.concat([series_i.iloc[start:end], series_j.iloc[start:end]], axis=1).dropna()
            if window_data.shape[0] < window_size:
                continue
            test_stat, crit_values = johansen_test(window_data)
            # Row 0 of the critical-value table belongs to the r = 0 ("no
            # cointegration") hypothesis that test_stat[0] tests; column 1 is
            # the 95% level. This matches the check used on the single-pair page.
            pvalues.append(0.01 if test_stat[0] > crit_values[0, 1] else 1)

        if not pvalues:
            # No complete window (series shorter than window_size): the pair
            # keeps the default value of 1.
            continue

        pvalues = np.array(pvalues)
        consistent_cointegration = np.mean(pvalues < p_value_threshold)
        if consistent_cointegration >= consistency_threshold:
            mean_pvalue = np.mean(pvalues)
            consistent_pairs.append((tickers[i], tickers[j], mean_pvalue))
            pvalue_matrix[i, j] = mean_pvalue
            pvalue_matrix[j, i] = mean_pvalue

    return pvalue_matrix, consistent_pairs


# Streamlit app
st.set_page_config(layout="wide")
st.title('Pairs Cointegration and Trading Analysis')

st.sidebar.header('Select Page')
page = st.sidebar.radio('Page:', ['Pairs Trading Analysis', 'Pair Cointegration Identification'])

if page == 'Pairs Trading Analysis':
    st.subheader("Pairs Trading Analysis")
    st.write("""
    ### Description
    This method analyzes stock and cryptocurrency prices, normalizes them, calculates rolling volatilities, tests for cointegration, and visualizes buy/sell signals based on z-scores.
    """)

    with st.sidebar.expander("How to use:", expanded=False):
        st.markdown("""
        **How to use:**
        1. Enter the stock tickers, start date, and end date.
        2. Set the number of days for the volatility window, and the buy/sell thresholds for the z-scores.
        3. Click 'Run Analysis' to start the analysis.
        """)

    with st.sidebar.expander("Stock/Crypto Ticker and Date Selection", expanded=True):
        ticker1 = st.text_input('Enter First Stock/Crypto Ticker', 'ASML.AS', help="Enter the ticker symbol for the first stock or cryptocurrency.")
        ticker2 = st.text_input('Enter Second Stock/Crypto Ticker', 'ASML', help="Enter the ticker symbol for the second stock or cryptocurrency.")
        start_date = st.date_input('Start Date', pd.to_datetime('2022-01-01'), help="Select the start date for the data range.")
        end_date = st.date_input('End Date', pd.to_datetime(END_DATE), help="Select the end date for the data range.")

    with st.sidebar.expander("Method Parameters", expanded=True):
        volatility_window = st.number_input('Volatility Window (days)', min_value=1, max_value=365, value=30, help="Set the number of days for the rolling volatility window.")
        buy_threshold = st.number_input('Buy Z-Score Threshold', value=2.0, help="Set the z-score threshold to generate buy signals.")
        sell_threshold = st.number_input('Sell Z-Score Threshold', value=-2.0, help="Set the z-score threshold to generate sell signals.")

    if st.sidebar.button('Run Analysis'):
        try:
            # Data collection
            data1 = yf.download(ticker1, start=start_date, end=end_date, auto_adjust=False)
            if isinstance(data1.columns, pd.MultiIndex):
                data1.columns = data1.columns.get_level_values(0)
            data2 = yf.download(ticker2, start=start_date, end=end_date, auto_adjust=False)
            if isinstance(data2.columns, pd.MultiIndex):
                data2.columns = data2.columns.get_level_values(0)
            if data1.empty or data2.empty:
                raise ValueError(f"No data found for {ticker1} or {ticker2}")

            # Keep only dates present in both series; drop rows with missing
            # closes because the statistical tests below cannot handle NaN.
            aligned_data = pd.concat([data1['Close'], data2['Close']], axis=1, join='inner').dropna()
            aligned_data.columns = [ticker1, ticker2]
            if aligned_data.empty:
                raise ValueError(f"No overlapping price data for {ticker1} and {ticker2}")

            # Normalize the price series
            normalized_data = (aligned_data - aligned_data.mean()) / aligned_data.std()

            # Plot normalized data
            fig1 = go.Figure()
            fig1.add_trace(go.Scatter(x=normalized_data.index, y=normalized_data[ticker1], mode='lines', name=f'Normalized {ticker1}'))
            fig1.add_trace(go.Scatter(x=normalized_data.index, y=normalized_data[ticker2], mode='lines', name=f'Normalized {ticker2}'))
            fig1.update_layout(title=f'Normalized Price Series for {ticker1} and {ticker2}', xaxis_title='Date', yaxis_title='Normalized Price')
            st.plotly_chart(fig1)

            # Calculate daily returns
            returns = aligned_data.pct_change().dropna()

            # Calculate rolling volatilities (annualized)
            volatility1 = returns[ticker1].rolling(volatility_window).std() * np.sqrt(252)
            volatility2 = returns[ticker2].rolling(volatility_window).std() * np.sqrt(252)

            # Plot rolling volatilities
            fig2 = go.Figure()
            fig2.add_trace(go.Scatter(x=volatility1.index, y=volatility1, mode='lines', name=f"{ticker1} Volatility"))
            fig2.add_trace(go.Scatter(x=volatility2.index, y=volatility2, mode='lines', name=f"{ticker2} Volatility"))
            fig2.update_layout(title=f"{volatility_window}-Day Rolling Historical Volatility for {ticker1} and {ticker2}", xaxis_title='Date', yaxis_title='Volatility')
            st.plotly_chart(fig2)

            # Check for stationarity using ADF test
            adf_result1 = adfuller(aligned_data[ticker1])
            adf_result2 = adfuller(aligned_data[ticker2])

            # Perform Johansen cointegration test
            coint_test_stat, coint_critical_values = johansen_test(aligned_data)

            # Fit a VECM with one cointegrating relation (fitted regardless of
            # the Johansen outcome; its residuals are inspected below)
            vecm = VECM(aligned_data, k_ar_diff=1, coint_rank=1)
            vecm_fit = vecm.fit()

            # Analyzing the residuals for stationarity
            residuals = vecm_fit.resid
            residuals_df = pd.DataFrame(residuals, index=aligned_data.index[-len(residuals):], columns=[f'Residual_{ticker1}', f'Residual_{ticker2}'])
            adf_residuals_1 = adfuller(residuals[:, 0])
            adf_residuals_2 = adfuller(residuals[:, 1])

            # Plot residuals from VECM
            fig3 = go.Figure()
            fig3.add_trace(go.Scatter(x=residuals_df.index, y=residuals_df[f'Residual_{ticker1}'], mode='lines', name=f'Residual {ticker1}'))
            fig3.add_trace(go.Scatter(x=residuals_df.index, y=residuals_df[f'Residual_{ticker2}'], mode='lines', name=f'Residual {ticker2}'))
            fig3.add_hline(y=0, line=dict(color='red', dash='dash'), name='Zero Line')
            fig3.update_layout(title='Residuals from VECM', xaxis_title='Date', yaxis_title='Residuals')
            st.plotly_chart(fig3)

            # Display ADF test results for the tickers
            st.write(f"ADF Statistic for {ticker1}: {adf_result1[0]}, p-value: {adf_result1[1]}")
            st.write(f"ADF Statistic for {ticker2}: {adf_result2[0]}, p-value: {adf_result2[1]}")

            with st.expander("How it Works", expanded=False):
                st.markdown("""
                **ADF Test:**
                - The Augmented Dickey-Fuller (ADF) test checks whether a time series has a unit root, i.e., whether it is non-stationary.
                - If the p-value is less than 0.05, we reject the null hypothesis that the series has a unit root, indicating that the series is stationary.

                **Johansen Cointegration Test:**
                - The Johansen test is used to determine the number of cointegrating relationships among multiple time series.
                - If the test statistic is greater than the critical value, we reject the null hypothesis that there is no cointegration.

                **VECM (Vector Error Correction Model):**
                - A VECM is a special form of a VAR (Vector Autoregression) model used for cointegrated series. It corrects for disequilibrium in the short run while keeping the long-term relationship intact.

                **Z-Score Trading Strategy:**
                - Z-scores measure how many standard deviations an element is from the mean. In pairs trading, z-scores are used to identify overbought or oversold conditions, triggering buy or sell signals.
                """)

            st.markdown("#### Interpretation of ADF Results")
            st.latex(r'''
            H_0: \text{The series has a unit root (non-stationary)} \\
            H_1: \text{The series does not have a unit root (stationary)}
            ''')
            st.write("""
            - The Augmented Dickey-Fuller (ADF) test checks the null hypothesis that a unit root is present in a time series sample.
            """)
            for name, adf_result in ((ticker1, adf_result1), (ticker2, adf_result2)):
                if adf_result[1] < 0.05:
                    st.write(f"{name} is stationary, indicating the series does not have a unit root.")
                else:
                    st.write(f"{name} is not stationary, indicating the series has a unit root.")

            # Display cointegration test results
            st.write("Johansen Cointegration Test Results:")
            johansen_results = pd.DataFrame({
                'Test Statistic': coint_test_stat,
                '90% Critical Value': coint_critical_values[:, 0],
                '95% Critical Value': coint_critical_values[:, 1],
                '99% Critical Value': coint_critical_values[:, 2]
            }, index=[f'Cointegration Test {i+1}' for i in range(len(coint_test_stat))])
            st.write(johansen_results)

            st.markdown("#### Interpretation of Johansen Cointegration Test Results")
            st.latex(r'''
            H_0: \text{No cointegration relationship exists} \\
            H_1: \text{Cointegration relationship exists}
            ''')
            st.write("""
            - The Johansen cointegration test is used to determine the cointegration rank between multiple time series.
            """)
            if coint_test_stat[0] > coint_critical_values[0, 1]:
                st.write(f"The two assets {ticker1} and {ticker2} are cointegrated at the 95% confidence level.")
            else:
                st.write(f"The two assets {ticker1} and {ticker2} are not cointegrated at the 95% confidence level.")

            st.markdown("#### Interpretation of VECM Residuals")
            st.write(f"ADF Statistic for VECM residuals of {ticker1}: {adf_residuals_1[0]}, p-value: {adf_residuals_1[1]}")
            st.write(f"ADF Statistic for VECM residuals of {ticker2}: {adf_residuals_2[0]}, p-value: {adf_residuals_2[1]}")
            st.write("""
            - The residuals from the Vector Error Correction Model (VECM) should be stationary to confirm cointegration.
            """)
            for name, adf_residuals in ((ticker1, adf_residuals_1), (ticker2, adf_residuals_2)):
                if adf_residuals[1] < 0.1:
                    st.write(f"The residuals of the VECM model for {name} are stationary, confirming cointegration.")
                else:
                    st.write(f"The residuals of the VECM model for {name} are not stationary, suggesting no cointegration.")

            # Calculate cross-correlation for a range of lags
            lag_range = range(-30, 31)
            cross_correlations = [cross_correlation(returns[ticker1], returns[ticker2], lag) for lag in lag_range]

            # Plot cross-correlation for different lags
            fig4 = go.Figure()
            fig4.add_trace(go.Scatter(x=list(lag_range), y=cross_correlations, mode='lines+markers'))
            fig4.add_hline(y=0, line=dict(color='gray', dash='dash'))
            fig4.add_vline(x=0, line=dict(color='red', dash='dash'))
            fig4.update_layout(title=f"Cross-Correlation between {ticker1} and {ticker2}", xaxis_title='Lag (days)', yaxis_title='Cross-Correlation')
            st.plotly_chart(fig4)

            st.markdown("#### Interpretation of Cross-Correlation Results")
            max_corr = max(cross_correlations)
            max_lag = lag_range[cross_correlations.index(max_corr)]
            second_max_corr = max(corr for corr in cross_correlations if corr != max_corr)
            second_max_lag = lag_range[cross_correlations.index(second_max_corr)]

            st.write(f"Highest correlation: {max_corr:.2f} at lag {max_lag}")
            st.write(f"Second highest correlation: {second_max_corr:.2f} at lag {second_max_lag}")

            interpretation = f"Highest correlation at lag {max_lag}: The high correlation at lag {max_lag} indicates that {ticker1} and {ticker2} move together without any significant lead or lag. In other words, any movements in {ticker1} are almost instantaneously reflected in {ticker2} and vice versa. This is typical for cross-listed assets, where information and price changes are quickly reflected in both markets.\n"
            if second_max_lag == 0:
                direction = "No significant lead/lag relationship; they move simultaneously."
            else:
                # A negative lag pairs later ticker1 values with earlier
                # ticker2 values, i.e. ticker2 leads; a positive lag is the reverse.
                if second_max_lag < 0:
                    leading_ticker, lagging_ticker = ticker2, ticker1
                else:
                    leading_ticker, lagging_ticker = ticker1, ticker2
                lead_days = abs(second_max_lag)
                direction = f"Second highest correlation at lag {second_max_lag}: {leading_ticker} leads {lagging_ticker} by {lead_days} days. This means that movements in {leading_ticker} tend to precede similar movements in {lagging_ticker} by {lead_days} day(s)."
            interpretation += direction
            st.write(interpretation)

            # Granger causality test with shifts.
            # granger_test_with_shift(data, target, predictor, shift) tests
            # whether the *predictor* helps forecast the *target*, so
            # "ticker1 causes ticker2" needs target=ticker2, predictor=ticker1.
            shift_range = range(-5, 6)
            granger_p_values_shift_1_to_2 = {shift: granger_test_with_shift(aligned_data, ticker2, ticker1, shift) for shift in shift_range}
            granger_p_values_shift_2_to_1 = {shift: granger_test_with_shift(aligned_data, ticker1, ticker2, shift) for shift in shift_range}

            # Create DataFrames for plotting Granger causality test results
            granger_p_values_df_shift_1_to_2 = pd.DataFrame(granger_p_values_shift_1_to_2, index=[f"{ticker1} causes {ticker2}"]).T
            granger_p_values_df_shift_2_to_1 = pd.DataFrame(granger_p_values_shift_2_to_1, index=[f"{ticker2} causes {ticker1}"]).T

            # Plot Granger causality test p-values with shifts
            fig5 = go.Figure()
            fig5.add_trace(go.Scatter(x=granger_p_values_df_shift_1_to_2.index, y=granger_p_values_df_shift_1_to_2[f"{ticker1} causes {ticker2}"], mode='lines+markers', name=f"{ticker1} causes {ticker2}"))
            fig5.add_trace(go.Scatter(x=granger_p_values_df_shift_2_to_1.index, y=granger_p_values_df_shift_2_to_1[f"{ticker2} causes {ticker1}"], mode='lines+markers', name=f"{ticker2} causes {ticker1}"))
            fig5.add_hline(y=0.05, line=dict(color='gray', dash='dash'))
            fig5.add_vline(x=0, line=dict(color='red', dash='dash'))
            fig5.update_layout(title=f"Granger Causality Test p-values with Shifts between {ticker1} and {ticker2}", xaxis_title='Shift (days)', yaxis_title='p-value')
            st.plotly_chart(fig5)

            st.markdown("#### Interpretation of Granger Causality Test Results")
            best_lag_1_to_2 = min(granger_p_values_shift_1_to_2, key=granger_p_values_shift_1_to_2.get)
            best_lag_2_to_1 = min(granger_p_values_shift_2_to_1, key=granger_p_values_shift_2_to_1.get)
            best_p_1_to_2 = granger_p_values_shift_1_to_2[best_lag_1_to_2]
            best_p_2_to_1 = granger_p_values_shift_2_to_1[best_lag_2_to_1]

            if best_p_1_to_2 < 0.05 and best_p_1_to_2 < best_p_2_to_1:
                cause, effect, best_lag = ticker1, ticker2, best_lag_1_to_2
            else:
                cause, effect, best_lag = ticker2, ticker1, best_lag_2_to_1
            interpretation = f"Granger causality test with shifts suggests that {cause} causes {effect} with a lag of {abs(best_lag)} days.\n"
            interpretation += f"This means that movements in {cause} tend to lead movements in {effect} by {abs(best_lag)} days. In practical terms, if {cause} experiences a price change, we can expect a similar change in {effect} approximately {abs(best_lag)} days later."
            st.write(interpretation)

            # Adjust data based on the identified best lag. The shift is
            # applied to the series that was shifted in the winning test (the
            # predictor), so the alignment matches what was actually tested.
            adjusted_data = aligned_data.copy()
            adjusted_data[cause] = adjusted_data[cause].shift(best_lag)
            adjusted_data = adjusted_data.dropna()

            # Calculate the residuals
            model = OLS(adjusted_data[ticker2], adjusted_data[ticker1])
            results = model.fit()
            residuals = adjusted_data[ticker2] - results.params[ticker1] * adjusted_data[ticker1]

            # Calculate Z-Scores
            residuals_mean = residuals.mean()
            residuals_std = residuals.std()
            z_scores = (residuals - residuals_mean) / residuals_std

            # Generate buy and sell signals
            buy_signals = z_scores[z_scores > buy_threshold]
            sell_signals = z_scores[z_scores < sell_threshold]

            # Plot the residuals with buy and sell signals
            fig6 = go.Figure()
            fig6.add_trace(go.Scatter(x=z_scores.index, y=z_scores, mode='lines', name='Z-Score of Residuals'))
            fig6.add_trace(go.Scatter(x=buy_signals.index, y=buy_signals, mode='markers', marker=dict(color='green', symbol='triangle-up', size=10), name=f'Buy {ticker1}, Sell {ticker2} Signal'))
            fig6.add_trace(go.Scatter(x=sell_signals.index, y=sell_signals, mode='markers', marker=dict(color='red', symbol='triangle-down', size=10), name=f'Sell {ticker1}, Buy {ticker2} Signal'))
            fig6.add_hline(y=buy_threshold, line=dict(color='gray', dash='dash'))
            fig6.add_hline(y=sell_threshold, line=dict(color='gray', dash='dash'))
            fig6.update_layout(title="Residuals (Adjusted for Lag) with Buy and Sell Signals based on Z-Scores", xaxis_title='Date', yaxis_title='Z-Score')
            st.plotly_chart(fig6)

            # Calculate cumulative profits and positions
            cumulative_profits, positions = calculate_cumulative_profit(aligned_data, z_scores, buy_threshold, sell_threshold)

            # Plot the cumulative profit. There is one profit value per
            # z-score, so the z-score dates are the matching x-axis (the lag
            # adjustment drops rows, so aligned_data's leading dates would be
            # misaligned).
            fig7 = go.Figure()
            fig7.add_trace(go.Scatter(x=z_scores.index, y=cumulative_profits, mode='lines', name='Cumulative Profit'))
            fig7.update_layout(title="Cumulative Profit from Z-Score Trading Strategy", xaxis_title='Date', yaxis_title='Cumulative Profit')
            st.plotly_chart(fig7)

            st.markdown("#### Interpretation of Trading Signals and Cumulative Profit")
            final_profit = cumulative_profits[-1] if cumulative_profits else 0.0
            st.write(f"Cumulative Profit: {final_profit:.2f}")
            st.write("""
            - The trading strategy uses z-scores to generate buy and sell signals.
            - The cumulative profit shows the total profit from the trading strategy over the analyzed period.
            """)
        except Exception as e:
            # Top-level UI boundary: surface any failure to the user.
            st.error(f"Error: {str(e)}. Check ticker symbols or date range.")

elif page == 'Pair Cointegration Identification':
    st.subheader("Cointegration Identification")
    st.write("""
    ### Description
    This method identifies cointegrated pairs using two methods: Engle-Granger and Johansen Cointegration tests. It works for both stocks and cryptocurrency pairs.
    """)

    method = st.sidebar.selectbox('Select Cointegration Method', ['Engle-Granger', 'Johansen Cointegration'])

    with st.sidebar.expander("Stock/Crypto Ticker and Date Selection", expanded=True):
        tickers_input = st.text_input('Enter Stock or Crypto Tickers (comma-separated)', ', '.join(default_tickers), help="Enter the ticker symbols for stocks or cryptocurrencies you want to analyze.")
        start_date = st.date_input('Start Date', pd.to_datetime(START_DATE), help="Select the start date for the data range.")
        end_date = st.date_input('End Date', pd.to_datetime(END_DATE), help="Select the end date for the data range.")

    if st.sidebar.button('Run Cointegration Analysis'):
        try:
            # Skip blank entries such as the one produced by a trailing comma.
            tickers = [ticker.strip() for ticker in tickers_input.split(',') if ticker.strip()]
            universe_tickers_ts_map = {ticker: load_ticker_ts_df(ticker, start_date, end_date) for ticker in tickers}
            # Sanitize against the date range the user actually selected.
            uts_sanitized = sanitize_data(universe_tickers_ts_map, start_date, end_date)
            if not uts_sanitized:
                raise ValueError("No valid data after sanitization. Check tickers or date range.")

            if method == 'Engle-Granger':
                pvalues, pairs = find_cointegrated_pairs(uts_sanitized, P_VALUE_THRESHOLD)
                heatmap_title = 'Cointegration Heatmap (Engle-Granger)'
            else:
                pvalues, pairs = find_cointegrated_pairs_rolling(uts_sanitized, P_VALUE_THRESHOLD, ROLLING_WINDOW_SIZE, CONSISTENT_COINTEGRATION_THRESHOLD)
                heatmap_title = 'Cointegration Heatmap (Johansen)'

            # Blank out every cell above the threshold so only candidate
            # pairs are coloured.
            masked_pvalues = np.where(pvalues > P_VALUE_THRESHOLD, np.nan, pvalues)
            tickers_list = list(uts_sanitized.keys())
            fig_heatmap = px.imshow(masked_pvalues, x=tickers_list, y=tickers_list, color_continuous_scale='RdYlGn_r', title=heatmap_title, labels=dict(x='Tickers', y='Tickers', color='P-value'), zmin=0, zmax=P_VALUE_THRESHOLD)
            st.plotly_chart(fig_heatmap)

            top_10_pairs = sorted(pairs, key=lambda x: x[2])[:10]
            if top_10_pairs:
                pair_labels = [f"{pair[0]} & {pair[1]}" for pair in top_10_pairs]
                pair_values = [pair[2] for pair in top_10_pairs]
                fig_bar = go.Figure([go.Bar(x=pair_values, y=pair_labels, orientation='h')])
                fig_bar.update_layout(title='Top 10 Most Cointegrated Pairs', xaxis_title='P-value', yaxis_title='Asset Pairs', yaxis=dict(autorange='reversed'))
                st.plotly_chart(fig_bar)
            else:
                st.info("No cointegrated pairs found for the selected tickers and date range.")
        except Exception as e:
            # Top-level UI boundary: surface any failure to the user.
            st.error(f"Error: {str(e)}. Check ticker symbols or date range.")

    with st.expander("How it Works", expanded=False):
        st.markdown("""
        **Cointegration Overview:**
        - Cointegration is a statistical property of a collection of time series variables. Two or more series are cointegrated if they share a common stochastic drift.

        **Engle-Granger Method:**
        - This method involves estimating a long-term equilibrium relationship between two non-stationary series and testing whether the residuals from this relationship are stationary.

        **Johansen Method:**
        - The Johansen test is a more general procedure that allows for more than two series and can identify multiple cointegrating relationships.
        """)

# Hide the default Streamlit menu and footer
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)