"""
Repository scanner.

Takes an uploaded repo (a .zip or an already-extracted directory) and returns
the Python source files worth indexing -- filtering out virtualenvs, caches,
VCS folders, build artifacts, and oversized files so the rest of the pipeline
only ever sees real source code.

Run standalone to sanity-check a repo:
    python -m src.ingestion.scanner path/to/repo.zip
    python -m src.ingestion.scanner path/to/repo_dir
"""

import os
import tempfile
import zipfile
from pathlib import Path

# Directories we never want to descend into.
SKIP_DIRS = {
    ".git", ".hg", ".svn",
    "__pycache__", ".pytest_cache", ".mypy_cache", ".ruff_cache", ".tox",
    ".venv", "venv", "env", "virtualenv",
    "node_modules", "dist", "build", ".eggs", "site-packages",
    ".idea", ".vscode", ".ipynb_checkpoints",
    # Zips created on macOS carry a __MACOSX/ folder of binary AppleDouble
    # "._<name>" metadata files; "._service.py" would otherwise pass the
    # ".py" filter below and be indexed as garbage.
    "__MACOSX",
}

# Skip files bigger than this (generated/vendored files, data dumps, etc.).
MAX_FILE_BYTES = 1_000_000  # ~1 MB


def extract_zip(zip_path, dest_dir=None):
    """
    Extract a .zip and return the directory it was extracted into.

    zip_path: path to the archive to open.
    dest_dir: where to extract; when omitted (or falsy) a fresh temporary
        directory prefixed "repo_" is created. Nothing in this module
        deletes that directory -- cleanup is up to the caller.

    Raises whatever zipfile.ZipFile raises for a missing or invalid archive.
    """
    dest_dir = dest_dir or tempfile.mkdtemp(prefix="repo_")
    with zipfile.ZipFile(zip_path) as zf:
        # zipfile strips absolute prefixes and ".." components from member
        # names while extracting, so members land under dest_dir.
        zf.extractall(dest_dir)
    return dest_dir


def scan_python_files(root_dir):
    """
    Walk root_dir and return a list of file records:
        {"path": absolute path, "rel_path": path relative to root, "source": text}

    rel_path is what we'll show in citations later (e.g. "auth/service.py"),
    so it must stay relative to the repo root, not the temp extraction folder.
    It always uses "/" separators, whatever the host OS.

    Skipped: anything under a SKIP_DIRS directory, files not ending in ".py",
    files larger than MAX_FILE_BYTES, and files that cannot be stat'ed/read.
    Records are sorted by rel_path. A root_dir that does not exist yields [].
    """
    # abspath (not resolve) so "path" is absolute even for a relative
    # root_dir, without rewriting symlinked prefixes the caller may compare
    # against.
    root = Path(os.path.abspath(root_dir))
    files = []

    for dirpath, dirnames, filenames in os.walk(root):
        # Prune skip dirs in place so os.walk won't descend into them.
        dirnames[:] = [d for d in dirnames if d not in SKIP_DIRS]

        for name in filenames:
            if not name.endswith(".py"):
                continue

            full = Path(dirpath) / name
            try:
                if full.stat().st_size > MAX_FILE_BYTES:
                    continue
                # utf-8-sig drops a leading BOM (otherwise it ends up as a
                # stray U+FEFF at the start of the source text);
                # errors="ignore" drops undecodable bytes, so decoding
                # itself never raises here.
                source = full.read_text(encoding="utf-8-sig", errors="ignore")
            except OSError:
                continue  # unreadable -> skip quietly

            files.append({
                "path": str(full),
                # as_posix keeps citations and the sort key identical across
                # platforms (no backslashes on Windows).
                "rel_path": full.relative_to(root).as_posix(),
                "source": source,
            })

    # Stable order makes runs reproducible (matters for eval later).
    files.sort(key=lambda f: f["rel_path"])
    return files


def scan_repo(zip_path):
    """
    Extract a zip and scan it in one call. Returns (files, extraction_root).

    extraction_root is the temporary directory created by extract_zip; it is
    returned so the caller can remove it once the files are no longer needed.
    """
    root = extract_zip(zip_path)
    return scan_python_files(root), root


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("usage: python -m src.ingestion.scanner <repo.zip | repo_dir>")
        sys.exit(1)

    target = sys.argv[1]
    if target.lower().endswith(".zip"):
        found, root = scan_repo(target)
        print(f"Extracted to: {root}")
    else:
        found = scan_python_files(target)

    print(f"Found {len(found)} Python files:")
    for f in found:
        line_count = f["source"].count("\n") + 1
        print(f"  {f['rel_path']}  ({line_count} lines)")