File size: 12,357 Bytes
8f253b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
04e24fd
8f253b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57e97ca
 
 
 
 
 
 
 
 
 
8f253b3
 
 
 
57e97ca
 
8f253b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57e97ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8f253b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
"""SQLite persistence for the Clearing House simulation.

DB path: /app/data/clearing_house.db (shared volume with dashboard).
All public functions are thread-safe via thread-local connections.
"""

import sqlite3
import threading
import time
import datetime
import os

# Location of the SQLite database file; can be overridden through the
# CH_DB_PATH environment variable.
CH_DB_PATH = os.getenv("CH_DB_PATH", "/app/data/clearing_house.db")
# The ten member IDs seeded by init_db(): "USR01" through "USR10".
CH_MEMBERS = [f"USR{i:02d}" for i in range(1, 11)]
# Capital value written for each member row that init_db() inserts.
CH_STARTING_CAPITAL = 100_000.0
CH_DAILY_OBLIGATION = 20  # minimum securities (qty sum) per trading day

# Per-thread attribute storage; _conn() keeps each thread's connection here.
_local = threading.local()

# DDL script run by init_db() through executescript(). It defines six tables
# (members, holdings, per-day trade counters, trade log, settlements and
# AI decisions) plus three (member_id, trading_date) indexes. Every statement
# uses IF NOT EXISTS, so running the script against an existing database
# leaves already-created objects alone.
SCHEMA = """
CREATE TABLE IF NOT EXISTS ch_members (
    member_id   TEXT PRIMARY KEY,
    capital     REAL NOT NULL DEFAULT 100000.0,
    created_at  REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS ch_holdings (
    member_id   TEXT NOT NULL,
    symbol      TEXT NOT NULL,
    quantity    INTEGER NOT NULL DEFAULT 0,
    avg_cost    REAL NOT NULL DEFAULT 0.0,
    PRIMARY KEY (member_id, symbol)
);

CREATE TABLE IF NOT EXISTS ch_daily_trades (
    member_id        TEXT NOT NULL,
    trading_date     TEXT NOT NULL,
    buy_count        INTEGER NOT NULL DEFAULT 0,
    sell_count       INTEGER NOT NULL DEFAULT 0,
    total_securities INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (member_id, trading_date)
);

CREATE TABLE IF NOT EXISTS ch_trade_log (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    member_id    TEXT NOT NULL,
    symbol       TEXT NOT NULL,
    side         TEXT NOT NULL,
    quantity     INTEGER NOT NULL,
    price        REAL NOT NULL,
    cl_ord_id    TEXT NOT NULL,
    trading_date TEXT NOT NULL,
    timestamp    REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS ch_settlements (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    member_id       TEXT NOT NULL,
    trading_date    TEXT NOT NULL,
    opening_capital REAL NOT NULL,
    closing_capital REAL NOT NULL,
    realized_pnl    REAL NOT NULL DEFAULT 0.0,
    unrealized_pnl  REAL NOT NULL DEFAULT 0.0,
    obligation_met  INTEGER NOT NULL DEFAULT 0,
    settled_at      REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS ch_ai_decisions (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    member_id    TEXT NOT NULL,
    llm_response TEXT NOT NULL,
    parsed_order TEXT,
    source       TEXT NOT NULL DEFAULT 'llm',
    trading_date TEXT NOT NULL,
    timestamp    REAL NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_ch_trade_log_member
    ON ch_trade_log(member_id, trading_date);
CREATE INDEX IF NOT EXISTS idx_ch_settlements_member
    ON ch_settlements(member_id, trading_date);
CREATE INDEX IF NOT EXISTS idx_ch_ai_decisions_member
    ON ch_ai_decisions(member_id, trading_date);
"""


def _conn() -> sqlite3.Connection:
    """Return the calling thread's SQLite connection, opening it on first use.

    The connection is cached on the module's thread-local object and is
    configured to yield ``sqlite3.Row`` rows.
    """
    cached = getattr(_local, "conn", None)
    if cached is not None:
        return cached

    fresh = sqlite3.connect(CH_DB_PATH, check_same_thread=False)
    _local.conn = fresh
    fresh.row_factory = sqlite3.Row
    return fresh


def init_db() -> None:
    """Apply the schema, then insert every configured member not yet present.

    Existing member rows are left untouched (``INSERT OR IGNORE``). A short
    confirmation line is printed once both steps are committed.
    """
    db = _conn()
    db.executescript(SCHEMA)
    db.commit()

    seeded_at = time.time()
    seed_rows = [(member, CH_STARTING_CAPITAL, seeded_at) for member in CH_MEMBERS]
    db.executemany(
        "INSERT OR IGNORE INTO ch_members (member_id, capital, created_at) VALUES (?,?,?)",
        seed_rows,
    )
    db.commit()
    print(f"[CH-DB] Initialized at {CH_DB_PATH}")


# ── Members ────────────────────────────────────────────────────────────────────

def get_member(member_id: str) -> dict | None:
    """Fetch one member's id and capital, or None if the id is not stored."""
    cursor = _conn().execute(
        "SELECT member_id, capital FROM ch_members WHERE member_id=?", (member_id,)
    )
    record = cursor.fetchone()
    if record is None:
        return None
    return dict(record)


def get_all_members() -> list[dict]:
    """Return id and capital for every member, sorted by member_id."""
    cursor = _conn().execute(
        "SELECT member_id, capital FROM ch_members ORDER BY member_id"
    )
    return list(map(dict, cursor.fetchall()))


# ── Holdings ───────────────────────────────────────────────────────────────────

def get_holdings(member_id: str) -> list[dict]:
    """Return the member's positions with a quantity above zero.

    Each entry carries ``symbol``, ``quantity`` and ``avg_cost``.
    """
    sql = "SELECT symbol, quantity, avg_cost FROM ch_holdings WHERE member_id=? AND quantity>0"
    cursor = _conn().execute(sql, (member_id,))
    return list(map(dict, cursor.fetchall()))


def get_holding(member_id: str, symbol: str) -> dict:
    """Return quantity and average cost for one member/symbol pair.

    When no row exists, a zero position is returned instead of None.
    """
    sql = "SELECT quantity, avg_cost FROM ch_holdings WHERE member_id=? AND symbol=?"
    found = _conn().execute(sql, (member_id, symbol)).fetchone()
    if found is None:
        return {"quantity": 0, "avg_cost": 0.0}
    return dict(found)


# ── Trade recording (atomic) ───────────────────────────────────────────────────

def today_str() -> str:
    """Return the current local date formatted as ISO ``YYYY-MM-DD``."""
    current = datetime.date.today()
    return current.isoformat()


def record_trade(
    member_id: str,
    symbol: str,
    side: str,
    quantity: int,
    price: float,
    cl_ord_id: str,
) -> None:
    """Atomically update holdings, capital, daily counter, and trade log.

    All four writes happen in one transaction: either every table reflects
    the trade or none does.

    Args:
        member_id: Member whose books are updated.
        symbol: Instrument traded.
        side: ``"BUY"`` or ``"SELL"`` (exact, upper-case).
        quantity: Number of securities traded.
        price: Execution price per security.
        cl_ord_id: Client order id stored in the trade log.

    Raises:
        ValueError: If ``side`` is neither ``"BUY"`` nor ``"SELL"``. Nothing
            is written in that case.

    NOTE(review): a SELL larger than the held quantity clamps the holding at
    zero yet still credits ``quantity * price`` to capital - confirm that
    this is the intended treatment of oversells.
    """
    # Previously any non-"BUY" value fell into the SELL branch (holdings cut,
    # capital credited) while being counted as neither a buy nor a sell.
    if side not in ("BUY", "SELL"):
        raise ValueError(f"side must be 'BUY' or 'SELL', got {side!r}")

    conn = _conn()
    date = today_str()
    value = quantity * price

    with conn:  # commits on success, rolls back if anything below raises
        # Take the write lock *before* reading the current holding. Without
        # an explicit BEGIN the SELECT runs outside any transaction (sqlite3
        # only begins implicitly ahead of INSERT/UPDATE/DELETE), so two
        # connections could read the same old quantity and one trade's
        # holdings update would be lost.
        if not conn.in_transaction:
            conn.execute("BEGIN IMMEDIATE")

        # 1. Update holdings
        holding = get_holding(member_id, symbol)
        old_qty = holding["quantity"]
        old_avg = holding["avg_cost"]

        if side == "BUY":
            new_qty = old_qty + quantity
            # Quantity-weighted average cost; guard against a zero total.
            new_avg = (old_qty * old_avg + quantity * price) / new_qty if new_qty else price
            conn.execute(
                """INSERT INTO ch_holdings (member_id, symbol, quantity, avg_cost)
                   VALUES (?,?,?,?)
                   ON CONFLICT(member_id, symbol) DO UPDATE
                   SET quantity=excluded.quantity, avg_cost=excluded.avg_cost""",
                (member_id, symbol, new_qty, round(new_avg, 4)),
            )
            # 2. Deduct capital
            conn.execute(
                "UPDATE ch_members SET capital = capital - ? WHERE member_id=?",
                (value, member_id),
            )
        else:  # SELL
            new_qty = max(0, old_qty - quantity)
            if new_qty == 0:
                # A fully closed position is removed rather than kept at zero.
                conn.execute(
                    "DELETE FROM ch_holdings WHERE member_id=? AND symbol=?",
                    (member_id, symbol),
                )
            else:
                # avg_cost is left unchanged when a position is reduced.
                conn.execute(
                    "UPDATE ch_holdings SET quantity=? WHERE member_id=? AND symbol=?",
                    (new_qty, member_id, symbol),
                )
            # 2. Add capital
            conn.execute(
                "UPDATE ch_members SET capital = capital + ? WHERE member_id=?",
                (value, member_id),
            )

        # 3. Update daily trade counter
        buy_inc = 1 if side == "BUY" else 0
        sell_inc = 1 - buy_inc
        conn.execute(
            """INSERT INTO ch_daily_trades (member_id, trading_date, buy_count, sell_count, total_securities)
               VALUES (?,?,?,?,?)
               ON CONFLICT(member_id, trading_date) DO UPDATE
               SET buy_count   = buy_count + excluded.buy_count,
                   sell_count  = sell_count + excluded.sell_count,
                   total_securities = total_securities + excluded.total_securities""",
            (member_id, date, buy_inc, sell_inc, quantity),
        )

        # 4. Log the trade
        conn.execute(
            """INSERT INTO ch_trade_log
               (member_id, symbol, side, quantity, price, cl_ord_id, trading_date, timestamp)
               VALUES (?,?,?,?,?,?,?,?)""",
            (member_id, symbol, side, quantity, price, cl_ord_id, date, time.time()),
        )


def get_daily_trades(member_id: str, date: str | None = None) -> dict:
    """Return the member's buy/sell counts and traded quantity for one day.

    ``date`` defaults to today; an all-zero record is returned when the
    member has no counter row for that day.
    """
    day = date if date else today_str()
    sql = "SELECT buy_count, sell_count, total_securities FROM ch_daily_trades WHERE member_id=? AND trading_date=?"
    counters = _conn().execute(sql, (member_id, day)).fetchone()
    if counters is None:
        return {"buy_count": 0, "sell_count": 0, "total_securities": 0}
    return dict(counters)


def get_trade_log(member_id: str, date: str | None = None, limit: int = 50) -> list[dict]:
    """Return up to ``limit`` logged trades for the member on one day.

    ``date`` defaults to today; the newest trades come first.
    """
    day = date if date else today_str()
    cursor = _conn().execute(
        """SELECT symbol, side, quantity, price, cl_ord_id, timestamp
           FROM ch_trade_log WHERE member_id=? AND trading_date=?
           ORDER BY timestamp DESC LIMIT ?""",
        (member_id, day, limit),
    )
    return list(map(dict, cursor.fetchall()))


# ── EOD Settlement ─────────────────────────────────────────────────────────────

def record_settlement(
    member_id: str,
    trading_date: str,
    opening_capital: float,
    closing_capital: float,
    realized_pnl: float,
    unrealized_pnl: float,
    obligation_met: bool,
) -> None:
    """Append one end-of-day settlement row for the member and commit it.

    ``obligation_met`` is stored as an integer and the row is stamped with
    the current time.
    """
    db = _conn()
    row_values = (
        member_id,
        trading_date,
        opening_capital,
        closing_capital,
        realized_pnl,
        unrealized_pnl,
        int(obligation_met),
        time.time(),
    )
    db.execute(
        """INSERT INTO ch_settlements
           (member_id, trading_date, opening_capital, closing_capital,
            realized_pnl, unrealized_pnl, obligation_met, settled_at)
           VALUES (?,?,?,?,?,?,?,?)""",
        row_values,
    )
    db.commit()


def get_settlements(member_id: str, limit: int = 30) -> list[dict]:
    """Return the member's most recent settlements, newest first.

    At most ``limit`` rows are returned.
    """
    cursor = _conn().execute(
        """SELECT trading_date, opening_capital, closing_capital,
                  realized_pnl, unrealized_pnl, obligation_met, settled_at
           FROM ch_settlements WHERE member_id=?
           ORDER BY settled_at DESC LIMIT ?""",
        (member_id, limit),
    )
    return list(map(dict, cursor.fetchall()))


# ── AI Decisions ──────────────────────────────────────────────────────────────

def record_ai_decision(
    member_id: str,
    llm_response: str,
    parsed_order: dict | None,
    source: str = "llm",
) -> None:
    """Store one AI decision for the member under today's date and commit it.

    ``parsed_order`` is saved as JSON text; a falsy value (None or an empty
    dict) is stored as NULL.
    """
    import json as _json

    db = _conn()
    if parsed_order:
        order_json = _json.dumps(parsed_order)
    else:
        order_json = None

    db.execute(
        """INSERT INTO ch_ai_decisions
           (member_id, llm_response, parsed_order, source, trading_date, timestamp)
           VALUES (?,?,?,?,?,?)""",
        (member_id, llm_response, order_json, source, today_str(), time.time()),
    )
    db.commit()


def get_ai_decisions(member_id: str, limit: int = 10) -> list[dict]:
    """Return the member's latest stored AI decisions, newest first.

    At most ``limit`` rows are returned; ``parsed_order`` is returned as
    stored (JSON text or None).
    """
    cursor = _conn().execute(
        """SELECT llm_response, parsed_order, source, timestamp
           FROM ch_ai_decisions WHERE member_id=?
           ORDER BY timestamp DESC LIMIT ?""",
        (member_id, limit),
    )
    return list(map(dict, cursor.fetchall()))


# ── Leaderboard ────────────────────────────────────────────────────────────────

def get_leaderboard(date: str | None = None) -> list[dict]:
    """Build one stats entry per member for the given day (default: today).

    Each entry holds the member's rounded capital, open holdings, the day's
    trade counters and whether the daily obligation was reached. Valuing the
    holdings with live prices is left to the caller.
    """
    day = date if date else today_str()

    def _entry(member: dict) -> dict:
        # Assemble the leaderboard row for a single member.
        member_id = member["member_id"]
        counters = get_daily_trades(member_id, day)
        positions = get_holdings(member_id)
        traded = counters["total_securities"]
        return {
            "member_id": member_id,
            "capital": round(member["capital"], 2),
            "holdings": positions,
            "buy_count": counters["buy_count"],
            "sell_count": counters["sell_count"],
            "total_securities": traded,
            "obligation_met": traded >= CH_DAILY_OBLIGATION,
        }

    return [_entry(member) for member in get_all_members()]


# ── Auth ───────────────────────────────────────────────────────────────────────

def verify_password(member_id: str, password: str) -> bool:
    """Check a login: a known member whose password is its own ID (USR01 / USR01)."""
    if member_id not in CH_MEMBERS:
        return False
    return password == member_id