Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import requests | |
| from urllib.parse import urlparse | |
| import cloudinary | |
| import cloudinary.api | |
| from dotenv import load_dotenv | |
| from db import get_gridfs, Database, get_workflows_collection | |
| from models.workflow import Workflow | |
| from bson.objectid import ObjectId | |
# Load environment variables (python-dotenv) before anything reads os.environ.
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure Cloudinary from the CLOUDINARY_* environment variables.
_CLOUDINARY_ENV_VARS = {
    'cloud_name': 'CLOUDINARY_CLOUD_NAME',
    'api_key': 'CLOUDINARY_API_KEY',
    'api_secret': 'CLOUDINARY_API_SECRET',
}
_cloudinary_settings = {
    option: os.environ.get(env_name)
    for option, env_name in _CLOUDINARY_ENV_VARS.items()
}

# NOTE(review): cloudinary.config() is not expected to validate credentials
# itself (TODO confirm), so unset variables are surfaced here instead of
# failing later on the first API call. This only warns; it does not exit.
_missing_cloudinary_vars = [
    env_name
    for option, env_name in _CLOUDINARY_ENV_VARS.items()
    if not _cloudinary_settings[option]
]
if _missing_cloudinary_vars:
    logger.warning(
        "Cloudinary environment variables not set: %s",
        ", ".join(_missing_cloudinary_vars),
    )

try:
    cloudinary.config(**_cloudinary_settings)
    logger.info("Cloudinary configured.")
except Exception as e:
    logger.error(f"Failed to configure Cloudinary: {str(e)}")
    # The bare ``exit`` name is injected by the ``site`` module and is not
    # guaranteed to exist (e.g. ``python -S``); SystemExit always is.
    raise SystemExit(1)
def is_cloudinary_url(url):
    """Return True if *url* is an http(s) URL hosted on cloudinary.com.

    The decision is made on the parsed scheme and hostname rather than on a
    substring match, so URLs that merely mention ``cloudinary.com`` in their
    path or query string are rejected.

    Args:
        url: Candidate value; anything that is not a ``str`` yields False.

    Returns:
        bool: True only for ``http``/``https`` URLs whose host is
        ``cloudinary.com`` or one of its subdomains.
    """
    # Form lists may contain None or other non-string entries; treat them as
    # "not a Cloudinary URL" instead of raising AttributeError.
    if not isinstance(url, str):
        return False

    try:
        parsed = urlparse(url)
        host = parsed.hostname or ''
    except ValueError:
        # urlparse rejects some malformed netlocs (e.g. unbalanced brackets).
        return False

    if parsed.scheme not in ('http', 'https'):
        return False
    return host == 'cloudinary.com' or host.endswith('.cloudinary.com')
def extract_cloudinary_public_id(url):
    """Extract the public ID from a Cloudinary delivery URL.

    When the URL path contains a version segment (``v`` followed by digits),
    everything after it is treated as the public ID, so folder-qualified IDs
    such as ``forms/2024/intake`` are kept whole. Without a version segment
    the last path component is used. The file extension is removed in both
    cases.

    Args:
        url: Cloudinary URL string.

    Returns:
        str: The public ID without its file extension; an empty string when
        the URL has no path.
    """
    path = urlparse(url).path
    segments = path.split('/')

    # The final segment is the file itself, so it is never considered as a
    # version marker.
    for index, segment in enumerate(segments[:-1]):
        if segment[:1] == 'v' and segment[1:].isdigit():
            remainder = '/'.join(segments[index + 1:])
            break
    else:
        # No version marker: folders cannot be told apart from
        # transformation segments, so keep only the file name.
        remainder = os.path.basename(path)

    # Remove the file extension
    return os.path.splitext(remainder)[0]
# Upper bound, in seconds, for downloading a single file from Cloudinary so
# one stalled connection cannot hang the whole migration.
_DOWNLOAD_TIMEOUT_SECONDS = 60


def _migrate_form(fs, workflow, form_url):
    """Move one Cloudinary-hosted form of *workflow* into GridFS.

    Returns True when the workflow was saved with the new GridFS URL and
    False when the download or the save failed. Exceptions from the
    Cloudinary, HTTP, GridFS or model calls propagate to the caller.
    """
    # The file only imports ``cloudinary`` and ``cloudinary.api``; import the
    # uploader submodule explicitly so destroy() below is actually reachable.
    from cloudinary import uploader as cloudinary_uploader

    # Extract public_id from URL
    public_id = extract_cloudinary_public_id(form_url)
    logger.info(f"Migrating Cloudinary file {public_id} to GridFS")

    # Download file from Cloudinary
    resource = cloudinary.api.resource(public_id)
    response = requests.get(
        resource['secure_url'], timeout=_DOWNLOAD_TIMEOUT_SECONDS
    )
    if response.status_code != 200:
        logger.error(f"Failed to download file from Cloudinary: {response.status_code}")
        return False

    # Locate the entry before writing to GridFS so a missing URL cannot
    # leave an orphaned file behind.
    position = workflow.raw_forms.index(form_url)

    # Store in GridFS
    file_id = fs.put(
        response.content,
        filename=f"{public_id}.pdf",
        content_type='application/pdf',
        metadata={
            'workflow_id': str(workflow._id),
            'migrated_from_cloudinary': True,
            'original_url': form_url,
        },
    )

    # Create a URL for retrieving the file
    gridfs_url = f"/api/workflows/{workflow._id}/files/{file_id}"

    # Replace the Cloudinary URL in place so the form keeps its position.
    workflow.raw_forms[position] = gridfs_url
    saved = False
    try:
        saved = bool(workflow.save())
    finally:
        if not saved:
            # Undo both the in-memory edit and the GridFS write; otherwise a
            # later save of this workflow would persist a dangling link.
            workflow.raw_forms[position] = form_url
            fs.delete(file_id)

    if not saved:
        logger.error(f"Failed to update workflow {workflow._id} with new GridFS URL")
        return False

    logger.info(f"Successfully migrated {form_url} to {gridfs_url}")

    # Best-effort cleanup: failing to delete the Cloudinary copy must not
    # turn a successful migration into an error.
    try:
        cloudinary_uploader.destroy(public_id)
        logger.info(f"Deleted file {public_id} from Cloudinary")
    except Exception as del_e:
        logger.warning(f"Could not delete {public_id} from Cloudinary: {str(del_e)}")
    return True


def migrate_cloudinary_to_gridfs():
    """Migrate all Cloudinary files referenced in workflows to GridFS.

    For every workflow returned by ``Workflow.get_all()``, each string entry
    of ``workflow.raw_forms`` accepted by ``is_cloudinary_url`` is
    downloaded, stored in GridFS and replaced by an
    ``/api/workflows/<workflow id>/files/<file id>`` URL. After a successful
    ``workflow.save()`` the Cloudinary copy is deleted on a best-effort
    basis.

    A failure on one file is logged and counted but never aborts the run.

    Returns:
        None. Totals are reported through the module logger.
    """
    logger.info("Starting migration of Cloudinary files to GridFS")

    # Initialize database connection
    Database.get_instance()
    fs = get_gridfs()

    # Get all workflows
    workflows = Workflow.get_all()
    logger.info(f"Found {len(workflows)} workflows to check")

    migration_count = 0
    error_count = 0

    for workflow in workflows:
        # Snapshot the matching URLs up front: raw_forms is edited while the
        # files are migrated. Non-string entries are skipped.
        cloudinary_forms = [
            form
            for form in (workflow.raw_forms or [])
            if isinstance(form, str) and is_cloudinary_url(form)
        ]
        if not cloudinary_forms:
            continue

        logger.info(f"Workflow {workflow._id} has {len(cloudinary_forms)} Cloudinary forms to migrate")

        for form_url in cloudinary_forms:
            try:
                migrated = _migrate_form(fs, workflow, form_url)
            except Exception as e:
                logger.error(f"Error migrating {form_url}: {str(e)}")
                migrated = False

            if migrated:
                migration_count += 1
            else:
                error_count += 1

    logger.info(f"Migration complete: {migration_count} files migrated, {error_count} errors")
# Script entry point: run the migration only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    migrate_cloudinary_to_gridfs()