File size: 11,278 Bytes
6c1db65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fd2f2cc
 
 
 
 
 
6c1db65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fd2f2cc
6c1db65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f3c73d2
 
 
 
21fa12d
6c1db65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
import os
import gradio as gr
import logging
from git import Repo
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer
from langchain_pinecone import PineconeVectorStore
from langchain.schema import Document
from tree_sitter_languages import get_parser
from pinecone import Pinecone
import openai
import numpy as np

# Load environment variables (python-dotenv; must run before the os.getenv calls below)
load_dotenv()

# Logging configuration: DEBUG level for the whole process, timestamped messages.
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Settings. The API keys are read from the environment and are None when unset;
# the clone directory and the Pinecone index name are hard-coded.
CLONE_DIR = "./cloned_repos"
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_KEY = "codebase-app"
GROQ_API_KEY = os.getenv("GROQ_API_KEY")

# Groq is reached through the OpenAI client by pointing it at Groq's
# OpenAI-compatible endpoint.
client = openai.OpenAI(
    base_url="https://api.groq.com/openai/v1",
    api_key=GROQ_API_KEY
)

# Pinecone client and a handle on the index that stores the code embeddings.
pinecone_client = Pinecone(api_key=PINECONE_API_KEY)
pinecone_index = pinecone_client.Index(PINECONE_INDEX_KEY)

# Sentence-embedding model, constructed once at import time and shared by
# indexing and querying.
embedding_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")

# File extensions considered for indexing, and directory names excluded from it.
# NOTE(review): ".h" and ".ipynb" are listed here but parse_repository has no
# tree-sitter language mapped to them.
SUPPORTED_EXTENSIONS = {".py", ".java", ".js", ".ts", ".cpp", ".h", ".ipynb"}
IGNORED_DIRS = {"node_modules", "venv", "env", ".git", "__pycache__"}

# System prompt sent as the "system" message of every chat completion.
# NOTE(review): the f-prefix is redundant; the string contains no placeholders.
system_prompt =f"""You are a Senior Software engineer with more than 20 years of experience delivering software for massive use. You are very technical and have complete expertise over all domains of software in all aspects. 
Answer any questions I have about the codebase, based on the code provided. Always consider all of the context provided when forming a response.
"""


# Backend Logic: Clone Repository
def clone_repository(repo_url: str) -> str:
    """Clone a Git repository into ``CLONE_DIR`` and return the local path.

    The checkout directory is named after the last path segment of the URL
    with a trailing ``.git`` removed. If that directory already exists it is
    reused as-is (no fetch or pull is performed).

    Args:
        repo_url: Clone URL, e.g. ``https://github.com/user/project.git``.
            Surrounding whitespace and trailing slashes are ignored.

    Returns:
        Path of the local checkout below ``CLONE_DIR``.

    Raises:
        ValueError: If no usable repository name can be derived from the URL.
        Exception: Whatever ``Repo.clone_from`` raises when the clone fails.
    """
    # A trailing slash would otherwise yield an empty last segment, which made
    # the function return CLONE_DIR itself as an "already cloned" repository.
    cleaned_url = repo_url.strip().rstrip("/")
    repo_name = cleaned_url.split("/")[-1]
    # Strip only a *trailing* ".git"; a blanket replace corrupts names such as
    # "my.github.io".
    if repo_name.endswith(".git"):
        repo_name = repo_name[: -len(".git")]
    # "." and ".." would resolve to CLONE_DIR or its parent.
    if repo_name in ("", ".", ".."):
        raise ValueError(f"Cannot determine repository name from URL: {repo_url!r}")

    repo_path = os.path.join(CLONE_DIR, repo_name)
    os.makedirs(CLONE_DIR, exist_ok=True)
    if os.path.exists(repo_path):
        logger.info(f"Repository already exists: {repo_path}")
        return repo_path
    Repo.clone_from(cleaned_url, repo_path)
    logger.info(f"Cloned repository to: {repo_path}")
    return repo_path

# Backend Logic: Parse Repository
class SimpleTreeSitterParser:
    """Split source code into its top-level syntax nodes using tree-sitter.

    Each direct child of the parse tree's root node becomes one chunk.
    """

    def __init__(self, language: str):
        """Create a parser for ``language``.

        Args:
            language: Language name passed to ``get_parser``.

        Raises:
            ValueError: If ``get_parser`` fails for ``language``; the
                underlying exception is attached as the cause.
        """
        self.language = language
        try:
            self.parser = get_parser(language)  # Ensure only the required argument is passed
        except Exception as e:
            logger.error(f"Error initializing parser for {language}: {e}")
            raise ValueError(f"Parser error for {language}: {e}") from e

    def parse(self, code: str) -> list:
        """Parse ``code`` and return one chunk per top-level node.

        Args:
            code: Source text to parse.

        Returns:
            A list of dicts with keys ``type``, ``content``, ``start_line``
            and ``end_line`` (1-based lines). An empty list is returned if
            parsing fails; the error is logged.
        """
        try:
            # tree-sitter reports start_byte/end_byte as offsets into the
            # UTF-8 encoded input, so the slice must be taken from the bytes.
            # Slicing the str instead shifts every chunk that follows a
            # non-ASCII character.
            source_bytes = code.encode("utf-8")
            tree = self.parser.parse(source_bytes)
            return [
                {
                    "type": child.type,
                    "content": source_bytes[child.start_byte:child.end_byte].decode(
                        "utf-8", errors="replace"
                    ),
                    "start_line": child.start_point[0] + 1,
                    "end_line": child.end_point[0] + 1,
                }
                for child in tree.root_node.children
            ]
        except Exception as e:
            logger.error(f"Error parsing code: {e}")
            return []

def parse_repository(repo_path: str) -> list:
    """Walk a checkout and parse every supported source file into chunks.

    Directories whose *name* is in ``IGNORED_DIRS`` are pruned from the walk.
    Files are selected by ``SUPPORTED_EXTENSIONS``; those whose extension has
    no tree-sitter language mapped below are skipped with a warning. Errors
    on individual files are logged and do not abort the walk.

    Args:
        repo_path: Root directory of the repository checkout.

    Returns:
        The concatenated chunk dicts produced by ``SimpleTreeSitterParser.parse``
        for all parsed files.
    """
    language_by_extension = {
        ".py": "python",
        ".ts": "typescript",
        ".js": "javascript",
        ".java": "java",
        ".cpp": "cpp",
    }
    # One parser per language, created on first use and reused across files.
    parsers = {}
    chunks = []
    for root, dirs, files in os.walk(repo_path):
        # Prune by exact directory name, in place, so os.walk does not descend
        # into ignored trees. A substring test on the full path wrongly skipped
        # e.g. "environment/" or an entire repository named "my-env".
        dirs[:] = [name for name in dirs if name not in IGNORED_DIRS]
        for file in files:
            ext = os.path.splitext(file)[1]
            if ext not in SUPPORTED_EXTENSIONS:
                logger.warning(f"Skipping unsupported file: {file}")
                continue
            file_path = os.path.join(root, file)
            language = language_by_extension.get(ext)
            if language is None:
                # Previously this fell through to a doomed "unknown" parser.
                logger.warning(f"Skipping {file_path}: no parser configured for '{ext}' files")
                continue
            try:
                logger.info(f"Processing file: {file_path}")
                code = get_file_content(file_path)
                if not code:
                    logger.warning(f"No content found in {file_path}")
                    continue
                parser = parsers.get(language)
                if parser is None:
                    parser = SimpleTreeSitterParser(language)
                    parsers[language] = parser
                chunks.extend(parser.parse(code))
            except ValueError as ve:
                logger.error(f"Skipping file {file_path} due to parser error: {ve}")
            except Exception as e:
                logger.error(f"Unexpected error processing {file_path}: {e}")
    return chunks

# Helper: Read File Content
def get_file_content(file_path: str) -> str:
    """Return the UTF-8 decoded text of ``file_path``.

    Any failure while opening or reading is logged, and an empty string is
    returned instead of raising.
    """
    try:
        with open(file_path, mode="r", encoding="utf-8") as source_file:
            text = source_file.read()
    except Exception as e:
        logger.error(f"Error reading file {file_path}: {e}")
        return ""
    return text

# Backend Logic: Store Embeddings
def store_embeddings(documents, namespace="default", batch_size=100):
    """Embed ``documents`` and upsert the vectors into Pinecone.

    Args:
        documents: Iterable of objects exposing ``page_content`` (text) and
            ``metadata`` (dict). The text is stored under the ``text``
            metadata key alongside the document's own metadata.
        namespace: Pinecone namespace to write into.
        batch_size: Maximum number of vectors per upsert request. Sending
            everything in one request can exceed Pinecone's request limits
            for large repositories.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        Exception: Any embedding or upsert error is logged and re-raised.

    NOTE(review): vector ids are the positional index ``"0".."n-1"``, so
    re-indexing a repository that now yields fewer chunks leaves the old
    higher-numbered vectors in the namespace.
    """
    documents = list(documents)
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")
    if not documents:
        # Nothing to embed; skip the upsert entirely.
        return

    try:
        texts = [doc.page_content for doc in documents]
        embeddings = embedding_model.encode(texts, show_progress_bar=True)
        vectors = [
            {
                "id": str(i),
                "values": embedding.tolist(),
                "metadata": {"text": doc.page_content, **doc.metadata},
            }
            for i, (doc, embedding) in enumerate(zip(documents, embeddings))
        ]
        for start in range(0, len(vectors), batch_size):
            pinecone_index.upsert(
                vectors=vectors[start:start + batch_size],
                namespace=namespace,
            )
        logger.info(f"Stored {len(vectors)} embeddings in Pinecone namespace '{namespace}'.")
    except Exception as e:
        logger.error(f"Error storing embeddings: {e}")
        raise

# Backend Logic: Perform RAG
def perform_rag(query: str, namespace="default") -> str:
    """Answer ``query`` with the chat model, using retrieved code as context.

    The query is embedded, the ten closest vectors in ``namespace`` are
    fetched from Pinecone, and their stored ``text`` metadata is wrapped in a
    ``<CONTEXT>`` block placed ahead of the query in the user message.

    Returns:
        The model's reply, ``"No relevant context found."`` when the search
        yields no matches, or ``"Error: ..."`` if any step raises.
    """
    try:
        query_vector = embedding_model.encode(query).tolist()
        search_result = pinecone_index.query(
            vector=query_vector,
            top_k=10,
            include_metadata=True,
            namespace=namespace,
        )
        if not search_result.get('matches'):
            return "No relevant context found."

        # Collect the stored source text of every hit.
        snippets = []
        for hit in search_result['matches']:
            snippets.append(hit['metadata'].get('text', ''))

        separator = "\n\n-------\n\n"
        context_block = separator.join(snippets)
        augmented_query = "<CONTEXT>\n" + context_block + "\n-------\n</CONTEXT>\n\n" + query

        conversation = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": augmented_query},
        ]
        completion = client.chat.completions.create(
            model="llama-3.1-8b-instant",
            messages=conversation,
        )
        return completion.choices[0].message.content
    except Exception as e:
        logger.error(f"Error performing RAG: {e}")
        return f"Error: {e}"

# Process Repository
def process_repo(repo_url: str) -> str:
    """Clone a repository, parse it, and store its embeddings.

    The Pinecone namespace is the last path segment of the URL with a
    trailing ``.git`` removed.

    Args:
        repo_url: Clone URL of the repository. Surrounding whitespace and
            trailing slashes are ignored.

    Returns:
        A human-readable status message. Failures are reported as a string
        starting with ``"Error:"`` rather than raised.
    """
    try:
        # Normalise first: a trailing slash used to produce an empty namespace,
        # which writes into Pinecone's default namespace.
        cleaned_url = (repo_url or "").strip().rstrip("/")
        namespace = cleaned_url.split("/")[-1]
        # Remove only a trailing ".git"; replacing it anywhere corrupts names
        # such as "my.github.io".
        if namespace.endswith(".git"):
            namespace = namespace[: -len(".git")]
        if not namespace:
            return "Error: could not determine a repository name from the URL."

        repo_path = clone_repository(cleaned_url)
        chunks = parse_repository(repo_path)
        if not chunks:
            return "No valid chunks found in the repository."
        documents = [
            Document(page_content=chunk["content"], metadata={"repo_url": cleaned_url})
            for chunk in chunks
        ]
        store_embeddings(documents, namespace=namespace)
        return f"Repository processed successfully in namespace '{namespace}'!"
    except Exception as e:
        logger.error(f"Error processing repository: {e}")
        return f"Error: {e}"

# Fetch Namespaces
def fetch_namespaces():
    """List the namespace names reported by the Pinecone index statistics.

    Returns an empty list (after logging the error) if the lookup fails.
    """
    try:
        index_stats = pinecone_index.describe_index_stats()
        namespace_map = index_stats.get("namespaces", {})
        names = list(namespace_map.keys())
        return names
    except Exception as e:
        logger.error(f"Error fetching namespaces: {e}")
        return []

# Gradio UI
def create_ui():
    """Build the Gradio app: repository cloning controls plus a RAG chatbot.

    Returns:
        The assembled ``gr.Blocks`` application (not launched).
    """
    namespaces = fetch_namespaces()

    with gr.Blocks() as demo:
        namespace_state = gr.State(value=None)
        chat_history = gr.State(value=[])

        with gr.Column():
            gr.Markdown("## Codebase Chat App with Repository Management")
            gr.Markdown("""
            **Instructions:**
             1. Enter the GitHub repository URL you wish to clone and click **Git Clone 😺**.
             2. When processing finishes, the repository's namespace is added to the dropdown and selected automatically.
             3. To chat with a different codebase, select its namespace from the dropdown.
             4. Use the chatbot below to interact with the selected codebase.
            """)

            with gr.Row():
                repo_url_input = gr.Textbox(label="GitHub Repository URL", placeholder="Enter repo URL to clone")
                clone_button = gr.Button("Git Clone 😺")
                clone_status = gr.Textbox(label="Clone Status", interactive=False)

                namespace_dropdown = gr.Dropdown(choices=namespaces, label="Namespace", interactive=True)

            chatbot = gr.Chatbot(label="Codebase Chatbot", type="messages")
            message_input = gr.Textbox(placeholder="Enter your message here...")
            send_button = gr.Button("Send")

        def update_namespace_or_clone(repo_url, current_namespace, history):
            """Process the repository and refresh the namespace dropdown.

            Returns values for, in order: the dropdown, the status box, the
            chat-history state, the namespace state and the chatbot.
            """
            if not repo_url or not repo_url.strip():
                # Nothing to do: leave selection, history and chatbot untouched.
                # (The history slot previously received the namespace value,
                # which broke the next chat message.)
                return (
                    gr.update(),
                    "Please provide a repository URL.",
                    history,
                    current_namespace,
                    gr.update(),
                )

            message = process_repo(repo_url)
            updated_namespaces = fetch_namespaces()

            # The index statistics may not list a namespace that was written
            # moments ago, so recover its name from the success message and
            # add it explicitly instead of relying on the stats alone.
            success_prefix = "Repository processed successfully in namespace '"
            success_suffix = "'!"
            if message.startswith(success_prefix) and message.endswith(success_suffix):
                new_namespace = message[len(success_prefix):-len(success_suffix)]
                if new_namespace not in updated_namespaces:
                    updated_namespaces.append(new_namespace)
                # Switch to the new codebase with a clean conversation.
                return (
                    gr.update(choices=updated_namespaces, value=new_namespace),
                    message,
                    [],
                    new_namespace,
                    [],
                )

            # Processing failed: refresh the choices but keep the current
            # selection and conversation.
            return (
                gr.update(choices=updated_namespaces),
                message,
                history,
                current_namespace,
                gr.update(),
            )

        def handle_query(message, history, namespace):
            """Answer a chat message against the selected namespace.

            Returns the chatbot contents, the updated history state and an
            update that clears the message box.
            """
            history = history or []
            if not message or not message.strip():
                # Ignore empty submissions.
                return history, history, gr.update(value="")

            if not namespace:
                new_history = history + [{"role": "assistant", "content": "Please select a namespace first!"}]
                return new_history, new_history, gr.update(value="")

            response = perform_rag(message, namespace)

            # Append the exchange in the "messages" format used by the chatbot.
            formatted_history = history + [
                {"role": "user", "content": message},
                {"role": "assistant", "content": response}
            ]
            return formatted_history, formatted_history, gr.update(value="")

        # Bind clone button
        clone_button.click(
            update_namespace_or_clone,
            inputs=[repo_url_input, namespace_state, chat_history],
            outputs=[namespace_dropdown, clone_status, chat_history, namespace_state, chatbot],
        )

        # Bind query button
        send_button.click(
            handle_query,
            inputs=[message_input, chat_history, namespace_dropdown],
            outputs=[chatbot, chat_history, message_input],
        )

    return demo


# Script entry point: build the UI and start the Gradio server with its
# default launch settings.
if __name__ == "__main__":
    app = create_ui()
    app.launch()