import asyncio import json import logging import time import bisect import math import statistics from datetime import datetime from aiohttp import web import websockets # --- CONFIGURATION --- SYMBOL_KRAKEN = "BTC/USD" PORT = 7860 HISTORY_LENGTH = 300 BROADCAST_RATE = 0.1 # Mathematical Constants DECAY_LAMBDA = 50.0 IMPACT_SENSITIVITY = 2.0 WALL_DAMPENING = 0.8 Z_SCORE_THRESHOLD = 3.0 WALL_LOOKBACK = 200 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') # --- GLOBAL STATE --- market_state = { "bids": {}, "asks": {}, "history": [], "pred_history": [], "trade_vol_history": [], # New: Store trade volume history "current_vol_window": {"buy": 0.0, "sell": 0.0, "start": time.time()}, "current_mid": 0.0, "ready": False } connected_clients = set() # --- QUANTITATIVE METHODS --- def detect_anomalies(orders, scan_depth): if len(orders) < 10: return [] relevant_orders = orders[:scan_depth] volumes = [q for p, q in relevant_orders] if not volumes: return [] try: avg_vol = statistics.mean(volumes) stdev_vol = statistics.stdev(volumes) except statistics.StatisticsError: return [] if stdev_vol == 0: return [] walls = [] for price, qty in relevant_orders: z_score = (qty - avg_vol) / stdev_vol if z_score > Z_SCORE_THRESHOLD: walls.append({"price": price, "vol": qty, "z_score": z_score}) walls.sort(key=lambda x: x['z_score'], reverse=True) return walls[:3] def calculate_micro_price_structure(diff_x, diff_y_net, current_mid, best_bid, best_ask, walls): if not diff_x or len(diff_x) < 5: return None # Weighted Imbalance Calculation weighted_imbalance = 0.0 total_weight = 0.0 for i in range(len(diff_x)): dist = diff_x[i] net_vol = diff_y_net[i] weight = math.exp(-dist / DECAY_LAMBDA) weighted_imbalance += net_vol * weight total_weight += weight rho = weighted_imbalance / total_weight if total_weight > 0 else 0 # Base Projection spread = best_ask - best_bid theoretical_delta = (spread / 2) * rho * IMPACT_SENSITIVITY projected_price = current_mid + theoretical_delta # Wall Friction final_delta = theoretical_delta if final_delta > 0 and walls['asks']: nearest_wall = walls['asks'][0] if projected_price >= nearest_wall['price']: damp_factor = 1.0 / (1.0 + (nearest_wall['z_score'] * 0.2)) final_delta *= damp_factor elif final_delta < 0 and walls['bids']: nearest_wall = walls['bids'][0] if projected_price <= nearest_wall['price']: damp_factor = 1.0 / (1.0 + (nearest_wall['z_score'] * 0.2)) final_delta *= damp_factor return { "projected": current_mid + final_delta, "rho": rho } def process_market_data(): if not market_state['ready']: return {"error": "Initializing..."} mid = market_state['current_mid'] # Process Trade Volume Window (Reset every 1 second) now = time.time() if now - market_state['current_vol_window']['start'] >= 1.0: market_state['trade_vol_history'].append({ 't': now, 'buy': market_state['current_vol_window']['buy'], 'sell': market_state['current_vol_window']['sell'] }) if len(market_state['trade_vol_history']) > 60: # Keep last 60 seconds market_state['trade_vol_history'].pop(0) # Reset window market_state['current_vol_window'] = {"buy": 0.0, "sell": 0.0, "start": now} # Order Book Processing sorted_bids = sorted(market_state['bids'].items(), key=lambda x: -x[0]) sorted_asks = sorted(market_state['asks'].items(), key=lambda x: x[0]) if not sorted_bids or not sorted_asks: return {"error": "Empty Book"} best_bid = sorted_bids[0][0] best_ask = sorted_asks[0][0] bid_walls = detect_anomalies(sorted_bids, WALL_LOOKBACK) ask_walls = detect_anomalies(sorted_asks, WALL_LOOKBACK) d_b_x, d_b_y, cum = [], [], 0 for p, q in sorted_bids[:300]: d = mid - p if d >= 0: cum += q d_b_x.append(d); d_b_y.append(cum) d_a_x, d_a_y, cum = [], [], 0 for p, q in sorted_asks[:300]: d = p - mid if d >= 0: cum += q d_a_x.append(d); d_a_y.append(cum) diff_x, diff_y_net = [], [] chart_bids, chart_asks = [], [] if d_b_x and d_a_x: max_dist = min(d_b_x[-1], d_a_x[-1]) step_size = max_dist / 100 steps = [i * step_size for i in range(1, 101)] for s in steps: idx_b = bisect.bisect_right(d_b_x, s) vol_b = d_b_y[idx_b-1] if idx_b > 0 else 0 idx_a = bisect.bisect_right(d_a_x, s) vol_a = d_a_y[idx_a-1] if idx_a > 0 else 0 diff_x.append(s) diff_y_net.append(vol_b - vol_a) chart_bids.append(vol_b) chart_asks.append(vol_a) analysis = calculate_micro_price_structure( diff_x, diff_y_net, mid, best_bid, best_ask, {"bids": bid_walls, "asks": ask_walls} ) if analysis: if not market_state['pred_history'] or (now - market_state['pred_history'][-1]['t'] > 0.5): market_state['pred_history'].append({'t': now, 'p': analysis['projected']}) if len(market_state['pred_history']) > HISTORY_LENGTH: market_state['pred_history'].pop(0) return { "mid": mid, "history": market_state['history'], "pred_history": market_state['pred_history'], "trade_history": market_state['trade_vol_history'], "depth_x": diff_x, "depth_net": diff_y_net, "depth_bids": chart_bids, "depth_asks": chart_asks, "analysis": analysis, "walls": {"bids": bid_walls, "asks": ask_walls} } # --- FRONTEND --- HTML_PAGE = f""" {SYMBOL_KRAKEN}
{SYMBOL_KRAKEN} ---
00:00:00 UTC
PRICE ACTION (BLUE) // PREDICTION (YELLOW)
LIQUIDITY DENSITY
ORDER FLOW IMBALANCE
Micro-Price Delta
--% ---
OFI Imbalance Ratio 0.00
Detected Walls (Z > 3.0)
Scanning...
Real-time Volume (Ticks)
""" # --- SERVER --- async def kraken_worker(): global market_state while True: try: async with websockets.connect("wss://ws.kraken.com/v2") as ws: logging.info(f"🔌 Connected to Kraken ({SYMBOL_KRAKEN})") # SUBSCRIBE TO BOOK AND TRADES await ws.send(json.dumps({ "method": "subscribe", "params": {"channel": "book", "symbol": [SYMBOL_KRAKEN], "depth": 500} })) await ws.send(json.dumps({ "method": "subscribe", "params": {"channel": "trade", "symbol": [SYMBOL_KRAKEN]} })) async for message in ws: payload = json.loads(message) channel = payload.get("channel") data = payload.get("data", []) if channel == "book": for item in data: for bid in item.get('bids', []): q, p = float(bid['qty']), float(bid['price']) if q == 0: market_state['bids'].pop(p, None) else: market_state['bids'][p] = q for ask in item.get('asks', []): q, p = float(ask['qty']), float(ask['price']) if q == 0: market_state['asks'].pop(p, None) else: market_state['asks'][p] = q if market_state['bids'] and market_state['asks']: best_bid = max(market_state['bids'].keys()) best_ask = min(market_state['asks'].keys()) mid = (best_bid + best_ask) / 2 market_state['prev_mid'] = market_state['current_mid'] market_state['current_mid'] = mid market_state['ready'] = True now = time.time() if not market_state['history'] or (now - market_state['history'][-1]['t'] > 0.5): market_state['history'].append({'t': now, 'p': mid}) if len(market_state['history']) > HISTORY_LENGTH: market_state['history'].pop(0) elif channel == "trade": # Process trades for volume history for trade in data: # Kraken Trade format: [price, qty, time, side, order_type, misc] # side: 'buy' or 'sell' try: qty = float(trade['qty']) side = trade['side'] # 'buy' or 'sell' if side == 'buy': market_state['current_vol_window']['buy'] += qty else: market_state['current_vol_window']['sell'] += qty except: pass except Exception as e: logging.warning(f"⚠️ Reconnecting: {e}") await asyncio.sleep(3) async def broadcast_worker(): while True: if connected_clients and market_state['ready']: payload = process_market_data() msg = json.dumps(payload) for ws in list(connected_clients): try: await ws.send_str(msg) except: pass await asyncio.sleep(BROADCAST_RATE) async def websocket_handler(request): ws = web.WebSocketResponse() await ws.prepare(request) connected_clients.add(ws) try: async for msg in ws: pass finally: connected_clients.remove(ws) return ws async def handle_index(request): return web.Response(text=HTML_PAGE, content_type='text/html') async def start_background(app): app['kraken_task'] = asyncio.create_task(kraken_worker()) app['broadcast_task'] = asyncio.create_task(broadcast_worker()) async def cleanup_background(app): app['kraken_task'].cancel() app['broadcast_task'].cancel() try: await app['kraken_task']; await app['broadcast_task'] except: pass async def main(): app = web.Application() app.router.add_get('/', handle_index) app.router.add_get('/ws', websocket_handler) app.on_startup.append(start_background) app.on_cleanup.append(cleanup_background) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, '0.0.0.0', PORT) await site.start() print(f"🚀 Quant Dashboard: http://localhost:{PORT}") await asyncio.Event().wait() if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass