|
|
import asyncio |
|
|
import json |
|
|
from aiohttp import web |
|
|
import websockets |
|
|
import logging |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
|
|
|
|
|
|
symbol_data = {} |
|
|
|
|
|
|
|
|
HTML_PAGE = """<!DOCTYPE html><html><body>hello world</body></html>""" |
|
|
|
|
|
def extract_extrema(path): |
|
|
"""Extract only local minima and maxima from a path.""" |
|
|
if len(path) < 3: |
|
|
return path |
|
|
|
|
|
extrema = [path[0]] |
|
|
|
|
|
for i in range(1, len(path) - 1): |
|
|
prev_val = path[i - 1] |
|
|
curr_val = path[i] |
|
|
next_val = path[i + 1] |
|
|
|
|
|
|
|
|
if curr_val > prev_val and curr_val > next_val: |
|
|
extrema.append(curr_val) |
|
|
|
|
|
elif curr_val < prev_val and curr_val < next_val: |
|
|
extrema.append(curr_val) |
|
|
|
|
|
extrema.append(path[-1]) |
|
|
|
|
|
return extrema |
|
|
|
|
|
async def index_handler(request): |
|
|
"""Serve the main HTML page""" |
|
|
return web.Response(text=HTML_PAGE, content_type='text/html') |
|
|
|
|
|
async def websocket_handler(request): |
|
|
"""Handle WebSocket connections for a specific symbol.""" |
|
|
symbol_url = request.match_info['symbol'].lower() |
|
|
|
|
|
if '-' not in symbol_url: |
|
|
logging.warning(f"Invalid symbol format request received: {symbol_url}") |
|
|
return web.HTTPBadRequest(text="Invalid symbol format. Use 'base-quote', e.g., btc-usd") |
|
|
|
|
|
symbol_parts = symbol_url.split('-') |
|
|
symbol_kraken = f"{symbol_parts[0].upper()}/{symbol_parts[1].upper()}" |
|
|
|
|
|
ws = web.WebSocketResponse() |
|
|
await ws.prepare(request) |
|
|
|
|
|
if symbol_kraken not in symbol_data: |
|
|
symbol_data[symbol_kraken] = { |
|
|
"clients": set(), |
|
|
"B": {}, |
|
|
"A": {}, |
|
|
"task": None, |
|
|
"lp": 0.0 |
|
|
} |
|
|
symbol_data[symbol_kraken]["task"] = asyncio.create_task(kraken_client(symbol_kraken)) |
|
|
|
|
|
symbol_data[symbol_kraken]["clients"].add(ws) |
|
|
logging.info(f"Client connected for {symbol_kraken}. Total clients for this symbol: {len(symbol_data[symbol_kraken]['clients'])}") |
|
|
|
|
|
try: |
|
|
async for msg in ws: |
|
|
pass |
|
|
finally: |
|
|
symbol_data[symbol_kraken]["clients"].discard(ws) |
|
|
logging.info(f"Client disconnected from {symbol_kraken}. Total clients for this symbol: {len(symbol_data[symbol_kraken]['clients'])}") |
|
|
if not symbol_data[symbol_kraken]["clients"]: |
|
|
logging.info(f"No more clients for {symbol_kraken}. Stopping Kraken task.") |
|
|
symbol_data[symbol_kraken]["task"].cancel() |
|
|
del symbol_data[symbol_kraken] |
|
|
|
|
|
return ws |
|
|
|
|
|
async def broadcast(symbol, data): |
|
|
"""Broadcast data to all connected WebSocket clients for a specific symbol.""" |
|
|
if symbol in symbol_data and symbol_data[symbol]["clients"]: |
|
|
message = json.dumps(data) |
|
|
disconnected_clients = set() |
|
|
for client in symbol_data[symbol]["clients"]: |
|
|
try: |
|
|
if not client.closed: |
|
|
await client.send_str(message) |
|
|
except Exception: |
|
|
disconnected_clients.add(client) |
|
|
symbol_data[symbol]["clients"].difference_update(disconnected_clients) |
|
|
|
|
|
async def kraken_client(symbol): |
|
|
"""Connect to Kraken and process order book data for a given symbol.""" |
|
|
data = symbol_data[symbol] |
|
|
B, A = data["B"], data["A"] |
|
|
|
|
|
while True: |
|
|
try: |
|
|
logging.info(f"Connecting to Kraken WebSocket for {symbol}...") |
|
|
async with websockets.connect("wss://ws.kraken.com/v2") as ws: |
|
|
logging.info(f"Connected to Kraken for {symbol}!") |
|
|
|
|
|
for channel in ["book", "ticker"]: |
|
|
params = {"channel": channel, "symbol": [symbol]} |
|
|
if channel == "book": |
|
|
params["depth"] = 1000 |
|
|
await ws.send(json.dumps({"method": "subscribe", "params": params})) |
|
|
|
|
|
async for msg in ws: |
|
|
d = json.loads(msg) |
|
|
ch = d.get("channel") |
|
|
|
|
|
if d.get('event') == 'heartbeat': |
|
|
continue |
|
|
|
|
|
if ch == "ticker": |
|
|
dat = d.get("data", [{}])[0] |
|
|
if 'last' in dat: |
|
|
data["lp"] = float(dat['last']) |
|
|
continue |
|
|
|
|
|
if ch == "book": |
|
|
dat = d.get("data", []) |
|
|
for x in dat: |
|
|
for k, s in [('bids', B), ('asks', A)]: |
|
|
for i in x.get(k, []): |
|
|
p, q = float(i['price']), float(i['qty']) |
|
|
if q == 0: |
|
|
s.pop(p, None) |
|
|
else: |
|
|
s[p] = q |
|
|
|
|
|
sb = sorted(B.items(), key=lambda x: -x[0]) |
|
|
sa = sorted(A.items()) |
|
|
if not (sb and sa): |
|
|
continue |
|
|
|
|
|
mid = (sb[0][0] + sa[0][0]) / 2 |
|
|
|
|
|
|
|
|
bi, ai = 0, 0 |
|
|
cur_b_qty = sb[0][1] |
|
|
cur_a_qty = sa[0][1] |
|
|
h = [mid] |
|
|
|
|
|
|
|
|
for _ in range(60): |
|
|
if bi >= len(sb) - 1 or ai >= len(sa) - 1: |
|
|
break |
|
|
if cur_b_qty > cur_a_qty: |
|
|
h.append(sa[ai][0]) |
|
|
cur_b_qty -= cur_a_qty |
|
|
ai += 1 |
|
|
cur_a_qty = sa[ai][1] |
|
|
else: |
|
|
h.append(sb[bi][0]) |
|
|
cur_a_qty -= cur_b_qty |
|
|
bi += 1 |
|
|
cur_b_qty = sb[bi][1] |
|
|
|
|
|
|
|
|
extrema = extract_extrema(h) |
|
|
|
|
|
|
|
|
await broadcast(symbol, { |
|
|
'symbol': symbol, |
|
|
'live': data["lp"], |
|
|
'mid': mid, |
|
|
'path': extrema |
|
|
}) |
|
|
|
|
|
print(f"\r[{symbol}] Live: ${data['lp']:.2f} | Mid: ${mid:.2f} | Extrema: {len(extrema)} points (from {len(h)}) ", end='', flush=True) |
|
|
|
|
|
except websockets.exceptions.ConnectionClosed as e: |
|
|
logging.warning(f"\nKraken connection for {symbol} closed: {e}. Reconnecting in 5s...") |
|
|
except asyncio.CancelledError: |
|
|
logging.info(f"\nKraken client for {symbol} cancelled.") |
|
|
break |
|
|
except Exception as e: |
|
|
logging.error(f"\nKraken error for {symbol}: {e}. Reconnecting in 5s...") |
|
|
|
|
|
B.clear() |
|
|
A.clear() |
|
|
await asyncio.sleep(5) |
|
|
|
|
|
async def main(): |
|
|
app = web.Application() |
|
|
app.router.add_get('/', index_handler) |
|
|
app.router.add_get('/{symbol}', websocket_handler) |
|
|
|
|
|
runner = web.AppRunner(app) |
|
|
await runner.setup() |
|
|
site = web.TCPSite(runner, '0.0.0.0', 7860) |
|
|
await site.start() |
|
|
|
|
|
print("=" * 50) |
|
|
print("🌐 HTTP Server: http://localhost:7860") |
|
|
print("🔌 WebSocket Server: ws://localhost:7860/<symbol>") |
|
|
print(" (e.g., ws://localhost:7860/sol-usd)") |
|
|
print("=" * 50) |
|
|
|
|
|
await asyncio.Event().wait() |
|
|
|
|
|
if __name__ == '__main__': |
|
|
try: |
|
|
asyncio.run(main()) |
|
|
except KeyboardInterrupt: |
|
|
print("\n\nServer stopped.") |