import asyncio
import json
import logging
import time
import bisect
from aiohttp import web
import websockets
# --- Configuration ---
SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
HISTORY_LENGTH = 1000 # Increased history for better charts
# --- Logging ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
# --- In-Memory State ---
market_state = {
"bids": {},
"asks": {},
"history": [], # Stores { time: int, value: float }
"current_mid": 0.0,
"prev_mid": 0.0,
"ready": False
}
# --- AI Logic Helper ---
def analyze_structure(diff_x, diff_y, current_mid):
"""
Analyzes the Net Liquidity Curve to find Support, Resistance, and Projected Trend.
"""
if not diff_y or len(diff_y) < 5:
return None
# 1. Momentum Projection
net_total = diff_y[-1]
# Damping factor
momentum_shift = net_total * 0.2
projected_price = current_mid + momentum_shift
# 2. Find Structural Reversals
support_level = None
resistance_level = None
scan_limit = len(diff_y) // 2
for i in range(1, scan_limit):
prev_val = diff_y[i-1]
curr_val = diff_y[i]
dist = diff_x[i]
# Resistance: Buyers (Pos) -> Sellers (Neg)
if prev_val > 0 and curr_val < 0 and resistance_level is None:
resistance_level = current_mid + dist
# Support: Sellers (Neg) -> Buyers (Pos)
if prev_val < 0 and curr_val > 0 and support_level is None:
support_level = current_mid - dist
return {
"projected": projected_price,
"support": support_level,
"resistance": resistance_level,
"net_score": net_total
}
# --- Improved HTML Frontend ---
HTML_PAGE = f"""
AI Liquidity Dashboard | {SYMBOL_KRAKEN}
INITIALIZING AI MODELS...
"""
async def kraken_worker():
global market_state
while True:
try:
async with websockets.connect("wss://ws.kraken.com/v2") as ws:
logging.info(f"🔌 Connected to Kraken ({SYMBOL_KRAKEN})")
await ws.send(json.dumps({
"method": "subscribe",
"params": {"channel": "book", "symbol": [SYMBOL_KRAKEN], "depth": 500}
}))
async for message in ws:
payload = json.loads(message)
channel = payload.get("channel")
data_entries = payload.get("data", [])
if channel == "book":
for item in data_entries:
# Process Bids
for bid in item.get('bids', []):
q, p = float(bid['qty']), float(bid['price'])
if q == 0: market_state['bids'].pop(p, None)
else: market_state['bids'][p] = q
# Process Asks
for ask in item.get('asks', []):
q, p = float(ask['qty']), float(ask['price'])
if q == 0: market_state['asks'].pop(p, None)
else: market_state['asks'][p] = q
if market_state['bids'] and market_state['asks']:
best_bid = max(market_state['bids'].keys())
best_ask = min(market_state['asks'].keys())
mid = (best_bid + best_ask) / 2
market_state['prev_mid'] = market_state['current_mid']
market_state['current_mid'] = mid
market_state['ready'] = True
now = time.time()
# Throttle history updates slightly to prevent spamming duplicates
if not market_state['history'] or (now - market_state['history'][-1]['t'] > 0.5):
market_state['history'].append({'t': now, 'p': mid})
if len(market_state['history']) > HISTORY_LENGTH:
market_state['history'].pop(0)
except Exception as e:
logging.warning(f"⚠️ Reconnecting: {e}")
await asyncio.sleep(3)
async def handle_index(request):
return web.Response(text=HTML_PAGE, content_type='text/html')
async def handle_data(request):
if not market_state['ready']:
return web.json_response({"error": "Initializing..."})
mid = market_state['current_mid']
# Snapshot Bids/Asks
raw_bids = sorted(market_state['bids'].items(), key=lambda x: -x[0])[:300]
raw_asks = sorted(market_state['asks'].items(), key=lambda x: x[0])[:300]
# Calculate Volume at Distance (Cumulative)
d_b_x, d_b_y, cum = [], [], 0
for p, q in raw_bids:
d = mid - p
if d >= 0:
cum += q
d_b_x.append(d); d_b_y.append(cum)
d_a_x, d_a_y, cum = [], [], 0
for p, q in raw_asks:
d = p - mid
if d >= 0:
cum += q
d_a_x.append(d); d_a_y.append(cum)
# Calculate Net Liquidity Curve (The "Diff" Chart)
diff_x, diff_y = [], []
if d_b_x and d_a_x:
max_dist = min(d_b_x[-1], d_a_x[-1])
# Create interpolated steps
step_size = max_dist / 100
steps = [i * step_size for i in range(1, 101)]
for s in steps:
# Find volume at distance 's' for bid and ask
idx_b = bisect.bisect_right(d_b_x, s)
vol_b = d_b_y[idx_b-1] if idx_b > 0 else 0
idx_a = bisect.bisect_right(d_a_x, s)
vol_a = d_a_y[idx_a-1] if idx_a > 0 else 0
diff_x.append(s)
diff_y.append(vol_b - vol_a) # Positive = Bullish Wall, Negative = Bearish Wall
# Run Analysis
analysis = analyze_structure(diff_x, diff_y, mid)
return web.json_response({
"mid": mid,
"history": market_state['history'], # List of {t, p}
"diff": { "x": diff_x, "y": diff_y },
"analysis": analysis
})
async def start_background(app):
app['kraken_task'] = asyncio.create_task(kraken_worker())
async def cleanup_background(app):
app['kraken_task'].cancel()
try: await app['kraken_task']
except asyncio.CancelledError: pass
async def main():
app = web.Application()
app.router.add_get('/', handle_index)
app.router.add_get('/data', handle_data)
app.on_startup.append(start_background)
app.on_cleanup.append(cleanup_background)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', PORT)
await site.start()
print(f"🚀 AI Dashboard: http://localhost:{PORT}")
await asyncio.Event().wait()
if __name__ == "__main__":
try: asyncio.run(main())
except KeyboardInterrupt: pass