Spaces:
Sleeping
Sleeping
| """ | |
| Git Repository Pipeline Runner - Processes Git repositories at scale. | |
| This is the main entry point for processing GIT REPOSITORIES. It provides | |
| enhanced features for repository analysis, including git metadata extraction, | |
| agentic framework detection, and comprehensive statistics generation. | |
| ARCHITECTURE POSITION: | |
| - Repository Pipeline Orchestrator: Coordinates Git repo processing | |
| - Enhanced Metadata Collector: Extracts git history and agentic patterns | |
| - Production Pipeline: Handles large repositories with performance tracking | |
| KEY FEATURES: | |
| 1. Complete repository processing with git metadata | |
| 2. Extension-aware filtering (None = full repository) | |
| 3. Performance tracking (files/sec, chunks/sec) | |
| 4. Agentic framework detection (via RepoMetadataExtractor) | |
| 5. Comprehensive output (JSONL chunks + metadata + statistics) | |
| DATA FLOW: | |
| Repo URL → Clone → Metadata extraction → File listing → Chunking → | |
| Enhanced export → Statistics → Comprehensive output package | |
| USE CASES: | |
| - Processing complete Git repositories for training data | |
| - Creating agentic-aware datasets | |
| - Benchmarking chunking performance | |
| - Production dataset generation | |
| USAGE: | |
| python run_repo_pipeline.py single https://github.com/crewAIInc/crewAI | |
| python run_repo_pipeline.py single https://github.com/microsoft/autogen --extensions .py .md | |
| python run_repo_pipeline.py single https://github.com/langchain-ai/langchain --max-files 1000 | |
| """ | |
| from pathlib import Path | |
| import json | |
| from typing import Dict, Any, Optional, Set, List | |
| import argparse | |
| import time | |
| from datetime import datetime | |
| # Import enhanced components | |
| from src.task_3_data_engineering.ingestion.git_crawler import GitCrawler | |
| from src.task_3_data_engineering.ingestion.repo_metadata import RepoMetadataExtractor | |
| from src.task_3_data_engineering.chunking.repo_chunker import RepoChunker | |
| from src.task_3_data_engineering.analysis.dataset_stats import compute_dataset_stats | |
| from src.task_3_data_engineering.export.enhanced_jsonl_exporter import export_repo_chunks_jsonl | |
class EnhancedRepoPipeline:
    """Clone a Git repository, chunk its files, and write a dataset package.

    Each call to :meth:`process_repository` writes these files into a
    timestamped sub-directory of ``output_base``:

    - ``repository_metadata.json``
    - ``<name>_chunks.jsonl``
    - ``<name>_stats.json``
    """

    def __init__(
        self,
        output_base: Path = Path("data/processed/repos"),
        use_hierarchical: bool = True,
    ):
        """Create the crawler and chunker, and ensure ``output_base`` exists.

        Args:
            output_base: Root directory for per-run output folders. A plain
                ``str`` is also accepted and converted to ``Path``.
            use_hierarchical: Forwarded unchanged to ``RepoChunker``.
        """
        self.crawler = GitCrawler()
        self.chunker = RepoChunker(use_hierarchical=use_hierarchical)
        # Path() accepts both str and Path, so callers may pass either.
        self.output_base = Path(output_base)
        self.output_base.mkdir(parents=True, exist_ok=True)

    def process_repository(
        self,
        repo_url: str,
        extensions: Optional[Set[str]] = None,
        output_name: Optional[str] = None,
        include_binary: bool = False,
        max_files: Optional[int] = None,
        skip_git_metadata: bool = False,
    ) -> Dict[str, Any]:
        """Clone, chunk and export a single repository.

        Args:
            repo_url: URL handed to ``GitCrawler.clone_repository``.
            extensions: File extensions to keep (e.g. ``{".py", ".md"}``).
                ``None`` or an empty collection means no filtering, so the
                FULL repository is processed.
            output_name: Prefix for the output folder and files. Defaults to
                the cloned directory's name.
            include_binary: When False, binary files are skipped while
                listing.
            max_files: Process at most this many files, in listing order.
                ``None`` or ``0`` means no limit.
            skip_git_metadata: Skip ``RepoMetadataExtractor`` entirely.

        Returns:
            The statistics dictionary that is also written to
            ``<output_name>_stats.json``.

        Raises:
            ValueError: If ``max_files`` is negative.
            RuntimeError: If the repository could not be cloned.
        """
        # A negative value would slice off the *last* N files instead of
        # limiting the count, so reject it before doing any work.
        if max_files is not None and max_files < 0:
            raise ValueError(f"max_files must be >= 0, got {max_files}")
        # Normalise once so that the log line, the stored metadata and the
        # crawler call all agree on "no filter" (None) vs. a real filter.
        extensions = self._normalize_extensions(extensions)

        # perf_counter is monotonic and meant for measuring durations.
        start_time = time.perf_counter()
        print(f"🚀 Processing repository: {repo_url}")
        print("-" * 60)

        # 1. Clone repository
        repo_path = self.crawler.clone_repository(repo_url)
        if not repo_path:
            raise RuntimeError(f"Failed to clone {repo_url}")

        # 2. Determine output name
        if not output_name:
            output_name = repo_path.name

        # 3. Log extension behavior
        if extensions:
            print(f"📁 Extension filter enabled: {sorted(extensions)}")
        else:
            print("📁 No extension filter → processing FULL repository")

        # 4. Extract repository metadata (optional)
        metadata: Dict[str, Any] = {}
        if skip_git_metadata:
            print("📊 Skipping repository metadata extraction")
        else:
            print("📊 Extracting repository metadata...")
            extractor = RepoMetadataExtractor(repo_path)
            metadata = extractor.extract_comprehensive_metadata() or {}

        # 5. List files (extensions=None => full repository)
        print("📁 Listing repository files...")
        file_infos, _ = self.crawler.list_files_with_info(
            repo_path,
            extensions=extensions,
            skip_binary=not include_binary,
        )

        # 6. Optional file limiting (None/0 => no limit)
        if max_files and len(file_infos) > max_files:
            print(f"⚠️ Limiting to {max_files} files (out of {len(file_infos)})")
            file_infos = file_infos[:max_files]
        print(f"📊 Found {len(file_infos)} files to process")

        # 7. Create output directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = self.output_base / f"{output_name}_{timestamp}"
        output_dir.mkdir(parents=True, exist_ok=True)

        # 8. Repository-level metadata. Prefer the extractor's repo name and
        #    fall back to the folder name. `or {}` guards a None "basic" entry.
        basic_info = metadata.get("basic") or {}
        actual_repo_name = basic_info.get("repo_name", output_name)
        repo_metadata = {
            "repo_url": repo_url,
            "repo_name": actual_repo_name,
            "folder_name": output_name,
            "local_path": str(repo_path),
            # sorted() keeps the JSON reproducible; set order is arbitrary.
            "extensions_included": sorted(extensions) if extensions else "ALL",
            "timestamp": timestamp,
            **metadata,
        }
        metadata_file = output_dir / "repository_metadata.json"
        self._write_json(metadata_file, repo_metadata)

        # 9. Chunk processing
        print("\n🔧 Processing files...")
        print("-" * 60)
        all_chunks, processing_stats = self._chunk_files(file_infos, repo_metadata)

        # 10. Export chunks
        print("\n💾 Exporting chunks...")
        output_file = output_dir / f"{output_name}_chunks.jsonl"
        export_repo_chunks_jsonl(
            chunks=all_chunks,
            output_path=output_file,
            repo_metadata=repo_metadata,
            print_stats=True,
        )

        # 11. Compute statistics
        print("📈 Computing statistics...")
        chunk_stats = compute_dataset_stats(all_chunks)
        total_time = time.perf_counter() - start_time
        final_stats = {
            "repository_info": {
                "name": actual_repo_name,
                "folder_name": output_name,
                "url": repo_url,
                "path": str(repo_path),
                "timestamp": timestamp,
            },
            "processing_stats": processing_stats,
            "chunk_statistics": chunk_stats,
            "performance": self._performance_stats(
                total_time, len(file_infos), len(all_chunks)
            ),
            "output_files": {
                "chunks": str(output_file),
                "metadata": str(metadata_file),
            },
        }
        stats_file = output_dir / f"{output_name}_stats.json"
        self._write_json(stats_file, final_stats)

        # 12. Summary
        print("\n" + "=" * 70)
        print("✅ REPOSITORY PROCESSING COMPLETE")
        print("=" * 70)
        print(f"📁 Repository: {output_name}")
        print(f"📄 Files: {len(file_infos)}")
        print(f"🧩 Chunks: {len(all_chunks)}")
        print(f"⏱️ Time: {final_stats['performance']['total_time_seconds']}s")
        print(f"💾 Output: {output_dir}")
        print("=" * 70)
        return final_stats

    def _chunk_files(self, file_infos: List[Any], repo_metadata: Dict[str, Any]) -> tuple:
        """Chunk every file with ``self.chunker``, tolerating per-file failures.

        Args:
            file_infos: Items exposing ``path``, ``relative_path``, ``size``,
                ``extension`` and ``is_binary``.
            repo_metadata: Repository-level metadata merged into each file's
                metadata.

        Returns:
            ``(all_chunks, processing_stats)``. ``processing_stats`` holds
            ``total_files``, ``processed``, ``failed``, and per-extension
            (``file_types``) and per-``chunk_type`` (``chunk_types``) counts.
        """
        all_chunks: List[Any] = []
        processing_stats: Dict[str, Any] = {
            "total_files": len(file_infos),
            "processed": 0,
            "failed": 0,
            "file_types": {},
            "chunk_types": {},
        }
        file_types = processing_stats["file_types"]
        chunk_types = processing_stats["chunk_types"]
        total = len(file_infos)

        for idx, file_info in enumerate(file_infos, start=1):
            if idx % 10 == 0:
                print(f" [{idx}/{total}] Processing...")
            try:
                file_metadata = {
                    **repo_metadata,
                    "file_info": {
                        "relative_path": file_info.relative_path,
                        "size_bytes": file_info.size,
                        "extension": file_info.extension,
                        "is_binary": file_info.is_binary,
                    },
                }
                # list() so a generator result is not exhausted by extend()
                # before the chunk types are tallied.
                chunks = list(self.chunker.chunk_file(file_info.path, file_metadata))
                chunk_type_names = [chunk.chunk_type for chunk in chunks]
            except Exception as e:  # best-effort: one bad file must not abort the run
                print(f"⚠️ Error processing {file_info.relative_path}: {str(e)[:120]}")
                processing_stats["failed"] += 1
                continue

            # Commit results only after the whole file succeeded, so a file is
            # never counted as both processed and failed.
            all_chunks.extend(chunks)
            processing_stats["processed"] += 1
            file_types[file_info.extension] = file_types.get(file_info.extension, 0) + 1
            for ct in chunk_type_names:
                chunk_types[ct] = chunk_types.get(ct, 0) + 1

        return all_chunks, processing_stats

    @staticmethod
    def _performance_stats(total_time: float, n_files: int, n_chunks: int) -> Dict[str, float]:
        """Build the throughput summary.

        Rates are ``0.0`` when no measurable time elapsed, which avoids a
        division by zero.
        """
        def rate(count: int) -> float:
            return round(count / total_time, 2) if total_time > 0 else 0.0

        return {
            "total_time_seconds": round(total_time, 2),
            "files_per_second": rate(n_files),
            "chunks_per_second": rate(n_chunks),
        }

    @staticmethod
    def _normalize_extensions(extensions: Optional[Set[str]]) -> Optional[Set[str]]:
        """Return ``None`` for "no filter" input (None or empty), else a set copy."""
        return set(extensions) if extensions else None

    @staticmethod
    def _write_json(path: Path, data: Dict[str, Any]) -> None:
        """Write *data* as indented UTF-8 JSON.

        Values that are not JSON-serialisable are written via ``str()``.
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, default=str)
def main():
    """Command-line entry point for the repository pipeline.

    Parses the process arguments, builds an ``EnhancedRepoPipeline`` and,
    for the ``single`` sub-command, processes one repository URL.
    """
    cli = argparse.ArgumentParser(
        description="Process Git repositories for agentic datasets"
    )
    commands = cli.add_subparsers(dest="command", required=True)

    # ---- Single repo ----
    single_cmd = commands.add_parser("single", help="Process single repository")
    single_cmd.add_argument("repo_url", help="Git repository URL")
    single_cmd.add_argument("--name", help="Custom output name")
    single_cmd.add_argument(
        "--extensions",
        nargs="+",
        default=None,
        help="Optional file extensions (.py .md). If omitted, FULL repo is processed.",
    )
    single_cmd.add_argument("--max-files", type=int, help="Limit number of files")
    # Plain on/off switches, registered in their original order.
    for flag in ("--skip-git-metadata", "--include-binary"):
        single_cmd.add_argument(flag, action="store_true")

    parsed = cli.parse_args()
    pipeline = EnhancedRepoPipeline()

    if parsed.command != "single":
        return

    # An omitted (or empty) --extensions means "process the full repository".
    wanted_extensions = None if not parsed.extensions else set(parsed.extensions)
    pipeline.process_repository(
        repo_url=parsed.repo_url,
        output_name=parsed.name,
        extensions=wanted_extensions,
        max_files=parsed.max_files,
        skip_git_metadata=parsed.skip_git_metadata,
        include_binary=parsed.include_binary,
    )
if __name__ == "__main__":
    # Run the CLI only when executed as a script, never on import.
    main()