aghilsabu committed on
Commit
960f6e1
·
1 Parent(s): 8320683

feat: add GitHub repository cloning and management

Browse files
Files changed (2) hide show
  1. src/core/__init__.py +11 -0
  2. src/core/repository.py +268 -0
src/core/__init__.py ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ CodeAtlas Core Module
3
+
4
+ Core functionality for code analysis and diagram generation.
5
+ """
6
+
7
+ from .repository import RepositoryLoader
8
+ from .analyzer import CodeAnalyzer
9
+ from .diagram import DiagramGenerator
10
+
11
+ __all__ = ["RepositoryLoader", "CodeAnalyzer", "DiagramGenerator"]
src/core/repository.py ADDED
@@ -0,0 +1,268 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Repository Loader Module
3
+
4
+ Handles downloading and processing GitHub repositories and ZIP files.
5
+ """
6
+
7
+ import io
8
+ import re
9
+ import zipfile
10
+ import logging
11
+ from dataclasses import dataclass
12
+ from pathlib import Path
13
+ from typing import Optional, Tuple, List
14
+ import requests
15
+
16
+ from ..config import get_config
17
+
18
+ logger = logging.getLogger("codeatlas.repository")
19
+
20
+
21
@dataclass
class ProcessingStats:
    """Counters collected while turning a repository archive into context."""

    files_processed: int = 0   # files whose content made it into the context
    files_skipped: int = 0     # files filtered out, oversized, or unreadable
    total_characters: int = 0  # characters accumulated across all file entries
    estimated_tokens: int = 0  # rough token estimate derived from characters

    @property
    def as_dict(self) -> dict:
        """Expose the counters as a plain dictionary (e.g. for UI/logging)."""
        return dict(
            files_processed=self.files_processed,
            files_skipped=self.files_skipped,
            total_characters=self.total_characters,
            estimated_tokens=self.estimated_tokens,
        )
37
+
38
+
39
@dataclass
class ProcessingResult:
    """Result of processing a repository.

    Success paths set ``context`` (plus ``stats`` and ``repo_name``);
    failure paths set only ``error`` with a user-facing message.
    """
    context: Optional[str] = None  # concatenated <file name="...">...</file> blocks on success
    error: Optional[str] = None  # failure message (e.g. "Invalid ZIP archive.")
    stats: Optional[ProcessingStats] = None  # counters from _process_zip; None on failure
    repo_name: str = ""  # "owner/repo" for GitHub loads, ZIP file stem for uploads
46
+
47
+
48
class RepositoryLoader:
    """Loads and processes code repositories.

    Sources are either a GitHub repository URL (downloaded as a ZIP archive)
    or an uploaded ZIP file.  File-filtering rules and size limits come from
    the ``processing`` section of the application config.
    """

    def __init__(self):
        self.config = get_config()
        # Shortcut to the filtering rules / size limits used throughout.
        self.processing = self.config.processing

    def load_from_github(self, url: str) -> "ProcessingResult":
        """Download and process a GitHub repository.

        Args:
            url: GitHub repository URL

        Returns:
            ProcessingResult with context or error
        """
        zip_file, error = self._download_github_repo(url)
        if error:
            return ProcessingResult(error=error)

        # Derive the display name from a normalized URL so a trailing slash
        # or ".git" suffix does not leak into repo_name (the raw URL was
        # used previously, producing names like "owner/repo.git").
        normalized = url.strip().rstrip("/")
        if normalized.endswith(".git"):
            normalized = normalized[:-4]
        match = re.search(r"github\.com/([^/]+)/([^/]+)", normalized)
        repo_name = f"{match.group(1)}/{match.group(2)}" if match else url

        try:
            context, stats = self._process_zip(zip_file)
            if not context:
                return ProcessingResult(error="No valid code files found in repository.")
            return ProcessingResult(context=context, stats=stats, repo_name=repo_name)
        finally:
            # Release the in-memory archive on every path, including errors.
            zip_file.close()

    def load_from_file(self, file_path: str) -> "ProcessingResult":
        """Process an uploaded ZIP file.

        Args:
            file_path: Path to the uploaded file

        Returns:
            ProcessingResult with context or error
        """
        try:
            with zipfile.ZipFile(file_path, "r") as zip_file:
                context, stats = self._process_zip(zip_file)
                if not context:
                    return ProcessingResult(error="No valid code files found in ZIP.")
                repo_name = Path(file_path).stem
                return ProcessingResult(context=context, stats=stats, repo_name=repo_name)
        except zipfile.BadZipFile:
            return ProcessingResult(error="Invalid ZIP archive.")
        except Exception as e:
            logger.exception("Error processing file")
            return ProcessingResult(error=f"Error: {str(e)}")

    def _download_github_repo(self, url: str) -> Tuple[Optional[zipfile.ZipFile], Optional[str]]:
        """Download a GitHub repository as a ZIP file.

        Returns:
            ``(zip_file, None)`` on success, ``(None, error_message)`` on failure.
        """
        try:
            # Normalize URL
            url = url.strip().rstrip("/")
            if url.endswith(".git"):
                url = url[:-4]
            if not url.startswith(("http://", "https://")):
                url = "https://" + url

            # Validate GitHub URL
            if "github.com" not in url:
                return None, "Please provide a valid GitHub URL"

            # Extract owner/repo
            match = re.search(r"github\.com/([^/]+)/([^/]+)", url)
            if not match:
                return None, "Invalid GitHub URL format"

            owner, repo = match.groups()
            # Strip only a trailing ".git": repository names may legitimately
            # contain dots (e.g. "socket.io"), so splitting on "." — as the
            # previous implementation did — would mangle them.
            if repo.endswith(".git"):
                repo = repo[:-4]
            clean_url = f"https://github.com/{owner}/{repo}"

            # Try downloading from different branches. "HEAD" resolves to the
            # default branch; main/master are fallbacks.
            for branch in ["HEAD", "main", "master"]:
                archive_url = f"{clean_url}/archive/{branch}.zip"
                logger.info(f"Trying: {archive_url}")

                # Context manager ensures the response/connection is closed
                # even on non-200 status or mid-stream errors.
                with requests.get(archive_url, stream=True, timeout=60, allow_redirects=True) as response:
                    if response.status_code == 200:
                        buffer = io.BytesIO()
                        for chunk in response.iter_content(chunk_size=8192):
                            buffer.write(chunk)
                        buffer.seek(0)
                        return zipfile.ZipFile(buffer, "r"), None

            return None, f"Repository not found: {owner}/{repo}"

        except requests.exceptions.Timeout:
            return None, "Request timed out"
        except requests.exceptions.RequestException as e:
            return None, f"Network error: {str(e)}"
        except Exception as e:
            return None, f"Error: {str(e)}"

    def _is_allowed_file(self, file_path: str, aggressive: bool = False) -> bool:
        """Check if a file should be processed.

        Args:
            file_path: Path of the entry inside the archive ("/"-separated).
            aggressive: Apply stricter filtering, used for large repositories.

        Returns:
            True if the file passes all filters.
        """
        filename = file_path.split("/")[-1]
        filename_lower = filename.lower()

        # Blocked patterns are exact filename matches.
        if filename in self.processing.BLOCKED_PATTERNS:
            return False

        # A blocked directory anywhere in the path excludes the file.
        path_parts = file_path.split("/")
        for part in path_parts[:-1]:
            if part in self.processing.BLOCKED_DIRS:
                return False

        # Test files are matched by substring on the lowercased filename.
        for pattern in self.processing.TEST_FILE_PATTERNS:
            if pattern in filename_lower:
                return False

        # Aggressive filtering for large repos
        if aggressive:
            path_lower = file_path.lower()
            skip_patterns = ["example", "demo", "sample", "doc/", "docs/",
                             "tutorial", "benchmark", "contrib/", "scripts/"]
            for pattern in skip_patterns:
                if pattern in path_lower:
                    return False

            # Only core code extensions survive aggressive mode; files with
            # no extension fall through to the allow-lists below.
            core_extensions = {".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rs"}
            ext = "." + filename.split(".")[-1] if "." in filename else ""
            if ext and ext not in core_extensions:
                return False

        # Explicitly allowed filenames (e.g. build files) pass regardless.
        if filename in self.processing.ALLOWED_FILES:
            return True

        # Otherwise the extension decides.
        for ext in self.processing.ALLOWED_EXTENSIONS:
            if filename.endswith(ext):
                return True

        return False

    def _clean_code(self, content: str) -> str:
        """Clean code content: collapse 4+ blank lines and strip trailing whitespace."""
        # Cap runs of newlines at three (i.e. at most two blank lines).
        content = re.sub(r"\n{4,}", "\n\n\n", content)
        # Remove trailing whitespace on every line, then outer whitespace.
        lines = [line.rstrip() for line in content.split("\n")]
        return "\n".join(lines).strip()

    def _process_zip(self, zip_file: zipfile.ZipFile) -> Tuple[str, "ProcessingStats"]:
        """Process a ZIP file and extract code content.

        Args:
            zip_file: Open archive to read from (not closed here).

        Returns:
            ``(context, stats)`` where context concatenates
            ``<file name="...">`` blocks; empty string if nothing qualified.
        """
        stats = ProcessingStats()
        file_contents = []

        # Total uncompressed size decides whether aggressive filtering kicks in.
        file_list = zip_file.namelist()
        total_size = sum(
            zip_file.getinfo(f).file_size
            for f in file_list
            if not f.endswith("/")
        )
        aggressive = total_size > self.processing.LARGE_REPO_THRESHOLD

        if aggressive:
            logger.info(f"Large repo ({total_size:,} bytes), using aggressive filtering")

        # Sort by priority (shallow paths and known source dirs first) so the
        # most important files land in the context before the size cap hits.
        def file_priority(path):
            depth = path.count("/")
            priority_dirs = ["src/", "lib/", "core/", "app/", "pkg/"]
            for pd in priority_dirs:
                if pd in path.lower():
                    return (0, depth, path)
            return (1, depth, path)

        sorted_files = sorted(file_list, key=file_priority)

        for file_path in sorted_files:
            if file_path.endswith("/"):
                continue  # directory entry

            if not self._is_allowed_file(file_path, aggressive):
                stats.files_skipped += 1
                continue

            try:
                file_info = zip_file.getinfo(file_path)
                if file_info.file_size > self.processing.MAX_FILE_SIZE:
                    stats.files_skipped += 1
                    continue

                with zip_file.open(file_path) as f:
                    content = f.read().decode("utf-8", errors="ignore")

                content = self._clean_code(content)
                if not content.strip():
                    stats.files_skipped += 1
                    continue

                file_entry = f'<file name="{file_path}">\n{content}\n</file>\n\n'

                # Stop (not skip) once the cap would be exceeded: remaining
                # files are lower priority by construction.
                if stats.total_characters + len(file_entry) > self.processing.MAX_CONTEXT_SIZE:
                    break

                file_contents.append(file_entry)
                stats.total_characters += len(file_entry)
                stats.files_processed += 1

            except Exception as e:
                stats.files_skipped += 1
                logger.debug(f"Error processing {file_path}: {e}")

        # Rough heuristic: ~4 characters per token.
        stats.estimated_tokens = stats.total_characters // 4
        context = "".join(file_contents)

        logger.info(f"Processed {stats.files_processed} files, {stats.total_characters:,} chars")
        return context, stats