# btc / app.py
# farquasar's picture
# Update app.py
# 3c599cf verified
import pickle
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.impute import SimpleImputer
from scipy import stats
import yfinance as yf
import gradio as gr
import plotly.graph_objects as go
# Load pre-trained models
def _load_pickled_model(path):
    """Return the object unpickled from the file at *path*."""
    # NOTE: unpickling can run arbitrary code, so the .pkl files must come
    # from a trusted source.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


model_n1d_cat = _load_pickled_model('model_n1d_cat.pkl')
model_n4h_cat = _load_pickled_model('model_n4h_cat.pkl')
def fetch_yfinance_data(pair: str, period: str, interval: str) -> pd.DataFrame:
    """Download candles for *pair* through yfinance and tidy the columns.

    pair:     market symbol, e.g. "BCH/USDT"; "/USDT" is rewritten to "-USD"
              to form the ticker that is downloaded
    period:   look-back span handed to yfinance, e.g. "100d"
    interval: candle size handed to yfinance, e.g. "1d", "1h", "4h"

    Returns the downloaded frame with its index moved into a leading
    'timestamp' column and Open/High/Low/Close/Volume renamed to lower case.
    """
    symbol = pair.replace("/USDT", "-USD")
    frame = yf.download(symbol, period=period, interval=interval)
    # keep only level 0 of the column labels
    frame.columns = frame.columns.get_level_values(0)
    # bring the index into a column, whatever its name was
    frame = frame.reset_index()
    frame = frame.rename(columns={frame.columns[0]: 'timestamp'})
    # standardize OHLCV
    lowercase_names = {
        name: name.lower()
        for name in ('Open', 'High', 'Low', 'Close', 'Volume')
    }
    return frame.rename(columns=lowercase_names)
# Rolling Window Normalizer
class RollingWindowNormalizer:
    """Standardize columns against their own rolling mean and rolling std.

    ``fit`` stores, per column, the rolling mean and rolling standard
    deviation Series of the frame it was given; ``transform`` subtracts and
    divides by those stored Series.
    """

    def __init__(self, window=24):
        # number of rows in each rolling window
        self.window = window
        # column name -> {'rolling_mean': Series, 'rolling_std': Series}
        self.stats_ = {}

    def fit(self, X, columns):
        """Record rolling statistics of each column in *columns*; return self."""
        for name in columns:
            roller = X[name].rolling(window=self.window)
            self.stats_[name] = {
                'rolling_mean': roller.mean(),
                'rolling_std': roller.std(),
            }
        return self

    def transform(self, X, columns):
        """Normalize *columns* of X in place, drop rows with NaN, return X.

        X itself is modified: the listed columns are overwritten and every
        row containing a NaN in any column is removed.
        """
        for name in columns:
            fitted = self.stats_[name]
            X[name] = (X[name] - fitted['rolling_mean']) / fitted['rolling_std']
        X.dropna(inplace=True)
        return X

    def fit_transform(self, X, columns):
        """Fit on X and then transform the same X."""
        self.fit(X, columns)
        return self.transform(X, columns)
def normalize(X, columns=None):
    """Return a rolling-window normalized copy of X and the fitted normalizer.

    X:       DataFrame holding the columns to normalize; it is copied first,
             so the caller's frame is left untouched
    columns: names of the columns to normalize; when omitted,
             ['open', 'high', 'low', 'close'] is used

    Returns ``(normalized_frame, normalizer)`` where the normalizer is the
    RollingWindowNormalizer (default window) that was fitted on the copy.
    """
    # A None sentinel avoids sharing one mutable default list across calls.
    if columns is None:
        columns = ['open', 'high', 'low', 'close']
    frame = X.copy()
    normalizer = RollingWindowNormalizer()
    normalizer.fit(frame, columns)
    normalized = normalizer.transform(frame, columns)
    return normalized, normalizer
def remove_outliers(df, epsilon):
    """Drop rows whose 'low' value is an outlier by z-score.

    df:      DataFrame with a numeric 'low' column
    epsilon: rows are kept only when ``abs(z-score of low) < epsilon``

    Rows whose 'low' is NaN or infinite are dropped, but they no longer
    poison the z-scores of the remaining rows. A column whose finite values
    are all equal is treated as having z-score 0 everywhere instead of NaN,
    so it is not wiped out.

    Returns a new DataFrame with a fresh 0..n-1 index.
    """
    low = df['low'].to_numpy(dtype=float)
    finite = np.isfinite(low)
    keep = np.zeros(len(low), dtype=bool)
    values = low[finite]
    if values.size:
        if values.max() == values.min():
            # zero variance: zscore would return NaN for every row
            z = np.zeros_like(values)
        else:
            z = stats.zscore(values)
        keep[finite] = np.abs(z) < epsilon
    # apply the mask positionally against the original frame
    return df.loc[keep].reset_index(drop=True)
# Advanced features
def calculate_rsi(series, window=14):
    """Return an RSI Series built from simple rolling means of gains and losses.

    series: pandas Series of values
    window: number of rows in each rolling average

    The result has the same length as *series*; positions without a full
    window are NaN.
    """
    change = series.diff()
    # upward moves keep their size, downward moves count as zero (and vice versa)
    mean_up = change.clip(lower=0).rolling(window=window).mean()
    mean_down = (-change.clip(upper=0)).rolling(window=window).mean()
    relative_strength = mean_up / mean_down
    return 100 - (100 / (1 + relative_strength))
def generate_advanced_features(d, other_data=None):
    """Append derived indicator columns to a copy of *d* and return them as an array.

    d:          DataFrame with at least 'close' and 'high' columns
    other_data: optional second DataFrame with a 'close' column; when given,
                two close-price ratio columns against it are added

    Added columns, in order: ma_7, ma_21, rsi, ma_ratio, lag_close1..4,
    lag_high1..4, std_last_10 and, with *other_data*, relative_strength and
    relative_strength_1.

    Returns ``.values`` of every column except the first one.
    """
    frame = d.copy()
    close = frame['close']

    frame['ma_7'] = close.rolling(window=7).mean()
    frame['ma_21'] = close.rolling(window=21).mean()
    frame['rsi'] = calculate_rsi(close)
    frame['ma_ratio'] = frame['ma_7'] / frame['ma_21']

    for source in ('close', 'high'):
        for step in range(1, 5):
            frame[f'lag_{source}{step}'] = frame[source].shift(step)

    frame['std_last_10'] = close.rolling(window=10).std()

    if other_data is not None:
        # keep only the first occurrence of any repeated column label
        peer = other_data.loc[:, ~other_data.columns.duplicated()]
        peer_close = peer['close']
        frame['relative_strength'] = close / peer_close
        frame['relative_strength_1'] = close.shift(2) / peer_close.shift(2)

    return frame.iloc[:, 1:].values
def create_features_and_labels_with_advanced_features(btc, eth):
    """Build the stacked feature matrix and the label frame for two assets.

    btc, eth: DataFrames accepted by generate_advanced_features; *btc* must
              also carry 'timestamp' and 'close' columns

    Returns ``(features, label)`` where *features* is the btc feature matrix
    stacked on top of the eth feature matrix and *label* is the btc
    'timestamp'/'close' columns shifted up by one row.
    """
    btc_frame = btc.copy()
    eth_frame = eth.copy()
    feature_blocks = (
        generate_advanced_features(btc_frame, eth_frame),
        generate_advanced_features(eth_frame, btc_frame),
    )
    # each label row holds the following row's timestamp and close
    label = btc_frame[['timestamp', 'close']].shift(-1)
    # NOTE(review): np.vstack appends the eth rows below the btc rows rather
    # than placing them side by side -- confirm this matches model training.
    return np.vstack(feature_blocks), label
def get_data_predict(
    btc_ori: pd.DataFrame,
    bch_ori: pd.DataFrame,
    symbol: str = 'BCH/USDT',
    timeframe: str = '4h',
    epsilon: float = 2,
    normalized: bool = False,
    limit: int = 50
):
    """Fetch recent BTC and *symbol* candles, filter outliers, optionally normalize.

    btc_ori, bch_ori: earlier frames that are merged with the fresh data;
                      only used when *normalized* is true
    symbol:           second market to fetch alongside 'BTC/USDT'
    timeframe:        candle interval passed to fetch_yfinance_data
    epsilon:          z-score threshold passed to remove_outliers
    normalized:       when true, merge with the *_ori frames and normalize
    limit:            number of days to fetch

    Returns ``(btc, other, label)``; *label* is None unless *normalized*.
    """
    period = f'{limit}d'  # last N days
    btc_raw = fetch_yfinance_data('BTC/USDT', period, timeframe)
    alt_raw = fetch_yfinance_data(symbol, period, timeframe)
    btc_clean = remove_outliers(btc_raw, epsilon)
    alt_clean = remove_outliers(alt_raw, epsilon)

    if not normalized:
        return btc_clean, alt_clean, None

    def merge_history(history, recent):
        # history first, so its rows win when timestamps repeat
        combined = pd.concat([history, recent])
        return combined.drop_duplicates('timestamp').reset_index(drop=True)

    btc_all = merge_history(btc_ori, btc_clean)
    alt_all = merge_history(bch_ori, alt_clean)
    btc_norm = normalize(btc_all)[0]
    alt_norm = normalize(alt_all)[0]
    label = btc_norm[['timestamp', 'close']].shift(-1)
    return btc_norm, alt_norm, label
def predictions(model, X1, X2, name, n_steps):
    """Score the engineered features of X1/X2 with *model*.

    model:   object exposing ``predict_proba``
    X1, X2:  frames passed to
             create_features_and_labels_with_advanced_features
    name, n_steps: accepted for the callers' convenience; not used here

    Returns ``(y, labels)`` where *y* is column 1 of ``predict_proba``, cut
    to at most ``len(labels)`` entries.
    """
    feature_matrix, label_frame = create_features_and_labels_with_advanced_features(X1, X2)
    # replace missing feature values with the column mean
    filled = SimpleImputer(strategy='mean').fit_transform(feature_matrix)
    scores = model.predict_proba(filled)[:, 1]
    n_labels = len(label_frame)
    if len(scores) == n_labels:
        return scores, label_frame
    return scores[:n_labels], label_frame
def plot(y, label, timeframe='1h', ma=5, n_steps=None):
    """Draw the scaled predictions against the label closes with matplotlib.

    y:         array of scores; plotted as ``5 * (y - 0.5)``
    label:     frame with 'timestamp' and 'close' columns
    timeframe: text inserted into the figure title
    ma:        rolling-mean window; a falsy value plots the raw series
    n_steps:   number of trailing points to show (defaults to ``len(y)``)

    Returns the current matplotlib figure.
    """
    if n_steps is None:
        n_steps = len(y)
    plt.figure(figsize=(12, 6))

    dates = label['timestamp'].values[-n_steps:]
    scaled = 5 * (y[-n_steps:] - 0.5)
    closes = label['close'].values[-n_steps:]

    if ma:
        tail = pd.DataFrame({'date': dates, 'prediction': scaled, 'real': closes})
        curves = (
            (tail['prediction'], 'updown'),
            (tail['real'], 'real'),
            (tail['real'] - tail['prediction'], 'difference'),
        )
        for series, legend_name in curves:
            plt.plot(tail['date'], series.rolling(window=ma).mean(), marker='o', label=legend_name)
    else:
        plt.plot(dates, scaled, label='updown')
        plt.plot(dates, closes, label='real')

    plt.axhline(0, linestyle='--')
    plt.title(f"BTC timeframe {timeframe}")
    plt.xlabel('Timestamp')
    plt.ylabel('Values')
    plt.legend()
    return plt.gcf()
def predict_and_plot(timeframe, limit, epsilon, n_steps, ma):
    """Fetch, normalize, predict and return a matplotlib figure for one timeframe.

    timeframe: candle interval; '1d' selects model_n1d_cat, anything else
               selects model_n4h_cat
    limit:     number of days to fetch
    epsilon:   accepted but not used in this function
    n_steps:   forwarded to predictions() and plot()
    ma:        rolling-mean window forwarded to plot()
    """
    period = f'{limit}d'
    # both series come from yfinance
    btc_raw = fetch_yfinance_data('BTC/USDT', period, timeframe)
    bch_raw = fetch_yfinance_data('BCH/USDT', period, timeframe)
    btc_norm = normalize(btc_raw)[0]
    bch_norm = normalize(bch_raw)[0]

    if timeframe == '1d':
        model = model_n1d_cat
    else:
        model = model_n4h_cat

    preds, label = predictions(model, btc_norm, bch_norm, name=timeframe, n_steps=n_steps)
    return plot(preds, label=label, timeframe=timeframe, ma=ma, n_steps=n_steps)
def make_interactive_fig(y, label, timeframe='1h', ma=5):
    """Build a plotly figure of smoothed predictions, closes and their difference.

    y:         array of scores; plotted as ``5 * (y - 0.5)``
    label:     frame with 'timestamp' and 'close' columns
    timeframe: text inserted into the figure title
    ma:        rolling-mean window applied to both series

    Returns a ``go.Figure`` with three line traces and a dashed zero line.
    """
    line_width = 2
    n_steps = len(label)
    dates = label['timestamp'].iloc[-n_steps:]
    real = label['close'].iloc[-n_steps:]
    preds = 5 * (y[-n_steps:] - 0.5)

    def smooth(values):
        # rolling mean on a fresh positional index
        return pd.Series(values).rolling(window=ma).mean()

    real_ma = smooth(real.values)
    pred_ma = smooth(preds)
    traces = (
        ('Predicted Δ', pred_ma),
        ('Real Close', real_ma),
        ('Difference', real_ma - pred_ma),
    )

    fig = go.Figure()
    for trace_name, values in traces:
        fig.add_trace(go.Scatter(
            x=dates, y=values,
            mode='lines', name=trace_name,
            line=dict(width=line_width)
        ))

    # horizontal zero line
    fig.add_shape(
        type='line', x0=dates.min(), x1=dates.max(),
        y0=0, y1=0, line=dict(dash='dash', width=line_width)
    )
    fig.update_layout(
        title=f"BTC {timeframe} Forecast vs. Real",
        xaxis_title='Timestamp',
        yaxis_title='Value',
        hovermode='x unified'
    )
    return fig
def predict_both_plots(limit, epsilon, ma):
    """Produce the 1-day and 4-hour interactive figures.

    limit:   number of days fetched for the '1d' series; the '4h' series
             fetches ``limit // 5`` days
    epsilon: accepted but not used in this function
    ma:      rolling-mean window forwarded to make_interactive_fig

    Returns ``(fig_1d, fig_4h)``.
    """
    period = f'{limit}d'
    period_4h = f'{limit//5}d'
    n_steps = limit

    def load_pair(span, interval):
        # fetch both markets first, then normalize each
        btc = fetch_yfinance_data('BTC/USDT', span, interval)
        bch = fetch_yfinance_data('BCH/USDT', span, interval)
        return normalize(btc)[0], normalize(bch)[0]

    # fetch & normalize both timeframes
    btc_1d, bch_1d = load_pair(period, '1d')
    btc_4h, bch_4h = load_pair(period_4h, '4h')

    # generate predictions
    y1, lbl1 = predictions(model_n1d_cat, btc_1d, bch_1d, '1d', n_steps)
    y2, lbl2 = predictions(model_n4h_cat, btc_4h, bch_4h, '4h', n_steps)

    # build interactive figures
    fig1 = make_interactive_fig(y1, lbl1, timeframe='1d', ma=ma)
    fig2 = make_interactive_fig(y2, lbl2, timeframe='4h', ma=ma)
    return fig1, fig2
# Gradio UI: controls on the left, the two plots on the right.
with gr.Blocks() as demo:
    with gr.Row():
        # LEFT column: input controls
        with gr.Column(scale=1):
            limit = gr.Slider(minimum=50, maximum=500, step=50, value=100, label='Number of points')
            epsilon = gr.Slider(minimum=0.1, maximum=5.0, step=0.1, value=2.0, label='Epsilon')
            ma = gr.Slider(minimum=1, maximum=20, step=1, value=5, label='MA Window')
            run_btn = gr.Button("Run Prediction")
        # RIGHT column: scale=3 (wider)
        with gr.Column(scale=3):
            plot1 = gr.Plot(label="1-Day Timeframe")
            plot2 = gr.Plot(label="4-Hour Timeframe")
    # wire it up: the button feeds the three sliders to predict_both_plots
    run_btn.click(
        fn=predict_both_plots,
        inputs=[limit, epsilon, ma],
        outputs=[plot1, plot2],
    )

demo.launch()