from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from re import search from shutil import disk_usage from subprocess import PIPE, Popen, STDOUT, run from time import sleep import gradio as gr from requests import get as requests_get, head as requests_head from modules import script_callbacks, sd_models, shared from modules.paths_internal import data_path DL_COMMAND = 'wget -nv -t 10 --show-progress --progress=bar:force -q --content-disposition "{link}" -P {dl_path}' WEBUI_ROOT = Path(data_path) LINKS_FILE = WEBUI_ROOT / 'links.txt' MODELS_FOLDER_PATH = Path(sd_models.model_path) LORAS_FOLDER_PATH = Path(shared.cmd_opts.lora_dir) EMBEDDINGS_FOLDER_PATH = Path(shared.cmd_opts.embeddings_dir) CIVITAI_TOKEN = '542c1d6077168822e1b49e30e3437a5d' def del_null_model(): null_model_path = MODELS_FOLDER_PATH / 'nullModel.ckpt' if null_model_path.exists(): try: null_model_path.unlink(missing_ok=True) except: pass def find_mount_point(): path = Path(__file__).resolve() while not path.is_mount(): path = path.parent return path def free_space(): total, used, free = disk_usage(find_mount_point()) power = 2 ** 10 n = 0 power_labels = {0: '', 1: 'Кило', 2: 'Мега', 3: 'Гига', 4: 'Тера'} while free > power: free /= power n += 1 return f'{free:.2f} {power_labels[n]}байт' def extract_url(command_eith_url): pattern = r'["\']?((?:https?|ftp|ftps)://[^\s"\'<>]+)["\']?' match = search(pattern, command_eith_url) return match.group(1) if match else None def hf_size(url: str) -> int: try: modified_url = url.replace('resolve', 'raw') response = requests_get(modified_url, timeout=10) response.raise_for_status() content = response.text size_str = content.split('size')[-1].strip().split()[0] return int(size_str) if size_str.isdigit() else 0 except: return 0 def cv_size(url: str) -> int: try: model_version_id = url.split('/')[-1] response = requests_get(f'https://civitai.com/api/v1/model-versions/{model_version_id}?token={CIVITAI_TOKEN}', timeout=10) response.raise_for_status() files = response.json().get('files', []) if files: size_kb = files[0].get('sizeKB', 0) return int(size_kb * 1024) return 0 except: return 0 def get_file_size(command_with_url: str) -> int: url = extract_url(command_with_url) if not url: print(f'в строке `{command_with_url}` ссылка не найдена') return 0 file_size = 0 if 'huggingface' in url: file_size = hf_size(url) elif 'civitai' in url: file_size = cv_size(url) if file_size: return file_size try: response = requests_head(url, allow_redirects=True, timeout=10) response.raise_for_status() content_length = response.headers.get('Content-Length') if content_length and content_length.isdigit(): return int(content_length) except Exception: pass return 0 def get_total_file_size(urls: list): total = 0 sizes = [] with ThreadPoolExecutor(max_workers=len(urls)) as executor: futures = [executor.submit(get_file_size, url) for url in urls] for future in as_completed(futures): size = future.result() sizes.append(size) total += size return total, sizes def bytes_convert(size_bytes): if size_bytes >= 1073741824: return f'{round(size_bytes / 1073741824, 2)} ГБ' else: return f'{round(size_bytes / 1048576, 2)} МБ' def get_own_links(ownmodels, ownloras, ownembeddings): dl_commands = [] for text, dlpath in zip([ownmodels, ownloras, ownembeddings], [MODELS_FOLDER_PATH, LORAS_FOLDER_PATH, EMBEDDINGS_FOLDER_PATH]): lines = text.split('\n') for line in lines: if line.strip(): link = line.strip() + (f"?token={CIVITAI_TOKEN}" if "?" not in line else f"&token={CIVITAI_TOKEN}") if "civitai" in line else line.strip() dl_command = DL_COMMAND.format(link=link, dl_path=dlpath.resolve().as_posix()) dl_commands.append(dl_command) LINKS_FILE.write_text('\n'.join(dl_commands).strip(), encoding='utf-8') print('список загрузки сформирован...') def on_ui_tabs(): with gr.Blocks() as models_list: gr.HTML('

Загрузка моделей

') ownmodels = gr.Textbox(label="Модели", lines=5) ownloras = gr.Textbox(label="LoRA", lines=5) ownembeddings = gr.Textbox(label="Внедрения", lines=5) progress_slider = gr.Slider(minimum=0, maximum=100, value=0, label="Прогресс загрузки", interactive=False) dl_result_box = gr.Textbox(label='Результат') button = gr.Button("Сформировать ссылки") button.click(get_own_links, inputs=[ownmodels, ownloras, ownembeddings]) def start_download(): try: urls = LINKS_FILE.read_text(encoding='utf-8').splitlines() LINKS_FILE.unlink(missing_ok=True) total_file_size, sizes = get_total_file_size(urls) total, used, free = disk_usage(find_mount_point()) if total_file_size > (free - 1073741824): msg = f'Недостаточно места! Нужно: {bytes_convert(total_file_size)}, доступно: {bytes_convert(free)}' print(msg) return gr.update(value=0), msg print(f'Загрузка {bytes_convert(total_file_size)} началась...') total_urls = len(urls) downloaded_size = 0 for idx, (command, size) in enumerate(zip(urls, sizes)): for line in downloader(command): print(line) downloaded_size += size progress = int(((idx + 1) / total_urls) * 100) progress_text = f"Загружено {bytes_convert(downloaded_size)} из {bytes_convert(total_file_size)}" yield gr.update(value=progress), progress_text del_null_model() return gr.update(value=100), "Загрузка завершена!" except Exception as e: return gr.update(value=0), f"ОШИБКА: {e}" download_button = gr.Button("Скачать") download_button.click(start_download, outputs=[progress_slider, dl_result_box]) return (models_list, 'Модели', 'models_list'), script_callbacks.on_ui_tabs(on_ui_tabs)