import gradio as gr
import os
import json
import html
from collections import defaultdict
from shared.utils.plugins import WAN2GPPlugin
from shared.utils import files_locator as fl
from mmgp import quant_router
class modelsManagerPlugin(WAN2GPPlugin):
def __init__(self):
super().__init__()
self.name = "Models Manager"
self.version = "1.0.0"
self.description = "Inspect and manage installed model dependencies"
self._node_map = {}
self._tree_data = []
self._model_files = {}
self._model_primary = {}
self._model_variants = {}
self._variant_to_model = {}
self._model_base_label = {}
self._model_family_label = {}
self._model_usage_ids = {}
self._usage_files = {}
self._file_usage = {}
self._file_usage_models = {}
self._file_info = {}
self._basename_index = {}
self._stats_total = 0
self._stats_by_drive = []
self._errors = []
self._repo_root = os.path.abspath(os.getcwd())
def setup_ui(self):
self.request_global("get_model_def")
self.request_global("get_model_name")
self.request_global("get_model_family")
self.request_global("get_parent_model_type")
self.request_global("get_base_model_type")
self.request_global("get_model_handler")
self.request_global("get_model_recursive_prop")
self.request_global("get_model_filename")
self.request_global("get_local_model_filename")
self.request_global("get_lora_dir")
self.request_global("compact_name")
self.request_global("create_models_hierarchy")
self.request_global("families_infos")
self.request_global("models_def")
self.request_global("transformer_quantization")
self.request_global("transformer_dtype_policy")
self.request_global("text_encoder_quantization")
self.request_global("displayed_model_types")
self.request_global("transformer_types")
self.add_custom_js(self._get_js())
self.add_tab(
tab_id="model_manager",
label="Models Manager",
component_constructor=self.create_model_ui,
)
def create_model_ui(self):
with gr.Column():
gr.Markdown("### Models Manager")
gr.Markdown(
"Sizes are unique GB recoverable on delete, plus shared GB used outside the branch."
)
with gr.Row():
with gr.Column(scale=3, min_width=550):
self.tree_html = gr.HTML()
with gr.Column(scale=2, min_width=350, elem_classes="ckpt-right-column"):
self.view_html = gr.HTML()
self.refresh_button = gr.Button(
"Refresh", variant="secondary", elem_id="ckpt_refresh_btn"
)
with gr.Column(visible=False):
self.action_input = gr.Textbox(elem_id="model_action_input")
self.refresh_button.click(
fn=self._show_loading,
inputs=[],
outputs=[self.tree_html, self.view_html, self.refresh_button],
show_progress="hidden",
).then(
fn=self._build_tree,
inputs=[],
outputs=[self.tree_html, self.view_html, self.refresh_button],
show_progress="full",
)
self.action_input.change(
fn=self._handle_action,
inputs=[self.action_input],
outputs=[self.tree_html, self.view_html, self.refresh_button],
show_progress="hidden",
)
self.on_tab_outputs = [self.tree_html, self.view_html, self.refresh_button]
return self.tree_html
def on_tab_select(self, state: dict):
return self._show_loading()
def _get_js(self):
return """
function modelRoot() {
if (window.gradioApp) {
return window.gradioApp();
}
const app = document.querySelector('gradio-app');
return app ? (app.shadowRoot || app) : document;
}
window.updatemodelInput = function(elem_id, value) {
const root = modelRoot();
const input = root.querySelector(`#${elem_id} textarea, #${elem_id} input`);
if (input) {
input.value = value;
input.dispatchEvent(new Event('input', { bubbles: true }));
return true;
}
return false;
};
function ensuremodelModal() {
let backdrop = document.getElementById('ckpt-delete-modal');
if (backdrop) return backdrop;
backdrop = document.createElement('div');
backdrop.id = 'ckpt-delete-modal';
backdrop.className = 'ckpt-modal-backdrop';
backdrop.innerHTML = `
Confirm Delete
Cancel
Delete Unique
Delete Unique + Shared
`;
document.body.appendChild(backdrop);
const cancelBtn = backdrop.querySelector('#ckpt-delete-cancel');
const uniqueBtn = backdrop.querySelector('#ckpt-delete-unique');
const allBtn = backdrop.querySelector('#ckpt-delete-all');
cancelBtn.addEventListener('click', () => {
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
uniqueBtn.addEventListener('click', () => {
const payload = backdrop.dataset.payloadUnique || '';
if (payload) {
window.updatemodelInput("model_action_input", payload);
}
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
allBtn.addEventListener('click', () => {
const payload = backdrop.dataset.payloadAll || '';
if (payload) {
window.updatemodelInput("model_action_input", payload);
}
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
backdrop.addEventListener('click', (evt) => {
if (evt.target === backdrop) {
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
}
});
return backdrop;
}
function showmodelModal(message, payloadUnique, payloadAll, uniqueSize, totalSize) {
const backdrop = ensuremodelModal();
const body = backdrop.querySelector('#ckpt-delete-modal-body');
const uniqueBtn = backdrop.querySelector('#ckpt-delete-unique');
const allBtn = backdrop.querySelector('#ckpt-delete-all');
body.textContent = message;
backdrop.dataset.payloadUnique = payloadUnique || '';
backdrop.dataset.payloadAll = payloadAll || '';
uniqueBtn.textContent = uniqueSize ? `Delete Unique (${uniqueSize})` : 'Delete Unique';
allBtn.textContent = totalSize ? `Delete Unique + Shared (${totalSize})` : 'Delete Unique + Shared';
backdrop.style.display = 'flex';
}
window.handlemodelAction = function(button, action) {
if (!button) return;
const nodeId = button.dataset.nodeId;
const nodeLabel = button.dataset.nodeLabel || nodeId;
const uniqueSize = button.dataset.uniqueSize || "0 B";
const totalSize = button.dataset.totalSize || uniqueSize;
if (!nodeId) return;
if (action === "delete") {
const payloadUnique = JSON.stringify({
action: action,
node_id: nodeId,
delete_shared: false,
ts: Date.now()
});
const payloadAll = JSON.stringify({
action: action,
node_id: nodeId,
delete_shared: true,
ts: Date.now()
});
showmodelModal(
`Choose delete scope for "${nodeLabel}": unique only ${uniqueSize}, or unique + shared ${totalSize}.`,
payloadUnique,
payloadAll,
uniqueSize,
totalSize
);
return;
}
const payload = JSON.stringify({
action: action,
node_id: nodeId,
ts: Date.now()
});
window.updatemodelInput("model_action_input", payload);
};
window.handlemodelRowClick = function(evt, row) {
if (evt) evt.stopPropagation();
if (!row) return;
const nodeId = row.dataset.nodeId;
if (!nodeId) return;
const payload = JSON.stringify({
action: "view",
node_id: nodeId,
ts: Date.now()
});
window.updatemodelInput("model_action_input", payload);
};
function ismodelTabActive() {
const root = modelRoot();
const tabs = root.querySelectorAll('button[role="tab"]');
for (const tab of tabs) {
if (tab.getAttribute('aria-selected') === 'true') {
const label = (tab.textContent || '').trim();
if (label === 'Models Manager') return true;
}
}
return false;
}
function triggermodelRefresh() {
const root = modelRoot();
const btn = root.querySelector('#ckpt_refresh_btn button') || root.querySelector('#ckpt_refresh_btn');
if (btn) btn.click();
}
function applymodelSticky() {
const root = modelRoot();
const sticky = root.querySelector('.ckpt-view-sticky');
if (!sticky) return;
sticky.style.position = 'sticky';
sticky.style.top = '16px';
sticky.style.alignSelf = 'flex-start';
let parent = sticky.parentElement;
let steps = 0;
while (parent && steps < 6) {
const style = window.getComputedStyle(parent);
if (style.overflow && style.overflow !== 'visible') {
parent.style.overflow = 'visible';
}
parent = parent.parentElement;
steps += 1;
}
if (root && root.host && root.host.style) {
const hostStyle = window.getComputedStyle(root.host);
if (hostStyle.overflow && hostStyle.overflow !== 'visible') {
root.host.style.overflow = 'visible';
}
}
}
let ckptFloating = {
sticky: null,
scrollEl: null,
handler: null,
resizeHandler: null,
anchorTop: 0,
topPadding: 16,
};
function findmodelScrollContainer(element) {
let current = element;
while (current) {
const style = window.getComputedStyle(current);
const overflowY = style.overflowY;
if ((overflowY === 'auto' || overflowY === 'scroll') && current.scrollHeight > current.clientHeight + 1) {
return current;
}
current = current.parentElement;
}
const root = modelRoot();
const host = root && root.host ? root.host : null;
current = host;
while (current) {
const style = window.getComputedStyle(current);
const overflowY = style.overflowY;
if ((overflowY === 'auto' || overflowY === 'scroll') && current.scrollHeight > current.clientHeight + 1) {
return current;
}
current = current.parentElement;
}
return window;
}
function computemodelAnchorTop(sticky, scrollEl) {
if (scrollEl === window) {
return sticky.getBoundingClientRect().top + window.scrollY;
}
const rect = sticky.getBoundingClientRect();
const containerRect = scrollEl.getBoundingClientRect();
return rect.top - containerRect.top + scrollEl.scrollTop;
}
function clearmodelFloating(sticky) {
if (!sticky) return;
sticky.style.transform = '';
sticky.style.willChange = '';
}
function updatemodelFloating() {
const sticky = ckptFloating.sticky;
if (!sticky) return;
const scrollEl = ckptFloating.scrollEl || window;
const scrollTop = scrollEl === window ? window.scrollY : scrollEl.scrollTop;
let maxTranslate = Infinity;
if (scrollEl === window) {
const doc = document.documentElement;
maxTranslate = Math.max(
0,
(doc ? doc.scrollHeight : 0) - sticky.offsetHeight - ckptFloating.topPadding - ckptFloating.anchorTop
);
} else {
maxTranslate = Math.max(
0,
scrollEl.scrollHeight - sticky.offsetHeight - ckptFloating.topPadding - ckptFloating.anchorTop
);
}
const offset = scrollTop + ckptFloating.topPadding - ckptFloating.anchorTop;
const translate = Math.max(0, Math.min(offset, maxTranslate));
if (translate > 0) {
sticky.style.transform = `translateY(${translate}px)`;
sticky.style.willChange = 'transform';
} else {
clearmodelFloating(sticky);
}
}
function teardownmodelFloating() {
if (ckptFloating.scrollEl && ckptFloating.handler) {
if (ckptFloating.scrollEl === window) {
window.removeEventListener('scroll', ckptFloating.handler);
} else {
ckptFloating.scrollEl.removeEventListener('scroll', ckptFloating.handler);
}
}
if (ckptFloating.resizeHandler) {
window.removeEventListener('resize', ckptFloating.resizeHandler);
}
clearmodelFloating(ckptFloating.sticky);
if (ckptFloating.sticky) {
ckptFloating.sticky.style.position = '';
ckptFloating.sticky.style.top = '';
}
ckptFloating.sticky = null;
ckptFloating.scrollEl = null;
ckptFloating.handler = null;
ckptFloating.resizeHandler = null;
ckptFloating.anchorTop = 0;
}
function setupmodelFloating() {
const root = modelRoot();
const sticky = root.querySelector('.ckpt-view-sticky');
if (!sticky) return;
if (ckptFloating.sticky === sticky) {
updatemodelFloating();
return;
}
teardownmodelFloating();
ckptFloating.sticky = sticky;
sticky.style.position = 'relative';
sticky.style.top = '0px';
ckptFloating.scrollEl = findmodelScrollContainer(sticky);
ckptFloating.anchorTop = computemodelAnchorTop(sticky, ckptFloating.scrollEl);
ckptFloating.handler = () => updatemodelFloating();
if (ckptFloating.scrollEl === window) {
window.addEventListener('scroll', ckptFloating.handler, { passive: true });
} else {
ckptFloating.scrollEl.addEventListener('scroll', ckptFloating.handler, { passive: true });
}
ckptFloating.resizeHandler = () => {
ckptFloating.anchorTop = computemodelAnchorTop(sticky, ckptFloating.scrollEl);
updatemodelFloating();
};
window.addEventListener('resize', ckptFloating.resizeHandler);
updatemodelFloating();
}
function observemodelView() {
const root = modelRoot();
const column = root.querySelector('.ckpt-right-column');
if (!column || column.dataset.ckptObserved === '1') return;
column.dataset.ckptObserved = '1';
const observer = new MutationObserver(() => {
applymodelSticky();
setupmodelFloating();
});
observer.observe(column, { childList: true, subtree: true });
}
let ckptTabWasActive = false;
let ckptRefreshBusy = false;
let ckptRefreshStartedAt = 0;
function waitForRefreshReady() {
const root = modelRoot();
const treeRoot = root.querySelector('#ckpt_tree_root');
if (treeRoot && treeRoot.dataset.status === 'ready') {
ckptRefreshBusy = false;
applymodelSticky();
setupmodelFloating();
return;
}
if (ckptRefreshStartedAt && Date.now() - ckptRefreshStartedAt > 60000) {
ckptRefreshBusy = false;
applymodelSticky();
setupmodelFloating();
return;
}
setTimeout(waitForRefreshReady, 400);
}
function startmodelBuild() {
if (ckptRefreshBusy) return;
ckptRefreshBusy = true;
ckptRefreshStartedAt = Date.now();
triggermodelRefresh();
setTimeout(waitForRefreshReady, 200);
}
function handleTabChange() {
const active = ismodelTabActive();
if (active && !ckptTabWasActive) {
startmodelBuild();
}
if (active) {
observemodelView();
setTimeout(applymodelSticky, 200);
setTimeout(setupmodelFloating, 260);
}
ckptTabWasActive = active;
}
function initmodelBridge() {
const root = modelRoot();
const tabs = root.querySelectorAll('button[role="tab"]');
if (!tabs || tabs.length === 0) {
setTimeout(initmodelBridge, 400);
return;
}
const tabObserver = new MutationObserver(handleTabChange);
tabObserver.observe(root, { attributes: true, subtree: true, attributeFilter: ['aria-selected', 'class'] });
handleTabChange();
}
initmodelBridge();
"""
def _show_loading(self):
tree_loading = self._build_loading_html()
view_hidden = gr.update(value="", visible=False)
refresh_hidden = gr.update(visible=False)
return tree_loading, view_hidden, refresh_hidden
def _build_tree(self):
self._build_cache()
tree_html = self._build_tree_html()
view_html = self._build_empty_view_html()
view_visible = gr.update(value=view_html, visible=True)
refresh_visible = gr.update(visible=True)
return tree_html, view_visible, refresh_visible
def _handle_action(self, payload: str):
if not payload:
return gr.update(), gr.update(), gr.update()
try:
data = json.loads(payload)
except json.JSONDecodeError:
return gr.update(), gr.update(), gr.update()
action = data.get("action")
node_id = data.get("node_id")
if not action or not node_id:
return gr.update(), gr.update(), gr.update()
if not self._node_map:
self._build_cache()
if action == "view":
view_html = self._build_view_html(node_id)
return gr.update(), gr.update(value=view_html, visible=True), gr.update()
if action == "delete":
delete_shared = bool(data.get("delete_shared"))
removed, missing, errors = self._delete_files_for_node(
node_id, delete_shared=delete_shared
)
if errors:
gr.Warning(f"Delete completed with errors: {len(errors)}")
else:
gr.Info(f"Deleted {len(removed)} files (missing: {len(missing)})")
return self._build_tree()
return gr.update(), gr.update(), gr.update()
def _delete_files_for_node(self, node_id, delete_shared=False):
node = self._node_map.get(node_id)
if node is None:
return [], [], []
removed = []
missing = []
errors = []
if delete_shared:
target_files = node.get("files", set())
else:
target_files = node.get("unique_files", set())
for path in sorted(target_files):
if not os.path.isfile(path):
missing.append(path)
continue
try:
os.remove(path)
removed.append(path)
except OSError as exc:
errors.append((path, str(exc)))
return removed, missing, errors
def _build_cache(self):
self._node_map = {}
self._tree_data = []
self._model_files = {}
self._model_primary = {}
self._model_variants = {}
self._variant_to_model = {}
self._model_base_label = {}
self._model_family_label = {}
self._model_usage_ids = {}
self._usage_files = {}
self._file_usage = defaultdict(set)
self._file_usage_models = defaultdict(set)
self._file_info = {}
self._basename_index = {}
self._errors = []
self._stats_total = 0
self._stats_by_drive = []
model_types = self._get_model_types()
for model_type in model_types:
try:
variant_info = self._collect_model_variants(model_type)
except Exception as exc:
self._errors.append(f"{model_type}: {exc}")
variant_info = {"variants": [], "primary_paths": [], "files": set()}
variants = variant_info.get("variants", [])
model_files = variant_info.get("files", set())
self._model_variants[model_type] = variants
self._model_primary[model_type] = variant_info.get("primary_paths", [])
self._model_files[model_type] = model_files
for path in model_files:
self._file_usage_models[path].add(model_type)
self._rebalance_other_shared_variants(model_types)
self._usage_files = {}
self._file_usage = defaultdict(set)
self._variant_to_model = {}
self._model_usage_ids = {}
for model_type in model_types:
usage_ids = set()
for variant in self._model_variants.get(model_type, []):
variant_id = variant["id"]
usage_ids.add(variant_id)
self._variant_to_model[variant_id] = model_type
files = variant.get("files", set())
self._usage_files[variant_id] = files
for path in files:
self._file_usage[path].add(variant_id)
self._model_usage_ids[model_type] = usage_ids
for path in self._file_usage_models.keys():
try:
exists = os.path.isfile(path)
size = os.path.getsize(path) if exists else 0
except OSError:
exists = False
size = 0
self._file_info[path] = {"size": size, "exists": exists}
self._compute_global_stats()
self._build_tree_structure(model_types)
def _compute_global_stats(self):
total = 0
by_drive = defaultdict(int)
for path, info in self._file_info.items():
size = info.get("size", 0)
if size <= 0:
continue
total += size
drive = self._get_drive_label(path)
by_drive[drive] += size
self._stats_total = total
self._stats_by_drive = sorted(
by_drive.items(), key=lambda item: (-item[1], item[0])
)
def _get_drive_label(self, path):
drive, _ = os.path.splitdrive(path)
if drive:
return drive.rstrip(":").upper()
if path.startswith("\\\\"):
return "UNC"
return "ROOT"
def _get_model_types(self):
types = []
if isinstance(getattr(self, "models_def", None), dict) and self.models_def:
types = sorted(self.models_def.keys())
elif isinstance(self.transformer_types, list) and self.transformer_types:
types = list(self.transformer_types)
elif isinstance(self.displayed_model_types, list):
types = list(self.displayed_model_types)
seen = set()
ordered = []
for model_type in types:
if model_type and model_type not in seen:
ordered.append(model_type)
seen.add(model_type)
return ordered
def _collect_model_variants(self, model_type):
model_def = self.get_model_def(model_type)
if model_def is None:
return {"variants": [], "primary_paths": [], "files": set()}
default_dtype_policy = self._resolve_default_dtype_policy(model_type, model_def)
transformer_variants = self._collect_transformer_variants(
model_type, model_def, default_dtype_policy
)
text_encoder_variants = self._collect_text_encoder_variants(
model_type, model_def, default_dtype_policy
)
transformer_files = set()
text_encoder_files = set()
model_primary_paths = []
for variant in transformer_variants:
transformer_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
for variant in text_encoder_variants:
text_encoder_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
model_primary_paths = self._dedupe_paths(model_primary_paths)
other_files, shared_files = self._collect_other_files(
model_type,
model_def,
transformer_files,
text_encoder_files,
)
variants = list(transformer_variants) + list(text_encoder_variants)
if other_files:
other_id = self._make_variant_id(model_type, "other", "other")
variants.append(
{
"id": other_id,
"label": "Other",
"type": "variant",
"files": other_files,
"primary_paths": [],
}
)
if shared_files:
shared_id = self._make_variant_id(model_type, "shared", "shared")
variants.append(
{
"id": shared_id,
"label": "Shared",
"type": "variant",
"files": shared_files,
"primary_paths": [],
}
)
model_files = set()
for variant in variants:
model_files.update(variant.get("files", set()))
return {
"variants": variants,
"primary_paths": model_primary_paths,
"files": model_files,
}
def _collect_transformer_variants(
self, model_type, model_def, default_dtype_policy
):
choice_lists, modules = self._collect_transformer_choice_lists(model_type, model_def)
base_dtypes, token_dtypes, token_labels = self._collect_variant_candidates(
choice_lists, default_dtype_policy
)
variants = []
used_labels = set()
base_default = self._format_dtype_label(default_dtype_policy)
if base_dtypes:
for dtype_policy in sorted(base_dtypes):
label = self._format_variant_label("", dtype_policy, base_default)
files, primary_paths = self._collect_transformer_files_for_variant(
model_type, model_def, modules, "", dtype_policy
)
if not files:
continue
file_label = self._detect_quant_label_from_files(files)
label = file_label or label
variant_label = f"Transformer {label}"
variant_id = self._make_variant_id(model_type, "transformer", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": primary_paths,
}
)
used_labels.add(variant_label)
for token, dtypes in sorted(token_dtypes.items()):
quant_param = self._quant_param_from_token(token)
quant_label = token_labels.get(token) or self._format_quant_label(token)
for dtype_policy in sorted(dtypes):
label_suffix = ""
if len(dtypes) > 1:
label_suffix = f" {self._format_dtype_label(dtype_policy)}"
label = f"{quant_label}{label_suffix}"
files, primary_paths = self._collect_transformer_files_for_variant(
model_type,
model_def,
modules,
quant_param,
dtype_policy,
token_match=token,
)
if not files:
continue
file_label = self._detect_quant_label_from_files(files)
quant_label = file_label or quant_label
if not self._files_match_token(files, token):
continue
label = f"{quant_label}{label_suffix}" if quant_label else self._format_dtype_label(dtype_policy)
variant_label = f"Transformer {label}"
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "transformer", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": primary_paths,
}
)
used_labels.add(variant_label)
return sorted(variants, key=lambda v: v["label"].lower())
def _collect_transformer_choice_lists(self, model_type, model_def):
choice_lists = []
modules = self._expand_modules(model_type)
urls = self.get_model_recursive_prop(model_type, "URLs", return_list=True)
if urls:
choice_lists.append(urls)
if "URLs2" in model_def:
urls2 = self.get_model_recursive_prop(model_type, "URLs2", return_list=True)
if urls2:
choice_lists.append(urls2)
has_module_source = "module_source" in model_def
has_module_source2 = "module_source2" in model_def
for module in modules:
if isinstance(module, dict):
urls1 = module.get("URLs", [])
urls2 = module.get("URLs2", [])
if urls1 and not has_module_source:
choice_lists.append(urls1)
if urls2 and not has_module_source2:
choice_lists.append(urls2)
elif isinstance(module, list):
if has_module_source:
continue
if module:
choice_lists.append(module)
else:
if has_module_source:
continue
module_type = module
sub_prop_name = "_list"
if isinstance(module_type, str) and "#" in module_type:
pos = module_type.rfind("#")
sub_prop_name = module_type[pos + 1 :]
module_type = module_type[:pos]
try:
mod_urls = self.get_model_recursive_prop(
module_type, "modules", sub_prop_name=sub_prop_name, return_list=True
)
except Exception:
mod_urls = []
if isinstance(mod_urls, list) and mod_urls:
if all(isinstance(item, list) for item in mod_urls):
for entry in mod_urls:
if entry:
choice_lists.append(entry)
else:
choice_lists.append(mod_urls)
return choice_lists, modules
def _collect_transformer_files_for_variant(
self,
model_type,
model_def,
modules,
quantization,
dtype_policy,
token_match=None,
):
files = set()
primary_paths = []
model_filename = ""
is_exotic = quantization not in ("", "int8", "fp8")
if token_match and is_exotic:
urls = self.get_model_recursive_prop(model_type, "URLs", return_list=True)
model_filename = self._select_filename_by_token(urls, token_match)
if not model_filename:
model_filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
)
if model_filename:
self._add_file(files, model_filename)
primary_path = self._resolve_path(model_filename)
if primary_path and primary_path in files:
primary_paths.append(primary_path)
if "URLs2" in model_def:
model_filename2 = ""
if token_match and is_exotic:
urls2 = self.get_model_recursive_prop(
model_type, "URLs2", return_list=True
)
model_filename2 = self._select_filename_by_token(urls2, token_match)
if not model_filename2:
model_filename2 = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
submodel_no=2,
)
if model_filename2:
self._add_file(files, model_filename2)
primary_path2 = self._resolve_path(model_filename2)
if primary_path2 and primary_path2 in files:
primary_paths.append(primary_path2)
has_module_source = "module_source" in model_def
has_module_source2 = "module_source2" in model_def
for module in modules:
if isinstance(module, dict):
urls1 = module.get("URLs", [])
urls2 = module.get("URLs2", [])
if urls1 and not has_module_source:
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(urls1, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=urls1,
)
if filename:
self._add_file(files, filename)
if urls2 and not has_module_source2:
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(urls2, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=urls2,
)
if filename:
self._add_file(files, filename)
elif isinstance(module, list):
if has_module_source:
continue
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(module, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=module,
)
if filename:
self._add_file(files, filename)
else:
if has_module_source:
continue
if token_match and is_exotic:
module_type = module
sub_prop_name = "_list"
if isinstance(module_type, str) and "#" in module_type:
pos = module_type.rfind("#")
sub_prop_name = module_type[pos + 1 :]
module_type = module_type[:pos]
try:
mod_urls = self.get_model_recursive_prop(
module_type,
"modules",
sub_prop_name=sub_prop_name,
return_list=True,
)
except Exception:
mod_urls = []
filename = self._select_filename_by_token(mod_urls, token_match)
else:
filename = ""
try:
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
module_type=module,
)
except Exception:
filename = ""
if filename:
self._add_file(files, filename)
return files, primary_paths
def _collect_text_encoder_variants(
self, model_type, model_def, default_dtype_policy
):
variants = []
text_encoder_urls = self.get_model_recursive_prop(
model_type, "text_encoder_URLs", return_list=True
)
text_encoder_folder = model_def.get("text_encoder_folder", None)
used_paths = set()
used_labels = set()
base_default = self._format_dtype_label(default_dtype_policy)
if text_encoder_urls:
base_dtypes, token_dtypes, token_labels = self._collect_variant_candidates(
[text_encoder_urls], default_dtype_policy
)
if base_dtypes:
for dtype_policy in sorted(base_dtypes):
label = self._format_variant_label("", dtype_policy, base_default)
filename = self.get_model_filename(
model_type=model_type,
quantization="",
dtype_policy=dtype_policy,
URLs=text_encoder_urls,
)
if not filename:
continue
files = set()
self._add_file(
files, filename, force_folder=text_encoder_folder
)
if not files:
continue
resolved_path = next(iter(files))
if resolved_path in used_paths:
continue
used_paths.add(resolved_path)
variant_label = f"Text Encoder {label}"
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "text", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [resolved_path],
}
)
used_labels.add(variant_label)
for token, dtypes in sorted(token_dtypes.items()):
quant_param = self._quant_param_from_token(token)
quant_label = token_labels.get(token) or self._format_quant_label(token)
for dtype_policy in sorted(dtypes):
label_suffix = ""
if len(dtypes) > 1:
label_suffix = f" {self._format_dtype_label(dtype_policy)}"
label = f"{quant_label}{label_suffix}"
filename = ""
if quant_param not in ("", "int8", "fp8"):
filename = self._select_filename_by_token(text_encoder_urls, token)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quant_param,
dtype_policy=dtype_policy,
URLs=text_encoder_urls,
)
if not filename:
continue
files = set()
self._add_file(
files, filename, force_folder=text_encoder_folder
)
if not files:
continue
if not self._files_match_token(files, token):
continue
file_label = self._detect_quant_label_from_files(files)
quant_label = file_label or quant_label
resolved_path = next(iter(files))
if resolved_path in used_paths:
continue
used_paths.add(resolved_path)
label = f"{quant_label}{label_suffix}" if quant_label else self._format_dtype_label(dtype_policy)
variant_label = f"Text Encoder {label}"
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "text", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [resolved_path],
}
)
used_labels.add(variant_label)
else:
handler = self.get_model_handler(model_type)
get_name = getattr(handler, "get_text_encoder_filename", None)
if get_name is not None:
for quant_param in self._text_encoder_quant_candidates():
try:
filename = get_name(quant_param)
except Exception:
filename = None
if not filename:
continue
files = set()
self._add_file(
files, filename, force_folder=text_encoder_folder
)
if not files:
continue
resolved_path = next(iter(files))
if resolved_path in used_paths:
continue
used_paths.add(resolved_path)
quant_label = self._detect_quant_label_from_files(files)
label = quant_label or self._format_variant_label(
"",
self._detect_dtype_policy_from_name(os.path.basename(filename))
or default_dtype_policy,
base_default,
)
variant_label = f"Text Encoder {label}"
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "text", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [resolved_path],
}
)
used_labels.add(variant_label)
return sorted(variants, key=lambda v: v["label"].lower())
def _collect_other_files(
self,
model_type,
model_def,
transformer_files,
text_encoder_files,
):
other_files = set()
shared_files = set()
preload_urls = self.get_model_recursive_prop(
model_type, "preload_URLs", return_list=True
)
for url in self._ensure_list(preload_urls):
self._add_file(other_files, url)
vae_urls = model_def.get("VAE_URLs", [])
for url in self._ensure_list(vae_urls):
self._add_file(shared_files, url)
loras = self.get_model_recursive_prop(model_type, "loras", return_list=True)
for url in self._ensure_list(loras):
lora_dir = self._safe_get_lora_dir(model_type)
if lora_dir:
lora_path = os.path.join(lora_dir, os.path.basename(url))
self._add_file(other_files, lora_path)
handler_files = self._collect_handler_files(model_type, model_def)
shared_files.update(handler_files)
other_files.difference_update(transformer_files)
other_files.difference_update(text_encoder_files)
shared_files.difference_update(transformer_files)
shared_files.difference_update(text_encoder_files)
shared_files.difference_update(other_files)
return other_files, shared_files
def _resolve_default_dtype_policy(self, model_type, model_def):
dtype = model_def.get("dtype")
if isinstance(dtype, str) and dtype.lower() in ("fp16", "bf16"):
return dtype.lower()
policy = self.transformer_dtype_policy
if isinstance(policy, str) and policy:
return policy.lower()
return "bf16"
def _format_dtype_label(self, dtype_policy):
return "FP16" if dtype_policy == "fp16" else "BF16"
def _format_quant_label(self, token):
if not token:
return ""
label = quant_router.get_quantization_label(token)
if label:
return label
return str(token).replace("_", " ").upper()
def _format_variant_label(self, token, dtype_policy, default_label):
if token:
return self._format_quant_label(token)
if dtype_policy:
return self._format_dtype_label(dtype_policy)
return default_label
def _detect_quant_info(self, name):
if not name:
return "", ""
label = quant_router.detect_quantization_label_from_filename(name) or ""
if label:
return self._label_to_token(label), label
kind = quant_router.detect_quantization_kind_for_file(name, verboseLevel=0)
if kind and kind != "none":
kind = str(kind).lower()
return kind, kind.upper()
lower = str(name).lower()
aliases = self._get_quant_aliases()
for token in aliases:
if token in ("bf16", "fp16", "bfloat16", "float16"):
continue
if token and token in lower:
return token, ""
return "", ""
def _label_to_token(self, label):
if not label:
return ""
raw = str(label).strip()
lower = raw.lower()
if lower.startswith("gguf-"):
return lower.split("-", 1)[1]
if lower == "gguf":
return "gguf"
return lower
def _detect_dtype_policy_from_name(self, name):
if not name:
return ""
lower = str(name).lower()
if "fp16" in lower or "float16" in lower:
return "fp16"
if "bf16" in lower or "bfloat16" in lower:
return "bf16"
return ""
def _collect_variant_candidates(self, choice_lists, default_dtype_policy):
base_dtypes = set()
token_dtypes = defaultdict(set)
token_labels = {}
for choice_list in choice_lists:
for entry in self._ensure_list(choice_list):
if not entry:
continue
entry_str = str(entry)
name = os.path.basename(entry_str)
quant_source = self._resolve_path(entry_str) or entry_str
token, label = self._detect_quant_info(quant_source)
dtype_policy = self._detect_dtype_policy_from_name(name) or default_dtype_policy
if token:
token = str(token).lower()
token_dtypes[token].add(dtype_policy)
if label:
token_labels.setdefault(token, label)
else:
base_dtypes.add(dtype_policy)
if not base_dtypes and not token_dtypes:
base_dtypes.add(default_dtype_policy)
return base_dtypes, token_dtypes, token_labels
def _quant_param_from_token(self, token):
if not token:
return ""
lower = str(token).lower()
if "int8" in lower:
return "int8"
if "fp8" in lower or "float8" in lower:
return "fp8"
return lower
def _files_match_token(self, files, token):
if not token:
return True
token = str(token).lower()
for path in files:
base = os.path.basename(path).lower()
if token in base:
return True
label = quant_router.detect_quantization_label_from_filename(path)
if label and self._label_to_token(label) == token:
return True
return False
def _detect_quant_label_from_files(self, files):
for path in files:
label = quant_router.detect_quantization_label_from_filename(path)
if label:
return label
return ""
def _select_filename_by_token(self, urls, token):
if not urls or not token:
return ""
token = str(token).lower()
for entry in self._ensure_list(urls):
if not entry:
continue
if isinstance(entry, list):
for item in entry:
if not item:
continue
base = os.path.basename(str(item)).lower()
if token in base:
return item
label = quant_router.detect_quantization_label_from_filename(item)
if label and self._label_to_token(label) == token:
return item
continue
base = os.path.basename(str(entry)).lower()
if token in base:
return entry
label = quant_router.detect_quantization_label_from_filename(entry)
if label and self._label_to_token(label) == token:
return entry
return ""
def _expand_modules(self, model_type):
modules = self.get_model_recursive_prop(model_type, "modules", return_list=True)
expanded_modules = []
for module in modules:
if isinstance(module, str):
if "#" in module:
expanded_modules.append(module)
continue
try:
expanded = self.get_model_recursive_prop(
module, "modules", sub_prop_name="_list", return_list=True
)
except Exception:
expanded = []
if expanded:
expanded_modules.append(expanded)
else:
expanded_modules.append(module)
else:
expanded_modules.append(module)
return expanded_modules
def _make_variant_id(self, model_type, kind, label):
slug = "".join(ch.lower() if ch.isalnum() else "_" for ch in label).strip("_")
return f"variant::{model_type}::{kind}::{slug}"
def _dedupe_paths(self, paths):
seen = set()
ordered = []
for path in paths:
if not path or path in seen:
continue
ordered.append(path)
seen.add(path)
return ordered
def _get_quant_aliases(self):
if not hasattr(self, "_quant_alias_cache"):
aliases = set(quant_router.get_available_qtype_aliases() or [])
aliases.update({"int8", "int8n", "fp8"})
self._quant_alias_cache = sorted(
{alias for alias in aliases if alias}, key=len, reverse=True
)
return self._quant_alias_cache
def _text_encoder_quant_candidates(self):
candidates = {""}
aliases = self._get_quant_aliases()
for token in aliases:
if "int8" in token:
candidates.add("int8")
if "fp8" in token or "float8" in token:
candidates.add("fp8")
return sorted(candidates)
def _collect_text_encoder_files(self, model_type, model_def):
files = set()
text_encoder_urls = self.get_model_recursive_prop(
model_type, "text_encoder_URLs", return_list=True
)
text_encoder_folder = model_def.get("text_encoder_folder", None)
text_encoder_filename = None
if text_encoder_urls:
text_encoder_filename = self.get_model_filename(
model_type=model_type,
quantization=self.text_encoder_quantization,
dtype_policy=self.transformer_dtype_policy,
URLs=text_encoder_urls,
)
else:
handler = self.get_model_handler(model_type)
get_name = getattr(handler, "get_text_encoder_filename", None)
if get_name is not None:
try:
text_encoder_filename = get_name(self.text_encoder_quantization)
except Exception:
text_encoder_filename = None
if text_encoder_filename:
self._add_file(files, text_encoder_filename, force_folder=text_encoder_folder)
return files
def _collect_handler_files(self, model_type, model_def):
handler = self.get_model_handler(model_type)
query = getattr(handler, "query_model_files", None)
if query is None:
return set()
base_model_type = self.get_base_model_type(model_type)
try:
download_defs = query(
self._compute_list,
base_model_type,
model_def,
)
except Exception as exc:
self._errors.append(f"{model_type}: {exc}")
return set()
if not download_defs:
return set()
if isinstance(download_defs, dict):
download_defs = [download_defs]
files = set()
for download_def in download_defs:
source_folders = download_def.get("sourceFolderList", [])
file_lists = download_def.get("fileList", [])
target_folders = download_def.get("targetFolderList")
if target_folders is None:
target_folders = [None] * len(source_folders)
for source_folder, file_list, target_folder in zip(
source_folders, file_lists, target_folders
):
if target_folder == "":
target_folder = None
if source_folder is None:
source_folder = ""
if not file_list:
continue
for filename in file_list:
if not filename:
continue
rel_path = self._combine_download_path(
target_folder, source_folder, filename
)
files.add(self._resolve_download_relpath(rel_path))
return {path for path in files if path and os.path.isfile(path)}
def _build_tree_structure(self, model_types):
families = defaultdict(list)
for model_type in model_types:
family = self.get_model_family(model_type, for_ui=True) or "unknown"
families[family].append(model_type)
family_order = sorted(
families.keys(),
key=lambda family: self.families_infos.get(family, (999, family))[0],
)
for family in family_order:
family_label = self.families_infos.get(family, (999, family))[1]
rows = []
for model_type in families[family]:
model_name = self.compact_name(
family_label, self.get_model_name(model_type)
)
parent_id = self.get_parent_model_type(model_type)
rows.append((model_name, model_type, parent_id))
rows.sort(key=lambda row: row[0])
parents_list, children_dict = self.create_models_hierarchy(rows)
family_node_id = f"family::{family}"
family_models = set(families[family])
family_usage_ids = self._collect_usage_ids_for_models(family_models)
family_files = self._collect_files_for_usage_ids(family_usage_ids)
self._register_node(
family_node_id,
family_label,
"family",
family_models,
family_files,
path_label=family_label,
)
base_nodes = []
for base_label, base_id in parents_list:
child_entries = children_dict.get(base_id, [])
child_nodes = []
base_models = set()
base_usage_ids = set()
base_path = f"{family_label} / {base_label}"
for child_label, child_model in child_entries:
base_models.add(child_model)
model_usage_ids = self._model_usage_ids.get(child_model, set())
base_usage_ids.update(model_usage_ids)
self._model_base_label[child_model] = base_label
self._model_family_label[child_model] = family_label
model_node_id = f"model::{child_model}"
model_files = self._collect_files_for_usage_ids(model_usage_ids)
primary_paths = self._model_primary.get(child_model, [])
model_path = f"{base_path} / {child_label}"
self._register_node(
model_node_id,
child_label,
"model",
{child_model},
model_files,
primary_paths=primary_paths,
path_label=model_path,
)
variant_children = []
for variant in self._model_variants.get(child_model, []):
variant_id = variant["id"]
variant_label = variant["label"]
variant_path = f"{model_path} / {variant_label}"
self._register_node(
variant_id,
variant_label,
"variant",
{child_model},
variant.get("files", set()),
primary_paths=variant.get("primary_paths", []),
path_label=variant_path,
)
variant_children.append(
{"id": variant_id, "label": variant_label, "type": "variant"}
)
child_nodes.append(
{
"id": model_node_id,
"label": child_label,
"type": "model",
"children": variant_children,
}
)
base_node_id = f"base::{family}::{base_id}"
base_files = self._collect_files_for_usage_ids(base_usage_ids)
self._register_node(
base_node_id,
base_label,
"base",
base_models,
base_files,
path_label=base_path,
)
base_nodes.append(
{
"id": base_node_id,
"label": base_label,
"type": "base",
"children": child_nodes,
}
)
self._tree_data.append(
{
"id": family_node_id,
"label": family_label,
"type": "family",
"children": base_nodes,
}
)
def _collect_usage_ids_for_models(self, model_set):
usage_ids = set()
for model_type in model_set:
usage_ids.update(self._model_usage_ids.get(model_type, set()))
return usage_ids
def _collect_files_for_usage_ids(self, usage_ids):
files = set()
for usage_id in usage_ids:
files.update(self._usage_files.get(usage_id, set()))
return files
def _register_node(
self,
node_id,
label,
node_type,
model_set,
files_set,
primary_paths=None,
path_label=None,
):
unique_files, shared_files = self._split_files_by_shared(files_set, model_set)
unique_size = self._sum_sizes(unique_files)
shared_size = self._sum_sizes(shared_files)
self._node_map[node_id] = {
"id": node_id,
"label": label,
"type": node_type,
"models": set(model_set),
"files": set(files_set),
"unique_files": unique_files,
"shared_files": shared_files,
"unique_size": unique_size,
"shared_size": shared_size,
"total_size": unique_size + shared_size,
"primary_paths": primary_paths or [],
"path_label": path_label or label,
}
def _sum_sizes(self, files_set):
total = 0
for path in files_set:
total += self._file_info.get(path, {}).get("size", 0)
return total
def _split_files_by_shared(self, files_set, model_set):
unique_files = set()
shared_files = set()
for path in files_set:
used_by = self._file_usage_models.get(path, set())
if used_by.issubset(model_set):
unique_files.add(path)
else:
shared_files.add(path)
return unique_files, shared_files
def _rebalance_other_shared_variants(self, model_types):
for model_type in model_types:
variants = self._model_variants.get(model_type, [])
if not variants:
continue
other_variant = None
shared_variant = None
for variant in variants:
label = str(variant.get("label", "")).lower()
if label == "other":
other_variant = variant
elif label == "shared":
shared_variant = variant
if not other_variant and not shared_variant:
continue
combined = set()
if other_variant:
combined.update(other_variant.get("files", set()))
if shared_variant:
combined.update(shared_variant.get("files", set()))
new_variants = []
for variant in variants:
if variant is other_variant or variant is shared_variant:
continue
new_variants.append(variant)
if not combined:
self._model_variants[model_type] = new_variants
continue
model_set = {model_type}
shared_files = {
path
for path in combined
if not self._file_usage_models.get(path, set()).issubset(model_set)
}
other_files = combined - shared_files
if other_files:
if other_variant is None:
other_variant = {
"id": self._make_variant_id(model_type, "other", "other"),
"label": "Other",
"type": "variant",
"files": other_files,
"primary_paths": [],
}
else:
other_variant["files"] = other_files
new_variants.append(other_variant)
if shared_files:
if shared_variant is None:
shared_variant = {
"id": self._make_variant_id(model_type, "shared", "shared"),
"label": "Shared",
"type": "variant",
"files": shared_files,
"primary_paths": [],
}
else:
shared_variant["files"] = shared_files
new_variants.append(shared_variant)
self._model_variants[model_type] = new_variants
def _shared_base_labels(self, path, exclude_key=None):
pairs = set()
for model_type in self._file_usage_models.get(path, set()):
base_label = self._model_base_label.get(model_type)
family_label = self._model_family_label.get(model_type)
if base_label and family_label:
pairs.add((family_label, base_label))
if exclude_key:
pairs.discard(exclude_key)
return [
f"{family} {base}"
for family, base in sorted(pairs, key=lambda item: (item[0].lower(), item[1].lower()))
]
def _get_node_base_label(self, node_id, node_type):
model_type = None
if node_type == "base":
family_label = None
if node_id.startswith("base::"):
parts = node_id.split("::", 2)
if len(parts) >= 2:
family_key = parts[1]
family_label = self.families_infos.get(
family_key, (999, family_key)
)[1]
info = self._node_map.get(node_id, {})
base_label = info.get("label")
if family_label and base_label:
return (family_label, base_label)
return None
if node_type == "variant":
model_type = self._variant_to_model.get(node_id)
elif node_type == "model":
if node_id.startswith("model::"):
model_type = node_id.split("model::", 1)[1]
if model_type:
family_label = self._model_family_label.get(model_type)
base_label = self._model_base_label.get(model_type)
if family_label and base_label:
return (family_label, base_label)
return None
def _build_tree_html(self):
css = """
"""
tree_parts = [
css,
"",
]
if self._errors:
errors_html = "
".join(html.escape(err) for err in self._errors[:10])
tree_parts.append(f"
{errors_html}
")
tree_parts.append("
")
tree_parts.append(self._build_stats_html())
tree_parts.append("
")
for family in self._tree_data:
tree_parts.append(self._render_node(family, is_family=True))
tree_parts.append("
")
return "".join(tree_parts)
def _build_stats_html(self):
total_label = self._format_gb(self._stats_total)
drive_html = ""
if len(self._stats_by_drive) > 1:
drive_items = []
for drive, size in self._stats_by_drive:
drive_items.append(
f"{html.escape(drive)}: {self._format_gb(size)} "
)
drive_html = f"{''.join(drive_items)}
"
return (
""
f"
Total Used: {total_label}
"
f"{drive_html}
"
)
def _render_node(self, node, is_family=False):
node_id = node["id"]
info = self._node_map.get(node_id, {})
label = info.get("label", node.get("label", node_id))
unique_size_value = info.get("unique_size", 0)
shared_size_value = info.get("shared_size", 0)
unique = self._format_gb(unique_size_value)
shared = self._format_gb(shared_size_value)
unique_label = self._format_size(unique_size_value)
shared_label = self._format_size(shared_size_value)
total_label = self._format_size(unique_size_value + shared_size_value)
node_type = info.get("type", node.get("type", ""))
children = node.get("children", [])
click_view = node_type in ("model", "variant")
row_html = self._render_row(
label,
node_id,
unique,
shared,
unique_label,
shared_label,
total_label,
node_type,
click_view=click_view,
)
if children:
child_html = "".join(self._render_node(child) for child in children)
return (
f"{row_html} "
f"{child_html}
"
)
return f"{row_html}
"
def _render_row(
self,
label,
node_id,
unique,
shared,
unique_label,
shared_label,
total_label,
node_type,
click_view=False,
):
label_display = html.escape(label)
label_attr = html.escape(label, quote=True)
node_id_attr = html.escape(node_id, quote=True)
type_attr = html.escape(node_type, quote=True)
row_class = "ckpt-row clickable" if click_view else "ckpt-row"
row_onclick = (
"onclick='handlemodelRowClick(event, this);'" if click_view else ""
)
unique_badge = f"{unique} "
shared_badge = f"{shared} "
return (
f""
f"
{label_display}
"
"
"
f"
{unique_badge}
"
f"
{shared_badge}
"
"
"
"
"
f"View "
f"Delete "
"
"
"
"
)
def _build_empty_view_html(self):
return (
""
"
Files "
"
Select a branch to view its files.
"
"
"
)
def _build_loading_html(self):
return (
""
"
"
"
Please wait while the models hierarchy is being built...
"
"
"
)
def _build_view_html(self, node_id):
node = self._node_map.get(node_id)
if node is None:
return self._build_empty_view_html()
files = sorted(node["files"])
node_type = node.get("type", "")
current_base = self._get_node_base_label(node_id, node_type)
shared_set = {
path
for path in files
if not self._file_usage_models.get(path, set()).issubset(node["models"])
}
current_base_label = None
if current_base:
current_base_label = f"{current_base[0]} {current_base[1]}"
shared_info = {}
for path in shared_set:
shared_labels = self._shared_base_labels(path, exclude_key=current_base)
if not shared_labels and current_base_label:
shared_labels = [f"Shared only within {current_base_label}"]
shared_info[path] = shared_labels
file_tree = self._build_file_tree(files, shared_set, shared_info)
file_tree_html = self._render_file_tree(file_tree)
unique = self._format_gb(node["unique_size"])
shared = self._format_gb(node["shared_size"])
label_text = node["label"]
path_label = node.get("path_label") or label_text
header = html.escape(path_label)
unique_badge = f"{unique} "
shared_badge = f"{shared} "
shared_count = len(shared_set)
unique_count = max(0, len(files) - shared_count)
summary = (
f"{unique_count} files | unique {unique_badge} "
f"{shared_count} files | shared {shared_badge}"
)
primary_paths = node.get("primary_paths", [])
primary_html = ""
if primary_paths:
label = "model path" if len(primary_paths) == 1 else "model paths"
lines = " ".join(html.escape(path) for path in primary_paths)
primary_html = f"{label}: {lines}
"
return (
""
f"
{header} "
f"{primary_html}"
f"
{summary}
"
f"{file_tree_html}
"
)
def _build_file_tree(self, files, shared_set, shared_info=None):
if shared_info is None:
shared_info = {}
root = {
"children": {},
"files": [],
"size": 0,
"all_shared": False,
"all_unique": False,
"has_files": False,
"shared_with": set(),
"shared_count": 0,
"is_root": True,
}
for path in files:
info = self._file_info.get(path, {})
display = self._display_path(path)
parts = self._split_display_path(display)
name = parts[-1] if parts else display
shared_with = shared_info.get(path, [])
entry = {
"path": path,
"display": display,
"name": name,
"size": info.get("size", 0),
"exists": info.get("exists", False),
"shared": path in shared_set,
"shared_with": shared_with,
}
node = root
for part in parts[:-1]:
node = node["children"].setdefault(
part,
{
"children": {},
"files": [],
"size": 0,
"all_shared": False,
"all_unique": False,
"has_files": False,
"shared_with": set(),
"shared_count": 0,
"is_root": False,
},
)
node["files"].append(entry)
self._compute_folder_sizes(root)
return root
def _compute_folder_sizes(self, node):
total = sum(entry["size"] for entry in node["files"])
has_files = len(node["files"]) > 0
all_shared = all(entry["shared"] for entry in node["files"]) if node["files"] else True
all_unique = all(not entry["shared"] for entry in node["files"]) if node["files"] else True
shared_with = set()
shared_count = 0
for entry in node["files"]:
if entry.get("shared"):
shared_with.update(entry.get("shared_with", []))
for child in node["children"].values():
(
child_total,
child_has_files,
child_all_shared,
child_shared_with,
child_shared_count,
child_all_unique,
) = self._compute_folder_sizes(child)
total += child_total
has_files = has_files or child_has_files
all_shared = all_shared and child_all_shared
all_unique = all_unique and child_all_unique
shared_with.update(child_shared_with)
shared_count += child_shared_count
node["size"] = total
node["has_files"] = has_files
node["all_shared"] = has_files and all_shared
node["all_unique"] = has_files and all_unique
node["shared_with"] = shared_with
if node["all_shared"] and not node.get("is_root"):
shared_count += 1
node["shared_count"] = shared_count
return total, has_files, node["all_shared"], shared_with, shared_count, node["all_unique"]
def _render_file_tree(self, node):
open_shared_path = node.get("shared_count", 0) == 1
parts = []
for name in sorted(node["children"].keys(), key=lambda n: n.lower()):
parts.append(
self._render_folder(
name,
node["children"][name],
level=0,
open_shared_path=open_shared_path,
)
)
for entry in sorted(node["files"], key=lambda e: e["name"].lower()):
parts.append(self._render_file_entry(entry))
return "" + "".join(parts) + "
"
def _render_folder(self, name, node, level, open_shared_path=False):
name, node = self._collapse_single_child_folder(name, node)
folder_name = html.escape(name)
size_label = self._format_size(node.get("size", 0))
shared_tag = ""
shared_tooltip = ""
folder_class = "ckpt-folder"
if node.get("all_shared"):
shared_tag = "Shared "
folder_class += " shared"
shared_with = sorted(node.get("shared_with", []), key=lambda value: value.lower())
if shared_with:
lines = "".join(
f"{html.escape(label)}
" for label in shared_with
)
shared_tooltip = (
""
)
elif node.get("all_unique"):
folder_class += " unique"
open_attr = ""
if level < 1 or (open_shared_path and node.get("shared_count", 0) == 1):
open_attr = " open"
children_html = []
for child_name in sorted(node["children"].keys(), key=lambda n: n.lower()):
children_html.append(
self._render_folder(
child_name,
node["children"][child_name],
level + 1,
open_shared_path=open_shared_path,
)
)
for entry in sorted(node["files"], key=lambda e: e["name"].lower()):
children_html.append(self._render_file_entry(entry))
return (
f""
""
f"{folder_name} "
f"{shared_tag}"
f"{size_label} "
f"{shared_tooltip}"
" "
""
+ "".join(children_html)
+ "
"
)
def _collapse_single_child_folder(self, name, node):
parts = [name]
current = node
while True:
if current.get("files"):
break
children = current.get("children", {})
if len(children) != 1:
break
child_name, child_node = next(iter(children.items()))
parts.append(child_name)
current = child_node
return os.sep.join(parts), current
def _render_file_entry(self, entry):
classes = ["ckpt-file-row"]
if entry.get("shared"):
classes.append("shared")
else:
classes.append("unique")
if not entry.get("exists"):
classes.append("missing")
class_attr = " ".join(classes)
name = html.escape(entry.get("name", ""))
size_label = self._format_size(entry.get("size", 0))
display = html.escape(entry.get("display", ""))
shared_tag = "Shared " if entry.get("shared") else ""
shared_tooltip = ""
if entry.get("shared"):
shared_with = entry.get("shared_with", [])
if shared_with:
lines = "".join(
f"{html.escape(label)}
" for label in shared_with
)
shared_tooltip = (
""
)
return (
f""
f"{name} "
f"{shared_tag}"
f"{size_label} "
f"{shared_tooltip}"
"
"
)
def _split_display_path(self, display_path):
drive, rest = os.path.splitdrive(display_path)
if drive:
drive = drive.rstrip("\\/")
rest = rest.replace("\\", "/")
parts = [part for part in rest.split("/") if part]
if drive:
return [drive] + parts
if display_path.startswith("\\\\"):
return ["UNC"] + parts
return parts
def _add_file(self, files_set, value, force_folder=None):
path = self._resolve_path(value, force_folder=force_folder)
if path and os.path.isfile(path):
files_set.add(path)
def _download_root_name(self):
root = fl.get_download_location()
if not root or os.path.isabs(root):
return ""
norm = os.path.normpath(root)
if not norm or norm == ".":
return ""
return norm.split(os.sep)[0]
def _resolve_repo_relative_path(self, rel_path):
if not rel_path:
return None
rel_path = str(rel_path)
if rel_path.startswith("http"):
return None
root_name = self._download_root_name()
if not root_name:
return None
rel_norm = os.path.normpath(rel_path)
parts = rel_norm.split(os.sep)
if parts and parts[0].lower() == root_name.lower():
return self._normalize_path(os.path.join(self._repo_root, rel_norm))
return None
def _resolve_path(self, value, force_folder=None):
if not value:
return None
if not isinstance(value, str):
value = str(value)
if os.path.isabs(value):
abs_path = self._normalize_path(value)
if os.path.isfile(abs_path):
return abs_path
basename = os.path.basename(abs_path)
return abs_path
repo_rel = self._resolve_repo_relative_path(value)
if repo_rel:
return repo_rel
try:
local_path = self.get_local_model_filename(value, extra_paths=force_folder)
except Exception:
local_path = None
if local_path:
return self._normalize_path(local_path)
filename = os.path.basename(value) if value.startswith("http") else value
if force_folder:
expected = fl.get_download_location(filename, force_path=force_folder)
else:
expected = fl.get_download_location(filename)
return self._normalize_path(expected)
def _resolve_download_relpath(self, rel_path):
if rel_path and os.path.isabs(rel_path):
return self._normalize_path(rel_path)
repo_rel = self._resolve_repo_relative_path(rel_path)
if repo_rel:
return repo_rel
local = fl.locate_file(rel_path, error_if_none=False)
if local:
return self._normalize_path(local)
return self._normalize_path(fl.get_download_location(rel_path))
def _resolve_folder_path(self, source_folder, target_folder):
parts = []
if target_folder:
parts.append(target_folder)
if source_folder:
parts.append(source_folder)
rel_path = os.path.join(*parts) if parts else ""
if rel_path and os.path.isabs(rel_path):
return self._normalize_path(rel_path)
repo_rel = self._resolve_repo_relative_path(rel_path)
if repo_rel:
return repo_rel
if not rel_path:
return self._normalize_path(fl.get_download_location())
folder = fl.locate_folder(rel_path, error_if_none=False)
if folder:
return self._normalize_path(folder)
return self._normalize_path(fl.get_download_location(rel_path))
def _list_folder_files(self, folder_path):
files = set()
if not folder_path or not os.path.isdir(folder_path):
return files
for root, _, filenames in os.walk(folder_path):
for name in filenames:
files.add(self._normalize_path(os.path.join(root, name)))
return files
def _combine_download_path(self, target_folder, source_folder, filename):
parts = []
if target_folder:
parts.append(target_folder)
if source_folder:
parts.append(source_folder)
parts.append(filename)
return os.path.join(*parts)
def _compute_list(self, filename):
if not filename:
return []
return [os.path.basename(str(filename))]
def _ensure_list(self, value):
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def _safe_get_lora_dir(self, model_type):
try:
return self.get_lora_dir(model_type)
except Exception:
return None
def _normalize_path(self, path):
return os.path.normcase(os.path.abspath(path))
def _format_gb(self, size_bytes):
size_gb = size_bytes / (1024 ** 3)
return f"{size_gb:.2f} GB"
def _format_size(self, size_bytes):
if size_bytes >= 1024 ** 3:
return f"{size_bytes / (1024 ** 3):.2f} GB"
if size_bytes >= 1024 ** 2:
return f"{size_bytes / (1024 ** 2):.1f} MB"
if size_bytes >= 1024:
return f"{size_bytes / 1024:.1f} KB"
return f"{size_bytes} B"
def _display_path(self, path):
try:
rel = os.path.relpath(path, self._repo_root)
if not rel.startswith(".."):
return rel
except Exception:
pass
return path
def _get_model_roots(self):
roots = []
for root in getattr(fl, "_models_paths", []) or []:
if not root:
continue
if os.path.isabs(root):
abs_root = root
else:
abs_root = os.path.join(self._repo_root, root)
abs_root = self._normalize_path(abs_root)
if abs_root not in roots:
roots.append(abs_root)
return roots
def _build_basename_index(self, basenames, min_size=1):
basenames = {name.lower() for name in basenames if name}
if not basenames:
return {}
found = {}
remaining = set(basenames)
for root in self._get_model_roots():
if not remaining:
break
if not os.path.isdir(root):
continue
for name in list(remaining):
candidate = os.path.join(root, name)
if os.path.isfile(candidate):
try:
if os.path.getsize(candidate) < min_size:
continue
except OSError:
continue
found[name] = self._normalize_path(candidate)
remaining.discard(name)
if not remaining:
break
if os.path.normcase(root) == os.path.normcase(self._repo_root):
continue
for dirpath, _, filenames in os.walk(root):
for filename in filenames:
key = filename.lower()
if key in remaining:
full_path = os.path.join(dirpath, filename)
try:
if os.path.getsize(full_path) < min_size:
continue
except OSError:
continue
found[key] = self._normalize_path(full_path)
remaining.discard(key)
if not remaining:
break
if not remaining:
break
return found
def _find_in_model_roots(self, basename, min_size=1):
if not basename:
return None
key = basename.lower()
if key in self._basename_index:
return self._basename_index.get(key)
found = self._build_basename_index({basename}, min_size=min_size)
if found:
self._basename_index.update(found)
return self._basename_index.get(key)
self._basename_index[key] = None
return None
def _repair_missing_paths(self):
missing = []
for path in self._file_usage_models.keys():
try:
size = os.path.getsize(path) if os.path.isfile(path) else -1
except OSError:
size = -1
if size <= 0:
missing.append(path)
if not missing:
return
basenames = {os.path.basename(path) for path in missing}
found = self._build_basename_index(basenames, min_size=1)
if not found:
return
for old_path in list(missing):
key = os.path.basename(old_path).lower()
new_path = found.get(key)
if not new_path or new_path == old_path:
continue
self._swap_path(old_path, new_path)
def _swap_path(self, old_path, new_path):
usage_ids = self._file_usage.pop(old_path, set())
if usage_ids:
self._file_usage[new_path].update(usage_ids)
for usage_id in usage_ids:
files = self._usage_files.get(usage_id)
if files is not None:
files.discard(old_path)
files.add(new_path)
model_type = self._variant_to_model.get(usage_id)
if model_type:
for variant in self._model_variants.get(model_type, []):
if variant.get("id") != usage_id:
continue
files_set = variant.get("files")
if isinstance(files_set, set):
files_set.discard(old_path)
files_set.add(new_path)
primary_paths = variant.get("primary_paths", [])
variant["primary_paths"] = [
new_path if path == old_path else path for path in primary_paths
]
break
model_types = self._file_usage_models.pop(old_path, set())
if model_types:
self._file_usage_models[new_path].update(model_types)
for model_type in model_types:
files = self._model_files.get(model_type)
if files is not None:
files.discard(old_path)
files.add(new_path)
primary_paths = self._model_primary.get(model_type)
if primary_paths:
self._model_primary[model_type] = [
new_path if path == old_path else path for path in primary_paths
]