fixed pdf extraction
7c4579c
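"""Async file downloader with a per-process temporary cache.

Files are cached under a name derived from the SHA-256 hash of their URL,
and the whole cache directory is removed automatically at process exit.
"""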
import aiohttp
import asyncio
import tempfile
import os
import re
import shutil
import hashlib
import atexit
from urllib.parse import urlparse, unquote
from typing import Tuple


class FileDownloader:
def __init__(self):
# Create a temp directory for this process
self.cache_dir = tempfile.mkdtemp(prefix="file_downloader_")
print(f"πŸ“‚ Temp cache directory created: {self.cache_dir}")
# Ensure cleanup at process exit
atexit.register(self._cleanup_cache_dir)

    def _get_cache_path(self, url: str, ext: str) -> str:
"""Generate a cache file path for the given URL and extension."""
url_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()
return os.path.join(self.cache_dir, f"{url_hash}{ext}")

    async def download_file(
        self, url: str, timeout: int = 300, max_retries: int = 3
    ) -> Tuple[str, str]:
"""
Download any file type from a URL to the temp cache.
Returns (file_path, extension_without_dot)
"""
print(f"πŸ“₯ Downloading file from: {url[:60]}...")
# Determine file extension (before downloading)
        parsed_path = unquote(urlparse(url).path)
        guessed_ext = os.path.splitext(parsed_path)[1].lower()
# Check if cached version exists
if guessed_ext:
cache_path = self._get_cache_path(url, guessed_ext)
if os.path.exists(cache_path):
print(f"⚑ Cache hit! Using cached file: {cache_path}")
return cache_path, guessed_ext.lstrip(".")
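        # Note: when the extension is only discoverable from the
        # Content-Disposition header, the pre-download cache check above is
        # skipped, so such URLs are re-fetched even if already cached.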
for attempt in range(max_retries):
try:
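                # aiohttp.ClientTimeout: `total` caps the whole request, while
                # `connect` and `sock_read` separately bound connection setup
                # and stalls between socket reads.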
timeout_config = aiohttp.ClientTimeout(
total=timeout,
connect=30,
sock_read=120
)
async with aiohttp.ClientSession(timeout=timeout_config) as session:
print(f" Attempt {attempt + 1}/{max_retries} (timeout: {timeout}s)")
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"Failed to download file: HTTP {response.status}")
# Extract filename from header or URL
                        cd = response.headers.get('Content-Disposition', '')
                        filename_match = re.search(r'filename="?([^";]+)"?', cd)
                        if filename_match:
                            filename = filename_match.group(1)
else:
filename = os.path.basename(parsed_path)
if not filename:
filename = "downloaded_file"
                        ext = os.path.splitext(filename)[1].lower()
                        if not ext:
                            # No extension from header or URL path: hand the
                            # URL back to the caller to handle as a plain URL
                            return url, "url"
                        supported = {'.pdf', '.docx', '.pptx', '.png', '.xlsx', '.jpeg', '.jpg', '.txt', '.csv'}
                        if ext not in supported:
                            print(f"   ❌ File type not supported: {ext}")
                            # Return a tuple (not a list) to match the declared
                            # Tuple[str, str] return type
                            return "not supported", ext.lstrip('.')
# Final cache path based on extension
cache_path = self._get_cache_path(url, ext)
# Download directly to cache path
                        with open(cache_path, "wb") as f:
                            downloaded = 0
                            next_report = 1024 * 1024  # print progress roughly every 1 MB
                            content_length = response.headers.get('content-length')
                            total_size = int(content_length) if content_length else None
                            async for chunk in response.content.iter_chunked(16384):
                                f.write(chunk)
                                downloaded += len(chunk)
                                # A modulo check misses partial chunks; a rolling
                                # threshold reports once per megabyte reliably.
                                if total_size and downloaded >= next_report:
                                    next_report += 1024 * 1024
                                    progress = (downloaded / total_size) * 100
                                    print(f"   Progress: {progress:.1f}% ({downloaded / (1024*1024):.1f} MB)")
print(f"βœ… File downloaded successfully: {cache_path}")
return cache_path, ext.lstrip('.')
except asyncio.TimeoutError:
print(f" ⏰ Timeout on attempt {attempt + 1}")
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 30
print(f" ⏳ Waiting {wait_time}s before retry...")
await asyncio.sleep(wait_time)
continue
except Exception as e:
print(f" ❌ Error on attempt {attempt + 1}: {str(e)}")
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 15
print(f" ⏳ Waiting {wait_time}s before retry...")
await asyncio.sleep(wait_time)
continue
raise Exception(f"Failed to download file after {max_retries} attempts")

    def _cleanup_cache_dir(self):
"""Remove the entire cache directory."""
if os.path.exists(self.cache_dir):
try:
shutil.rmtree(self.cache_dir)
print(f"πŸ—‘οΈ Deleted temp cache directory: {self.cache_dir}")
except Exception as e:
print(f"⚠️ Could not delete cache directory {self.cache_dir}: {e}")