StockEx / clearing_house /ch_database.py
RayMelius's picture
Fix MDF order book depth and CH AI reference price fallback
04e24fd
"""SQLite persistence for the Clearing House simulation.
DB path: /app/data/clearing_house.db (shared volume with dashboard).
All public functions are thread-safe via thread-local connections.
"""
import sqlite3
import threading
import time
import datetime
import os
# Location of the SQLite file; overridable through the CH_DB_PATH environment variable.
CH_DB_PATH = os.getenv("CH_DB_PATH", "/app/data/clearing_house.db")
# Fixed member roster: "USR01" through "USR10" (seeded by init_db).
CH_MEMBERS = [f"USR{i:02d}" for i in range(1, 11)]
# Capital assigned to each member row when init_db seeds it.
CH_STARTING_CAPITAL = 100_000.0
CH_DAILY_OBLIGATION = 20 # minimum securities (qty sum) per trading day
# Per-thread storage; _conn() caches one SQLite connection per thread on it.
_local = threading.local()
# DDL script executed by init_db(). Every statement uses IF NOT EXISTS, so the
# script can be re-run against an existing database. Tables:
#   ch_members       - one row per member with its current capital
#   ch_holdings      - current position and average cost per (member, symbol)
#   ch_daily_trades  - per-member, per-day buy/sell counters and quantity total
#   ch_trade_log     - append-only record of every trade
#   ch_settlements   - end-of-day settlement snapshots
#   ch_ai_decisions  - raw LLM responses and the order parsed from them
SCHEMA = """
CREATE TABLE IF NOT EXISTS ch_members (
member_id TEXT PRIMARY KEY,
capital REAL NOT NULL DEFAULT 100000.0,
created_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS ch_holdings (
member_id TEXT NOT NULL,
symbol TEXT NOT NULL,
quantity INTEGER NOT NULL DEFAULT 0,
avg_cost REAL NOT NULL DEFAULT 0.0,
PRIMARY KEY (member_id, symbol)
);
CREATE TABLE IF NOT EXISTS ch_daily_trades (
member_id TEXT NOT NULL,
trading_date TEXT NOT NULL,
buy_count INTEGER NOT NULL DEFAULT 0,
sell_count INTEGER NOT NULL DEFAULT 0,
total_securities INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (member_id, trading_date)
);
CREATE TABLE IF NOT EXISTS ch_trade_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
member_id TEXT NOT NULL,
symbol TEXT NOT NULL,
side TEXT NOT NULL,
quantity INTEGER NOT NULL,
price REAL NOT NULL,
cl_ord_id TEXT NOT NULL,
trading_date TEXT NOT NULL,
timestamp REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS ch_settlements (
id INTEGER PRIMARY KEY AUTOINCREMENT,
member_id TEXT NOT NULL,
trading_date TEXT NOT NULL,
opening_capital REAL NOT NULL,
closing_capital REAL NOT NULL,
realized_pnl REAL NOT NULL DEFAULT 0.0,
unrealized_pnl REAL NOT NULL DEFAULT 0.0,
obligation_met INTEGER NOT NULL DEFAULT 0,
settled_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS ch_ai_decisions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
member_id TEXT NOT NULL,
llm_response TEXT NOT NULL,
parsed_order TEXT,
source TEXT NOT NULL DEFAULT 'llm',
trading_date TEXT NOT NULL,
timestamp REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ch_trade_log_member
ON ch_trade_log(member_id, trading_date);
CREATE INDEX IF NOT EXISTS idx_ch_settlements_member
ON ch_settlements(member_id, trading_date);
CREATE INDEX IF NOT EXISTS idx_ch_ai_decisions_member
ON ch_ai_decisions(member_id, trading_date);
"""
def _conn() -> sqlite3.Connection:
    """Return the calling thread's SQLite connection, opening it on first use.

    The connection is cached on the module's thread-local storage and is
    configured so that queries yield ``sqlite3.Row`` objects.
    """
    cached = getattr(_local, "conn", None)
    if cached is not None:
        return cached
    fresh = sqlite3.connect(CH_DB_PATH, check_same_thread=False)
    _local.conn = fresh
    fresh.row_factory = sqlite3.Row
    return fresh
def init_db() -> None:
    """Create the schema and seed any member rows that are missing.

    Runs the ``SCHEMA`` script, then inserts one ``ch_members`` row per entry
    in ``CH_MEMBERS`` with ``CH_STARTING_CAPITAL``; rows that already exist
    are left untouched (``INSERT OR IGNORE``).
    """
    db = _conn()
    db.executescript(SCHEMA)
    db.commit()
    seeded_at = time.time()
    seed_rows = [(member, CH_STARTING_CAPITAL, seeded_at) for member in CH_MEMBERS]
    db.executemany(
        "INSERT OR IGNORE INTO ch_members (member_id, capital, created_at) VALUES (?,?,?)",
        seed_rows,
    )
    db.commit()
    print(f"[CH-DB] Initialized at {CH_DB_PATH}")
# ── Members ────────────────────────────────────────────────────────────────────
def get_member(member_id: str) -> dict | None:
    """Look up one member.

    Returns a dict with ``member_id`` and ``capital``, or ``None`` when no
    row exists for ``member_id``.
    """
    query = "SELECT member_id, capital FROM ch_members WHERE member_id=?"
    record = _conn().execute(query, (member_id,)).fetchone()
    if record is None:
        return None
    return dict(record)
def get_all_members() -> list[dict]:
    """Return every member as a ``{member_id, capital}`` dict, ordered by member ID."""
    cursor = _conn().execute(
        "SELECT member_id, capital FROM ch_members ORDER BY member_id"
    )
    return list(map(dict, cursor.fetchall()))
# ── Holdings ───────────────────────────────────────────────────────────────────
def get_holdings(member_id: str) -> list[dict]:
    """Return the member's positions with a positive quantity.

    Each entry is a dict with ``symbol``, ``quantity`` and ``avg_cost``.
    """
    query = "SELECT symbol, quantity, avg_cost FROM ch_holdings WHERE member_id=? AND quantity>0"
    cursor = _conn().execute(query, (member_id,))
    return list(map(dict, cursor.fetchall()))
def get_holding(member_id: str, symbol: str) -> dict:
    """Return ``{quantity, avg_cost}`` for one member/symbol pair.

    When no row exists, a zero position (``quantity`` 0, ``avg_cost`` 0.0)
    is returned instead of ``None``.
    """
    query = "SELECT quantity, avg_cost FROM ch_holdings WHERE member_id=? AND symbol=?"
    record = _conn().execute(query, (member_id, symbol)).fetchone()
    if record is None:
        return {"quantity": 0, "avg_cost": 0.0}
    return dict(record)
# ── Trade recording (atomic) ───────────────────────────────────────────────────
def today_str() -> str:
    """Return the current local date as an ISO ``YYYY-MM-DD`` string."""
    current_day = datetime.date.today()
    return current_day.isoformat()
def record_trade(
    member_id: str,
    symbol: str,
    side: str,
    quantity: int,
    price: float,
    cl_ord_id: str,
) -> None:
    """Atomically update holdings, capital, daily counter, and trade log.

    All writes run inside a single ``with conn:`` block, so they are
    committed together on success and rolled back together if any
    statement raises.

    Args:
        member_id: Member whose books are updated.
        symbol: Traded instrument.
        side: ``"BUY"`` or ``"SELL"`` (exact, upper-case).
        quantity: Number of securities traded.
        price: Execution price per security.
        cl_ord_id: Client order ID stored in the trade log.

    Raises:
        ValueError: If ``side`` is neither ``"BUY"`` nor ``"SELL"``. Without
            this check any other value was booked as a sale against holdings
            and capital while incrementing neither daily counter.
    """
    if side not in ("BUY", "SELL"):
        raise ValueError(f"side must be 'BUY' or 'SELL', got {side!r}")

    conn = _conn()
    date = today_str()
    value = quantity * price
    is_buy = side == "BUY"

    with conn:  # commits on success, rolls back on exception
        # 1. Update holdings
        holding = get_holding(member_id, symbol)
        old_qty = holding["quantity"]
        old_avg = holding["avg_cost"]

        if is_buy:
            new_qty = old_qty + quantity
            # Quantity-weighted average cost; fall back to the trade price
            # when the resulting position is zero to avoid dividing by zero.
            new_avg = (old_qty * old_avg + quantity * price) / new_qty if new_qty else price
            conn.execute(
                """INSERT INTO ch_holdings (member_id, symbol, quantity, avg_cost)
VALUES (?,?,?,?)
ON CONFLICT(member_id, symbol) DO UPDATE
SET quantity=excluded.quantity, avg_cost=excluded.avg_cost""",
                (member_id, symbol, new_qty, round(new_avg, 4)),
            )
            # 2. Deduct capital
            conn.execute(
                "UPDATE ch_members SET capital = capital - ? WHERE member_id=?",
                (value, member_id),
            )
        else:  # SELL
            # NOTE(review): selling more than is held clamps the position at
            # zero while the full proceeds are still credited below; confirm
            # whether over-selling should be rejected or tracked as a short.
            new_qty = max(0, old_qty - quantity)
            if new_qty == 0:
                conn.execute(
                    "DELETE FROM ch_holdings WHERE member_id=? AND symbol=?",
                    (member_id, symbol),
                )
            else:
                # avg_cost is left unchanged when part of a position is sold.
                conn.execute(
                    "UPDATE ch_holdings SET quantity=? WHERE member_id=? AND symbol=?",
                    (new_qty, member_id, symbol),
                )
            # 2. Add capital
            conn.execute(
                "UPDATE ch_members SET capital = capital + ? WHERE member_id=?",
                (value, member_id),
            )

        # 3. Update daily trade counter (upsert keyed on member + date)
        buy_inc = 1 if is_buy else 0
        sell_inc = 0 if is_buy else 1
        conn.execute(
            """INSERT INTO ch_daily_trades (member_id, trading_date, buy_count, sell_count, total_securities)
VALUES (?,?,?,?,?)
ON CONFLICT(member_id, trading_date) DO UPDATE
SET buy_count = buy_count + excluded.buy_count,
sell_count = sell_count + excluded.sell_count,
total_securities = total_securities + excluded.total_securities""",
            (member_id, date, buy_inc, sell_inc, quantity),
        )

        # 4. Log the trade
        conn.execute(
            """INSERT INTO ch_trade_log
(member_id, symbol, side, quantity, price, cl_ord_id, trading_date, timestamp)
VALUES (?,?,?,?,?,?,?,?)""",
            (member_id, symbol, side, quantity, price, cl_ord_id, date, time.time()),
        )
def get_daily_trades(member_id: str, date: str | None = None) -> dict:
    """Return the member's trade counters for one trading day.

    ``date`` defaults to today (``today_str()``). The result holds
    ``buy_count``, ``sell_count`` and ``total_securities``; all three are 0
    when the member has no row for that day.
    """
    trading_date = date if date else today_str()
    query = "SELECT buy_count, sell_count, total_securities FROM ch_daily_trades WHERE member_id=? AND trading_date=?"
    record = _conn().execute(query, (member_id, trading_date)).fetchone()
    if record is None:
        return {"buy_count": 0, "sell_count": 0, "total_securities": 0}
    return dict(record)
def get_trade_log(member_id: str, date: str | None = None, limit: int = 50) -> list[dict]:
    """Return the member's logged trades for one day, newest first.

    ``date`` defaults to today (``today_str()``); at most ``limit`` entries
    are returned. Each entry holds ``symbol``, ``side``, ``quantity``,
    ``price``, ``cl_ord_id`` and ``timestamp``.
    """
    trading_date = date if date else today_str()
    params = (member_id, trading_date, limit)
    cursor = _conn().execute(
        """SELECT symbol, side, quantity, price, cl_ord_id, timestamp
FROM ch_trade_log WHERE member_id=? AND trading_date=?
ORDER BY timestamp DESC LIMIT ?""",
        params,
    )
    return list(map(dict, cursor.fetchall()))
# ── EOD Settlement ─────────────────────────────────────────────────────────────
def record_settlement(
    member_id: str,
    trading_date: str,
    opening_capital: float,
    closing_capital: float,
    realized_pnl: float,
    unrealized_pnl: float,
    obligation_met: bool,
) -> None:
    """Insert one end-of-day settlement row and commit it.

    ``obligation_met`` is stored as an integer (0/1) and ``settled_at`` is
    stamped with the current ``time.time()``.
    """
    db = _conn()
    settlement_row = (
        member_id,
        trading_date,
        opening_capital,
        closing_capital,
        realized_pnl,
        unrealized_pnl,
        int(obligation_met),
        time.time(),
    )
    db.execute(
        """INSERT INTO ch_settlements
(member_id, trading_date, opening_capital, closing_capital,
realized_pnl, unrealized_pnl, obligation_met, settled_at)
VALUES (?,?,?,?,?,?,?,?)""",
        settlement_row,
    )
    db.commit()
def get_settlements(member_id: str, limit: int = 30) -> list[dict]:
    """Return up to ``limit`` settlement rows for the member, most recently settled first."""
    cursor = _conn().execute(
        """SELECT trading_date, opening_capital, closing_capital,
realized_pnl, unrealized_pnl, obligation_met, settled_at
FROM ch_settlements WHERE member_id=?
ORDER BY settled_at DESC LIMIT ?""",
        (member_id, limit),
    )
    return list(map(dict, cursor.fetchall()))
# ── AI Decisions ──────────────────────────────────────────────────────────────
def record_ai_decision(
    member_id: str,
    llm_response: str,
    parsed_order: dict | None,
    source: str = "llm",
) -> None:
    """Persist one AI decision for the member and commit it.

    ``parsed_order`` is stored as a JSON string; a ``None`` or empty value is
    stored as NULL. The row is tagged with today's trading date and the
    current ``time.time()``.
    """
    import json as _json

    order_json = None
    if parsed_order:
        order_json = _json.dumps(parsed_order)

    db = _conn()
    decision_row = (
        member_id,
        llm_response,
        order_json,
        source,
        today_str(),
        time.time(),
    )
    db.execute(
        """INSERT INTO ch_ai_decisions
(member_id, llm_response, parsed_order, source, trading_date, timestamp)
VALUES (?,?,?,?,?,?)""",
        decision_row,
    )
    db.commit()
def get_ai_decisions(member_id: str, limit: int = 10) -> list[dict]:
    """Return up to ``limit`` AI decisions for the member, newest first.

    Each entry holds ``llm_response``, ``parsed_order`` (as stored, i.e. a
    JSON string or ``None``), ``source`` and ``timestamp``.
    """
    cursor = _conn().execute(
        """SELECT llm_response, parsed_order, source, timestamp
FROM ch_ai_decisions WHERE member_id=?
ORDER BY timestamp DESC LIMIT ?""",
        (member_id, limit),
    )
    return list(map(dict, cursor.fetchall()))
# ── Leaderboard ────────────────────────────────────────────────────────────────
def get_leaderboard(date: str | None = None) -> list[dict]:
    """Build per-member stats for one trading day (default: today).

    Entries follow the order of ``get_all_members()`` and contain capital
    (rounded to 2 decimals), current holdings, the day's trade counters and
    whether ``CH_DAILY_OBLIGATION`` was reached. The caller is expected to
    add ``holdings_value`` using live prices.
    """
    trading_date = date or today_str()

    def _entry(member: dict) -> dict:
        # Same lookup order as a plain loop: daily counters, then holdings.
        member_id = member["member_id"]
        counters = get_daily_trades(member_id, trading_date)
        positions = get_holdings(member_id)
        traded = counters["total_securities"]
        return {
            "member_id": member_id,
            "capital": round(member["capital"], 2),
            "holdings": positions,
            "buy_count": counters["buy_count"],
            "sell_count": counters["sell_count"],
            "total_securities": traded,
            "obligation_met": traded >= CH_DAILY_OBLIGATION,
        }

    return [_entry(member) for member in get_all_members()]
# ── Auth ───────────────────────────────────────────────────────────────────────
def verify_password(member_id: str, password: str) -> bool:
    """Check credentials: the password must equal the member ID (e.g. USR01 / USR01).

    Only IDs listed in ``CH_MEMBERS`` are accepted.
    NOTE(review): this offers no real protection; presumably acceptable only
    because this is a simulation - confirm before exposing it more widely.
    """
    if member_id not in CH_MEMBERS:
        return False
    return password == member_id