| """ |
| Incremental JS dataset augmentation for Component 3 outputs. |
| |
| Goal: |
| - Do NOT rebuild the full pipeline. |
| - Reuse existing cleaned/tokenized files. |
| - Add only new JavaScript samples from one additional HF dataset. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from pathlib import Path |
| from typing import Any, Dict |
|
|
| import yaml |
| from datasets import load_dataset |
|
|
| |
# Make the project root importable when this script is executed directly
# (e.g. `python scripts/this_file.py`) instead of as an installed package,
# so the `src.dataset_pipeline` import below resolves.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
| from src.dataset_pipeline.hf_dataset_pipeline import ( |
| HFDatasetPipeline, |
| PipelineConfig, |
| SourceDatasetSpec, |
| ) |
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse CLI options for the incremental JS augmentation run.

    Returns:
        Namespace with ``config`` (path to the YAML config) and
        ``target_new_javascript_examples`` (optional int override, or None).
    """
    ap = argparse.ArgumentParser(description="Add JS-focused dataset incrementally.")
    ap.add_argument("--config", default="configs/component3_incremental_js.yaml", help="Path to YAML config.")
    ap.add_argument("--target_new_javascript_examples", type=int, default=None, help="Optional override for JS target.")
    return ap.parse_args()
|
|
|
|
def load_yaml(path: Path) -> Dict[str, Any]:
    """Read *path* as YAML and return it as a dict.

    Raises:
        FileNotFoundError: if the config file does not exist.
        ValueError: if the top-level YAML value is not a mapping.
    """
    if not path.exists():
        raise FileNotFoundError(f"Config file not found: {path}")
    parsed = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(parsed, dict):
        raise ValueError("Config must be a YAML object.")
    return parsed
|
|
|
|
def load_existing_into_dedupe_db(pipeline: HFDatasetPipeline, existing_clean_path: Path) -> int:
    """Seed the dedupe DB with hashes of the existing clean dataset.

    This prevents re-adding duplicates during the incremental merge.
    Malformed / incomplete JSONL lines are skipped best-effort.

    Args:
        pipeline: Pipeline instance; only its ``_keep_unique`` helper and
            ``conn`` (SQLite-style connection) are used here.
        existing_clean_path: JSONL file with ``prompt``/``code`` records
            produced by the full Component 3 pipeline.

    Returns:
        Number of records successfully seeded.

    Raises:
        FileNotFoundError: if the existing clean dataset is missing.
    """
    if not existing_clean_path.exists():
        raise FileNotFoundError(
            f"Existing clean dataset not found: {existing_clean_path}. "
            "Run Component 3 full pipeline first."
        )

    seeded = 0
    with existing_clean_path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # Keep the try body minimal: only the parse can legitimately fail.
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                continue  # malformed line — skip, best-effort seeding
            if not isinstance(row, dict):
                continue  # valid JSON but not a record object (e.g. a list)
            prompt = str(row.get("prompt", "")).strip()
            code = str(row.get("code", "")).strip()
            if not prompt or not code:
                continue

            pipeline._keep_unique(prompt, code)
            seeded += 1
            # Periodic commits keep the dedupe DB transaction small.
            if seeded % 5000 == 0:
                pipeline.conn.commit()
    pipeline.conn.commit()
    return seeded
|
|
|
|
def main() -> None:
    """Run the incremental JavaScript augmentation end to end.

    Steps:
      1. Load the YAML config and resolve paths / limits (CLI may override
         the JS target).
      2. Seed the dedupe DB from the existing clean dataset.
      3. Stream the new HF dataset and append unique JavaScript samples to
         the existing clean and tokenized JSONL files.
      4. Merge run counters into the existing stats JSON.

    Exits with status 1 (after printing a diagnostic) on any failure.
    """
    args = parse_args()
    try:
        cfg_data = load_yaml(Path(args.config))

        # Artifacts produced by the full Component 3 pipeline; this run
        # appends to them rather than rebuilding.
        existing_clean_path = Path(cfg_data["existing_clean_path"])
        existing_tokenized_path = Path(cfg_data["existing_tokenized_path"])
        existing_stats_path = Path(cfg_data["existing_stats_path"])
        tokenizer_dir = str(cfg_data["tokenizer_dir"])
        dedupe_db_path = str(cfg_data["dedupe_db_path"])
        progress_every = int(cfg_data.get("progress_every", 500))
        min_prompt_chars = int(cfg_data.get("min_prompt_chars", 8))
        min_code_chars = int(cfg_data.get("min_code_chars", 16))
        max_code_chars = int(cfg_data.get("max_code_chars", 40_000))
        # CLI override wins over the config value for the JS target.
        target_js = (
            args.target_new_javascript_examples
            if args.target_new_javascript_examples is not None
            else int(cfg_data.get("target_new_javascript_examples", 20_000))
        )

        # Fail early if the tokenized artifact is missing; the clean file is
        # checked separately inside load_existing_into_dedupe_db().
        if not existing_tokenized_path.exists():
            raise FileNotFoundError(
                f"Existing tokenized dataset not found: {existing_tokenized_path}. "
                "Run Component 3 full pipeline first."
            )

        # Describe the single new HF dataset to pull JS samples from.
        new_ds = cfg_data["new_dataset"]
        spec = SourceDatasetSpec(
            hf_dataset_id=str(new_ds["hf_dataset_id"]),
            split=str(new_ds.get("split", "train")),
            prompt_field=str(new_ds["prompt_field"]),
            code_field=str(new_ds["code_field"]),
            language_field=new_ds.get("language_field"),
            default_language=str(new_ds.get("default_language", "auto")),
        )

        # Reuse the pipeline's cleaning/dedupe/tokenization machinery, with
        # output dirs pointed at the directories of the existing artifacts.
        pipeline_cfg = PipelineConfig(
            datasets=[spec],
            tokenizer_dir=tokenizer_dir,
            interim_output_dir=str(existing_clean_path.parent),
            processed_output_dir=str(existing_tokenized_path.parent),
            dedupe_db_path=dedupe_db_path,
            min_prompt_chars=min_prompt_chars,
            min_code_chars=min_code_chars,
            max_code_chars=max_code_chars,
            progress_every=progress_every,
        )
        pipeline = HFDatasetPipeline(pipeline_cfg)

        try:
            seeded = load_existing_into_dedupe_db(pipeline, existing_clean_path)
            print(f"[info] Seeded dedupe DB with existing clean records: {seeded}")

            # streaming=True avoids downloading the whole dataset up front;
            # rows are consumed until the JS target is met.
            stream = load_dataset(spec.hf_dataset_id, split=spec.split, streaming=True)
            added_js = 0
            seen_new = 0
            dropped_duplicate = 0
            dropped_filtered = 0

            # Append-mode handles: existing content is never rewritten.
            with existing_clean_path.open("a", encoding="utf-8") as clean_f, existing_tokenized_path.open(
                "a", encoding="utf-8"
            ) as tok_f:
                for row in stream:
                    seen_new += 1
                    # NOTE(review): relies on pipeline-private helpers
                    # (_standardize_record, _clean_and_filter, _keep_unique);
                    # confirm these stay stable across pipeline versions.
                    std = pipeline._standardize_record(row=row, spec=spec)
                    if std is None:
                        dropped_filtered += 1
                        continue

                    prompt, code, lang = std
                    cleaned = pipeline._clean_and_filter(prompt=prompt, code=code, language=lang)
                    if cleaned is None:
                        dropped_filtered += 1
                        continue

                    # Only JavaScript samples are kept in this incremental run.
                    c_prompt, c_code, c_lang = cleaned
                    if c_lang != "javascript":
                        dropped_filtered += 1
                        continue

                    # _keep_unique also records the hash, so duplicates across
                    # the old dataset and this stream are rejected.
                    if not pipeline._keep_unique(c_prompt, c_code):
                        dropped_duplicate += 1
                        continue

                    formatted_text = pipeline.tokenizer.format_training_sample(
                        prompt=c_prompt, code=c_code, language="javascript"
                    )
                    token_ids = pipeline.tokenizer.encode(formatted_text)

                    # Mirror the schemas of the existing clean/tokenized files.
                    clean_record = {"prompt": c_prompt, "code": c_code, "language": "javascript"}
                    tok_record = {
                        "language": "javascript",
                        "text": formatted_text,
                        "input_ids": token_ids,
                        "length": len(token_ids),
                    }
                    clean_f.write(json.dumps(clean_record, ensure_ascii=False) + "\n")
                    tok_f.write(json.dumps(tok_record, ensure_ascii=False) + "\n")

                    added_js += 1
                    # Periodic commit + progress line every `progress_every` adds.
                    if added_js % progress_every == 0:
                        pipeline.conn.commit()
                        print(
                            f"[progress] seen_new={seen_new} added_js={added_js} "
                            f"dropped_duplicate={dropped_duplicate}"
                        )

                    if added_js >= target_js:
                        break

            pipeline.conn.commit()
        finally:
            # Always release the dedupe DB connection, even on failure.
            pipeline.close()

        # Merge this run's counters into the existing stats file; an
        # unreadable stats file is tolerated and simply overwritten.
        merged_stats: Dict[str, Any] = {}
        if existing_stats_path.exists():
            with existing_stats_path.open("r", encoding="utf-8") as f:
                try:
                    merged_stats = json.load(f)
                except Exception:
                    merged_stats = {}

        merged_stats["incremental_js_dataset"] = spec.hf_dataset_id
        merged_stats["incremental_js_target"] = target_js
        merged_stats["incremental_js_added"] = added_js
        merged_stats["incremental_new_seen"] = seen_new
        merged_stats["incremental_new_dropped_duplicate"] = dropped_duplicate
        merged_stats["incremental_new_dropped_filtered"] = dropped_filtered
        # "kept_total" comes from the full pipeline run; 0 if absent.
        merged_stats["final_clean_records_estimate"] = int(merged_stats.get("kept_total", 0)) + added_js

        with existing_stats_path.open("w", encoding="utf-8") as f:
            json.dump(merged_stats, f, indent=2)

        print("Incremental JavaScript augmentation completed.")
        print(f"Dataset used: {spec.hf_dataset_id}")
        print(f"Target JS examples: {target_js}")
        print(f"Added JS examples: {added_js}")
        if added_js < target_js:
            print(
                "Warning: JS target not reached from this dataset after filtering/dedupe. "
                "You may need one more JS dataset."
            )
        print("Updated files:")
        print(f"- {existing_clean_path}")
        print(f"- {existing_tokenized_path}")
        print(f"- {existing_stats_path}")
    except Exception as exc:
        # Top-level boundary: report and exit non-zero instead of a traceback.
        print("Incremental JavaScript augmentation failed.")
        print(f"What went wrong: {exc}")
        print("Fix suggestion: verify internet access, dataset ID, and existing Component 3 files.")
        raise SystemExit(1)
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|
|
|