`;
document.body.appendChild(backdrop);
const cancelBtn = backdrop.querySelector('#ckpt-delete-cancel');
const uniqueBtn = backdrop.querySelector('#ckpt-delete-unique');
const allBtn = backdrop.querySelector('#ckpt-delete-all');
cancelBtn.addEventListener('click', () => {
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
uniqueBtn.addEventListener('click', () => {
const payload = backdrop.dataset.payloadUnique || '';
if (payload) {
window.updatemodelInput("model_action_input", payload);
}
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
allBtn.addEventListener('click', () => {
const payload = backdrop.dataset.payloadAll || '';
if (payload) {
window.updatemodelInput("model_action_input", payload);
}
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
backdrop.addEventListener('click', (evt) => {
if (evt.target === backdrop) {
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
}
});
return backdrop;
}
function showmodelModal(message, payloadUnique, payloadAll, uniqueSize, totalSize) {
const backdrop = ensuremodelModal();
const body = backdrop.querySelector('#ckpt-delete-modal-body');
const uniqueBtn = backdrop.querySelector('#ckpt-delete-unique');
const allBtn = backdrop.querySelector('#ckpt-delete-all');
body.textContent = message;
backdrop.dataset.payloadUnique = payloadUnique || '';
backdrop.dataset.payloadAll = payloadAll || '';
uniqueBtn.textContent = uniqueSize ? `Delete Unique (${uniqueSize})` : 'Delete Unique';
allBtn.textContent = totalSize ? `Delete Unique + Shared (${totalSize})` : 'Delete Unique + Shared';
backdrop.style.display = 'flex';
}
window.handlemodelAction = function(button, action) {
if (!button) return;
const nodeId = button.dataset.nodeId;
const nodeLabel = button.dataset.nodeLabel || nodeId;
const uniqueSize = button.dataset.uniqueSize || "0 B";
const totalSize = button.dataset.totalSize || uniqueSize;
if (!nodeId) return;
if (action === "delete") {
const payloadUnique = JSON.stringify({
action: action,
node_id: nodeId,
delete_shared: false,
ts: Date.now()
});
const payloadAll = JSON.stringify({
action: action,
node_id: nodeId,
delete_shared: true,
ts: Date.now()
});
showmodelModal(
`Choose delete scope for "${nodeLabel}": unique only ${uniqueSize}, or unique + shared ${totalSize}.`,
payloadUnique,
payloadAll,
uniqueSize,
totalSize
);
return;
}
const payload = JSON.stringify({
action: action,
node_id: nodeId,
ts: Date.now()
});
window.updatemodelInput("model_action_input", payload);
};
window.handlemodelRowClick = function(evt, row) {
if (evt) {
evt.stopPropagation();
evt.preventDefault();
}
if (!row) return;
const nodeId = row.dataset.nodeId;
if (!nodeId) return;
const payload = JSON.stringify({
action: "view",
node_id: nodeId,
ts: Date.now()
});
window.updatemodelInput("model_action_input", payload);
};
window.handlemodelSummaryClick = function(evt, summary) {
if (!summary) return;
if (evt) {
evt.stopPropagation();
evt.preventDefault();
}
const details = summary.closest('details');
if (!details) return;
const nodeId = details.dataset.nodeId;
if (!nodeId) return;
const row = summary.querySelector('.ckpt-row');
const nodeType = row ? (row.dataset.nodeType || '') : '';
const payload = JSON.stringify({
action: "toggle_node",
node_id: nodeId,
open: !details.open,
view: nodeType === "model",
ts: Date.now()
});
window.updatemodelInput("model_action_input", payload);
};
function ismodelTabActive() {
const root = modelRoot();
const tabs = root.querySelectorAll('button[role="tab"]');
for (const tab of tabs) {
if (tab.getAttribute('aria-selected') === 'true') {
const label = (tab.textContent || '').trim();
if (label === 'Models Manager') return true;
}
}
return false;
}
function triggermodelRefresh() {
const root = modelRoot();
const btn = root.querySelector('#ckpt_refresh_btn button') || root.querySelector('#ckpt_refresh_btn');
if (btn) btn.click();
}
function triggermodelFilter() {
const root = modelRoot();
const btn = root.querySelector('#ckpt_filter_btn button') || root.querySelector('#ckpt_filter_btn');
if (btn) btn.click();
}
function setupmodelFilterEnter() {
const root = modelRoot();
const input = root.querySelector('#ckpt_filter_input textarea, #ckpt_filter_input input');
if (!input || input.dataset.ckptEnterBound === '1') return;
input.dataset.ckptEnterBound = '1';
input.addEventListener('keydown', (evt) => {
if (evt.key !== 'Enter') return;
evt.preventDefault();
evt.stopPropagation();
triggermodelFilter();
});
}
function applymodelSticky() {
const root = modelRoot();
const sticky = root.querySelector('.ckpt-view-sticky');
if (!sticky) return;
sticky.style.position = 'sticky';
sticky.style.top = '16px';
sticky.style.alignSelf = 'flex-start';
let parent = sticky.parentElement;
let steps = 0;
while (parent && steps < 6) {
const style = window.getComputedStyle(parent);
if (style.overflow && style.overflow !== 'visible') {
parent.style.overflow = 'visible';
}
parent = parent.parentElement;
steps += 1;
}
if (root && root.host && root.host.style) {
const hostStyle = window.getComputedStyle(root.host);
if (hostStyle.overflow && hostStyle.overflow !== 'visible') {
root.host.style.overflow = 'visible';
}
}
}
let ckptFloating = {
sticky: null,
scrollEl: null,
handler: null,
resizeHandler: null,
anchorTop: 0,
topPadding: 16,
};
function findmodelScrollContainer(element) {
let current = element;
while (current) {
const style = window.getComputedStyle(current);
const overflowY = style.overflowY;
if ((overflowY === 'auto' || overflowY === 'scroll') && current.scrollHeight > current.clientHeight + 1) {
return current;
}
current = current.parentElement;
}
const root = modelRoot();
const host = root && root.host ? root.host : null;
current = host;
while (current) {
const style = window.getComputedStyle(current);
const overflowY = style.overflowY;
if ((overflowY === 'auto' || overflowY === 'scroll') && current.scrollHeight > current.clientHeight + 1) {
return current;
}
current = current.parentElement;
}
return window;
}
function computemodelAnchorTop(sticky, scrollEl) {
if (scrollEl === window) {
return sticky.getBoundingClientRect().top + window.scrollY;
}
const rect = sticky.getBoundingClientRect();
const containerRect = scrollEl.getBoundingClientRect();
return rect.top - containerRect.top + scrollEl.scrollTop;
}
function clearmodelFloating(sticky) {
if (!sticky) return;
sticky.style.transform = '';
sticky.style.willChange = '';
}
function updatemodelFloating() {
const sticky = ckptFloating.sticky;
if (!sticky) return;
const scrollEl = ckptFloating.scrollEl || window;
const scrollTop = scrollEl === window ? window.scrollY : scrollEl.scrollTop;
let maxTranslate = Infinity;
if (scrollEl === window) {
const doc = document.documentElement;
maxTranslate = Math.max(
0,
(doc ? doc.scrollHeight : 0) - sticky.offsetHeight - ckptFloating.topPadding - ckptFloating.anchorTop
);
} else {
maxTranslate = Math.max(
0,
scrollEl.scrollHeight - sticky.offsetHeight - ckptFloating.topPadding - ckptFloating.anchorTop
);
}
const offset = scrollTop + ckptFloating.topPadding - ckptFloating.anchorTop;
const translate = Math.max(0, Math.min(offset, maxTranslate));
if (translate > 0) {
sticky.style.transform = `translateY(${translate}px)`;
sticky.style.willChange = 'transform';
} else {
clearmodelFloating(sticky);
}
}
function teardownmodelFloating() {
if (ckptFloating.scrollEl && ckptFloating.handler) {
if (ckptFloating.scrollEl === window) {
window.removeEventListener('scroll', ckptFloating.handler);
} else {
ckptFloating.scrollEl.removeEventListener('scroll', ckptFloating.handler);
}
}
if (ckptFloating.resizeHandler) {
window.removeEventListener('resize', ckptFloating.resizeHandler);
}
clearmodelFloating(ckptFloating.sticky);
if (ckptFloating.sticky) {
ckptFloating.sticky.style.position = '';
ckptFloating.sticky.style.top = '';
}
ckptFloating.sticky = null;
ckptFloating.scrollEl = null;
ckptFloating.handler = null;
ckptFloating.resizeHandler = null;
ckptFloating.anchorTop = 0;
}
function setupmodelFloating() {
const root = modelRoot();
const sticky = root.querySelector('.ckpt-view-sticky');
if (!sticky) return;
if (ckptFloating.sticky === sticky) {
updatemodelFloating();
return;
}
teardownmodelFloating();
ckptFloating.sticky = sticky;
sticky.style.position = 'relative';
sticky.style.top = '0px';
ckptFloating.scrollEl = findmodelScrollContainer(sticky);
ckptFloating.anchorTop = computemodelAnchorTop(sticky, ckptFloating.scrollEl);
ckptFloating.handler = () => updatemodelFloating();
if (ckptFloating.scrollEl === window) {
window.addEventListener('scroll', ckptFloating.handler, { passive: true });
} else {
ckptFloating.scrollEl.addEventListener('scroll', ckptFloating.handler, { passive: true });
}
ckptFloating.resizeHandler = () => {
ckptFloating.anchorTop = computemodelAnchorTop(sticky, ckptFloating.scrollEl);
updatemodelFloating();
};
window.addEventListener('resize', ckptFloating.resizeHandler);
updatemodelFloating();
}
function observemodelView() {
const root = modelRoot();
const column = root.querySelector('.ckpt-right-column');
if (!column || column.dataset.ckptObserved === '1') return;
column.dataset.ckptObserved = '1';
const observer = new MutationObserver(() => {
applymodelSticky();
setupmodelFloating();
setupmodelFilterEnter();
});
observer.observe(column, { childList: true, subtree: true });
}
let ckptTabWasActive = false;
let ckptRefreshBusy = false;
let ckptRefreshStartedAt = 0;
function waitForRefreshReady() {
const root = modelRoot();
const treeRoot = root.querySelector('#ckpt_tree_root');
if (treeRoot && treeRoot.dataset.status === 'ready') {
ckptRefreshBusy = false;
applymodelSticky();
setupmodelFloating();
setupmodelFilterEnter();
return;
}
if (ckptRefreshStartedAt && Date.now() - ckptRefreshStartedAt > 60000) {
ckptRefreshBusy = false;
applymodelSticky();
setupmodelFloating();
setupmodelFilterEnter();
return;
}
setTimeout(waitForRefreshReady, 400);
}
function startmodelBuild() {
if (ckptRefreshBusy) return;
ckptRefreshBusy = true;
ckptRefreshStartedAt = Date.now();
triggermodelRefresh();
setTimeout(waitForRefreshReady, 200);
}
function handleTabChange() {
const active = ismodelTabActive();
if (active && !ckptTabWasActive) {
startmodelBuild();
}
if (active) {
observemodelView();
setTimeout(applymodelSticky, 200);
setTimeout(setupmodelFloating, 260);
setTimeout(setupmodelFilterEnter, 280);
}
ckptTabWasActive = active;
}
function initmodelBridge() {
const root = modelRoot();
const tabs = root.querySelectorAll('button[role="tab"]');
if (!tabs || tabs.length === 0) {
setTimeout(initmodelBridge, 400);
return;
}
const tabObserver = new MutationObserver(handleTabChange);
tabObserver.observe(root, { attributes: true, subtree: true, attributeFilter: ['aria-selected', 'class'] });
handleTabChange();
}
initmodelBridge();
"""
def _show_loading(self):
tree_loading = self._build_loading_html()
view_hidden = gr.update(value="", visible=False)
refresh_hidden = gr.update(visible=False)
filter_input_hidden = gr.update(visible=False)
filter_button_hidden = gr.update(visible=False)
return tree_loading, view_hidden, refresh_hidden, filter_input_hidden, filter_button_hidden
def _build_tree(self):
self._build_cache()
self._expanded_nodes = set()
self._current_tree_mode = "full"
self._current_filter_text = ""
tree_html = self._build_tree_html()
view_html = self._build_empty_view_html()
view_visible = gr.update(value=view_html, visible=True)
refresh_visible = gr.update(visible=True)
filter_input_visible = gr.update(visible=True)
filter_button_visible = gr.update(visible=True)
return tree_html, view_visible, refresh_visible, filter_input_visible, filter_button_visible
def _build_filtered_tree(self, filter_text):
text = str(filter_text or "").strip()
if len(text) < 3:
gr.Info("Please enter at least 3 characters to filter finetunes.")
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
self._build_cache()
self._expanded_nodes = set()
self._current_tree_mode = "filtered"
self._current_filter_text = text
tree_html = self._build_filtered_tree_html(text)
view_html = self._build_empty_view_html()
view_visible = gr.update(value=view_html, visible=True)
refresh_visible = gr.update(visible=True)
filter_input_visible = gr.update(visible=True)
filter_button_visible = gr.update(visible=True)
return tree_html, view_visible, refresh_visible, filter_input_visible, filter_button_visible
def _handle_action(self, payload: str):
if not payload:
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
try:
data = json.loads(payload)
except json.JSONDecodeError:
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
action = data.get("action")
node_id = data.get("node_id")
if not action or not node_id:
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
if not self._node_map:
self._build_cache()
if action == "view":
view_html = self._build_view_html(node_id)
return gr.update(), gr.update(value=view_html, visible=True), gr.update(), gr.update(), gr.update()
if action == "toggle_node":
if node_id not in self._node_map:
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
if data.get("open"):
self._expanded_nodes.add(node_id)
else:
self._expanded_nodes.discard(node_id)
if self._current_tree_mode == "filtered" and len(self._current_filter_text) >= 3:
tree_html = self._build_filtered_tree_html(self._current_filter_text)
else:
tree_html = self._build_tree_html()
if data.get("view"):
view_html = self._build_view_html(node_id)
return (
gr.update(value=tree_html),
gr.update(value=view_html, visible=True),
gr.update(),
gr.update(),
gr.update(),
)
return gr.update(value=tree_html), gr.update(), gr.update(), gr.update(), gr.update()
if action == "delete":
delete_shared = bool(data.get("delete_shared"))
removed, missing, errors = self._delete_files_for_node(
node_id, delete_shared=delete_shared
)
if errors:
gr.Warning(f"Delete completed with errors: {len(errors)}")
else:
gr.Info(f"Deleted {len(removed)} files (missing: {len(missing)})")
return self._build_tree()
return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()
def _delete_files_for_node(self, node_id, delete_shared=False):
node = self._node_map.get(node_id)
if node is None:
return [], [], []
removed = []
missing = []
errors = []
if delete_shared:
target_files = node.get("files", set())
else:
target_files = node.get("unique_files", set())
for path in sorted(target_files):
if not os.path.isfile(path):
missing.append(path)
continue
try:
os.remove(path)
removed.append(path)
except OSError as exc:
errors.append((path, str(exc)))
return removed, missing, errors
def _build_cache(self):
self._node_map = {}
self._tree_data = []
self._model_files = {}
self._model_primary = {}
self._model_variants = {}
self._variant_to_model = {}
self._model_base_label = {}
self._model_family_label = {}
self._model_display_label = {}
self._model_direct_status_map = {}
self._model_aggregated_status_map = {}
self._model_usage_ids = {}
self._usage_files = {}
self._file_usage = defaultdict(set)
self._file_usage_models = defaultdict(set)
self._file_info = {}
self._basename_index = {}
self._errors = []
self._stats_total = 0
self._stats_by_drive = []
model_types = self._get_model_types()
self._model_direct_status_map, self._model_aggregated_status_map = self._compute_model_status_maps(model_types)
for model_type in model_types:
try:
variant_info = self._collect_model_variants(model_type)
except Exception as exc:
self._errors.append(f"{model_type}: {exc}")
variant_info = {"variants": [], "primary_paths": [], "files": set()}
variants = variant_info.get("variants", [])
model_files = variant_info.get("files", set())
self._model_variants[model_type] = variants
self._model_primary[model_type] = variant_info.get("primary_paths", [])
self._model_files[model_type] = model_files
for path in model_files:
self._file_usage_models[path].add(model_type)
self._rebalance_other_shared_variants(model_types)
self._usage_files = {}
self._file_usage = defaultdict(set)
self._variant_to_model = {}
self._model_usage_ids = {}
for model_type in model_types:
usage_ids = set()
for variant in self._model_variants.get(model_type, []):
variant_id = variant["id"]
usage_ids.add(variant_id)
self._variant_to_model[variant_id] = model_type
files = variant.get("files", set())
self._usage_files[variant_id] = files
for path in files:
self._file_usage[path].add(variant_id)
self._model_usage_ids[model_type] = usage_ids
for path in self._file_usage_models.keys():
try:
exists = os.path.isfile(path)
size = os.path.getsize(path) if exists else 0
except OSError:
exists = False
size = 0
self._file_info[path] = {"size": size, "exists": exists}
self._compute_global_stats()
self._build_tree_structure(model_types)
def _build_dropdown_deps(self, model_types):
if not model_types:
return None
fallback_type = model_types[0]
return model_dropdowns.DropdownDeps(
transformer_types=list(model_types),
displayed_model_types=list(model_types),
transformer_type=fallback_type,
three_levels_hierarchy=True,
families_infos=self.families_infos,
server_config=self.server_config,
transformer_quantization=self.transformer_quantization,
transformer_dtype_policy=self.transformer_dtype_policy,
text_encoder_quantization=self.text_encoder_quantization,
get_model_def=self.get_model_def,
get_model_recursive_prop=self.get_model_recursive_prop,
get_model_filename=self.get_model_filename,
get_local_model_filename=self.get_local_model_filename,
get_lora_dir=self.get_lora_dir,
get_parent_model_type=self.get_parent_model_type,
get_base_model_type=self.get_base_model_type,
get_model_family=self.get_model_family,
get_model_name=self.get_model_name,
get_transformer_dtype=self.get_transformer_dtype,
)
def _resolve_expected_entry_path(self, entry):
if not isinstance(entry, dict):
return None
local_path = entry.get("path", None)
if isinstance(local_path, str) and len(local_path) > 0:
return self._normalize_path(local_path)
filename = entry.get("filename", "")
extra_paths = entry.get("extra_paths", None)
return self._resolve_path(filename, force_folder=extra_paths)
def _collect_expected_missing_files(self, model_type):
deps = self._build_dropdown_deps([model_type])
if deps is None:
return set()
missing_paths = set()
try:
expected_entries = []
expected_entries.extend(model_dropdowns.get_expected_core_file_entries_for_status(deps, model_type))
expected_entries.extend(model_dropdowns.get_expected_secondary_file_entries_for_status(deps, model_type))
except Exception as exc:
self._errors.append(f"Missing file check failed for {model_type}: {exc}")
return set()
model_def = self.get_model_def(model_type)
if isinstance(model_def, dict):
try:
for resolved_path in self._collect_handler_file_paths(model_type, model_def):
if resolved_path and not os.path.isfile(resolved_path):
missing_paths.add(resolved_path)
except Exception as exc:
self._errors.append(f"Missing handler file check failed for {model_type}: {exc}")
for entry in expected_entries:
resolved_path = self._resolve_expected_entry_path(entry)
if resolved_path and not os.path.isfile(resolved_path):
missing_paths.add(resolved_path)
return missing_paths
def _compute_model_status_maps(self, model_types):
if not model_types:
return {}, {}
try:
deps = self._build_dropdown_deps(model_types)
if deps is None:
return {}, {}
direct_status_map, aggregated_parent_status_map = model_dropdowns.get_model_download_status_maps(
deps, dropdown_types=list(model_types)
)
return direct_status_map, aggregated_parent_status_map
except Exception as exc:
self._errors.append(f"Model status map failed: {exc}")
return {}, {}
def _decorate_status_label(self, label, status):
return model_dropdowns.decorate_model_dropdown_label(label, status)
def _compute_global_stats(self):
total = 0
by_drive = defaultdict(int)
for path, info in self._file_info.items():
size = info.get("size", 0)
if size <= 0:
continue
total += size
drive = self._get_drive_label(path)
by_drive[drive] += size
self._stats_total = total
self._stats_by_drive = sorted(
by_drive.items(), key=lambda item: (-item[1], item[0])
)
def _get_drive_label(self, path):
drive, _ = os.path.splitdrive(path)
if drive:
return drive.rstrip(":").upper()
if path.startswith("\\\\"):
return "UNC"
return "ROOT"
def _get_model_types(self):
types = []
if isinstance(getattr(self, "models_def", None), dict) and self.models_def:
types = sorted(self.models_def.keys())
elif isinstance(self.transformer_types, list) and self.transformer_types:
types = list(self.transformer_types)
elif isinstance(self.displayed_model_types, list):
types = list(self.displayed_model_types)
seen = set()
ordered = []
for model_type in types:
if model_type and model_type not in seen:
ordered.append(model_type)
seen.add(model_type)
return ordered
def _collect_model_variants(self, model_type):
model_def = self.get_model_def(model_type)
if model_def is None:
return {"variants": [], "primary_paths": [], "files": set()}
default_dtype_policy = self._resolve_default_dtype_policy(model_type, model_def)
transformer_variants = self._collect_transformer_variants(
model_type, model_def, default_dtype_policy
)
module_variants = self._collect_module_variants(
model_type, model_def, default_dtype_policy
)
text_encoder_variants = self._collect_text_encoder_variants(
model_type, model_def, default_dtype_policy
)
lora_variants = self._collect_lora_variants(model_type)
transformer_files = set()
module_files = set()
text_encoder_files = set()
lora_files = set()
model_primary_paths = []
for variant in transformer_variants:
transformer_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
for variant in module_variants:
module_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
for variant in text_encoder_variants:
text_encoder_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
for variant in lora_variants:
lora_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
model_primary_paths = self._dedupe_paths(model_primary_paths)
other_files, shared_files = self._collect_other_files(
model_type,
model_def,
transformer_files.union(module_files),
text_encoder_files,
lora_files,
)
variants = list(transformer_variants) + list(module_variants) + list(text_encoder_variants) + list(lora_variants)
if other_files:
other_id = self._make_variant_id(model_type, "other", "other")
variants.append(
{
"id": other_id,
"label": VARIANT_LABEL_OTHER_NON_SHARED,
"type": "variant",
"files": other_files,
"primary_paths": [],
}
)
if shared_files:
shared_id = self._make_variant_id(model_type, "shared", "shared")
variants.append(
{
"id": shared_id,
"label": VARIANT_LABEL_OTHER_SHARED,
"type": "variant",
"files": shared_files,
"primary_paths": [],
}
)
model_files = set()
for variant in variants:
model_files.update(variant.get("files", set()))
missing_files = self._collect_expected_missing_files(model_type)
if missing_files:
missing_id = self._make_variant_id(model_type, "missing", "missing")
variants.append(
{
"id": missing_id,
"label": VARIANT_LABEL_MISSING,
"type": "missing",
"files": set(),
"missing_files": missing_files,
"primary_paths": [],
}
)
return {
"variants": variants,
"primary_paths": model_primary_paths,
"files": model_files,
}
def _collect_transformer_variants(
self, model_type, model_def, default_dtype_policy
):
choice_lists, modules = self._collect_transformer_choice_lists(model_type, model_def)
base_dtypes, token_dtypes, token_labels = self._collect_variant_candidates(
choice_lists, default_dtype_policy
)
variants = []
used_labels = set()
base_default = self._format_dtype_label(default_dtype_policy)
if base_dtypes:
for dtype_policy in sorted(base_dtypes):
label = self._format_variant_label("", dtype_policy, base_default)
files, primary_paths = self._collect_transformer_files_for_variant(
model_type, model_def, modules, "", dtype_policy
)
if not files:
continue
file_label = self._detect_quant_label_from_files(files)
label = file_label or label
variant_label = f"Transformer {label}"
variant_id = self._make_variant_id(model_type, "transformer", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": primary_paths,
}
)
used_labels.add(variant_label)
for token, dtypes in sorted(token_dtypes.items()):
quant_param = self._quant_param_from_token(token)
quant_label = token_labels.get(token) or self._format_quant_label(token)
for dtype_policy in sorted(dtypes):
label_suffix = ""
if len(dtypes) > 1:
label_suffix = f" {self._format_dtype_label(dtype_policy)}"
label = f"{quant_label}{label_suffix}"
files, primary_paths = self._collect_transformer_files_for_variant(
model_type,
model_def,
modules,
quant_param,
dtype_policy,
token_match=token,
)
if not files:
continue
file_label = self._detect_quant_label_from_files(files)
quant_label = file_label or quant_label
if not self._files_match_token(files, token):
continue
label = f"{quant_label}{label_suffix}" if quant_label else self._format_dtype_label(dtype_policy)
variant_label = f"Transformer {label}"
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "transformer", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": primary_paths,
}
)
used_labels.add(variant_label)
return sorted(variants, key=lambda v: v["label"].lower())
def _collect_module_variants(self, model_type, model_def, default_dtype_policy):
module_entries = self._expand_modules_with_ids(model_type)
if not module_entries:
return []
variants = []
used_labels = set()
base_default = self._format_dtype_label(default_dtype_policy)
for idx, entry in enumerate(module_entries):
module = entry.get("module")
module_id = entry.get("module_id", "")
module_name = self._get_module_display_name(model_type, module, module_id, idx)
choice_lists = self._collect_module_choice_lists(module)
base_dtypes, token_dtypes, token_labels = self._collect_variant_candidates(
choice_lists, default_dtype_policy
)
if base_dtypes:
for dtype_policy in sorted(base_dtypes):
label = self._format_variant_label("", dtype_policy, base_default)
files = self._collect_module_files_for_variant(
model_type,
module,
"",
dtype_policy,
)
if not files:
continue
file_label = self._detect_quant_label_from_files(files)
label = file_label or label
variant_label = f"Module {module_name} {label}".strip()
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "module", f"{module_name} {label}")
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [],
}
)
used_labels.add(variant_label)
for token, dtypes in sorted(token_dtypes.items()):
quant_param = self._quant_param_from_token(token)
quant_label = token_labels.get(token) or self._format_quant_label(token)
for dtype_policy in sorted(dtypes):
label_suffix = ""
if len(dtypes) > 1:
label_suffix = f" {self._format_dtype_label(dtype_policy)}"
files = self._collect_module_files_for_variant(
model_type,
module,
quant_param,
dtype_policy,
token_match=token,
)
if not files:
continue
if not self._files_match_token(files, token):
continue
file_label = self._detect_quant_label_from_files(files)
effective_quant_label = file_label or quant_label
label = f"{effective_quant_label}{label_suffix}" if effective_quant_label else self._format_dtype_label(dtype_policy)
variant_label = f"Module {module_name} {label}".strip()
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "module", f"{module_name} {label}")
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [],
}
)
used_labels.add(variant_label)
return sorted(variants, key=lambda v: v["label"].lower())
def _get_module_display_name(self, model_type, module, module_id, module_index):
if isinstance(module_id, str) and module_id:
return module_id
if isinstance(module, str):
module_type, _ = self._split_module_ref(module)
return module_type
return model_type
def _collect_module_choice_lists(self, module):
choice_lists = []
if isinstance(module, dict):
urls1 = module.get("URLs", [])
urls2 = module.get("URLs2", [])
if urls1:
choice_lists.append(urls1)
if urls2:
choice_lists.append(urls2)
elif isinstance(module, list):
if module:
choice_lists.append(module)
else:
module_type = module
sub_prop_name = "_list"
if isinstance(module_type, str) and "#" in module_type:
pos = module_type.rfind("#")
sub_prop_name = module_type[pos + 1 :]
module_type = module_type[:pos]
try:
mod_urls = self.get_model_recursive_prop(
module_type, "modules", sub_prop_name=sub_prop_name, return_list=True
)
except Exception:
mod_urls = []
if isinstance(mod_urls, list) and mod_urls:
if all(isinstance(item, list) for item in mod_urls):
for entry in mod_urls:
if entry:
choice_lists.append(entry)
else:
choice_lists.append(mod_urls)
return choice_lists
def _collect_module_files_for_variant(
self,
model_type,
module,
quantization,
dtype_policy,
token_match=None,
):
files = set()
is_exotic = quantization not in ("", "int8", "fp8")
if isinstance(module, dict):
for key in ("URLs", "URLs2"):
urls = module.get(key, [])
if not urls:
continue
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(urls, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=urls,
)
if filename:
self._add_file(files, filename)
elif isinstance(module, list):
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(module, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=module,
)
if filename:
self._add_file(files, filename)
else:
filename = ""
if token_match and is_exotic:
module_type = module
sub_prop_name = "_list"
if isinstance(module_type, str) and "#" in module_type:
pos = module_type.rfind("#")
sub_prop_name = module_type[pos + 1 :]
module_type = module_type[:pos]
try:
mod_urls = self.get_model_recursive_prop(
module_type,
"modules",
sub_prop_name=sub_prop_name,
return_list=True,
)
except Exception:
mod_urls = []
filename = self._select_filename_by_token(mod_urls, token_match)
if not filename:
try:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
module_type=module,
)
except Exception:
filename = ""
if filename:
self._add_file(files, filename)
return files
def _collect_transformer_choice_lists(self, model_type, model_def):
choice_lists = []
modules = self._expand_modules(model_type)
urls = self.get_model_recursive_prop(model_type, "URLs", return_list=True)
if urls:
choice_lists.append(urls)
if "URLs2" in model_def:
urls2 = self.get_model_recursive_prop(model_type, "URLs2", return_list=True)
if urls2:
choice_lists.append(urls2)
has_module_source = "module_source" in model_def
has_module_source2 = "module_source2" in model_def
for module in modules:
if isinstance(module, dict):
urls1 = module.get("URLs", [])
urls2 = module.get("URLs2", [])
if urls1 and not has_module_source:
choice_lists.append(urls1)
if urls2 and not has_module_source2:
choice_lists.append(urls2)
elif isinstance(module, list):
if has_module_source:
continue
if module:
choice_lists.append(module)
else:
if has_module_source:
continue
module_type = module
sub_prop_name = "_list"
if isinstance(module_type, str) and "#" in module_type:
pos = module_type.rfind("#")
sub_prop_name = module_type[pos + 1 :]
module_type = module_type[:pos]
try:
mod_urls = self.get_model_recursive_prop(
module_type, "modules", sub_prop_name=sub_prop_name, return_list=True
)
except Exception:
mod_urls = []
if isinstance(mod_urls, list) and mod_urls:
if all(isinstance(item, list) for item in mod_urls):
for entry in mod_urls:
if entry:
choice_lists.append(entry)
else:
choice_lists.append(mod_urls)
return choice_lists, modules
def _collect_transformer_files_for_variant(
self,
model_type,
model_def,
modules,
quantization,
dtype_policy,
token_match=None,
):
files = set()
primary_paths = []
model_filename = ""
is_exotic = quantization not in ("", "int8", "fp8")
if token_match and is_exotic:
urls = self.get_model_recursive_prop(model_type, "URLs", return_list=True)
model_filename = self._select_filename_by_token(urls, token_match)
if not model_filename:
model_filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
)
if model_filename:
self._add_file(files, model_filename)
primary_path = self._resolve_path(model_filename)
if primary_path and primary_path in files:
primary_paths.append(primary_path)
if "URLs2" in model_def:
model_filename2 = ""
if token_match and is_exotic:
urls2 = self.get_model_recursive_prop(
model_type, "URLs2", return_list=True
)
model_filename2 = self._select_filename_by_token(urls2, token_match)
if not model_filename2:
model_filename2 = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
submodel_no=2,
)
if model_filename2:
self._add_file(files, model_filename2)
primary_path2 = self._resolve_path(model_filename2)
if primary_path2 and primary_path2 in files:
primary_paths.append(primary_path2)
has_module_source = "module_source" in model_def
has_module_source2 = "module_source2" in model_def
for module in modules:
if isinstance(module, dict):
urls1 = module.get("URLs", [])
urls2 = module.get("URLs2", [])
if urls1 and not has_module_source:
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(urls1, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=urls1,
)
if filename:
self._add_file(files, filename)
if urls2 and not has_module_source2:
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(urls2, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=urls2,
)
if filename:
self._add_file(files, filename)
elif isinstance(module, list):
if has_module_source:
continue
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(module, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=module,
)
if filename:
self._add_file(files, filename)
else:
if has_module_source:
continue
if token_match and is_exotic:
module_type = module
sub_prop_name = "_list"
if isinstance(module_type, str) and "#" in module_type:
pos = module_type.rfind("#")
sub_prop_name = module_type[pos + 1 :]
module_type = module_type[:pos]
try:
mod_urls = self.get_model_recursive_prop(
module_type,
"modules",
sub_prop_name=sub_prop_name,
return_list=True,
)
except Exception:
mod_urls = []
filename = self._select_filename_by_token(mod_urls, token_match)
else:
filename = ""
try:
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
module_type=module,
)
except Exception:
filename = ""
if filename:
self._add_file(files, filename)
return files, primary_paths
def _collect_text_encoder_variants(
self, model_type, model_def, default_dtype_policy
):
variants = []
text_encoder_urls = self.get_model_recursive_prop(
model_type, "text_encoder_URLs", return_list=True
)
text_encoder_folder = model_def.get("text_encoder_folder", None)
used_paths = set()
used_labels = set()
base_default = self._format_dtype_label(default_dtype_policy)
if text_encoder_urls:
base_dtypes, token_dtypes, token_labels = self._collect_variant_candidates(
[text_encoder_urls], default_dtype_policy
)
if base_dtypes:
for dtype_policy in sorted(base_dtypes):
label = self._format_variant_label("", dtype_policy, base_default)
filename = self.get_model_filename(
model_type=model_type,
quantization="",
dtype_policy=dtype_policy,
URLs=text_encoder_urls,
)
if not filename:
continue
files = set()
self._add_file(
files, filename, force_folder=text_encoder_folder
)
if not files:
continue
resolved_path = next(iter(files))
if resolved_path in used_paths:
continue
used_paths.add(resolved_path)
variant_label = f"Text Encoder {label}"
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "text", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [resolved_path],
}
)
used_labels.add(variant_label)
for token, dtypes in sorted(token_dtypes.items()):
quant_param = self._quant_param_from_token(token)
quant_label = token_labels.get(token) or self._format_quant_label(token)
for dtype_policy in sorted(dtypes):
label_suffix = ""
if len(dtypes) > 1:
label_suffix = f" {self._format_dtype_label(dtype_policy)}"
label = f"{quant_label}{label_suffix}"
filename = ""
if quant_param not in ("", "int8", "fp8"):
filename = self._select_filename_by_token(text_encoder_urls, token)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quant_param,
dtype_policy=dtype_policy,
URLs=text_encoder_urls,
)
if not filename:
continue
files = set()
self._add_file(
files, filename, force_folder=text_encoder_folder
)
if not files:
continue
if not self._files_match_token(files, token):
continue
file_label = self._detect_quant_label_from_files(files)
quant_label = file_label or quant_label
resolved_path = next(iter(files))
if resolved_path in used_paths:
continue
used_paths.add(resolved_path)
label = f"{quant_label}{label_suffix}" if quant_label else self._format_dtype_label(dtype_policy)
variant_label = f"Text Encoder {label}"
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "text", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [resolved_path],
}
)
used_labels.add(variant_label)
else:
handler = self.get_model_handler(model_type)
get_name = getattr(handler, "get_text_encoder_filename", None)
if get_name is not None:
for quant_param in self._text_encoder_quant_candidates():
try:
filename = get_name(quant_param)
except Exception:
filename = None
if not filename:
continue
files = set()
self._add_file(
files, filename, force_folder=text_encoder_folder
)
if not files:
continue
resolved_path = next(iter(files))
if resolved_path in used_paths:
continue
used_paths.add(resolved_path)
quant_label = self._detect_quant_label_from_files(files)
label = quant_label or self._format_variant_label(
"",
self._detect_dtype_policy_from_name(os.path.basename(filename))
or default_dtype_policy,
base_default,
)
variant_label = f"Text Encoder {label}"
if variant_label in used_labels:
continue
variant_id = self._make_variant_id(model_type, "text", label)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [resolved_path],
}
)
used_labels.add(variant_label)
return sorted(variants, key=lambda v: v["label"].lower())
def _collect_lora_variants(self, model_type):
lora_dir = self._safe_get_lora_dir(model_type)
if not lora_dir:
return []
loras = self.get_model_recursive_prop(model_type, "loras", return_list=True)
if loras is None:
return []
variants = []
used_paths = set()
used_labels = {}
for entry in self._ensure_list(loras):
if not isinstance(entry, str) or len(entry) == 0:
continue
basename = os.path.basename(entry)
if len(basename) == 0:
continue
lora_path = os.path.join(lora_dir, basename)
files = set()
self._add_file(files, lora_path)
if not files:
continue
resolved_path = next(iter(files))
if resolved_path in used_paths:
continue
used_paths.add(resolved_path)
stem = os.path.splitext(basename)[0]
base_label = f"Lora {stem}"
label_count = used_labels.get(base_label, 0)
used_labels[base_label] = label_count + 1
variant_label = base_label if label_count == 0 else f"{base_label} #{label_count + 1}"
variant_id = self._make_variant_id(model_type, "lora", basename)
variants.append(
{
"id": variant_id,
"label": variant_label,
"type": "variant",
"files": files,
"primary_paths": [resolved_path],
}
)
return sorted(variants, key=lambda v: v["label"].lower())
def _collect_other_files(
self,
model_type,
model_def,
transformer_files,
text_encoder_files,
lora_files,
):
other_files = set()
shared_files = set()
preload_urls = self.get_model_recursive_prop(
model_type, "preload_URLs", return_list=True
)
for url in self._ensure_list(preload_urls):
self._add_file(other_files, url)
vae_urls = model_def.get("VAE_URLs", [])
for url in self._ensure_list(vae_urls):
self._add_file(shared_files, url)
handler_files = self._collect_handler_files(model_type, model_def)
shared_files.update(handler_files)
other_files.difference_update(transformer_files)
other_files.difference_update(text_encoder_files)
other_files.difference_update(lora_files)
shared_files.difference_update(transformer_files)
shared_files.difference_update(text_encoder_files)
shared_files.difference_update(lora_files)
shared_files.difference_update(other_files)
return other_files, shared_files
def _resolve_default_dtype_policy(self, model_type, model_def):
dtype = model_def.get("dtype")
if isinstance(dtype, str) and dtype.lower() in ("fp16", "bf16"):
return dtype.lower()
policy = self.transformer_dtype_policy
if isinstance(policy, str) and policy:
return policy.lower()
return "bf16"
def _format_dtype_label(self, dtype_policy):
return "FP16" if dtype_policy == "fp16" else "BF16"
def _format_quant_label(self, token):
if not token:
return ""
label = quant_router.get_quantization_label(token)
if label:
return label
return str(token).replace("_", " ").upper()
def _format_variant_label(self, token, dtype_policy, default_label):
if token:
return self._format_quant_label(token)
if dtype_policy:
return self._format_dtype_label(dtype_policy)
return default_label
def _detect_quant_info(self, name):
if not name:
return "", ""
label = quant_router.detect_quantization_label_from_filename(name) or ""
if label:
return self._label_to_token(label), label
kind = quant_router.detect_quantization_kind_for_file(name, verboseLevel=0)
if kind and kind != "none":
kind = str(kind).lower()
return kind, kind.upper()
lower = str(name).lower()
aliases = self._get_quant_aliases()
for token in aliases:
if token in ("bf16", "fp16", "bfloat16", "float16"):
continue
if token and token in lower:
return token, ""
return "", ""
def _label_to_token(self, label):
if not label:
return ""
raw = str(label).strip()
lower = raw.lower()
if lower.startswith("gguf-"):
return lower.split("-", 1)[1]
if lower == "gguf":
return "gguf"
return lower
def _detect_dtype_policy_from_name(self, name):
if not name:
return ""
lower = str(name).lower()
if "fp16" in lower or "float16" in lower:
return "fp16"
if "bf16" in lower or "bfloat16" in lower:
return "bf16"
return ""
def _collect_variant_candidates(self, choice_lists, default_dtype_policy):
base_dtypes = set()
token_dtypes = defaultdict(set)
token_labels = {}
for choice_list in choice_lists:
for entry in self._ensure_list(choice_list):
if not entry:
continue
entry_str = str(entry)
name = os.path.basename(entry_str)
quant_source = self._resolve_path(entry_str) or entry_str
token, label = self._detect_quant_info(quant_source)
dtype_policy = self._detect_dtype_policy_from_name(name) or default_dtype_policy
if token:
token = str(token).lower()
token_dtypes[token].add(dtype_policy)
if label:
token_labels.setdefault(token, label)
else:
base_dtypes.add(dtype_policy)
if not base_dtypes and not token_dtypes:
base_dtypes.add(default_dtype_policy)
return base_dtypes, token_dtypes, token_labels
def _quant_param_from_token(self, token):
if not token:
return ""
lower = str(token).lower()
if "int8" in lower:
return "int8"
if "fp8" in lower or "float8" in lower:
return "fp8"
return lower
def _files_match_token(self, files, token):
if not token:
return True
token = str(token).lower()
for path in files:
base = os.path.basename(path).lower()
if token in base:
return True
label = quant_router.detect_quantization_label_from_filename(path)
if label and self._label_to_token(label) == token:
return True
return False
def _detect_quant_label_from_files(self, files):
for path in files:
label = quant_router.detect_quantization_label_from_filename(path)
if label:
return label
return ""
def _select_filename_by_token(self, urls, token):
if not urls or not token:
return ""
token = str(token).lower()
for entry in self._ensure_list(urls):
if not entry:
continue
if isinstance(entry, list):
for item in entry:
if not item:
continue
base = os.path.basename(str(item)).lower()
if token in base:
return item
label = quant_router.detect_quantization_label_from_filename(item)
if label and self._label_to_token(label) == token:
return item
continue
base = os.path.basename(str(entry)).lower()
if token in base:
return entry
label = quant_router.detect_quantization_label_from_filename(entry)
if label and self._label_to_token(label) == token:
return entry
return ""
def _split_module_ref(self, module_ref, default_sub_prop="_list"):
module_type = module_ref
sub_prop_name = default_sub_prop
if isinstance(module_ref, str) and "#" in module_ref:
pos = module_ref.rfind("#")
module_type = module_ref[:pos]
sub_prop_name = module_ref[pos + 1 :]
return module_type, sub_prop_name
def _resolve_recursive_prop_owner(self, model_type, prop, sub_prop_name, stack):
if not isinstance(model_type, str) or not model_type:
return ""
if model_type in stack or len(stack) > 20:
return model_type
model_def = self.get_model_def(model_type)
if not isinstance(model_def, dict):
return model_type
prop_value = model_def.get(prop, None)
if prop_value is None:
return model_type
if sub_prop_name is not None:
if sub_prop_name == "_list":
if not isinstance(prop_value, list) or len(prop_value) != 1:
return model_type
prop_value = prop_value[0]
else:
if not isinstance(prop_value, dict) or sub_prop_name not in prop_value:
return model_type
prop_value = prop_value[sub_prop_name]
if isinstance(prop_value, str):
next_model_type, next_sub_prop_name = self._split_module_ref(
prop_value, default_sub_prop=sub_prop_name
)
if not next_model_type:
return model_type
return self._resolve_recursive_prop_owner(
next_model_type, prop, next_sub_prop_name, stack + [model_type]
)
return model_type
def _resolve_final_module_id(self, module_ref):
if not isinstance(module_ref, str) or not module_ref:
return ""
module_type, sub_prop_name = self._split_module_ref(module_ref)
resolved_id = self._resolve_recursive_prop_owner(
module_type, "modules", sub_prop_name, []
)
return resolved_id or module_type
def _expand_modules_with_ids(self, model_type):
modules = self.get_model_recursive_prop(model_type, "modules", return_list=True)
expanded_modules = []
modules_count = len(modules)
for index, module in enumerate(modules):
if isinstance(module, str):
module_id = self._resolve_final_module_id(module)
if "#" in module:
expanded_modules.append({"module": module, "module_id": module_id})
continue
try:
expanded = self.get_model_recursive_prop(
module, "modules", sub_prop_name="_list", return_list=True
)
except Exception:
expanded = []
if expanded:
expanded_modules.append({"module": expanded, "module_id": module_id})
else:
expanded_modules.append({"module": module, "module_id": module_id})
else:
fallback_id = model_type if modules_count <= 1 else f"{model_type}#{index + 1}"
expanded_modules.append({"module": module, "module_id": fallback_id})
return expanded_modules
def _expand_modules(self, model_type):
return [entry.get("module") for entry in self._expand_modules_with_ids(model_type)]
def _make_variant_id(self, model_type, kind, label):
slug = "".join(ch.lower() if ch.isalnum() else "_" for ch in label).strip("_")
return f"variant::{model_type}::{kind}::{slug}"
def _dedupe_paths(self, paths):
seen = set()
ordered = []
for path in paths:
if not path or path in seen:
continue
ordered.append(path)
seen.add(path)
return ordered
def _get_quant_aliases(self):
if not hasattr(self, "_quant_alias_cache"):
aliases = set(quant_router.get_available_qtype_aliases() or [])
aliases.update({"int8", "int8n", "fp8"})
self._quant_alias_cache = sorted(
{alias for alias in aliases if alias}, key=len, reverse=True
)
return self._quant_alias_cache
def _text_encoder_quant_candidates(self):
candidates = {""}
aliases = self._get_quant_aliases()
for token in aliases:
if "int8" in token:
candidates.add("int8")
if "fp8" in token or "float8" in token:
candidates.add("fp8")
return sorted(candidates)
def _collect_text_encoder_files(self, model_type, model_def):
files = set()
text_encoder_urls = self.get_model_recursive_prop(
model_type, "text_encoder_URLs", return_list=True
)
text_encoder_folder = model_def.get("text_encoder_folder", None)
text_encoder_filename = None
if text_encoder_urls:
text_encoder_filename = self.get_model_filename(
model_type=model_type,
quantization=self.text_encoder_quantization,
dtype_policy=self.transformer_dtype_policy,
URLs=text_encoder_urls,
)
else:
handler = self.get_model_handler(model_type)
get_name = getattr(handler, "get_text_encoder_filename", None)
if get_name is not None:
try:
text_encoder_filename = get_name(self.text_encoder_quantization)
except Exception:
text_encoder_filename = None
if text_encoder_filename:
self._add_file(files, text_encoder_filename, force_folder=text_encoder_folder)
return files
def _collect_handler_file_paths(self, model_type, model_def):
handler = self.get_model_handler(model_type)
query = getattr(handler, "query_model_files", None)
if query is None:
return set()
base_model_type = self.get_base_model_type(model_type)
try:
download_defs = query(
self._compute_list,
base_model_type,
model_def,
)
except Exception as exc:
self._errors.append(f"{model_type}: {exc}")
return set()
if not download_defs:
return set()
if isinstance(download_defs, dict):
download_defs = [download_defs]
files = set()
for download_def in download_defs:
source_folders = download_def.get("sourceFolderList", [])
file_lists = download_def.get("fileList", [])
target_folders = download_def.get("targetFolderList")
if target_folders is None:
target_folders = [None] * len(source_folders)
for source_folder, file_list, target_folder in zip(
source_folders, file_lists, target_folders
):
if target_folder == "":
target_folder = None
if source_folder is None:
source_folder = ""
if not file_list:
continue
for filename in file_list:
if not filename:
continue
rel_path = self._combine_download_path(
target_folder, source_folder, filename
)
files.add(self._resolve_download_relpath(rel_path))
return {path for path in files if path}
def _collect_handler_files(self, model_type, model_def):
files = self._collect_handler_file_paths(model_type, model_def)
return {path for path in files if os.path.isfile(path)}
def _build_tree_structure(self, model_types):
families = defaultdict(list)
for model_type in model_types:
family = self.get_model_family(model_type, for_ui=True) or "unknown"
families[family].append(model_type)
family_order = sorted(
families.keys(),
key=lambda family: self.families_infos.get(family, (999, family))[0],
)
for family in family_order:
family_label = self.families_infos.get(family, (999, family))[1]
rows = []
for model_type in families[family]:
model_name = self.compact_name(
family_label, self.get_model_name(model_type)
)
parent_id = self.get_parent_model_type(model_type)
rows.append((model_name, model_type, parent_id))
rows.sort(key=lambda row: row[0])
parents_list, children_dict = self.create_models_hierarchy(rows)
family_node_id = f"family::{family}"
family_models = set(families[family])
family_usage_ids = self._collect_usage_ids_for_models(family_models)
family_files = self._collect_files_for_usage_ids(family_usage_ids)
self._register_node(
family_node_id,
family_label,
"family",
family_models,
family_files,
path_label=family_label,
)
base_nodes = []
for base_label, base_id in parents_list:
base_status = self._model_aggregated_status_map.get(
base_id, model_dropdowns.MODEL_FILE_STATUS_MISSING
)
base_label_decorated = self._decorate_status_label(base_label, base_status)
child_entries = children_dict.get(base_id, [])
child_nodes = []
base_models = set()
base_usage_ids = set()
base_path = f"{family_label} / {base_label}"
for child_label, child_model in child_entries:
child_status = self._model_direct_status_map.get(
child_model, model_dropdowns.MODEL_FILE_STATUS_MISSING
)
child_label_decorated = self._decorate_status_label(child_label, child_status)
base_models.add(child_model)
model_usage_ids = self._model_usage_ids.get(child_model, set())
base_usage_ids.update(model_usage_ids)
self._model_base_label[child_model] = base_label
self._model_family_label[child_model] = family_label
model_node_id = f"model::{child_model}"
model_files = self._collect_files_for_usage_ids(model_usage_ids)
primary_paths = self._model_primary.get(child_model, [])
model_path = f"{base_path} / {child_label}"
self._model_display_label[child_model] = model_path
self._register_node(
model_node_id,
child_label_decorated,
"model",
{child_model},
model_files,
primary_paths=primary_paths,
path_label=model_path,
)
variant_children = []
for variant in self._ordered_variants_for_display(child_model):
variant_id = variant["id"]
variant_label = variant["label"]
variant_type = variant.get("type", "variant")
variant_path = f"{model_path} / {variant_label}"
self._register_node(
variant_id,
variant_label,
variant_type,
{child_model},
variant.get("files", set()),
missing_files=variant.get("missing_files", set()),
primary_paths=variant.get("primary_paths", []),
path_label=variant_path,
)
variant_children.append(
{"id": variant_id, "label": variant_label, "type": variant_type}
)
child_nodes.append(
{
"id": model_node_id,
"label": child_label_decorated,
"type": "model",
"children": variant_children,
}
)
base_node_id = f"base::{family}::{base_id}"
base_files = self._collect_files_for_usage_ids(base_usage_ids)
self._register_node(
base_node_id,
base_label_decorated,
"base",
base_models,
base_files,
path_label=base_path,
)
base_nodes.append(
{
"id": base_node_id,
"label": base_label_decorated,
"type": "base",
"children": child_nodes,
}
)
self._tree_data.append(
{
"id": family_node_id,
"label": family_label,
"type": "family",
"children": base_nodes,
}
)
def _collect_usage_ids_for_models(self, model_set):
usage_ids = set()
for model_type in model_set:
usage_ids.update(self._model_usage_ids.get(model_type, set()))
return usage_ids
def _collect_files_for_usage_ids(self, usage_ids):
files = set()
for usage_id in usage_ids:
files.update(self._usage_files.get(usage_id, set()))
return files
def _ordered_variants_for_display(self, model_type):
variants = list(self._model_variants.get(model_type, []))
if not variants:
return []
non_missing = [variant for variant in variants if variant.get("type") != "missing"]
missing = [variant for variant in variants if variant.get("type") == "missing"]
return non_missing + missing
def _register_node(
self,
node_id,
label,
node_type,
model_set,
files_set,
missing_files=None,
primary_paths=None,
path_label=None,
):
files_set = set(files_set or set())
missing_files = set(missing_files or set())
missing_files.difference_update(files_set)
missing_files = {path for path in missing_files if path and not os.path.isfile(path)}
unique_files, shared_files = self._split_files_by_shared(files_set, model_set)
unique_size = self._sum_sizes(unique_files)
shared_size = self._sum_sizes(shared_files)
self._node_map[node_id] = {
"id": node_id,
"label": label,
"type": node_type,
"models": set(model_set),
"files": set(files_set),
"unique_files": unique_files,
"shared_files": shared_files,
"unique_size": unique_size,
"shared_size": shared_size,
"total_size": unique_size + shared_size,
"missing_files": missing_files,
"missing_count": len(missing_files),
"primary_paths": primary_paths or [],
"path_label": path_label or label,
}
def _sum_sizes(self, files_set):
total = 0
for path in files_set:
total += self._file_info.get(path, {}).get("size", 0)
return total
def _split_files_by_shared(self, files_set, model_set):
unique_files = set()
shared_files = set()
for path in files_set:
used_by = self._file_usage_models.get(path, set())
if used_by.issubset(model_set):
unique_files.add(path)
else:
shared_files.add(path)
return unique_files, shared_files
def _rebalance_other_shared_variants(self, model_types):
for model_type in model_types:
variants = self._model_variants.get(model_type, [])
if not variants:
continue
other_variant = None
shared_variant = None
for variant in variants:
variant_id = str(variant.get("id", ""))
parts = variant_id.split("::")
variant_kind = parts[2] if len(parts) >= 4 else ""
label = str(variant.get("label", "")).lower()
if variant_kind == "other" or label in ("other", "other non shared"):
other_variant = variant
elif variant_kind == "shared" or label in ("shared", "other shared"):
shared_variant = variant
if not other_variant and not shared_variant:
continue
combined = set()
if other_variant:
combined.update(other_variant.get("files", set()))
if shared_variant:
combined.update(shared_variant.get("files", set()))
new_variants = []
for variant in variants:
if variant is other_variant or variant is shared_variant:
continue
new_variants.append(variant)
if not combined:
self._model_variants[model_type] = new_variants
continue
model_set = {model_type}
shared_files = {
path
for path in combined
if not self._file_usage_models.get(path, set()).issubset(model_set)
}
other_files = combined - shared_files
if other_files:
if other_variant is None:
other_variant = {
"id": self._make_variant_id(model_type, "other", "other"),
"label": VARIANT_LABEL_OTHER_NON_SHARED,
"type": "variant",
"files": other_files,
"primary_paths": [],
}
else:
other_variant["files"] = other_files
new_variants.append(other_variant)
if shared_files:
if shared_variant is None:
shared_variant = {
"id": self._make_variant_id(model_type, "shared", "shared"),
"label": VARIANT_LABEL_OTHER_SHARED,
"type": "variant",
"files": shared_files,
"primary_paths": [],
}
else:
shared_variant["files"] = shared_files
new_variants.append(shared_variant)
self._model_variants[model_type] = new_variants
def _get_model_shared_label(self, model_type):
model_name = self.get_model_name(model_type)
if model_name:
return model_name
label = self._model_display_label.get(model_type)
if label:
return label
return model_type
def _shared_model_labels(self, path, exclude_models=None):
if exclude_models is None:
exclude_models = set()
labels = set()
for model_type in self._file_usage_models.get(path, set()):
if model_type in exclude_models:
continue
labels.add(self._get_model_shared_label(model_type))
return sorted(labels, key=lambda value: value.lower())
def _build_tree_html(self):
self._last_rendered_nodes_count = 0
css = """
"""
tree_parts = [
css,
"
",
]
if self._errors:
errors_html = " ".join(html.escape(err) for err in self._errors[:10])
tree_parts.append(f"
{errors_html}
")
tree_parts.append("
")
tree_parts.append(self._build_stats_html())
rendered_nodes_html = []
for family in self._tree_data:
rendered_nodes_html.append(self._render_node(family, is_family=True))
tree_parts.append("
"
)
def _split_display_path(self, display_path):
drive, rest = os.path.splitdrive(display_path)
if drive:
drive = drive.rstrip("\\/")
rest = rest.replace("\\", "/")
parts = [part for part in rest.split("/") if part]
if drive:
return [drive] + parts
if display_path.startswith("\\\\"):
return ["UNC"] + parts
return parts
def _add_file(self, files_set, value, force_folder=None):
path = self._resolve_path(value, force_folder=force_folder)
if path and os.path.isfile(path):
files_set.add(path)
def _download_root_name(self):
root = fl.get_download_location()
if not root or os.path.isabs(root):
return ""
norm = os.path.normpath(root)
if not norm or norm == ".":
return ""
return norm.split(os.sep)[0]
def _resolve_repo_relative_path(self, rel_path):
if not rel_path:
return None
rel_path = str(rel_path)
if rel_path.startswith("http"):
return None
root_name = self._download_root_name()
if not root_name:
return None
rel_norm = os.path.normpath(rel_path)
parts = rel_norm.split(os.sep)
if parts and parts[0].lower() == root_name.lower():
return self._normalize_path(os.path.join(self._repo_root, rel_norm))
return None
def _resolve_path(self, value, force_folder=None):
if not value:
return None
if not isinstance(value, str):
value = str(value)
if os.path.isabs(value):
abs_path = self._normalize_path(value)
if os.path.isfile(abs_path):
return abs_path
basename = os.path.basename(abs_path)
return abs_path
repo_rel = self._resolve_repo_relative_path(value)
if repo_rel:
return repo_rel
try:
local_path = self.get_local_model_filename(value, extra_paths=force_folder)
except Exception:
local_path = None
if local_path:
return self._normalize_path(local_path)
filename = os.path.basename(value) if value.startswith("http") else value
if force_folder:
expected = fl.get_download_location(filename, force_path=force_folder)
else:
expected = fl.get_download_location(filename)
return self._normalize_path(expected)
def _resolve_download_relpath(self, rel_path):
if rel_path and os.path.isabs(rel_path):
return self._normalize_path(rel_path)
repo_rel = self._resolve_repo_relative_path(rel_path)
if repo_rel:
return repo_rel
local = fl.locate_file(rel_path, error_if_none=False)
if local:
return self._normalize_path(local)
return self._normalize_path(fl.get_download_location(rel_path))
def _resolve_folder_path(self, source_folder, target_folder):
parts = []
if target_folder:
parts.append(target_folder)
if source_folder:
parts.append(source_folder)
rel_path = os.path.join(*parts) if parts else ""
if rel_path and os.path.isabs(rel_path):
return self._normalize_path(rel_path)
repo_rel = self._resolve_repo_relative_path(rel_path)
if repo_rel:
return repo_rel
if not rel_path:
return self._normalize_path(fl.get_download_location())
folder = fl.locate_folder(rel_path, error_if_none=False)
if folder:
return self._normalize_path(folder)
return self._normalize_path(fl.get_download_location(rel_path))
def _list_folder_files(self, folder_path):
files = set()
if not folder_path or not os.path.isdir(folder_path):
return files
for root, _, filenames in os.walk(folder_path):
for name in filenames:
files.add(self._normalize_path(os.path.join(root, name)))
return files
def _combine_download_path(self, target_folder, source_folder, filename):
parts = []
if target_folder:
parts.append(target_folder)
if source_folder:
parts.append(source_folder)
parts.append(filename)
return os.path.join(*parts)
def _compute_list(self, filename):
if not filename:
return []
return [os.path.basename(str(filename))]
def _ensure_list(self, value):
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def _safe_get_lora_dir(self, model_type):
try:
return self.get_lora_dir(model_type)
except Exception:
return None
def _normalize_path(self, path):
return os.path.normcase(os.path.abspath(path))
def _format_gb(self, size_bytes):
size_gb = size_bytes / (1024 ** 3)
return f"{size_gb:.2f} GB"
def _format_size(self, size_bytes):
if size_bytes >= 1024 ** 3:
return f"{size_bytes / (1024 ** 3):.2f} GB"
if size_bytes >= 1024 ** 2:
return f"{size_bytes / (1024 ** 2):.1f} MB"
if size_bytes >= 1024:
return f"{size_bytes / 1024:.1f} KB"
return f"{size_bytes} B"
def _display_path(self, path):
try:
rel = os.path.relpath(path, self._repo_root)
if not rel.startswith(".."):
return rel
except Exception:
pass
return path
def _get_model_roots(self):
roots = []
for root in getattr(fl, "_models_paths", []) or []:
if not root:
continue
if os.path.isabs(root):
abs_root = root
else:
abs_root = os.path.join(self._repo_root, root)
abs_root = self._normalize_path(abs_root)
if abs_root not in roots:
roots.append(abs_root)
return roots
def _build_basename_index(self, basenames, min_size=1):
basenames = {name.lower() for name in basenames if name}
if not basenames:
return {}
found = {}
remaining = set(basenames)
for root in self._get_model_roots():
if not remaining:
break
if not os.path.isdir(root):
continue
for name in list(remaining):
candidate = os.path.join(root, name)
if os.path.isfile(candidate):
try:
if os.path.getsize(candidate) < min_size:
continue
except OSError:
continue
found[name] = self._normalize_path(candidate)
remaining.discard(name)
if not remaining:
break
if os.path.normcase(root) == os.path.normcase(self._repo_root):
continue
for dirpath, _, filenames in os.walk(root):
for filename in filenames:
key = filename.lower()
if key in remaining:
full_path = os.path.join(dirpath, filename)
try:
if os.path.getsize(full_path) < min_size:
continue
except OSError:
continue
found[key] = self._normalize_path(full_path)
remaining.discard(key)
if not remaining:
break
if not remaining:
break
return found
def _find_in_model_roots(self, basename, min_size=1):
if not basename:
return None
key = basename.lower()
if key in self._basename_index:
return self._basename_index.get(key)
found = self._build_basename_index({basename}, min_size=min_size)
if found:
self._basename_index.update(found)
return self._basename_index.get(key)
self._basename_index[key] = None
return None
def _repair_missing_paths(self):
missing = []
for path in self._file_usage_models.keys():
try:
size = os.path.getsize(path) if os.path.isfile(path) else -1
except OSError:
size = -1
if size <= 0:
missing.append(path)
if not missing:
return
basenames = {os.path.basename(path) for path in missing}
found = self._build_basename_index(basenames, min_size=1)
if not found:
return
for old_path in list(missing):
key = os.path.basename(old_path).lower()
new_path = found.get(key)
if not new_path or new_path == old_path:
continue
self._swap_path(old_path, new_path)
def _swap_path(self, old_path, new_path):
usage_ids = self._file_usage.pop(old_path, set())
if usage_ids:
self._file_usage[new_path].update(usage_ids)
for usage_id in usage_ids:
files = self._usage_files.get(usage_id)
if files is not None:
files.discard(old_path)
files.add(new_path)
model_type = self._variant_to_model.get(usage_id)
if model_type:
for variant in self._model_variants.get(model_type, []):
if variant.get("id") != usage_id:
continue
files_set = variant.get("files")
if isinstance(files_set, set):
files_set.discard(old_path)
files_set.add(new_path)
primary_paths = variant.get("primary_paths", [])
variant["primary_paths"] = [
new_path if path == old_path else path for path in primary_paths
]
break
model_types = self._file_usage_models.pop(old_path, set())
if model_types:
self._file_usage_models[new_path].update(model_types)
for model_type in model_types:
files = self._model_files.get(model_type)
if files is not None:
files.discard(old_path)
files.add(new_path)
primary_paths = self._model_primary.get(model_type)
if primary_paths:
self._model_primary[model_type] = [
new_path if path == old_path else path for path in primary_paths
]