from config import AUTO_GEN_SCAN_EXTENSIONS, AUTO_GENERATED_MARKERS, SUPPORTED_TYPES, EXCLUDE_PATTERNS
from pathlib import Path
import json
import os
import pathspec

from langchain_core.documents import Document
from langchain_core.document_loaders.base import BaseLoader
from langchain_community.document_loaders import DirectoryLoader, TextLoader, PyPDFLoader

def is_valid(file_path):
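    """Return True if file_path points to a file worth indexing.

    Rejects minified/generated artifacts, secrets (.env, .pem, .key), lock
    files, and files whose header contains an auto-generation marker, then
    applies the per-extension size limits from SUPPORTED_TYPES. Extensionless
    files are accepted if they are whitelisted or look like UTF-8 text.
    """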
    path = Path(file_path)

    if not path.is_file():
        return False

    name_lower = path.name.lower()
    extension = path.suffix.lower()

    if path.name in {"Dockerfile", "Makefile", "LICENSE", "Procfile", "Rakefile"}:
        return True

    if ".min." in name_lower or ".pb." in name_lower or ".g." in name_lower:
        return False

    if path.name in {".env"} or extension in {".pem", ".key"}:
        return False

    if extension == ".lock":
        return False

    # Reject auto-generated files before applying the size checks
    if extension in AUTO_GEN_SCAN_EXTENSIONS:
        try:
            with open(file_path, "r", errors="ignore") as f:
                header = f.read(512).lower()   # the first 512 bytes are enough to catch typical generator headers
            if any(marker.lower() in header for marker in AUTO_GENERATED_MARKERS):
                return False
        except Exception:
            pass   # If we can't read the header, fall through to normal checks

    size_kb = path.stat().st_size >> 10

    if extension in SUPPORTED_TYPES["no_limit"]:
        return True
    if extension in SUPPORTED_TYPES["limit_2048kb"]:
        return size_kb <= 2048
    if extension in SUPPORTED_TYPES["limit_50kb"]:
        return size_kb <= 50
    if extension in SUPPORTED_TYPES["limit_30kb"]:
        return size_kb <= 30
    if extension in SUPPORTED_TYPES["limit_20kb"]:
        return size_kb <= 20

    if extension != "":
        return False

    try:
        with open(file_path, "rb") as f:
            chunk = f.read(2048)
            if b"\x00" in chunk:
                return False
            chunk.decode("utf-8")
            return True
    except Exception:
        return False


def count_valid_supported_files(directory_path: Path) -> int:
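    """Count files under directory_path that pass is_valid.

    Walks the tree iteratively with os.scandir, skipping EXCLUDE_PATTERNS and a
    hard-coded set of junk directories, then validates the candidates in a
    thread pool.
    """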
    from concurrent.futures import ThreadPoolExecutor

    spec = pathspec.PathSpec.from_lines('gitwildmatch', EXCLUDE_PATTERNS)
    root = str(directory_path)

    # 1. FAST TRAVERSAL: Gather all file paths first
    candidates = []
    stack = [root]
    
    # Hard-coded directory names to skip outright: a set-membership check is O(1)
    # and avoids running the slower pathspec regex on huge junk folders.
    fast_ignore_dirs = {
        ".git", ".svn", ".hg", "node_modules", "venv", ".venv", "env", "python_env",
        "__pycache__", "dist", "build", "out", "target", "bin", "obj", ".next", 
        ".nuxt", ".vscode", ".idea", "coverage", "tmp", "temp"
    }

    while stack:
        current_dir = stack.pop()

        try:
            with os.scandir(current_dir) as it:
                for entry in it:
                    # Instantly skip giant junk directories before running slow regex
                    if entry.is_dir(follow_symlinks=False) and entry.name in fast_ignore_dirs:
                        continue
                        
                    rel_path = os.path.relpath(entry.path, root)

                    if spec.match_file(rel_path):
                        continue

                    if entry.is_dir(follow_symlinks=False):
                        stack.append(entry.path)
                    elif entry.is_file(follow_symlinks=False):
                        # Do NOT validate here. Just collect the path.
                        candidates.append(entry.path)
                        
        except PermissionError:
            continue

    # len(candidates) now holds the total number of files discovered.
    
    # 2. MULTITHREADED VALIDATION: Run `is_valid` in parallel
    valid_count = 0
    
    # 32 workers is generally a sweet spot for I/O-bound disk operations
    with ThreadPoolExecutor(max_workers=32) as executor:
        # executor.map runs is_valid over every candidate path
        results = executor.map(is_valid, candidates)
        
        # Count how many returned True
        valid_count = sum(1 for is_file_valid in results if is_file_valid)

    return valid_count




def _Custom_ipynbLoader(file_path):
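    """Flatten a Jupyter notebook into a single LangChain Document.

    Concatenates the source of code and markdown cells, labelling each with
    its cell type and index; returns [] if the notebook cannot be parsed.
    """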
    try:
        with open(file_path, 'r', encoding="utf-8") as f:
            notebook = json.load(f)

        cells = []
        for i, cell in enumerate(notebook.get("cells", [])):
            if cell.get("cell_type") in ["code", "markdown"]:
                source = cell.get("source", "")
                content = "".join(source) if isinstance(source, list) else source
                cells.append(f"[{cell['cell_type'].upper()} CELL {i}]\n{content}")

        extraction = "\n\n".join(cells)
        return [Document(page_content=extraction, metadata={"source": str(file_path)})]
    except Exception:
        return []


class _CustomLoader(BaseLoader):
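    """Per-file loader plugged into DirectoryLoader as loader_cls.

    Routes PDFs to PyPDFLoader and notebooks to _Custom_ipynbLoader, and falls
    back to TextLoader for everything else, skipping files that fail is_valid.
    """
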
    def __init__(self, file_path: str):
        self.file_path = file_path

    def load(self):
        if not is_valid(self.file_path):
            return []

        ext = Path(self.file_path).suffix.lower()

        try:
            if ext == ".pdf":
                return PyPDFLoader(self.file_path).load()
            elif ext == ".ipynb":
                return _Custom_ipynbLoader(self.file_path)
            else:
                try:
                    return TextLoader(self.file_path, encoding="utf-8").load()
                except UnicodeDecodeError:
                    # SAFETY: If the file has weird characters, open it manually and ignore errors
                    with open(self.file_path, "r", encoding="utf-8", errors="ignore") as f:
                        text = f.read()
                    return [Document(page_content=text, metadata={"source": self.file_path})]
                    
        except Exception as e:
            print(f"Failed to load {self.file_path}: {e}")
            return []

def load_repository_as_documents(repo_storage_path: Path) -> list[Document]:
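    """Load every valid file under repo_storage_path as LangChain Documents."""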
    loader = DirectoryLoader(
        repo_storage_path,
        glob="**/*",  # no "*.*" filter, so extensionless files (Dockerfile, Makefile) that is_valid accepts are also picked up
        exclude=EXCLUDE_PATTERNS,
        loader_cls=_CustomLoader,
        recursive=True,
        silent_errors=True,
        show_progress=True,
        use_multithreading=True,
    )
    
    docs = loader.load()
    print(f"Successfully loaded {len(docs)} documents.")
    return docs
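

# Minimal usage sketch, assuming a repository has already been checked out locally.
# "./cloned_repo" is a placeholder path, not something defined by this module or config.
if __name__ == "__main__":
    repo_path = Path("./cloned_repo")  # hypothetical checkout location
    print(f"Valid supported files: {count_valid_supported_files(repo_path)}")
    docs = load_repository_as_documents(repo_path)
    for doc in docs[:3]:
        # Show where each document came from and how much text was extracted
        print(doc.metadata.get("source"), len(doc.page_content))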