Spaces:
Sleeping
Sleeping
import base64
import json
import os
from typing import Dict, List, Optional, Union
from urllib.parse import quote

import requests
class GitHubRepoFetcher:
    """Fetch files and directories from a GitHub repository.

    All requests go through the REST "repository contents" endpoint
    (``/repos/{owner}/{repo}/contents/{path}``) and are authenticated with
    the personal access token given to the constructor.

    The public methods are best-effort: they report problems with ``print``
    and signal failure through their return value instead of raising.
    """

    # Seconds to wait for GitHub before giving up on a request.
    DEFAULT_TIMEOUT = 30.0

    # Media type that makes the contents endpoint return the file body itself
    # rather than a JSON document with an embedded base64 payload.
    _RAW_MEDIA_TYPE = 'application/vnd.github.raw+json'

    # Leading fragments that appear when a path is copied from a github.com
    # browser URL; they are not part of the in-repository path.
    _URL_PREFIXES = ('blob/main', 'tree/main', 'blob/master', 'tree/master')

    def __init__(self, token: str, timeout: float = DEFAULT_TIMEOUT):
        """Initialize with GitHub personal access token.

        Args:
            token: GitHub personal access token sent with every request.
            timeout: Per-request timeout in seconds.
        """
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = 'https://api.github.com'
        self.timeout = timeout
        # Local paths of every file successfully written by this instance.
        self.downloaded_files = set()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Turn a user-supplied path into a clean in-repository path.

        Backslashes become forward slashes, surrounding slashes are removed
        and one leading ``blob/<branch>`` / ``tree/<branch>`` fragment (for
        ``main`` or ``master``) is dropped. Only a *leading* fragment is
        removed, so directories that merely contain such a sequence further
        down are left alone.
        """
        # Convert separators first so that leading/trailing backslashes are
        # stripped as well.
        path = (path or '').replace('\\', '/').strip('/')
        for prefix in GitHubRepoFetcher._URL_PREFIXES:
            if path == prefix:
                # e.g. "tree/main" copied from the repository root page.
                return ''
            if path.startswith(prefix + '/'):
                return path[len(prefix) + 1:].strip('/')
        return path

    def _contents_url(self, owner: str, repo: str, path: str) -> str:
        """Build the contents-API URL for an already normalized *path*."""
        # Percent-encode everything except "/" so that characters such as
        # "#" or "?" in file names are not parsed as fragment/query.
        return f'{self.base_url}/repos/{owner}/{repo}/contents/{quote(path)}'

    def _get(self, url: str, accept: Optional[str] = None):
        """GET *url* with the instance headers; raise on HTTP errors."""
        headers = self.headers
        if accept is not None:
            headers = {**self.headers, 'Accept': accept}
        response = requests.get(url, headers=headers, timeout=self.timeout)
        response.raise_for_status()
        return response

    def _fetch_listing(self, owner: str, repo: str, path: str) -> Optional[List[Dict]]:
        """Return the contents entries for a normalized *path*, or None on error."""
        url = self._contents_url(owner, repo, path)
        try:
            payload = self._get(url).json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on older requests versions.
            print(f"Error fetching {path}: {str(e)}")
            return None
        # A file path yields a single object; a directory yields a list.
        return payload if isinstance(payload, list) else [payload]

    def _fetch_file_bytes(self, owner: str, repo: str, path: str) -> Optional[bytes]:
        """Return the raw bytes of the file at a normalized *path*.

        Returns None when the path does not denote a regular file (for
        example a directory or a submodule). Network and decoding errors
        propagate to the caller.
        """
        url = self._contents_url(owner, repo, path)
        print(f"Fetching file from: {url}")
        meta = self._get(url).json()

        # A directory comes back as a list; submodules and unresolved
        # symlinks carry no file body.
        if not isinstance(meta, dict) or meta.get('type', 'file') != 'file':
            print("No content found in response")
            return None

        encoded = meta.get('content')
        if encoded:
            data = base64.b64decode(encoded)
        elif meta.get('size') == 0:
            # A genuinely empty file: an empty payload is the correct content.
            data = b''
        else:
            # Larger files are returned without an inline payload; ask the
            # same endpoint for the raw body instead.
            data = self._get(url, accept=self._RAW_MEDIA_TYPE).content

        print(f"Successfully decoded file content (size: {len(data)} bytes)")
        return data

    def _write_file(self, save_path: str, data: bytes) -> None:
        """Write *data* to *save_path*, creating parent directories."""
        os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
        with open(save_path, 'wb') as f:
            f.write(data)
        print(f"Saved file to: {save_path}")
        self.downloaded_files.add(save_path)

    def _download_tree(self, owner: str, repo: str, path: str, local_path: str) -> bool:
        """Recursively mirror the normalized directory *path* into *local_path*.

        Paths reported by the API are used verbatim for the recursion; only
        the user-supplied starting path is ever normalized.
        """
        print(f"Fetching directory: {path} for {owner}/{repo}")
        contents = self._fetch_listing(owner, repo, path)
        if not contents:
            print(f"Failed to fetch contents for path: {path}")
            return False

        print(f"Processing directory: {path or 'root'}")
        os.makedirs(local_path, exist_ok=True)

        success = True
        for item in contents:
            item_path = item['path']
            # Use basename for local path to maintain correct directory structure
            local_item_path = os.path.join(local_path, os.path.basename(item_path))
            try:
                if item['type'] == 'dir':
                    print(f"Found directory: {item_path}")
                    ok = self._download_tree(owner, repo, item_path, local_item_path)
                else:
                    print(f"Downloading file: {item_path}")
                    data = self._fetch_file_bytes(owner, repo, item_path)
                    ok = data is not None
                    if ok:
                        self._write_file(local_item_path, data)
                        print(f"Successfully downloaded: {item_path}")
            except Exception as e:
                # One bad entry must not abort the rest of the directory.
                print(f"Error processing {item_path}: {str(e)}")
                ok = False
            success = success and ok
        return success

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def fetch_contents(self, owner: str, repo: str, path: str = '') -> Optional[List[Dict]]:
        """
        Fetch contents of a repository path.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path within repository

        Returns:
            List of dictionaries containing file/directory information
            (a single-element list when *path* is a file), or None if the
            request failed.
        """
        return self._fetch_listing(owner, repo, self._normalize_path(path))

    def download_file(self, owner: str, repo: str, path: str, save_path: str = None) -> Optional[Union[str, bytes]]:
        """Download a specific file from the repository.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path to the file within the repository
            save_path: When given, the file is written there and recorded in
                ``downloaded_files``.

        Returns:
            With *save_path*: always None (check ``downloaded_files`` for
            success). Without it: the content as ``str`` when it is valid
            UTF-8, otherwise as ``bytes``; None on failure.
        """
        # Boundary of a best-effort API: callers rely on this never raising.
        try:
            data = self._fetch_file_bytes(owner, repo, self._normalize_path(path))
            if data is None:
                return None
            if save_path:
                self._write_file(save_path, data)
                return None
            try:
                return data.decode('utf-8')
            except UnicodeDecodeError:
                return data
        except Exception as e:
            print(f"Error downloading file: {str(e)}")
            return None

    def download_directory(self, owner: str, repo: str, path: str, local_path: str) -> bool:
        """
        Download an entire directory recursively.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path to directory within repository
            local_path: Local path to save directory

        Returns:
            Boolean indicating success (False if any entry failed)
        """
        # Boundary of a best-effort API: callers rely on this never raising.
        try:
            return self._download_tree(owner, repo, self._normalize_path(path), local_path)
        except Exception as e:
            print(f"Error in download_directory: {str(e)}")
            return False