| | |
| | import fs from 'fs/promises'; |
| | import path from 'path'; |
| |
|
| | import { preview } from './util.mjs'; |
| | import crypto from 'crypto'; |
| | import { |
| | DEFAULT_SEEDS_PATH, |
| | DEFAULT_OUT_PATH, |
| | loadSeedQuestions, |
| | seedToQuestion, |
| | } from './seeds.mjs'; |
| | import { runPipelineStep } from './step.mjs'; |
| | import { loadProviderFor } from '../providers/provider.mjs'; |
| | import { runQuestionGenerator } from '../question/question_core.mjs'; |
| | import { fetchChunksFromIndex } from '../retrieval/retrieval.mjs'; |
| | import { loadRagChunks } from '../retrieval/jsonl_chunks.mjs'; |
| | import { |
| | chunkIdFromContent, |
| | questionId, |
| | } from './cache.mjs'; |
| | import { |
| | getCachedQuestions, |
| | saveQuestions, |
| | getCachedGeneration, |
| | saveGeneration, |
| | getCachedReward, |
| | saveReward, |
| | saveVerification, |
| | } from './cache.mjs'; |
| |
|
| | |
| | |
| | |
/**
 * Append one record to a JSONL file as a single JSON-encoded line.
 *
 * The parent directory of `outPath` is created (recursively) before writing,
 * so the first append to a fresh output location succeeds.
 *
 * @param {string} outPath - Path of the JSONL file to append to.
 * @param {*} record - Value to serialise; must have a JSON representation.
 * @returns {Promise<void>} Resolves once the line has been appended.
 * @throws {TypeError} If `record` cannot be represented as JSON
 *   (e.g. `undefined`, a function or a symbol).
 */
export async function appendGoldRecord(outPath, record) {
  const json = JSON.stringify(record);
  // JSON.stringify returns undefined for values with no JSON form; appending
  // that would put the literal text "undefined" into the JSONL file.
  if (json === undefined) {
    throw new TypeError('appendGoldRecord: record is not JSON-serializable');
  }
  await fs.mkdir(path.dirname(outPath), { recursive: true });
  await fs.appendFile(outPath, `${json}\n`, 'utf8');
}
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/** Extract a printable message from any thrown value. */
function describeError(err) {
  return err?.message || String(err);
}

/** Increment the tally for `status` in a plain counts object. */
function countStatus(statusCounts, status) {
  statusCounts[status] = (statusCounts[status] || 0) + 1;
}

/**
 * Resolve the chunk limit: an explicit numeric argument wins, otherwise the
 * first set of PIPELINE_CHUNK_LIMIT / PIPELINE_CHUNKS / PIPELINE_CHUNK_SAMPLE
 * is parsed. Returns `undefined` when no finite limit is configured.
 */
function resolveChunkLimit(chunkLimit) {
  if (typeof chunkLimit === 'number') return chunkLimit;
  const envLimit =
    process.env.PIPELINE_CHUNK_LIMIT ||
    process.env.PIPELINE_CHUNKS ||
    process.env.PIPELINE_CHUNK_SAMPLE;
  const parsed = envLimit != null ? Number(envLimit) : NaN;
  return Number.isFinite(parsed) ? parsed : undefined;
}

/** True when PIPELINE_RANDOM_WALK / PIPELINE_CHUNK_ORDER request random order. */
function isRandomWalkEnabled() {
  const raw =
    process.env.PIPELINE_RANDOM_WALK || process.env.PIPELINE_CHUNK_ORDER;
  if (!raw) return false;
  return ['1', 'true', 'yes', 'random'].includes(String(raw).toLowerCase());
}

/** Fisher-Yates shuffle of `items` in place, using crypto.randomInt. */
function shuffleInPlace(items) {
  for (let i = items.length - 1; i > 0; i--) {
    const j = crypto.randomInt(i + 1);
    [items[i], items[j]] = [items[j], items[i]];
  }
}

/** Run the pipeline once per statically defined seed question. */
async function runStaticSeedBatch({
  seedsPath,
  outPath,
  limit,
  verbose,
  logger,
  log,
  errLog,
  seedMode,
  onProgress,
}) {
  const seeds = await loadSeedQuestions(seedsPath);
  // Clamp to the number of seeds so a limit larger than the seed list never
  // indexes past the end and runs the pipeline on `undefined`.
  const maxSeeds =
    typeof limit === 'number' ? Math.min(limit, seeds.length) : seeds.length;

  if (verbose) {
    log('[pipeline] seedsPath (static questions):', seedsPath);
  }

  let processed = 0;
  let accepted = 0;
  const statusCounts = {};

  for (let idx = 0; idx < maxSeeds; idx++) {
    const question = seedToQuestion(seeds[idx]);
    const label = `[${idx + 1}/${maxSeeds}]`;

    log(`→ ${label} Running pipeline for: "${question}"`);

    const tStart = Date.now();
    // Stays 'pipeline_error' unless the step AND the gold write both succeed,
    // so every question is counted exactly once.
    let status = 'pipeline_error';
    try {
      const result = await runPipelineStep({ question, verbose, logger });

      if (verbose) {
        log(` ↳ status: ${result.status}`);
      }

      if (result.status === 'accepted') {
        await appendGoldRecord(outPath, {
          question,
          context: result.context,
          sample: {
            answer: result.gen?.answer,
            thought: result.gen?.thought,
            raw: result.gen?.raw,
            confidence: result.gen?.confidence,
            evidence: result.gen?.evidence,
            limitations: result.gen?.limitations,
            thinking: result.gen?.thinking,
          },
          verifier: result.ver,
          reward: result.rew,
        });
        accepted += 1;
      }
      status = result.status;
    } catch (e) {
      errLog(' [pipeline] ERROR:', describeError(e));
    }

    processed += 1;
    countStatus(statusCounts, status);
    // Reported after the counters are final so `accepted` includes this item.
    onProgress({
      processed,
      accepted,
      status,
      elapsedMs: Date.now() - tStart,
      question,
      mode: seedMode,
    });
  }

  return {
    mode: seedMode,
    total: seeds.length,
    processed,
    accepted,
    outPath,
    statusCounts,
  };
}

/** Generate questions per chunk, then run the pipeline for each question. */
async function runQuestionFirstBatch({
  outPath,
  limit,
  chunkLimit,
  verbose,
  logger,
  log,
  errLog,
  seedMode,
  onProgress,
}) {
  const questionProvider = loadProviderFor('question');
  const generatorProviderName = process.env.GENERATOR_PROVIDER || 'ollama';
  const verifierProviderName =
    process.env.VERIFIER_PROVIDER || generatorProviderName;
  const rewardProviderName =
    process.env.REWARD_PROVIDER || generatorProviderName;

  const parsedMaxQuestions = Number(
    process.env.QUESTION_MAX_PER_CHUNK || process.env.QUESTION_MAX || '5',
  );
  // A non-numeric env value would yield NaN, and slice(0, NaN) silently drops
  // every cached question; fall back to the default of 5 instead.
  const maxQuestionsPerChunk = Number.isFinite(parsedMaxQuestions)
    ? parsedMaxQuestions
    : 5;

  const questionCap =
    typeof limit === 'number' ? limit : Number.POSITIVE_INFINITY;
  const effectiveChunkLimit = resolveChunkLimit(chunkLimit);
  const chunkSource = process.env.PIPELINE_CHUNK_SOURCE || 'jsonl';

  if (verbose) {
    if (chunkSource === 'jsonl') {
      const p = process.env.RAG_CHUNKS_PATH || 'data/rag_chunks.jsonl';
      log(
        `[pipeline] loading chunks from JSONL (${p}), limit=${effectiveChunkLimit ?? 'all'}`,
      );
    } else {
      log(
        `[pipeline] fetching chunks from ES (limit=${effectiveChunkLimit ?? 'all'})`,
      );
    }
  }

  let chunks = [];
  try {
    chunks =
      chunkSource === 'jsonl'
        ? await loadRagChunks(effectiveChunkLimit)
        : await fetchChunksFromIndex(effectiveChunkLimit);
  } catch (e) {
    errLog('[pipeline] ERROR loading chunks:', describeError(e));
    return {
      mode: seedMode,
      total: 0,
      processed: 0,
      accepted: 0,
      outPath,
      statusCounts: { chunk_load_error: 1 },
    };
  }

  const totalChunks = chunks.length;
  let processedSeeds = 0;
  let processed = 0;
  let accepted = 0;
  const statusCounts = {};

  if (isRandomWalkEnabled() && chunks.length > 1) {
    shuffleInPlace(chunks);
  }

  for (let idx = 0; idx < chunks.length; idx++) {
    if (processed >= questionCap) break;

    const chunk = chunks[idx];
    const label = `[chunk ${idx + 1}/${chunks.length}]`;
    const contextText = chunk?.content;

    // Validate the content before deriving an id from it.
    if (typeof contextText !== 'string' || !contextText.trim()) {
      if (verbose) {
        log(`${label} chunk content empty, skipping`);
      }
      continue;
    }

    const stableChunkId =
      chunk.id ||
      chunkIdFromContent(contextText, chunk.sourceId || chunk.source?.id);
    // The chunk is handed to the pipeline step as context, so it carries the id.
    chunk.id = stableChunkId;

    processedSeeds += 1;

    if (verbose) {
      log(`\n🧩 ${label} generating questions from chunk…`);
      log(` [question] chunk id: ${chunk.id}`);
      log(
        ' [question] chunk preview:\n ' +
          preview(contextText, 300).replace(/\n/g, '\n '),
      );
      log(
        ` [question] using provider="question" maxQuestions=${maxQuestionsPerChunk}`,
      );
    }

    // Prefer cached questions for this chunk; otherwise generate and cache.
    let questions = [];
    try {
      const cachedQRecords = await getCachedQuestions(stableChunkId);
      if (cachedQRecords.length > 0) {
        questions = cachedQRecords[0].questions.slice(0, maxQuestionsPerChunk);
        if (verbose) {
          log(` [question] loaded ${questions.length} cached question(s)`);
        }
      } else {
        const qResult = await runQuestionGenerator(
          contextText,
          questionProvider,
          { maxQuestions: maxQuestionsPerChunk },
        );
        questions = qResult?.questions || [];
        if (questions.length > 0) {
          await saveQuestions(stableChunkId, questions, {
            provider: process.env.QUESTION_PROVIDER,
            model: process.env.QUESTION_MODEL,
          });
        }
      }
    } catch (e) {
      countStatus(statusCounts, 'question_error');
      if (verbose) {
        errLog(' [question] ERROR:', describeError(e));
      }
      continue;
    }

    if (verbose) {
      log(
        ` [question] generated ${questions.length} question(s) from this chunk`,
      );
      if (questions.length > 0) {
        log(
          ' [question] first question: "' + preview(questions[0], 200) + '"',
        );
      }
    }

    for (const q of questions) {
      if (processed >= questionCap) break;
      if (typeof q !== 'string' || !q.trim()) continue;

      const qId = questionId(stableChunkId, q);
      const tStart = Date.now();
      // Stays 'pipeline_error' unless cache lookup, step and persistence all
      // succeed, so every question is counted exactly once.
      let status = 'pipeline_error';
      let fromRewardCache = false;

      try {
        const cachedReward = await getCachedReward(stableChunkId, qId);
        if (cachedReward?.ok) {
          fromRewardCache = true;
          accepted += 1;
          status = 'cached_reward';
        } else {
          const cachedGen = await getCachedGeneration(stableChunkId, qId);

          const qLabel = `[q ${processed + 1}]`;
          log(` → ${qLabel} Running pipeline for generated question: "${q}"`);

          const result = await runPipelineStep({
            question: q,
            initialContext: [chunk],
            cachedGen,
            verbose,
            logger,
          });

          if (verbose) {
            log(` ↳ status: ${result.status}`);
          }

          let savedGenRecord = null;
          if (result.gen) {
            savedGenRecord = await saveGeneration(
              stableChunkId,
              qId,
              result.gen,
              {
                provider: generatorProviderName,
                model: process.env.GENERATOR_MODEL,
              },
            );
            if (savedGenRecord?.gen_id) {
              result.gen.gen_id = savedGenRecord.gen_id;
            }
          }
          // Verification and reward records are linked to the generation id.
          const genIdForFollowups =
            savedGenRecord?.gen_id || cachedGen?.gen_id;
          if (result.ver) {
            await saveVerification(
              stableChunkId,
              qId,
              genIdForFollowups,
              result.ver,
              {
                provider: verifierProviderName,
                model: process.env.VERIFIER_MODEL,
              },
            );
          }
          if (result.rew) {
            await saveReward(
              stableChunkId,
              qId,
              genIdForFollowups,
              result.rew,
              { provider: rewardProviderName, model: process.env.REWARD_MODEL },
            );
          }

          if (result.status === 'accepted') {
            await appendGoldRecord(outPath, {
              question: q,
              sourceChunkId: chunk.id,
              sourceChunk: contextText,
              sourceDoc: chunk.source,
              context: result.context,
              sample: {
                answer: result.gen?.answer,
                thought: result.gen?.thought,
                raw: result.gen?.raw,
              },
              verifier: result.ver,
              reward: result.rew,
            });
            accepted += 1;
          }
          status = result.status;

          if (verbose && result.status === 'accepted') {
            log(' ✓ accepted and written to gold JSONL');
          }
        }
      } catch (e) {
        errLog(' [pipeline] ERROR:', describeError(e));
      }

      processed += 1;
      countStatus(statusCounts, status);
      // Reported after the counters are final so `accepted` includes this item.
      onProgress({
        processed,
        accepted,
        status,
        elapsedMs: fromRewardCache ? 0 : Date.now() - tStart,
        question: q,
        mode: seedMode,
      });

      if (fromRewardCache && verbose) {
        log(` → [q ${processed}] using cached reward, skipping stages`);
      }
    }
  }

  return {
    mode: seedMode,
    total: totalChunks,
    processed,
    processedSeeds,
    processedQuestions: processed,
    accepted,
    outPath,
    statusCounts,
  };
}

/**
 * Run the pipeline over a batch of questions and append accepted results to a
 * gold JSONL file.
 *
 * Two seed modes are supported:
 *  - `'static'`: questions are loaded from `seedsPath`; at most `limit` of
 *    them (never more than the file contains) are run.
 *  - `'question-first'`: chunks are loaded (JSONL or index, chosen by
 *    `PIPELINE_CHUNK_SOURCE`), questions are generated or read from cache per
 *    chunk, and each question is run with that chunk as initial context;
 *    `limit` caps the total number of questions processed.
 *
 * Every question is counted once in `processed` and `statusCounts`, and
 * produces one `onProgress` call. A failure in the step or while persisting
 * its outputs is logged and counted as `pipeline_error`; the batch continues.
 *
 * @param {object} [options]
 * @param {string} [options.seedsPath=DEFAULT_SEEDS_PATH] - Seed file (static mode).
 * @param {string} [options.outPath=DEFAULT_OUT_PATH] - Gold JSONL output path.
 * @param {number} [options.limit] - Maximum number of questions to process.
 * @param {number} [options.chunkLimit] - Maximum chunks to load (question-first
 *   mode); falls back to PIPELINE_CHUNK_LIMIT / PIPELINE_CHUNKS /
 *   PIPELINE_CHUNK_SAMPLE.
 * @param {boolean} [options.verbose=false] - Emit detailed progress logs.
 * @param {{log?: Function, error?: Function}} [options.logger=console]
 * @param {string} [options.seedMode] - `'static'` or `'question-first'`;
 *   defaults to `PIPELINE_SEED_MODE` or `'question-first'`.
 * @param {Function} [options.onProgress] - Called after each question with
 *   `{ processed, accepted, status, elapsedMs, question, mode }`.
 * @returns {Promise<object>} Summary with `mode`, `total`, `processed`,
 *   `accepted`, `outPath` and `statusCounts`; question-first mode also
 *   includes `processedSeeds` and `processedQuestions`.
 * @throws {Error} If `seedMode` is not a recognised mode.
 */
export async function runPipelineBatch({
  seedsPath = DEFAULT_SEEDS_PATH,
  outPath = DEFAULT_OUT_PATH,
  limit,
  chunkLimit,
  verbose = false,
  logger = console,
  seedMode = process.env.PIPELINE_SEED_MODE || 'question-first',
  onProgress = () => {},
} = {}) {
  const log = logger?.log?.bind(logger) || console.log;
  const errLog = logger?.error?.bind(logger) || console.error;

  if (verbose) {
    log('[pipeline] mode:', seedMode);
    log('[pipeline] outPath:', outPath);
  }

  const shared = {
    outPath,
    limit,
    verbose,
    logger,
    log,
    errLog,
    seedMode,
    onProgress,
  };

  if (seedMode === 'static') {
    return runStaticSeedBatch({ ...shared, seedsPath });
  }

  if (seedMode === 'question-first') {
    return runQuestionFirstBatch({ ...shared, chunkLimit });
  }

  throw new Error(`Unknown PIPELINE_SEED_MODE: ${seedMode}`);
}
| |
|