"""Find Google Drive folders whose title starts with "Notebook" and download them.

The script crawls a Drive folder tree breadth-first, records every folder
whose title starts with ``Notebook`` and that directly contains at least one
``.ipynb`` file, then downloads each recorded folder (recursively) under the
local ``tmp/`` directory.
"""

import asyncio
import logging
import sys
from pathlib import Path

from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from tqdm.asyncio import tqdm_asyncio

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("download_log.txt", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)

gauth = GoogleAuth()
gauth.LoadCredentialsFile('/tmp/credentials.json')
drive = GoogleDrive(gauth)

# (folder_id, drive_path, ipynb_files) tuples collected by scan_folder().
matching_folders = []

# Upper bound on Drive API calls that may be in flight at the same time.
MAX_CONCURRENT_REQUESTS = 10

FOLDER_MIME_TYPE = 'application/vnd.google-apps.folder'

# (event_loop, semaphore) pair; see _get_request_semaphore().
_request_limiter = None


def _get_request_semaphore():
    """Return the semaphore shared by every Drive call on the running loop.

    The semaphore is created lazily from inside a coroutine, and re-created if
    the running loop changes, so it is never tied to a loop that
    ``asyncio.run`` has already closed.
    """
    global _request_limiter
    loop = asyncio.get_running_loop()
    if _request_limiter is None or _request_limiter[0] is not loop:
        _request_limiter = (loop, asyncio.Semaphore(MAX_CONCURRENT_REQUESTS))
    return _request_limiter[1]


async def _call_limited(func, *args):
    """Run blocking ``func(*args)`` in a worker thread under the shared limit.

    The semaphore is held only for the duration of this single call, never
    across recursion, so nested folder downloads cannot deadlock on it.
    """
    async with _get_request_semaphore():
        return await asyncio.to_thread(func, *args)


def _safe_name(title):
    """Turn a Drive title into a single, safe local path component.

    Drive titles may contain path separators or be ``..``; joined verbatim
    they could point outside the download directory.
    """
    cleaned = title.replace("/", "_").replace("\\", "_")
    return "_" if cleaned in ("", ".", "..") else cleaned


def _fetch_folder_title(folder_id):
    """Return the title of ``folder_id``, or ``"Unknown"`` if it cannot be read.

    Blocking; intended to be run in a worker thread.
    """
    folder = drive.CreateFile({'id': folder_id})
    try:
        folder.FetchMetadata(fields="title")
        return folder['title']
    except Exception as e:
        logger.warning(f"Could not fetch title for folder {folder_id}: {e}")
        return "Unknown"


def list_folder(folder_id):
    """Return the non-trashed direct children of ``folder_id`` (blocking)."""
    return drive.ListFile({'q': f"'{folder_id}' in parents and trashed=false"}).GetList()


async def crawl_drive_bfs(root_id):
    """Scan the tree rooted at ``root_id`` breadth-first, one level at a time.

    Every folder of a level is scanned concurrently; the subfolders they
    report form the next level. Matches are appended to ``matching_folders``.
    """
    queue = [(root_id, "ROOT")]
    while queue:
        next_queue = []
        await asyncio.gather(
            *(scan_folder(folder_id, path, next_queue) for folder_id, path in queue)
        )
        queue = next_queue


async def scan_folder(folder_id, path_trace, next_queue):
    """Inspect one folder, record it if it matches, and enqueue its subfolders.

    Args:
        folder_id: Drive id of the folder to inspect.
        path_trace: Path of the parent folder, used to build this folder's path.
        next_queue: List that receives ``(subfolder_id, path)`` tuples.
    """
    try:
        file_list = await _call_limited(list_folder, folder_id)
    except Exception as e:
        logger.error(f"Failed to list folder {folder_id}: {e}")
        return

    # An empty folder has no notebooks and no subfolders to enqueue.
    if not file_list:
        return

    folder_title = await _call_limited(_fetch_folder_title, folder_id)
    current_path = f"{path_trace}/{folder_title}" if path_trace else folder_title

    if folder_title and folder_title.startswith("Notebook"):
        ipynb_files = [f for f in file_list if f['title'].endswith('.ipynb')]
        if ipynb_files:
            matching_folders.append((folder_id, current_path, ipynb_files))

    next_queue.extend(
        (f['id'], current_path)
        for f in file_list
        if f['mimeType'] == FOLDER_MIME_TYPE
    )


async def download_folder(folder_id, local_path, hierarchy_path):
    """Download a Drive folder and all of its subfolders under ``tmp/``.

    For each folder that yields at least one successfully downloaded notebook,
    a ``file_links.txt`` is written with one ``<drive path>,<folder url>`` line
    per notebook.

    Args:
        folder_id: Drive id of the folder to download.
        local_path: Destination directory, relative to ``tmp/``.
        hierarchy_path: Drive path of this folder, used in ``file_links.txt``.
    """
    relative_dir = Path(local_path)
    base_path = Path("tmp") / relative_dir
    base_path.mkdir(parents=True, exist_ok=True)

    try:
        items = await _call_limited(list_folder, folder_id)
    except Exception as e:
        logger.error(f"Failed to list folder for download {folder_id}: {e}")
        return

    folder_link = f"https://drive.google.com/drive/folders/{folder_id}"
    ipynb_entries = []
    subfolder_tasks = []

    for item in items:
        title = item['title']

        if item['mimeType'] == FOLDER_MIME_TYPE:
            # Pass the path relative to "tmp": this function prefixes "tmp"
            # itself, so passing base_path would nest "tmp/tmp/..." per level.
            subfolder_tasks.append(
                asyncio.create_task(
                    download_folder(
                        item['id'],
                        relative_dir / _safe_name(title),
                        hierarchy_path + "/" + title,
                    )
                )
            )
            continue

        is_notebook = title.endswith('.ipynb')
        if is_notebook:
            logger.info(f"Downloading notebook: {title} into {base_path}")
        try:
            await _call_limited(item.GetContentFile, str(base_path / _safe_name(title)))
        except Exception as e:
            logger.error(f"Error downloading {title}: {e}")
            continue
        if is_notebook:
            # Recorded only after a successful download.
            ipynb_entries.append(f"{hierarchy_path}/{title},{folder_link}")

    if ipynb_entries:
        (base_path / "file_links.txt").write_text("\n".join(ipynb_entries), encoding="utf-8")

    if subfolder_tasks:
        await asyncio.gather(*subfolder_tasks)


async def parallel_download():
    """Download every folder in ``matching_folders`` with a progress bar.

    Folder number ``i`` (1-based) is saved to ``tmp/notebook_folder_<i>``.
    """
    tasks = [
        download_folder(folder_id, Path(f"notebook_folder_{idx}"), path)
        for idx, (folder_id, path, _) in enumerate(matching_folders, start=1)
    ]
    for finished in tqdm_asyncio.as_completed(
        tasks, desc="Downloading Notebooks", unit="folder", total=len(tasks)
    ):
        await finished


async def downloads(root_folder_id):
    """Crawl ``root_folder_id`` and download every matching notebook folder.

    Any unexpected error is logged with its traceback instead of propagating.
    """
    try:
        matching_folders.clear()
        logger.info(f"Scanning Drive folder: {root_folder_id}")
        await crawl_drive_bfs(root_folder_id)

        if matching_folders:
            logger.info(f"Found {len(matching_folders)} folders with notebooks.")
            await parallel_download()
            logger.info("Download completed.")
        else:
            logger.warning("No folders with prefix 'Notebook' and .ipynb files found.")
    except Exception as e:
        logger.exception(f"Unexpected error during execution: {e}")


# NOTE(review): this runs on import as well as when executed as a script;
# consider guarding it with `if __name__ == "__main__":`.
asyncio.run(downloads("1KWHJC3yqXXVK2Cz-qZxWeeic8ifCmgej"))