File size: 9,131 Bytes
097fb32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
/**
 * logger-db.ts - SQLite 持久化层
 *
 * 仅在 config.logging.db_enabled = true 时使用。
 * 与 JSONL 文件方式完全并存,互不干扰。
 */

import Database from 'better-sqlite3';
import { mkdirSync, existsSync } from 'fs';
import { dirname } from 'path';
// These types are declared inline to avoid a circular dependency with logger.ts.
// They are the minimal structural definitions of DbRequestSummary and
// DbRequestPayload, intended for use inside logger-db only.

// A summary must carry `requestId` and `startTime` (both are read when a row is
// inserted); any other properties are allowed and are stored as part of the JSON.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type DbRequestSummary = { requestId: string; startTime: number } & Record<string, any>;
// A payload is an arbitrary string-keyed object that is stored as JSON.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type DbRequestPayload = Record<string, any>;

// Module-level connection handle; `null` until initDb() succeeds and again after closeDb().
let db: InstanceType<typeof Database> | null = null;

// ==================== 初始化 ====================

/**
 * Close the module-level SQLite connection, if one is open, and reset the
 * handle to `null`. Does nothing when no connection is currently held.
 */
export function closeDb(): void {
    if (db === null) {
        return;
    }
    db.close();
    db = null;
}

/**
 * Open (or re-open) the SQLite database at `dbPath` and make sure the schema exists.
 *
 * Any previously opened connection is closed first, so the function can be
 * called again (for example on a configuration hot reload). The parent
 * directory of `dbPath` is created when it does not exist yet.
 *
 * The module-level handle is only assigned once the PRAGMAs and the schema
 * statements have succeeded. If any of them throws, the freshly opened
 * connection is closed and the error is re-thrown, so the module never ends up
 * holding a half-initialized connection.
 *
 * @param dbPath - Path of the SQLite database file.
 * @throws Whatever the filesystem calls or better-sqlite3 throw while creating
 *   the directory, opening the file, or executing the setup statements.
 */
export function initDb(dbPath: string): void {
    closeDb(); // Drop the old connection first (idempotent; allows re-initialization).
    const dir = dirname(dbPath);
    if (dir && !existsSync(dir)) {
        mkdirSync(dir, { recursive: true });
    }
    const conn = new Database(dbPath);
    try {
        conn.pragma('journal_mode = WAL');
        conn.pragma('synchronous = NORMAL');
        conn.exec(`
        CREATE TABLE IF NOT EXISTS requests (
            request_id   TEXT PRIMARY KEY,
            timestamp    INTEGER NOT NULL,
            summary_json TEXT NOT NULL,
            payload_json TEXT
        );
        CREATE INDEX IF NOT EXISTS idx_timestamp ON requests(timestamp);
    `);
    } catch (err) {
        // Release the file handle, but keep the original setup error as the one reported.
        try {
            conn.close();
        } catch {
            // Ignored on purpose: the setup failure above is the relevant error.
        }
        throw err;
    }
    // Publish the handle only after the schema is known to be in place.
    db = conn;
}

/**
 * Return the open database handle.
 *
 * @throws Error when no connection is currently held in the module-level handle.
 */
function getDb(): InstanceType<typeof Database> {
    const handle = db;
    if (handle === null) {
        throw new Error('SQLite not initialized. Call initDb() first.');
    }
    return handle;
}

// ==================== 写入 ====================

/**
 * Insert a request row, replacing any existing row with the same request id.
 *
 * The row stores `summary.requestId`, `summary.startTime` (as the timestamp),
 * and the JSON-serialized summary and payload.
 *
 * @param summary - Request summary; must carry `requestId` and `startTime`.
 * @param payload - Request payload, stored as JSON.
 * @throws Error if the database has not been initialized.
 */
export function dbInsertRequest(summary: DbRequestSummary, payload: DbRequestPayload): void {
    const insert = getDb().prepare(
        'INSERT OR REPLACE INTO requests (request_id, timestamp, summary_json, payload_json) VALUES (?, ?, ?, ?)'
    );
    // Column order: request_id, timestamp, summary_json, payload_json.
    const values = [
        summary.requestId,
        summary.startTime,
        JSON.stringify(summary),
        JSON.stringify(payload),
    ];
    insert.run(...values);
}

// ==================== 查询 ====================

/**
 * Load the payload of a single request on demand (called when an entry is
 * clicked in the Web UI).
 *
 * @param requestId - Id of the request whose payload should be loaded.
 * @returns The parsed payload, or `undefined` when the row does not exist, has
 *   no payload stored, or the stored JSON cannot be parsed.
 * @throws Error if the database has not been initialized.
 */
export function dbGetPayload(requestId: string): DbRequestPayload | undefined {
    const lookup = getDb().prepare('SELECT payload_json FROM requests WHERE request_id = ?');
    const record = lookup.get(requestId) as { payload_json: string } | undefined;
    const raw = record?.payload_json;
    if (!raw) {
        return undefined;
    }
    try {
        return JSON.parse(raw) as DbRequestPayload;
    } catch {
        // Unparseable payloads are reported as "no payload".
        return undefined;
    }
}

/** Filter and paging options accepted by the summary queries in this module. */
export interface DbQueryOpts {
    limit: number;      // maximum number of rows to return
    before?: number;    // timestamp < before (cursor for paging)
    since?: number;     // timestamp >= since (time range)
    status?: string;    // exact match on summary.status
    keyword?: string;   // fuzzy match on title / model / request_id
}

/**
 * Build a parameterized WHERE clause from the optional filters.
 *
 * Values are never concatenated into the SQL text; every filter is bound
 * through a named parameter. The keyword is matched as a literal substring:
 * the LIKE metacharacters `%` and `_` (and the escape character `\` itself) are
 * escaped, so a keyword such as `100%` or `gpt_4` does not act as a wildcard.
 *
 * @param opts - Filters to apply. `before`/`since` are skipped when
 *   `undefined`; `status`/`keyword` are skipped when empty or missing.
 * @returns `where` — either an empty string or a clause starting with `WHERE `
 *   whose conditions are joined by `AND`; `params` — the named parameters
 *   referenced by that clause.
 */
function buildWhere(opts: Omit<DbQueryOpts, 'limit'>): { where: string; params: Record<string, unknown> } {
    const conditions: string[] = [];
    const params: Record<string, unknown> = {};
    if (opts.before !== undefined) {
        conditions.push('timestamp < :before');
        params.before = opts.before;
    }
    if (opts.since !== undefined) {
        conditions.push('timestamp >= :since');
        params.since = opts.since;
    }
    if (opts.status) {
        conditions.push("json_extract(summary_json,'$.status') = :status");
        params.status = opts.status;
    }
    if (opts.keyword) {
        // Prefix \, % and _ with a backslash so they are matched literally by LIKE.
        const escaped = opts.keyword.replace(/[\\%_]/g, '\\$&');
        conditions.push(
            "(request_id LIKE :kw ESCAPE '\\'"
            + " OR json_extract(summary_json,'$.title') LIKE :kw ESCAPE '\\'"
            + " OR json_extract(summary_json,'$.model') LIKE :kw ESCAPE '\\')"
        );
        params.kw = `%${escaped}%`;
    }
    const where = conditions.length > 0 ? 'WHERE ' + conditions.join(' AND ') : '';
    return { where, params };
}

/**
 * Cursor-style paging over request summaries.
 *
 * Returns at most `limit` summaries that match the status / keyword / since /
 * before filters, ordered by timestamp descending (newest first). Rows whose
 * stored JSON cannot be parsed, or parses to `null`, are left out.
 *
 * @param opts - Page size plus optional filters.
 * @returns The matching summaries, newest first.
 * @throws Error if the database has not been initialized.
 */
export function dbGetSummaries(opts: DbQueryOpts): DbRequestSummary[] {
    const { limit, ...filters } = opts;
    const clause = buildWhere(filters);
    const query = `SELECT summary_json FROM requests ${clause.where} ORDER BY timestamp DESC LIMIT :limit`;
    const bindings = { ...clause.params, limit };
    const rows = getDb().prepare(query).all(bindings) as Array<{ summary_json: string }>;

    const summaries: DbRequestSummary[] = [];
    for (const { summary_json: raw } of rows) {
        let parsed: DbRequestSummary | null;
        try {
            parsed = JSON.parse(raw) as DbRequestSummary | null;
        } catch {
            parsed = null;
        }
        if (parsed !== null) {
            summaries.push(parsed);
        }
    }
    return summaries;
}

/**
 * Count the records that match the given filters.
 *
 * @param opts - Optional filters; defaults to no filtering.
 * @returns Number of matching rows.
 * @throws Error if the database has not been initialized.
 */
export function dbCountSummaries(opts: Omit<DbQueryOpts, 'limit' | 'before'> = {}): number {
    const clause = buildWhere(opts);
    const counter = getDb().prepare(`SELECT COUNT(*) as cnt FROM requests ${clause.where}`);
    const result = counter.get(clause.params) as { cnt: number };
    return result.cnt;
}

/**
 * Count records per status, for the numbers shown on the status filter buttons.
 *
 * Only `keyword` and `since` narrow the result. A status filter is never
 * applied, so the numbers on the other buttons do not change when one status
 * is selected.
 *
 * @param opts - Optional keyword and lower time bound.
 * @returns A map from status name to count. The keys `all`, `success`,
 *   `degraded`, `error`, `processing` and `intercepted` are always present
 *   (0 when no row has that status). `all` is the total over every matching
 *   row, including rows whose status is missing.
 * @throws Error if the database has not been initialized.
 */
export function dbGetStatusCounts(opts: { keyword?: string; since?: number } = {}): Record<string, number> {
    // Copy only the two supported filters. A caller may hand over a wider object
    // (TypeScript allows that for non-literal arguments); forwarding it unchanged
    // would let a stray `status` or `before` property narrow the counts.
    const filters: { keyword?: string; since?: number } = {};
    if (opts.keyword !== undefined) filters.keyword = opts.keyword;
    if (opts.since !== undefined) filters.since = opts.since;

    const { where, params } = buildWhere(filters);
    const sql = `SELECT json_extract(summary_json,'$.status') as status, COUNT(*) as cnt FROM requests ${where} GROUP BY status`;
    const rows = getDb().prepare(sql).all(params) as Array<{ status: string | null; cnt: number }>;
    const counts: Record<string, number> = { all: 0, success: 0, degraded: 0, error: 0, processing: 0, intercepted: 0 };
    // Keep the total in a local so a stored status that happens to be named
    // "all" cannot overwrite the aggregate while it is being accumulated.
    let total = 0;
    for (const row of rows) {
        if (row.status) counts[row.status] = row.cnt;
        total += row.cnt;
    }
    counts.all = total;
    return counts;
}

/**
 * Return the total number of records in the database, without any filtering.
 *
 * @throws Error if the database has not been initialized.
 */
export function dbGetSummaryCount(): number {
    const counter = getDb().prepare('SELECT COUNT(*) as cnt FROM requests');
    const result = counter.get() as { cnt: number };
    return result.cnt;
}

/**
 * Startup loader: return every summary (without payload) whose timestamp is
 * greater than or equal to `cutoffTimestamp`, oldest first. Used to restore the
 * in-memory request list.
 *
 * Rows whose stored JSON cannot be parsed, or parses to `null`, are left out.
 *
 * @param cutoffTimestamp - Inclusive lower bound for the row timestamp.
 * @returns The matching summaries in ascending timestamp order.
 * @throws Error if the database has not been initialized.
 */
export function dbGetSummariesSince(cutoffTimestamp: number): DbRequestSummary[] {
    const query = getDb().prepare(
        'SELECT summary_json FROM requests WHERE timestamp >= ? ORDER BY timestamp ASC'
    );
    const rows = query.all(cutoffTimestamp) as Array<{ summary_json: string }>;

    const restored: DbRequestSummary[] = [];
    for (const { summary_json: raw } of rows) {
        let parsed: DbRequestSummary | null;
        try {
            parsed = JSON.parse(raw) as DbRequestSummary | null;
        } catch {
            parsed = null;
        }
        if (parsed !== null) {
            restored.push(parsed);
        }
    }
    return restored;
}

/**
 * Aggregate statistics computed by a single SQL query over all records, or
 * over the records with `timestamp >= since` when `since` is given. Intended
 * to be called only when the database backend is enabled.
 *
 * @param since - Optional inclusive lower bound for the row timestamp.
 * @returns Total and per-status counts, plus the rounded averages of
 *   `endTime - timestamp` (over rows that have an `endTime`) and of `ttft`
 *   (over rows that have a `ttft`). Every value falls back to 0 when the query
 *   yields `null` for it.
 * @throws Error if the database has not been initialized.
 */
export function dbGetStats(since?: number): {
    totalRequests: number;
    successCount: number;
    degradedCount: number;
    errorCount: number;
    interceptedCount: number;
    processingCount: number;
    avgResponseTime: number;
    avgTTFT: number;
} {
    type StatsRow = {
        total: number;
        success: number;
        degraded: number;
        error: number;
        intercepted: number;
        processing: number;
        avgTime: number | null;
        avgTTFT: number | null;
    };

    // The time bound is optional; without it the query scans every row.
    let where = '';
    const bindings: number[] = [];
    if (since !== undefined) {
        where = 'WHERE timestamp >= ?';
        bindings.push(since);
    }

    const statement = getDb().prepare(`
        SELECT
            COUNT(*) as total,
            SUM(CASE WHEN json_extract(summary_json,'$.status')='success'     THEN 1 ELSE 0 END) as success,
            SUM(CASE WHEN json_extract(summary_json,'$.status')='degraded'    THEN 1 ELSE 0 END) as degraded,
            SUM(CASE WHEN json_extract(summary_json,'$.status')='error'       THEN 1 ELSE 0 END) as error,
            SUM(CASE WHEN json_extract(summary_json,'$.status')='intercepted' THEN 1 ELSE 0 END) as intercepted,
            SUM(CASE WHEN json_extract(summary_json,'$.status')='processing'  THEN 1 ELSE 0 END) as processing,
            AVG(CASE WHEN json_extract(summary_json,'$.endTime') IS NOT NULL
                THEN json_extract(summary_json,'$.endTime') - timestamp END) as avgTime,
            AVG(CASE WHEN json_extract(summary_json,'$.ttft') IS NOT NULL
                THEN json_extract(summary_json,'$.ttft') END) as avgTTFT
        FROM requests ${where}
    `);
    const stats = statement.get(...bindings) as StatsRow;

    // SUM/AVG yield NULL when no row contributes, so each value needs a fallback.
    const roundedOrZero = (value: number | null): number =>
        value != null ? Math.round(value) : 0;

    return {
        totalRequests: stats.total ?? 0,
        successCount: stats.success ?? 0,
        degradedCount: stats.degraded ?? 0,
        errorCount: stats.error ?? 0,
        interceptedCount: stats.intercepted ?? 0,
        processingCount: stats.processing ?? 0,
        avgResponseTime: roundedOrZero(stats.avgTime),
        avgTTFT: roundedOrZero(stats.avgTTFT),
    };
}

// ==================== 清空 ====================

/**
 * Delete every row from the `requests` table.
 *
 * @throws Error if the database has not been initialized.
 */
export function dbClear(): void {
    const wipe = getDb().prepare('DELETE FROM requests');
    wipe.run();
}

// ==================== 状态 ====================

/**
 * Report whether a database connection is currently held by this module.
 *
 * @returns `true` when the module-level handle is set, `false` when it is `null`.
 */
export function isDbInitialized(): boolean {
    const initialized = db !== null;
    return initialized;
}