""" Cloudinary Integration Handles image and video uploads to Cloudinary """ import cloudinary import cloudinary.uploader from fastapi import UploadFile, HTTPException from app.config import settings from typing import Dict, Any import logging logger = logging.getLogger(__name__) class CloudinaryService: """Service for uploading files to Cloudinary""" @staticmethod def initialize(): """Initialize Cloudinary configuration""" if not all([settings.CLOUDINARY_CLOUD_NAME, settings.CLOUDINARY_API_KEY, settings.CLOUDINARY_API_SECRET]): logger.warning("Cloudinary credentials not configured") return False cloudinary.config( cloud_name=settings.CLOUDINARY_CLOUD_NAME, api_key=settings.CLOUDINARY_API_KEY, api_secret=settings.CLOUDINARY_API_SECRET, secure=True ) return True @staticmethod def get_folder_path(entity_type: str) -> str: """ Get Cloudinary folder path based on entity type Folder structure: - /swiftops/users/ - User profile photos and documents - /swiftops/tickets/ - Ticket photos, site images - /swiftops/receipts/ - Payment receipts, expense documents - /swiftops/projects/ - Project-related images - /swiftops/other/ - Everything else """ folder_map = { 'user': 'swiftops/users', 'ticket': 'swiftops/tickets', 'expense': 'swiftops/receipts', 'sales_order': 'swiftops/receipts', 'project': 'swiftops/projects', 'client': 'swiftops/clients', 'contractor': 'swiftops/contractors' } return folder_map.get(entity_type, 'swiftops/other') @staticmethod async def upload( file: UploadFile, entity_type: str, entity_id: str, document_type: str, contextual_filename: str = None ) -> Dict[str, Any]: """ Upload file to Cloudinary Args: file: The file to upload entity_type: Type of entity (user, ticket, etc.) entity_id: ID of the entity document_type: Type of document (profile_photo, ticket_image, etc.) contextual_filename: Optional contextual filename (without extension) Returns: Dict containing upload response with secure_url, public_id, etc. Raises: HTTPException: If upload fails """ if not CloudinaryService.initialize(): raise HTTPException( status_code=500, detail="Cloudinary not configured. Please set CLOUDINARY credentials." ) try: # Read file content file_content = await file.read() # Determine resource type resource_type = "auto" # Let Cloudinary detect (image, video, raw) # Get folder path folder = CloudinaryService.get_folder_path(entity_type) # Prepare upload options upload_options = { 'asset_folder': folder, # Where it appears in Media Library 'overwrite': False, 'resource_type': resource_type, 'tags': [entity_type, document_type, str(entity_id)] # For searching } # Use contextual filename if provided, otherwise use original with unique suffix if contextual_filename: # Use contextual filename as public_id (Cloudinary will add folder prefix) upload_options['public_id'] = contextual_filename upload_options['use_filename'] = False upload_options['unique_filename'] = False else: # Fallback to original filename with unique suffix upload_options['use_filename'] = True upload_options['unique_filename'] = True # Upload to Cloudinary with timeout (60 seconds) # If upload takes longer, it will raise an exception and trigger Supabase fallback # Frontend has 120s timeout with retries, so 60s gives Cloudinary a fair chance # while leaving time for Supabase fallback if needed upload_options['timeout'] = 60 upload_result = cloudinary.uploader.upload( file_content, **upload_options ) logger.info(f"Uploaded file to Cloudinary: {upload_result.get('secure_url')}") return { 'secure_url': upload_result.get('secure_url'), 'public_id': upload_result.get('public_id'), 'format': upload_result.get('format'), 'resource_type': upload_result.get('resource_type'), 'bytes': upload_result.get('bytes'), 'width': upload_result.get('width'), 'height': upload_result.get('height'), 'created_at': upload_result.get('created_at'), 'version': upload_result.get('version'), 'type': upload_result.get('type'), 'etag': upload_result.get('etag') } except Exception as e: logger.error(f"Cloudinary upload failed: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to upload file to Cloudinary: {str(e)}" ) @staticmethod def delete(public_id: str, resource_type: str = "image") -> bool: """ Delete file from Cloudinary Args: public_id: The Cloudinary public ID resource_type: Type of resource (image, video, raw) Returns: True if successful, False otherwise """ if not CloudinaryService.initialize(): logger.warning("Cloudinary not configured, cannot delete") return False try: result = cloudinary.uploader.destroy(public_id, resource_type=resource_type) return result.get('result') == 'ok' except Exception as e: logger.error(f"Cloudinary delete failed: {str(e)}") return False