"""BTC-USD "mean pressure" dashboard.

A background task mirrors the Kraken WebSocket v2 order book for one symbol
into memory.  A small aiohttp server exposes:

* ``/``      -- the static HTML page (``HTML_PAGE``).
* ``/data``  -- a JSON snapshot with cumulative depth per side, the volume
  imbalance as a function of distance from the mid price, and the running
  mean of that imbalance ("mean pressure").
"""

import asyncio
import bisect
import json
import logging
import math
import time
from collections import deque

from aiohttp import web
import websockets

# --- Configuration ---
SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
HISTORY_LENGTH = 300

KRAKEN_WS_URL = "wss://ws.kraken.com/v2"
BOOK_DEPTH = 500          # depth requested from Kraken; the local book is capped to it
MAX_LEVELS = 300          # best levels per side used by /data
PROFILE_STEPS = 100       # sample points of the imbalance profile
SAMPLE_INTERVAL_S = 0.5   # minimum spacing between mid-price history samples
RECONNECT_DELAY_S = 3     # pause before every reconnect attempt

# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# --- In-Memory State ---
# "bids"/"asks" map price -> quantity.  "history" holds {'t': unix_time,
# 'p': mid} samples; the deque drops the oldest sample automatically once
# HISTORY_LENGTH is reached.
market_state = {
    "bids": {},
    "asks": {},
    "history": deque(maxlen=HISTORY_LENGTH),
    "current_mid": 0.0,
    "ready": False
}

# --- HTML Frontend (Plotly.js) ---
# NOTE(review): the page markup appears to have been stripped from this copy
# of the file; the literal below is reproduced exactly as found.
HTML_PAGE = f""" BTC-USD Mean Pressure
Connecting...
"""


def _apply_levels(side, levels):
    """Apply Kraken price levels to one side of the book, in place.

    Args:
        side: dict mapping price -> quantity (``market_state['bids']`` or
            ``market_state['asks']``).
        levels: iterable of mappings with ``'price'`` and ``'qty'`` entries.
            A quantity of zero removes the level.
    """
    for level in levels:
        qty, price = float(level['qty']), float(level['price'])
        if qty == 0:
            side.pop(price, None)
        else:
            side[price] = qty


def _truncate_side(side, depth, *, keep_highest):
    """Drop the worst-priced levels so *side* holds at most *depth* levels.

    Kraken does not send deletions for levels that merely fall outside the
    subscribed depth, so the client has to trim them itself; otherwise
    stale levels pile up forever.

    Args:
        side: dict mapping price -> quantity, modified in place.
        depth: maximum number of levels to keep.
        keep_highest: True for bids (best = highest price), False for asks.
    """
    excess = len(side) - depth
    if excess <= 0:
        return
    worst_first = sorted(side, reverse=not keep_highest)
    for price in worst_first[:excess]:
        del side[price]


def _apply_book_message(payload):
    """Merge one decoded ``book`` channel message into ``market_state``.

    A message whose ``type`` is ``"snapshot"`` replaces the book instead of
    being merged, so levels left over from before a reconnect cannot
    survive.  Any other message is treated as an incremental update.
    """
    bids, asks = market_state['bids'], market_state['asks']
    is_snapshot = payload.get("type") == "snapshot"
    for item in payload.get("data", []):
        if is_snapshot:
            bids.clear()
            asks.clear()
        _apply_levels(bids, item.get('bids', []))
        _apply_levels(asks, item.get('asks', []))
    _truncate_side(bids, BOOK_DEPTH, keep_highest=True)
    _truncate_side(asks, BOOK_DEPTH, keep_highest=False)


def _refresh_mid(now):
    """Recompute the mid price and append a history sample when one is due.

    Does nothing while either side of the book is empty.

    Args:
        now: current unix time in seconds, used to rate-limit the history
            to one sample per ``SAMPLE_INTERVAL_S``.
    """
    bids, asks = market_state['bids'], market_state['asks']
    if not (bids and asks):
        return
    mid = (max(bids) + min(asks)) / 2
    market_state['current_mid'] = mid
    market_state['ready'] = True

    history = market_state['history']
    if not history or now - history[-1]['t'] > SAMPLE_INTERVAL_S:
        history.append({'t': now, 'p': mid})


async def kraken_worker():
    """Keep ``market_state`` in sync with Kraken's book feed, forever.

    Connects, subscribes to the ``book`` channel for ``SYMBOL_KRAKEN`` and
    applies every message.  Whenever the connection ends -- with an error
    or with a clean close -- it waits ``RECONNECT_DELAY_S`` seconds and
    reconnects.  The task only stops when it is cancelled.
    """
    while True:
        try:
            async with websockets.connect(KRAKEN_WS_URL) as ws:
                logging.info("🔌 Connected to Kraken (%s)", SYMBOL_KRAKEN)
                await ws.send(json.dumps({
                    "method": "subscribe",
                    "params": {
                        "channel": "book",
                        "symbol": [SYMBOL_KRAKEN],
                        "depth": BOOK_DEPTH,
                    }
                }))
                async for message in ws:
                    payload = json.loads(message)
                    if payload.get("channel") == "book":
                        _apply_book_message(payload)
                    _refresh_mid(time.time())
            # The iteration above ends without raising when the server
            # closes the socket normally; still back off before retrying.
            logging.warning("⚠️ Reconnecting: connection closed")
        except Exception as e:
            # Top-level boundary of the worker: log and retry rather than
            # let the feed die.  Cancellation is not an Exception subclass
            # and therefore still propagates.
            logging.warning("⚠️ Reconnecting: %s", e)
        await asyncio.sleep(RECONNECT_DELAY_S)


async def handle_index(request):
    """Serve the dashboard page."""
    return web.Response(text=HTML_PAGE, content_type='text/html')


def _cumulative_depth(levels, mid, *, is_bid):
    """Return ``(distances, cumulative_volumes)`` for one side of the book.

    Args:
        levels: ``(price, qty)`` pairs ordered best price first.
        mid: mid price the distances are measured from.
        is_bid: True for bids (distance = mid - price), False for asks
            (distance = price - mid).

    Levels on the wrong side of *mid* (negative distance) are skipped.
    """
    distances, volumes = [], []
    running_volume = 0
    for price, qty in levels:
        distance = mid - price if is_bid else price - mid
        if distance < 0:
            continue
        running_volume += qty
        distances.append(distance)
        volumes.append(running_volume)
    return distances, volumes


def _volume_within(distances, volumes, limit):
    """Cumulative volume of all levels whose distance is <= *limit*."""
    idx = bisect.bisect_right(distances, limit)
    return volumes[idx - 1] if idx > 0 else 0


def _pressure_profile(bid_dist, bid_vol, ask_dist, ask_vol):
    """Compute the imbalance profile and its running mean.

    The profile is sampled at ``PROFILE_STEPS`` evenly spaced distances up
    to the smaller of the two sides' deepest distance.

    Returns:
        ``(xs, imbalance_pct, mean_pressure)`` -- three equally long lists:
        the sampled distances, the imbalance in percent (-100..100) at each
        distance, and the running mean of the imbalance scaled to -1..1.
        All three are empty when either side has no levels.
    """
    xs, imbalance, mean_pressure = [], [], []
    if not (bid_dist and ask_dist):
        return xs, imbalance, mean_pressure

    max_dist = min(bid_dist[-1], ask_dist[-1])
    step_size = max_dist / PROFILE_STEPS
    running_integral = 0
    for step in range(1, PROFILE_STEPS + 1):
        distance = step * step_size
        vol_b = _volume_within(bid_dist, bid_vol, distance)
        vol_a = _volume_within(ask_dist, ask_vol, distance)

        total_vol = vol_b + vol_a
        pct = ((vol_b - vol_a) / total_vol) * 100 if total_vol > 0 else 0

        # Mean pressure: average imbalance over the steps seen so far,
        # divided by 100 to go from percent to the -1..1 range.
        running_integral += pct
        xs.append(distance)
        imbalance.append(pct)
        mean_pressure.append(running_integral / step / 100.0)
    return xs, imbalance, mean_pressure


async def handle_data(request):
    """Return the current depth and pressure metrics as JSON.

    Until both sides of the book have been seen the response is
    ``{"error": "Initializing..."}``.
    """
    if not market_state['ready']:
        return web.json_response({"error": "Initializing..."})

    mid = market_state['current_mid']
    history = market_state['history']

    # --- Raw Data --- best MAX_LEVELS levels per side, best price first
    raw_bids = sorted(market_state['bids'].items(), reverse=True)[:MAX_LEVELS]
    raw_asks = sorted(market_state['asks'].items())[:MAX_LEVELS]

    # --- Distance Data ---
    d_b_x, d_b_y = _cumulative_depth(raw_bids, mid, is_bid=True)
    d_a_x, d_a_y = _cumulative_depth(raw_asks, mid, is_bid=False)

    # --- Calculations ---
    xs, imb_y, mean_y = _pressure_profile(d_b_x, d_b_y, d_a_x, d_a_y)

    return web.json_response({
        "mid": mid,
        "prev_mid": history[-2]['p'] if len(history) > 1 else mid,
        "imbalance_vol": {
            "dist_bids": d_b_x,
            "vol_bids": d_b_y,
            "dist_asks": d_a_x,
            "vol_asks": d_a_y
        },
        "imbalance_pct": {
            "x": xs,
            "y": imb_y
        },
        "mean": {
            "x": xs,
            "y": mean_y,
            "last_val": mean_y[-1] if mean_y else 0
        },
        # A deque is not JSON-serialisable; hand the encoder a list.
        "history": list(history)
    })


async def start_background(app):
    """aiohttp startup hook: launch the Kraken worker task."""
    app['kraken_task'] = asyncio.create_task(kraken_worker())


async def cleanup_background(app):
    """aiohttp cleanup hook: cancel the Kraken worker and wait for it."""
    app['kraken_task'].cancel()
    try:
        await app['kraken_task']
    except asyncio.CancelledError:
        pass


async def main():
    """Run the HTTP server on ``PORT`` until the process is interrupted."""
    app = web.Application()
    app.router.add_get('/', handle_index)
    app.router.add_get('/data', handle_data)
    app.on_startup.append(start_background)
    app.on_cleanup.append(cleanup_background)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, '0.0.0.0', PORT)
        await site.start()
        print(f"🚀 BTC-USD Mean Pressure Dashboard: http://localhost:{PORT}")
        await asyncio.Event().wait()
    finally:
        # Without this the on_cleanup hooks never run, so the worker task
        # and the listening socket would be abandoned on shutdown.
        await runner.cleanup()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass