Update ingest.py
Browse files
ingest.py
CHANGED
|
@@ -1,31 +1,60 @@
|
|
| 1 |
import os
|
| 2 |
import shutil
|
|
|
|
|
|
|
| 3 |
from git import Repo
|
| 4 |
from chunker import chunk_code
|
| 5 |
|
| 6 |
-
SUPPORTED_EXTENSIONS = (".py", ".js", ".java", ".cpp")
|
| 7 |
BASE_REPO_DIR = "/tmp/user_repo"
|
|
|
|
| 8 |
|
| 9 |
def load_repo(repo_url: str) -> str:
    """Clone ``repo_url`` into a clean ``BASE_REPO_DIR`` and return that path.

    Whatever currently occupies ``BASE_REPO_DIR`` is removed first, so every
    call starts from a fresh checkout.

    Args:
        repo_url: Location of the repository; passed unchanged to
            ``Repo.clone_from``.

    Returns:
        ``BASE_REPO_DIR``, the directory that now holds the clone.

    Raises:
        OSError: If the previous contents of ``BASE_REPO_DIR`` cannot be
            removed. Errors raised by ``Repo.clone_from`` propagate unchanged.
    """
    # Always start fresh. ``shutil.rmtree`` refuses symlinks and regular
    # files, and ``os.path.exists`` is False for a dangling symlink, so a
    # stray non-directory at the target path is unlinked explicitly.
    if os.path.islink(BASE_REPO_DIR) or os.path.isfile(BASE_REPO_DIR):
        os.remove(BASE_REPO_DIR)
    elif os.path.exists(BASE_REPO_DIR):
        shutil.rmtree(BASE_REPO_DIR)

    Repo.clone_from(repo_url, BASE_REPO_DIR)
    return BASE_REPO_DIR
|
| 16 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 17 |
def ingest_repo(repo_path: str):
|
| 18 |
documents = []
|
| 19 |
|
| 20 |
for root, _, files in os.walk(repo_path):
|
| 21 |
for file in files:
|
| 22 |
-
|
| 23 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 24 |
try:
|
| 25 |
-
with open(
|
| 26 |
code = f.read()
|
| 27 |
-
documents.extend(chunk_code(
|
| 28 |
except Exception:
|
| 29 |
pass
|
| 30 |
|
| 31 |
return documents
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
import os
|
| 2 |
import shutil
|
| 3 |
+
import zipfile
|
| 4 |
+
import nbformat
|
| 5 |
from git import Repo
|
| 6 |
from chunker import chunk_code
|
| 7 |
|
| 8 |
+
SUPPORTED_EXTENSIONS = (".py", ".js", ".java", ".cpp", ".txt")
|
| 9 |
BASE_REPO_DIR = "/tmp/user_repo"
|
| 10 |
+
BASE_ZIP_DIR = "/tmp/user_zip"
|
| 11 |
|
| 12 |
def load_repo(repo_url: str) -> str:
    """Clone ``repo_url`` into a clean ``BASE_REPO_DIR`` and return that path.

    Whatever currently occupies ``BASE_REPO_DIR`` is removed first, so every
    call starts from a fresh checkout.

    Args:
        repo_url: Location of the repository; passed unchanged to
            ``Repo.clone_from``.

    Returns:
        ``BASE_REPO_DIR``, the directory that now holds the clone.

    Raises:
        OSError: If the previous contents of ``BASE_REPO_DIR`` cannot be
            removed. Errors raised by ``Repo.clone_from`` propagate unchanged.
    """
    # ``shutil.rmtree`` refuses symlinks and regular files, and
    # ``os.path.exists`` is False for a dangling symlink, so a stray
    # non-directory at the target path is unlinked explicitly; otherwise the
    # clone below would fail on a non-empty destination.
    if os.path.islink(BASE_REPO_DIR) or os.path.isfile(BASE_REPO_DIR):
        os.remove(BASE_REPO_DIR)
    elif os.path.exists(BASE_REPO_DIR):
        shutil.rmtree(BASE_REPO_DIR)

    Repo.clone_from(repo_url, BASE_REPO_DIR)
    return BASE_REPO_DIR
|
| 17 |
|
| 18 |
+
def extract_zip(zip_file) -> str:
    """Unpack ``zip_file`` into a clean ``BASE_ZIP_DIR`` and return that path.

    Anything already present at ``BASE_ZIP_DIR`` is removed first so files
    from an earlier upload never leak into the new extraction.

    Args:
        zip_file: The archive to unpack; handed directly to
            ``zipfile.ZipFile`` (a path or a binary file-like object).

    Returns:
        ``BASE_ZIP_DIR``, the directory holding the extracted files.

    Raises:
        zipfile.BadZipFile: If ``zip_file`` is not a valid ZIP archive.
        OSError: If the target directory cannot be cleaned or created.
    """
    # ``shutil.rmtree`` refuses symlinks and regular files, and
    # ``os.path.exists`` is False for a dangling symlink, so a stray
    # non-directory at the target path is unlinked explicitly.
    if os.path.islink(BASE_ZIP_DIR) or os.path.isfile(BASE_ZIP_DIR):
        os.remove(BASE_ZIP_DIR)
    elif os.path.exists(BASE_ZIP_DIR):
        shutil.rmtree(BASE_ZIP_DIR)
    os.makedirs(BASE_ZIP_DIR, exist_ok=True)

    # NOTE(review): ``extractall`` strips absolute prefixes and ".."
    # components from member names, but nothing here bounds the uncompressed
    # size -- confirm whether callers pass untrusted uploads.
    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        zip_ref.extractall(BASE_ZIP_DIR)

    return BASE_ZIP_DIR
|
| 27 |
+
|
| 28 |
def ingest_repo(repo_path: str) -> list:
    """Walk ``repo_path`` and collect the chunks of every ingestible file.

    ``.ipynb`` files are delegated to ``parse_notebook``; files whose name
    ends with one of ``SUPPORTED_EXTENSIONS`` are read as text and passed to
    ``chunk_code``. Everything else is ignored. Ingestion is best-effort: a
    file that cannot be read or chunked is logged and skipped instead of
    aborting the whole walk.

    Args:
        repo_path: Root directory to scan recursively.

    Returns:
        A list with the items produced by ``chunk_code`` /
        ``parse_notebook`` for all processed files; empty when nothing
        matched (or when ``repo_path`` does not exist, since ``os.walk``
        then yields nothing).
    """
    import logging  # function-scope import keeps this block self-contained

    log = logging.getLogger(__name__)
    documents = []

    for root, _, files in os.walk(repo_path):
        for file in files:
            path = os.path.join(root, file)

            if file.endswith(".ipynb"):
                documents.extend(parse_notebook(path))
                continue
            if not file.endswith(SUPPORTED_EXTENSIONS):
                continue

            # Explicit UTF-8 so decoding does not depend on the platform
            # locale; undecodable bytes are dropped rather than failing.
            try:
                with open(path, "r", encoding="utf-8", errors="ignore") as f:
                    code = f.read()
            except OSError as exc:
                log.warning("Skipping unreadable file %s: %s", path, exc)
                continue

            # Chunking failures stay non-fatal, but are no longer silent.
            try:
                documents.extend(chunk_code(path, code))
            except Exception:
                log.exception("Skipping %s: chunking failed", path)

    return documents
|
| 47 |
+
|
| 48 |
+
def parse_notebook(file_path: str, min_chars: int = 100) -> list:
    """Extract the code cells of a Jupyter notebook and chunk them.

    The notebook is read with ``nbformat`` (as version 4), the sources of all
    cells whose ``cell_type`` is ``"code"`` are joined with blank lines, and
    the combined text is passed to ``chunk_code``. Parsing is best-effort:
    any failure is logged and whatever was collected so far is returned.

    Args:
        file_path: Path of the ``.ipynb`` file.
        min_chars: The combined code (after stripping surrounding
            whitespace) must be longer than this many characters to be
            chunked. Defaults to 100.

    Returns:
        The items produced by ``chunk_code`` for the notebook's code, or an
        empty list when the notebook is too small or could not be processed.
    """
    import logging  # function-scope import keeps this block self-contained

    docs = []
    try:
        nb = nbformat.read(file_path, as_version=4)
        code_cells = [
            cell.source for cell in nb.cells if cell.cell_type == "code"
        ]
        combined = "\n\n".join(code_cells)
        if len(combined.strip()) > min_chars:
            docs.extend(chunk_code(file_path, combined))
    except Exception:
        # Deliberately broad: a malformed notebook must not abort ingestion,
        # but the failure is recorded instead of being silently dropped.
        logging.getLogger(__name__).exception(
            "Skipping notebook %s: could not be parsed", file_path
        )
    return docs
|