// distill-pipeline — src/retrieval/jsonl_chunks.mjs
// commit 0918f71: "added intermediate storage of chunks" (author: htaf)
// src/retrieval/jsonl_chunks.mjs
import fs from 'fs/promises';
import path from 'path';
import crypto from 'crypto';
import { PROJECT_ROOT } from '../pipeline/util.mjs';
import { chunkIdFromContent } from '../pipeline/cache.mjs';
// Default location of the prebuilt RAG chunk file, resolved against the
// repository root. Overridable per call (filePath arg) or via the
// RAG_CHUNKS_PATH environment variable — see loadRagChunks below.
const DEFAULT_RAG_PATH = path.join(
PROJECT_ROOT,
'data',
'rag_chunks.jsonl',
);
// simple in-memory cache of parsed chunks
let cachedChunks = null;
/**
 * Parse a rag_chunks.jsonl file into an array of
 * { id, sourceId, content, source } records.
 *
 * We are deliberately tolerant about field names so this works
 * with different builders:
 *   - content: obj.content || obj.text || obj.chunk || obj.body || ''
 *   - id: obj.id || obj.session_key || obj.title || `jsonl-${idx}`
 *   - source: whole original object
 *
 * Results are memoized per resolved absolute path. (The previous
 * single-slot cache returned whichever file was loaded first, even
 * when a later call asked for a different filePath.)
 *
 * @param {string} [filePath] JSONL file to read; relative paths are
 *   resolved against PROJECT_ROOT.
 * @returns {Promise<Array<{id, sourceId, content, source}>>}
 * @throws If the file cannot be read; malformed JSONL lines are
 *   skipped silently rather than throwing.
 */
async function loadAllChunksFromJsonl(filePath = DEFAULT_RAG_PATH) {
  const absPath = path.isAbsolute(filePath)
    ? filePath
    : path.join(PROJECT_ROOT, filePath);
  // Cache hit: this file was already parsed in this process.
  if (cachedChunks?.[absPath]) return cachedChunks[absPath];
  const raw = await fs.readFile(absPath, 'utf8');
  const lines = raw
    .split('\n')
    .map((l) => l.trim())
    .filter(Boolean);
  const chunks = lines.map((line, idx) => {
    let obj;
    try {
      obj = JSON.parse(line);
    } catch (e) {
      // Skip bad lines instead of exploding; filtered out below.
      return null;
    }
    const content =
      obj.content ||
      obj.text ||
      obj.chunk ||
      obj.body ||
      '';
    const sourceId =
      obj.id ||
      obj.session_key ||
      obj.title ||
      `jsonl-${idx}`;
    // Stable chunk id derived from content + source id (project helper).
    const id = chunkIdFromContent(content, sourceId);
    return {
      id,
      sourceId,
      content,
      source: obj,
    };
  });
  // Null-prototype dictionary keyed by absolute path (avoids prototype
  // pollution issues with hostile-looking path strings).
  cachedChunks ??= Object.create(null);
  cachedChunks[absPath] = chunks.filter(Boolean);
  return cachedChunks[absPath];
}
/**
 * Uniform sampling without replacement using the CSPRNG-backed
 * crypto.randomInt (note: cryptographically secure, not hardware RNG).
 *
 * Implemented as a partial Fisher–Yates shuffle: O(k) random draws,
 * unlike rejection sampling, whose expected cost grows toward
 * O(n log n) as k approaches n.
 *
 * @param {Array} arr Population to sample from (not mutated).
 * @param {number|null|undefined} k Sample size.
 *   `k == null` or `k >= arr.length` ⇒ returns a full shallow copy;
 *   `k <= 0` ⇒ returns [].
 * @returns {Array} New array of k sampled elements.
 */
function sampleWithoutReplacement(arr, k) {
  const n = arr.length;
  if (k == null || k >= n) return arr.slice();
  if (k <= 0) return [];
  const pool = arr.slice();
  // After i swaps, pool[0..i) is a uniform k-prefix in progress.
  for (let i = 0; i < k; i += 1) {
    const j = crypto.randomInt(i, n);
    [pool[i], pool[j]] = [pool[j], pool[i]];
  }
  return pool.slice(0, k);
}
/**
 * Public API: load RAG chunks for pipeline seeding.
 *
 * Path resolution order: explicit `filePath` argument, then the
 * RAG_CHUNKS_PATH environment variable, then data/rag_chunks.jsonl.
 *
 * @param {number|undefined} limit Max chunks to return (random sample
 *   without replacement); omitted ⇒ all chunks.
 * @param {string|undefined} filePath Override path.
 * @returns {Promise<Array<{id, content, source}>>}
 */
export async function loadRagChunks(limit, filePath) {
  const resolvedPath =
    filePath || process.env.RAG_CHUNKS_PATH || DEFAULT_RAG_PATH;
  const chunks = await loadAllChunksFromJsonl(resolvedPath);
  if (!chunks || chunks.length === 0) {
    return [];
  }
  const sampleSize = limit ?? chunks.length;
  return sampleWithoutReplacement(chunks, sampleSize);
}