| """ |
| Local Codebase Pipeline Runner - Processes local codebases for dataset creation. |
| |
| This is the main entry point for processing LOCAL CODEBASES (not Git repos). |
| It orchestrates the entire chunking pipeline for local files, handling both |
| code files and documentation with intelligent fallback strategies. |
| |
| ARCHITECTURE POSITION: |
| - Local Pipeline Orchestrator: Coordinates local file processing |
| - Fallback Handler: Intelligent fallback from code to documentation |
| - Dataset Exporter: Creates final JSONL datasets with statistics |
| |
| KEY FEATURES: |
| 1. Unified processing of Python files and documentation |
| 2. Intelligent fallback (failed code chunking → documentation chunking) |
| 3. Hierarchical chunking for Python files |
| 4. Documentation-aware chunking for markdown/text files |
| 5. Dataset statistics and metadata generation |
| |
| DATA FLOW: |
| Local files → Type detection → Python chunking (or fallback) → |
| Documentation chunking → JSONL export → Statistics |
| |
| USE CASES: |
| - Processing locally saved code examples |
| - Creating datasets from example repositories |
| - Testing chunking strategies on local files |
| |
| USAGE: |
| python run_python_pipeline.py --name crewai_examples --include crewai |
| python run_python_pipeline.py --name test_dataset --exclude large_repos |
| """ |
|
|
| from pathlib import Path |
| import json |
| import argparse |
|
|
| from src.task_3_data_engineering.chunking.hierarchical_chunker import HierarchicalChunker |
| from src.task_3_data_engineering.export.jsonl_exporter import export_chunks_jsonl |
| from src.task_3_data_engineering.analysis.dataset_stats import compute_dataset_stats |
| from src.task_3_data_engineering.export.dataset_metadata import write_dataset_metadata |
| from src.task_3_data_engineering.chunking.doc_chunker import chunk_document , wrap_doc_chunks |
|
|
|
|
# Root directory that run() scans recursively for input files. The first path
# component below this directory is what --include / --exclude match against.
INPUT_DIR = Path("data/raw/codebases")

# Parent directory for generated datasets; run() writes into
# BASE_OUTPUT_DIR / <dataset_name>.
BASE_OUTPUT_DIR = Path("data/processed/chunks")


# File suffixes (compared lowercased) that are chunked as documentation.
DOC_EXTS = {".md", ".txt", ".rst"}
|
|
|
|
def run(dataset_name: str, include: list[str] | None, exclude: list[str] | None):
    """Build a chunk dataset from the files found under ``INPUT_DIR``.

    Each selected file is handled as follows:

    * ``.py`` files are passed to ``HierarchicalChunker.chunk_file``. If that
      raises or returns nothing, the file falls back to document chunking.
    * Files whose suffix is in ``DOC_EXTS`` (and ``.py`` fallbacks) are read as
      UTF-8 text and chunked with ``chunk_document`` / ``wrap_doc_chunks``.
    * Files with any other suffix contribute no chunks.

    The collected chunks are written to ``BASE_OUTPUT_DIR / dataset_name`` as
    ``chunks.jsonl``, together with ``dataset_stats.json`` and
    ``dataset_metadata.json``.

    Args:
        dataset_name: Name of the output sub-directory; also passed to
            ``write_dataset_metadata``.
        include: If non-empty, only files whose first path component relative
            to ``INPUT_DIR`` is in this list are processed.
        exclude: If non-empty, files whose first path component relative to
            ``INPUT_DIR`` is in this list are skipped.
    """
    output_dir = BASE_OUTPUT_DIR / dataset_name
    output_dir.mkdir(parents=True, exist_ok=True)

    chunker = HierarchicalChunker()
    all_chunks = []

    # rglob() yields entries in an unspecified, platform-dependent order;
    # sorting keeps the chunk order in the exported dataset reproducible.
    files = sorted(p for p in INPUT_DIR.rglob("*") if p.is_file())

    for file_path in files:
        top_level = file_path.relative_to(INPUT_DIR).parts[0]
        if include and top_level not in include:
            continue
        if exclude and top_level in exclude:
            continue

        print(f"Processing: {file_path}")

        # Lowercase once so ".PY" is treated like ".py", consistent with the
        # case-insensitive documentation suffix check.
        suffix = file_path.suffix.lower()

        if suffix == ".py":
            try:
                code_chunks = chunker.chunk_file(file_path)
            except Exception as exc:
                # Deliberate best-effort: any chunker failure falls back to
                # document chunking, but the failure is reported, not hidden.
                print(
                    f"  ! Code chunking failed ({type(exc).__name__}: {exc}); "
                    "falling back to document chunking"
                )
                code_chunks = None

            if code_chunks:
                all_chunks.extend(code_chunks)
                continue

        if suffix in DOC_EXTS or suffix == ".py":
            try:
                # errors="ignore" drops undecodable bytes, so only I/O
                # failures can surface from this call.
                raw_text = file_path.read_text(encoding="utf-8", errors="ignore")
            except OSError as exc:
                print(f"  ! Skipping unreadable file ({type(exc).__name__}: {exc})")
                continue

            if not raw_text.strip():
                continue

            doc_chunks = chunk_document(
                raw_text=raw_text,
                source_name=str(file_path),
                source_url=None,
            )
            all_chunks.extend(wrap_doc_chunks(doc_chunks))

    export_chunks_jsonl(all_chunks, output_dir / "chunks.jsonl", print_stats=True)

    stats = compute_dataset_stats(all_chunks)

    # Add a primary/secondary breakdown on top of the computed statistics.
    primary = [c for c in all_chunks if c.hierarchy.is_primary]
    stats["hierarchy"] = {
        "primary_chunks": len(primary),
        "secondary_chunks": len(all_chunks) - len(primary),
    }

    with (output_dir / "dataset_stats.json").open("w", encoding="utf-8") as f:
        json.dump(stats, f, indent=2)

    write_dataset_metadata(
        chunks=all_chunks,
        output_path=output_dir / "dataset_metadata.json",
        dataset_name=dataset_name,
        dataset_version="v1",
    )

    print("\n✅ Dataset built successfully")
    print(f" - Files: {len({c.file_path for c in all_chunks})}")
    print(f" - Chunks: {len(all_chunks)}")
    print(f" - Output: {output_dir}")
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: parse the dataset name and the optional
    # top-level directory filters, then run the pipeline.
    parser = argparse.ArgumentParser(
        description="Chunk local codebases into a JSONL dataset with statistics and metadata.",
    )
    parser.add_argument(
        "--name",
        required=True,
        help="Dataset name; used as the output sub-directory.",
    )
    parser.add_argument(
        "--include",
        nargs="+",
        help="Only process files whose top-level directory under the input root is listed.",
    )
    parser.add_argument(
        "--exclude",
        nargs="+",
        help="Skip files whose top-level directory under the input root is listed.",
    )
    args = parser.parse_args()

    run(args.name, args.include, args.exclude)