import asyncio import json from aiohttp import web import websockets import logging # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # In-memory storage for symbol-specific data symbol_data = {} # HTML page with WebSocket client HTML_PAGE = """
hello world""" def extract_extrema(path): """Extract only local minima and maxima from a path.""" if len(path) < 3: return path extrema = [path[0]] # Always include start point for i in range(1, len(path) - 1): prev_val = path[i - 1] curr_val = path[i] next_val = path[i + 1] # Local maximum: higher than both neighbors if curr_val > prev_val and curr_val > next_val: extrema.append(curr_val) # Local minimum: lower than both neighbors elif curr_val < prev_val and curr_val < next_val: extrema.append(curr_val) extrema.append(path[-1]) # Always include end point return extrema async def index_handler(request): """Serve the main HTML page""" return web.Response(text=HTML_PAGE, content_type='text/html') async def websocket_handler(request): """Handle WebSocket connections for a specific symbol.""" symbol_url = request.match_info['symbol'].lower() if '-' not in symbol_url: logging.warning(f"Invalid symbol format request received: {symbol_url}") return web.HTTPBadRequest(text="Invalid symbol format. Use 'base-quote', e.g., btc-usd") symbol_parts = symbol_url.split('-') symbol_kraken = f"{symbol_parts[0].upper()}/{symbol_parts[1].upper()}" ws = web.WebSocketResponse() await ws.prepare(request) if symbol_kraken not in symbol_data: symbol_data[symbol_kraken] = { "clients": set(), "B": {}, "A": {}, "task": None, "lp": 0.0 } symbol_data[symbol_kraken]["task"] = asyncio.create_task(kraken_client(symbol_kraken)) symbol_data[symbol_kraken]["clients"].add(ws) logging.info(f"Client connected for {symbol_kraken}. Total clients for this symbol: {len(symbol_data[symbol_kraken]['clients'])}") try: async for msg in ws: pass finally: symbol_data[symbol_kraken]["clients"].discard(ws) logging.info(f"Client disconnected from {symbol_kraken}. Total clients for this symbol: {len(symbol_data[symbol_kraken]['clients'])}") if not symbol_data[symbol_kraken]["clients"]: logging.info(f"No more clients for {symbol_kraken}. Stopping Kraken task.") symbol_data[symbol_kraken]["task"].cancel() del symbol_data[symbol_kraken] return ws async def broadcast(symbol, data): """Broadcast data to all connected WebSocket clients for a specific symbol.""" if symbol in symbol_data and symbol_data[symbol]["clients"]: message = json.dumps(data) disconnected_clients = set() for client in symbol_data[symbol]["clients"]: try: if not client.closed: await client.send_str(message) except Exception: disconnected_clients.add(client) symbol_data[symbol]["clients"].difference_update(disconnected_clients) async def kraken_client(symbol): """Connect to Kraken and process order book data for a given symbol.""" data = symbol_data[symbol] B, A = data["B"], data["A"] while True: try: logging.info(f"Connecting to Kraken WebSocket for {symbol}...") async with websockets.connect("wss://ws.kraken.com/v2") as ws: logging.info(f"Connected to Kraken for {symbol}!") for channel in ["book", "ticker"]: params = {"channel": channel, "symbol": [symbol]} if channel == "book": params["depth"] = 1000 await ws.send(json.dumps({"method": "subscribe", "params": params})) async for msg in ws: d = json.loads(msg) ch = d.get("channel") if d.get('event') == 'heartbeat': continue if ch == "ticker": dat = d.get("data", [{}])[0] if 'last' in dat: data["lp"] = float(dat['last']) continue if ch == "book": dat = d.get("data", []) for x in dat: for k, s in [('bids', B), ('asks', A)]: for i in x.get(k, []): p, q = float(i['price']), float(i['qty']) if q == 0: s.pop(p, None) else: s[p] = q sb = sorted(B.items(), key=lambda x: -x[0]) sa = sorted(A.items()) if not (sb and sa): continue mid = (sb[0][0] + sa[0][0]) / 2 # --- Path of Least Resistance Calculation --- bi, ai = 0, 0 cur_b_qty = sb[0][1] cur_a_qty = sa[0][1] h = [mid] # Simulate 60 steps of volume absorption for _ in range(60): if bi >= len(sb) - 1 or ai >= len(sa) - 1: break if cur_b_qty > cur_a_qty: h.append(sa[ai][0]) cur_b_qty -= cur_a_qty ai += 1 cur_a_qty = sa[ai][1] else: h.append(sb[bi][0]) cur_a_qty -= cur_b_qty bi += 1 cur_b_qty = sb[bi][1] # Extract only local minima and maxima extrema = extract_extrema(h) # Broadcast live data + extrema points only await broadcast(symbol, { 'symbol': symbol, 'live': data["lp"], 'mid': mid, 'path': extrema }) print(f"\r[{symbol}] Live: ${data['lp']:.2f} | Mid: ${mid:.2f} | Extrema: {len(extrema)} points (from {len(h)}) ", end='', flush=True) except websockets.exceptions.ConnectionClosed as e: logging.warning(f"\nKraken connection for {symbol} closed: {e}. Reconnecting in 5s...") except asyncio.CancelledError: logging.info(f"\nKraken client for {symbol} cancelled.") break except Exception as e: logging.error(f"\nKraken error for {symbol}: {e}. Reconnecting in 5s...") B.clear() A.clear() await asyncio.sleep(5) async def main(): app = web.Application() app.router.add_get('/', index_handler) app.router.add_get('/{symbol}', websocket_handler) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, '0.0.0.0', 7860) await site.start() print("=" * 50) print("🌐 HTTP Server: http://localhost:7860") print("🔌 WebSocket Server: ws://localhost:7860/