Spaces:
Sleeping
Sleeping
| """ | |
| Cloudinary Integration | |
| Handles image and video uploads to Cloudinary | |
| """ | |
| import cloudinary | |
| import cloudinary.uploader | |
| from fastapi import UploadFile, HTTPException | |
| from app.config import settings | |
| from typing import Dict, Any | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| class CloudinaryService: | |
| """Service for uploading files to Cloudinary""" | |
| def initialize(): | |
| """Initialize Cloudinary configuration""" | |
| if not all([settings.CLOUDINARY_CLOUD_NAME, settings.CLOUDINARY_API_KEY, settings.CLOUDINARY_API_SECRET]): | |
| logger.warning("Cloudinary credentials not configured") | |
| return False | |
| cloudinary.config( | |
| cloud_name=settings.CLOUDINARY_CLOUD_NAME, | |
| api_key=settings.CLOUDINARY_API_KEY, | |
| api_secret=settings.CLOUDINARY_API_SECRET, | |
| secure=True | |
| ) | |
| return True | |
| def get_folder_path(entity_type: str) -> str: | |
| """ | |
| Get Cloudinary folder path based on entity type | |
| Folder structure: | |
| - /swiftops/users/ - User profile photos and documents | |
| - /swiftops/tickets/ - Ticket photos, site images | |
| - /swiftops/receipts/ - Payment receipts, expense documents | |
| - /swiftops/projects/ - Project-related images | |
| - /swiftops/other/ - Everything else | |
| """ | |
| folder_map = { | |
| 'user': 'swiftops/users', | |
| 'ticket': 'swiftops/tickets', | |
| 'expense': 'swiftops/receipts', | |
| 'sales_order': 'swiftops/receipts', | |
| 'project': 'swiftops/projects', | |
| 'client': 'swiftops/clients', | |
| 'contractor': 'swiftops/contractors' | |
| } | |
| return folder_map.get(entity_type, 'swiftops/other') | |
| async def upload( | |
| file: UploadFile, | |
| entity_type: str, | |
| entity_id: str, | |
| document_type: str, | |
| contextual_filename: str = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Upload file to Cloudinary | |
| Args: | |
| file: The file to upload | |
| entity_type: Type of entity (user, ticket, etc.) | |
| entity_id: ID of the entity | |
| document_type: Type of document (profile_photo, ticket_image, etc.) | |
| contextual_filename: Optional contextual filename (without extension) | |
| Returns: | |
| Dict containing upload response with secure_url, public_id, etc. | |
| Raises: | |
| HTTPException: If upload fails | |
| """ | |
| if not CloudinaryService.initialize(): | |
| raise HTTPException( | |
| status_code=500, | |
| detail="Cloudinary not configured. Please set CLOUDINARY credentials." | |
| ) | |
| try: | |
| # Read file content | |
| file_content = await file.read() | |
| # Determine resource type | |
| resource_type = "auto" # Let Cloudinary detect (image, video, raw) | |
| # Get folder path | |
| folder = CloudinaryService.get_folder_path(entity_type) | |
| # Prepare upload options | |
| upload_options = { | |
| 'asset_folder': folder, # Where it appears in Media Library | |
| 'overwrite': False, | |
| 'resource_type': resource_type, | |
| 'tags': [entity_type, document_type, str(entity_id)] # For searching | |
| } | |
| # Use contextual filename if provided, otherwise use original with unique suffix | |
| if contextual_filename: | |
| # Use contextual filename as public_id (Cloudinary will add folder prefix) | |
| upload_options['public_id'] = contextual_filename | |
| upload_options['use_filename'] = False | |
| upload_options['unique_filename'] = False | |
| else: | |
| # Fallback to original filename with unique suffix | |
| upload_options['use_filename'] = True | |
| upload_options['unique_filename'] = True | |
| # Upload to Cloudinary with timeout (60 seconds) | |
| # If upload takes longer, it will raise an exception and trigger Supabase fallback | |
| # Frontend has 120s timeout with retries, so 60s gives Cloudinary a fair chance | |
| # while leaving time for Supabase fallback if needed | |
| upload_options['timeout'] = 60 | |
| upload_result = cloudinary.uploader.upload( | |
| file_content, | |
| **upload_options | |
| ) | |
| logger.info(f"Uploaded file to Cloudinary: {upload_result.get('secure_url')}") | |
| return { | |
| 'secure_url': upload_result.get('secure_url'), | |
| 'public_id': upload_result.get('public_id'), | |
| 'format': upload_result.get('format'), | |
| 'resource_type': upload_result.get('resource_type'), | |
| 'bytes': upload_result.get('bytes'), | |
| 'width': upload_result.get('width'), | |
| 'height': upload_result.get('height'), | |
| 'created_at': upload_result.get('created_at'), | |
| 'version': upload_result.get('version'), | |
| 'type': upload_result.get('type'), | |
| 'etag': upload_result.get('etag') | |
| } | |
| except Exception as e: | |
| logger.error(f"Cloudinary upload failed: {str(e)}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Failed to upload file to Cloudinary: {str(e)}" | |
| ) | |
| def delete(public_id: str, resource_type: str = "image") -> bool: | |
| """ | |
| Delete file from Cloudinary | |
| Args: | |
| public_id: The Cloudinary public ID | |
| resource_type: Type of resource (image, video, raw) | |
| Returns: | |
| True if successful, False otherwise | |
| """ | |
| if not CloudinaryService.initialize(): | |
| logger.warning("Cloudinary not configured, cannot delete") | |
| return False | |
| try: | |
| result = cloudinary.uploader.destroy(public_id, resource_type=resource_type) | |
| return result.get('result') == 'ok' | |
| except Exception as e: | |
| logger.error(f"Cloudinary delete failed: {str(e)}") | |
| return False | |