"""Gradio app for asking questions about the code in a GitHub repository.

The app clones a repository, extracts its supported files, builds embeddings
for them, and then answers chat questions against the resulting vector store.
"""
import asyncio
import hashlib
import os
import shutil
import tempfile
from datetime import datetime
from typing import AsyncIterator, Dict, Iterator, List, Optional, Tuple
from urllib.parse import urlparse

import gradio as gr

# Import our custom modules
from services.github_service import GitHubService
from services.embedding_service import FreeEmbeddingService
from services.chat_service import FreeChatService
from utils.file_processor import FileProcessor
from config import settings

# Initialize services
github_service = GitHubService(settings.github_token)
embedding_service = FreeEmbeddingService(
    embedding_provider=settings.embedding_provider.value,
    vector_db_path=settings.vector_db_path,
    model_name=settings.sentence_transformer_model
)

# Initialize chat service for Groq
chat_service = FreeChatService(
    llm_provider=settings.llm_provider.value,
    api_key=settings.groq_api_key,
    model=settings.groq_model
)

file_processor = FileProcessor(settings.supported_extensions, settings.max_file_size)

# Global state management: in-memory record of processed repositories, keyed
# by the id returned from github_service.generate_repo_id(). Lost on restart.
repo_data = {}


def validate_github_url(url: str) -> bool:
    """Validate if the URL is a valid GitHub repository URL.

    A URL is accepted when its host is exactly ``github.com`` and its path
    starts with two non-empty segments (owner and repository).

    Args:
        url: The URL text entered by the user.

    Returns:
        True if the URL looks like a GitHub repository URL, False otherwise.
    """
    try:
        parsed = urlparse(url)
    except (ValueError, TypeError, AttributeError):
        # urlparse raises ValueError on malformed hosts and
        # TypeError/AttributeError on non-string input.
        return False

    if parsed.netloc != "github.com":
        return False

    path_parts = parsed.path.strip('/').split('/')
    # Require both owner and repo to be non-empty so that paths such as
    # "//repo" are not mistaken for a repository.
    return len(path_parts) >= 2 and all(path_parts[:2])


async def process_repository(
    repo_url: str, branch: str = "main"
) -> AsyncIterator[Tuple[str, str]]:
    """Process a GitHub repository, yielding progress updates.

    Clones the repository, extracts supported files, creates embeddings and
    records the result in ``repo_data``. The cloned working copy is removed
    once processing ends, whether it succeeded or failed.

    Args:
        repo_url: URL of the GitHub repository to process.
        branch: Branch to clone; blank values fall back to "main".

    Yields:
        ``(status, details)`` string pairs for the status widgets.
    """
    if not validate_github_url(repo_url):
        yield "❌ Error", "Invalid GitHub URL. Please provide a valid GitHub repository URL."
        return

    # The branch field is optional in the UI, so a cleared textbox arrives as
    # an empty string; treat that the same as the default.
    branch = (branch or "").strip() or "main"

    try:
        repo_id = github_service.generate_repo_id(repo_url)

        # Check if already processed.
        # NOTE(review): the id is derived from the URL only, so a different
        # branch of an already-processed URL also hits this path — confirm
        # that this is intended.
        if repo_id in repo_data:
            yield "✅ Ready", "Repository already processed! You can now ask questions about the code."
            return

        # Clone repository
        yield "🔄 Processing", "Cloning repository..."
        repo_path = await github_service.clone_repository(repo_url, branch)

        try:
            # Extract files
            yield "🔄 Processing", "Extracting and processing files..."
            files = list(file_processor.extract_files(repo_path))

            if files:
                # Create embeddings
                yield "🔄 Processing", f"Creating embeddings for {len(files)} files (this may take a while)..."
                vectorstore = await embedding_service.create_embeddings(files, repo_id)

                # Store in global state
                repo_data[repo_id] = {
                    'vectorstore': vectorstore,
                    'files_count': len(files),
                    'processed_at': datetime.now(),
                    'repo_url': repo_url
                }
        finally:
            # Cleanup: always remove the clone, including when extraction or
            # embedding raised, so failed runs do not leave it behind.
            github_service.cleanup_repo(repo_path)

        if not files:
            yield "❌ Error", "No supported files found in the repository."
            return

        yield "✅ Ready", f"Repository processed successfully! Found {len(files)} files. You can now ask questions about the code."

    except Exception as e:
        # Top-level boundary: report any failure to the UI instead of raising.
        yield "❌ Error", f"Error processing repository: {str(e)}"
        return


def process_repo_sync(repo_url: str, branch: str = "main") -> Iterator[Tuple[str, str]]:
    """Synchronous wrapper for repository processing that streams updates to Gradio.

    Drives the ``process_repository`` async generator on a private event loop
    and re-yields each ``(status, details)`` pair.

    Args:
        repo_url: URL of the GitHub repository to process.
        branch: Branch to clone.

    Yields:
        ``(status, details)`` string pairs.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    agen = process_repository(repo_url, branch)
    try:
        while True:
            try:
                status, message = loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
            yield status, message
    except Exception as e:
        yield "❌ Error", f"Error: {str(e)}"
    finally:
        try:
            # Finalize the async generator so its cleanup code runs even when
            # the consumer stopped iterating early. No-op once exhausted.
            loop.run_until_complete(agen.aclose())
        except RuntimeError:
            # Best effort: the loop cannot be run from here (for example when
            # this generator is closed from a thread already running a loop).
            pass
        finally:
            loop.close()


async def chat_with_repository(message: str, repo_url: str, history: List) -> tuple:
    """Chat with the processed repository.

    Args:
        message: The user's question.
        repo_url: URL of the repository the question is about.
        history: Existing chat history as ``(user, bot)`` pairs.

    Returns:
        A ``(history, textbox_value)`` tuple: the updated chat history and an
        empty string that clears the question textbox.
    """
    history = history or []

    if not repo_url:
        # Show the notice as the bot's reply; a blank question is passed as
        # None so no empty user bubble is rendered.
        shown = message if message and message.strip() else None
        return history + [(shown, "Please process a repository first.")], ""

    if not message or not message.strip():
        return history, ""

    try:
        repo_id = github_service.generate_repo_id(repo_url)

        if repo_id not in repo_data:
            return history + [(message, "❌ Please process the repository first before asking questions.")], ""

        # Get vectorstore
        vectorstore = repo_data[repo_id]['vectorstore']

        # Get answer
        result = await chat_service.answer_question(message, vectorstore, repo_id)

        # Format response with sources (at most the first three are listed)
        response = result['response']
        sources = result['sources']
        if sources:
            listing = "".join(
                f"{i}. `{source['path']}`\n"
                for i, source in enumerate(sources[:3], 1)
            )
            response += "\n\n**Sources:**\n" + listing

        return history + [(message, response)], ""

    except Exception as e:
        # Top-level boundary: surface the failure in the chat window.
        return history + [(message, f"❌ Error: {str(e)}")], ""


def chat_sync(message: str, repo_url: str, history: List):
    """Synchronous wrapper for chat function.

    Runs ``chat_with_repository`` to completion on a private event loop, which
    is closed afterwards regardless of the outcome.

    Args:
        message: The user's question.
        repo_url: URL of the repository the question is about.
        history: Existing chat history as ``(user, bot)`` pairs.

    Returns:
        The ``(history, textbox_value)`` tuple from ``chat_with_repository``,
        or the history extended with an error entry if running it failed.
    """
    loop = None
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(chat_with_repository(message, repo_url, history))
    except Exception as e:
        return (history or []) + [(message, f"❌ Error: {str(e)}")], ""
    finally:
        # Close the loop on every path so a failed call does not leak it.
        if loop is not None:
            loop.close()


def get_example_questions():
    """Get example questions users can ask."""
    return [
        "What is this project about?",
        "How is the code structured?",
        "What are the main functions/classes?",
        "How does authentication work?",
        "What dependencies does this project use?",
        "Are there any tests in this codebase?",
        "How is error handling implemented?",
        "What are the main API endpoints?"
    ]


# Create Gradio interface
with gr.Blocks(title="Chat with GitHub Repository", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 Chat with GitHub Repository

    Analyze any GitHub repository and ask questions about the codebase using AI!

    **How it works:**
    1. Enter a GitHub repository URL
    2. Wait for the repository to be processed
    3. Ask questions about the code in natural language
    """)

    with gr.Row():
        with gr.Column(scale=2):
            repo_url = gr.Textbox(
                label="GitHub Repository URL",
                placeholder="https://github.com/username/repository",
                info="Enter the URL of a public GitHub repository"
            )
            branch = gr.Textbox(
                label="Branch (optional)",
                value="main",
                placeholder="main"
            )
            process_btn = gr.Button("🔄 Process Repository", variant="primary")

        with gr.Column(scale=1):
            status = gr.Textbox(
                label="Status",
                value="⏳ Waiting",
                interactive=False
            )
            status_msg = gr.Textbox(
                label="Details",
                value="Enter a repository URL and click 'Process Repository'",
                interactive=False,
                lines=3
            )

    gr.Markdown("---")

    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(
                label="Chat with Repository",
                height=400,
                placeholder="Process a repository first, then ask questions about the code!"
            )

            with gr.Row():
                msg = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask anything about the codebase...",
                    scale=4
                )
                send_btn = gr.Button("Send", variant="primary", scale=1)

            gr.Examples(
                examples=get_example_questions(),
                inputs=msg,
                label="Example Questions"
            )

    gr.Markdown("""
    ---
    **Supported file types:** Python, JavaScript, TypeScript, Java, C++, Go, Rust, PHP, Ruby, Swift, Kotlin, Scala, Markdown, JSON, YAML, and more.

    **Note:** This app uses Groq's API for fast inference. Processing large repositories may take some time.
    """)

    # Event handlers
    process_btn.click(
        fn=process_repo_sync,
        inputs=[repo_url, branch],
        outputs=[status, status_msg]
    )

    def handle_chat(message, repo_url_val, history):
        """Forward a chat submission to the synchronous chat wrapper."""
        return chat_sync(message, repo_url_val, history)

    # The Send button and pressing Enter in the textbox share one handler.
    send_btn.click(
        fn=handle_chat,
        inputs=[msg, repo_url, chatbot],
        outputs=[chatbot, msg]
    )

    msg.submit(
        fn=handle_chat,
        inputs=[msg, repo_url, chatbot],
        outputs=[chatbot, msg]
    )

if __name__ == "__main__":
    demo.launch()