# mindi-backup/scripts/add_incremental_javascript_dataset.py
# Repo: Mindigenous — "Initial full project backup with Git LFS" (commit 53f0cc2)
"""
Incremental JS dataset augmentation for Component 3 outputs.
Goal:
- Do NOT rebuild the full pipeline.
- Reuse existing cleaned/tokenized files.
- Add only new JavaScript samples from one additional HF dataset.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict
import yaml
from datasets import load_dataset
# Make the repository root importable so `src.*` modules resolve when this
# script is launched from the project root.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if sys.path.count(str(PROJECT_ROOT)) == 0:
    sys.path.insert(0, str(PROJECT_ROOT))
from src.dataset_pipeline.hf_dataset_pipeline import ( # noqa: E402
HFDatasetPipeline,
PipelineConfig,
SourceDatasetSpec,
)
def parse_args() -> argparse.Namespace:
    """Parse CLI options for the incremental JS augmentation run.

    Returns:
        Namespace with ``config`` (YAML path) and
        ``target_new_javascript_examples`` (optional int override).
    """
    cli = argparse.ArgumentParser(description="Add JS-focused dataset incrementally.")
    cli.add_argument(
        "--config",
        default="configs/component3_incremental_js.yaml",
        help="Path to YAML config.",
    )
    cli.add_argument(
        "--target_new_javascript_examples",
        type=int,
        default=None,
        help="Optional override for JS target.",
    )
    return cli.parse_args()
def load_yaml(path: Path) -> Dict[str, Any]:
    """Read a YAML config file, requiring a top-level mapping.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
        ValueError: if the parsed document is not a mapping.
    """
    if not path.exists():
        raise FileNotFoundError(f"Config file not found: {path}")
    # safe_load is used deliberately: config files are data, not code.
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("Config must be a YAML object.")
    return data
def load_existing_into_dedupe_db(pipeline: HFDatasetPipeline, existing_clean_path: Path) -> int:
    """Seed the pipeline's dedupe DB from an already-cleaned JSONL dataset.

    Every non-empty (prompt, code) pair in ``existing_clean_path`` is hashed
    into the dedupe DB so the incremental merge will not re-add duplicates.
    Malformed or incomplete lines are skipped silently.

    Returns:
        Number of records fed into the dedupe DB.

    Raises:
        FileNotFoundError: if the existing clean dataset is missing.
    """
    if not existing_clean_path.exists():
        raise FileNotFoundError(
            f"Existing clean dataset not found: {existing_clean_path}. "
            "Run Component 3 full pipeline first."
        )
    seeded = 0
    with existing_clean_path.open("r", encoding="utf-8") as handle:
        for raw in handle:
            raw = raw.strip()
            if not raw:
                continue
            try:
                record = json.loads(raw)
                prompt_text = str(record.get("prompt", "")).strip()
                code_text = str(record.get("code", "")).strip()
            except Exception:
                # Best-effort seeding: a corrupt line must not abort the pass.
                continue
            if not (prompt_text and code_text):
                continue
            # _keep_unique registers the hash; the returned already-seen flag
            # is irrelevant here — we only need the DB populated.
            pipeline._keep_unique(prompt_text, code_text)
            seeded += 1
            # Periodic commits bound journal growth on very large files.
            if seeded % 5000 == 0:
                pipeline.conn.commit()
    pipeline.conn.commit()
    return seeded
def main() -> None:
    """Incrementally append new JavaScript samples to Component 3 outputs.

    Flow: load YAML config -> seed dedupe DB from the existing clean dataset
    -> stream one additional HF dataset -> clean/filter/dedupe each row ->
    append surviving JS records to the existing clean and tokenized JSONL
    files -> merge run counters into the existing stats JSON. Exits with
    status 1 on any failure.
    """
    args = parse_args()
    try:
        # ---- Configuration -------------------------------------------------
        cfg_data = load_yaml(Path(args.config))
        existing_clean_path = Path(cfg_data["existing_clean_path"])
        existing_tokenized_path = Path(cfg_data["existing_tokenized_path"])
        existing_stats_path = Path(cfg_data["existing_stats_path"])
        tokenizer_dir = str(cfg_data["tokenizer_dir"])
        dedupe_db_path = str(cfg_data["dedupe_db_path"])
        progress_every = int(cfg_data.get("progress_every", 500))
        min_prompt_chars = int(cfg_data.get("min_prompt_chars", 8))
        min_code_chars = int(cfg_data.get("min_code_chars", 16))
        max_code_chars = int(cfg_data.get("max_code_chars", 40_000))
        # CLI flag overrides the config value; otherwise fall back to 20k.
        target_js = (
            args.target_new_javascript_examples
            if args.target_new_javascript_examples is not None
            else int(cfg_data.get("target_new_javascript_examples", 20_000))
        )
        if not existing_tokenized_path.exists():
            raise FileNotFoundError(
                f"Existing tokenized dataset not found: {existing_tokenized_path}. "
                "Run Component 3 full pipeline first."
            )
        # Spec for the single new HF dataset to merge in.
        new_ds = cfg_data["new_dataset"]
        spec = SourceDatasetSpec(
            hf_dataset_id=str(new_ds["hf_dataset_id"]),
            split=str(new_ds.get("split", "train")),
            prompt_field=str(new_ds["prompt_field"]),
            code_field=str(new_ds["code_field"]),
            language_field=new_ds.get("language_field"),
            default_language=str(new_ds.get("default_language", "auto")),
        )
        # Build minimal pipeline object to reuse cleaning/dedupe/tokenization utilities.
        pipeline_cfg = PipelineConfig(
            datasets=[spec],
            tokenizer_dir=tokenizer_dir,
            interim_output_dir=str(existing_clean_path.parent),
            processed_output_dir=str(existing_tokenized_path.parent),
            dedupe_db_path=dedupe_db_path,
            min_prompt_chars=min_prompt_chars,
            min_code_chars=min_code_chars,
            max_code_chars=max_code_chars,
            progress_every=progress_every,
        )
        pipeline = HFDatasetPipeline(pipeline_cfg)
        try:
            # Seed dedupe hashes so already-present records are rejected below.
            seeded = load_existing_into_dedupe_db(pipeline, existing_clean_path)
            print(f"[info] Seeded dedupe DB with existing clean records: {seeded}")
            # streaming=True: iterate rows lazily instead of downloading the
            # whole dataset up front.
            stream = load_dataset(spec.hf_dataset_id, split=spec.split, streaming=True)
            added_js = 0
            seen_new = 0
            dropped_duplicate = 0
            dropped_filtered = 0
            # Append mode: existing records are never rewritten.
            with existing_clean_path.open("a", encoding="utf-8") as clean_f, existing_tokenized_path.open(
                "a", encoding="utf-8"
            ) as tok_f:
                for row in stream:
                    seen_new += 1
                    # Funnel: standardize -> clean/filter -> JS-only -> dedupe.
                    std = pipeline._standardize_record(row=row, spec=spec)
                    if std is None:
                        dropped_filtered += 1
                        continue
                    prompt, code, lang = std
                    cleaned = pipeline._clean_and_filter(prompt=prompt, code=code, language=lang)
                    if cleaned is None:
                        dropped_filtered += 1
                        continue
                    c_prompt, c_code, c_lang = cleaned
                    if c_lang != "javascript":
                        dropped_filtered += 1
                        continue
                    if not pipeline._keep_unique(c_prompt, c_code):
                        dropped_duplicate += 1
                        continue
                    formatted_text = pipeline.tokenizer.format_training_sample(
                        prompt=c_prompt, code=c_code, language="javascript"
                    )
                    token_ids = pipeline.tokenizer.encode(formatted_text)
                    clean_record = {"prompt": c_prompt, "code": c_code, "language": "javascript"}
                    tok_record = {
                        "language": "javascript",
                        "text": formatted_text,
                        "input_ids": token_ids,
                        "length": len(token_ids),
                    }
                    # NOTE(review): the clean and tokenized lines are written as
                    # a pair with no flush/fsync between them; a crash here can
                    # leave the two files (and the already-updated dedupe DB)
                    # out of sync — confirm downstream tooling tolerates this.
                    clean_f.write(json.dumps(clean_record, ensure_ascii=False) + "\n")
                    tok_f.write(json.dumps(tok_record, ensure_ascii=False) + "\n")
                    added_js += 1
                    # Commit dedupe hashes and report every `progress_every` adds.
                    if added_js % progress_every == 0:
                        pipeline.conn.commit()
                        print(
                            f"[progress] seen_new={seen_new} added_js={added_js} "
                            f"dropped_duplicate={dropped_duplicate}"
                        )
                    if added_js >= target_js:
                        break
            # Final commit covers adds since the last periodic commit.
            pipeline.conn.commit()
        finally:
            pipeline.close()
        # Merge incremental stats into existing summary.
        merged_stats: Dict[str, Any] = {}
        if existing_stats_path.exists():
            with existing_stats_path.open("r", encoding="utf-8") as f:
                try:
                    merged_stats = json.load(f)
                except Exception:
                    # Corrupt stats file: start fresh rather than abort the run.
                    merged_stats = {}
        merged_stats["incremental_js_dataset"] = spec.hf_dataset_id
        merged_stats["incremental_js_target"] = target_js
        merged_stats["incremental_js_added"] = added_js
        merged_stats["incremental_new_seen"] = seen_new
        merged_stats["incremental_new_dropped_duplicate"] = dropped_duplicate
        merged_stats["incremental_new_dropped_filtered"] = dropped_filtered
        # "kept_total" presumably comes from the full-pipeline stats — TODO confirm key.
        merged_stats["final_clean_records_estimate"] = int(merged_stats.get("kept_total", 0)) + added_js
        with existing_stats_path.open("w", encoding="utf-8") as f:
            json.dump(merged_stats, f, indent=2)
        print("Incremental JavaScript augmentation completed.")
        print(f"Dataset used: {spec.hf_dataset_id}")
        print(f"Target JS examples: {target_js}")
        print(f"Added JS examples: {added_js}")
        if added_js < target_js:
            print(
                "Warning: JS target not reached from this dataset after filtering/dedupe. "
                "You may need one more JS dataset."
            )
        print("Updated files:")
        print(f"- {existing_clean_path}")
        print(f"- {existing_tokenized_path}")
        print(f"- {existing_stats_path}")
    except Exception as exc:
        # Top-level boundary: report the failure and exit non-zero.
        # NOTE(review): the original traceback is replaced by SystemExit(1);
        # consider `raise SystemExit(1) from exc` if debuggability matters.
        print("Incremental JavaScript augmentation failed.")
        print(f"What went wrong: {exc}")
        print("Fix suggestion: verify internet access, dataset ID, and existing Component 3 files.")
        raise SystemExit(1)
# Entry-point guard: lets the module be imported without triggering the run.
if __name__ == "__main__":
    main()