| import modules.async_worker as worker |
|
|
| from modules.settings import default_settings |
| import shared |
| import glob |
| from pathlib import Path |
| import datetime |
| import re |
| import json |
|
|
| from PIL import Image |
|
|
| |
|
|
def search_for_words(searchfor, prompt):
    """Return True when every comma-separated term of ``searchfor`` is in ``prompt``.

    Both strings are lower-cased, so the test is a case-insensitive substring
    match. Each term is stripped of surrounding whitespace before the test;
    an empty term is a substring of any prompt and therefore always matches.
    """
    terms = searchfor.lower().split(",")
    haystack = prompt.lower()
    return all(term.strip() in haystack for term in terms)
|
|
def _parse_search_directives(searchfor, daystr, skip, maxresults):
    """Strip leading directives from ``searchfor`` and apply them to the settings.

    Directives are only recognised at the very start of the remaining text,
    are matched case-insensitively, and may appear in any order and any
    number of times:

    * ``YYYY-MM-DD`` -- use that day's sub-folder
    * ``all:``       -- use every sub-folder (``daystr`` becomes ``"*"``)
    * ``skip:N``     -- skip the first N matches
    * ``+N``         -- skip N more matches (added to the current skip)
    * ``max:N``      -- collect at most N matches

    Returns the tuple ``(searchfor, daystr, skip, maxresults)`` where
    ``searchfor`` is whatever text is left after the directives.
    """
    flags = re.IGNORECASE
    while True:
        # The matched prefix is removed by slicing at match.end(). Removing it
        # with a second, case-sensitive re.sub() left e.g. "ALL: " in place
        # and made this loop spin forever.
        match = re.match(r"^[0-9]{4}-[0-9]{2}-[0-9]{2}\s?", searchfor, flags)
        if match is not None:
            daystr = match.group().strip()
            searchfor = searchfor[match.end():]
            continue

        match = re.match(r"^all:\s?", searchfor, flags)
        if match is not None:
            daystr = "*"
            searchfor = searchfor[match.end():]
            continue

        match = re.match(r"^skip:\s?(?P<skip>[0-9]+)\s?", searchfor, flags)
        if match is not None:
            skip = int(match.group("skip"))
            searchfor = searchfor[match.end():]
            continue

        match = re.match(r"^\+(?P<skip>[0-9]+)\s?", searchfor, flags)
        if match is not None:
            skip += int(match.group("skip"))
            searchfor = searchfor[match.end():]
            continue

        match = re.match(r"^max:\s?(?P<max>[0-9]+)\s?", searchfor, flags)
        if match is not None:
            maxresults = int(match.group("max"))
            searchfor = searchfor[match.end():]
            continue

        # Nothing matched at the start of the text: parsing is done.
        return searchfor, daystr, skip, maxresults


def search(search_string, maxresults=10, callback=None):
    """Find previously generated images whose stored prompt matches a query.

    The first ``search:`` marker (case-insensitive) is removed from
    ``search_string``; the remainder may start with the directives described
    in ``_parse_search_directives`` followed by comma-separated search terms.
    Without a date directive, only today's sub-folder of the
    ``temp_outputs_path`` directory is scanned.

    Args:
        search_string: The raw query text.
        maxresults: Maximum number of images to return unless overridden by
            a ``max:N`` directive.
        callback: Optional progress callable, invoked as
            ``callback(found - skip, 0, 0, maxresults, None)`` for matches
            found after the skipped ones.

    Returns:
        A list of matching ``*.png`` / ``*.gif`` file paths, in sorted path
        order, with the first ``skip`` matches dropped.
    """
    folder = shared.path_manager.model_paths["temp_outputs_path"]
    daystr = datetime.datetime.now().strftime("%Y-%m-%d")

    # Drop the "search:" marker, then peel off any leading directives.
    searchfor = re.sub(r"search: *", "", search_string, count=1, flags=re.IGNORECASE)
    searchfor, daystr, skip, maxresults = _parse_search_directives(
        searchfor, daystr, 0, maxresults
    )
    searchfor = searchfor.strip()

    # A set removes duplicates should the glob patterns ever overlap.
    candidates = set()
    for pattern in ("*.png", "*.gif"):
        candidates.update(glob.glob(str(Path(folder) / daystr / pattern)))

    images = []
    found = 0
    for file in sorted(candidates):
        # Close the image right after reading its info dict so that scanning
        # a large folder does not leave one open file handle per image.
        with Image.open(file) as im:
            raw_parameters = im.info.get("parameters")

        metadata = {}
        if raw_parameters:
            try:
                parsed = json.loads(raw_parameters)
            except ValueError:
                # "parameters" written by another tool need not be JSON;
                # treat the image as having no metadata instead of aborting.
                parsed = None
            if isinstance(parsed, dict):
                metadata = parsed

        # NOTE: images without a "Prompt" entry are always included, even
        # when search terms were given (behaviour kept from the original).
        if (
            searchfor == ""
            or "Prompt" not in metadata
            or search_for_words(searchfor, metadata["Prompt"])
        ):
            if callback is not None and found > skip:
                callback(found - skip, 0, 0, maxresults, None)
            images.append(file)
            found += 1
            # Skipped matches are collected too and sliced off on return.
            if found >= (maxresults + skip):
                break

    return images[skip:]
|
|
|
|
class pipeline:
    """Pipeline that returns stored images matching a search query.

    Most hooks are no-ops that simply return ``None``; they are presumably
    present so this class offers the same methods as the other pipelines --
    TODO confirm against the callers.
    """

    # Kinds of request this pipeline declares itself for.
    pipeline_type = ["search"]

    # Always the empty string in this class.
    model_hash = ""

    def parse_gen_data(self, gen_data):
        """Force a single, preview-less run and remember the requested count.

        ``gen_data`` is modified in place and also returned: the incoming
        ``image_number`` is copied to ``original_image_number``,
        ``image_number`` is set to 1 and ``show_preview`` to False.
        """
        gen_data.update(
            {
                "original_image_number": gen_data["image_number"],
                "image_number": 1,
                "show_preview": False,
            }
        )
        return gen_data

    def load_base_model(self, name):
        """Do nothing."""
        return None

    def load_keywords(self, lora):
        """Return the text of the ``.txt`` file that accompanies ``lora``.

        The file name is ``lora`` with ``.safetensors`` replaced by ``.txt``.
        A single space is returned when that file does not exist.
        """
        keyword_file = lora.replace(".safetensors", ".txt")
        try:
            with open(keyword_file, "r") as handle:
                return handle.read()
        except FileNotFoundError:
            return " "

    def load_loras(self, loras):
        """Do nothing."""
        return None

    def refresh_controlnet(self, name=None):
        """Do nothing."""
        return None

    def clean_prompt_cond_caches(self):
        """Do nothing."""
        return None

    def process(
        self,
        gen_data=None,
        callback=None,
    ):
        """Run the search described by ``gen_data`` and return the image paths.

        A "Searching ..." preview result is posted for ``gen_data["task_id"]``
        first. The result limit is ``gen_data["original_image_number"]``,
        except that a value of 1 or less is replaced by 100. The query text
        is taken from ``gen_data["positive_prompt"]``.
        """
        worker.add_result(
            gen_data["task_id"],
            "preview",
            (-1, "Searching ...", None),
        )

        limit = gen_data["original_image_number"]
        if limit <= 1:
            limit = 100

        return search(gen_data["positive_prompt"], maxresults=limit, callback=callback)
|
|