# enflow-api / migrate_cloudinary_to_gridfs.py
# dhruv575
# Proper migration to GridFS
# f6249c8
import os
import logging
import requests
from urllib.parse import urlparse
import cloudinary
import cloudinary.api
from dotenv import load_dotenv
from db import get_gridfs, Database, get_workflows_collection
from models.workflow import Workflow
from bson.objectid import ObjectId
# Load environment variables (e.g. from a local .env file) before anything
# below reads os.environ.
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure Cloudinary
# Names of the environment variables the Cloudinary configuration is read from.
_CLOUDINARY_ENV_VARS = (
    'CLOUDINARY_CLOUD_NAME',
    'CLOUDINARY_API_KEY',
    'CLOUDINARY_API_SECRET',
)

# os.environ.get() yields None for unset variables and those values are passed
# straight to cloudinary.config() below, so name the missing ones explicitly
# instead of letting the "configured" message hide them.
_missing_cloudinary_vars = [
    name for name in _CLOUDINARY_ENV_VARS if not os.environ.get(name)
]
if _missing_cloudinary_vars:
    logger.warning(
        "Missing Cloudinary environment variables: "
        + ", ".join(_missing_cloudinary_vars)
    )

try:
    cloudinary.config(
        cloud_name=os.environ.get('CLOUDINARY_CLOUD_NAME'),
        api_key=os.environ.get('CLOUDINARY_API_KEY'),
        api_secret=os.environ.get('CLOUDINARY_API_SECRET')
    )
    logger.info("Cloudinary configured.")
except Exception as e:
    logger.error(f"Failed to configure Cloudinary: {str(e)}")
    # The bare `exit` name is injected by the `site` module for interactive
    # use and is not guaranteed to exist; raising SystemExit is the portable
    # way to stop with status 1.
    raise SystemExit(1)
def is_cloudinary_url(url):
    """Check if a URL is a Cloudinary URL.

    A value counts as a Cloudinary URL when it is a string with an ``http``
    or ``https`` scheme whose host is ``cloudinary.com`` or one of its
    subdomains (for example ``res.cloudinary.com``).

    Args:
        url: The value to test. Non-string values (such as ``None``) are
            accepted and simply reported as not being Cloudinary URLs.

    Returns:
        bool: ``True`` if ``url`` points at a cloudinary.com host,
        ``False`` otherwise.
    """
    if not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url)
        host = (parsed.hostname or '').lower()
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. unbalanced IPv6
        # brackets); such a value cannot be a usable Cloudinary URL.
        return False

    if parsed.scheme not in ('http', 'https'):
        return False

    # Compare against the host rather than searching the whole string, so
    # "cloudinary.com" appearing in a path, query string, or as a prefix of
    # another domain does not count.
    return host == 'cloudinary.com' or host.endswith('.cloudinary.com')
def extract_cloudinary_public_id(url):
    """Extract the public ID from a Cloudinary URL.

    When the URL path contains a version segment (``v`` followed by digits,
    e.g. ``v1312461204``), everything after that segment is treated as the
    public ID, so folder components are kept
    (``.../v1/forms/intake.pdf`` -> ``forms/intake``). Without a version
    segment the public ID is taken from the last path component only.
    In both cases the file extension is removed.

    Args:
        url: The Cloudinary URL to parse.

    Returns:
        str: The public ID without its file extension; an empty string if
        the URL has no path.
    """
    path = urlparse(url).path
    segments = path.split('/')

    # Look for the version marker among all but the final segment; whatever
    # follows it is the (possibly folder-qualified) public ID.
    for position, segment in enumerate(segments[:-1]):
        if segment[:1] == 'v' and segment[1:].isdigit():
            remainder = '/'.join(segments[position + 1:])
            if remainder and not remainder.endswith('/'):
                return os.path.splitext(remainder)[0]
            break

    # No usable version segment: use the file name alone.
    filename = os.path.basename(path)
    return os.path.splitext(filename)[0]
# Upper bound, in seconds, on each download from Cloudinary so that a single
# stalled transfer cannot hang the whole migration.
_DOWNLOAD_TIMEOUT_SECONDS = 60


def _undo_form_swap(fs, workflow, form_url, gridfs_url, original_index, file_id):
    """Put the Cloudinary URL back in its old slot and drop the stored GridFS file."""
    if gridfs_url in workflow.raw_forms:
        workflow.raw_forms.remove(gridfs_url)
    workflow.raw_forms.insert(original_index, form_url)
    fs.delete(file_id)


def _migrate_form(fs, workflow, form_url):
    """Copy one Cloudinary-hosted form into GridFS and repoint the workflow at it.

    Args:
        fs: The object returned by ``get_gridfs()``; only its ``put`` and
            ``delete`` methods are used here.
        workflow: The workflow whose ``raw_forms`` list contains ``form_url``.
        form_url: The Cloudinary URL to migrate.

    Returns:
        bool: ``True`` if the file was stored and the workflow saved,
        ``False`` if the download or the save reported failure.

    Raises:
        Exception: Anything raised by the Cloudinary, HTTP, GridFS or
            workflow calls is propagated after local clean-up.
    """
    # The file's top-level imports only name `cloudinary` and
    # `cloudinary.api`; import the uploader submodule explicitly so the
    # delete step below does not depend on it having been loaded elsewhere.
    from cloudinary import uploader as cloudinary_uploader

    # Extract public_id from URL
    public_id = extract_cloudinary_public_id(form_url)
    logger.info(f"Migrating Cloudinary file {public_id} to GridFS")

    # Download file from Cloudinary
    resource = cloudinary.api.resource(public_id)
    file_url = resource['secure_url']
    response = requests.get(file_url, timeout=_DOWNLOAD_TIMEOUT_SECONDS)
    if response.status_code != 200:
        logger.error(f"Failed to download file from Cloudinary: {response.status_code}")
        return False

    # Store in GridFS
    file_id = fs.put(
        response.content,
        filename=f"{public_id}.pdf",
        content_type='application/pdf',
        metadata={
            'workflow_id': str(workflow._id),
            'migrated_from_cloudinary': True,
            'original_url': form_url
        }
    )

    # Create a URL for retrieving the file
    gridfs_url = f"/api/workflows/{workflow._id}/files/{file_id}"

    # Replace Cloudinary URL with GridFS URL in workflow. The old position is
    # remembered so a failed save can be undone exactly.
    original_index = workflow.raw_forms.index(form_url)
    workflow.raw_forms.remove(form_url)
    workflow.raw_forms.append(gridfs_url)

    try:
        saved = workflow.save()
    except Exception:
        # Do not leave an orphaned GridFS file, nor an in-memory workflow
        # that a later save would persist with a dangling URL.
        _undo_form_swap(fs, workflow, form_url, gridfs_url, original_index, file_id)
        raise

    if not saved:
        logger.error(f"Failed to update workflow {workflow._id} with new GridFS URL")
        # Clean up the added GridFS file and restore the original URL
        _undo_form_swap(fs, workflow, form_url, gridfs_url, original_index, file_id)
        return False

    logger.info(f"Successfully migrated {form_url} to {gridfs_url}")

    # Optionally delete from Cloudinary; best-effort, a failure here does not
    # undo the migration.
    try:
        cloudinary_uploader.destroy(public_id)
        logger.info(f"Deleted file {public_id} from Cloudinary")
    except Exception as del_e:
        logger.warning(f"Could not delete {public_id} from Cloudinary: {str(del_e)}")

    return True


def migrate_cloudinary_to_gridfs():
    """Migrate all Cloudinary files referenced in workflows to GridFS.

    Every workflow returned by ``Workflow.get_all()`` is scanned for
    Cloudinary URLs in ``raw_forms``. Each such file is downloaded, stored in
    GridFS, and the workflow is saved with the Cloudinary URL replaced by an
    ``/api/workflows/<workflow id>/files/<file id>`` URL. A failure on one
    file is logged and counted, and the run continues with the next file.

    Returns:
        None. Progress and the final migrated/error counts are logged.
    """
    logger.info("Starting migration of Cloudinary files to GridFS")

    # Initialize database connection (the returned instance itself is not
    # needed here; presumably the call sets up the connection - confirm in db).
    Database.get_instance()
    fs = get_gridfs()

    # Get all workflows
    workflows = Workflow.get_all()
    logger.info(f"Found {len(workflows)} workflows to check")

    migration_count = 0
    error_count = 0

    for workflow in workflows:
        # Check if workflow has Cloudinary forms. A missing list or
        # non-string entries are skipped rather than aborting the whole run.
        cloudinary_forms = [
            form for form in (workflow.raw_forms or [])
            if isinstance(form, str) and is_cloudinary_url(form)
        ]
        if not cloudinary_forms:
            continue

        logger.info(f"Workflow {workflow._id} has {len(cloudinary_forms)} Cloudinary forms to migrate")

        for form_url in cloudinary_forms:
            try:
                migrated = _migrate_form(fs, workflow, form_url)
            except Exception as e:
                logger.error(f"Error migrating {form_url}: {str(e)}")
                migrated = False

            if migrated:
                migration_count += 1
            else:
                error_count += 1

    logger.info(f"Migration complete: {migration_count} files migrated, {error_count} errors")
# Script entry point: run the migration only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    migrate_cloudinary_to_gridfs()