"""BTC/USD order-book dashboard.

A background task mirrors the Kraken WebSocket v2 ``book`` channel into the
in-memory ``market_state``; an aiohttp server serves the page at ``/`` and a
JSON snapshot (depth curves, net liquidity, commentary, mid-price history)
at ``/data``.
"""
import asyncio
import json
import logging
import time
import bisect
import random

from aiohttp import web
import websockets

# --- Configuration ---
SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
HISTORY_LENGTH = 300      # max number of mid-price samples kept
BOOK_DEPTH = 500          # levels per side requested from Kraken and kept locally
DISPLAY_LEVELS = 300      # best levels per side used when building /data
DIFF_STEPS = 100          # sample points of the net-liquidity curve
HISTORY_INTERVAL = 0.5    # minimum seconds between two history samples
RECONNECT_DELAY = 3       # seconds to wait before reconnecting

# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# --- In-Memory State ---
market_state = {
    "bids": {},              # price -> quantity
    "asks": {},              # price -> quantity
    "history": [],           # [{'t': unix_time, 'p': mid}, ...]
    "current_mid": 0.0,
    "prev_mid": 0.0,
    "ready": False,          # True once both sides of the book are populated
    "last_comment_time": 0,
}


# --- AI Logic Helper ---
def generate_ai_commentary(diff_y, mid, prev_mid):
    """Turn the net-liquidity curve and the last price move into a comment.

    Args:
        diff_y: Net liquidity (bid volume minus ask volume) sampled at
            increasing distance from mid; only the last element is used.
        mid: Current mid price.
        prev_mid: Previous mid price.

    Returns:
        A dict with keys ``text``, ``sentiment`` (``"bullish"``,
        ``"bearish"``, ``"warning"`` or ``"neutral"``) and ``net``.
    """
    if not diff_y:
        return {"text": "Initializing analysis...", "sentiment": "neutral", "net": 0}

    # 1. Calculate Aggregates
    net_total = diff_y[-1]  # Total net liquidity at max sampled depth

    # 2. Price Trend
    price_delta = mid - prev_mid

    # 3. Logic Engine
    # -- SCENARIO 1: EXTREME IMBALANCE --
    if net_total > 50:
        sentiment = "bullish"
        msg = f"🚀 STRONG BUY WALL: Net surplus of {int(net_total)} BTC. Orderbook tilted heavily to Bids."
    elif net_total < -50:
        sentiment = "bearish"
        msg = f"📉 HEAVY SELL WALL: Net deficit of {int(net_total)} BTC. Sellers dominating."
    # -- SCENARIO 2: ABSORPTION (Price moving vs Liquidity) --
    elif price_delta < -5 and net_total > 20:
        sentiment = "warning"
        msg = f"🛡️ ABSORPTION: Price dropping, but Buyers are stepping in (+{int(net_total)} BTC surplus)."
    elif price_delta > 5 and net_total < -20:
        sentiment = "warning"
        msg = f"🧱 RESISTANCE: Price rising into Sell liquidity ({int(net_total)} BTC deficit)."
    # -- SCENARIO 3: CHOP / EQUILIBRIUM --
    elif abs(net_total) < 10:
        sentiment = "neutral"
        msg = "⚖️ EQUILIBRIUM: Liquidity is balanced. Waiting for a breakout trigger."
    # -- SCENARIO 4: MOMENTUM --
    elif net_total > 0:
        sentiment = "bullish"
        msg = f"📈 Bullish Bias: Moderate buy support (+{int(net_total)} BTC)."
    else:
        sentiment = "bearish"
        msg = f"📉 Bearish Bias: Moderate sell pressure ({int(net_total)} BTC)."

    return {"text": msg, "sentiment": sentiment, "net": net_total}


# --- HTML Frontend ---
# NOTE(review): only bare text is present in this copy of the template; the
# markup/script appears to have been stripped. Confirm against version control
# before shipping. The string is reproduced unchanged.
HTML_PAGE = f""" BTC-USD AI Hybrid Dashboard
Connecting...
> AI MARKET COMMENTARY
"""


def _apply_levels(side, levels):
    """Apply price-level updates to one book side; a quantity of 0 removes the level."""
    for level in levels:
        qty, price = float(level['qty']), float(level['price'])
        if qty == 0:
            side.pop(price, None)
        else:
            side[price] = qty


def _truncate_side(side, depth, *, keep_highest):
    """Keep only the ``depth`` best prices of one book side (highest bids / lowest asks)."""
    excess = len(side) - depth
    if excess <= 0:
        return
    # Sort so the worst prices come first, then drop exactly the surplus.
    for price in sorted(side, reverse=not keep_highest)[:excess]:
        del side[price]


def _record_mid(now):
    """Recompute the mid price from the best bid/ask and sample it into history."""
    bids, asks = market_state['bids'], market_state['asks']
    if not (bids and asks):
        return

    mid = (max(bids) + min(asks)) / 2
    # On the very first tick there is no previous mid; seed it with the current
    # one so the price delta is 0 instead of the full price.
    market_state['prev_mid'] = market_state['current_mid'] or mid
    market_state['current_mid'] = mid
    market_state['ready'] = True

    history = market_state['history']
    if not history or (now - history[-1]['t'] > HISTORY_INTERVAL):
        history.append({'t': now, 'p': mid})
        if len(history) > HISTORY_LENGTH:
            history.pop(0)


def _handle_book_message(payload):
    """Apply one decoded Kraken message to ``market_state``; non-book messages are ignored."""
    if payload.get("channel") != "book":
        return

    bids, asks = market_state['bids'], market_state['asks']
    if payload.get("type") == "snapshot":
        # A snapshot (sent after every subscribe, i.e. also after a reconnect)
        # replaces the book; keeping old levels could leave it crossed.
        bids.clear()
        asks.clear()

    for item in payload.get("data", []):
        _apply_levels(bids, item.get('bids', []))
        _apply_levels(asks, item.get('asks', []))

    # Per Kraken's book-maintenance guidance the client trims each side to the
    # subscribed depth; levels pushed out of range get no explicit delete.
    _truncate_side(bids, BOOK_DEPTH, keep_highest=True)
    _truncate_side(asks, BOOK_DEPTH, keep_highest=False)

    _record_mid(time.time())


async def kraken_worker():
    """Maintain the order book from Kraken forever, reconnecting after any failure."""
    subscribe_msg = json.dumps({
        "method": "subscribe",
        "params": {"channel": "book", "symbol": [SYMBOL_KRAKEN], "depth": BOOK_DEPTH},
    })
    while True:
        try:
            async with websockets.connect("wss://ws.kraken.com/v2") as ws:
                logging.info(f"🔌 Connected to Kraken ({SYMBOL_KRAKEN})")
                await ws.send(subscribe_msg)
                async for message in ws:
                    _handle_book_message(json.loads(message))
        except Exception as e:
            # Top-level boundary of the worker: log and retry. Task
            # cancellation is not an Exception and still propagates.
            logging.warning(f"⚠️ Reconnecting: {e}")
        # Also wait after a clean close so a server that keeps closing the
        # socket cannot cause a tight reconnect loop.
        await asyncio.sleep(RECONNECT_DELAY)


async def handle_index(request):
    """Serve the dashboard page."""
    return web.Response(text=HTML_PAGE, content_type='text/html')


def _cumulative_depth(levels, mid, *, is_bid):
    """Return (distances from mid, cumulative volumes) for levels ordered best-first."""
    distances, volumes, running = [], [], 0
    for price, qty in levels:
        dist = mid - price if is_bid else price - mid
        if dist >= 0:  # skip levels on the wrong side of mid
            running += qty
            distances.append(dist)
            volumes.append(running)
    return distances, volumes


def _volume_within(distances, volumes, limit):
    """Cumulative volume of all levels whose distance is <= ``limit``."""
    idx = bisect.bisect_right(distances, limit)
    return volumes[idx - 1] if idx > 0 else 0


def _net_liquidity(d_b_x, d_b_y, d_a_x, d_a_y):
    """Sample bid-minus-ask cumulative volume at ``DIFF_STEPS`` even distances.

    The sampled range ends at the smaller of the two sides' maximum distance
    so both curves are defined over it. Returns ``([], [])`` if a side is empty.
    """
    if not (d_b_x and d_a_x):
        return [], []

    step_size = min(d_b_x[-1], d_a_x[-1]) / DIFF_STEPS
    diff_x = [i * step_size for i in range(1, DIFF_STEPS + 1)]
    diff_y = [
        _volume_within(d_b_x, d_b_y, s) - _volume_within(d_a_x, d_a_y, s)
        for s in diff_x
    ]
    return diff_x, diff_y


async def handle_data(request):
    """Return the current market snapshot as JSON.

    Until the book is populated the response is ``{"error": "Initializing..."}``.
    """
    if not market_state['ready']:
        return web.json_response({"error": "Initializing..."})

    mid = market_state['current_mid']

    # --- Prepare Data --- (best levels first; prices are unique dict keys)
    raw_bids = sorted(market_state['bids'].items(), reverse=True)[:DISPLAY_LEVELS]
    raw_asks = sorted(market_state['asks'].items())[:DISPLAY_LEVELS]

    # Calculate Distances & Cum Volumes
    d_b_x, d_b_y = _cumulative_depth(raw_bids, mid, is_bid=True)
    d_a_x, d_a_y = _cumulative_depth(raw_asks, mid, is_bid=False)

    # --- Calculate Net Liquidity Array (Diff) ---
    diff_x, diff_y = _net_liquidity(d_b_x, d_b_y, d_a_x, d_a_y)

    # --- Generate AI Commentary ---
    ai_output = generate_ai_commentary(diff_y, mid, market_state['prev_mid'])

    return web.json_response({
        "mid": mid,
        "prev_mid": market_state['prev_mid'],
        "vol": {
            "dist_bids": d_b_x,
            "vol_bids": d_b_y,
            "dist_asks": d_a_x,
            "vol_asks": d_a_y,
        },
        "diff": {"x": diff_x, "y": diff_y},
        "comment": ai_output,
        "history": market_state['history'],
    })


async def start_background(app):
    """Startup hook: launch the Kraken worker task."""
    app['kraken_task'] = asyncio.create_task(kraken_worker())


async def cleanup_background(app):
    """Cleanup hook: cancel the Kraken worker and wait for it to finish."""
    app['kraken_task'].cancel()
    try:
        await app['kraken_task']
    except asyncio.CancelledError:
        pass


async def main():
    """Build the web application, serve it on ``PORT`` and block until cancelled."""
    app = web.Application()
    app.router.add_get('/', handle_index)
    app.router.add_get('/data', handle_data)
    app.on_startup.append(start_background)
    app.on_cleanup.append(cleanup_background)

    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner, '0.0.0.0', PORT)
        await site.start()
        print(f"🚀 BTC-USD AI+Graph Dashboard: http://localhost:{PORT}")
        await asyncio.Event().wait()
    finally:
        # Without this the on_cleanup hooks never run and the worker task is
        # left to be torn down by the event loop at exit.
        await runner.cleanup()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass