dikdimon's picture
Upload extensions using SD-Hub extension
bb7f1f4 verified
import hashlib
import json
import os
import time
import zlib
import gradio as gr
from scripts.mo.environment import env
from scripts.mo.models import ModelType
from scripts.mo.utils import get_model_files_in_dir, find_preview_file, link_preview, read_hash_cache, \
calculate_file_temp_hash, write_hash_cache, calculate_sha256
def _ui_state_report():
with gr.Column():
gr.Button('Generate state report')
def _on_local_files_scan_click():
result = []
def search_in_dir(model_type) -> list:
dir_path = env.get_model_path(model_type)
local = []
files = get_model_files_in_dir(dir_path)
for file in files:
preview_file = find_preview_file(file)
rec = {
'filename': os.path.basename(file),
'model_type': model_type.value,
'path': file,
}
if preview_file is not None and preview_file:
prev = {
'preview_filename': os.path.basename(preview_file),
'preview_path': preview_file,
'preview_link': link_preview(preview_file)
}
rec.update(prev)
local.append(rec)
return local
result.extend(search_in_dir(ModelType.CHECKPOINT))
result.extend(search_in_dir(ModelType.VAE))
result.extend(search_in_dir(ModelType.LORA))
result.extend(search_in_dir(ModelType.HYPER_NETWORK))
result.extend(search_in_dir(ModelType.EMBEDDING))
result.extend(search_in_dir(ModelType.LYCORIS))
return gr.JSON.update(value=json.dumps(result))
def _ui_local_files():
with gr.Column():
scan_button = gr.Button('Scan Local Model files')
local_files_json = gr.JSON(label='Local files')
scan_button.click(fn=_on_local_files_scan_click,
outputs=local_files_json)
def _on_read_hash_click():
cache = read_hash_cache()
return [
gr.JSON.update(value=json.dumps(cache)),
gr.Button.update(visible=False)
]
def calculate_crc32(file_path):
# Initialize the CRC32 checksum
crc32 = 0
try:
# Open the file in binary mode
with open(file_path, "rb") as file:
# Read the file in chunks to conserve memory
chunk_size = 1024 # You can adjust this according to your needs
while True:
data = file.read(chunk_size)
if not data:
break
crc32 = zlib.crc32(data, crc32)
except FileNotFoundError:
print(f"File not found: {file_path}")
return None
# Ensure the CRC32 value is a positive integer
crc32 = crc32 & 0xFFFFFFFF
return hex(crc32)[2:]
def calculate_md5(file_path):
# Create an instance of the MD5 hash object
md5_hash = hashlib.md5()
try:
# Open the file in binary mode
with open(file_path, "rb") as file:
# Read the file in chunks to conserve memory
chunk_size = 8192 # You can adjust this according to your needs
while True:
data = file.read(chunk_size)
if not data:
break
md5_hash.update(data)
except FileNotFoundError:
print(f"File not found: {file_path}")
return None
# Get the MD5 hash value as a hexadecimal string
md5_hex = md5_hash.hexdigest()
return md5_hex
def calculate_adler32(file_path):
# Initialize the Adler-32 checksum
adler32_checksum = zlib.adler32(b'', 0)
try:
# Open the file in binary mode
with open(file_path, "rb") as file:
# Read the file in chunks to conserve memory
chunk_size = 1024 # You can adjust this according to your needs
while True:
data = file.read(chunk_size)
if not data:
break
adler32_checksum = zlib.adler32(data, adler32_checksum)
# Ensure the Adler-32 value is a positive integer
adler32_checksum &= 0xFFFFFFFF
return hex(adler32_checksum)[2:]
except FileNotFoundError:
print(f"File not found: {file_path}")
return None
def _on_calculate_hash_click():
result = []
def calc_in_dir(model_type) -> list:
dir_path = env.get_model_path(model_type)
local = []
files = get_model_files_in_dir(dir_path)
for file in files:
start_ms = int(time.time() * 1000)
sha256 = calculate_sha256(file)
time_spent_sha256 = int(time.time() * 1000) - start_ms
start_ms = int(time.time() * 1000)
crc32 = calculate_crc32(file)
time_spent_crc32 = int(time.time() * 1000) - start_ms
start_ms = int(time.time() * 1000)
md5 = calculate_md5(file)
time_spent_md5 = int(time.time() * 1000) - start_ms
start_ms = int(time.time() * 1000)
adler32 = calculate_adler32(file)
time_spent_adler32 = int(time.time() * 1000) - start_ms
rec = {
'path': file,
'file_size': os.path.getsize(file),
'temp_hash': calculate_file_temp_hash(file),
'sha256': sha256,
'sha256_time_ms': time_spent_sha256,
'crc32': crc32,
'crc32_time_ms': time_spent_crc32,
'md5': md5,
'md5_time_ms': time_spent_md5,
'adler32': adler32,
'adler32_time_ms': time_spent_adler32
}
local.append(rec)
return local
result.extend(calc_in_dir(ModelType.CHECKPOINT))
result.extend(calc_in_dir(ModelType.VAE))
result.extend(calc_in_dir(ModelType.LORA))
result.extend(calc_in_dir(ModelType.HYPER_NETWORK))
result.extend(calc_in_dir(ModelType.EMBEDDING))
result.extend(calc_in_dir(ModelType.LYCORIS))
return [
gr.JSON.update(value=json.dumps(result)),
gr.Button.update(visible=True)
]
def _on_compare_hash_click():
result = []
cache = read_hash_cache()
def find_in_cache(file_path, temp_hash):
for entry in cache:
if entry.get('path') == file_path and entry.get('temp_hash') == temp_hash and \
entry.get('sha256') is not None:
return entry['sha256']
def search_in_dir(model_type) -> list:
dir_path = env.get_model_path(model_type)
local = []
files = get_model_files_in_dir(dir_path)
for file in files:
temp_hash = calculate_file_temp_hash(file)
rec = {
'path': file,
'temp_hash': temp_hash,
'sha256': find_in_cache(file, temp_hash)
}
local.append(rec)
return local
result.extend(search_in_dir(ModelType.CHECKPOINT))
result.extend(search_in_dir(ModelType.VAE))
result.extend(search_in_dir(ModelType.LORA))
result.extend(search_in_dir(ModelType.HYPER_NETWORK))
result.extend(search_in_dir(ModelType.EMBEDDING))
result.extend(search_in_dir(ModelType.LYCORIS))
return [
gr.JSON.update(value=json.dumps(result)),
gr.Button.update(visible=False)
]
def _on_hash_cache_save_click(json_data):
write_hash_cache(json_data)
def _ui_hash_cache():
with gr.Column():
read_button = gr.Button('Read hash cache')
compare_hash_button = gr.Button('Compare hash with cache')
calculate_button = gr.Button('Calculate hashes')
save_hash_button = gr.Button('Save hash', visible=False)
hash_cache_json = gr.JSON(label='Local files')
read_button.click(fn=_on_read_hash_click, outputs=[hash_cache_json, save_hash_button])
calculate_button.click(fn=_on_calculate_hash_click, outputs=[hash_cache_json, save_hash_button])
compare_hash_button.click(fn=_on_compare_hash_click, outputs=[hash_cache_json, save_hash_button])
save_hash_button.click(fn=_on_hash_cache_save_click, inputs=hash_cache_json)
def _on_remove_duplicates_click():
records = env.storage.get_all_records()
counter_set = set()
duplicates_list = []
for record in records:
key = f'{record.name}-{record.url}'
if key in counter_set:
duplicates_list.append(record)
else:
counter_set.add(key)
for record in duplicates_list:
env.storage.remove_record(record.id_)
return f'{len(duplicates_list)} duplicates has been removed.'
def _on_remove_all_records_click():
records = env.storage.get_all_records()
for record in records:
env.storage.remove_record(record.id_)
return "All records has been removed."
def _ui_debug_utils():
with gr.Row():
with gr.Column():
remove_duplicates_button = gr.Button("Remove Records duplicate")
remove_all_records = gr.Button("Remove all Records")
with gr.Column():
debug_html_output = gr.HTML()
remove_duplicates_button.click(fn=_on_remove_duplicates_click, outputs=[debug_html_output])
remove_all_records.click(fn=_on_remove_all_records_click, outputs=[debug_html_output])
def debug_ui_block():
with gr.Column():
with gr.Row():
gr.Markdown('## Debug')
gr.Markdown('')
gr.Markdown('')
gr.Markdown('')
back_button = gr.Button('Back')
with gr.Tab('State report'):
_ui_state_report()
with gr.Tab('Local files'):
_ui_local_files()
with gr.Tab('Hash cache'):
_ui_hash_cache()
with gr.Tab('Utils'):
_ui_debug_utils()
back_button.click(fn=None, _js='navigateBack')