// distill-rag / indexing / index_distill_chunks.js
// Author: htaf — commit c2b7f46: "Rename QUO_JSON_DIR → JSON_DIR (generic input dir)"
// indexing/index_distill_chunks.js
//
// Long-chunk Elasticsearch indexer for bootstrapped distillation.
// DRY + centralized config + async/await clean style.
//
// Usage:
// JSON_DIR=./datasets_quo node indexing/index_distill_chunks.js
//
require('dotenv').config();
const { Client } = require('@elastic/elasticsearch');
const fs = require('fs');
const path = require('path');
const fetch = require('node-fetch');
// ------------------------------------------------------------
// CONFIG (all constants live here — DRY, centralized)
// ------------------------------------------------------------
/**
 * Parse a positive integer from an env-style string.
 * Falls back when the value is unset, non-numeric (Number('abc') → NaN,
 * which would silently break the chunker's size comparisons), or <= 0.
 * @param {string|undefined} raw - raw environment variable value
 * @param {number} fallback - default used when `raw` is missing/invalid
 * @returns {number} a finite positive integer
 */
function toPositiveInt(raw, fallback) {
  const n = Number(raw);
  return Number.isFinite(n) && n > 0 ? n : fallback;
}

const CONFIG = {
  ES_NODE: process.env.ELASTICSEARCH_NODE || 'http://localhost:9200',
  ES_INDEX: process.env.ES_DISTILL_INDEX || 'quo_distill_index',
  // Embedding API
  EMBED_URL: process.env.EMBED_URL || 'http://localhost:11434/api/embeddings',
  EMBED_MODEL: process.env.EMBED_MODEL || 'mxbai-embed-large',
  // Chunking sizes (characters) for distillation-long docs.
  // Validated so a typo'd env value cannot inject NaN into the chunker.
  CHUNK_MAX: toPositiveInt(process.env.CHUNK_MAX, 9000),
  CHUNK_MIN: toPositiveInt(process.env.CHUNK_MIN, 5000),
  // Input directory (env var takes precedence over positional CLI arg)
  QUO_DIR: process.env.JSON_DIR || process.argv[2],
};
// NOTE: no QUO_DIR validation here – tests may require this file
// without setting JSON_DIR.
// ES client
// Single shared client instance for the whole process; the node URL
// comes from CONFIG (ELASTICSEARCH_NODE env var or localhost default).
const client = new Client({ node: CONFIG.ES_NODE });
// ------------------------------------------------------------
// Embedding
// ------------------------------------------------------------
/**
 * Embed `text` via the configured embedding HTTP API (Ollama-style:
 * POST { model, prompt } → { embedding: number[] }).
 *
 * @param {string} text - text to embed
 * @returns {Promise<number[]>} embedding vector (1024-dim for mxbai-embed-large)
 * @throws {Error} on non-2xx HTTP status (including the response body excerpt
 *         for diagnosis) or when the response lacks an `embedding` array
 */
async function embed(text) {
  const res = await fetch(CONFIG.EMBED_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      model: CONFIG.EMBED_MODEL,
      prompt: text,
    }),
  });
  if (!res.ok) {
    // Include the body excerpt — embedding servers put the real reason
    // (e.g. "model not found") in the body, not the status line.
    const detail = await res.text().catch(() => '');
    throw new Error(
      `Embedding failed: HTTP ${res.status}${detail ? ` — ${detail.slice(0, 200)}` : ''}`
    );
  }
  const data = await res.json();
  // Guard: an error payload without `embedding` must not index `undefined`.
  if (!Array.isArray(data.embedding)) {
    throw new Error('Embedding failed: response missing "embedding" array');
  }
  return data.embedding;
}
// ------------------------------------------------------------
// Long-form paragraph-aware chunker
// ------------------------------------------------------------
/**
 * Paragraph-aware chunker for long documents.
 *
 * Splits `text` on blank lines, then greedily merges consecutive paragraphs
 * into chunks. A chunk keeps growing while the merged size stays within
 * `max`, OR while the current chunk is still shorter than `min` — so chunks
 * may exceed `max` rather than fall below `min`.
 *
 * @param {string} text - input document
 * @param {{max?: number, min?: number}} [opts] - size bounds in characters;
 *        default to CONFIG.CHUNK_MAX / CONFIG.CHUNK_MIN (backward compatible)
 * @returns {string[]} ordered chunks; empty array for blank/non-string input
 */
function chunkContent(text, { max = CONFIG.CHUNK_MAX, min = CONFIG.CHUNK_MIN } = {}) {
  if (!text || typeof text !== 'string') return [];

  const paras = text
    .split(/\n\s*\n/g)
    .map((p) => p.trim())
    .filter(Boolean);
  if (!paras.length) return [];

  return paras.reduce((chunks, para) => {
    if (chunks.length === 0) return [para];
    const last = chunks[chunks.length - 1];
    const combined = `${last}\n\n${para}`;
    const canGrow = combined.length <= max || last.length < min;
    return canGrow
      ? [...chunks.slice(0, -1), combined]
      : [...chunks, para];
  }, []);
}
// ------------------------------------------------------------
// Extract assistant text
// ------------------------------------------------------------
/**
 * Concatenate all assistant-turn text from a session object.
 * Turns without role 'assistant', with non-string content, or with
 * whitespace-only content are skipped. Returns '' when `session` is
 * missing or has no `turns` array.
 *
 * @param {{turns?: Array<{role?: string, content?: unknown}>}} session
 * @returns {string} assistant texts joined by blank lines
 */
function extractAssistantText(session) {
  const turns = Array.isArray(session?.turns) ? session.turns : [];
  const pieces = [];
  for (const turn of turns) {
    if (turn?.role !== 'assistant' || typeof turn.content !== 'string') {
      continue;
    }
    const trimmed = turn.content.trim();
    if (trimmed) {
      pieces.push(trimmed);
    }
  }
  return pieces.join('\n\n');
}
// ------------------------------------------------------------
// Progress tracker (non-spammy)
// ------------------------------------------------------------
/**
 * Create a throttled progress tracker for a run over `totalFiles` files.
 * The returned object's `tick(addedChunks)` records one processed file,
 * prints a single-line progress update every 10 files (and on the last
 * file), and returns the running totals.
 *
 * @param {number} totalFiles - total number of files in the run
 * @returns {{tick: (addedChunks: number) => {processed: number, totalChunks: number}}}
 */
function makeProgress(totalFiles) {
  const startedAt = Date.now();
  let filesDone = 0;
  let chunksDone = 0;

  // Render one \r-prefixed status line (overwrites the previous one).
  const render = () => {
    const elapsed = (Date.now() - startedAt) / 1000;
    const rate = elapsed > 0 ? chunksDone / elapsed : 0;
    const perFile = filesDone > 0 ? elapsed / filesDone : 0;
    const eta = (totalFiles - filesDone) * perFile;
    const pct = totalFiles > 0
      ? ((filesDone / totalFiles) * 100).toFixed(1)
      : '100.0';
    process.stdout.write(
      `\r[progress] ${filesDone}/${totalFiles} files | ` +
      `${chunksDone} chunks | ` +
      `${rate.toFixed(1)} chunks/s | ` +
      `ETA: ${eta.toFixed(1)}s | ${pct}% `
    );
  };

  return {
    tick(addedChunks) {
      filesDone += 1;
      chunksDone += addedChunks;
      // Throttle output to every 10th file plus the final one.
      if (filesDone % 10 === 0 || filesDone === totalFiles) {
        render();
      }
      return { processed: filesDone, totalChunks: chunksDone };
    },
  };
}
// ------------------------------------------------------------
// Index one file
// ------------------------------------------------------------
/**
 * Read one session JSON file, chunk its assistant text, embed each chunk,
 * and bulk-index the chunks into Elasticsearch.
 *
 * @param {string} filePath - path to a session .json file
 * @returns {Promise<number>} number of chunks indexed; 0 when the file is
 *          unreadable/invalid JSON or contains no assistant content
 */
async function indexSessionFile(filePath) {
  let session;
  try {
    // Non-blocking read: this function is already async, so there is no
    // reason to stall the event loop with readFileSync.
    session = JSON.parse(await fs.promises.readFile(filePath, 'utf8'));
  } catch (err) {
    console.error(`[index] ERROR parse ${filePath}:`, err.message);
    return 0; // skip bad files rather than aborting the whole run
  }

  const title = session.title || path.basename(filePath);
  const sessionDate = session.session_date || 'unknown';

  const content = extractAssistantText(session);
  if (!content.trim()) {
    // No assistant content; nothing to index
    return 0;
  }

  const chunks = chunkContent(content);
  if (!chunks.length) return 0;

  // Build the bulk body as alternating action/document pairs.
  // Embeddings are fetched sequentially to avoid overloading the embed API.
  const docs = [];
  for (let i = 0; i < chunks.length; i++) {
    const text = chunks[i];
    const vec = await embed(text);
    docs.push(
      { index: { _index: CONFIG.ES_INDEX } },
      {
        content: text,
        title,
        session_date: sessionDate,
        source: path.basename(filePath),
        chunk_index: i,
        embedding: vec,
      }
    );
  }

  const resp = await client.bulk({ body: docs });
  if (resp.errors) {
    // Surface how many items failed instead of a bare "errors" flag.
    const failed = (resp.items || []).filter((it) => it.index && it.index.error);
    console.error(
      `[index] Bulk indexing reported errors (${failed.length} failed item(s)).`
    );
  }
  return chunks.length;
}
// ------------------------------------------------------------
// Main (CLI entrypoint)
// ------------------------------------------------------------
/**
 * CLI entrypoint: discover session JSON files in CONFIG.QUO_DIR, index each
 * one with progress reporting, and print the final chunk total.
 * Exits with status 1 when no input directory is configured or the
 * directory cannot be read.
 */
async function main() {
  console.log('[index] Starting distillation indexer with config:');
  console.log(CONFIG);

  // Bail out with usage help when no input directory was provided.
  if (!CONFIG.QUO_DIR) {
    console.error(
      'Usage: JSON_DIR=./datasets_quo node indexing/index_distill_chunks.js'
    );
    process.exit(1);
  }

  let sessionFiles;
  try {
    const names = fs.readdirSync(CONFIG.QUO_DIR);
    sessionFiles = names
      .filter((name) => name.endsWith('.json'))
      .map((name) => path.join(CONFIG.QUO_DIR, name))
      .sort();
  } catch (err) {
    console.error('[index] ERROR reading directory:', err.message);
    process.exit(1);
  }

  console.log(`[index] Found ${sessionFiles.length} sessions`);

  let grandTotal = 0;
  const progress = makeProgress(sessionFiles.length);

  for (const filePath of sessionFiles) {
    try {
      const added = await indexSessionFile(filePath);
      grandTotal = progress.tick(added).totalChunks;
    } catch (err) {
      // A single failing file must not abort the whole run.
      console.error(`[index] Error indexing ${filePath}:`, err);
    }
  }

  // Ensure the progress line is ended
  process.stdout.write('\n');
  console.log(`[index] DONE. Total chunks indexed: ${grandTotal}`);
}
// Only run main() when executed as a script, not when required by Jest
if (require.main === module) {
  main().catch((err) => {
    console.error('[index] FATAL:', err);
    // Non-zero exit so shell/CI callers can detect the failure.
    process.exit(1);
  });
}
// Exports for tests
module.exports = {
  CONFIG,
  client,
  embed,
  chunkContent,
  extractAssistantText,
  indexSessionFile,
  main,
};