htaf committed on
Commit
0918f71
·
1 Parent(s): 936c57a

added intermediate storage of chunks

Browse files
AGENTS.md CHANGED
@@ -5,6 +5,7 @@
5
  - Prompts are in `prompts/`; tweak these before changing stage behaviour.
6
  - Tests sit in `tests/` (Vitest), with sample seeds in `test_samples/`; pipeline outputs write to `gold/`.
7
  - Config baselines (models, limits) are in `configs/pipeline.json`; run scripts live at the repo root (`run.sh`, `try_prompt.sh`).
 
8
 
9
  ## Build, Test, and Development Commands
10
  - `npm install` – install dependencies.
@@ -12,12 +13,14 @@
12
  - `PIPELINE_SEED_MODE=question-first npm run pipeline -- --limit 20 --verbose` – enable question-first seeding.
13
  - `npm test` – run all unit tests (mocked by default).
14
  - `REAL_ES=1 npm test` – exercise retrieval against a live Elasticsearch + embedding endpoint.
 
15
 
16
  ## Coding Style & Naming Conventions
17
  - ECMAScript modules (`type: "module"`); prefer `.mjs` for shared code.
18
  - Two-space indentation, single quotes unless template strings add clarity, and keep functions small and pure where possible (CLI glue stays in `pipeline_cli.js`).
19
  - Use descriptive, lower_snake or camelCase for variables; exported helpers use camelCase.
20
  - Keep prompts and stage logic separate; place reusable utilities in `src/pipeline/util.mjs`.
 
21
 
22
  ## Testing Guidelines
23
  - Vitest is the test runner; add new tests under `tests/` with `.test.mjs` suffix.
 
5
  - Prompts are in `prompts/`; tweak these before changing stage behaviour.
6
  - Tests sit in `tests/` (Vitest), with sample seeds in `test_samples/`; pipeline outputs write to `gold/`.
7
  - Config baselines (models, limits) are in `configs/pipeline.json`; run scripts live at the repo root (`run.sh`, `try_prompt.sh`).
8
+ - Cached intermediates (questions/gens/verifications/rewards) live in `data/cache/*.jsonl`; set `PIPELINE_CACHE_DIR` to redirect.
9
 
10
  ## Build, Test, and Development Commands
11
  - `npm install` – install dependencies.
 
13
  - `PIPELINE_SEED_MODE=question-first npm run pipeline -- --limit 20 --verbose` – enable question-first seeding.
14
  - `npm test` – run all unit tests (mocked by default).
15
  - `REAL_ES=1 npm test` – exercise retrieval against a live Elasticsearch + embedding endpoint.
16
+ - Red/green pathway: use `*_PROVIDER=mock` plus JSONL chunk source to dry-run (green) without models; switch to real providers for red runs and the cache will skip already-completed stages.
17
 
18
  ## Coding Style & Naming Conventions
19
  - ECMAScript modules (`type: "module"`); prefer `.mjs` for shared code.
20
  - Two-space indentation, single quotes unless template strings add clarity, and keep functions small and pure where possible (CLI glue stays in `pipeline_cli.js`).
21
  - Use descriptive, lower_snake or camelCase for variables; exported helpers use camelCase.
22
  - Keep prompts and stage logic separate; place reusable utilities in `src/pipeline/util.mjs`.
23
+ - Deterministic IDs: chunks are hashed from content+source; questions/gens/rewards are keyed in JSONL caches so reruns can skip already-processed work.
24
 
25
  ## Testing Guidelines
26
  - Vitest is the test runner; add new tests under `tests/` with `.test.mjs` suffix.
data/cache/generations.jsonl ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ {"chunk_id":"chunk-1","question_id":"a85338a842d7801079bbbc9d5cab2665c5cf65fda9acdc73b2151994e73555bd","gen_id":"fe108065f6f7a4f099d4687547a35803133e8a72d110560b0d6f6dc3e92f945d","answer":"Here is a grounded answer.","thought":"mock reasoning","raw":"<think>mock reasoning</think>Here is a grounded answer.","provider":"mock","ts":1763950116519}
2
+ {"chunk_id":"chunk-1","question_id":"e8b843bd798e1617421c699c4b9cab3689e52d013c314481a5255917fcd19958","gen_id":"23079eb060c27259b66fec6e0cd66140b8c65c731b6f2724000e49c675e382c2","answer":"Here is a grounded answer.","thought":"mock reasoning","raw":"<think>mock reasoning</think>Here is a grounded answer.","provider":"mock","ts":1763950116521}
3
+ {"chunk_id":"c-0","question_id":"d7bd2353ef09a9061f7d6757ab0d9453e6ff9b735212ea69984cf541a3e506c7","gen_id":"fec93133bc62fb9bd895d88deec5afef39f9d5858997b0ccf1f8826868c758e7","answer":"a","provider":"ollama","ts":1763950116529}
4
+ {"chunk_id":"chunk-2","question_id":"5bb2bda8a2e19f3b77bcdd33f54e068df86b1dce9d82d1046438447d5908cbde","gen_id":"3cfe24cc427220e94ff010c167776baf27a9650bc47294a21c7715e89b01cda8","answer":"Here is a grounded answer.","thought":"mock reasoning","raw":"<think>mock reasoning</think>Here is a grounded answer.","provider":"mock","ts":1763950116529}
5
+ {"chunk_id":"c-0","question_id":"61ae2d8d90d176a1a499aa6b965c3729f6a596a345b0926441f86845c36a56d6","gen_id":"c3f3f96ec39323a48b373ca251675ee6904c41333bc6d4612c8cdd1a4b9163f3","answer":"a","provider":"ollama","ts":1763950116534}
6
+ {"chunk_id":"c1","question_id":"d159888d476c85a6459fe892d359efeebb8821ee5ec32e4cfa0f3ea19c25e7e9","gen_id":"7ba5bdcd55087da7ece639dbf4875a61ef67f8740cd851b071aae05daa3fb871","answer":"a","provider":"ollama","model":"qwen3-vl:8b-thinking","ts":1763950148198}
data/cache/questions.jsonl ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ {"chunk_id":"chunk-1","questions":["What is the main idea?","How does the text justify its claim?"],"question_ids":["a85338a842d7801079bbbc9d5cab2665c5cf65fda9acdc73b2151994e73555bd","e8b843bd798e1617421c699c4b9cab3689e52d013c314481a5255917fcd19958"],"provider":"mock","ts":1763950116516}
2
+ {"chunk_id":"chunk-2","questions":["What is the main idea?","How does the text justify its claim?"],"question_ids":["5bb2bda8a2e19f3b77bcdd33f54e068df86b1dce9d82d1046438447d5908cbde","53f4bb043a43683a68a25a3bb34f3170c362ea5583114ed1222d46011b8d9d5c"],"provider":"mock","ts":1763950116524}
3
+ {"chunk_id":"c-0","questions":["Q1?","Q2?","Q3?"],"question_ids":["d7bd2353ef09a9061f7d6757ab0d9453e6ff9b735212ea69984cf541a3e506c7","61ae2d8d90d176a1a499aa6b965c3729f6a596a345b0926441f86845c36a56d6","fee11e7734240ba47650e2e7b8d71ecb626d2777361138c2fdc033b61fd392fa"],"ts":1763950116525}
4
+ {"chunk_id":"c1","questions":["Q1?","Q2?","Q3?"],"question_ids":["d159888d476c85a6459fe892d359efeebb8821ee5ec32e4cfa0f3ea19c25e7e9","96e999c82e3f02db378e52ca4b50a04a979e9cf303057917dfe21370cc15db01","46a8298c8dc8564e0240a018963d5a3ce2bda1ca75fe367ee45e7c822296f3c9"],"provider":"ollama","model":"qwen3-vl:8b-thinking","ts":1763950148197}
data/cache/rewards.jsonl ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ {"chunk_id":"chunk-1","question_id":"a85338a842d7801079bbbc9d5cab2665c5cf65fda9acdc73b2151994e73555bd","gen_id":"fe108065f6f7a4f099d4687547a35803133e8a72d110560b0d6f6dc3e92f945d","score":0.9,"ok":true,"raw":"0.9 good","provider":"mock","ts":1763950116520}
2
+ {"chunk_id":"chunk-1","question_id":"e8b843bd798e1617421c699c4b9cab3689e52d013c314481a5255917fcd19958","gen_id":"23079eb060c27259b66fec6e0cd66140b8c65c731b6f2724000e49c675e382c2","score":0.9,"ok":true,"raw":"0.9 good","provider":"mock","ts":1763950116521}
3
+ {"chunk_id":"chunk-2","question_id":"5bb2bda8a2e19f3b77bcdd33f54e068df86b1dce9d82d1046438447d5908cbde","gen_id":"3cfe24cc427220e94ff010c167776baf27a9650bc47294a21c7715e89b01cda8","score":0.9,"ok":true,"raw":"0.9 good","provider":"mock","ts":1763950116530}
4
+ {"chunk_id":"c-0","question_id":"d7bd2353ef09a9061f7d6757ab0d9453e6ff9b735212ea69984cf541a3e506c7","gen_id":"fec93133bc62fb9bd895d88deec5afef39f9d5858997b0ccf1f8826868c758e7","ok":true,"provider":"ollama","ts":1763950116530}
5
+ {"chunk_id":"c-0","question_id":"61ae2d8d90d176a1a499aa6b965c3729f6a596a345b0926441f86845c36a56d6","gen_id":"c3f3f96ec39323a48b373ca251675ee6904c41333bc6d4612c8cdd1a4b9163f3","ok":true,"provider":"ollama","ts":1763950116535}
6
+ {"chunk_id":"c1","question_id":"d159888d476c85a6459fe892d359efeebb8821ee5ec32e4cfa0f3ea19c25e7e9","gen_id":"7ba5bdcd55087da7ece639dbf4875a61ef67f8740cd851b071aae05daa3fb871","ok":true,"provider":"ollama","model":"tensortemplar/patronus-lynx:8b-instruct-q4_K_M","ts":1763950148199}
data/cache/verifications.jsonl ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ {"chunk_id":"chunk-1","question_id":"a85338a842d7801079bbbc9d5cab2665c5cf65fda9acdc73b2151994e73555bd","gen_id":"fe108065f6f7a4f099d4687547a35803133e8a72d110560b0d6f6dc3e92f945d","ok":true,"raw":"yes\nmock verifier rationale","provider":"mock","ts":1763950116519}
2
+ {"chunk_id":"chunk-1","question_id":"e8b843bd798e1617421c699c4b9cab3689e52d013c314481a5255917fcd19958","gen_id":"23079eb060c27259b66fec6e0cd66140b8c65c731b6f2724000e49c675e382c2","ok":true,"raw":"yes\nmock verifier rationale","provider":"mock","ts":1763950116521}
3
+ {"chunk_id":"c-0","question_id":"d7bd2353ef09a9061f7d6757ab0d9453e6ff9b735212ea69984cf541a3e506c7","gen_id":"fec93133bc62fb9bd895d88deec5afef39f9d5858997b0ccf1f8826868c758e7","ok":true,"provider":"ollama","ts":1763950116529}
4
+ {"chunk_id":"chunk-2","question_id":"5bb2bda8a2e19f3b77bcdd33f54e068df86b1dce9d82d1046438447d5908cbde","gen_id":"3cfe24cc427220e94ff010c167776baf27a9650bc47294a21c7715e89b01cda8","ok":true,"raw":"yes\nmock verifier rationale","provider":"mock","ts":1763950116530}
5
+ {"chunk_id":"c-0","question_id":"61ae2d8d90d176a1a499aa6b965c3729f6a596a345b0926441f86845c36a56d6","gen_id":"c3f3f96ec39323a48b373ca251675ee6904c41333bc6d4612c8cdd1a4b9163f3","ok":true,"provider":"ollama","ts":1763950116535}
6
+ {"chunk_id":"c1","question_id":"d159888d476c85a6459fe892d359efeebb8821ee5ec32e4cfa0f3ea19c25e7e9","gen_id":"7ba5bdcd55087da7ece639dbf4875a61ef67f8740cd851b071aae05daa3fb871","ok":true,"provider":"ollama","model":"tensortemplar/patronus-lynx:8b-instruct-q4_K_M","ts":1763950148198}
src/pipeline/batch.mjs CHANGED
@@ -14,6 +14,19 @@ import { loadProviderFor } from '../providers/provider.mjs';
14
  import { runQuestionGenerator } from '../question/question_core.mjs';
15
  import { fetchChunksFromIndex } from '../retrieval/retrieval.mjs';
16
  import { loadRagChunks } from '../retrieval/jsonl_chunks.mjs';
 
 
 
 
 
 
 
 
 
 
 
 
 
17
 
18
  /**
19
  * Append a single accepted record to a JSONL file.
@@ -142,6 +155,12 @@ export async function runPipelineBatch({
142
  // ----------------------------------------
143
  if (seedMode === 'question-first') {
144
  const questionProvider = loadProviderFor('question');
 
 
 
 
 
 
145
 
146
  const maxQuestionsPerChunk = Number(
147
  process.env.QUESTION_MAX_PER_CHUNK ||
@@ -212,6 +231,14 @@ export async function runPipelineBatch({
212
  const label = `[chunk ${idx + 1}/${chunks.length}]`;
213
  const contextText = chunk.content;
214
 
 
 
 
 
 
 
 
 
215
  if (!contextText || !contextText.trim()) {
216
  if (verbose) {
217
  log(`${label} chunk content empty, skipping`);
@@ -223,9 +250,7 @@ export async function runPipelineBatch({
223
 
224
  if (verbose) {
225
  log(`\n🧩 ${label} generating questions from chunk…`);
226
- if (chunk.id) {
227
- log(` [question] chunk id: ${chunk.id}`);
228
- }
229
  log(
230
  ' [question] chunk preview:\n ' +
231
  preview(contextText, 300).replace(/\n/g, '\n '),
@@ -235,14 +260,34 @@ export async function runPipelineBatch({
235
  );
236
  }
237
 
238
- // 1) generate questions from this chunk
239
  let qResult;
 
240
  try {
241
- qResult = await runQuestionGenerator(
242
- contextText,
243
- questionProvider,
244
- { maxQuestions: maxQuestionsPerChunk },
245
- );
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
246
  } catch (e) {
247
  const msg = e?.message || String(e);
248
  statusCounts.question_error =
@@ -253,8 +298,6 @@ export async function runPipelineBatch({
253
  continue;
254
  }
255
 
256
- const questions = qResult?.questions || [];
257
-
258
  if (verbose) {
259
  log(
260
  ` [question] generated ${questions.length} question(s) from this chunk`,
@@ -273,6 +316,22 @@ export async function runPipelineBatch({
273
  if (processed >= questionCap) break;
274
  if (!q || !q.trim()) continue;
275
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
276
  const qLabel = `[q ${processed + 1}]`;
277
  log(
278
  ` → ${qLabel} Running pipeline for generated question: "${q}"`,
@@ -282,6 +341,7 @@ export async function runPipelineBatch({
282
  const result = await runPipelineStep({
283
  question: q,
284
  initialContext: [chunk], // IMPORTANT: reuse SAME chunk, no second retrieval
 
285
  verbose,
286
  logger,
287
  });
@@ -294,6 +354,37 @@ export async function runPipelineBatch({
294
  log(` ↳ status: ${result.status}`);
295
  }
296
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
297
  if (result.status === 'accepted') {
298
  const record = {
299
  question: q,
 
14
  import { runQuestionGenerator } from '../question/question_core.mjs';
15
  import { fetchChunksFromIndex } from '../retrieval/retrieval.mjs';
16
  import { loadRagChunks } from '../retrieval/jsonl_chunks.mjs';
17
+ import {
18
+ chunkIdFromContent,
19
+ questionId,
20
+ } from './cache.mjs';
21
+ import {
22
+ getCachedQuestions,
23
+ saveQuestions,
24
+ getCachedGeneration,
25
+ saveGeneration,
26
+ getCachedReward,
27
+ saveReward,
28
+ saveVerification,
29
+ } from './cache.mjs';
30
 
31
  /**
32
  * Append a single accepted record to a JSONL file.
 
155
  // ----------------------------------------
156
  if (seedMode === 'question-first') {
157
  const questionProvider = loadProviderFor('question');
158
+ const generatorProviderName =
159
+ process.env.GENERATOR_PROVIDER || 'ollama';
160
+ const verifierProviderName =
161
+ process.env.VERIFIER_PROVIDER || generatorProviderName;
162
+ const rewardProviderName =
163
+ process.env.REWARD_PROVIDER || generatorProviderName;
164
 
165
  const maxQuestionsPerChunk = Number(
166
  process.env.QUESTION_MAX_PER_CHUNK ||
 
231
  const label = `[chunk ${idx + 1}/${chunks.length}]`;
232
  const contextText = chunk.content;
233
 
234
+ const stableChunkId =
235
+ chunk.id ||
236
+ chunkIdFromContent(
237
+ contextText,
238
+ chunk.sourceId || chunk.source?.id,
239
+ );
240
+ chunk.id = stableChunkId;
241
+
242
  if (!contextText || !contextText.trim()) {
243
  if (verbose) {
244
  log(`${label} chunk content empty, skipping`);
 
250
 
251
  if (verbose) {
252
  log(`\n🧩 ${label} generating questions from chunk…`);
253
+ log(` [question] chunk id: ${chunk.id}`);
 
 
254
  log(
255
  ' [question] chunk preview:\n ' +
256
  preview(contextText, 300).replace(/\n/g, '\n '),
 
260
  );
261
  }
262
 
263
+ // 1) generate questions from this chunk (or load cache)
264
  let qResult;
265
+ let questions = [];
266
  try {
267
+ const cachedQRecords = await getCachedQuestions(stableChunkId);
268
+ if (cachedQRecords.length > 0) {
269
+ questions = cachedQRecords[0].questions.slice(
270
+ 0,
271
+ maxQuestionsPerChunk,
272
+ );
273
+ if (verbose)
274
+ log(
275
+ ` [question] loaded ${questions.length} cached question(s)`,
276
+ );
277
+ } else {
278
+ qResult = await runQuestionGenerator(
279
+ contextText,
280
+ questionProvider,
281
+ { maxQuestions: maxQuestionsPerChunk },
282
+ );
283
+ questions = qResult?.questions || [];
284
+ if (questions.length > 0) {
285
+ await saveQuestions(stableChunkId, questions, {
286
+ provider: process.env.QUESTION_PROVIDER,
287
+ model: process.env.QUESTION_MODEL,
288
+ });
289
+ }
290
+ }
291
  } catch (e) {
292
  const msg = e?.message || String(e);
293
  statusCounts.question_error =
 
298
  continue;
299
  }
300
 
 
 
301
  if (verbose) {
302
  log(
303
  ` [question] generated ${questions.length} question(s) from this chunk`,
 
316
  if (processed >= questionCap) break;
317
  if (!q || !q.trim()) continue;
318
 
319
+ const qId = questionId(stableChunkId, q);
320
+ const cachedReward = await getCachedReward(stableChunkId, qId);
321
+ if (cachedReward?.ok) {
322
+ processed += 1;
323
+ accepted += 1;
324
+ statusCounts.cached_reward =
325
+ (statusCounts.cached_reward || 0) + 1;
326
+ if (verbose)
327
+ log(
328
+ ` → [q ${processed}] using cached reward, skipping stages`,
329
+ );
330
+ continue;
331
+ }
332
+
333
+ const cachedGen = await getCachedGeneration(stableChunkId, qId);
334
+
335
  const qLabel = `[q ${processed + 1}]`;
336
  log(
337
  ` → ${qLabel} Running pipeline for generated question: "${q}"`,
 
341
  const result = await runPipelineStep({
342
  question: q,
343
  initialContext: [chunk], // IMPORTANT: reuse SAME chunk, no second retrieval
344
+ cachedGen,
345
  verbose,
346
  logger,
347
  });
 
354
  log(` ↳ status: ${result.status}`);
355
  }
356
 
357
+ let savedGenRecord = null;
358
+ if (result.gen) {
359
+ savedGenRecord = await saveGeneration(stableChunkId, qId, result.gen, {
360
+ provider: generatorProviderName,
361
+ model: process.env.GENERATOR_MODEL,
362
+ });
363
+ }
364
+ const genIdForFollowups =
365
+ savedGenRecord?.gen_id || cachedGen?.gen_id;
366
+ if (result.ver) {
367
+ await saveVerification(
368
+ stableChunkId,
369
+ qId,
370
+ genIdForFollowups,
371
+ result.ver,
372
+ {
373
+ provider: verifierProviderName,
374
+ model: process.env.VERIFIER_MODEL,
375
+ },
376
+ );
377
+ }
378
+ if (result.rew) {
379
+ await saveReward(
380
+ stableChunkId,
381
+ qId,
382
+ genIdForFollowups,
383
+ result.rew,
384
+ { provider: rewardProviderName, model: process.env.REWARD_MODEL },
385
+ );
386
+ }
387
+
388
  if (result.status === 'accepted') {
389
  const record = {
390
  question: q,
src/pipeline/cache.mjs ADDED
@@ -0,0 +1,177 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ // src/pipeline/cache.mjs
2
+ // Lightweight JSONL cache for intermediate pipeline artifacts.
3
+ import fs from 'fs/promises';
4
+ import path from 'path';
5
+ import { PROJECT_ROOT } from './util.mjs';
6
+ import {
7
+ chunkIdFromContent,
8
+ questionId,
9
+ generationId,
10
+ normalizeText,
11
+ } from './ids.mjs';
12
+
13
+ const CUSTOM_CACHE_DIR = process.env.PIPELINE_CACHE_DIR;
14
+ const CACHE_DIR = CUSTOM_CACHE_DIR
15
+ ? (path.isAbsolute(CUSTOM_CACHE_DIR)
16
+ ? CUSTOM_CACHE_DIR
17
+ : path.join(PROJECT_ROOT, CUSTOM_CACHE_DIR))
18
+ : path.join(PROJECT_ROOT, 'data', 'cache');
19
+
20
+ const FILES = {
21
+ questions: 'questions.jsonl',
22
+ generations: 'generations.jsonl',
23
+ verifications: 'verifications.jsonl',
24
+ rewards: 'rewards.jsonl',
25
+ };
26
+
27
+ async function ensureDir() {
28
+ await fs.mkdir(CACHE_DIR, { recursive: true });
29
+ }
30
+
31
+ async function appendJsonl(fileName, record) {
32
+ await ensureDir();
33
+ const line = JSON.stringify(record) + '\n';
34
+ await fs.appendFile(path.join(CACHE_DIR, fileName), line, 'utf8');
35
+ }
36
+
37
+ async function readJsonl(fileName, predicate) {
38
+ const filePath = path.join(CACHE_DIR, fileName);
39
+ try {
40
+ const txt = await fs.readFile(filePath, 'utf8');
41
+ const lines = txt
42
+ .split('\n')
43
+ .map((l) => l.trim())
44
+ .filter(Boolean);
45
+ const parsed = lines.map((l) => {
46
+ try {
47
+ return JSON.parse(l);
48
+ } catch {
49
+ return null;
50
+ }
51
+ }).filter(Boolean);
52
+ return predicate ? parsed.filter(predicate) : parsed;
53
+ } catch (e) {
54
+ if (e.code === 'ENOENT') return [];
55
+ throw e;
56
+ }
57
+ }
58
+
59
+ // ---------------------------
60
+ // Question cache
61
+ // ---------------------------
62
+ export async function getCachedQuestions(chunkId) {
63
+ return readJsonl(
64
+ FILES.questions,
65
+ (r) => r.chunk_id === chunkId && Array.isArray(r.questions) && r.questions.length > 0,
66
+ );
67
+ }
68
+
69
+ export async function saveQuestions(chunkId, questions, meta = {}) {
70
+ const ts = Date.now();
71
+ const record = {
72
+ chunk_id: chunkId,
73
+ questions,
74
+ question_ids: questions.map((q) => questionId(chunkId, q)),
75
+ provider: meta.provider,
76
+ model: meta.model,
77
+ ts,
78
+ };
79
+ await appendJsonl(FILES.questions, record);
80
+ }
81
+
82
+ // ---------------------------
83
+ // Generator cache
84
+ // ---------------------------
85
+ export async function getCachedGeneration(chunkId, qId) {
86
+ const records = await readJsonl(
87
+ FILES.generations,
88
+ (r) => r.chunk_id === chunkId && r.question_id === qId && r.answer,
89
+ );
90
+ // return the latest match if multiple
91
+ return records.length ? records[records.length - 1] : null;
92
+ }
93
+
94
+ export async function saveGeneration(chunkId, qId, gen, meta = {}) {
95
+ if (!gen) return;
96
+ const gen_id = generationId(chunkId, qId, gen.answer || gen.raw || '');
97
+ const ts = Date.now();
98
+ const record = {
99
+ chunk_id: chunkId,
100
+ question_id: qId,
101
+ gen_id,
102
+ answer: gen.answer || gen.raw || '',
103
+ thought: gen.thought,
104
+ raw: gen.raw,
105
+ provider: meta.provider,
106
+ model: meta.model,
107
+ ts,
108
+ };
109
+ await appendJsonl(FILES.generations, record);
110
+ return record;
111
+ }
112
+
113
+ // ---------------------------
114
+ // Verification cache
115
+ // ---------------------------
116
+ export async function getCachedVerification(chunkId, qId, genId) {
117
+ const records = await readJsonl(
118
+ FILES.verifications,
119
+ (r) =>
120
+ r.chunk_id === chunkId &&
121
+ r.question_id === qId &&
122
+ (!genId || r.gen_id === genId),
123
+ );
124
+ return records.length ? records[records.length - 1] : null;
125
+ }
126
+
127
+ export async function saveVerification(chunkId, qId, genId, ver, meta = {}) {
128
+ if (!ver) return;
129
+ const ts = Date.now();
130
+ const record = {
131
+ chunk_id: chunkId,
132
+ question_id: qId,
133
+ gen_id: genId,
134
+ ok: ver.ok === true,
135
+ raw: ver.raw,
136
+ provider: meta.provider,
137
+ model: meta.model,
138
+ ts,
139
+ };
140
+ await appendJsonl(FILES.verifications, record);
141
+ return record;
142
+ }
143
+
144
+ // ---------------------------
145
+ // Reward cache
146
+ // ---------------------------
147
+ export async function getCachedReward(chunkId, qId, genId) {
148
+ const records = await readJsonl(
149
+ FILES.rewards,
150
+ (r) =>
151
+ r.chunk_id === chunkId &&
152
+ r.question_id === qId &&
153
+ (!genId || r.gen_id === genId),
154
+ );
155
+ return records.length ? records[records.length - 1] : null;
156
+ }
157
+
158
+ export async function saveReward(chunkId, qId, genId, rew, meta = {}) {
159
+ if (!rew) return;
160
+ const ts = Date.now();
161
+ const record = {
162
+ chunk_id: chunkId,
163
+ question_id: qId,
164
+ gen_id: genId,
165
+ score: rew.score,
166
+ ok: rew.ok === true,
167
+ raw: rew.raw,
168
+ provider: meta.provider,
169
+ model: meta.model,
170
+ ts,
171
+ };
172
+ await appendJsonl(FILES.rewards, record);
173
+ return record;
174
+ }
175
+
176
+ // Utility export for hashing reuse
177
+ export { chunkIdFromContent, questionId, generationId, normalizeText };
src/pipeline/ids.mjs ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ // src/pipeline/ids.mjs
2
+ // Helpers for deterministic IDs across pipeline stages.
3
+ import crypto from 'crypto';
4
+
5
+ export function normalizeText(text = '') {
6
+ return String(text).replace(/\s+/g, ' ').trim();
7
+ }
8
+
9
+ export function hashString(str) {
10
+ return crypto.createHash('sha256').update(str).digest('hex');
11
+ }
12
+
13
+ export function chunkIdFromContent(content, sourceId) {
14
+ const base = normalizeText(content);
15
+ return hashString(`${base}|${sourceId ?? ''}`);
16
+ }
17
+
18
+ export function questionId(chunkId, question) {
19
+ return hashString(`${chunkId}|${normalizeText(question)}`);
20
+ }
21
+
22
+ export function generationId(chunkId, questionId, answer) {
23
+ return hashString(`${chunkId}|${questionId}|${normalizeText(answer)}`);
24
+ }
src/pipeline/step.mjs CHANGED
@@ -48,6 +48,7 @@ export async function runPipelineStep({
48
  generatorProvider,
49
  verifierProvider,
50
  rewardProvider,
 
51
  verbose = false,
52
  logger = console,
53
  } = {}) {
@@ -147,23 +148,28 @@ export async function runPipelineStep({
147
  // Generator
148
  // ----------------------------------------
149
  let gen;
150
- try {
151
- if (verbose) log(' [generator] calling model…');
152
- gen = await runGenerator(question, context, genProv);
 
 
 
 
153
 
154
- if (verbose) {
155
- log(' [generator] answer:');
156
- log(' ' + preview(gen?.answer ?? '', 400).replace(/\n/g, '\n '));
 
 
 
 
 
 
 
 
 
 
157
  }
158
- } catch (e) {
159
- const msg = e?.message || String(e);
160
- if (verbose) errLog(' [generator] ERROR:', msg);
161
- return {
162
- status: 'generator_failed',
163
- question,
164
- context,
165
- error: msg,
166
- };
167
  }
168
 
169
  // Empty answer means generator failed
 
48
  generatorProvider,
49
  verifierProvider,
50
  rewardProvider,
51
+ cachedGen,
52
  verbose = false,
53
  logger = console,
54
  } = {}) {
 
148
  // Generator
149
  // ----------------------------------------
150
  let gen;
151
+ if (cachedGen) {
152
+ gen = cachedGen;
153
+ if (verbose) log(' [generator] using cached generation');
154
+ } else {
155
+ try {
156
+ if (verbose) log(' [generator] calling model…');
157
+ gen = await runGenerator(question, context, genProv);
158
 
159
+ if (verbose) {
160
+ log(' [generator] answer:');
161
+ log(' ' + preview(gen?.answer ?? '', 400).replace(/\n/g, '\n '));
162
+ }
163
+ } catch (e) {
164
+ const msg = e?.message || String(e);
165
+ if (verbose) errLog(' [generator] ERROR:', msg);
166
+ return {
167
+ status: 'generator_failed',
168
+ question,
169
+ context,
170
+ error: msg,
171
+ };
172
  }
 
 
 
 
 
 
 
 
 
173
  }
174
 
175
  // Empty answer means generator failed
src/retrieval/jsonl_chunks.mjs CHANGED
@@ -3,6 +3,7 @@ import fs from 'fs/promises';
3
  import path from 'path';
4
  import crypto from 'crypto';
5
  import { PROJECT_ROOT } from '../pipeline/util.mjs';
 
6
 
7
  const DEFAULT_RAG_PATH = path.join(
8
  PROJECT_ROOT,
@@ -52,14 +53,17 @@ async function loadAllChunksFromJsonl(filePath = DEFAULT_RAG_PATH) {
52
  obj.body ||
53
  '';
54
 
55
- const id =
56
  obj.id ||
57
  obj.session_key ||
58
  obj.title ||
59
  `jsonl-${idx}`;
60
 
 
 
61
  return {
62
  id,
 
63
  content,
64
  source: obj,
65
  };
 
3
  import path from 'path';
4
  import crypto from 'crypto';
5
  import { PROJECT_ROOT } from '../pipeline/util.mjs';
6
+ import { chunkIdFromContent } from '../pipeline/cache.mjs';
7
 
8
  const DEFAULT_RAG_PATH = path.join(
9
  PROJECT_ROOT,
 
53
  obj.body ||
54
  '';
55
 
56
+ const sourceId =
57
  obj.id ||
58
  obj.session_key ||
59
  obj.title ||
60
  `jsonl-${idx}`;
61
 
62
+ const id = chunkIdFromContent(content, sourceId);
63
+
64
  return {
65
  id,
66
+ sourceId,
67
  content,
68
  source: obj,
69
  };
src/retrieval/retrieval.mjs CHANGED
@@ -2,6 +2,7 @@
2
  import dotenv from 'dotenv';
3
  import { Client } from '@elastic/elasticsearch';
4
  import fetch from 'node-fetch';
 
5
 
6
  dotenv.config();
7
 
@@ -160,17 +161,24 @@ export async function fetchChunksFromIndex(limit) {
160
 
161
  const hits = res.hits?.hits || [];
162
 
163
- return hits.map((h) => ({
164
- id: h._id,
165
- score: h._score,
166
- // distill-rag usually stores text in `content`, but we try a few keys
167
- content:
168
  h._source?.content ??
169
  h._source?.text ??
170
  h._source?.chunk ??
171
- '',
172
- source: h._source,
173
- }));
 
 
 
 
 
 
 
 
 
 
174
  }
175
 
176
 
 
2
  import dotenv from 'dotenv';
3
  import { Client } from '@elastic/elasticsearch';
4
  import fetch from 'node-fetch';
5
+ import { chunkIdFromContent } from '../pipeline/cache.mjs';
6
 
7
  dotenv.config();
8
 
 
161
 
162
  const hits = res.hits?.hits || [];
163
 
164
+ return hits.map((h) => {
165
+ const content =
 
 
 
166
  h._source?.content ??
167
  h._source?.text ??
168
  h._source?.chunk ??
169
+ '';
170
+
171
+ const sourceId = h._id;
172
+ const id = chunkIdFromContent(content, sourceId);
173
+
174
+ return {
175
+ id,
176
+ sourceId,
177
+ score: h._score,
178
+ content,
179
+ source: h._source,
180
+ };
181
+ });
182
  }
183
 
184
 
tests/cache_pipeline.test.mjs ADDED
@@ -0,0 +1,83 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
2
+ import path from 'path';
3
+ import os from 'os';
4
+
5
+ describe('pipeline cache + deterministic IDs', () => {
6
+ const tmpCache = () => path.join(os.tmpdir(), `cache-${Date.now()}`);
7
+
8
+ beforeEach(() => {
9
+ vi.resetModules();
10
+ process.env.PIPELINE_CHUNK_SOURCE = 'jsonl';
11
+ process.env.PIPELINE_CACHE_DIR = tmpCache();
12
+ });
13
+
14
+ afterEach(() => {
15
+ delete process.env.PIPELINE_CHUNK_SOURCE;
16
+ delete process.env.PIPELINE_CACHE_DIR;
17
+ vi.restoreAllMocks();
18
+ vi.unmock('../src/retrieval/jsonl_chunks.mjs');
19
+ vi.unmock('../src/pipeline/step.mjs');
20
+ });
21
+
22
+ it('uses cached reward to skip generator/verifier/reward work', async () => {
23
+ const chunk = { content: 'hello world', sourceId: 's1' };
24
+
25
+ // Deterministic IDs and cache helpers
26
+ const {
27
+ chunkIdFromContent,
28
+ questionId,
29
+ saveQuestions,
30
+ saveReward,
31
+ } = await import('../src/pipeline/cache.mjs');
32
+
33
+ const chunkId = chunkIdFromContent(chunk.content, chunk.sourceId);
34
+ const question = 'What is said?';
35
+ const qId = questionId(chunkId, question);
36
+
37
+ // Pre-populate cache with questions and an accepted reward
38
+ await saveQuestions(chunkId, [question], {
39
+ provider: 'mock',
40
+ model: 'mock',
41
+ });
42
+ await saveReward(
43
+ chunkId,
44
+ qId,
45
+ 'gen-1',
46
+ { score: 0.9, ok: true, raw: '0.9' },
47
+ { provider: 'mock', model: 'mock' },
48
+ );
49
+
50
+ // Mock chunk loader and pipeline step to observe bypass
51
+ vi.doMock('../src/retrieval/jsonl_chunks.mjs', () => ({
52
+ loadRagChunks: vi.fn(async () => [{ ...chunk, id: chunkId }]),
53
+ }));
54
+
55
+ const runPipelineStep = vi.fn();
56
+ vi.doMock('../src/pipeline/step.mjs', () => ({ runPipelineStep }));
57
+
58
+ const { runPipelineBatch } = await import('../src/pipeline/batch.mjs');
59
+
60
+ const result = await runPipelineBatch({
61
+ seedMode: 'question-first',
62
+ limit: 5,
63
+ verbose: false,
64
+ logger: { log() {}, error() {} },
65
+ });
66
+
67
+ expect(runPipelineStep).not.toHaveBeenCalled();
68
+ expect(result.processed).toBe(1);
69
+ expect(result.accepted).toBe(1);
70
+ expect(result.statusCounts.cached_reward).toBe(1);
71
+ });
72
+
73
+ it('stable chunk ids despite whitespace differences', async () => {
74
+ const { chunkIdFromContent } = await import('../src/pipeline/ids.mjs');
75
+
76
+ const idA = chunkIdFromContent('Hello world', 'doc1');
77
+ const idB = chunkIdFromContent('Hello world', 'doc1');
78
+ const idC = chunkIdFromContent('Hello world!', 'doc1');
79
+
80
+ expect(idA).toBe(idB);
81
+ expect(idA).not.toBe(idC); // punctuation changes content hash
82
+ });
83
+ });
tests/pipeline.full.mock.test.mjs CHANGED
@@ -14,6 +14,10 @@ describe('full pipeline (mock providers)', () => {
14
  process.env.VERIFIER_PROVIDER = 'mock';
15
  process.env.REWARD_PROVIDER = 'mock';
16
  process.env.QUESTION_PROVIDER = 'mock';
 
 
 
 
17
  });
18
 
19
  afterEach(() => {
@@ -22,6 +26,7 @@ describe('full pipeline (mock providers)', () => {
22
  delete process.env.VERIFIER_PROVIDER;
23
  delete process.env.REWARD_PROVIDER;
24
  delete process.env.QUESTION_PROVIDER;
 
25
  vi.restoreAllMocks();
26
  vi.unmock('../src/retrieval/jsonl_chunks.mjs');
27
  });
 
14
  process.env.VERIFIER_PROVIDER = 'mock';
15
  process.env.REWARD_PROVIDER = 'mock';
16
  process.env.QUESTION_PROVIDER = 'mock';
17
+ process.env.PIPELINE_CACHE_DIR = path.join(
18
+ os.tmpdir(),
19
+ `cache-${Date.now()}`,
20
+ );
21
  });
22
 
23
  afterEach(() => {
 
26
  delete process.env.VERIFIER_PROVIDER;
27
  delete process.env.REWARD_PROVIDER;
28
  delete process.env.QUESTION_PROVIDER;
29
+ delete process.env.PIPELINE_CACHE_DIR;
30
  vi.restoreAllMocks();
31
  vi.unmock('../src/retrieval/jsonl_chunks.mjs');
32
  });
tests/pipeline_behaviour.test.mjs CHANGED
@@ -74,10 +74,15 @@ describe('runPipelineBatch question cap', () => {
74
  beforeEach(() => {
75
  vi.resetModules();
76
  process.env.PIPELINE_CHUNK_SOURCE = 'jsonl';
 
 
 
 
77
  });
78
 
79
  afterEach(() => {
80
  delete process.env.PIPELINE_CHUNK_SOURCE;
 
81
  vi.restoreAllMocks();
82
  vi.unmock('../src/providers/provider.mjs');
83
  vi.unmock('../src/retrieval/jsonl_chunks.mjs');
 
74
  beforeEach(() => {
75
  vi.resetModules();
76
  process.env.PIPELINE_CHUNK_SOURCE = 'jsonl';
77
+ process.env.PIPELINE_CACHE_DIR = path.join(
78
+ os.tmpdir(),
79
+ `cache-${Date.now()}`,
80
+ );
81
  });
82
 
83
  afterEach(() => {
84
  delete process.env.PIPELINE_CHUNK_SOURCE;
85
+ delete process.env.PIPELINE_CACHE_DIR;
86
  vi.restoreAllMocks();
87
  vi.unmock('../src/providers/provider.mjs');
88
  vi.unmock('../src/retrieval/jsonl_chunks.mjs');