import warnings
warnings.filterwarnings('ignore') # Ignore warnings for cleaner output
import yfinance as yf
import pandas as pd
import numpy as np
import streamlit as st
import plotly.graph_objects as go
import seaborn as sns
from itertools import combinations
from statsmodels.tsa.api import VECM
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.tsa.stattools import grangercausalitytests, coint
from statsmodels.regression.linear_model import OLS
import plotly.express as px
# Parameters
START_DATE = '2021-01-01'
END_DATE = pd.to_datetime('today') + pd.Timedelta(days=1)
P_VALUE_THRESHOLD = 0.05
ROLLING_WINDOW_SIZE = 252
CONSISTENT_COINTEGRATION_THRESHOLD = 0.8
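# ROLLING_WINDOW_SIZE of 252 corresponds to roughly one trading year of daily observations;
# CONSISTENT_COINTEGRATION_THRESHOLD is the fraction of rolling windows that must flag
# cointegration before a pair is treated as consistently cointegrated.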
# Default tickers: cryptocurrencies and US bank stocks
default_tickers = ['BTC-USD', 'ETH-USD', 'BNB-USD', 'JPM', 'BAC', 'WFC', 'C']
# Function to load adjusted close price data for a given ticker
def load_ticker_ts_df(ticker, start, end):
data = yf.download(ticker, start=start, end=end, auto_adjust=False) # auto_adjust=False keeps the unadjusted OHLC columns plus 'Adj Close'
if isinstance(data.columns, pd.MultiIndex): # Flatten multi-index
data.columns = data.columns.get_level_values(0)
if data.empty:
raise ValueError(f"No data found for {ticker}")
return data['Adj Close']
# Function to calculate cross-correlation at different lags
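# Sign convention: a positive lag pairs series1 at time t with series2 at time t+lag,
# so a high correlation at a positive lag suggests series1 leads series2 (and vice versa for negative lags).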
def cross_correlation(series1, series2, lag):
if lag > 0:
return np.corrcoef(series1[:-lag], series2[lag:])[0, 1]
elif lag < 0:
return np.corrcoef(series1[-lag:], series2[:lag])[0, 1]
else:
return np.corrcoef(series1, series2)[0, 1]
# Function to perform Granger causality test with shifted time series
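# The predictor column is shifted by `shift` days before running statsmodels' grangercausalitytests
# on [target, predictor]; the test asks whether the (shifted) predictor Granger-causes the target,
# and the p-value of the ssr F-test at lag 1 is returned.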
def granger_test_with_shift(data, target, predictor, shift):
shifted_data = data.copy()
shifted_data[predictor] = data[predictor].shift(shift)
shifted_data.dropna(inplace=True)
granger_test_result = grangercausalitytests(shifted_data[[target, predictor]], maxlag=1, verbose=False)
p_value = granger_test_result[1][0]['ssr_ftest'][1]
return p_value
# Function to calculate cumulative profit
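# Trade logic: when the z-score of the spread exceeds buy_threshold the spread is assumed stretched
# (ticker2 rich relative to ticker1), so ticker2 is sold and ticker1 bought; the mirror trade is
# opened below sell_threshold, and any open position is closed once |z| < 0.5.
# Profits are in raw price units per one unit of each asset, with no position sizing or transaction costs.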
def calculate_cumulative_profit(aligned_data, z_scores, buy_threshold, sell_threshold, ticker1, ticker2):
positions = []
profit = 0
cumulative_profits = []
position_open = False
for i in range(len(z_scores)):
date = z_scores.index[i]
z = z_scores.iloc[i]
if z > buy_threshold and not position_open:
entry_date = date
entry_price1 = aligned_data.loc[date, ticker1]
entry_price2 = aligned_data.loc[date, ticker2]
position_open = True
positions.append((entry_date, 'sell', ticker2, entry_price2, 'buy', ticker1, entry_price1))
elif z < sell_threshold and not position_open:
entry_date = date
entry_price1 = aligned_data.loc[date, ticker1]
entry_price2 = aligned_data.loc[date, ticker2]
position_open = True
positions.append((entry_date, 'buy', ticker2, entry_price2, 'sell', ticker1, entry_price1))
elif position_open and abs(z) < 0.5:
exit_date = date
exit_price1 = aligned_data.loc[date, ticker1]
exit_price2 = aligned_data.loc[date, ticker2]
position_open = False
entry = positions.pop()
entry_date, action1, tickerA, entry_priceA, action2, tickerB, entry_priceB = entry
if action1 == 'sell':
profit += (entry_priceA - exit_price2) + (exit_price1 - entry_priceB)
else:
profit += (exit_price2 - entry_priceA) + (entry_priceB - exit_price1)
cumulative_profits.append(profit)
return cumulative_profits, positions
# Function to sanitize the data
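# Sanitization: series covering less than half of the requested range are dropped; the rest are
# reindexed to a daily calendar, cleaned of infinities, linearly interpolated, and then
# forward/back-filled so every retained ticker has a complete, gap-free series.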
def sanitize_data(data_map, start=START_DATE, end=END_DATE):
TS_DAYS_LENGTH = (pd.to_datetime(end) - pd.to_datetime(start)).days
data_sanitized = {}
date_range = pd.date_range(start=start, end=end, freq='D')
for ticker, data in data_map.items():
if data is None or len(data) < (TS_DAYS_LENGTH / 2):
continue
if len(data) > TS_DAYS_LENGTH:
data = data[-TS_DAYS_LENGTH:]
data = data.reindex(date_range)
data.replace([np.inf, -np.inf], np.nan, inplace=True)
data.interpolate(method='linear', inplace=True)
data = data.ffill() # Forward-fill remaining gaps (fillna(method=...) is deprecated)
data = data.bfill() # Back-fill any leading gaps
assert not np.any(np.isnan(data)) and not np.any(np.isinf(data))
data_sanitized[ticker] = data
return data_sanitized
# Function to find cointegrated pairs using Engle-Granger method
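# statsmodels' `coint` implements the (augmented) Engle-Granger two-step test; the resulting
# p-value matrix is symmetric, and the diagonal is set to zero since a series is trivially
# cointegrated with itself.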
def find_cointegrated_pairs(tickers_ts_map, p_value_threshold):
tickers = list(tickers_ts_map.keys())
n = len(tickers)
adj_close_data = np.column_stack([tickers_ts_map[ticker].values for ticker in tickers])
pvalue_matrix = np.ones((n, n))
for i, j in combinations(range(n), 2):
result = coint(adj_close_data[:, i], adj_close_data[:, j])
pvalue_matrix[i, j] = result[1]
pvalue_matrix[j, i] = result[1]
np.fill_diagonal(pvalue_matrix, 0)
pairs = [(tickers[i], tickers[j], pvalue_matrix[i, j]) for i in range(n) for j in range(i+1, n) if pvalue_matrix[i, j] < p_value_threshold]
return pvalue_matrix, pairs
# Function to perform Johansen test
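# `lr1` holds the trace statistics for the hypotheses r = 0, r <= 1, ... and `cvt` the matching
# critical values at the 90%, 95% and 99% levels (columns 0, 1 and 2 respectively).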
def johansen_test(data, det_order=0, k_ar_diff=1):
result = coint_johansen(data, det_order, k_ar_diff)
return result.lr1, result.cvt
# Find cointegrated pairs using Johansen method with rolling windows
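# Note: the Johansen test does not return p-values, so each window is scored with a proxy value
# (0.01 if the trace statistic rejects "no cointegration" at the 95% level, 1 otherwise); a pair
# is kept only if the share of rejecting windows reaches the consistency threshold.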
def find_cointegrated_pairs_rolling(tickers_ts_map, p_value_threshold, window_size, consistency_threshold):
tickers = list(tickers_ts_map.keys())
n = len(tickers)
pvalue_matrix = np.ones((n, n))
consistent_pairs = []
for i, j in combinations(range(n), 2):
pvalues = []
for start in range(len(tickers_ts_map[tickers[i]]) - window_size + 1):
end = start + window_size
window_data_i = tickers_ts_map[tickers[i]].iloc[start:end]
window_data_j = tickers_ts_map[tickers[j]].iloc[start:end]
window_data = pd.concat([window_data_i, window_data_j], axis=1).dropna()
if window_data.shape[0] < window_size:
continue
test_stat, crit_values = johansen_test(window_data)
if test_stat[0] > crit_values[0, 1]: # Trace statistic for r = 0 vs. its 95% critical value
pvalues.append(0.01)
else:
pvalues.append(1)
pvalues = np.array(pvalues)
if len(pvalues) == 0: # No full-length window available for this pair
continue
consistent_cointegration = np.mean(pvalues < p_value_threshold)
if consistent_cointegration >= consistency_threshold:
consistent_pairs.append((tickers[i], tickers[j], np.mean(pvalues)))
pvalue_matrix[i, j] = np.mean(pvalues)
pvalue_matrix[j, i] = np.mean(pvalues)
else:
pvalue_matrix[i, j] = 1
pvalue_matrix[j, i] = 1
return pvalue_matrix, consistent_pairs
# Streamlit app
st.set_page_config(layout="wide")
st.title('Pairs Cointegration and Trading Analysis')
st.sidebar.header('Select Page')
page = st.sidebar.radio('Page:', ['Pairs Trading Analysis', 'Pair Cointegration Identification'])
if page == 'Pairs Trading Analysis':
st.subheader("Pairs Trading Analysis")
st.write("""
### Description
This method analyzes stock and cryptocurrency prices, normalizes them, calculates rolling volatilities, tests for cointegration, and visualizes buy/sell signals based on z-scores.
""")
with st.sidebar.expander("How to use:", expanded=False):
st.markdown("""**How to use:**
1. Enter the stock tickers, start date, and end date.
2. Set the number of days for the volatility window, and the buy/sell thresholds for the z-scores.
3. Click 'Run Analysis' to start the analysis.
""")
with st.sidebar.expander("Stock/Crypto Ticker and Date Selection", expanded=True):
ticker1 = st.text_input('Enter First Stock/Crypto Ticker', 'ASML.AS', help="Enter the ticker symbol for the first stock or cryptocurrency.")
ticker2 = st.text_input('Enter Second Stock/Crypto Ticker', 'ASML', help="Enter the ticker symbol for the second stock or cryptocurrency.")
start_date = st.date_input('Start Date', pd.to_datetime('2022-01-01'), help="Select the start date for the data range.")
end_date = st.date_input('End Date', pd.to_datetime(END_DATE), help="Select the end date for the data range.")
with st.sidebar.expander("Method Parameters", expanded=True):
volatility_window = st.number_input('Volatility Window (days)', min_value=1, max_value=365, value=30, help="Set the number of days for the rolling volatility window.")
buy_threshold = st.number_input('Buy Z-Score Threshold', value=2.0, help="Set the z-score threshold to generate buy signals.")
sell_threshold = st.number_input('Sell Z-Score Threshold', value=-2.0, help="Set the z-score threshold to generate sell signals.")
if st.sidebar.button('Run Analysis'):
try:
# Data collection
data1 = yf.download(ticker1, start=start_date, end=end_date, auto_adjust=False)
if isinstance(data1.columns, pd.MultiIndex):
data1.columns = data1.columns.get_level_values(0)
data2 = yf.download(ticker2, start=start_date, end=end_date, auto_adjust=False)
if isinstance(data2.columns, pd.MultiIndex):
data2.columns = data2.columns.get_level_values(0)
if data1.empty or data2.empty:
raise ValueError(f"No data found for {ticker1} or {ticker2}")
aligned_data = pd.concat([data1['Close'], data2['Close']], axis=1, join='inner')
aligned_data.columns = [ticker1, ticker2]
# Normalize the price series
normalized_data = (aligned_data - aligned_data.mean()) / aligned_data.std()
# Plot normalized data
fig1 = go.Figure()
fig1.add_trace(go.Scatter(x=normalized_data.index, y=normalized_data[ticker1], mode='lines', name=f'Normalized {ticker1}'))
fig1.add_trace(go.Scatter(x=normalized_data.index, y=normalized_data[ticker2], mode='lines', name=f'Normalized {ticker2}'))
fig1.update_layout(title=f'Normalized Price Series for {ticker1} and {ticker2}', xaxis_title='Date', yaxis_title='Normalized Price')
st.plotly_chart(fig1)
# Calculate daily returns
returns = aligned_data.pct_change().dropna()
# Calculate rolling volatilities (annualized)
volatility1 = returns[ticker1].rolling(volatility_window).std() * np.sqrt(252)
volatility2 = returns[ticker2].rolling(volatility_window).std() * np.sqrt(252)
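# np.sqrt(252) annualizes daily volatility assuming ~252 trading days per year; for 24/7 crypto
# series, np.sqrt(365) would arguably be the more appropriate scaling factor.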
# Plot rolling volatilities
fig2 = go.Figure()
fig2.add_trace(go.Scatter(x=volatility1.index, y=volatility1, mode='lines', name=f"{ticker1} Volatility"))
fig2.add_trace(go.Scatter(x=volatility2.index, y=volatility2, mode='lines', name=f"{ticker2} Volatility"))
fig2.update_layout(title=f"{volatility_window}-Day Rolling Historical Volatility for {ticker1} and {ticker2}", xaxis_title='Date', yaxis_title='Volatility')
st.plotly_chart(fig2)
# Check for stationarity using ADF test
adf_result1 = adfuller(aligned_data[ticker1])
adf_result2 = adfuller(aligned_data[ticker2])
# Perform Johansen cointegration test
coint_test_stat, coint_critical_values = johansen_test(aligned_data)
# If cointegration exists, proceed with VECM
vecm = VECM(aligned_data, k_ar_diff=1, coint_rank=1)
vecm_fit = vecm.fit()
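# k_ar_diff=1 and coint_rank=1 are fixed modelling choices: one lagged difference and a single
# assumed cointegrating relationship; neither is tuned to the Johansen result computed above.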
# Analyzing the residuals for stationarity
residuals = vecm_fit.resid
residuals_df = pd.DataFrame(residuals, index=aligned_data.index[-len(residuals):], columns=[f'Residual_{ticker1}', f'Residual_{ticker2}'])
adf_residuals_1 = adfuller(residuals[:, 0])
adf_residuals_2 = adfuller(residuals[:, 1])
# Plot residuals from VECM
fig3 = go.Figure()
fig3.add_trace(go.Scatter(x=residuals_df.index, y=residuals_df[f'Residual_{ticker1}'], mode='lines', name=f'Residual {ticker1}'))
fig3.add_trace(go.Scatter(x=residuals_df.index, y=residuals_df[f'Residual_{ticker2}'], mode='lines', name=f'Residual {ticker2}'))
fig3.add_hline(y=0, line=dict(color='red', dash='dash'), name='Zero Line')
fig3.update_layout(title='Residuals from VECM', xaxis_title='Date', yaxis_title='Residuals')
st.plotly_chart(fig3)
# Display ADF test results for the tickers
st.write(f"ADF Statistic for {ticker1}: {adf_result1[0]}, p-value: {adf_result1[1]}")
st.write(f"ADF Statistic for {ticker2}: {adf_result2[0]}, p-value: {adf_result2[1]}")
with st.expander("How it Works", expanded=False):
st.markdown("""
**ADF Test:**
- The Augmented Dickey-Fuller (ADF) test checks whether a time series has a unit root, i.e., whether it is non-stationary.
- If the p-value is less than 0.05, we reject the null hypothesis that the series has a unit root, indicating that the series is stationary.
**Johansen Cointegration Test:**
- The Johansen test is used to determine the number of cointegrating relationships among multiple time series.
- If the test statistic is greater than the critical value, we reject the null hypothesis that there is no cointegration.
**VECM (Vector Error Correction Model):**
- A VECM is a special form of a VAR (Vector Autoregression) model used for cointegrated series. It corrects for disequilibrium in the short run while keeping the long-term relationship intact.
**Z-Score Trading Strategy:**
- Z-scores measure how many standard deviations an element is from the mean. In pairs trading, z-scores are used to identify overbought or oversold conditions, triggering buy or sell signals.
""")
st.markdown("#### Interpretation of ADF Results")
st.latex(r'''
H_0: \text{The series has a unit root (non-stationary)} \\
H_1: \text{The series does not have a unit root (stationary)}
''')
st.write("""
- The Augmented Dickey-Fuller (ADF) test checks the null hypothesis that a unit root is present in a time series sample.
""")
if adf_result1[1] < 0.05:
st.write(f"{ticker1} is stationary, indicating the series does not have a unit root.")
else:
st.write(f"{ticker1} is not stationary, indicating the series has a unit root.")
if adf_result2[1] < 0.05:
st.write(f"{ticker2} is stationary, indicating the series does not have a unit root.")
else:
st.write(f"{ticker2} is not stationary, indicating the series has a unit root.")
# Display cointegration test results
st.write("Johansen Cointegration Test Results:")
johansen_results = pd.DataFrame({
'Test Statistic': coint_test_stat,
'90% Critical Value': coint_critical_values[:, 0],
'95% Critical Value': coint_critical_values[:, 1],
'99% Critical Value': coint_critical_values[:, 2]
}, index=[f'H0: r <= {i}' for i in range(len(coint_test_stat))])
st.write(johansen_results)
st.markdown("#### Interpretation of Johansen Cointegration Test Results")
st.latex(r'''
H_0: \text{No cointegration relationship exists} \\
H_1: \text{Cointegration relationship exists}
''')
st.write("""
- The Johansen cointegration test is used to determine the cointegration rank between multiple time series.
""")
if coint_test_stat[0] > coint_critical_values[0, 1]:
st.write(f"The two assets {ticker1} and {ticker2} are cointegrated at the 95% confidence level.")
else:
st.write(f"The two assets {ticker1} and {ticker2} are not cointegrated at the 95% confidence level.")
st.markdown("#### Interpretation of VECM Residuals")
st.write(f"ADF Statistic for VECM residuals of {ticker1}: {adf_residuals_1[0]}, p-value: {adf_residuals_1[1]}")
st.write(f"ADF Statistic for VECM residuals of {ticker2}: {adf_residuals_2[0]}, p-value: {adf_residuals_2[1]}")
st.write("""
- The residuals from the Vector Error Correction Model (VECM) should be stationary to confirm cointegration.
""")
if adf_residuals_1[1] < 0.1:
st.write(f"The residuals of the VECM model for {ticker1} are stationary at the 10% significance level, supporting cointegration.")
else:
st.write(f"The residuals of the VECM model for {ticker1} are not stationary, suggesting no cointegration.")
if adf_residuals_2[1] < 0.1:
st.write(f"The residuals of the VECM model for {ticker2} are stationary at the 10% significance level, supporting cointegration.")
else:
st.write(f"The residuals of the VECM model for {ticker2} are not stationary, suggesting no cointegration.")
# Calculate cross-correlation for a range of lags
lag_range = range(-30, 31)
cross_correlations = [cross_correlation(returns[ticker1], returns[ticker2], lag) for lag in lag_range]
# Plot cross-correlation for different lags
fig4 = go.Figure()
fig4.add_trace(go.Scatter(x=list(lag_range), y=cross_correlations, mode='lines+markers'))
fig4.add_hline(y=0, line=dict(color='gray', dash='dash'))
fig4.add_vline(x=0, line=dict(color='red', dash='dash'))
fig4.update_layout(title=f"Cross-Correlation between {ticker1} and {ticker2}", xaxis_title='Lag (days)', yaxis_title='Cross-Correlation')
st.plotly_chart(fig4)
st.markdown("#### Interpretation of Cross-Correlation Results")
max_corr = max(cross_correlations)
max_lag = lag_range[cross_correlations.index(max_corr)]
second_max_corr = max(corr for corr in cross_correlations if corr != max_corr)
second_max_lag = lag_range[cross_correlations.index(second_max_corr)]
st.write(f"Highest correlation: {max_corr:.2f} at lag {max_lag}")
st.write(f"Second highest correlation: {second_max_corr:.2f} at lag {second_max_lag}")
interpretation = f"Highest correlation at lag {max_lag}: The high correlation at lag {max_lag} indicates that {ticker1} and {ticker2} move together without any significant lead or lag. In other words, any movements in {ticker1} are almost instantaneously reflected in {ticker2} and vice versa. This is typical for cross-listed assets, where information and price changes are quickly reflected in both markets.\n"
if second_max_lag < 0:
leading_ticker = ticker2
lagging_ticker = ticker1
lead_days = abs(second_max_lag)
direction = f"Second highest correlation at lag {second_max_lag}: {leading_ticker} leads {lagging_ticker} by {lead_days} days. This means that movements in {leading_ticker} tend to precede similar movements in {lagging_ticker} by {lead_days} day(s)."
elif second_max_lag > 0:
leading_ticker = ticker1
lagging_ticker = ticker2
lead_days = second_max_lag
direction = f"Second highest correlation at lag {second_max_lag}: {leading_ticker} leads {lagging_ticker} by {lead_days} days. This means that movements in {leading_ticker} tend to precede similar movements in {lagging_ticker} by {lead_days} day(s)."
else:
direction = "No significant lead/lag relationship; they move simultaneously."
interpretation += direction
st.write(interpretation)
# Granger causality test with shifts
shift_range = range(-5, 6)
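# Each direction is tested for predictor shifts of -5 to +5 days; the smallest p-value across
# shifts is used below to pick the more likely lead/lag direction.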
granger_p_values_shift_1_to_2 = {shift: granger_test_with_shift(aligned_data, ticker2, ticker1, shift) for shift in shift_range} # predictor ticker1 -> target ticker2
granger_p_values_shift_2_to_1 = {shift: granger_test_with_shift(aligned_data, ticker1, ticker2, shift) for shift in shift_range} # predictor ticker2 -> target ticker1
# Create DataFrames for plotting Granger causality test results
granger_p_values_df_shift_1_to_2 = pd.DataFrame(granger_p_values_shift_1_to_2, index=[f"{ticker1} causes {ticker2}"]).T
granger_p_values_df_shift_2_to_1 = pd.DataFrame(granger_p_values_shift_2_to_1, index=[f"{ticker2} causes {ticker1}"]).T
# Plot Granger causality test p-values with shifts
fig5 = go.Figure()
fig5.add_trace(go.Scatter(x=granger_p_values_df_shift_1_to_2.index, y=granger_p_values_df_shift_1_to_2[f"{ticker1} causes {ticker2}"], mode='lines+markers', name=f"{ticker1} causes {ticker2}"))
fig5.add_trace(go.Scatter(x=granger_p_values_df_shift_2_to_1.index, y=granger_p_values_df_shift_2_to_1[f"{ticker2} causes {ticker1}"], mode='lines+markers', name=f"{ticker2} causes {ticker1}"))
fig5.add_hline(y=0.05, line=dict(color='gray', dash='dash'))
fig5.add_vline(x=0, line=dict(color='red', dash='dash'))
fig5.update_layout(title=f"Granger Causality Test p-values with Shifts between {ticker1} and {ticker2}", xaxis_title='Shift (days)', yaxis_title='p-value')
st.plotly_chart(fig5)
st.markdown("#### Interpretation of Granger Causality Test Results")
best_lag_1_to_2 = min(granger_p_values_shift_1_to_2, key=granger_p_values_shift_1_to_2.get)
best_lag_2_to_1 = min(granger_p_values_shift_2_to_1, key=granger_p_values_shift_2_to_1.get)
interpretation = ""
if granger_p_values_shift_1_to_2[best_lag_1_to_2] < 0.05 and granger_p_values_shift_1_to_2[best_lag_1_to_2] < granger_p_values_shift_2_to_1[best_lag_2_to_1]:
causality_direction = f"{ticker1} causes {ticker2}"
best_lag = best_lag_1_to_2
interpretation += f"Granger causality test with shifts suggests that {ticker1} causes {ticker2} with a lag of {abs(best_lag)} days.\n"
interpretation += f"This means that movements in {ticker1} tend to lead movements in {ticker2} by {abs(best_lag)} days. In practical terms, if {ticker1} experiences a price change, we can expect a similar change in {ticker2} approximately {abs(best_lag)} days later."
elif granger_p_values_shift_2_to_1[best_lag_2_to_1] < 0.05:
causality_direction = f"{ticker2} causes {ticker1}"
best_lag = best_lag_2_to_1
interpretation += f"Granger causality test with shifts suggests that {ticker2} causes {ticker1} with a lag of {abs(best_lag)} days.\n"
interpretation += f"This means that movements in {ticker2} tend to lead movements in {ticker1} by {abs(best_lag)} days. In practical terms, if {ticker2} experiences a price change, we can expect a similar change in {ticker1} approximately {abs(best_lag)} days later."
else:
causality_direction = "none"
best_lag = 0
interpretation += f"Neither direction is significant at the 5% level for any of the tested shifts, so no Granger-causal lead/lag relationship between {ticker1} and {ticker2} can be established."
st.write(interpretation)
# Adjust data based on the identified best lag
adjusted_data = aligned_data.copy()
adjusted_data[ticker1] = adjusted_data[ticker1].shift(best_lag)
adjusted_data = adjusted_data.dropna()
# Calculate the residuals
model = OLS(adjusted_data[ticker2], adjusted_data[ticker1])
results = model.fit()
residuals = adjusted_data[ticker2] - results.params[ticker1] * adjusted_data[ticker1]
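# The no-intercept OLS slope acts as the hedge ratio, so the residuals are the spread
# ticker2 - beta * ticker1; its z-score drives the entry and exit signals below.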
# Calculate Z-Scores
residuals_mean = residuals.mean()
residuals_std = residuals.std()
z_scores = (residuals - residuals_mean) / residuals_std
# Generate buy and sell signals
buy_signals = z_scores[z_scores > buy_threshold]
sell_signals = z_scores[z_scores < sell_threshold]
# Plot the residuals with buy and sell signals
fig6 = go.Figure()
fig6.add_trace(go.Scatter(x=z_scores.index, y=z_scores, mode='lines', name='Z-Score of Residuals'))
fig6.add_trace(go.Scatter(x=buy_signals.index, y=buy_signals, mode='markers', marker=dict(color='green', symbol='triangle-up', size=10), name=f'Buy {ticker1}, Sell {ticker2} Signal'))
fig6.add_trace(go.Scatter(x=sell_signals.index, y=sell_signals, mode='markers', marker=dict(color='red', symbol='triangle-down', size=10), name=f'Sell {ticker1}, Buy {ticker2} Signal'))
fig6.add_hline(y=buy_threshold, line=dict(color='gray', dash='dash'))
fig6.add_hline(y=sell_threshold, line=dict(color='gray', dash='dash'))
fig6.update_layout(title=f"Residuals (Adjusted for Lag) with Buy and Sell Signals based on Z-Scores", xaxis_title='Date', yaxis_title='Z-Score')
st.plotly_chart(fig6)
# Calculate cumulative profits and positions
cumulative_profits, positions = calculate_cumulative_profit(aligned_data, z_scores, buy_threshold, sell_threshold, ticker1, ticker2)
# Plot the cumulative profit
fig7 = go.Figure()
fig7.add_trace(go.Scatter(x=aligned_data.index[:len(cumulative_profits)], y=cumulative_profits, mode='lines', name='Cumulative Profit'))
fig7.update_layout(title=f"Cumulative Profit from Z-Score Trading Strategy", xaxis_title='Date', yaxis_title='Cumulative Profit')
st.plotly_chart(fig7)
st.markdown("#### Interpretation of Trading Signals and Cumulative Profit")
st.write(f"Cumulative Profit: {cumulative_profits[-1]:.2f}")
st.write("""
- The trading strategy uses z-scores to generate buy and sell signals.
- The cumulative profit shows the total profit from the trading strategy over the analyzed period.
""")
except Exception as e:
st.error(f"Error: {str(e)}. Check ticker symbols or date range.")
elif page == 'Pair Cointegration Identification':
st.subheader("Cointegration Identification")
st.write("""
### Description
This analysis identifies cointegrated pairs using two methods: the Engle-Granger test and the Johansen cointegration test.
It works for both stock and cryptocurrency pairs.
""")
method = st.sidebar.selectbox('Select Cointegration Method', ['Engle-Granger', 'Johansen Cointegration'])
with st.sidebar.expander("Stock/Crypto Ticker and Date Selection", expanded=True):
tickers_input = st.text_input('Enter Stock or Crypto Tickers (comma-separated)', ', '.join(default_tickers), help="Enter the ticker symbols for stocks or cryptocurrencies you want to analyze.")
start_date = st.date_input('Start Date', pd.to_datetime(START_DATE), help="Select the start date for the data range.")
end_date = st.date_input('End Date', pd.to_datetime(END_DATE), help="Select the end date for the data range.")
if st.sidebar.button('Run Cointegration Analysis'):
try:
tickers = [ticker.strip() for ticker in tickers_input.split(',')]
universe_tickers_ts_map = {ticker: load_ticker_ts_df(ticker, start_date, end_date) for ticker in tickers}
uts_sanitized = sanitize_data(universe_tickers_ts_map, start_date, end_date)
if not uts_sanitized:
raise ValueError("No valid data after sanitization. Check tickers or date range.")
if method == 'Engle-Granger':
pvalues, pairs = find_cointegrated_pairs(uts_sanitized, P_VALUE_THRESHOLD)
masked_pvalues = np.where(pvalues > P_VALUE_THRESHOLD, np.nan, pvalues)
tickers_list = list(uts_sanitized.keys())
fig_heatmap = px.imshow(masked_pvalues, x=tickers_list, y=tickers_list,
color_continuous_scale='RdYlGn_r', title='Cointegration Heatmap (Engle-Granger)',
labels=dict(x='Tickers', y='Tickers', color='P-value'),
zmin=0, zmax=P_VALUE_THRESHOLD)
else:
pvalues, pairs = find_cointegrated_pairs_rolling(uts_sanitized, P_VALUE_THRESHOLD, ROLLING_WINDOW_SIZE, CONSISTENT_COINTEGRATION_THRESHOLD)
masked_pvalues = np.where(pvalues > P_VALUE_THRESHOLD, np.nan, pvalues)
tickers_list = list(uts_sanitized.keys())
fig_heatmap = px.imshow(masked_pvalues, x=tickers_list, y=tickers_list,
color_continuous_scale='RdYlGn_r', title='Cointegration Heatmap (Johansen)',
labels=dict(x='Tickers', y='Tickers', color='P-value'),
zmin=0, zmax=P_VALUE_THRESHOLD)
st.plotly_chart(fig_heatmap)
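# Lower values indicate stronger evidence of cointegration; for the Johansen method these are
# the proxy scores described above rather than true p-values.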
top_10_pairs = sorted(pairs, key=lambda x: x[2])[:10]
pair_labels = [f"{pair[0]} & {pair[1]}" for pair in top_10_pairs]
pair_values = [pair[2] for pair in top_10_pairs]
fig_bar = go.Figure([go.Bar(x=pair_values, y=pair_labels, orientation='h')])
fig_bar.update_layout(title='Top 10 Most Cointegrated Pairs',
xaxis_title='P-value',
yaxis_title='Asset Pairs',
yaxis=dict(autorange='reversed'))
st.plotly_chart(fig_bar)
except Exception as e:
st.error(f"Error: {str(e)}. Check ticker symbols or date range.")
with st.expander("How it Works", expanded=False):
st.markdown("""
**Cointegration Overview:**
- Cointegration is a statistical property of a collection of time series variables. Two or more non-stationary series are cointegrated if some linear combination of them is stationary, i.e. they share a common stochastic trend.
**Engle-Granger Method:**
- This method involves estimating a long-term equilibrium relationship between two non-stationary series and testing whether the residuals from this relationship are stationary.
**Johansen Method:**
- The Johansen test is a more general procedure that allows for more than two series and can identify multiple cointegrating relationships.
""")
# Hide the default Streamlit menu and footer
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)