import base64
import json
import os
from typing import Dict, List, Optional, Union
from urllib.parse import quote

import requests


class GitHubRepoFetcher:
    """Download files and directories from a GitHub repository.

    All requests go through the REST "repository contents" endpoint
    (``/repos/{owner}/{repo}/contents/{path}``) and are authenticated with a
    personal access token.

    Attributes:
        headers: HTTP headers sent with every API request.
        base_url: Root URL of the GitHub REST API.
        timeout: Per-request timeout in seconds.
        downloaded_files: Local paths of every file saved so far by this
            instance (accumulates across calls).
    """

    # Prefixes users tend to paste from browser URLs; they are not part of
    # the in-repository path and must be removed before calling the API.
    _URL_PREFIXES = ('blob/main/', 'tree/main/', 'blob/master/', 'tree/master/')

    # Media type that makes the contents endpoint return the raw file bytes
    # instead of a JSON envelope with base64 content.
    _RAW_MEDIA_TYPE = 'application/vnd.github.raw+json'

    def __init__(self, token: str, timeout: float = 30.0):
        """Initialize with GitHub personal access token.

        Args:
            token: GitHub personal access token.
            timeout: Seconds to wait for each HTTP request before giving up.
                Without a timeout a stalled connection would block forever.
        """
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json',
        }
        self.base_url = 'https://api.github.com'
        self.timeout = timeout
        self.downloaded_files = set()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @classmethod
    def _normalize_path(cls, path: str) -> str:
        """Turn a caller-supplied path into a clean in-repository path.

        Backslashes are converted first so that a Windows-style ``\\dir\\``
        has its surrounding separators stripped as well. A browser-style
        prefix (``blob/main/`` etc.) is removed only when it *starts* the
        path, so real directories with those names deeper in the tree are
        left untouched.
        """
        cleaned = path.replace('\\', '/').strip('/')
        for prefix in cls._URL_PREFIXES:
            if cleaned.startswith(prefix):
                return cleaned[len(prefix):].lstrip('/')
        return cleaned

    def _contents_url(self, owner: str, repo: str, path: str) -> str:
        """Build the contents-endpoint URL for an already-normalized path."""
        # Percent-encode the path ('/' is kept) so names containing
        # characters such as '#' or '?' do not truncate the URL.
        return f'{self.base_url}/repos/{owner}/{repo}/contents/{quote(path)}'

    def _get_contents(self, owner: str, repo: str, path: str) -> Optional[List[Dict]]:
        """Fetch the listing for an already-normalized path.

        Returns a list of item dictionaries (a single-file response is
        wrapped in a one-element list) or None if the request failed.
        """
        url = self._contents_url(owner, repo, path)
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on older requests versions.
            print(f"Error fetching {path}: {e}")
            return None
        return payload if isinstance(payload, list) else [payload]

    def _fetch_file_bytes(self, owner: str, repo: str, path: str) -> Optional[bytes]:
        """Return the raw bytes of the file at an already-normalized path.

        Returns None (after printing a message) when the request fails or
        the response does not describe a file with content.
        """
        url = self._contents_url(owner, repo, path)
        print(f"Fetching file from: {url}")
        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
            payload = response.json()

            # A directory yields a list; submodules/symlinks carry no
            # 'content' key. Neither can be decoded as a file.
            if not isinstance(payload, dict) or payload.get('content') is None:
                print("No content found in response")
                return None

            encoded = payload['content']
            if payload.get('encoding') == 'none' or (not encoded and payload.get('size')):
                # The JSON envelope omitted the content although the file is
                # not empty (GitHub does this for large files); ask the same
                # endpoint for the raw bytes instead.
                raw_headers = dict(self.headers, Accept=self._RAW_MEDIA_TYPE)
                raw = requests.get(url, headers=raw_headers, timeout=self.timeout)
                raw.raise_for_status()
                data = raw.content
            else:
                # An empty string is a valid payload: it is a zero-byte file.
                data = base64.b64decode(encoded)
        except (requests.exceptions.RequestException, ValueError, TypeError) as e:
            # ValueError also covers binascii.Error from malformed base64.
            print(f"Error downloading file: {e}")
            return None

        print(f"Successfully decoded file content (size: {len(data)} bytes)")
        return data

    def _write_file(self, save_path: str, data: bytes) -> bool:
        """Write data to save_path, creating parent directories as needed.

        Records the path in ``downloaded_files`` and returns True on
        success; prints the error and returns False on failure.
        """
        try:
            os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
            with open(save_path, 'wb') as f:
                f.write(data)
        except OSError as e:
            print(f"Error downloading file: {e}")
            return False
        print(f"Saved file to: {save_path}")
        self.downloaded_files.add(save_path)
        return True

    def _download_tree(self, owner: str, repo: str, path: str, local_path: str) -> bool:
        """Recursively download an already-normalized directory path.

        Paths returned by the API are used verbatim during recursion; they
        must never be passed through ``_normalize_path`` again, or real
        directory names resembling URL prefixes would be corrupted.
        """
        print(f"Fetching directory: {path} for {owner}/{repo}")
        contents = self._get_contents(owner, repo, path)
        if contents is None:
            print(f"Failed to fetch contents for path: {path}")
            return False

        print(f"Processing directory: {path or 'root'}")
        os.makedirs(local_path, exist_ok=True)

        success = True
        for item in contents:
            item_path = item['path']
            # Use basename for local path to maintain correct directory structure
            local_item_path = os.path.join(local_path, os.path.basename(item_path))
            try:
                if item['type'] == 'dir':
                    print(f"Found directory: {item_path}")
                    ok = self._download_tree(owner, repo, item_path, local_item_path)
                else:
                    print(f"Downloading file: {item_path}")
                    data = self._fetch_file_bytes(owner, repo, item_path)
                    # Judge success from this attempt only, not from the
                    # long-lived downloaded_files set, which may still hold
                    # the path from an earlier successful run.
                    ok = data is not None and self._write_file(local_item_path, data)
                    if ok:
                        print(f"Successfully downloaded: {item_path}")
            except (KeyError, TypeError, OSError) as e:
                print(f"Error processing {item_path}: {e}")
                ok = False
            if not ok:
                # Keep going so one bad entry does not abort the whole tree.
                success = False
        return success

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def fetch_contents(self, owner: str, repo: str, path: str = '') -> Optional[List[Dict]]:
        """
        Fetch contents of a repository path.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path within repository; a leading ``blob/<branch>/`` or
                ``tree/<branch>/`` (branch ``main`` or ``master``) copied
                from a browser URL is stripped.

        Returns:
            List of dictionaries containing file/directory information
            (a single-file response is wrapped in a list), or None if the
            request failed.
        """
        return self._get_contents(owner, repo, self._normalize_path(path))

    def download_file(self, owner: str, repo: str, path: str,
                      save_path: Optional[str] = None) -> Optional[Union[str, bytes]]:
        """Download a specific file from the repository.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path of the file within the repository
            save_path: If given, the file is written there (parent
                directories are created) and recorded in
                ``downloaded_files``.

        Returns:
            When ``save_path`` is given: always None; check
            ``downloaded_files`` to confirm the write succeeded.
            Otherwise: the file content as ``str`` if it is valid UTF-8,
            else as ``bytes``; None if the download failed.
        """
        data = self._fetch_file_bytes(owner, repo, self._normalize_path(path))
        if data is None:
            return None

        if save_path:
            self._write_file(save_path, data)
            return None

        try:
            return data.decode('utf-8')
        except UnicodeDecodeError:
            # Binary content: hand back the raw bytes unchanged.
            return data

    def download_directory(self, owner: str, repo: str, path: str, local_path: str) -> bool:
        """
        Download an entire directory recursively.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path to directory within repository
            local_path: Local path to save directory

        Returns:
            Boolean indicating success: True only if every file and
            subdirectory was downloaded. Processing continues past
            individual failures, so a False result may still leave a
            partial tree on disk.
        """
        try:
            return self._download_tree(owner, repo, self._normalize_path(path), local_path)
        except Exception as e:
            # Deliberate best-effort boundary: this method reports failure
            # through its return value and never raises to the caller.
            print(f"Error in download_directory: {e}")
            return False