import pickle
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.impute import SimpleImputer
from scipy import stats
import yfinance as yf
import gradio as gr
import plotly.graph_objects as go
# Load pre-trained models
def _load_pickled_model(path):
    """Unpickle and return the object stored in the file at *path*."""
    # NOTE: pickle executes arbitrary code on load; only ship model files
    # that come from a trusted source.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# One classifier per candle size; the UI callbacks pick between them.
model_n1d_cat = _load_pickled_model('model_n1d_cat.pkl')
model_n4h_cat = _load_pickled_model('model_n4h_cat.pkl')
def fetch_yfinance_data(pair: str, period: str, interval: str) -> pd.DataFrame:
    """Download candles for a trading pair through ``yf.download``.

    Args:
        pair: Pair in exchange notation, e.g. "BCH/USDT". The "/USDT"
            suffix is rewritten to "-USD" to build the ticker.
        period: Look-back span forwarded to ``yf.download``, e.g. "100d".
        interval: Candle size forwarded to ``yf.download``, e.g. "1d",
            "1h", "4h".

    Returns:
        A DataFrame whose former index is now the first column, named
        ``timestamp``, and whose ``Open``/``High``/``Low``/``Close``/
        ``Volume`` columns are renamed to lower case.
    """
    symbol = pair.replace("/USDT", "-USD")
    raw = yf.download(symbol, period=period, interval=interval)

    # Keep only the first level of the column labels.
    raw.columns = raw.columns.get_level_values(0)

    # Move the index into a regular column; whatever it was called, it is
    # the first column after reset_index() and becomes 'timestamp'.
    frame = raw.reset_index()
    index_label = frame.columns[0]
    frame = frame.rename(columns={index_label: 'timestamp'})

    # Standardize the OHLCV column names.
    ohlcv_names = {
        'Open': 'open',
        'High': 'high',
        'Low': 'low',
        'Close': 'close',
        'Volume': 'volume',
    }
    return frame.rename(columns=ohlcv_names)
# Rolling Window Normalizer
class RollingWindowNormalizer:
    """Scale columns by their own rolling mean and rolling standard deviation.

    ``fit`` stores, per column, the rolling mean and rolling std Series
    computed over ``window`` rows. ``transform`` then replaces each column
    with ``(value - rolling_mean) / rolling_std``.
    """

    def __init__(self, window=24):
        # Number of rows in each rolling window.
        self.window = window
        # column name -> {'rolling_mean': Series, 'rolling_std': Series}
        self.stats_ = {}

    def fit(self, X, columns):
        """Compute and store the rolling statistics of *columns* in *X*.

        Returns:
            ``self``, so calls can be chained.
        """
        for name in columns:
            roller = X[name].rolling(window=self.window)
            self.stats_[name] = {
                'rolling_mean': roller.mean(),
                'rolling_std': roller.std(),
            }
        return self

    def transform(self, X, columns):
        """Normalize *columns* of *X* with the statistics stored by ``fit``.

        *X* is modified in place: the listed columns are overwritten and
        every row containing a NaN (in any column) is dropped. The same
        object is returned.
        """
        for name in columns:
            fitted = self.stats_[name]
            centered = X[name] - fitted['rolling_mean']
            X[name] = centered / fitted['rolling_std']
        # The first window-1 rows have no rolling statistics and become NaN.
        X.dropna(inplace=True)
        return X

    def fit_transform(self, X, columns):
        """Run ``fit`` and then ``transform`` on the same frame."""
        self.fit(X, columns)
        return self.transform(X, columns)
def normalize(X, columns=None):
    """Rolling-normalize a copy of *X* and return it with the fitted normalizer.

    Args:
        X: DataFrame containing the columns to normalize. It is copied
            first, so the caller's frame is left untouched.
        columns: Names of the columns to normalize. Defaults to
            ``['open', 'high', 'low', 'close']`` when omitted. A ``None``
            sentinel is used instead of a list literal in the signature so
            the default is never one list object shared across calls.

    Returns:
        ``(Y, Rm)`` where ``Y`` is the normalized copy (rows with NaN
        dropped by the normalizer) and ``Rm`` is the
        ``RollingWindowNormalizer`` that produced it.
    """
    if columns is None:
        columns = ['open', 'high', 'low', 'close']
    X_copy = X.copy()
    Rm = RollingWindowNormalizer()
    Rm.fit(X_copy, columns)
    Y = Rm.transform(X_copy, columns)
    return Y, Rm
def remove_outliers(df, epsilon):
    """Drop rows whose ``low`` value is an outlier by z-score.

    The z-score uses the population standard deviation (ddof=0), which is
    what ``scipy.stats.zscore`` uses by default. Unlike a plain
    ``stats.zscore`` call, missing data and flat data are handled
    explicitly:

    * non-finite ``low`` values (NaN/inf) are excluded from the mean/std,
      so one bad row no longer turns every z-score into NaN and empties
      the result; those rows themselves are dropped;
    * when all finite values are equal (zero spread) every z-score is
      treated as 0 instead of NaN, so nothing is flagged as an outlier.

    Args:
        df: DataFrame with a numeric ``low`` column.
        epsilon: Rows are kept when ``abs(z) < epsilon``.

    Returns:
        A new DataFrame with the surviving rows and a fresh 0..n-1 index.
    """
    low = df['low'].to_numpy(dtype=float)
    finite = np.isfinite(low)
    if not finite.any():
        # Nothing measurable (empty frame or all-NaN column): no row can pass.
        return df.loc[finite].reset_index(drop=True)

    center = low[finite].mean()
    spread = low[finite].std()

    # Start from z = 0 so a zero-spread column keeps all its finite rows.
    z = np.zeros_like(low)
    if spread > 0:
        z[finite] = (low[finite] - center) / spread

    mask = finite & (np.abs(z) < epsilon)
    return df.loc[mask].reset_index(drop=True)
# Advanced features
def calculate_rsi(series, window=14):
    """Return a relative-strength index of *series* as a Series.

    Gains and losses are averaged with a plain rolling mean over *window*
    rows, so the first *window* entries of the result are NaN.

    Args:
        series: pandas Series of values (called with close prices in
            this file).
        window: Number of rows in the rolling average.

    Returns:
        ``100 - 100 / (1 + avg_gain / avg_loss)`` for every row.
    """
    def rolling_average(values):
        return values.rolling(window=window).mean()

    change = series.diff()
    # Upward moves keep their size; downward moves are flipped to positive.
    ups = change.clip(lower=0)
    downs = -change.clip(upper=0)

    strength = rolling_average(ups) / rolling_average(downs)
    return 100 - (100 / (1 + strength))
def generate_advanced_features(d, other_data=None):
    """Build the feature matrix for one asset.

    Works on a copy of *d* and appends, in this order: 7- and 21-row
    moving averages of ``close``, RSI of ``close``, the ratio of the two
    moving averages, lags 1-4 of ``close`` then of ``high``, and the
    10-row rolling std of ``close``. When *other_data* is given, two
    ratio columns against the other asset's ``close`` are appended too.

    Args:
        d: DataFrame with at least ``close`` and ``high`` columns.
        other_data: Optional DataFrame of a second asset with a ``close``
            column; duplicated column names in it are dropped first.

    Returns:
        A NumPy array of every column except the first one.
        NOTE(review): the first column is presumably ``timestamp`` -
        confirm against the frames passed in.
    """
    frame = d.copy()
    close = frame['close']

    frame['ma_7'] = close.rolling(window=7).mean()
    frame['ma_21'] = close.rolling(window=21).mean()
    frame['rsi'] = calculate_rsi(close)
    frame['ma_ratio'] = frame['ma_7'] / frame['ma_21']

    # Lagged copies of close and high, one to four rows back.
    for source in ('close', 'high'):
        for step in (1, 2, 3, 4):
            frame[f'lag_{source}{step}'] = frame[source].shift(step)

    frame['std_last_10'] = close.rolling(window=10).std()

    if other_data is not None:
        keep = ~other_data.columns.duplicated()
        other_close = other_data.loc[:, keep]['close']
        frame['relative_strength'] = close / other_close
        frame['relative_strength_1'] = close.shift(2) / other_close.shift(2)

    return frame.iloc[:, 1:].values
def create_features_and_labels_with_advanced_features(btc, eth):
    """Build the stacked feature matrix and the label frame.

    Features are generated for *btc* (relative to *eth*) and for *eth*
    (relative to *btc*) and stacked row-wise, first-asset rows first.

    Args:
        btc: DataFrame of the first asset; needs ``timestamp`` and
            ``close`` plus whatever ``generate_advanced_features`` reads.
        eth: DataFrame of the second asset.

    Returns:
        ``(features, label)`` where ``features`` is the stacked array and
        ``label`` is the first asset's ``timestamp``/``close`` shifted up
        by one row (each row holds the next row's values).
        NOTE(review): ``features`` has one row per row of *btc* plus one
        per row of *eth*, while ``label`` only has one per row of *btc*.
    """
    primary = btc.copy()
    secondary = eth.copy()

    stacked = np.vstack((
        generate_advanced_features(primary, secondary),
        generate_advanced_features(secondary, primary),
    ))

    # An earlier labelling scheme (rolling-mean trend flag) was left
    # disabled here; the label is simply the next row's timestamp/close.
    next_row = primary[['timestamp', 'close']].shift(-1)
    return stacked, next_row
def get_data_predict(
    btc_ori: pd.DataFrame,
    bch_ori: pd.DataFrame,
    symbol: str = 'BCH/USDT',
    timeframe: str = '4h',
    epsilon: float = 2,
    normalized: bool = False,
    limit: int = 50
):
    """Fetch recent BTC and *symbol* candles, filter outliers, optionally normalize.

    Args:
        btc_ori: Earlier BTC frame; only used when *normalized* is true.
        bch_ori: Earlier frame for *symbol*; only used when *normalized*
            is true.
        symbol: Second pair to download next to 'BTC/USDT'.
        timeframe: Candle interval passed to ``fetch_yfinance_data``.
        epsilon: z-score threshold passed to ``remove_outliers``.
        normalized: When true, the fresh data is appended to the ``*_ori``
            frames, de-duplicated on ``timestamp`` and rolling-normalized.
        limit: Number of days to download (formatted as ``'<limit>d'``).

    Returns:
        ``(btc, other, label)``. ``label`` is the normalized BTC
        ``timestamp``/``close`` shifted up one row, or ``None`` when
        *normalized* is false.
    """
    span = f'{limit}d'  # last N days
    btc_recent = fetch_yfinance_data('BTC/USDT', span, timeframe)
    alt_recent = fetch_yfinance_data(symbol, span, timeframe)
    btc_recent = remove_outliers(btc_recent, epsilon)
    alt_recent = remove_outliers(alt_recent, epsilon)

    if not normalized:
        return btc_recent, alt_recent, None

    # Merge with the earlier frames so the rolling window has history.
    btc_merged = (
        pd.concat([btc_ori, btc_recent])
        .drop_duplicates('timestamp')
        .reset_index(drop=True)
    )
    alt_merged = (
        pd.concat([bch_ori, alt_recent])
        .drop_duplicates('timestamp')
        .reset_index(drop=True)
    )
    btc_norm, _ = normalize(btc_merged)
    alt_norm, _ = normalize(alt_merged)
    label = btc_norm[['timestamp', 'close']].shift(-1)
    return btc_norm, alt_norm, label
def predictions(model, X1, X2, name, n_steps):
    """Score the combined features of two assets with *model*.

    Args:
        model: Classifier exposing ``predict_proba``.
        X1: DataFrame of the first asset (labels are derived from it).
        X2: DataFrame of the second asset.
        name: Accepted but not used by this function.
        n_steps: Accepted but not used by this function.

    Returns:
        ``(y, labels)`` where ``y`` is the second column of
        ``predict_proba`` cut down to at most ``len(labels)`` entries and
        ``labels`` is the label frame built from *X1*.
    """
    feats, target = create_features_and_labels_with_advanced_features(X1, X2)

    # Fill NaNs (rolling windows, lags) with each column's mean.
    filled = SimpleImputer(strategy='mean').fit_transform(feats)
    proba = model.predict_proba(filled)[:, 1]

    # The feature matrix can have more rows than the label frame.
    n_labels = len(target)
    if len(proba) != n_labels:
        proba = proba[:n_labels]
    return proba, target
def plot(y, label, timeframe='1h', ma=5, n_steps=None):
    """Draw predictions against the label series with matplotlib.

    Args:
        y: Array of model outputs; plotted as ``5 * (y - 0.5)``.
        label: DataFrame with ``timestamp`` and ``close`` columns.
        timeframe: Text inserted into the figure title.
        ma: Rolling-mean window. When truthy, the smoothed prediction,
            smoothed ``close`` and their smoothed difference are drawn;
            otherwise the raw prediction and ``close`` are drawn.
        n_steps: Number of trailing points to show; defaults to ``len(y)``.

    Returns:
        The current matplotlib figure.
    """
    if n_steps is None:
        n_steps = len(y)

    plt.figure(figsize=(12, 6))

    dates = label['timestamp'].values[-n_steps:]
    scaled = 5 * (y[-n_steps:] - 0.5)
    actual = label['close'].values[-n_steps:]

    if ma:
        table = pd.DataFrame({'date': dates, 'prediction': scaled, 'real': actual})
        curves = (
            ('updown', table['prediction']),
            ('real', table['real']),
            ('difference', table['real'] - table['prediction']),
        )
        for legend_name, values in curves:
            smoothed = values.rolling(window=ma).mean()
            plt.plot(table['date'], smoothed, marker='o', label=legend_name)
    else:
        plt.plot(dates, scaled, label='updown')
        plt.plot(dates, actual, label='real')

    plt.axhline(0, linestyle='--')
    plt.title(f"BTC timeframe {timeframe}")
    plt.xlabel('Timestamp')
    plt.ylabel('Values')
    plt.legend()
    return plt.gcf()
def predict_and_plot(timeframe, limit, epsilon, n_steps, ma):
    """Fetch BTC/BCH data, run the matching model and return a matplotlib figure.

    Args:
        timeframe: Candle interval; '1d' selects ``model_n1d_cat``, any
            other value selects ``model_n4h_cat``.
        limit: Number of days to download (formatted as ``'<limit>d'``).
        epsilon: Accepted but not used by this function.
        n_steps: Number of trailing points to plot.
        ma: Rolling-mean window forwarded to ``plot``.

    Returns:
        The figure produced by ``plot``.
    """
    span = f'{limit}d'

    btc_raw = fetch_yfinance_data('BTC/USDT', span, timeframe)
    bch_raw = fetch_yfinance_data('BCH/USDT', span, timeframe)
    btc_norm, _ = normalize(btc_raw)
    bch_norm, _ = normalize(bch_raw)

    if timeframe == '1d':
        chosen = model_n1d_cat
    else:
        chosen = model_n4h_cat

    scores, target = predictions(chosen, btc_norm, bch_norm, name=timeframe, n_steps=n_steps)
    return plot(scores, label=target, timeframe=timeframe, ma=ma, n_steps=n_steps)
def make_interactive_fig(y, label, timeframe='1h', ma=5):
    """Build a Plotly figure of smoothed prediction, close and their difference.

    Args:
        y: Array of model outputs; rescaled as ``5 * (y - 0.5)``.
        label: DataFrame with ``timestamp`` and ``close`` columns; its
            length decides how many trailing points of *y* are used.
        timeframe: Text inserted into the figure title.
        ma: Rolling-mean window applied to both series.

    Returns:
        A ``go.Figure`` with three line traces and a dashed zero line.
    """
    stroke = 2
    count = len(label)

    dates = label['timestamp'].iloc[-count:]
    actual = label['close'].iloc[-count:]
    scaled = 5 * (y[-count:] - 0.5)

    # Both series are rebuilt on a fresh positional index before smoothing
    # so the subtraction lines up row by row.
    real_ma = pd.Series(actual.values).rolling(window=ma).mean()
    pred_ma = pd.Series(scaled).rolling(window=ma).mean()
    diff_ma = real_ma - pred_ma

    fig = go.Figure()
    traces = (
        ('Predicted Δ', pred_ma),
        ('Real Close', real_ma),
        ('Difference', diff_ma),
    )
    for trace_name, values in traces:
        fig.add_trace(go.Scatter(
            x=dates, y=values,
            mode='lines', name=trace_name,
            line=dict(width=stroke)
        ))

    # Horizontal zero line across the full date range.
    fig.add_shape(
        type='line', x0=dates.min(), x1=dates.max(),
        y0=0, y1=0, line=dict(dash='dash', width=stroke)
    )
    fig.update_layout(
        title=f"BTC {timeframe} Forecast vs. Real",
        xaxis_title='Timestamp',
        yaxis_title='Value',
        hovermode='x unified'
    )
    return fig
def predict_both_plots(limit, epsilon, ma):
    """Produce the interactive 1-day and 4-hour figures for the UI.

    Args:
        limit: Number of days downloaded for the '1d' series; the '4h'
            series uses ``limit // 5`` days.
        epsilon: Accepted but not used by this function.
        ma: Rolling-mean window forwarded to ``make_interactive_fig``.

    Returns:
        ``(fig_1d, fig_4h)`` Plotly figures.
    """
    spans = {'1d': f'{limit}d', '4h': f'{limit//5}d'}
    n_steps = limit

    # Fetch and normalize both assets, daily candles first, then 4-hour.
    prepared = {}
    for tf, span in spans.items():
        btc_raw = fetch_yfinance_data('BTC/USDT', span, tf)
        bch_raw = fetch_yfinance_data('BCH/USDT', span, tf)
        btc_norm, _ = normalize(btc_raw)
        bch_norm, _ = normalize(bch_raw)
        prepared[tf] = (btc_norm, bch_norm)

    # Generate predictions with the model trained for each timeframe.
    btc_1d, bch_1d = prepared['1d']
    btc_4h, bch_4h = prepared['4h']
    y1, lbl1 = predictions(model_n1d_cat, btc_1d, bch_1d, '1d', n_steps)
    y2, lbl2 = predictions(model_n4h_cat, btc_4h, bch_4h, '4h', n_steps)

    # Build the interactive figures.
    fig1 = make_interactive_fig(y1, lbl1, timeframe='1d', ma=ma)
    fig2 = make_interactive_fig(y2, lbl2, timeframe='4h', ma=ma)
    return fig1, fig2
# Gradio UI: controls in a narrow left column, the two plots in a wider
# right column, and one button that refreshes both plots.
with gr.Blocks() as demo:
    with gr.Row():
        # LEFT column: scale=1 (narrow), input controls.
        with gr.Column(scale=1):
            limit = gr.Slider(50, 500, step=50, value=100, label='Number of points')
            epsilon = gr.Slider(0.1, 5.0, step=0.1, value=2.0, label='Epsilon')
            ma = gr.Slider(1, 20, step=1, value=5, label='MA Window')
            run_btn = gr.Button("Run Prediction")
        # RIGHT column: scale=3 (wider), output plots.
        with gr.Column(scale=3):
            plot1 = gr.Plot(label="1-Day Timeframe")
            plot2 = gr.Plot(label="4-Hour Timeframe")

    # Wire it up: the click handler must be registered inside the Blocks
    # context.
    run_btn.click(
        fn=predict_both_plots,
        inputs=[limit, epsilon, ma],
        outputs=[plot1, plot2]
    )

# The stray trailing "|" after this call was a syntax error and is removed.
demo.launch()