// src/pipeline/batch.mjs
import fs from 'fs/promises';
import path from 'path';
import { preview } from './util.mjs';
import crypto from 'crypto';
import {
  DEFAULT_SEEDS_PATH,
  DEFAULT_OUT_PATH,
  loadSeedQuestions,
  seedToQuestion,
} from './seeds.mjs';
import { runPipelineStep } from './step.mjs';
import { loadProviderFor } from '../providers/provider.mjs';
import { runQuestionGenerator } from '../question/question_core.mjs';
import { fetchChunksFromIndex } from '../retrieval/retrieval.mjs';
import { loadRagChunks } from '../retrieval/jsonl_chunks.mjs';
import {
  chunkIdFromContent,
  questionId,
  getCachedQuestions,
  saveQuestions,
  getCachedGeneration,
  saveGeneration,
  getCachedReward,
  saveReward,
  saveVerification,
} from './cache.mjs';

/** Fallback for QUESTION_MAX_PER_CHUNK / QUESTION_MAX when unset or invalid. */
const DEFAULT_MAX_QUESTIONS_PER_CHUNK = 5;

/**
 * Append a single accepted record to a JSONL file.
 *
 * The parent directory of `outPath` is created (recursively) if missing.
 *
 * @param {string} outPath - Destination JSONL file.
 * @param {object} record - JSON-serialisable record; written as one line.
 * @returns {Promise<void>}
 */
export async function appendGoldRecord(outPath, record) {
  const line = `${JSON.stringify(record)}\n`;
  await fs.mkdir(path.dirname(outPath), { recursive: true });
  await fs.appendFile(outPath, line, 'utf8');
}

/** Extract a printable message from anything that was thrown. */
function errorMessage(err) {
  return err?.message || String(err);
}

/** Increment the counter for `status` in the `statusCounts` tally. */
function bumpStatus(statusCounts, status) {
  statusCounts[status] = (statusCounts[status] || 0) + 1;
}

/** True when `value` is a number that can act as a cap (NaN means "no cap"). */
function isUsableLimit(value) {
  return typeof value === 'number' && !Number.isNaN(value);
}

/** Interpret an env-style flag: '1' | 'true' | 'yes' | 'random' (case-insensitive). */
function isRandomWalkEnabled(value) {
  if (!value) return false;
  const normalized = String(value).toLowerCase();
  return ['1', 'true', 'yes', 'random'].includes(normalized);
}

/** Fisher-Yates shuffle of `items` in place, using crypto.randomInt. */
function shuffleInPlace(items) {
  for (let i = items.length - 1; i > 0; i--) {
    const j = crypto.randomInt(i + 1);
    [items[i], items[j]] = [items[j], items[i]];
  }
}

/**
 * Resolve the chunk limit: explicit option first, then the first non-empty of
 * PIPELINE_CHUNK_LIMIT / PIPELINE_CHUNKS / PIPELINE_CHUNK_SAMPLE.
 * Returns undefined (no limit) when nothing usable is configured.
 */
function resolveChunkLimit(chunkLimit) {
  if (typeof chunkLimit === 'number') return chunkLimit;
  const envLimit =
    process.env.PIPELINE_CHUNK_LIMIT ||
    process.env.PIPELINE_CHUNKS ||
    process.env.PIPELINE_CHUNK_SAMPLE;
  // An empty string must count as "unset": Number('') is 0, which would
  // silently request zero chunks.
  const parsed = envLimit ? Number(envLimit) : NaN;
  return Number.isFinite(parsed) ? parsed : undefined;
}

/** Resolve max questions per chunk from env, falling back to the default on garbage. */
function resolveMaxQuestionsPerChunk() {
  const raw = process.env.QUESTION_MAX_PER_CHUNK || process.env.QUESTION_MAX;
  const parsed = Number(raw || String(DEFAULT_MAX_QUESTIONS_PER_CHUNK));
  // NaN would make `slice(0, NaN)` drop every cached question.
  return Number.isFinite(parsed) && parsed >= 0
    ? parsed
    : DEFAULT_MAX_QUESTIONS_PER_CHUNK;
}

/**
 * Return the questions for a chunk: the first cached record (truncated to
 * `maxQuestions`) when one exists, otherwise freshly generated questions,
 * which are saved to the cache when non-empty.
 */
async function loadOrGenerateQuestions({
  chunkId,
  contextText,
  questionProvider,
  maxQuestions,
  verbose,
  log,
}) {
  const cachedQRecords = await getCachedQuestions(chunkId);
  if (cachedQRecords.length > 0) {
    const cachedQuestions = cachedQRecords[0].questions.slice(0, maxQuestions);
    if (verbose) {
      log(` [question] loaded ${cachedQuestions.length} cached question(s)`);
    }
    return cachedQuestions;
  }

  const qResult = await runQuestionGenerator(contextText, questionProvider, {
    maxQuestions,
  });
  const generated = qResult?.questions || [];
  if (generated.length > 0) {
    await saveQuestions(chunkId, generated, {
      provider: process.env.QUESTION_PROVIDER,
      model: process.env.QUESTION_MODEL,
    });
  }
  return generated;
}

/**
 * Persist whichever stage outputs (generation, verification, reward) the
 * pipeline step produced, linking follow-up records to the generation id.
 */
async function persistStageOutputs({
  result,
  cachedGen,
  chunkId,
  qId,
  providerNames,
}) {
  let savedGenRecord = null;
  if (result.gen) {
    savedGenRecord = await saveGeneration(chunkId, qId, result.gen, {
      provider: providerNames.generator,
      model: process.env.GENERATOR_MODEL,
    });
    // keep gen_id on the in-memory object for downstream linking
    if (savedGenRecord?.gen_id) {
      result.gen.gen_id = savedGenRecord.gen_id;
    }
  }

  const genIdForFollowups = savedGenRecord?.gen_id || cachedGen?.gen_id;

  if (result.ver) {
    await saveVerification(chunkId, qId, genIdForFollowups, result.ver, {
      provider: providerNames.verifier,
      model: process.env.VERIFIER_MODEL,
    });
  }

  if (result.rew) {
    await saveReward(chunkId, qId, genIdForFollowups, result.rew, {
      provider: providerNames.reward,
      model: process.env.REWARD_MODEL,
    });
  }
}

/** MODE 1: static questions from JSONL (legacy / tests). */
async function runStaticBatch({
  seedsPath,
  outPath,
  limit,
  verbose,
  logger,
  seedMode,
  onProgress,
  log,
  errLog,
}) {
  let processed = 0;
  let accepted = 0;
  const statusCounts = {};

  const seeds = await loadSeedQuestions(seedsPath);
  // Clamp to the seeds actually available so a too-large limit cannot index
  // past the end of the array.
  const maxSeeds = isUsableLimit(limit)
    ? Math.max(0, Math.min(limit, seeds.length))
    : seeds.length;

  if (verbose) {
    log('[pipeline] seedsPath (static questions):', seedsPath);
  }

  for (let idx = 0; idx < maxSeeds; idx++) {
    const question = seedToQuestion(seeds[idx]);
    const label = `[${idx + 1}/${maxSeeds}]`;

    log(`→ ${label} Running pipeline for: "${question}"`);

    const tStart = Date.now();
    // Tracks whether this question was already counted, so a failure after
    // the step (e.g. while writing the gold file) does not count it twice.
    let counted = false;
    try {
      const result = await runPipelineStep({ question, verbose, logger });

      processed += 1;
      counted = true;
      bumpStatus(statusCounts, result.status);
      onProgress({
        processed,
        accepted,
        status: result.status,
        elapsedMs: Date.now() - tStart,
        question,
        mode: seedMode,
      });

      if (verbose) {
        log(` ↳ status: ${result.status}`);
      }

      if (result.status === 'accepted') {
        const record = {
          question,
          context: result.context,
          sample: {
            answer: result.gen?.answer,
            thought: result.gen?.thought,
            raw: result.gen?.raw,
            confidence: result.gen?.confidence,
            evidence: result.gen?.evidence,
            limitations: result.gen?.limitations,
            thinking: result.gen?.thinking,
          },
          verifier: result.ver,
          reward: result.rew,
        };
        await appendGoldRecord(outPath, record);
        accepted += 1;
      }
    } catch (e) {
      const msg = errorMessage(e);
      if (!counted) {
        processed += 1;
      }
      // The failure is still tallied and reported even when the step itself
      // succeeded, so post-step errors stay visible.
      bumpStatus(statusCounts, 'pipeline_error');
      onProgress({
        processed,
        accepted,
        status: 'pipeline_error',
        elapsedMs: Date.now() - tStart,
        question,
        mode: seedMode,
      });
      errLog(' [pipeline] ERROR:', msg);
    }
  }

  return {
    mode: seedMode,
    total: seeds.length,
    processed,
    accepted,
    outPath,
    statusCounts,
  };
}

/** MODE 2: question-first (JSONL or ES chunks → QG → pipeline). */
async function runQuestionFirstBatch({
  outPath,
  limit,
  chunkLimit,
  verbose,
  logger,
  seedMode,
  onProgress,
  log,
  errLog,
}) {
  let processed = 0; // number of questions sent through runPipelineStep
  let accepted = 0;
  const statusCounts = {};

  const questionProvider = loadProviderFor('question');
  const generatorProviderName = process.env.GENERATOR_PROVIDER || 'ollama';
  const providerNames = {
    generator: generatorProviderName,
    verifier: process.env.VERIFIER_PROVIDER || generatorProviderName,
    reward: process.env.REWARD_PROVIDER || generatorProviderName,
  };

  const maxQuestionsPerChunk = resolveMaxQuestionsPerChunk();
  const questionCap = isUsableLimit(limit) ? limit : Number.POSITIVE_INFINITY;
  const effectiveChunkLimit = resolveChunkLimit(chunkLimit);

  // Decide where chunks come from:
  //   PIPELINE_CHUNK_SOURCE = 'jsonl' | 'es'
  const chunkSource = process.env.PIPELINE_CHUNK_SOURCE || 'jsonl';

  if (verbose) {
    if (chunkSource === 'jsonl') {
      const p = process.env.RAG_CHUNKS_PATH || 'data/rag_chunks.jsonl';
      log(
        `[pipeline] loading chunks from JSONL (${p}), limit=${effectiveChunkLimit ?? 'all'}`,
      );
    } else {
      log(
        `[pipeline] fetching chunks from ES (limit=${effectiveChunkLimit ?? 'all'})`,
      );
    }
  }

  let chunks = [];
  try {
    chunks =
      chunkSource === 'jsonl'
        ? await loadRagChunks(effectiveChunkLimit)
        : await fetchChunksFromIndex(effectiveChunkLimit);
  } catch (e) {
    errLog('[pipeline] ERROR loading chunks:', errorMessage(e));
    return {
      mode: seedMode,
      total: 0,
      processed: 0,
      accepted: 0,
      outPath,
      statusCounts: { chunk_load_error: 1 },
    };
  }

  const totalChunks = chunks.length;
  let processedSeeds = 0;

  // Optional random walk over chunks
  const randomWalk = isRandomWalkEnabled(
    process.env.PIPELINE_RANDOM_WALK || process.env.PIPELINE_CHUNK_ORDER,
  );
  if (randomWalk && chunks.length > 1) {
    shuffleInPlace(chunks);
  }

  for (let idx = 0; idx < chunks.length; idx++) {
    if (processed >= questionCap) break;

    const chunk = chunks[idx];
    const label = `[chunk ${idx + 1}/${chunks.length}]`;
    const contextText = chunk?.content;

    // Validate content before deriving an id from it: hashing missing or
    // empty content is pointless and non-string content cannot be trimmed.
    if (typeof contextText !== 'string' || !contextText.trim()) {
      if (verbose) {
        log(`${label} chunk content empty, skipping`);
      }
      continue;
    }

    const stableChunkId =
      chunk.id ||
      chunkIdFromContent(contextText, chunk.sourceId || chunk.source?.id);
    chunk.id = stableChunkId;

    processedSeeds += 1;

    if (verbose) {
      log(`\n🧩 ${label} generating questions from chunk…`);
      log(` [question] chunk id: ${chunk.id}`);
      log(
        ` [question] chunk preview:\n ${preview(contextText, 300).replace(/\n/g, '\n ')}`,
      );
      log(
        ` [question] using provider="question" maxQuestions=${maxQuestionsPerChunk}`,
      );
    }

    // 1) generate questions from this chunk (or load cache)
    let questions = [];
    try {
      questions = await loadOrGenerateQuestions({
        chunkId: stableChunkId,
        contextText,
        questionProvider,
        maxQuestions: maxQuestionsPerChunk,
        verbose,
        log,
      });
    } catch (e) {
      bumpStatus(statusCounts, 'question_error');
      if (verbose) {
        errLog(' [question] ERROR:', errorMessage(e));
      }
      continue;
    }

    if (verbose) {
      log(
        ` [question] generated ${questions.length} question(s) from this chunk`,
      );
      if (questions.length > 0) {
        log(` [question] first question: "${preview(questions[0], 200)}"`);
      }
    }

    // 2) run full pipeline for each generated question
    for (const q of questions) {
      if (processed >= questionCap) break;
      // Skip blanks and anything that is not a string instead of crashing
      // the whole batch on `.trim()`.
      if (typeof q !== 'string' || !q.trim()) continue;

      const qId = questionId(stableChunkId, q);
      const cachedReward = await getCachedReward(stableChunkId, qId);
      if (cachedReward?.ok) {
        processed += 1;
        accepted += 1;
        bumpStatus(statusCounts, 'cached_reward');
        onProgress({
          processed,
          accepted,
          status: 'cached_reward',
          elapsedMs: 0,
          question: q,
          mode: seedMode,
        });
        if (verbose) {
          log(` → [q ${processed}] using cached reward, skipping stages`);
        }
        continue;
      }

      const cachedGen = await getCachedGeneration(stableChunkId, qId);

      const qLabel = `[q ${processed + 1}]`;
      log(` → ${qLabel} Running pipeline for generated question: "${q}"`);

      const tStart = Date.now();
      // Tracks whether this question was already counted, so a failure while
      // persisting stage outputs does not count it twice against `limit`.
      let counted = false;
      try {
        const result = await runPipelineStep({
          question: q,
          initialContext: [chunk], // IMPORTANT: reuse SAME chunk, no second retrieval
          cachedGen,
          verbose,
          logger,
        });

        processed += 1;
        counted = true;
        bumpStatus(statusCounts, result.status);
        onProgress({
          processed,
          accepted,
          status: result.status,
          elapsedMs: Date.now() - tStart,
          question: q,
          mode: seedMode,
        });

        if (verbose) {
          log(` ↳ status: ${result.status}`);
        }

        await persistStageOutputs({
          result,
          cachedGen,
          chunkId: stableChunkId,
          qId,
          providerNames,
        });

        if (result.status === 'accepted') {
          // NOTE(review): unlike static mode, this sample omits confidence /
          // evidence / limitations / thinking — confirm whether intentional.
          const record = {
            question: q,
            sourceChunkId: chunk.id,
            sourceChunk: contextText,
            sourceDoc: chunk.source,
            context: result.context,
            sample: {
              answer: result.gen?.answer,
              thought: result.gen?.thought,
              raw: result.gen?.raw,
            },
            verifier: result.ver,
            reward: result.rew,
          };
          await appendGoldRecord(outPath, record);
          accepted += 1;

          if (verbose) {
            log(' ✓ accepted and written to gold JSONL');
          }
        }
      } catch (e) {
        const msg = errorMessage(e);
        if (!counted) {
          processed += 1;
        }
        // The failure is still tallied and reported even when the step
        // itself succeeded, so persistence errors stay visible.
        bumpStatus(statusCounts, 'pipeline_error');
        onProgress({
          processed,
          accepted,
          status: 'pipeline_error',
          elapsedMs: Date.now() - tStart,
          question: q,
          mode: seedMode,
        });
        errLog(' [pipeline] ERROR:', msg);
      }
    }
  }

  return {
    mode: seedMode,
    total: totalChunks,
    processed, // number of questions processed
    processedSeeds,
    processedQuestions: processed,
    accepted,
    outPath,
    statusCounts,
  };
}

/**
 * Run the pipeline over a batch of seeds and write accepted
 * samples to a JSONL file.
 *
 * Modes:
 *  - question-first (default): seeds are CHUNKS; questions are generated from
 *    each chunk (or loaded from cache) and each question is run through the
 *    pipeline with that same chunk as its context.
 *  - static: seeds are questions (legacy / low-priority mode).
 *
 * Each question is counted in `processed` exactly once, even when a step
 * succeeds and a later persistence call fails; in that case `statusCounts`
 * records both the step status and `pipeline_error`.
 *
 * @param {object} [options]
 * @param {string} [options.seedsPath=DEFAULT_SEEDS_PATH] - JSONL of seed
 *   questions (static mode only).
 * @param {string} [options.outPath=DEFAULT_OUT_PATH] - Output JSONL for
 *   accepted records.
 * @param {number} [options.limit] - Static mode: max seeds to process
 *   (clamped to the seeds available). Question-first mode: max questions to
 *   process. Omitted or NaN means no cap.
 * @param {number} [options.chunkLimit] - Question-first mode: max chunks to
 *   load; falls back to PIPELINE_CHUNK_LIMIT / PIPELINE_CHUNKS /
 *   PIPELINE_CHUNK_SAMPLE.
 * @param {boolean} [options.verbose=false] - Extra per-stage logging.
 * @param {{log?: Function, error?: Function}} [options.logger=console] -
 *   Logger; missing methods fall back to console.
 * @param {'question-first'|'static'} [options.seedMode] - Overrides
 *   PIPELINE_SEED_MODE and the 'question-first' default.
 * @param {Function} [options.onProgress] - Called after every processed
 *   question with { processed, accepted, status, elapsedMs, question, mode }.
 * @returns {Promise<object>} Summary:
 *   { mode, total, processed, accepted, outPath, statusCounts } plus, in
 *   question-first mode, processedSeeds (chunks actually used) and
 *   processedQuestions (alias for processed).
 * @throws {Error} When `seedMode` is neither 'static' nor 'question-first'.
 */
export async function runPipelineBatch({
  seedsPath = DEFAULT_SEEDS_PATH,
  outPath = DEFAULT_OUT_PATH,
  limit,
  chunkLimit,
  verbose = false,
  logger = console,
  seedMode = process.env.PIPELINE_SEED_MODE || 'question-first',
  onProgress = () => {},
} = {}) {
  const log = logger?.log?.bind(logger) || console.log;
  const errLog = logger?.error?.bind(logger) || console.error;

  if (verbose) {
    log('[pipeline] mode:', seedMode);
    log('[pipeline] outPath:', outPath);
  }

  const shared = {
    outPath,
    limit,
    verbose,
    logger,
    seedMode,
    onProgress,
    log,
    errLog,
  };

  if (seedMode === 'static') {
    return runStaticBatch({ ...shared, seedsPath });
  }

  if (seedMode === 'question-first') {
    return runQuestionFirstBatch({ ...shared, chunkLimit });
  }

  throw new Error(`Unknown PIPELINE_SEED_MODE: ${seedMode}`);
}