import io
import os
import zipfile
from pathlib import Path
from typing import Callable, Dict, Optional
from urllib.parse import quote, urlparse

import requests
from structlog import get_logger

logger = get_logger(__name__)


class GitHubRepoDownloader:
    """Download a GitHub repository branch as a ZIP and read files from it.

    The archive is cached on disk (one ZIP per owner/repo/branch) so repeated
    reads do not hit the network again.
    """

    # Hosts accepted as "GitHub" when parsing the repository URL.
    _GITHUB_HOSTS = frozenset({"github.com", "www.github.com"})

    # Seconds to wait for GitHub to respond before giving up; without a
    # timeout `requests` may block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    # Size of the chunks streamed to disk while downloading the archive.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self, repo_url: str, branch: str = "main", cache_dir: str = ".cache"):
        """
        Initialize downloader with a GitHub repo URL.

        Args:
            repo_url: Full GitHub repo URL (e.g., https://github.com/owner/repo)
            branch: Branch name to download (default: main)
            cache_dir: Directory to cache downloaded files

        Raises:
            ValueError: If the URL is not a GitHub repo URL or the branch
                does not exist.
            requests.HTTPError: If the branch lookup fails for another reason.
        """
        self.owner, self.repo = self._parse_repo_url(repo_url)
        self.branch = branch
        self.cache_dir = Path(cache_dir)
        # parents=True so a nested cache directory (e.g. "a/b/.cache") works.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._validate_branch()

    def _parse_repo_url(self, repo_url: str) -> tuple[str, str]:
        """Extract owner and repo name from GitHub URL.

        Accepts URLs with or without a scheme, with an optional trailing
        slash, ``.git`` suffix, query string, fragment, or extra path
        segments (e.g. ``/tree/main``).

        Args:
            repo_url: URL of the repository.

        Returns:
            A ``(owner, repo)`` tuple.

        Raises:
            ValueError: If the host is not GitHub or the path does not
                contain both an owner and a repository name.
        """
        candidate = repo_url.strip()
        # urlparse only fills in the host when a scheme is present.
        if "://" not in candidate:
            candidate = f"https://{candidate}"

        parsed = urlparse(candidate)
        host = (parsed.hostname or "").lower()

        # Validate it's a GitHub URL (compare the host, not a substring of
        # the whole URL, so "https://evil.com/github.com/x/y" is rejected).
        if host not in self._GITHUB_HOSTS:
            message = f"Not a GitHub URL: {repo_url}"
            logger.error(message)
            raise ValueError(message)

        segments = [segment for segment in parsed.path.split("/") if segment]
        # Only strip ".git" as a suffix of the repo segment; removing it
        # anywhere would corrupt names such as "user.github.io".
        repo = segments[1].removesuffix(".git") if len(segments) >= 2 else ""
        if not repo:
            message = f"Invalid GitHub URL format: {repo_url}"
            logger.error(message)
            raise ValueError(message)

        owner = segments[0]
        return owner, repo

    def _validate_branch(self) -> None:
        """Validate that the branch exists in the repository.

        Raises:
            ValueError: If GitHub reports the branch as not found (404).
            requests.HTTPError: For any other unsuccessful response.
        """
        # Keep "/" unescaped: branch names such as "feature/x" are addressed
        # with literal slashes; other special characters are percent-encoded.
        branch_in_url = quote(self.branch, safe="/")
        url = f"https://api.github.com/repos/{self.owner}/{self.repo}/branches/{branch_in_url}"

        logger.info(f"Validating branch: {self.branch}")
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)

        if response.status_code == 404:
            message = f"Branch '{self.branch}' not found in {self.owner}/{self.repo}"
            logger.error(message)
            raise ValueError(message)

        response.raise_for_status()

    def _get_cache_path(self) -> Path:
        """Get the cache file path for this repo.

        The branch is percent-encoded so that names containing path
        separators (e.g. ``feature/x``) still map to a single file inside
        the cache directory.
        """
        safe_branch = quote(self.branch, safe="")
        return self.cache_dir / f"{self.owner}_{self.repo}_{safe_branch}.zip"

    def _download_zip(self) -> Path:
        """Download repo ZIP to cache.

        Returns:
            Path of the cached ZIP archive.

        Raises:
            requests.HTTPError: If GitHub returns an unsuccessful response.
        """
        cache_path = self._get_cache_path()

        # Return cached file if exists
        if cache_path.exists():
            logger.info(f"Using cached file: {cache_path}")
            return cache_path

        # Download ZIP
        branch_in_url = quote(self.branch, safe="/")
        url = f"https://github.com/{self.owner}/{self.repo}/archive/refs/heads/{branch_in_url}.zip"
        logger.info(f"Downloading {self.owner}/{self.repo} (branch: {self.branch})...")

        # Stream into a temporary file and move it into place only once the
        # download is complete, so an interrupted download never leaves a
        # truncated archive that later runs would treat as a valid cache.
        partial_path = cache_path.with_name(cache_path.name + ".part")
        try:
            with requests.get(url, stream=True, timeout=self.REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                with open(partial_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=self._CHUNK_SIZE):
                        f.write(chunk)
            os.replace(partial_path, cache_path)
        finally:
            # No-op after a successful move; removes leftovers on failure.
            partial_path.unlink(missing_ok=True)

        logger.info(f"Saved to cache: {cache_path}")
        return cache_path

    def read_files(self, file_filter: Optional[Callable[[str], bool]] = None) -> Dict[str, str]:
        """
        Read files from the repo without extracting.

        Args:
            file_filter: Optional function to filter files
                (e.g., lambda path: path.endswith('.py'))

        Returns:
            Dictionary mapping file paths (relative to the repo root) to
            their contents, decoded as UTF-8 with undecodable bytes dropped.
            Files that cannot be read are logged and skipped.
        """
        cache_path = self._download_zip()
        files_content: Dict[str, str] = {}

        with zipfile.ZipFile(cache_path) as zip_file:
            for entry in zip_file.infolist():
                if entry.is_dir():
                    continue

                # Remove root folder (format: repo-branch/path/to/file)
                _, _, clean_path = entry.filename.partition("/")
                if not clean_path:
                    continue

                # Apply filter
                if file_filter is not None and not file_filter(clean_path):
                    continue

                logger.info(f"Reading: {clean_path}")
                try:
                    raw = zip_file.read(entry)
                except Exception as e:
                    # Deliberately best-effort: one unreadable entry must not
                    # abort reading the rest of the archive.
                    logger.exception(f"⚠️ Error reading {clean_path}: {e}")
                    continue

                files_content[clean_path] = raw.decode("utf-8", errors="ignore")

        return files_content