# WhatRepo / back_end/core/loader.py
# Created by Krishna172912 (commit 8725d0d, unverified).
import json
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import pathspec
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader, TextLoader
from langchain_core.document_loaders.base import BaseLoader
from langchain_core.documents import Document

from config import AUTO_GEN_SCAN_EXTENSIONS, AUTO_GENERATED_MARKERS, EXCLUDE_PATTERNS, SUPPORTED_TYPES
def is_valid(file_path):
    """Return True if *file_path* points to a file worth indexing.

    Filters out secrets, lockfiles, minified/codegen artifacts and
    auto-generated files, then enforces the per-extension size limits
    declared in SUPPORTED_TYPES. Extension-less files are accepted only
    when they sniff as NUL-free UTF-8 text.
    """
    path = Path(file_path)
    if not path.is_file():
        return False
    name_lower = path.name.lower()
    extension = path.suffix.lower()
    # Well-known extension-less project files are always worth indexing.
    if path.name in {"Dockerfile", "Makefile", "LICENSE", "Procfile", "Rakefile"}:
        return True
    # Minified bundles and protobuf / codegen outputs are pure noise.
    if ".min." in name_lower or ".pb." in name_lower or ".g." in name_lower:
        return False
    # Never index secrets or key material.
    if path.name in {".env"} or extension in {".pem", ".key"}:
        return False
    if extension == ".lock":
        return False
    # Reject auto-generated files before touching the size check.
    if extension in AUTO_GEN_SCAN_EXTENSIONS:
        try:
            with open(file_path, "r", errors="ignore") as f:
                header = f.read(512).lower()  # 512 bytes is fast; covers any header
            if any(marker.lower() in header for marker in AUTO_GENERATED_MARKERS):
                return False
        except OSError:
            pass  # If we can't read the header, fall through to normal checks
    # BUG FIX: the file can vanish between is_file() and stat() (temp files,
    # live checkouts) — treat that race as "not valid" instead of crashing
    # the whole directory scan.
    try:
        size_kb = path.stat().st_size >> 10
    except OSError:
        return False
    if extension in SUPPORTED_TYPES["no_limit"]:
        return True
    if extension in SUPPORTED_TYPES["limit_2048kb"]:
        return size_kb <= 2048
    if extension in SUPPORTED_TYPES["limit_50kb"]:
        return size_kb <= 50
    if extension in SUPPORTED_TYPES["limit_30kb"]:
        return size_kb <= 30
    if extension in SUPPORTED_TYPES["limit_20kb"]:
        return size_kb <= 20
    # Unknown extensions are rejected outright.
    if extension != "":
        return False
    # Extension-less file: accept only if the first 2 KB decodes as UTF-8
    # and contains no NUL bytes (cheap binary sniff).
    try:
        with open(file_path, "rb") as f:
            chunk = f.read(2048)
        if b"\x00" in chunk:
            return False
        chunk.decode("utf-8")
        return True
    except Exception:
        return False
def count_valid_supported_files(directory_path: Path) -> int:
    """Count files under *directory_path* that pass is_valid().

    Two-phase strategy: (1) a fast iterative os.scandir walk that only
    collects candidate paths, skipping excluded directories; (2) parallel
    validation, since is_valid() is I/O bound (stat + header reads).
    """
    spec = pathspec.PathSpec.from_lines("gitwildmatch", EXCLUDE_PATTERNS)
    root = str(directory_path)
    # Membership in this set is O(1) and bypasses the slower pathspec regex
    # for well-known junk directories that can contain thousands of files.
    fast_ignore_dirs = {
        ".git", ".svn", ".hg", "node_modules", "venv", ".venv", "env", "python_env",
        "__pycache__", "dist", "build", "out", "target", "bin", "obj", ".next",
        ".nuxt", ".vscode", ".idea", "coverage", "tmp", "temp",
    }
    # Phase 1: fast traversal — gather all candidate file paths first.
    candidates = []
    stack = [root]
    while stack:
        current_dir = stack.pop()
        try:
            with os.scandir(current_dir) as it:
                for entry in it:
                    # Instantly skip giant junk directories before the regex runs.
                    if entry.is_dir(follow_symlinks=False) and entry.name in fast_ignore_dirs:
                        continue
                    rel_path = os.path.relpath(entry.path, root)
                    if spec.match_file(rel_path):
                        continue
                    if entry.is_dir(follow_symlinks=False):
                        stack.append(entry.path)
                    elif entry.is_file(follow_symlinks=False):
                        # Do NOT validate here; just collect the path.
                        candidates.append(entry.path)
        # BUG FIX: was `except PermissionError` only — a directory removed
        # mid-walk raises FileNotFoundError and crashed the count. OSError
        # covers both (PermissionError is a subclass).
        except OSError:
            continue
    # Phase 2: multithreaded validation; 32 workers suits I/O-bound disk work.
    with ThreadPoolExecutor(max_workers=32) as executor:
        # bool is an int subclass, so summing the map counts the True results.
        return sum(executor.map(is_valid, candidates))
def _Custom_ipynbLoader(file_path):
    """Flatten a Jupyter notebook into a single Document.

    Code and markdown cells are concatenated, each prefixed with a
    "[CODE CELL n]" / "[MARKDOWN CELL n]" header (n counts all cells,
    including skipped ones). Returns [] on any read/parse failure —
    this is deliberately best-effort.
    """
    try:
        with open(file_path, 'r', encoding="utf-8") as handle:
            notebook = json.load(handle)
        rendered = []
        for idx, cell in enumerate(notebook.get("cells", [])):
            kind = cell.get("cell_type")
            if kind not in ("code", "markdown"):
                continue
            raw = cell.get("source", "")
            # nbformat stores cell source as either a list of lines or one string.
            body = "".join(raw) if isinstance(raw, list) else raw
            rendered.append(f"[{kind.upper()} CELL {idx}]\n{body}")
        return [Document(page_content="\n\n".join(rendered), metadata={"source": str(file_path)})]
    except Exception:
        return []
class _CustomLoader(BaseLoader):
    """Per-file loader that dispatches on extension.

    Files rejected by is_valid() and files that fail to parse both yield
    an empty list, so the surrounding DirectoryLoader keeps going.
    """

    def __init__(self, file_path: str):
        # Path of the single file this loader instance is responsible for.
        self.file_path = file_path

    def load(self):
        """Load the file as a list of Documents; [] when skipped or failed."""
        if not is_valid(self.file_path):
            return []
        suffix = Path(self.file_path).suffix.lower()
        try:
            if suffix == ".pdf":
                return PyPDFLoader(self.file_path).load()
            if suffix == ".ipynb":
                return _Custom_ipynbLoader(self.file_path)
            # Everything else is treated as text.
            try:
                return TextLoader(self.file_path, encoding="utf-8").load()
            except UnicodeDecodeError:
                # SAFETY: undecodable bytes — re-read manually, dropping the
                # offending characters instead of failing the whole file.
                with open(self.file_path, "r", encoding="utf-8", errors="ignore") as handle:
                    text = handle.read()
                return [Document(page_content=text, metadata={"source": self.file_path})]
        except Exception as e:
            print(f"Failed to load {self.file_path}: {e}")
            return []
def load_repository_as_documents(repo_storage_path: Path) -> list[Document]:
    """Load every supported file under *repo_storage_path* as Documents.

    Walks the tree with DirectoryLoader, delegating each file to
    _CustomLoader (which applies is_valid filtering and per-type parsing).
    Returns the combined list of Documents.
    """
    loader = DirectoryLoader(
        str(repo_storage_path),  # DirectoryLoader expects a string path
        # BUG FIX: "**/*.*" only matches names containing a dot, which
        # silently skipped the extension-less files is_valid() whitelists
        # (Dockerfile, Makefile, LICENSE, Procfile, Rakefile). "**/*"
        # matches everything; _CustomLoader still filters unsupported files.
        glob="**/*",
        exclude=EXCLUDE_PATTERNS,
        loader_cls=_CustomLoader,
        recursive=True,
        silent_errors=True,
        show_progress=True,
        use_multithreading=True,
    )
    docs = loader.load()
    print(f"Successfully loaded {len(docs)} documents.")
    return docs