| import os, shutil, sys, time |
|
|
| |
| _start_time = None |
| _last_time = None |
| _last_downloaded = 0 |
| _speed_history = [] |
| _update_interval = 0.5 |
|
|
| def progress_hook(block_num, block_size, total_size, filename=None): |
| """ |
| Simple progress bar hook for urlretrieve |
| |
| Args: |
| block_num: Number of blocks downloaded so far |
| block_size: Size of each block in bytes |
| total_size: Total size of the file in bytes |
| filename: Name of the file being downloaded (optional) |
| """ |
| global _start_time, _last_time, _last_downloaded, _speed_history, _update_interval |
| |
| current_time = time.time() |
| downloaded = block_num * block_size |
| |
| |
| if _start_time is None or block_num == 0: |
| _start_time = current_time |
| _last_time = current_time |
| _last_downloaded = 0 |
| _speed_history = [] |
| |
| |
| speed = 0 |
| if current_time - _last_time >= _update_interval: |
| if _last_time > 0: |
| current_speed = (downloaded - _last_downloaded) / (current_time - _last_time) |
| _speed_history.append(current_speed) |
| |
| if len(_speed_history) > 5: |
| _speed_history.pop(0) |
| |
| speed = sum(_speed_history) / len(_speed_history) |
| |
| _last_time = current_time |
| _last_downloaded = downloaded |
| elif _speed_history: |
| |
| speed = sum(_speed_history) / len(_speed_history) |
| |
| def format_bytes(bytes_val): |
| for unit in ['B', 'KB', 'MB', 'GB']: |
| if bytes_val < 1024: |
| return f"{bytes_val:.1f}{unit}" |
| bytes_val /= 1024 |
| return f"{bytes_val:.1f}TB" |
| |
| file_display = filename if filename else "Unknown file" |
| |
| if total_size <= 0: |
| |
| speed_str = f" @ {format_bytes(speed)}/s" if speed > 0 else "" |
| line = f"\r{file_display}: {format_bytes(downloaded)}{speed_str}" |
| |
| sys.stdout.write(line.ljust(80)) |
| sys.stdout.flush() |
| return |
| |
| downloaded = block_num * block_size |
| percent = min(100, (downloaded / total_size) * 100) |
| |
| |
| bar_length = 40 |
| filled = int(bar_length * percent / 100) |
| bar = '█' * filled + '░' * (bar_length - filled) |
| |
| |
| def format_bytes(bytes_val): |
| for unit in ['B', 'KB', 'MB', 'GB']: |
| if bytes_val < 1024: |
| return f"{bytes_val:.1f}{unit}" |
| bytes_val /= 1024 |
| return f"{bytes_val:.1f}TB" |
| |
| speed_str = f" @ {format_bytes(speed)}/s" if speed > 0 else "" |
| |
| |
| line = f"\r{file_display}: [{bar}] {percent:.1f}% ({format_bytes(downloaded)}/{format_bytes(total_size)}){speed_str}" |
| |
| sys.stdout.write(line.ljust(100)) |
| sys.stdout.flush() |
| |
| |
| if percent >= 100: |
| print() |
|
|
| |
| def create_progress_hook(filename): |
| """Creates a progress hook with the filename included""" |
| global _start_time, _last_time, _last_downloaded, _speed_history |
| |
| _start_time = None |
| _last_time = None |
| _last_downloaded = 0 |
| _speed_history = [] |
| |
| def hook(block_num, block_size, total_size): |
| return progress_hook(block_num, block_size, total_size, filename) |
| return hook |
|
|
|
|
| def process_files_def(repoId=None, sourceFolderList=None, fileList=None, targetFolderList=None): |
| from huggingface_hub import hf_hub_download, snapshot_download |
| from shared.utils import files_locator as fl |
|
|
| if targetFolderList is None: |
| targetFolderList = [None] * len(sourceFolderList) |
| for targetFolder, sourceFolder, files in zip(targetFolderList, sourceFolderList, fileList): |
| if targetFolder is not None and len(targetFolder) == 0: |
| targetFolder = None |
| explicit_target = targetFolder if targetFolder is not None else (sourceFolder if len(sourceFolder) > 0 else None) |
| targetRoot = fl.get_smart_download_root(explicit_target) |
| local_dir = os.path.join(targetRoot, targetFolder) if targetFolder is not None else targetRoot |
| if len(files) == 0: |
| if fl.locate_folder(sourceFolder if targetFolder is None else os.path.join(targetFolder, sourceFolder), error_if_none=False) is None: |
| snapshot_download(repo_id=repoId, allow_patterns=sourceFolder + "/*", local_dir=local_dir) |
| else: |
| for onefile in files: |
| if len(sourceFolder) > 0: |
| if fl.locate_file((sourceFolder + "/" + onefile) if targetFolder is None else os.path.join(targetFolder, sourceFolder, onefile), error_if_none=False) is None: |
| hf_hub_download(repo_id=repoId, filename=onefile, local_dir=local_dir, subfolder=sourceFolder) |
| else: |
| if fl.locate_file(onefile if targetFolder is None else os.path.join(targetFolder, onefile), error_if_none=False) is None: |
| hf_hub_download(repo_id=repoId, filename=onefile, local_dir=local_dir) |
|
|
|
|
| def _download_relpath(source_folder, filename, target_folder=None): |
| source_folder = "" if source_folder is None else source_folder |
| if target_folder is not None and len(target_folder) == 0: |
| target_folder = None |
| if target_folder is None: |
| return os.path.join(source_folder, filename) if len(source_folder) > 0 else filename |
| return os.path.join(target_folder, source_folder, filename) if len(source_folder) > 0 else os.path.join(target_folder, filename) |
|
|
|
|
| def download_def_missing_files(download_def): |
| from shared.utils import files_locator as fl |
|
|
| if download_def is None: |
| return [] |
| if isinstance(download_def, list): |
| missing = [] |
| for one_def in download_def: |
| missing.extend(download_def_missing_files(one_def)) |
| return missing |
| source_folders = download_def.get("sourceFolderList", []) |
| file_lists = download_def.get("fileList", []) |
| target_folders = download_def.get("targetFolderList") |
| if target_folders is None: |
| target_folders = [None] * len(source_folders) |
| missing = [] |
| for source_folder, files, target_folder in zip(source_folders, file_lists, target_folders): |
| if len(files) == 0: |
| rel_folder = _download_relpath(source_folder, "", target_folder).rstrip("\\/") |
| if fl.locate_folder(rel_folder, error_if_none=False) is None: |
| missing.append(rel_folder) |
| continue |
| for filename in files: |
| rel_path = _download_relpath(source_folder, filename, target_folder) |
| if fl.locate_file(rel_path, error_if_none=False) is None: |
| missing.append(rel_path) |
| return missing |
|
|
|
|
| def send_download_status(send_cmd=None, status_text=None): |
| if send_cmd is not None and status_text: |
| send_cmd("status", status_text) |
|
|
|
|
| def process_files_def_if_needed(download_def, send_cmd=None, status_text=None): |
| if download_def is None or len(download_def_missing_files(download_def)) == 0: |
| return False |
| send_download_status(send_cmd, status_text) |
| if isinstance(download_def, list): |
| for one_def in download_def: |
| process_files_def(**one_def) |
| else: |
| process_files_def(**download_def) |
| return True |
|
|
|
|
| def query_audio_background_replacement_download_def(): |
| return { |
| "repoId": "DeepBeepMeep/Wan2.1", |
| "sourceFolderList": ["roformer"], |
| "fileList": [["model_bs_roformer_ep_317_sdr_12.9755.ckpt", "model_bs_roformer_ep_317_sdr_12.9755.yaml", "download_checks.json"]], |
| } |
|
|
|
|
| def download_audio_background_replacement(send_cmd=None, status_text="Downloading audio background replacement model files..."): |
| return process_files_def_if_needed(query_audio_background_replacement_download_def(), send_cmd=send_cmd, status_text=status_text) |
|
|
|
|
| def query_speaker_separator_download_def(): |
| return [ |
| { |
| "repoId": "DeepBeepMeep/Wan2.1", |
| "sourceFolderList": ["pyannote"], |
| "fileList": [["pyannote_model_wespeaker-voxceleb-resnet34-LM.bin", "pytorch_model_segmentation-3.0.bin"]], |
| }, |
| { |
| "repoId": "DeepBeepMeep/LTX-2", |
| "sourceFolderList": ["sherpa"], |
| "fileList": [["sherpa-onnx-pyannote-segmentation-3-0/model.onnx", "3dspeaker_speech_eres2net_base_sv_zh-cn_3dspeaker_16k.onnx"]], |
| }, |
| ] |
|
|
|
|
| def download_speaker_separator(send_cmd=None, status_text="Downloading speaker separator model files..."): |
| return process_files_def_if_needed(query_speaker_separator_download_def(), send_cmd=send_cmd, status_text=status_text) |
|
|
|
|
| def process_download_defs(download_defs): |
| if isinstance(download_defs, dict): |
| process_files_def(**download_defs) |
| return |
| for download_def in download_defs or []: |
| if download_def is not None: |
| process_files_def(**download_def) |
|
|
|
|
| def download_file(url, filename): |
| from huggingface_hub import hf_hub_download |
| from shared.utils import files_locator as fl |
|
|
| url = url.split("|")[0] |
| if url.startswith("https://huggingface.co/") and "/resolve/main/" in url: |
| base_dir = os.path.dirname(filename) |
| url = url[len("https://huggingface.co/"):] |
| url_parts = url.split("/resolve/main/") |
| repoId = url_parts[0] |
| onefile = os.path.basename(url_parts[-1]) |
| sourceFolder = os.path.dirname(url_parts[-1]) |
| if len(sourceFolder) == 0: |
| hf_hub_download(repo_id=repoId, filename=onefile, local_dir=fl.get_download_location() if len(base_dir) == 0 else base_dir) |
| else: |
| tgt = fl.get_download_location() if len(base_dir) == 0 else base_dir |
| os.makedirs(tgt, exist_ok=True) |
| temp_dir_path = os.path.join(tgt, f"_temp{time.time()}") |
| temp_full_path = os.path.join(temp_dir_path, sourceFolder) |
| os.makedirs(temp_full_path, exist_ok=True) |
| hf_hub_download(repo_id=repoId, filename=onefile, local_dir=temp_dir_path, subfolder=sourceFolder) |
| shutil.move(os.path.join(temp_full_path, onefile), tgt) |
| shutil.rmtree(temp_dir_path) |
| else: |
| from urllib.request import urlretrieve |
|
|
| urlretrieve(url, filename, create_progress_hook(filename)) |
|
|
|
|
|
|