import asyncio import json import logging import time import math import aiohttp from collections import deque from aiohttp import web import websockets import statistics SYMBOL_KRAKEN = "BTC/USD" PORT = 7860 HISTORY_LENGTH = 300 BROADCAST_RATE = 0.1 MICROPRICE_DECAY = 0.05 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') class OnlineStats: def __init__(self): self.count = 0 self.mean = 0.0 self.M2 = 0.0 def update(self, value): self.count += 1 delta = value - self.mean self.mean += delta / self.count delta2 = value - self.mean self.M2 += delta * delta2 @property def variance(self): if self.count < 2: return 0.0 return self.M2 / self.count @property def std_dev(self): return math.sqrt(self.variance) class KalmanVelocity: def __init__(self, R=0.001, Q=0.0001): self.z = 0.0 self.v = 0.0 self.P = 1.0 self.R = R self.Q = Q self.last_ts = time.time() def update(self, price): now = time.time() dt = now - self.last_ts self.last_ts = now if dt <= 0: return pred_z = self.z + self.v * dt pred_v = self.v p_cov = self.P + self.Q y = price - pred_z K = p_cov / (p_cov + self.R) self.z = pred_z + K * y self.v = pred_v + (K / dt) * y self.P = (1 - K) * p_cov market_state = { "bids": {}, "asks": {}, "history": [], "trade_history": deque(maxlen=2000), "ohlc_history": [], "current_vol_window": {"buy": 0.0, "sell": 0.0, "start": time.time()}, "current_mid": 0.0, "ready": False, "kalman": KalmanVelocity(), "stats": OnlineStats(), "walls": {"bids": [], "asks": []} } connected_clients = set() def detect_walls(order_book, side_name): if not order_book: return [] sorted_book = sorted(order_book.items(), key=lambda x: x[0], reverse=(side_name == 'bids')) relevant = sorted_book[:50] volumes = [q for p, q in relevant] if not volumes: return [] avg_vol = statistics.mean(volumes) std_vol = statistics.stdev(volumes) if len(volumes) > 1 else 0 walls = [] for p, q in relevant: if std_vol > 0: z = (q - avg_vol) / std_vol if z > 2.5: walls.append({'p': p, 'q': q, 'z': z}) return walls[:3] def calculate_weighted_micro_price(mid_price): bids = sorted(market_state['bids'].items(), reverse=True)[:50] asks = sorted(market_state['asks'].items())[:50] if not bids or not asks: return mid_price sum_wb = 0.0 sum_wa = 0.0 for p, q in bids: distance = abs(mid_price - p) weight = q * math.exp(-MICROPRICE_DECAY * distance) sum_wb += weight for p, q in asks: distance = abs(p - mid_price) weight = q * math.exp(-MICROPRICE_DECAY * distance) sum_wa += weight total_w = sum_wb + sum_wa if total_w == 0: return mid_price imbalance = (sum_wb - sum_wa) / total_w spread = asks[0][0] - bids[0][0] micro_price = mid_price + (imbalance * (spread / 2)) return micro_price def calculate_vwap_1m(): cutoff = time.time() - 60 v_sum = 0.0 pv_sum = 0.0 for trade in reversed(market_state['trade_history']): if trade['t'] < cutoff: break pv_sum += trade['p'] * trade['q'] v_sum += trade['q'] return pv_sum / v_sum if v_sum > 0 else market_state['current_mid'] def calculate_kyle_lambda(volatility, volume_window): if volume_window <= 0: return 0 return (volatility * 1000) / (math.sqrt(volume_window) + 1) def process_market_data(): if not market_state['ready']: return {"error": "Initializing..."} mid = market_state['current_mid'] now = time.time() market_state['stats'].update(mid) volatility = market_state['stats'].std_dev if volatility == 0: volatility = 1.0 market_state['kalman'].update(mid) micro_price = calculate_weighted_micro_price(mid) vwap = calculate_vwap_1m() ofi_buy = 0.0 ofi_sell = 0.0 ofi_window = 10.0 for t in reversed(market_state['trade_history']): if t['t'] < (now - ofi_window): break if t['side'] == 'buy': ofi_buy += t['q'] else: ofi_sell += t['q'] net_ofi = ofi_buy - ofi_sell total_vol_10s = ofi_buy + ofi_sell k_lambda = calculate_kyle_lambda(volatility, total_vol_10s) impact_term = net_ofi * k_lambda mean_reversion_alpha = 0.1 reversion_term = (vwap - mid) * mean_reversion_alpha micro_alpha = (micro_price - mid) * 0.8 trend_term = market_state['kalman'].v * 60.0 predicted_delta = impact_term + micro_alpha + trend_term + reversion_term pred_close = mid + predicted_delta sigma_1m = volatility * math.sqrt(60) pred_candle = { 'time': int(now) + 60, 'open': mid, 'close': pred_close, 'high': max(mid, pred_close) + (2 * sigma_1m), 'low': min(mid, pred_close) - (2 * sigma_1m) } if not market_state['history'] or (now - market_state['history'][-1]['t'] > 0.5): market_state['history'].append({'t': now, 'p': mid}) if len(market_state['history']) > HISTORY_LENGTH: market_state['history'].pop(0) bid_walls = detect_walls(market_state['bids'], 'bids') ask_walls = detect_walls(market_state['asks'], 'asks') analysis = { "projected": pred_close, "rho": (micro_price - mid), "vwap": vwap, "lambda": k_lambda } return { "mid": mid, "history": market_state['history'], "ohlc": market_state['ohlc_history'], "pred_candle": pred_candle, "analysis": analysis, "walls": {"bids": bid_walls, "asks": ask_walls} } HTML_PAGE = f"""