htaf's picture
updated to fit verifier
5e7271c
// src/pipeline/cache.mjs
// Lightweight JSONL cache for intermediate pipeline artifacts.
import fs from 'fs/promises';
import path from 'path';
import { PROJECT_ROOT } from './util.mjs';
import {
chunkIdFromContent,
questionId,
generationId,
normalizeText,
} from './ids.mjs';
// Optional override for the cache location, read from the environment.
const CUSTOM_CACHE_DIR = process.env.PIPELINE_CACHE_DIR;

// Directory that holds every cache file:
//   - no override (unset or empty) -> <PROJECT_ROOT>/data/cache
//   - absolute override            -> used exactly as given
//   - relative override            -> joined onto PROJECT_ROOT
const CACHE_DIR = (() => {
  if (!CUSTOM_CACHE_DIR) {
    return path.join(PROJECT_ROOT, 'data', 'cache');
  }
  if (path.isAbsolute(CUSTOM_CACHE_DIR)) {
    return CUSTOM_CACHE_DIR;
  }
  return path.join(PROJECT_ROOT, CUSTOM_CACHE_DIR);
})();
// File name (relative to the cache directory) used by each cache category.
// Frozen so an accidental write to this shared lookup table throws instead of
// silently redirecting later reads and writes to a different file.
const FILES = Object.freeze({
  questions: 'questions.jsonl',
  generations: 'generations.jsonl',
  verifications: 'verifications.jsonl',
  rewards: 'rewards.jsonl',
});
/**
 * Create the cache directory, including any missing parent directories.
 * Resolves without error when the directory already exists.
 *
 * @returns {Promise<void>}
 */
async function ensureDir() {
  const mkdirOptions = { recursive: true };
  await fs.mkdir(CACHE_DIR, mkdirOptions);
}
/**
 * Append one record to a JSONL file inside the cache directory.
 * The record is serialized with JSON.stringify and terminated by a newline.
 *
 * @param {string} fileName - File name relative to the cache directory.
 * @param {*} record - Value to serialize as a single line.
 * @returns {Promise<void>}
 */
async function appendJsonl(fileName, record) {
  // The directory must exist before appendFile can create the file.
  await ensureDir();
  const serialized = `${JSON.stringify(record)}\n`;
  const destination = path.join(CACHE_DIR, fileName);
  await fs.appendFile(destination, serialized, 'utf8');
}
/**
 * Read a JSONL file from the cache directory and return its parsed records.
 *
 * Blank lines, lines that are not valid JSON, and lines whose parsed value is
 * falsy (for example `null`) are skipped. A missing file yields an empty
 * array; any other error is rethrown.
 *
 * @param {string} fileName - File name relative to the cache directory.
 * @param {Function} [predicate] - Optional filter applied to parsed records.
 * @returns {Promise<Array>} Parsed (and optionally filtered) records in file order.
 */
async function readJsonl(fileName, predicate) {
  const target = path.join(CACHE_DIR, fileName);
  try {
    const contents = await fs.readFile(target, 'utf8');
    const records = [];
    for (const rawLine of contents.split('\n')) {
      const candidate = rawLine.trim();
      if (!candidate) continue;

      let value;
      try {
        value = JSON.parse(candidate);
      } catch {
        // Best-effort cache: a corrupt line is ignored rather than fatal.
        continue;
      }
      if (value) records.push(value);
    }

    if (!predicate) return records;
    return records.filter(predicate);
  } catch (err) {
    // A cache file that was never written simply means "nothing cached".
    if (err.code !== 'ENOENT') throw err;
    return [];
  }
}
// ---------------------------
// Question cache
// ---------------------------
/**
 * Look up cached question records for a chunk.
 *
 * @param {*} chunkId - Chunk identifier to match against `chunk_id`.
 * @returns {Promise<Array>} Every record for the chunk whose `questions`
 *   field is a non-empty array, in file order.
 */
export async function getCachedQuestions(chunkId) {
  const hasQuestionsForChunk = (entry) =>
    entry.chunk_id === chunkId &&
    Array.isArray(entry.questions) &&
    entry.questions.length > 0;

  return readJsonl(FILES.questions, hasQuestionsForChunk);
}
/**
 * Append a question record for a chunk to the question cache.
 *
 * @param {*} chunkId - Chunk identifier stored as `chunk_id`.
 * @param {Array} questions - Questions generated for the chunk.
 * @param {object} [meta] - Optional metadata; `provider` and `model` are stored.
 * @returns {Promise<object>} The record that was written, matching what the
 *   other save* functions in this module return.
 */
export async function saveQuestions(chunkId, questions, meta = {}) {
  const ts = Date.now();
  const record = {
    chunk_id: chunkId,
    questions,
    // One derived id per question, in the same order as `questions`.
    question_ids: questions.map((q) => questionId(chunkId, q)),
    provider: meta.provider,
    model: meta.model,
    ts,
  };
  await appendJsonl(FILES.questions, record);
  return record;
}
// ---------------------------
// Generator cache
// ---------------------------
/**
 * Look up the cached generation for a chunk/question pair.
 *
 * @param {*} chunkId - Chunk identifier to match against `chunk_id`.
 * @param {*} qId - Question identifier to match against `question_id`.
 * @returns {Promise<object|null>} The last matching record in file order that
 *   has a truthy `answer`, or null when there is none.
 */
export async function getCachedGeneration(chunkId, qId) {
  const isAnsweredMatch = (entry) => {
    if (entry.chunk_id !== chunkId) return false;
    if (entry.question_id !== qId) return false;
    return entry.answer;
  };
  const matches = await readJsonl(FILES.generations, isAnsweredMatch);

  // Records are appended over time, so the final match is the newest one.
  const [latest = null] = matches.slice(-1);
  return latest;
}
/**
 * Append a generation record to the generation cache.
 *
 * @param {*} chunkId - Chunk identifier stored as `chunk_id`.
 * @param {*} qId - Question identifier stored as `question_id`.
 * @param {object} gen - Generation result; `answer`, `thought` and `raw` are read.
 * @param {object} [meta] - Optional metadata; `provider` and `model` are stored.
 * @returns {Promise<object|undefined>} The written record, or undefined when
 *   `gen` is falsy and nothing is written.
 */
export async function saveGeneration(chunkId, qId, gen, meta = {}) {
  if (!gen) return;

  // The stored answer falls back to the raw output, then to an empty string;
  // the same text is used to derive the generation id.
  const answer = gen.answer || gen.raw || '';
  const { provider, model } = meta;
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: generationId(chunkId, qId, answer),
    answer,
    thought: gen.thought,
    raw: gen.raw,
    provider,
    model,
    ts: Date.now(),
  };

  await appendJsonl(FILES.generations, record);
  return record;
}
// ---------------------------
// Verification cache
// ---------------------------
/**
 * Look up the cached verification for a chunk/question pair.
 *
 * @param {*} chunkId - Chunk identifier to match against `chunk_id`.
 * @param {*} qId - Question identifier to match against `question_id`.
 * @param {*} [genId] - When truthy, also require `gen_id` to equal this value.
 * @returns {Promise<object|null>} The last matching record in file order, or
 *   null when there is none.
 */
export async function getCachedVerification(chunkId, qId, genId) {
  const isMatch = (entry) => {
    if (entry.chunk_id !== chunkId) return false;
    if (entry.question_id !== qId) return false;
    // A falsy genId means "any generation for this question".
    return !genId || entry.gen_id === genId;
  };
  const matches = await readJsonl(FILES.verifications, isMatch);

  const [latest = null] = matches.slice(-1);
  return latest;
}
/**
 * Append a verification record to the verification cache.
 *
 * @param {*} chunkId - Chunk identifier stored as `chunk_id`.
 * @param {*} qId - Question identifier stored as `question_id`.
 * @param {*} genId - Generation identifier stored as `gen_id`.
 * @param {object} ver - Verification result; `ok`, `score` and `raw` are read.
 * @param {object} [meta] - Optional metadata; `provider` and `model` are stored.
 * @returns {Promise<object|undefined>} The written record, or undefined when
 *   `ver` is falsy and nothing is written.
 */
export async function saveVerification(chunkId, qId, genId, ver, meta = {}) {
  if (!ver) return;

  const { provider, model } = meta;
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: genId,
    // Only a literal `true` is stored as ok; anything else becomes false.
    ok: ver.ok === true,
    score: ver.score,
    raw: ver.raw,
    provider,
    model,
    ts: Date.now(),
  };

  await appendJsonl(FILES.verifications, record);
  return record;
}
// ---------------------------
// Reward cache
// ---------------------------
/**
 * Look up the cached reward for a chunk/question pair.
 *
 * @param {*} chunkId - Chunk identifier to match against `chunk_id`.
 * @param {*} qId - Question identifier to match against `question_id`.
 * @param {*} [genId] - When truthy, also require `gen_id` to equal this value.
 * @returns {Promise<object|null>} The last matching record in file order, or
 *   null when there is none.
 */
export async function getCachedReward(chunkId, qId, genId) {
  const isMatch = (entry) => {
    if (entry.chunk_id !== chunkId) return false;
    if (entry.question_id !== qId) return false;
    // A falsy genId means "any generation for this question".
    return !genId || entry.gen_id === genId;
  };
  const matches = await readJsonl(FILES.rewards, isMatch);

  const [latest = null] = matches.slice(-1);
  return latest;
}
/**
 * Append a reward record to the reward cache.
 *
 * @param {*} chunkId - Chunk identifier stored as `chunk_id`.
 * @param {*} qId - Question identifier stored as `question_id`.
 * @param {*} genId - Generation identifier stored as `gen_id`.
 * @param {object} rew - Reward result; `score`, `ok` and `raw` are read.
 * @param {object} [meta] - Optional metadata; `provider` and `model` are stored.
 * @returns {Promise<object|undefined>} The written record, or undefined when
 *   `rew` is falsy and nothing is written.
 */
export async function saveReward(chunkId, qId, genId, rew, meta = {}) {
  if (!rew) return;

  const { provider, model } = meta;
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: genId,
    score: rew.score,
    // Only a literal `true` is stored as ok; anything else becomes false.
    ok: rew.ok === true,
    raw: rew.raw,
    provider,
    model,
    ts: Date.now(),
  };

  await appendJsonl(FILES.rewards, record);
  return record;
}
// Re-export the ID/normalization helpers imported from ./ids.mjs so that
// callers of this cache module can obtain them from here as well.
export { chunkIdFromContent, questionId, generationId, normalizeText };