# NOTE: the "Spaces: / Running / Running" lines below this file's original header
# were Hugging Face Spaces page chrome captured during copy/paste — they are not
# part of the module source and have been converted to this comment.
| import os | |
| import tempfile | |
| import zipfile | |
| import re | |
| from pathlib import Path | |
| from typing import Optional, Union, Callable, List | |
| import requests | |
| import anndata | |
| from anndata import AnnData | |
class H5adLoader:
    """Load .h5ad (AnnData) files from local paths, URLs, and ZIP archives.

    Files are opened with ``backed='r'`` so large matrices stay on disk
    instead of being read fully into memory.  Remote downloads are
    restricted to ``ALLOWED_DOMAINS``; Google Drive sharing links are
    converted to direct-download URLs automatically.
    """

    # Hosts downloads are restricted to (matched against the URL hostname,
    # exact or subdomain — see is_valid_url).
    ALLOWED_DOMAINS = [
        "huggingface.co",
        "zenodo.org",
        "s3.amazonaws.com",
        "drive.google.com",
        "docs.google.com",
    ]
    MAX_DOWNLOAD_SIZE = 20 * 1024 * 1024 * 1024  # 20GB
    TIMEOUT = 3000  # 3000 seconds = 50 minutes

    @staticmethod
    def convert_google_drive_url(url: str) -> str:
        """
        Convert a Google Drive sharing URL to a direct download URL.

        Supported formats:
            - https://drive.google.com/file/d/{FILE_ID}/view?usp=sharing
            - https://drive.google.com/open?id={FILE_ID}
            - https://docs.google.com/...

        Args:
            url: Google Drive sharing URL

        Returns:
            Direct download URL

        Raises:
            ValueError: If the file ID cannot be extracted
        """
        # Pattern 1: /file/d/{ID}/view
        match = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
        if match:
            return f"https://drive.google.com/uc?export=download&id={match.group(1)}"
        # Pattern 2: open?id={ID}
        match = re.search(r'[?&]id=([a-zA-Z0-9_-]+)', url)
        if match:
            return f"https://drive.google.com/uc?export=download&id={match.group(1)}"
        # Already a direct download URL: pass through unchanged.
        if 'drive.google.com/uc' in url:
            return url
        raise ValueError(
            "Cannot parse Google Drive URL. Please use a sharing link like: "
            "https://drive.google.com/file/d/{FILE_ID}/view?usp=sharing"
        )

    @staticmethod
    def is_zip_file(filepath: str) -> bool:
        """Return True if *filepath* has a .zip extension and is a real ZIP archive."""
        return filepath.lower().endswith('.zip') and zipfile.is_zipfile(filepath)

    @staticmethod
    def extract_h5ad_from_zip(zip_path: str, extract_dir: Optional[str] = None) -> List[str]:
        """
        Extract all .h5ad files from a ZIP archive.

        Args:
            zip_path: Path to ZIP file
            extract_dir: Directory to extract to (uses a temp dir if None)

        Returns:
            List of paths to extracted h5ad files

        Raises:
            ValueError: If the ZIP is invalid or contains no usable h5ad files
        """
        if extract_dir is None:
            extract_dir = tempfile.mkdtemp()
        extracted_h5ad_files = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Get all .h5ad files
                h5ad_files = [f for f in zip_ref.namelist() if f.lower().endswith('.h5ad')]
                if not h5ad_files:
                    raise ValueError("No .h5ad files found in ZIP archive")
                for h5ad_file in h5ad_files:
                    # Skip macOS resource forks and hidden files; the basename
                    # test also catches nested dotfiles (e.g. "sub/.x.h5ad"),
                    # which a plain startswith('.') on the full path misses.
                    if '__MACOSX' in h5ad_file or os.path.basename(h5ad_file).startswith('.'):
                        continue
                    zip_ref.extract(h5ad_file, extract_dir)
                    extracted_h5ad_files.append(os.path.join(extract_dir, h5ad_file))
                if not extracted_h5ad_files:
                    raise ValueError("No valid .h5ad files found in ZIP (only hidden/system files)")
        except zipfile.BadZipFile:
            raise ValueError("Invalid or corrupted ZIP file")
        return extracted_h5ad_files

    @staticmethod
    def is_valid_url(url: str) -> bool:
        """Return True if *url* is http(s) and its host is an allowed domain.

        The hostname must equal an allowed domain or be a subdomain of one.
        (A plain substring test would accept e.g.
        ``https://evil.com/huggingface.co``.)
        """
        if not url.startswith(("http://", "https://")):
            return False
        from urllib.parse import urlparse
        host = (urlparse(url).hostname or "").lower()
        return any(
            host == domain or host.endswith("." + domain)
            for domain in H5adLoader.ALLOWED_DOMAINS
        )

    @staticmethod
    def _extract_filename_from_response(response, url: str) -> str:
        """
        Extract a filename from HTTP response headers or the URL.

        Prioritizes the Content-Disposition header (especially useful for
        Google Drive, whose direct-download URLs end in "uc").

        Args:
            response: requests.Response object (only ``.headers`` is read)
            url: Original URL

        Returns:
            Extracted filename (always carries an extension)
        """
        filename = None
        content_disposition = response.headers.get('Content-Disposition', '')
        if content_disposition:
            # Try filename*= (RFC 5987 encoded)
            match = re.search(r"filename\*=(?:UTF-8''|utf-8'')(.+?)(?:;|$)", content_disposition, re.IGNORECASE)
            if match:
                from urllib.parse import unquote
                filename = unquote(match.group(1).strip())
            # Try filename= with quotes
            if not filename:
                match = re.search(r'filename="([^"]+)"', content_disposition)
                if match:
                    filename = match.group(1).strip()
            # Try filename= without quotes
            if not filename:
                match = re.search(r'filename=([^;\s]+)', content_disposition)
                if match:
                    filename = match.group(1).strip()
        # Fallback: last path segment of the URL, without the query string.
        if not filename:
            filename = url.split("/")[-1].split("?")[0]
        # Never trust path separators in a server-supplied name: keep only the
        # basename so a malicious Content-Disposition cannot escape save_dir.
        if filename:
            filename = os.path.basename(filename.replace("\\", "/"))
        # Default name; "uc" is the path segment Google Drive URLs end in.
        if not filename or filename == "uc":
            filename = "downloaded_data.h5ad"
        # If no extension, guess from the content type or URL.
        if '.' not in filename:
            content_type = response.headers.get('Content-Type', '')
            if 'zip' in content_type.lower() or 'zip' in url.lower():
                filename = filename + ".zip"
            else:
                filename = filename + ".h5ad"
        return filename

    @staticmethod
    def download_h5ad(
        url: str,
        save_dir: Optional[str] = None,
        progress_callback: Optional[Callable[[int, int], None]] = None
    ) -> Union[str, List[str]]:
        """
        Download an h5ad file (or a ZIP containing h5ad files) from a URL.

        Args:
            url: URL to h5ad or ZIP file
            save_dir: Directory to save the file (uses a temp dir if None)
            progress_callback: Optional callback(downloaded_bytes, total_bytes)

        Returns:
            Path to the downloaded file, or a list of paths if a ZIP was extracted

        Raises:
            ValueError: If the URL is invalid, the file is too large,
                or the download fails
        """
        # Convert Google Drive sharing links to direct-download form first.
        original_url = url
        if 'drive.google.com' in url or 'docs.google.com' in url:
            try:
                url = H5adLoader.convert_google_drive_url(url)
            except ValueError as e:
                raise ValueError(f"Google Drive URL error: {str(e)}")
        # Either the converted or the original URL must pass the allow-list.
        if not H5adLoader.is_valid_url(url) and not H5adLoader.is_valid_url(original_url):
            raise ValueError(
                f"URL not from allowed domains: {', '.join(H5adLoader.ALLOWED_DOMAINS)}"
            )
        if save_dir is None:
            save_dir = tempfile.mkdtemp()
        try:
            # 'with' guarantees the streamed connection is released even on the
            # early size-limit ValueError (the original leaked it there).
            with requests.get(
                url,
                stream=True,
                timeout=H5adLoader.TIMEOUT,
                allow_redirects=True
            ) as response:
                response.raise_for_status()
                # Headers beat the URL for naming (handles Google Drive's "uc").
                filename = H5adLoader._extract_filename_from_response(response, url)
                filepath = os.path.join(save_dir, filename)
                # Content-Length may be absent (chunked transfer) -> 0.
                total_size = int(response.headers.get('content-length', 0))
                downloaded_size = 0
                with open(filepath, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            downloaded_size += len(chunk)
                            # Enforce the cap while streaming, before writing.
                            if downloaded_size > H5adLoader.MAX_DOWNLOAD_SIZE:
                                raise ValueError(
                                    f"File too large (>{H5adLoader.MAX_DOWNLOAD_SIZE / 1e9:.1f}GB)"
                                )
                            f.write(chunk)
                            if progress_callback:
                                progress_callback(downloaded_size, total_size)
            # A ZIP payload is expanded in place; callers get the h5ad paths.
            if H5adLoader.is_zip_file(filepath):
                return H5adLoader.extract_h5ad_from_zip(filepath, save_dir)
            return filepath
        except requests.RequestException as e:
            raise ValueError(f"Failed to download file: {str(e)}")

    @staticmethod
    def load_h5ad(
        path: Union[str, Path],
        backed: str = "r",
    ) -> Union["AnnData", List["AnnData"]]:
        """
        Load an h5ad file with backed mode for memory efficiency.

        Also handles URLs and ZIP files containing h5ad files.

        Args:
            path: Path to h5ad or ZIP file, or URL
            backed: Backing mode ('r' for read-only, recommended)

        Returns:
            AnnData object with backed mode enabled, or a list of AnnData
            objects when a ZIP contained several h5ad files

        Raises:
            ValueError: If the file cannot be found or loaded
        """
        path_str = str(path)
        # Remote source: download (and possibly unzip) first.
        if path_str.startswith(("http://", "https://")):
            downloaded = H5adLoader.download_h5ad(path_str)
            if isinstance(downloaded, list):
                # ZIP yielded several h5ad files: load them all.
                return [anndata.read_h5ad(p, backed=backed) for p in downloaded]
            path_str = downloaded
        # Local ZIP: extract, then load one or many.
        if os.path.exists(path_str) and H5adLoader.is_zip_file(path_str):
            extracted_files = H5adLoader.extract_h5ad_from_zip(path_str)
            if len(extracted_files) == 1:
                path_str = extracted_files[0]
            else:
                return [anndata.read_h5ad(p, backed=backed) for p in extracted_files]
        # Validate existence and extension before handing off to anndata.
        if not os.path.exists(path_str):
            raise ValueError(f"File not found: {path_str}")
        if not path_str.endswith(".h5ad"):
            raise ValueError("File must have .h5ad extension")
        try:
            return anndata.read_h5ad(path_str, backed=backed)
        except Exception as e:
            raise ValueError(f"Failed to load h5ad file: {str(e)}")

    @staticmethod
    def load_from_source(source: Union[str, Path]) -> "AnnData":
        """
        Convenience wrapper: load an h5ad file from a file path or URL.

        Args:
            source: File path or URL to h5ad file

        Returns:
            AnnData object loaded with backed='r'
        """
        return H5adLoader.load_h5ad(source, backed="r")