"""Gradio demo plotting up/down model scores against BTC closes.

Candles for BTC and BCH are downloaded with yfinance, rolling-window
normalised, turned into a feature matrix, and scored with two pickled
classifiers (one used for the 1d timeframe, one for 4h).  The scores are
plotted next to the normalised BTC close, either with Matplotlib
(`plot`) or with Plotly (`make_interactive_fig`).
"""

import pickle

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import yfinance as yf
from scipy import stats
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import MinMaxScaler

# Load pre-trained models.
# NOTE(security): pickle.load can execute arbitrary code contained in the
# file -- only ship model files that come from a trusted source.
with open('model_n1d_cat.pkl', 'rb') as f:
    model_n1d_cat = pickle.load(f)
with open('model_n4h_cat.pkl', 'rb') as f:
    model_n4h_cat = pickle.load(f)


def fetch_yfinance_data(pair: str, period: str, interval: str) -> pd.DataFrame:
    """Download candles from Yahoo Finance as a flat DataFrame.

    Args:
        pair: Exchange-style symbol, e.g. "BCH/USDT".  The "/USDT" suffix
            is rewritten to "-USD" to form the Yahoo ticker.
        period: Look-back window, e.g. "100d".
        interval: Candle size, e.g. "1d", "1h", "4h".

    Returns:
        A DataFrame whose first column is ``timestamp`` (the former index)
        and whose Open/High/Low/Close/Volume columns are lower-cased.

    Raises:
        ValueError: If the download returned no rows.
    """
    ticker = pair.replace("/USDT", "-USD")
    df = yf.download(ticker, period=period, interval=interval)
    if df is None or df.empty:
        # Fail here with a readable message instead of deep inside sklearn.
        raise ValueError(
            f"No data returned for {ticker!r} "
            f"(period={period!r}, interval={interval!r})"
        )

    # Keep only the first level of the column index (a flat index is
    # returned unchanged by get_level_values(0)).
    df.columns = df.columns.get_level_values(0)

    # Bring the DateTimeIndex into a column, whatever its name was.
    df = df.reset_index()
    df = df.rename(columns={df.columns[0]: 'timestamp'})

    # Standardize OHLCV names.
    df = df.rename(columns={
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Close': 'close',
        'Volume': 'volume',
    })
    return df


class RollingWindowNormalizer:
    """Z-score columns against their own trailing rolling mean and std."""

    def __init__(self, window=24):
        # Number of rows in each rolling window.
        self.window = window
        # column name -> {'rolling_mean': Series, 'rolling_std': Series}
        self.stats_ = {}

    def fit(self, X, columns):
        """Compute and store the rolling mean/std of each column of X.

        Returns:
            self, so calls can be chained.
        """
        for column in columns:
            rolling = X[column].rolling(window=self.window)
            self.stats_[column] = {
                'rolling_mean': rolling.mean(),
                'rolling_std': rolling.std(),
            }
        return self

    def transform(self, X, columns):
        """Normalise ``columns`` of X in place and drop rows containing NaN.

        X itself is modified (columns overwritten, rows dropped) and also
        returned.  The stored statistics are pandas Series, so the
        arithmetic aligns on X's index.
        """
        for column in columns:
            rolling_mean = self.stats_[column]['rolling_mean']
            # A zero std would turn the quotient into +/-inf, which dropna
            # keeps; mapping it to NaN makes those rows get dropped instead.
            rolling_std = self.stats_[column]['rolling_std'].replace(0, np.nan)
            X[column] = (X[column] - rolling_mean) / rolling_std
        X.dropna(inplace=True)
        return X

    def fit_transform(self, X, columns):
        """Fit on X, then transform X (in place)."""
        return self.fit(X, columns).transform(X, columns)


def normalize(X, columns=('open', 'high', 'low', 'close')):
    """Rolling-window normalise a copy of X.

    Args:
        X: Input DataFrame; it is not modified.
        columns: Names of the columns to normalise.

    Returns:
        ``(normalised_copy, fitted_normalizer)``.
    """
    columns = list(columns)
    X_copy = X.copy()
    normalizer = RollingWindowNormalizer()
    normalizer.fit(X_copy, columns)
    normalized = normalizer.transform(X_copy, columns)
    return normalized, normalizer


def remove_outliers(df, epsilon):
    """Keep the rows whose ``low`` z-score is below ``epsilon`` in magnitude.

    NaN values in ``low`` are ignored when computing the mean and std, and
    the corresponding rows are dropped (their z-score is NaN, which never
    compares below ``epsilon``).  A column with zero variance also yields
    NaN z-scores.

    Returns:
        A new DataFrame with a fresh 0..n-1 index.
    """
    if df.empty:
        return df.reset_index(drop=True)

    # z-scores as a NumPy array, then a boolean mask on their magnitude.
    z = stats.zscore(df['low'].to_numpy(dtype=float), nan_policy='omit')
    mask = np.abs(z) < epsilon
    return df.loc[mask].reset_index(drop=True)


# Advanced features
def calculate_rsi(series, window=14):
    """Return an RSI computed from simple rolling means of gains and losses.

    The first rows are NaN until a full window of deltas is available.
    """
    # 1) price changes between consecutive rows
    delta = series.diff()

    # 2) split into non-negative gains and non-negative losses
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)

    # 3) rolling averages
    avg_gain = gain.rolling(window=window).mean()
    avg_loss = loss.rolling(window=window).mean()

    # 4) relative strength and RSI
    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))


def generate_advanced_features(d, other_data=None):
    """Build the feature matrix for one asset.

    Columns are appended to a copy of ``d`` in this order: ``ma_7``,
    ``ma_21``, ``rsi``, ``ma_ratio``, ``lag_close1..4``, ``lag_high1..4``,
    ``std_last_10`` and, when ``other_data`` is given,
    ``relative_strength`` and ``relative_strength_1``.

    Args:
        d: DataFrame with at least ``close`` and ``high`` columns.
        other_data: Optional second asset; its ``close`` is matched to
            ``d`` by index.

    Returns:
        A NumPy array of every column except the first one.
    """
    d = d.copy()
    d['ma_7'] = d['close'].rolling(window=7).mean()
    d['ma_21'] = d['close'].rolling(window=21).mean()
    d['rsi'] = calculate_rsi(d['close'])
    d['ma_ratio'] = d['ma_7'] / d['ma_21']

    for k in ['close', 'high']:
        for i in range(1, 5):
            d[f'lag_{k}{i}'] = d[k].shift(i)

    d['std_last_10'] = d['close'].rolling(window=10).std()

    if other_data is not None:
        # Drop repeated column names so other_data['close'] is a Series.
        other_data = other_data.loc[:, ~other_data.columns.duplicated()]
        d['relative_strength'] = d['close'] / other_data['close']
        d['relative_strength_1'] = (
            d['close'].shift(2) / other_data['close'].shift(2)
        )

    # The first column is skipped; presumably it is 'timestamp', as produced
    # by fetch_yfinance_data -- verify if the input comes from elsewhere.
    return d.iloc[:, 1:].values


def create_features_and_labels_with_advanced_features(btc, eth):
    """Build stacked features for two assets plus next-row labels.

    Returns:
        ``(features, label)`` where ``features`` is the feature matrix of
        ``btc`` stacked row-wise on top of that of ``eth`` (so it has
        ``len(btc) + len(eth)`` rows) and ``label`` is the
        ``timestamp``/``close`` columns of ``btc`` shifted up by one row.
    """
    btc_copy = btc.copy()
    eth_copy = eth.copy()

    btc_features = generate_advanced_features(btc_copy, eth_copy)
    eth_features = generate_advanced_features(eth_copy, btc_copy)

    label = btc_copy[['timestamp', 'close']].shift(-1)
    features = np.vstack((btc_features, eth_features))
    return features, label


def get_data_predict(
    btc_ori: pd.DataFrame,
    bch_ori: pd.DataFrame,
    symbol: str = 'BCH/USDT',
    timeframe: str = '4h',
    epsilon: float = 2,
    normalized: bool = False,
    limit: int = 50
):
    """Fetch fresh BTC and ``symbol`` candles and strip outliers.

    Args:
        btc_ori: Historical BTC frame; only used when ``normalized``.
        bch_ori: Historical frame for ``symbol``; only used when
            ``normalized``.
        symbol: Second pair to download next to BTC/USDT.
        timeframe: Candle interval passed to yfinance.
        epsilon: Z-score threshold handed to ``remove_outliers``.
        normalized: When true, merge the fresh data with the ``*_ori``
            frames (de-duplicated on ``timestamp``) and normalise.
        limit: Number of days to download.

    Returns:
        ``(btc, other, label)``; ``label`` is None unless ``normalized``.
    """
    period = f'{limit}d'  # last N days

    btc_data_ = fetch_yfinance_data('BTC/USDT', period, timeframe)
    bch_data_ = fetch_yfinance_data(symbol, period, timeframe)

    btc_data_ = remove_outliers(btc_data_, epsilon)
    bch_data_ = remove_outliers(bch_data_, epsilon)

    if not normalized:
        return btc_data_, bch_data_, None

    # Merge with the historical frames before normalising.
    btc_all = (
        pd.concat([btc_ori, btc_data_])
        .drop_duplicates('timestamp')
        .reset_index(drop=True)
    )
    bch_all = (
        pd.concat([bch_ori, bch_data_])
        .drop_duplicates('timestamp')
        .reset_index(drop=True)
    )
    btc_data_, _ = normalize(btc_all)
    bch_data_, _ = normalize(bch_all)
    label = btc_data_.copy()[['timestamp', 'close']].shift(-1)
    return btc_data_, bch_data_, label


def predictions(model, X1, X2, name, n_steps):
    """Score the stacked features of X1 and X2 with ``model``.

    Args:
        model: Object exposing ``predict_proba``.
        X1: Primary asset frame; the labels are derived from it.
        X2: Secondary asset frame.
        name: Unused; kept for call compatibility.
        n_steps: Unused; kept for call compatibility.

    Returns:
        ``(y, labels)`` where ``y`` is column 1 of ``predict_proba``,
        truncated to ``len(labels)``.
    """
    features_, labels_ = create_features_and_labels_with_advanced_features(X1, X2)

    # Rolling and lagged features leave NaNs at the start of each asset.
    imputer = SimpleImputer(strategy='mean')
    features_imputed = imputer.fit_transform(features_)

    y = model.predict_proba(features_imputed)[:, 1]

    # The features hold X1's rows followed by X2's; keep only as many
    # scores as there are labels (i.e. the leading X1 part).
    if len(y) != len(labels_):
        y = y[:len(labels_)]
    return y, labels_


def plot(y, label, timeframe='1h', ma=5, n_steps=None):
    """Draw scores and closes with Matplotlib.

    Args:
        y: Array of model scores; plotted as ``5 * (y - 0.5)``.
        label: DataFrame with ``timestamp`` and ``close`` columns.
        timeframe: Only used in the figure title.
        ma: Moving-average window; a falsy value plots the raw series.
        n_steps: Number of trailing points to show (default: ``len(y)``).

    Returns:
        The Matplotlib figure.  It stays registered with pyplot, so
        long-running callers should ``plt.close`` it once rendered.
    """
    if n_steps is None:
        n_steps = len(y)
    ma = int(ma) if ma else 0  # UI sliders may deliver floats

    dates = label['timestamp'].values[-n_steps:]
    closes = label['close'].values[-n_steps:]
    scaled = 5 * (np.asarray(y)[-n_steps:] - 0.5)

    fig, ax = plt.subplots(figsize=(12, 6))
    if ma > 0:
        df_plot = pd.DataFrame(
            {'date': dates, 'prediction': scaled, 'real': closes}
        )
        ax.plot(
            df_plot['date'],
            df_plot['prediction'].rolling(window=ma).mean(),
            marker='o', label='updown',
        )
        ax.plot(
            df_plot['date'],
            df_plot['real'].rolling(window=ma).mean(),
            marker='o', label='real',
        )
        ax.plot(
            df_plot['date'],
            (df_plot['real'] - df_plot['prediction']).rolling(window=ma).mean(),
            marker='o', label='difference',
        )
    else:
        ax.plot(dates, scaled, label='updown')
        ax.plot(dates, closes, label='real')

    ax.axhline(0, linestyle='--')
    ax.set_title(f"BTC timeframe {timeframe}")
    ax.set_xlabel('Timestamp')
    ax.set_ylabel('Values')
    ax.legend()
    return fig


def _fetch_normalized(pair, period, interval):
    """Download one pair and return its rolling-normalised frame."""
    normalized, _ = normalize(fetch_yfinance_data(pair, period, interval))
    return normalized


def predict_and_plot(timeframe, limit, epsilon, n_steps, ma):
    """Fetch, score and plot one timeframe with Matplotlib.

    Args:
        timeframe: '1d' selects the daily model; anything else the 4h one.
        limit: Number of days to download.
        epsilon: NOTE(review): accepted but not used -- no outlier
            removal is applied on this path.
        n_steps: Number of trailing points to plot (None for all).
        ma: Moving-average window for the plot.

    Returns:
        The Matplotlib figure produced by ``plot``.
    """
    period = f'{int(limit)}d'
    if n_steps is not None:
        n_steps = int(n_steps)  # used as a slice bound in plot()

    btc_data = _fetch_normalized('BTC/USDT', period, timeframe)
    bch_data = _fetch_normalized('BCH/USDT', period, timeframe)

    model = model_n1d_cat if timeframe == '1d' else model_n4h_cat
    preds, label = predictions(
        model, btc_data, bch_data, name=timeframe, n_steps=n_steps
    )
    return plot(preds, label=label, timeframe=timeframe, ma=ma, n_steps=n_steps)


def make_interactive_fig(y, label, timeframe='1h', ma=5):
    """Build a Plotly figure of smoothed scores, closes and their difference.

    Args:
        y: Array of model scores; plotted as ``5 * (y - 0.5)``.
        label: DataFrame with ``timestamp`` and ``close`` columns.
        timeframe: Only used in the figure title.
        ma: Rolling-mean window applied to both series.

    Returns:
        A ``plotly.graph_objects.Figure`` with three line traces and a
        dashed horizontal line at zero.
    """
    line_width = 2
    ma = int(ma)  # UI sliders may deliver floats
    n_steps = len(label)

    dates = label['timestamp'].iloc[-n_steps:]
    real = label['close'].iloc[-n_steps:]
    preds = 5 * (np.asarray(y)[-n_steps:] - 0.5)

    # rolling means
    real_ma = pd.Series(real.values).rolling(window=ma).mean()
    pred_ma = pd.Series(preds).rolling(window=ma).mean()
    diff_ma = real_ma - pred_ma

    fig = go.Figure()
    for series, trace_name in (
        (pred_ma, 'Predicted Δ'),
        (real_ma, 'Real Close'),
        (diff_ma, 'Difference'),
    ):
        fig.add_trace(go.Scatter(
            x=dates, y=series, mode='lines',
            name=trace_name,
            line=dict(width=line_width),
        ))

    # horizontal zero line
    fig.add_shape(
        type='line',
        x0=dates.min(), x1=dates.max(),
        y0=0, y1=0,
        line=dict(dash='dash', width=line_width),
    )

    fig.update_layout(
        title=f"BTC {timeframe} Forecast vs. Real",
        xaxis_title='Timestamp',
        yaxis_title='Value',
        hovermode='x unified',
    )
    return fig


def predict_both_plots(limit, epsilon, ma):
    """Gradio callback: build the 1d and the 4h interactive figures.

    Args:
        limit: Number of days for the 1d series; the 4h series uses
            ``limit // 5`` days.
        epsilon: NOTE(review): accepted because the UI wires the slider
            in, but not used -- no outlier removal is applied here.
        ma: Rolling-mean window for both figures.

    Returns:
        ``(fig_1d, fig_4h)``.
    """
    # Sliders may deliver floats; a float would render as e.g. "20.0d".
    limit = int(limit)
    ma = int(ma)

    period = f'{limit}d'
    period_4h = f'{limit // 5}d'
    n_steps = limit

    # fetch & normalize both timeframes
    btc_1d = _fetch_normalized('BTC/USDT', period, '1d')
    bch_1d = _fetch_normalized('BCH/USDT', period, '1d')
    btc_4h = _fetch_normalized('BTC/USDT', period_4h, '4h')
    bch_4h = _fetch_normalized('BCH/USDT', period_4h, '4h')

    # generate predictions
    y1, lbl1 = predictions(model_n1d_cat, btc_1d, bch_1d, '1d', n_steps)
    y2, lbl2 = predictions(model_n4h_cat, btc_4h, bch_4h, '4h', n_steps)

    # build interactive figures
    fig1 = make_interactive_fig(y1, lbl1, timeframe='1d', ma=ma)
    fig2 = make_interactive_fig(y2, lbl2, timeframe='4h', ma=ma)
    return fig1, fig2


with gr.Blocks() as demo:
    with gr.Row():
        # LEFT column: controls
        with gr.Column(scale=1):
            limit = gr.Slider(50, 500, step=50, value=100, label='Number of points')
            epsilon = gr.Slider(0.1, 5.0, step=0.1, value=2.0, label='Epsilon')
            ma = gr.Slider(1, 20, step=1, value=5, label='MA Window')
            run_btn = gr.Button("Run Prediction")

        # RIGHT column: scale=3 (wider)
        with gr.Column(scale=3):
            plot1 = gr.Plot(label="1-Day Timeframe")
            plot2 = gr.Plot(label="4-Hour Timeframe")

    # wire it up
    run_btn.click(
        fn=predict_both_plots,
        inputs=[limit, epsilon, ma],
        outputs=[plot1, plot2],
    )

# Only start the server when run as a script, so the module can be imported
# without blocking.
if __name__ == "__main__":
    demo.launch()