Spaces:
Running
Running
| """ | |
| Repository scanner. | |
| Takes an uploaded repo (a .zip or an already-extracted directory) and returns | |
| the Python source files worth indexing -- filtering out virtualenvs, caches, | |
| VCS folders, build artifacts, and oversized files so the rest of the pipeline | |
| only ever sees real source code. | |
| Run standalone to sanity-check a repo: | |
| python -m src.ingestion.scanner path/to/repo.zip | |
| python -m src.ingestion.scanner path/to/repo_dir | |
| """ | |
| import os | |
| import tempfile | |
| import zipfile | |
| from pathlib import Path | |
# Directory names that are pruned during the walk: anything with one of these
# names is never descended into, wherever it appears in the tree.
SKIP_DIRS = {
    # version-control metadata
    ".git",
    ".hg",
    ".svn",
    # interpreter / tool caches
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
    ".ruff_cache",
    ".tox",
    # virtual environments
    ".venv",
    "venv",
    "env",
    "virtualenv",
    # third-party packages and build output
    "node_modules",
    "dist",
    "build",
    ".eggs",
    "site-packages",
    # editor / notebook state
    ".idea",
    ".vscode",
    ".ipynb_checkpoints",
}

# Size ceiling for a single file, in bytes (~1 MB). Anything larger is assumed
# to be generated or vendored code, or a data dump, and is not indexed.
MAX_FILE_BYTES = 1_000_000
def extract_zip(zip_path, dest_dir=None):
    """Extract a .zip and return the directory it was extracted into.

    Args:
        zip_path: Path to the archive to unpack.
        dest_dir: Directory to extract into. When omitted (or empty), a fresh
            temporary directory with the prefix "repo_" is created.

    Returns:
        The directory the archive was extracted into: dest_dir when it was
        given, otherwise the newly created temporary directory.

    Raises:
        zipfile.BadZipFile: If zip_path is not a valid zip archive.
        OSError: If the archive cannot be opened or a member cannot be written.
    """
    import shutil

    # Open the archive before creating anything on disk, so an invalid or
    # missing zip never leaves an empty temp directory behind.
    with zipfile.ZipFile(zip_path) as zf:
        created_here = not dest_dir
        target = dest_dir or tempfile.mkdtemp(prefix="repo_")
        try:
            zf.extractall(target)
        except BaseException:
            # Only remove a directory this call created; a caller-supplied
            # dest_dir may hold other content and is left alone.
            if created_here:
                shutil.rmtree(target, ignore_errors=True)
            raise
    return target
def scan_python_files(root_dir):
    """
    Walk root_dir and return the Python source files worth indexing.

    Directories whose name is in SKIP_DIRS are pruned from the walk. Files
    that do not end in ".py", are larger than MAX_FILE_BYTES, or cannot be
    read are skipped silently.

    Args:
        root_dir: Directory to scan (string or path-like).

    Returns:
        A list of file records sorted by "rel_path", each of the form
            {"path": ..., "rel_path": ..., "source": ...}
        - path: the file's location under root_dir (absolute only when
          root_dir itself is absolute).
        - rel_path: the location relative to root_dir, always written with
          forward slashes (e.g. "auth/service.py"), so citations and sort
          order are the same on every OS and never leak the extraction dir.
        - source: the file's text, decoded as UTF-8 with a leading BOM
          removed and undecodable bytes dropped.
    """
    root = Path(root_dir)
    files = []

    for dirpath, dirnames, filenames in os.walk(root):
        # Prune skip dirs in place so os.walk won't descend into them.
        dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]

        for name in filenames:
            if not name.endswith(".py"):
                continue

            full = Path(dirpath) / name
            try:
                if full.stat().st_size > MAX_FILE_BYTES:
                    continue
                # "utf-8-sig" strips a leading byte-order mark; otherwise
                # U+FEFF would become the first character of the source.
                # errors="ignore" means decoding itself never raises.
                source = full.read_text(encoding="utf-8-sig", errors="ignore")
            except OSError:
                continue  # unreadable (permissions, broken symlink) -> skip quietly

            files.append({
                "path": str(full),
                "rel_path": full.relative_to(root).as_posix(),
                "source": source,
            })

    # Stable order makes runs reproducible (matters for eval later).
    files.sort(key=lambda f: f["rel_path"])
    return files
def scan_repo(zip_path):
    """Unpack a zip archive and scan the result in a single step.

    Returns a 2-tuple (files, extraction_root): the records produced by
    scan_python_files, and the directory extract_zip unpacked the archive into.
    """
    extraction_root = extract_zip(zip_path)
    records = scan_python_files(extraction_root)
    return records, extraction_root
if __name__ == "__main__":
    # Standalone sanity check: scan a .zip or a directory and list what was found.
    import sys

    if len(sys.argv) < 2:
        print("usage: python -m src.ingestion.scanner <repo.zip | repo_dir>")
        sys.exit(1)

    target = sys.argv[1]
    if target.lower().endswith(".zip"):
        # Archives are unpacked to a temp directory first; report where.
        found, root = scan_repo(target)
        print(f"Extracted to: {root}")
    else:
        found = scan_python_files(target)

    print(f"Found {len(found)} Python files:")
    for f in found:
        source = f["source"]
        # One line per newline, plus a final line only when the text does not
        # end with a newline. (A plain count + 1 overcounts every
        # newline-terminated file and reports 1 line for an empty file.)
        has_unterminated_tail = bool(source) and not source.endswith("\n")
        line_count = source.count("\n") + (1 if has_unterminated_tail else 0)
        print(f" {f['rel_path']} ({line_count} lines)")