Spaces:
Sleeping
Sleeping
File size: 6,405 Bytes
74de430 d0a01ab 74de430 d0a01ab b26895a d0a01ab b26895a d0a01ab b26895a 756bc21 d0a01ab b26895a d0a01ab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
"""
Cloudinary Integration
Handles image and video uploads to Cloudinary
"""
import cloudinary
import cloudinary.uploader
from fastapi import UploadFile, HTTPException
from app.config import settings
from typing import Dict, Any
import logging
logger = logging.getLogger(__name__)
class CloudinaryService:
"""Service for uploading files to Cloudinary"""
@staticmethod
def initialize():
"""Initialize Cloudinary configuration"""
if not all([settings.CLOUDINARY_CLOUD_NAME, settings.CLOUDINARY_API_KEY, settings.CLOUDINARY_API_SECRET]):
logger.warning("Cloudinary credentials not configured")
return False
cloudinary.config(
cloud_name=settings.CLOUDINARY_CLOUD_NAME,
api_key=settings.CLOUDINARY_API_KEY,
api_secret=settings.CLOUDINARY_API_SECRET,
secure=True
)
return True
@staticmethod
def get_folder_path(entity_type: str) -> str:
"""
Get Cloudinary folder path based on entity type
Folder structure:
- /swiftops/users/ - User profile photos and documents
- /swiftops/tickets/ - Ticket photos, site images
- /swiftops/receipts/ - Payment receipts, expense documents
- /swiftops/projects/ - Project-related images
- /swiftops/other/ - Everything else
"""
folder_map = {
'user': 'swiftops/users',
'ticket': 'swiftops/tickets',
'expense': 'swiftops/receipts',
'sales_order': 'swiftops/receipts',
'project': 'swiftops/projects',
'client': 'swiftops/clients',
'contractor': 'swiftops/contractors'
}
return folder_map.get(entity_type, 'swiftops/other')
@staticmethod
async def upload(
file: UploadFile,
entity_type: str,
entity_id: str,
document_type: str,
contextual_filename: str = None
) -> Dict[str, Any]:
"""
Upload file to Cloudinary
Args:
file: The file to upload
entity_type: Type of entity (user, ticket, etc.)
entity_id: ID of the entity
document_type: Type of document (profile_photo, ticket_image, etc.)
contextual_filename: Optional contextual filename (without extension)
Returns:
Dict containing upload response with secure_url, public_id, etc.
Raises:
HTTPException: If upload fails
"""
if not CloudinaryService.initialize():
raise HTTPException(
status_code=500,
detail="Cloudinary not configured. Please set CLOUDINARY credentials."
)
try:
# Read file content
file_content = await file.read()
# Determine resource type
resource_type = "auto" # Let Cloudinary detect (image, video, raw)
# Get folder path
folder = CloudinaryService.get_folder_path(entity_type)
# Prepare upload options
upload_options = {
'asset_folder': folder, # Where it appears in Media Library
'overwrite': False,
'resource_type': resource_type,
'tags': [entity_type, document_type, str(entity_id)] # For searching
}
# Use contextual filename if provided, otherwise use original with unique suffix
if contextual_filename:
# Use contextual filename as public_id (Cloudinary will add folder prefix)
upload_options['public_id'] = contextual_filename
upload_options['use_filename'] = False
upload_options['unique_filename'] = False
else:
# Fallback to original filename with unique suffix
upload_options['use_filename'] = True
upload_options['unique_filename'] = True
# Upload to Cloudinary with timeout (60 seconds)
# If upload takes longer, it will raise an exception and trigger Supabase fallback
# Frontend has 120s timeout with retries, so 60s gives Cloudinary a fair chance
# while leaving time for Supabase fallback if needed
upload_options['timeout'] = 60
upload_result = cloudinary.uploader.upload(
file_content,
**upload_options
)
logger.info(f"Uploaded file to Cloudinary: {upload_result.get('secure_url')}")
return {
'secure_url': upload_result.get('secure_url'),
'public_id': upload_result.get('public_id'),
'format': upload_result.get('format'),
'resource_type': upload_result.get('resource_type'),
'bytes': upload_result.get('bytes'),
'width': upload_result.get('width'),
'height': upload_result.get('height'),
'created_at': upload_result.get('created_at'),
'version': upload_result.get('version'),
'type': upload_result.get('type'),
'etag': upload_result.get('etag')
}
except Exception as e:
logger.error(f"Cloudinary upload failed: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Failed to upload file to Cloudinary: {str(e)}"
)
@staticmethod
def delete(public_id: str, resource_type: str = "image") -> bool:
"""
Delete file from Cloudinary
Args:
public_id: The Cloudinary public ID
resource_type: Type of resource (image, video, raw)
Returns:
True if successful, False otherwise
"""
if not CloudinaryService.initialize():
logger.warning("Cloudinary not configured, cannot delete")
return False
try:
result = cloudinary.uploader.destroy(public_id, resource_type=resource_type)
return result.get('result') == 'ok'
except Exception as e:
logger.error(f"Cloudinary delete failed: {str(e)}")
return False
|