// src/pipeline/cache.mjs
// Lightweight JSONL cache for intermediate pipeline artifacts.
//
// Each artifact kind lives in its own append-only `.jsonl` file inside
// CACHE_DIR. Records are never rewritten; the "latest" lookups below rely on
// append order and return the last matching line.
import fs from 'fs/promises';
import path from 'path';
import { PROJECT_ROOT } from './util.mjs';
import {
  chunkIdFromContent,
  questionId,
  generationId,
  normalizeText,
} from './ids.mjs';

// Optional override for the cache location. A relative value is resolved
// against PROJECT_ROOT; an absolute value is used as-is.
const CUSTOM_CACHE_DIR = process.env.PIPELINE_CACHE_DIR;
const CACHE_DIR = CUSTOM_CACHE_DIR
  ? (path.isAbsolute(CUSTOM_CACHE_DIR)
    ? CUSTOM_CACHE_DIR
    : path.join(PROJECT_ROOT, CUSTOM_CACHE_DIR))
  : path.join(PROJECT_ROOT, 'data', 'cache');

// File name (relative to CACHE_DIR) for each artifact kind.
const FILES = {
  questions: 'questions.jsonl',
  generations: 'generations.jsonl',
  verifications: 'verifications.jsonl',
  rewards: 'rewards.jsonl',
};

/**
 * Create CACHE_DIR (and any missing parents) if it does not exist yet.
 * @returns {Promise<void>}
 */
async function ensureDir() {
  await fs.mkdir(CACHE_DIR, { recursive: true });
}

/**
 * Append one record as a single JSON line to a cache file.
 * @param {string} fileName - File name relative to CACHE_DIR.
 * @param {object} record - JSON-serializable record.
 * @returns {Promise<void>}
 */
async function appendJsonl(fileName, record) {
  await ensureDir();
  const line = JSON.stringify(record) + '\n';
  await fs.appendFile(path.join(CACHE_DIR, fileName), line, 'utf8');
}

/**
 * Parse JSONL text into an array of values.
 * Blank lines are ignored. Lines that are not valid JSON, and lines that
 * parse to a falsy value, are skipped on purpose so that one damaged line
 * does not make the whole cache unreadable.
 * @param {string} text - Raw file content.
 * @returns {Array<*>} Parsed values in file order.
 */
function parseJsonLines(text) {
  const parsed = [];
  for (const rawLine of text.split('\n')) {
    const line = rawLine.trim();
    if (!line) continue;
    let value;
    try {
      value = JSON.parse(line);
    } catch {
      // Best-effort cache: drop the unreadable line and keep going.
      continue;
    }
    if (value) parsed.push(value);
  }
  return parsed;
}

/**
 * Read every record of a cache file, optionally filtered.
 * @param {string} fileName - File name relative to CACHE_DIR.
 * @param {(record: *) => *} [predicate] - Keep only records for which this
 *   returns a truthy value.
 * @returns {Promise<Array<*>>} Matching records in file order; an empty array
 *   when the file does not exist.
 * @throws Any read error other than ENOENT.
 */
async function readJsonl(fileName, predicate) {
  const filePath = path.join(CACHE_DIR, fileName);
  let txt;
  try {
    txt = await fs.readFile(filePath, 'utf8');
  } catch (e) {
    // A cache file that was never written simply means "nothing cached".
    if (e.code === 'ENOENT') return [];
    throw e;
  }
  const parsed = parseJsonLines(txt);
  return predicate ? parsed.filter(predicate) : parsed;
}

/**
 * Return the last record of a cache file that satisfies the predicate.
 * @param {string} fileName - File name relative to CACHE_DIR.
 * @param {(record: *) => *} predicate - Match condition.
 * @returns {Promise<*|null>} The most recently appended match, or null.
 */
async function readLatest(fileName, predicate) {
  const records = await readJsonl(fileName, predicate);
  return records.length ? records[records.length - 1] : null;
}

// ---------------------------
// Question cache
// ---------------------------

/**
 * Fetch all cached question records for a chunk.
 * @param {string} chunkId
 * @returns {Promise<Array<object>>} Records whose `chunk_id` matches and whose
 *   `questions` field is a non-empty array (possibly several per chunk).
 */
export async function getCachedQuestions(chunkId) {
  return readJsonl(
    FILES.questions,
    (r) =>
      r.chunk_id === chunkId &&
      Array.isArray(r.questions) &&
      r.questions.length > 0,
  );
}

/**
 * Append a question record for a chunk.
 * Nothing is written when `questions` is not a non-empty array, because
 * getCachedQuestions would never return such a record anyway.
 * @param {string} chunkId
 * @param {Array<*>} questions - Questions generated for the chunk.
 * @param {{provider?: string, model?: string}} [meta]
 * @returns {Promise<object|undefined>} The stored record, or undefined when
 *   nothing was written.
 */
export async function saveQuestions(chunkId, questions, meta = {}) {
  if (!Array.isArray(questions) || questions.length === 0) return;
  const ts = Date.now();
  const record = {
    chunk_id: chunkId,
    questions,
    question_ids: questions.map((q) => questionId(chunkId, q)),
    provider: meta.provider,
    model: meta.model,
    ts,
  };
  await appendJsonl(FILES.questions, record);
  return record;
}

// ---------------------------
// Generator cache
// ---------------------------

/**
 * Fetch the most recent cached generation for a chunk/question pair.
 * Records with an empty `answer` are ignored.
 * @param {string} chunkId
 * @param {string} qId
 * @returns {Promise<object|null>} Latest matching record, or null.
 */
export async function getCachedGeneration(chunkId, qId) {
  return readLatest(
    FILES.generations,
    (r) => r.chunk_id === chunkId && r.question_id === qId && r.answer,
  );
}

/**
 * Append a generation record.
 * The stored `answer` falls back to `gen.raw` (then to an empty string) when
 * `gen.answer` is empty; the same text is passed to generationId.
 * @param {string} chunkId
 * @param {string} qId
 * @param {{answer?: string, raw?: string, thought?: *}} gen
 * @param {{provider?: string, model?: string}} [meta]
 * @returns {Promise<object|undefined>} The stored record, or undefined when
 *   `gen` is falsy.
 */
export async function saveGeneration(chunkId, qId, gen, meta = {}) {
  if (!gen) return;
  const answer = gen.answer || gen.raw || '';
  const gen_id = generationId(chunkId, qId, answer);
  const ts = Date.now();
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id,
    answer,
    thought: gen.thought,
    raw: gen.raw,
    provider: meta.provider,
    model: meta.model,
    ts,
  };
  await appendJsonl(FILES.generations, record);
  return record;
}

// ---------------------------
// Verification cache
// ---------------------------

/**
 * Fetch the most recent cached verification for a chunk/question pair.
 * @param {string} chunkId
 * @param {string} qId
 * @param {string} [genId] - When given, only records with this `gen_id`
 *   match; when omitted, any generation matches.
 * @returns {Promise<object|null>} Latest matching record, or null.
 */
export async function getCachedVerification(chunkId, qId, genId) {
  return readLatest(
    FILES.verifications,
    (r) =>
      r.chunk_id === chunkId &&
      r.question_id === qId &&
      (!genId || r.gen_id === genId),
  );
}

/**
 * Append a verification record.
 * `ok` is stored as a strict boolean: true only when `ver.ok === true`.
 * @param {string} chunkId
 * @param {string} qId
 * @param {string} genId
 * @param {{ok?: *, score?: *, raw?: *}} ver
 * @param {{provider?: string, model?: string}} [meta]
 * @returns {Promise<object|undefined>} The stored record, or undefined when
 *   `ver` is falsy.
 */
export async function saveVerification(chunkId, qId, genId, ver, meta = {}) {
  if (!ver) return;
  const ts = Date.now();
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: genId,
    ok: ver.ok === true,
    score: ver.score,
    raw: ver.raw,
    provider: meta.provider,
    model: meta.model,
    ts,
  };
  await appendJsonl(FILES.verifications, record);
  return record;
}

// ---------------------------
// Reward cache
// ---------------------------

/**
 * Fetch the most recent cached reward for a chunk/question pair.
 * @param {string} chunkId
 * @param {string} qId
 * @param {string} [genId] - When given, only records with this `gen_id`
 *   match; when omitted, any generation matches.
 * @returns {Promise<object|null>} Latest matching record, or null.
 */
export async function getCachedReward(chunkId, qId, genId) {
  return readLatest(
    FILES.rewards,
    (r) =>
      r.chunk_id === chunkId &&
      r.question_id === qId &&
      (!genId || r.gen_id === genId),
  );
}

/**
 * Append a reward record.
 * `ok` is stored as a strict boolean: true only when `rew.ok === true`.
 * @param {string} chunkId
 * @param {string} qId
 * @param {string} genId
 * @param {{score?: *, ok?: *, raw?: *}} rew
 * @param {{provider?: string, model?: string}} [meta]
 * @returns {Promise<object|undefined>} The stored record, or undefined when
 *   `rew` is falsy.
 */
export async function saveReward(chunkId, qId, genId, rew, meta = {}) {
  if (!rew) return;
  const ts = Date.now();
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: genId,
    score: rew.score,
    ok: rew.ok === true,
    raw: rew.raw,
    provider: meta.provider,
    model: meta.model,
    ts,
  };
  await appendJsonl(FILES.rewards, record);
  return record;
}

// Utility export for hashing reuse
export { chunkIdFromContent, questionId, generationId, normalizeText };