// htaf — commit b2f1284: Add CI, licences, samples, and benchmark scripts
// src/pipeline/batch.mjs
import fs from 'fs/promises';
import path from 'path';
import { preview } from './util.mjs';
import crypto from 'crypto';
import {
DEFAULT_SEEDS_PATH,
DEFAULT_OUT_PATH,
loadSeedQuestions,
seedToQuestion,
} from './seeds.mjs';
import { runPipelineStep } from './step.mjs';
import { loadProviderFor } from '../providers/provider.mjs';
import { runQuestionGenerator } from '../question/question_core.mjs';
import { fetchChunksFromIndex } from '../retrieval/retrieval.mjs';
import { loadRagChunks } from '../retrieval/jsonl_chunks.mjs';
import {
chunkIdFromContent,
questionId,
} from './cache.mjs';
import {
getCachedQuestions,
saveQuestions,
getCachedGeneration,
saveGeneration,
getCachedReward,
saveReward,
saveVerification,
} from './cache.mjs';
/**
 * Append one accepted record to a JSONL file, creating the parent
 * directory on demand.
 *
 * @param {string} outPath - Destination JSONL file path.
 * @param {object} record - Record serialized as a single JSON line.
 * @returns {Promise<void>}
 */
export async function appendGoldRecord(outPath, record) {
  // Serialize first so a non-serializable record throws before any
  // filesystem side effect happens.
  const serialized = `${JSON.stringify(record)}\n`;
  await fs.mkdir(path.dirname(outPath), { recursive: true });
  await fs.appendFile(outPath, serialized, 'utf8');
}
/**
 * Run the pipeline over a batch of seeds and write accepted
 * samples to a JSONL file.
 *
 * Modes:
 * - question-first (default): seeds are CHUNKS; we generate questions from each chunk
 * - static: seeds are questions (legacy / low-priority mode)
 *
 * Options:
 * - seedsPath: JSONL of seeds (defaults to test_samples/seed_questions.jsonl)
 * - outPath: output JSONL (defaults to gold/pipeline_gold.jsonl)
 * - limit: max number of seeds (static) / questions (question-first) to process
 * - chunkLimit: (question-first) max number of chunks to load; falls back to
 *   PIPELINE_CHUNK_LIMIT / PIPELINE_CHUNKS / PIPELINE_CHUNK_SAMPLE env vars
 * - verbose: extra per-stage logging
 * - logger: optional logger (defaults to console)
 * - seedMode: 'question-first' | 'static' (overrides env + default)
 * - onProgress: called after every processed question with
 *   { processed, accepted, status, elapsedMs, question, mode }
 *
 * Returns:
 * {
 *   mode,
 *   total,
 *   processed,           // number of questions run through pipeline
 *   accepted,
 *   outPath,
 *   statusCounts,
 *   processedSeeds?,     // (question-first) how many chunks actually used
 *   processedQuestions?  // (question-first) alias for processed
 * }
 *
 * @throws {Error} when seedMode is not a recognized mode
 */
export async function runPipelineBatch({
  seedsPath = DEFAULT_SEEDS_PATH,
  outPath = DEFAULT_OUT_PATH,
  limit,
  chunkLimit,
  verbose = false,
  logger = console,
  seedMode = process.env.PIPELINE_SEED_MODE || 'question-first',
  onProgress = () => {},
} = {}) {
  const log = logger?.log?.bind(logger) || console.log;
  const errLog = logger?.error?.bind(logger) || console.error;

  if (verbose) {
    log('[pipeline] mode:', seedMode);
    log('[pipeline] outPath:', outPath);
  }

  let processed = 0; // number of questions sent through runPipelineStep
  let accepted = 0;
  const statusCounts = {};

  // ----------------------------------------
  // MODE 1: static questions from JSONL (legacy / tests)
  // ----------------------------------------
  if (seedMode === 'static') {
    const seeds = await loadSeedQuestions(seedsPath);
    // Clamp to the number of available seeds: a `limit` larger than
    // seeds.length must not index past the end of the array (which
    // previously fed `undefined` into seedToQuestion for each extra
    // iteration).
    const maxSeeds =
      typeof limit === 'number' ? Math.min(limit, seeds.length) : seeds.length;

    if (verbose) {
      log('[pipeline] seedsPath (static questions):', seedsPath);
    }

    for (let idx = 0; idx < maxSeeds; idx++) {
      const seed = seeds[idx];
      const question = seedToQuestion(seed);
      const label = `[${idx + 1}/${maxSeeds}]`;
      log(`→ ${label} Running pipeline for: "${question}"`);

      const tStart = Date.now();
      try {
        const result = await runPipelineStep({
          question,
          verbose,
          logger,
        });
        processed += 1;
        statusCounts[result.status] =
          (statusCounts[result.status] || 0) + 1;
        onProgress({
          processed,
          accepted,
          status: result.status,
          elapsedMs: Date.now() - tStart,
          question,
          mode: seedMode,
        });
        if (verbose) {
          log(`   ↳ status: ${result.status}`);
        }

        if (result.status === 'accepted') {
          const record = {
            question,
            context: result.context,
            sample: {
              answer: result.gen?.answer,
              thought: result.gen?.thought,
              raw: result.gen?.raw,
              confidence: result.gen?.confidence,
              evidence: result.gen?.evidence,
              limitations: result.gen?.limitations,
              thinking: result.gen?.thinking,
            },
            verifier: result.ver,
            reward: result.rew,
          };
          await appendGoldRecord(outPath, record);
          accepted += 1;
        }
      } catch (e) {
        const msg = e?.message || String(e);
        processed += 1;
        statusCounts.pipeline_error =
          (statusCounts.pipeline_error || 0) + 1;
        onProgress({
          processed,
          accepted,
          status: 'pipeline_error',
          elapsedMs: Date.now() - tStart,
          question,
          mode: seedMode,
        });
        errLog('   [pipeline] ERROR:', msg);
      }
    }

    return {
      mode: seedMode,
      total: seeds.length,
      processed,
      accepted,
      outPath,
      statusCounts,
    };
  }

  // ----------------------------------------
  // MODE 2: question-first (JSONL or ES chunks → QG → pipeline)
  // ----------------------------------------
  if (seedMode === 'question-first') {
    const questionProvider = loadProviderFor('question');
    const generatorProviderName =
      process.env.GENERATOR_PROVIDER || 'ollama';
    const verifierProviderName =
      process.env.VERIFIER_PROVIDER || generatorProviderName;
    const rewardProviderName =
      process.env.REWARD_PROVIDER || generatorProviderName;

    const maxQuestionsPerChunk = Number(
      process.env.QUESTION_MAX_PER_CHUNK ||
        process.env.QUESTION_MAX ||
        '5',
    );

    // `limit` bounds the number of QUESTIONS in this mode, not chunks.
    const questionCap =
      typeof limit === 'number' ? limit : Number.POSITIVE_INFINITY;

    const effectiveChunkLimit = (() => {
      if (typeof chunkLimit === 'number') return chunkLimit;
      const envLimit =
        process.env.PIPELINE_CHUNK_LIMIT ||
        process.env.PIPELINE_CHUNKS ||
        process.env.PIPELINE_CHUNK_SAMPLE;
      const parsed = envLimit != null ? Number(envLimit) : NaN;
      return Number.isFinite(parsed) ? parsed : undefined;
    })();

    // Decide where chunks come from:
    //   PIPELINE_CHUNK_SOURCE = 'jsonl' | 'es'
    const chunkSource =
      process.env.PIPELINE_CHUNK_SOURCE || 'jsonl';

    if (verbose) {
      if (chunkSource === 'jsonl') {
        const p =
          process.env.RAG_CHUNKS_PATH ||
          'data/rag_chunks.jsonl';
        log(
          `[pipeline] loading chunks from JSONL (${p}), limit=${effectiveChunkLimit ?? 'all'}`,
        );
      } else {
        log(
          `[pipeline] fetching chunks from ES (limit=${effectiveChunkLimit ?? 'all'})`,
        );
      }
    }

    let chunks = [];
    try {
      if (chunkSource === 'jsonl') {
        chunks = await loadRagChunks(effectiveChunkLimit);
      } else {
        chunks = await fetchChunksFromIndex(effectiveChunkLimit);
      }
    } catch (e) {
      const msg = e?.message || String(e);
      errLog('[pipeline] ERROR loading chunks:', msg);
      // Chunk loading is fatal for this mode; report and bail early.
      return {
        mode: seedMode,
        total: 0,
        processed: 0,
        accepted: 0,
        outPath,
        statusCounts: { chunk_load_error: 1 },
      };
    }

    const totalChunks = chunks.length;
    let processedSeeds = 0;

    // Optional random walk over chunks
    const randomWalk = (() => {
      const v =
        process.env.PIPELINE_RANDOM_WALK ||
        process.env.PIPELINE_CHUNK_ORDER;
      if (!v) return false;
      const s = String(v).toLowerCase();
      return s === '1' || s === 'true' || s === 'yes' || s === 'random';
    })();
    if (randomWalk && chunks.length > 1) {
      // Fisher–Yates shuffle; crypto.randomInt gives an unbiased index.
      for (let i = chunks.length - 1; i > 0; i--) {
        const j = crypto.randomInt(i + 1);
        [chunks[i], chunks[j]] = [chunks[j], chunks[i]];
      }
    }

    for (let idx = 0; idx < chunks.length; idx++) {
      if (processed >= questionCap) break;

      const chunk = chunks[idx];
      const label = `[chunk ${idx + 1}/${chunks.length}]`;
      const contextText = chunk.content;
      // Derive a deterministic id when the chunk has none so cache
      // lookups stay stable across runs.
      const stableChunkId =
        chunk.id ||
        chunkIdFromContent(
          contextText,
          chunk.sourceId || chunk.source?.id,
        );
      chunk.id = stableChunkId;

      if (!contextText || !contextText.trim()) {
        if (verbose) {
          log(`${label} chunk content empty, skipping`);
        }
        continue;
      }
      processedSeeds += 1;

      if (verbose) {
        log(`\n🧩 ${label} generating questions from chunk…`);
        log(`   [question] chunk id: ${chunk.id}`);
        log(
          '   [question] chunk preview:\n   ' +
            preview(contextText, 300).replace(/\n/g, '\n   '),
        );
        log(
          `   [question] using provider="question" maxQuestions=${maxQuestionsPerChunk}`,
        );
      }

      // 1) generate questions from this chunk (or load cache)
      let qResult;
      let questions = [];
      try {
        const cachedQRecords = await getCachedQuestions(stableChunkId);
        if (cachedQRecords.length > 0) {
          questions = cachedQRecords[0].questions.slice(
            0,
            maxQuestionsPerChunk,
          );
          if (verbose)
            log(
              `   [question] loaded ${questions.length} cached question(s)`,
            );
        } else {
          qResult = await runQuestionGenerator(
            contextText,
            questionProvider,
            { maxQuestions: maxQuestionsPerChunk },
          );
          questions = qResult?.questions || [];
          if (questions.length > 0) {
            await saveQuestions(stableChunkId, questions, {
              provider: process.env.QUESTION_PROVIDER,
              model: process.env.QUESTION_MODEL,
            });
          }
        }
      } catch (e) {
        const msg = e?.message || String(e);
        statusCounts.question_error =
          (statusCounts.question_error || 0) + 1;
        if (verbose) {
          errLog('   [question] ERROR:', msg);
        }
        continue;
      }

      if (verbose) {
        log(
          `   [question] generated ${questions.length} question(s) from this chunk`,
        );
        if (questions.length > 0) {
          log(
            '   [question] first question: "' +
              preview(questions[0], 200) +
              '"',
          );
        }
      }

      // 2) run full pipeline for each generated question
      for (const q of questions) {
        if (processed >= questionCap) break;
        if (!q || !q.trim()) continue;

        const qId = questionId(stableChunkId, q);

        // A cached reward marks this (chunk, question) pair as already
        // fully processed and accepted — skip all stages.
        const cachedReward = await getCachedReward(stableChunkId, qId);
        if (cachedReward?.ok) {
          processed += 1;
          accepted += 1;
          statusCounts.cached_reward =
            (statusCounts.cached_reward || 0) + 1;
          onProgress({
            processed,
            accepted,
            status: 'cached_reward',
            elapsedMs: 0,
            question: q,
            mode: seedMode,
          });
          if (verbose)
            log(
              `   → [q ${processed}] using cached reward, skipping stages`,
            );
          continue;
        }

        const cachedGen = await getCachedGeneration(stableChunkId, qId);

        const qLabel = `[q ${processed + 1}]`;
        log(
          `  → ${qLabel} Running pipeline for generated question: "${q}"`,
        );

        const tStart = Date.now();
        try {
          const result = await runPipelineStep({
            question: q,
            initialContext: [chunk], // IMPORTANT: reuse SAME chunk, no second retrieval
            cachedGen,
            verbose,
            logger,
          });
          processed += 1;
          statusCounts[result.status] =
            (statusCounts[result.status] || 0) + 1;
          onProgress({
            processed,
            accepted,
            status: result.status,
            elapsedMs: Date.now() - tStart,
            question: q,
            mode: seedMode,
          });
          if (verbose) {
            log(`     ↳ status: ${result.status}`);
          }

          let savedGenRecord = null;
          if (result.gen) {
            savedGenRecord = await saveGeneration(stableChunkId, qId, result.gen, {
              provider: generatorProviderName,
              model: process.env.GENERATOR_MODEL,
            });
            // keep gen_id on the in-memory object for downstream linking
            if (savedGenRecord?.gen_id) {
              result.gen.gen_id = savedGenRecord.gen_id;
            }
          }

          // Verification/reward records link back to the generation they
          // scored (fresh save wins over the cached one).
          const genIdForFollowups =
            savedGenRecord?.gen_id || cachedGen?.gen_id;

          if (result.ver) {
            await saveVerification(
              stableChunkId,
              qId,
              genIdForFollowups,
              result.ver,
              {
                provider: verifierProviderName,
                model: process.env.VERIFIER_MODEL,
              },
            );
          }

          if (result.rew) {
            await saveReward(
              stableChunkId,
              qId,
              genIdForFollowups,
              result.rew,
              { provider: rewardProviderName, model: process.env.REWARD_MODEL },
            );
          }

          if (result.status === 'accepted') {
            const record = {
              question: q,
              sourceChunkId: chunk.id,
              sourceChunk: contextText,
              sourceDoc: chunk.source,
              context: result.context,
              sample: {
                answer: result.gen?.answer,
                thought: result.gen?.thought,
                raw: result.gen?.raw,
              },
              verifier: result.ver,
              reward: result.rew,
            };
            await appendGoldRecord(outPath, record);
            accepted += 1;
            if (verbose) {
              log('     ✓ accepted and written to gold JSONL');
            }
          }
        } catch (e) {
          const msg = e?.message || String(e);
          processed += 1;
          statusCounts.pipeline_error =
            (statusCounts.pipeline_error || 0) + 1;
          onProgress({
            processed,
            accepted,
            status: 'pipeline_error',
            elapsedMs: Date.now() - tStart,
            question: q,
            mode: seedMode,
          });
          errLog('     [pipeline] ERROR:', msg);
        }
      }
    }

    return {
      mode: seedMode,
      total: totalChunks,
      processed, // number of questions processed
      processedSeeds,
      processedQuestions: processed,
      accepted,
      outPath,
      statusCounts,
    };
  }

  throw new Error(`Unknown PIPELINE_SEED_MODE: ${seedMode}`);
}