import { KnowledgeBudgetExceeded } from "./errors.ts";
import type { KnowledgeKind } from "./types.ts";

/**
 * True sliding-window counter (a queue of timestamps).
 * Each call pushes the current timestamp, drops timestamps that fell out of
 * `windowMs`, and the count is the remaining queue length.
 *
 * **Keyed by capabilityId (aggregate)**, not per (capability, adapter): the
 * sum of all EK adapter calls a capability makes within one window must not
 * exceed the limit. This genuinely prevents "one capability flooding external
 * knowledge", instead of giving every adapter its own 100/min quota that adds
 * up to 700/min. The adapter argument is still passed in, but only for logs.
 *
 * Defaults to 100 calls / 60s, overridable via env. When count >= warnAt
 * (default 80% of limit) a warn log is emitted, at most once per capability
 * per window, so it does not spam on every call.
 */
const DEFAULT_LIMIT = 100;
const DEFAULT_WINDOW_MS = 60_000;
const DEFAULT_WARN_RATIO = 0.8;

/** capabilityId -> call timestamps (ms) still inside the window, oldest first. */
const queues = new Map<string, number[]>();

/**
 * capabilityId -> timestamp (ms) of the last threshold warning. A new warning
 * is only emitted once that timestamp has itself fallen out of the window,
 * i.e. at most one warning per capability per `windowMs`.
 */
const warnedAt = new Map<string, number>();

/**
 * Read a positive-integer setting from the environment.
 * Falls back to `fallback` when the variable is unset/empty, non-numeric,
 * non-finite, or below 1 after flooring. Flooring happens before validation
 * so that e.g. "0.5" cannot silently become 0.
 */
function readPositiveIntEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (!raw) return fallback;
  const n = Math.floor(Number(raw));
  return Number.isFinite(n) && n >= 1 ? n : fallback;
}

/** Max calls per capability per window (env `DOATLAS_EK_BUDGET_LIMIT`). */
function effectiveLimit(): number {
  return readPositiveIntEnv("DOATLAS_EK_BUDGET_LIMIT", DEFAULT_LIMIT);
}

/** Window length in ms (env `DOATLAS_EK_BUDGET_WINDOW_MS`). */
function effectiveWindowMs(): number {
  return readPositiveIntEnv("DOATLAS_EK_BUDGET_WINDOW_MS", DEFAULT_WINDOW_MS);
}

/** Fraction of the limit at which to warn, in (0, 1] (env `DOATLAS_EK_BUDGET_WARN_RATIO`). */
function effectiveWarnRatio(): number {
  const raw = process.env["DOATLAS_EK_BUDGET_WARN_RATIO"];
  if (!raw) return DEFAULT_WARN_RATIO;
  const n = Number(raw);
  return Number.isFinite(n) && n > 0 && n <= 1 ? n : DEFAULT_WARN_RATIO;
}

/**
 * Register one call against the capability's budget.
 *
 * The call is recorded in the window even when it exceeds the limit.
 * When the count reaches warnAt (without exceeding the limit), a single
 * console.warn is emitted, deduplicated to once per capability per window.
 *
 * @param capabilityId Budget key; all adapters of a capability share one window.
 * @param adapter Only used in log output.
 * @param nowOverride Timestamp (ms) to use instead of `Date.now()`. Eviction
 *   assumes timestamps per capability are non-decreasing.
 * @returns `exceeded` is true when *this* call pushed the window total above
 *   `limit`; the caller decides whether to throw or use cache-only mode.
 *   `warnedThisCall` is true when this call emitted the threshold warning.
 */
export function registerCall(
  capabilityId: string,
  adapter: KnowledgeKind,
  nowOverride?: number,
): { exceeded: boolean; count: number; limit: number; windowMs: number; warnedThisCall: boolean } {
  const limit = effectiveLimit();
  const windowMs = effectiveWindowMs();
  const warnAt = Math.max(1, Math.floor(limit * effectiveWarnRatio()));
  const key = capabilityId; // <-- per-capability aggregate, not per adapter
  const now = nowOverride ?? Date.now();

  let q = queues.get(key);
  if (q === undefined) {
    q = [];
    queues.set(key, q);
  }

  // Evict expired timestamps with one splice instead of repeated shift(),
  // which is O(n) per element and quadratic after a large burst.
  const cutoff = now - windowMs;
  const firstLive = q.findIndex((ts) => ts > cutoff);
  q.splice(0, firstLive === -1 ? q.length : firstLive);
  q.push(now);

  const count = q.length;
  const exceeded = count > limit;

  // Warn when the count reaches warnAt but has not exceeded the limit
  // (enforceBudget logs the exceeded case itself). Re-warn only once the
  // previous warning has left the window; keying on q[0] instead would
  // re-fire on nearly every call under sustained load.
  let warnedThisCall = false;
  if (count >= warnAt && !exceeded) {
    const lastWarned = warnedAt.get(key);
    if (lastWarned === undefined || lastWarned <= cutoff) {
      warnedAt.set(key, now);
      warnedThisCall = true;
      // eslint-disable-next-line no-console
      console.warn(
        `[ek.budget.warn] capability=${capabilityId} adapter=${adapter} ` +
          `count=${count}/${limit} window_ms=${windowMs} warn_at=${warnAt} ` +
          `— approaching per-capability EK budget`,
      );
    }
  }

  return { exceeded, count, limit, windowMs, warnedThisCall };
}

/**
 * Strict variant: throws once the budget is exceeded. Meant for adapters
 * without a cache (causal / drug / internal-experience). literature does not
 * call this; it uses registerCall and checks `exceeded` itself.
 *
 * **The reject path records telemetry too**: before throwing, a telemetry
 * record with error='budget_exceeded' is written fire-and-forget, so the
 * "every query has telemetry" promise also holds on the budget-reject path.
 *
 * @throws KnowledgeBudgetExceeded when this call pushes the window total above the limit.
 */
export function enforceBudget(capabilityId: string, adapter: KnowledgeKind): void {
  const r = registerCall(capabilityId, adapter);
  if (!r.exceeded) return;

  // eslint-disable-next-line no-console
  console.warn(
    `[ek.budget.exceeded] capability=${capabilityId} adapter=${adapter} ` +
      `count=${r.count}/${r.limit} window_ms=${r.windowMs} — throwing KnowledgeBudgetExceeded`,
  );

  // Dynamic import guards against a potential import cycle with telemetry.
  // Any failure (load error, sync throw, rejected promise) is swallowed so
  // telemetry can never block or replace the throw below.
  void import("./telemetry.ts")
    .then((mod) => {
      try {
        return mod.recordCall({
          adapter,
          capabilityId,
          latencyMs: 0,
          hitCount: 0,
          cacheHit: null,
          error: "budget_exceeded",
          paramsFingerprint: "",
        });
      } catch {
        return undefined;
      }
    })
    .catch(() => undefined);

  throw new KnowledgeBudgetExceeded(
    capabilityId,
    adapter,
    Math.round(r.windowMs / 1000),
    r.limit,
  );
}

/** Test helper: reset all windows and warning state. */
export function _resetBudgetCounters(): void {
  queues.clear();
  warnedAt.clear();
}