Spaces:
Sleeping
Sleeping
| """ | |
| Cloudinary Uploader - Upload images to Cloudinary CDN. | |
| Works INDEPENDENTLY of PDF processing and embedding. | |
| Use it if you just need to upload images to a CDN. | |
| Features: | |
| - Retry logic with timeouts | |
| - Batch uploading | |
| - Automatic JPEG optimization | |
| Environment Variables: | |
| - VISUAL_RAG_THREAD_SAFE: Set to "1" to use thread-safe timeouts | |
| (required for Streamlit, Flask, or other threaded contexts) | |
| """ | |
| import io | |
| import logging | |
| import os | |
| import platform | |
| import signal | |
| import threading | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| from concurrent.futures import TimeoutError as FuturesTimeoutError | |
| from typing import Optional | |
| from PIL import Image | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Opt-in switch read once at import time: any of "1", "true" or "yes"
# (case-insensitive) in VISUAL_RAG_THREAD_SAFE selects thread-based timeouts.
THREAD_SAFE_MODE = os.environ.get("VISUAL_RAG_THREAD_SAFE", "").lower() in (
    "1",
    "true",
    "yes",
)
class CloudinaryUploader:
    """
    Upload images to Cloudinary CDN.

    Works independently - just needs PIL images.

    Args:
        cloud_name: Cloudinary cloud name (falls back to CLOUDINARY_CLOUD_NAME)
        api_key: Cloudinary API key (falls back to CLOUDINARY_API_KEY)
        api_secret: Cloudinary API secret (falls back to CLOUDINARY_API_SECRET)
        folder: Base folder for uploads
        max_retries: Number of upload attempts before giving up
        timeout_seconds: Timeout per upload attempt
        jpeg_quality: JPEG quality used when encoding images for upload

    Raises:
        ValueError: If any credential is missing from both the arguments and
            the environment.
        ImportError: If the ``cloudinary`` package is not installed.

    Example:
        >>> uploader = CloudinaryUploader(
        ...     cloud_name="my-cloud",
        ...     api_key="xxx",
        ...     api_secret="yyy",
        ...     folder="my-project",
        ... )
        >>>
        >>> url = uploader.upload(image, "doc_page_1")
        >>> print(url)  # https://res.cloudinary.com/.../doc_page_1.jpg
    """

    # Image modes Pillow's JPEG encoder can write directly; anything else
    # (RGBA, LA, P, I;16, ...) must be converted before saving.
    _JPEG_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})

    def __init__(
        self,
        cloud_name: Optional[str] = None,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        folder: str = "visual-rag",
        max_retries: int = 3,
        timeout_seconds: int = 30,
        jpeg_quality: int = 95,
    ):
        # Load from environment if not provided
        self.cloud_name = cloud_name or os.getenv("CLOUDINARY_CLOUD_NAME")
        self.api_key = api_key or os.getenv("CLOUDINARY_API_KEY")
        self.api_secret = api_secret or os.getenv("CLOUDINARY_API_SECRET")

        if not all([self.cloud_name, self.api_key, self.api_secret]):
            raise ValueError(
                "Cloudinary credentials required. Set CLOUDINARY_CLOUD_NAME, "
                "CLOUDINARY_API_KEY, CLOUDINARY_API_SECRET environment variables "
                "or pass them as arguments."
            )

        self.folder = folder
        self.max_retries = max_retries
        self.timeout_seconds = timeout_seconds
        self.jpeg_quality = jpeg_quality

        # Check dependency early so a missing package fails at construction
        # time rather than on the first upload.
        try:
            import cloudinary  # noqa
        except ImportError as exc:
            raise ImportError(
                "Cloudinary not installed. "
                "Install with: pip install visual-rag-toolkit[cloudinary]"
            ) from exc

        logger.info("☁️ Cloudinary uploader initialized")
        logger.info(f" Folder: {folder}")

    def upload(
        self,
        image: Image.Image,
        public_id: str,
        subfolder: Optional[str] = None,
    ) -> Optional[str]:
        """
        Upload a single image to Cloudinary.

        The image is encoded as an optimized JPEG in memory. Images whose mode
        JPEG cannot store (e.g. RGBA, LA, P) are converted to RGB first, which
        discards any alpha channel.

        Args:
            image: PIL Image to upload
            public_id: Public ID (filename without extension)
            subfolder: Optional subfolder within base folder

        Returns:
            Secure URL of uploaded image, or None if every attempt failed
        """
        import cloudinary
        import cloudinary.uploader

        # JPEG has no alpha/palette support; saving such an image raises OSError.
        if image.mode not in self._JPEG_MODES:
            image = image.convert("RGB")

        # Encode once and keep the immutable bytes around for the retries.
        buffer = io.BytesIO()
        image.save(buffer, format="JPEG", quality=self.jpeg_quality, optimize=True)
        payload = buffer.getvalue()

        # Configure Cloudinary
        cloudinary.config(
            cloud_name=self.cloud_name,
            api_key=self.api_key,
            api_secret=self.api_secret,
        )

        # Build folder path
        folder_path = f"{self.folder}/{subfolder}" if subfolder else self.folder

        def do_upload():
            # A fresh stream per attempt: a previous attempt that timed out may
            # still be reading its own stream on a worker thread.
            result = cloudinary.uploader.upload(
                io.BytesIO(payload),
                folder=folder_path,
                overwrite=True,
                public_id=public_id,
                resource_type="image",
                timeout=self.timeout_seconds,
            )
            return result["secure_url"]

        # Signal-based timeouts only work on the main thread, so fall back to
        # the thread-based strategy elsewhere (Streamlit/Flask/threaded
        # contexts) or when VISUAL_RAG_THREAD_SAFE is set.
        if THREAD_SAFE_MODE or threading.current_thread() is not threading.main_thread():
            return self._upload_with_thread_timeout(do_upload, public_id)
        return self._upload_with_signal_timeout(do_upload, public_id)

    def _run_with_retries(self, attempt_once, public_id: str, timeout_exc) -> Optional[str]:
        """
        Call ``attempt_once`` up to ``max_retries`` times with exponential backoff.

        Args:
            attempt_once: Zero-argument callable performing one upload attempt
            public_id: Public ID, used only in log messages
            timeout_exc: Exception type that signals a timed-out attempt

        Returns:
            The value returned by the first successful attempt, or None if all
            attempts raised.
        """
        for attempt in range(self.max_retries):
            try:
                return attempt_once()
            except timeout_exc:
                logger.warning(
                    f"Upload timeout (attempt {attempt + 1}/{self.max_retries}): {public_id}"
                )
            except Exception as e:
                logger.warning(f"Upload failed (attempt {attempt + 1}/{self.max_retries}): {e}")

            # Back off 1s, 2s, 4s, ... but do not sleep after the final attempt.
            if attempt < self.max_retries - 1:
                time.sleep(2**attempt)

        logger.error(f"❌ Upload failed after {self.max_retries} attempts: {public_id}")
        return None

    def _upload_with_thread_timeout(self, do_upload, public_id: str) -> Optional[str]:
        """Thread-safe upload with ThreadPoolExecutor timeout."""

        def attempt_once():
            executor = ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(do_upload)
                return future.result(timeout=self.timeout_seconds)
            finally:
                # Never wait here: using the executor as a context manager
                # would join the worker on exit and block until a hung upload
                # finished, defeating the timeout above.
                executor.shutdown(wait=False)

        return self._run_with_retries(attempt_once, public_id, FuturesTimeoutError)

    def _upload_with_signal_timeout(self, do_upload, public_id: str) -> Optional[str]:
        """Signal-based upload timeout (main thread only, Unix/macOS).

        On platforms without SIGALRM (e.g. Windows) the upload runs without an
        alarm and relies solely on the timeout passed to the Cloudinary call.
        """
        use_timeout = hasattr(signal, "SIGALRM")

        class SignalTimeoutError(Exception):
            pass

        def timeout_handler(signum, frame):
            raise SignalTimeoutError(f"Upload timed out after {self.timeout_seconds}s")

        def attempt_once():
            if not use_timeout:
                return do_upload()

            old_handler = signal.signal(signal.SIGALRM, timeout_handler)
            try:
                signal.alarm(self.timeout_seconds)
                return do_upload()
            finally:
                # Always cancel the pending alarm and restore the previous
                # handler, even if arming the alarm itself failed.
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)

        return self._run_with_retries(attempt_once, public_id, SignalTimeoutError)

    def upload_original_and_resized(
        self,
        original_image: Image.Image,
        resized_image: Image.Image,
        base_public_id: str,
    ) -> tuple:
        """
        Upload both original and resized versions.

        Args:
            original_image: Original PDF page image
            resized_image: Resized image for ColPali
            base_public_id: Base public ID (e.g., "doc_page_1")

        Returns:
            Tuple of (original_url, resized_url) - either can be None on failure
        """
        original_url = self.upload(original_image, base_public_id, subfolder="original")
        resized_url = self.upload(resized_image, base_public_id, subfolder="resized")
        return original_url, resized_url

    def upload_original_cropped_and_resized(
        self,
        original_image: Image.Image,
        cropped_image: Optional[Image.Image],
        resized_image: Image.Image,
        base_public_id: str,
    ) -> tuple:
        """
        Upload original, (optional) cropped and resized versions.

        Each version is stored under its own subfolder ("original", "cropped",
        "resized") using the same public ID.

        Args:
            original_image: Original image
            cropped_image: Cropped image, or None to skip the cropped upload
            resized_image: Resized image
            base_public_id: Base public ID shared by all versions

        Returns:
            Tuple of (original_url, cropped_url, resized_url). Any entry can be
            None on failure; cropped_url is also None when no cropped image
            was given.
        """
        original_url = self.upload(original_image, base_public_id, subfolder="original")

        cropped_url = None
        if cropped_image is not None:
            cropped_url = self.upload(cropped_image, base_public_id, subfolder="cropped")

        resized_url = self.upload(resized_image, base_public_id, subfolder="resized")
        return original_url, cropped_url, resized_url