"""
Git Repository Pipeline Runner - processes Git repositories at scale.

This is the main entry point for processing GIT REPOSITORIES. It provides
enhanced features for repository analysis, including git metadata
extraction, agentic framework detection, and comprehensive statistics
generation.

ARCHITECTURE POSITION:
    - Repository Pipeline Orchestrator: coordinates Git repo processing
    - Enhanced Metadata Collector: extracts git history and agentic patterns
    - Production Pipeline: handles large repositories with performance tracking

KEY FEATURES:
    1. Complete repository processing with git metadata
    2. Extension-aware filtering (None = full repository)
    3. Performance tracking (files/sec, chunks/sec)
    4. Agentic framework detection (via RepoMetadataExtractor)
    5. Comprehensive output (JSONL chunks + metadata + statistics)

DATA FLOW:
    Repo URL → Clone → Metadata extraction → File listing → Chunking →
    Enhanced export → Statistics → Comprehensive output package

USE CASES:
    - Processing complete Git repositories for training data
    - Creating agentic-aware datasets
    - Benchmarking chunking performance
    - Production dataset generation

USAGE:
    python run_repo_pipeline.py single https://github.com/crewAIInc/crewAI
    python run_repo_pipeline.py single https://github.com/microsoft/autogen --extensions .py .md
    python run_repo_pipeline.py single https://github.com/langchain-ai/langchain --max-files 1000
"""

import argparse
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

# Import enhanced components
from src.task_3_data_engineering.ingestion.git_crawler import GitCrawler
from src.task_3_data_engineering.ingestion.repo_metadata import RepoMetadataExtractor
from src.task_3_data_engineering.chunking.repo_chunker import RepoChunker
from src.task_3_data_engineering.analysis.dataset_stats import compute_dataset_stats
from src.task_3_data_engineering.export.enhanced_jsonl_exporter import export_repo_chunks_jsonl


class EnhancedRepoPipeline:
    """Enhanced pipeline with agentic focus.

    Each call to :meth:`process_repository` writes one run folder,
    ``<output_base>/<output_name>_<timestamp>/``, containing
    ``repository_metadata.json``, ``<output_name>_chunks.jsonl`` and
    ``<output_name>_stats.json``.
    """

    def __init__(
        self,
        output_base: Path = Path("data/processed/repos"),
        use_hierarchical: bool = True,
    ):
        """
        Args:
            output_base: Root directory for per-run output folders; created
                (with parents) if it does not exist.
            use_hierarchical: Forwarded unchanged to ``RepoChunker``.
        """
        self.crawler = GitCrawler()
        self.chunker = RepoChunker(use_hierarchical=use_hierarchical)
        self.output_base = output_base
        self.output_base.mkdir(parents=True, exist_ok=True)

    def process_repository(
        self,
        repo_url: str,
        extensions: Optional[Set[str]] = None,
        output_name: Optional[str] = None,
        include_binary: bool = False,
        max_files: Optional[int] = None,
        skip_git_metadata: bool = False,
    ) -> Dict[str, Any]:
        """Clone, chunk and export one repository; return its statistics.

        Args:
            repo_url: URL handed to ``GitCrawler.clone_repository``.
            extensions: ``None`` => FULL repository (no filtering). Any set,
                including an empty one, is passed to the crawler as a filter
                and recorded as such in the metadata.
            output_name: Prefix for the run folder and output files; defaults
                to the cloned directory's name.
            include_binary: When False the crawler is asked to skip binary
                files.
            max_files: If given, only the first ``max_files`` listed files are
                processed (``0`` processes none). Must be non-negative.
            skip_git_metadata: Skip ``RepoMetadataExtractor`` entirely.

        Returns:
            The statistics dict that is also written to
            ``<output_name>_stats.json``.

        Raises:
            ValueError: If ``max_files`` is negative.
            RuntimeError: If the repository could not be cloned.
        """
        # Validate before cloning: a negative value would otherwise slice
        # files off the END of the list (``xs[:-n]``) instead of limiting.
        if max_files is not None and max_files < 0:
            raise ValueError(f"max_files must be non-negative, got {max_files}")

        # perf_counter is monotonic, unlike time.time(), so durations cannot
        # go negative or jump when the system clock is adjusted.
        start_time = time.perf_counter()
        print(f"🚀 Processing repository: {repo_url}")
        print("-" * 60)

        # 1. Clone repository
        repo_path = self.crawler.clone_repository(repo_url)
        if not repo_path:
            raise RuntimeError(f"Failed to clone {repo_url}")

        # 2. Determine output name
        if not output_name:
            output_name = repo_path.name

        # 3. Log extension behavior. Compare with None (not truthiness) so an
        #    empty set is reported as the filter the crawler actually receives.
        if extensions is not None:
            print(f"📁 Extension filter enabled: {sorted(extensions)}")
        else:
            print("📁 No extension filter → processing FULL repository")

        # 4. Extract repository metadata
        metadata: Dict[str, Any] = {}
        if skip_git_metadata:
            print("📊 Skipping repository metadata extraction")
        else:
            print("📊 Extracting repository metadata...")
            extractor = RepoMetadataExtractor(repo_path)
            metadata = extractor.extract_comprehensive_metadata()

        # 5. List files
        print("📁 Listing repository files...")
        file_infos, _ = self.crawler.list_files_with_info(
            repo_path,
            extensions=extensions,  # None => full repo
            skip_binary=not include_binary,
        )

        # 6. Optional file limiting
        if max_files is not None and len(file_infos) > max_files:
            print(f"⚠️ Limiting to {max_files} files (out of {len(file_infos)})")
            file_infos = file_infos[:max_files]
        print(f"📊 Found {len(file_infos)} files to process")

        # 7. Create output directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = self.output_base / f"{output_name}_{timestamp}"
        output_dir.mkdir(parents=True, exist_ok=True)

        # 8. Repository-level metadata.
        # Prefer the repo name reported by the extractor; fall back to the
        # output folder name when it is absent or empty (e.g. metadata skipped).
        basic_info = metadata.get("basic") or {}
        actual_repo_name = basic_info.get("repo_name") or output_name

        repo_metadata = {
            "repo_url": repo_url,
            "repo_name": actual_repo_name,
            "folder_name": output_name,  # the (user-chosen or derived) folder prefix
            "local_path": str(repo_path),
            # sorted() keeps the recorded filter deterministic across runs.
            "extensions_included": (
                sorted(extensions) if extensions is not None else "ALL"
            ),
            "timestamp": timestamp,
            **metadata,
        }
        metadata_file = output_dir / "repository_metadata.json"
        self._write_json(metadata_file, repo_metadata)

        # 9. Chunk processing
        all_chunks, processing_stats = self._chunk_files(file_infos, repo_metadata)

        # 10. Export chunks
        print("\n💾 Exporting chunks...")
        output_file = output_dir / f"{output_name}_chunks.jsonl"
        export_repo_chunks_jsonl(
            chunks=all_chunks,
            output_path=output_file,
            repo_metadata=repo_metadata,
            print_stats=True,
        )

        # 11. Compute statistics
        print("📈 Computing statistics...")
        chunk_stats = compute_dataset_stats(all_chunks)
        total_time = time.perf_counter() - start_time

        final_stats = {
            "repository_info": {
                "name": actual_repo_name,
                "folder_name": output_name,
                "url": repo_url,
                "path": str(repo_path),
                "timestamp": timestamp,
            },
            "processing_stats": processing_stats,
            "chunk_statistics": chunk_stats,
            "performance": {
                "total_time_seconds": round(total_time, 2),
                "files_per_second": self._rate(len(file_infos), total_time),
                "chunks_per_second": self._rate(len(all_chunks), total_time),
            },
            "output_files": {
                "chunks": str(output_file),
                "metadata": str(metadata_file),
            },
        }
        stats_file = output_dir / f"{output_name}_stats.json"
        self._write_json(stats_file, final_stats)

        # 12. Summary
        print("\n" + "=" * 70)
        print("✅ REPOSITORY PROCESSING COMPLETE")
        print("=" * 70)
        print(f"📁 Repository: {output_name}")
        print(f"📄 Files: {len(file_infos)}")
        print(f"🧩 Chunks: {len(all_chunks)}")
        print(f"⏱️ Time: {final_stats['performance']['total_time_seconds']}s")
        print(f"💾 Output: {output_dir}")
        print("=" * 70)

        return final_stats

    def _chunk_files(
        self,
        file_infos: List[Any],
        repo_metadata: Dict[str, Any],
    ) -> Tuple[List[Any], Dict[str, Any]]:
        """Chunk every listed file, counting (not raising on) per-file failures.

        Returns:
            ``(all_chunks, processing_stats)`` where ``processing_stats`` has
            ``total_files``, ``processed``, ``failed`` counts plus
            ``file_types`` (per extension) and ``chunk_types`` tallies.
        """
        all_chunks: List[Any] = []
        file_types: Dict[Any, int] = {}
        chunk_types: Dict[Any, int] = {}
        processing_stats: Dict[str, Any] = {
            "total_files": len(file_infos),
            "processed": 0,
            "failed": 0,
            "file_types": file_types,
            "chunk_types": chunk_types,
        }

        print("\n🔧 Processing files...")
        print("-" * 60)

        total = len(file_infos)
        for idx, file_info in enumerate(file_infos, start=1):
            if idx % 10 == 0:
                print(f"  [{idx}/{total}] Processing...")

            try:
                file_metadata = {
                    **repo_metadata,
                    "file_info": {
                        "relative_path": file_info.relative_path,
                        "size_bytes": file_info.size,
                        "extension": file_info.extension,
                        "is_binary": file_info.is_binary,
                    },
                }
                chunks = self.chunker.chunk_file(file_info.path, file_metadata)
            except Exception as e:
                # Best-effort: one unreadable/unparsable file must not abort
                # the whole repository run.
                print(f"⚠️ Error processing {file_info.relative_path}: {str(e)[:120]}")
                processing_stats["failed"] += 1
                continue

            # Only successful files reach here, so a file is never counted as
            # both processed and failed.
            all_chunks.extend(chunks)
            processing_stats["processed"] += 1
            file_types[file_info.extension] = file_types.get(file_info.extension, 0) + 1
            for chunk in chunks:
                ct = chunk.chunk_type
                chunk_types[ct] = chunk_types.get(ct, 0) + 1

        return all_chunks, processing_stats

    @staticmethod
    def _rate(count: int, seconds: float) -> float:
        """Return ``count / seconds`` rounded to 2 places, or 0.0 if no time elapsed."""
        return round(count / seconds, 2) if seconds > 0 else 0.0

    @staticmethod
    def _write_json(path: Path, payload: Dict[str, Any]) -> None:
        """Write *payload* to *path* as indented UTF-8 JSON.

        ``default=str`` stringifies values json cannot encode natively (the
        metadata may hold e.g. datetimes or Paths).
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, default=str)


def main():
    """Enhanced CLI for the repository pipeline (``single <repo_url> ...``)."""
    parser = argparse.ArgumentParser(
        description="Process Git repositories for agentic datasets"
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # ---- Single repo ----
    single = subparsers.add_parser("single", help="Process single repository")
    single.add_argument("repo_url", help="Git repository URL")
    single.add_argument("--name", help="Custom output name")
    single.add_argument(
        "--extensions",
        nargs="+",
        default=None,
        help="Optional file extensions (.py .md). If omitted, FULL repo is processed.",
    )
    single.add_argument("--max-files", type=int, help="Limit number of files")
    single.add_argument("--skip-git-metadata", action="store_true")
    single.add_argument("--include-binary", action="store_true")

    args = parser.parse_args()

    # Reject before doing any work (cloning can be slow); exits with usage.
    if args.max_files is not None and args.max_files < 0:
        parser.error("--max-files must be a non-negative integer")

    pipeline = EnhancedRepoPipeline()

    if args.command == "single":
        pipeline.process_repository(
            repo_url=args.repo_url,
            output_name=args.name,
            # nargs="+" guarantees a non-empty list whenever the flag is given.
            extensions=set(args.extensions) if args.extensions else None,
            max_files=args.max_files,
            skip_git_metadata=args.skip_git_metadata,
            include_binary=args.include_binary,
        )


if __name__ == "__main__":
    main()