"""Gradio app for exploring a GitHub repository, folder, or file with an LLM.

The app downloads the requested content through the project's
``GitHubRepoFetcher``, flattens it to one text blob with
``SimpleRepoConverter``, indexes that blob in a throw-away Chroma collection,
and answers questions with an OpenAI chat model over the retrieved chunks.
"""

import os
import tempfile

import chromadb
import gradio as gr
import requests
from chromadb.config import Settings
from dotenv import load_dotenv
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

from utils.github_fetcher import GitHubRepoFetcher
from utils.repo_converter import SimpleRepoConverter

# Load environment variables
load_dotenv()
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

# Host names accepted as "GitHub" when locating owner/repo in a URL.
_GITHUB_HOSTS = ('github.com', 'www.github.com')
# URL segments that GitHub inserts before the branch name for folders/files.
_REF_MARKERS = ('tree', 'blob')
# Seconds to wait for the GitHub API before giving up instead of hanging the UI.
_GITHUB_TIMEOUT_SECONDS = 30
# Number of chunks embedded and stored per request.
_EMBED_BATCH_SIZE = 100
# Maximum number of chunks retrieved as context for one question.
_MAX_RETRIEVED_CHUNKS = 5


def parse_github_url(url):
    """Extract owner, repo, and path from a GitHub URL.

    Accepts repository URLs as well as folder (``/tree/<branch>/...``) and
    file (``/blob/<branch>/...``) URLs. Surrounding whitespace, a query
    string, a fragment, and a trailing ``.git`` on the repository name are
    ignored.

    Args:
        url: GitHub URL, with or without a scheme.

    Returns:
        Tuple ``(owner, repo, path)``; ``path`` is ``''`` for the repo root.

    Raises:
        ValueError: If the URL does not contain ``github.com/<owner>/<repo>``
            or is not a string.
    """
    try:
        # Drop whitespace plus any "?query" / "#fragment" suffix before splitting.
        cleaned = url.strip().split('#', 1)[0].split('?', 1)[0]
        parts = cleaned.strip('/').split('/')

        github_index = next(
            (i for i, part in enumerate(parts) if part.lower() in _GITHUB_HOSTS),
            -1,
        )
        if github_index < 0 or len(parts) <= github_index + 2:
            raise ValueError("Invalid GitHub URL format")

        owner = parts[github_index + 1]
        repo = parts[github_index + 2]
        if repo.endswith('.git'):
            repo = repo[:-len('.git')]
        if not owner or not repo:
            raise ValueError("Invalid GitHub URL format")

        # Handle folder and file paths: skip the "tree"/"blob" marker and the
        # branch name that follows it.
        # NOTE: branch names that themselves contain "/" cannot be told apart
        # from path segments here and will leak into the returned path.
        path_parts = parts[github_index + 3:]
        if path_parts and path_parts[0] in _REF_MARKERS:
            path_parts = path_parts[2:]

        return owner, repo, '/'.join(path_parts)
    except Exception as e:
        # Normalise every failure (including non-string input) to ValueError.
        raise ValueError(f"Error parsing GitHub URL: {str(e)}") from e


def format_chat_history(history):
    """Format chat history as alternating ``Q:`` / ``A:`` paragraphs.

    Args:
        history: Iterable of messages. Each item is either a dict with
            ``role`` and ``content`` keys or a ``(question, answer)`` pair
            (kept for backward compatibility).

    Returns:
        The formatted entries joined by blank lines; ``''`` for empty input.
    """
    formatted = []
    for msg in history or []:
        if isinstance(msg, dict):
            # Dictionary ("messages") format.
            prefix = 'Q' if msg['role'] == 'user' else 'A'
            formatted.append(f"{prefix}: {msg['content']}")
        else:
            # Tuple format for backward compatibility.
            q, a = msg
            formatted.append(f"Q: {q}")
            formatted.append(f"A: {a}")
    return "\n\n".join(formatted)


def process_repository(github_url):
    """Download a GitHub repository, folder, or file and return its text.

    Args:
        github_url: URL understood by ``parse_github_url``.

    Returns:
        The combined text read from the converter's ``_all_files.txt`` on
        success, otherwise a string starting with ``"Error:"``. This function
        does not raise; every exception is reported through the return value.
    """
    try:
        owner, repo, path = parse_github_url(github_url)
        print(f"Parsed URL - Owner: {owner}, Repo: {repo}, Path: {path}")

        fetcher = GitHubRepoFetcher(GITHUB_TOKEN)

        # Test GitHub API access before downloading anything.
        test_url = f'https://api.github.com/repos/{owner}/{repo}'
        response = requests.get(
            test_url,
            headers=fetcher.headers,
            timeout=_GITHUB_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            # The error body is not guaranteed to be a JSON object.
            try:
                message = response.json().get('message', '')
            except (ValueError, AttributeError):
                message = ''
            return f"Error: Unable to access repository. Status code: {response.status_code}. Message: {message}"

        print("Successfully connected to GitHub API")

        with tempfile.TemporaryDirectory() as temp_dir:
            print(f"Created temp directory: {temp_dir}")

            if path:
                # Check if path exists and get its type.
                contents = fetcher.fetch_contents(owner, repo, path)
                if not contents:
                    return f"Error: Unable to access path: {path}"

                # Assumes fetch_contents returns the GitHub "contents" payload:
                # a list for a directory, a single object for a file — TODO
                # confirm against GitHubRepoFetcher. Indexing a dict with [0]
                # (as the previous code did) raised KeyError for files.
                is_directory = isinstance(contents, list) or (
                    isinstance(contents, dict) and contents.get('type') == 'dir'
                )

                if is_directory:
                    print(f"Attempting to download directory: {path}")
                    target_dir = os.path.join(temp_dir, os.path.basename(path))
                    success = fetcher.download_directory(owner, repo, path, target_dir)
                else:
                    print(f"Attempting to download file: {path}")
                    file_path = os.path.join(temp_dir, os.path.basename(path))
                    content = fetcher.download_file(owner, repo, path)
                    if content:
                        os.makedirs(os.path.dirname(file_path) or '.', exist_ok=True)
                        if isinstance(content, bytes):
                            # Tolerate fetchers that hand back raw bytes.
                            with open(file_path, 'wb') as f:
                                f.write(content)
                        else:
                            with open(file_path, 'w', encoding='utf-8') as f:
                                f.write(content)
                        success = True
                        print(f"Successfully downloaded file to {file_path}")
                    else:
                        success = False
                        print("Failed to download file")
            else:
                print("Attempting to download entire repository")
                success = fetcher.download_directory(owner, repo, "", temp_dir)

            if not success:
                return "Error: Failed to download repository content. Please verify the repository URL and access permissions."

            print(f"Processing repository content in {temp_dir}")
            converter = SimpleRepoConverter()
            output_dir = os.path.join(temp_dir, "output")
            converter.process_repository(temp_dir, output_dir)

            output_file = os.path.join(output_dir, '_all_files.txt')
            print(f"Looking for output file at: {output_file}")

            if not os.path.exists(output_file):
                return "Error: Failed to generate repository content file."

            with open(output_file, 'r', encoding='utf-8') as f:
                content = f.read()

            if not content.strip():
                return "Error: No readable content found in the repository."

            print(f"Successfully processed content (length: {len(content)})")
            return content

    except Exception as e:
        # Top-level boundary for the UI: report instead of crashing the app.
        print(f"Error in process_repository: {str(e)}")
        return f"Error: {str(e)}"


def _build_file_context(repo_content):
    """Re-label each ``File: <path>`` section of the flattened repository text.

    For every line starting with ``"File: "`` the line after it is skipped
    (presumably a separator written by the converter — verify) and the
    following lines, up to one line before the next header, are emitted under
    an ``Analyzing file: <path>`` heading.

    Returns the raw ``repo_content`` unchanged when no header is found, so
    content without headers still produces a usable context.
    """
    lines = repo_content.split('\n')
    total = len(lines)
    sections = []
    for i, line in enumerate(lines):
        if not line.startswith("File: "):
            continue
        current_path = line[6:].strip()
        content_start = i + 2  # Skip the separator line
        content_end = next(
            (
                j
                for j in range(content_start, total)
                if j + 1 < total and lines[j + 1].startswith("File: ")
            ),
            total,
        )
        sections.append(f"\nAnalyzing file: {current_path}\n")
        sections.append("\n".join(lines[content_start:content_end]) + "\n")

    context = "".join(sections)
    return context if context.strip() else repo_content


def answer_question(repo_content, question, chat_history):
    """Answer a question about the loaded content, using chat history as context.

    Args:
        repo_content: Text returned by ``process_repository``.
        question: The user's question.
        chat_history: Previous messages in a format accepted by
            ``format_chat_history``; may be empty or ``None``.

    Returns:
        The model's answer as a string, or a human-readable error message.
        This function does not raise; failures are returned as text.
    """
    try:
        if not repo_content or (
            isinstance(repo_content, str) and repo_content.startswith("Error:")
        ):
            return "Please load a valid repository first. " + (repo_content or "")

        llm = ChatOpenAI(api_key=OPENAI_API_KEY, temperature=0)
        embeddings_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        # Add repository content to context with better structure.
        docs = [Document(page_content=_build_file_context(repo_content))]
        splits = text_splitter.split_documents(docs)
        texts = [doc.page_content for doc in splits]

        retrieved_docs = []
        if texts:
            # A fresh on-disk Chroma store per question: the collection name is
            # fixed, so a new directory avoids "already exists" collisions.
            with tempfile.TemporaryDirectory() as temp_persist_dir:
                client = chromadb.PersistentClient(path=temp_persist_dir)
                collection = client.create_collection(
                    name="repo_content",
                    metadata={"hnsw:space": "cosine"}
                )

                # Embed and store in batches rather than one API call per chunk.
                for start in range(0, len(texts), _EMBED_BATCH_SIZE):
                    batch = texts[start:start + _EMBED_BATCH_SIZE]
                    collection.add(
                        documents=batch,
                        ids=[f"doc_{start + offset}" for offset in range(len(batch))],
                        embeddings=embeddings_model.embed_documents(batch)
                    )

                # Get relevant documents for the question; never ask for more
                # results than there are stored chunks.
                query_embedding = embeddings_model.embed_query(question)
                results = collection.query(
                    query_embeddings=[query_embedding],
                    n_results=min(_MAX_RETRIEVED_CHUNKS, len(texts)),
                    include=["documents", "distances"]
                )

                # Convert results to documents for the chain.
                retrieved_docs = [
                    Document(page_content=doc)
                    for doc in results['documents'][0]
                ]

        # Include chat history and repository content in the prompt.
        chat_context = format_chat_history(chat_history) if chat_history else ""

        system_message = """You are a helpful assistant that explains code repositories.
Answer questions based on the provided repository content and chat history.

Repository Structure:
{context}

Previous Conversation:
{chat_history}

Important Instructions:
1. When asked about specific folders or files, refer to their actual contents from the repository
2. If a specific folder or file is mentioned in the question, focus your answer on that particular location
3. Reference the actual file paths and code snippets when explaining
4. If the requested folder or file isn't in the provided content, clearly state that
5. Provide specific examples and code references from the actual contents
6. When explaining folders, describe their purpose, main files, and overall structure

Current Question: {input}

Please provide a clear, structured explanation focusing on the specific parts of the repository mentioned in the question.
"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_message),
            ("human", "{input}")
        ])

        # Create and execute chain with retrieved documents.
        chain = create_stuff_documents_chain(
            llm,
            prompt,
            document_variable_name="context"
        )

        response = chain.invoke({
            "input": question,
            "context": retrieved_docs,
            "chat_history": chat_context
        })

        # create_stuff_documents_chain returns the answer text directly (only
        # create_retrieval_chain wraps it in a dict with an "answer" key), so
        # indexing the result with ["answer"] failed on every call.
        if isinstance(response, dict):
            return response.get("answer", "")
        return response if isinstance(response, str) else str(response)

    except Exception as e:
        print(f"Error in answer_question: {str(e)}")  # Debug log
        return f"Error processing question: {str(e)}"


def create_demo():
    """Create and configure the Gradio interface.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    css = """
    .button-press {
        animation: button-press 0.3s ease;
    }
    @keyframes button-press {
        0% { opacity: 1; }
        50% { opacity: 0.7; }
        100% { opacity: 1; }
    }
    """

    with gr.Blocks(title="GitHub Repository Explorer", css=css) as demo:
        gr.Markdown("# GitHub Repository Explorer")
        gr.Markdown("Understand any GitHub repository, folder, or file with AI-powered explanations!")

        repo_content = gr.State()
        chat_history = gr.State([])
        status = gr.Markdown()

        def load_repository_and_update(url):
            """Handle repository loading and UI updates.

            Returns a dict keyed by output component. The chat is reset on
            every load; the question controls are enabled only on success.
            """
            content = process_repository(url)
            if content.startswith("Error:"):
                return {
                    repo_content: None,
                    status: f"❌ {content}",
                    question_input: gr.update(interactive=False, value=""),
                    ask_button: gr.update(interactive=False),
                    chat_history: [],
                    chatbot: []
                }
            return {
                repo_content: content,
                status: "✅ Content loaded successfully! You can now ask questions.",
                question_input: gr.update(interactive=True, value=""),
                ask_button: gr.update(interactive=True),
                chat_history: [],
                chatbot: []
            }

        def get_answer_and_update(question, content, history):
            """Handle question answering and chat updates.

            Returns ``(chat_history, chatbot_messages, question_box_value)``.
            """
            # Work on a copy so the stored state object is never mutated in place.
            history = list(history or [])

            if not content:
                return history, history, "Please load a repository or file first."

            # Check for empty question.
            if not question or question.strip() == "":
                return history, history, ""

            answer = answer_question(content, question, history)
            history = history + [
                {"role": "user", "content": question},
                {"role": "assistant", "content": answer},
            ]
            return history, list(history), ""

        def clear_chat():
            """Clear chat history and reset input."""
            return [], [], ""

        # NOTE(review): the original indentation was lost, so the exact nesting
        # of components under Row/Column is reconstructed — confirm the layout.
        with gr.Row():
            with gr.Column():
                repo_url = gr.Textbox(
                    label="GitHub URL (repository, folder, or file)",
                    placeholder="https://github.com/username/repository"
                )
                load_button = gr.Button("Load Content", elem_classes=["interactive-button"])

        chatbot = gr.Chatbot(
            label="Chat History",
            height=400,
            type="messages"
        )

        question_input = gr.Textbox(
            label="Ask a question about the content",
            placeholder="What is this code about?",
            interactive=False
        )

        ask_button = gr.Button("Ask Question", interactive=False, elem_classes=["ask-button"])
        clear_button = gr.Button("Clear Chat History")

        # Event handlers with animations
        repo_url.submit(
            fn=load_repository_and_update,
            inputs=[repo_url],
            outputs=[repo_content, status, question_input, ask_button, chat_history, chatbot]
        )

        load_button.click(
            fn=load_repository_and_update,
            inputs=[repo_url],
            outputs=[repo_content, status, question_input, ask_button, chat_history, chatbot]
        )

        question_input.submit(
            fn=get_answer_and_update,
            inputs=[question_input, repo_content, chat_history],
            outputs=[chat_history, chatbot, question_input]
        )

        ask_button.click(
            fn=get_answer_and_update,
            inputs=[question_input, repo_content, chat_history],
            outputs=[chat_history, chatbot, question_input]
        )

        clear_button.click(
            fn=clear_chat,
            outputs=[chat_history, chatbot, question_input]
        )

    return demo


# Initialize and launch the app
demo = create_demo()

if __name__ == "__main__":
    # NOTE(review): share=True publishes a public URL for this app, which uses
    # the configured GitHub and OpenAI credentials — confirm this is intended.
    demo.launch(
        share=True,
        show_api=False
    )