File size: 9,822 Bytes
960f6e1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
Repository Loader Module

Handles downloading and processing GitHub repositories and ZIP files.
"""

import io
import re
import zipfile
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple, List
import requests

from ..config import get_config

logger = logging.getLogger("codeatlas.repository")


@dataclass
class ProcessingStats:
    """Counters gathered while an archive is converted into context text.

    Attributes:
        files_processed: Number of files whose content was added to the context.
        files_skipped: Number of files rejected (filtered out, too large,
            empty after cleaning, or unreadable).
        total_characters: Combined length of all emitted file entries.
        estimated_tokens: Rough token estimate derived from the character count.
    """
    files_processed: int = 0
    files_skipped: int = 0
    total_characters: int = 0
    estimated_tokens: int = 0

    @property
    def as_dict(self) -> dict:
        """Return the four counters as a plain dict keyed by field name."""
        counter_names = (
            "files_processed",
            "files_skipped",
            "total_characters",
            "estimated_tokens",
        )
        return {name: getattr(self, name) for name in counter_names}


@dataclass
class ProcessingResult:
    """Result of processing a repository.

    The loaders in this module return either a failure (only ``error`` set)
    or a success (``context``, ``stats`` and ``repo_name`` set, ``error``
    left as ``None``).
    """
    # Concatenated ``<file name="...">`` entries; None when loading failed.
    context: Optional[str] = None
    # Human-readable failure message; None on success.
    error: Optional[str] = None
    # Counters collected while building ``context``; None when loading failed.
    stats: Optional[ProcessingStats] = None
    # Display name: "owner/repo" for GitHub sources, the ZIP file stem for uploads.
    repo_name: str = ""


class RepositoryLoader:
    """Loads and processes code repositories."""
    
    def __init__(self):
        """Fetch the application config and keep a shortcut to its processing section."""
        cfg = get_config()
        self.config = cfg
        # The filtering and size limits used by the other methods live here.
        self.processing = cfg.processing
    
    def load_from_github(self, url: str) -> ProcessingResult:
        """Download and process a GitHub repository.

        Args:
            url: GitHub repository URL. Surrounding whitespace, a trailing
                slash, a ``.git`` suffix and any query string or fragment are
                ignored when deriving the repository name.

        Returns:
            ProcessingResult with ``context``, ``stats`` and ``repo_name`` on
            success, or with only ``error`` set when the download fails or
            the archive contains no usable files. Exceptions raised by
            ``_process_zip`` propagate after the archive has been closed.
        """
        zip_file, error = self._download_github_repo(url)
        if error:
            return ProcessingResult(error=error)

        # Derive the display name from a normalized URL so that inputs such as
        # "github.com/o/r.git", "https://github.com/o/r/ " or
        # "https://github.com/o/r?tab=readme" all yield "o/r" rather than
        # leaking the suffix, whitespace or query string into the name.
        normalized = url.strip().rstrip("/")
        match = re.search(r"github\.com/([^/?#\s]+)/([^/?#\s]+)", normalized)
        if match:
            owner, repo = match.groups()
            if repo.endswith(".git") and len(repo) > 4:
                repo = repo[:-4]
            repo_name = f"{owner}/{repo}"
        else:
            # Should not happen after a successful download; keep the raw input.
            repo_name = url

        try:
            context, stats = self._process_zip(zip_file)
            if not context:
                return ProcessingResult(error="No valid code files found in repository.")
            return ProcessingResult(context=context, stats=stats, repo_name=repo_name)
        finally:
            # The archive is backed by an in-memory buffer; release it either way.
            zip_file.close()
    
    def load_from_file(self, file_path: str) -> ProcessingResult:
        """Build a processing result from a ZIP archive on disk.

        Args:
            file_path: Location of the uploaded archive.

        Returns:
            ProcessingResult carrying the extracted context, its stats and the
            archive's file stem as ``repo_name``; or a result with only
            ``error`` set when the archive is invalid, yields no usable files,
            or any other exception occurs (which is also logged).
        """
        try:
            with zipfile.ZipFile(file_path, "r") as archive:
                extracted, stats = self._process_zip(archive)
            if extracted:
                return ProcessingResult(
                    context=extracted,
                    stats=stats,
                    repo_name=Path(file_path).stem,
                )
            return ProcessingResult(error="No valid code files found in ZIP.")
        except zipfile.BadZipFile:
            return ProcessingResult(error="Invalid ZIP archive.")
        except Exception as e:
            # Boundary handler: callers receive an error result, never a raise.
            logger.exception("Error processing file")
            return ProcessingResult(error=f"Error: {str(e)}")
    
    def _download_github_repo(self, url: str) -> Tuple[Optional[zipfile.ZipFile], Optional[str]]:
        """Download a GitHub repository as an in-memory ZIP archive.

        The URL is normalized (whitespace, trailing slash and ``.git`` suffix
        removed, scheme added when missing), the owner and repository names
        are extracted, and the archive endpoints for ``HEAD``, ``main`` and
        ``master`` are tried in that order. The first response with status
        200 is buffered in memory and opened as a ZIP file.

        Args:
            url: Repository URL, with or without scheme.

        Returns:
            ``(zip_file, None)`` on success; the caller is responsible for
            closing ``zip_file``. ``(None, message)`` when the URL is not a
            usable GitHub URL, no archive could be fetched, or a
            network/other error occurred.
        """
        try:
            # Normalize URL
            url = url.strip().rstrip("/")
            if url.endswith(".git"):
                url = url[:-4]
            if not url.startswith(("http://", "https://")):
                url = "https://" + url

            # Validate GitHub URL
            if "github.com" not in url:
                return None, "Please provide a valid GitHub URL"

            # Extract owner/repo. Stop at "?" and "#" so query strings and
            # fragments (e.g. "?tab=readme") never end up in the repo name.
            match = re.search(r"github\.com/([^/?#\s]+)/([^/?#\s]+)", url)
            if not match:
                return None, "Invalid GitHub URL format"

            owner, repo = match.groups()
            # Only a ".git" suffix is noise (e.g. ".../repo.git/tree/main").
            # Dots are legal in repository names ("socket.io",
            # "user.github.io"), so the name must not be cut at the first dot.
            if repo.endswith(".git") and len(repo) > 4:
                repo = repo[:-4]
            clean_url = f"https://github.com/{owner}/{repo}"

            # Try downloading from different branches
            for branch in ("HEAD", "main", "master"):
                archive_url = f"{clean_url}/archive/{branch}.zip"
                logger.info(f"Trying: {archive_url}")

                # With stream=True the connection stays checked out until the
                # body is consumed or the response is closed; the context
                # manager guarantees release, including on non-200 responses.
                with requests.get(
                    archive_url, stream=True, timeout=60, allow_redirects=True
                ) as response:
                    if response.status_code != 200:
                        continue
                    buffer = io.BytesIO()
                    for chunk in response.iter_content(chunk_size=8192):
                        buffer.write(chunk)

                buffer.seek(0)
                return zipfile.ZipFile(buffer, "r"), None

            return None, f"Repository not found: {owner}/{repo}"

        except requests.exceptions.Timeout:
            return None, "Request timed out"
        except requests.exceptions.RequestException as e:
            return None, f"Network error: {str(e)}"
        except Exception as e:
            # Includes an invalid archive body (zipfile.BadZipFile).
            return None, f"Error: {str(e)}"
    
    def _is_allowed_file(self, file_path: str, aggressive: bool = False) -> bool:
        """Check if a file should be processed."""
        filename = file_path.split("/")[-1]
        filename_lower = filename.lower()
        
        # Check blocked patterns
        if filename in self.processing.BLOCKED_PATTERNS:
            return False
        
        # Check blocked directories
        path_parts = file_path.split("/")
        for part in path_parts[:-1]:
            if part in self.processing.BLOCKED_DIRS:
                return False
        
        # Check test file patterns
        for pattern in self.processing.TEST_FILE_PATTERNS:
            if pattern in filename_lower:
                return False
        
        # Aggressive filtering for large repos
        if aggressive:
            path_lower = file_path.lower()
            skip_patterns = ["example", "demo", "sample", "doc/", "docs/", 
                          "tutorial", "benchmark", "contrib/", "scripts/"]
            for pattern in skip_patterns:
                if pattern in path_lower:
                    return False
            
            # Only core code extensions
            core_extensions = {".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rs"}
            ext = "." + filename.split(".")[-1] if "." in filename else ""
            if ext and ext not in core_extensions:
                return False
        
        # Check allowed files
        if filename in self.processing.ALLOWED_FILES:
            return True
        
        # Check extensions
        for ext in self.processing.ALLOWED_EXTENSIONS:
            if filename.endswith(ext):
                return True
        
        return False
    
    def _clean_code(self, content: str) -> str:
        """Clean code content."""
        # Remove excessive blank lines
        content = re.sub(r"\n{4,}", "\n\n\n", content)
        # Remove trailing whitespace
        lines = [line.rstrip() for line in content.split("\n")]
        return "\n".join(lines).strip()
    
    def _process_zip(self, zip_file: zipfile.ZipFile) -> Tuple[str, ProcessingStats]:
        """Turn the members of an open ZIP archive into one context string.

        Members are visited in priority order: paths containing a ``src/``,
        ``lib/``, ``core/``, ``app/`` or ``pkg/`` segment marker first, then
        shallower paths before deeper ones, then alphabetically. Each accepted
        file is cleaned and wrapped in a ``<file name="...">`` entry. The walk
        stops at the first entry that would push the total past
        ``MAX_CONTEXT_SIZE``.

        Args:
            zip_file: Archive opened for reading.

        Returns:
            A ``(context, stats)`` tuple; ``context`` is an empty string when
            no file was accepted.
        """
        limits = self.processing
        stats = ProcessingStats()
        entries = []

        # Total uncompressed size decides whether stricter filtering applies.
        member_names = zip_file.namelist()
        total_size = 0
        for name in member_names:
            if not name.endswith("/"):
                total_size += zip_file.getinfo(name).file_size
        aggressive = total_size > limits.LARGE_REPO_THRESHOLD

        if aggressive:
            logger.info(f"Large repo ({total_size:,} bytes), using aggressive filtering")

        priority_dirs = ("src/", "lib/", "core/", "app/", "pkg/")

        def file_priority(path):
            # Sort key: (priority bucket, nesting depth, path).
            lowered = path.lower()
            bucket = 0 if any(marker in lowered for marker in priority_dirs) else 1
            return (bucket, path.count("/"), path)

        for file_path in sorted(member_names, key=file_priority):
            # Directory entries carry no content.
            if file_path.endswith("/"):
                continue

            if not self._is_allowed_file(file_path, aggressive):
                stats.files_skipped += 1
                continue

            try:
                if zip_file.getinfo(file_path).file_size > limits.MAX_FILE_SIZE:
                    stats.files_skipped += 1
                    continue

                with zip_file.open(file_path) as member:
                    raw = member.read()

                # Undecodable bytes are dropped rather than failing the file.
                content = self._clean_code(raw.decode("utf-8", errors="ignore"))
                if not content.strip():
                    stats.files_skipped += 1
                    continue

                file_entry = f'<file name="{file_path}">\n{content}\n</file>\n\n'
                entry_length = len(file_entry)

                # Budget exhausted: stop, leaving later members unvisited.
                if stats.total_characters + entry_length > limits.MAX_CONTEXT_SIZE:
                    break

                entries.append(file_entry)
                stats.total_characters += entry_length
                stats.files_processed += 1

            except Exception as e:
                # Best effort: one unreadable member must not abort the run.
                stats.files_skipped += 1
                logger.debug(f"Error processing {file_path}: {e}")

        # Heuristic used by this module: four characters per token.
        stats.estimated_tokens = stats.total_characters // 4
        context = "".join(entries)

        logger.info(f"Processed {stats.files_processed} files, {stats.total_characters:,} chars")
        return context, stats