"""SQLite persistence layer for the matcher service.""" import sqlite3 import threading import time import os DB_PATH = os.getenv("DB_PATH", "/app/data/matcher.db") _local = threading.local() def get_connection(): """Get a thread-local database connection.""" if not hasattr(_local, "conn") or _local.conn is None: os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) _local.conn = sqlite3.connect(DB_PATH, check_same_thread=False) _local.conn.row_factory = sqlite3.Row return _local.conn def init_db(): """Initialize database schema.""" conn = get_connection() cursor = conn.cursor() cursor.executescript(""" CREATE TABLE IF NOT EXISTS trades ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT NOT NULL, price REAL NOT NULL, quantity INTEGER NOT NULL, buy_order_id TEXT, sell_order_id TEXT, timestamp REAL NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS order_book ( id INTEGER PRIMARY KEY AUTOINCREMENT, cl_ord_id TEXT UNIQUE, symbol TEXT NOT NULL, side TEXT NOT NULL, price REAL, quantity INTEGER NOT NULL, remaining_qty INTEGER NOT NULL, status TEXT DEFAULT 'OPEN', timestamp REAL NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_trades_symbol ON trades(symbol); CREATE INDEX IF NOT EXISTS idx_trades_timestamp ON trades(timestamp); CREATE INDEX IF NOT EXISTS idx_orderbook_symbol_side ON order_book(symbol, side); CREATE INDEX IF NOT EXISTS idx_orderbook_status ON order_book(status); CREATE INDEX IF NOT EXISTS idx_orderbook_cl_ord_id ON order_book(cl_ord_id); """) conn.commit() print(f"Database initialized at {DB_PATH}") def save_trade(trade: dict): """Persist a trade to the database.""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" INSERT INTO trades (symbol, price, quantity, buy_order_id, sell_order_id, timestamp) VALUES (?, ?, ?, ?, ?, ?) """, ( trade.get("symbol"), trade.get("price"), trade.get("quantity"), trade.get("buy_id"), trade.get("sell_id"), trade.get("timestamp", time.time()) )) conn.commit() return cursor.lastrowid def get_trades(symbol: str = None, limit: int = 200, offset: int = 0): """Retrieve trades from database.""" conn = get_connection() cursor = conn.cursor() if symbol: cursor.execute(""" SELECT * FROM trades WHERE symbol = ? ORDER BY timestamp DESC LIMIT ? OFFSET ? """, (symbol, limit, offset)) else: cursor.execute(""" SELECT * FROM trades ORDER BY timestamp DESC LIMIT ? OFFSET ? """, (limit, offset)) rows = cursor.fetchall() return [dict(row) for row in rows] def get_trade_count(symbol: str = None): """Get total trade count.""" conn = get_connection() cursor = conn.cursor() if symbol: cursor.execute("SELECT COUNT(*) FROM trades WHERE symbol = ?", (symbol,)) else: cursor.execute("SELECT COUNT(*) FROM trades") return cursor.fetchone()[0] def save_order(order: dict): """Save or update an order in the order book.""" conn = get_connection() cursor = conn.cursor() cl_ord_id = order.get("cl_ord_id") if not cl_ord_id: cl_ord_id = f"gen-{time.time_ns()}" cursor.execute(""" INSERT INTO order_book (cl_ord_id, symbol, side, price, quantity, remaining_qty, status, timestamp) VALUES (?, ?, ?, ?, ?, ?, 'OPEN', ?) ON CONFLICT(cl_ord_id) DO UPDATE SET remaining_qty = excluded.remaining_qty, status = CASE WHEN excluded.remaining_qty = 0 THEN 'FILLED' ELSE status END """, ( cl_ord_id, order.get("symbol"), order.get("side"), order.get("price"), order.get("quantity"), order.get("quantity"), # remaining_qty starts as full quantity order.get("timestamp", time.time()) )) conn.commit() return cl_ord_id def update_order_quantity(cl_ord_id: str, remaining_qty: int): """Update remaining quantity for an order.""" conn = get_connection() cursor = conn.cursor() status = "FILLED" if remaining_qty == 0 else "OPEN" cursor.execute(""" UPDATE order_book SET remaining_qty = ?, status = ? WHERE cl_ord_id = ? """, (remaining_qty, status, cl_ord_id)) conn.commit() def cancel_order(cl_ord_id: str): """Mark an order as cancelled.""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" UPDATE order_book SET status = 'CANCELLED' WHERE cl_ord_id = ? """, (cl_ord_id,)) conn.commit() return cursor.rowcount > 0 def get_open_orders(symbol: str = None, side: str = None): """Get all open orders, optionally filtered by symbol and side.""" conn = get_connection() cursor = conn.cursor() query = "SELECT * FROM order_book WHERE status = 'OPEN'" params = [] if symbol: query += " AND symbol = ?" params.append(symbol) if side: query += " AND side = ?" params.append(side) query += " ORDER BY timestamp ASC" cursor.execute(query, params) rows = cursor.fetchall() return [dict(row) for row in rows] def load_order_books(): """Load all open orders grouped by symbol for matcher initialization.""" conn = get_connection() cursor = conn.cursor() cursor.execute(""" SELECT * FROM order_book WHERE status = 'OPEN' ORDER BY symbol, side, timestamp """) books = {} for row in cursor.fetchall(): order = dict(row) symbol = order["symbol"] side = order["side"] if symbol not in books: books[symbol] = {"bids": [], "asks": []} # Convert DB format to matcher format matcher_order = { "cl_ord_id": order["cl_ord_id"], "symbol": symbol, "side": side, "price": order["price"], "quantity": order["remaining_qty"], "timestamp": order["timestamp"] } if side == "BUY": books[symbol]["bids"].append(matcher_order) else: books[symbol]["asks"].append(matcher_order) return books def delete_filled_orders(older_than_days: int = 7): """Clean up old filled orders.""" conn = get_connection() cursor = conn.cursor() cutoff = time.time() - (older_than_days * 24 * 60 * 60) cursor.execute(""" DELETE FROM order_book WHERE status IN ('FILLED', 'CANCELLED') AND timestamp < ? """, (cutoff,)) conn.commit() return cursor.rowcount