""" Reprocess final tokenized dataset from existing cleaned JSONL. Purpose: - No re-download. - No full pipeline rerun. - Rebuild tokenized dataset with improved language detection. """ from __future__ import annotations import argparse import json import shutil import sys from pathlib import Path from typing import Any, Dict, Optional import yaml # Ensure src imports work from project root. PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from src.tokenizer.code_tokenizer import CodeTokenizer # noqa: E402 PY_HINTS = [ "def ", "import ", "from ", "print(", "if __name__ ==", "class ", "lambda ", "elif ", "except ", ] JS_HINTS = [ "function ", "const ", "let ", "=>", "console.log", "export ", "require(", "document.", "window.", "=> {", "var ", ] def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Rebuild tokenized data from existing clean JSONL.") parser.add_argument( "--config", default="configs/component3_reprocess_from_clean.yaml", help="Path to YAML config.", ) parser.add_argument( "--max_records", type=int, default=None, help="Optional quick-test limit.", ) return parser.parse_args() def load_yaml(path: Path) -> Dict[str, Any]: if not path.exists(): raise FileNotFoundError(f"Config not found: {path}") with path.open("r", encoding="utf-8") as f: data = yaml.safe_load(f) if not isinstance(data, dict): raise ValueError("Config format is invalid. Expected YAML object.") return data def infer_language(prompt: str, code: str, raw_language: str, ignore_existing_labels: bool) -> str: lang = (raw_language or "").lower().strip() if not ignore_existing_labels: if "javascript" in lang or lang in {"js", "node", "nodejs"}: return "javascript" if "python" in lang: return "python" prompt_lower = prompt.lower() code_lower = code.lower() py_score = sum(1 for hint in PY_HINTS if hint in code_lower) js_score = sum(1 for hint in JS_HINTS if hint in code_lower) if "javascript" in prompt_lower or "node.js" in prompt_lower or " js " in f" {prompt_lower} ": js_score += 2 if "python" in prompt_lower: py_score += 2 return "javascript" if js_score > py_score else "python" def backup_file_if_needed(path: Path, enabled: bool) -> Optional[Path]: if not enabled or not path.exists(): return None backup = path.with_suffix(path.suffix + ".bak") shutil.copy2(path, backup) return backup def main() -> None: args = parse_args() try: cfg = load_yaml(Path(args.config)) tokenizer_dir = Path(cfg["tokenizer_dir"]) input_clean_path = Path(cfg["input_clean_path"]) output_tokenized_path = Path(cfg["output_tokenized_path"]) output_stats_path = Path(cfg["output_stats_path"]) ignore_existing_labels = bool(cfg.get("ignore_existing_language_labels", True)) max_records = args.max_records if args.max_records is not None else cfg.get("max_records") if not input_clean_path.exists(): raise FileNotFoundError( f"Input clean file not found: {input_clean_path}. " "Run Component 3 first." ) output_tokenized_path.parent.mkdir(parents=True, exist_ok=True) output_stats_path.parent.mkdir(parents=True, exist_ok=True) token_backup = backup_file_if_needed( output_tokenized_path, bool(cfg.get("backup_existing_tokenized", True)) ) stats_backup = backup_file_if_needed( output_stats_path, bool(cfg.get("backup_existing_stats", True)) ) tokenizer = CodeTokenizer.load(str(tokenizer_dir)) stats: Dict[str, int] = { "reprocess_seen_total": 0, "reprocess_kept_total": 0, "reprocess_dropped_invalid_json": 0, "reprocess_dropped_empty_fields": 0, "language_python": 0, "language_javascript": 0, } with input_clean_path.open("r", encoding="utf-8") as in_f, output_tokenized_path.open( "w", encoding="utf-8" ) as out_f: for line in in_f: stats["reprocess_seen_total"] += 1 if max_records is not None and stats["reprocess_seen_total"] > int(max_records): break line = line.strip() if not line: stats["reprocess_dropped_empty_fields"] += 1 continue try: row = json.loads(line) except json.JSONDecodeError: stats["reprocess_dropped_invalid_json"] += 1 continue prompt = str(row.get("prompt", "")).strip() code = str(row.get("code", "")).strip() raw_language = str(row.get("language", "")).strip() if not prompt or not code: stats["reprocess_dropped_empty_fields"] += 1 continue language = infer_language( prompt=prompt, code=code, raw_language=raw_language, ignore_existing_labels=ignore_existing_labels, ) if language == "javascript": stats["language_javascript"] += 1 else: stats["language_python"] += 1 formatted_text = tokenizer.format_training_sample( prompt=prompt, code=code, language=language ) token_ids = tokenizer.encode(formatted_text) out_row = { "language": language, "text": formatted_text, "input_ids": token_ids, "length": len(token_ids), } out_f.write(json.dumps(out_row, ensure_ascii=False) + "\n") stats["reprocess_kept_total"] += 1 if stats["reprocess_kept_total"] % 5000 == 0: print( f"[progress] seen={stats['reprocess_seen_total']} " f"kept={stats['reprocess_kept_total']} " f"python={stats['language_python']} js={stats['language_javascript']}" ) with output_stats_path.open("w", encoding="utf-8") as f: json.dump(stats, f, indent=2) print("Reprocess completed successfully.") print(f"Input clean file: {input_clean_path}") print(f"Output tokenized file: {output_tokenized_path}") print(f"Output stats file: {output_stats_path}") if token_backup: print(f"Tokenized backup: {token_backup}") if stats_backup: print(f"Stats backup: {stats_backup}") print("Summary stats:") print(json.dumps(stats, indent=2)) except Exception as exc: print("Reprocess failed.") print(f"What went wrong: {exc}") print( "Fix suggestion: verify Component 2 tokenizer files and " "Component 3 clean file paths." ) raise SystemExit(1) if __name__ == "__main__": main()