import os import tempfile import zipfile import re from pathlib import Path from typing import Optional, Union, Callable, List import requests import anndata from anndata import AnnData class H5adLoader: """Handle h5ad file loading with backed='r' for efficient memory usage""" ALLOWED_DOMAINS = [ "huggingface.co", "zenodo.org", "s3.amazonaws.com", "drive.google.com", "docs.google.com", ] MAX_DOWNLOAD_SIZE = 20 * 1024 * 1024 * 1024 # 20GB TIMEOUT = 3000 # 3000 seconds = 50 minutes @staticmethod def convert_google_drive_url(url: str) -> str: """ Convert Google Drive sharing URL to direct download URL Supports formats: - https://drive.google.com/file/d/{FILE_ID}/view?usp=sharing - https://drive.google.com/open?id={FILE_ID} - https://docs.google.com/... Args: url: Google Drive sharing URL Returns: Direct download URL Raises: ValueError: If cannot extract file ID """ # Pattern 1: /file/d/{ID}/view match = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url) if match: file_id = match.group(1) return f"https://drive.google.com/uc?export=download&id={file_id}" # Pattern 2: open?id={ID} match = re.search(r'[?&]id=([a-zA-Z0-9_-]+)', url) if match: file_id = match.group(1) return f"https://drive.google.com/uc?export=download&id={file_id}" # If already a direct download URL, return as-is if 'drive.google.com/uc' in url: return url raise ValueError( "Cannot parse Google Drive URL. Please use a sharing link like: " "https://drive.google.com/file/d/{FILE_ID}/view?usp=sharing" ) @staticmethod def is_zip_file(filepath: str) -> bool: """Check if file is a ZIP archive""" return filepath.lower().endswith('.zip') and zipfile.is_zipfile(filepath) @staticmethod def extract_h5ad_from_zip(zip_path: str, extract_dir: Optional[str] = None) -> List[str]: """ Extract all .h5ad files from a ZIP archive Args: zip_path: Path to ZIP file extract_dir: Directory to extract to (uses temp dir if None) Returns: List of paths to extracted h5ad files Raises: ValueError: If no h5ad files found in ZIP """ if extract_dir is None: extract_dir = tempfile.mkdtemp() extracted_h5ad_files = [] try: with zipfile.ZipFile(zip_path, 'r') as zip_ref: # Get all .h5ad files h5ad_files = [f for f in zip_ref.namelist() if f.lower().endswith('.h5ad')] if not h5ad_files: raise ValueError("No .h5ad files found in ZIP archive") # Extract each h5ad file for h5ad_file in h5ad_files: # Skip macOS metadata files if '__MACOSX' in h5ad_file or h5ad_file.startswith('.'): continue zip_ref.extract(h5ad_file, extract_dir) extracted_path = os.path.join(extract_dir, h5ad_file) extracted_h5ad_files.append(extracted_path) if not extracted_h5ad_files: raise ValueError("No valid .h5ad files found in ZIP (only hidden/system files)") except zipfile.BadZipFile: raise ValueError("Invalid or corrupted ZIP file") return extracted_h5ad_files @staticmethod def is_valid_url(url: str) -> bool: """Check if URL is from allowed domains""" if not url.startswith(("http://", "https://")): return False return any(domain in url for domain in H5adLoader.ALLOWED_DOMAINS) @staticmethod def _extract_filename_from_response(response, url: str) -> str: """ Extract filename from HTTP response headers or URL Prioritizes Content-Disposition header (especially useful for Google Drive) Args: response: requests.Response object url: Original URL Returns: Extracted filename """ filename = None # Try to get filename from Content-Disposition header content_disposition = response.headers.get('Content-Disposition', '') if content_disposition: # Try filename*= (RFC 5987 encoded) match = re.search(r"filename\*=(?:UTF-8''|utf-8'')(.+?)(?:;|$)", content_disposition, re.IGNORECASE) if match: from urllib.parse import unquote filename = unquote(match.group(1).strip()) # Try filename= with quotes if not filename: match = re.search(r'filename="([^"]+)"', content_disposition) if match: filename = match.group(1).strip() # Try filename= without quotes if not filename: match = re.search(r'filename=([^;\s]+)', content_disposition) if match: filename = match.group(1).strip() # Fallback: try to extract from URL if not filename: filename = url.split("/")[-1].split("?")[0] # Default filename if still empty if not filename or filename == "" or filename == "uc": filename = "downloaded_data.h5ad" # If no extension, try to determine from content type or URL if '.' not in filename: content_type = response.headers.get('Content-Type', '') if 'zip' in content_type.lower() or 'zip' in url.lower(): filename = filename + ".zip" else: filename = filename + ".h5ad" return filename @staticmethod def download_h5ad( url: str, save_dir: Optional[str] = None, progress_callback: Optional[Callable[[int, int], None]] = None ) -> Union[str, List[str]]: """ Download h5ad file (or ZIP containing h5ad files) from URL Args: url: URL to h5ad or ZIP file save_dir: Directory to save file (uses temp dir if None) progress_callback: Optional callback function(downloaded_bytes, total_bytes) Returns: Path to downloaded file, or list of paths if ZIP was extracted Raises: ValueError: If URL is invalid or download fails """ # Convert Google Drive URL if needed original_url = url if 'drive.google.com' in url or 'docs.google.com' in url: try: url = H5adLoader.convert_google_drive_url(url) except ValueError as e: raise ValueError(f"Google Drive URL error: {str(e)}") if not H5adLoader.is_valid_url(url) and not H5adLoader.is_valid_url(original_url): raise ValueError( f"URL not from allowed domains: {', '.join(H5adLoader.ALLOWED_DOMAINS)}" ) if save_dir is None: save_dir = tempfile.mkdtemp() try: response = requests.get( url, stream=True, timeout=H5adLoader.TIMEOUT, allow_redirects=True ) response.raise_for_status() # Extract filename from response headers (handles Google Drive properly) filename = H5adLoader._extract_filename_from_response(response, url) filepath = os.path.join(save_dir, filename) # Get total size if available total_size = int(response.headers.get('content-length', 0)) downloaded_size = 0 with open(filepath, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: downloaded_size += len(chunk) # Check size limit if downloaded_size > H5adLoader.MAX_DOWNLOAD_SIZE: raise ValueError( f"File too large (>{H5adLoader.MAX_DOWNLOAD_SIZE / 1e9:.1f}GB)" ) f.write(chunk) # Call progress callback if provided if progress_callback: progress_callback(downloaded_size, total_size) # Check if it's a ZIP file and extract if so if H5adLoader.is_zip_file(filepath): extracted_files = H5adLoader.extract_h5ad_from_zip(filepath, save_dir) return extracted_files # Return list of extracted h5ad files return filepath except requests.RequestException as e: raise ValueError(f"Failed to download file: {str(e)}") @staticmethod def load_h5ad( path: Union[str, Path], backed: str = "r", ) -> Union[AnnData, List[AnnData]]: """ Load h5ad file with backed mode for memory efficiency Also handles ZIP files containing h5ad files Args: path: Path to h5ad or ZIP file, or URL backed: Backing mode ('r' for read-only, recommended) Returns: AnnData object with backed mode enabled, or list of AnnData if ZIP Raises: ValueError: If file cannot be loaded """ path_str = str(path) # If it's a URL, download first if path_str.startswith(("http://", "https://")): downloaded = H5adLoader.download_h5ad(path_str) # Check if we got multiple files from ZIP if isinstance(downloaded, list): # Load all extracted h5ad files adata_list = [] for h5ad_path in downloaded: adata = anndata.read_h5ad(h5ad_path, backed=backed) adata_list.append(adata) return adata_list path_str = downloaded # Check if local file is a ZIP if os.path.exists(path_str) and H5adLoader.is_zip_file(path_str): extracted_files = H5adLoader.extract_h5ad_from_zip(path_str) if len(extracted_files) == 1: # Single h5ad file in ZIP path_str = extracted_files[0] else: # Multiple h5ad files in ZIP adata_list = [] for h5ad_path in extracted_files: adata = anndata.read_h5ad(h5ad_path, backed=backed) adata_list.append(adata) return adata_list # Validate file exists if not os.path.exists(path_str): raise ValueError(f"File not found: {path_str}") # Validate file extension if not path_str.endswith(".h5ad"): raise ValueError("File must have .h5ad extension") try: # Load with backed mode for efficient memory usage adata = anndata.read_h5ad(path_str, backed=backed) return adata except Exception as e: raise ValueError(f"Failed to load h5ad file: {str(e)}") @staticmethod def load_from_source(source: Union[str, Path]) -> AnnData: """ Convenience method to load h5ad from file path or URL Args: source: File path or URL to h5ad file Returns: AnnData object loaded with backed='r' """ return H5adLoader.load_h5ad(source, backed="r")