|
|
import pickle |
|
|
import matplotlib.pyplot as plt |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from sklearn.preprocessing import MinMaxScaler |
|
|
from sklearn.impute import SimpleImputer |
|
|
from scipy import stats |
|
|
import yfinance as yf |
|
|
import gradio as gr |
|
|
import plotly.graph_objects as go |
|
|
|
|
|
|
|
|
def _load_pickled_model(path):
    """Deserialize and return the object stored in the pickle file at ``path``."""
    # NOTE(security): pickle.load can execute arbitrary code embedded in the
    # file, so these model files must come from a trusted source.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Both models are loaded once at import time and shared by every request.
model_n1d_cat = _load_pickled_model('model_n1d_cat.pkl')
model_n4h_cat = _load_pickled_model('model_n4h_cat.pkl')
|
|
|
|
|
|
|
|
def fetch_yfinance_data(pair: str, period: str, interval: str) -> pd.DataFrame:
    """Download candles through ``yf.download`` and return them as a flat frame.

    Args:
        pair: Market symbol, e.g. "BCH/USDT". A "/USDT" suffix is rewritten
            to "-USD" to build the ticker passed to yfinance.
        period: Look-back window forwarded to ``yf.download``, e.g. "100d".
        interval: Candle size forwarded to ``yf.download``, e.g. "1d", "1h", "4h".

    Returns:
        DataFrame whose former index is now the first column, named
        ``timestamp``, and whose Open/High/Low/Close/Volume columns are
        renamed to lower case.
    """
    lower_case_names = {
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Close': 'close',
        'Volume': 'volume',
    }

    raw = yf.download(pair.replace("/USDT", "-USD"), period=period, interval=interval)
    # Keep only the first level of the column labels
    # (presumably drops the ticker level when yfinance returns a MultiIndex).
    raw.columns = raw.columns.get_level_values(0)

    flat = raw.reset_index()
    # Whatever the index was called, it becomes the 'timestamp' column.
    flat = flat.rename(columns={flat.columns[0]: 'timestamp'})
    return flat.rename(columns=lower_case_names)
|
|
|
|
|
|
|
|
class RollingWindowNormalizer:
    """Scale columns by their own rolling mean and rolling standard deviation.

    ``fit`` stores, per column, the rolling mean and rolling std series
    computed over ``window`` rows. ``transform`` subtracts the stored mean
    and divides by the stored std.
    """

    def __init__(self, window=24):
        self.window = window
        # column name -> {'rolling_mean': Series, 'rolling_std': Series}
        self.stats_ = {}

    def fit(self, X, columns):
        """Compute and remember the rolling statistics of ``X`` for ``columns``.

        Returns ``self`` so that calls can be chained.
        """
        for name in columns:
            roller = X[name].rolling(window=self.window)
            self.stats_[name] = {
                'rolling_mean': roller.mean(),
                'rolling_std': roller.std(),
            }
        return self

    def transform(self, X, columns):
        """Normalize ``columns`` of ``X`` with the statistics stored by ``fit``.

        ``X`` is modified in place: the listed columns are overwritten and
        every row containing a NaN is dropped. The same object is returned.
        """
        for name in columns:
            fitted = self.stats_[name]
            centered = X[name] - fitted['rolling_mean']
            X[name] = centered / fitted['rolling_std']
        # The first rows have no complete window, so their statistics are NaN.
        X.dropna(inplace=True)
        return X

    def fit_transform(self, X, columns):
        """Run ``fit`` and then ``transform`` on the same frame."""
        self.fit(X, columns)
        return self.transform(X, columns)
|
|
|
|
|
def normalize(X, columns=None):
    """Return a rolling-window-normalized copy of ``X`` and the fitted normalizer.

    Args:
        X: DataFrame holding the columns to normalize. It is copied first,
            so the caller's frame is not modified.
        columns: Names of the columns to normalize. When omitted, the
            'open', 'high', 'low' and 'close' columns are used.

    Returns:
        Tuple ``(Y, Rm)``: the normalized copy (rows containing NaN are
        dropped by the normalizer) and the fitted ``RollingWindowNormalizer``.
    """
    if columns is None:
        # Built per call: a list literal in the signature would be a single
        # object shared by every call of the function.
        columns = ['open', 'high', 'low', 'close']

    frame = X.copy()
    normalizer = RollingWindowNormalizer()
    normalized = normalizer.fit(frame, columns).transform(frame, columns)
    return normalized, normalizer
|
|
|
|
|
def remove_outliers(df, epsilon):
    """Drop rows whose 'low' value lies ``epsilon`` or more std-devs from the mean.

    The z-score uses the population standard deviation (ddof=0) of the
    non-NaN 'low' values. Rows whose 'low' is NaN are always dropped.

    Args:
        df: DataFrame with a numeric 'low' column.
        epsilon: Rows are kept only when ``abs(z) < epsilon``.

    Returns:
        A new DataFrame with the surviving rows and a fresh 0..n-1 index.
    """
    low = df['low'].to_numpy(dtype=float)
    keep = ~np.isnan(low)
    observed = low[keep]

    # A single NaN would otherwise turn the mean/std (and so every z-score)
    # into NaN, and a constant column has zero spread; in both cases
    # `abs(z) < epsilon` would be False everywhere and all rows would be lost.
    # A constant column simply has no outliers, so the z-test is skipped.
    if observed.size and observed.min() != observed.max():
        z = (low - observed.mean()) / observed.std()
        with np.errstate(invalid='ignore'):
            keep &= np.abs(z) < epsilon

    return df.loc[keep].reset_index(drop=True)
|
|
|
|
|
|
|
|
def calculate_rsi(series, window=14):
    """Compute an RSI from simple rolling means of gains and losses.

    Args:
        series: pandas Series of values (the callers pass closing prices).
        window: Number of rows averaged for the mean gain and mean loss.

    Returns:
        Series of ``100 - 100 / (1 + mean_gain / mean_loss)``, NaN until a
        full window of differences is available.
    """
    # Positive moves count as gains; negative moves, sign-flipped, as losses.
    mean_gain = series.diff().clip(lower=0).rolling(window=window).mean()
    mean_loss = (-series.diff().clip(upper=0)).rolling(window=window).mean()

    relative_strength = mean_gain / mean_loss
    return 100 - (100 / (1 + relative_strength))
|
|
|
|
|
|
|
|
def generate_advanced_features(d, other_data=None):
    """Build the feature matrix for one asset from its price frame.

    Columns are appended to a copy of ``d`` in this order: 7- and 21-row
    moving averages of 'close', RSI of 'close', the ratio of the two moving
    averages, lags 1-4 of 'close', lags 1-4 of 'high', and the 10-row rolling
    std of 'close'. When ``other_data`` is given, two further columns hold the
    ratio of this 'close' to the other frame's 'close' (current rows, and
    both shifted by 2 rows).

    Args:
        d: DataFrame with at least 'close' and 'high' columns.
        other_data: Optional second DataFrame with a 'close' column.

    Returns:
        2-D array of every column except the first one of the frame.
    """
    frame = d.copy()
    close = frame['close']

    frame['ma_7'] = close.rolling(window=7).mean()
    frame['ma_21'] = close.rolling(window=21).mean()
    frame['rsi'] = calculate_rsi(close)
    frame['ma_ratio'] = frame['ma_7'] / frame['ma_21']

    # All 'close' lags first, then all 'high' lags: column order is part of
    # the returned matrix layout.
    lag_specs = [(source, step) for source in ('close', 'high') for step in range(1, 5)]
    for source, step in lag_specs:
        frame[f'lag_{source}{step}'] = frame[source].shift(step)

    frame['std_last_10'] = close.rolling(window=10).std()

    if other_data is not None:
        # Keep only the first occurrence of any duplicated column label.
        peer = other_data.loc[:, ~other_data.columns.duplicated()]
        peer_close = peer['close']
        frame['relative_strength'] = close / peer_close
        frame['relative_strength_1'] = close.shift(2) / peer_close.shift(2)

    # The first column is skipped (presumably 'timestamp' - verify against callers).
    return frame.iloc[:, 1:].values
|
|
|
|
|
|
|
|
def create_features_and_labels_with_advanced_features(btc, eth):
    """Build stacked features for two assets and the label frame of the first.

    Args:
        btc: Price DataFrame of the primary asset; needs 'timestamp' and 'close'.
        eth: Price DataFrame of the second asset.

    Returns:
        Tuple ``(features, label)``. ``features`` stacks the rows of the
        primary asset's feature matrix on top of the second asset's.
        ``label`` is the primary asset's ['timestamp', 'close'] shifted up by
        one row, so each row carries the next row's values.
    """
    primary, secondary = btc.copy(), eth.copy()

    label = primary[['timestamp', 'close']].shift(-1)

    # Each asset is featurized with the other one as its comparison series.
    stacked = np.vstack((
        generate_advanced_features(primary, secondary),
        generate_advanced_features(secondary, primary),
    ))
    return stacked, label
|
|
|
|
|
def get_data_predict(
    btc_ori: pd.DataFrame,
    bch_ori: pd.DataFrame,
    symbol: str = 'BCH/USDT',
    timeframe: str = '4h',
    epsilon: float = 2,
    normalized: bool = False,
    limit: int = 50
):
    """Fetch recent BTC and ``symbol`` candles, filter outliers, optionally normalize.

    Args:
        btc_ori: Earlier BTC frame; only used when ``normalized`` is true.
        bch_ori: Earlier frame for ``symbol``; only used when ``normalized`` is true.
        symbol: Second market to download next to 'BTC/USDT'.
        timeframe: Candle interval passed to the download helper.
        epsilon: Z-score threshold passed to ``remove_outliers``.
        normalized: When true, the fresh data is appended to the ``*_ori``
            frames, de-duplicated on 'timestamp' and normalized.
        limit: Number of days to download.

    Returns:
        ``(btc, other, label)``. ``label`` is ``None`` unless ``normalized``
        is true, in which case it is the normalized BTC ['timestamp', 'close']
        shifted up by one row.
    """
    lookback = f'{limit}d'
    downloads = [
        fetch_yfinance_data(market, lookback, timeframe)
        for market in ('BTC/USDT', symbol)
    ]
    fresh_btc, fresh_other = [remove_outliers(frame, epsilon) for frame in downloads]

    if not normalized:
        return fresh_btc, fresh_other, None

    def _append_unique(history, recent):
        # First occurrence of a timestamp wins, so rows of `history` take precedence.
        merged = pd.concat([history, recent])
        return merged.drop_duplicates('timestamp').reset_index(drop=True)

    btc_all = _append_unique(btc_ori, fresh_btc)
    other_all = _append_unique(bch_ori, fresh_other)
    btc_norm, _ = normalize(btc_all)
    other_norm, _ = normalize(other_all)

    label = btc_norm[['timestamp', 'close']].shift(-1)
    return btc_norm, other_norm, label
|
|
|
|
|
|
|
|
def predictions(model, X1, X2, name, n_steps):
    """Score the features of two price frames with ``model``.

    Args:
        model: Object exposing ``predict_proba``.
        X1: Price frame of the primary asset (labels are derived from it).
        X2: Price frame of the second asset.
        name: Unused by this function.
        n_steps: Unused by this function.

    Returns:
        Tuple ``(y, labels)``: the second column of ``predict_proba``, cut to
        at most ``len(labels)`` entries, and the label frame.
    """
    features_, labels_ = create_features_and_labels_with_advanced_features(X1, X2)

    # Missing feature values are replaced by the column mean of this batch.
    filled = SimpleImputer(strategy='mean').fit_transform(features_)
    scores = model.predict_proba(filled)[:, 1]

    n_labels = len(labels_)
    if len(scores) == n_labels:
        return scores, labels_
    # The feature matrix stacks both assets, so only the leading rows line up
    # with the labels.
    return scores[:n_labels], labels_
|
|
|
|
|
def plot(y, label, timeframe='1h', ma=5, n_steps=None):
    """Draw the rescaled model scores against the label closes with matplotlib.

    Args:
        y: Array of model scores; plotted as ``5 * (y - 0.5)``.
        label: DataFrame with 'timestamp' and 'close' columns.
        timeframe: Text inserted into the figure title.
        ma: Rolling-mean window. When falsy, the raw series are plotted and
            no 'difference' curve is drawn.
        n_steps: Number of trailing points to show; defaults to ``len(y)``.

    Returns:
        The current matplotlib figure.
    """
    if n_steps is None:
        n_steps = len(y)

    plt.figure(figsize=(12,6))

    dates = label['timestamp'].values[-n_steps:]
    scaled_scores = 5*(y[-n_steps:]-0.5)
    closes = label['close'].values[-n_steps:]

    if not ma:
        plt.plot(dates, scaled_scores, label='updown')
        plt.plot(dates, closes, label='real')
    else:
        frame = pd.DataFrame({'date': dates, 'prediction': scaled_scores, 'real': closes})
        curves = (
            ('updown', frame['prediction']),
            ('real', frame['real']),
            ('difference', frame['real'] - frame['prediction']),
        )
        for legend_name, values in curves:
            smoothed = values.rolling(window=ma).mean()
            plt.plot(frame['date'], smoothed, marker='o', label=legend_name)

    plt.axhline(0, linestyle='--')
    plt.title(f"BTC timeframe {timeframe}")
    plt.xlabel('Timestamp')
    plt.ylabel('Values')
    plt.legend()
    return plt.gcf()
|
|
|
|
|
|
|
|
def predict_and_plot(timeframe, limit, epsilon, n_steps, ma):
    """Download BTC and BCH data, score it, and return a matplotlib figure.

    Args:
        timeframe: Candle interval; '1d' selects the daily model, anything
            else the 4-hour model.
        limit: Number of days to download.
        epsilon: Accepted but not used by this function.
        n_steps: Number of trailing points to plot.
        ma: Rolling-mean window forwarded to ``plot``.

    Returns:
        The figure produced by ``plot``.
    """
    lookback = f'{limit}d'

    btc_raw = fetch_yfinance_data('BTC/USDT', lookback, timeframe)
    bch_raw = fetch_yfinance_data('BCH/USDT', lookback, timeframe)
    btc_norm = normalize(btc_raw)[0]
    bch_norm = normalize(bch_raw)[0]

    if timeframe == '1d':
        model = model_n1d_cat
    else:
        model = model_n4h_cat

    preds, label = predictions(model, btc_norm, bch_norm, name=timeframe, n_steps=n_steps)
    return plot(preds, label=label, timeframe=timeframe, ma=ma, n_steps=n_steps)
|
|
|
|
|
|
|
|
|
|
|
def make_interactive_fig(y, label, timeframe='1h', ma=5):
    """Build a Plotly figure of smoothed scores, closes and their difference.

    Args:
        y: Array of model scores; rescaled as ``5 * (y - 0.5)``.
        label: DataFrame with 'timestamp' and 'close' columns.
        timeframe: Text inserted into the figure title.
        ma: Rolling-mean window applied to both series.

    Returns:
        A ``go.Figure`` with three line traces and a dashed line at y = 0.
    """
    line_width = 2
    # Every series is limited to the trailing len(label) entries.
    tail = slice(-len(label), None)

    dates = label['timestamp'].iloc[tail]
    closes = label['close'].iloc[tail]
    scaled_scores = 5 * (y[tail] - 0.5)

    # Fresh Series objects give both curves the same 0..n-1 index before subtracting.
    real_ma = pd.Series(closes.values).rolling(window=ma).mean()
    pred_ma = pd.Series(scaled_scores).rolling(window=ma).mean()
    diff_ma = real_ma - pred_ma

    fig = go.Figure()
    traces = (
        ('Predicted Δ', pred_ma),
        ('Real Close', real_ma),
        ('Difference', diff_ma),
    )
    for trace_name, values in traces:
        fig.add_trace(go.Scatter(
            x=dates,
            y=values,
            mode='lines',
            name=trace_name,
            line=dict(width=line_width),
        ))

    # Horizontal zero reference spanning the whole date range.
    fig.add_shape(
        type='line',
        x0=dates.min(),
        x1=dates.max(),
        y0=0,
        y1=0,
        line=dict(dash='dash', width=line_width),
    )
    fig.update_layout(
        title=f"BTC {timeframe} Forecast vs. Real",
        xaxis_title='Timestamp',
        yaxis_title='Value',
        hovermode='x unified',
    )
    return fig
|
|
|
|
|
def predict_both_plots(limit, epsilon, ma):
    """Produce the daily and 4-hour interactive figures for the UI.

    Args:
        limit: Number of days downloaded for the daily chart; the 4-hour
            chart downloads ``limit // 5`` days.
        epsilon: Accepted but not used by this function.
        ma: Rolling-mean window forwarded to ``make_interactive_fig``.

    Returns:
        Tuple ``(fig_1d, fig_4h)`` of Plotly figures.
    """
    n_steps = limit
    daily_period = f'{limit}d'
    four_hour_period = f'{limit//5}d'

    def _load_normalized_pair(period, interval):
        # Download both markets first, then normalize each of them.
        btc = fetch_yfinance_data('BTC/USDT', period, interval)
        bch = fetch_yfinance_data('BCH/USDT', period, interval)
        return normalize(btc)[0], normalize(bch)[0]

    btc_1d, bch_1d = _load_normalized_pair(daily_period, '1d')
    btc_4h, bch_4h = _load_normalized_pair(four_hour_period, '4h')

    y1, lbl1 = predictions(model_n1d_cat, btc_1d, bch_1d, '1d', n_steps)
    y2, lbl2 = predictions(model_n4h_cat, btc_4h, bch_4h, '4h', n_steps)

    return (
        make_interactive_fig(y1, lbl1, timeframe='1d', ma=ma),
        make_interactive_fig(y2, lbl2, timeframe='4h', ma=ma),
    )
|
|
|
|
|
# Gradio UI: controls in a narrow left column, the two charts on the right.
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=1):
            limit = gr.Slider(minimum=50, maximum=500, step=50, value=100, label='Number of points')
            epsilon = gr.Slider(minimum=0.1, maximum=5.0, step=0.1, value=2.0, label='Epsilon')
            ma = gr.Slider(minimum=1, maximum=20, step=1, value=5, label='MA Window')
            run_btn = gr.Button(value="Run Prediction")

        with gr.Column(scale=3):
            plot1 = gr.Plot(label="1-Day Timeframe")
            plot2 = gr.Plot(label="4-Hour Timeframe")

    # One click recomputes both charts from the three slider values.
    run_btn.click(
        fn=predict_both_plots,
        inputs=[limit, epsilon, ma],
        outputs=[plot1, plot2],
    )

# Starts the web server as soon as the module is executed.
demo.launch()