Spaces:
Running
Running
File size: 2,947 Bytes
fb4d8fe | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | import type { OpenClawConfig } from "../config/config.js";
import type { InboundDebounceByProvider } from "../config/types.messages.js";
/**
 * Normalizes a raw millisecond setting into a non-negative whole number.
 *
 * @param value - Candidate duration; may be of any type.
 * @returns The value truncated toward zero and floored at 0, or `undefined`
 *   when `value` is not a finite number (wrong type, `NaN`, `±Infinity`).
 */
const resolveMs = (value: unknown): number | undefined =>
  typeof value === "number" && Number.isFinite(value)
    ? Math.max(0, Math.trunc(value))
    : undefined;
/**
 * Reads the debounce override configured for a single channel.
 *
 * @param params.byChannel - Optional per-channel override table.
 * @param params.channel - Key to look up in `byChannel`.
 * @returns The normalized override in milliseconds, or `undefined` when there
 *   is no table or the entry for `channel` is not a finite number.
 */
const resolveChannelOverride = (params: {
  byChannel?: InboundDebounceByProvider;
  channel: string;
}): number | undefined => {
  const table = params.byChannel;
  return table ? resolveMs(table[params.channel]) : undefined;
};
/**
 * Picks the inbound debounce window for a channel.
 *
 * Candidates are considered in priority order and the first usable one wins:
 * the explicit `overrideMs`, then the per-channel entry under
 * `cfg.messages.inbound.byChannel`, then `cfg.messages.inbound.debounceMs`.
 *
 * @param params.cfg - Configuration to read the inbound settings from.
 * @param params.channel - Channel key used for the per-channel lookup.
 * @param params.overrideMs - Optional caller-supplied value that takes precedence.
 * @returns A non-negative whole number of milliseconds; `0` when no candidate
 *   is a finite number.
 */
export function resolveInboundDebounceMs(params: {
  cfg: OpenClawConfig;
  channel: string;
  overrideMs?: number;
}): number {
  const inboundCfg = params.cfg.messages?.inbound;
  const candidates = [
    resolveMs(params.overrideMs),
    resolveChannelOverride({
      byChannel: inboundCfg?.byChannel,
      channel: params.channel,
    }),
    resolveMs(inboundCfg?.debounceMs),
  ];
  return candidates.find((ms) => ms !== undefined) ?? 0;
}
/** Pending batch for one debounce key. */
type DebounceBuffer<T> = {
  /** Items collected since the batch was opened, in arrival order. */
  items: T[];
  /** Handle of the scheduled flush timer, or `null` when none is pending. */
  timeout: ReturnType<typeof setTimeout> | null;
};

/**
 * Creates a per-key debouncer that groups items arriving close together and
 * hands each group to `onFlush` as one batch.
 *
 * Every debounced item (re)starts the timer for its key; the batch is flushed
 * once `debounceMs` elapses without a new item for that key, or earlier when
 * `flushKey` is called.
 *
 * @param params.debounceMs - Quiet period in milliseconds. Truncated to a whole
 *   number, floored at 0 and capped at the largest delay `setTimeout` accepts.
 *   `0` (or `NaN`) disables batching: every item is flushed on its own.
 * @param params.buildKey - Returns the batching key for an item. A
 *   `null`/`undefined`/empty key makes the item bypass batching.
 * @param params.shouldDebounce - Optional per-item opt-out; returning `false`
 *   makes the item bypass batching. Defaults to debouncing every item.
 * @param params.onFlush - Receives each batch (or single bypassing item).
 * @param params.onError - Receives errors thrown by `onFlush` for a buffered
 *   batch, together with that batch. When omitted such errors are dropped.
 * @returns `enqueue` to submit an item and `flushKey` to flush a key's pending
 *   batch immediately (a no-op when nothing is pending).
 *
 * Note: an `onFlush` failure for a bypassing item is NOT routed to `onError`;
 * it rejects the promise returned by `enqueue`.
 */
export function createInboundDebouncer<T>(params: {
  debounceMs: number;
  buildKey: (item: T) => string | null | undefined;
  shouldDebounce?: (item: T) => boolean;
  onFlush: (items: T[]) => Promise<void>;
  onError?: (err: unknown, items: T[]) => void;
}) {
  const buffers = new Map<string, DebounceBuffer<T>>();

  // setTimeout stores the delay as a signed 32-bit integer. Node replaces any
  // larger delay (including Infinity) with 1 ms, which would turn a very long
  // debounce window into an almost immediate flush. Cap it instead.
  const maxTimerDelayMs = 2 ** 31 - 1;
  const requestedMs = Math.max(0, Math.trunc(params.debounceMs));
  const debounceMs = Number.isNaN(requestedMs) ? 0 : Math.min(requestedMs, maxTimerDelayMs);

  // Delivers one batch. The buffer is detached from the map and its timer is
  // cancelled before awaiting, so items arriving during the flush open a new
  // batch instead of mutating the one being delivered.
  const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
    buffers.delete(key);
    if (buffer.timeout) {
      clearTimeout(buffer.timeout);
      buffer.timeout = null;
    }
    if (buffer.items.length === 0) {
      return;
    }
    try {
      await params.onFlush(buffer.items);
    } catch (err) {
      params.onError?.(err, buffer.items);
    }
  };

  // Flushes whatever is pending for `key`; does nothing when no batch is open.
  const flushKey = async (key: string) => {
    const buffer = buffers.get(key);
    if (!buffer) {
      return;
    }
    await flushBuffer(key, buffer);
  };

  // (Re)starts the quiet-period timer for a batch.
  const scheduleFlush = (key: string, buffer: DebounceBuffer<T>) => {
    if (buffer.timeout) {
      clearTimeout(buffer.timeout);
    }
    buffer.timeout = setTimeout(() => {
      void flushBuffer(key, buffer);
    }, debounceMs);
    // Where the timer handle supports it (Node), don't let a pending batch
    // keep the process alive.
    buffer.timeout.unref?.();
  };

  // Submits one item: either joins/opens the batch for its key, or bypasses
  // batching and is flushed right away.
  const enqueue = async (item: T) => {
    const key = params.buildKey(item);
    const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true);
    if (!canDebounce || !key) {
      // Deliver anything already pending for this key first so the bypassing
      // item is not handed over ahead of earlier items.
      if (key && buffers.has(key)) {
        await flushKey(key);
      }
      await params.onFlush([item]);
      return;
    }
    const existing = buffers.get(key);
    if (existing) {
      existing.items.push(item);
      scheduleFlush(key, existing);
      return;
    }
    const buffer: DebounceBuffer<T> = { items: [item], timeout: null };
    buffers.set(key, buffer);
    scheduleFlush(key, buffer);
  };

  return { enqueue, flushKey };
}
|