File size: 9,131 Bytes
097fb32 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 | /**
* logger-db.ts - SQLite 持久化层
*
* 仅在 config.logging.db_enabled = true 时使用。
* 与 JSONL 文件方式完全并存,互不干扰。
*/
import Database from 'better-sqlite3';
import { mkdirSync, existsSync } from 'fs';
import { dirname } from 'path';
// 使用 inline 类型避免与 logger.ts 的循环依赖
// DbRequestSummary 和 DbRequestPayload 的最小结构定义(仅 logger-db 内部使用)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type DbRequestSummary = { requestId: string; startTime: number } & Record<string, any>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type DbRequestPayload = Record<string, any>;
let db: InstanceType<typeof Database> | null = null;
// ==================== 初始化 ====================
export function closeDb(): void {
if (db) {
db.close();
db = null;
}
}
export function initDb(dbPath: string): void {
closeDb(); // 关闭旧连接(幂等,支持热重载重新初始化)
const dir = dirname(dbPath);
if (dir && !existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
db = new Database(dbPath);
db.pragma('journal_mode = WAL');
db.pragma('synchronous = NORMAL');
db.exec(`
CREATE TABLE IF NOT EXISTS requests (
request_id TEXT PRIMARY KEY,
timestamp INTEGER NOT NULL,
summary_json TEXT NOT NULL,
payload_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_timestamp ON requests(timestamp);
`);
}
function getDb(): InstanceType<typeof Database> {
if (!db) throw new Error('SQLite not initialized. Call initDb() first.');
return db;
}
// ==================== 写入 ====================
export function dbInsertRequest(summary: DbRequestSummary, payload: DbRequestPayload): void {
const stmt = getDb().prepare(
'INSERT OR REPLACE INTO requests (request_id, timestamp, summary_json, payload_json) VALUES (?, ?, ?, ?)'
);
stmt.run(
summary.requestId,
summary.startTime,
JSON.stringify(summary),
JSON.stringify(payload)
);
}
// ==================== 查询 ====================
/** 按需加载单条 payload(Web UI 点击时调用) */
export function dbGetPayload(requestId: string): DbRequestPayload | undefined {
const row = getDb()
.prepare('SELECT payload_json FROM requests WHERE request_id = ?')
.get(requestId) as { payload_json: string } | undefined;
if (!row?.payload_json) return undefined;
try { return JSON.parse(row.payload_json) as DbRequestPayload; } catch { return undefined; }
}
export interface DbQueryOpts {
limit: number;
before?: number; // timestamp < before(游标翻页)
since?: number; // timestamp >= since(时间范围)
status?: string; // 精确匹配 summary.status
keyword?: string; // 模糊匹配 title/model/request_id
}
/** 动态构建 WHERE 子句(参数化,防注入) */
function buildWhere(opts: Omit<DbQueryOpts, 'limit'>): { where: string; params: Record<string, unknown> } {
const conditions: string[] = [];
const params: Record<string, unknown> = {};
if (opts.before !== undefined) {
conditions.push('timestamp < :before');
params.before = opts.before;
}
if (opts.since !== undefined) {
conditions.push('timestamp >= :since');
params.since = opts.since;
}
if (opts.status) {
conditions.push("json_extract(summary_json,'$.status') = :status");
params.status = opts.status;
}
if (opts.keyword) {
conditions.push("(request_id LIKE :kw OR json_extract(summary_json,'$.title') LIKE :kw OR json_extract(summary_json,'$.model') LIKE :kw)");
params.kw = `%${opts.keyword}%`;
}
const where = conditions.length > 0 ? 'WHERE ' + conditions.join(' AND ') : '';
return { where, params };
}
/**
* 游标分页:返回最新的 limit 条,支持 status/keyword/since 后端过滤。
* 结果按 timestamp 倒序(最新在前)。
*/
export function dbGetSummaries(opts: DbQueryOpts): DbRequestSummary[] {
const { limit, ...filterOpts } = opts;
const { where, params } = buildWhere(filterOpts);
const sql = `SELECT summary_json FROM requests ${where} ORDER BY timestamp DESC LIMIT :limit`;
const rows = getDb().prepare(sql).all({ ...params, limit }) as Array<{ summary_json: string }>;
return rows.map(r => {
try { return JSON.parse(r.summary_json) as DbRequestSummary; } catch { return null; }
}).filter((s): s is DbRequestSummary => s !== null);
}
/** 返回符合过滤条件的记录总数 */
export function dbCountSummaries(opts: Omit<DbQueryOpts, 'limit' | 'before'> = {}): number {
const { where, params } = buildWhere(opts);
const sql = `SELECT COUNT(*) as cnt FROM requests ${where}`;
const row = getDb().prepare(sql).get(params) as { cnt: number };
return row.cnt;
}
/**
* 返回各状态的计数(不含 status 过滤,仅受 keyword/since 影响)。
* 用于状态筛选按钮上的计数显示,点击某状态后其他按钮数字不变。
*/
export function dbGetStatusCounts(opts: { keyword?: string; since?: number } = {}): Record<string, number> {
const { where, params } = buildWhere(opts); // 不传 status,只用 keyword/since
const sql = `SELECT json_extract(summary_json,'$.status') as status, COUNT(*) as cnt FROM requests ${where} GROUP BY status`;
const rows = getDb().prepare(sql).all(params) as Array<{ status: string; cnt: number }>;
const counts: Record<string, number> = { all: 0, success: 0, degraded: 0, error: 0, processing: 0, intercepted: 0 };
for (const row of rows) {
if (row.status) counts[row.status] = row.cnt;
counts.all += row.cnt;
}
return counts;
}
/** 返回数据库中全部记录总数(无过滤) */
export function dbGetSummaryCount(): number {
const row = getDb()
.prepare('SELECT COUNT(*) as cnt FROM requests')
.get() as { cnt: number };
return row.cnt;
}
/**
* 启动时加载:返回 timestamp >= cutoffTimestamp 的所有 summary(不含 payload)。
* 用于恢复内存中的请求列表。
*/
export function dbGetSummariesSince(cutoffTimestamp: number): DbRequestSummary[] {
const rows = getDb()
.prepare('SELECT summary_json FROM requests WHERE timestamp >= ? ORDER BY timestamp ASC')
.all(cutoffTimestamp) as Array<{ summary_json: string }>;
return rows.map(r => {
try { return JSON.parse(r.summary_json) as DbRequestSummary; } catch { return null; }
}).filter((s): s is DbRequestSummary => s !== null);
}
/**
* 聚合统计:通过 SQL 一次查询返回全量(或指定时间范围内)的 stats。
* 仅在 db_enabled 时调用。
*/
export function dbGetStats(since?: number): {
totalRequests: number;
successCount: number;
degradedCount: number;
errorCount: number;
interceptedCount: number;
processingCount: number;
avgResponseTime: number;
avgTTFT: number;
} {
const where = since !== undefined ? 'WHERE timestamp >= ?' : '';
const params = since !== undefined ? [since] : [];
const row = getDb().prepare(`
SELECT
COUNT(*) as total,
SUM(CASE WHEN json_extract(summary_json,'$.status')='success' THEN 1 ELSE 0 END) as success,
SUM(CASE WHEN json_extract(summary_json,'$.status')='degraded' THEN 1 ELSE 0 END) as degraded,
SUM(CASE WHEN json_extract(summary_json,'$.status')='error' THEN 1 ELSE 0 END) as error,
SUM(CASE WHEN json_extract(summary_json,'$.status')='intercepted' THEN 1 ELSE 0 END) as intercepted,
SUM(CASE WHEN json_extract(summary_json,'$.status')='processing' THEN 1 ELSE 0 END) as processing,
AVG(CASE WHEN json_extract(summary_json,'$.endTime') IS NOT NULL
THEN json_extract(summary_json,'$.endTime') - timestamp END) as avgTime,
AVG(CASE WHEN json_extract(summary_json,'$.ttft') IS NOT NULL
THEN json_extract(summary_json,'$.ttft') END) as avgTTFT
FROM requests ${where}
`).get(...params) as {
total: number; success: number; degraded: number; error: number;
intercepted: number; processing: number; avgTime: number | null; avgTTFT: number | null;
};
return {
totalRequests: row.total ?? 0,
successCount: row.success ?? 0,
degradedCount: row.degraded ?? 0,
errorCount: row.error ?? 0,
interceptedCount: row.intercepted ?? 0,
processingCount: row.processing ?? 0,
avgResponseTime: row.avgTime != null ? Math.round(row.avgTime) : 0,
avgTTFT: row.avgTTFT != null ? Math.round(row.avgTTFT) : 0,
};
}
// ==================== 清空 ====================
export function dbClear(): void {
getDb().prepare('DELETE FROM requests').run();
}
// ==================== 状态 ====================
export function isDbInitialized(): boolean {
return db !== null;
}
|