import asyncio import json import logging import time import math import aiohttp import pandas as pd import numpy as np import tensorflow as tf from aiohttp import web from tensorflow.keras import layers, models, callbacks from sklearn.preprocessing import StandardScaler from concurrent.futures import ThreadPoolExecutor SYMBOL_KRAKEN = "BTC/USD" PORT = 7860 BROADCAST_RATE = 1.0 PREDICTION_HORIZON = 100 MAX_HISTORY = 5000 TRAIN_INTERVAL = 600 LOOKBACK_WINDOW = 60 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') market_state = { "ohlc_history": [], "ready": False, "model": None, "scaler": None, "model_residuals": 0.0, "last_training_time": 0, "last_price": 0, "price_change": 0 } connected_clients = set() executor = ThreadPoolExecutor(max_workers=1) def calculate_indicators(candles): if len(candles) < LOOKBACK_WINDOW + PREDICTION_HORIZON: return None df = pd.DataFrame(candles) cols = ['open', 'high', 'low', 'close', 'volume'] for c in cols: df[c] = df[c].astype(float) df['ema20'] = df['close'].ewm(span=20, adjust=False).mean() df['ema50'] = df['close'].ewm(span=50, adjust=False).mean() df['std'] = df['close'].rolling(window=20).std() df['bb_upper'] = df['ema20'] + (df['std'] * 2) df['bb_lower'] = df['ema20'] - (df['std'] * 2) delta = df['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['rsi'] = 100 - (100 / (1 + rs)) k = df['close'].ewm(span=12, adjust=False).mean() d = df['close'].ewm(span=26, adjust=False).mean() df['macd'] = k - d df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean() df['macd_hist'] = df['macd'] - df['macd_signal'] df['tr0'] = abs(df['high'] - df['low']) df['tr1'] = abs(df['high'] - df['close'].shift()) df['tr2'] = abs(df['low'] - df['close'].shift()) df['tr'] = df[['tr0', 'tr1', 'tr2']].max(axis=1) df['atr'] = df['tr'].rolling(window=14).mean() df['dist_ema20'] = (df['close'] - df['ema20']) / df['ema20'] df['dist_ema50'] = (df['close'] - df['ema50']) / df['ema50'] df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['ema20'] df['bb_pos'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower']) df['vol_change'] = df['volume'].pct_change() df['log_ret'] = np.log(df['close'] / df['close'].shift(1)) df['datetime'] = pd.to_datetime(df['time'], unit='s') df['hour_sin'] = np.sin(2 * np.pi * df['datetime'].dt.hour / 24) df['hour_cos'] = np.cos(2 * np.pi * df['datetime'].dt.hour / 24) return df.dropna() def create_sequences(data, target_data, window_size, horizon): X, y = [], [] for i in range(len(data) - window_size - horizon + 1): X.append(data[i:(i + window_size)]) y.append(target_data[i + window_size : i + window_size + horizon]) return np.array(X), np.array(y) def train_model(df): logging.info(f"Training CNN Model on {len(df)} candles...") feature_cols = [ 'rsi', 'macd_hist', 'atr', 'dist_ema20', 'dist_ema50', 'bb_width', 'bb_pos', 'vol_change', 'log_ret', 'hour_sin', 'hour_cos' ] data_features = df[feature_cols].values scaler = StandardScaler() data_scaled = scaler.fit_transform(data_features) close_prices = df['close'].values returns_future = [] for i in range(len(close_prices) - PREDICTION_HORIZON): current_price = close_prices[i] future_prices = close_prices[i+1 : i+1+PREDICTION_HORIZON] pct_change = (future_prices - current_price) / current_price returns_future.append(pct_change) returns_future = np.array(returns_future) X = [] y = [] valid_length = len(returns_future) - LOOKBACK_WINDOW if valid_length <= 0: return None, None, None for i in range(valid_length): X.append(data_scaled[i : i + LOOKBACK_WINDOW]) y.append(returns_future[i + LOOKBACK_WINDOW - 1]) X = np.array(X) y = np.array(y) if len(X) < 100: return None, None, None model = models.Sequential([ layers.Input(shape=(LOOKBACK_WINDOW, len(feature_cols))), layers.Conv1D(filters=64, kernel_size=3, activation='relu', padding='same'), layers.MaxPooling1D(pool_size=2), layers.Dropout(0.2), layers.Conv1D(filters=32, kernel_size=3, activation='relu', padding='same'), layers.GlobalAveragePooling1D(), layers.Dense(64, activation='relu'), layers.Dropout(0.1), layers.Dense(PREDICTION_HORIZON) ]) model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse') early_stop = callbacks.EarlyStopping(monitor='loss', patience=5, restore_best_weights=True) model.fit(X, y, epochs=20, batch_size=32, verbose=0, callbacks=[early_stop]) predictions = model.predict(X, verbose=0) residuals = y - predictions residual_std = np.std(residuals) return model, scaler, residual_std def get_prediction(df, model, scaler, residual_std): if model is None or scaler is None: return [] feature_cols = [ 'rsi', 'macd_hist', 'atr', 'dist_ema20', 'dist_ema50', 'bb_width', 'bb_pos', 'vol_change', 'log_ret', 'hour_sin', 'hour_cos' ] last_window = df.iloc[-LOOKBACK_WINDOW:][feature_cols].values if len(last_window) < LOOKBACK_WINDOW: return [] last_window_scaled = scaler.transform(last_window) last_window_reshaped = last_window_scaled.reshape(1, LOOKBACK_WINDOW, len(feature_cols)) predicted_returns = model.predict(last_window_reshaped, verbose=0)[0] current_price = df.iloc[-1]['close'] current_time = int(df.iloc[-1]['time']) pred_data = [] confidence_multiplier = 1.96 time_step = 0 accumulated_variance = 0.0 for i, pct_change in enumerate(predicted_returns): future_price = current_price * (1 + pct_change) accumulated_variance += (residual_std ** 2) current_std = np.sqrt(accumulated_variance) / np.sqrt(i + 1) * (i + 1) * 0.5 upper_bound = future_price * (1 + (residual_std * confidence_multiplier)) lower_bound = future_price * (1 - (residual_std * confidence_multiplier)) pred_data.append({ "time": current_time + ((i + 1) * 60), "value": float(future_price), "upper": float(upper_bound), "lower": float(lower_bound) }) return pred_data async def process_market_data(): if not market_state['ready'] or not market_state['ohlc_history']: return {"error": "Initializing..."} df = calculate_indicators(market_state['ohlc_history']) if df is None or len(df) < LOOKBACK_WINDOW + 50: return {"error": "Not enough data"} if market_state['model'] is None or (time.time() - market_state['last_training_time'] > TRAIN_INTERVAL): try: loop = asyncio.get_running_loop() model, scaler, res_std = await loop.run_in_executor(executor, train_model, df) if model is not None: market_state['model'] = model market_state['scaler'] = scaler market_state['model_residuals'] = float(res_std) market_state['last_training_time'] = time.time() logging.info(f"Model retrained. Residual Std: {market_state['model_residuals']:.5f}") except Exception as e: logging.error(f"Training failed: {e}") predictions = [] try: predictions = get_prediction(df, market_state['model'], market_state['scaler'], market_state['model_residuals']) except Exception as e: logging.error(f"Prediction failed: {e}") df_clean = df.replace([np.inf, -np.inf], np.nan) cols_to_keep = ['time', 'open', 'high', 'low', 'close', 'volume', 'ema20', 'bb_upper', 'bb_lower', 'rsi', 'macd_hist'] df_clean = df_clean[cols_to_keep].where(pd.notnull(df_clean), None) last_close = float(df['close'].iloc[-1]) if len(df) > 0 else 0 first_close = float(df['close'].iloc[0]) if len(df) > 0 else 0 price_change = ((last_close - first_close) / first_close * 100) if first_close > 0 else 0 market_state['last_price'] = last_close market_state['price_change'] = price_change display_data = df_clean.tail(500).to_dict('records') last_row = df.iloc[-1] if len(df) > 0 else {} return { "data": display_data, "prediction": predictions, "stats": { "price": last_close, "change": round(price_change, 2), "rsi": round(float(last_row.get('rsi', 0)), 1) if pd.notna(last_row.get('rsi')) else 0, "macd": round(float(last_row.get('macd_hist', 0)), 2) if pd.notna(last_row.get('macd_hist')) else 0, "atr": round(float(last_row.get('atr', 0)), 2) if pd.notna(last_row.get('atr')) else 0, "volume": round(float(last_row.get('volume', 0)), 2) if pd.notna(last_row.get('volume')) else 0 } } HTML_PAGE = """