| import gradio as gr |
| import os |
| import tempfile |
| import shutil |
| from typing import List, Dict, Optional |
| import asyncio |
| from datetime import datetime |
| import hashlib |
| from urllib.parse import urlparse |
|
|
| |
| from services.github_service import GitHubService |
| from services.embedding_service import FreeEmbeddingService |
| from services.chat_service import FreeChatService |
| from utils.file_processor import FileProcessor |
| from config import settings |
|
|
| |
# --- Shared service singletons -------------------------------------------
# These are created once at import time and reused by every request handler
# below. All configuration values are read from ``settings``.

# Repository access (constructed with the configured GitHub token).
github_service = GitHubService(settings.github_token)

# Embedding backend used to build a vector store per repository.
embedding_service = FreeEmbeddingService(
    embedding_provider=settings.embedding_provider.value,
    vector_db_path=settings.vector_db_path,
    model_name=settings.sentence_transformer_model,
)

# Chat backend used to answer questions against a vector store.
chat_service = FreeChatService(
    llm_provider=settings.llm_provider.value,
    api_key=settings.groq_api_key,
    model=settings.groq_model,
)

# Selects which repository files are extracted for indexing.
file_processor = FileProcessor(
    settings.supported_extensions,
    settings.max_file_size,
)

# In-memory registry of processed repositories, keyed by repo id.
# Each value holds 'vectorstore', 'files_count', 'processed_at' and
# 'repo_url' (see process_repository). Contents are lost on restart.
repo_data: Dict[str, dict] = {}
|
|
def validate_github_url(url: str) -> bool:
    """Return True when *url* looks like a GitHub repository URL.

    A URL is accepted when its network location is ``github.com``
    (compared case-insensitively) and its path begins with two non-empty
    segments, i.e. ``/<owner>/<repo>``. Additional trailing segments such
    as ``/tree/main`` are allowed.

    Args:
        url: The URL string to check.

    Returns:
        True if the host and path match the expected shape; False
        otherwise, including when *url* is not a string or cannot be
        parsed.
    """
    if not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse raises ValueError for malformed input such as an
        # unterminated IPv6 literal ("http://[::1").
        return False

    # Host names are case-insensitive, so "GitHub.com" is as valid as
    # "github.com".
    if parsed.netloc.lower() != "github.com":
        return False

    segments = parsed.path.strip("/").split("/")
    # Both owner and repo must be present and non-empty; a bare length check
    # would let paths like "/owner//x" through with an empty repo name.
    return len(segments) >= 2 and all(segments[:2])
|
|
async def process_repository(repo_url: str, branch: str = "main") -> tuple:
    """Clone and index a GitHub repository, streaming progress updates.

    This is an async generator. Every item it yields is a
    ``(status, message)`` pair of strings intended for the two status
    text boxes in the UI. On success the repository's vector store and
    some metadata are stored in the module-level ``repo_data`` dict under
    the id returned by ``github_service.generate_repo_id``.

    Args:
        repo_url: URL of the repository; checked with
            ``validate_github_url`` before any work is done.
        branch: Branch name passed to ``github_service.clone_repository``.

    Yields:
        ``(status, message)`` tuples describing progress, success or
        failure. Exceptions raised while processing are reported as an
        "❌ Error" item rather than propagated.
    """
    if not validate_github_url(repo_url):
        yield "❌ Error", "Invalid GitHub URL. Please provide a valid GitHub repository URL."
        return

    try:
        repo_id = github_service.generate_repo_id(repo_url)

        # Already indexed in this process: nothing to do.
        if repo_id in repo_data:
            yield "✅ Ready", "Repository already processed! You can now ask questions about the code."
            return

        yield "🔄 Processing", "Cloning repository..."
        repo_path = await github_service.clone_repository(repo_url, branch)

        files = []
        try:
            yield "🔄 Processing", "Extracting and processing files..."
            files = list(file_processor.extract_files(repo_path))

            if files:
                yield "🔄 Processing", f"Creating embeddings for {len(files)} files (this may take a while)..."
                vectorstore = await embedding_service.create_embeddings(files, repo_id)

                repo_data[repo_id] = {
                    'vectorstore': vectorstore,
                    'files_count': len(files),
                    'processed_at': datetime.now(),
                    'repo_url': repo_url,
                }
        finally:
            # Always remove the checkout, including when extraction or
            # embedding creation raises; otherwise the clone is leaked.
            github_service.cleanup_repo(repo_path)

        if not files:
            yield "❌ Error", "No supported files found in the repository."
            return

        yield "✅ Ready", f"Repository processed successfully! Found {len(files)} files. You can now ask questions about the code."

    except Exception as e:
        # Top-level boundary for the UI: surface the failure as a status
        # update instead of letting it escape the generator.
        yield "❌ Error", f"Error processing repository: {e}"
|
|
def process_repo_sync(repo_url: str, branch: str = "main"):
    """Drive ``process_repository`` from synchronous code, streaming updates.

    A private event loop is created for the duration of the call and the
    async generator is stepped one item at a time, so each
    ``(status, message)`` pair is re-yielded as soon as it is produced.

    Args:
        repo_url: Repository URL forwarded to ``process_repository``.
        branch: Branch name forwarded to ``process_repository``.

    Yields:
        ``(status, message)`` string tuples. Any exception raised while
        stepping the async generator is reported as an "❌ Error" item.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    agen = process_repository(repo_url, branch)
    try:
        while True:
            try:
                status, message = loop.run_until_complete(agen.__anext__())
            except StopAsyncIteration:
                break
            yield status, message
    except Exception as e:
        yield "❌ Error", f"Error: {e}"
    finally:
        try:
            # If the consumer stopped early the async generator is still
            # suspended; close it explicitly so its cleanup code runs while
            # the loop is still usable.
            loop.run_until_complete(agen.aclose())
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Do not leave a closed loop installed as this thread's current
            # event loop.
            asyncio.set_event_loop(None)
            loop.close()
|
|
async def chat_with_repository(message: str, repo_url: str, history: List) -> tuple:
    """Answer a question about a previously processed repository.

    Args:
        message: The user's question.
        repo_url: URL of the repository to query; it must already have
            been processed so that an entry exists in ``repo_data``.
        history: Existing chat history as a list of
            ``(user_message, bot_message)`` tuples. ``None`` is treated
            as an empty history.

    Returns:
        A ``(history, "")`` tuple: the updated chat history and an empty
        string used to clear the question box. Errors are appended to the
        history as a bot message instead of being raised.
    """
    history = history or []
    question = (message or "").strip()

    if not repo_url:
        # The notice belongs in the bot slot of the tuple, matching the
        # other branches. A blank question becomes None so no empty user
        # entry is added.
        return history + [(question or None, "Please process a repository first.")], ""

    if not question:
        return history, ""

    try:
        repo_id = github_service.generate_repo_id(repo_url)

        if repo_id not in repo_data:
            return history + [(message, "❌ Please process the repository first before asking questions.")], ""

        vectorstore = repo_data[repo_id]['vectorstore']

        result = await chat_service.answer_question(message, vectorstore, repo_id)

        response = result['response']
        sources = result['sources']
        if sources:
            # List at most the first three sources under the answer.
            listing = "".join(
                f"{i}. `{source['path']}`\n"
                for i, source in enumerate(sources[:3], 1)
            )
            response += "\n\n**Sources:**\n" + listing

        return history + [(message, response)], ""

    except Exception as e:
        return history + [(message, f"❌ Error: {e}")], ""
|
|
def chat_sync(message: str, repo_url: str, history: List):
    """Run ``chat_with_repository`` to completion from synchronous code.

    Args:
        message: The user's question.
        repo_url: URL of the repository being queried.
        history: Existing chat history as a list of
            ``(user_message, bot_message)`` tuples; may be ``None``.

    Returns:
        The ``(history, "")`` tuple produced by ``chat_with_repository``,
        or the same shape with an error entry appended if running the
        coroutine fails.
    """
    try:
        # asyncio.run creates a fresh loop and always closes it, even when
        # the coroutine raises, so no loop is leaked or left installed.
        return asyncio.run(chat_with_repository(message, repo_url, history))
    except Exception as e:
        return (history or []) + [(message, f"❌ Error: {e}")], ""
|
|
def get_example_questions():
    """Return the sample questions offered to users as a new list."""
    samples = (
        "What is this project about?",
        "How is the code structured?",
        "What are the main functions/classes?",
        "How does authentication work?",
        "What dependencies does this project use?",
        "Are there any tests in this codebase?",
        "How is error handling implemented?",
        "What are the main API endpoints?",
    )
    # Hand back a fresh list on every call so callers may mutate it freely.
    return list(samples)
|
|
| |
# --- Gradio interface ------------------------------------------------------
# Components are created in display order: intro text, the repository
# input/status row, the chat area, and a footer note. Event handlers are
# wired at the end of the Blocks context.
with gr.Blocks(title="Chat with GitHub Repository", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 Chat with GitHub Repository

    Analyze any GitHub repository and ask questions about the codebase using AI!

    **How it works:**
    1. Enter a GitHub repository URL
    2. Wait for the repository to be processed
    3. Ask questions about the code in natural language
    """)

    # Top row: repository inputs on the left, processing status on the right.
    with gr.Row():
        with gr.Column(scale=2):
            repo_url = gr.Textbox(
                label="GitHub Repository URL",
                placeholder="https://github.com/username/repository",
                info="Enter the URL of a public GitHub repository",
            )
            branch = gr.Textbox(
                label="Branch (optional)",
                value="main",
                placeholder="main",
            )
            process_btn = gr.Button("🔄 Process Repository", variant="primary")

        with gr.Column(scale=1):
            status = gr.Textbox(
                label="Status",
                value="⏳ Waiting",
                interactive=False,
            )
            status_msg = gr.Textbox(
                label="Details",
                value="Enter a repository URL and click 'Process Repository'",
                interactive=False,
                lines=3,
            )

    gr.Markdown("---")

    # Chat area: conversation view, question box with send button, examples.
    with gr.Row():
        with gr.Column():
            chatbot = gr.Chatbot(
                label="Chat with Repository",
                height=400,
                placeholder="Process a repository first, then ask questions about the code!",
            )

            with gr.Row():
                msg = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask anything about the codebase...",
                    scale=4,
                )
                send_btn = gr.Button("Send", variant="primary", scale=1)

            gr.Examples(
                examples=get_example_questions(),
                inputs=msg,
                label="Example Questions",
            )

    gr.Markdown("""
    ---
    **Supported file types:** Python, JavaScript, TypeScript, Java, C++, Go, Rust, PHP, Ruby, Swift, Kotlin, Scala, Markdown, JSON, YAML, and more.

    **Note:** This app uses Groq's API for fast inference. Processing large repositories may take some time.
    """)

    # Stream processing progress into the two status boxes.
    process_btn.click(
        fn=process_repo_sync,
        inputs=[repo_url, branch],
        outputs=[status, status_msg],
    )

    def handle_chat(message, repo_url_val, history):
        """Forward a chat event to chat_sync and return its result."""
        return chat_sync(message, repo_url_val, history)

    # Clicking "Send" and pressing Enter in the question box do the same
    # thing: update the chat history and clear the question box.
    for _register in (send_btn.click, msg.submit):
        _register(
            fn=handle_chat,
            inputs=[msg, repo_url, chatbot],
            outputs=[chatbot, msg],
        )


# Start the app only when this file is executed directly.
if __name__ == "__main__":
    demo.launch()