Spaces:
Runtime error
Runtime error
File size: 5,095 Bytes
5989690 9a1bfd2 5989690 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
import asyncio
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from pathlib import Path
import logging
from tqdm.asyncio import tqdm_asyncio
import sys
# Semaphore size used by scan_folder for Drive listing calls.
MAX_CONCURRENT_REQUESTS = 10

# (folder_id, path, notebook_files) tuples appended by scan_folder and
# consumed by parallel_download.
matching_folders = []

# Send every record to stdout and to a UTF-8 log file with one shared format.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("download_log.txt", encoding="utf-8"),
    ],
    format='[%(asctime)s] %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Drive client built from credentials loaded from a file on disk.
gauth = GoogleAuth()
gauth.LoadCredentialsFile('/tmp/credentials.json')
drive = GoogleDrive(gauth)
def list_folder(folder_id):
    """Return the non-trashed Drive entries whose parent is ``folder_id``.

    The query is issued through the module-level ``drive`` client and the
    result of ``GetList()`` is returned unchanged.
    """
    query = f"'{folder_id}' in parents and trashed=false"
    listing = drive.ListFile({'q': query})
    return listing.GetList()
async def crawl_drive_bfs(root_id):
    """Walk the folder tree below ``root_id`` one depth level at a time.

    Each folder of the current level is handed to ``scan_folder`` and all
    scans of that level are awaited together. ``scan_folder`` appends the
    sub-folders it finds to the list that becomes the next level; the walk
    stops when a level produces no sub-folders.
    """
    current_level = [(root_id, "ROOT")]
    while current_level:
        discovered = []
        await asyncio.gather(
            *(scan_folder(fid, trace, discovered) for fid, trace in current_level)
        )
        current_level = discovered
# (event loop, semaphore) pair shared by every scan_folder call; created lazily
# so the semaphore always belongs to the loop that is actually running.
_scan_limiter = None


def _get_scan_semaphore():
    """Return the semaphore shared by all ``scan_folder`` calls on this loop."""
    global _scan_limiter
    loop = asyncio.get_running_loop()
    if _scan_limiter is None or _scan_limiter[0] is not loop:
        _scan_limiter = (loop, asyncio.Semaphore(MAX_CONCURRENT_REQUESTS))
    return _scan_limiter[1]


def _fetch_folder_title(folder_id):
    """Blocking helper: fetch and return the Drive title of ``folder_id``."""
    folder = drive.CreateFile({'id': folder_id})
    folder.FetchMetadata(fields="title")
    return folder['title']


async def scan_folder(folder_id, path_trace, next_queue):
    """List one Drive folder, record it if it matches, and queue its sub-folders.

    A folder is recorded in ``matching_folders`` when its title starts with
    ``"Notebook"`` and it directly contains at least one ``.ipynb`` file.

    Args:
        folder_id: Drive id of the folder to scan.
        path_trace: Path of the parent folder, used to build this folder's path.
        next_queue: List that receives ``(sub_folder_id, current_path)`` for
            every sub-folder found; mutated in place.

    Returns:
        None. A listing failure is logged and the folder is skipped.
    """
    # One semaphore shared by all calls, so MAX_CONCURRENT_REQUESTS really
    # bounds the number of concurrent Drive requests (a semaphore created per
    # call would never block anything).
    async with _get_scan_semaphore():
        try:
            file_list = await asyncio.to_thread(list_folder, folder_id)
        except Exception as e:
            logger.error(f"Failed to list folder {folder_id}: {e}")
            return

        # An empty folder can neither match nor contribute sub-folders.
        if not file_list:
            return

        # The metadata request is network I/O, so it runs in a worker thread
        # instead of blocking the event loop.
        try:
            folder_title = await asyncio.to_thread(_fetch_folder_title, folder_id)
        except Exception:
            folder_title = "Unknown"

    current_path = f"{path_trace}/{folder_title}" if path_trace else folder_title

    if folder_title and folder_title.startswith("Notebook"):
        ipynb_files = [f for f in file_list if f['title'].endswith('.ipynb')]
        if ipynb_files:
            matching_folders.append((folder_id, current_path, ipynb_files))

    for f in file_list:
        if f['mimeType'] == 'application/vnd.google-apps.folder':
            next_queue.append((f['id'], current_path))
# (event loop, semaphore) pair shared by every download_folder call; created
# lazily so the semaphore always belongs to the loop that is actually running.
_download_limiter = None


def _get_download_semaphore():
    """Return the semaphore shared by all ``download_folder`` calls on this loop."""
    global _download_limiter
    loop = asyncio.get_running_loop()
    if _download_limiter is None or _download_limiter[0] is not loop:
        _download_limiter = (loop, asyncio.Semaphore(10))
    return _download_limiter[1]


def _local_name(title):
    """Turn a Drive title into a single, safe local path component."""
    cleaned = title.replace("/", "_").replace("\\", "_")
    if cleaned in ("", ".", ".."):
        return "_"
    return cleaned


async def download_folder(folder_id, local_path, hierarchy_path):
    """Recursively download a Drive folder into ``tmp/<local_path>``.

    Every file in the folder is downloaded. For each ``.ipynb`` file a
    ``"<hierarchy path>,<folder link>"`` line is collected, and the lines are
    written to ``file_links.txt`` in the folder's local directory.

    Args:
        folder_id: Drive id of the folder to download.
        local_path: Destination relative to the ``tmp`` directory.
        hierarchy_path: Drive-side path of this folder, used in the link file.

    Returns:
        None. Listing and per-file download failures are logged and skipped.
    """
    limiter = _get_download_semaphore()
    local_path = Path(local_path)
    base_path = Path("tmp") / local_path
    base_path.mkdir(parents=True, exist_ok=True)

    # The shared semaphore is held only around individual Drive calls. Holding
    # it while waiting for sub-folder tasks could deadlock once every permit is
    # owned by a parent waiting on its children.
    try:
        async with limiter:
            items = await asyncio.to_thread(list_folder, folder_id)
    except Exception as e:
        logger.error(f"Failed to list folder for download {folder_id}: {e}")
        return

    ipynb_entries = []
    subfolder_tasks = []
    for item in items:
        title = item['title']
        if item['mimeType'] == 'application/vnd.google-apps.folder':
            # Pass the path relative to "tmp": the callee prepends "tmp" itself,
            # so passing base_path would nest a further "tmp/" per level.
            subfolder_tasks.append(asyncio.create_task(
                download_folder(
                    item['id'],
                    local_path / _local_name(title),
                    f"{hierarchy_path}/{title}",
                )
            ))
            continue

        # Titles come from Drive and may contain separators or be absolute;
        # reduce them to one component so the file stays under base_path.
        target = base_path / _local_name(title)
        is_notebook = title.endswith('.ipynb')
        try:
            if is_notebook:
                logger.info(f"Downloading notebook: {title} into {base_path}")
            async with limiter:
                await asyncio.to_thread(item.GetContentFile, str(target))
        except Exception as e:
            logger.error(f"Error downloading {title}: {e}")
            continue

        if is_notebook:
            file_path = f"{hierarchy_path}/{title}"
            folder_link = f"https://drive.google.com/drive/folders/{folder_id}"
            ipynb_entries.append(f"{file_path},{folder_link}")

    if ipynb_entries:
        (base_path / "file_links.txt").write_text("\n".join(ipynb_entries), encoding="utf-8")

    if subfolder_tasks:
        await asyncio.gather(*subfolder_tasks)
async def parallel_download():
    """Download every folder recorded in ``matching_folders`` concurrently.

    The n-th recorded folder (counting from 1) is downloaded to
    ``notebook_folder_<n>``; completion is reported through a tqdm progress
    bar as the individual downloads finish.
    """
    jobs = [
        download_folder(folder_id, Path(f"notebook_folder_{slot + 1}"), drive_path)
        for slot, (folder_id, drive_path, _notebooks) in enumerate(matching_folders)
    ]
    progress = tqdm_asyncio.as_completed(
        jobs, desc="Downloading Notebooks", unit="folder", total=len(jobs)
    )
    for finished in progress:
        await finished
async def downloads(root_folder_id):
    """Scan the Drive tree under ``root_folder_id`` and download the matches.

    ``matching_folders`` is reset, the tree is crawled, and if any folder was
    recorded the recorded folders are downloaded. Any exception raised along
    the way is logged with its traceback and not re-raised.
    """
    try:
        matching_folders.clear()
        logger.info(f"Scanning Drive folder: {root_folder_id}")
        await crawl_drive_bfs(root_folder_id)

        # Nothing recorded by the crawl: warn and stop.
        if not matching_folders:
            logger.warning("No folders with prefix 'Notebook' and .ipynb files found.")
            return

        logger.info(f"Found {len(matching_folders)} folders with notebooks.")
        await parallel_download()
        logger.info("Download completed.")
    except Exception as e:
        logger.exception(f"Unexpected error during execution: {e}")
# Script entry: crawl the hard-coded root Drive folder and download the matches.
root_folder_id = "1KWHJC3yqXXVK2Cz-qZxWeeic8ifCmgej"
asyncio.run(downloads(root_folder_id))
|