# NOTE: the original capture began with Hugging Face Spaces page chrome
# ("Spaces: Sleeping") — extraction residue, not program source.
| import warnings | |
| warnings.filterwarnings('ignore') # Ignore warnings for cleaner output | |
| import yfinance as yf | |
| import pandas as pd | |
| import numpy as np | |
| import streamlit as st | |
| import plotly.graph_objects as go | |
| import seaborn as sns | |
| from itertools import combinations | |
| from statsmodels.tsa.api import VECM | |
| from statsmodels.tsa.stattools import adfuller | |
| from statsmodels.tsa.vector_ar.vecm import coint_johansen | |
| from statsmodels.tsa.stattools import grangercausalitytests, coint | |
| from statsmodels.regression.linear_model import OLS | |
| import plotly.express as px | |
# Parameters for the pair-identification page.
START_DATE = '2021-01-01'  # start of the default history window
END_DATE = pd.to_datetime('today') + pd.Timedelta(days=1)  # tomorrow, so today's bar is included
P_VALUE_THRESHOLD = 0.05  # significance level for the cointegration tests
ROLLING_WINDOW_SIZE = 252  # rows per rolling Johansen window (~one trading year)
CONSISTENT_COINTEGRATION_THRESHOLD = 0.8  # fraction of windows that must reject no-cointegration
# Default universe: cryptocurrencies and large US bank stocks.
default_tickers = ['BTC-USD', 'ETH-USD', 'BNB-USD', 'JPM', 'BAC', 'WFC', 'C']
def load_ticker_ts_df(ticker, start, end):
    """Download daily data for *ticker* and return its 'Adj Close' series.

    Prices are downloaded unadjusted (auto_adjust=False) so the
    'Adj Close' column is present.  Raises ValueError when Yahoo
    Finance returns no rows for the requested range.
    """
    frame = yf.download(ticker, start=start, end=end, auto_adjust=False)  # Unadjusted prices
    # Newer yfinance releases return MultiIndex (field, ticker) columns; flatten.
    if isinstance(frame.columns, pd.MultiIndex):
        frame.columns = frame.columns.get_level_values(0)
    if frame.empty:
        raise ValueError(f"No data found for {ticker}")
    return frame['Adj Close']
def cross_correlation(series1, series2, lag):
    """Pearson correlation between the two series at the given integer lag.

    A positive lag correlates series1 with series2 shifted *lag* steps
    ahead; a negative lag does the reverse; lag 0 is the plain correlation.
    """
    if lag == 0:
        left, right = series1, series2
    elif lag > 0:
        left, right = series1[:-lag], series2[lag:]
    else:
        left, right = series1[-lag:], series2[:lag]
    return np.corrcoef(left, right)[0, 1]
def granger_test_with_shift(data, target, predictor, shift):
    """p-value of a lag-1 Granger causality test after shifting *predictor*.

    The predictor column is shifted by *shift* rows, NaN rows introduced
    by the shift are dropped, and the ssr F-test p-value at lag 1 is
    returned.
    """
    frame = data.copy()
    frame[predictor] = data[predictor].shift(shift)
    frame = frame.dropna()
    test_output = grangercausalitytests(frame[[target, predictor]], maxlag=1, verbose=False)
    # test_output[1] is the lag-1 entry; [0] holds the per-test dicts.
    return test_output[1][0]['ssr_ftest'][1]
def calculate_cumulative_profit(aligned_data, z_scores, buy_threshold, sell_threshold, ticker1=None, ticker2=None):
    """Simulate a 1-unit pairs strategy driven by residual z-scores.

    A position opens when z crosses ``buy_threshold`` (short spread: sell
    ticker2 / buy ticker1) or ``sell_threshold`` (long spread: buy ticker2 /
    sell ticker1), and closes when |z| falls below 0.5.

    Parameters
    ----------
    aligned_data : pd.DataFrame
        Price columns for the two assets, indexed like ``z_scores``.
    z_scores : pd.Series
        Z-scores of the spread residuals.
    buy_threshold, sell_threshold : float
        Entry thresholds (buy_threshold > 0 > sell_threshold expected).
    ticker1, ticker2 : str, optional
        Column names in ``aligned_data``; default to the first and second
        columns, which matches the previous reliance on the module-level
        ``ticker1`` / ``ticker2`` globals set by the Streamlit sidebar.

    Returns
    -------
    (list, list)
        Cumulative profit recorded at each position close, and any
        positions still open at the end.
    """
    # Fix: the original read the global ticker1/ticker2 set by the UI;
    # resolve the column names locally (backward compatible for the app,
    # where aligned_data.columns == [ticker1, ticker2]).
    if ticker1 is None:
        ticker1 = aligned_data.columns[0]
    if ticker2 is None:
        ticker2 = aligned_data.columns[1]
    positions = []
    profit = 0
    cumulative_profits = []
    position_open = False
    for i in range(len(z_scores)):
        date = z_scores.index[i]
        z = z_scores.iloc[i]
        if z > buy_threshold and not position_open:
            # Spread is rich: short ticker2 / long ticker1.
            entry_price1 = aligned_data.loc[date, ticker1]
            entry_price2 = aligned_data.loc[date, ticker2]
            position_open = True
            positions.append((date, 'sell', ticker2, entry_price2, 'buy', ticker1, entry_price1))
        elif z < sell_threshold and not position_open:
            # Spread is cheap: long ticker2 / short ticker1.
            entry_price1 = aligned_data.loc[date, ticker1]
            entry_price2 = aligned_data.loc[date, ticker2]
            position_open = True
            positions.append((date, 'buy', ticker2, entry_price2, 'sell', ticker1, entry_price1))
        elif position_open and abs(z) < 0.5:
            # Mean reversion reached: unwind the open position.
            exit_price1 = aligned_data.loc[date, ticker1]
            exit_price2 = aligned_data.loc[date, ticker2]
            position_open = False
            entry = positions.pop()
            entry_date, action1, tickerA, entry_priceA, action2, tickerB, entry_priceB = entry
            if action1 == 'sell':
                # Was short ticker2 / long ticker1.
                profit += (entry_priceA - exit_price2) + (exit_price1 - entry_priceB)
            else:
                # Was long ticker2 / short ticker1.
                profit += (exit_price2 - entry_priceA) + (entry_priceB - exit_price1)
            cumulative_profits.append(profit)
    return cumulative_profits, positions
def sanitize_data(data_map, start=None, end=None):
    """Clean and align raw ticker series onto one common daily calendar.

    Each series is trimmed to at most the expected number of days,
    reindexed onto a daily calendar, and repaired: infinities become NaN,
    interior NaNs are linearly interpolated, and remaining head/tail gaps
    are back-/forward-filled.  Series with less than half the expected
    history (or ``None``) are dropped.

    Parameters
    ----------
    data_map : dict[str, pd.Series]
        Raw price series keyed by ticker.
    start, end : optional
        Calendar range to align to; default to the module-level
        START_DATE / END_DATE (backward compatible).

    Returns
    -------
    dict[str, pd.Series]
        Ticker -> gap-free daily series.

    Raises
    ------
    ValueError
        If a series still contains NaN/inf after repair.
    """
    if start is None:
        start = START_DATE
    if end is None:
        end = END_DATE
    ts_days_length = (pd.to_datetime(end) - pd.to_datetime(start)).days
    date_range = pd.date_range(start=start, end=end, freq='D')
    data_sanitized = {}
    for ticker, data in data_map.items():
        # Skip tickers with too little history to be comparable.
        if data is None or len(data) < (ts_days_length / 2):
            continue
        if len(data) > ts_days_length:
            data = data[-ts_days_length:]
        data = data.reindex(date_range)
        data = data.replace([np.inf, -np.inf], np.nan)
        data = data.interpolate(method='linear')
        # Fix: fillna(method='pad'/'bfill') is deprecated (removed in
        # pandas 3.0); ffill()/bfill() are the identical replacements.
        data = data.ffill().bfill()
        # Fix: was a bare assert (stripped under -O); validate explicitly.
        if data.isna().any() or np.isinf(data).any():
            raise ValueError(f"Unfillable gaps remain for {ticker}")
        data_sanitized[ticker] = data
    return data_sanitized
def find_cointegrated_pairs(tickers_ts_map, p_value_threshold):
    """Engle-Granger cointegration scan over every ticker pair.

    Returns the symmetric p-value matrix (diagonal zeroed) and a list of
    (ticker_a, ticker_b, p_value) tuples whose p-value is below the
    threshold.
    """
    tickers = list(tickers_ts_map)
    n = len(tickers)
    price_matrix = np.column_stack([tickers_ts_map[t].values for t in tickers])
    pvalue_matrix = np.ones((n, n))
    for i, j in combinations(range(n), 2):
        _, pvalue, _ = coint(price_matrix[:, i], price_matrix[:, j])
        pvalue_matrix[i, j] = pvalue_matrix[j, i] = pvalue
    np.fill_diagonal(pvalue_matrix, 0)
    pairs = [
        (tickers[i], tickers[j], pvalue_matrix[i, j])
        for i, j in combinations(range(n), 2)
        if pvalue_matrix[i, j] < p_value_threshold
    ]
    return pvalue_matrix, pairs
# Function to perform Johansen test
def johansen_test(data, det_order=0, k_ar_diff=1):
    """Run the Johansen cointegration test on a multivariate frame.

    Returns ``(result.lr1, result.cvt)``: the trace test statistics (one
    per cointegration-rank null hypothesis) and the matching critical
    values, whose columns are the 90%/95%/99% levels (see how they are
    tabulated elsewhere in this app).
    """
    result = coint_johansen(data, det_order, k_ar_diff)
    return result.lr1, result.cvt
def find_cointegrated_pairs_rolling(tickers_ts_map, p_value_threshold, window_size, consistency_threshold):
    """Find pairs that are consistently cointegrated across rolling windows.

    For every pair, a Johansen trace test is run on each complete
    ``window_size``-row window; a window counts as cointegrated when the
    rank-0 trace statistic exceeds its 95% critical value (recorded as a
    pseudo p-value of 0.01, else 1).  A pair is kept when the fraction of
    cointegrated windows reaches ``consistency_threshold``.

    Returns
    -------
    (np.ndarray, list)
        Symmetric matrix of mean pseudo p-values (1 where inconsistent or
        untestable) and the list of (ticker_a, ticker_b, mean_p) tuples.
    """
    tickers = list(tickers_ts_map.keys())
    n = len(tickers)
    pvalue_matrix = np.ones((n, n))
    consistent_pairs = []
    for i, j in combinations(range(n), 2):
        pvalues = []
        for start in range(len(tickers_ts_map[tickers[i]]) - window_size + 1):
            end = start + window_size
            window_data_i = tickers_ts_map[tickers[i]].iloc[start:end]
            window_data_j = tickers_ts_map[tickers[j]].iloc[start:end]
            window_data = pd.concat([window_data_i, window_data_j], axis=1).dropna()
            if window_data.shape[0] < window_size:
                continue  # skip windows with missing observations
            test_stat, crit_values = johansen_test(window_data)
            # Fix: compare the rank-0 trace statistic against its own row's
            # 95% critical value, crit_values[0, 1] — the original used
            # crit_values[1, 1] (the rank<=1 row), inconsistent with the
            # single-pair analysis elsewhere in this app.
            pvalues.append(0.01 if test_stat[0] > crit_values[0, 1] else 1)
        if not pvalues:
            # Fix: with no complete window, np.mean([]) emitted a
            # RuntimeWarning and produced NaN; treat explicitly as
            # "no evidence of cointegration".
            pvalue_matrix[i, j] = 1
            pvalue_matrix[j, i] = 1
            continue
        pvalues = np.array(pvalues)
        mean_pvalue = pvalues.mean()
        consistent_cointegration = np.mean(pvalues < p_value_threshold)
        if consistent_cointegration >= consistency_threshold:
            consistent_pairs.append((tickers[i], tickers[j], mean_pvalue))
            pvalue_matrix[i, j] = mean_pvalue
            pvalue_matrix[j, i] = mean_pvalue
        else:
            pvalue_matrix[i, j] = 1
            pvalue_matrix[j, i] = 1
    return pvalue_matrix, consistent_pairs
# Streamlit app: page-level configuration and navigation between the
# two analysis tools.
st.set_page_config(layout="wide")
st.title('Pairs Cointegration and Trading Analysis')
st.sidebar.header('Select Page')
page = st.sidebar.radio('Page:', ['Pairs Trading Analysis', 'Pair Cointegration Identification'])
if page == 'Pairs Trading Analysis':
    st.subheader("Pairs Trading Analysis")
    st.write("""
### Description
This method analyzes stock and cryptocurrency prices, normalizes them, calculates rolling volatilities, tests for cointegration, and visualizes buy/sell signals based on z-scores.
""")
    with st.sidebar.expander("How to use:", expanded=False):
        st.markdown("""**How to use:**
1. Enter the stock tickers, start date, and end date.
2. Set the number of days for the volatility window, and the buy/sell thresholds for the z-scores.
3. Click 'Run Analysis' to start the analysis.
""")
    with st.sidebar.expander("Stock/Crypto Ticker and Date Selection", expanded=True):
        # Defaults are the Amsterdam and US listings of ASML — presumably a
        # cross-listed pair chosen because cointegration is expected.
        ticker1 = st.text_input('Enter First Stock/Crypto Ticker', 'ASML.AS', help="Enter the ticker symbol for the first stock or cryptocurrency.")
        ticker2 = st.text_input('Enter Second Stock/Crypto Ticker', 'ASML', help="Enter the ticker symbol for the second stock or cryptocurrency.")
        start_date = st.date_input('Start Date', pd.to_datetime('2022-01-01'), help="Select the start date for the data range.")
        end_date = st.date_input('End Date', pd.to_datetime(END_DATE), help="Select the end date for the data range.")
    with st.sidebar.expander("Method Parameters", expanded=True):
        volatility_window = st.number_input('Volatility Window (days)', min_value=1, max_value=365, value=30, help="Set the number of days for the rolling volatility window.")
        buy_threshold = st.number_input('Buy Z-Score Threshold', value=2.0, help="Set the z-score threshold to generate buy signals.")
        sell_threshold = st.number_input('Sell Z-Score Threshold', value=-2.0, help="Set the z-score threshold to generate sell signals.")
    if st.sidebar.button('Run Analysis'):
        try:
            # Data collection: unadjusted prices; newer yfinance returns
            # MultiIndex (field, ticker) columns, flattened here.
            data1 = yf.download(ticker1, start=start_date, end=end_date, auto_adjust=False)
            if isinstance(data1.columns, pd.MultiIndex):
                data1.columns = data1.columns.get_level_values(0)
            data2 = yf.download(ticker2, start=start_date, end=end_date, auto_adjust=False)
            if isinstance(data2.columns, pd.MultiIndex):
                data2.columns = data2.columns.get_level_values(0)
            if data1.empty or data2.empty:
                raise ValueError(f"No data found for {ticker1} or {ticker2}")
            # Inner join keeps only dates where both assets traded.
            aligned_data = pd.concat([data1['Close'], data2['Close']], axis=1, join='inner')
            aligned_data.columns = [ticker1, ticker2]
            # Normalize the price series (full-sample z-score) so both fit one axis
            normalized_data = (aligned_data - aligned_data.mean()) / aligned_data.std()
            # Plot normalized data
            fig1 = go.Figure()
            fig1.add_trace(go.Scatter(x=normalized_data.index, y=normalized_data[ticker1], mode='lines', name=f'Normalized {ticker1}'))
            fig1.add_trace(go.Scatter(x=normalized_data.index, y=normalized_data[ticker2], mode='lines', name=f'Normalized {ticker2}'))
            fig1.update_layout(title=f'Normalized Price Series for {ticker1} and {ticker2}', xaxis_title='Date', yaxis_title='Normalized Price')
            st.plotly_chart(fig1)
            # Calculate daily returns
            returns = aligned_data.pct_change().dropna()
            # Calculate rolling volatilities, annualized with sqrt(252 trading days)
            volatility1 = returns[ticker1].rolling(volatility_window).std() * np.sqrt(252)
            volatility2 = returns[ticker2].rolling(volatility_window).std() * np.sqrt(252)
            # Plot rolling volatilities
            fig2 = go.Figure()
            fig2.add_trace(go.Scatter(x=volatility1.index, y=volatility1, mode='lines', name=f"{ticker1} Volatility"))
            fig2.add_trace(go.Scatter(x=volatility2.index, y=volatility2, mode='lines', name=f"{ticker2} Volatility"))
            fig2.update_layout(title=f"{volatility_window}-Day Rolling Historical Volatility for {ticker1} and {ticker2}", xaxis_title='Date', yaxis_title='Volatility')
            st.plotly_chart(fig2)
            # Check for stationarity of the raw price levels using the ADF test
            adf_result1 = adfuller(aligned_data[ticker1])
            adf_result2 = adfuller(aligned_data[ticker2])
            # Perform Johansen cointegration test
            coint_test_stat, coint_critical_values = johansen_test(aligned_data)
            # NOTE(review): the VECM is fitted unconditionally with coint_rank=1
            # even when the Johansen test above fails — confirm intended.
            vecm = VECM(aligned_data, k_ar_diff=1, coint_rank=1)
            vecm_fit = vecm.fit()
            # Analyzing the VECM residuals for stationarity
            residuals = vecm_fit.resid
            residuals_df = pd.DataFrame(residuals, index=aligned_data.index[-len(residuals):], columns=[f'Residual_{ticker1}', f'Residual_{ticker2}'])
            adf_residuals_1 = adfuller(residuals[:, 0])
            adf_residuals_2 = adfuller(residuals[:, 1])
            # Plot residuals from VECM
            fig3 = go.Figure()
            fig3.add_trace(go.Scatter(x=residuals_df.index, y=residuals_df[f'Residual_{ticker1}'], mode='lines', name=f'Residual {ticker1}'))
            fig3.add_trace(go.Scatter(x=residuals_df.index, y=residuals_df[f'Residual_{ticker2}'], mode='lines', name=f'Residual {ticker2}'))
            fig3.add_hline(y=0, line=dict(color='red', dash='dash'), name='Zero Line')
            fig3.update_layout(title='Residuals from VECM', xaxis_title='Date', yaxis_title='Residuals')
            st.plotly_chart(fig3)
            # Display ADF test results for the tickers
            st.write(f"ADF Statistic for {ticker1}: {adf_result1[0]}, p-value: {adf_result1[1]}")
            st.write(f"ADF Statistic for {ticker2}: {adf_result2[0]}, p-value: {adf_result2[1]}")
            with st.expander("How it Works", expanded=False):
                st.markdown("""
**ADF Test:**
- The Augmented Dickey-Fuller (ADF) test checks whether a time series has a unit root, i.e., whether it is non-stationary.
- If the p-value is less than 0.05, we reject the null hypothesis that the series has a unit root, indicating that the series is stationary.
**Johansen Cointegration Test:**
- The Johansen test is used to determine the number of cointegrating relationships among multiple time series.
- If the test statistic is greater than the critical value, we reject the null hypothesis that there is no cointegration.
**VECM (Vector Error Correction Model):**
- A VECM is a special form of a VAR (Vector Autoregression) model used for cointegrated series. It corrects for disequilibrium in the short run while keeping the long-term relationship intact.
**Z-Score Trading Strategy:**
- Z-scores measure how many standard deviations an element is from the mean. In pairs trading, z-scores are used to identify overbought or oversold conditions, triggering buy or sell signals.
""")
            st.markdown("#### Interpretation of ADF Results")
            st.latex(r'''
H_0: \text{The series has a unit root (non-stationary)} \\
H_1: \text{The series does not have a unit root (stationary)}
''')
            st.write("""
- The Augmented Dickey-Fuller (ADF) test checks the null hypothesis that a unit root is present in a time series sample.
""")
            if adf_result1[1] < 0.05:
                st.write(f"{ticker1} is stationary, indicating the series does not have a unit root.")
            else:
                st.write(f"{ticker1} is not stationary, indicating the series has a unit root.")
            if adf_result2[1] < 0.05:
                st.write(f"{ticker2} is stationary, indicating the series does not have a unit root.")
            else:
                st.write(f"{ticker2} is not stationary, indicating the series has a unit root.")
            # Display cointegration test results (cvt columns: 90%, 95%, 99%)
            st.write("Johansen Cointegration Test Results:")
            johansen_results = pd.DataFrame({
                'Test Statistic': coint_test_stat,
                '90% Critical Value': coint_critical_values[:, 0],
                '95% Critical Value': coint_critical_values[:, 1],
                '99% Critical Value': coint_critical_values[:, 2]
            }, index=[f'Cointegration Test {i+1}' for i in range(len(coint_test_stat))])
            st.write(johansen_results)
            st.markdown("#### Interpretation of Johansen Cointegration Test Results")
            st.latex(r'''
H_0: \text{No cointegration relationship exists} \\
H_1: \text{Cointegration relationship exists}
''')
            st.write("""
- The Johansen cointegration test is used to determine the cointegration rank between multiple time series.
""")
            # Rank-0 trace statistic vs its 95% critical value.
            if coint_test_stat[0] > coint_critical_values[0, 1]:
                st.write(f"The two assets {ticker1} and {ticker2} are cointegrated at the 95% confidence level.")
            else:
                st.write(f"The two assets {ticker1} and {ticker2} are not cointegrated at the 95% confidence level.")
            st.markdown("#### Interpretation of VECM Residuals")
            st.write(f"ADF Statistic for VECM residuals of {ticker1}: {adf_residuals_1[0]}, p-value: {adf_residuals_1[1]}")
            st.write(f"ADF Statistic for VECM residuals of {ticker2}: {adf_residuals_2[0]}, p-value: {adf_residuals_2[1]}")
            st.write("""
- The residuals from the Vector Error Correction Model (VECM) should be stationary to confirm cointegration.
""")
            # NOTE(review): residual stationarity uses a looser 10% level here,
            # vs 5% for the price-series ADF tests above — confirm intended.
            if adf_residuals_1[1] < 0.1:
                st.write(f"The residuals of the VECM model for {ticker1} are stationary, confirming cointegration.")
            else:
                st.write(f"The residuals of the VECM model for {ticker1} are not stationary, suggesting no cointegration.")
            if adf_residuals_2[1] < 0.1:
                st.write(f"The residuals of the VECM model for {ticker2} are stationary, confirming cointegration.")
            else:
                st.write(f"The residuals of the VECM model for {ticker2} are not stationary, suggesting no cointegration.")
            # Calculate cross-correlation of daily returns for lags of +/- 30 days
            lag_range = range(-30, 31)
            cross_correlations = [cross_correlation(returns[ticker1], returns[ticker2], lag) for lag in lag_range]
            # Plot cross-correlation for different lags
            fig4 = go.Figure()
            fig4.add_trace(go.Scatter(x=list(lag_range), y=cross_correlations, mode='lines+markers'))
            fig4.add_hline(y=0, line=dict(color='gray', dash='dash'))
            fig4.add_vline(x=0, line=dict(color='red', dash='dash'))
            fig4.update_layout(title=f"Cross-Correlation between {ticker1} and {ticker2}", xaxis_title='Lag (days)', yaxis_title='Cross-Correlation')
            st.plotly_chart(fig4)
            st.markdown("#### Interpretation of Cross-Correlation Results")
            max_corr = max(cross_correlations)
            max_lag = lag_range[cross_correlations.index(max_corr)]
            # NOTE(review): `corr != max_corr` filters by *value*, not position;
            # exact ties with the maximum would be skipped — confirm acceptable.
            second_max_corr = max(corr for i, corr in enumerate(cross_correlations) if corr != max_corr)
            second_max_lag = lag_range[cross_correlations.index(second_max_corr)]
            st.write(f"Highest correlation: {max_corr:.2f} at lag {max_lag}")
            st.write(f"Second highest correlation: {second_max_corr:.2f} at lag {second_max_lag}")
            # NOTE(review): this text assumes the highest correlation sits at
            # lag 0 (cross-listed pair); it is shown regardless of max_lag.
            interpretation = f"Highest correlation at lag {max_lag}: The high correlation at lag {max_lag} indicates that {ticker1} and {ticker2} move together without any significant lead or lag. In other words, any movements in {ticker1} are almost instantaneously reflected in {ticker2} and vice versa. This is typical for cross-listed assets, where information and price changes are quickly reflected in both markets.\n"
            if second_max_lag < 0:
                leading_ticker = ticker2
                lagging_ticker = ticker1
                lead_days = abs(second_max_lag)
                direction = f"Second highest correlation at lag {second_max_lag}: {leading_ticker} leads {lagging_ticker} by {lead_days} days. This means that movements in {leading_ticker} tend to precede similar movements in {lagging_ticker} by {lead_days} day(s)."
            elif second_max_lag > 0:
                leading_ticker = ticker1
                lagging_ticker = ticker2
                lead_days = second_max_lag
                direction = f"Second highest correlation at lag {second_max_lag}: {leading_ticker} leads {lagging_ticker} by {lead_days} days. This means that movements in {leading_ticker} tend to precede similar movements in {lagging_ticker} by {lead_days} day(s)."
            else:
                direction = "No significant lead/lag relationship; they move simultaneously."
            interpretation += direction
            st.write(interpretation)
            # Granger causality tests, shifting the predictor by -5..+5 days
            shift_range = range(-5, 6)
            granger_p_values_shift_1_to_2 = {shift: granger_test_with_shift(aligned_data, ticker1, ticker2, shift) for shift in shift_range}
            granger_p_values_shift_2_to_1 = {shift: granger_test_with_shift(aligned_data, ticker2, ticker1, shift) for shift in shift_range}
            # Create DataFrames for plotting Granger causality test results
            granger_p_values_df_shift_1_to_2 = pd.DataFrame(granger_p_values_shift_1_to_2, index=[f"{ticker1} causes {ticker2}"]).T
            granger_p_values_df_shift_2_to_1 = pd.DataFrame(granger_p_values_shift_2_to_1, index=[f"{ticker2} causes {ticker1}"]).T
            # Plot Granger causality test p-values with shifts
            fig5 = go.Figure()
            fig5.add_trace(go.Scatter(x=granger_p_values_df_shift_1_to_2.index, y=granger_p_values_df_shift_1_to_2[f"{ticker1} causes {ticker2}"], mode='lines+markers', name=f"{ticker1} causes {ticker2}"))
            fig5.add_trace(go.Scatter(x=granger_p_values_df_shift_2_to_1.index, y=granger_p_values_df_shift_2_to_1[f"{ticker2} causes {ticker1}"], mode='lines+markers', name=f"{ticker2} causes {ticker1}"))
            fig5.add_hline(y=0.05, line=dict(color='gray', dash='dash'))
            fig5.add_vline(x=0, line=dict(color='red', dash='dash'))
            fig5.update_layout(title=f"Granger Causality Test p-values with Shifts between {ticker1} and {ticker2}", xaxis_title='Shift (days)', yaxis_title='p-value')
            st.plotly_chart(fig5)
            st.markdown("#### Interpretation of Granger Causality Test Results")
            # Shift with the smallest p-value in each direction.
            best_lag_1_to_2 = min(granger_p_values_shift_1_to_2, key=granger_p_values_shift_1_to_2.get)
            best_lag_2_to_1 = min(granger_p_values_shift_2_to_1, key=granger_p_values_shift_2_to_1.get)
            interpretation = ""
            if granger_p_values_shift_1_to_2[best_lag_1_to_2] < 0.05 and granger_p_values_shift_1_to_2[best_lag_1_to_2] < granger_p_values_shift_2_to_1[best_lag_2_to_1]:
                causality_direction = f"{ticker1} causes {ticker2}"
                best_lag = best_lag_1_to_2
                interpretation += f"Granger causality test with shifts suggests that {ticker1} causes {ticker2} with a lag of {abs(best_lag)} days.\n"
                interpretation += f"This means that movements in {ticker1} tend to lead movements in {ticker2} by {abs(best_lag)} days. In practical terms, if {ticker1} experiences a price change, we can expect a similar change in {ticker2} approximately {abs(best_lag)} days later."
            else:
                # NOTE(review): this branch also fires when NEITHER direction is
                # significant; the message then overstates the evidence.
                causality_direction = f"{ticker2} causes {ticker1}"
                best_lag = best_lag_2_to_1
                interpretation += f"Granger causality test with shifts suggests that {ticker2} causes {ticker1} with a lag of {abs(best_lag)} days.\n"
                interpretation += f"This means that movements in {ticker2} tend to lead movements in {ticker1} by {abs(best_lag)} days. In practical terms, if {ticker2} experiences a price change, we can expect a similar change in {ticker1} approximately {abs(best_lag)} days later."
            st.write(interpretation)
            # Adjust data based on the identified best lag
            adjusted_data = aligned_data.copy()
            # NOTE(review): the .dropna() here is a no-op — assignment back into
            # the frame re-aligns on the index; the frame-level dropna() below
            # is what actually removes the NaN rows introduced by the shift.
            adjusted_data[ticker1] = adjusted_data[ticker1].shift(best_lag).dropna()
            adjusted_data = adjusted_data.dropna()
            # Hedge-ratio regression (OLS without intercept) and spread residuals
            model = OLS(adjusted_data[ticker2], adjusted_data[ticker1])
            results = model.fit()
            residuals = adjusted_data[ticker2] - results.params[ticker1] * adjusted_data[ticker1]
            # Standardize the residuals into z-scores
            residuals_mean = residuals.mean()
            residuals_std = residuals.std()
            z_scores = (residuals - residuals_mean) / residuals_std
            # Generate buy and sell signals from the thresholds
            buy_signals = z_scores[z_scores > buy_threshold]
            sell_signals = z_scores[z_scores < sell_threshold]
            # Plot the residual z-scores with buy and sell signals
            fig6 = go.Figure()
            fig6.add_trace(go.Scatter(x=z_scores.index, y=z_scores, mode='lines', name='Z-Score of Residuals'))
            fig6.add_trace(go.Scatter(x=buy_signals.index, y=buy_signals, mode='markers', marker=dict(color='green', symbol='triangle-up', size=10), name=f'Buy {ticker1}, Sell {ticker2} Signal'))
            fig6.add_trace(go.Scatter(x=sell_signals.index, y=sell_signals, mode='markers', marker=dict(color='red', symbol='triangle-down', size=10), name=f'Sell {ticker1}, Buy {ticker2} Signal'))
            fig6.add_hline(y=buy_threshold, line=dict(color='gray', dash='dash'))
            fig6.add_hline(y=sell_threshold, line=dict(color='gray', dash='dash'))
            fig6.update_layout(title=f"Residuals (Adjusted for Lag) with Buy and Sell Signals based on Z-Scores", xaxis_title='Date', yaxis_title='Z-Score')
            st.plotly_chart(fig6)
            # Calculate cumulative profits and positions
            cumulative_profits, positions = calculate_cumulative_profit(aligned_data, z_scores, buy_threshold, sell_threshold)
            # NOTE(review): cumulative_profits has one entry per *closed trade*,
            # so plotting it against the first len(cumulative_profits) calendar
            # dates mislabels the x axis — confirm intended.
            fig7 = go.Figure()
            fig7.add_trace(go.Scatter(x=aligned_data.index[:len(cumulative_profits)], y=cumulative_profits, mode='lines', name='Cumulative Profit'))
            fig7.update_layout(title=f"Cumulative Profit from Z-Score Trading Strategy", xaxis_title='Date', yaxis_title='Cumulative Profit')
            st.plotly_chart(fig7)
            st.markdown("#### Interpretation of Trading Signals and Cumulative Profit")
            # NOTE(review): IndexError (caught by the handler below) when no
            # trade ever closed.
            st.write(f"Cumulative Profit: {cumulative_profits[-1]:.2f}")
            st.write("""
- The trading strategy uses z-scores to generate buy and sell signals.
- The cumulative profit shows the total profit from the trading strategy over the analyzed period.
""")
        except Exception as e:
            st.error(f"Error: {str(e)}. Check ticker symbols or date range.")
elif page == 'Pair Cointegration Identification':
    st.subheader("Cointegration Identification")
    st.write("""
### Description
This method identifies cointegrated pairs using two methods: Engle-Granger and Johansen Cointegration tests.
It works for both stocks and cryptocurrency pairs.
""")
    method = st.sidebar.selectbox('Select Cointegration Method', ['Engle-Granger', 'Johansen Cointegration'])
    with st.sidebar.expander("Stock/Crypto Ticker and Date Selection", expanded=True):
        tickers_input = st.text_input('Enter Stock or Crypto Tickers (comma-separated)', ', '.join(default_tickers), help="Enter the ticker symbols for stocks or cryptocurrencies you want to analyze.")
        # NOTE(review): these dates drive the download only; sanitize_data
        # still reindexes to the module-level START_DATE/END_DATE calendar —
        # confirm that mismatch is intended.
        start_date = st.date_input('Start Date', pd.to_datetime(START_DATE), help="Select the start date for the data range.")
        end_date = st.date_input('End Date', pd.to_datetime(END_DATE), help="Select the end date for the data range.")
    if st.sidebar.button('Run Cointegration Analysis'):
        try:
            tickers = [ticker.strip() for ticker in tickers_input.split(',')]
            universe_tickers_ts_map = {ticker: load_ticker_ts_df(ticker, start_date, end_date) for ticker in tickers}
            uts_sanitized = sanitize_data(universe_tickers_ts_map)
            if not uts_sanitized:
                raise ValueError("No valid data after sanitization. Check tickers or date range.")
            if method == 'Engle-Granger':
                pvalues, pairs = find_cointegrated_pairs(uts_sanitized, P_VALUE_THRESHOLD)
                # Mask p-values above the threshold so only significant cells render.
                masked_pvalues = np.where(pvalues > P_VALUE_THRESHOLD, np.nan, pvalues)
                tickers_list = list(uts_sanitized.keys())
                fig_heatmap = px.imshow(masked_pvalues, x=tickers_list, y=tickers_list,
                                        color_continuous_scale='RdYlGn_r', title='Cointegration Heatmap (Engle-Granger)',
                                        labels=dict(x='Tickers', y='Tickers', color='P-value'),
                                        zmin=0, zmax=P_VALUE_THRESHOLD)
            else:
                pvalues, pairs = find_cointegrated_pairs_rolling(uts_sanitized, P_VALUE_THRESHOLD, ROLLING_WINDOW_SIZE, CONSISTENT_COINTEGRATION_THRESHOLD)
                masked_pvalues = np.where(pvalues > P_VALUE_THRESHOLD, np.nan, pvalues)
                tickers_list = list(uts_sanitized.keys())
                fig_heatmap = px.imshow(masked_pvalues, x=tickers_list, y=tickers_list,
                                        color_continuous_scale='RdYlGn_r', title='Cointegration Heatmap (Johansen)',
                                        labels=dict(x='Tickers', y='Tickers', color='P-value'),
                                        zmin=0, zmax=P_VALUE_THRESHOLD)
            st.plotly_chart(fig_heatmap)
            # Bar chart of the ten pairs with the smallest p-values.
            top_10_pairs = sorted(pairs, key=lambda x: x[2])[:10]
            pair_labels = [f"{pair[0]} & {pair[1]}" for pair in top_10_pairs]
            pair_values = [pair[2] for pair in top_10_pairs]
            fig_bar = go.Figure([go.Bar(x=pair_values, y=pair_labels, orientation='h')])
            fig_bar.update_layout(title='Top 10 Most Cointegrated Pairs',
                                  xaxis_title='P-value',
                                  yaxis_title='Asset Pairs',
                                  yaxis=dict(autorange='reversed'))
            st.plotly_chart(fig_bar)
        except Exception as e:
            st.error(f"Error: {str(e)}. Check ticker symbols or date range.")
    with st.expander("How it Works", expanded=False):
        st.markdown("""
**Cointegration Overview:**
- Cointegration is a statistical property of a collection of time series variables. Two or more series are cointegrated if they share a common stochastic drift.
**Engle-Granger Method:**
- This method involves estimating a long-term equilibrium relationship between two non-stationary series and testing whether the residuals from this relationship are stationary.
**Johansen Method:**
- The Johansen test is a more general procedure that allows for more than two series and can identify multiple cointegrating relationships.
""")
# Hide Streamlit's default hamburger menu and footer by injecting CSS.
_HIDE_DEFAULT_CHROME = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(_HIDE_DEFAULT_CHROME, unsafe_allow_html=True)