distill-pipeline / scripts /live_bench.mjs
htaf's picture
Add CI, licences, samples, and benchmark scripts
b2f1284
#!/usr/bin/env node
// scripts/live_bench.mjs
// Live HUD for pipeline throughput/latency using readline (no deps).
// Defaults to mock providers for speed; can run real providers by env overrides.
import readline from 'readline';
import { performance } from 'perf_hooks';
import path from 'path';
import os from 'os';
import { runPipelineBatch } from '../src/pipeline/pipeline.mjs';
/**
 * Parse the bench's command-line flags.
 *
 * Recognised flags:
 *   --limit N | -n N   sets `limit`
 *   --chunk-limit N    sets `chunkLimit`
 *   --random-walk      sets `randomWalk` to true
 *   --real             sets `mockMode` to false
 * Any other token is ignored.
 *
 * A numeric flag consumes the following token only when that token is a
 * number. This keeps `--limit --real` from swallowing `--real`, and keeps an
 * empty-string value from being coerced to 0 by `Number('')`.
 *
 * @param {string[]} argv - Full argument vector; the first two entries are skipped.
 * @returns {{limit: (number|undefined), chunkLimit: (number|undefined), randomWalk: boolean, mockMode: boolean}}
 *   `limit`/`chunkLimit` stay undefined when the flag is absent or its value is not numeric.
 */
function parseArgs(argv) {
  const args = argv.slice(2);
  let limit;
  let chunkLimit;
  let randomWalk = false;
  let mockMode = true;

  // Numeric value of the token following index `i`, or undefined when that
  // token is missing, blank, or not a number.
  const numberAfter = (i) => {
    const raw = args[i + 1];
    if (raw === undefined || String(raw).trim() === '') return undefined;
    const value = Number(raw);
    return Number.isNaN(value) ? undefined : value;
  };

  for (let i = 0; i < args.length; i++) {
    const flag = args[i];
    if (flag === '--limit' || flag === '-n') {
      const value = numberAfter(i);
      if (value !== undefined) {
        limit = value;
        i++; // skip the value we just consumed
      }
    } else if (flag === '--chunk-limit') {
      const value = numberAfter(i);
      if (value !== undefined) {
        chunkLimit = value;
        i++; // skip the value we just consumed
      }
    } else if (flag === '--random-walk') {
      randomWalk = true;
    } else if (flag === '--real') {
      mockMode = false;
    }
  }

  return { limit, chunkLimit, randomWalk, mockMode };
}
/**
 * Build the terminal HUD.
 *
 * Creates a readline interface on stdin/stdout and pauses it immediately;
 * drawing is done by writing to process.stdout directly.
 *
 * @returns {{render: function(string[]): void}} `render(lines)` moves the
 *   cursor to the top-left corner, clears everything below it, then writes the
 *   lines joined by newlines plus a trailing newline.
 */
function createHud() {
  const iface = readline.createInterface({ input: process.stdin, output: process.stdout });
  iface.pause();

  const render = (lines) => {
    const out = process.stdout;
    readline.cursorTo(out, 0, 0);
    readline.clearScreenDown(out);
    out.write(`${lines.join('\n')}\n`);
  };

  return { render };
}
/**
 * Render a fixed-width ASCII progress bar.
 *
 * @param {string} label - Text shown before the bar; right-padded to 12 columns.
 * @param {number} fraction - Progress value; clamped into [0, 1]. A value that
 *   is not a number (NaN, undefined) is drawn as 0 instead of producing an
 *   empty "[]" bar and "NaN%".
 * @param {number} [width=30] - Number of cells between the brackets.
 * @returns {string} e.g. `gen          [#####.....] 50.0%` for width 10.
 */
function formatBar(label, fraction, width = 30) {
  const bounded = Math.max(0, Math.min(1, fraction));
  // Math.min/Math.max propagate NaN, so it has to be caught after clamping.
  const clamped = Number.isNaN(bounded) ? 0 : bounded;
  const filled = Math.round(clamped * width);
  const empty = width - filled;
  return `${label.padEnd(12)} [${'#'.repeat(filled)}${'.'.repeat(empty)}] ${(clamped * 100).toFixed(1)}%`;
}
/**
 * Format a duration given in milliseconds for display.
 *
 * @param {number} ms - Duration in milliseconds.
 * @returns {string} `"<x.x> ms"` when `ms < 1000`, otherwise `"<x.xx> s"`.
 */
function humanMs(ms) {
  // Written as a negated "<" so that NaN falls into the seconds branch.
  const inSeconds = !(ms < 1000);
  return inSeconds ? `${(ms / 1000).toFixed(2)} s` : `${ms.toFixed(1)} ms`;
}
/**
 * Run one pipeline batch while drawing a live HUD, then print a final summary.
 *
 * Steps:
 *  1. Parse CLI flags and, unless `--real` was given, force every provider
 *     environment variable to `'mock'`.
 *  2. Default PIPELINE_SEED_MODE to `'question-first'`; set
 *     PIPELINE_RANDOM_WALK when `--random-walk` was given.
 *  3. Redraw the HUD every 750 ms from the counters updated by `onProgress`.
 *  4. Await `runPipelineBatch`, stop the HUD timer, and render the totals
 *     reported in its result (`processed`, `accepted`, `mode`).
 *
 * @returns {Promise<void>} Rejects with whatever `runPipelineBatch` rejects with.
 */
async function main() {
  const { limit, chunkLimit, randomWalk, mockMode } = parseArgs(process.argv);

  // Defaults: mock providers for speed/determinism
  if (mockMode) {
    process.env.GENERATOR_PROVIDER = 'mock';
    process.env.VERIFIER_PROVIDER = 'mock';
    process.env.REWARD_PROVIDER = 'mock';
    process.env.QUESTION_PROVIDER = 'mock';
    process.env.PROVIDER_TYPE = 'mock';
  }

  // question-first by default
  process.env.PIPELINE_SEED_MODE = process.env.PIPELINE_SEED_MODE || 'question-first';
  if (randomWalk) process.env.PIPELINE_RANDOM_WALK = '1';

  // NOTE(review): cacheDir is only shown in the HUD; it is neither written back
  // to process.env nor passed to runPipelineBatch. Confirm the pipeline really
  // uses this directory when PIPELINE_CACHE_DIR is unset.
  const cacheDir =
    process.env.PIPELINE_CACHE_DIR ||
    path.join(os.tmpdir(), `distill-live-bench-cache-${Date.now()}`);
  const outPath =
    process.env.BENCH_OUT ||
    path.join(os.tmpdir(), `pipeline_gold_live_bench_${Date.now()}.jsonl`);

  const hud = createHud();
  const t0 = performance.now();

  // Sliding window of recent progress events, used only for the throughput figure.
  const samples = [];
  // Total number of progress events seen. Kept separately because `samples`
  // is capped, so its length stops growing once the cap is reached.
  let processedCount = 0;
  const statusCounts = {};
  // NOTE(review): only `end2end` is fed by onProgress below; nothing in this
  // function pushes to gen/ver/rew, so those averages render as 0.0 ms.
  const stageTimes = { gen: [], ver: [], rew: [], end2end: [] };

  // Arithmetic mean; 0 for an empty list.
  function avg(arr) {
    if (!arr.length) return 0;
    return arr.reduce((a, b) => a + b, 0) / arr.length;
  }

  // Events per second over the samples recorded in the last `windowMs`.
  function throughput(windowMs = 60000) {
    const now = performance.now();
    const recent = samples.filter((s) => now - s.ts <= windowMs);
    if (recent.length === 0) return 0;
    const spanMs = Math.min(windowMs, now - recent[0].ts);
    if (spanMs <= 0) return 0;
    return recent.length / (spanMs / 1000);
  }

  // Share of all progress events whose status was 'accepted' (not rendered yet).
  function acceptRate() {
    const accepted = statusCounts.accepted || 0;
    return processedCount ? accepted / processedCount : 0;
  }

  // "status:count" pairs joined by spaces; empty string when nothing was seen.
  function summarizeStatus() {
    const keys = Object.keys(statusCounts);
    return keys.map((k) => `${k}:${statusCounts[k]}`).join(' ');
  }

  // Drop the pipeline's regular log output so it cannot scribble over the HUD;
  // errors still go to stderr.
  const logger = {
    log: () => {},
    error: console.error,
  };

  const hudInterval = setInterval(() => {
    const now = performance.now();
    const totalMs = now - t0;
    const qps = throughput();
    const acc = statusCounts.accepted || 0;
    const lines = [
      '📊 Live Bench (minimal logging)',
      `mode: ${process.env.PIPELINE_SEED_MODE} | mock: ${mockMode ? 'yes' : 'no'} | random walk: ${randomWalk ? 'yes' : 'no'}`,
      `processed: ${processedCount} | accepted: ${acc} | elapsed: ${humanMs(totalMs)}`,
      `throughput: ${qps.toFixed(2)} pipeline cycles/s (60s window)`,
      `gen avg: ${humanMs(avg(stageTimes.gen))} | ver avg: ${humanMs(avg(stageTimes.ver))} | rew avg: ${humanMs(avg(stageTimes.rew))}`,
      `end2end avg: ${humanMs(avg(stageTimes.end2end))}`,
      `status: ${summarizeStatus() || 'n/a'}`,
      `cache: ${cacheDir}`,
      `out: ${outPath}`,
    ];
    hud.render(lines);
  }, 750);

  // Progress callback handed to runPipelineBatch.
  // Presumably invoked once per pipeline cycle — confirm against the pipeline.
  function onProgress({ status, elapsedMs }) {
    const ts = performance.now();
    processedCount += 1;
    statusCounts[status] = (statusCounts[status] || 0) + 1;
    samples.push({ ts, status, end2endMs: elapsedMs });
    if (samples.length > 2000) samples.shift();
    if (elapsedMs != null) {
      stageTimes.end2end.push(elapsedMs);
      // The end-to-end average covers at most the latest 500 timings.
      if (stageTimes.end2end.length > 500) stageTimes.end2end.shift();
    }
  }

  let result;
  try {
    result = await runPipelineBatch({
      limit,
      chunkLimit,
      verbose: false,
      outPath,
      seedMode: process.env.PIPELINE_SEED_MODE,
      logger,
      onProgress,
    });
  } finally {
    // Stop redrawing even when the batch rejects, so the timer never outlives the run.
    clearInterval(hudInterval);
  }
  const totalMs = performance.now() - t0;

  // Final render
  const proc = result.processed;
  const acc = result.accepted;
  const qps = proc > 0 ? (proc / totalMs) * 1000 : 0;
  const lines = [
    '✅ Bench complete',
    `mode: ${result.mode} | mock: ${mockMode ? 'yes' : 'no'} | random walk: ${randomWalk ? 'yes' : 'no'}`,
    `processed: ${proc} | accepted: ${acc} | elapsed: ${humanMs(totalMs)}`,
    `throughput: ${qps.toFixed(2)} pipeline cycles/s overall`,
    `status: ${summarizeStatus() || 'n/a'}`,
    `cache: ${cacheDir}`,
    `out: ${outPath}`,
  ];
  hud.render(lines);
}
// Entry point: run the bench; if it rejects, print the error and exit with status 1.
main().catch(function reportFatal(error) {
  console.error('Live bench error:', error);
  process.exit(1);
});