Download_addon / scripts /colab_sd_models.py
AkinoKaze's picture
Upload 2 files
0d7237b verified
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from re import search
from shutil import disk_usage
from subprocess import PIPE, Popen, STDOUT, run
from time import sleep
import gradio as gr
from requests import get as requests_get, head as requests_head
from modules import script_callbacks, sd_models, shared
from modules.paths_internal import data_path
DL_COMMAND = 'wget -nv -t 10 --show-progress --progress=bar:force -q --content-disposition "{link}" -P {dl_path}'
WEBUI_ROOT = Path(data_path)
LINKS_FILE = WEBUI_ROOT / 'links.txt'
MODELS_FOLDER_PATH = Path(sd_models.model_path)
LORAS_FOLDER_PATH = Path(shared.cmd_opts.lora_dir)
EMBEDDINGS_FOLDER_PATH = Path(shared.cmd_opts.embeddings_dir)
CIVITAI_TOKEN = '542c1d6077168822e1b49e30e3437a5d'
def del_null_model():
null_model_path = MODELS_FOLDER_PATH / 'nullModel.ckpt'
if null_model_path.exists():
try:
null_model_path.unlink(missing_ok=True)
except:
pass
def find_mount_point():
path = Path(__file__).resolve()
while not path.is_mount():
path = path.parent
return path
def free_space():
total, used, free = disk_usage(find_mount_point())
power = 2 ** 10
n = 0
power_labels = {0: '', 1: 'Кило', 2: 'Мега', 3: 'Гига', 4: 'Тера'}
while free > power:
free /= power
n += 1
return f'{free:.2f} {power_labels[n]}байт'
def extract_url(command_eith_url):
pattern = r'["\']?((?:https?|ftp|ftps)://[^\s"\'<>]+)["\']?'
match = search(pattern, command_eith_url)
return match.group(1) if match else None
def hf_size(url: str) -> int:
try:
modified_url = url.replace('resolve', 'raw')
response = requests_get(modified_url, timeout=10)
response.raise_for_status()
content = response.text
size_str = content.split('size')[-1].strip().split()[0]
return int(size_str) if size_str.isdigit() else 0
except:
return 0
def cv_size(url: str) -> int:
try:
model_version_id = url.split('/')[-1]
response = requests_get(f'https://civitai.com/api/v1/model-versions/{model_version_id}?token={CIVITAI_TOKEN}', timeout=10)
response.raise_for_status()
files = response.json().get('files', [])
if files:
size_kb = files[0].get('sizeKB', 0)
return int(size_kb * 1024)
return 0
except:
return 0
def get_file_size(command_with_url: str) -> int:
url = extract_url(command_with_url)
if not url:
print(f'в строке `{command_with_url}` ссылка не найдена')
return 0
file_size = 0
if 'huggingface' in url:
file_size = hf_size(url)
elif 'civitai' in url:
file_size = cv_size(url)
if file_size:
return file_size
try:
response = requests_head(url, allow_redirects=True, timeout=10)
response.raise_for_status()
content_length = response.headers.get('Content-Length')
if content_length and content_length.isdigit():
return int(content_length)
except Exception:
pass
return 0
def get_total_file_size(urls: list):
total = 0
sizes = []
with ThreadPoolExecutor(max_workers=len(urls)) as executor:
futures = [executor.submit(get_file_size, url) for url in urls]
for future in as_completed(futures):
size = future.result()
sizes.append(size)
total += size
return total, sizes
def bytes_convert(size_bytes):
if size_bytes >= 1073741824:
return f'{round(size_bytes / 1073741824, 2)} ГБ'
else:
return f'{round(size_bytes / 1048576, 2)} МБ'
def get_own_links(ownmodels, ownloras, ownembeddings):
dl_commands = []
for text, dlpath in zip([ownmodels, ownloras, ownembeddings], [MODELS_FOLDER_PATH, LORAS_FOLDER_PATH, EMBEDDINGS_FOLDER_PATH]):
lines = text.split('\n')
for line in lines:
if line.strip():
link = line.strip() + (f"?token={CIVITAI_TOKEN}" if "?" not in line else f"&token={CIVITAI_TOKEN}") if "civitai" in line else line.strip()
dl_command = DL_COMMAND.format(link=link, dl_path=dlpath.resolve().as_posix())
dl_commands.append(dl_command)
LINKS_FILE.write_text('\n'.join(dl_commands).strip(), encoding='utf-8')
print('список загрузки сформирован...')
def on_ui_tabs():
with gr.Blocks() as models_list:
gr.HTML('<h1>Загрузка моделей</h1>')
ownmodels = gr.Textbox(label="Модели", lines=5)
ownloras = gr.Textbox(label="LoRA", lines=5)
ownembeddings = gr.Textbox(label="Внедрения", lines=5)
progress_slider = gr.Slider(minimum=0, maximum=100, value=0, label="Прогресс загрузки", interactive=False)
dl_result_box = gr.Textbox(label='Результат')
button = gr.Button("Сформировать ссылки")
button.click(get_own_links, inputs=[ownmodels, ownloras, ownembeddings])
def start_download():
try:
urls = LINKS_FILE.read_text(encoding='utf-8').splitlines()
LINKS_FILE.unlink(missing_ok=True)
total_file_size, sizes = get_total_file_size(urls)
total, used, free = disk_usage(find_mount_point())
if total_file_size > (free - 1073741824):
msg = f'Недостаточно места! Нужно: {bytes_convert(total_file_size)}, доступно: {bytes_convert(free)}'
print(msg)
return gr.update(value=0), msg
print(f'Загрузка {bytes_convert(total_file_size)} началась...')
total_urls = len(urls)
downloaded_size = 0
for idx, (command, size) in enumerate(zip(urls, sizes)):
for line in downloader(command):
print(line)
downloaded_size += size
progress = int(((idx + 1) / total_urls) * 100)
progress_text = f"Загружено {bytes_convert(downloaded_size)} из {bytes_convert(total_file_size)}"
yield gr.update(value=progress), progress_text
del_null_model()
return gr.update(value=100), "Загрузка завершена!"
except Exception as e:
return gr.update(value=0), f"ОШИБКА: {e}"
download_button = gr.Button("Скачать")
download_button.click(start_download, outputs=[progress_slider, dl_result_box])
return (models_list, 'Модели', 'models_list'),
script_callbacks.on_ui_tabs(on_ui_tabs)