| """ |
| Component 3 runner script. |
| |
| Reads YAML config and executes full Hugging Face dataset preprocessing. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from pathlib import Path |
| from typing import Any, Dict, List |
|
|
| import yaml |
|
|
| |
# Make the project root importable so `src.*` packages resolve when this
# script is executed directly (e.g. `python scripts/run_component3.py`)
# rather than via `python -m`. parents[1] assumes this file lives one
# directory below the project root — TODO confirm against repo layout.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))


# NOTE: this import must stay below the sys.path manipulation above (it
# depends on PROJECT_ROOT being on sys.path), which is why it is not
# grouped with the top-of-file imports.
from src.dataset_pipeline.hf_dataset_pipeline import (
    HFDatasetPipeline,
    PipelineConfig,
    SourceDatasetSpec,
)
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the Component 3 runner.

    Args:
        argv: Argument list to parse. Defaults to ``None``, which makes
            argparse fall back to ``sys.argv[1:]`` — so existing callers
            (``main``) are unaffected. Passing an explicit list makes the
            function unit-testable without touching ``sys.argv``.

    Returns:
        Namespace with ``config`` (path to the YAML config file) and
        ``max_records_per_dataset`` (optional per-dataset cap override).
    """
    parser = argparse.ArgumentParser(description="Run Component 3 dataset preprocessing pipeline.")
    parser.add_argument(
        "--config",
        default="configs/component3_dataset_pipeline.yaml",
        help="Path to YAML config file.",
    )
    parser.add_argument(
        "--max_records_per_dataset",
        type=int,
        default=None,
        help="Optional override for quick test runs.",
    )
    return parser.parse_args(argv)
|
|
|
|
| def _read_yaml(path: Path) -> Dict[str, Any]: |
| |
| if not path.exists(): |
| raise FileNotFoundError(f"Config file not found: {path}") |
| with path.open("r", encoding="utf-8") as f: |
| data = yaml.safe_load(f) |
| if not isinstance(data, dict): |
| raise ValueError("Config file is invalid. Expected a YAML object at top level.") |
| return data |
|
|
|
|
| def _build_config(data: Dict[str, Any], max_records_override: int | None) -> PipelineConfig: |
| |
| dataset_specs: List[SourceDatasetSpec] = [] |
| datasets_data = data.get("datasets", []) |
| if not isinstance(datasets_data, list) or not datasets_data: |
| raise ValueError("Config must include a non-empty 'datasets' list.") |
|
|
| for item in datasets_data: |
| dataset_specs.append( |
| SourceDatasetSpec( |
| hf_dataset_id=str(item["hf_dataset_id"]), |
| split=str(item.get("split", "train")), |
| prompt_field=str(item["prompt_field"]), |
| code_field=str(item["code_field"]), |
| language_field=item.get("language_field"), |
| default_language=str(item.get("default_language", "python")), |
| ) |
| ) |
|
|
| cfg = PipelineConfig( |
| datasets=dataset_specs, |
| tokenizer_dir=str(data["tokenizer_dir"]), |
| interim_output_dir=str(data["interim_output_dir"]), |
| processed_output_dir=str(data["processed_output_dir"]), |
| dedupe_db_path=str(data["dedupe_db_path"]), |
| max_records_per_dataset=data.get("max_records_per_dataset"), |
| min_prompt_chars=int(data.get("min_prompt_chars", 8)), |
| min_code_chars=int(data.get("min_code_chars", 16)), |
| max_code_chars=int(data.get("max_code_chars", 40_000)), |
| progress_every=int(data.get("progress_every", 1_000)), |
| ) |
|
|
| if max_records_override is not None: |
| cfg.max_records_per_dataset = max_records_override |
| return cfg |
|
|
|
|
def main() -> None:
    """Entry point: load the YAML config, run the pipeline, report results.

    Any failure is caught at this top-level boundary, reported to stdout
    with a fix suggestion, and converted into exit code 1.
    """
    args = parse_args()
    try:
        raw_config = _read_yaml(Path(args.config))
        cfg = _build_config(raw_config, args.max_records_per_dataset)
        pipeline = HFDatasetPipeline(cfg)
        try:
            stats = pipeline.run()
        finally:
            # Always release pipeline resources, even when run() raises.
            pipeline.close()

        interim_dir = Path(cfg.interim_output_dir)
        processed_dir = Path(cfg.processed_output_dir)
        print("Component 3 pipeline completed successfully.")
        print("Saved files:")
        print(f"- {interim_dir / 'combined_clean.jsonl'}")
        print(f"- {processed_dir / 'train_tokenized.jsonl'}")
        print(f"- {processed_dir / 'pipeline_stats.json'}")
        print("Summary stats:")
        print(json.dumps(stats, indent=2))
    except Exception as exc:  # top-level boundary: report, then exit non-zero
        print("Component 3 pipeline failed.")
        print(f"What went wrong: {exc}")
        print(
            "Fix suggestion: verify internet access for Hugging Face, tokenizer path, "
            "and config field names."
        )
        raise SystemExit(1)
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|
|
|