| import os |
| import shutil |
| import hashlib |
| import time |
| import base64 |
|
|
|
|
|
|
|
|
# Local folder that backup_files() mirrors to Google Drive and that
# import_google_drive_backup() restores non-weight files into.
LOGS_FOLDER = '/content/Applio-RVC-Fork/logs'
# Local folder holding the ``.pth`` model files that are copied to / restored
# from the ``weights`` sub-folder of GOOGLE_DRIVE_PATH.
WEIGHTS_FOLDER = '/content/Applio-RVC-Fork/weights'
# Root of the backup tree on the mounted Google Drive.
GOOGLE_DRIVE_PATH = '/content/drive/MyDrive/RVC_Backup'
|
|
def import_google_drive_backup() -> None:
    """Restore the Google Drive backup into the local logs and weights folders.

    Walks GOOGLE_DRIVE_PATH recursively and copies:

    * every ``.pth`` file located inside ``GOOGLE_DRIVE_PATH/weights`` into
      WEIGHTS_FOLDER, preserving its path relative to that ``weights`` folder
      (other files inside ``weights`` are ignored);
    * every other file into LOGS_FOLDER, preserving its path relative to
      GOOGLE_DRIVE_PATH.

    Missing destination folders are created on demand and progress is
    reported with ``print``.
    """
    print("Importing Google Drive backup...")
    weights_root = os.path.join(GOOGLE_DRIVE_PATH, 'weights')
    weights_exist = False

    for root, dirs, files in os.walk(GOOGLE_DRIVE_PATH):
        # Compare whole path components. A plain ``startswith(weights_root)``
        # would also match siblings such as ``weights_old/`` or a top-level
        # ``weights_index.json``; those were then either dropped or copied to
        # a ``..``-relative location outside WEIGHTS_FOLDER.
        inside_weights = root == weights_root or root.startswith(weights_root + os.sep)

        for filename in files:
            filepath = os.path.join(root, filename)
            if not os.path.isfile(filepath):
                continue

            if inside_weights:
                # Only model files are restored from the weights folder.
                if not filename.endswith('.pth'):
                    continue
                weights_exist = True
                weights_filepath = os.path.join(WEIGHTS_FOLDER, os.path.relpath(filepath, weights_root))
                weights_folderpath = os.path.dirname(weights_filepath)
                if not os.path.exists(weights_folderpath):
                    os.makedirs(weights_folderpath, exist_ok=True)
                    print(f'Created weights folder: {weights_folderpath}', flush=True)
                shutil.copy2(filepath, weights_filepath)
                print(f'Imported file from weights: {filename}')
            else:
                backup_filepath = os.path.join(LOGS_FOLDER, os.path.relpath(filepath, GOOGLE_DRIVE_PATH))
                backup_folderpath = os.path.dirname(backup_filepath)
                if not os.path.exists(backup_folderpath):
                    os.makedirs(backup_folderpath, exist_ok=True)
                    print(f'Created backup folder: {backup_folderpath}', flush=True)
                shutil.copy2(filepath, backup_filepath)
                print(f'Imported file from Google Drive backup: {filename}')

    if weights_exist:
        print("Copied weights from Google Drive backup to local weights folder.")
    else:
        print("No weights found in Google Drive backup.")
    print("Google Drive backup import completed.")
|
|
def get_md5_hash(file_path):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is opened in binary mode and consumed in 4096-byte reads, so it
    is never held in memory as a whole.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(4096)
            if not block:
                # An empty read marks end of file.
                break
            digest.update(block)
    return digest.hexdigest()
|
|
def copy_weights_folder_to_drive():
    """Copy new ``.pth`` files from WEIGHTS_FOLDER to the Drive ``weights`` folder.

    The destination is ``GOOGLE_DRIVE_PATH/weights`` and is created when it
    does not exist. Only the top level of WEIGHTS_FOLDER is listed. A file is
    copied only when no entry with the same name exists at the destination.
    Any exception is caught and reported with ``print`` instead of being
    propagated.
    """
    drive_weights_dir = os.path.join(GOOGLE_DRIVE_PATH, 'weights')
    try:
        if not os.path.exists(drive_weights_dir):
            os.makedirs(drive_weights_dir)

        copied_count = 0
        for entry in os.listdir(WEIGHTS_FOLDER):
            if not entry.endswith('.pth'):
                continue
            target = os.path.join(drive_weights_dir, entry)
            if os.path.exists(target):
                # Already present on Drive: existing copies are never overwritten.
                continue
            shutil.copy2(os.path.join(WEIGHTS_FOLDER, entry), target)
            copied_count += 1
            print(f"Copied {entry} to Google Drive!")

        if copied_count:
            print(f"Finished copying {copied_count} files to Google Drive!")
        else:
            print("No new finished models found for copying.")

    except Exception as error:
        print(f"An error occurred while copying weights: {str(error)}")
| |
|
|
def _load_backup_timestamps(ledger_path):
    """Read the ``<filepath>:<mtime>`` ledger at *ledger_path* into a dict of floats.

    A missing ledger yields an empty dict. Lines that cannot be parsed are
    skipped, so the affected file is simply treated as not yet backed up.
    """
    timestamps = {}
    try:
        with open(ledger_path, 'r') as f:
            for line in f:
                # Split on the LAST colon: the timestamp never contains one,
                # but the file path may.
                filepath, separator, timestamp = line.strip().rpartition(':')
                if not separator or not filepath:
                    continue
                try:
                    timestamps[filepath] = float(timestamp)
                except ValueError:
                    continue
    except FileNotFoundError:
        pass
    return timestamps


def _save_backup_timestamps(ledger_path, timestamps):
    """Write *timestamps* to *ledger_path*, one ``<filepath>:<mtime>`` line per entry."""
    with open(ledger_path, 'w') as f:
        f.writelines(f'{filepath}:{timestamp}\n' for filepath, timestamp in timestamps.items())


def _backup_changed_files(timestamps):
    """Copy new or modified files from LOGS_FOLDER to GOOGLE_DRIVE_PATH.

    *timestamps* is updated in place with the mtime of every copied file.
    Returns True when at least one file was copied.
    """
    changed = False
    for root, dirs, files in os.walk(LOGS_FOLDER):
        for filename in files:
            # The ledger itself lives in LOGS_FOLDER and is never backed up.
            if filename == 'last_backup_timestamps.txt':
                continue
            filepath = os.path.join(root, filename)
            if not os.path.isfile(filepath):
                continue

            backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER))
            backup_folderpath = os.path.dirname(backup_filepath)
            if not os.path.exists(backup_folderpath):
                os.makedirs(backup_folderpath, exist_ok=True)
                print(f'Created backup folder: {backup_folderpath}', flush=True)

            last_backup_timestamp = timestamps.get(filepath)
            try:
                current_timestamp = os.path.getmtime(filepath)
            except FileNotFoundError:
                # The file vanished between the directory listing and the
                # stat call; the deletion pass will deal with it.
                continue

            if last_backup_timestamp is None or last_backup_timestamp < current_timestamp:
                shutil.copy2(filepath, backup_filepath)
                timestamps[filepath] = current_timestamp
                if last_backup_timestamp is None:
                    print(f'Backed up file: {filename}')
                else:
                    print(f'Updating backed up file: {filename}')
                changed = True
    return changed


def _remove_deleted_files(timestamps):
    """Delete Drive copies of files that no longer exist locally.

    Entries for missing local files are removed from *timestamps* in place.
    Returns True when at least one entry was removed.
    """
    changed = False
    for filepath in list(timestamps):
        if os.path.exists(filepath):
            continue
        backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER))
        if os.path.exists(backup_filepath):
            os.remove(backup_filepath)
            print(f'Deleted file: {filepath}')
        del timestamps[filepath]
        changed = True
    return changed


def backup_files() -> None:
    """Mirror LOGS_FOLDER to GOOGLE_DRIVE_PATH in an endless loop.

    Each pass reloads the ledger ``last_backup_timestamps.txt`` from
    LOGS_FOLDER, copies files that are new or have a newer mtime than
    recorded, and deletes Drive copies of files that were removed locally.
    The ledger is rewritten only when a pass changed something.

    The first pass that finds nothing to do after a change prints
    "Files are up to date.", calls copy_weights_folder_to_drive() and sleeps
    15 seconds; every other pass sleeps 0.1 seconds.

    Exceptions raised during a pass are printed and the loop continues after
    a short pause. This function never returns.
    """
    print("\nStarting backup loop...")
    last_backup_timestamps_path = os.path.join(LOGS_FOLDER, 'last_backup_timestamps.txt')
    fully_updated = False
    # Pause after a failed pass so that a persistent error (for example an
    # unmounted Drive) does not become a tight loop of error messages.
    error_sleep_time = 5

    while True:
        try:
            last_backup_timestamps = _load_backup_timestamps(last_backup_timestamps_path)

            updated = _backup_changed_files(last_backup_timestamps)
            # Run the deletion pass unconditionally; do not short-circuit it.
            updated = _remove_deleted_files(last_backup_timestamps) or updated

            if updated:
                fully_updated = False
                # Persist the ledger only when its content actually changed.
                _save_backup_timestamps(last_backup_timestamps_path, last_backup_timestamps)

            if not updated and not fully_updated:
                print("Files are up to date.")
                fully_updated = True
                copy_weights_folder_to_drive()
                sleep_time = 15
            else:
                # NOTE(review): this branch is also taken on every idle pass
                # once fully_updated is set, so the loop polls every 0.1 s
                # while idle. Kept as in the original - confirm it is intended.
                sleep_time = 0.1

            time.sleep(sleep_time)

        except Exception as e:
            print(f"An error occurred: {str(e)}")
            time.sleep(error_sleep_time)
| |
|
|