test / app.py
nehajiya8's picture
Update app.py
0a8455e verified
import streamlit as st
import requests
import base64
import os
from typing import Optional, Dict, List
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import ChatOpenAI
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.documents import Document
import time
import tempfile
# Initialize session state
# Each key is seeded only when absent, so values survive Streamlit reruns.
for _state_key, _initial_value in (
    ("repo_content", None),
    ("processing", False),
    ("chat_history", []),
    ("current_repo", None),
    ("question", ""),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# Configure Streamlit page
# The icon is a single emoji; the previous value was a mis-decoded byte
# sequence rather than a valid emoji.
st.set_page_config(page_title="GitHub Repository Explorer", page_icon="🔍", layout="wide")
# Add custom CSS
# Injected as raw HTML so the <style> element reaches the page unchanged.
st.markdown("""
<style>
/* Chat container styling */
.stTextInput > div > div > input {
background-color: white;
border-radius: 20px;
padding: 12px 20px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
/* Typing indicator animation */
.typing-indicator {
display: inline-flex;
align-items: center;
justify-content: center;
gap: 4px;
}
.dot {
width: 8px;
height: 8px;
background-color: #007AFF;
border-radius: 50%;
animation: bounce 1.4s infinite ease-in-out;
}
.dot:nth-child(1) { animation-delay: -0.32s; }
.dot:nth-child(2) { animation-delay: -0.16s; }
@keyframes bounce {
0%, 80%, 100% { transform: scale(0); }
40% { transform: scale(1.0); }
}
/* Improve spacing and readability */
.main {
padding: 2rem;
}
.stMarkdown {
max-width: 100%;
}
</style>
""", unsafe_allow_html=True)
def handle_error(error: Exception) -> str:
    """Convert an exception into a user-friendly message.

    The checks run in order and the first match wins:

    1. any ``requests`` exception -> connectivity / token hint;
    2. a ``ValueError`` whose text mentions "URL" -> URL format hint;
    3. any error whose text mentions an API key -> OpenAI key hint;
    4. otherwise a generic message embedding the error text.

    Args:
        error: The exception to describe.

    Returns:
        A message suitable for showing directly to the user.
    """
    message = str(error)
    if isinstance(error, requests.exceptions.RequestException):
        return "Failed to connect to GitHub. Please check your internet connection and GitHub token."
    if isinstance(error, ValueError) and "URL" in message:
        return "Invalid GitHub repository URL. Please use a URL in the format: https://github.com/username/repository"
    # The haystack is lower-cased, so the needles must be lower-case too;
    # a mixed-case needle could never match.
    lowered = message.lower()
    if "api key" in lowered or "api_key" in lowered:
        return "Invalid OpenAI API key. Please check your API key in the sidebar."
    return f"An error occurred: {message}"
class GitHubRepoFetcher:
    """Small client that reads repository files via the GitHub contents API.

    Failures are reported to the user with ``st.error`` and signalled to the
    caller by a ``None`` / empty return value rather than by an exception.
    """

    def __init__(self, token: str, timeout: float = 30.0):
        """Store the request headers, API base URL and request timeout.

        Args:
            token: GitHub token sent in the ``Authorization`` header.
            timeout: Seconds to wait for each HTTP request before giving up.
        """
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = 'https://api.github.com'
        # Without a timeout a stalled connection would block the app forever.
        self.timeout = timeout

    def fetch_contents(self, owner: str, repo: str, path: str = '') -> Optional[List[Dict]]:
        """Return the contents-API entries found at ``path``.

        Args:
            owner: Repository owner.
            repo: Repository name.
            path: Path inside the repository; backslashes are converted to
                forward slashes. Defaults to the repository root.

        Returns:
            A list of entry dicts (a single-object response is wrapped in a
            list), or ``None`` if the request failed.
        """
        path = path.replace('\\', '/')
        url = f'{self.base_url}/repos/{owner}/{repo}/contents/{path}'
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
            content = response.json()
        except Exception as e:  # UI boundary: report instead of crashing
            st.error(f"Error fetching repository: {str(e)}")
            return None
        return content if isinstance(content, list) else [content]

    def get_file_content(self, url: str) -> Optional[str]:
        """Download one file entry and return its text.

        Args:
            url: The entry's API URL (the ``url`` field of a contents entry).

        Returns:
            The decoded UTF-8 text, or ``None`` when the request fails, the
            payload is not base64-encoded, or the file is not valid UTF-8.
        """
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
            payload = response.json()
            if payload.get('encoding') != 'base64':
                return None
            raw_bytes = base64.b64decode(payload['content'])
        except Exception as e:  # UI boundary: report instead of crashing
            st.error(f"Error fetching file: {str(e)}")
            return None
        try:
            return raw_bytes.decode('utf-8')
        except UnicodeDecodeError:
            # Not text (e.g. an image): skip quietly rather than showing an
            # error for every binary file in the repository.
            return None

    def process_repo(self, owner: str, repo: str) -> str:
        """Concatenate the text of every readable file in the repository root.

        Only entries whose ``type`` is ``'file'`` are read; directories are
        not descended into.

        Returns:
            One string with a ``File: <path>`` header before each file's
            content, or ``""`` when nothing could be fetched.
        """
        contents = self.fetch_contents(owner, repo)
        if not contents:
            return ""
        sections = []
        for item in contents:
            if item.get('type') != 'file':
                continue
            content = self.get_file_content(item['url'])
            if content:
                sections.append(f"File: {item['path']}\n{'='*40}\n{content}\n\n")
        return "".join(sections)
def parse_github_url(url: str) -> tuple[str, str]:
    """Extract the owner and repository name from a GitHub URL.

    Accepts URLs with or without a scheme, with a ``www.`` host prefix, with
    extra path segments after the repository, with a trailing ``.git``, and
    with a query string or fragment. Surrounding whitespace is ignored.

    Args:
        url: The repository URL, e.g. ``https://github.com/owner/repo``.

    Returns:
        An ``(owner, repo)`` tuple.

    Raises:
        ValueError: If no GitHub host is present or the owner / repository
            segment is missing or empty.
    """
    github_hosts = ('github.com', 'www.github.com')
    try:
        cleaned = url.strip()
        # A query string or fragment is never part of the owner/repo path.
        for separator in ('?', '#'):
            cleaned = cleaned.split(separator, 1)[0]
        parts = cleaned.strip('/').split('/')
        github_index = next(
            (i for i, part in enumerate(parts) if part.lower() in github_hosts),
            None,
        )
        if github_index is None:
            raise ValueError("Not a valid GitHub URL")
        owner = parts[github_index + 1].strip()
        # Clone URLs end in ".git", which is not part of the repository name.
        repo = parts[github_index + 2].strip().removesuffix('.git')
        if not owner or not repo:
            raise ValueError("Missing owner or repository name")
        return owner, repo
    except (ValueError, IndexError, AttributeError):
        # One uniform message for every malformed input.
        raise ValueError("Invalid GitHub URL format") from None
def process_question(question: str, content: str, openai_api_key: str, chat_history: list,
                     chunk_size: int = 1000, chunk_overlap: int = 200) -> str:
    """Answer a question about the repository with a retrieval QA chain.

    The repository text is split into overlapping chunks, embedded into a
    temporary Chroma store, and the retrieved chunks are passed to the chat
    model together with the formatted conversation history.

    Args:
        question: The user's question.
        content: Concatenated repository text to search.
        openai_api_key: Key used for both the chat model and the embeddings.
        chat_history: Earlier exchanges, as accepted by
            ``format_chat_history``.
        chunk_size: Maximum characters per chunk handed to the embedder.
        chunk_overlap: Characters shared between neighbouring chunks.

    Returns:
        The chain's answer text.

    Raises:
        ValueError: If ``content`` is empty or only whitespace.
    """
    if not content or not content.strip():
        raise ValueError("No repository content is available to answer questions about.")
    llm = ChatOpenAI(api_key=openai_api_key)
    embeddings = OpenAIEmbeddings(api_key=openai_api_key)
    # Embedding the whole repository as one document would exceed model
    # input limits and make retrieval pointless, so index it in chunks.
    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    documents = splitter.split_documents([Document(page_content=content)])
    with tempfile.TemporaryDirectory() as temp_dir:
        vectorstore = Chroma.from_documents(
            documents=documents,
            embedding=embeddings,
            persist_directory=temp_dir
        )
        retriever = vectorstore.as_retriever()
        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant that explains code repositories. "
                       "Use the following context to answer the question: {context}\n\n"
                       "Previous conversation history:\n{chat_history}"),
            ("human", "{input}")
        ])
        document_chain = create_stuff_documents_chain(llm, prompt)
        retrieval_chain = create_retrieval_chain(retriever, document_chain)
        formatted_history = format_chat_history(chat_history)
        # Run the chain while the temporary store directory still exists.
        result = retrieval_chain.invoke({
            "input": question,
            "chat_history": formatted_history
        })
    return result["answer"]
def format_chat_history(history):
    """Render past exchanges as plain text for inclusion in the prompt.

    Each entry contributes a "Human:" line followed by an "Assistant:" line;
    consecutive exchanges are separated by a blank line.
    """
    exchanges = (
        f"Human: {turn['question']}\nAssistant: {turn['answer']}\n"
        for turn in history
    )
    return "\n".join(exchanges)
def display_chat_messages():
    """Render the stored chat history as alternating chat bubbles.

    Each entry of ``st.session_state.chat_history`` is drawn as a blue user
    bubble in the right-hand (wider) column followed by a grey assistant
    bubble in the left-hand (wider) column.
    """
    user_style = (
        "background-color: #007AFF; color: white; padding: 12px 16px; "
        "border-radius: 15px 15px 0 15px; margin-bottom: 8px; text-align: left; "
        "box-shadow: 0 1px 2px rgba(0,0,0,0.1); max-width: 800px;"
    )
    assistant_style = (
        "background-color: #f0f2f6; padding: 12px 16px; "
        "border-radius: 15px 15px 15px 0; margin-bottom: 16px; "
        "box-shadow: 0 1px 2px rgba(0,0,0,0.1); max-width: 800px;"
    )
    for message in st.session_state.chat_history:
        # User message - narrow spacer on the left pushes the bubble right
        with st.container():
            _, user_column = st.columns([2, 8])
            with user_column:
                st.markdown(_chat_bubble_html(message['question'], user_style),
                            unsafe_allow_html=True)
        # Assistant message - narrow spacer on the right keeps the bubble left
        with st.container():
            assistant_column, _ = st.columns([8, 2])
            with assistant_column:
                st.markdown(_chat_bubble_html(message['answer'], assistant_style),
                            unsafe_allow_html=True)


def _chat_bubble_html(text, style: str) -> str:
    """Return a styled ``<div>`` holding ``text`` as HTML-escaped content."""
    from html import escape

    # The markup is rendered with unsafe_allow_html, so the text must be
    # escaped: answers about code routinely contain "<" and ">" which would
    # otherwise be parsed as tags. Line breaks become <br> so multi-line
    # text keeps its layout inside the single HTML block.
    escaped_lines = escape(str(text)).splitlines()
    return f"<div style='{style}'>{'<br>'.join(escaped_lines)}</div>"
def handle_question_submit():
    """Callback for the question box: answer the question and record it.

    Reads the question from ``st.session_state.question_input``. A blank or
    whitespace-only question is ignored. On success the exchange is appended
    to ``st.session_state.chat_history`` and the input is cleared; on
    failure a user-friendly error is shown and the input is left as typed.
    """
    question = (st.session_state.question_input or "").strip()
    if not question:
        # Nothing meaningful was typed; avoid a pointless model round trip.
        return
    try:
        with st.spinner("Analyzing your question..."):
            answer = process_question(
                question,
                st.session_state.repo_content,
                st.session_state.openai_key,
                st.session_state.chat_history
            )
        st.session_state.chat_history.append({
            'question': question,
            'answer': answer,
            'timestamp': time.time()
        })
        # Clear the question
        st.session_state.question = ""
        st.session_state.question_input = ""
    except Exception as e:  # UI boundary: show the problem instead of crashing
        st.error(handle_error(e))
# Main UI
# The leading emoji replaces a mis-decoded byte sequence in the title.
st.title("🔍 GitHub Repository Explorer")
st.markdown("Understand any GitHub repository through AI-powered exploration")
# Sidebar configuration
sidebar = st.sidebar
sidebar.header("Configuration")
github_token = sidebar.text_input("GitHub Token", type="password", key="github_token_input")
openai_key = sidebar.text_input("OpenAI API Key", type="password", key="openai_key_input")
# Mirror the key into session state so callbacks can read it.
st.session_state["openai_key"] = openai_key
sidebar.markdown("---")
sidebar.header("Chat History")
clear_requested = sidebar.button("Clear Chat History", key="clear_history_btn")
if clear_requested:
    st.session_state["chat_history"] = []
    st.rerun()
sidebar.markdown("---")
sidebar.markdown("""
### How to use
1. Enter your API keys in the sidebar
2. Paste a GitHub repository URL
3. Click 'Analyze Repository'
4. Ask questions about the code
""")
# Main interface
repo_url = st.text_input("Enter GitHub Repository URL",
                         placeholder="https://github.com/username/repository",
                         key="repo_url_input")
analyze_button = st.button("Analyze Repository",
                           disabled=not (github_token and repo_url),
                           key="analyze_repo_btn",
                           use_container_width=True)
if analyze_button:
    # Created before the try block so every exit path can update or clear it.
    progress_text = st.empty()
    st.session_state.processing = True
    try:
        progress_text.text("Parsing repository URL...")
        # Pasted URLs often carry stray whitespace.
        owner, repo = parse_github_url(repo_url.strip())
        repo_identifier = f"{owner}/{repo}"
        if st.session_state.current_repo == repo_identifier:
            # Nothing was fetched, so do not leave a progress message behind.
            progress_text.empty()
            st.info("Repository already loaded! You can continue asking questions.")
        else:
            progress_text.text("Fetching repository contents...")
            fetcher = GitHubRepoFetcher(github_token)
            content = fetcher.process_repo(owner, repo)
            if content:
                st.session_state.repo_content = content
                st.session_state.current_repo = repo_identifier
                # A new repository starts a fresh conversation.
                st.session_state.chat_history = []
                progress_text.text("Repository analyzed successfully!")
                st.success("Repository has been processed! You can now ask questions about it.")
            else:
                progress_text.empty()
                st.error("Failed to process repository. Please check if the repository is public and accessible.")
    except Exception as e:  # UI boundary: show the problem instead of crashing
        # Remove the stale "Parsing..." / "Fetching..." text next to the error.
        progress_text.empty()
        st.error(handle_error(e))
    finally:
        st.session_state.processing = False
# The chat area is shown only once a repository has been loaded.
if st.session_state["repo_content"]:
    st.markdown("---")
    # History container is created first so it renders above the input box.
    chat_container, input_container = st.container(), st.container()
    # Display chat history
    if st.session_state["chat_history"]:
        with chat_container:
            display_chat_messages()
    # Display input box
    with input_container:
        st.text_input(
            "Ask a question about the repository",
            placeholder="Type your question here and press Enter",
            key="question_input",
            on_change=handle_question_submit,
        )
        # Animated dots, styled by the .typing-indicator / .dot CSS rules.
        if st.session_state["processing"]:
            st.markdown("""
<div style='text-align: center; margin-top: 8px;'>
<div class='typing-indicator'>
<div class='dot'></div>
<div class='dot'></div>
<div class='dot'></div>
</div>
</div>
""", unsafe_allow_html=True)
# Footer
# A horizontal rule followed by the attribution line.
for _footer_markdown in ("---", "Built with Streamlit, LangChain, and OpenAI"):
    st.markdown(_footer_markdown)