# EuNEx / dashboard / database.py
# RayMelius
# v0.7.0: Daily close persistence, ticker tape, OHLCV candlestick chart
# 9781afc
"""
EuNEx Dashboard Database — SQLite persistence for orders, trades, OHLCV history, and daily closing prices.
"""
import sqlite3
import threading
import time
import os
import json
_local = threading.local()
def _get_conn(db_path):
if not hasattr(_local, "connections"):
_local.connections = {}
if db_path not in _local.connections:
os.makedirs(os.path.dirname(db_path), exist_ok=True)
conn = sqlite3.connect(db_path, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
_local.connections[db_path] = conn
return _local.connections[db_path]
def init_db(db_path):
    """Create the dashboard schema in the database at ``db_path``.

    Every statement uses ``IF NOT EXISTS``, so calling this against an
    already-initialised database leaves existing tables and indexes alone.

    Tables created:
        orders       one row per order, unique on ``order_id``
        trades       one row per saved trade
        ohlcv        candles keyed by ``(symbol, bucket)``
        daily_close  end-of-day snapshot keyed by ``(symbol, trade_date)``

    Args:
        db_path: Path of the SQLite database file.
    """
    schema = """
CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
order_id INTEGER UNIQUE,
cl_ord_id TEXT,
symbol_idx INTEGER,
symbol TEXT,
side TEXT,
order_type TEXT,
price REAL,
quantity INTEGER,
remaining_qty INTEGER,
status TEXT,
tif TEXT DEFAULT 'Day',
source TEXT DEFAULT 'dashboard',
timestamp REAL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_orders_symbol ON orders(symbol);
CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status);
CREATE INDEX IF NOT EXISTS idx_orders_order_id ON orders(order_id);
CREATE TABLE IF NOT EXISTS trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
trade_id INTEGER,
symbol_idx INTEGER,
symbol TEXT,
price REAL,
quantity INTEGER,
buy_order_id INTEGER,
sell_order_id INTEGER,
timestamp REAL,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_trades_symbol ON trades(symbol);
CREATE INDEX IF NOT EXISTS idx_trades_timestamp ON trades(timestamp);
CREATE TABLE IF NOT EXISTS ohlcv (
symbol TEXT,
bucket INTEGER,
open REAL,
high REAL,
low REAL,
close REAL,
volume INTEGER,
PRIMARY KEY (symbol, bucket)
);
CREATE TABLE IF NOT EXISTS daily_close (
symbol TEXT,
trade_date TEXT,
close_price REAL,
bid REAL,
ask REAL,
volume INTEGER,
trade_count INTEGER,
PRIMARY KEY (symbol, trade_date)
);
"""
    db = _get_conn(db_path)
    db.executescript(schema)
    db.commit()
def save_order(db_path, order):
    """Persist an order, inserting it or refreshing an existing row.

    If a row with the same ``order_id`` already exists, only its
    ``remaining_qty`` and ``status`` columns are overwritten; all other
    columns keep their stored values.

    Args:
        db_path: Path of the SQLite database file.
        order: Mapping read with ``.get()`` for the keys ``orderId``,
            ``clOrdId`` (default ``""``), ``symbolIdx``, ``symbol``,
            ``side``, ``orderType`` (default ``"Limit"``), ``price``
            (default ``0``), ``quantity``, ``remainingQty`` (defaults to
            ``quantity``), ``status`` (default ``"New"``), ``tif``
            (default ``"Day"``), ``source`` (default ``"dashboard"``) and
            ``timestamp`` (defaults to the current time).
    """
    db = _get_conn(db_path)

    upsert_sql = """
INSERT INTO orders (order_id, cl_ord_id, symbol_idx, symbol, side, order_type,
price, quantity, remaining_qty, status, tif, source, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(order_id) DO UPDATE SET
remaining_qty=excluded.remaining_qty,
status=excluded.status
"""
    # Values are listed in the same order as the column list above.
    values = (
        order.get("orderId"),
        order.get("clOrdId", ""),
        order.get("symbolIdx"),
        order.get("symbol"),
        order.get("side"),
        order.get("orderType", "Limit"),
        order.get("price", 0),
        order.get("quantity"),
        order.get("remainingQty", order.get("quantity")),
        order.get("status", "New"),
        order.get("tif", "Day"),
        order.get("source", "dashboard"),
        order.get("timestamp", time.time()),
    )

    db.execute(upsert_sql, values)
    db.commit()
def save_trade(db_path, trade):
    """Persist one trade into the ``trades`` table.

    Args:
        db_path: Path of the SQLite database file.
        trade: Mapping read with ``.get()`` for the keys ``tradeId``,
            ``symbolIdx``, ``symbol``, ``price``, ``quantity``,
            ``buyOrderId``, ``sellOrderId`` and ``timestamp`` (defaults to
            the current time).
    """
    db = _get_conn(db_path)

    # NOTE(review): the schema created by init_db declares no UNIQUE
    # constraint on trade_id, so OR IGNORE appears to have nothing to
    # conflict on and repeated trade ids would be stored again — confirm
    # whether de-duplication is intended.
    insert_sql = """
INSERT OR IGNORE INTO trades (trade_id, symbol_idx, symbol, price, quantity,
buy_order_id, sell_order_id, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"""
    values = (
        trade.get("tradeId"),
        trade.get("symbolIdx"),
        trade.get("symbol"),
        trade.get("price"),
        trade.get("quantity"),
        trade.get("buyOrderId"),
        trade.get("sellOrderId"),
        trade.get("timestamp", time.time()),
    )

    db.execute(insert_sql, values)
    db.commit()
def record_ohlcv(db_path, symbol, price, quantity, bucket_size=60):
    """Fold one trade print into the current OHLCV candle for ``symbol``.

    The candle is identified by the current wall-clock time rounded down to
    a multiple of ``bucket_size`` seconds. The first print in a bucket
    creates the candle with open = high = low = close = ``price``; later
    prints raise ``high`` / lower ``low`` as needed, overwrite ``close`` and
    add ``quantity`` to ``volume``.

    The write is a single UPSERT rather than a SELECT followed by
    INSERT-or-UPDATE, so two writers hitting a fresh bucket at the same
    time cannot both take the INSERT path and collide on the
    ``(symbol, bucket)`` primary key.

    Args:
        db_path: Path of the SQLite database file.
        symbol: Instrument symbol the candle belongs to.
        price: Trade price.
        quantity: Traded quantity added to the candle volume.
        bucket_size: Candle width in seconds (default 60).
    """
    bucket = int(time.time()) // bucket_size * bucket_size
    conn = _get_conn(db_path)
    # Inside DO UPDATE, bare column names refer to the stored row and
    # ``excluded.*`` to the values of the attempted insert.
    conn.execute(
        """
        INSERT INTO ohlcv (symbol, bucket, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(symbol, bucket) DO UPDATE SET
            high = MAX(high, excluded.high),
            low = MIN(low, excluded.low),
            close = excluded.close,
            volume = volume + excluded.volume
        """,
        (symbol, bucket, price, price, price, price, quantity),
    )
    conn.commit()
def get_ohlcv(db_path, symbol, period_seconds=3600):
    """Return the candles for ``symbol`` from the last ``period_seconds``.

    Args:
        db_path: Path of the SQLite database file.
        symbol: Instrument symbol to query.
        period_seconds: Look-back window in seconds (default 3600); only
            candles whose ``bucket`` is at or after ``now - period_seconds``
            are returned.

    Returns:
        A list of dicts with keys ``bucket``, ``open``, ``high``, ``low``,
        ``close`` and ``volume``, ordered by ascending ``bucket``.
    """
    db = _get_conn(db_path)
    oldest_bucket = int(time.time()) - period_seconds
    cursor = db.execute(
        "SELECT bucket, open, high, low, close, volume FROM ohlcv "
        "WHERE symbol=? AND bucket>=? ORDER BY bucket",
        (symbol, oldest_bucket),
    )
    candles = []
    for record in cursor.fetchall():
        candles.append(dict(record))
    return candles
def get_recent_orders(db_path, limit=50):
    """Return the most recently inserted orders, newest first.

    Args:
        db_path: Path of the SQLite database file.
        limit: Maximum number of rows to return (default 50).

    Returns:
        A list of dicts, one per ``orders`` row, ordered by descending
        row ``id``.
    """
    db = _get_conn(db_path)
    query = "SELECT * FROM orders ORDER BY id DESC LIMIT ?"
    fetched = db.execute(query, (limit,)).fetchall()
    return list(map(dict, fetched))
def get_recent_trades(db_path, limit=100):
    """Return the most recently inserted trades, newest first.

    Args:
        db_path: Path of the SQLite database file.
        limit: Maximum number of rows to return (default 100).

    Returns:
        A list of dicts, one per ``trades`` row, ordered by descending
        row ``id``.
    """
    db = _get_conn(db_path)
    query = "SELECT * FROM trades ORDER BY id DESC LIMIT ?"
    fetched = db.execute(query, (limit,)).fetchall()
    return list(map(dict, fetched))
def get_trade_stats(db_path, symbol=None):
    """Return aggregate trade statistics, optionally for a single symbol.

    Args:
        db_path: Path of the SQLite database file.
        symbol: When truthy, restrict the aggregation to this symbol.

    Returns:
        With a symbol: a dict with keys ``count``, ``volume``,
        ``avg_price``, ``high`` and ``low``. Without one: a dict with only
        ``count`` and ``volume`` across all trades. An empty dict is
        returned if the query yields no row.
    """
    db = _get_conn(db_path)

    if not symbol:
        # Market-wide totals: price aggregates are not selected here.
        cursor = db.execute(
            "SELECT COUNT(*) as count, SUM(quantity) as volume "
            "FROM trades"
        )
    else:
        cursor = db.execute(
            "SELECT COUNT(*) as count, SUM(quantity) as volume, "
            "AVG(price) as avg_price, MAX(price) as high, MIN(price) as low "
            "FROM trades WHERE symbol=?", (symbol,)
        )

    stats_row = cursor.fetchone()
    if not stats_row:
        return {}
    return dict(stats_row)
def get_active_orders(db_path, symbol=None):
    """Return orders whose status is ``New`` or ``PartiallyFilled``.

    Args:
        db_path: Path of the SQLite database file.
        symbol: When truthy, only orders for this symbol are returned.

    Returns:
        A list of dicts, one per matching ``orders`` row, ordered by
        ascending ``timestamp``.
    """
    db = _get_conn(db_path)

    if symbol:
        query = (
            "SELECT * FROM orders WHERE status IN ('New','PartiallyFilled') AND symbol=? "
            "ORDER BY timestamp"
        )
        args = (symbol,)
    else:
        query = "SELECT * FROM orders WHERE status IN ('New','PartiallyFilled') ORDER BY timestamp"
        args = ()

    fetched = db.execute(query, args).fetchall()
    return list(map(dict, fetched))
def save_daily_close(db_path, symbol, trade_date, close_price, bid, ask, volume, trade_count):
    """Store the end-of-day snapshot for ``symbol`` on ``trade_date``.

    If a row for the same ``(symbol, trade_date)`` already exists, its
    ``close_price``, ``bid``, ``ask``, ``volume`` and ``trade_count`` are
    overwritten with the new values.

    Args:
        db_path: Path of the SQLite database file.
        symbol: Instrument symbol.
        trade_date: Trading date stored in the TEXT ``trade_date`` column.
        close_price: Closing price.
        bid: Bid stored with the snapshot.
        ask: Ask stored with the snapshot.
        volume: Volume stored with the snapshot.
        trade_count: Trade count stored with the snapshot.
    """
    db = _get_conn(db_path)
    upsert_sql = """
INSERT INTO daily_close (symbol, trade_date, close_price, bid, ask, volume, trade_count)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(symbol, trade_date) DO UPDATE SET
close_price=excluded.close_price, bid=excluded.bid, ask=excluded.ask,
volume=excluded.volume, trade_count=excluded.trade_count
"""
    snapshot = (symbol, trade_date, close_price, bid, ask, volume, trade_count)
    db.execute(upsert_sql, snapshot)
    db.commit()
def get_last_closing_prices(db_path):
    """Return the most recent daily-close row for every symbol.

    "Most recent" is the row with the greatest ``trade_date`` value for
    each symbol.

    Args:
        db_path: Path of the SQLite database file.

    Returns:
        A dict mapping each symbol to a dict with keys ``symbol``,
        ``close_price``, ``bid``, ``ask``, ``volume``, ``trade_count`` and
        ``trade_date``.
    """
    db = _get_conn(db_path)
    latest_sql = """
SELECT d.symbol, d.close_price, d.bid, d.ask, d.volume, d.trade_count, d.trade_date
FROM daily_close d
INNER JOIN (
SELECT symbol, MAX(trade_date) as max_date FROM daily_close GROUP BY symbol
) latest ON d.symbol = latest.symbol AND d.trade_date = latest.max_date
"""
    by_symbol = {}
    for record in db.execute(latest_sql).fetchall():
        by_symbol[record["symbol"]] = dict(record)
    return by_symbol
def get_daily_closes(db_path, symbol, limit=30):
    """Return the latest daily closes for ``symbol`` in chronological order.

    The query selects the ``limit`` rows with the greatest ``trade_date``
    and the result is then reversed so the oldest of them comes first.

    Args:
        db_path: Path of the SQLite database file.
        symbol: Instrument symbol to query.
        limit: Maximum number of days to return (default 30).

    Returns:
        A list of dicts with keys ``trade_date``, ``close_price``, ``bid``,
        ``ask``, ``volume`` and ``trade_count``, ordered by ascending
        ``trade_date``.
    """
    db = _get_conn(db_path)
    newest_first = db.execute(
        "SELECT trade_date, close_price, bid, ask, volume, trade_count "
        "FROM daily_close WHERE symbol=? ORDER BY trade_date DESC LIMIT ?",
        (symbol, limit),
    ).fetchall()
    closes = [dict(record) for record in newest_first]
    closes.reverse()
    return closes