"""
Incremental JS dataset augmentation for Component 3 outputs.

Goal:
- Do NOT rebuild the full pipeline.
- Reuse the existing cleaned/tokenized files.
- Append only new JavaScript samples drawn from one additional HF dataset.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict

import yaml
from datasets import load_dataset

# Ensure `src` imports work when the script is run from the project root.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src.dataset_pipeline.hf_dataset_pipeline import (  # noqa: E402
    HFDatasetPipeline,
    PipelineConfig,
    SourceDatasetSpec,
)


def parse_args() -> argparse.Namespace:
    """Parse command-line options: the config path and an optional JS target override."""
    parser = argparse.ArgumentParser(description="Add JS-focused dataset incrementally.")
    parser.add_argument(
        "--config",
        default="configs/component3_incremental_js.yaml",
        help="Path to YAML config.",
    )
    parser.add_argument(
        "--target_new_javascript_examples",
        type=int,
        default=None,
        help="Optional override for JS target.",
    )
    return parser.parse_args()


def load_yaml(path: Path) -> Dict[str, Any]:
    """Load *path* with ``yaml.safe_load`` and return its top-level mapping.

    Raises:
        FileNotFoundError: if *path* does not exist.
        ValueError: if the document's top level is not a mapping (an empty file
            loads as ``None`` and is rejected too).
    """
    if not path.exists():
        raise FileNotFoundError(f"Config file not found: {path}")
    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if not isinstance(data, dict):
        raise ValueError("Config must be a YAML object.")
    return data


def _ensure_trailing_newline(path: Path) -> None:
    """Append a newline to *path* if it is non-empty and its last byte is not ``\\n``.

    Appending JSONL records to a file whose final line lacks a terminator would
    glue the first new record onto that line and corrupt both records.
    """
    if not path.exists() or path.stat().st_size == 0:
        return
    with path.open("rb") as f:
        f.seek(-1, os.SEEK_END)
        last_byte = f.read(1)
    if last_byte != b"\n":
        with path.open("ab") as f:
            f.write(b"\n")


def load_existing_into_dedupe_db(pipeline: HFDatasetPipeline, existing_clean_path: Path) -> int:
    """
    Seed the dedupe DB with hashes of the records in the existing clean dataset.

    This prevents re-adding duplicates during the incremental merge.

    Blank lines, lines that are not valid JSON, lines that are not JSON objects,
    and records with an empty/missing ``prompt`` or ``code`` are skipped. The
    number of malformed lines is reported on stdout.

    Returns:
        The number of records passed to the dedupe DB.

    Raises:
        FileNotFoundError: if *existing_clean_path* does not exist.
    """
    if not existing_clean_path.exists():
        raise FileNotFoundError(
            f"Existing clean dataset not found: {existing_clean_path}. "
            "Run Component 3 full pipeline first."
        )

    seeded = 0
    skipped_malformed = 0
    with existing_clean_path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError:
                skipped_malformed += 1
                continue
            if not isinstance(row, dict):
                skipped_malformed += 1
                continue

            # `or ""` keeps a JSON null from being stringified into the text "None".
            prompt = str(row.get("prompt") or "").strip()
            code = str(row.get("code") or "").strip()
            if not prompt or not code:
                continue

            # _keep_unique adds the hash to the DB; a False return only means the
            # hash was already present, which is fine while seeding.
            pipeline._keep_unique(prompt, code)
            seeded += 1
            if seeded % 5000 == 0:
                pipeline.conn.commit()
    pipeline.conn.commit()

    if skipped_malformed:
        print(
            f"[warn] Skipped {skipped_malformed} malformed line(s) in {existing_clean_path}"
        )
    return seeded


def _append_new_js_records(
    pipeline: HFDatasetPipeline,
    spec: SourceDatasetSpec,
    clean_path: Path,
    tokenized_path: Path,
    target_js: int,
    progress_every: int,
) -> Dict[str, int]:
    """Stream *spec*'s dataset and append up to *target_js* new, unique JS samples.

    Every accepted sample is written to the clean and tokenized JSONL files in
    lockstep, so both files gain the same number of lines. A row is dropped when
    the pipeline's standardize or clean/filter step rejects it, when its cleaned
    language is not ``"javascript"``, or when the dedupe DB reports a duplicate.

    Returns:
        Counters ``seen_new``, ``added_js``, ``dropped_duplicate`` and
        ``dropped_filtered``.
    """
    if target_js <= 0:
        # Nothing to add: skip the download and leave both files untouched.
        return {"seen_new": 0, "added_js": 0, "dropped_duplicate": 0, "dropped_filtered": 0}

    seen_new = 0
    added_js = 0
    dropped_duplicate = 0
    dropped_filtered = 0

    _ensure_trailing_newline(clean_path)
    _ensure_trailing_newline(tokenized_path)

    stream = load_dataset(spec.hf_dataset_id, split=spec.split, streaming=True)
    with clean_path.open("a", encoding="utf-8") as clean_f, tokenized_path.open(
        "a", encoding="utf-8"
    ) as tok_f:
        for row in stream:
            seen_new += 1

            std = pipeline._standardize_record(row=row, spec=spec)
            if std is None:
                dropped_filtered += 1
                continue
            prompt, code, lang = std

            cleaned = pipeline._clean_and_filter(prompt=prompt, code=code, language=lang)
            if cleaned is None:
                dropped_filtered += 1
                continue
            c_prompt, c_code, c_lang = cleaned
            if c_lang != "javascript":
                dropped_filtered += 1
                continue

            # _keep_unique returns False when this (prompt, code) is already known.
            if not pipeline._keep_unique(c_prompt, c_code):
                dropped_duplicate += 1
                continue

            formatted_text = pipeline.tokenizer.format_training_sample(
                prompt=c_prompt, code=c_code, language="javascript"
            )
            token_ids = pipeline.tokenizer.encode(formatted_text)
            clean_record = {"prompt": c_prompt, "code": c_code, "language": "javascript"}
            tok_record = {
                "language": "javascript",
                "text": formatted_text,
                "input_ids": token_ids,
                "length": len(token_ids),
            }
            clean_f.write(json.dumps(clean_record, ensure_ascii=False) + "\n")
            tok_f.write(json.dumps(tok_record, ensure_ascii=False) + "\n")
            added_js += 1

            if added_js % progress_every == 0:
                pipeline.conn.commit()
                print(
                    f"[progress] seen_new={seen_new} added_js={added_js} "
                    f"dropped_duplicate={dropped_duplicate}"
                )
            if added_js >= target_js:
                break

    return {
        "seen_new": seen_new,
        "added_js": added_js,
        "dropped_duplicate": dropped_duplicate,
        "dropped_filtered": dropped_filtered,
    }


def _merge_incremental_stats(
    stats_path: Path,
    dataset_id: str,
    target_js: int,
    seeded: int,
    counts: Dict[str, int],
) -> None:
    """Merge this run's counters into the JSON stats file at *stats_path*.

    Existing keys are preserved. A missing or unreadable file, or one whose top
    level is not a JSON object, is replaced by a fresh mapping.
    """
    merged_stats: Dict[str, Any] = {}
    if stats_path.exists():
        with stats_path.open("r", encoding="utf-8") as f:
            try:
                loaded: Any = json.load(f)
            except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
                loaded = None
        if isinstance(loaded, dict):
            merged_stats = loaded

    added_js = counts["added_js"]
    merged_stats["incremental_js_dataset"] = dataset_id
    merged_stats["incremental_js_target"] = target_js
    merged_stats["incremental_js_added"] = added_js
    merged_stats["incremental_new_seen"] = counts["seen_new"]
    merged_stats["incremental_new_dropped_duplicate"] = counts["dropped_duplicate"]
    merged_stats["incremental_new_dropped_filtered"] = counts["dropped_filtered"]
    merged_stats["incremental_existing_records_seeded"] = seeded
    # Use the records actually read from the clean file (`seeded`) as the base
    # rather than `kept_total`: this script never updates `kept_total`, so after
    # a second incremental run `kept_total + added_js` would undercount.
    merged_stats["final_clean_records_estimate"] = seeded + added_js

    with stats_path.open("w", encoding="utf-8") as f:
        json.dump(merged_stats, f, indent=2)


def main() -> None:
    """Entry point: append new, deduplicated JS samples to the Component 3 outputs.

    Any failure prints a diagnostic to stderr and exits with status 1.
    """
    args = parse_args()
    try:
        cfg_data = load_yaml(Path(args.config))

        existing_clean_path = Path(cfg_data["existing_clean_path"])
        existing_tokenized_path = Path(cfg_data["existing_tokenized_path"])
        existing_stats_path = Path(cfg_data["existing_stats_path"])
        tokenizer_dir = str(cfg_data["tokenizer_dir"])
        dedupe_db_path = str(cfg_data["dedupe_db_path"])
        progress_every = int(cfg_data.get("progress_every", 500))
        min_prompt_chars = int(cfg_data.get("min_prompt_chars", 8))
        min_code_chars = int(cfg_data.get("min_code_chars", 16))
        max_code_chars = int(cfg_data.get("max_code_chars", 40_000))
        target_js = (
            args.target_new_javascript_examples
            if args.target_new_javascript_examples is not None
            else int(cfg_data.get("target_new_javascript_examples", 20_000))
        )

        # Validate numeric settings before any file is modified.
        if target_js < 0:
            raise ValueError(
                f"target_new_javascript_examples must be >= 0, got {target_js}."
            )
        if progress_every <= 0:
            raise ValueError(f"progress_every must be > 0, got {progress_every}.")

        if not existing_tokenized_path.exists():
            raise FileNotFoundError(
                f"Existing tokenized dataset not found: {existing_tokenized_path}. "
                "Run Component 3 full pipeline first."
            )

        new_ds = cfg_data["new_dataset"]
        spec = SourceDatasetSpec(
            hf_dataset_id=str(new_ds["hf_dataset_id"]),
            split=str(new_ds.get("split", "train")),
            prompt_field=str(new_ds["prompt_field"]),
            code_field=str(new_ds["code_field"]),
            language_field=new_ds.get("language_field"),
            default_language=str(new_ds.get("default_language", "auto")),
        )

        # Build a minimal pipeline object to reuse the cleaning/dedupe/tokenization utilities.
        pipeline_cfg = PipelineConfig(
            datasets=[spec],
            tokenizer_dir=tokenizer_dir,
            interim_output_dir=str(existing_clean_path.parent),
            processed_output_dir=str(existing_tokenized_path.parent),
            dedupe_db_path=dedupe_db_path,
            min_prompt_chars=min_prompt_chars,
            min_code_chars=min_code_chars,
            max_code_chars=max_code_chars,
            progress_every=progress_every,
        )
        pipeline = HFDatasetPipeline(pipeline_cfg)
        try:
            seeded = load_existing_into_dedupe_db(pipeline, existing_clean_path)
            print(f"[info] Seeded dedupe DB with existing clean records: {seeded}")

            counts = _append_new_js_records(
                pipeline=pipeline,
                spec=spec,
                clean_path=existing_clean_path,
                tokenized_path=existing_tokenized_path,
                target_js=target_js,
                progress_every=progress_every,
            )
            pipeline.conn.commit()
        finally:
            pipeline.close()

        _merge_incremental_stats(
            stats_path=existing_stats_path,
            dataset_id=spec.hf_dataset_id,
            target_js=target_js,
            seeded=seeded,
            counts=counts,
        )

        added_js = counts["added_js"]
        print("Incremental JavaScript augmentation completed.")
        print(f"Dataset used: {spec.hf_dataset_id}")
        print(f"Target JS examples: {target_js}")
        print(f"Added JS examples: {added_js}")
        if added_js < target_js:
            print(
                "Warning: JS target not reached from this dataset after filtering/dedupe. "
                "You may need one more JS dataset."
            )
        print("Updated files:")
        print(f"- {existing_clean_path}")
        print(f"- {existing_tokenized_path}")
        print(f"- {existing_stats_path}")
    except Exception as exc:
        # Top-level boundary: report on stderr with the exception type (a bare
        # KeyError would otherwise print only the missing key) and exit non-zero.
        print("Incremental JavaScript augmentation failed.", file=sys.stderr)
        print(f"What went wrong: {type(exc).__name__}: {exc}", file=sys.stderr)
        print(
            "Fix suggestion: verify internet access, dataset ID, and existing Component 3 files.",
            file=sys.stderr,
        )
        raise SystemExit(1) from exc


if __name__ == "__main__":
    main()