Spaces:
Running
Running
| import requests | |
| import chromadb | |
| from chromadb.config import Settings | |
| import gradio as gr | |
| import tempfile | |
| from utils.github_fetcher import GitHubRepoFetcher | |
| from utils.repo_converter import SimpleRepoConverter | |
| from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.chains import create_retrieval_chain | |
| from langchain.chains.combine_documents import create_stuff_documents_chain | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.documents import Document | |
| import os | |
| from dotenv import load_dotenv | |
| import tempfile | |
| # Load environment variables | |
| load_dotenv() | |
| GITHUB_TOKEN = os.getenv('GITHUB_TOKEN') | |
| OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
def parse_github_url(url):
    """Extract the owner, repository name, and in-repo path from a GitHub URL.

    Repository, folder (``/tree/<branch>/...``) and file
    (``/blob/<branch>/...``) URLs are accepted, with or without a scheme or a
    ``www.`` prefix. Surrounding whitespace, trailing slashes, a query string,
    a fragment, and a ``.git`` suffix on the repository name are ignored.

    The branch is assumed to be a single path segment; a branch name that
    itself contains ``/`` cannot be told apart from the path that follows it.

    Args:
        url: The GitHub URL to parse.

    Returns:
        A ``(owner, repo, path)`` tuple. ``path`` is ``''`` for the repository
        root.

    Raises:
        ValueError: If ``url`` is not a string or does not contain
            ``github.com/<owner>/<repo>``.
    """
    try:
        # The fragment and query string are never part of the repository path.
        cleaned = url.strip().split('#', 1)[0].split('?', 1)[0]
        parts = cleaned.strip('/').split('/')

        github_index = next(
            (
                index
                for index, part in enumerate(parts)
                if part.lower() in ('github.com', 'www.github.com')
            ),
            -1,
        )
        if github_index < 0 or len(parts) <= github_index + 2:
            raise ValueError("Invalid GitHub URL format")

        owner = parts[github_index + 1]
        repo = parts[github_index + 2]
        # Clone URLs end in ".git", which is not part of the repository name.
        if repo.endswith('.git'):
            repo = repo[:-len('.git')]
        if not owner or not repo:
            raise ValueError("Invalid GitHub URL format")

        # Empty segments come from doubled slashes and carry no meaning.
        path_parts = [part for part in parts[github_index + 3:] if part]
        # Folder URLs use "tree/<branch>/", file URLs use "blob/<branch>/";
        # neither the marker nor the branch belongs to the in-repo path.
        if path_parts and path_parts[0] in ('tree', 'blob'):
            path_parts = path_parts[2:]

        return owner, repo, '/'.join(path_parts)
    except Exception as e:
        raise ValueError(f"Error parsing GitHub URL: {str(e)}") from e
def format_chat_history(history):
    """Render the chat history as "Q: ..." / "A: ..." lines separated by blank lines.

    Each entry is either a message dict with ``role`` and ``content`` keys
    (role ``'user'`` is labelled ``Q``, any other role ``A``) or a
    two-item ``(question, answer)`` pair.
    """
    rendered = []
    for entry in history:
        if not isinstance(entry, dict):
            # Pair format, kept for backward compatibility.
            asked, replied = entry
            rendered.extend((f"Q: {asked}", f"A: {replied}"))
            continue
        label = 'Q' if entry['role'] == 'user' else 'A'
        rendered.append(f"{label}: {entry['content']}")
    return "\n\n".join(rendered)
def process_repository(github_url):
    """Download a GitHub repository, folder, or file and return its text content.

    The URL is parsed with ``parse_github_url``, access to the repository is
    checked against the GitHub REST API, the requested content is downloaded
    into a temporary directory, and ``SimpleRepoConverter`` is run over that
    directory. The text of the generated ``_all_files.txt`` is returned.

    Args:
        github_url: URL of the repository, folder, or file to load.

    Returns:
        The combined text on success. On any failure, including an exception
        raised while processing, a message starting with ``"Error:"``.
    """
    try:
        owner, repo, path = parse_github_url(github_url)
        print(f"Parsed URL - Owner: {owner}, Repo: {repo}, Path: {path}")

        fetcher = GitHubRepoFetcher(GITHUB_TOKEN)

        # Test GitHub API access. The timeout keeps an unresponsive API from
        # blocking the caller indefinitely.
        test_url = f'https://api.github.com/repos/{owner}/{repo}'
        response = requests.get(test_url, headers=fetcher.headers, timeout=30)
        if response.status_code != 200:
            try:
                message = response.json().get('message', '')
            except (ValueError, AttributeError):
                # The error body is not guaranteed to be a JSON object; the
                # status code alone is still worth reporting.
                message = ''
            return f"Error: Unable to access repository. Status code: {response.status_code}. Message: {message}"
        print("Successfully connected to GitHub API")

        with tempfile.TemporaryDirectory() as temp_dir:
            print(f"Created temp directory: {temp_dir}")

            if path:
                # Check if path exists and get its type
                contents = fetcher.fetch_contents(owner, repo, path)
                if not contents:
                    return f"Error: Unable to access path: {path}"

                # NOTE(review): assumes fetch_contents returns the GitHub
                # contents payload (a list for a directory, a dict for a
                # single file) -- confirm against GitHubRepoFetcher. Indexing
                # a dict payload with [0] raised KeyError, which made the
                # single-file branch unreachable.
                is_directory = isinstance(contents, list) or (
                    isinstance(contents, dict) and contents.get('type') == 'dir'
                )

                if is_directory:
                    print(f"Attempting to download directory: {path}")
                    target_dir = os.path.join(temp_dir, os.path.basename(path))
                    success = fetcher.download_directory(owner, repo, path, target_dir)
                else:
                    print(f"Attempting to download file: {path}")
                    file_path = os.path.join(temp_dir, os.path.basename(path))
                    file_content = fetcher.download_file(owner, repo, path)
                    if file_content:
                        os.makedirs(os.path.dirname(file_path) or '.', exist_ok=True)
                        with open(file_path, 'w', encoding='utf-8') as f:
                            f.write(file_content)
                        success = True
                        print(f"Successfully downloaded file to {file_path}")
                    else:
                        success = False
                        print("Failed to download file")
            else:
                print("Attempting to download entire repository")
                success = fetcher.download_directory(owner, repo, "", temp_dir)

            if not success:
                return "Error: Failed to download repository content. Please verify the repository URL and access permissions."

            print(f"Processing repository content in {temp_dir}")
            converter = SimpleRepoConverter()
            output_dir = os.path.join(temp_dir, "output")
            converter.process_repository(temp_dir, output_dir)

            output_file = os.path.join(output_dir, '_all_files.txt')
            print(f"Looking for output file at: {output_file}")
            if not os.path.exists(output_file):
                return "Error: Failed to generate repository content file."

            # Read before the temporary directory is removed on exit.
            with open(output_file, 'r', encoding='utf-8') as f:
                content = f.read()

            if not content.strip():
                return "Error: No readable content found in the repository."

            print(f"Successfully processed content (length: {len(content)})")
            return content
    except Exception as e:
        print(f"Error in process_repository: {str(e)}")
        return f"Error: {str(e)}"
# Prompt template; {context}, {chat_history} and {input} are filled in by the chain.
_SYSTEM_MESSAGE = """You are a helpful assistant that explains code repositories.
Answer questions based on the provided repository content and chat history.
Repository Structure:
{context}
Previous Conversation:
{chat_history}
Important Instructions:
1. When asked about specific folders or files, refer to their actual contents from the repository
2. If a specific folder or file is mentioned in the question, focus your answer on that particular location
3. Reference the actual file paths and code snippets when explaining
4. If the requested folder or file isn't in the provided content, clearly state that
5. Provide specific examples and code references from the actual contents
6. When explaining folders, describe their purpose, main files, and overall structure
Current Question: {input}
Please provide a clear, structured explanation focusing on the specific parts of the repository mentioned in the question.
"""

# Number of chunks embedded and stored per request, to bound request sizes.
_EMBED_BATCH_SIZE = 100


def _build_file_context(repo_content):
    """Re-label every ``File: <path>`` section of ``repo_content`` for indexing.

    A section runs from two lines after its header (the line directly after
    the header is treated as a separator and skipped) up to the next header.
    Content without any ``File: `` header is returned unchanged so there is
    always something to index.
    """
    lines = repo_content.split('\n')
    header_indexes = [
        index for index, line in enumerate(lines) if line.startswith("File: ")
    ]
    if not header_indexes:
        return repo_content

    # Each section ends where the next header begins; the last one ends at EOF.
    section_ends = header_indexes[1:] + [len(lines)]
    sections = []
    for header_index, section_end in zip(header_indexes, section_ends):
        file_path = lines[header_index][len("File: "):].strip()
        body = "\n".join(lines[header_index + 2:section_end])
        sections.append(f"\nAnalyzing file: {file_path}\n{body}\n")
    return "".join(sections)


def answer_question(repo_content, question, chat_history):
    """Answer a question about loaded repository content.

    The content is split into chunks, embedded into a temporary Chroma
    collection, and the chunks closest to the question are passed to the
    chat model together with the formatted chat history.

    Args:
        repo_content: Text returned by ``process_repository``. An empty value
            or a string starting with ``"Error:"`` means nothing was loaded.
        question: The user's question.
        chat_history: Earlier messages, formatted with ``format_chat_history``;
            may be empty.

    Returns:
        The model's answer, or a human-readable message when no repository
        is loaded or an exception is raised while answering.
    """
    try:
        load_failed = isinstance(repo_content, str) and repo_content.startswith("Error:")
        if not repo_content or load_failed:
            return "Please load a valid repository first. " + (repo_content or "")

        llm = ChatOpenAI(api_key=OPENAI_API_KEY, temperature=0)
        embeddings_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        docs = [Document(page_content=_build_file_context(repo_content))]
        splits = text_splitter.split_documents(docs)
        texts = [doc.page_content for doc in splits]

        retrieved_docs = []
        # The index is rebuilt for every question and discarded afterwards.
        with tempfile.TemporaryDirectory() as temp_persist_dir:
            client = chromadb.PersistentClient(path=temp_persist_dir)
            collection = client.create_collection(
                name="repo_content",
                metadata={"hnsw:space": "cosine"}
            )

            # Embed and store in batches rather than one request per chunk.
            for start in range(0, len(texts), _EMBED_BATCH_SIZE):
                batch = texts[start:start + _EMBED_BATCH_SIZE]
                collection.add(
                    documents=batch,
                    ids=[f"doc_{start + offset}" for offset in range(len(batch))],
                    embeddings=embeddings_model.embed_documents(batch)
                )

            if texts:
                query_embedding = embeddings_model.embed_query(question)
                results = collection.query(
                    query_embeddings=[query_embedding],
                    # Never ask for more neighbours than there are chunks.
                    n_results=min(5, len(texts)),
                    include=["documents", "distances"]
                )
                # Copy the text out so nothing depends on the store after
                # the temporary directory is removed.
                retrieved_docs = [
                    Document(page_content=doc)
                    for doc in results['documents'][0]
                ]

        chat_context = format_chat_history(chat_history) if chat_history else ""

        prompt = ChatPromptTemplate.from_messages([
            ("system", _SYSTEM_MESSAGE),
            ("human", "{input}")
        ])

        chain = create_stuff_documents_chain(
            llm,
            prompt,
            document_variable_name="context"
        )

        response = chain.invoke({
            "input": question,
            "context": retrieved_docs,
            "chat_history": chat_context
        })

        # With its default output parser the stuff-documents chain returns a
        # plain string; indexing it with "answer" raised TypeError. The dict
        # form is still accepted in case the chain is swapped for one that
        # returns a mapping.
        if isinstance(response, dict):
            return response["answer"]
        return response
    except Exception as e:
        print(f"Error in answer_question: {str(e)}")  # Debug log
        return f"Error processing question: {str(e)}"
# Stylesheet handed to gr.Blocks by create_demo.
_DEMO_CSS = """
.button-press {
animation: button-press 0.3s ease;
}
@keyframes button-press {
0% { opacity: 1; }
50% { opacity: 0.7; }
100% { opacity: 1; }
}
"""


def create_demo():
    """Build the Gradio Blocks interface for loading content and chatting about it.

    Returns:
        The configured ``gr.Blocks`` instance; it is not launched here.
    """
    with gr.Blocks(title="GitHub Repository Explorer", css=_DEMO_CSS) as demo:
        gr.Markdown("# GitHub Repository Explorer")
        gr.Markdown("Understand any GitHub repository, folder, or file with AI-powered explanations!")

        # Per-session state: the loaded text and the running conversation.
        repo_content = gr.State()
        chat_history = gr.State([])
        status = gr.Markdown()

        # NOTE(review): the Row/Column nesting below is assumed to wrap only
        # the URL box and the load button -- confirm against the deployed layout.
        with gr.Row():
            with gr.Column():
                repo_url = gr.Textbox(
                    label="GitHub URL (repository, folder, or file)",
                    placeholder="https://github.com/username/repository"
                )
                load_button = gr.Button("Load Content", elem_classes=["interactive-button"])

        chatbot = gr.Chatbot(
            label="Chat History",
            height=400,
            type="messages"
        )
        question_input = gr.Textbox(
            label="Ask a question about the content",
            placeholder="What is this code about?",
            interactive=False
        )
        ask_button = gr.Button("Ask Question", interactive=False, elem_classes=["ask-button"])
        clear_button = gr.Button("Clear Chat History")

        def load_repository_and_update(url):
            """Load the URL, reset the chat, and enable asking only on success."""
            content = process_repository(url)
            failed = content.startswith("Error:")
            if failed:
                status_text = f"❌ {content}"
            else:
                status_text = "✅ Content loaded successfully! You can now ask questions."
            return {
                repo_content: None if failed else content,
                status: status_text,
                question_input: gr.update(interactive=not failed, value=""),
                ask_button: gr.update(interactive=not failed),
                chat_history: [],
                chatbot: []
            }

        def get_answer_and_update(question, content, history):
            """Answer the question, append the exchange, and clear the input box."""
            if not content:
                return history, history, "Please load a repository or file first."

            # Nothing to do for a blank question.
            if not (question and question.strip()):
                return history, history, ""

            answer = answer_question(content, question, history)
            history.extend((
                {"role": "user", "content": question},
                {"role": "assistant", "content": answer},
            ))
            return history, list(history), ""

        def clear_chat():
            """Empty the stored history, the chat display, and the input box."""
            return [], [], ""

        # Pressing Enter in a textbox behaves like clicking its button.
        for register_load in (repo_url.submit, load_button.click):
            register_load(
                fn=load_repository_and_update,
                inputs=[repo_url],
                outputs=[repo_content, status, question_input, ask_button, chat_history, chatbot]
            )

        for register_ask in (question_input.submit, ask_button.click):
            register_ask(
                fn=get_answer_and_update,
                inputs=[question_input, repo_content, chat_history],
                outputs=[chat_history, chatbot, question_input]
            )

        clear_button.click(
            fn=clear_chat,
            outputs=[chat_history, chatbot, question_input]
        )

    return demo
# Build the interface when the module is imported; start the server only when
# this file is executed as a script.
demo = create_demo()

if __name__ == "__main__":
    demo.launch(show_api=False, share=True)