import asyncio
import json
import logging
import time
import math
import aiohttp
import pandas as pd
import numpy as np
from aiohttp import web
from sklearn.ensemble import RandomForestRegressor
from concurrent.futures import ThreadPoolExecutor

# --- CONFIGURATION ---
SYMBOL_KRAKEN = "BTC/USD"   # trading pair (consumed by code outside this chunk)
PORT = 7860                 # HTTP server port (consumed elsewhere)
BROADCAST_RATE = 1.0        # seconds between broadcasts (consumed elsewhere)
PREDICTION_HORIZON = 100    # number of future 1-minute candles to forecast
MAX_HISTORY = 5000          # max candles retained (consumed elsewhere)
TRAIN_INTERVAL = 300        # seconds between model retrains

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# Shared mutable state, read/written from the asyncio event loop.
market_state = {
    "ohlc_history": [],        # list of candle dicts: time/open/high/low/close/volume
    "ready": False,            # set externally once the data feed is warm
    "model": None,             # trained multi-output RandomForestRegressor
    "model_residuals": None,   # per-horizon std-dev of training residuals
    "last_training_time": 0,
    "last_price": 0,
    "price_change": 0
}
connected_clients = set()
# Single worker: at most one CPU-heavy training job runs at a time.
executor = ThreadPoolExecutor(max_workers=1)

# Lag offsets and model feature names, shared by training and inference.
# Keeping them in ONE place guarantees fit/predict column order stays in
# sync (previously this list was duplicated in two functions).
FEATURE_LAGS = (1, 2, 3, 5, 8)
FEATURE_COLS = [
    'rsi', 'macd_hist', 'atr', 'dist_ema20', 'dist_ema50',
    'bb_width', 'bb_pos', 'vol_change', 'log_ret', 'hour_sin', 'hour_cos'
] + [
    f'{base}_lag{lag}'
    for lag in FEATURE_LAGS
    for base in ('rsi', 'macd_hist', 'log_ret', 'vol_change')
]


def calculate_indicators(candles):
    """Build a feature DataFrame (indicators + lag features) from raw candles.

    Args:
        candles: list of dicts with keys time/open/high/low/close/volume
                 ('time' is assumed to be a unix timestamp in seconds —
                 it is fed to pd.to_datetime(unit='s')).

    Returns:
        DataFrame with OHLCV plus indicator/feature columns, or None when
        fewer than 100 candles are available (the longest indicator window
        needs warm-up data).
    """
    if len(candles) < 100:
        return None
    df = pd.DataFrame(candles)
    for c in ('open', 'high', 'low', 'close', 'volume'):
        df[c] = df[c].astype(float)

    # Moving averages
    df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
    df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()

    # Bollinger Bands: 2 rolling std around the EMA20 midline
    df['std'] = df['close'].rolling(window=20).std()
    df['bb_upper'] = df['ema20'] + (df['std'] * 2)
    df['bb_lower'] = df['ema20'] - (df['std'] * 2)

    # RSI (rolling-mean variant). loss == 0 -> rs == inf -> RSI saturates
    # at 100; both zero -> NaN, which is dropped before training.
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['rsi'] = 100 - (100 / (1 + rs))

    # MACD (12/26 EMA difference, 9-EMA signal)
    fast = df['close'].ewm(span=12, adjust=False).mean()
    slow = df['close'].ewm(span=26, adjust=False).mean()
    df['macd'] = fast - slow
    df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
    df['macd_hist'] = df['macd'] - df['macd_signal']

    # ATR: 14-period rolling mean of the true range
    df['tr0'] = abs(df['high'] - df['low'])
    df['tr1'] = abs(df['high'] - df['close'].shift())
    df['tr2'] = abs(df['low'] - df['close'].shift())
    df['tr'] = df[['tr0', 'tr1', 'tr2']].max(axis=1)
    df['atr'] = df['tr'].rolling(window=14).mean()

    # Normalised / relative features. NOTE: pct_change() and log() can emit
    # +/-inf (e.g. a zero-volume candle); downstream code sanitises these.
    df['dist_ema20'] = (df['close'] - df['ema20']) / df['ema20']
    df['dist_ema50'] = (df['close'] - df['ema50']) / df['ema50']
    df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['ema20']
    df['bb_pos'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
    df['vol_change'] = df['volume'].pct_change()
    df['log_ret'] = np.log(df['close'] / df['close'].shift(1))

    # Cyclical time-of-day encoding
    df['datetime'] = pd.to_datetime(df['time'], unit='s')
    df['hour_sin'] = np.sin(2 * np.pi * df['datetime'].dt.hour / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['datetime'].dt.hour / 24)

    # Lagged copies of the fast-moving features
    for lag in FEATURE_LAGS:
        df[f'rsi_lag{lag}'] = df['rsi'].shift(lag)
        df[f'macd_hist_lag{lag}'] = df['macd_hist'].shift(lag)
        df[f'log_ret_lag{lag}'] = df['log_ret'].shift(lag)
        df[f'vol_change_lag{lag}'] = df['vol_change'].shift(lag)
    return df


def train_model(df):
    """Fit a multi-output random forest predicting future returns.

    One target column per step i in 1..PREDICTION_HORIZON: the return of
    close[t+i] relative to close[t].

    Args:
        df: feature DataFrame from calculate_indicators().

    Returns:
        (model, residual_std) where residual_std is the per-horizon
        standard deviation of training residuals (an optimistic
        uncertainty proxy), or (None, None) when too little clean data.
    """
    logging.info(f"Training ML Model on {len(df)} candles...")

    # BUGFIX: map +/-inf to NaN before dropna(). pct_change()/log() can
    # produce inf (zero volume / zero close), dropna() does NOT remove it,
    # and RandomForestRegressor.fit rejects non-finite input with an error.
    data = df.replace([np.inf, -np.inf], np.nan).dropna().copy()

    # Future returns relative to the current close, one column per step.
    target_cols_dict = {}
    target_names = []
    for i in range(1, PREDICTION_HORIZON + 1):
        col_name = f'target_return_{i}'
        target_cols_dict[col_name] = (data['close'].shift(-i) - data['close']) / data['close']
        target_names.append(col_name)
    targets_df = pd.DataFrame(target_cols_dict, index=data.index)
    # The last PREDICTION_HORIZON rows have incomplete targets -> dropped.
    data = pd.concat([data, targets_df], axis=1).dropna()

    if len(data) < 200:
        return None, None

    X = data[FEATURE_COLS].values
    y = data[target_names].values
    model = RandomForestRegressor(
        n_estimators=200,
        max_depth=20,
        min_samples_split=5,
        min_samples_leaf=2,
        max_features='sqrt',
        n_jobs=-1,
        random_state=42
    )
    model.fit(X, y)

    # Training residuals as a confidence proxy (in-sample, so optimistic).
    predictions = model.predict(X)
    residual_std = np.std(y - predictions, axis=0)
    return model, residual_std


def get_prediction(df, model, residual_std):
    """Forecast the next PREDICTION_HORIZON candles from the latest features.

    Args:
        df: feature DataFrame from calculate_indicators().
        model: fitted multi-output regressor (or None).
        residual_std: per-horizon residual std-dev from train_model (or None).

    Returns:
        (pred_candles, confidence_data) — lists of chart-ready dicts — or
        ([], []) when the model is missing or the latest features are
        unusable (NaN/inf).
    """
    if model is None or residual_std is None:
        return [], []

    last_row = df.iloc[[-1]][FEATURE_COLS]
    # BUGFIX: reject inf as well as NaN — either crashes model.predict().
    if not np.isfinite(last_row.values).all():
        return [], []

    predicted_returns = model.predict(last_row.values)[0]
    current_price = df.iloc[-1]['close']
    current_time = int(df.iloc[-1]['time'])
    current_atr = df.iloc[-1]['atr']

    pred_candles = []
    confidence_data = []
    prev_close = current_price
    for i, pct_change in enumerate(predicted_returns):
        future_time = current_time + ((i + 1) * 60)  # 1-minute steps
        future_close = current_price * (1 + pct_change)

        # Synthetic candle: open = previous predicted close; high/low are
        # padded with a fixed ATR fraction purely so the forecast renders
        # candle-like on the chart (no real intrabar info is predicted).
        open_price = prev_close
        close_price = future_close
        half_range = current_atr * 0.4
        high_price = max(open_price, close_price) + half_range
        low_price = min(open_price, close_price) - half_range

        pred_candles.append({
            "time": future_time,
            "open": float(open_price),
            "high": float(high_price),
            "low": float(low_price),
            "close": float(close_price)
        })

        # ~95% CI half-width: residual sigma at step i scaled to price.
        sigma = residual_std[i]
        error_margin = future_close * sigma * 1.96
        confidence_data.append({
            "time": future_time,
            "value": float(error_margin)
        })
        prev_close = future_close

    return pred_candles, confidence_data


async def process_market_data():
    """Compute indicators, retrain if stale, and build the broadcast payload.

    Returns a JSON-serialisable dict: display candles + indicators,
    predicted candles, confidence band, and headline stats; or an
    {"error": ...} dict while warming up.
    """
    if not market_state['ready'] or not market_state['ohlc_history']:
        return {"error": "Initializing..."}

    df = calculate_indicators(market_state['ohlc_history'])
    if df is None or len(df) < 100:
        return {"error": "Not enough data"}

    # Retrain off the event loop when the model is missing or stale.
    if market_state['model'] is None or (time.time() - market_state['last_training_time'] > TRAIN_INTERVAL):
        try:
            loop = asyncio.get_running_loop()
            model, res_std = await loop.run_in_executor(executor, train_model, df)
            if model is not None:
                market_state['model'] = model
                market_state['model_residuals'] = res_std
                market_state['last_training_time'] = time.time()
        except Exception as e:
            logging.error(f"Training failed: {e}")

    predictions = []
    confidence = []
    try:
        predictions, confidence = get_prediction(df, market_state['model'], market_state['model_residuals'])
    except Exception as e:
        logging.error(f"Prediction failed: {e}")

    # JSON-safe display frame: inf -> NaN -> None.
    df_clean = df.replace([np.inf, -np.inf], np.nan)
    cols_to_keep = ['time', 'open', 'high', 'low', 'close', 'volume',
                    'ema20', 'bb_upper', 'bb_lower', 'rsi', 'macd_hist']
    df_clean = df_clean[cols_to_keep].where(pd.notnull(df_clean), None)

    last_close = float(df['close'].iloc[-1]) if len(df) > 0 else 0
    first_close = float(df['close'].iloc[0]) if len(df) > 0 else 0
    price_change = ((last_close - first_close) / first_close * 100) if first_close > 0 else 0
    market_state['last_price'] = last_close
    market_state['price_change'] = price_change

    display_data = df_clean.tail(500).to_dict('records')
    last_row = df.iloc[-1] if len(df) > 0 else {}

    def _stat(key, ndigits):
        # Round a last-row value; 0 for a missing or NaN entry.
        val = last_row.get(key, 0)
        return round(float(val), ndigits) if pd.notna(val) else 0

    return {
        "data": display_data,
        "prediction": predictions,
        "confidence": confidence,
        "stats": {
            "price": last_close,
            "change": round(price_change, 2),
            "rsi": _stat('rsi', 1),
            "macd": _stat('macd', 2),
            "atr": _stat('atr', 2),
            "volume": _stat('volume', 2)
        }
    }

# --- HTML/JS ---
HTML_PAGE = """