"""BTC/USD live forecaster.

Streams 1-minute OHLC candles from Kraken's public REST API, engineers
technical-indicator features, periodically trains a 1-D CNN to predict the
next PREDICTION_HORIZON candle returns, and serves a live dashboard over
aiohttp (HTTP page + WebSocket stats/prediction feed).

NOTE(review): this file was recovered from a whitespace-mangled paste. The
original HTML dashboard markup was lost; HTML_PAGE below is a minimal
functional reconstruction built from the surviving labels and the WebSocket
payload shape -- confirm against the original UI.
"""

import asyncio
import json
import logging
import math  # kept from original import block (unused here, retained deliberately)
import time
from concurrent.futures import ThreadPoolExecutor

import aiohttp
import numpy as np
import pandas as pd
import tensorflow as tf
from aiohttp import web
from sklearn.preprocessing import StandardScaler
from tensorflow.keras import callbacks, layers, models

SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
BROADCAST_RATE = 1.0        # seconds between WebSocket pushes
PREDICTION_HORIZON = 100    # number of future candles predicted per inference
MAX_HISTORY = 5000          # max candles retained in memory
TRAIN_INTERVAL = 600        # seconds between model retrains
LOOKBACK_WINDOW = 60        # candles fed to the CNN per training sample

# Single source of truth for the model's input features. train_model() and
# get_prediction() must agree on this list, so it lives at module level.
FEATURE_COLS = [
    'rsi', 'macd_hist', 'atr', 'dist_ema20', 'dist_ema50',
    'bb_width', 'bb_pos', 'vol_change', 'log_ret', 'hour_sin', 'hour_cos',
]

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# Shared mutable state. All writers run on the event loop thread except
# train_model, which runs in `executor` and only returns values (the event
# loop thread performs the actual state assignment).
market_state = {
    "ohlc_history": [],       # list[dict]: time/open/high/low/close/volume
    "ready": False,           # True once the initial Kraken fetch succeeded
    "model": None,            # trained keras model, or None
    "scaler": None,           # fitted StandardScaler, or None
    "model_residuals": 0.0,   # std-dev of training residuals (return units)
    "last_training_time": 0,
    "last_price": 0,
    "price_change": 0,
}
connected_clients = set()
# Single worker: at most one training job at a time, serialized.
executor = ThreadPoolExecutor(max_workers=1)


def calculate_indicators(candles):
    """Build a feature DataFrame from raw OHLCV candles.

    Args:
        candles: list of dicts with keys time/open/high/low/close/volume.

    Returns:
        DataFrame with OHLCV plus indicator columns (EMA, Bollinger, RSI,
        MACD, ATR, normalized distances, calendar encodings), with all
        NaN/inf rows dropped; or None if there is not enough history.
    """
    if len(candles) < LOOKBACK_WINDOW + PREDICTION_HORIZON:
        return None

    df = pd.DataFrame(candles)
    for c in ('open', 'high', 'low', 'close', 'volume'):
        df[c] = df[c].astype(float)

    # Trend / volatility bands.
    df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
    df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()
    df['std'] = df['close'].rolling(window=20).std()
    df['bb_upper'] = df['ema20'] + (df['std'] * 2)
    df['bb_lower'] = df['ema20'] - (df['std'] * 2)

    # RSI(14). Note: zero rolling loss makes rs infinite; handled below.
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['rsi'] = 100 - (100 / (1 + rs))

    # MACD(12, 26, 9).
    fast = df['close'].ewm(span=12, adjust=False).mean()
    slow = df['close'].ewm(span=26, adjust=False).mean()
    df['macd'] = fast - slow
    df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
    df['macd_hist'] = df['macd'] - df['macd_signal']

    # ATR(14) via true range.
    df['tr0'] = abs(df['high'] - df['low'])
    df['tr1'] = abs(df['high'] - df['close'].shift())
    df['tr2'] = abs(df['low'] - df['close'].shift())
    df['tr'] = df[['tr0', 'tr1', 'tr2']].max(axis=1)
    df['atr'] = df['tr'].rolling(window=14).mean()

    # Scale-free model inputs.
    df['dist_ema20'] = (df['close'] - df['ema20']) / df['ema20']
    df['dist_ema50'] = (df['close'] - df['ema50']) / df['ema50']
    df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['ema20']
    df['bb_pos'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
    df['vol_change'] = df['volume'].pct_change()
    df['log_ret'] = np.log(df['close'] / df['close'].shift(1))

    # Time-of-day encoded cyclically so 23:00 and 00:00 are close.
    df['datetime'] = pd.to_datetime(df['time'], unit='s')
    df['hour_sin'] = np.sin(2 * np.pi * df['datetime'].dt.hour / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['datetime'].dt.hour / 24)

    # BUGFIX: pct_change on zero volume and RSI with zero loss produce +-inf,
    # which dropna() alone does NOT remove and which would poison the
    # StandardScaler during training. Map inf -> NaN first.
    return df.replace([np.inf, -np.inf], np.nan).dropna()


def create_sequences(data, target_data, window_size, horizon):
    """Slice (X, y) supervised windows from aligned arrays.

    Currently unused by train_model (which builds windows inline), but kept
    as part of the module's public surface.
    """
    X, y = [], []
    for i in range(len(data) - window_size - horizon + 1):
        X.append(data[i:(i + window_size)])
        y.append(target_data[i + window_size: i + window_size + horizon])
    return np.array(X), np.array(y)


def train_model(df):
    """Fit a 1-D CNN mapping a LOOKBACK_WINDOW of features to the next
    PREDICTION_HORIZON percentage returns.

    Runs in a worker thread (blocking is fine here).

    Returns:
        (model, scaler, residual_std) on success, or (None, None, None)
        when there is not enough usable data.
    """
    logging.info(f"Training CNN Model on {len(df)} candles...")

    scaler = StandardScaler()
    data_scaled = scaler.fit_transform(df[FEATURE_COLS].values)

    # Target: for each candle i, the vector of percentage changes of the
    # next PREDICTION_HORIZON closes relative to close[i].
    close_prices = df['close'].values
    returns_future = []
    for i in range(len(close_prices) - PREDICTION_HORIZON):
        current_price = close_prices[i]
        future_prices = close_prices[i + 1: i + 1 + PREDICTION_HORIZON]
        returns_future.append((future_prices - current_price) / current_price)
    returns_future = np.array(returns_future)

    valid_length = len(returns_future) - LOOKBACK_WINDOW
    if valid_length <= 0:
        return None, None, None

    X, y = [], []
    for i in range(valid_length):
        X.append(data_scaled[i: i + LOOKBACK_WINDOW])
        # Target is anchored at the last candle of the input window.
        y.append(returns_future[i + LOOKBACK_WINDOW - 1])
    X = np.array(X)
    y = np.array(y)

    if len(X) < 100:  # too few samples to fit anything meaningful
        return None, None, None

    model = models.Sequential([
        layers.Input(shape=(LOOKBACK_WINDOW, len(FEATURE_COLS))),
        layers.Conv1D(filters=64, kernel_size=3, activation='relu', padding='same'),
        layers.MaxPooling1D(pool_size=2),
        layers.Dropout(0.2),
        layers.Conv1D(filters=32, kernel_size=3, activation='relu', padding='same'),
        layers.GlobalAveragePooling1D(),
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.1),
        layers.Dense(PREDICTION_HORIZON),
    ])
    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')
    early_stop = callbacks.EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)
    model.fit(X, y, epochs=20, batch_size=32, verbose=0, callbacks=[early_stop])

    # In-sample residual spread, used downstream to size confidence bands.
    residuals = y - model.predict(X, verbose=0)
    return model, scaler, float(np.std(residuals))


def get_prediction(df, model, scaler, residual_std):
    """Predict the next PREDICTION_HORIZON candle closes from the latest window.

    Returns:
        list of dicts {time, value, upper, lower} (one per future minute),
        or [] when no model is available or the window is incomplete.
        Bounds are a constant-width ~95% band (1.96 * residual_std) around
        each predicted price; they deliberately do not widen with horizon.
    """
    if model is None or scaler is None:
        return []

    last_window = df.iloc[-LOOKBACK_WINDOW:][FEATURE_COLS].values
    if len(last_window) < LOOKBACK_WINDOW:
        return []

    window_scaled = scaler.transform(last_window).reshape(
        1, LOOKBACK_WINDOW, len(FEATURE_COLS))
    predicted_returns = model.predict(window_scaled, verbose=0)[0]

    current_price = df.iloc[-1]['close']
    current_time = int(df.iloc[-1]['time'])
    confidence_multiplier = 1.96  # ~95% assuming roughly normal residuals

    pred_data = []
    for i, pct_change in enumerate(predicted_returns):
        future_price = current_price * (1 + pct_change)
        band = residual_std * confidence_multiplier
        pred_data.append({
            "time": current_time + ((i + 1) * 60),  # 1-minute candles
            "value": float(future_price),
            "upper": float(future_price * (1 + band)),
            "lower": float(future_price * (1 - band)),
        })
    return pred_data


async def process_market_data():
    """Produce one WebSocket payload: chart rows, forecast, and summary stats.

    Also triggers (and awaits) a model retrain in the worker thread when the
    model is missing or stale (> TRAIN_INTERVAL seconds old).
    """
    if not market_state['ready'] or not market_state['ohlc_history']:
        return {"error": "Initializing..."}

    df = calculate_indicators(market_state['ohlc_history'])
    if df is None or len(df) < LOOKBACK_WINDOW + 50:
        return {"error": "Not enough data"}

    stale = time.time() - market_state['last_training_time'] > TRAIN_INTERVAL
    if market_state['model'] is None or stale:
        try:
            loop = asyncio.get_running_loop()
            # Training is CPU-bound; keep the event loop responsive.
            model, scaler, res_std = await loop.run_in_executor(
                executor, train_model, df)
            if model is not None:
                market_state['model'] = model
                market_state['scaler'] = scaler
                market_state['model_residuals'] = float(res_std)
                market_state['last_training_time'] = time.time()
                logging.info(
                    f"Model retrained. Residual Std: {market_state['model_residuals']:.5f}")
        except Exception as e:
            # Best-effort: keep serving with the previous model (or none).
            logging.error(f"Training failed: {e}")

    predictions = []
    try:
        predictions = get_prediction(
            df, market_state['model'], market_state['scaler'],
            market_state['model_residuals'])
    except Exception as e:
        logging.error(f"Prediction failed: {e}")

    # JSON cannot encode NaN/inf: convert both to None for the wire format.
    cols_to_keep = ['time', 'open', 'high', 'low', 'close', 'volume',
                    'ema20', 'bb_upper', 'bb_lower', 'rsi', 'macd_hist']
    df_clean = df.replace([np.inf, -np.inf], np.nan)[cols_to_keep]
    df_clean = df_clean.where(df_clean.notnull(), None)

    last_close = float(df['close'].iloc[-1]) if len(df) > 0 else 0
    first_close = float(df['close'].iloc[0]) if len(df) > 0 else 0
    price_change = ((last_close - first_close) / first_close * 100) if first_close > 0 else 0
    market_state['last_price'] = last_close
    market_state['price_change'] = price_change

    last_row = df.iloc[-1] if len(df) > 0 else {}

    def _stat(key, digits):
        # Round a last-row value defensively; NaN/missing becomes 0.
        v = last_row.get(key, 0)
        return round(float(v), digits) if pd.notna(v) else 0

    return {
        "data": df_clean.tail(500).to_dict('records'),
        "prediction": predictions,
        "stats": {
            "price": last_close,
            "change": round(price_change, 2),
            "rsi": _stat('rsi', 1),
            "macd": _stat('macd_hist', 2),
            "atr": _stat('atr', 2),
            "volume": _stat('volume', 2),
        },
    }


# NOTE(review): the original dashboard markup did not survive the paste this
# file was recovered from. This is a minimal working reconstruction that
# consumes the /ws payload produced above.
HTML_PAGE = """<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>BTC/USD AI Predictor (CNN)</title>
<style>
  body { font-family: sans-serif; background: #101418; color: #e8e8e8; margin: 24px; }
  .stat { display: inline-block; margin-right: 28px; }
  .stat b { font-size: 1.2em; }
  #status { color: #888; margin-bottom: 12px; }
</style>
</head>
<body>
<h2>BTC/USD AI Predictor (CNN)</h2>
<div id="status">Connecting...</div>
<div>
  <span class="stat">Price <b id="price">$--</b></span>
  <span class="stat">Change <b id="change">--%</b></span>
  <span class="stat">RSI <b id="rsi">--</b></span>
  <span class="stat">ATR <b id="atr">--</b></span>
  <span class="stat">MACD <b id="macd">--</b></span>
  <span class="stat">Volume <b id="volume">--</b></span>
</div>
<p id="forecast">AI Forecast: 100 candles</p>
<script>
const proto = location.protocol === 'https:' ? 'wss://' : 'ws://';
const ws = new WebSocket(proto + location.host + '/ws');
ws.onopen = () => { document.getElementById('status').textContent = 'Live'; };
ws.onclose = () => { document.getElementById('status').textContent = 'Disconnected'; };
ws.onmessage = (ev) => {
  const p = JSON.parse(ev.data);
  if (!p.stats) { document.getElementById('status').textContent = p.error || 'Loading market data...'; return; }
  document.getElementById('price').textContent = '$' + p.stats.price.toFixed(2);
  document.getElementById('change').textContent = p.stats.change + '%';
  document.getElementById('rsi').textContent = p.stats.rsi;
  document.getElementById('atr').textContent = p.stats.atr;
  document.getElementById('macd').textContent = p.stats.macd;
  document.getElementById('volume').textContent = p.stats.volume;
  document.getElementById('forecast').textContent =
    'AI Forecast: ' + p.prediction.length + ' candles';
};
</script>
</body>
</html>
"""

KRAKEN_OHLC_URL = "https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval=1"


def _parse_candle(c):
    """Convert one Kraken OHLC row [time, o, h, l, c, vwap, volume, count]
    into our candle dict. Index 6 is volume (index 5 is VWAP)."""
    return {
        'time': int(c[0]),
        'open': float(c[1]),
        'high': float(c[2]),
        'low': float(c[3]),
        'close': float(c[4]),
        'volume': float(c[6]),
    }


async def fetch_initial_data():
    """Seed market_state['ohlc_history'] with Kraken's full 1m OHLC response.

    Returns True on success, False otherwise (errors are logged, not raised).
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                    KRAKEN_OHLC_URL,
                    timeout=aiohttp.ClientTimeout(total=30)) as response:
                if response.status == 200:
                    data = await response.json()
                    if 'result' in data:
                        # The result dict holds the pair key plus a 'last' cursor.
                        for key in data['result']:
                            if key != 'last':
                                raw = data['result'][key]
                                market_state['ohlc_history'] = [
                                    _parse_candle(c) for c in raw]
                                market_state['ready'] = True
                                logging.info(
                                    f"Loaded {len(market_state['ohlc_history'])} initial candles")
                                return True
    except Exception as e:
        logging.error(f"Initial data fetch error: {e}")
    return False


async def kraken_rest_worker():
    """Background task: poll Kraken every 5s and merge the latest candles.

    The newest ~10 candles replace any existing candles with the same
    timestamp (the in-progress candle updates in place) and new ones append;
    history is kept sorted and capped at MAX_HISTORY.
    """
    await fetch_initial_data()
    while True:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                        KRAKEN_OHLC_URL,
                        timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status == 200:
                        data = await response.json()
                        if 'result' in data:
                            for key in data['result']:
                                if key != 'last':
                                    raw = data['result'][key]
                                    new_candles = [_parse_candle(c) for c in raw[-10:]]
                                    if market_state['ohlc_history']:
                                        # Merge by timestamp (dict keeps it O(n),
                                        # replacing the old O(n*m) inner scan).
                                        by_time = {c['time']: c
                                                   for c in market_state['ohlc_history']}
                                        for nc in new_candles:
                                            by_time[nc['time']] = nc
                                        merged = sorted(by_time.values(),
                                                        key=lambda x: x['time'])
                                        market_state['ohlc_history'] = merged[-MAX_HISTORY:]
                                    market_state['ready'] = True
                                    break
        except Exception as e:
            # Transient network failures are expected; retry on next tick.
            logging.warning(f"REST update error: {e}")
        await asyncio.sleep(5)


async def broadcast_worker():
    """Background task: push the latest payload to all WebSocket clients
    every BROADCAST_RATE seconds, pruning clients that fail to receive."""
    while True:
        if connected_clients and market_state['ready']:
            payload = await process_market_data()
            if payload and "data" in payload:
                msg = json.dumps(payload)
                disconnected = set()
                # Iterate a snapshot: the set may mutate during awaits.
                for ws in connected_clients.copy():
                    try:
                        await ws.send_str(msg)
                    except Exception:
                        disconnected.add(ws)
                if disconnected:
                    connected_clients.difference_update(disconnected)
        await asyncio.sleep(BROADCAST_RATE)


async def websocket_handler(request):
    """Register a WebSocket client; the broadcast worker does all sending."""
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    connected_clients.add(ws)
    try:
        async for _ in ws:
            pass  # inbound messages are ignored; this is a push-only feed
    finally:
        connected_clients.discard(ws)
    return ws


async def handle_index(request):
    """Serve the dashboard page."""
    return web.Response(text=HTML_PAGE, content_type='text/html')


async def main():
    """Wire up routes, start the background workers, and serve forever."""
    app = web.Application()
    app.router.add_get('/', handle_index)
    app.router.add_get('/ws', websocket_handler)

    # BUGFIX: keep strong references to the tasks -- asyncio only holds weak
    # references, so un-referenced background tasks can be garbage-collected.
    background_tasks = [
        asyncio.create_task(kraken_rest_worker()),
        asyncio.create_task(broadcast_worker()),
    ]

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', PORT)
    await site.start()
    logging.info(f"Server running at http://localhost:{PORT}")
    try:
        await asyncio.Event().wait()  # run until cancelled/interrupted
    finally:
        for t in background_tasks:
            t.cancel()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass