""" Component 3 runner script. Reads YAML config and executes full Hugging Face dataset preprocessing. """ from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any, Dict, List import yaml # This makes "src" imports work when script is run from project root. PROJECT_ROOT = Path(__file__).resolve().parents[1] if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from src.dataset_pipeline.hf_dataset_pipeline import ( # noqa: E402 HFDatasetPipeline, PipelineConfig, SourceDatasetSpec, ) def parse_args() -> argparse.Namespace: # Parse command-line arguments for config and optional overrides. parser = argparse.ArgumentParser(description="Run Component 3 dataset preprocessing pipeline.") parser.add_argument( "--config", default="configs/component3_dataset_pipeline.yaml", help="Path to YAML config file.", ) parser.add_argument( "--max_records_per_dataset", type=int, default=None, help="Optional override for quick test runs.", ) return parser.parse_args() def _read_yaml(path: Path) -> Dict[str, Any]: # Reads YAML file with friendly errors. if not path.exists(): raise FileNotFoundError(f"Config file not found: {path}") with path.open("r", encoding="utf-8") as f: data = yaml.safe_load(f) if not isinstance(data, dict): raise ValueError("Config file is invalid. Expected a YAML object at top level.") return data def _build_config(data: Dict[str, Any], max_records_override: int | None) -> PipelineConfig: # Converts generic dict into strongly typed config objects. dataset_specs: List[SourceDatasetSpec] = [] datasets_data = data.get("datasets", []) if not isinstance(datasets_data, list) or not datasets_data: raise ValueError("Config must include a non-empty 'datasets' list.") for item in datasets_data: dataset_specs.append( SourceDatasetSpec( hf_dataset_id=str(item["hf_dataset_id"]), split=str(item.get("split", "train")), prompt_field=str(item["prompt_field"]), code_field=str(item["code_field"]), language_field=item.get("language_field"), default_language=str(item.get("default_language", "python")), ) ) cfg = PipelineConfig( datasets=dataset_specs, tokenizer_dir=str(data["tokenizer_dir"]), interim_output_dir=str(data["interim_output_dir"]), processed_output_dir=str(data["processed_output_dir"]), dedupe_db_path=str(data["dedupe_db_path"]), max_records_per_dataset=data.get("max_records_per_dataset"), min_prompt_chars=int(data.get("min_prompt_chars", 8)), min_code_chars=int(data.get("min_code_chars", 16)), max_code_chars=int(data.get("max_code_chars", 40_000)), progress_every=int(data.get("progress_every", 1_000)), ) if max_records_override is not None: cfg.max_records_per_dataset = max_records_override return cfg def main() -> None: # Main entry with explicit plain-English error handling. args = parse_args() try: config_path = Path(args.config) data = _read_yaml(config_path) cfg = _build_config(data, args.max_records_per_dataset) pipeline = HFDatasetPipeline(cfg) try: stats = pipeline.run() finally: pipeline.close() print("Component 3 pipeline completed successfully.") print("Saved files:") print(f"- {Path(cfg.interim_output_dir) / 'combined_clean.jsonl'}") print(f"- {Path(cfg.processed_output_dir) / 'train_tokenized.jsonl'}") print(f"- {Path(cfg.processed_output_dir) / 'pipeline_stats.json'}") print("Summary stats:") print(json.dumps(stats, indent=2)) except Exception as exc: print("Component 3 pipeline failed.") print(f"What went wrong: {exc}") print( "Fix suggestion: verify internet access for Hugging Face, tokenizer path, " "and config field names." ) raise SystemExit(1) if __name__ == "__main__": main()