Iostream-Li's picture
Add files using upload-large-folder tool
ff78003 verified
import { KnowledgeBudgetExceeded } from "./errors.ts";
import type { KnowledgeKind } from "./types.ts";
/**
 * Per-capability sliding-window budget for external-knowledge (EK) adapter
 * calls, implemented as a queue of call timestamps.
 *
 * On every call, timestamps that have left the window are dropped and the
 * current one is appended. The call count for the window is the queue length.
 *
 * The key is the capabilityId alone, NOT (capability, adapter). Every EK
 * adapter call made by one capability counts against a single shared limit.
 * This is what actually stops one capability from flooding external knowledge.
 * Per-adapter keys would instead give each route its own 100/min quota,
 * adding up to 700/min in aggregate. The adapter is passed along only for
 * logging.
 *
 * Defaults: 100 calls per 60 s, with a warning once the count reaches 80% of
 * the limit. Each default can be overridden through the environment
 * (DOATLAS_EK_BUDGET_LIMIT, DOATLAS_EK_BUDGET_WINDOW_MS,
 * DOATLAS_EK_BUDGET_WARN_RATIO). The warning is de-duplicated per capability
 * so it does not fire on every call.
 */
const DEFAULT_LIMIT = 100;
const DEFAULT_WINDOW_MS = 60 * 1000;
const DEFAULT_WARN_RATIO = 0.8;

// capabilityId -> timestamps (ms) of calls still inside the window, in insertion order.
const queues: Map<string, number[]> = new Map();
// capabilityId -> timestamp bookkeeping that registerCall uses to
// de-duplicate the "approaching budget" warning.
const warnedAt: Map<string, number> = new Map();
/**
 * Read a positive-integer setting from the environment.
 *
 * Fractional values are floored. The fallback is returned when the variable
 * is unset or empty, is not a finite number, or floors to less than 1
 * (e.g. "0.5"). Flooring happens BEFORE the positivity check, so a
 * misconfiguration can never produce a limit or window of 0.
 */
function readPositiveIntEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (!raw) return fallback;
  const n = Math.floor(Number(raw));
  return Number.isFinite(n) && n >= 1 ? n : fallback;
}

/** Max EK calls per capability per window (`DOATLAS_EK_BUDGET_LIMIT`, default `DEFAULT_LIMIT`). */
function effectiveLimit(): number {
  return readPositiveIntEnv("DOATLAS_EK_BUDGET_LIMIT", DEFAULT_LIMIT);
}

/** Sliding-window length in ms (`DOATLAS_EK_BUDGET_WINDOW_MS`, default `DEFAULT_WINDOW_MS`). */
function effectiveWindowMs(): number {
  return readPositiveIntEnv("DOATLAS_EK_BUDGET_WINDOW_MS", DEFAULT_WINDOW_MS);
}
/**
 * Fraction of the limit at which the "approaching budget" warning fires.
 *
 * Read from `DOATLAS_EK_BUDGET_WARN_RATIO`. The value is used as-is (no
 * rounding) only when it is a finite number in (0, 1]. Unset, empty or
 * out-of-range values fall back to DEFAULT_WARN_RATIO.
 */
function effectiveWarnRatio(): number {
  const configured = process.env["DOATLAS_EK_BUDGET_WARN_RATIO"];
  if (configured === undefined || configured === "") return DEFAULT_WARN_RATIO;
  const ratio = Number(configured);
  const inRange = Number.isFinite(ratio) && ratio > 0 && ratio <= 1;
  return inRange ? ratio : DEFAULT_WARN_RATIO;
}
/**
 * Record one external-knowledge call for `capabilityId` and report the
 * resulting sliding-window state.
 *
 * The call is always counted, even when it pushes the window over the limit.
 * The caller decides whether to throw or fall back to cache-only mode.
 *
 * When the count reaches the warn threshold (but is not over the limit), a
 * `console.warn` is emitted at most once per `windowMs` per capability.
 *
 * @param capabilityId Budget key; all adapters of one capability share a window.
 * @param adapter Only used in the log line.
 * @param nowOverride Clock value in ms (for tests); defaults to `Date.now()`.
 * @returns `exceeded`: true when the window now holds more than `limit` calls.
 *   `count`: calls in the window, including this one.
 *   `limit` / `windowMs`: the effective settings used for this call.
 *   `warnedThisCall`: whether this call emitted the warning.
 */
export function registerCall(
  capabilityId: string,
  adapter: KnowledgeKind,
  nowOverride?: number,
): { exceeded: boolean; count: number; limit: number; windowMs: number; warnedThisCall: boolean } {
  const limit = effectiveLimit();
  const windowMs = effectiveWindowMs();
  const warnAt = Math.max(1, Math.floor(limit * effectiveWarnRatio()));
  const key = capabilityId; // <-- per-capability aggregate, not per adapter
  const now = nowOverride ?? Date.now();

  let q = queues.get(key);
  if (!q) {
    q = [];
    queues.set(key, q);
  }
  // Drop timestamps that have left the window (ts <= now - windowMs).
  const cutoff = now - windowMs;
  while (q.length > 0 && q[0]! <= cutoff) q.shift();
  q.push(now);

  const count = q.length;
  const exceeded = count > limit;

  // Warn de-dup: remember WHEN we last warned and stay quiet for a full window
  // afterwards. Keying on the oldest queued timestamp instead would re-warn on
  // nearly every call under steady load. In a sliding window the oldest entry
  // expires roughly once per incoming call.
  let warnedThisCall = false;
  if (count >= warnAt && !exceeded) {
    const lastWarned = warnedAt.get(key);
    if (lastWarned === undefined || now - lastWarned >= windowMs) {
      warnedAt.set(key, now);
      warnedThisCall = true;
      // eslint-disable-next-line no-console
      console.warn(
        `[ek.budget.warn] capability=${capabilityId} adapter=${adapter} ` +
          `count=${count}/${limit} window_ms=${windowMs} warn_at=${warnAt} ` +
          `— approaching per-capability EK budget`,
      );
    }
  }
  return { exceeded, count, limit, windowMs, warnedThisCall };
}
/**
 * Strict variant of registerCall: records the call and throws once the
 * capability is over budget.
 *
 * Meant for adapters without a cache (causal / drug / internal-experience).
 * Literature does not use this; it calls registerCall and checks the result
 * itself.
 *
 * The rejection path still emits telemetry. Before throwing, a record with
 * error "budget_exceeded" is written fire-and-forget, so the promise of
 * "telemetry for every query" also holds for budget rejections.
 *
 * @throws KnowledgeBudgetExceeded when the window now holds more than the limit.
 */
export function enforceBudget(capabilityId: string, adapter: KnowledgeKind): void {
  const state = registerCall(capabilityId, adapter);
  if (!state.exceeded) return;

  // eslint-disable-next-line no-console
  console.warn(
    `[ek.budget.exceeded] capability=${capabilityId} adapter=${adapter} ` +
      `count=${state.count}/${state.limit} window_ms=${state.windowMs} — throwing KnowledgeBudgetExceeded`,
  );

  // Telemetry is loaded lazily via dynamic import as a precaution against an
  // import cycle. Any failure (load error, sync throw or rejected promise) is
  // swallowed so it can never block the throw below.
  void import("./telemetry.ts")
    .then((telemetry) => {
      try {
        return telemetry.recordCall({
          adapter,
          capabilityId,
          latencyMs: 0,
          hitCount: 0,
          cacheHit: null,
          error: "budget_exceeded",
          paramsFingerprint: "",
        });
      } catch {
        return undefined;
      }
    })
    .catch(() => undefined);

  const windowSeconds = Math.round(state.windowMs / 1000);
  throw new KnowledgeBudgetExceeded(capabilityId, adapter, windowSeconds, state.limit);
}
/** Test helper: forget every per-capability call window and warning record. */
export function _resetBudgetCounters(): void {
  [queues, warnedAt].forEach((store) => store.clear());
}