| | import os |
| | import shutil |
| | import hashlib |
| | import time |
| | import base64 |
| |
|
| |
|
| |
|
| |
|
# Local folder that backup_files() mirrors to Google Drive and that
# import_google_drive_backup() restores non-weight files into.
LOGS_FOLDER = '/content/Applio-RVC-Fork/logs'
# Local folder whose '.pth' files are copied to (and restored from) the
# 'weights' sub-folder of the Drive backup.
WEIGHTS_FOLDER = '/content/Applio-RVC-Fork/weights'
# Root of the backup. NOTE(review): presumably a mounted Google Drive inside
# a Colab runtime -- confirm the mount happens before these helpers run.
GOOGLE_DRIVE_PATH = '/content/drive/MyDrive/RVC_Backup'
| |
|
def import_google_drive_backup():
    """Restore the Google Drive backup into the local logs and weights folders.

    Walks ``GOOGLE_DRIVE_PATH`` and copies, preserving the relative layout:

    * every ``.pth`` file below its ``weights`` sub-folder into
      ``WEIGHTS_FOLDER``;
    * every file outside that sub-folder into ``LOGS_FOLDER``.

    Files inside ``weights`` that do not end in ``.pth`` are ignored.
    Progress is reported with ``print``; nothing is returned.
    """
    print("Importing Google Drive backup...")
    drive_weights_folder = os.path.join(GOOGLE_DRIVE_PATH, 'weights')
    weights_exist = False
    for root, _dirs, files in os.walk(GOOGLE_DRIVE_PATH):
        # Compare whole path components. A plain string-prefix test would also
        # match siblings such as 'weights_old', whose relative path then starts
        # with '..' and lands outside WEIGHTS_FOLDER.
        inside_weights = (
            root == drive_weights_folder
            or root.startswith(drive_weights_folder + os.sep)
        )
        for filename in files:
            filepath = os.path.join(root, filename)
            if not os.path.isfile(filepath):
                # e.g. a dangling symlink: nothing to copy.
                continue

            if not inside_weights:
                backup_filepath = os.path.join(
                    LOGS_FOLDER, os.path.relpath(filepath, GOOGLE_DRIVE_PATH)
                )
                backup_folderpath = os.path.dirname(backup_filepath)
                if not os.path.isdir(backup_folderpath):
                    os.makedirs(backup_folderpath, exist_ok=True)
                    print(f'Created backup folder: {backup_folderpath}', flush=True)
                shutil.copy2(filepath, backup_filepath)
                print(f'Imported file from Google Drive backup: {filename}')
            elif filename.endswith('.pth'):
                weights_exist = True
                weights_filepath = os.path.join(
                    WEIGHTS_FOLDER, os.path.relpath(filepath, drive_weights_folder)
                )
                weights_folderpath = os.path.dirname(weights_filepath)
                if not os.path.isdir(weights_folderpath):
                    os.makedirs(weights_folderpath, exist_ok=True)
                    print(f'Created weights folder: {weights_folderpath}', flush=True)
                shutil.copy2(filepath, weights_filepath)
                print(f'Imported file from weights: {filename}')

    if weights_exist:
        print("Copied weights from Google Drive backup to local weights folder.")
    else:
        print("No weights found in Google Drive backup.")
    print("Google Drive backup import completed.")
| |
|
def get_md5_hash(file_path, chunk_size=65536):
    """Return the hexadecimal MD5 digest of the file at *file_path*.

    The file is read in binary mode, ``chunk_size`` bytes at a time, so it is
    never loaded into memory as a whole.

    Args:
        file_path: Path of the file to hash.
        chunk_size: Number of bytes per read; must be at least 1. The digest
            does not depend on this value.

    Returns:
        The digest as a 32-character lowercase hexadecimal string.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        OSError: If the file cannot be opened or read.
    """
    if chunk_size < 1:
        # read(0) returns b"" at once (the digest would be that of an empty
        # file) and a negative size reads the whole file in a single call.
        raise ValueError("chunk_size must be a positive number of bytes")

    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
| |
|
def copy_weights_folder_to_drive():
    """Copy new ``.pth`` files from ``WEIGHTS_FOLDER`` to the Drive backup.

    Only regular files directly inside ``WEIGHTS_FOLDER`` whose name ends in
    ``.pth`` are considered, and a file is copied only when no entry of the
    same name exists in ``GOOGLE_DRIVE_PATH/weights`` yet. A missing
    ``WEIGHTS_FOLDER`` is treated as "nothing to copy".

    This is a best-effort step: any exception is reported with ``print`` and
    swallowed so that the caller keeps running. Nothing is returned.
    """
    destination_folder = os.path.join(GOOGLE_DRIVE_PATH, 'weights')
    try:
        os.makedirs(destination_folder, exist_ok=True)

        if os.path.isdir(WEIGHTS_FOLDER):
            filenames = sorted(os.listdir(WEIGHTS_FOLDER))
        else:
            filenames = []

        num_copied = 0
        for filename in filenames:
            if not filename.endswith('.pth'):
                continue
            source_file = os.path.join(WEIGHTS_FOLDER, filename)
            destination_file = os.path.join(destination_folder, filename)
            if not os.path.isfile(source_file) or os.path.exists(destination_file):
                continue

            # Copy under a temporary name and rename once complete. Because an
            # existing destination is never copied again, a truncated file left
            # under the final name by an interrupted copy would stay forever.
            partial_file = destination_file + '.part'
            shutil.copy2(source_file, partial_file)
            os.replace(partial_file, destination_file)
            num_copied += 1
            print(f"Copied {filename} to Google Drive!")

        if num_copied == 0:
            print("No new finished models found for copying.")
        else:
            print(f"Finished copying {num_copied} files to Google Drive!")

    except Exception as e:
        print(f"An error occurred while copying weights: {str(e)}")
| | |
| |
|
def _load_backup_timestamps(path):
    """Read the ``filepath -> mtime string`` map written by a previous pass."""
    timestamps = {}
    try:
        with open(path, 'r') as f:
            for line in f:
                # Split on the LAST colon: the stored mtime never contains
                # one, whereas the file path may.
                filepath, separator, timestamp = line.strip().rpartition(':')
                if not separator or not filepath:
                    continue  # blank or malformed line
                try:
                    float(timestamp)
                except ValueError:
                    continue  # unreadable mtime: treat the file as not backed up
                timestamps[filepath] = timestamp
    except FileNotFoundError:
        pass
    return timestamps


def _save_backup_timestamps(path, timestamps):
    """Write the ``filepath -> mtime string`` map, one ``path:mtime`` per line."""
    with open(path, 'w') as f:
        f.writelines(
            f'{filepath}:{timestamp}\n' for filepath, timestamp in timestamps.items()
        )


def backup_files():
    """Mirror ``LOGS_FOLDER`` to ``GOOGLE_DRIVE_PATH`` in an endless loop.

    Each pass:

    1. loads the modification times recorded by the previous pass from
       ``last_backup_timestamps.txt`` inside ``LOGS_FOLDER``;
    2. copies every file under ``LOGS_FOLDER`` (except that bookkeeping file)
       that is new or has a newer modification time than recorded;
    3. removes from the backup every recorded file that no longer exists
       locally;
    4. when a pass finds nothing to do right after a pass that did, prints
       "Files are up to date." and calls ``copy_weights_folder_to_drive()``.

    The loop sleeps 0.1 s after a pass that changed something and 15 s after
    an idle or failed pass. Errors are printed and the loop carries on; the
    function never returns.
    """
    print("\nStarting backup loop...")
    last_backup_timestamps_path = os.path.join(LOGS_FOLDER, 'last_backup_timestamps.txt')
    fully_updated = False
    busy_sleep = 0.1
    idle_sleep = 15

    while True:
        # Default to the long interval so that a failing pass backs off
        # instead of retrying in a tight loop.
        sleep_time = idle_sleep
        try:
            updated = False
            last_backup_timestamps = _load_backup_timestamps(last_backup_timestamps_path)

            for root, _dirs, files in os.walk(LOGS_FOLDER):
                for filename in files:
                    if filename == 'last_backup_timestamps.txt':
                        continue
                    filepath = os.path.join(root, filename)
                    if not os.path.isfile(filepath):
                        continue

                    backup_filepath = os.path.join(
                        GOOGLE_DRIVE_PATH, os.path.relpath(filepath, LOGS_FOLDER)
                    )
                    backup_folderpath = os.path.dirname(backup_filepath)
                    if not os.path.isdir(backup_folderpath):
                        os.makedirs(backup_folderpath, exist_ok=True)
                        print(f'Created backup folder: {backup_folderpath}', flush=True)

                    last_backup_timestamp = last_backup_timestamps.get(filepath)
                    current_timestamp = os.path.getmtime(filepath)
                    if last_backup_timestamp is None or float(last_backup_timestamp) < current_timestamp:
                        shutil.copy2(filepath, backup_filepath)
                        last_backup_timestamps[filepath] = str(current_timestamp)
                        if last_backup_timestamp is None:
                            print(f'Backed up file: {filename}')
                        else:
                            print(f'Updating backed up file: {filename}')
                        updated = True
                        fully_updated = False

            # Propagate local deletions to the backup.
            for filepath in list(last_backup_timestamps):
                if os.path.exists(filepath):
                    continue
                relative_path = os.path.relpath(filepath, LOGS_FOLDER)
                escapes_logs = (
                    relative_path == os.pardir
                    or relative_path.startswith(os.pardir + os.sep)
                )
                # A recorded path outside LOGS_FOLDER would resolve outside
                # the backup root; forget it without deleting anything.
                if not escapes_logs:
                    backup_filepath = os.path.join(GOOGLE_DRIVE_PATH, relative_path)
                    if os.path.exists(backup_filepath):
                        os.remove(backup_filepath)
                        print(f'Deleted file: {filepath}')
                del last_backup_timestamps[filepath]
                updated = True
                fully_updated = False

            if updated:
                # Persist only when something changed; poll again quickly in
                # case more files are being written.
                _save_backup_timestamps(last_backup_timestamps_path, last_backup_timestamps)
                sleep_time = busy_sleep
            elif not fully_updated:
                print("Files are up to date.")
                fully_updated = True
                copy_weights_folder_to_drive()

        except Exception as e:
            print(f"An error occurred: {str(e)}")

        time.sleep(sleep_time)
| | |
| |
|