# Source: github-repo-explorer / utils/github_fetcher.py
# Last commit: "Update utils/github_fetcher.py" by nehajiya8 (0efb19f, verified)
import base64
import json
import os
from typing import Optional, Dict, List, Union
from urllib.parse import quote

import requests
class GitHubRepoFetcher:
    """Fetch files and directories from a GitHub repository.

    All requests go through the REST "contents" endpoint
    (``/repos/{owner}/{repo}/contents/{path}``) and are authenticated with a
    personal access token.

    Instance state:
        headers: HTTP headers sent with every request.
        base_url: Root URL of the GitHub REST API.
        timeout: Seconds to wait on each HTTP request before giving up.
        downloaded_files: Local paths that ``download_file`` has written.
    """

    # Fragments of github.com web URLs that callers sometimes include when
    # they paste a path copied from the browser; they are not part of the
    # repository path the API expects.
    _REF_MARKERS = ('blob/main/', 'tree/main/', 'blob/master/', 'tree/master/')

    def __init__(self, token: str, timeout: float = 30):
        """Initialize with GitHub personal access token.

        Args:
            token: GitHub personal access token
            timeout: Seconds to wait for each HTTP request
        """
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = 'https://api.github.com'
        # Without a timeout, requests waits indefinitely on a stalled server.
        self.timeout = timeout
        self.downloaded_files = set()

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Return *path* as a clean, slash-separated repository path."""
        # Convert backslashes BEFORE stripping so that Windows-style input
        # such as '\\src\\' does not keep its leading/trailing separator.
        cleaned = path.replace('\\', '/').strip('/') + '/'
        # The temporary trailing slash lets a bare 'tree/main' match too.
        for marker in GitHubRepoFetcher._REF_MARKERS:
            cleaned = cleaned.replace(marker, '')
        return cleaned.strip('/')

    def _contents_url(self, owner: str, repo: str, path: str) -> str:
        """Build the contents-API URL for an already normalized *path*."""
        # Percent-encode the path so characters such as '#' or '?' in a file
        # name are not interpreted as a URL fragment or query string.
        return f'{self.base_url}/repos/{owner}/{repo}/contents/{quote(path)}'

    def _extract_file_bytes(self, url: str, content) -> Optional[bytes]:
        """Return the file bytes described by a contents-API response.

        Returns None when the response does not describe a downloadable file
        (a directory listing, or a non-file entry without inline content).
        """
        if not isinstance(content, dict):
            # A list response means the path points at a directory.
            return None

        encoded = content.get('content')
        if encoded:
            return base64.b64decode(encoded)

        if content.get('type') != 'file':
            return None

        if content.get('size') == 0:
            # A genuinely empty file (e.g. __init__.py) is not an error.
            return b''

        # Non-empty file with no inline content: larger files are not inlined
        # in the JSON response, so ask for the raw bytes instead.
        raw_headers = {**self.headers, 'Accept': 'application/vnd.github.v3.raw'}
        response = requests.get(url, headers=raw_headers, timeout=self.timeout)
        response.raise_for_status()
        return response.content

    def fetch_contents(self, owner: str, repo: str, path: str = '') -> Optional[List[Dict]]:
        """
        Fetch contents of a repository path.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path within repository

        Returns:
            List of dictionaries containing file/directory information, or
            None if the request failed. A path that points at a single file
            yields a one-element list.
        """
        path = self._normalize_path(path)
        url = self._contents_url(owner, repo, path)

        try:
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()
            content = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers an undecodable JSON body on older versions of
            # requests, where that error is not a RequestException.
            print(f"Error fetching {path}: {str(e)}")
            return None

        # Handle both single item and list responses
        return content if isinstance(content, list) else [content]

    def download_file(self, owner: str, repo: str, path: str, save_path: Optional[str] = None) -> Optional[Union[str, bytes]]:
        """
        Download a specific file from the repository.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path to the file within repository
            save_path: Local path to write the file to; when omitted the
                content is returned instead of being written

        Returns:
            When save_path is omitted: the file content as str if it is valid
            UTF-8, otherwise as bytes. When save_path is given: None (the path
            is recorded in ``downloaded_files`` on success). None is also
            returned on any failure.
        """
        # Deliberately broad: this method is best-effort and reports every
        # failure (network, decoding, filesystem) by returning None.
        try:
            path = self._normalize_path(path)
            url = self._contents_url(owner, repo, path)
            print(f"Fetching file from: {url}")

            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            response.raise_for_status()

            file_content = self._extract_file_bytes(url, response.json())
            if file_content is None:
                print("No content found in response")
                return None
            print(f"Successfully decoded file content (size: {len(file_content)} bytes)")

            if save_path:
                os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
                with open(save_path, 'wb') as f:
                    f.write(file_content)
                print(f"Saved file to: {save_path}")
                self.downloaded_files.add(save_path)
                return None

            try:
                return file_content.decode('utf-8')
            except UnicodeDecodeError:
                # Binary file: hand back the raw bytes.
                return file_content
        except Exception as e:
            print(f"Error downloading file: {str(e)}")
            return None

    def download_directory(self, owner: str, repo: str, path: str, local_path: str) -> bool:
        """
        Download an entire directory recursively.

        Args:
            owner: Repository owner username
            repo: Repository name
            path: Path to directory within repository
            local_path: Local path to save directory

        Returns:
            Boolean indicating success; False if the listing could not be
            fetched or if any contained file or sub-directory failed.
        """
        try:
            path = self._normalize_path(path)
            print(f"Fetching directory: {path} for {owner}/{repo}")

            contents = self.fetch_contents(owner, repo, path)
            if not contents:
                print(f"Failed to fetch contents for path: {path}")
                return False

            print(f"Processing directory: {path or 'root'}")
            os.makedirs(local_path, exist_ok=True)

            success = True
            for item in contents:
                item_path = item['path']
                # Use basename for local path to maintain correct directory structure
                local_item_path = os.path.join(local_path, os.path.basename(item_path))
                try:
                    if item['type'] == 'dir':
                        print(f"Found directory: {item_path}")
                        if not self.download_directory(owner, repo, item_path, local_item_path):
                            success = False
                    else:
                        print(f"Downloading file: {item_path}")
                        # download_file returns None on both success and
                        # failure when saving, so success is detected through
                        # downloaded_files. Forget any record from an earlier
                        # run first, otherwise a failed re-download would be
                        # counted as a success.
                        self.downloaded_files.discard(local_item_path)
                        self.download_file(owner, repo, item_path, local_item_path)
                        if local_item_path in self.downloaded_files:
                            print(f"Successfully downloaded: {item_path}")
                        else:
                            success = False
                except Exception as e:
                    # Keep going so one bad entry does not abort the rest.
                    print(f"Error processing {item_path}: {str(e)}")
                    success = False
            return success
        except Exception as e:
            print(f"Error in download_directory: {str(e)}")
            return False