| """ |
| Git Repository Pipeline Runner - Processes Git repositories at scale. |
| |
| This is the main entry point for processing GIT REPOSITORIES. It provides |
| enhanced features for repository analysis, including git metadata extraction, |
| agentic framework detection, and comprehensive statistics generation. |
| |
| ARCHITECTURE POSITION: |
| - Repository Pipeline Orchestrator: Coordinates Git repo processing |
| - Enhanced Metadata Collector: Extracts git history and agentic patterns |
| - Production Pipeline: Handles large repositories with performance tracking |
| |
| KEY FEATURES: |
| 1. Complete repository processing with git metadata |
| 2. Extension-aware filtering (None = full repository) |
| 3. Performance tracking (files/sec, chunks/sec) |
| 4. Agentic framework detection (via RepoMetadataExtractor) |
| 5. Comprehensive output (JSONL chunks + metadata + statistics) |
| |
| DATA FLOW: |
| Repo URL → Clone → Metadata extraction → File listing → Chunking → |
| Enhanced export → Statistics → Comprehensive output package |
| |
| USE CASES: |
| - Processing complete Git repositories for training data |
| - Creating agentic-aware datasets |
| - Benchmarking chunking performance |
| - Production dataset generation |
| |
| USAGE: |
| python run_repo_pipeline.py single https://github.com/crewAIInc/crewAI |
| python run_repo_pipeline.py single https://github.com/autogen/autogen --extensions .py .md |
| python run_repo_pipeline.py single https://github.com/langchain --max-files 1000 |
| """ |
|
|
| from pathlib import Path |
| import json |
| from typing import Dict, Any, Optional, Set, List |
| import argparse |
| import time |
| from datetime import datetime |
|
|
| |
| from src.task_3_data_engineering.ingestion.git_crawler import GitCrawler |
| from src.task_3_data_engineering.ingestion.repo_metadata import RepoMetadataExtractor |
| from src.task_3_data_engineering.chunking.repo_chunker import RepoChunker |
| from src.task_3_data_engineering.analysis.dataset_stats import compute_dataset_stats |
| from src.task_3_data_engineering.export.enhanced_jsonl_exporter import export_repo_chunks_jsonl |
|
|
|
|
class EnhancedRepoPipeline:
    """Clone a Git repository, chunk its files and write a dataset package.

    Each run creates a timestamped directory under ``output_base`` holding:

    * ``repository_metadata.json`` - repository-level metadata,
    * ``<name>_chunks.jsonl``      - the exported chunks,
    * ``<name>_stats.json``        - processing, chunk and performance stats.
    """

    def __init__(
        self,
        output_base: Path = Path("data/processed/repos"),
        use_hierarchical: bool = True,
    ):
        """Create the crawler/chunker pair and make sure ``output_base`` exists.

        Args:
            output_base: Directory under which per-run output folders are
                created. It is created (with parents) if missing.
            use_hierarchical: Forwarded unchanged to ``RepoChunker``.
        """
        self.crawler = GitCrawler()
        self.chunker = RepoChunker(use_hierarchical=use_hierarchical)
        self.output_base = output_base
        self.output_base.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _rate(count: int, elapsed_seconds: float) -> float:
        """Return ``count / elapsed_seconds`` rounded to 2 places (0.0 if no time elapsed)."""
        if elapsed_seconds <= 0:
            # Guard against ZeroDivisionError on very fast runs / coarse clocks.
            return 0.0
        return round(count / elapsed_seconds, 2)

    @staticmethod
    def _limit_files(file_infos: List[Any], max_files: Optional[int]) -> List[Any]:
        """Return at most ``max_files`` leading entries of ``file_infos``.

        ``None`` and non-positive values mean "no limit"; a negative value
        must not fall through to a slice, where it would drop files from
        the tail instead of limiting the count.
        """
        if max_files is not None and 0 < max_files < len(file_infos):
            print(f"⚠️ Limiting to {max_files} files (out of {len(file_infos)})")
            return file_infos[:max_files]
        return file_infos

    def _chunk_files(self, file_infos: List[Any], repo_metadata: Dict[str, Any]):
        """Chunk every file and tally per-extension / per-chunk-type counts.

        Returns:
            ``(all_chunks, processing_stats)`` where ``processing_stats`` has
            the keys ``total_files``, ``processed``, ``failed``,
            ``file_types`` and ``chunk_types``.
        """
        all_chunks: List[Any] = []
        file_types: Dict[Any, int] = {}
        chunk_types: Dict[Any, int] = {}
        processing_stats: Dict[str, Any] = {
            "total_files": len(file_infos),
            "processed": 0,
            "failed": 0,
            "file_types": file_types,
            "chunk_types": chunk_types,
        }
        total = len(file_infos)

        for idx, file_info in enumerate(file_infos, start=1):
            try:
                if idx % 10 == 0:
                    print(f" [{idx}/{total}] Processing...")

                # Every chunk carries the repository metadata plus a
                # description of the file it came from.
                file_metadata = {
                    **repo_metadata,
                    "file_info": {
                        "relative_path": file_info.relative_path,
                        "size_bytes": file_info.size,
                        "extension": file_info.extension,
                        "is_binary": file_info.is_binary,
                    },
                }

                chunks = self.chunker.chunk_file(file_info.path, file_metadata)

                all_chunks.extend(chunks)
                processing_stats["processed"] += 1
                file_types[file_info.extension] = (
                    file_types.get(file_info.extension, 0) + 1
                )
                for chunk in chunks:
                    ct = chunk.chunk_type
                    chunk_types[ct] = chunk_types.get(ct, 0) + 1

            except Exception as e:
                # Deliberately broad: one unreadable/unparseable file must
                # not abort the whole repository run. The failure is
                # reported and counted instead.
                print(f"⚠️ Error processing {file_info.relative_path}: {str(e)[:120]}")
                processing_stats["failed"] += 1

        return all_chunks, processing_stats

    def process_repository(
        self,
        repo_url: str,
        extensions: Optional[Set[str]] = None,
        output_name: Optional[str] = None,
        include_binary: bool = False,
        max_files: Optional[int] = None,
        skip_git_metadata: bool = False,
    ) -> Dict[str, Any]:
        """Run the full pipeline for one repository and return its statistics.

        Args:
            repo_url: URL handed to ``GitCrawler.clone_repository``.
            extensions: File extensions to keep. ``None`` or an empty
                collection means the FULL repository (no filtering); a
                non-empty set enables filtering.
            output_name: Base name for the output folder and files. Defaults
                to the name of the cloned repository directory.
            include_binary: When ``False`` (default) the crawler is asked to
                skip binary files.
            max_files: Optional positive cap on the number of files
                processed; ``None`` or a non-positive value means no cap.
            skip_git_metadata: When ``True`` no ``RepoMetadataExtractor`` is
                run and only the basic fields are written.

        Returns:
            A dict with the keys ``repository_info``, ``processing_stats``,
            ``chunk_statistics``, ``performance`` and ``output_files``; the
            same dict is written to ``<name>_stats.json``.

        Raises:
            RuntimeError: If the crawler returns no path for ``repo_url``.
        """
        # perf_counter is monotonic, so the measured duration cannot go
        # negative if the wall clock is adjusted mid-run.
        started = time.perf_counter()
        print(f"Processing repository: {repo_url}")
        print("-" * 60)

        repo_path = self.crawler.clone_repository(repo_url)
        if not repo_path:
            raise RuntimeError(f"Failed to clone {repo_url}")

        if not output_name:
            output_name = repo_path.name

        # An empty filter is reported below (banner and metadata) as "ALL";
        # normalise it to None so the crawler sees the same thing.
        extensions = extensions or None
        if extensions:
            print(f"Extension filter enabled: {sorted(extensions)}")
        else:
            print("No extension filter - processing FULL repository")

        print("Extracting repository metadata...")
        metadata: Dict[str, Any] = {}
        if not skip_git_metadata:
            extractor = RepoMetadataExtractor(repo_path)
            metadata = extractor.extract_comprehensive_metadata() or {}

        print("Listing repository files...")
        file_infos, _ = self.crawler.list_files_with_info(
            repo_path,
            extensions=extensions,
            skip_binary=not include_binary,
        )
        file_infos = self._limit_files(file_infos, max_files)
        print(f"Found {len(file_infos)} files to process")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = self.output_base / f"{output_name}_{timestamp}"
        output_dir.mkdir(parents=True, exist_ok=True)

        # Prefer the name reported by the metadata extractor; fall back to
        # the output/folder name when metadata was skipped or lacks it.
        actual_repo_name = metadata.get("basic", {}).get("repo_name", output_name)

        repo_metadata = {
            "repo_url": repo_url,
            "repo_name": actual_repo_name,
            "folder_name": output_name,
            "local_path": str(repo_path),
            # Sorted so the written metadata is deterministic across runs.
            "extensions_included": sorted(extensions) if extensions else "ALL",
            "timestamp": timestamp,
            **metadata,
        }

        metadata_file = output_dir / "repository_metadata.json"
        with open(metadata_file, "w", encoding="utf-8") as f:
            json.dump(repo_metadata, f, indent=2, default=str)

        print("\n🔧 Processing files...")
        print("-" * 60)
        all_chunks, processing_stats = self._chunk_files(file_infos, repo_metadata)

        print("\n💾 Exporting chunks...")
        output_file = output_dir / f"{output_name}_chunks.jsonl"
        export_repo_chunks_jsonl(
            chunks=all_chunks,
            output_path=output_file,
            repo_metadata=repo_metadata,
            print_stats=True,
        )

        print("Computing statistics...")
        chunk_stats = compute_dataset_stats(all_chunks)

        total_time = time.perf_counter() - started

        final_stats = {
            "repository_info": {
                "name": actual_repo_name,
                "folder_name": output_name,
                "url": repo_url,
                "path": str(repo_path),
                "timestamp": timestamp,
            },
            "processing_stats": processing_stats,
            "chunk_statistics": chunk_stats,
            "performance": {
                "total_time_seconds": round(total_time, 2),
                "files_per_second": self._rate(len(file_infos), total_time),
                "chunks_per_second": self._rate(len(all_chunks), total_time),
            },
            "output_files": {
                "chunks": str(output_file),
                "metadata": str(metadata_file),
            },
        }

        stats_file = output_dir / f"{output_name}_stats.json"
        with open(stats_file, "w", encoding="utf-8") as f:
            # default=str mirrors the metadata dump above so a stray
            # non-JSON value in the statistics does not lose the whole run.
            json.dump(final_stats, f, indent=2, default=str)

        print("\n" + "=" * 70)
        print("✅ REPOSITORY PROCESSING COMPLETE")
        print("=" * 70)
        print(f"Repository: {output_name}")
        print(f"Files: {len(file_infos)}")
        print(f"🧩 Chunks: {len(all_chunks)}")
        print(f"⏱️ Time: {final_stats['performance']['total_time_seconds']}s")
        print(f"💾 Output: {output_dir}")
        print("=" * 70)

        return final_stats
|
|
|
|
def main():
    """Command-line entry point for the repository pipeline.

    Builds an argument parser with one required sub-command, ``single``,
    parses the command line and forwards the parsed options to
    ``EnhancedRepoPipeline.process_repository``.
    """
    cli = argparse.ArgumentParser(
        description="Process Git repositories for agentic datasets"
    )
    commands = cli.add_subparsers(dest="command", required=True)

    # "single": process exactly one repository URL.
    single_cmd = commands.add_parser("single", help="Process single repository")
    single_cmd.add_argument("repo_url", help="Git repository URL")
    single_cmd.add_argument("--name", help="Custom output name")
    single_cmd.add_argument(
        "--extensions",
        nargs="+",
        default=None,
        help="Optional file extensions (.py .md). If omitted, FULL repo is processed.",
    )
    single_cmd.add_argument("--max-files", type=int, help="Limit number of files")
    for switch in ("--skip-git-metadata", "--include-binary"):
        single_cmd.add_argument(switch, action="store_true")

    options = cli.parse_args()
    runner = EnhancedRepoPipeline()

    if options.command != "single":
        return

    # No --extensions on the command line means "no filter" (None).
    ext_filter = None
    if options.extensions:
        ext_filter = set(options.extensions)

    runner.process_repository(
        repo_url=options.repo_url,
        output_name=options.name,
        extensions=ext_filter,
        max_files=options.max_files,
        skip_git_metadata=options.skip_git_metadata,
        include_binary=options.include_binary,
    )
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()