"""Real-time BTC/USD order-book dashboard fed by Kraken's public APIs.

A background task maintains an in-memory order book, trade tape and 1-minute
candles from Kraken's WebSocket feed; a second task periodically derives
analytics from that state (micro-price, VWAP, order-flow impact, a Kalman
velocity estimate, liquidity walls) and pushes them as JSON to every browser
connected to ``/ws``.
"""
import asyncio
import calendar
import heapq
import json
import logging
import math
import statistics
import time
from collections import deque

import aiohttp
import websockets
from aiohttp import web

SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
HISTORY_LENGTH = 300
BROADCAST_RATE = 0.1
MICROPRICE_DECAY = 0.05

# Depth requested from the book channel; the local book is trimmed to this.
BOOK_DEPTH = 100
# Number of best levels per side that the analytics look at.
BOOK_LEVELS_ANALYZED = 50
# Maximum number of 1-minute candles kept (REST seed and live stream alike).
OHLC_HISTORY_LENGTH = 120
# Seconds to wait before re-opening the Kraken WebSocket.
RECONNECT_DELAY = 2

KRAKEN_OHLC_URL = "https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval=1"
KRAKEN_WS_URL = "wss://ws.kraken.com/v2"

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)


class OnlineStats:
    """Running mean / variance accumulator (Welford's update)."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.M2 = 0.0  # running sum of squared deviations from the mean

    def update(self, value):
        """Fold one observation into the running statistics."""
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        delta2 = value - self.mean
        self.M2 += delta * delta2

    @property
    def variance(self):
        """Population variance (``M2 / count``); 0.0 with fewer than two samples."""
        if self.count < 2:
            return 0.0
        return self.M2 / self.count

    @property
    def std_dev(self):
        """Square root of :attr:`variance`."""
        return math.sqrt(self.variance)


class KalmanVelocity:
    """Scalar Kalman-style filter tracking a price level ``z`` and velocity ``v``.

    Args:
        R: measurement-noise term used in the gain.
        Q: process-noise term added to the covariance on every step.
    """

    def __init__(self, R=0.001, Q=0.0001):
        self.z = 0.0
        self.v = 0.0
        self.P = 1.0
        self.R = R
        self.Q = Q
        self.last_ts = time.time()
        # Until a first price is seen ``z`` is a placeholder, not an estimate.
        self._seeded = False

    def update(self, price, now=None):
        """Advance the filter with a new price observation.

        Args:
            price: observed price.
            now: observation time in seconds; defaults to ``time.time()``.

        The first observation only seeds the level estimate. Without this the
        initial innovation would be the whole price (``price - 0.0``) and the
        velocity, which is scaled by ``1 / dt``, would start out enormous.
        """
        if now is None:
            now = time.time()

        if not self._seeded:
            self.z = price
            self.last_ts = now
            self._seeded = True
            return

        dt = now - self.last_ts
        self.last_ts = now
        if dt <= 0:
            return

        # Predict.
        pred_z = self.z + self.v * dt
        pred_v = self.v
        p_cov = self.P + self.Q

        # Correct.
        y = price - pred_z
        K = p_cov / (p_cov + self.R)
        self.z = pred_z + K * y
        self.v = pred_v + (K / dt) * y
        self.P = (1 - K) * p_cov


# Shared mutable state: written by kraken_worker, read by process_market_data.
market_state = {
    "bids": {},  # price -> quantity
    "asks": {},  # price -> quantity
    "history": deque(maxlen=HISTORY_LENGTH),  # sampled mid prices
    "trade_history": deque(maxlen=2000),
    "ohlc_history": [],
    "current_vol_window": {"buy": 0.0, "sell": 0.0, "start": time.time()},
    "current_mid": 0.0,
    "ready": False,
    "kalman": KalmanVelocity(),
    "stats": OnlineStats(),
    "walls": {"bids": [], "asks": []}
}

connected_clients = set()


def _best_levels(order_book, side_name, limit=BOOK_LEVELS_ANALYZED):
    """Return up to ``limit`` ``(price, qty)`` pairs, best price first.

    Bids are ordered from the highest price down, any other side from the
    lowest price up.
    """
    pick = heapq.nlargest if side_name == 'bids' else heapq.nsmallest
    return pick(limit, order_book.items(), key=lambda level: level[0])


def detect_walls(order_book, side_name, z_threshold=2.5, max_walls=3):
    """Find unusually large resting orders near the top of one book side.

    Args:
        order_book: mapping of price -> quantity for one side.
        side_name: ``'bids'`` sorts best-first descending; anything else
            ascending.
        z_threshold: minimum z-score of a level's quantity, relative to the
            analysed levels, for it to count as a wall.
        max_walls: maximum number of walls returned.

    Returns:
        List of ``{'p': price, 'q': qty, 'z': z_score}`` dicts in book order
        (closest to the top of book first).
    """
    if not order_book:
        return []

    relevant = _best_levels(order_book, side_name)
    volumes = [qty for _, qty in relevant]
    # A spread needs at least two levels; one level can never be an outlier.
    if len(volumes) < 2:
        return []

    avg_vol = statistics.mean(volumes)
    std_vol = statistics.stdev(volumes)
    if std_vol <= 0:
        return []

    walls = []
    for price, qty in relevant:
        z = (qty - avg_vol) / std_vol
        if z > z_threshold:
            walls.append({'p': price, 'q': qty, 'z': z})
    return walls[:max_walls]


def _decayed_volume(levels, mid_price):
    """Sum quantities weighted by ``exp(-MICROPRICE_DECAY * distance to mid)``."""
    total = 0.0
    for price, qty in levels:
        total += qty * math.exp(-MICROPRICE_DECAY * abs(price - mid_price))
    return total


def calculate_weighted_micro_price(mid_price):
    """Shift the mid price toward the heavier side of the book.

    The imbalance of distance-decayed bid and ask volume, in ``[-1, 1]``,
    moves the price by up to half the spread. Returns ``mid_price`` unchanged
    when either side is empty or carries no weight.
    """
    bids = _best_levels(market_state['bids'], 'bids')
    asks = _best_levels(market_state['asks'], 'asks')
    if not bids or not asks:
        return mid_price

    sum_wb = _decayed_volume(bids, mid_price)
    sum_wa = _decayed_volume(asks, mid_price)
    total_w = sum_wb + sum_wa
    if total_w == 0:
        return mid_price

    imbalance = (sum_wb - sum_wa) / total_w
    spread = asks[0][0] - bids[0][0]
    return mid_price + (imbalance * (spread / 2))


def calculate_vwap_1m():
    """Volume-weighted average trade price over the last 60 seconds.

    Falls back to the current mid price when no trade falls in the window.
    """
    cutoff = time.time() - 60
    v_sum = 0.0
    pv_sum = 0.0
    # Newest trades are at the right end, so stop at the first stale one.
    for trade in reversed(market_state['trade_history']):
        if trade['t'] < cutoff:
            break
        pv_sum += trade['p'] * trade['q']
        v_sum += trade['q']
    return pv_sum / v_sum if v_sum > 0 else market_state['current_mid']


def calculate_kyle_lambda(volatility, volume_window):
    """Impact-per-unit-volume proxy: ``1000 * volatility / (sqrt(volume) + 1)``.

    Returns 0.0 when ``volume_window`` is not positive.
    """
    if volume_window <= 0:
        return 0.0
    return (volatility * 1000) / (math.sqrt(volume_window) + 1)


def process_market_data():
    """Derive the dashboard payload from the current ``market_state``.

    Updates the running statistics, the Kalman filter and the sampled mid
    history as a side effect.

    Returns:
        ``{"error": ...}`` while no book has been received yet, otherwise a
        JSON-serialisable dict with the keys ``mid``, ``history``, ``ohlc``,
        ``pred_candle``, ``analysis`` and ``walls``.
    """
    if not market_state['ready']:
        return {"error": "Initializing..."}

    mid = market_state['current_mid']
    now = time.time()

    market_state['stats'].update(mid)
    volatility = market_state['stats'].std_dev
    if volatility == 0:
        volatility = 1.0

    market_state['kalman'].update(mid)
    micro_price = calculate_weighted_micro_price(mid)
    vwap = calculate_vwap_1m()

    # Signed traded volume over the most recent window.
    ofi_buy = 0.0
    ofi_sell = 0.0
    ofi_window = 10.0
    for t in reversed(market_state['trade_history']):
        if t['t'] < (now - ofi_window):
            break
        if t['side'] == 'buy':
            ofi_buy += t['q']
        else:
            ofi_sell += t['q']
    net_ofi = ofi_buy - ofi_sell
    total_vol_10s = ofi_buy + ofi_sell

    k_lambda = calculate_kyle_lambda(volatility, total_vol_10s)
    impact_term = net_ofi * k_lambda
    mean_reversion_alpha = 0.1
    reversion_term = (vwap - mid) * mean_reversion_alpha
    micro_alpha = (micro_price - mid) * 0.8
    trend_term = market_state['kalman'].v * 60.0

    predicted_delta = impact_term + micro_alpha + trend_term + reversion_term
    pred_close = mid + predicted_delta
    sigma_1m = volatility * math.sqrt(60)

    pred_candle = {
        'time': int(now) + 60,
        'open': mid,
        'close': pred_close,
        'high': max(mid, pred_close) + (2 * sigma_1m),
        'low': min(mid, pred_close) - (2 * sigma_1m)
    }

    # Sample the mid at most every 0.5 s; the deque drops the oldest entry.
    history = market_state['history']
    if not history or (now - history[-1]['t'] > 0.5):
        history.append({'t': now, 'p': mid})

    bid_walls = detect_walls(market_state['bids'], 'bids')
    ask_walls = detect_walls(market_state['asks'], 'asks')

    analysis = {
        "projected": pred_close,
        "rho": (micro_price - mid),
        "vwap": vwap,
        "lambda": k_lambda
    }

    return {
        "mid": mid,
        # json cannot serialise a deque, so hand out a plain list.
        "history": list(history),
        "ohlc": market_state['ohlc_history'],
        "pred_candle": pred_candle,
        "analysis": analysis,
        "walls": {"bids": bid_walls, "asks": ask_walls}
    }


# NOTE(review): this page contains only text; the HTML/JS markup appears to
# have been stripped from this copy of the file - restore it from the original.
HTML_PAGE = f""" {SYMBOL_KRAKEN} Quant
{SYMBOL_KRAKEN} MATH MODEL
---
PRICE (BLUE) vs PREDICTION (YELLOW) vs VWAP (WHITE)
1M KLINE + WALLS + GHOST PREDICTION (PURPLE)
Predicted Close (1m)
--% ---
MicroPrice + OFI Impact
VWAP Divergence 0.00
Kyle's Lambda (Liq. Cost) 0.00 Impact per Volume Unit
"""


async def _load_initial_ohlc():
    """Seed ``ohlc_history`` with recent 1-minute candles from the REST API.

    Best effort: any failure is logged and the history is left untouched.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(KRAKEN_OHLC_URL) as response:
                if response.status != 200:
                    logger.warning("Init: OHLC request returned HTTP %s",
                                   response.status)
                    return
                data = await response.json()
        if 'result' in data:
            # Take the first list-valued entry so a scalar field in the
            # result object can never be mistaken for the candle rows.
            raw = next(
                (v for v in data['result'].values() if isinstance(v, list)),
                [],
            )
            market_state['ohlc_history'] = [
                {'time': int(c[0]), 'open': float(c[1]), 'high': float(c[2]),
                 'low': float(c[3]), 'close': float(c[4])}
                for c in raw[-OHLC_HISTORY_LENGTH:]
            ]
    except Exception as e:
        logger.error("Init Error: %s", e)


def _merge_levels(side, levels):
    """Apply price-level updates to one book side; quantity 0 deletes a level."""
    for level in levels:
        price = float(level['price'])
        qty = float(level['qty'])
        if qty > 0:
            side[price] = qty
        else:
            side.pop(price, None)


def _truncate_book(side, keep_highest):
    """Drop the worst-priced levels so at most ``BOOK_DEPTH`` remain."""
    excess = len(side) - BOOK_DEPTH
    if excess <= 0:
        return
    if keep_highest:
        worst = heapq.nsmallest(excess, side)
    else:
        worst = heapq.nlargest(excess, side)
    for price in worst:
        del side[price]


def _apply_book_message(payload):
    """Merge a ``book`` channel message into the local order book.

    NOTE(review): relies on Kraken's v2 book feed marking the initial image
    with ``"type": "snapshot"`` and expecting clients to trim the book to the
    subscribed depth after every update - confirm against the API reference.
    """
    bids = market_state['bids']
    asks = market_state['asks']

    if payload.get("type") == "snapshot":
        # A snapshot is a full image: discard whatever was known before.
        bids.clear()
        asks.clear()

    for item in payload.get("data", []):
        _merge_levels(bids, item.get('bids', []))
        _merge_levels(asks, item.get('asks', []))

    # Levels pushed out of the subscribed depth receive no delete message,
    # so without trimming they would linger as stale prices.
    _truncate_book(bids, keep_highest=True)
    _truncate_book(asks, keep_highest=False)

    if bids and asks:
        market_state['current_mid'] = (max(bids) + min(asks)) / 2
        market_state['ready'] = True


def _apply_trade_message(data):
    """Append trades to the tape, stamped with the local receive time."""
    for trade in data:
        try:
            record = {
                't': time.time(),
                'p': float(trade['price']),
                'q': float(trade['qty']),
                'side': trade['side']
            }
        except (KeyError, TypeError, ValueError) as exc:
            # One malformed trade must not take the feed down.
            logger.debug("Skipping malformed trade %r: %s", trade, exc)
            continue
        market_state['trade_history'].append(record)


def _candle_time(candle):
    """Return a candle's time as integer epoch seconds.

    NOTE(review): Kraken's v2 ``ohlc`` channel is understood to send
    ``interval_begin`` as an RFC 3339 string (candle open time, matching the
    first column of the REST rows); a numeric ``endtime`` is accepted as a
    fallback. Confirm the field names against the API reference.
    """
    begin = candle.get('interval_begin')
    if begin is not None:
        # Only the first 19 characters (YYYY-MM-DDTHH:MM:SS) are needed; the
        # fractional seconds and the trailing "Z" (UTC) are dropped.
        return calendar.timegm(time.strptime(begin[:19], "%Y-%m-%dT%H:%M:%S"))
    return int(float(candle['endtime']))


def _apply_ohlc_message(data):
    """Update the candle history from an ``ohlc`` channel message."""
    history = market_state['ohlc_history']
    for c in data:
        try:
            candle = {
                'time': _candle_time(c),
                'open': float(c['open']),
                'high': float(c['high']),
                'low': float(c['low']),
                'close': float(c['close'])
            }
        except (KeyError, TypeError, ValueError) as exc:
            # Skip the candle instead of letting the error drop the socket.
            logger.debug("Skipping malformed candle %r: %s", c, exc)
            continue

        if history and history[-1]['time'] == candle['time']:
            history[-1] = candle  # same minute: refresh the open candle
        else:
            history.append(candle)
            del history[:-OHLC_HISTORY_LENGTH]


async def kraken_worker():
    """Keep ``market_state`` in sync with Kraken; reconnects forever."""
    await _load_initial_ohlc()

    subscriptions = (
        {"channel": "book", "symbol": [SYMBOL_KRAKEN], "depth": BOOK_DEPTH},
        {"channel": "trade", "symbol": [SYMBOL_KRAKEN]},
        {"channel": "ohlc", "symbol": [SYMBOL_KRAKEN], "interval": 1},
    )

    while True:
        try:
            async with websockets.connect(KRAKEN_WS_URL) as ws:
                logger.info("Connected to Kraken (%s)", SYMBOL_KRAKEN)

                # Levels from a previous connection are stale; wait for a
                # fresh book before publishing again.
                market_state['bids'].clear()
                market_state['asks'].clear()
                market_state['ready'] = False

                for params in subscriptions:
                    await ws.send(json.dumps(
                        {"method": "subscribe", "params": params}))

                async for message in ws:
                    payload = json.loads(message)
                    channel = payload.get("channel")
                    if channel == "book":
                        _apply_book_message(payload)
                    elif channel == "trade":
                        _apply_trade_message(payload.get("data", []))
                    elif channel == "ohlc":
                        _apply_ohlc_message(payload.get("data", []))
        except Exception as e:
            logger.warning("Reconnecting: %s", e)
        # Also pause after a clean close so a server that keeps hanging up
        # does not cause a tight reconnect loop.
        await asyncio.sleep(RECONNECT_DELAY)


async def broadcast_worker():
    """Push a fresh analytics payload to every client each BROADCAST_RATE s."""
    while True:
        if connected_clients and market_state['ready']:
            try:
                payload = process_market_data()
            except Exception:
                # A single bad tick must not end the broadcaster.
                logger.exception("Failed to compute market data")
                payload = {"error": "compute failed"}

            if "error" not in payload:
                msg = json.dumps(payload)
                for ws in list(connected_clients):
                    try:
                        await ws.send_str(msg)
                    except Exception as exc:
                        # ``Exception`` (not a bare except) so that task
                        # cancellation still propagates out of this loop.
                        logger.debug("Dropping client after send error: %s",
                                     exc)
                        connected_clients.discard(ws)
        await asyncio.sleep(BROADCAST_RATE)


async def websocket_handler(request):
    """Register a browser WebSocket until it disconnects."""
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    connected_clients.add(ws)
    try:
        async for _ in ws:
            pass  # inbound messages are ignored; the socket is push-only
    finally:
        # discard: the broadcaster may already have dropped this client.
        connected_clients.discard(ws)
    return ws


async def handle_index(request):
    """Serve the dashboard page."""
    return web.Response(text=HTML_PAGE, content_type='text/html')


async def start_background(app):
    """Start the feed and broadcast tasks when the app starts."""
    app['kraken_task'] = asyncio.create_task(kraken_worker())
    app['broadcast_task'] = asyncio.create_task(broadcast_worker())


async def cleanup_background(app):
    """Cancel the background tasks and wait for them to finish."""
    tasks = [app['kraken_task'], app['broadcast_task']]
    for task in tasks:
        task.cancel()
    # Await the tasks so their cancellation completes before the loop closes.
    await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    """Run the web server until the process is interrupted."""
    app = web.Application()
    app.router.add_get('/', handle_index)
    app.router.add_get('/ws', websocket_handler)
    app.on_startup.append(start_background)
    app.on_cleanup.append(cleanup_background)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, '0.0.0.0', PORT)
        await site.start()
        print(f"Quant Dashboard: http://localhost:{PORT}")
        await asyncio.Event().wait()
    finally:
        # Without this the on_cleanup hooks never run on shutdown.
        await runner.cleanup()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass