Spaces:
Paused
Paused
| /** | |
| * logger-db.ts - SQLite 持久化层 | |
| * | |
| * 仅在 config.logging.db_enabled = true 时使用。 | |
| * 与 JSONL 文件方式完全并存,互不干扰。 | |
| */ | |
| import Database from 'better-sqlite3'; | |
| import { mkdirSync, existsSync } from 'fs'; | |
| import { dirname } from 'path'; | |
| // 使用 inline 类型避免与 logger.ts 的循环依赖 | |
| // DbRequestSummary 和 DbRequestPayload 的最小结构定义(仅 logger-db 内部使用) | |
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | |
| type DbRequestSummary = { requestId: string; startTime: number } & Record<string, any>; | |
| // eslint-disable-next-line @typescript-eslint/no-explicit-any | |
| type DbRequestPayload = Record<string, any>; | |
| let db: InstanceType<typeof Database> | null = null; | |
| // ==================== 初始化 ==================== | |
| export function closeDb(): void { | |
| if (db) { | |
| db.close(); | |
| db = null; | |
| } | |
| } | |
| export function initDb(dbPath: string): void { | |
| closeDb(); // 关闭旧连接(幂等,支持热重载重新初始化) | |
| const dir = dirname(dbPath); | |
| if (dir && !existsSync(dir)) { | |
| mkdirSync(dir, { recursive: true }); | |
| } | |
| db = new Database(dbPath); | |
| db.pragma('journal_mode = WAL'); | |
| db.pragma('synchronous = NORMAL'); | |
| db.exec(` | |
| CREATE TABLE IF NOT EXISTS requests ( | |
| request_id TEXT PRIMARY KEY, | |
| timestamp INTEGER NOT NULL, | |
| summary_json TEXT NOT NULL, | |
| payload_json TEXT | |
| ); | |
| CREATE INDEX IF NOT EXISTS idx_timestamp ON requests(timestamp); | |
| `); | |
| } | |
| function getDb(): InstanceType<typeof Database> { | |
| if (!db) throw new Error('SQLite not initialized. Call initDb() first.'); | |
| return db; | |
| } | |
| // ==================== 写入 ==================== | |
| export function dbInsertRequest(summary: DbRequestSummary, payload: DbRequestPayload): void { | |
| const stmt = getDb().prepare( | |
| 'INSERT OR REPLACE INTO requests (request_id, timestamp, summary_json, payload_json) VALUES (?, ?, ?, ?)' | |
| ); | |
| stmt.run( | |
| summary.requestId, | |
| summary.startTime, | |
| JSON.stringify(summary), | |
| JSON.stringify(payload) | |
| ); | |
| } | |
| // ==================== 查询 ==================== | |
| /** 按需加载单条 payload(Web UI 点击时调用) */ | |
| export function dbGetPayload(requestId: string): DbRequestPayload | undefined { | |
| const row = getDb() | |
| .prepare('SELECT payload_json FROM requests WHERE request_id = ?') | |
| .get(requestId) as { payload_json: string } | undefined; | |
| if (!row?.payload_json) return undefined; | |
| try { return JSON.parse(row.payload_json) as DbRequestPayload; } catch { return undefined; } | |
| } | |
| export interface DbQueryOpts { | |
| limit: number; | |
| before?: number; // timestamp < before(游标翻页) | |
| since?: number; // timestamp >= since(时间范围) | |
| status?: string; // 精确匹配 summary.status | |
| keyword?: string; // 模糊匹配 title/model/request_id | |
| } | |
| /** 动态构建 WHERE 子句(参数化,防注入) */ | |
| function buildWhere(opts: Omit<DbQueryOpts, 'limit'>): { where: string; params: Record<string, unknown> } { | |
| const conditions: string[] = []; | |
| const params: Record<string, unknown> = {}; | |
| if (opts.before !== undefined) { | |
| conditions.push('timestamp < :before'); | |
| params.before = opts.before; | |
| } | |
| if (opts.since !== undefined) { | |
| conditions.push('timestamp >= :since'); | |
| params.since = opts.since; | |
| } | |
| if (opts.status) { | |
| conditions.push("json_extract(summary_json,'$.status') = :status"); | |
| params.status = opts.status; | |
| } | |
| if (opts.keyword) { | |
| conditions.push("(request_id LIKE :kw OR json_extract(summary_json,'$.title') LIKE :kw OR json_extract(summary_json,'$.model') LIKE :kw)"); | |
| params.kw = `%${opts.keyword}%`; | |
| } | |
| const where = conditions.length > 0 ? 'WHERE ' + conditions.join(' AND ') : ''; | |
| return { where, params }; | |
| } | |
| /** | |
| * 游标分页:返回最新的 limit 条,支持 status/keyword/since 后端过滤。 | |
| * 结果按 timestamp 倒序(最新在前)。 | |
| */ | |
| export function dbGetSummaries(opts: DbQueryOpts): DbRequestSummary[] { | |
| const { limit, ...filterOpts } = opts; | |
| const { where, params } = buildWhere(filterOpts); | |
| const sql = `SELECT summary_json FROM requests ${where} ORDER BY timestamp DESC LIMIT :limit`; | |
| const rows = getDb().prepare(sql).all({ ...params, limit }) as Array<{ summary_json: string }>; | |
| return rows.map(r => { | |
| try { return JSON.parse(r.summary_json) as DbRequestSummary; } catch { return null; } | |
| }).filter((s): s is DbRequestSummary => s !== null); | |
| } | |
| /** 返回符合过滤条件的记录总数 */ | |
| export function dbCountSummaries(opts: Omit<DbQueryOpts, 'limit' | 'before'> = {}): number { | |
| const { where, params } = buildWhere(opts); | |
| const sql = `SELECT COUNT(*) as cnt FROM requests ${where}`; | |
| const row = getDb().prepare(sql).get(params) as { cnt: number }; | |
| return row.cnt; | |
| } | |
| /** | |
| * 返回各状态的计数(不含 status 过滤,仅受 keyword/since 影响)。 | |
| * 用于状态筛选按钮上的计数显示,点击某状态后其他按钮数字不变。 | |
| */ | |
| export function dbGetStatusCounts(opts: { keyword?: string; since?: number } = {}): Record<string, number> { | |
| const { where, params } = buildWhere(opts); // 不传 status,只用 keyword/since | |
| const sql = `SELECT json_extract(summary_json,'$.status') as status, COUNT(*) as cnt FROM requests ${where} GROUP BY status`; | |
| const rows = getDb().prepare(sql).all(params) as Array<{ status: string; cnt: number }>; | |
| const counts: Record<string, number> = { all: 0, success: 0, degraded: 0, error: 0, processing: 0, intercepted: 0 }; | |
| for (const row of rows) { | |
| if (row.status) counts[row.status] = row.cnt; | |
| counts.all += row.cnt; | |
| } | |
| return counts; | |
| } | |
| /** 返回数据库中全部记录总数(无过滤) */ | |
| export function dbGetSummaryCount(): number { | |
| const row = getDb() | |
| .prepare('SELECT COUNT(*) as cnt FROM requests') | |
| .get() as { cnt: number }; | |
| return row.cnt; | |
| } | |
| /** | |
| * 启动时加载:返回 timestamp >= cutoffTimestamp 的所有 summary(不含 payload)。 | |
| * 用于恢复内存中的请求列表。 | |
| */ | |
| export function dbGetSummariesSince(cutoffTimestamp: number): DbRequestSummary[] { | |
| const rows = getDb() | |
| .prepare('SELECT summary_json FROM requests WHERE timestamp >= ? ORDER BY timestamp ASC') | |
| .all(cutoffTimestamp) as Array<{ summary_json: string }>; | |
| return rows.map(r => { | |
| try { return JSON.parse(r.summary_json) as DbRequestSummary; } catch { return null; } | |
| }).filter((s): s is DbRequestSummary => s !== null); | |
| } | |
| /** | |
| * 聚合统计:通过 SQL 一次查询返回全量(或指定时间范围内)的 stats。 | |
| * 仅在 db_enabled 时调用。 | |
| */ | |
| export function dbGetStats(since?: number): { | |
| totalRequests: number; | |
| successCount: number; | |
| degradedCount: number; | |
| errorCount: number; | |
| interceptedCount: number; | |
| processingCount: number; | |
| avgResponseTime: number; | |
| avgTTFT: number; | |
| } { | |
| const where = since !== undefined ? 'WHERE timestamp >= ?' : ''; | |
| const params = since !== undefined ? [since] : []; | |
| const row = getDb().prepare(` | |
| SELECT | |
| COUNT(*) as total, | |
| SUM(CASE WHEN json_extract(summary_json,'$.status')='success' THEN 1 ELSE 0 END) as success, | |
| SUM(CASE WHEN json_extract(summary_json,'$.status')='degraded' THEN 1 ELSE 0 END) as degraded, | |
| SUM(CASE WHEN json_extract(summary_json,'$.status')='error' THEN 1 ELSE 0 END) as error, | |
| SUM(CASE WHEN json_extract(summary_json,'$.status')='intercepted' THEN 1 ELSE 0 END) as intercepted, | |
| SUM(CASE WHEN json_extract(summary_json,'$.status')='processing' THEN 1 ELSE 0 END) as processing, | |
| AVG(CASE WHEN json_extract(summary_json,'$.endTime') IS NOT NULL | |
| THEN json_extract(summary_json,'$.endTime') - timestamp END) as avgTime, | |
| AVG(CASE WHEN json_extract(summary_json,'$.ttft') IS NOT NULL | |
| THEN json_extract(summary_json,'$.ttft') END) as avgTTFT | |
| FROM requests ${where} | |
| `).get(...params) as { | |
| total: number; success: number; degraded: number; error: number; | |
| intercepted: number; processing: number; avgTime: number | null; avgTTFT: number | null; | |
| }; | |
| return { | |
| totalRequests: row.total ?? 0, | |
| successCount: row.success ?? 0, | |
| degradedCount: row.degraded ?? 0, | |
| errorCount: row.error ?? 0, | |
| interceptedCount: row.intercepted ?? 0, | |
| processingCount: row.processing ?? 0, | |
| avgResponseTime: row.avgTime != null ? Math.round(row.avgTime) : 0, | |
| avgTTFT: row.avgTTFT != null ? Math.round(row.avgTTFT) : 0, | |
| }; | |
| } | |
| // ==================== 清空 ==================== | |
| export function dbClear(): void { | |
| getDb().prepare('DELETE FROM requests').run(); | |
| } | |
| // ==================== 状态 ==================== | |
| export function isDbInitialized(): boolean { | |
| return db !== null; | |
| } | |