| """SQLite persistence for the Clearing House simulation. |
| |
| DB path: /app/data/clearing_house.db (shared volume with dashboard). |
| All public functions are thread-safe via thread-local connections. |
| """ |
|
|
| import sqlite3 |
| import threading |
| import time |
| import datetime |
| import os |
|
|
| CH_DB_PATH = os.getenv("CH_DB_PATH", "/app/data/clearing_house.db") |
| CH_MEMBERS = [f"USR{i:02d}" for i in range(1, 11)] |
| CH_STARTING_CAPITAL = 100_000.0 |
| CH_DAILY_OBLIGATION = 20 |
|
|
| _local = threading.local() |
|
|
| SCHEMA = """ |
| CREATE TABLE IF NOT EXISTS ch_members ( |
| member_id TEXT PRIMARY KEY, |
| capital REAL NOT NULL DEFAULT 100000.0, |
| created_at REAL NOT NULL |
| ); |
| |
| CREATE TABLE IF NOT EXISTS ch_holdings ( |
| member_id TEXT NOT NULL, |
| symbol TEXT NOT NULL, |
| quantity INTEGER NOT NULL DEFAULT 0, |
| avg_cost REAL NOT NULL DEFAULT 0.0, |
| PRIMARY KEY (member_id, symbol) |
| ); |
| |
| CREATE TABLE IF NOT EXISTS ch_daily_trades ( |
| member_id TEXT NOT NULL, |
| trading_date TEXT NOT NULL, |
| buy_count INTEGER NOT NULL DEFAULT 0, |
| sell_count INTEGER NOT NULL DEFAULT 0, |
| total_securities INTEGER NOT NULL DEFAULT 0, |
| PRIMARY KEY (member_id, trading_date) |
| ); |
| |
| CREATE TABLE IF NOT EXISTS ch_trade_log ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| member_id TEXT NOT NULL, |
| symbol TEXT NOT NULL, |
| side TEXT NOT NULL, |
| quantity INTEGER NOT NULL, |
| price REAL NOT NULL, |
| cl_ord_id TEXT NOT NULL, |
| trading_date TEXT NOT NULL, |
| timestamp REAL NOT NULL |
| ); |
| |
| CREATE TABLE IF NOT EXISTS ch_settlements ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| member_id TEXT NOT NULL, |
| trading_date TEXT NOT NULL, |
| opening_capital REAL NOT NULL, |
| closing_capital REAL NOT NULL, |
| realized_pnl REAL NOT NULL DEFAULT 0.0, |
| unrealized_pnl REAL NOT NULL DEFAULT 0.0, |
| obligation_met INTEGER NOT NULL DEFAULT 0, |
| settled_at REAL NOT NULL |
| ); |
| |
| CREATE TABLE IF NOT EXISTS ch_ai_decisions ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| member_id TEXT NOT NULL, |
| llm_response TEXT NOT NULL, |
| parsed_order TEXT, |
| source TEXT NOT NULL DEFAULT 'llm', |
| trading_date TEXT NOT NULL, |
| timestamp REAL NOT NULL |
| ); |
| |
| CREATE INDEX IF NOT EXISTS idx_ch_trade_log_member |
| ON ch_trade_log(member_id, trading_date); |
| CREATE INDEX IF NOT EXISTS idx_ch_settlements_member |
| ON ch_settlements(member_id, trading_date); |
| CREATE INDEX IF NOT EXISTS idx_ch_ai_decisions_member |
| ON ch_ai_decisions(member_id, trading_date); |
| """ |
|
|
|
|
| def _conn() -> sqlite3.Connection: |
| """Thread-local SQLite connection.""" |
| if not hasattr(_local, "conn") or _local.conn is None: |
| _local.conn = sqlite3.connect(CH_DB_PATH, check_same_thread=False) |
| _local.conn.row_factory = sqlite3.Row |
| return _local.conn |
|
|
|
|
| def init_db() -> None: |
| """Create schema and seed the 10 members if missing.""" |
| conn = _conn() |
| conn.executescript(SCHEMA) |
| conn.commit() |
| now = time.time() |
| for mid in CH_MEMBERS: |
| conn.execute( |
| "INSERT OR IGNORE INTO ch_members (member_id, capital, created_at) VALUES (?,?,?)", |
| (mid, CH_STARTING_CAPITAL, now), |
| ) |
| conn.commit() |
| print(f"[CH-DB] Initialized at {CH_DB_PATH}") |
|
|
|
|
| |
|
|
| def get_member(member_id: str) -> dict | None: |
| row = _conn().execute( |
| "SELECT member_id, capital FROM ch_members WHERE member_id=?", (member_id,) |
| ).fetchone() |
| return dict(row) if row else None |
|
|
|
|
| def get_all_members() -> list[dict]: |
| rows = _conn().execute( |
| "SELECT member_id, capital FROM ch_members ORDER BY member_id" |
| ).fetchall() |
| return [dict(r) for r in rows] |
|
|
|
|
| |
|
|
| def get_holdings(member_id: str) -> list[dict]: |
| rows = _conn().execute( |
| "SELECT symbol, quantity, avg_cost FROM ch_holdings WHERE member_id=? AND quantity>0", |
| (member_id,), |
| ).fetchall() |
| return [dict(r) for r in rows] |
|
|
|
|
| def get_holding(member_id: str, symbol: str) -> dict: |
| row = _conn().execute( |
| "SELECT quantity, avg_cost FROM ch_holdings WHERE member_id=? AND symbol=?", |
| (member_id, symbol), |
| ).fetchone() |
| return dict(row) if row else {"quantity": 0, "avg_cost": 0.0} |
|
|
|
|
| |
|
|
| def today_str() -> str: |
| return datetime.date.today().isoformat() |
|
|
|
|
| def record_trade( |
| member_id: str, |
| symbol: str, |
| side: str, |
| quantity: int, |
| price: float, |
| cl_ord_id: str, |
| ) -> None: |
| """Atomically update holdings, capital, daily counter, and trade log.""" |
| conn = _conn() |
| date = today_str() |
| value = quantity * price |
|
|
| with conn: |
| |
| holding = get_holding(member_id, symbol) |
| old_qty = holding["quantity"] |
| old_avg = holding["avg_cost"] |
|
|
| if side == "BUY": |
| new_qty = old_qty + quantity |
| new_avg = (old_qty * old_avg + quantity * price) / new_qty if new_qty else price |
| conn.execute( |
| """INSERT INTO ch_holdings (member_id, symbol, quantity, avg_cost) |
| VALUES (?,?,?,?) |
| ON CONFLICT(member_id, symbol) DO UPDATE |
| SET quantity=excluded.quantity, avg_cost=excluded.avg_cost""", |
| (member_id, symbol, new_qty, round(new_avg, 4)), |
| ) |
| |
| conn.execute( |
| "UPDATE ch_members SET capital = capital - ? WHERE member_id=?", |
| (value, member_id), |
| ) |
| else: |
| new_qty = max(0, old_qty - quantity) |
| if new_qty == 0: |
| conn.execute( |
| "DELETE FROM ch_holdings WHERE member_id=? AND symbol=?", |
| (member_id, symbol), |
| ) |
| else: |
| conn.execute( |
| "UPDATE ch_holdings SET quantity=? WHERE member_id=? AND symbol=?", |
| (new_qty, member_id, symbol), |
| ) |
| |
| conn.execute( |
| "UPDATE ch_members SET capital = capital + ? WHERE member_id=?", |
| (value, member_id), |
| ) |
|
|
| |
| buy_inc = 1 if side == "BUY" else 0 |
| sell_inc = 1 if side == "SELL" else 0 |
| conn.execute( |
| """INSERT INTO ch_daily_trades (member_id, trading_date, buy_count, sell_count, total_securities) |
| VALUES (?,?,?,?,?) |
| ON CONFLICT(member_id, trading_date) DO UPDATE |
| SET buy_count = buy_count + excluded.buy_count, |
| sell_count = sell_count + excluded.sell_count, |
| total_securities = total_securities + excluded.total_securities""", |
| (member_id, date, buy_inc, sell_inc, quantity), |
| ) |
|
|
| |
| conn.execute( |
| """INSERT INTO ch_trade_log |
| (member_id, symbol, side, quantity, price, cl_ord_id, trading_date, timestamp) |
| VALUES (?,?,?,?,?,?,?,?)""", |
| (member_id, symbol, side, quantity, price, cl_ord_id, date, time.time()), |
| ) |
|
|
|
|
| def get_daily_trades(member_id: str, date: str | None = None) -> dict: |
| date = date or today_str() |
| row = _conn().execute( |
| "SELECT buy_count, sell_count, total_securities FROM ch_daily_trades WHERE member_id=? AND trading_date=?", |
| (member_id, date), |
| ).fetchone() |
| return dict(row) if row else {"buy_count": 0, "sell_count": 0, "total_securities": 0} |
|
|
|
|
| def get_trade_log(member_id: str, date: str | None = None, limit: int = 50) -> list[dict]: |
| date = date or today_str() |
| rows = _conn().execute( |
| """SELECT symbol, side, quantity, price, cl_ord_id, timestamp |
| FROM ch_trade_log WHERE member_id=? AND trading_date=? |
| ORDER BY timestamp DESC LIMIT ?""", |
| (member_id, date, limit), |
| ).fetchall() |
| return [dict(r) for r in rows] |
|
|
|
|
| |
|
|
| def record_settlement( |
| member_id: str, |
| trading_date: str, |
| opening_capital: float, |
| closing_capital: float, |
| realized_pnl: float, |
| unrealized_pnl: float, |
| obligation_met: bool, |
| ) -> None: |
| _conn().execute( |
| """INSERT INTO ch_settlements |
| (member_id, trading_date, opening_capital, closing_capital, |
| realized_pnl, unrealized_pnl, obligation_met, settled_at) |
| VALUES (?,?,?,?,?,?,?,?)""", |
| ( |
| member_id, trading_date, opening_capital, closing_capital, |
| realized_pnl, unrealized_pnl, int(obligation_met), time.time(), |
| ), |
| ) |
| _conn().commit() |
|
|
|
|
| def get_settlements(member_id: str, limit: int = 30) -> list[dict]: |
| rows = _conn().execute( |
| """SELECT trading_date, opening_capital, closing_capital, |
| realized_pnl, unrealized_pnl, obligation_met, settled_at |
| FROM ch_settlements WHERE member_id=? |
| ORDER BY settled_at DESC LIMIT ?""", |
| (member_id, limit), |
| ).fetchall() |
| return [dict(r) for r in rows] |
|
|
|
|
| |
|
|
| def record_ai_decision( |
| member_id: str, |
| llm_response: str, |
| parsed_order: dict | None, |
| source: str = "llm", |
| ) -> None: |
| import json as _json |
| _conn().execute( |
| """INSERT INTO ch_ai_decisions |
| (member_id, llm_response, parsed_order, source, trading_date, timestamp) |
| VALUES (?,?,?,?,?,?)""", |
| ( |
| member_id, |
| llm_response, |
| _json.dumps(parsed_order) if parsed_order else None, |
| source, |
| today_str(), |
| time.time(), |
| ), |
| ) |
| _conn().commit() |
|
|
|
|
| def get_ai_decisions(member_id: str, limit: int = 10) -> list[dict]: |
| rows = _conn().execute( |
| """SELECT llm_response, parsed_order, source, timestamp |
| FROM ch_ai_decisions WHERE member_id=? |
| ORDER BY timestamp DESC LIMIT ?""", |
| (member_id, limit), |
| ).fetchall() |
| return [dict(r) for r in rows] |
|
|
|
|
| |
|
|
| def get_leaderboard(date: str | None = None) -> list[dict]: |
| """Returns member stats. Caller adds holdings_value using live prices.""" |
| date = date or today_str() |
| members = get_all_members() |
| result = [] |
| for m in members: |
| mid = m["member_id"] |
| dt = get_daily_trades(mid, date) |
| holdings = get_holdings(mid) |
| result.append({ |
| "member_id": mid, |
| "capital": round(m["capital"], 2), |
| "holdings": holdings, |
| "buy_count": dt["buy_count"], |
| "sell_count": dt["sell_count"], |
| "total_securities": dt["total_securities"], |
| "obligation_met": dt["total_securities"] >= CH_DAILY_OBLIGATION, |
| }) |
| return result |
|
|
|
|
| |
|
|
| def verify_password(member_id: str, password: str) -> bool: |
| """Password equals the member ID (e.g. USR01 / USR01).""" |
| return member_id in CH_MEMBERS and password == member_id |
|
|