Spaces:
Sleeping
Sleeping
File size: 11,278 Bytes
6c1db65 fd2f2cc 6c1db65 fd2f2cc 6c1db65 f3c73d2 21fa12d 6c1db65 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 | import os
import gradio as gr
import logging
from git import Repo
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from langchain_pinecone import PineconeVectorStore
from langchain.schema import Document
from tree_sitter_languages import get_parser
from pinecone import Pinecone
import openai
import numpy as np
# Load environment variables (python-dotenv) before the os.getenv() lookups below.
load_dotenv()
# Logging Configuration: root logger at DEBUG with "time - level - message" lines.
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Environment Variables and fixed settings.
# CLONE_DIR is the local directory under which repositories are cloned.
CLONE_DIR = "./cloned_repos"
# NOTE: os.getenv() returns None when a variable is unset; the keys are not
# validated here, so a missing key only surfaces when a client is created/used.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
# Index name handed to pinecone_client.Index() below.
PINECONE_INDEX_KEY = "codebase-app"
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
# Initialize GROQ API: the OpenAI client is pointed at Groq's endpoint via base_url.
client = openai.OpenAI(
base_url="https://api.groq.com/openai/v1",
api_key=GROQ_API_KEY
)
# Initialize Pinecone: one client and one index handle shared by the whole module.
pinecone_client = Pinecone(api_key=PINECONE_API_KEY)
pinecone_index = pinecone_client.Index(PINECONE_INDEX_KEY)
# Initialize SentenceTransformer Embedding Model (used for both documents and queries).
embedding_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
# Supported Extensions: file suffixes parse_repository accepts, and directory
# names it skips while walking a repository.
SUPPORTED_EXTENSIONS = {".py", ".java", ".js", ".ts", ".cpp", ".h", ".ipynb"}
IGNORED_DIRS = {"node_modules", "venv", "env", ".git", "__pycache__"}
# System prompt: sent as the "system" message of every chat completion.
# NOTE(review): the f-prefix is a no-op here, the string has no placeholders.
system_prompt =f"""You are a Senior Software engineer with more than 20 years of experience delivering software for massive use. You are very technical and have complete expertise over all domains of software in all aspects.
Answer any questions I have about the codebase, based on the code provided. Always consider all of the context provided when forming a response.
"""
# Backend Logic: Clone Repository
def clone_repository(repo_url: str) -> str:
    """Clone the GitHub repository locally and return the checkout path.

    The checkout lives at ``CLONE_DIR/<repo name>``, where the repo name is the
    last path segment of ``repo_url`` without a trailing ``.git``.  An existing
    checkout is reused as-is; it is not updated.

    Args:
        repo_url: Clone URL, e.g. ``https://github.com/user/project.git``.
            Surrounding whitespace and trailing slashes are ignored.

    Returns:
        Path of the local checkout.

    Raises:
        ValueError: If no usable repository name can be derived from the URL.
    """
    # A trailing "/" used to produce an empty last segment, which made
    # repo_path collapse to CLONE_DIR itself and be reported as an existing
    # checkout.
    cleaned_url = repo_url.strip().rstrip("/")
    repo_name = cleaned_url.rsplit("/", 1)[-1]
    # Remove only a *trailing* ".git"; str.replace() also mangled names that
    # merely contain it (e.g. "user.github.io").
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]
    # "." and ".." would resolve to CLONE_DIR or its parent.
    if repo_name in ("", ".", ".."):
        raise ValueError(f"Cannot derive a repository name from URL: {repo_url!r}")

    repo_path = os.path.join(CLONE_DIR, repo_name)
    os.makedirs(CLONE_DIR, exist_ok=True)
    if os.path.exists(repo_path):
        logger.info(f"Repository already exists: {repo_path}")
        return repo_path
    Repo.clone_from(cleaned_url, repo_path)
    logger.info(f"Cloned repository to: {repo_path}")
    return repo_path
# Backend Logic: Parse Repository
class SimpleTreeSitterParser:
    """Parser for extracting code chunks (top-level syntax nodes) from source text."""

    def __init__(self, language: str):
        """Create a tree-sitter parser for ``language``.

        Args:
            language: Grammar name understood by ``get_parser`` (e.g. "python").

        Raises:
            ValueError: If ``get_parser`` fails for ``language``; the original
                exception is chained as the cause.
        """
        self.language = language
        try:
            self.parser = get_parser(language)  # Ensure only the required argument is passed
        except Exception as e:
            logger.error(f"Error initializing parser for {language}: {e}")
            raise ValueError(f"Parser error for {language}: {e}") from e

    def parse(self, code: str) -> list:
        """Split ``code`` into one chunk per child of the syntax tree's root node.

        Args:
            code: Source text to parse.

        Returns:
            A list of dicts with keys ``type``, ``content``, ``start_line`` and
            ``end_line`` (1-based).  Returns ``[]`` if parsing fails.
        """
        try:
            # The parser is fed UTF-8 bytes, and start_byte/end_byte index into
            # that buffer.  Slicing the str with them is wrong as soon as the
            # file contains a multi-byte character, so slice the bytes instead.
            source_bytes = code.encode("utf-8")
            tree = self.parser.parse(source_bytes)
            return [
                {
                    "type": child.type,
                    "content": source_bytes[child.start_byte:child.end_byte].decode(
                        "utf-8", errors="replace"
                    ),
                    "start_line": child.start_point[0] + 1,
                    "end_line": child.end_point[0] + 1,
                }
                for child in tree.root_node.children
            ]
        except Exception as e:
            logger.error(f"Error parsing code: {e}")
            return []
def parse_repository(repo_path: str) -> list:
    """Parse repository files into meaningful chunks.

    Walks ``repo_path``, skips directories named in ``IGNORED_DIRS`` and files
    whose extension is not in ``SUPPORTED_EXTENSIONS``, and parses every
    remaining file that has a grammar mapping with ``SimpleTreeSitterParser``.

    Args:
        repo_path: Root directory of the local checkout.

    Returns:
        The concatenated chunk dicts produced by ``SimpleTreeSitterParser.parse``
        for all parsed files (empty if nothing could be parsed).
    """
    # Grammar name per extension.  ".h" is parsed with the C++ grammar, since
    # the supported set pairs it with ".cpp".  Extensions without an entry
    # (currently ".ipynb") are skipped.
    language_by_extension = {
        ".py": "python",
        ".ts": "typescript",
        ".js": "javascript",
        ".java": "java",
        ".cpp": "cpp",
        ".h": "cpp",
    }
    parsers = {}  # language -> SimpleTreeSitterParser, built once per language
    chunks = []
    for root, dirs, files in os.walk(repo_path):
        # Prune ignored directories by exact name, in place, so os.walk never
        # descends into them.  The previous substring test on the full path
        # also skipped e.g. "environment/" or a repository named "env-tools".
        dirs[:] = [name for name in dirs if name not in IGNORED_DIRS]
        for file in files:
            ext = os.path.splitext(file)[1]
            if ext not in SUPPORTED_EXTENSIONS:
                logger.warning(f"Skipping unsupported file: {file}")
                continue
            file_path = os.path.join(root, file)
            language = language_by_extension.get(ext)
            if language is None:
                logger.warning(f"No parser configured for '{ext}' files, skipping: {file_path}")
                continue
            try:
                logger.info(f"Processing file: {file_path}")
                code = get_file_content(file_path)
                if not code:
                    logger.warning(f"No content found in {file_path}")
                    continue
                parser = parsers.get(language)
                if parser is None:
                    parser = parsers[language] = SimpleTreeSitterParser(language)
                chunks.extend(parser.parse(code))
            except ValueError as ve:
                logger.error(f"Skipping file {file_path} due to parser error: {ve}")
            except Exception as e:
                logger.error(f"Unexpected error processing {file_path}: {e}")
    return chunks
# Helper: Read File Content
def get_file_content(file_path: str) -> str:
    """Return the UTF-8 text of ``file_path``, or "" if it cannot be read.

    Any failure while opening or decoding the file is logged and reported to
    the caller as an empty string rather than an exception.
    """
    text = ""
    try:
        with open(file_path, mode="r", encoding="utf-8") as source:
            text = source.read()
    except Exception as err:
        logger.error(f"Error reading file {file_path}: {err}")
    return text
# Backend Logic: Store Embeddings
def store_embeddings(documents, namespace="default", batch_size=100):
    """Embed ``documents`` and upsert them into Pinecone.

    Each document becomes one vector whose id is its position in ``documents``
    and whose metadata is the document's own metadata plus its text under the
    ``"text"`` key.

    Args:
        documents: Objects exposing ``page_content`` (str) and ``metadata`` (dict).
        namespace: Pinecone namespace to write into.
        batch_size: Maximum number of vectors sent per upsert request.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        Exception: Any embedding or upsert failure is logged and re-raised.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if not documents:
        logger.warning("No documents to embed; nothing stored.")
        return
    try:
        texts = [doc.page_content for doc in documents]
        embeddings = embedding_model.encode(texts, show_progress_bar=True)
        # NOTE(review): the full chunk text is stored as metadata; very large
        # chunks may exceed Pinecone's per-record metadata limit. Confirm.
        vectors = [
            {
                "id": str(i),
                "values": embedding.tolist(),
                "metadata": {"text": doc.page_content, **doc.metadata},
            }
            for i, (doc, embedding) in enumerate(zip(documents, embeddings))
        ]
        # Upsert in bounded batches: a single request carrying every vector of
        # a large repository exceeds the service's request-size limits.
        for start in range(0, len(vectors), batch_size):
            pinecone_index.upsert(
                vectors=vectors[start:start + batch_size],
                namespace=namespace,
            )
        logger.info(f"Stored {len(vectors)} embeddings in Pinecone namespace '{namespace}'.")
    except Exception as e:
        logger.error(f"Error storing embeddings: {e}")
        raise
# Backend Logic: Perform RAG
def perform_rag(query: str, namespace="default", top_k=10, model="llama-3.1-8b-instant") -> str:
    """Retrieve context for ``query`` from Pinecone and generate an answer.

    The query is embedded, the ``top_k`` nearest vectors in ``namespace`` are
    fetched, their stored ``"text"`` metadata is wrapped in a ``<CONTEXT>``
    block ahead of the query, and the result is sent to the chat model together
    with ``system_prompt``.

    Args:
        query: The user's question.
        namespace: Pinecone namespace to search.
        top_k: Number of matches to retrieve.
        model: Chat model name passed to the completions endpoint.

    Returns:
        The model's answer, or a short explanatory message when the query is
        blank, nothing relevant is found, or an error occurs (``"Error: ..."``).
    """
    # Avoid an embedding pass and an LLM call for an empty question.
    if not query or not query.strip():
        return "Please enter a question."
    try:
        query_embedding = embedding_model.encode(query).tolist()
        response = pinecone_index.query(
            vector=query_embedding,
            top_k=top_k,
            include_metadata=True,
            namespace=namespace
        )
        matches = response.get('matches') or []
        # Matches stored without text would only add empty separators.
        texts = [match['metadata'].get('text', '') for match in matches]
        contexts = [text for text in texts if text]
        if not contexts:
            return "No relevant context found."
        augmented_query = "<CONTEXT>\n" + "\n\n-------\n\n".join(contexts) + "\n-------\n</CONTEXT>\n\n" + query
        llm_response = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": augmented_query}
            ]
        )
        # The message content can be None; keep the declared str return type.
        return llm_response.choices[0].message.content or ""
    except Exception as e:
        logger.error(f"Error performing RAG: {e}")
        return f"Error: {e}"
# Process Repository
def process_repo(repo_url: str) -> str:
    """Clone, parse, and store embeddings for a repository.

    The Pinecone namespace is the last path segment of ``repo_url`` without a
    trailing ``.git``.

    Args:
        repo_url: Clone URL of the repository; surrounding whitespace and
            trailing slashes are ignored.

    Returns:
        A human-readable status message.  Failures are logged and reported as
        ``"Error: ..."`` instead of being raised.
    """
    try:
        # Normalise first: a trailing "/" used to yield an empty namespace, and
        # str.replace(".git", "") mangled names that merely contain ".git".
        cleaned_url = repo_url.strip().rstrip("/")
        namespace = cleaned_url.rsplit("/", 1)[-1]
        if namespace.endswith(".git"):
            namespace = namespace[: -len(".git")]
        if not namespace:
            return "Error: Cannot derive a repository name from the URL."
        repo_path = clone_repository(cleaned_url)
        chunks = parse_repository(repo_path)
        # Whitespace-only chunks carry no information worth embedding.
        documents = [
            Document(page_content=chunk["content"], metadata={"repo_url": cleaned_url})
            for chunk in chunks
            if chunk["content"].strip()
        ]
        if not documents:
            return "No valid chunks found in the repository."
        store_embeddings(documents, namespace=namespace)
        return f"Repository processed successfully in namespace '{namespace}'!"
    except Exception as e:
        logger.error(f"Error processing repository: {e}")
        return f"Error: {e}"
# Fetch Namespaces
def fetch_namespaces():
    """Return the namespace names reported by the Pinecone index statistics.

    Any failure is logged and results in an empty list.
    """
    try:
        index_stats = pinecone_index.describe_index_stats()
        namespace_map = index_stats.get("namespaces", {})
        return [name for name in namespace_map.keys()]
    except Exception as err:
        logger.error(f"Error fetching namespaces: {err}")
        return []
# Gradio UI
def create_ui():
    """Build the Gradio Blocks app for cloning repositories and chatting with them.

    The UI has a URL box with a clone button, a status box, a namespace
    dropdown, and a chatbot with a message box and send button.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    namespaces = fetch_namespaces()
    # Assembled line by line so the rendered Markdown has no leading indentation.
    instructions = (
        "**Instructions:**\n"
        "1. Enter the GitHub repository URL you wish to clone and click **Git Clone 😺**.\n"
        "2. When processing finishes, the repository is added to the namespace dropdown.\n"
        "3. Select the desired namespace from the dropdown.\n"
        "4. Use the chatbot below to interact with the selected codebase.\n"
    )
    # Shape of process_repo's success message; used to recover the namespace.
    success_prefix = "Repository processed successfully in namespace '"
    success_suffix = "'!"

    with gr.Blocks() as demo:
        namespace_state = gr.State(value=None)
        chat_history = gr.State(value=[])
        with gr.Column():
            gr.Markdown("## Codebase Chat App with Repository Management")
            gr.Markdown(instructions)
            with gr.Row():
                repo_url_input = gr.Textbox(label="GitHub Repository URL", placeholder="Enter repo URL to clone")
                clone_button = gr.Button("Git Clone 😺")
            clone_status = gr.Textbox(label="Clone Status", interactive=False)
            namespace_dropdown = gr.Dropdown(choices=namespaces, label="Namespace", interactive=True)
            chatbot = gr.Chatbot(label="Codebase Chatbot", type="messages")
            message_input = gr.Textbox(placeholder="Enter your message here...")
            send_button = gr.Button("Send")

        def update_namespace_or_clone(repo_url, current_namespace, history):
            """Clone repository and update namespaces.

            Returns values for (dropdown, status, chat history state,
            namespace state, chatbot).
            """
            if not repo_url or not repo_url.strip():
                # Nothing to do: leave every piece of state as it was.  This
                # branch used to write the namespace into the chat-history
                # state, so the next query failed on ``None + list``.
                return (
                    gr.update(),
                    "Please provide a repository URL.",
                    history,
                    current_namespace,
                    gr.update(),
                )
            message = process_repo(repo_url)
            updated_namespaces = fetch_namespaces()
            # The index statistics may not list a namespace right after its
            # first upsert (presumably eventual consistency; confirm), so add
            # the namespace named in the success message ourselves.
            if message.startswith(success_prefix) and message.endswith(success_suffix):
                new_namespace = message[len(success_prefix):-len(success_suffix)]
                if new_namespace and new_namespace not in updated_namespaces:
                    updated_namespaces.append(new_namespace)
            return (
                gr.update(choices=updated_namespaces, value=None),
                message,
                [],  # Clear chat history
                None,
                [],  # Keep the visible chatbot in sync with the cleared history
            )

        def handle_query(message, history, namespace):
            """Handle chatbot queries.

            Returns values for (chatbot, chat history state, message box).
            """
            history = history or []
            if not message or not message.strip():
                # Ignore empty submissions instead of querying the model.
                return history, history, gr.update(value="")
            if not namespace:
                new_history = history + [{"role": "assistant", "content": "Please select a namespace first!"}]
                return new_history, new_history, gr.update(value="")
            response = perform_rag(message, namespace)
            # Convert history to the correct format
            formatted_history = history + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": response}
            ]
            return formatted_history, formatted_history, gr.update(value="")

        # Bind clone button
        clone_button.click(
            update_namespace_or_clone,
            inputs=[repo_url_input, namespace_state, chat_history],
            outputs=[namespace_dropdown, clone_status, chat_history, namespace_state, chatbot],
        )
        # Bind query button
        send_button.click(
            handle_query,
            inputs=[message_input, chat_history, namespace_dropdown],
            outputs=[chatbot, chat_history, message_input],
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the Blocks app and start serving it.
    create_ui().launch()
|