"""
EuNEx Dashboard Database — SQLite persistence for orders, trades, and OHLCV history.
"""

import sqlite3
import threading
import time
import os

_local = threading.local()


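def _get_conn(db_path):
    """Return this thread's cached connection to db_path, opening it on first use."""
    if not hasattr(_local, "connections"):
        _local.connections = {}
    if db_path not in _local.connections:
        # os.path.dirname() returns "" for a bare filename, and os.makedirs("") raises,
        # so only create the parent directory when there is one.
        db_dir = os.path.dirname(db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        conn = sqlite3.connect(db_path, check_same_thread=False)
        conn.row_factory = sqlite3.Row
        # WAL lets readers proceed while another connection writes.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        _local.connections[db_path] = conn
    return _local.connections[db_path]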


def init_db(db_path):
    """Create the orders, trades, ohlcv, and daily_close tables if they do not exist."""
    conn = _get_conn(db_path)
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            order_id INTEGER UNIQUE,
            cl_ord_id TEXT,
            symbol_idx INTEGER,
            symbol TEXT,
            side TEXT,
            order_type TEXT,
            price REAL,
            quantity INTEGER,
            remaining_qty INTEGER,
            status TEXT,
            tif TEXT DEFAULT 'Day',
            source TEXT DEFAULT 'dashboard',
            timestamp REAL,
            created_at TEXT DEFAULT (datetime('now'))
        );

        CREATE INDEX IF NOT EXISTS idx_orders_symbol ON orders(symbol);
        CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(status);
        CREATE INDEX IF NOT EXISTS idx_orders_order_id ON orders(order_id);

        CREATE TABLE IF NOT EXISTS trades (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            trade_id INTEGER,
            symbol_idx INTEGER,
            symbol TEXT,
            price REAL,
            quantity INTEGER,
            buy_order_id INTEGER,
            sell_order_id INTEGER,
            timestamp REAL,
            created_at TEXT DEFAULT (datetime('now'))
        );

        CREATE INDEX IF NOT EXISTS idx_trades_symbol ON trades(symbol);
        CREATE INDEX IF NOT EXISTS idx_trades_timestamp ON trades(timestamp);

        CREATE TABLE IF NOT EXISTS ohlcv (
            symbol TEXT,
            bucket INTEGER,
            open REAL,
            high REAL,
            low REAL,
            close REAL,
            volume INTEGER,
            PRIMARY KEY (symbol, bucket)
        );

        CREATE TABLE IF NOT EXISTS daily_close (
            symbol TEXT,
            trade_date TEXT,
            close_price REAL,
            bid REAL,
            ask REAL,
            volume INTEGER,
            trade_count INTEGER,
            PRIMARY KEY (symbol, trade_date)
        );
    """)
    conn.commit()


def save_order(db_path, order):
    """Insert an order; if order_id already exists, update only remaining_qty and status."""
    conn = _get_conn(db_path)
    conn.execute("""
        INSERT INTO orders (order_id, cl_ord_id, symbol_idx, symbol, side, order_type,
                            price, quantity, remaining_qty, status, tif, source, timestamp)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(order_id) DO UPDATE SET
            remaining_qty=excluded.remaining_qty,
            status=excluded.status
    """, (
        order.get("orderId"), order.get("clOrdId", ""),
        order.get("symbolIdx"), order.get("symbol"),
        order.get("side"), order.get("orderType", "Limit"),
        order.get("price", 0), order.get("quantity"),
        order.get("remainingQty", order.get("quantity")),
        order.get("status", "New"), order.get("tif", "Day"),
        order.get("source", "dashboard"), order.get("timestamp", time.time()),
    ))
    conn.commit()


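def save_trade(db_path, trade):
    """Insert one trade row."""
    conn = _get_conn(db_path)
    # Note: trades has no UNIQUE constraint on trade_id, so OR IGNORE never fires
    # and a replayed trade is stored again as a new row.
    conn.execute("""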
        INSERT OR IGNORE INTO trades (trade_id, symbol_idx, symbol, price, quantity,
                                       buy_order_id, sell_order_id, timestamp)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        trade.get("tradeId"), trade.get("symbolIdx"),
        trade.get("symbol"), trade.get("price"),
        trade.get("quantity"), trade.get("buyOrderId"),
        trade.get("sellOrderId"), trade.get("timestamp", time.time()),
    ))
    conn.commit()


def record_ohlcv(db_path, symbol, price, quantity, bucket_size=60):
    """Fold one trade into the OHLCV bar for the current bucket_size-second bucket."""
    # Floor the current time to the start of its bucket.
    bucket = int(time.time()) // bucket_size * bucket_size
    conn = _get_conn(db_path)
    # A single upsert avoids the read-then-write race between threads or processes.
    # The first trade in a bucket sets open; later trades update high, low, close, volume.
    conn.execute("""
        INSERT INTO ohlcv (symbol, bucket, open, high, low, close, volume)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(symbol, bucket) DO UPDATE SET
            high=MAX(high, excluded.high),
            low=MIN(low, excluded.low),
            close=excluded.close,
            volume=volume+excluded.volume
    """, (symbol, bucket, price, price, price, price, quantity))
    conn.commit()


def get_ohlcv(db_path, symbol, period_seconds=3600):
    """Return the symbol's bars from the last period_seconds seconds, oldest first."""
    conn = _get_conn(db_path)
    cutoff = int(time.time()) - period_seconds
    rows = conn.execute(
        "SELECT bucket, open, high, low, close, volume FROM ohlcv "
        "WHERE symbol=? AND bucket>=? ORDER BY bucket",
        (symbol, cutoff)
    ).fetchall()
    return [dict(r) for r in rows]


def get_recent_orders(db_path, limit=50):
    conn = _get_conn(db_path)
    rows = conn.execute(
        "SELECT * FROM orders ORDER BY id DESC LIMIT ?", (limit,)
    ).fetchall()
    return [dict(r) for r in rows]


def get_recent_trades(db_path, limit=100):
    conn = _get_conn(db_path)
    rows = conn.execute(
        "SELECT * FROM trades ORDER BY id DESC LIMIT ?", (limit,)
    ).fetchall()
    return [dict(r) for r in rows]


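def get_trade_stats(db_path, symbol=None):
    """Return aggregate trade stats.

    With a symbol: count, volume, avg_price, high, low for that symbol.
    Without one: only count and volume across all symbols.
    """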
    conn = _get_conn(db_path)
    if symbol:
        row = conn.execute(
            "SELECT COUNT(*) as count, SUM(quantity) as volume, "
            "AVG(price) as avg_price, MAX(price) as high, MIN(price) as low "
            "FROM trades WHERE symbol=?", (symbol,)
        ).fetchone()
    else:
        row = conn.execute(
            "SELECT COUNT(*) as count, SUM(quantity) as volume "
            "FROM trades"
        ).fetchone()
    return dict(row) if row else {}


def get_active_orders(db_path, symbol=None):
    """Return orders with status New or PartiallyFilled, oldest first, optionally for one symbol."""
    conn = _get_conn(db_path)
    if symbol:
        rows = conn.execute(
            "SELECT * FROM orders WHERE status IN ('New','PartiallyFilled') AND symbol=? "
            "ORDER BY timestamp", (symbol,)
        ).fetchall()
    else:
        rows = conn.execute(
            "SELECT * FROM orders WHERE status IN ('New','PartiallyFilled') ORDER BY timestamp"
        ).fetchall()
    return [dict(r) for r in rows]


def save_daily_close(db_path, symbol, trade_date, close_price, bid, ask, volume, trade_count):
    conn = _get_conn(db_path)
    conn.execute("""
        INSERT INTO daily_close (symbol, trade_date, close_price, bid, ask, volume, trade_count)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(symbol, trade_date) DO UPDATE SET
            close_price=excluded.close_price, bid=excluded.bid, ask=excluded.ask,
            volume=excluded.volume, trade_count=excluded.trade_count
    """, (symbol, trade_date, close_price, bid, ask, volume, trade_count))
    conn.commit()


def get_last_closing_prices(db_path):
    """Return {symbol: row} holding each symbol's most recent daily_close row."""
    conn = _get_conn(db_path)
    rows = conn.execute("""
        SELECT d.symbol, d.close_price, d.bid, d.ask, d.volume, d.trade_count, d.trade_date
        FROM daily_close d
        INNER JOIN (
            SELECT symbol, MAX(trade_date) as max_date FROM daily_close GROUP BY symbol
        ) latest ON d.symbol = latest.symbol AND d.trade_date = latest.max_date
    """).fetchall()
    return {r["symbol"]: dict(r) for r in rows}


def get_daily_closes(db_path, symbol, limit=30):
    """Return the symbol's latest `limit` daily closes, ordered oldest first."""
    conn = _get_conn(db_path)
    rows = conn.execute(
        "SELECT trade_date, close_price, bid, ask, volume, trade_count "
        "FROM daily_close WHERE symbol=? ORDER BY trade_date DESC LIMIT ?",
        (symbol, limit)
    ).fetchall()
    return [dict(r) for r in reversed(rows)]