Spaces:
Sleeping
Sleeping
| import aiohttp | |
| import asyncio | |
| import tempfile | |
| import os | |
| import re | |
| import shutil | |
| import hashlib | |
| import atexit | |
| from urllib.parse import urlparse, unquote | |
| from typing import List, Tuple | |
| class FileDownloader: | |
| def __init__(self): | |
| # Create a temp directory for this process | |
| self.cache_dir = tempfile.mkdtemp(prefix="file_downloader_") | |
| print(f"π Temp cache directory created: {self.cache_dir}") | |
| # Ensure cleanup at process exit | |
| atexit.register(self._cleanup_cache_dir) | |
| def _get_cache_path(self, url: str, ext: str) -> str: | |
| """Generate a cache file path for the given URL and extension.""" | |
| url_hash = hashlib.sha256(url.encode("utf-8")).hexdigest() | |
| return os.path.join(self.cache_dir, f"{url_hash}{ext}") | |
| async def download_file( | |
| self, url: str, timeout: int = 300, max_retries: int = 3 | |
| ) -> Tuple[str, str]: | |
| """ | |
| Download any file type from a URL to the temp cache. | |
| Returns (file_path, extension_without_dot) | |
| """ | |
| print(f"π₯ Downloading file from: {url[:60]}...") | |
| # Determine file extension (before downloading) | |
| parsed_path = unquote(urlparse(url).path) | |
| guessed_ext = os.path.splitext(parsed_path)[1] or "" | |
| # Check if cached version exists | |
| if guessed_ext: | |
| cache_path = self._get_cache_path(url, guessed_ext) | |
| if os.path.exists(cache_path): | |
| print(f"β‘ Cache hit! Using cached file: {cache_path}") | |
| return cache_path, guessed_ext.lstrip(".") | |
| for attempt in range(max_retries): | |
| try: | |
| timeout_config = aiohttp.ClientTimeout( | |
| total=timeout, | |
| connect=30, | |
| sock_read=120 | |
| ) | |
| async with aiohttp.ClientSession(timeout=timeout_config) as session: | |
| print(f" Attempt {attempt + 1}/{max_retries} (timeout: {timeout}s)") | |
| async with session.get(url) as response: | |
| if response.status != 200: | |
| raise Exception(f"Failed to download file: HTTP {response.status}") | |
| # Extract filename from header or URL | |
| cd = response.headers.get('Content-Disposition', '') | |
| filename_match = re.findall('filename="?([^"]+)"?', cd) | |
| if filename_match: | |
| filename = filename_match[0] | |
| else: | |
| filename = os.path.basename(parsed_path) | |
| if not filename: | |
| filename = "downloaded_file" | |
| ext = os.path.splitext(filename)[1] | |
| if not ext: | |
| return url, "url" | |
| if ext not in ['.pdf', '.docx', '.pptx', '.png', '.xlsx', '.jpeg', '.jpg', '.txt', '.csv']: | |
| print(f" β File type not supported: {ext}") | |
| return ['not supported', ext.lstrip('.')] | |
| # Final cache path based on extension | |
| cache_path = self._get_cache_path(url, ext) | |
| # Download directly to cache path | |
| with open(cache_path, "wb") as f: | |
| downloaded = 0 | |
| content_length = response.headers.get('content-length') | |
| total_size = int(content_length) if content_length else None | |
| async for chunk in response.content.iter_chunked(16384): | |
| f.write(chunk) | |
| downloaded += len(chunk) | |
| if total_size and downloaded % (1024 * 1024) == 0: | |
| progress = (downloaded / total_size) * 100 | |
| print(f" Progress: {progress:.1f}% ({downloaded / (1024*1024):.1f} MB)") | |
| print(f"β File downloaded successfully: {cache_path}") | |
| return cache_path, ext.lstrip('.') | |
| except asyncio.TimeoutError: | |
| print(f" β° Timeout on attempt {attempt + 1}") | |
| if attempt < max_retries - 1: | |
| wait_time = (attempt + 1) * 30 | |
| print(f" β³ Waiting {wait_time}s before retry...") | |
| await asyncio.sleep(wait_time) | |
| continue | |
| except Exception as e: | |
| print(f" β Error on attempt {attempt + 1}: {str(e)}") | |
| if attempt < max_retries - 1: | |
| wait_time = (attempt + 1) * 15 | |
| print(f" β³ Waiting {wait_time}s before retry...") | |
| await asyncio.sleep(wait_time) | |
| continue | |
| raise Exception(f"Failed to download file after {max_retries} attempts") | |
| def _cleanup_cache_dir(self): | |
| """Remove the entire cache directory.""" | |
| if os.path.exists(self.cache_dir): | |
| try: | |
| shutil.rmtree(self.cache_dir) | |
| print(f"ποΈ Deleted temp cache directory: {self.cache_dir}") | |
| except Exception as e: | |
| print(f"β οΈ Could not delete cache directory {self.cache_dir}: {e}") | |