import base64
import html
import os
import tempfile
import time
from typing import Optional, Dict, List

import requests
import streamlit as st
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
|
|
| |
# Seed every session-state slot the script reads further down. A slot that
# already holds a value (from an earlier rerun of the script) is left alone.
for _state_key, _state_default in (
    ('repo_content', None),
    ('processing', False),
    ('chat_history', []),
    ('current_repo', None),
    ('question', ""),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
|
|
| |
# Browser-tab title, favicon and wide layout for the whole app.
st.set_page_config(
    page_title="GitHub Repository Explorer",
    # Magnifying-glass emoji, spelled as an escape so the icon cannot be
    # corrupted when the file is saved or read with a non-UTF-8 codec.
    page_icon="\U0001F50D",
    layout="wide",
)
|
|
| |
# Stylesheet injected once per run. It restyles text inputs, defines the
# three-dot "typing" animation (classes `typing-indicator` / `dot`) and
# adjusts page padding and markdown width.
_CUSTOM_CSS = """
<style>
/* Chat container styling */
.stTextInput > div > div > input {
background-color: white;
border-radius: 20px;
padding: 12px 20px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}

/* Typing indicator animation */
.typing-indicator {
display: inline-flex;
align-items: center;
justify-content: center;
gap: 4px;
}
.dot {
width: 8px;
height: 8px;
background-color: #007AFF;
border-radius: 50%;
animation: bounce 1.4s infinite ease-in-out;
}
.dot:nth-child(1) { animation-delay: -0.32s; }
.dot:nth-child(2) { animation-delay: -0.16s; }

@keyframes bounce {
0%, 80%, 100% { transform: scale(0); }
40% { transform: scale(1.0); }
}

/* Improve spacing and readability */
.main {
padding: 2rem;
}
.stMarkdown {
max-width: 100%;
}
</style>
"""

# Raw HTML must be explicitly allowed for the <style> tag to be emitted.
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
|
|
def handle_error(error: Exception) -> str:
    """Convert an exception into a message suitable for showing to the user.

    Args:
        error: The exception that was caught.

    Returns:
        A fixed hint for network/GitHub failures, malformed repository URLs
        and OpenAI API-key problems; otherwise a generic message that embeds
        the exception text.
    """
    message = str(error)
    lowered = message.lower()

    if isinstance(error, requests.exceptions.RequestException):
        return "Failed to connect to GitHub. Please check your internet connection and GitHub token."
    if isinstance(error, ValueError) and "URL" in message:
        return "Invalid GitHub repository URL. Please use a URL in the format: https://github.com/username/repository"
    # Both sides of the comparison must be lower-case: searching for a
    # mixed-case needle in lower-cased text can never match. "api_key" covers
    # messages that name the parameter rather than spelling it out.
    if "api key" in lowered or "api_key" in lowered:
        return "Invalid OpenAI API key. Please check your API key in the sidebar."
    return f"An error occurred: {message}"
|
|
class GitHubRepoFetcher:
    """Read file contents of a GitHub repository via the REST "contents" API.

    Network and parsing failures are reported with ``st.error`` and surface to
    callers as ``None`` (or an empty string from ``process_repo``) instead of
    raising.
    """

    def __init__(self, token: str, timeout: float = 30.0):
        """Store the request headers, API base URL and request timeout.

        Args:
            token: GitHub token sent in the ``Authorization`` header.
            timeout: Seconds to wait on each HTTP request before giving up.
        """
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = 'https://api.github.com'
        self.timeout = timeout

    def _get_json(self, url: str):
        """GET ``url`` with the stored headers and return the decoded JSON body."""
        # Without a timeout a stalled connection would block the app forever.
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _decode_payload(payload) -> Optional[str]:
        """Return the UTF-8 text of a base64 "contents" payload, else ``None``.

        ``None`` is returned when the payload is not a dict, is not marked as
        base64-encoded, or does not decode to valid UTF-8 text.
        """
        if not isinstance(payload, dict) or payload.get('encoding') != 'base64':
            return None
        try:
            return base64.b64decode(payload.get('content') or '').decode('utf-8')
        except ValueError:
            # Covers malformed base64 and UnicodeDecodeError (both are
            # ValueError subclasses). Non-text files are skipped quietly
            # rather than producing one error box per file.
            return None

    def fetch_contents(self, owner: str, repo: str, path: str = '') -> Optional[List[Dict]]:
        """List the entries found at ``path`` inside ``owner/repo``.

        Args:
            owner: Repository owner.
            repo: Repository name.
            path: Path inside the repository; backslashes are normalised to
                forward slashes. The empty string addresses the root.

        Returns:
            A list of entry dicts (a single-object response is wrapped in a
            list), or ``None`` after reporting a failure with ``st.error``.
        """
        path = path.replace('\\', '/')
        url = f'{self.base_url}/repos/{owner}/{repo}/contents/{path}'

        try:
            content = self._get_json(url)
        except (requests.exceptions.RequestException, ValueError) as e:
            st.error(f"Error fetching repository: {str(e)}")
            return None
        return content if isinstance(content, list) else [content]

    def get_file_content(self, url: str) -> Optional[str]:
        """Download one file entry and return its text.

        Args:
            url: API URL of the file entry.

        Returns:
            The decoded text, or ``None`` when the request fails (reported
            with ``st.error``), the payload is not base64, or the file is not
            valid UTF-8 text.
        """
        try:
            payload = self._get_json(url)
        except (requests.exceptions.RequestException, ValueError) as e:
            st.error(f"Error fetching file: {str(e)}")
            return None
        return self._decode_payload(payload)

    def process_repo(self, owner: str, repo: str) -> str:
        """Concatenate the text of the files listed at the repository root.

        Only entries whose type is ``'file'`` in the root listing are read;
        other entry types (such as directories) are not descended into.

        Returns:
            One string containing a ``File: <path>`` header, a separator line
            and the file text for every readable file, or ``""`` when the
            listing could not be fetched or is empty.
        """
        contents = self.fetch_contents(owner, repo)
        if not contents:
            return ""

        sections = []
        for item in contents:
            if item.get('type') != 'file':
                continue
            text = self.get_file_content(item['url'])
            if text:
                sections.append(f"File: {item['path']}\n{'='*40}\n{text}\n\n")

        return "".join(sections)
|
|
def parse_github_url(url: str) -> tuple[str, str]:
    """Extract ``(owner, repo)`` from a GitHub repository URL.

    Accepts URLs with or without a scheme, with a ``www.`` host prefix, with
    a trailing slash, a ``.git`` suffix, extra path segments after the
    repository name, a query string or a fragment. Surrounding whitespace is
    ignored.

    Args:
        url: The URL as entered by the user.

    Returns:
        The owner and repository name.

    Raises:
        ValueError: If no ``github.com`` host segment is present or the owner
            or repository segment is missing. The message contains "URL".
    """
    cleaned = url.strip()
    # A query string or fragment is never part of the owner/repo path.
    for separator in ('?', '#'):
        cleaned = cleaned.split(separator, 1)[0]
    parts = cleaned.strip('/').split('/')

    host_index = next(
        (i for i, part in enumerate(parts)
         if part.lower() in ('github.com', 'www.github.com')),
        None,
    )
    if host_index is None:
        raise ValueError("Invalid GitHub URL format")

    try:
        owner = parts[host_index + 1]
        repo = parts[host_index + 2]
    except IndexError:
        raise ValueError("Invalid GitHub URL format") from None

    # Clone URLs end in ".git", which is not part of the repository name.
    repo = repo.removesuffix('.git')
    if not owner or not repo:
        raise ValueError("Invalid GitHub URL format")
    return owner, repo
|
|
def process_question(question: str, content: str, openai_api_key: str, chat_history: list,
                     *, chunk_size: int = 2000, chunk_overlap: int = 200) -> str:
    """Answer a question about the repository with a retrieval chain.

    The repository text is split into overlapping chunks, embedded into a
    throwaway Chroma store, and the retrieved chunks are passed to the chat
    model together with the formatted conversation history.

    Args:
        question: The user's question.
        content: Concatenated repository text to search.
        openai_api_key: Key used for both the chat model and the embeddings.
        chat_history: Earlier turns, as accepted by ``format_chat_history``.
        chunk_size: Maximum characters per indexed chunk.
        chunk_overlap: Characters shared between neighbouring chunks.

    Returns:
        The ``"answer"`` field of the chain result.

    Raises:
        ValueError: If ``content`` yields no chunks to index.
    """
    llm = ChatOpenAI(api_key=openai_api_key)
    embeddings = OpenAIEmbeddings(api_key=openai_api_key)

    # Index many small chunks instead of one document holding the whole
    # repository: with a single document the retriever has nothing to choose
    # between, and the full text is sent to the embedding model in one piece.
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    chunks = splitter.split_documents([Document(page_content=content)])
    if not chunks:
        raise ValueError("Repository content is empty; there is nothing to search.")

    # The store lives in a temporary directory that is removed when the
    # block exits, so every call builds and discards its own index.
    with tempfile.TemporaryDirectory() as temp_dir:
        vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=embeddings,
            persist_directory=temp_dir
        )

        retriever = vectorstore.as_retriever()

        prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful assistant that explains code repositories. "
                       "Use the following context to answer the question: {context}\n\n"
                       "Previous conversation history:\n{chat_history}"),
            ("human", "{input}")
        ])

        document_chain = create_stuff_documents_chain(llm, prompt)
        retrieval_chain = create_retrieval_chain(retriever, document_chain)

        formatted_history = format_chat_history(chat_history)

        result = retrieval_chain.invoke({
            "input": question,
            "chat_history": formatted_history
        })
        return result["answer"]
|
|
def format_chat_history(history):
    """Render chat turns as plain text for inclusion in the prompt.

    Every entry contributes a ``Human: <question>`` line followed by an
    ``Assistant: <answer>`` line that carries its own trailing newline; all
    lines are then joined with newlines. An empty history gives ``""``.
    """
    transcript_lines = [
        line
        for turn in history
        for line in (
            f"Human: {turn['question']}",
            f"Assistant: {turn['answer']}\n",
        )
    ]
    return "\n".join(transcript_lines)
|
|
def display_chat_messages():
    """Render the stored chat history as alternating chat bubbles.

    For each entry of ``st.session_state.chat_history`` a blue question
    bubble is drawn in the wide right-hand column, followed by a grey answer
    bubble in the wide left-hand column.

    Question and answer text is HTML-escaped before it is placed in the
    markup: the bubbles are rendered with ``unsafe_allow_html=True``, so
    unescaped text could inject arbitrary HTML into the page.
    """
    question_style = (
        "background-color: #007AFF; color: white; padding: 12px 16px; "
        "border-radius: 15px 15px 0 15px; margin-bottom: 8px; text-align: left; "
        "box-shadow: 0 1px 2px rgba(0,0,0,0.1); max-width: 800px;"
    )
    answer_style = (
        "background-color: #f0f2f6; padding: 12px 16px; "
        "border-radius: 15px 15px 15px 0; margin-bottom: 16px; "
        "box-shadow: 0 1px 2px rgba(0,0,0,0.1); max-width: 800px;"
    )

    def bubble(style: str, text) -> str:
        """Return one <div> bubble containing the escaped text."""
        # Escape first, then map newlines to <br> so multi-line text stays
        # on a single line of markup inside the <div>.
        safe_text = html.escape(str(text)).replace('\n', '<br>')
        return f"<div style='{style}'>{safe_text}</div>"

    for message in st.session_state.chat_history:
        # Question: narrow spacer on the left, bubble on the right.
        with st.container():
            _, question_col = st.columns([2, 8])
            with question_col:
                st.markdown(bubble(question_style, message['question']),
                            unsafe_allow_html=True)

        # Answer: bubble on the left, narrow spacer on the right.
        with st.container():
            answer_col, _ = st.columns([8, 2])
            with answer_col:
                st.markdown(bubble(answer_style, message['answer']),
                            unsafe_allow_html=True)
|
|
def handle_question_submit():
    """Callback for the question box: answer the question and record the turn.

    Reads the question from ``st.session_state.question_input``. Blank or
    whitespace-only input is ignored. When no OpenAI key is stored in the
    session, an error is shown and nothing is sent. On success the turn is
    appended to ``st.session_state.chat_history`` and the input is cleared;
    on failure the error is shown and the input is left in place.
    """
    question = (st.session_state.question_input or "").strip()
    if not question:
        return

    # Fail early with a clear hint instead of a client error from OpenAI.
    if not st.session_state.get("openai_key"):
        st.error("Please enter your OpenAI API key in the sidebar before asking a question.")
        return

    try:
        with st.spinner("Analyzing your question..."):
            answer = process_question(
                question,
                st.session_state.repo_content,
                st.session_state.openai_key,
                st.session_state.chat_history
            )
    except Exception as e:
        # UI boundary: any failure becomes a user-facing message.
        st.error(handle_error(e))
        return

    st.session_state.chat_history.append({
        'question': question,
        'answer': answer,
        'timestamp': time.time()
    })

    # Reset both the stored question and the widget value.
    st.session_state.question = ""
    st.session_state.question_input = ""
|
|
| |
# Page heading and tagline. The magnifying-glass emoji is spelled as an
# escape so it cannot be corrupted when the file is saved or read with a
# non-UTF-8 codec.
st.title("\U0001F50D GitHub Repository Explorer")
st.markdown("Understand any GitHub repository through AI-powered exploration")
|
|
| |
# Sidebar: credential inputs, a button that wipes the chat history, and
# short usage instructions.
sidebar = st.sidebar

sidebar.header("Configuration")
github_token = sidebar.text_input("GitHub Token", type="password", key="github_token_input")
openai_key = sidebar.text_input("OpenAI API Key", type="password", key="openai_key_input")
# Mirror the key into session state so the question callback can read it.
st.session_state.openai_key = openai_key

sidebar.markdown("---")
sidebar.header("Chat History")
if sidebar.button("Clear Chat History", key="clear_history_btn"):
    st.session_state.chat_history = []
    # Rerun immediately so the emptied history is what gets rendered.
    st.rerun()

sidebar.markdown("---")
sidebar.markdown("""
### How to use
1. Enter your API keys in the sidebar
2. Paste a GitHub repository URL
3. Click 'Analyze Repository'
4. Ask questions about the code
""")
|
|
| |
# Repository URL entry.
repo_url = st.text_input(
    "Enter GitHub Repository URL",
    key="repo_url_input",
    placeholder="https://github.com/username/repository",
)

# The button is only clickable once both a GitHub token and a URL are set.
_analyze_inputs_missing = not github_token or not repo_url
analyze_button = st.button(
    "Analyze Repository",
    key="analyze_repo_btn",
    disabled=_analyze_inputs_missing,
    use_container_width=True,
)
|
|
# Runs on the rerun triggered by the "Analyze Repository" button: parse the
# URL, download the repository text, and store it in session state. A
# repository that is already loaded is not fetched again.
if analyze_button:
    try:
        st.session_state.processing = True
        progress_text = st.empty()  # placeholder reused for each status line

        progress_text.text("Parsing repository URL...")
        owner, repo = parse_github_url(repo_url)
        repo_identifier = f"{owner}/{repo}"

        if st.session_state.current_repo == repo_identifier:
            st.info("Repository already loaded! You can continue asking questions.")
        else:
            progress_text.text("Fetching repository contents...")
            fetcher = GitHubRepoFetcher(github_token)
            content = fetcher.process_repo(owner, repo)

            if not content:
                st.error("Failed to process repository. Please check if the repository is public and accessible.")
            else:
                # A newly loaded repository starts with an empty conversation.
                st.session_state.repo_content = content
                st.session_state.current_repo = repo_identifier
                st.session_state.chat_history = []
                progress_text.text("Repository analyzed successfully!")
                st.success("Repository has been processed! You can now ask questions about it.")

    except Exception as e:
        st.error(handle_error(e))
    finally:
        # Always drop the busy flag, whether or not the analysis succeeded.
        st.session_state.processing = False
|
|
# Chat area, shown only once repository content has been loaded.
if st.session_state["repo_content"]:
    st.markdown("---")

    # Both containers are created up front, history first and input second.
    chat_container, input_container = st.container(), st.container()

    with chat_container:
        if st.session_state["chat_history"]:
            display_chat_messages()

    with input_container:
        # The on_change callback handles the submitted question.
        st.text_input(
            "Ask a question about the repository",
            key="question_input",
            placeholder="Type your question here and press Enter",
            on_change=handle_question_submit,
        )
|
|
# Markup for the animated three-dot indicator (styled by the `typing-indicator`
# and `dot` CSS classes).
_TYPING_INDICATOR_HTML = """
<div style='text-align: center; margin-top: 8px;'>
<div class='typing-indicator'>
<div class='dot'></div>
<div class='dot'></div>
<div class='dot'></div>
</div>
</div>
"""

# NOTE(review): in the code visible here the analyze handler resets
# `processing` to False in its `finally` before this check is reached, so
# the indicator may never actually render - confirm.
if st.session_state.processing:
    st.markdown(_TYPING_INDICATOR_HTML, unsafe_allow_html=True)

# Footer: a horizontal rule followed by the credits line.
for _footer_markdown in ("---", "Built with Streamlit, LangChain, and OpenAI"):
    st.markdown(_footer_markdown)