File size: 3,244 Bytes
8e72e1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
"""
Repository scanner.

Takes an uploaded repo (a .zip or an already-extracted directory) and returns
the Python source files worth indexing -- filtering out virtualenvs, caches,
VCS folders, build artifacts, and oversized files so the rest of the pipeline
only ever sees real source code.

Run standalone to sanity-check a repo:
    python -m src.ingestion.scanner path/to/repo.zip
    python -m src.ingestion.scanner path/to/repo_dir
"""
import os
import tempfile
import zipfile
from pathlib import Path

# Directory names that are pruned from the walk, grouped by where they come from.
SKIP_DIRS = {
    # version-control metadata
    ".git",
    ".hg",
    ".svn",
    # interpreter / tool caches
    "__pycache__",
    ".pytest_cache",
    ".mypy_cache",
    ".ruff_cache",
    ".tox",
    # virtual environments
    ".venv",
    "venv",
    "env",
    "virtualenv",
    # third-party code and build output
    "node_modules",
    "dist",
    "build",
    ".eggs",
    "site-packages",
    # editor / notebook state
    ".idea",
    ".vscode",
    ".ipynb_checkpoints",
}

# Size ceiling for a single file; anything larger is assumed to be generated,
# vendored, or a data dump rather than hand-written source.
MAX_FILE_BYTES = 1_000_000  # ~1 MB


def extract_zip(zip_path, dest_dir=None):
    """Extract a .zip archive and return the directory it was extracted into.

    Args:
        zip_path: Path to the archive (anything ``zipfile.ZipFile`` accepts).
        dest_dir: Directory to extract into. When omitted (or falsy), a fresh
            temporary directory with the ``repo_`` prefix is created.

    Returns:
        The extraction directory: ``dest_dir`` as given, or the path of the
        newly created temporary directory. On success the directory is left
        in place; this function never removes it.

    Raises:
        OSError: If the archive cannot be opened or a member cannot be written.
        zipfile.BadZipFile: If ``zip_path`` is not a valid zip archive.
    """
    import shutil  # local import: only needed on the failure path

    # Open the archive first so a missing or corrupt file fails before any
    # temporary directory has been created.
    with zipfile.ZipFile(zip_path) as zf:
        owns_dest = not dest_dir
        target = dest_dir or tempfile.mkdtemp(prefix="repo_")
        try:
            zf.extractall(target)
        except Exception:
            # Only remove a directory this call created; a caller-supplied
            # dest_dir may already hold unrelated content.
            if owns_dest:
                shutil.rmtree(target, ignore_errors=True)
            raise
    return target


def scan_python_files(root_dir, *, skip_dirs=None, max_file_bytes=None):
    """Walk ``root_dir`` and return a record for every Python file worth indexing.

    Each record is a dict:
        {"path": absolute path, "rel_path": path relative to root, "source": text}

    ``rel_path`` is what gets shown in citations later (e.g. "auth/service.py"),
    so it stays relative to the repo root, not the temp extraction folder, and
    always uses forward slashes regardless of platform.

    Args:
        root_dir: Directory to scan. A relative path is anchored to the current
            working directory so that ``path`` is absolute as documented.
        skip_dirs: Directory names to prune from the walk. Defaults to the
            module-level ``SKIP_DIRS``.
        max_file_bytes: Files strictly larger than this many bytes are skipped.
            Defaults to the module-level ``MAX_FILE_BYTES``.

    Returns:
        A list of records sorted by ``rel_path``. Files that cannot be stat'ed
        or read are skipped silently; undecodable bytes are dropped from
        ``source``.
    """
    skipped = SKIP_DIRS if skip_dirs is None else frozenset(skip_dirs)
    size_limit = MAX_FILE_BYTES if max_file_bytes is None else max_file_bytes

    # abspath (not resolve) makes the root absolute without rewriting symlinks.
    root = Path(os.path.abspath(root_dir))
    files = []

    for dirpath, dirnames, filenames in os.walk(root):
        # Prune skip dirs in place so os.walk won't descend into them.
        dirnames[:] = [d for d in dirnames if d not in skipped]

        for name in filenames:
            if not name.endswith(".py"):
                continue
            full = Path(dirpath) / name
            try:
                if full.stat().st_size > size_limit:
                    continue
                # errors="ignore" drops undecodable bytes, so decoding itself
                # cannot fail; only filesystem errors need handling here.
                source = full.read_text(encoding="utf-8", errors="ignore")
            except OSError:
                continue  # unreadable -> skip quietly
            files.append({
                "path": str(full),
                # POSIX separators keep citations and sort order identical
                # across operating systems.
                "rel_path": full.relative_to(root).as_posix(),
                "source": source,
            })

    # Stable order makes runs reproducible (matters for eval later).
    files.sort(key=lambda f: f["rel_path"])
    return files


def scan_repo(zip_path):
    """Unpack ``zip_path`` and scan the result in a single step.

    Returns:
        A ``(files, extraction_root)`` tuple: the records produced by
        ``scan_python_files`` and the directory the archive was extracted
        into by ``extract_zip``.
    """
    extraction_root = extract_zip(zip_path)
    records = scan_python_files(extraction_root)
    return records, extraction_root


if __name__ == "__main__":
    # Command-line sanity check: scan the zip or directory named by the first
    # argument and list each Python file found with its line count.
    import sys

    if len(sys.argv) < 2:
        print("usage: python -m src.ingestion.scanner <repo.zip | repo_dir>")
        sys.exit(1)

    target = sys.argv[1]
    # A ".zip" suffix (any case) selects extract-then-scan; anything else is
    # treated as an already-extracted directory.
    if target.lower().endswith(".zip"):
        found, root = scan_repo(target)
        print(f"Extracted to: {root}")
    else:
        found = scan_python_files(target)

    print(f"Found {len(found)} Python files:")
    for f in found:
        source = f["source"]
        # Every "\n" ends one line; add one more only for a final line that
        # lacks a trailing newline. This avoids over-counting files that end
        # with "\n" and reports 0 lines for an empty file.
        has_unterminated_tail = bool(source) and not source.endswith("\n")
        line_count = source.count("\n") + int(has_unterminated_tail)
        print(f"  {f['rel_path']}  ({line_count} lines)")