test / app.py
Alvin3y1's picture
Update app.py
e1711fa verified
import asyncio
import json
import logging
import time
import math
import aiohttp
import pandas as pd
import numpy as np
from aiohttp import web
from sklearn.ensemble import RandomForestRegressor
from concurrent.futures import ThreadPoolExecutor
# --- CONFIGURATION ---
SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
BROADCAST_RATE = 1.0
PREDICTION_HORIZON = 100
MAX_HISTORY = 5000
TRAIN_INTERVAL = 300
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
market_state = {
"ohlc_history": [],
"ready": False,
"model": None,
"model_residuals": None,
"last_training_time": 0,
"last_price": 0,
"price_change": 0
}
connected_clients = set()
executor = ThreadPoolExecutor(max_workers=1)
def calculate_indicators(candles):
if len(candles) < 100:
return None
df = pd.DataFrame(candles)
cols = ['open', 'high', 'low', 'close', 'volume']
for c in cols:
df[c] = df[c].astype(float)
# Moving Averages
df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()
# Bollinger Bands
df['std'] = df['close'].rolling(window=20).std()
df['bb_upper'] = df['ema20'] + (df['std'] * 2)
df['bb_lower'] = df['ema20'] - (df['std'] * 2)
# RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
# MACD
k = df['close'].ewm(span=12, adjust=False).mean()
d = df['close'].ewm(span=26, adjust=False).mean()
df['macd'] = k - d
df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
df['macd_hist'] = df['macd'] - df['macd_signal']
# ATR
df['tr0'] = abs(df['high'] - df['low'])
df['tr1'] = abs(df['high'] - df['close'].shift())
df['tr2'] = abs(df['low'] - df['close'].shift())
df['tr'] = df[['tr0', 'tr1', 'tr2']].max(axis=1)
df['atr'] = df['tr'].rolling(window=14).mean()
# Features
df['dist_ema20'] = (df['close'] - df['ema20']) / df['ema20']
df['dist_ema50'] = (df['close'] - df['ema50']) / df['ema50']
df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['ema20']
df['bb_pos'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
df['vol_change'] = df['volume'].pct_change()
df['log_ret'] = np.log(df['close'] / df['close'].shift(1))
# Time encoding
df['datetime'] = pd.to_datetime(df['time'], unit='s')
df['hour_sin'] = np.sin(2 * np.pi * df['datetime'].dt.hour / 24)
df['hour_cos'] = np.cos(2 * np.pi * df['datetime'].dt.hour / 24)
# Lag Features
for lag in [1, 2, 3, 5, 8]:
df[f'rsi_lag{lag}'] = df['rsi'].shift(lag)
df[f'macd_hist_lag{lag}'] = df['macd_hist'].shift(lag)
df[f'log_ret_lag{lag}'] = df['log_ret'].shift(lag)
df[f'vol_change_lag{lag}'] = df['vol_change'].shift(lag)
return df
def train_model(df):
logging.info(f"Training ML Model on {len(df)} candles...")
feature_cols = [
'rsi', 'macd_hist', 'atr',
'dist_ema20', 'dist_ema50',
'bb_width', 'bb_pos',
'vol_change', 'log_ret',
'hour_sin', 'hour_cos'
]
for lag in [1, 2, 3, 5, 8]:
feature_cols.extend([
f'rsi_lag{lag}', f'macd_hist_lag{lag}',
f'log_ret_lag{lag}', f'vol_change_lag{lag}'
])
data = df.dropna().copy()
# Prepare targets (Future Returns relative to Current Price)
target_cols_dict = {}
target_names = []
for i in range(1, PREDICTION_HORIZON + 1):
col_name = f'target_return_{i}'
# Return at step i relative to step 0
target_cols_dict[col_name] = (data['close'].shift(-i) - data['close']) / data['close']
target_names.append(col_name)
targets_df = pd.DataFrame(target_cols_dict, index=data.index)
data = pd.concat([data, targets_df], axis=1).dropna()
if len(data) < 200:
return None, None
X = data[feature_cols].values
y = data[target_names].values
model = RandomForestRegressor(
n_estimators=200,
max_depth=20,
min_samples_split=5,
min_samples_leaf=2,
max_features='sqrt',
n_jobs=-1,
random_state=42
)
model.fit(X, y)
# Calculate Residuals for Confidence Estimation
# (Using OOB or training residuals as a proxy for uncertainty)
predictions = model.predict(X)
residuals = y - predictions
residual_std = np.std(residuals, axis=0)
return model, residual_std
def get_prediction(df, model, residual_std):
if model is None or residual_std is None:
return [], []
feature_cols = [
'rsi', 'macd_hist', 'atr',
'dist_ema20', 'dist_ema50',
'bb_width', 'bb_pos',
'vol_change', 'log_ret',
'hour_sin', 'hour_cos'
]
for lag in [1, 2, 3, 5, 8]:
feature_cols.extend([
f'rsi_lag{lag}', f'macd_hist_lag{lag}',
f'log_ret_lag{lag}', f'vol_change_lag{lag}'
])
last_row = df.iloc[[-1]][feature_cols]
if last_row.isnull().values.any():
return [], []
predicted_returns = model.predict(last_row.values)[0]
current_price = df.iloc[-1]['close']
current_time = int(df.iloc[-1]['time'])
current_atr = df.iloc[-1]['atr']
pred_candles = []
confidence_data = []
prev_close = current_price
for i, pct_change in enumerate(predicted_returns):
future_time = current_time + ((i + 1) * 60)
# Calculate predicted Close
future_close = current_price * (1 + pct_change)
# Construct Candle
# Open is previous candle's close
open_price = prev_close
close_price = future_close
# Heuristic for High/Low to make it look like a candle
# Use ATR and some noise or just fixed ratio to visualize structure
# Here we use a fixed structure based on ATR to keep it clean but candle-like
half_range = current_atr * 0.4
high_price = max(open_price, close_price) + half_range
low_price = min(open_price, close_price) - half_range
pred_candles.append({
"time": future_time,
"open": float(open_price),
"high": float(high_price),
"low": float(low_price),
"close": float(close_price)
})
# Confidence Metric (Standard Deviation of Residuals at this step)
# We plot the error margin width relative to price
sigma = residual_std[i]
error_margin = future_close * sigma * 1.96 # 95% CI width approx
confidence_data.append({
"time": future_time,
"value": float(error_margin)
})
prev_close = future_close
return pred_candles, confidence_data
async def process_market_data():
if not market_state['ready'] or not market_state['ohlc_history']:
return {"error": "Initializing..."}
df = calculate_indicators(market_state['ohlc_history'])
if df is None or len(df) < 100:
return {"error": "Not enough data"}
# Retrain periodically
if market_state['model'] is None or (time.time() - market_state['last_training_time'] > TRAIN_INTERVAL):
try:
loop = asyncio.get_running_loop()
model, res_std = await loop.run_in_executor(executor, train_model, df)
if model is not None:
market_state['model'] = model
market_state['model_residuals'] = res_std
market_state['last_training_time'] = time.time()
except Exception as e:
logging.error(f"Training failed: {e}")
predictions = []
confidence = []
try:
predictions, confidence = get_prediction(df, market_state['model'], market_state['model_residuals'])
except Exception as e:
logging.error(f"Prediction failed: {e}")
# Prepare display data
df_clean = df.replace([np.inf, -np.inf], np.nan)
cols_to_keep = ['time', 'open', 'high', 'low', 'close', 'volume', 'ema20', 'bb_upper', 'bb_lower', 'rsi', 'macd_hist']
df_clean = df_clean[cols_to_keep].where(pd.notnull(df_clean), None)
last_close = float(df['close'].iloc[-1]) if len(df) > 0 else 0
first_close = float(df['close'].iloc[0]) if len(df) > 0 else 0
price_change = ((last_close - first_close) / first_close * 100) if first_close > 0 else 0
market_state['last_price'] = last_close
market_state['price_change'] = price_change
display_data = df_clean.tail(500).to_dict('records')
last_row = df.iloc[-1] if len(df) > 0 else {}
return {
"data": display_data,
"prediction": predictions,
"confidence": confidence,
"stats": {
"price": last_close,
"change": round(price_change, 2),
"rsi": round(float(last_row.get('rsi', 0)), 1) if pd.notna(last_row.get('rsi')) else 0,
"macd": round(float(last_row.get('macd', 0)), 2) if pd.notna(last_row.get('macd')) else 0,
"atr": round(float(last_row.get('atr', 0)), 2) if pd.notna(last_row.get('atr')) else 0,
"volume": round(float(last_row.get('volume', 0)), 2) if pd.notna(last_row.get('volume')) else 0
}
}
# --- HTML/JS ---
HTML_PAGE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>BTC/USD AI Predictor</title>
<script src="https://unpkg.com/lightweight-charts@4.1.1/dist/lightweight-charts.standalone.production.js"></script>
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap" rel="stylesheet">
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
background: linear-gradient(135deg, #0a0a0f 0%, #1a1a2e 100%);
color: #ffffff;
height: 100vh;
display: flex;
flex-direction: column;
overflow: hidden;
}
.header {
background: rgba(15, 15, 25, 0.95);
backdrop-filter: blur(20px);
border-bottom: 1px solid rgba(255, 255, 255, 0.05);
padding: 12px 24px;
display: flex;
align-items: center;
justify-content: space-between;
z-index: 100;
}
.logo-section { display: flex; align-items: center; gap: 16px; }
.logo {
font-size: 24px;
font-weight: 700;
background: linear-gradient(135deg, #00ff88 0%, #00d4ff 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
}
.symbol-badge {
background: rgba(0, 255, 136, 0.1);
border: 1px solid rgba(0, 255, 136, 0.3);
padding: 6px 14px;
border-radius: 20px;
font-size: 13px;
font-weight: 600;
color: #00ff88;
}
.stats-row { display: flex; gap: 24px; align-items: center; }
.stat-item { display: flex; flex-direction: column; align-items: flex-end; }
.stat-label { font-size: 10px; color: #666; text-transform: uppercase; }
.stat-value { font-size: 15px; font-weight: 600; font-variant-numeric: tabular-nums; }
.stat-value.positive { color: #00ff88; }
.stat-value.negative { color: #ff4757; }
.stat-value.neutral { color: #ffd700; }
.status-indicator { display: flex; align-items: center; gap: 8px; font-size: 12px; color: #888; }
.status-dot { width: 8px; height: 8px; border-radius: 50%; background: #00ff88; animation: pulse 2s infinite; }
.status-dot.disconnected { background: #ff4757; animation: none; }
@keyframes pulse {
0%, 100% { opacity: 1; box-shadow: 0 0 0 0 rgba(0, 255, 136, 0.4); }
50% { opacity: 0.8; box-shadow: 0 0 0 8px rgba(0, 255, 136, 0); }
}
.indicator-panel {
background: rgba(15, 15, 25, 0.8);
border-bottom: 1px solid rgba(255, 255, 255, 0.05);
padding: 10px 24px;
display: flex;
gap: 32px;
overflow-x: auto;
}
.indicator-group { display: flex; align-items: center; gap: 12px; }
.indicator-label { font-size: 11px; color: #666; text-transform: uppercase; }
.indicator-value { font-size: 13px; font-weight: 500; font-variant-numeric: tabular-nums; }
.charts-container {
flex: 1;
display: flex;
flex-direction: column;
position: relative;
}
.chart-wrapper { position: relative; border-bottom: 1px solid rgba(255, 255, 255, 0.05); }
#main-chart { flex: 5; }
#volume-chart { flex: 1.5; min-height: 80px; }
#osc-chart { flex: 1.5; min-height: 80px; }
.chart-label {
position: absolute; top: 12px; left: 16px; z-index: 10;
display: flex; gap: 16px; font-size: 11px; pointer-events: none;
}
.chart-label span { display: flex; align-items: center; gap: 6px; }
.chart-label .dot { width: 8px; height: 8px; border-radius: 50%; }
.loading-overlay {
position: absolute; top: 0; left: 0; right: 0; bottom: 0;
background: rgba(10, 10, 15, 0.95);
display: flex; flex-direction: column; align-items: center; justify-content: center;
z-index: 1000; transition: opacity 0.5s ease;
}
.loading-overlay.hidden { opacity: 0; pointer-events: none; }
.loader {
width: 50px; height: 50px; border: 3px solid rgba(0, 255, 136, 0.1);
border-top-color: #00ff88; border-radius: 50%; animation: spin 1s linear infinite;
}
@keyframes spin { to { transform: rotate(360deg); } }
.prediction-badge {
position: absolute; top: 12px; right: 16px;
background: rgba(191, 90, 242, 0.15); border: 1px solid rgba(191, 90, 242, 0.3);
padding: 4px 10px; border-radius: 12px; font-size: 10px; color: #bf5af2; z-index: 10;
}
</style>
</head>
<body>
<div class="header">
<div class="logo-section">
<div class="logo">QuantAI</div>
<div class="symbol-badge">BTC/USD</div>
</div>
<div class="stats-row">
<div class="stat-item">
<span class="stat-label">Price</span>
<span id="price" class="stat-value">$--</span>
</div>
<div class="stat-item">
<span class="stat-label">Change</span>
<span id="change" class="stat-value neutral">--%</span>
</div>
<div class="stat-item">
<span class="stat-label">RSI</span>
<span id="rsi" class="stat-value">--</span>
</div>
<div class="stat-item">
<span class="stat-label">ATR</span>
<span id="atr" class="stat-value">--</span>
</div>
</div>
<div class="status-indicator">
<div id="status-dot" class="status-dot"></div>
<span id="status-text">Connecting...</span>
</div>
</div>
<div class="indicator-panel">
<div class="indicator-group"><span class="indicator-label">EMA 20</span><span id="ema-val" class="indicator-value" style="color: #2962FF">--</span></div>
<div class="indicator-group"><span class="indicator-label">BB Upper</span><span id="bb-upper" class="indicator-value" style="color: #26a69a">--</span></div>
<div class="indicator-group"><span class="indicator-label">BB Lower</span><span id="bb-lower" class="indicator-value" style="color: #ef5350">--</span></div>
<div class="indicator-group"><span class="indicator-label">MACD</span><span id="macd-val" class="indicator-value">--</span></div>
</div>
<div class="charts-container">
<div class="loading-overlay" id="loading">
<div class="loader"></div>
</div>
<div id="main-chart" class="chart-wrapper">
<div class="chart-label">
<span><div class="dot" style="background: #00ff88"></div>Price</span>
<span><div class="dot" style="background: #bf5af2"></div>AI Prediction</span>
</div>
<div class="prediction-badge">Forecast: 100 Candles</div>
</div>
<div id="volume-chart" class="chart-wrapper">
<div class="chart-label">
<span><div class="dot" style="background: #5c6bc0"></div>Volume</span>
<span><div class="dot" style="background: #ff9f43"></div>AI Uncertainty (±$)</span>
</div>
</div>
<div id="osc-chart" class="chart-wrapper">
<div class="chart-label">
<span><div class="dot" style="background: #9C27B0"></div>RSI</span>
<span><div class="dot" style="background: #26a69a"></div>MACD</span>
</div>
</div>
</div>
<script>
document.addEventListener('DOMContentLoaded', () => {
const mainEl = document.getElementById('main-chart');
const volEl = document.getElementById('volume-chart');
const oscEl = document.getElementById('osc-chart');
const loading = document.getElementById('loading');
const chartOptions = {
layout: { background: { type: 'solid', color: 'transparent' }, textColor: '#666' },
grid: { vertLines: { color: 'rgba(255,255,255,0.03)' }, horzLines: { color: 'rgba(255,255,255,0.03)' } },
timeScale: { timeVisible: true, secondsVisible: false, borderColor: 'rgba(255,255,255,0.1)' },
rightPriceScale: { borderColor: 'rgba(255,255,255,0.1)' },
crosshair: {
mode: LightweightCharts.CrosshairMode.Normal,
vertLine: { color: 'rgba(255,255,255,0.2)', labelBackgroundColor: '#1a1a2e' },
horzLine: { color: 'rgba(255,255,255,0.2)', labelBackgroundColor: '#1a1a2e' }
}
};
const mainChart = LightweightCharts.createChart(mainEl, chartOptions);
const volChart = LightweightCharts.createChart(volEl, chartOptions);
const oscChart = LightweightCharts.createChart(oscEl, chartOptions);
// Main Chart Series
const candles = mainChart.addCandlestickSeries({
upColor: '#00ff88', downColor: '#ff4757',
borderUpColor: '#00ff88', borderDownColor: '#ff4757',
wickUpColor: '#00ff88', wickDownColor: '#ff4757'
});
const ema = mainChart.addLineSeries({ color: '#2962FF', lineWidth: 2, crosshairMarkerVisible: false });
const bbUpper = mainChart.addLineSeries({ color: 'rgba(38, 166, 154, 0.4)', lineWidth: 1, crosshairMarkerVisible: false });
const bbLower = mainChart.addLineSeries({ color: 'rgba(239, 83, 80, 0.4)', lineWidth: 1, crosshairMarkerVisible: false });
// AI Prediction Series (Candles)
const predCandles = mainChart.addCandlestickSeries({
upColor: 'rgba(191, 90, 242, 0.8)', downColor: 'rgba(191, 90, 242, 0.8)',
borderUpColor: '#bf5af2', borderDownColor: '#bf5af2',
wickUpColor: '#bf5af2', wickDownColor: '#bf5af2'
});
// Volume Chart Series
const volumeSeries = volChart.addHistogramSeries({ priceFormat: { type: 'volume' }, priceScaleId: '' });
volChart.priceScale('').applyOptions({ scaleMargins: { top: 0.1, bottom: 0 } });
// Confidence/Uncertainty Series (Near Volume)
const confidenceSeries = volChart.addLineSeries({
color: '#ff9f43',
lineWidth: 2,
priceScaleId: 'confidence',
lineStyle: LightweightCharts.LineStyle.Solid
});
// Position the confidence scale to not overlap heavily with volume (overlay mode)
volChart.priceScale('confidence').applyOptions({
scaleMargins: { top: 0.1, bottom: 0.7 }, // Keep it at top of volume pane
visible: true
});
// Oscillator Chart Series
const rsi = oscChart.addLineSeries({ color: '#9C27B0', lineWidth: 2, priceScaleId: 'rsi' });
oscChart.priceScale('rsi').applyOptions({ scaleMargins: { top: 0.1, bottom: 0.1 } });
const macdHist = oscChart.addHistogramSeries({ priceScaleId: 'macd' });
oscChart.priceScale('macd').applyOptions({ scaleMargins: { top: 0.6, bottom: 0 } });
function resizeCharts() {
mainChart.applyOptions({ width: mainEl.clientWidth, height: mainEl.clientHeight });
volChart.applyOptions({ width: mainEl.clientWidth, height: volEl.clientHeight });
oscChart.applyOptions({ width: mainEl.clientWidth, height: oscEl.clientHeight });
}
new ResizeObserver(resizeCharts).observe(document.body);
setTimeout(resizeCharts, 100);
function syncTimeScales(charts) {
charts.forEach((chart, i) => {
chart.timeScale().subscribeVisibleLogicalRangeChange(range => {
if (range) charts.forEach((c, j) => { if (i !== j) c.timeScale().setVisibleLogicalRange(range); });
});
});
}
syncTimeScales([mainChart, volChart, oscChart]);
function updateStats(stats, lastData) {
if (stats) {
document.getElementById('price').textContent = '$' + stats.price.toLocaleString('en-US', {minimumFractionDigits: 2});
const changeEl = document.getElementById('change');
changeEl.textContent = (stats.change >= 0 ? '+' : '') + stats.change + '%';
changeEl.className = 'stat-value ' + (stats.change > 0 ? 'positive' : stats.change < 0 ? 'negative' : 'neutral');
const rsiVal = stats.rsi;
const rsiEl = document.getElementById('rsi');
rsiEl.textContent = rsiVal;
rsiEl.className = 'stat-value ' + (rsiVal > 70 ? 'negative' : rsiVal < 30 ? 'positive' : 'neutral');
document.getElementById('atr').textContent = stats.atr;
}
if (lastData) {
document.getElementById('ema-val').textContent = lastData.ema20 ? lastData.ema20.toFixed(2) : '--';
document.getElementById('bb-upper').textContent = lastData.bb_upper ? lastData.bb_upper.toFixed(2) : '--';
document.getElementById('bb-lower').textContent = lastData.bb_lower ? lastData.bb_lower.toFixed(2) : '--';
const macdVal = lastData.macd_hist;
const macdEl = document.getElementById('macd-val');
if (macdVal !== null && macdVal !== undefined) {
macdEl.textContent = macdVal.toFixed(2);
macdEl.style.color = macdVal >= 0 ? '#26a69a' : '#ef5350';
}
}
}
function setStatus(connected) {
const dot = document.getElementById('status-dot');
const text = document.getElementById('status-text');
if (connected) { dot.className = 'status-dot'; text.textContent = 'Live'; }
else { dot.className = 'status-dot disconnected'; text.textContent = 'Reconnecting...'; }
}
let hasData = false;
function connect() {
const protocol = location.protocol === 'https:' ? 'wss' : 'ws';
const ws = new WebSocket(protocol + '://' + location.host + '/ws');
ws.onopen = () => setStatus(true);
ws.onmessage = (e) => {
try {
const payload = JSON.parse(e.data);
if (!payload.data || payload.data.length === 0) return;
const d = payload.data;
const safeMap = (arr, key) => arr.filter(x => x && x.time && x[key] !== null).map(x => ({ time: x.time, value: x[key] }));
const candleData = d.filter(x => x && x.time && x.open).map(x => ({
time: x.time, open: x.open, high: x.high, low: x.low, close: x.close
}));
if (candleData.length > 0) {
candles.setData(candleData);
ema.setData(safeMap(d, 'ema20'));
bbUpper.setData(safeMap(d, 'bb_upper'));
bbLower.setData(safeMap(d, 'bb_lower'));
volumeSeries.setData(d.filter(x => x && x.time).map(x => ({
time: x.time, value: x.volume, color: x.close >= x.open ? 'rgba(0, 255, 136, 0.5)' : 'rgba(255, 71, 87, 0.5)'
})));
rsi.setData(safeMap(d, 'rsi'));
macdHist.setData(d.filter(x => x && x.time).map(x => ({
time: x.time, value: x.macd_hist, color: x.macd_hist >= 0 ? '#26a69a' : '#ef5350'
})));
// Update Prediction Candles
if (payload.prediction && payload.prediction.length > 0) {
// Prediction data is already in OHLC format
predCandles.setData(payload.prediction);
}
// Update Confidence Metric (Volume Pane)
if (payload.confidence && payload.confidence.length > 0) {
confidenceSeries.setData(payload.confidence);
}
updateStats(payload.stats, d[d.length - 1]);
if (!hasData) {
hasData = true;
loading.classList.add('hidden');
mainChart.timeScale().fitContent();
}
}
} catch (err) { console.error("Chart error:", err); }
};
ws.onclose = () => { setStatus(false); setTimeout(connect, 2000); };
ws.onerror = () => ws.close();
}
connect();
});
</script>
</body>
</html>
"""
async def fetch_initial_data():
try:
async with aiohttp.ClientSession() as session:
url = "https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval=1"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status == 200:
data = await response.json()
if 'result' in data:
for key in data['result']:
if key != 'last':
raw = data['result'][key]
market_state['ohlc_history'] = [
{
'time': int(c[0]),
'open': float(c[1]),
'high': float(c[2]),
'low': float(c[3]),
'close': float(c[4]),
'volume': float(c[6])
}
for c in raw
]
market_state['ready'] = True
logging.info(f"Loaded {len(market_state['ohlc_history'])} initial candles")
return True
except Exception as e:
logging.error(f"Initial data fetch error: {e}")
return False
async def kraken_rest_worker():
await fetch_initial_data()
while True:
try:
async with aiohttp.ClientSession() as session:
url = "https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval=1"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status == 200:
data = await response.json()
if 'result' in data:
for key in data['result']:
if key != 'last':
raw = data['result'][key]
new_candles = [
{
'time': int(c[0]),
'open': float(c[1]),
'high': float(c[2]),
'low': float(c[3]),
'close': float(c[4]),
'volume': float(c[6])
}
for c in raw[-10:]
]
if market_state['ohlc_history']:
existing_times = {c['time'] for c in market_state['ohlc_history']}
for nc in new_candles:
if nc['time'] in existing_times:
for i, ec in enumerate(market_state['ohlc_history']):
if ec['time'] == nc['time']:
market_state['ohlc_history'][i] = nc
break
else:
market_state['ohlc_history'].append(nc)
market_state['ohlc_history'].sort(key=lambda x: x['time'])
if len(market_state['ohlc_history']) > MAX_HISTORY:
market_state['ohlc_history'] = market_state['ohlc_history'][-MAX_HISTORY:]
market_state['ready'] = True
break
except Exception as e:
logging.warning(f"REST update error: {e}")
await asyncio.sleep(5)
async def broadcast_worker():
while True:
if connected_clients and market_state['ready']:
payload = await process_market_data()
if payload and "data" in payload:
msg = json.dumps(payload)
# Iterate over a copy to avoid RuntimeError if set size changes
current_clients = connected_clients.copy()
disconnected = set()
for ws in current_clients:
try:
await ws.send_str(msg)
except Exception:
disconnected.add(ws)
if disconnected:
connected_clients.difference_update(disconnected)
await asyncio.sleep(BROADCAST_RATE)
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
connected_clients.add(ws)
try:
async for msg in ws:
pass
finally:
connected_clients.discard(ws)
return ws
async def handle_index(request):
return web.Response(text=HTML_PAGE, content_type='text/html')
async def main():
app = web.Application()
app.router.add_get('/', handle_index)
app.router.add_get('/ws', websocket_handler)
asyncio.create_task(kraken_rest_worker())
asyncio.create_task(broadcast_worker())
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', PORT)
await site.start()
logging.info(f"Server running at http://localhost:{PORT}")
await asyncio.Event().wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass