test / app.py
Alvin3y1's picture
Update app.py
00ec666 verified
raw
history blame
32 kB
import asyncio
import json
import logging
import time
import math
import aiohttp
import pandas as pd
import numpy as np
import tensorflow as tf
from aiohttp import web
from tensorflow.keras import layers, models, callbacks
from sklearn.preprocessing import StandardScaler
from concurrent.futures import ThreadPoolExecutor
SYMBOL_KRAKEN = "BTC/USD"
PORT = 7860
BROADCAST_RATE = 1.0
PREDICTION_HORIZON = 100
MAX_HISTORY = 5000
TRAIN_INTERVAL = 600
LOOKBACK_WINDOW = 60
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
market_state = {
"ohlc_history": [],
"ready": False,
"model": None,
"scaler": None,
"model_residuals": 0.0,
"last_training_time": 0,
"last_price": 0,
"price_change": 0
}
connected_clients = set()
executor = ThreadPoolExecutor(max_workers=1)
def calculate_indicators(candles):
if len(candles) < LOOKBACK_WINDOW + PREDICTION_HORIZON:
return None
df = pd.DataFrame(candles)
cols = ['open', 'high', 'low', 'close', 'volume']
for c in cols:
df[c] = df[c].astype(float)
df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
df['ema50'] = df['close'].ewm(span=50, adjust=False).mean()
df['std'] = df['close'].rolling(window=20).std()
df['bb_upper'] = df['ema20'] + (df['std'] * 2)
df['bb_lower'] = df['ema20'] - (df['std'] * 2)
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
k = df['close'].ewm(span=12, adjust=False).mean()
d = df['close'].ewm(span=26, adjust=False).mean()
df['macd'] = k - d
df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
df['macd_hist'] = df['macd'] - df['macd_signal']
df['tr0'] = abs(df['high'] - df['low'])
df['tr1'] = abs(df['high'] - df['close'].shift())
df['tr2'] = abs(df['low'] - df['close'].shift())
df['tr'] = df[['tr0', 'tr1', 'tr2']].max(axis=1)
df['atr'] = df['tr'].rolling(window=14).mean()
df['dist_ema20'] = (df['close'] - df['ema20']) / df['ema20']
df['dist_ema50'] = (df['close'] - df['ema50']) / df['ema50']
df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['ema20']
df['bb_pos'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
df['vol_change'] = df['volume'].pct_change()
df['log_ret'] = np.log(df['close'] / df['close'].shift(1))
df['datetime'] = pd.to_datetime(df['time'], unit='s')
df['hour_sin'] = np.sin(2 * np.pi * df['datetime'].dt.hour / 24)
df['hour_cos'] = np.cos(2 * np.pi * df['datetime'].dt.hour / 24)
return df.dropna()
def create_sequences(data, target_data, window_size, horizon):
X, y = [], []
for i in range(len(data) - window_size - horizon + 1):
X.append(data[i:(i + window_size)])
y.append(target_data[i + window_size : i + window_size + horizon])
return np.array(X), np.array(y)
def train_model(df):
logging.info(f"Training CNN Model on {len(df)} candles...")
feature_cols = [
'rsi', 'macd_hist', 'atr',
'dist_ema20', 'dist_ema50',
'bb_width', 'bb_pos',
'vol_change', 'log_ret',
'hour_sin', 'hour_cos'
]
data_features = df[feature_cols].values
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data_features)
close_prices = df['close'].values
returns_future = []
for i in range(len(close_prices) - PREDICTION_HORIZON):
current_price = close_prices[i]
future_prices = close_prices[i+1 : i+1+PREDICTION_HORIZON]
pct_change = (future_prices - current_price) / current_price
returns_future.append(pct_change)
returns_future = np.array(returns_future)
X = []
y = []
valid_length = len(returns_future) - LOOKBACK_WINDOW
if valid_length <= 0:
return None, None, None
for i in range(valid_length):
X.append(data_scaled[i : i + LOOKBACK_WINDOW])
y.append(returns_future[i + LOOKBACK_WINDOW - 1])
X = np.array(X)
y = np.array(y)
if len(X) < 100:
return None, None, None
model = models.Sequential([
layers.Input(shape=(LOOKBACK_WINDOW, len(feature_cols))),
layers.Conv1D(filters=64, kernel_size=3, activation='relu', padding='same'),
layers.MaxPooling1D(pool_size=2),
layers.Dropout(0.2),
layers.Conv1D(filters=32, kernel_size=3, activation='relu', padding='same'),
layers.GlobalAveragePooling1D(),
layers.Dense(64, activation='relu'),
layers.Dropout(0.1),
layers.Dense(PREDICTION_HORIZON)
])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')
early_stop = callbacks.EarlyStopping(monitor='loss', patience=5, restore_best_weights=True)
model.fit(X, y, epochs=20, batch_size=32, verbose=0, callbacks=[early_stop])
predictions = model.predict(X, verbose=0)
residuals = y - predictions
residual_std = np.std(residuals)
return model, scaler, residual_std
def get_prediction(df, model, scaler, residual_std):
if model is None or scaler is None:
return []
feature_cols = [
'rsi', 'macd_hist', 'atr',
'dist_ema20', 'dist_ema50',
'bb_width', 'bb_pos',
'vol_change', 'log_ret',
'hour_sin', 'hour_cos'
]
last_window = df.iloc[-LOOKBACK_WINDOW:][feature_cols].values
if len(last_window) < LOOKBACK_WINDOW:
return []
last_window_scaled = scaler.transform(last_window)
last_window_reshaped = last_window_scaled.reshape(1, LOOKBACK_WINDOW, len(feature_cols))
predicted_returns = model.predict(last_window_reshaped, verbose=0)[0]
current_price = df.iloc[-1]['close']
current_time = int(df.iloc[-1]['time'])
pred_data = []
confidence_multiplier = 1.96
time_step = 0
accumulated_variance = 0.0
for i, pct_change in enumerate(predicted_returns):
future_price = current_price * (1 + pct_change)
accumulated_variance += (residual_std ** 2)
current_std = np.sqrt(accumulated_variance) / np.sqrt(i + 1) * (i + 1) * 0.5
upper_bound = future_price * (1 + (residual_std * confidence_multiplier))
lower_bound = future_price * (1 - (residual_std * confidence_multiplier))
pred_data.append({
"time": current_time + ((i + 1) * 60),
"value": float(future_price),
"upper": float(upper_bound),
"lower": float(lower_bound)
})
return pred_data
async def process_market_data():
if not market_state['ready'] or not market_state['ohlc_history']:
return {"error": "Initializing..."}
df = calculate_indicators(market_state['ohlc_history'])
if df is None or len(df) < LOOKBACK_WINDOW + 50:
return {"error": "Not enough data"}
if market_state['model'] is None or (time.time() - market_state['last_training_time'] > TRAIN_INTERVAL):
try:
loop = asyncio.get_running_loop()
model, scaler, res_std = await loop.run_in_executor(executor, train_model, df)
if model is not None:
market_state['model'] = model
market_state['scaler'] = scaler
market_state['model_residuals'] = float(res_std)
market_state['last_training_time'] = time.time()
logging.info(f"Model retrained. Residual Std: {market_state['model_residuals']:.5f}")
except Exception as e:
logging.error(f"Training failed: {e}")
predictions = []
try:
predictions = get_prediction(df, market_state['model'], market_state['scaler'], market_state['model_residuals'])
except Exception as e:
logging.error(f"Prediction failed: {e}")
df_clean = df.replace([np.inf, -np.inf], np.nan)
cols_to_keep = ['time', 'open', 'high', 'low', 'close', 'volume', 'ema20', 'bb_upper', 'bb_lower', 'rsi', 'macd_hist']
df_clean = df_clean[cols_to_keep].where(pd.notnull(df_clean), None)
last_close = float(df['close'].iloc[-1]) if len(df) > 0 else 0
first_close = float(df['close'].iloc[0]) if len(df) > 0 else 0
price_change = ((last_close - first_close) / first_close * 100) if first_close > 0 else 0
market_state['last_price'] = last_close
market_state['price_change'] = price_change
display_data = df_clean.tail(500).to_dict('records')
last_row = df.iloc[-1] if len(df) > 0 else {}
return {
"data": display_data,
"prediction": predictions,
"stats": {
"price": last_close,
"change": round(price_change, 2),
"rsi": round(float(last_row.get('rsi', 0)), 1) if pd.notna(last_row.get('rsi')) else 0,
"macd": round(float(last_row.get('macd_hist', 0)), 2) if pd.notna(last_row.get('macd_hist')) else 0,
"atr": round(float(last_row.get('atr', 0)), 2) if pd.notna(last_row.get('atr')) else 0,
"volume": round(float(last_row.get('volume', 0)), 2) if pd.notna(last_row.get('volume')) else 0
}
}
HTML_PAGE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>BTC/USD AI Predictor (CNN)</title>
<script src="https://unpkg.com/lightweight-charts@4.1.1/dist/lightweight-charts.standalone.production.js"></script>
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap" rel="stylesheet">
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: 'Inter', -apple-system, BlinkMacSystemFont, sans-serif;
background: linear-gradient(135deg, #0a0a0f 0%, #1a1a2e 100%);
color: #ffffff;
height: 100vh;
display: flex;
flex-direction: column;
overflow: hidden;
}
.header {
background: rgba(15, 15, 25, 0.95);
backdrop-filter: blur(20px);
border-bottom: 1px solid rgba(255, 255, 255, 0.05);
padding: 12px 24px;
display: flex;
align-items: center;
justify-content: space-between;
z-index: 100;
}
.logo-section { display: flex; align-items: center; gap: 16px; }
.logo {
font-size: 24px;
font-weight: 700;
background: linear-gradient(135deg, #00ff88 0%, #00d4ff 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
}
.symbol-badge {
background: rgba(0, 255, 136, 0.1);
border: 1px solid rgba(0, 255, 136, 0.3);
padding: 6px 14px;
border-radius: 20px;
font-size: 13px;
font-weight: 600;
color: #00ff88;
}
.stats-row { display: flex; gap: 24px; align-items: center; }
.stat-item { display: flex; flex-direction: column; align-items: flex-end; }
.stat-label { font-size: 10px; color: #666; text-transform: uppercase; }
.stat-value { font-size: 15px; font-weight: 600; font-variant-numeric: tabular-nums; }
.stat-value.positive { color: #00ff88; }
.stat-value.negative { color: #ff4757; }
.stat-value.neutral { color: #ffd700; }
.status-indicator { display: flex; align-items: center; gap: 8px; font-size: 12px; color: #888; }
.status-dot { width: 8px; height: 8px; border-radius: 50%; background: #00ff88; animation: pulse 2s infinite; }
.status-dot.disconnected { background: #ff4757; animation: none; }
@keyframes pulse {
0%, 100% { opacity: 1; box-shadow: 0 0 0 0 rgba(0, 255, 136, 0.4); }
50% { opacity: 0.8; box-shadow: 0 0 0 8px rgba(0, 255, 136, 0); }
}
.indicator-panel {
background: rgba(15, 15, 25, 0.8);
border-bottom: 1px solid rgba(255, 255, 255, 0.05);
padding: 10px 24px;
display: flex;
gap: 32px;
overflow-x: auto;
}
.indicator-group { display: flex; align-items: center; gap: 12px; }
.indicator-label { font-size: 11px; color: #666; text-transform: uppercase; }
.indicator-value { font-size: 13px; font-weight: 500; font-variant-numeric: tabular-nums; }
.charts-container {
flex: 1;
display: flex;
flex-direction: column;
position: relative;
}
.chart-wrapper { position: relative; border-bottom: 1px solid rgba(255, 255, 255, 0.05); }
#main-chart { flex: 5; }
#volume-chart { flex: 1; min-height: 60px; }
#osc-chart { flex: 1.5; min-height: 80px; }
.chart-label {
position: absolute; top: 12px; left: 16px; z-index: 10;
display: flex; gap: 16px; font-size: 11px; pointer-events: none;
}
.chart-label span { display: flex; align-items: center; gap: 6px; }
.chart-label .dot { width: 8px; height: 8px; border-radius: 50%; }
.loading-overlay {
position: absolute; top: 0; left: 0; right: 0; bottom: 0;
background: rgba(10, 10, 15, 0.95);
display: flex; flex-direction: column; align-items: center; justify-content: center;
z-index: 1000; transition: opacity 0.5s ease;
}
.loading-overlay.hidden { opacity: 0; pointer-events: none; }
.loader {
width: 50px; height: 50px; border: 3px solid rgba(0, 255, 136, 0.1);
border-top-color: #00ff88; border-radius: 50%; animation: spin 1s linear infinite;
}
@keyframes spin { to { transform: rotate(360deg); } }
.loading-text { margin-top: 20px; font-size: 14px; color: #666; }
.prediction-badge {
position: absolute; top: 12px; right: 16px;
background: rgba(191, 90, 242, 0.15); border: 1px solid rgba(191, 90, 242, 0.3);
padding: 4px 10px; border-radius: 12px; font-size: 10px; color: #bf5af2; z-index: 10;
}
</style>
</head>
<body>
<div class="header">
<div class="logo-section">
<div class="logo">QuantAI</div>
<div class="symbol-badge">BTC/USD</div>
</div>
<div class="stats-row">
<div class="stat-item">
<span class="stat-label">Price</span>
<span id="price" class="stat-value">$--</span>
</div>
<div class="stat-item">
<span class="stat-label">Change</span>
<span id="change" class="stat-value neutral">--%</span>
</div>
<div class="stat-item">
<span class="stat-label">RSI</span>
<span id="rsi" class="stat-value">--</span>
</div>
<div class="stat-item">
<span class="stat-label">ATR</span>
<span id="atr" class="stat-value">--</span>
</div>
</div>
<div class="status-indicator">
<div id="status-dot" class="status-dot"></div>
<span id="status-text">Connecting...</span>
</div>
</div>
<div class="indicator-panel">
<div class="indicator-group"><span class="indicator-label">EMA 20</span><span id="ema-val" class="indicator-value" style="color: #2962FF">--</span></div>
<div class="indicator-group"><span class="indicator-label">BB Upper</span><span id="bb-upper" class="indicator-value" style="color: #26a69a">--</span></div>
<div class="indicator-group"><span class="indicator-label">BB Lower</span><span id="bb-lower" class="indicator-value" style="color: #ef5350">--</span></div>
<div class="indicator-group"><span class="indicator-label">MACD</span><span id="macd-val" class="indicator-value">--</span></div>
<div class="indicator-group"><span class="indicator-label">Volume</span><span id="vol-val" class="indicator-value" style="color: #888">--</span></div>
</div>
<div class="charts-container">
<div class="loading-overlay" id="loading">
<div class="loader"></div>
<div class="loading-text">Loading market data...</div>
</div>
<div id="main-chart" class="chart-wrapper">
<div class="chart-label">
<span><div class="dot" style="background: #00ff88"></div>Price</span>
<span><div class="dot" style="background: #2962FF"></div>EMA 20</span>
<span><div class="dot" style="background: #26a69a; opacity: 0.5"></div>Bollinger</span>
<span><div class="dot" style="background: #bf5af2"></div>CNN Forecast</span>
</div>
<div class="prediction-badge">AI Forecast: 100 candles</div>
</div>
<div id="volume-chart" class="chart-wrapper">
<div class="chart-label"><span><div class="dot" style="background: #5c6bc0"></div>Volume</span></div>
</div>
<div id="osc-chart" class="chart-wrapper">
<div class="chart-label">
<span><div class="dot" style="background: #9C27B0"></div>RSI</span>
<span><div class="dot" style="background: #26a69a"></div>MACD Hist</span>
</div>
</div>
</div>
<script>
document.addEventListener('DOMContentLoaded', () => {
const mainEl = document.getElementById('main-chart');
const volEl = document.getElementById('volume-chart');
const oscEl = document.getElementById('osc-chart');
const loading = document.getElementById('loading');
const chartOptions = {
layout: { background: { type: 'solid', color: 'transparent' }, textColor: '#666' },
grid: { vertLines: { color: 'rgba(255,255,255,0.03)' }, horzLines: { color: 'rgba(255,255,255,0.03)' } },
timeScale: { timeVisible: true, secondsVisible: false, borderColor: 'rgba(255,255,255,0.1)' },
rightPriceScale: { borderColor: 'rgba(255,255,255,0.1)' },
crosshair: {
mode: LightweightCharts.CrosshairMode.Normal,
vertLine: { color: 'rgba(255,255,255,0.2)', labelBackgroundColor: '#1a1a2e' },
horzLine: { color: 'rgba(255,255,255,0.2)', labelBackgroundColor: '#1a1a2e' }
}
};
const mainChart = LightweightCharts.createChart(mainEl, chartOptions);
const volChart = LightweightCharts.createChart(volEl, chartOptions);
const oscChart = LightweightCharts.createChart(oscEl, chartOptions);
const candles = mainChart.addCandlestickSeries({
upColor: '#00ff88', downColor: '#ff4757',
borderUpColor: '#00ff88', borderDownColor: '#ff4757',
wickUpColor: '#00ff88', wickDownColor: '#ff4757'
});
const ema = mainChart.addLineSeries({ color: '#2962FF', lineWidth: 2, crosshairMarkerVisible: false });
const bbUpper = mainChart.addLineSeries({ color: 'rgba(38, 166, 154, 0.4)', lineWidth: 1, crosshairMarkerVisible: false });
const bbLower = mainChart.addLineSeries({ color: 'rgba(239, 83, 80, 0.4)', lineWidth: 1, crosshairMarkerVisible: false });
const predLine = mainChart.addLineSeries({
color: '#bf5af2', lineWidth: 2, lineStyle: LightweightCharts.LineStyle.Dashed,
crosshairMarkerVisible: false, title: 'Forecast'
});
const predUpper = mainChart.addLineSeries({
color: 'rgba(191, 90, 242, 0.3)', lineWidth: 1, lineStyle: LightweightCharts.LineStyle.Dotted,
crosshairMarkerVisible: false
});
const predLower = mainChart.addLineSeries({
color: 'rgba(191, 90, 242, 0.3)', lineWidth: 1, lineStyle: LightweightCharts.LineStyle.Dotted,
crosshairMarkerVisible: false
});
const volumeSeries = volChart.addHistogramSeries({ priceFormat: { type: 'volume' }, priceScaleId: '' });
volChart.priceScale('').applyOptions({ scaleMargins: { top: 0.1, bottom: 0 } });
const rsi = oscChart.addLineSeries({ color: '#9C27B0', lineWidth: 2, priceScaleId: 'rsi' });
oscChart.priceScale('rsi').applyOptions({ scaleMargins: { top: 0.1, bottom: 0.1 } });
const macdHist = oscChart.addHistogramSeries({ priceScaleId: 'macd' });
oscChart.priceScale('macd').applyOptions({ scaleMargins: { top: 0.6, bottom: 0 } });
function resizeCharts() {
mainChart.applyOptions({ width: mainEl.clientWidth, height: mainEl.clientHeight });
volChart.applyOptions({ width: mainEl.clientWidth, height: volEl.clientHeight });
oscChart.applyOptions({ width: mainEl.clientWidth, height: oscEl.clientHeight });
}
new ResizeObserver(resizeCharts).observe(document.body);
setTimeout(resizeCharts, 100);
function syncTimeScales(charts) {
charts.forEach((chart, i) => {
chart.timeScale().subscribeVisibleLogicalRangeChange(range => {
if (range) charts.forEach((c, j) => { if (i !== j) c.timeScale().setVisibleLogicalRange(range); });
});
});
}
syncTimeScales([mainChart, volChart, oscChart]);
function updateStats(stats, lastData) {
if (stats) {
document.getElementById('price').textContent = '$' + stats.price.toLocaleString('en-US', {minimumFractionDigits: 2});
const changeEl = document.getElementById('change');
changeEl.textContent = (stats.change >= 0 ? '+' : '') + stats.change + '%';
changeEl.className = 'stat-value ' + (stats.change > 0 ? 'positive' : stats.change < 0 ? 'negative' : 'neutral');
const rsiVal = stats.rsi;
const rsiEl = document.getElementById('rsi');
rsiEl.textContent = rsiVal;
rsiEl.className = 'stat-value ' + (rsiVal > 70 ? 'negative' : rsiVal < 30 ? 'positive' : 'neutral');
document.getElementById('atr').textContent = stats.atr;
}
if (lastData) {
document.getElementById('ema-val').textContent = lastData.ema20 ? lastData.ema20.toFixed(2) : '--';
document.getElementById('bb-upper').textContent = lastData.bb_upper ? lastData.bb_upper.toFixed(2) : '--';
document.getElementById('bb-lower').textContent = lastData.bb_lower ? lastData.bb_lower.toFixed(2) : '--';
const macdVal = lastData.macd_hist;
const macdEl = document.getElementById('macd-val');
if (macdVal !== null && macdVal !== undefined) {
macdEl.textContent = macdVal.toFixed(2);
macdEl.style.color = macdVal >= 0 ? '#26a69a' : '#ef5350';
}
document.getElementById('vol-val').textContent = lastData.volume ? lastData.volume.toFixed(2) : '--';
}
}
function setStatus(connected) {
const dot = document.getElementById('status-dot');
const text = document.getElementById('status-text');
if (connected) { dot.className = 'status-dot'; text.textContent = 'Live'; }
else { dot.className = 'status-dot disconnected'; text.textContent = 'Reconnecting...'; }
}
let hasData = false;
function connect() {
const protocol = location.protocol === 'https:' ? 'wss' : 'ws';
const ws = new WebSocket(protocol + '://' + location.host + '/ws');
ws.onopen = () => setStatus(true);
ws.onmessage = (e) => {
try {
const payload = JSON.parse(e.data);
if (!payload.data || payload.data.length === 0) return;
const d = payload.data;
const safeMap = (arr, key) => arr.filter(x => x && x.time && x[key] !== null).map(x => ({ time: x.time, value: x[key] }));
const candleData = d.filter(x => x && x.time && x.open).map(x => ({
time: x.time, open: x.open, high: x.high, low: x.low, close: x.close
}));
if (candleData.length > 0) {
candles.setData(candleData);
ema.setData(safeMap(d, 'ema20'));
bbUpper.setData(safeMap(d, 'bb_upper'));
bbLower.setData(safeMap(d, 'bb_lower'));
volumeSeries.setData(d.filter(x => x && x.time).map(x => ({
time: x.time, value: x.volume, color: x.close >= x.open ? 'rgba(0, 255, 136, 0.5)' : 'rgba(255, 71, 87, 0.5)'
})));
rsi.setData(safeMap(d, 'rsi'));
macdHist.setData(d.filter(x => x && x.time).map(x => ({
time: x.time, value: x.macd_hist, color: x.macd_hist >= 0 ? '#26a69a' : '#ef5350'
})));
if (payload.prediction && payload.prediction.length > 0) {
const lastCandle = candleData[candleData.length - 1];
const predData = [{ time: lastCandle.time, value: lastCandle.close }, ...payload.prediction.map(p => ({ time: p.time, value: p.value }))];
const upperData = [{ time: lastCandle.time, value: lastCandle.close }, ...payload.prediction.map(p => ({ time: p.time, value: p.upper }))];
const lowerData = [{ time: lastCandle.time, value: lastCandle.close }, ...payload.prediction.map(p => ({ time: p.time, value: p.lower }))];
predLine.setData(predData);
predUpper.setData(upperData);
predLower.setData(lowerData);
}
updateStats(payload.stats, d[d.length - 1]);
if (!hasData) {
hasData = true;
loading.classList.add('hidden');
mainChart.timeScale().fitContent();
}
}
} catch (err) { console.error("Chart error:", err); }
};
ws.onclose = () => { setStatus(false); setTimeout(connect, 2000); };
ws.onerror = () => ws.close();
}
connect();
});
</script>
</body>
</html>
"""
async def fetch_initial_data():
try:
async with aiohttp.ClientSession() as session:
url = "https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval=1"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status == 200:
data = await response.json()
if 'result' in data:
for key in data['result']:
if key != 'last':
raw = data['result'][key]
market_state['ohlc_history'] = [
{
'time': int(c[0]),
'open': float(c[1]),
'high': float(c[2]),
'low': float(c[3]),
'close': float(c[4]),
'volume': float(c[6])
}
for c in raw
]
market_state['ready'] = True
logging.info(f"Loaded {len(market_state['ohlc_history'])} initial candles")
return True
except Exception as e:
logging.error(f"Initial data fetch error: {e}")
return False
async def kraken_rest_worker():
await fetch_initial_data()
while True:
try:
async with aiohttp.ClientSession() as session:
url = "https://api.kraken.com/0/public/OHLC?pair=XBTUSD&interval=1"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status == 200:
data = await response.json()
if 'result' in data:
for key in data['result']:
if key != 'last':
raw = data['result'][key]
new_candles = [
{
'time': int(c[0]),
'open': float(c[1]),
'high': float(c[2]),
'low': float(c[3]),
'close': float(c[4]),
'volume': float(c[6])
}
for c in raw[-10:]
]
if market_state['ohlc_history']:
existing_times = {c['time'] for c in market_state['ohlc_history']}
for nc in new_candles:
if nc['time'] in existing_times:
for i, ec in enumerate(market_state['ohlc_history']):
if ec['time'] == nc['time']:
market_state['ohlc_history'][i] = nc
break
else:
market_state['ohlc_history'].append(nc)
market_state['ohlc_history'].sort(key=lambda x: x['time'])
if len(market_state['ohlc_history']) > MAX_HISTORY:
market_state['ohlc_history'] = market_state['ohlc_history'][-MAX_HISTORY:]
market_state['ready'] = True
break
except Exception as e:
logging.warning(f"REST update error: {e}")
await asyncio.sleep(5)
async def broadcast_worker():
while True:
if connected_clients and market_state['ready']:
payload = await process_market_data()
if payload and "data" in payload:
msg = json.dumps(payload)
current_clients = connected_clients.copy()
disconnected = set()
for ws in current_clients:
try:
await ws.send_str(msg)
except Exception:
disconnected.add(ws)
if disconnected:
connected_clients.difference_update(disconnected)
await asyncio.sleep(BROADCAST_RATE)
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
connected_clients.add(ws)
try:
async for msg in ws:
pass
finally:
connected_clients.discard(ws)
return ws
async def handle_index(request):
return web.Response(text=HTML_PAGE, content_type='text/html')
async def main():
app = web.Application()
app.router.add_get('/', handle_index)
app.router.add_get('/ws', websocket_handler)
asyncio.create_task(kraken_rest_worker())
asyncio.create_task(broadcast_worker())
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, '0.0.0.0', PORT)
await site.start()
logging.info(f"Server running at http://localhost:{PORT}")
await asyncio.Event().wait()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass