# delta / app.py
# Alvin3y1's picture
# Update app.py
# 3af981b verified
import asyncio
import json
from aiohttp import web
import websockets
import logging
# Root logger: INFO and above, each record rendered as "time - LEVEL - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Process-wide, in-memory state; one entry per symbol, created and removed
# by the WebSocket handler as clients come and go.
symbol_data = dict()

# Static HTML document returned for requests to the root URL.
HTML_PAGE = "<!DOCTYPE html><html><body>hello world</body></html>"
def extract_extrema(path):
    """Reduce a sequence of values to its endpoints plus strict turning points.

    A turning point is an interior value that is strictly greater than both
    neighbours (peak) or strictly smaller than both (trough); values on a
    plateau are therefore dropped. Inputs with fewer than three items are
    returned unchanged (the same object, not a copy).
    """
    # Too short to have any interior point: nothing to filter.
    if len(path) < 3:
        return path

    turning_points = [
        path[idx]
        for idx in range(1, len(path) - 1)
        if path[idx - 1] < path[idx] > path[idx + 1]
        or path[idx - 1] > path[idx] < path[idx + 1]
    ]
    # First and last values are always kept, whatever their shape.
    return [path[0], *turning_points, path[-1]]
async def index_handler(request):
    """Return the static HTML_PAGE document as a text/html response.

    The incoming request is not inspected.
    """
    page = web.Response(content_type='text/html', text=HTML_PAGE)
    return page
async def websocket_handler(request):
    """Attach a WebSocket client to the feed of the symbol named in the URL.

    The ``symbol`` path segment must look like ``base-quote`` (e.g.
    ``btc-usd``); it is converted to ``BASE/QUOTE`` and used as the key into
    ``symbol_data``. The first client for a symbol creates the entry and
    starts its ``kraken_client`` task; when the last client leaves, the task
    is cancelled and the entry removed. Incoming client messages are read
    and discarded.

    Raises:
        web.HTTPBadRequest: if the path segment has no ``-`` or an empty
            base or quote part.
    """
    symbol_url = request.match_info['symbol'].lower()
    symbol_parts = symbol_url.split('-')
    # Only the first two parts are used (extra "-suffix" parts are ignored),
    # but both of them must be non-empty.
    if len(symbol_parts) < 2 or not symbol_parts[0] or not symbol_parts[1]:
        logging.warning(f"Invalid symbol format request received: {symbol_url}")
        # Raise rather than return: returning HTTP exceptions from a handler
        # is deprecated in aiohttp.
        raise web.HTTPBadRequest(text="Invalid symbol format. Use 'base-quote', e.g., btc-usd")
    symbol_kraken = f"{symbol_parts[0].upper()}/{symbol_parts[1].upper()}"

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    entry = symbol_data.get(symbol_kraken)
    if entry is None:
        entry = {
            "clients": set(),
            "B": {},
            "A": {},
            "task": None,
            "lp": 0.0,
        }
        # Register the entry before creating the task: kraken_client looks
        # itself up in symbol_data when it starts running.
        symbol_data[symbol_kraken] = entry
        entry["task"] = asyncio.create_task(kraken_client(symbol_kraken))

    # Keep direct references to this connection's entry. By the time the
    # client disconnects, symbol_data may no longer contain it (or may hold
    # a newer entry for the same symbol), so it must not be looked up again.
    clients = entry["clients"]
    clients.add(ws)
    logging.info(f"Client connected for {symbol_kraken}. Total clients for this symbol: {len(clients)}")

    try:
        # Drain incoming frames until the client goes away; content is ignored.
        async for _ in ws:
            pass
    finally:
        clients.discard(ws)
        logging.info(f"Client disconnected from {symbol_kraken}. Total clients for this symbol: {len(clients)}")
        # Tear down only if this is still the live entry; another handler may
        # already have removed it after broadcast() pruned this client.
        if not clients and symbol_data.get(symbol_kraken) is entry:
            logging.info(f"No more clients for {symbol_kraken}. Stopping Kraken task.")
            entry["task"].cancel()
            del symbol_data[symbol_kraken]
    return ws
async def broadcast(symbol, data):
    """Send ``data`` as JSON text to every open client registered for ``symbol``.

    Does nothing when the symbol has no entry in ``symbol_data`` or no
    clients. Clients whose send raises are removed from the symbol's client
    set; clients already marked closed are skipped and left for their own
    handler to unregister.

    Args:
        symbol: key into ``symbol_data``.
        data: JSON-serialisable payload.
    """
    entry = symbol_data.get(symbol)
    if entry is None or not entry["clients"]:
        return

    # Hold the set itself: the entry can be removed from symbol_data while
    # this coroutine is suspended in send_str().
    clients = entry["clients"]
    message = json.dumps(data)
    failed = set()

    # Iterate over a snapshot. Handlers add/remove clients while we await,
    # and mutating a set during iteration raises RuntimeError.
    for client in tuple(clients):
        if client.closed:
            continue
        try:
            await client.send_str(message)
        except Exception:
            logging.debug("Dropping client for %s after failed send", symbol, exc_info=True)
            failed.add(client)

    clients.difference_update(failed)
_KRAKEN_WS_URL = "wss://ws.kraken.com/v2"
_BOOK_DEPTH = 1000       # depth requested in the book subscription
_PATH_STEPS = 60         # maximum number of absorption steps simulated
_RECONNECT_DELAY = 5     # seconds to wait before reconnecting


def _apply_book_levels(side, levels):
    """Merge price-level updates into one book side; a quantity of 0 removes the level."""
    for level in levels:
        price, qty = float(level['price']), float(level['qty'])
        if qty == 0:
            side.pop(price, None)
        else:
            side[price] = qty


def _trim_to_depth(side, ordered, depth):
    """Delete levels ranked beyond ``depth`` from ``side`` and return the kept, best-first levels."""
    for price, _ in ordered[depth:]:
        del side[price]
    return ordered[:depth]


def _simulate_path(bids, asks, mid, steps):
    """Return the price path, starting at ``mid``, produced by netting best-first bids against asks."""
    path = [mid]
    bid_idx = ask_idx = 0
    bid_qty, ask_qty = bids[0][1], asks[0][1]
    for _ in range(steps):
        # Stop one level before the end so the "next level" lookup is valid.
        if bid_idx >= len(bids) - 1 or ask_idx >= len(asks) - 1:
            break
        if bid_qty > ask_qty:
            # Bid side is larger: the ask level is consumed, move up the asks.
            path.append(asks[ask_idx][0])
            bid_qty -= ask_qty
            ask_idx += 1
            ask_qty = asks[ask_idx][1]
        else:
            # Ask side is larger or equal: the bid level is consumed.
            path.append(bids[bid_idx][0])
            ask_qty -= bid_qty
            bid_idx += 1
            bid_qty = bids[bid_idx][1]
    return path


async def kraken_client(symbol):
    """Stream Kraken book and ticker data for ``symbol`` and broadcast derived paths.

    Runs until cancelled. Each connection subscribes to the ``book`` and
    ``ticker`` channels. Ticker messages update ``symbol_data[symbol]["lp"]``.
    Book messages update the bid/ask dicts (``"B"``/``"A"``), after which a
    price path is simulated from the mid price, reduced with
    ``extract_extrema`` and sent to clients via ``broadcast``. On connection
    loss or any other error the local book is cleared and the connection is
    retried after a short delay.

    Args:
        symbol: key into ``symbol_data``; also sent to Kraken as the symbol.

    Raises:
        asyncio.CancelledError: re-raised when the task is cancelled.
    """
    data = symbol_data[symbol]
    B, A = data["B"], data["A"]
    while True:
        try:
            logging.info(f"Connecting to Kraken WebSocket for {symbol}...")
            async with websockets.connect(_KRAKEN_WS_URL) as ws:
                logging.info(f"Connected to Kraken for {symbol}!")
                for channel in ("book", "ticker"):
                    params = {"channel": channel, "symbol": [symbol]}
                    if channel == "book":
                        params["depth"] = _BOOK_DEPTH
                    await ws.send(json.dumps({"method": "subscribe", "params": params}))

                async for msg in ws:
                    d = json.loads(msg)
                    ch = d.get("channel")
                    if d.get('event') == 'heartbeat':
                        continue

                    if ch == "ticker":
                        # Tolerate a missing or empty "data" list instead of
                        # raising and forcing a reconnect.
                        entries = d.get("data") or [{}]
                        last = entries[0].get('last')
                        if last is not None:
                            data["lp"] = float(last)
                        continue

                    if ch != "book":
                        continue

                    # A snapshot replaces the whole book rather than patching it.
                    if d.get("type") == "snapshot":
                        B.clear()
                        A.clear()
                    for entry in d.get("data", []):
                        _apply_book_levels(B, entry.get('bids', []))
                        _apply_book_levels(A, entry.get('asks', []))

                    # Best-first ordering: highest bid, lowest ask. Prices are
                    # unique dict keys, so tuple ordering is decided by price.
                    sb = sorted(B.items(), reverse=True)
                    sa = sorted(A.items())
                    # Levels pushed outside the subscribed depth receive no
                    # further updates from the feed; drop them so they do not
                    # linger as stale data.
                    sb = _trim_to_depth(B, sb, _BOOK_DEPTH)
                    sa = _trim_to_depth(A, sa, _BOOK_DEPTH)
                    if not (sb and sa):
                        continue

                    mid = (sb[0][0] + sa[0][0]) / 2
                    path = _simulate_path(sb, sa, mid, _PATH_STEPS)
                    extrema = extract_extrema(path)

                    # Broadcast the live price, mid price and reduced path.
                    await broadcast(symbol, {
                        'symbol': symbol,
                        'live': data["lp"],
                        'mid': mid,
                        'path': extrema
                    })
                    print(f"\r[{symbol}] Live: ${data['lp']:.2f} | Mid: ${mid:.2f} | Extrema: {len(extrema)} points (from {len(path)}) ", end='', flush=True)
        except websockets.exceptions.ConnectionClosed as e:
            logging.warning(f"\nKraken connection for {symbol} closed: {e}. Reconnecting in 5s...")
        except asyncio.CancelledError:
            logging.info(f"\nKraken client for {symbol} cancelled.")
            # Propagate so the task finishes in the cancelled state.
            raise
        except Exception as e:
            logging.error(f"\nKraken error for {symbol}: {e}. Reconnecting in 5s...")

        # The next connection starts from a fresh snapshot.
        B.clear()
        A.clear()
        await asyncio.sleep(_RECONNECT_DELAY)
async def main():
    """Start the HTTP/WebSocket server on 0.0.0.0:7860 and then wait forever.

    Registers the index page at ``/`` and the per-symbol WebSocket endpoint
    at ``/{symbol}``, prints a short banner with the local URLs, and blocks
    on an event that is never set, so it only ends when cancelled.
    """
    application = web.Application()
    routes = (
        ('/', index_handler),
        ('/{symbol}', websocket_handler),
    )
    for route_path, handler in routes:
        application.router.add_get(route_path, handler)

    app_runner = web.AppRunner(application)
    await app_runner.setup()
    await web.TCPSite(app_runner, '0.0.0.0', 7860).start()

    rule = "=" * 50
    banner = (
        rule,
        "🌐 HTTP Server: http://localhost:7860",
        "🔌 WebSocket Server: ws://localhost:7860/<symbol>",
        " (e.g., ws://localhost:7860/sol-usd)",
        rule,
    )
    for line in banner:
        print(line)

    # Nothing ever sets this event; waiting on it keeps the server alive.
    forever = asyncio.Event()
    await forever.wait()
if __name__ == '__main__':
    # Script entry point: run the server until interrupted. Ctrl+C is the
    # expected way to stop it, so report it with a message, not a traceback.
    server = main()
    try:
        asyncio.run(server)
    except KeyboardInterrupt:
        print("\n\nServer stopped.")