Spaces:
Runtime error
Runtime error
| import asyncio | |
| from pydrive2.auth import GoogleAuth | |
| from pydrive2.drive import GoogleDrive | |
| from pathlib import Path | |
| import logging | |
| from tqdm.asyncio import tqdm_asyncio | |
| import sys | |
# Log to stdout and to a UTF-8 log file at the same time.
_log_handlers = [
    logging.StreamHandler(sys.stdout),
    logging.FileHandler("download_log.txt", encoding="utf-8"),
]
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s: %(message)s',
    level=logging.INFO,
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)

# Drive client built from credentials previously saved on disk.
gauth = GoogleAuth()
gauth.LoadCredentialsFile('/tmp/credentials.json')
drive = GoogleDrive(gauth)

# (folder_id, path, ipynb_files) tuples collected by scan_folder.
matching_folders = []

# Size of the semaphore used while scanning folders.
MAX_CONCURRENT_REQUESTS = 10
def list_folder(folder_id):
    """Return the non-trashed Drive items whose parent is ``folder_id``.

    This is a blocking call; async callers run it via ``asyncio.to_thread``.
    """
    query = f"'{folder_id}' in parents and trashed=false"
    listing = drive.ListFile({'q': query})
    return listing.GetList()
async def crawl_drive_bfs(root_id):
    """Walk the Drive tree below ``root_id`` breadth-first, one level at a time.

    Every folder of the current level is scanned concurrently with
    ``scan_folder``; the subfolders those scans discover form the next level.
    The walk ends when a level yields no further folders.
    """
    frontier = [(root_id, "ROOT")]
    while frontier:
        discovered = []
        await asyncio.gather(
            *(scan_folder(fid, trace, discovered) for fid, trace in frontier)
        )
        frontier = discovered
# (event loop, semaphore) pair shared by all scan_folder calls.
_scan_limiter = None


def _get_scan_semaphore():
    """Return the semaphore shared by every scan_folder call on the running loop."""
    global _scan_limiter
    loop = asyncio.get_running_loop()
    # A semaphore belongs to one event loop; rebuild it if the loop changed
    # (for example when asyncio.run is invoked a second time).
    if _scan_limiter is None or _scan_limiter[0] is not loop:
        _scan_limiter = (loop, asyncio.Semaphore(MAX_CONCURRENT_REQUESTS))
    return _scan_limiter[1]


def _fetch_folder_title(folder_id):
    """Blocking helper: return the folder's title, or "Unknown" if the lookup fails."""
    folder = drive.CreateFile({'id': folder_id})
    try:
        folder.FetchMetadata(fields="title")
        return folder['title']
    except Exception as e:
        logger.warning(f"Could not fetch title for folder {folder_id}: {e}")
        return "Unknown"


async def scan_folder(folder_id, path_trace, next_queue):
    """List one Drive folder, record it if it matches, and queue its subfolders.

    Args:
        folder_id: Drive id of the folder to scan.
        path_trace: Path of the parent folder; the folder's own title is
            appended to it to form the path passed on to children.
        next_queue: List that receives ``(subfolder_id, current_path)`` for
            every subfolder found, to be scanned on the next BFS level.

    Side effects:
        Appends ``(folder_id, current_path, ipynb_files)`` to
        ``matching_folders`` when the folder title starts with "Notebook" and
        the folder directly contains at least one ``.ipynb`` file.

    Listing failures are logged and the folder is skipped.
    """
    # One shared semaphore bounds the number of in-flight Drive requests
    # across all concurrent scans; a per-call semaphore would limit nothing.
    async with _get_scan_semaphore():
        try:
            file_list = await asyncio.to_thread(list_folder, folder_id)
        except Exception as e:
            logger.error(f"Failed to list folder {folder_id}: {e}")
            return
        if not file_list:
            # Nothing to match and nothing to enqueue; skip the title lookup.
            return
        # The metadata fetch is a network call, so keep it off the event loop.
        folder_title = await asyncio.to_thread(_fetch_folder_title, folder_id)

    current_path = f"{path_trace}/{folder_title}" if path_trace else folder_title

    if folder_title and folder_title.startswith("Notebook"):
        ipynb_files = [f for f in file_list if f['title'].endswith('.ipynb')]
        if ipynb_files:
            matching_folders.append((folder_id, current_path, ipynb_files))

    next_queue.extend(
        (f['id'], current_path)
        for f in file_list
        if f['mimeType'] == 'application/vnd.google-apps.folder'
    )
# (event loop, semaphore) pair shared by all download_folder calls.
_download_limiter = None


def _get_download_semaphore():
    """Return the semaphore shared by every download_folder call on the running loop."""
    global _download_limiter
    loop = asyncio.get_running_loop()
    # A semaphore belongs to one event loop; rebuild it if the loop changed.
    if _download_limiter is None or _download_limiter[0] is not loop:
        _download_limiter = (loop, asyncio.Semaphore(10))
    return _download_limiter[1]


async def download_folder(folder_id, local_path, hierarchy_path):
    """Recursively download a Drive folder into ``tmp/<local_path>``.

    Args:
        folder_id: Drive id of the folder to download.
        local_path: Destination path relative to the ``tmp`` directory.
        hierarchy_path: Drive-side path of this folder, used to label
            notebooks in ``file_links.txt``.

    Every non-folder item is downloaded. For each ``.ipynb`` file a
    ``<hierarchy path>,<folder link>`` line is collected and written to
    ``file_links.txt`` in the destination directory. Subfolders are downloaded
    concurrently as separate tasks. Listing and per-file download errors are
    logged and skipped.
    """
    limiter = _get_download_semaphore()
    relative_path = Path(local_path)
    base_path = Path("tmp") / relative_path
    base_path.mkdir(parents=True, exist_ok=True)

    try:
        # Hold the shared semaphore only around the network call itself, so
        # waiting on subfolder tasks below can never starve them of permits.
        async with limiter:
            items = await asyncio.to_thread(list_folder, folder_id)
    except Exception as e:
        logger.error(f"Failed to list folder for download {folder_id}: {e}")
        return

    folder_link = f"https://drive.google.com/drive/folders/{folder_id}"
    ipynb_entries = []
    subfolder_tasks = []

    for item in items:
        title = item['title']
        if item['mimeType'] == 'application/vnd.google-apps.folder':
            # Pass the path relative to "tmp"; passing base_path would prepend
            # another "tmp/" on every level of recursion.
            subfolder_tasks.append(
                asyncio.create_task(
                    download_folder(
                        item['id'],
                        relative_path / title,
                        hierarchy_path + "/" + title,
                    )
                )
            )
            continue

        is_notebook = title.endswith('.ipynb')
        try:
            if is_notebook:
                logger.info(f"Downloading notebook: {title} into {base_path}")
            async with limiter:
                await asyncio.to_thread(item.GetContentFile, str(base_path / title))
        except Exception as e:
            logger.error(f"Error downloading {title}: {e}")
            continue
        if is_notebook:
            ipynb_entries.append(f"{hierarchy_path}/{title},{folder_link}")

    if ipynb_entries:
        (base_path / "file_links.txt").write_text("\n".join(ipynb_entries), encoding="utf-8")
    if subfolder_tasks:
        await asyncio.gather(*subfolder_tasks)
async def parallel_download():
    """Download every folder recorded in ``matching_folders`` concurrently.

    Folder number N (1-based, in list order) is saved under the local name
    ``notebook_folder_N``. A tqdm progress bar advances as each folder
    download completes.
    """
    pending = [
        download_folder(folder_id, Path(f"notebook_folder_{position}"), path)
        for position, (folder_id, path, _) in enumerate(matching_folders, start=1)
    ]
    progress = tqdm_asyncio.as_completed(
        pending,
        desc="Downloading Notebooks",
        unit="folder",
        total=len(pending),
    )
    for finished in progress:
        await finished
async def downloads(root_folder_id):
    """Scan the Drive tree under ``root_folder_id`` and download matching folders.

    Clears ``matching_folders``, crawls the tree, then downloads every folder
    the crawl recorded. Any exception raised along the way is logged with its
    traceback rather than propagated.
    """
    try:
        matching_folders.clear()
        logger.info(f"Scanning Drive folder: {root_folder_id}")
        await crawl_drive_bfs(root_folder_id)

        if not matching_folders:
            logger.warning("No folders with prefix 'Notebook' and .ipynb files found.")
            return

        logger.info(f"Found {len(matching_folders)} folders with notebooks.")
        await parallel_download()
        logger.info("Download completed.")
    except Exception as e:
        logger.exception(f"Unexpected error during execution: {e}")
# Script entry: run the scan-and-download pipeline for the hard-coded root folder.
_main_coroutine = downloads("1KWHJC3yqXXVK2Cz-qZxWeeic8ifCmgej")
asyncio.run(_main_coroutine)