Spaces:
Running
Running
| # File: backend/cloud_utils.py | |
| import dropbox | |
| # --- GOOGLE DRIVE LOGIC --- | |
def _escape_drive_query_value(value: str) -> str:
    """Escape a string for use inside a single-quoted Drive query literal."""
    # Backslashes must be doubled first so the escapes added for quotes
    # are not themselves re-escaped.
    return value.replace("\\", "\\\\").replace("'", "\\'")


def create_google_folder(service, folder_name: str, parent_id: str = None) -> str:
    """Return the id of a Google Drive folder, creating it if it is missing.

    Looks for a non-trashed folder called ``folder_name`` (restricted to
    ``parent_id`` when one is given). If at least one match is found, the id
    of the first match is returned; otherwise a new folder is created (inside
    ``parent_id`` when given) and its id is returned.

    Args:
        service: Drive API client exposing ``files().list`` / ``files().create``.
        folder_name: Display name of the folder.
        parent_id: Optional id of the folder to search in / create under.

    Returns:
        The id of the existing or newly created folder.
    """
    # Names containing a quote (e.g. "Bob's files") would otherwise terminate
    # the query literal early and make the search fail or match wrongly.
    safe_name = _escape_drive_query_value(folder_name)
    query = (
        "mimeType='application/vnd.google-apps.folder' "
        f"and name='{safe_name}' and trashed=false"
    )
    # If a parent is provided, only search inside that parent.
    if parent_id:
        query += f" and '{_escape_drive_query_value(parent_id)}' in parents"

    results = service.files().list(
        q=query, spaces='drive', fields='files(id, name)'
    ).execute()
    files = results.get('files', [])
    if files:
        return files[0].get('id')

    # The metadata carries the raw, unescaped name: escaping only applies to
    # the query language, not to the request body.
    file_metadata = {
        'name': folder_name,
        'mimeType': 'application/vnd.google-apps.folder',
    }
    # Attach to the parent folder if requested.
    if parent_id:
        file_metadata['parents'] = [parent_id]

    folder = service.files().create(body=file_metadata, fields='id').execute()
    return folder.get('id')
def move_google_file(service, file_id: str, new_folder_id: str, new_name: str = None):
    """Re-parent a Drive file into ``new_folder_id``, optionally renaming it.

    The file's current parents are fetched and all of them are removed in the
    same update call that adds the new parent. A rename is only sent when
    ``new_name`` is truthy.

    Returns:
        True when the update call completes, False if any exception was
        raised (the error is printed, not re-raised).
    """
    try:
        current = service.files().get(fileId=file_id, fields='parents').execute()
        # The API takes the parents to drop as one comma-separated string.
        old_parents = ",".join(current.get('parents', []))
        # No request body at all unless a rename was asked for.
        rename_body = {'name': new_name} if new_name else None
        service.files().update(
            fileId=file_id,
            addParents=new_folder_id,
            removeParents=old_parents,
            body=rename_body,
            fields='id, parents, name',
        ).execute()
    except Exception as e:
        print(f"Error moving Google file: {e}")
        return False
    return True
| # --- DROPBOX LOGIC --- | |
def create_dropbox_folder(dbx: dropbox.Dropbox, folder_name: str) -> str:
    """Create ``/<folder_name>`` at the Dropbox root.

    Returns:
        The new folder's metadata id on success, the literal string
        ``"exists"`` when the API reports a path conflict, or ``None`` for
        any other ``ApiError`` (which is printed). Exceptions other than
        ``ApiError`` are not caught here.
    """
    target = f"/{folder_name}"
    try:
        created = dbx.files_create_folder_v2(target)
        return created.metadata.id
    except dropbox.exceptions.ApiError as e:
        # A path conflict means something already lives at that path.
        conflict = hasattr(e.error, 'get_path') and e.error.get_path().is_conflict()
        if not conflict:
            print(f"Dropbox Folder Error: {e}")
            return None
        print("Dropbox folder exists")
        return "exists"
def move_dropbox_file(dbx: dropbox.Dropbox, from_path: str, new_folder_name: str, file_name: str, new_name: str = None):
    """Move a Dropbox file into ``/<new_folder_name>/``.

    Dropbox moves need a full destination path, so it is built here from the
    folder name plus ``new_name`` when that is truthy, else ``file_name``.

    Returns:
        True when the move call completes, False if any exception was raised
        (the error is printed, not re-raised).
    """
    destination = f"/{new_folder_name}/{new_name or file_name}"
    try:
        dbx.files_move_v2(from_path, destination)
    except Exception as e:
        print(f"Error moving Dropbox file: {e}")
        return False
    return True
def delete_google_file(service, file_id: str):
    """Delete the Drive file ``file_id`` via ``files().delete``.

    Returns:
        True when the delete call completes, False if any exception was
        raised (the error is printed, not re-raised).
    """
    try:
        request = service.files().delete(fileId=file_id)
        request.execute()
    except Exception as e:
        print(f"Error deleting Google file: {e}")
        return False
    else:
        return True
def delete_dropbox_file(dbx: dropbox.Dropbox, path: str):
    """Delete the Dropbox entry at ``path`` via ``files_delete_v2``.

    NOTE(review): whether this deletion is permanent or recoverable depends
    on the Dropbox API's semantics for ``files_delete_v2`` - confirm before
    relying on it being irreversible.

    Returns:
        True when the delete call completes, False if any exception was
        raised (the error is printed, not re-raised).
    """
    try:
        dbx.files_delete_v2(path)
    except Exception as e:
        print(f"Error deleting Dropbox file: {e}")
        return False
    else:
        return True