Spaces:
Sleeping
Sleeping
| from config import AUTO_GEN_SCAN_EXTENSIONS,AUTO_GENERATED_MARKERS,SUPPORTED_TYPES,EXCLUDE_PATTERNS | |
| from pathlib import Path | |
| import pathspec | |
| import json | |
| import os | |
| import pathspec | |
| from langchain_core.documents import Document | |
| from langchain_core.document_loaders.base import BaseLoader | |
| from langchain_community.document_loaders import DirectoryLoader, TextLoader, PyPDFLoader | |
def is_valid(file_path) -> bool:
    """Decide whether *file_path* should be ingested into the index.

    Filters are applied in order: existence, well-known extensionless
    project files, minified/generated name patterns, secret files,
    lockfiles, auto-generated header sniffing, per-extension size
    limits, and finally a binary-content probe for unknown
    extensionless files.

    Returns True when the file passes every filter.
    """
    path = Path(file_path)
    if not path.is_file():
        return False

    name_lower = path.name.lower()
    extension = path.suffix.lower()

    # Well-known extensionless project files are always worth indexing.
    if path.name in {"Dockerfile", "Makefile", "LICENSE", "Procfile", "Rakefile"}:
        return True
    # Minified / protobuf / generated artifacts (x.min.js, x.pb.go, x.g.dart).
    if ".min." in name_lower or ".pb." in name_lower or ".g." in name_lower:
        return False
    # Secrets and key material must never be ingested.
    if path.name in {".env"} or extension in {".pem", ".key"}:
        return False
    # Dependency lockfiles are machine-written noise.
    if extension == ".lock":
        return False

    # Reject auto-generated files before touching the size check.
    if extension in AUTO_GEN_SCAN_EXTENSIONS:
        try:
            # BUG FIX: was opened without an explicit encoding (platform-
            # dependent default); errors="ignore" keeps the sniff lossy-safe.
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                header = f.read(512).lower()  # 512 bytes is fast; covers any header
            if any(marker.lower() in header for marker in AUTO_GENERATED_MARKERS):
                return False
        except OSError:
            pass  # If we can't read the header, fall through to normal checks

    # BUG FIX: stat() can raise OSError if the file vanishes between the
    # is_file() check above and here (TOCTOU race); previously this would
    # crash the caller's whole scan.
    try:
        size_kb = path.stat().st_size >> 10  # bytes -> KiB via bit shift
    except OSError:
        return False

    if extension in SUPPORTED_TYPES["no_limit"]:
        return True
    if extension in SUPPORTED_TYPES["limit_2048kb"]:
        return size_kb <= 2048
    if extension in SUPPORTED_TYPES["limit_50kb"]:
        return size_kb <= 50
    if extension in SUPPORTED_TYPES["limit_30kb"]:
        return size_kb <= 30
    if extension in SUPPORTED_TYPES["limit_20kb"]:
        return size_kb <= 20

    # Any other *named* extension is unsupported.
    if extension != "":
        return False

    # Extensionless file with an unknown name: accept only if its first
    # 2 KiB looks like UTF-8 text (no NUL bytes, decodes cleanly).
    try:
        with open(file_path, "rb") as f:
            chunk = f.read(2048)
        if b"\x00" in chunk:
            return False
        chunk.decode("utf-8")
        return True
    except Exception:
        return False
def count_valid_supported_files(directory_path: Path) -> int:
    """Count files under *directory_path* that pass `is_valid`.

    Runs in two phases: a fast single-threaded os.scandir traversal
    collects candidate file paths (pruning ignore-listed directories),
    then a thread pool runs the I/O-bound `is_valid` checks in parallel.

    Returns the number of valid, supported files found.
    """
    from concurrent.futures import ThreadPoolExecutor

    # NOTE: the redundant function-local `import os` / `import pathspec`
    # were removed; both are already imported at module level.
    spec = pathspec.PathSpec.from_lines("gitwildmatch", EXCLUDE_PATTERNS)
    root = str(directory_path)

    # Membership in a set is O(1) and bypasses the slower pathspec regex
    # for well-known junk directories that can hold huge file counts.
    fast_ignore_dirs = {
        ".git", ".svn", ".hg", "node_modules", "venv", ".venv", "env", "python_env",
        "__pycache__", "dist", "build", "out", "target", "bin", "obj", ".next",
        ".nuxt", ".vscode", ".idea", "coverage", "tmp", "temp",
    }

    # Phase 1: fast traversal — gather candidate file paths only.
    candidates = []
    stack = [root]
    while stack:
        current_dir = stack.pop()
        try:
            with os.scandir(current_dir) as it:
                for entry in it:
                    # Prune giant junk directories before running the regex spec.
                    if entry.is_dir(follow_symlinks=False) and entry.name in fast_ignore_dirs:
                        continue
                    rel_path = os.path.relpath(entry.path, root)
                    if spec.match_file(rel_path):
                        continue
                    if entry.is_dir(follow_symlinks=False):
                        stack.append(entry.path)
                    elif entry.is_file(follow_symlinks=False):
                        # Validation is deferred to phase 2; just collect.
                        candidates.append(entry.path)
        except (PermissionError, FileNotFoundError):
            # BUG FIX: a directory deleted mid-walk previously raised
            # FileNotFoundError and aborted the count; skip and keep walking.
            continue

    # Phase 2: run the I/O-bound `is_valid` checks concurrently.
    # 32 workers is a reasonable sweet spot for disk-bound operations.
    with ThreadPoolExecutor(max_workers=32) as executor:
        return sum(1 for ok in executor.map(is_valid, candidates) if ok)
| def _Custom_ipynbLoader(file_path): | |
| try: | |
| with open(file_path, 'r', encoding="utf-8") as f: | |
| notebook = json.load(f) | |
| cells = [] | |
| for i, cell in enumerate(notebook.get("cells", [])): | |
| if cell.get("cell_type") in ["code", "markdown"]: | |
| source = cell.get("source", "") | |
| content = "".join(source) if isinstance(source, list) else source | |
| cells.append(f"[{cell['cell_type'].upper()} CELL {i}]\n{content}") | |
| extraction = "\n\n".join(cells) | |
| return [Document(page_content=extraction, metadata={"source": str(file_path)})] | |
| except Exception: | |
| return [] | |
class _CustomLoader(BaseLoader):
    """Per-file loader that dispatches on extension.

    PDFs go through PyPDFLoader, notebooks through the custom ipynb
    parser, and everything else through TextLoader with a lossy-decode
    fallback. Files rejected by `is_valid` yield no documents.
    """

    def __init__(self, file_path: str):
        self.file_path = file_path

    def load(self):
        """Return the file's Documents, or [] if invalid or unloadable."""
        if not is_valid(self.file_path):
            return []
        suffix = Path(self.file_path).suffix.lower()
        try:
            if suffix == ".pdf":
                return PyPDFLoader(self.file_path).load()
            if suffix == ".ipynb":
                return _Custom_ipynbLoader(self.file_path)
            try:
                return TextLoader(self.file_path, encoding="utf-8").load()
            except UnicodeDecodeError:
                # SAFETY: If the file has weird characters, open it manually and ignore errors
                with open(self.file_path, "r", encoding="utf-8", errors="ignore") as handle:
                    content = handle.read()
                return [Document(page_content=content, metadata={"source": self.file_path})]
        except Exception as e:
            print(f"Failed to load {self.file_path}: {e}")
            return []
def load_repository_as_documents(repo_storage_path: Path) -> list[Document]:
    """Load every supported file under *repo_storage_path* as Documents.

    Uses DirectoryLoader with the per-file _CustomLoader; _CustomLoader
    re-checks `is_valid`, so unsupported files simply produce no documents.

    Returns the full list of loaded Documents.
    """
    loader = DirectoryLoader(
        str(repo_storage_path),  # str() for loaders that don't accept Path
        # BUG FIX: the previous glob "**/*.*" required a dot in the
        # filename, silently skipping extensionless files (Dockerfile,
        # Makefile, LICENSE, ...) that is_valid() explicitly whitelists.
        glob="**/*",
        exclude=EXCLUDE_PATTERNS,
        loader_cls=_CustomLoader,
        recursive=True,
        silent_errors=True,   # skip unreadable files instead of raising
        show_progress=True,
        use_multithreading=True,
    )
    docs = loader.load()
    print(f"Successfully loaded {len(docs)} documents.")
    return docs