# CodexPilot / app.py
# SharathReddy's picture
# Update app.py
# 0a6f593 verified
import os
import hmac
import hashlib
import time
import jwt
import requests
import tempfile
import shutil
import ast
import faiss
import numpy as np
from fastapi import FastAPI, Request, HTTPException, status
from dotenv import load_dotenv
from git import Repo
from sentence_transformers import SentenceTransformer
# --- Configuration ---
# load_dotenv() runs first so that anything it places in the environment is
# visible to the lookups below. Each lookup yields None when the variable is
# not set.
load_dotenv()
GITHUB_WEBHOOK_SECRET = os.environ.get("GITHUB_WEBHOOK_SECRET")
GITHUB_APP_ID = os.environ.get("GITHUB_APP_ID")
GITHUB_PRIVATE_KEY = os.environ.get("GITHUB_PRIVATE_KEY")
# In-memory storage for our repository data (entries are keyed by the
# repository's full name and are lost when the process exits).
repo_data_store = {}
# --- Code Processing and Vectorization Class ---
class CodeProcessor:
    """Extract functions from Python sources and index them with FAISS.

    The SentenceTransformer model is loaded once in the constructor and reused
    for every repository that is processed.
    """

    def __init__(self, model_name='all-MiniLM-L6-v2'):
        """Load the sentence-embedding model named ``model_name``."""
        print("Initializing SentenceTransformer model...")
        self.model = SentenceTransformer(model_name)
        print("Model initialized.")

    def parse_python_file(self, file_path):
        """Parse a Python file and return its function definitions.

        Args:
            file_path: Path of the file to read (decoded as UTF-8).

        Returns:
            A list of dicts with the keys ``name``, ``source`` and
            ``file_path``, one per ``def`` / ``async def`` found anywhere in
            the file (methods and nested functions included, because the
            whole tree is walked). An empty list is returned when the file
            cannot be read, decoded or parsed, so one bad file never aborts
            the indexing of a whole repository.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                source_code = file.read()
        except (OSError, UnicodeDecodeError) as exc:
            # e.g. a broken symlink or a file that is not valid UTF-8.
            print(f"Could not read {file_path} ({exc}), skipping.")
            return []

        try:
            tree = ast.parse(source_code)
        except SyntaxError:
            print(f"SyntaxError in {file_path}, skipping.")
            return []
        except ValueError as exc:
            # Some Python versions report unparsable input (such as source
            # containing null bytes) as ValueError instead of SyntaxError.
            print(f"Could not parse {file_path} ({exc}), skipping.")
            return []

        functions = []
        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            # The segment must be cut from the same string that was parsed.
            function_source = ast.get_source_segment(source_code, node)
            if function_source:  # Ensure we got a valid segment
                functions.append({
                    "name": node.name,
                    "source": function_source,
                    # Drop the temp-dir prefix; process_repo later overwrites
                    # this with a path relative to the repository root.
                    "file_path": file_path.replace(tempfile.gettempdir(), ""),
                })
        return functions

    def process_repo(self, repo_path):
        """Walk a repository, parse its Python files and build a FAISS index.

        Args:
            repo_path: Root directory of the checked-out repository.

        Returns:
            ``(index, functions)`` where ``index`` is a ``faiss.IndexFlatL2``
            holding one embedding per entry of ``functions`` (same order), or
            ``(None, None)`` when no function was found.
        """
        all_functions = []
        print(f"Walking through Python files in {repo_path}...")
        for root, _, files in os.walk(repo_path):
            for file in files:
                if not file.endswith('.py'):
                    continue
                full_path = os.path.join(root, file)
                # Metadata stores the path relative to the repository root.
                clean_path = os.path.relpath(full_path, repo_path)
                parsed_funcs = self.parse_python_file(full_path)
                for func in parsed_funcs:
                    func['file_path'] = clean_path
                all_functions.extend(parsed_funcs)

        if not all_functions:
            print("No Python functions found in the repository.")
            return None, None

        print(f"Found {len(all_functions)} functions. Generating embeddings...")
        function_sources = [f["source"] for f in all_functions]
        embeddings = self.model.encode(function_sources, show_progress_bar=False)
        embedding_dim = embeddings.shape[1]
        index = faiss.IndexFlatL2(embedding_dim)
        # The embeddings are converted to float32 before being added.
        index.add(np.array(embeddings, dtype=np.float32))
        print(f"FAISS index created successfully with {index.ntotal} vectors.")
        return index, all_functions
# --- Initialize the processor globally ---
# One shared instance for the whole module: constructing CodeProcessor loads
# the SentenceTransformer model, so this happens once, at import time.
code_processor = CodeProcessor()
# --- GitHub App Authentication & Repo Management ---
# Helpers that authenticate as the GitHub App and clone/index repositories.
def create_jwt(app_id, private_key):
    """Build a short-lived RS256 JWT for authenticating as the GitHub App.

    Args:
        app_id: GitHub App identifier, used as the ``iss`` claim.
        private_key: Private key used to sign the token.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    now = int(time.time())
    payload = {
        # Back-date the issue time by a minute so the token is still accepted
        # when this host's clock runs slightly ahead of GitHub's.
        "iat": now - 60,
        # GitHub rejects tokens that expire more than 10 minutes ahead; stay
        # a minute under that limit to tolerate clock drift.
        "exp": now + (9 * 60),
        "iss": app_id,
    }
    return jwt.encode(payload, private_key, algorithm="RS256")
def get_installation_access_token(installation_id, app_id, private_key):
    """Request an installation access token from the GitHub API.

    Args:
        installation_id: Id of the app installation to get a token for.
        app_id: GitHub App identifier, forwarded to ``create_jwt``.
        private_key: App private key, forwarded to ``create_jwt``.

    Returns:
        The ``token`` field of the JSON response.

    Raises:
        requests.exceptions.RequestException: On connection errors, timeouts,
            or (via ``raise_for_status``) a 4xx/5xx response.
    """
    app_jwt = create_jwt(app_id, private_key)
    headers = {"Authorization": f"Bearer {app_jwt}", "Accept": "application/vnd.github.v3+json"}
    url = f"https://api.github.com/app/installations/{installation_id}/access_tokens"
    # requests has no default timeout; without one a stalled connection would
    # block the caller indefinitely.
    response = requests.post(url, headers=headers, timeout=10)
    response.raise_for_status()
    return response.json()["token"]
def process_repository(repo_url, token, repo_full_name):
    """Clone a repository, index it and record the result in ``repo_data_store``.

    Args:
        repo_url: HTTPS URL of the repository.
        token: Access token embedded in the clone URL for authentication.
        repo_full_name: Key under which the index and metadata are stored.

    Failures are printed rather than raised, and the temporary checkout is
    always removed.
    """
    temp_dir = tempfile.mkdtemp()
    print(f"Cloning {repo_full_name} into {temp_dir}...")
    try:
        # Only the leading scheme should be rewritten, hence count=1.
        clone_url = repo_url.replace("https://", f"https://x-access-token:{token}@", 1)
        Repo.clone_from(clone_url, temp_dir)
        faiss_index, metadata = code_processor.process_repo(temp_dir)
        # process_repo signals "nothing to index" with None; test for that
        # explicitly instead of relying on the index object's truthiness.
        if faiss_index is not None:
            repo_data_store[repo_full_name] = {
                "status": "processed",
                "faiss_index": faiss_index,
                "metadata": metadata
            }
            print(f"Successfully processed and indexed {repo_full_name}")
        else:
            print(f"No processable data found for {repo_full_name}")
    except Exception as e:
        # Git errors may echo the clone URL, which embeds the access token;
        # redact it so the credential never reaches the logs.
        message = str(e)
        if token:
            message = message.replace(token, "***")
        print(f"Failed to process repository {repo_full_name}: {message}")
    finally:
        print(f"Cleaning up temporary directory: {temp_dir}")
        # Best-effort cleanup: a failure to delete must not raise out of
        # `finally` and abort the caller's loop over remaining repositories.
        shutil.rmtree(temp_dir, ignore_errors=True)
# --- FastAPI App ---
# Application object; the route decorators below register their handlers on it.
app = FastAPI()
async def verify_signature(request: Request):
    """Check the ``X-Hub-Signature-256`` HMAC of an incoming webhook request.

    Args:
        request: The incoming request; its headers and raw body are read.

    Raises:
        HTTPException: 500 when no webhook secret is configured; 400 when the
            signature header is missing, malformed, or does not match the
            HMAC-SHA256 of the request body.
    """
    if not GITHUB_WEBHOOK_SECRET:
        raise HTTPException(status_code=500, detail="Webhook secret not configured.")
    signature_header = request.headers.get("X-Hub-Signature-256")
    if not signature_header:
        raise HTTPException(status_code=400, detail="X-Hub-Signature-256 header is missing.")
    # partition() never raises, unlike unpacking split("="), which fails with
    # ValueError (a 500 response) when the header has zero or several "=".
    sha_name, separator, signature = signature_header.partition("=")
    if not separator or sha_name != "sha256":
        raise HTTPException(status_code=400, detail="Invalid signature.")
    body = await request.body()
    mac = hmac.new(GITHUB_WEBHOOK_SECRET.encode("utf-8"), msg=body, digestmod=hashlib.sha256)
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str
    # input, which an attacker-controlled header could otherwise trigger.
    expected = mac.hexdigest().encode("ascii")
    if not hmac.compare_digest(expected, signature.encode("utf-8")):
        raise HTTPException(status_code=400, detail="Invalid signature.")
@app.get("/")
def read_root():
    """Liveness endpoint: reply with a fixed status message."""
    greeting = "Docu-Pilot server is alive!"
    return {"message": greeting}
@app.post("/api/github/webhook")
async def github_webhook(request: Request):
    """Handle a GitHub webhook delivery and index newly accessible repositories.

    The signature is verified first. For ``installation``/``created`` and
    ``installation_repositories``/``added`` events, an installation token is
    fetched and every listed repository is cloned and indexed. ``push`` events
    are only logged.

    Returns:
        ``{"status": "ok"}``, with an extra ``message`` when the payload
        carries no installation id.

    Raises:
        HTTPException: Propagated from ``verify_signature``.
    """
    # Local import: needed only to move blocking work off the event loop.
    import asyncio

    await verify_signature(request)
    event_type = request.headers.get("X-GitHub-Event")
    payload = await request.json()
    action = payload.get("action")
    print(f"Received event: {event_type} with action: {action}")

    # "installation" may be present but null; treat that like a missing key
    # instead of failing with AttributeError.
    installation_id = (payload.get("installation") or {}).get("id")
    if not installation_id:
        return {"status": "ok", "message": "Event does not pertain to an installation."}

    repos_to_process = []
    if event_type == "installation" and action == "created":
        repos_to_process = payload.get("repositories", [])
    elif event_type == "installation_repositories" and action == "added":
        repos_to_process = payload.get("repositories_added", [])

    if repos_to_process:
        loop = asyncio.get_running_loop()
        try:
            # The token request, clone and embedding steps are all blocking
            # calls; run them in the default executor so the event loop can
            # keep serving other requests meanwhile.
            token = await loop.run_in_executor(
                None,
                get_installation_access_token,
                installation_id,
                GITHUB_APP_ID,
                GITHUB_PRIVATE_KEY,
            )
            for repo in repos_to_process:
                repo_full_name = repo["full_name"]
                repo_url = f"https://github.com/{repo_full_name}"
                await loop.run_in_executor(
                    None, process_repository, repo_url, token, repo_full_name
                )
        except Exception as e:
            print(f"Error during repository processing: {e}")
    elif event_type == "push":
        repo_name = payload.get("repository", {}).get("full_name")
        print(f"Received a push event on repo {repo_name}")
    return {"status": "ok"}