File size: 4,640 Bytes
0918f71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e7271c
0918f71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// src/pipeline/cache.mjs
// Lightweight JSONL cache for intermediate pipeline artifacts.
import fs from 'fs/promises';
import path from 'path';
import { PROJECT_ROOT } from './util.mjs';
import {
  chunkIdFromContent,
  questionId,
  generationId,
  normalizeText,
} from './ids.mjs';

// Resolve the cache directory once at module load:
// - PIPELINE_CACHE_DIR absolute  -> used verbatim
// - PIPELINE_CACHE_DIR relative  -> resolved against PROJECT_ROOT
// - unset                        -> <PROJECT_ROOT>/data/cache
function resolveCacheDir() {
  const custom = process.env.PIPELINE_CACHE_DIR;
  if (!custom) return path.join(PROJECT_ROOT, 'data', 'cache');
  return path.isAbsolute(custom) ? custom : path.join(PROJECT_ROOT, custom);
}

const CACHE_DIR = resolveCacheDir();

// One JSONL file per pipeline stage, all living under CACHE_DIR.
const FILES = {
  questions: 'questions.jsonl',
  generations: 'generations.jsonl',
  verifications: 'verifications.jsonl',
  rewards: 'rewards.jsonl',
};

/** Create CACHE_DIR (including parents) if missing; resolves when it exists. */
function ensureDir() {
  return fs.mkdir(CACHE_DIR, { recursive: true });
}

/**
 * Append one record as a single JSON line to a cache file.
 * @param {string} fileName - file name inside CACHE_DIR (see FILES).
 * @param {object} record - JSON-serializable record to persist.
 */
async function appendJsonl(fileName, record) {
  await ensureDir();
  const target = path.join(CACHE_DIR, fileName);
  await fs.appendFile(target, `${JSON.stringify(record)}\n`, 'utf8');
}

/**
 * Read every valid record from a JSONL cache file.
 * Blank lines, unparseable lines, and falsy parsed values are skipped;
 * a missing file yields an empty array. Other fs errors propagate.
 * @param {string} fileName - file name inside CACHE_DIR (see FILES).
 * @param {(r: object) => boolean} [predicate] - optional record filter.
 * @returns {Promise<object[]>} records in file (append) order.
 */
async function readJsonl(fileName, predicate) {
  const filePath = path.join(CACHE_DIR, fileName);
  let txt;
  try {
    txt = await fs.readFile(filePath, 'utf8');
  } catch (err) {
    if (err.code === 'ENOENT') return [];
    throw err;
  }
  const records = [];
  for (const rawLine of txt.split('\n')) {
    const line = rawLine.trim();
    if (!line) continue;
    let parsed = null;
    try {
      parsed = JSON.parse(line);
    } catch {
      // corrupt line (e.g. torn write) — tolerate and move on
    }
    // keep only truthy records, matching the cache's write format (objects)
    if (parsed) records.push(parsed);
  }
  return predicate ? records.filter(predicate) : records;
}

// ---------------------------
// Question cache
// ---------------------------
/**
 * All cached question records for a chunk that carry a non-empty
 * questions array.
 * @param {string} chunkId
 * @returns {Promise<object[]>}
 */
export async function getCachedQuestions(chunkId) {
  const hasQuestions = (r) =>
    r.chunk_id === chunkId &&
    Array.isArray(r.questions) &&
    r.questions.length > 0;
  return readJsonl(FILES.questions, hasQuestions);
}

/**
 * Append a question record for a chunk, deriving stable per-question ids.
 * @param {string} chunkId
 * @param {string[]} questions
 * @param {{provider?: string, model?: string}} [meta]
 */
export async function saveQuestions(chunkId, questions, meta = {}) {
  await appendJsonl(FILES.questions, {
    chunk_id: chunkId,
    questions,
    question_ids: questions.map((q) => questionId(chunkId, q)),
    provider: meta.provider,
    model: meta.model,
    ts: Date.now(),
  });
}

// ---------------------------
// Generator cache
// ---------------------------
/**
 * Latest cached generation (with a truthy answer) for a chunk/question
 * pair, or null when none exists.
 * @param {string} chunkId
 * @param {string} qId
 * @returns {Promise<object|null>}
 */
export async function getCachedGeneration(chunkId, qId) {
  const matches = await readJsonl(
    FILES.generations,
    (r) => r.chunk_id === chunkId && r.question_id === qId && r.answer,
  );
  // newest append wins when several runs cached the same pair
  return matches.at(-1) ?? null;
}

/**
 * Append a generation record; no-op when gen is falsy.
 * The gen_id is derived from the answer text (falling back to raw output).
 * @param {string} chunkId
 * @param {string} qId
 * @param {{answer?: string, thought?: string, raw?: string}} gen
 * @param {{provider?: string, model?: string}} [meta]
 * @returns {Promise<object|undefined>} the stored record, if saved.
 */
export async function saveGeneration(chunkId, qId, gen, meta = {}) {
  if (!gen) return;
  const answer = gen.answer || gen.raw || '';
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: generationId(chunkId, qId, answer),
    answer,
    thought: gen.thought,
    raw: gen.raw,
    provider: meta.provider,
    model: meta.model,
    ts: Date.now(),
  };
  await appendJsonl(FILES.generations, record);
  return record;
}

// ---------------------------
// Verification cache
// ---------------------------
/**
 * Latest cached verification for a chunk/question pair, or null.
 * When genId is omitted/falsy, any generation's verification matches.
 * @param {string} chunkId
 * @param {string} qId
 * @param {string} [genId]
 * @returns {Promise<object|null>}
 */
export async function getCachedVerification(chunkId, qId, genId) {
  const matches = await readJsonl(FILES.verifications, (r) => {
    if (r.chunk_id !== chunkId || r.question_id !== qId) return false;
    return !genId || r.gen_id === genId;
  });
  return matches.at(-1) ?? null;
}

/**
 * Append a verification record; no-op when ver is falsy.
 * `ok` is normalized to a strict boolean (only ver.ok === true counts).
 * @param {string} chunkId
 * @param {string} qId
 * @param {string} genId
 * @param {{ok?: boolean, score?: number, raw?: string}} ver
 * @param {{provider?: string, model?: string}} [meta]
 * @returns {Promise<object|undefined>} the stored record, if saved.
 */
export async function saveVerification(chunkId, qId, genId, ver, meta = {}) {
  if (!ver) return;
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: genId,
    ok: ver.ok === true,
    score: ver.score,
    raw: ver.raw,
    provider: meta.provider,
    model: meta.model,
    ts: Date.now(),
  };
  await appendJsonl(FILES.verifications, record);
  return record;
}

// ---------------------------
// Reward cache
// ---------------------------
/**
 * Latest cached reward for a chunk/question pair, or null.
 * When genId is omitted/falsy, any generation's reward matches.
 * @param {string} chunkId
 * @param {string} qId
 * @param {string} [genId]
 * @returns {Promise<object|null>}
 */
export async function getCachedReward(chunkId, qId, genId) {
  const matches = await readJsonl(FILES.rewards, (r) => {
    if (r.chunk_id !== chunkId || r.question_id !== qId) return false;
    return !genId || r.gen_id === genId;
  });
  return matches.at(-1) ?? null;
}

/**
 * Append a reward record; no-op when rew is falsy.
 * `ok` is normalized to a strict boolean (only rew.ok === true counts).
 * @param {string} chunkId
 * @param {string} qId
 * @param {string} genId
 * @param {{score?: number, ok?: boolean, raw?: string}} rew
 * @param {{provider?: string, model?: string}} [meta]
 * @returns {Promise<object|undefined>} the stored record, if saved.
 */
export async function saveReward(chunkId, qId, genId, rew, meta = {}) {
  if (!rew) return;
  const record = {
    chunk_id: chunkId,
    question_id: qId,
    gen_id: genId,
    score: rew.score,
    ok: rew.ok === true,
    raw: rew.raw,
    provider: meta.provider,
    model: meta.model,
    ts: Date.now(),
  };
  await appendJsonl(FILES.rewards, record);
  return record;
}

// Utility export for hashing reuse
export { chunkIdFromContent, questionId, generationId, normalizeText };