File size: 11,612 Bytes
05fdb87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import os
import tempfile
import zipfile
import re
from pathlib import Path
from typing import Optional, Union, Callable, List
import requests
import anndata
from anndata import AnnData


class H5adLoader:
    """Download, unpack and open ``.h5ad`` files in backed mode.

    Every method is a ``staticmethod``; the class is only a namespace plus a
    few configuration constants.  Remote sources are restricted to
    ``ALLOWED_DOMAINS`` and may be either a bare ``.h5ad`` file or a ZIP
    archive containing one or more ``.h5ad`` files.
    """

    # Hosts that download_h5ad is willing to contact.  A URL is accepted when
    # its hostname equals one of these or is a subdomain of one of these.
    ALLOWED_DOMAINS = [
        "huggingface.co",
        "zenodo.org",
        "s3.amazonaws.com",
        "drive.google.com",
        "docs.google.com",
    ]

    MAX_DOWNLOAD_SIZE = 20 * 1024 * 1024 * 1024  # 20 GiB
    # Handed to ``requests.get(timeout=...)``: it bounds the connect phase and
    # each individual read from the socket, not the download as a whole.
    TIMEOUT = 3000

    # Name used when neither the response headers nor the URL yield one.
    _DEFAULT_FILENAME = "downloaded_data.h5ad"

    @staticmethod
    def convert_google_drive_url(url: str) -> str:
        """
        Convert Google Drive sharing URL to direct download URL

        Supports formats:
        - https://drive.google.com/file/d/{FILE_ID}/view?usp=sharing
        - https://drive.google.com/open?id={FILE_ID}
        - any URL carrying an ``id={FILE_ID}`` query parameter
        - https://drive.google.com/uc... (returned unchanged)

        Args:
            url: Google Drive sharing URL

        Returns:
            Direct download URL

        Raises:
            ValueError: If cannot extract file ID
        """
        # Tried in order: the /file/d/{ID} path form first, then the
        # ?id={ID} / &id={ID} query form.
        for pattern in (r'/file/d/([a-zA-Z0-9_-]+)', r'[?&]id=([a-zA-Z0-9_-]+)'):
            match = re.search(pattern, url)
            if match:
                return f"https://drive.google.com/uc?export=download&id={match.group(1)}"

        # A direct-download URL without a recognisable id is passed through.
        if 'drive.google.com/uc' in url:
            return url

        raise ValueError(
            "Cannot parse Google Drive URL. Please use a sharing link like: "
            "https://drive.google.com/file/d/{FILE_ID}/view?usp=sharing"
        )

    @staticmethod
    def is_zip_file(filepath: str) -> bool:
        """Check if file is a ZIP archive (``.zip`` name AND valid ZIP content)"""
        return filepath.lower().endswith('.zip') and zipfile.is_zipfile(filepath)

    @staticmethod
    def _is_hidden_zip_member(member: str) -> bool:
        """Return True for archive members that are metadata, not data.

        A member is skipped when its name contains ``__MACOSX`` or when any
        path component starts with a dot (hidden files/directories, macOS
        ``._name`` resource forks, and ``..`` traversal components).
        """
        if '__MACOSX' in member:
            return True
        parts = [p for p in member.replace("\\", "/").split("/") if p not in ("", ".")]
        return any(part.startswith(".") for part in parts)

    @staticmethod
    def extract_h5ad_from_zip(zip_path: str, extract_dir: Optional[str] = None) -> List[str]:
        """
        Extract all .h5ad files from a ZIP archive

        Args:
            zip_path: Path to ZIP file
            extract_dir: Directory to extract to (uses temp dir if None)

        Returns:
            List of paths to extracted h5ad files, exactly as written to disk

        Raises:
            ValueError: If the archive is invalid, or contains no usable
                h5ad files
        """
        if extract_dir is None:
            extract_dir = tempfile.mkdtemp()

        extracted_h5ad_files = []

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                h5ad_members = [
                    name for name in zip_ref.namelist()
                    if name.lower().endswith('.h5ad')
                ]
                if not h5ad_members:
                    raise ValueError("No .h5ad files found in ZIP archive")

                for member in h5ad_members:
                    if H5adLoader._is_hidden_zip_member(member):
                        continue
                    # ZipFile.extract sanitises the member name (absolute
                    # paths, ".." parts) and returns the path it really
                    # created, so use that instead of re-joining the raw name.
                    extracted_h5ad_files.append(zip_ref.extract(member, extract_dir))
        except zipfile.BadZipFile as e:
            raise ValueError("Invalid or corrupted ZIP file") from e

        if not extracted_h5ad_files:
            raise ValueError("No valid .h5ad files found in ZIP (only hidden/system files)")

        return extracted_h5ad_files

    @staticmethod
    def is_valid_url(url: str) -> bool:
        """
        Check if URL is http(s) and its host is in the allowed domains

        The decision is based on the parsed hostname (exact match or
        subdomain), not on a substring search, so an allowed domain that
        merely appears in the path, query string, userinfo or as a prefix of
        another host does not pass.

        Args:
            url: URL to check

        Returns:
            True if the URL may be downloaded from, False otherwise
        """
        from urllib.parse import urlparse

        if not url.startswith(("http://", "https://")):
            return False

        try:
            host = urlparse(url).hostname
        except ValueError:
            # e.g. malformed IPv6 literal
            return False
        if not host:
            return False

        host = host.lower().rstrip(".")
        return any(
            host == domain or host.endswith("." + domain)
            for domain in H5adLoader.ALLOWED_DOMAINS
        )

    @staticmethod
    def _sanitize_filename(name: str) -> str:
        """Reduce an untrusted name to a bare file name (no directory parts).

        Returns an empty string when nothing usable is left.
        """
        leaf = name.replace("\x00", "").replace("\\", "/").split("/")[-1].strip()
        return "" if leaf in (".", "..") else leaf

    @staticmethod
    def _extract_filename_from_response(response, url: str) -> str:
        """
        Extract filename from HTTP response headers or URL

        Prioritizes Content-Disposition header (especially useful for Google
        Drive).  The header is server-controlled, so the result is always
        stripped down to a bare file name before it is returned.

        Args:
            response: requests.Response object (only ``.headers`` is read)
            url: URL that was requested

        Returns:
            Extracted filename, never containing directory components
        """
        from urllib.parse import unquote, urlparse

        filename = None

        content_disposition = response.headers.get('Content-Disposition', '')
        if content_disposition:
            # Preferred: filename*= (RFC 5987, percent-encoded UTF-8)
            match = re.search(
                r"filename\*=(?:UTF-8''|utf-8'')(.+?)(?:;|$)",
                content_disposition,
                re.IGNORECASE,
            )
            if match:
                filename = unquote(match.group(1).strip())

            # Then: filename="quoted value"
            if not filename:
                match = re.search(r'filename="([^"]+)"', content_disposition)
                if match:
                    filename = match.group(1).strip()

            # Finally: filename=bare-token
            if not filename:
                match = re.search(r'filename=([^;\s]+)', content_disposition)
                if match:
                    filename = match.group(1).strip()

        if filename:
            filename = H5adLoader._sanitize_filename(filename)

        # Fallback: last segment of the URL *path* (query/fragment ignored)
        if not filename:
            try:
                url_path = urlparse(url).path
            except ValueError:
                url_path = ""
            filename = H5adLoader._sanitize_filename(url_path)

        # "uc" is the path leaf of Google Drive direct-download URLs and
        # carries no information about the real file.
        if not filename or filename == "uc":
            filename = H5adLoader._DEFAULT_FILENAME

        # No extension: guess from the content type or URL
        if '.' not in filename:
            content_type = response.headers.get('Content-Type', '')
            if 'zip' in content_type.lower() or 'zip' in url.lower():
                filename = filename + ".zip"
            else:
                filename = filename + ".h5ad"

        return filename

    @staticmethod
    def download_h5ad(
        url: str,
        save_dir: Optional[str] = None,
        progress_callback: Optional[Callable[[int, int], None]] = None
    ) -> Union[str, List[str]]:
        """
        Download h5ad file (or ZIP containing h5ad files) from URL

        Only the URL that is actually requested is checked against
        ``ALLOWED_DOMAINS``; redirects issued by that host are followed.
        A partially written file is removed if the download fails.

        Args:
            url: URL to h5ad or ZIP file
            save_dir: Directory to save file (uses temp dir if None)
            progress_callback: Optional callback function(downloaded_bytes,
                total_bytes); total_bytes is 0 when the server sends no
                usable Content-Length

        Returns:
            Path to downloaded file, or list of paths if ZIP was extracted

        Raises:
            ValueError: If URL is invalid, the file exceeds
                ``MAX_DOWNLOAD_SIZE``, or download fails
        """
        # Rewrite Google Drive sharing links to the direct-download form.
        # The rewritten URL is validated below like any other.
        if 'drive.google.com' in url or 'docs.google.com' in url:
            try:
                url = H5adLoader.convert_google_drive_url(url)
            except ValueError as e:
                raise ValueError(f"Google Drive URL error: {str(e)}") from e

        if not H5adLoader.is_valid_url(url):
            raise ValueError(
                f"URL not from allowed domains: {', '.join(H5adLoader.ALLOWED_DOMAINS)}"
            )

        if save_dir is None:
            save_dir = tempfile.mkdtemp()
        else:
            os.makedirs(save_dir, exist_ok=True)

        limit = H5adLoader.MAX_DOWNLOAD_SIZE
        too_large = f"File too large (>{limit / 1024 ** 3:.1f}GB)"

        partial_path = None  # set once we start writing, for cleanup
        finished = False
        try:
            # The context manager releases the connection even on error.
            with requests.get(
                url,
                stream=True,
                timeout=H5adLoader.TIMEOUT,
                allow_redirects=True
            ) as response:
                response.raise_for_status()

                # Extract filename from response headers (handles Google Drive properly)
                filename = H5adLoader._extract_filename_from_response(response, url)
                filepath = os.path.join(save_dir, filename)

                # Total size if the server reports a usable one, else 0
                try:
                    total_size = int(response.headers.get('content-length', 0))
                except (TypeError, ValueError):
                    total_size = 0

                # Refuse early when the server already admits it is too big
                if total_size > limit:
                    raise ValueError(too_large)

                downloaded_size = 0
                partial_path = filepath
                with open(filepath, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        downloaded_size += len(chunk)

                        # Content-Length can be absent or wrong, so the
                        # limit is enforced on the bytes actually received.
                        if downloaded_size > limit:
                            raise ValueError(too_large)

                        f.write(chunk)

                        if progress_callback:
                            progress_callback(downloaded_size, total_size)
            finished = True

        except requests.RequestException as e:
            raise ValueError(f"Failed to download file: {str(e)}") from e

        finally:
            if not finished and partial_path is not None:
                try:
                    os.remove(partial_path)
                except OSError:
                    # Best-effort cleanup; the original error matters more.
                    pass

        # Check if it's a ZIP file and extract if so
        if H5adLoader.is_zip_file(filepath):
            return H5adLoader.extract_h5ad_from_zip(filepath, save_dir)

        return filepath

    @staticmethod
    def _read_h5ad(h5ad_path: str, backed: str) -> "AnnData":
        """Open one h5ad file, reporting any failure as ValueError."""
        try:
            return anndata.read_h5ad(h5ad_path, backed=backed)
        except Exception as e:
            raise ValueError(f"Failed to load h5ad file: {str(e)}") from e

    @staticmethod
    def load_h5ad(
        path: Union[str, Path],
        backed: str = "r",
    ) -> "Union[AnnData, List[AnnData]]":
        """
        Load h5ad file with backed mode for memory efficiency
        Also handles ZIP files containing h5ad files

        Note the return shape for archives: a ZIP fetched from a URL always
        yields a list (even with one member), whereas a local ZIP with
        exactly one h5ad file yields a single AnnData.

        Args:
            path: Path to h5ad or ZIP file, or URL
            backed: Backing mode ('r' for read-only, recommended)

        Returns:
            AnnData object with backed mode enabled, or list of AnnData if ZIP

        Raises:
            ValueError: If file cannot be found, downloaded or loaded
        """
        path_str = str(path)

        # If it's a URL, download first
        if path_str.startswith(("http://", "https://")):
            downloaded = H5adLoader.download_h5ad(path_str)

            # A list means a ZIP was downloaded and unpacked
            if isinstance(downloaded, list):
                return [H5adLoader._read_h5ad(p, backed) for p in downloaded]

            path_str = downloaded

        # Check if local file is a ZIP
        if os.path.exists(path_str) and H5adLoader.is_zip_file(path_str):
            extracted_files = H5adLoader.extract_h5ad_from_zip(path_str)

            if len(extracted_files) != 1:
                # Multiple h5ad files in ZIP
                return [H5adLoader._read_h5ad(p, backed) for p in extracted_files]

            # Single h5ad file in ZIP: fall through to the regular path
            path_str = extracted_files[0]

        # Validate file exists
        if not os.path.exists(path_str):
            raise ValueError(f"File not found: {path_str}")

        # Validate file extension (case-insensitive, matching ZIP extraction)
        if not path_str.lower().endswith(".h5ad"):
            raise ValueError("File must have .h5ad extension")

        # Load with backed mode for efficient memory usage
        return H5adLoader._read_h5ad(path_str, backed)

    @staticmethod
    def load_from_source(source: Union[str, Path]) -> "Union[AnnData, List[AnnData]]":
        """
        Convenience method to load h5ad from file path or URL

        Args:
            source: File path or URL to h5ad (or ZIP) file

        Returns:
            AnnData object loaded with backed='r', or a list of them when
            the source is a ZIP archive (see ``load_h5ad``)
        """
        return H5adLoader.load_h5ad(source, backed="r")