htaf commited on
Commit
c3e2166
ยท
1 Parent(s): 365aa28

added verbosity to pipeline

Browse files
Files changed (2) hide show
  1. src/pipeline/pipeline.mjs +180 -55
  2. src/pipeline/pipeline_cli.js +103 -90
src/pipeline/pipeline.mjs CHANGED
@@ -11,21 +11,29 @@ import { runReward } from '../reward/reward_core.mjs';
11
 
12
  const __filename = fileURLToPath(import.meta.url);
13
  const __dirname = path.dirname(__filename);
14
-
15
  const PROJECT_ROOT = path.join(__dirname, '..', '..');
16
 
17
  const DEFAULT_SEEDS_PATH = path.join(
18
  PROJECT_ROOT,
19
  'test_samples',
20
- 'seed_questions.jsonl'
21
  );
22
 
23
  const DEFAULT_OUT_PATH = path.join(
24
  PROJECT_ROOT,
25
  'gold',
26
- 'pipeline_gold.jsonl'
27
  );
28
 
 
 
 
 
 
 
 
 
 
29
  /**
30
  * Load JSONL seed questions.
31
  * Each line may be:
@@ -59,72 +67,164 @@ export function seedToQuestion(seed) {
59
  *
60
  * Returns a structured result:
61
  * {
62
- * status: 'accepted' | 'invalid_question'
63
- * | 'generator_failed'
64
- * | 'verifier_rejected'
65
- * | 'reward_rejected',
66
  * question,
67
  * context,
68
  * gen,
69
  * ver,
70
  * rew,
 
71
  * }
72
  */
73
  export async function runPipelineStep({
74
  question,
75
  retrievalMode = process.env.RETRIEVAL_MODE || 'hybrid',
76
  k = Number(process.env.RETRIEVAL_K || '6'),
77
- providers
 
 
 
 
78
  } = {}) {
 
 
 
79
  if (!question || !question.trim()) {
 
80
  return { status: 'invalid_question', question };
81
  }
82
 
83
- // Let provider.mjs decide env mapping; we just say which stage.
84
- const generatorProvider =
85
- providers?.generator || loadProviderFor('generator');
86
- const verifierProvider =
87
- providers?.verifier || loadProviderFor('verifier');
88
- const rewardProvider =
89
- providers?.reward || loadProviderFor('reward');
90
 
91
  // --- Retrieval ---
92
  let context = [];
93
- if (retrievalMode === 'hybrid' || !retrievalMode) {
94
- context = await hybridSearch(question, k);
95
- } else {
96
- // Additional modes could be added later
97
  context = await hybridSearch(question, k);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
98
  }
99
 
100
  // --- Generator ---
101
- const gen = await runGenerator(question, context, generatorProvider);
102
-
103
- // If generator decisively failed or emitted obviously bad JSON,
104
- // treat as failure.
105
- if (!gen || gen.parsed?.error === 'invalid_json') {
106
- return { status: 'generator_failed', question, context, gen };
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
  }
108
 
109
  // --- Verifier ---
110
- const ver = await runVerifier(gen, verifierProvider);
111
- if (!ver || ver.ok === false) {
112
- return { status: 'verifier_rejected', question, context, gen, ver };
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
113
  }
114
 
115
  // --- Reward ---
116
- const rew = await runReward(gen, rewardProvider);
117
- if (!rew || rew.ok === false) {
118
- return { status: 'reward_rejected', question, context, gen, ver, rew };
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
  }
120
 
 
 
121
  return {
122
  status: 'accepted',
123
  question,
124
  context,
125
  gen,
126
  ver,
127
- rew
128
  };
129
  }
130
 
@@ -145,45 +245,69 @@ export async function appendGoldRecord(outPath, record) {
145
  * - seedsPath: JSONL of seeds (defaults to test_samples/seed_questions.jsonl)
146
  * - outPath: output JSONL (defaults to gold/pipeline_gold.jsonl)
147
  * - limit: max number of seeds to process
 
 
148
  *
149
  * Returns:
150
- * { total, processed, accepted, outPath }
151
  */
152
  export async function runPipelineBatch({
153
  seedsPath = DEFAULT_SEEDS_PATH,
154
  outPath = DEFAULT_OUT_PATH,
155
- limit
 
 
156
  } = {}) {
 
 
 
157
  const seeds = await loadSeedQuestions(seedsPath);
158
  const max = typeof limit === 'number' ? limit : seeds.length;
159
 
160
- // Shared provider instances for the whole batch
161
- const providers = {
162
- generator: loadProviderFor('generator'),
163
- verifier: loadProviderFor('verifier'),
164
- reward: loadProviderFor('reward')
165
- };
166
-
167
  let processed = 0;
168
  let accepted = 0;
 
169
 
170
- for (const seed of seeds.slice(0, max)) {
 
171
  const question = seedToQuestion(seed);
172
- const result = await runPipelineStep({ question, providers });
173
 
174
- processed += 1;
175
 
176
- if (result.status === 'accepted') {
177
- const record = {
178
  question,
179
- context: result.context,
180
- sample: result.gen, // treat generator result as the core sample
181
- verifier: result.ver,
182
- reward: result.rew
183
- };
184
-
185
- await appendGoldRecord(outPath, record);
186
- accepted += 1;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
187
  }
188
  }
189
 
@@ -191,6 +315,7 @@ export async function runPipelineBatch({
191
  total: seeds.length,
192
  processed,
193
  accepted,
194
- outPath
 
195
  };
196
  }
 
11
 
12
  const __filename = fileURLToPath(import.meta.url);
13
  const __dirname = path.dirname(__filename);
 
14
  const PROJECT_ROOT = path.join(__dirname, '..', '..');
15
 
16
  const DEFAULT_SEEDS_PATH = path.join(
17
  PROJECT_ROOT,
18
  'test_samples',
19
+ 'seed_questions.jsonl',
20
  );
21
 
22
  const DEFAULT_OUT_PATH = path.join(
23
  PROJECT_ROOT,
24
  'gold',
25
+ 'pipeline_gold.jsonl',
26
  );
27
 
28
+ function preview(value, max = 400) {
29
+ if (value == null) return '';
30
+ let str = typeof value === 'string' ? value : JSON.stringify(value, null, 2);
31
+ if (str.length > max) {
32
+ return str.slice(0, max) + `โ€ฆ [truncated ${str.length - max} chars]`;
33
+ }
34
+ return str;
35
+ }
36
+
37
  /**
38
  * Load JSONL seed questions.
39
  * Each line may be:
 
67
  *
68
  * Returns a structured result:
69
  * {
70
+ * status: 'accepted' | 'invalid_question' | 'retrieval_failed'
71
+ * | 'generator_failed' | 'verifier_rejected'
72
+ * | 'reward_rejected' | 'verifier_error' | 'reward_error',
 
73
  * question,
74
  * context,
75
  * gen,
76
  * ver,
77
  * rew,
78
+ * error? // optional message
79
  * }
80
  */
81
  export async function runPipelineStep({
82
  question,
83
  retrievalMode = process.env.RETRIEVAL_MODE || 'hybrid',
84
  k = Number(process.env.RETRIEVAL_K || '6'),
85
+ generatorProvider,
86
+ verifierProvider,
87
+ rewardProvider,
88
+ verbose = false,
89
+ logger = console,
90
  } = {}) {
91
+ const log = logger?.log?.bind(logger) || console.log;
92
+ const errLog = logger?.error?.bind(logger) || console.error;
93
+
94
  if (!question || !question.trim()) {
95
+ if (verbose) log(' [pipeline] empty / invalid question, skipping');
96
  return { status: 'invalid_question', question };
97
  }
98
 
99
+ const genProv = generatorProvider || loadProviderFor('generator');
100
+ const verProv = verifierProvider || loadProviderFor('verifier');
101
+ const rewProv = rewardProvider || loadProviderFor('reward');
 
 
 
 
102
 
103
  // --- Retrieval ---
104
  let context = [];
105
+ try {
106
+ if (verbose) log(` [retrieval] mode=${retrievalMode} k=${k}`);
 
 
107
  context = await hybridSearch(question, k);
108
+ if (verbose) {
109
+ log(` [retrieval] got ${context.length} chunks`);
110
+ if (context.length > 0) {
111
+ const first = context[0]?.content ?? '';
112
+ log(' [retrieval] first chunk:');
113
+ log(' ' + preview(first, 200).replace(/\n/g, '\n '));
114
+ }
115
+ }
116
+ } catch (e) {
117
+ const msg = e?.message || String(e);
118
+ if (verbose) errLog(' [retrieval] ERROR:', msg);
119
+ return {
120
+ status: 'retrieval_failed',
121
+ question,
122
+ error: msg,
123
+ };
124
  }
125
 
126
  // --- Generator ---
127
+ let gen;
128
+ try {
129
+ if (verbose) log(' [generator] calling modelโ€ฆ');
130
+ // NOTE: runGenerator(query, contextChunks, provider)
131
+ gen = await runGenerator(question, context, genProv);
132
+ if (verbose) {
133
+ log(' [generator] raw:');
134
+ log(' ' + preview(gen.raw ?? '', 400).replace(/\n/g, '\n '));
135
+ log(' [generator] parsed:');
136
+ log(' ' + preview(gen.parsed, 400).replace(/\n/g, '\n '));
137
+ }
138
+ } catch (e) {
139
+ const msg = e?.message || String(e);
140
+ if (verbose) errLog(' [generator] ERROR:', msg);
141
+ return {
142
+ status: 'generator_failed',
143
+ question,
144
+ context,
145
+ error: msg,
146
+ };
147
  }
148
 
149
  // --- Verifier ---
150
+ let ver;
151
+ try {
152
+ if (verbose) log(' [verifier] calling modelโ€ฆ');
153
+ // NOTE: runVerifier(sample, provider)
154
+ ver = await runVerifier(gen, verProv);
155
+ if (verbose) {
156
+ log(' [verifier] parsed:');
157
+ log(' ' + preview(ver.parsed, 400).replace(/\n/g, '\n '));
158
+ log(` [verifier] ok=${ver.ok === true}`);
159
+ }
160
+ } catch (e) {
161
+ const msg = e?.message || String(e);
162
+ if (verbose) errLog(' [verifier] ERROR:', msg);
163
+ return {
164
+ status: 'verifier_error',
165
+ question,
166
+ context,
167
+ gen,
168
+ error: msg,
169
+ };
170
+ }
171
+
172
+ if (!ver || ver.ok !== true) {
173
+ if (verbose) log(' [verifier] rejected sample');
174
+ return {
175
+ status: 'verifier_rejected',
176
+ question,
177
+ context,
178
+ gen,
179
+ ver,
180
+ };
181
  }
182
 
183
  // --- Reward ---
184
+ let rew;
185
+ try {
186
+ if (verbose) log(' [reward] calling modelโ€ฆ');
187
+ // NOTE: runReward(sample, provider)
188
+ rew = await runReward(gen, rewProv);
189
+ if (verbose) {
190
+ log(' [reward] parsed:');
191
+ log(' ' + preview(rew.parsed, 400).replace(/\n/g, '\n '));
192
+ log(` [reward] score=${rew.score} ok=${rew.ok}`);
193
+ }
194
+ } catch (e) {
195
+ const msg = e?.message || String(e);
196
+ if (verbose) errLog(' [reward] ERROR:', msg);
197
+ return {
198
+ status: 'reward_error',
199
+ question,
200
+ context,
201
+ gen,
202
+ ver,
203
+ error: msg,
204
+ };
205
+ }
206
+
207
+ if (!rew || rew.ok !== true) {
208
+ if (verbose) log(' [reward] rejected sample');
209
+ return {
210
+ status: 'reward_rejected',
211
+ question,
212
+ context,
213
+ gen,
214
+ ver,
215
+ rew,
216
+ };
217
  }
218
 
219
+ if (verbose) log(' [pipeline] accepted โœ…');
220
+
221
  return {
222
  status: 'accepted',
223
  question,
224
  context,
225
  gen,
226
  ver,
227
+ rew,
228
  };
229
  }
230
 
 
245
  * - seedsPath: JSONL of seeds (defaults to test_samples/seed_questions.jsonl)
246
  * - outPath: output JSONL (defaults to gold/pipeline_gold.jsonl)
247
  * - limit: max number of seeds to process
248
+ * - verbose: extra per-stage logging
249
+ * - logger: optional logger (defaults to console)
250
  *
251
  * Returns:
252
+ * { total, processed, accepted, outPath, statusCounts }
253
  */
254
  export async function runPipelineBatch({
255
  seedsPath = DEFAULT_SEEDS_PATH,
256
  outPath = DEFAULT_OUT_PATH,
257
+ limit,
258
+ verbose = false,
259
+ logger = console,
260
  } = {}) {
261
+ const log = logger?.log?.bind(logger) || console.log;
262
+ const errLog = logger?.error?.bind(logger) || console.error;
263
+
264
  const seeds = await loadSeedQuestions(seedsPath);
265
  const max = typeof limit === 'number' ? limit : seeds.length;
266
 
 
 
 
 
 
 
 
267
  let processed = 0;
268
  let accepted = 0;
269
+ const statusCounts = {};
270
 
271
+ for (let idx = 0; idx < max; idx++) {
272
+ const seed = seeds[idx];
273
  const question = seedToQuestion(seed);
274
+ const label = `[${idx + 1}/${max}]`;
275
 
276
+ log(`โ†’ ${label} Running pipeline for: "${question}"`);
277
 
278
+ try {
279
+ const result = await runPipelineStep({
280
  question,
281
+ verbose,
282
+ logger,
283
+ });
284
+
285
+ processed += 1;
286
+ statusCounts[result.status] =
287
+ (statusCounts[result.status] || 0) + 1;
288
+
289
+ if (verbose) {
290
+ log(` โ†ณ status: ${result.status}`);
291
+ }
292
+
293
+ if (result.status === 'accepted') {
294
+ const record = {
295
+ question,
296
+ context: result.context,
297
+ sample: result.gen, // generator output
298
+ verifier: result.ver,
299
+ reward: result.rew,
300
+ };
301
+
302
+ await appendGoldRecord(outPath, record);
303
+ accepted += 1;
304
+ }
305
+ } catch (e) {
306
+ const msg = e?.message || String(e);
307
+ processed += 1;
308
+ statusCounts.pipeline_error =
309
+ (statusCounts.pipeline_error || 0) + 1;
310
+ errLog(' [pipeline] ERROR:', msg);
311
  }
312
  }
313
 
 
315
  total: seeds.length,
316
  processed,
317
  accepted,
318
+ outPath,
319
+ statusCounts,
320
  };
321
  }
src/pipeline/pipeline_cli.js CHANGED
@@ -1,117 +1,130 @@
1
  #!/usr/bin/env node
2
  // src/pipeline/pipeline_cli.js
3
-
4
- import { fileURLToPath } from 'url';
5
  import path from 'path';
6
- import fs from 'fs/promises';
7
-
8
- import { loadProvider } from '../providers/provider.mjs';
9
- import {
10
- loadSeedQuestions,
11
- seedToQuestion,
12
- runPipelineStep,
13
- appendGoldRecord,
14
- runPipelineBatch
15
- } from './pipeline.mjs';
16
 
17
- //
18
- // ---- CLI Helpers ----
19
- //
20
  const __filename = fileURLToPath(import.meta.url);
21
  const __dirname = path.dirname(__filename);
22
-
23
- function parseArgs() {
24
- const args = process.argv.slice(2);
25
- const out = {};
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
 
27
  for (let i = 0; i < args.length; i++) {
28
  const a = args[i];
29
-
30
- if (a === '--seeds' || a === '-s') {
31
- out.seeds = args[++i];
32
- } else if (a === '--out' || a === '-o') {
33
- out.out = args[++i];
34
- } else if (a === '--limit' || a === '-l') {
35
- out.limit = Number(args[++i]);
36
- } else if (a === '--provider' || a === '-p') {
37
- out.provider = args[++i];
38
- } else if (a === '--help' || a === '-h') {
39
- out.help = true;
 
40
  }
41
  }
42
 
43
- return out;
44
- }
45
-
46
- function showHelp() {
47
- console.log(`
48
- distill-pipeline โ€” Full Distillation Cycle Runner
49
-
50
- Usage:
51
- node pipeline_cli.js [options]
52
-
53
- Options:
54
- --seeds <file> Path to seed JSONL file (default: test_samples/seed_questions.jsonl)
55
- --out <file> Path to output JSONL (default: gold/pipeline_gold.jsonl)
56
- --limit <n> Max number of seeds to process
57
- --provider <name> Provider override: ollama | openai | http
58
- -h, --help Show this help
59
 
60
- Examples:
61
- node pipeline_cli.js --limit 25
62
- node pipeline_cli.js -s custom_seeds.jsonl -o gold/round1.jsonl
63
- `);
 
 
64
  }
65
 
66
- //
67
- // ---- MAIN ----
68
- //
69
  async function main() {
70
- const args = parseArgs();
71
-
72
- if (args.help) {
73
- showHelp();
74
- process.exit(0);
75
- }
76
-
77
- // ---- Resolve paths ----
78
- const seedsPath = args.seeds
79
- ? path.resolve(args.seeds)
80
- : path.resolve(__dirname, '../../test_samples/seed_questions.jsonl');
81
 
82
- const outPath = args.out
83
- ? path.resolve(args.out)
84
- : path.resolve(__dirname, '../../gold/pipeline_gold.jsonl');
85
 
86
- const limit = args.limit ?? undefined;
 
 
 
 
 
87
 
88
- // ---- Provider override ----
89
- if (args.provider) {
90
- process.env.PROVIDER_TYPE = args.provider;
91
- }
92
-
93
- // ---- Announce run ----
94
- console.log(`\n๐Ÿš€ Starting Distillation Pipeline`);
95
  console.log(` Seeds: ${seedsPath}`);
96
  console.log(` Output: ${outPath}`);
97
- console.log(` Provider: ${process.env.PROVIDER_TYPE || 'ollama (default)'}`);
98
- console.log(` Limit: ${limit ?? 'none'}\n`);
99
-
100
- // ---- Run batch ----
101
- const result = await runPipelineBatch({
102
- seedsPath,
103
- outPath,
104
- limit,
105
- });
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
106
 
107
- console.log(`\n๐ŸŽ‰ Pipeline completed`);
108
- console.log(` Total Seeds: ${result.total}`);
109
- console.log(` Processed: ${result.processed}`);
110
- console.log(` Accepted: ${result.accepted}`);
111
- console.log(` Written to: ${result.outPath}\n`);
112
  }
113
 
114
  main().catch((err) => {
115
- console.error(`โŒ Pipeline error:`, err);
116
  process.exit(1);
117
  });
 
1
  #!/usr/bin/env node
2
  // src/pipeline/pipeline_cli.js
 
 
3
  import path from 'path';
4
+ import { fileURLToPath } from 'url';
5
+ import { runPipelineBatch } from './pipeline.mjs';
 
 
 
 
 
 
 
 
6
 
 
 
 
7
  const __filename = fileURLToPath(import.meta.url);
8
  const __dirname = path.dirname(__filename);
9
+ const PROJECT_ROOT = path.join(__dirname, '..', '..');
10
+
11
+ const DEFAULT_SEEDS = path.join(
12
+ PROJECT_ROOT,
13
+ 'test_samples',
14
+ 'seed_questions.jsonl',
15
+ );
16
+ const DEFAULT_OUT = path.join(
17
+ PROJECT_ROOT,
18
+ 'gold',
19
+ 'pipeline_gold.jsonl',
20
+ );
21
+
22
+ function parseArgs(argv) {
23
+ const args = argv.slice(2);
24
+ let limit;
25
+ let seedsPath;
26
+ let outPath;
27
+ let verbose = false;
28
 
29
  for (let i = 0; i < args.length; i++) {
30
  const a = args[i];
31
+ if (a === '--limit' || a === '-n') {
32
+ const v = Number(args[i + 1]);
33
+ if (!Number.isNaN(v)) limit = v;
34
+ i++;
35
+ } else if (a === '--seeds') {
36
+ seedsPath = args[i + 1];
37
+ i++;
38
+ } else if (a === '--out') {
39
+ outPath = args[i + 1];
40
+ i++;
41
+ } else if (a === '--verbose' || a === '-v') {
42
+ verbose = true;
43
  }
44
  }
45
 
46
+ // Also honour env var PIPELINE_VERBOSE=1
47
+ if (!verbose && process.env.PIPELINE_VERBOSE) {
48
+ const v = process.env.PIPELINE_VERBOSE.toLowerCase();
49
+ if (v === '1' || v === 'true' || v === 'yes') {
50
+ verbose = true;
51
+ }
52
+ }
 
 
 
 
 
 
 
 
 
53
 
54
+ return {
55
+ limit,
56
+ seedsPath: seedsPath || DEFAULT_SEEDS,
57
+ outPath: outPath || DEFAULT_OUT,
58
+ verbose,
59
+ };
60
  }
61
 
 
 
 
62
  async function main() {
63
+ const { limit, seedsPath, outPath, verbose } = parseArgs(process.argv);
 
 
 
 
 
 
 
 
 
 
64
 
65
+ const generatorProvider = process.env.GENERATOR_PROVIDER || 'ollama';
66
+ const verifierProvider = process.env.VERIFIER_PROVIDER || generatorProvider;
67
+ const rewardProvider = process.env.REWARD_PROVIDER || generatorProvider;
68
 
69
+ const generatorModel =
70
+ process.env.GENERATOR_MODEL || process.env.OLLAMA_MODEL || 'qwen3-vl:8b-thinking';
71
+ const verifierModel =
72
+ process.env.VERIFIER_MODEL || generatorModel;
73
+ const rewardModel =
74
+ process.env.REWARD_MODEL || verifierModel;
75
 
76
+ console.log('');
77
+ console.log('๐Ÿš€ Starting Distillation Pipeline');
 
 
 
 
 
78
  console.log(` Seeds: ${seedsPath}`);
79
  console.log(` Output: ${outPath}`);
80
+ console.log(` Providers:`);
81
+ console.log(
82
+ ` generator: ${generatorProvider} (${generatorModel})`,
83
+ );
84
+ console.log(
85
+ ` verifier: ${verifierProvider} (${verifierModel})`,
86
+ );
87
+ console.log(
88
+ ` reward: ${rewardProvider} (${rewardModel})`,
89
+ );
90
+ console.log(` Limit: ${limit ?? 'all'}`);
91
+ console.log(` Verbose: ${verbose ? 'yes' : 'no'}`);
92
+ console.log('');
93
+
94
+ try {
95
+ const result = await runPipelineBatch({
96
+ seedsPath,
97
+ outPath,
98
+ limit,
99
+ verbose,
100
+ logger: console,
101
+ });
102
+
103
+ console.log('');
104
+ console.log('๐ŸŽ‰ Pipeline completed');
105
+ console.log(` Total Seeds: ${result.total}`);
106
+ console.log(` Processed: ${result.processed}`);
107
+ console.log(` Accepted: ${result.accepted}`);
108
+ console.log(` Written to: ${result.outPath}`);
109
+
110
+ if (result.statusCounts) {
111
+ console.log('');
112
+ console.log(' Status breakdown:');
113
+ for (const [status, count] of Object.entries(
114
+ result.statusCounts,
115
+ )) {
116
+ console.log(` ${status.padEnd(17)} ${count}`);
117
+ }
118
+ }
119
 
120
+ console.log('');
121
+ } catch (err) {
122
+ console.error('โŒ Pipeline error:', err);
123
+ process.exit(1);
124
+ }
125
  }
126
 
127
  main().catch((err) => {
128
+ console.error('โŒ Fatal error:', err);
129
  process.exit(1);
130
  });