import asyncio
import json
import logging
import math
import time
from concurrent.futures import ThreadPoolExecutor

import aiohttp
import numpy as np
import pandas as pd
from aiohttp import web
from sklearn.ensemble import RandomForestRegressor

# --- CONFIGURATION ---
SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
BROADCAST_RATE = 1.0        # seconds between websocket broadcasts
PREDICTION_HORIZON = 100    # number of future 1-minute candles to predict
MAX_HISTORY = 5000          # max candles kept in memory
TRAIN_INTERVAL = 300        # seconds between model retrains

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# Shared mutable state. Everything runs on one asyncio event loop; training
# runs in the single-worker executor on a DataFrame snapshot, so no explicit
# locking is used here.
market_state = {
    "ohlc_history": [],
    "ready": False,
    "model": None,
    "model_residuals": None,
    "last_training_time": 0,
    "last_price": 0,
    "price_change": 0
}
connected_clients = set()
executor = ThreadPoolExecutor(max_workers=1)

# Lags used when building the lagged feature columns.
LAG_STEPS = [1, 2, 3, 5, 8]

# Single source of truth for the model's feature columns. Previously this
# exact list was duplicated in train_model() and get_prediction(); editing
# one copy but not the other would have caused silent train/predict skew.
FEATURE_COLUMNS = [
    'rsi', 'macd_hist', 'atr', 'dist_ema20', 'dist_ema50',
    'bb_width', 'bb_pos', 'vol_change', 'log_ret', 'hour_sin', 'hour_cos'
]
for _lag in LAG_STEPS:
    FEATURE_COLUMNS.extend([
        f'rsi_lag{_lag}', f'macd_hist_lag{_lag}',
        f'log_ret_lag{_lag}', f'vol_change_lag{_lag}'
    ])


def calculate_indicators(candles):
    """Build a feature DataFrame from a list of OHLCV candle dicts.

    Each candle is expected to carry 'time' (unix seconds), 'open', 'high',
    'low', 'close', 'volume'. Returns the augmented DataFrame, or None when
    fewer than 100 candles are available (not enough for the indicators).
    """
    if len(candles) < 100:
        return None

    df = pd.DataFrame(candles)
    for c in ['open', 'high', 'low', 'close', 'volume']:
        df[c] = df[c].astype(float)

    # Moving Averages
    df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
    df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()

    # Bollinger Bands (around the EMA20 rather than an SMA — intentional here)
    df['std'] = df['close'].rolling(window=20).std()
    df['bb_upper'] = df['ema20'] + (df['std'] * 2)
    df['bb_lower'] = df['ema20'] - (df['std'] * 2)

    # RSI (simple rolling-mean variant, window 14)
    delta = df['close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['rsi'] = 100 - (100 / (1 + rs))

    # MACD (12/26 EMAs, 9-span signal)
    k = df['close'].ewm(span=12, adjust=False).mean()
    d = df['close'].ewm(span=26, adjust=False).mean()
    df['macd'] = k - d
    df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
    df['macd_hist'] = df['macd'] - df['macd_signal']

    # ATR: true range is the max of the three classic components
    df['tr0'] = abs(df['high'] - df['low'])
    df['tr1'] = abs(df['high'] - df['close'].shift())
    df['tr2'] = abs(df['low'] - df['close'].shift())
    df['tr'] = df[['tr0', 'tr1', 'tr2']].max(axis=1)
    df['atr'] = df['tr'].rolling(window=14).mean()

    # Normalized distance/width features
    df['dist_ema20'] = (df['close'] - df['ema20']) / df['ema20']
    df['dist_ema50'] = (df['close'] - df['ema50']) / df['ema50']
    df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['ema20']
    # NOTE: can be +/-inf when band width is zero; consumers must sanitize.
    df['bb_pos'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
    # NOTE: pct_change over a zero volume yields inf; consumers must sanitize.
    df['vol_change'] = df['volume'].pct_change()
    df['log_ret'] = np.log(df['close'] / df['close'].shift(1))

    # Cyclic time-of-day encoding
    df['datetime'] = pd.to_datetime(df['time'], unit='s')
    df['hour_sin'] = np.sin(2 * np.pi * df['datetime'].dt.hour / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['datetime'].dt.hour / 24)

    # Lag features
    for lag in LAG_STEPS:
        df[f'rsi_lag{lag}'] = df['rsi'].shift(lag)
        df[f'macd_hist_lag{lag}'] = df['macd_hist'].shift(lag)
        df[f'log_ret_lag{lag}'] = df['log_ret'].shift(lag)
        df[f'vol_change_lag{lag}'] = df['vol_change'].shift(lag)

    return df


def train_model(df):
    """Fit a multi-output RandomForest mapping features to future returns.

    Targets are the returns at steps 1..PREDICTION_HORIZON relative to the
    current close. Returns (model, residual_std) where residual_std is the
    per-step std-dev of training residuals (an optimistic uncertainty
    proxy), or (None, None) when there is not enough clean data.
    """
    logging.info(f"Training ML Model on {len(df)} candles...")

    # BUG FIX: vol_change / bb_pos can be +/-inf (see calculate_indicators);
    # dropna() does NOT remove inf, and RandomForestRegressor.fit() raises
    # on non-finite input. Convert inf to NaN before dropping.
    data = df.replace([np.inf, -np.inf], np.nan).dropna().copy()

    # Prepare targets (future returns relative to the current price).
    target_cols_dict = {}
    target_names = []
    for i in range(1, PREDICTION_HORIZON + 1):
        col_name = f'target_return_{i}'
        target_cols_dict[col_name] = (data['close'].shift(-i) - data['close']) / data['close']
        target_names.append(col_name)

    targets_df = pd.DataFrame(target_cols_dict, index=data.index)
    data = pd.concat([data, targets_df], axis=1).dropna()

    if len(data) < 200:
        return None, None

    X = data[FEATURE_COLUMNS].values
    y = data[target_names].values

    model = RandomForestRegressor(
        n_estimators=200,
        max_depth=20,
        min_samples_split=5,
        min_samples_leaf=2,
        max_features='sqrt',
        n_jobs=-1,
        random_state=42
    )
    model.fit(X, y)

    # Training residuals as a proxy for per-horizon-step uncertainty.
    residuals = y - model.predict(X)
    residual_std = np.std(residuals, axis=0)
    return model, residual_std


def get_prediction(df, model, residual_std):
    """Predict PREDICTION_HORIZON future candles plus 95% error margins.

    Returns (pred_candles, confidence_data): synthetic candle dicts (high/
    low are an ATR-based visual heuristic, not model output) and per-step
    error-margin points. Returns ([], []) when no model or clean features.
    """
    if model is None or residual_std is None:
        return [], []

    last_row = df.iloc[[-1]][FEATURE_COLUMNS]
    features = last_row.values.astype(float)
    # BUG FIX: the previous isnull() check let +/-inf through, which crashes
    # model.predict(); reject any non-finite feature instead.
    if not np.isfinite(features).all():
        return [], []

    predicted_returns = model.predict(features)[0]

    current_price = df.iloc[-1]['close']
    current_time = int(df.iloc[-1]['time'])
    current_atr = df.iloc[-1]['atr']

    pred_candles = []
    confidence_data = []
    prev_close = current_price

    for i, pct_change in enumerate(predicted_returns):
        future_time = current_time + ((i + 1) * 60)  # 1-minute steps

        # Predicted close: each step's return is relative to the CURRENT
        # price (step 0), not the previous predicted close.
        future_close = current_price * (1 + pct_change)

        # Construct a candle: open is the previous candle's close, and
        # high/low use a fixed ATR-based band purely for visualization.
        open_price = prev_close
        close_price = future_close
        half_range = current_atr * 0.4
        high_price = max(open_price, close_price) + half_range
        low_price = min(open_price, close_price) - half_range

        pred_candles.append({
            "time": future_time,
            "open": float(open_price),
            "high": float(high_price),
            "low": float(low_price),
            "close": float(close_price)
        })

        # Approximate 95% CI half-width from the training residual std-dev.
        sigma = residual_std[i]
        error_margin = future_close * sigma * 1.96
        confidence_data.append({
            "time": future_time,
            "value": float(error_margin)
        })

        prev_close = future_close

    return pred_candles, confidence_data


async def process_market_data():
    """Compute indicators, (re)train if due, and build the broadcast payload.

    Returns either {"error": ...} while warming up, or the full payload with
    display candles, predictions, confidence band, and headline stats.
    """
    if not market_state['ready'] or not market_state['ohlc_history']:
        return {"error": "Initializing..."}

    df = calculate_indicators(market_state['ohlc_history'])
    if df is None or len(df) < 100:
        return {"error": "Not enough data"}

    # Retrain periodically (off the event loop, in the thread executor).
    if market_state['model'] is None or (time.time() - market_state['last_training_time'] > TRAIN_INTERVAL):
        try:
            loop = asyncio.get_running_loop()
            model, res_std = await loop.run_in_executor(executor, train_model, df)
            if model is not None:
                market_state['model'] = model
                market_state['model_residuals'] = res_std
                market_state['last_training_time'] = time.time()
        except Exception as e:
            logging.error(f"Training failed: {e}")

    predictions = []
    confidence = []
    try:
        predictions, confidence = get_prediction(df, market_state['model'], market_state['model_residuals'])
    except Exception as e:
        logging.error(f"Prediction failed: {e}")

    # Prepare display data: inf -> NaN -> None so it JSON-serializes cleanly.
    df_clean = df.replace([np.inf, -np.inf], np.nan)
    cols_to_keep = ['time', 'open', 'high', 'low', 'close', 'volume',
                    'ema20', 'bb_upper', 'bb_lower', 'rsi', 'macd_hist']
    df_clean = df_clean[cols_to_keep].where(pd.notnull(df_clean[cols_to_keep]), None)

    last_close = float(df['close'].iloc[-1]) if len(df) > 0 else 0
    first_close = float(df['close'].iloc[0]) if len(df) > 0 else 0
    price_change = ((last_close - first_close) / first_close * 100) if first_close > 0 else 0

    market_state['last_price'] = last_close
    market_state['price_change'] = price_change

    display_data = df_clean.tail(500).to_dict('records')
    last_row = df.iloc[-1] if len(df) > 0 else {}

    return {
        "data": display_data,
        "prediction": predictions,
        "confidence": confidence,
        "stats": {
            "price": last_close,
            "change": round(price_change, 2),
            "rsi": round(float(last_row.get('rsi', 0)), 1) if pd.notna(last_row.get('rsi')) else 0,
            "macd": round(float(last_row.get('macd', 0)), 2) if pd.notna(last_row.get('macd')) else 0,
            "atr": round(float(last_row.get('atr', 0)), 2) if pd.notna(last_row.get('atr')) else 0,
            "volume": round(float(last_row.get('volume', 0)), 2) if pd.notna(last_row.get('volume')) else 0
        }
    }

# --- HTML/JS ---
# NOTE(review): this template appears to have lost its HTML/JS markup in a
# copy/extraction step — only the visible text remains. The original page
# markup needs to be restored from version control; text preserved as-is.
HTML_PAGE = """ BTC/USD AI Predictor
BTC/USD
Price $--
Change --%
RSI --
ATR --
Connecting...
EMA 20--
BB Upper--
BB Lower--
MACD--
Price
AI Prediction
Forecast: 100 Candles
Volume
AI Uncertainty (±$)
RSI
MACD
"""

# Kraken public OHLC endpoint (1-minute candles for XBT/USD). Previously
# duplicated in fetch_initial_data() and kraken_rest_worker().
KRAKEN_OHLC_URL = "https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval=1"


def _candle_from_row(c):
    """Convert one raw Kraken OHLC row into our candle dict.

    Kraken rows are [time, open, high, low, close, vwap, volume, count];
    index 5 (vwap) is intentionally skipped.
    """
    return {
        'time': int(c[0]),
        'open': float(c[1]),
        'high': float(c[2]),
        'low': float(c[3]),
        'close': float(c[4]),
        'volume': float(c[6])
    }


async def _fetch_ohlc_rows(session):
    """Fetch raw OHLC rows from Kraken; None on bad status / missing result.

    Kraken's 'result' maps a pair key to the rows plus a 'last' cursor; the
    first non-'last' key holds the candle data.
    """
    async with session.get(KRAKEN_OHLC_URL, timeout=aiohttp.ClientTimeout(total=30)) as response:
        if response.status != 200:
            return None
        data = await response.json()
        if 'result' not in data:
            return None
        for key in data['result']:
            if key != 'last':
                return data['result'][key]
        return None


async def fetch_initial_data():
    """Seed market_state['ohlc_history'] with a full Kraken OHLC snapshot.

    Returns True on success, False on any failure (logged, not raised).
    """
    try:
        async with aiohttp.ClientSession() as session:
            raw = await _fetch_ohlc_rows(session)
            if raw is not None:
                market_state['ohlc_history'] = [_candle_from_row(c) for c in raw]
                market_state['ready'] = True
                logging.info(f"Loaded {len(market_state['ohlc_history'])} initial candles")
                return True
    except Exception as e:
        logging.error(f"Initial data fetch error: {e}")
    return False


async def kraken_rest_worker():
    """Poll Kraken every 5s and merge the latest candles into history.

    Existing candles are updated in place (same timestamp), new ones are
    appended, and history is kept sorted and trimmed to MAX_HISTORY.
    """
    await fetch_initial_data()
    while True:
        try:
            async with aiohttp.ClientSession() as session:
                raw = await _fetch_ohlc_rows(session)
                if raw is not None:
                    # Only the freshest few rows can differ between polls.
                    new_candles = [_candle_from_row(c) for c in raw[-10:]]
                    if market_state['ohlc_history']:
                        # Merge by timestamp via a dict — replaces the old
                        # per-candle full-history scan (O(n*m)) with O(n).
                        merged = {c['time']: c for c in market_state['ohlc_history']}
                        for nc in new_candles:
                            merged[nc['time']] = nc
                        history = [merged[t] for t in sorted(merged)]
                        if len(history) > MAX_HISTORY:
                            history = history[-MAX_HISTORY:]
                        market_state['ohlc_history'] = history
                    market_state['ready'] = True
        except Exception as e:
            logging.warning(f"REST update error: {e}")
        await asyncio.sleep(5)


async def broadcast_worker():
    """Push the latest payload to every connected websocket client."""
    while True:
        if connected_clients and market_state['ready']:
            payload = await process_market_data()
            # Skip {"error": ...} payloads emitted while warming up.
            if payload and "data" in payload:
                msg = json.dumps(payload)
                # Iterate over a copy so the set may change during awaits.
                disconnected = set()
                for ws in connected_clients.copy():
                    try:
                        await ws.send_str(msg)
                    except Exception:
                        disconnected.add(ws)
                if disconnected:
                    connected_clients.difference_update(disconnected)
        await asyncio.sleep(BROADCAST_RATE)


async def websocket_handler(request):
    """Register a websocket client; incoming messages are ignored."""
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    connected_clients.add(ws)
    try:
        async for msg in ws:
            pass
    finally:
        connected_clients.discard(ws)
    return ws


async def handle_index(request):
    """Serve the single-page dashboard."""
    return web.Response(text=HTML_PAGE, content_type='text/html')


async def main():
    """Wire up routes, start the background workers, and serve forever."""
    app = web.Application()
    app.router.add_get('/', handle_index)
    app.router.add_get('/ws', websocket_handler)

    # BUG FIX: keep strong references to the background tasks. Per the
    # asyncio docs, a task referenced only by create_task()'s return value
    # may be garbage-collected before it finishes.
    background_tasks = [
        asyncio.create_task(kraken_rest_worker()),
        asyncio.create_task(broadcast_worker()),
    ]

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', PORT)
    await site.start()
    logging.info(f"Server running at http://localhost:{PORT}")
    # Block forever; background_tasks stays alive for the server's lifetime.
    await asyncio.Event().wait()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass