""" Core orchestration module for GetGit RAG + LLM Pipeline. This module serves as the unified entry point for GetGit, coordinating repository cloning, RAG-based analysis, and LLM-powered question answering. It provides a simple API for end-to-end repository intelligence gathering. """ import os import logging from typing import Optional, List, Dict, Any from pathlib import Path from clone_repo import clone_repo from repo_manager import RepositoryManager from rag import ( RepositoryChunker, SimpleEmbedding, SentenceTransformerEmbedding, Retriever, RAGConfig, generate_response, ) from checkpoints import ( load_checkpoints, evaluate_checkpoint, run_checkpoints, format_results_summary, CheckpointResult ) # Configure logging def setup_logging(level: str = "INFO") -> logging.Logger: """ Configure logging for the core module. Args: level: Logging level (DEBUG, INFO, WARNING, ERROR) Returns: Configured logger instance """ log_level = getattr(logging, level.upper(), logging.INFO) logging.basicConfig( level=log_level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger('getgit.core') logger.setLevel(log_level) # Explicitly set logger level return logger # Initialize module logger logger = setup_logging() def initialize_repository(repo_url: str, local_path: str = "source_repo") -> str: """ Clone or load the repository and prepare it for analysis. This function now includes repository persistence and validation: - Checks if the repository URL has changed - Cleans up old data if a new repository is provided - Stores the current repository URL for future validation Args: repo_url: GitHub repository URL to clone local_path: Local path where repository will be stored Returns: Path to the cloned/loaded repository Raises: Exception: If repository cloning or loading fails """ logger.info(f"Initializing repository from {repo_url}") try: # Initialize repository manager repo_manager = RepositoryManager( data_dir="data", repo_dir=local_path, cache_dir=".rag_cache" ) # Check if we need to reset (different repository URL) reset_performed = repo_manager.prepare_for_new_repo(repo_url) if reset_performed: logger.info("Repository reset performed, will clone fresh copy") # Clone or reuse existing repository if os.path.exists(local_path): logger.info(f"Repository already exists at {local_path}, using existing copy") logger.debug(f"Skipping clone for existing repository at {local_path}") else: logger.info(f"Cloning repository to {local_path}") clone_repo(repo_url, local_path) logger.info(f"Repository successfully cloned to {local_path}") # Verify repository exists and is accessible if not os.path.isdir(local_path): raise ValueError(f"Repository path {local_path} is not a valid directory") logger.debug(f"Repository initialized at {local_path}") return local_path except Exception as e: logger.error(f"Failed to initialize repository: {str(e)}") raise def setup_rag( repo_path: str, repository_name: Optional[str] = None, config: Optional[RAGConfig] = None, use_sentence_transformer: bool = False ) -> Retriever: """ Initialize chunker, embeddings, and retriever for RAG pipeline. Args: repo_path: Path to the repository to analyze repository_name: Optional name for the repository config: Optional RAG configuration (uses default if not provided) use_sentence_transformer: Whether to use SentenceTransformer embeddings Returns: Configured Retriever instance with indexed repository chunks Raises: Exception: If RAG initialization or indexing fails """ logger.info(f"Setting up RAG pipeline for repository at {repo_path}") try: # Use default config if not provided if config is None: config = RAGConfig.default() logger.debug("Using default RAG configuration") # Determine repository name if repository_name is None: repository_name = os.path.basename(repo_path) logger.debug(f"Repository name: {repository_name}") # Step 1: Chunk the repository logger.info("Chunking repository content...") chunker = RepositoryChunker(repo_path, repository_name=repository_name) chunks = chunker.chunk_repository(config.chunking.file_patterns) logger.info(f"Created {len(chunks)} chunks from repository") if not chunks: logger.warning("No chunks created - repository may be empty or contain no supported file types") raise ValueError( "No chunks created from repository. Ensure the repository contains " f"files matching patterns: {config.chunking.file_patterns}" ) # Step 2: Initialize embedding model logger.info("Initializing embedding model...") if use_sentence_transformer: try: embedding_model = SentenceTransformerEmbedding(config.embedding.model_name) logger.info(f"Using SentenceTransformer model: {config.embedding.model_name}") except ImportError: logger.warning("sentence-transformers not available, falling back to SimpleEmbedding") embedding_model = SimpleEmbedding(max_features=config.embedding.embedding_dim) else: embedding_model = SimpleEmbedding(max_features=config.embedding.embedding_dim) logger.info("Using SimpleEmbedding (TF-IDF based)") # Step 3: Create retriever and index chunks logger.info("Creating retriever and indexing chunks...") retriever = Retriever(embedding_model) retriever.index_chunks(chunks, batch_size=config.embedding.batch_size) logger.info(f"Successfully indexed {len(retriever)} chunks") logger.debug("RAG pipeline setup complete") return retriever except Exception as e: logger.error(f"Failed to setup RAG pipeline: {str(e)}") raise def answer_query( query: str, retriever: Retriever, top_k: int = 5, use_llm: bool = True, api_key: Optional[str] = None, model_name: str = "gemini-2.5-flash" ) -> Dict[str, Any]: """ Retrieve relevant context and generate an LLM response for the query. Args: query: Natural language question about the repository retriever: Configured Retriever instance top_k: Number of relevant chunks to retrieve use_llm: Whether to generate LLM response (requires API key) api_key: Optional API key for LLM (reads from env if not provided) model_name: Name of the LLM model to use Returns: Dictionary containing: - query: The original query - retrieved_chunks: List of retrieved chunk information - context: Combined context from retrieved chunks - response: Generated LLM response (if use_llm=True) - error: Error message if LLM generation fails Raises: Exception: If query processing fails """ logger.info(f"Processing query: '{query}'") try: # Step 1: Retrieve relevant chunks logger.info(f"Retrieving top {top_k} relevant chunks...") results = retriever.retrieve(query, top_k=top_k) logger.info(f"Retrieved {len(results)} relevant chunks") if not results: logger.warning("No relevant chunks found for query") return { 'query': query, 'retrieved_chunks': [], 'context': '', 'response': 'No relevant information found in the repository for this query.', 'error': None } # Log retrieved chunks for result in results: logger.debug( f"Chunk {result.rank}: {result.chunk.file_path} " f"(score: {result.score:.4f}, type: {result.chunk.chunk_type.value})" ) # Step 2: Extract context context_chunks = [result.chunk.content for result in results] retrieved_info = [ { 'rank': result.rank, 'file_path': result.chunk.file_path, 'chunk_type': result.chunk.chunk_type.value, 'score': result.score, 'start_line': result.chunk.start_line, 'end_line': result.chunk.end_line, 'metadata': result.chunk.metadata } for result in results ] # Step 3: Generate LLM response if requested response_text = None error = None if use_llm: logger.info("Generating LLM response...") try: response_text = generate_response( query, context_chunks, model_name=model_name, api_key=api_key ) logger.info("LLM response generated successfully") logger.debug(f"Response length: {len(response_text)} characters") except Exception as e: error = str(e) logger.error(f"Failed to generate LLM response: {error}") response_text = None else: logger.debug("LLM response generation skipped (use_llm=False)") return { 'query': query, 'retrieved_chunks': retrieved_info, 'context': '\n\n---\n\n'.join(context_chunks), 'response': response_text, 'error': error } except Exception as e: logger.error(f"Failed to process query: {str(e)}") raise def validate_checkpoints( repo_url: str, checkpoints_file: str = "checkpoints.txt", local_path: str = "source_repo", use_llm: bool = True, log_level: str = "INFO", config: Optional[RAGConfig] = None, stop_on_failure: bool = False ) -> Dict[str, Any]: """ Validate repository against checkpoints defined in a text file. This function orchestrates the checkpoint validation pipeline: 1. Repository cloning/loading 2. RAG initialization and indexing 3. Loading checkpoints from file 4. Sequential checkpoint evaluation 5. Results aggregation and reporting Args: repo_url: GitHub repository URL checkpoints_file: Path to checkpoints text file local_path: Local path for repository storage use_llm: Whether to use LLM for checkpoint evaluation log_level: Logging level (DEBUG, INFO, WARNING, ERROR) config: Optional RAG configuration stop_on_failure: Stop processing on first checkpoint failure Returns: Dictionary containing: - checkpoints: List of checkpoint strings - results: List of CheckpointResult objects - summary: Formatted summary string - passed_count: Number of passed checkpoints - total_count: Total number of checkpoints - pass_rate: Percentage of passed checkpoints Raises: FileNotFoundError: If checkpoints file doesn't exist Exception: If any step of the pipeline fails Example: >>> result = validate_checkpoints( ... repo_url="https://github.com/user/repo.git", ... checkpoints_file="checkpoints.txt", ... use_llm=True ... ) >>> print(result['summary']) """ # Setup logging global logger logger = setup_logging(log_level) logger.info("="*70) logger.info("GetGit Checkpoint Validation Pipeline Starting") logger.info("="*70) logger.info(f"Repository: {repo_url}") logger.info(f"Checkpoints File: {checkpoints_file}") logger.info(f"LLM Enabled: {use_llm}") logger.info("="*70) try: # Step 1: Initialize repository logger.info("\n[1/4] Initializing repository...") repo_path = initialize_repository(repo_url, local_path) logger.info(f"✓ Repository ready at {repo_path}") # Step 2: Setup RAG pipeline logger.info("\n[2/4] Setting up RAG pipeline...") retriever = setup_rag(repo_path, config=config) logger.info(f"✓ RAG pipeline ready with {len(retriever)} indexed chunks") # Step 3: Load checkpoints logger.info("\n[3/4] Loading checkpoints...") checkpoints = load_checkpoints(checkpoints_file) logger.info(f"✓ Loaded {len(checkpoints)} checkpoints") # Step 4: Run checkpoints logger.info("\n[4/4] Running checkpoint validation...") results = run_checkpoints( checkpoints=checkpoints, repo_path=repo_path, retriever=retriever, use_llm=use_llm, stop_on_failure=stop_on_failure ) logger.info("✓ Checkpoint validation completed") # Generate summary summary = format_results_summary(results) # Calculate statistics passed_count = sum(1 for r in results if r.passed) total_count = len(results) pass_rate = (passed_count / total_count * 100) if total_count > 0 else 0 logger.info("\n" + "="*70) logger.info("GetGit Checkpoint Validation Pipeline Completed") logger.info(f"Results: {passed_count}/{total_count} passed ({pass_rate:.1f}%)") logger.info("="*70) return { 'checkpoints': checkpoints, 'results': results, 'summary': summary, 'passed_count': passed_count, 'total_count': total_count, 'pass_rate': pass_rate } except Exception as e: logger.error("\n" + "="*70) logger.error("GetGit Checkpoint Validation Pipeline Failed") logger.error(f"Error: {str(e)}") logger.error("="*70) raise def main( repo_url: str, query: str, local_path: str = "source_repo", use_llm: bool = True, top_k: int = 5, log_level: str = "INFO", config: Optional[RAGConfig] = None ) -> Dict[str, Any]: """ Orchestrates the full GetGit pipeline from repository input to answer generation. This is the main entry point that coordinates: 1. Repository cloning/loading 2. RAG initialization and indexing 3. Query processing and context retrieval 4. LLM response generation Args: repo_url: GitHub repository URL query: Natural language question about the repository local_path: Local path for repository storage use_llm: Whether to generate LLM responses top_k: Number of relevant chunks to retrieve log_level: Logging level (DEBUG, INFO, WARNING, ERROR) config: Optional RAG configuration Returns: Dictionary containing query results and response Raises: Exception: If any step of the pipeline fails Example: >>> result = main( ... repo_url="https://github.com/user/repo.git", ... query="How do I install this project?", ... use_llm=True ... ) >>> print(result['response']) """ # Setup logging global logger logger = setup_logging(log_level) logger.info("="*70) logger.info("GetGit Core Pipeline Starting") logger.info("="*70) logger.info(f"Repository: {repo_url}") logger.info(f"Query: {query}") logger.info(f"LLM Enabled: {use_llm}") logger.info("="*70) try: # Step 1: Initialize repository logger.info("\n[1/3] Initializing repository...") repo_path = initialize_repository(repo_url, local_path) logger.info(f"✓ Repository ready at {repo_path}") # Step 2: Setup RAG pipeline logger.info("\n[2/3] Setting up RAG pipeline...") retriever = setup_rag(repo_path, config=config) logger.info(f"✓ RAG pipeline ready with {len(retriever)} indexed chunks") # Step 3: Process query logger.info("\n[3/3] Processing query...") result = answer_query( query=query, retriever=retriever, top_k=top_k, use_llm=use_llm ) logger.info("✓ Query processed successfully") logger.info("\n" + "="*70) logger.info("GetGit Core Pipeline Completed Successfully") logger.info("="*70) return result except Exception as e: logger.error("\n" + "="*70) logger.error("GetGit Core Pipeline Failed") logger.error(f"Error: {str(e)}") logger.error("="*70) raise if __name__ == "__main__": """ Example usage of the core module. This demonstrates a simple interactive session with GetGit. For CLI integration, consider using argparse or similar. """ import sys # Example: Simple command-line usage if len(sys.argv) > 1: # If arguments provided, use them repo_url = sys.argv[1] if len(sys.argv) > 1 else "https://github.com/samarthnaikk/getgit.git" query = sys.argv[2] if len(sys.argv) > 2 else "What is this project about?" else: # Default example repo_url = "https://github.com/samarthnaikk/getgit.git" query = "What is this project about?" print("\nGetGit - Repository Intelligence System") print("="*70) print(f"Repository: {repo_url}") print(f"Query: {query}") print("="*70 + "\n") try: # Run the pipeline result = main( repo_url=repo_url, query=query, use_llm=True, log_level="INFO" ) # Display results print("\n" + "="*70) print("RESULTS") print("="*70) print(f"\nQuery: {result['query']}") print(f"\nRetrieved {len(result['retrieved_chunks'])} relevant chunks:") for chunk_info in result['retrieved_chunks'][:3]: # Show top 3 print(f" - {chunk_info['file_path']} (score: {chunk_info['score']:.4f})") if result['response']: print("\n" + "-"*70) print("ANSWER:") print("-"*70) print(result['response']) elif result['error']: print("\n" + "-"*70) print("ERROR:") print("-"*70) print(f"Failed to generate LLM response: {result['error']}") print("\nShowing retrieved context instead:") print("-"*70) # Show snippet of context context_preview = result['context'][:500] if len(result['context']) > 500: context_preview += "..." print(context_preview) print("\n" + "="*70) except Exception as e: print(f"\n✗ Error: {str(e)}", file=sys.stderr) sys.exit(1)