"""
EuNEx Dashboard Database — SQLite persistence for orders, trades, and OHLCV history.
"""

import sqlite3
import threading
import time
import os
import json

# Thread-local holder: each thread keeps its own {db_path: connection} cache.
_local = threading.local()


def _get_conn(db_path):
    """Return the calling thread's cached SQLite connection for *db_path*.

    On first use in a thread the parent directory is created if the path
    has one, the connection is opened with ``sqlite3.Row`` as row factory,
    and the WAL journal mode plus ``synchronous=NORMAL`` pragmas are applied.
    """
    if not hasattr(_local, "connections"):
        _local.connections = {}
    if db_path not in _local.connections:
        # A bare filename (or ":memory:") has an empty dirname, and
        # os.makedirs("") raises FileNotFoundError, so skip it in that case.
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        conn = sqlite3.connect(db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        _local.connections[db_path] = conn
    return _local.connections[db_path]


def init_db(db_path):
    """Create the orders, trades, ohlcv and daily_close tables and indexes.

    Every statement uses ``IF NOT EXISTS``, so calling this on an already
    initialised database leaves existing data untouched.
    """
    conn = _get_conn(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            order_id INTEGER UNIQUE,
            cl_ord_id TEXT,
            symbol_idx INTEGER,
            symbol TEXT,
            side TEXT,
            order_type TEXT,
            price REAL,
            quantity INTEGER,
            remaining_qty INTEGER,
            status TEXT,
            tif TEXT DEFAULT 'Day',
            source TEXT DEFAULT 'dashboard',
            timestamp REAL,
            created_at TEXT DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS idx_orders_symbol ON orders(symbol);
        CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status);
        CREATE INDEX IF NOT EXISTS idx_orders_order_id ON orders(order_id);

        CREATE TABLE IF NOT EXISTS trades (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            trade_id INTEGER,
            symbol_idx INTEGER,
            symbol TEXT,
            price REAL,
            quantity INTEGER,
            buy_order_id INTEGER,
            sell_order_id INTEGER,
            timestamp REAL,
            created_at TEXT DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS idx_trades_symbol ON trades(symbol);
        CREATE INDEX IF NOT EXISTS idx_trades_timestamp ON trades(timestamp);

        CREATE TABLE IF NOT EXISTS ohlcv (
            symbol TEXT,
            bucket INTEGER,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume INTEGER,
            PRIMARY KEY (symbol, bucket)
        );

        CREATE TABLE IF NOT EXISTS daily_close (
            symbol TEXT,
            trade_date TEXT,
            close_price REAL,
            bid REAL,
            ask REAL,
            volume INTEGER,
            trade_count INTEGER,
            PRIMARY KEY (symbol, trade_date)
        );
    """)
    conn.commit()


def save_order(db_path, order):
    """Insert *order* (a dict with camelCase keys) or update an existing one.

    When a row with the same ``order_id`` already exists, only
    ``remaining_qty`` and ``status`` are overwritten. Missing keys fall back
    to: ``clOrdId`` "", ``orderType`` "Limit", ``price`` 0, ``remainingQty``
    the order's ``quantity``, ``status`` "New", ``tif`` "Day", ``source``
    "dashboard", ``timestamp`` the current time.
    """
    conn = _get_conn(db_path)
    conn.execute("""
        INSERT INTO orders (order_id, cl_ord_id, symbol_idx, symbol, side, order_type,
                            price, quantity, remaining_qty, status, tif, source, timestamp)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(order_id) DO UPDATE SET
            remaining_qty=excluded.remaining_qty,
            status=excluded.status
    """, (
        order.get("orderId"),
        order.get("clOrdId", ""),
        order.get("symbolIdx"),
        order.get("symbol"),
        order.get("side"),
        order.get("orderType", "Limit"),
        order.get("price", 0),
        order.get("quantity"),
        order.get("remainingQty", order.get("quantity")),
        order.get("status", "New"),
        order.get("tif", "Day"),
        order.get("source", "dashboard"),
        order.get("timestamp", time.time()),
    ))
    conn.commit()


def save_trade(db_path, trade):
    """Insert *trade* (a dict with camelCase keys) into the trades table.

    ``timestamp`` defaults to the current time when absent.
    """
    conn = _get_conn(db_path)
    # NOTE(review): the trades table as created by init_db has no UNIQUE
    # constraint on trade_id, so OR IGNORE does not de-duplicate repeated
    # trade ids — confirm whether a unique index is intended.
    conn.execute("""
        INSERT OR IGNORE INTO trades (trade_id, symbol_idx, symbol, price, quantity,
                                      buy_order_id, sell_order_id, timestamp)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        trade.get("tradeId"),
        trade.get("symbolIdx"),
        trade.get("symbol"),
        trade.get("price"),
        trade.get("quantity"),
        trade.get("buyOrderId"),
        trade.get("sellOrderId"),
        trade.get("timestamp", time.time()),
    ))
    conn.commit()


def record_ohlcv(db_path, symbol, price, quantity, bucket_size=60):
    """Fold one trade print into the OHLCV candle for the current time bucket.

    The bucket is the current epoch second rounded down to a multiple of
    *bucket_size*. The first print in a bucket sets open/high/low/close to
    *price* and volume to *quantity*; later prints raise high, lower low,
    replace close and add to volume.
    """
    bucket = int(time.time()) // bucket_size * bucket_size
    conn = _get_conn(db_path)
    # One atomic upsert instead of SELECT followed by UPDATE/INSERT: with
    # per-thread connections, two writers could both see "no row" and one
    # INSERT would then fail on the (symbol, bucket) primary key.
    conn.execute("""
        INSERT INTO ohlcv (symbol, bucket, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(symbol, bucket) DO UPDATE SET
            high=MAX(high, excluded.high),
            low=MIN(low, excluded.low),
            close=excluded.close,
            volume=volume+excluded.volume
    """, (symbol, bucket, price, price, price, price, quantity))
    conn.commit()


def get_ohlcv(db_path, symbol, period_seconds=3600):
    """Return *symbol*'s candles from the last *period_seconds*, oldest first.

    Each candle is a dict with keys bucket, open, high, low, close, volume.
    """
    conn = _get_conn(db_path)
    cutoff = int(time.time()) - period_seconds
    rows = conn.execute(
        "SELECT bucket, open, high, low, close, volume FROM ohlcv "
        "WHERE symbol=? AND bucket>=? ORDER BY bucket",
        (symbol, cutoff)
    ).fetchall()
    return [dict(r) for r in rows]


def get_recent_orders(db_path, limit=50):
    """Return up to *limit* order rows as dicts, most recently inserted first."""
    conn = _get_conn(db_path)
    rows = conn.execute(
        "SELECT * FROM orders ORDER BY id DESC LIMIT ?", (limit,)
    ).fetchall()
    return [dict(r) for r in rows]


def get_recent_trades(db_path, limit=100):
    """Return up to *limit* trade rows as dicts, most recently inserted first."""
    conn = _get_conn(db_path)
    rows = conn.execute(
        "SELECT * FROM trades ORDER BY id DESC LIMIT ?", (limit,)
    ).fetchall()
    return [dict(r) for r in rows]


def get_trade_stats(db_path, symbol=None):
    """Return aggregate trade statistics as a dict.

    With a truthy *symbol* the dict has count, volume, avg_price, high and
    low for that symbol. Without one it has only count and volume over all
    trades. Aggregates other than count are ``None`` when no trades match.
    """
    conn = _get_conn(db_path)
    if symbol:
        row = conn.execute(
            "SELECT COUNT(*) as count, SUM(quantity) as volume, "
            "AVG(price) as avg_price, MAX(price) as high, MIN(price) as low "
            "FROM trades WHERE symbol=?", (symbol,)
        ).fetchone()
    else:
        row = conn.execute(
            "SELECT COUNT(*) as count, SUM(quantity) as volume "
            "FROM trades"
        ).fetchone()
    return dict(row) if row else {}


def get_active_orders(db_path, symbol=None):
    """Return orders whose status is 'New' or 'PartiallyFilled', by timestamp.

    A truthy *symbol* restricts the result to that symbol.
    """
    conn = _get_conn(db_path)
    if symbol:
        rows = conn.execute(
            "SELECT * FROM orders WHERE status IN ('New','PartiallyFilled') AND symbol=? "
            "ORDER BY timestamp", (symbol,)
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT * FROM orders WHERE status IN ('New','PartiallyFilled') ORDER BY timestamp"
        ).fetchall()
    return [dict(r) for r in rows]


def save_daily_close(db_path, symbol, trade_date, close_price, bid, ask, volume, trade_count):
    """Insert or overwrite the daily_close row for (*symbol*, *trade_date*)."""
    conn = _get_conn(db_path)
    conn.execute("""
        INSERT INTO daily_close (symbol, trade_date, close_price, bid, ask, volume, trade_count)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(symbol, trade_date) DO UPDATE SET
            close_price=excluded.close_price,
            bid=excluded.bid,
            ask=excluded.ask,
            volume=excluded.volume,
            trade_count=excluded.trade_count
    """, (symbol, trade_date, close_price, bid, ask, volume, trade_count))
    conn.commit()


def get_last_closing_prices(db_path):
    """Return ``{symbol: row_dict}`` for each symbol's latest daily_close row.

    "Latest" is the row with the greatest ``trade_date``; the column is TEXT,
    so this presumably relies on sortable (e.g. ISO ``YYYY-MM-DD``) date
    strings — verify against the callers of save_daily_close.
    """
    conn = _get_conn(db_path)
    rows = conn.execute("""
        SELECT d.symbol, d.close_price, d.bid, d.ask, d.volume, d.trade_count, d.trade_date
        FROM daily_close d
        INNER JOIN (
            SELECT symbol, MAX(trade_date) as max_date
            FROM daily_close GROUP BY symbol
        ) latest ON d.symbol = latest.symbol AND d.trade_date = latest.max_date
    """).fetchall()
    return {r["symbol"]: dict(r) for r in rows}


def get_daily_closes(db_path, symbol, limit=30):
    """Return *symbol*'s last *limit* daily closes, in ascending trade_date order."""
    conn = _get_conn(db_path)
    rows = conn.execute(
        "SELECT trade_date, close_price, bid, ask, volume, trade_count "
        "FROM daily_close WHERE symbol=? ORDER BY trade_date DESC LIMIT ?",
        (symbol, limit)
    ).fetchall()
    # The query picks the newest rows; reverse so callers get oldest first.
    return [dict(r) for r in reversed(rows)]