# Hosting-page header captured together with the file (not part of the program):
# nehajiya8's picture
# Update app.py
# 1c1597c verified
import requests
import chromadb
from chromadb.config import Settings
import gradio as gr
import tempfile
from utils.github_fetcher import GitHubRepoFetcher
from utils.repo_converter import SimpleRepoConverter
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.documents import Document
import os
from dotenv import load_dotenv
import tempfile
# Load environment variables (python-dotenv) before reading the credentials
# that are later handed to GitHubRepoFetcher and to the OpenAI clients.
load_dotenv()
GITHUB_TOKEN = os.environ.get('GITHUB_TOKEN')
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
def parse_github_url(url):
    """Extract owner, repo, and path from a GitHub URL.

    Accepted shapes (scheme optional, ``www.`` allowed):

    * ``github.com/<owner>/<repo>``                      -> path ``''``
    * ``github.com/<owner>/<repo>/tree/<branch>/<dir>``  -> path ``'<dir>'``
    * ``github.com/<owner>/<repo>/blob/<branch>/<file>`` -> path ``'<file>'``

    A trailing ``.git`` on the repository name, a query string and a
    ``#fragment`` are discarded. The branch is assumed to be a single path
    segment; branch names that contain ``/`` cannot be told apart from the
    path and are not supported.

    Args:
        url: The GitHub URL as typed by the user.

    Returns:
        A ``(owner, repo, path)`` tuple of strings; ``path`` is ``''`` when
        the URL points at the repository root.

    Raises:
        ValueError: If the URL is not a string or does not contain
            ``github.com/<owner>/<repo>``.
    """
    try:
        # Query strings and fragments are never part of the repository path
        # (literal '?' / '#' inside a path would be percent-encoded).
        cleaned = url.strip().split('#', 1)[0].split('?', 1)[0]
        parts = cleaned.strip('/').split('/')

        github_index = next(
            (i for i, part in enumerate(parts)
             if part.lower() in ('github.com', 'www.github.com')),
            -1,
        )
        if github_index < 0 or len(parts) <= github_index + 2:
            raise ValueError("Invalid GitHub URL format")

        owner = parts[github_index + 1]
        repo = parts[github_index + 2]
        # Clone URLs end in ".git"; the API expects the bare repository name.
        if repo.endswith('.git'):
            repo = repo[:-4]
        if not owner or not repo:
            raise ValueError("Invalid GitHub URL format")

        # Handle folder and file paths: drop the "tree/<branch>" (folder) or
        # "blob/<branch>" (file) prefix that the web UI puts before the path.
        path_parts = parts[github_index + 3:]
        if len(path_parts) > 1 and path_parts[0] in ('tree', 'blob'):
            path_parts = path_parts[2:]

        return owner, repo, '/'.join(path_parts)
    except Exception as e:
        # Every failure (including non-string input) surfaces as ValueError
        # with the same message prefix callers already see.
        raise ValueError(f"Error parsing GitHub URL: {str(e)}") from e
def format_chat_history(history):
    """Render the chat history as plain ``Q:`` / ``A:`` text.

    Each entry is either a message dict with ``role`` and ``content`` keys
    (role ``user`` is labelled ``Q``, anything else ``A``) or a legacy
    ``(question, answer)`` pair. Rendered lines are joined with blank lines.
    """
    rendered = []
    for entry in history:
        if not isinstance(entry, dict):
            # Legacy pair format kept for backward compatibility.
            question, answer = entry
            rendered.extend((f"Q: {question}", f"A: {answer}"))
            continue
        speaker = 'Q' if entry['role'] == 'user' else 'A'
        rendered.append(f"{speaker}: {entry['content']}")
    return "\n\n".join(rendered)
def process_repository(github_url):
    """Process a GitHub repository, folder, or file and return its content.

    The URL is parsed, access to the repository is verified against the
    GitHub API, the requested content is downloaded into a temporary
    directory and handed to ``SimpleRepoConverter``; the text of the
    ``_all_files.txt`` file it is expected to write is returned.

    Args:
        github_url: URL of a repository, a folder, or a single file.

    Returns:
        The combined text on success. On any failure a human-readable string
        starting with ``"Error:"`` is returned instead; exceptions are caught
        and never propagate to the caller.
    """
    try:
        owner, repo, path = parse_github_url(github_url)
        print(f"Parsed URL - Owner: {owner}, Repo: {repo}, Path: {path}")
        fetcher = GitHubRepoFetcher(GITHUB_TOKEN)

        # Test GitHub API access. A timeout keeps the UI from hanging forever
        # when the API is unreachable.
        test_url = f'https://api.github.com/repos/{owner}/{repo}'
        response = requests.get(test_url, headers=fetcher.headers, timeout=30)
        if response.status_code != 200:
            # Error bodies are not guaranteed to be a JSON object; do not let
            # a decoding problem hide the status code.
            try:
                message = response.json().get('message', '')
            except (ValueError, AttributeError):
                message = ''
            return f"Error: Unable to access repository. Status code: {response.status_code}. Message: {message}"
        print("Successfully connected to GitHub API")

        with tempfile.TemporaryDirectory() as temp_dir:
            print(f"Created temp directory: {temp_dir}")
            if path:
                # Check if path exists and get its type
                contents = fetcher.fetch_contents(owner, repo, path)
                if not contents:
                    return f"Error: Unable to access path: {path}"

                # A list is treated as a directory listing. A single mapping
                # is inspected by key: indexing it with [0] (as was done
                # before) raises KeyError, so file URLs could never load.
                # NOTE(review): presumably fetch_contents returns the GitHub
                # contents-API payload (list for dirs, dict for files) - confirm.
                if isinstance(contents, dict):
                    is_directory = contents.get('type') == 'dir'
                else:
                    is_directory = True

                if is_directory:
                    print(f"Attempting to download directory: {path}")
                    target_dir = os.path.join(temp_dir, os.path.basename(path))
                    success = fetcher.download_directory(owner, repo, path, target_dir)
                else:
                    print(f"Attempting to download file: {path}")
                    file_path = os.path.join(temp_dir, os.path.basename(path))
                    content = fetcher.download_file(owner, repo, path)
                    if content:
                        os.makedirs(os.path.dirname(file_path) or '.', exist_ok=True)
                        with open(file_path, 'w', encoding='utf-8') as f:
                            f.write(content)
                        success = True
                        print(f"Successfully downloaded file to {file_path}")
                    else:
                        success = False
                        print("Failed to download file")
            else:
                print("Attempting to download entire repository")
                success = fetcher.download_directory(owner, repo, "", temp_dir)

            if not success:
                return "Error: Failed to download repository content. Please verify the repository URL and access permissions."

            print(f"Processing repository content in {temp_dir}")
            converter = SimpleRepoConverter()
            # NOTE(review): the output directory lives inside the directory
            # being converted - confirm the converter skips its own output.
            output_dir = os.path.join(temp_dir, "output")
            converter.process_repository(temp_dir, output_dir)

            output_file = os.path.join(output_dir, '_all_files.txt')
            print(f"Looking for output file at: {output_file}")
            if not os.path.exists(output_file):
                return "Error: Failed to generate repository content file."

            # Read everything before the temporary directory is removed.
            with open(output_file, 'r', encoding='utf-8') as f:
                content = f.read()
            if not content.strip():
                return "Error: No readable content found in the repository."

            print(f"Successfully processed content (length: {len(content)})")
            return content
    except Exception as e:
        # Top-level boundary for the UI: report the problem as text.
        print(f"Error in process_repository: {str(e)}")
        return f"Error: {str(e)}"
def _build_file_context(repo_content):
    """Return the bodies of all ``File: <path>`` sections, each under an 'Analyzing file' header."""
    lines = repo_content.split('\n')
    pieces = []
    for i, line in enumerate(lines):
        if not line.startswith("File: "):
            continue
        current_path = line[6:].strip()
        content_start = i + 2  # Skip the separator line
        # The section ends one line before the next "File: " header, or at
        # the end of the text when there is no further header.
        content_end = next(
            (j for j in range(content_start, len(lines))
             if j + 1 < len(lines) and lines[j + 1].startswith("File: ")),
            len(lines),
        )
        pieces.append(f"\nAnalyzing file: {current_path}\n")
        pieces.append("\n".join(lines[content_start:content_end]) + "\n")
    return "".join(pieces)


def _retrieve_relevant_chunks(chunk_texts, question, embeddings_model, n_results=5, batch_size=100):
    """Index the chunks in a throwaway Chroma collection and return the best matches for the question."""
    if not chunk_texts:
        return []
    with tempfile.TemporaryDirectory() as temp_persist_dir:
        client = chromadb.PersistentClient(path=temp_persist_dir)
        collection = client.create_collection(
            name="repo_content",
            metadata={"hnsw:space": "cosine"},
        )
        # Embed and insert in batches: one embedding request per chunk is
        # needlessly slow for anything but tiny repositories.
        for start in range(0, len(chunk_texts), batch_size):
            batch = chunk_texts[start:start + batch_size]
            collection.add(
                documents=batch,
                ids=[f"doc_{start + offset}" for offset in range(len(batch))],
                embeddings=embeddings_model.embed_documents(batch),
            )
        query_embedding = embeddings_model.embed_query(question)
        results = collection.query(
            query_embeddings=[query_embedding],
            # Never ask for more neighbours than the collection holds.
            n_results=min(n_results, len(chunk_texts)),
            include=["documents", "distances"],
        )
        return [Document(page_content=text) for text in results['documents'][0]]


def answer_question(repo_content, question, chat_history):
    """Answer questions using chat history for context.

    The repository text is split into overlapping chunks, the chunks most
    similar to the question are retrieved from a temporary Chroma collection
    and passed, together with the formatted chat history, to the chat model.

    Args:
        repo_content: Text produced by ``process_repository``; an empty value
            or a string starting with ``"Error:"`` is rejected.
        question: The user's question.
        chat_history: Previous messages (dicts or ``(q, a)`` pairs); may be
            empty.

    Returns:
        The model's answer as a string, or an explanatory message when no
        valid content is loaded or when processing fails. Exceptions are
        caught and never propagate to the caller.
    """
    try:
        if not repo_content or (isinstance(repo_content, str) and repo_content.startswith("Error:")):
            return "Please load a valid repository first. " + (repo_content or "")

        llm = ChatOpenAI(api_key=OPENAI_API_KEY, temperature=0)
        embeddings_model = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

        # Extract per-file context from repo_content. If the text carries no
        # "File: " headers the extraction yields nothing; fall back to the
        # raw text so retrieval still has something to search.
        current_context = _build_file_context(repo_content)
        if not current_context.strip():
            current_context = repo_content

        docs = [Document(page_content=current_context)]
        splits = text_splitter.split_documents(docs)

        # Get relevant documents for the question
        retrieved_docs = _retrieve_relevant_chunks(
            [doc.page_content for doc in splits],
            question,
            embeddings_model,
        )

        # Include chat history and repository content in the prompt
        chat_context = format_chat_history(chat_history) if chat_history else ""
        system_message = (
            "You are a helpful assistant that explains code repositories.\n"
            "Answer questions based on the provided repository content and chat history.\n"
            "Repository Structure:\n"
            "{context}\n"
            "Previous Conversation:\n"
            "{chat_history}\n"
            "Important Instructions:\n"
            "1. When asked about specific folders or files, refer to their actual contents from the repository\n"
            "2. If a specific folder or file is mentioned in the question, focus your answer on that particular location\n"
            "3. Reference the actual file paths and code snippets when explaining\n"
            "4. If the requested folder or file isn't in the provided content, clearly state that\n"
            "5. Provide specific examples and code references from the actual contents\n"
            "6. When explaining folders, describe their purpose, main files, and overall structure\n"
            "Current Question: {input}\n"
            "Please provide a clear, structured explanation focusing on the specific parts of the repository mentioned in the question.\n"
        )
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_message),
            ("human", "{input}")
        ])

        # Create and execute chain with retrieved documents
        chain = create_stuff_documents_chain(
            llm,
            prompt,
            document_variable_name="context"
        )
        response = chain.invoke({
            "input": question,
            "context": retrieved_docs,
            "chat_history": chat_context
        })

        # With its default output parser the stuff-documents chain returns the
        # answer text itself; only dict-shaped results carry an "answer" key.
        # Indexing the string with "answer" raised TypeError on every call.
        if isinstance(response, dict):
            return response["answer"]
        return response
    except Exception as e:
        print(f"Error in answer_question: {str(e)}")  # Debug log
        return f"Error processing question: {str(e)}"
def create_demo():
    """Create and configure the Gradio interface.

    Builds a Blocks app with a URL box and load button, a message-style
    chatbot, a question box with ask button, and a clear button. The loaded
    repository text and the running chat history are kept in ``gr.State``.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    css = (
        "\n"
        ".button-press {\n"
        "animation: button-press 0.3s ease;\n"
        "}\n"
        "@keyframes button-press {\n"
        "0% { opacity: 1; }\n"
        "50% { opacity: 0.7; }\n"
        "100% { opacity: 1; }\n"
        "}\n"
    )

    with gr.Blocks(title="GitHub Repository Explorer", css=css) as demo:
        gr.Markdown("# GitHub Repository Explorer")
        gr.Markdown("Understand any GitHub repository, folder, or file with AI-powered explanations!")

        repo_content = gr.State()
        chat_history = gr.State([])
        status = gr.Markdown()

        def load_repository_and_update(url):
            """Handle repository loading and UI updates."""
            loaded = process_repository(url)
            usable = not loaded.startswith("Error:")
            # The question widgets are enabled only after a successful load;
            # the conversation is reset either way.
            return {
                repo_content: loaded if usable else None,
                status: (
                    "✅ Content loaded successfully! You can now ask questions."
                    if usable
                    else f"❌ {loaded}"
                ),
                question_input: gr.update(interactive=usable, value=""),
                ask_button: gr.update(interactive=usable),
                chat_history: [],
                chatbot: [],
            }

        def get_answer_and_update(question, content, history):
            """Handle question answering and chat updates."""
            if not content:
                return history, history, "Please load a repository or file first."
            # Check for empty question
            if not (question and question.strip()):
                return history, history, ""

            # The answer is computed against the history *before* this turn.
            answer = answer_question(content, question, history)
            history.extend([
                {"role": "user", "content": question},
                {"role": "assistant", "content": answer},
            ])
            return history, list(history), ""

        def clear_chat():
            """Clear chat history and reset input."""
            return [], [], ""

        # NOTE(review): indentation was lost in the captured file, so the
        # exact extent of the Row/Column containers is a best guess - confirm
        # against the deployed layout.
        with gr.Row():
            with gr.Column():
                repo_url = gr.Textbox(
                    label="GitHub URL (repository, folder, or file)",
                    placeholder="https://github.com/username/repository",
                )
                load_button = gr.Button("Load Content", elem_classes=["interactive-button"])

        chatbot = gr.Chatbot(label="Chat History", height=400, type="messages")
        question_input = gr.Textbox(
            label="Ask a question about the content",
            placeholder="What is this code about?",
            interactive=False,
        )
        ask_button = gr.Button("Ask Question", interactive=False, elem_classes=["ask-button"])
        clear_button = gr.Button("Clear Chat History")

        # Event handlers: pressing Enter in a textbox and clicking its button
        # are wired to the same callback.
        load_targets = (repo_content, status, question_input, ask_button, chat_history, chatbot)
        for register in (repo_url.submit, load_button.click):
            register(
                fn=load_repository_and_update,
                inputs=[repo_url],
                outputs=list(load_targets),
            )

        chat_targets = (chat_history, chatbot, question_input)
        for register in (question_input.submit, ask_button.click):
            register(
                fn=get_answer_and_update,
                inputs=[question_input, repo_content, chat_history],
                outputs=list(chat_targets),
            )

        clear_button.click(fn=clear_chat, outputs=list(chat_targets))

    return demo
# Build the interface at import time so `demo` exists as a module-level name;
# start the server only when this file is executed directly.
demo = create_demo()

if __name__ == "__main__":
    demo.launch(share=True, show_api=False)