test / app.py
Alvin3y1's picture
Update app.py
a5396a6 verified
raw
history blame
9.4 kB
import asyncio
import json
import logging
import time
from aiohttp import web
import websockets
# --- Configuration ---
SYMBOL_DISPLAY = "BTC-USD"
SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
# --- In-Memory State ---
# We store the orderbook in dictionaries: price -> quantity
# This allows O(1) updates when Kraken sends changes.
market_state = {
"bids": {},
"asks": {},
"price": 0.0,
"ready": False
}
# --- HTML Frontend (Plotly.js) ---
HTML_PAGE = f"""
<!DOCTYPE html>
<html>
<head>
<title>BTC-USD Depth Chart</title>
<script src="https://cdn.plot.ly/plotly-2.24.1.min.js"></script>
<style>
body {{ margin: 0; padding: 0; background-color: #0e0e0e; color: #ccc; font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif; }}
#chart {{ width: 100vw; height: 100vh; }}
#status {{ position: absolute; top: 10px; left: 60px; z-index: 10; font-size: 14px; background: rgba(0,0,0,0.5); padding: 5px; border-radius: 4px; pointer-events: none;}}
.green {{ color: #00e676; }}
.red {{ color: #ff1744; }}
</style>
</head>
<body>
<div id="status">Connecting...</div>
<div id="chart"></div>
<script>
const chartDiv = document.getElementById('chart');
const statusDiv = document.getElementById('status');
let initialized = false;
async function updateChart() {{
try {{
const res = await fetch('/data');
const data = await res.json();
if (data.error) {{
statusDiv.innerHTML = "Waiting for data...";
return;
}}
statusDiv.innerHTML = `Price: <span class="${{data.price >= data.prev_price ? 'green' : 'red'}}">$${{data.price.toLocaleString()}}</span> | Bids: ${{data.bids_count}} | Asks: ${{data.asks_count}}`;
const traceBids = {{
x: data.bids.x,
y: data.bids.y,
fill: 'tozeroy',
type: 'scatter',
mode: 'lines',
name: 'Bids',
line: {{color: '#00e676', width: 2}}
}};
const traceAsks = {{
x: data.asks.x,
y: data.asks.y,
fill: 'tozeroy',
type: 'scatter',
mode: 'lines',
name: 'Asks',
line: {{color: '#ff1744', width: 2}}
}};
const layout = {{
title: '<b>BTC/USD Depth</b>',
paper_bgcolor: '#0e0e0e',
plot_bgcolor: '#0e0e0e',
font: {{ color: '#aaa' }},
showlegend: false,
xaxis: {{
title: 'Price (USD)',
gridcolor: '#333',
tickformat: '.0f'
}},
yaxis: {{
title: 'Volume (BTC)',
gridcolor: '#333'
}},
margin: {{ t: 40, b: 40, l: 50, r: 20 }},
hovermode: 'x unified'
}};
const config = {{ responsive: true, displayModeBar: false }};
if (!initialized) {{
Plotly.newPlot(chartDiv, [traceBids, traceAsks], layout, config);
initialized = true;
}} else {{
Plotly.react(chartDiv, [traceBids, traceAsks], layout, config);
}}
}} catch (e) {{
console.error("Fetch error:", e);
statusDiv.innerText = "Connection lost. Retrying...";
}}
}}
// Poll every 500ms
setInterval(updateChart, 500);
updateChart();
</script>
</body>
</html>
"""
async def kraken_worker():
"""Connects to Kraken WS and maintains the orderbook in memory."""
global market_state
while True:
try:
async with websockets.connect("wss://ws.kraken.com/v2") as ws:
logging.info(f"πŸ”Œ Connected to Kraken ({SYMBOL_KRAKEN})")
# Subscribe
msg = {
"method": "subscribe",
"params": {
"channel": "book",
"symbol": [SYMBOL_KRAKEN],
"depth": 500
}
}
await ws.send(json.dumps(msg))
# Also subscribe to ticker for the last traded price
msg_ticker = {
"method": "subscribe",
"params": {
"channel": "ticker",
"symbol": [SYMBOL_KRAKEN]
}
}
await ws.send(json.dumps(msg_ticker))
async for message in ws:
payload = json.loads(message)
channel = payload.get("channel")
data_entries = payload.get("data", [])
if payload.get("type") == "error":
logging.error(f"Kraken Error: {payload}")
continue
if channel == "ticker":
# Update Last Price
for item in data_entries:
if 'last' in item:
market_state['price'] = float(item['last'])
elif channel == "book":
# Update Orderbook
for item in data_entries:
# Update Bids
for bid in item.get('bids', []):
price = float(bid['price'])
qty = float(bid['qty'])
if qty == 0:
market_state['bids'].pop(price, None)
else:
market_state['bids'][price] = qty
# Update Asks
for ask in item.get('asks', []):
price = float(ask['price'])
qty = float(ask['qty'])
if qty == 0:
market_state['asks'].pop(price, None)
else:
market_state['asks'][price] = qty
market_state['ready'] = True
except Exception as e:
logging.warning(f"⚠️ Kraken Connection lost: {e}. Reconnecting in 3s...")
await asyncio.sleep(3)
async def handle_index(request):
"""Serve the HTML page."""
return web.Response(text=HTML_PAGE, content_type='text/html')
async def handle_data(request):
"""Return the calculated cumulative volume arrays for Plotly."""
if not market_state['ready']:
return web.json_response({"error": "Initializing..."})
# Prepare data snapshots (Thread-safe enough for this use case due to GIL/asyncio)
bids = market_state['bids'].copy()
asks = market_state['asks'].copy()
current_price = market_state['price']
# Sort Bids: High -> Low
sorted_bids = sorted(bids.items(), key=lambda x: -x[0])
# Sort Asks: Low -> High
sorted_asks = sorted(asks.items(), key=lambda x: x[0])
# Slice to keep chart performant (e.g., closest 300 orders)
DEPTH = 400
sorted_bids = sorted_bids[:DEPTH]
sorted_asks = sorted_asks[:DEPTH]
# Calculate Cumulative Volume for Bids
b_x, b_y = [], []
cum = 0
for p, q in sorted_bids:
cum += q
b_x.append(p)
b_y.append(cum)
# Reverse bids for the chart line to draw outwards from center
b_x.reverse()
b_y.reverse()
# Calculate Cumulative Volume for Asks
a_x, a_y = [], []
cum = 0
for p, q in sorted_asks:
cum += q
a_x.append(p)
a_y.append(cum)
return web.json_response({
"price": current_price,
"bids": {"x": b_x, "y": b_y},
"asks": {"x": a_x, "y": a_y},
"bids_count": len(bids),
"asks_count": len(asks)
})
async def start_background_tasks(app):
app['kraken_task'] = asyncio.create_task(kraken_worker())
async def cleanup_background_tasks(app):
app['kraken_task'].cancel()
await app['kraken_task']
async def main():
app = web.Application()
app.router.add_get('/', handle_index)
app.router.add_get('/data', handle_data)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', PORT)
await site.start()
print("="*50)
print(f"πŸš€ BTC-USD Orderbook Chart Running")
print(f"πŸ‘‰ Open: http://localhost:{PORT}")
print("="*50)
# Keep alive
await asyncio.Event().wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nStopping server...")