| | |
| | |
| | import os |
| | import time |
| | from . import util |
| | from . import model |
| | from . import civitai |
| | from . import downloader |
| |
|
| |
|
| | |
| | |
| | def scan_model(scan_model_types, max_size_preview, skip_nsfw_preview): |
| | util.printD("Start scan_model") |
| | output = "" |
| |
|
| | |
| | if not scan_model_types: |
| | output = "Model Types is None, can not scan." |
| | util.printD(output) |
| | return output |
| | |
| | model_types = [] |
| | |
| | if type(scan_model_types) == str: |
| | model_types.append(scan_model_types) |
| | else: |
| | model_types = scan_model_types |
| | |
| | model_count = 0 |
| | image_count = 0 |
| | |
| | for model_type, model_folder in model.folders.items(): |
| | if model_type not in model_types: |
| | continue |
| |
|
| | util.printD("Scanning path: " + model_folder) |
| | for root, dirs, files in os.walk(model_folder, followlinks=True): |
| | for filename in files: |
| | |
| | item = os.path.join(root, filename) |
| | base, ext = os.path.splitext(item) |
| | if ext in model.exts: |
| | |
| | if len(base) > 4: |
| | if base[-4:] == model.vae_suffix: |
| | |
| | util.printD("This is a vae file: " + filename) |
| | continue |
| |
|
| | |
| | |
| | info_file = base + civitai.suffix + model.info_ext |
| | |
| | if not os.path.isfile(info_file): |
| | util.printD("Creating model info for: " + filename) |
| | |
| | hash = util.gen_file_sha256(item) |
| |
|
| | if not hash: |
| | output = "failed generating SHA256 for model:" + filename |
| | util.printD(output) |
| | return output |
| | |
| | |
| | model_info = civitai.get_model_info_by_hash(hash) |
| | |
| | if model_type == "ti": |
| | util.printD("Delay 1 second for TI") |
| | time.sleep(1) |
| |
|
| | if model_info is None: |
| | output = "Connect to Civitai API service failed. Wait a while and try again" |
| | util.printD(output) |
| | return output+", check console log for detail" |
| |
|
| | |
| | model.write_model_info(info_file, model_info) |
| |
|
| | |
| | model_count = model_count+1 |
| |
|
| | |
| | civitai.get_preview_image_by_model_path(item, max_size_preview, skip_nsfw_preview) |
| | image_count = image_count+1 |
| |
|
| |
|
| | |
| |
|
| | output = f"Done. Scanned {model_count} models, checked {image_count} images" |
| |
|
| | util.printD(output) |
| |
|
| | return output |
| |
|
| |
|
| |
|
| | |
| | |
| | def get_model_info_by_input(model_type, model_name, model_url_or_id, max_size_preview, skip_nsfw_preview): |
| | output = "" |
| | |
| | model_id = civitai.get_model_id_from_url(model_url_or_id) |
| | if not model_id: |
| | output = "failed to parse model id from url: " + model_url_or_id |
| | util.printD(output) |
| | return output |
| |
|
| | |
| | |
| | result = model.get_model_path_by_type_and_name(model_type, model_name) |
| | if not result: |
| | output = "failed to get model file path" |
| | util.printD(output) |
| | return output |
| | |
| | model_root, model_path = result |
| | if not model_path: |
| | output = "model path is empty" |
| | util.printD(output) |
| | return output |
| | |
| | |
| | base, ext = os.path.splitext(model_path) |
| | info_file = base + civitai.suffix + model.info_ext |
| |
|
| | |
| | |
| | model_info = civitai.get_version_info_by_model_id(model_id) |
| |
|
| | if not model_info: |
| | output = "failed to get model info from url: " + model_url_or_id |
| | util.printD(output) |
| | return output |
| | |
| | |
| | model.write_model_info(info_file, model_info) |
| |
|
| | util.printD("Saved model info to: "+ info_file) |
| |
|
| | |
| | civitai.get_preview_image_by_model_path(model_path, max_size_preview, skip_nsfw_preview) |
| |
|
| | output = "Model Info saved to: " + info_file |
| | return output |
| |
|
| |
|
| |
|
| | |
| | def check_models_new_version_to_md(model_types:list) -> str: |
| | new_versions = civitai.check_models_new_version_by_model_types(model_types, 1) |
| |
|
| | count = 0 |
| | output = "" |
| | if not new_versions: |
| | output = "No model has new version" |
| | else: |
| | output = "Found new version for following models: <br>" |
| | for new_version in new_versions: |
| | count = count+1 |
| | model_path, model_id, model_name, new_verion_id, new_version_name, description, download_url, img_url = new_version |
| | |
| | |
| | |
| | |
| | url = civitai.url_dict["modelPage"]+str(model_id) |
| |
|
| | part = f'<div style="font-size:20px;margin:6px 0px;"><b>Model: <a href="{url}" target="_blank"><u>{model_name}</u></a></b></div>' |
| | part = part + f'<div style="font-size:16px">File: {model_path}</div>' |
| | if download_url: |
| | |
| | model_path = model_path.replace('\\', '\\\\') |
| | part = part + f'<div style="font-size:16px;margin:6px 0px;">New Version: <u><a href="{download_url}" target="_blank" style="margin:0px 10px;">{new_version_name}</a></u>' |
| | |
| | part = part + " " |
| | |
| | part = part + f"<u><a href='#' style='margin:0px 10px;' onclick=\"ch_dl_model_new_version(event, '{model_path}', '{new_verion_id}', '{download_url}')\">[Download into SD]</a></u>" |
| | |
| | else: |
| | part = part + f'<div style="font-size:16px;margin:6px 0px;">New Version: {new_version_name}' |
| | part = part + '</div>' |
| |
|
| | |
| | if description: |
| | part = part + '<blockquote style="font-size:16px;margin:6px 0px;">'+ description + '</blockquote><br>' |
| |
|
| | |
| | if img_url: |
| | part = part + f"<img src='{img_url}'><br>" |
| | |
| |
|
| | output = output + part |
| |
|
| | util.printD(f"Done. Find {count} models have new version. Check UI for detail.") |
| |
|
| | return output |
| |
|
| |
|
| | |
| | def get_model_info_by_url(model_url_or_id:str) -> tuple: |
| | util.printD("Getting model info by: " + model_url_or_id) |
| |
|
| | |
| | model_id = civitai.get_model_id_from_url(model_url_or_id) |
| | if not model_id: |
| | util.printD("failed to parse model id from url or id") |
| | return |
| |
|
| | model_info = civitai.get_model_info_by_id(model_id) |
| | if model_info is None: |
| | util.printD("Connect to Civitai API service failed. Wait a while and try again") |
| | return |
| | |
| | if not model_info: |
| | util.printD("failed to get model info from url or id") |
| | return |
| | |
| | |
| | |
| | if "type" not in model_info.keys(): |
| | util.printD("model type is not in model_info") |
| | return |
| | |
| | civitai_model_type = model_info["type"] |
| | if civitai_model_type not in civitai.model_type_dict.keys(): |
| | util.printD("This model type is not supported:"+civitai_model_type) |
| | return |
| | |
| | model_type = civitai.model_type_dict[civitai_model_type] |
| |
|
| | |
| | if "name" not in model_info.keys(): |
| | util.printD("model name is not in model_info") |
| | return |
| |
|
| | model_name = model_info["name"] |
| | if not model_name: |
| | util.printD("model name is Empty") |
| | model_name = "" |
| |
|
| | |
| | if "modelVersions" not in model_info.keys(): |
| | util.printD("modelVersions is not in model_info") |
| | return |
| | |
| | modelVersions = model_info["modelVersions"] |
| | if not modelVersions: |
| | util.printD("modelVersions is Empty") |
| | return |
| | |
| | version_strs = [] |
| | for version in modelVersions: |
| | |
| | |
| | |
| | version_str = version["name"]+"_"+str(version["id"]) |
| | version_strs.append(version_str) |
| |
|
| | |
| | folder = model.folders[model_type] |
| | |
| | subfolders = util.get_subfolders(folder) |
| | if not subfolders: |
| | subfolders = [] |
| |
|
| | |
| | subfolders.append("/") |
| |
|
| | util.printD("Get following info for downloading:") |
| | util.printD(f"model_name:{model_name}") |
| | util.printD(f"model_type:{model_type}") |
| | util.printD(f"subfolders:{subfolders}") |
| | util.printD(f"version_strs:{version_strs}") |
| |
|
| | return (model_info, model_name, model_type, subfolders, version_strs) |
| |
|
| | |
| | def get_ver_info_by_ver_str(version_str:str, model_info:dict) -> dict: |
| | if not version_str: |
| | util.printD("version_str is empty") |
| | return |
| | |
| | if not model_info: |
| | util.printD("model_info is None") |
| | return |
| | |
| | |
| | if "modelVersions" not in model_info.keys(): |
| | util.printD("modelVersions is not in model_info") |
| | return |
| |
|
| | modelVersions = model_info["modelVersions"] |
| | if not modelVersions: |
| | util.printD("modelVersions is Empty") |
| | return |
| | |
| | |
| | version = None |
| | for ver in modelVersions: |
| | |
| | |
| | |
| | ver_str = ver["name"]+"_"+str(ver["id"]) |
| | if ver_str == version_str: |
| | |
| | version = ver |
| |
|
| | if not version: |
| | util.printD("can not find version by version string: " + version_str) |
| | return |
| | |
| | |
| | if "id" not in version.keys(): |
| | util.printD("this version has no id") |
| | return |
| | |
| | return version |
| |
|
| |
|
| | |
| | |
| | def get_id_and_dl_url_by_version_str(version_str:str, model_info:dict) -> tuple: |
| | if not version_str: |
| | util.printD("version_str is empty") |
| | return |
| | |
| | if not model_info: |
| | util.printD("model_info is None") |
| | return |
| | |
| | |
| | if "modelVersions" not in model_info.keys(): |
| | util.printD("modelVersions is not in model_info") |
| | return |
| |
|
| | modelVersions = model_info["modelVersions"] |
| | if not modelVersions: |
| | util.printD("modelVersions is Empty") |
| | return |
| | |
| | |
| | version = None |
| | for ver in modelVersions: |
| | |
| | |
| | |
| | ver_str = ver["name"]+"_"+str(ver["id"]) |
| | if ver_str == version_str: |
| | |
| | version = ver |
| |
|
| | if not version: |
| | util.printD("can not find version by version string: " + version_str) |
| | return |
| | |
| | |
| | if "id" not in version.keys(): |
| | util.printD("this version has no id") |
| | return |
| | |
| | version_id = version["id"] |
| | if not version_id: |
| | util.printD("version id is Empty") |
| | return |
| |
|
| | |
| | if "downloadUrl" not in version.keys(): |
| | util.printD("downloadUrl is not in this version") |
| | return |
| | |
| | downloadUrl = version["downloadUrl"] |
| | if not downloadUrl: |
| | util.printD("downloadUrl is Empty") |
| | return |
| | |
| | util.printD("Get Download Url: " + downloadUrl) |
| | |
| | return (version_id, downloadUrl) |
| | |
| |
|
| | |
| | |
| | def dl_model_by_input(model_info:dict, model_type:str, subfolder_str:str, version_str:str, dl_all_bool:bool, max_size_preview:bool, skip_nsfw_preview:bool) -> str: |
| |
|
| | output = "" |
| |
|
| | if not model_info: |
| | output = "model_info is None" |
| | util.printD(output) |
| | return output |
| | |
| | if not model_type: |
| | output = "model_type is None" |
| | util.printD(output) |
| | return output |
| | |
| | if not subfolder_str: |
| | output = "subfolder string is None" |
| | util.printD(output) |
| | return output |
| | |
| | if not version_str: |
| | output = "version_str is None" |
| | util.printD(output) |
| | return output |
| | |
| | |
| | if model_type not in model.folders.keys(): |
| | output = "Unknow model type: "+model_type |
| | util.printD(output) |
| | return output |
| | |
| | model_root_folder = model.folders[model_type] |
| |
|
| |
|
| | |
| | subfolder = "" |
| | if subfolder_str == "/" or subfolder_str == "\\": |
| | subfolder = "" |
| | elif subfolder_str[:1] == "/" or subfolder_str[:1] == "\\": |
| | subfolder = subfolder_str[1:] |
| | else: |
| | subfolder = subfolder_str |
| |
|
| | |
| | model_folder = os.path.join(model_root_folder, subfolder) |
| | if not os.path.isdir(model_folder): |
| | output = "Model folder is not a dir: "+ model_folder |
| | util.printD(output) |
| | return output |
| | |
| | |
| | ver_info = get_ver_info_by_ver_str(version_str, model_info) |
| | if not ver_info: |
| | output = "Fail to get version info, check console log for detail" |
| | util.printD(output) |
| | return output |
| | |
| | version_id = ver_info["id"] |
| |
|
| |
|
| | if dl_all_bool: |
| | |
| | |
| | download_urls = [] |
| | if "files" in ver_info.keys(): |
| | for file_info in ver_info["files"]: |
| | if "downloadUrl" in file_info.keys(): |
| | download_urls.append(file_info["downloadUrl"]) |
| |
|
| | if not len(download_urls): |
| | if "downloadUrl" in ver_info.keys(): |
| | download_urls.append(ver_info["downloadUrl"]) |
| |
|
| |
|
| | |
| | r = civitai.search_local_model_info_by_version_id(model_folder, version_id) |
| | if r: |
| | output = "This model version is already existed" |
| | util.printD(output) |
| | return output |
| | |
| | |
| | filepath = "" |
| | for url in download_urls: |
| | model_filepath = downloader.dl(url, model_folder, None, None) |
| | if not model_filepath: |
| | output = "Downloading failed, check console log for detail" |
| | util.printD(output) |
| | return output |
| | |
| | if url == ver_info["downloadUrl"]: |
| | filepath = model_filepath |
| | else: |
| | |
| | |
| | url = ver_info["downloadUrl"] |
| | if not url: |
| | output = "Fail to get download url, check console log for detail" |
| | util.printD(output) |
| | return output |
| | |
| | |
| | filepath = downloader.dl(url, model_folder, None, None) |
| | if not filepath: |
| | output = "Downloading failed, check console log for detail" |
| | util.printD(output) |
| | return output |
| |
|
| |
|
| | if not filepath: |
| | filepath = model_filepath |
| | |
| | |
| | version_info = civitai.get_version_info_by_version_id(version_id) |
| | if not version_info: |
| | output = "Model downloaded, but failed to get version info, check console log for detail. Model saved to: " + filepath |
| | util.printD(output) |
| | return output |
| |
|
| | |
| | base, ext = os.path.splitext(filepath) |
| | info_file = base + civitai.suffix + model.info_ext |
| | model.write_model_info(info_file, version_info) |
| |
|
| | |
| | civitai.get_preview_image_by_model_path(filepath, max_size_preview, skip_nsfw_preview) |
| | |
| | output = "Done. Model downloaded to: " + filepath |
| | util.printD(output) |
| | return output |
| |
|