| |
| |
| import os |
| import time |
| import json |
| import re |
| import requests |
| from . import util |
| from . import model |
| from . import setting |
|
|
| suffix = ".civitai" |
|
|
| url_dict = { |
| "modelPage":"https://civitai.com/models/", |
| "modelId": "https://civitai.com/api/v1/models/", |
| "modelVersionId": "https://civitai.com/api/v1/model-versions/", |
| "hash": "https://civitai.com/api/v1/model-versions/by-hash/" |
| } |
|
|
| model_type_dict = { |
| "Checkpoint": "ckp", |
| "TextualInversion": "ti", |
| "Hypernetwork": "hyper", |
| "LORA": "lora", |
| "LoCon": "lora", |
| } |
|
|
|
|
|
|
| |
| |
| |
| def get_full_size_image_url(image_url, width): |
| return re.sub('/width=\d+/', '/width=' + str(width) + '/', image_url) |
|
|
|
|
| |
| |
| def get_model_info_by_hash(hash:str): |
| util.printD("Request model info from civitai") |
|
|
| if not hash: |
| util.printD("hash is empty") |
| return |
|
|
| r = requests.get(url_dict["hash"]+hash, headers=util.def_headers, proxies=util.proxies) |
| if not r.ok: |
| if r.status_code == 404: |
| |
| util.printD("Civitai does not have this model") |
| return {} |
| else: |
| util.printD("Get error code: " + str(r.status_code)) |
| util.printD(r.text) |
| return |
|
|
| |
| content = None |
| try: |
| content = r.json() |
| except Exception as e: |
| util.printD("Parse response json failed") |
| util.printD(str(e)) |
| util.printD("response:") |
| util.printD(r.text) |
| return |
| |
| if not content: |
| util.printD("error, content from civitai is None") |
| return |
| |
| return content |
|
|
|
|
|
|
| def get_model_info_by_id(id:str) -> dict: |
| util.printD("Request model info from civitai") |
|
|
| if not id: |
| util.printD("id is empty") |
| return |
|
|
| r = requests.get(url_dict["modelId"]+str(id), headers=util.def_headers, proxies=util.proxies) |
| if not r.ok: |
| if r.status_code == 404: |
| |
| util.printD("Civitai does not have this model") |
| return {} |
| else: |
| util.printD("Get error code: " + str(r.status_code)) |
| util.printD(r.text) |
| return |
|
|
| |
| content = None |
| try: |
| content = r.json() |
| except Exception as e: |
| util.printD("Parse response json failed") |
| util.printD(str(e)) |
| util.printD("response:") |
| util.printD(r.text) |
| return |
| |
| if not content: |
| util.printD("error, content from civitai is None") |
| return |
| |
| return content |
|
|
|
|
| def get_version_info_by_version_id(id:str) -> dict: |
| util.printD("Request version info from civitai") |
|
|
| if not id: |
| util.printD("id is empty") |
| return |
|
|
| r = requests.get(url_dict["modelVersionId"]+str(id), headers=util.def_headers, proxies=util.proxies) |
| if not r.ok: |
| if r.status_code == 404: |
| |
| util.printD("Civitai does not have this model version") |
| return {} |
| else: |
| util.printD("Get error code: " + str(r.status_code)) |
| util.printD(r.text) |
| return |
|
|
| |
| content = None |
| try: |
| content = r.json() |
| except Exception as e: |
| util.printD("Parse response json failed") |
| util.printD(str(e)) |
| util.printD("response:") |
| util.printD(r.text) |
| return |
| |
| if not content: |
| util.printD("error, content from civitai is None") |
| return |
| |
| return content |
|
|
|
|
| def get_version_info_by_model_id(id:str) -> dict: |
|
|
| model_info = get_model_info_by_id(id) |
| if not model_info: |
| util.printD(f"Failed to get model info by id: {id}") |
| return |
| |
| |
| if "modelVersions" not in model_info.keys(): |
| util.printD("There is no modelVersions in this model_info") |
| return |
| |
| if not model_info["modelVersions"]: |
| util.printD("modelVersions is None") |
| return |
| |
| if len(model_info["modelVersions"])==0: |
| util.printD("modelVersions is Empty") |
| return |
| |
| def_version = model_info["modelVersions"][0] |
| if not def_version: |
| util.printD("default version is None") |
| return |
| |
| if "id" not in def_version.keys(): |
| util.printD("default version has no id") |
| return |
| |
| version_id = def_version["id"] |
| |
| if not version_id: |
| util.printD("default version's id is None") |
| return |
|
|
| |
| version_info = get_version_info_by_version_id(str(version_id)) |
| if not version_info: |
| util.printD(f"Failed to get version info by version_id: {version_id}") |
| return |
|
|
| return version_info |
|
|
|
|
|
|
|
|
| |
| |
| |
| def load_model_info_by_search_term(model_type, search_term): |
| util.printD(f"Load model info of {search_term} in {model_type}") |
| if model_type not in model.folders.keys(): |
| util.printD("unknow model type: " + model_type) |
| return |
| |
| |
| base, ext = os.path.splitext(search_term) |
| model_info_base = base |
| if base[:1] == "/": |
| model_info_base = base[1:] |
|
|
| model_folder = model.folders[model_type] |
| model_info_filename = model_info_base + suffix + model.info_ext |
| model_info_filepath = os.path.join(model_folder, model_info_filename) |
|
|
| if not os.path.isfile(model_info_filepath): |
| util.printD("Can not find model info file: " + model_info_filepath) |
| return |
| |
| return model.load_model_info(model_info_filepath) |
|
|
|
|
|
|
|
|
|
|
| |
| |
| |
| |
| def get_model_names_by_type_and_filter(model_type:str, filter:dict) -> list: |
| |
| model_folder = model.folders[model_type] |
|
|
| |
| |
| no_info_only = False |
| empty_info_only = False |
|
|
| if filter: |
| if "no_info_only" in filter.keys(): |
| no_info_only = filter["no_info_only"] |
| if "empty_info_only" in filter.keys(): |
| empty_info_only = filter["empty_info_only"] |
|
|
|
|
|
|
| |
| |
| model_names = [] |
| for root, dirs, files in os.walk(model_folder, followlinks=True): |
| for filename in files: |
| item = os.path.join(root, filename) |
| |
| base, ext = os.path.splitext(item) |
| if ext in model.exts: |
| |
|
|
| |
| if no_info_only: |
| |
| info_file = base + suffix + model.info_ext |
| if os.path.isfile(info_file): |
| continue |
|
|
| if empty_info_only: |
| |
| info_file = base + suffix + model.info_ext |
| if os.path.isfile(info_file): |
| |
| model_info = model.load_model_info(info_file) |
| |
| if model_info: |
| if "id" in model_info.keys(): |
| |
| continue |
|
|
| model_names.append(filename) |
|
|
|
|
| return model_names |
|
|
| def get_model_names_by_input(model_type, empty_info_only): |
| return get_model_names_by_type_and_filter(model_type, {"empty_info_only":empty_info_only}) |
| |
|
|
| |
| def get_model_id_from_url(url:str) -> str: |
| util.printD("Run get_model_id_from_url") |
| id = "" |
|
|
| if not url: |
| util.printD("url or model id can not be empty") |
| return "" |
|
|
| if url.isnumeric(): |
| |
| id = str(url) |
| return id |
| |
| s = re.sub("\\?.+$", "", url).split("/") |
| if len(s) < 2: |
| util.printD("url is not valid") |
| return "" |
| |
| if s[-2].isnumeric(): |
| id = s[-2] |
| elif s[-1].isnumeric(): |
| id = s[-1] |
| else: |
| util.printD("There is no model id in this url") |
| return "" |
| |
| return id |
|
|
|
|
| |
| |
| def get_preview_image_by_model_path(model_path:str, max_size_preview, skip_nsfw_preview): |
| if not model_path: |
| util.printD("model_path is empty") |
| return |
|
|
| if not os.path.isfile(model_path): |
| util.printD("model_path is not a file: "+model_path) |
| return |
|
|
| base, ext = os.path.splitext(model_path) |
| first_preview = base+".png" |
| sec_preview = base+".preview.png" |
| info_file = base + suffix + model.info_ext |
|
|
| |
| if not os.path.isfile(sec_preview): |
| |
| util.printD("Checking preview image for model: " + model_path) |
| |
| if os.path.isfile(info_file): |
| model_info = model.load_model_info(info_file) |
| if not model_info: |
| util.printD("Model Info is empty") |
| return |
|
|
| if "images" in model_info.keys(): |
| if model_info["images"]: |
| for img_dict in model_info["images"]: |
| if "nsfw" in img_dict.keys(): |
| if img_dict["nsfw"]: |
| util.printD("This image is NSFW") |
| if skip_nsfw_preview: |
| util.printD("Skip NSFW image") |
| continue |
| |
| if "url" in img_dict.keys(): |
| img_url = img_dict["url"] |
| if max_size_preview: |
| |
| if "width" in img_dict.keys(): |
| if img_dict["width"]: |
| img_url = get_full_size_image_url(img_url, img_dict["width"]) |
|
|
| util.download_file(img_url, sec_preview) |
| |
| break |
|
|
|
|
|
|
| |
| |
| def search_local_model_info_by_version_id(folder:str, version_id:int) -> dict: |
| util.printD("Searching local model by version id") |
| util.printD("folder: " + folder) |
| util.printD("version_id: " + str(version_id)) |
|
|
| if not folder: |
| util.printD("folder is none") |
| return |
|
|
| if not os.path.isdir(folder): |
| util.printD("folder is not a dir") |
| return |
| |
| if not version_id: |
| util.printD("version_id is none") |
| return |
| |
| |
| for filename in os.listdir(folder): |
| |
| base, ext = os.path.splitext(filename) |
| if ext == model.info_ext: |
| |
| if len(base) < 9: |
| |
| continue |
|
|
| if base[-8:] == suffix: |
| |
| path = os.path.join(folder, filename) |
| model_info = model.load_model_info(path) |
| if not model_info: |
| continue |
|
|
| if "id" not in model_info.keys(): |
| continue |
|
|
| id = model_info["id"] |
| if not id: |
| continue |
|
|
| |
| if str(id) == str(version_id): |
| |
| return model_info |
| |
|
|
| return |
|
|
|
|
|
|
|
|
|
|
| |
| |
| def check_model_new_version_by_path(model_path:str, delay:float=1) -> tuple: |
| if not model_path: |
| util.printD("model_path is empty") |
| return |
|
|
| if not os.path.isfile(model_path): |
| util.printD("model_path is not a file: "+model_path) |
| return |
| |
| |
| base, ext = os.path.splitext(model_path) |
| info_file = base + suffix + model.info_ext |
| |
| if not os.path.isfile(info_file): |
| return |
| |
| |
| model_info_file = model.load_model_info(info_file) |
| if not model_info_file: |
| return |
|
|
| if "id" not in model_info_file.keys(): |
| return |
| |
| local_version_id = model_info_file["id"] |
| if not local_version_id: |
| return |
|
|
| if "modelId" not in model_info_file.keys(): |
| return |
| |
| model_id = model_info_file["modelId"] |
| if not model_id: |
| return |
| |
| |
| model_info = get_model_info_by_id(model_id) |
| |
| util.printD(f"delay:{delay} second") |
| time.sleep(delay) |
|
|
| if not model_info: |
| return |
| |
| if "modelVersions" not in model_info.keys(): |
| return |
| |
| modelVersions = model_info["modelVersions"] |
| if not modelVersions: |
| return |
| |
| if not len(modelVersions): |
| return |
| |
| current_version = modelVersions[0] |
| if not current_version: |
| return |
| |
| if "id" not in current_version.keys(): |
| return |
| |
| current_version_id = current_version["id"] |
| if not current_version_id: |
| return |
|
|
| util.printD(f"Compare version id, local: {local_version_id}, remote: {current_version_id} ") |
| if current_version_id == local_version_id: |
| return |
|
|
| model_name = "" |
| if "name" in model_info.keys(): |
| model_name = model_info["name"] |
| |
| if not model_name: |
| model_name = "" |
|
|
|
|
| new_version_name = "" |
| if "name" in current_version.keys(): |
| new_version_name = current_version["name"] |
| |
| if not new_version_name: |
| new_version_name = "" |
|
|
| description = "" |
| if "description" in current_version.keys(): |
| description = current_version["description"] |
| |
| if not description: |
| description = "" |
|
|
| downloadUrl = "" |
| if "downloadUrl" in current_version.keys(): |
| downloadUrl = current_version["downloadUrl"] |
| |
| if not downloadUrl: |
| downloadUrl = "" |
|
|
| |
| img_url = "" |
| if "images" in current_version.keys(): |
| if current_version["images"]: |
| if current_version["images"][0]: |
| if "url" in current_version["images"][0].keys(): |
| img_url = current_version["images"][0]["url"] |
| if not img_url: |
| img_url = "" |
|
|
|
|
| |
| return (model_path, model_id, model_name, current_version_id, new_version_name, description, downloadUrl, img_url) |
|
|
|
|
|
|
|
|
| |
| |
| |
| def check_models_new_version_by_model_types(model_types:list, delay:float=1) -> list: |
| util.printD("Checking models' new version") |
|
|
| if not model_types: |
| return [] |
|
|
| |
| mts = [] |
| if type(model_types) == str: |
| mts.append(model_types) |
| elif type(model_types) == list: |
| mts = model_types |
| else: |
| util.printD("Unknow model types:") |
| util.printD(model_types) |
| return [] |
|
|
| |
| output = "" |
| |
| new_versions = [] |
|
|
| |
| for model_type, model_folder in model.folders.items(): |
| if model_type not in mts: |
| continue |
|
|
| util.printD("Scanning path: " + model_folder) |
| for root, dirs, files in os.walk(model_folder, followlinks=True): |
| for filename in files: |
| |
| item = os.path.join(root, filename) |
| base, ext = os.path.splitext(item) |
| if ext in model.exts: |
| |
| r = check_model_new_version_by_path(item, delay) |
|
|
| if not r: |
| continue |
|
|
| model_path, model_id, model_name, current_version_id, new_version_name, description, downloadUrl, img_url = r |
| |
| if not current_version_id: |
| continue |
|
|
| |
| is_already_in_list = False |
| for new_version in new_versions: |
| if current_version_id == new_version[3]: |
| |
| is_already_in_list = True |
| break |
|
|
| if is_already_in_list: |
| util.printD("New version is already in list") |
| continue |
|
|
| |
| target_model_info = search_local_model_info_by_version_id(root, current_version_id) |
| if target_model_info: |
| util.printD("New version is already existed") |
| continue |
|
|
| |
| new_versions.append(r) |
|
|
|
|
|
|
|
|
| return new_versions |
|
|
|
|
|
|
|
|