File size: 4,640 Bytes
0918f71 5e7271c 0918f71 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 | // src/pipeline/cache.mjs
// Lightweight JSONL cache for intermediate pipeline artifacts.
import fs from 'fs/promises';
import path from 'path';
import { PROJECT_ROOT } from './util.mjs';
import {
chunkIdFromContent,
questionId,
generationId,
normalizeText,
} from './ids.mjs';
// Cache location: PIPELINE_CACHE_DIR when set to a non-empty value (used as-is
// if absolute, otherwise joined onto PROJECT_ROOT); defaults to
// <PROJECT_ROOT>/data/cache.
const CUSTOM_CACHE_DIR = process.env.PIPELINE_CACHE_DIR;
const CACHE_DIR = (() => {
  if (!CUSTOM_CACHE_DIR) {
    return path.join(PROJECT_ROOT, 'data', 'cache');
  }
  return path.isAbsolute(CUSTOM_CACHE_DIR)
    ? CUSTOM_CACHE_DIR
    : path.join(PROJECT_ROOT, CUSTOM_CACHE_DIR);
})();
// One JSONL file per artifact kind, named `<kind>.jsonl`, all under CACHE_DIR.
const FILES = Object.fromEntries(
  ['questions', 'generations', 'verifications', 'rewards'].map((kind) => [
    kind,
    `${kind}.jsonl`,
  ]),
);
/**
 * Create CACHE_DIR, including any missing parent directories.
 * With `recursive: true`, an already-existing directory is not an error.
 * @returns {Promise<void>}
 */
async function ensureDir() {
  const mkdirOptions = { recursive: true };
  await fs.mkdir(CACHE_DIR, mkdirOptions);
}
/**
 * Serialize `record` as one JSON line and append it to `fileName` in CACHE_DIR.
 * The cache directory is created first if needed.
 * @param {string} fileName - File name relative to CACHE_DIR.
 * @param {*} record - Value passed to `JSON.stringify`.
 * @returns {Promise<void>}
 */
async function appendJsonl(fileName, record) {
  await ensureDir();
  const target = path.join(CACHE_DIR, fileName);
  await fs.appendFile(target, `${JSON.stringify(record)}\n`, 'utf8');
}
/**
 * Read every parseable JSON record from a cache file in CACHE_DIR.
 *
 * Lines are trimmed. Blank lines are ignored, and lines that fail `JSON.parse`
 * are skipped, as are lines that parse to a falsy value (e.g. `null`). A
 * missing file yields `[]`; any other read error is rethrown.
 *
 * @param {string} fileName - File name relative to CACHE_DIR.
 * @param {Function} [predicate] - Optional `Array#filter` callback applied to the parsed records.
 * @returns {Promise<Array>} Parsed (and optionally filtered) records in file order.
 */
async function readJsonl(fileName, predicate) {
  let contents;
  try {
    contents = await fs.readFile(path.join(CACHE_DIR, fileName), 'utf8');
  } catch (err) {
    if (err.code === 'ENOENT') return [];
    throw err;
  }

  const records = [];
  for (const rawLine of contents.split('\n')) {
    const line = rawLine.trim();
    if (line === '') continue;
    let value;
    try {
      value = JSON.parse(line);
    } catch {
      // Malformed line: skip it rather than failing the whole read.
      continue;
    }
    if (value) records.push(value);
  }

  return predicate ? records.filter(predicate) : records;
}
// ---------------------------
// Question cache
// ---------------------------
/**
 * Look up cached question sets for a chunk.
 * Only records whose `questions` field is a non-empty array are returned.
 * @param {*} chunkId - Chunk id, compared with `===` against `chunk_id`.
 * @returns {Promise<Array<object>>} Every matching record, in file order (not just the latest).
 */
export async function getCachedQuestions(chunkId) {
  const hasQuestionsFor = (record) =>
    record.chunk_id === chunkId
    && Array.isArray(record.questions)
    && record.questions.length > 0;
  return readJsonl(FILES.questions, hasQuestionsFor);
}
/**
 * Append a question-set record for a chunk to the questions cache.
 *
 * Each question also gets an id from `questionId(chunkId, question)`, stored
 * position-for-position in `question_ids`.
 *
 * @param {*} chunkId - Chunk the questions belong to.
 * @param {Array} questions - Questions to store (`.map` is called on it, so it must be an array).
 * @param {{provider?: *, model?: *}} [meta] - Optional provenance copied onto the record.
 * @returns {Promise<object>} The record that was written. This matches
 *   saveGeneration/saveVerification/saveReward; previously it resolved to undefined.
 */
export async function saveQuestions(chunkId, questions, meta = {}) {
  const ts = Date.now();
  const record = {
    chunk_id: chunkId,
    questions,
    question_ids: questions.map((q) => questionId(chunkId, q)),
    provider: meta.provider,
    model: meta.model,
    ts,
  };
  await appendJsonl(FILES.questions, record);
  return record;
}
// ---------------------------
// Generator cache
// ---------------------------
/**
 * Fetch the most recent cached generation for a (chunk, question) pair.
 * Records with a falsy `answer` are ignored.
 * @param {*} chunkId - Compared with `===` against `chunk_id`.
 * @param {*} qId - Compared with `===` against `question_id`.
 * @returns {Promise<object|null>} The newest matching record, or null if none.
 */
export async function getCachedGeneration(chunkId, qId) {
  const matches = await readJsonl(FILES.generations, (entry) =>
    entry.chunk_id === chunkId && entry.question_id === qId && Boolean(entry.answer));
  if (matches.length === 0) return null;
  // appendJsonl only appends, so the last match is the most recently written.
  return matches[matches.length - 1];
}
/**
 * Append a generation record to the generations cache.
 *
 * The stored answer is `gen.answer`, falling back to `gen.raw`, then `''`.
 * `gen_id` is derived from that same answer text.
 *
 * @param {*} chunkId - Chunk the generation belongs to.
 * @param {*} qId - Question the generation answers.
 * @param {object} gen - Generator output; a falsy value is a no-op.
 * @param {{provider?: *, model?: *}} [meta] - Optional provenance.
 * @returns {Promise<object|undefined>} The written record, or undefined when `gen` is falsy.
 */
export async function saveGeneration(chunkId, qId, gen, meta = {}) {
  if (!gen) return;
  const answer = gen.answer || gen.raw || '';
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: generationId(chunkId, qId, answer),
    answer,
    thought: gen.thought,
    raw: gen.raw,
    provider: meta.provider,
    model: meta.model,
    ts: Date.now(),
  };
  await appendJsonl(FILES.generations, record);
  return record;
}
// ---------------------------
// Verification cache
// ---------------------------
/**
 * Fetch the most recent cached verification for a chunk/question.
 * When `genId` is falsy, a verification for any generation matches.
 * @param {*} chunkId - Compared with `===` against `chunk_id`.
 * @param {*} qId - Compared with `===` against `question_id`.
 * @param {*} [genId] - Optional generation id to pin the lookup to.
 * @returns {Promise<object|null>} The newest matching record, or null if none.
 */
export async function getCachedVerification(chunkId, qId, genId) {
  const matchesKey = (r) => {
    if (r.chunk_id !== chunkId || r.question_id !== qId) return false;
    return !genId || r.gen_id === genId;
  };
  const hits = await readJsonl(FILES.verifications, matchesKey);
  return hits.length > 0 ? hits[hits.length - 1] : null;
}
/**
 * Append a verification record to the verifications cache.
 * `ok` is stored as true only when `ver.ok` is strictly `true`.
 * @param {*} chunkId - Chunk the verification belongs to.
 * @param {*} qId - Question being verified.
 * @param {*} genId - Generation being verified.
 * @param {object} ver - Verifier output; a falsy value is a no-op.
 * @param {{provider?: *, model?: *}} [meta] - Optional provenance.
 * @returns {Promise<object|undefined>} The written record, or undefined when `ver` is falsy.
 */
export async function saveVerification(chunkId, qId, genId, ver, meta = {}) {
  if (!ver) return;
  const { provider, model } = meta;
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: genId,
    ok: ver.ok === true,
    score: ver.score,
    raw: ver.raw,
    provider,
    model,
    ts: Date.now(),
  };
  await appendJsonl(FILES.verifications, record);
  return record;
}
// ---------------------------
// Reward cache
// ---------------------------
/**
 * Fetch the most recent cached reward for a chunk/question.
 * When `genId` is falsy, a reward for any generation matches.
 * @param {*} chunkId - Compared with `===` against `chunk_id`.
 * @param {*} qId - Compared with `===` against `question_id`.
 * @param {*} [genId] - Optional generation id to pin the lookup to.
 * @returns {Promise<object|null>} The newest matching record, or null if none.
 */
export async function getCachedReward(chunkId, qId, genId) {
  const wanted = (r) =>
    r.chunk_id === chunkId
    && r.question_id === qId
    && (genId ? r.gen_id === genId : true);
  const found = await readJsonl(FILES.rewards, wanted);
  if (found.length === 0) {
    return null;
  }
  return found[found.length - 1];
}
/**
 * Append a reward record to the rewards cache.
 * `ok` is stored as true only when `rew.ok` is strictly `true`.
 * @param {*} chunkId - Chunk the reward belongs to.
 * @param {*} qId - Question being scored.
 * @param {*} genId - Generation being scored.
 * @param {object} rew - Reward output; a falsy value is a no-op.
 * @param {{provider?: *, model?: *}} [meta] - Optional provenance.
 * @returns {Promise<object|undefined>} The written record, or undefined when `rew` is falsy.
 */
export async function saveReward(chunkId, qId, genId, rew, meta = {}) {
  if (!rew) return;
  const { provider, model } = meta;
  const { score, raw } = rew;
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: genId,
    score,
    ok: rew.ok === true,
    raw,
    provider,
    model,
    ts: Date.now(),
  };
  await appendJsonl(FILES.rewards, record);
  return record;
}
// Utility export for hashing reuse
export { chunkIdFromContent, questionId, generationId, normalizeText };
|