# 1ripon1's picture
# Upload folder using huggingface_hub
# 7344bef verified
# Raw
# History Blame Contribute Delete
# 140 kB
import gradio as gr
import os
import json
import html
from collections import defaultdict
from shared.utils.plugins import WAN2GPPlugin
from shared.utils import files_locator as fl
from shared import model_dropdowns
from mmgp import quant_router
# Display label of the catch-all variant built from the "other" files of a model
# (see `_collect_model_variants`).
VARIANT_LABEL_OTHER_NON_SHARED = "Other Non Shared"
# Display label of the catch-all variant built from the "shared" files of a model
# (see `_collect_model_variants`).
VARIANT_LABEL_OTHER_SHARED = "Other Shared"
# Display label of the variant that lists expected files not found on disk.
VARIANT_LABEL_MISSING = "Missing"
# NOTE(review): the two constants below are not referenced in the visible part of
# this file; presumably they identify a pseudo model type that owns the globally
# shared assets and its display label -- confirm against `_mark_global_shared_files`.
GLOBAL_SHARED_MODEL_TYPE = "__wgp_global_shared__"
GLOBAL_SHARED_LABEL = "WanGP global shared assets"
class modelsManagerPlugin(WAN2GPPlugin):
def __init__(self):
    """Set up the plugin with empty caches and the default tree/filter state.

    Nothing is scanned here; the caches are filled by `_build_cache`.
    """
    super().__init__()

    # Absolute path of the working directory at construction time.
    self._repo_root = os.path.abspath(os.getcwd())

    # Tree structure and node lookup.
    self._tree_data = []
    self._node_map = {}

    # Per-model lookups, keyed by model type.
    self._model_files = {}
    self._model_primary = {}
    self._model_variants = {}
    self._model_base_label = {}
    self._model_family_label = {}
    self._model_display_label = {}
    self._model_direct_status_map = {}
    self._model_aggregated_status_map = {}
    self._model_usage_ids = {}

    # Variant and file indexes.
    self._variant_to_model = {}
    self._usage_files = {}
    self._file_usage = {}
    self._file_usage_models = {}
    self._file_info = {}
    self._basename_index = {}

    # Aggregate statistics and collected error messages.
    self._stats_total = 0
    self._stats_by_drive = []
    self._errors = []

    # UI state of the rendered tree.
    self._expanded_nodes = set()
    self._current_tree_mode = "full"
    self._current_filter_text = ""
    self._last_rendered_nodes_count = 0
def setup_ui(self):
    """Request the host globals this plugin uses, inject its JS and register its tab."""
    # Requested in this exact order, one `request_global` call per name.
    requested_globals = (
        "get_model_def",
        "get_model_name",
        "get_model_family",
        "get_parent_model_type",
        "get_base_model_type",
        "get_model_handler",
        "get_model_recursive_prop",
        "get_model_filename",
        "get_lora_dir",
        "compact_name",
        "create_models_hierarchy",
        "families_infos",
        "models_def",
        "server_config",
        "transformer_quantization",
        "transformer_dtype_policy",
        "text_encoder_quantization",
        "displayed_model_types",
        "transformer_types",
        "get_transformer_dtype",
        "query_global_shared_model_files",
    )
    for global_name in requested_globals:
        self.request_global(global_name)

    self.add_custom_js(self._get_js())
    self.add_tab(
        tab_id="model_manager",
        label="Models Manager",
        component_constructor=self.create_model_ui,
    )
def create_model_ui(self):
    """Build the tab's components, wire their events and return the tree HTML component.

    Every listener that refreshes the panel writes to the same five outputs:
    tree HTML, view HTML, refresh button, filter textbox and filter button.
    """
    with gr.Column():
        gr.Markdown("### Models Manager")
        gr.Markdown(
            "Sizes are unique GB recoverable on delete, plus shared GB used outside the branch."
        )
        with gr.Row():
            # The filter controls start hidden; the build callbacks make them visible.
            self.filter_input = gr.Textbox(
                label="Filter Finetunes",
                show_label=False,
                placeholder="Type at least 3 characters from a model name",
                lines=1,
                scale=5,
                visible=False,
                elem_id="ckpt_filter_input",
            )
            self.filter_button = gr.Button(
                "Filter",
                variant="primary",
                scale=1,
                visible=False,
                elem_id="ckpt_filter_btn",
                elem_classes="btn_centered",
            )
            self.refresh_button = gr.Button(
                "Refresh",
                variant="secondary",
                elem_id="ckpt_refresh_btn",
                scale=1,
                elem_classes="btn_centered",
            )
        with gr.Row():
            with gr.Column(scale=3, min_width=550):
                self.tree_html = gr.HTML()
            with gr.Column(scale=2, min_width=350, elem_classes="ckpt-right-column"):
                self.view_html = gr.HTML()
        # Hidden textbox the injected JS writes JSON action payloads into.
        with gr.Column(visible=False):
            self.action_input = gr.Textbox(elem_id="model_action_input")

    def panel_outputs():
        # A fresh list per listener, as every binding originally had its own.
        return [
            self.tree_html,
            self.view_html,
            self.refresh_button,
            self.filter_input,
            self.filter_button,
        ]

    # Refresh: show the loading state, rebuild the tree, then clear the filter text.
    refresh_event = self.refresh_button.click(
        fn=self._show_loading,
        inputs=[],
        outputs=panel_outputs(),
        show_progress="hidden",
    )
    rebuild_event = refresh_event.then(
        fn=self._build_tree,
        inputs=[],
        outputs=panel_outputs(),
        show_progress="full",
    )
    rebuild_event.then(
        fn=lambda: gr.update(value=""),
        inputs=[],
        outputs=[self.filter_input],
        show_progress="hidden",
    )

    self.filter_button.click(
        fn=self._build_filtered_tree,
        inputs=[self.filter_input],
        outputs=panel_outputs(),
        show_progress="full",
    )
    self.action_input.change(
        fn=self._handle_action,
        inputs=[self.action_input],
        outputs=panel_outputs(),
        show_progress="hidden",
    )

    self.on_tab_outputs = panel_outputs()
    return self.tree_html
def on_tab_select(self, state: dict):
    """Return the loading-state outputs; `state` is accepted but not used.

    NOTE(review): presumably invoked by the plugin host when this tab is
    selected, with the result applied to `self.on_tab_outputs` -- confirm.
    """
    loading_outputs = self._show_loading()
    return loading_outputs
def _get_js(self):
    """Return the JavaScript source that `setup_ui` passes to `add_custom_js`.

    The script (returned verbatim, never formatted or interpolated):
    - exposes `window.updatemodelInput`, which writes a value into a Gradio
      textbox by element id and dispatches an `input` event;
    - exposes `window.handlemodelAction`, `window.handlemodelRowClick` and
      `window.handlemodelSummaryClick`, which post JSON payloads
      (`action`, `node_id`, `ts`, ...) into `#model_action_input`;
    - builds a delete-confirmation modal offering "unique" and
      "unique + shared" scopes;
    - keeps the `.ckpt-view-sticky` element in view while scrolling;
    - clicks the `#ckpt_refresh_btn` button when the tab labelled
      "Models Manager" becomes active, then polls `#ckpt_tree_root` until its
      `data-status` is `ready` (giving up after 60 seconds).
    """
    return """
function modelRoot() {
if (window.gradioApp) {
return window.gradioApp();
}
const app = document.querySelector('gradio-app');
return app ? (app.shadowRoot || app) : document;
}
window.updatemodelInput = function(elem_id, value) {
const root = modelRoot();
const input = root.querySelector(`#${elem_id} textarea, #${elem_id} input`);
if (input) {
input.value = value;
input.dispatchEvent(new Event('input', { bubbles: true }));
return true;
}
return false;
};
function ensuremodelModal() {
let backdrop = document.getElementById('ckpt-delete-modal');
if (backdrop) return backdrop;
backdrop = document.createElement('div');
backdrop.id = 'ckpt-delete-modal';
backdrop.className = 'ckpt-modal-backdrop';
backdrop.innerHTML = `
<div class="ckpt-modal">
<div class="ckpt-modal-title">Confirm Delete</div>
<div class="ckpt-modal-body" id="ckpt-delete-modal-body"></div>
<div class="ckpt-modal-actions">
<button class="ckpt-modal-btn" id="ckpt-delete-cancel">Cancel</button>
<button class="ckpt-modal-btn danger" id="ckpt-delete-unique">Delete Unique</button>
<button class="ckpt-modal-btn danger" id="ckpt-delete-all">Delete Unique + Shared</button>
</div>
</div>
`;
document.body.appendChild(backdrop);
const cancelBtn = backdrop.querySelector('#ckpt-delete-cancel');
const uniqueBtn = backdrop.querySelector('#ckpt-delete-unique');
const allBtn = backdrop.querySelector('#ckpt-delete-all');
cancelBtn.addEventListener('click', () => {
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
uniqueBtn.addEventListener('click', () => {
const payload = backdrop.dataset.payloadUnique || '';
if (payload) {
window.updatemodelInput("model_action_input", payload);
}
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
allBtn.addEventListener('click', () => {
const payload = backdrop.dataset.payloadAll || '';
if (payload) {
window.updatemodelInput("model_action_input", payload);
}
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
});
backdrop.addEventListener('click', (evt) => {
if (evt.target === backdrop) {
backdrop.style.display = 'none';
backdrop.dataset.payloadUnique = '';
backdrop.dataset.payloadAll = '';
}
});
return backdrop;
}
function showmodelModal(message, payloadUnique, payloadAll, uniqueSize, totalSize) {
const backdrop = ensuremodelModal();
const body = backdrop.querySelector('#ckpt-delete-modal-body');
const uniqueBtn = backdrop.querySelector('#ckpt-delete-unique');
const allBtn = backdrop.querySelector('#ckpt-delete-all');
body.textContent = message;
backdrop.dataset.payloadUnique = payloadUnique || '';
backdrop.dataset.payloadAll = payloadAll || '';
uniqueBtn.textContent = uniqueSize ? `Delete Unique (${uniqueSize})` : 'Delete Unique';
allBtn.textContent = totalSize ? `Delete Unique + Shared (${totalSize})` : 'Delete Unique + Shared';
backdrop.style.display = 'flex';
}
window.handlemodelAction = function(button, action) {
if (!button) return;
const nodeId = button.dataset.nodeId;
const nodeLabel = button.dataset.nodeLabel || nodeId;
const uniqueSize = button.dataset.uniqueSize || "0 B";
const totalSize = button.dataset.totalSize || uniqueSize;
if (!nodeId) return;
if (action === "delete") {
const payloadUnique = JSON.stringify({
action: action,
node_id: nodeId,
delete_shared: false,
ts: Date.now()
});
const payloadAll = JSON.stringify({
action: action,
node_id: nodeId,
delete_shared: true,
ts: Date.now()
});
showmodelModal(
`Choose delete scope for "${nodeLabel}": unique only ${uniqueSize}, or unique + shared ${totalSize}.`,
payloadUnique,
payloadAll,
uniqueSize,
totalSize
);
return;
}
const payload = JSON.stringify({
action: action,
node_id: nodeId,
ts: Date.now()
});
window.updatemodelInput("model_action_input", payload);
};
window.handlemodelRowClick = function(evt, row) {
if (evt) {
evt.stopPropagation();
evt.preventDefault();
}
if (!row) return;
const nodeId = row.dataset.nodeId;
if (!nodeId) return;
const payload = JSON.stringify({
action: "view",
node_id: nodeId,
ts: Date.now()
});
window.updatemodelInput("model_action_input", payload);
};
window.handlemodelSummaryClick = function(evt, summary) {
if (!summary) return;
if (evt) {
evt.stopPropagation();
evt.preventDefault();
}
const details = summary.closest('details');
if (!details) return;
const nodeId = details.dataset.nodeId;
if (!nodeId) return;
const row = summary.querySelector('.ckpt-row');
const nodeType = row ? (row.dataset.nodeType || '') : '';
const payload = JSON.stringify({
action: "toggle_node",
node_id: nodeId,
open: !details.open,
view: nodeType === "model",
ts: Date.now()
});
window.updatemodelInput("model_action_input", payload);
};
function ismodelTabActive() {
const root = modelRoot();
const tabs = root.querySelectorAll('button[role="tab"]');
for (const tab of tabs) {
if (tab.getAttribute('aria-selected') === 'true') {
const label = (tab.textContent || '').trim();
if (label === 'Models Manager') return true;
}
}
return false;
}
function triggermodelRefresh() {
const root = modelRoot();
const btn = root.querySelector('#ckpt_refresh_btn button') || root.querySelector('#ckpt_refresh_btn');
if (btn) btn.click();
}
function triggermodelFilter() {
const root = modelRoot();
const btn = root.querySelector('#ckpt_filter_btn button') || root.querySelector('#ckpt_filter_btn');
if (btn) btn.click();
}
function setupmodelFilterEnter() {
const root = modelRoot();
const input = root.querySelector('#ckpt_filter_input textarea, #ckpt_filter_input input');
if (!input || input.dataset.ckptEnterBound === '1') return;
input.dataset.ckptEnterBound = '1';
input.addEventListener('keydown', (evt) => {
if (evt.key !== 'Enter') return;
evt.preventDefault();
evt.stopPropagation();
triggermodelFilter();
});
}
function applymodelSticky() {
const root = modelRoot();
const sticky = root.querySelector('.ckpt-view-sticky');
if (!sticky) return;
sticky.style.position = 'sticky';
sticky.style.top = '16px';
sticky.style.alignSelf = 'flex-start';
let parent = sticky.parentElement;
let steps = 0;
while (parent && steps < 6) {
const style = window.getComputedStyle(parent);
if (style.overflow && style.overflow !== 'visible') {
parent.style.overflow = 'visible';
}
parent = parent.parentElement;
steps += 1;
}
if (root && root.host && root.host.style) {
const hostStyle = window.getComputedStyle(root.host);
if (hostStyle.overflow && hostStyle.overflow !== 'visible') {
root.host.style.overflow = 'visible';
}
}
}
let ckptFloating = {
sticky: null,
scrollEl: null,
handler: null,
resizeHandler: null,
anchorTop: 0,
topPadding: 16,
};
function findmodelScrollContainer(element) {
let current = element;
while (current) {
const style = window.getComputedStyle(current);
const overflowY = style.overflowY;
if ((overflowY === 'auto' || overflowY === 'scroll') && current.scrollHeight > current.clientHeight + 1) {
return current;
}
current = current.parentElement;
}
const root = modelRoot();
const host = root && root.host ? root.host : null;
current = host;
while (current) {
const style = window.getComputedStyle(current);
const overflowY = style.overflowY;
if ((overflowY === 'auto' || overflowY === 'scroll') && current.scrollHeight > current.clientHeight + 1) {
return current;
}
current = current.parentElement;
}
return window;
}
function computemodelAnchorTop(sticky, scrollEl) {
if (scrollEl === window) {
return sticky.getBoundingClientRect().top + window.scrollY;
}
const rect = sticky.getBoundingClientRect();
const containerRect = scrollEl.getBoundingClientRect();
return rect.top - containerRect.top + scrollEl.scrollTop;
}
function clearmodelFloating(sticky) {
if (!sticky) return;
sticky.style.transform = '';
sticky.style.willChange = '';
}
function updatemodelFloating() {
const sticky = ckptFloating.sticky;
if (!sticky) return;
const scrollEl = ckptFloating.scrollEl || window;
const scrollTop = scrollEl === window ? window.scrollY : scrollEl.scrollTop;
let maxTranslate = Infinity;
if (scrollEl === window) {
const doc = document.documentElement;
maxTranslate = Math.max(
0,
(doc ? doc.scrollHeight : 0) - sticky.offsetHeight - ckptFloating.topPadding - ckptFloating.anchorTop
);
} else {
maxTranslate = Math.max(
0,
scrollEl.scrollHeight - sticky.offsetHeight - ckptFloating.topPadding - ckptFloating.anchorTop
);
}
const offset = scrollTop + ckptFloating.topPadding - ckptFloating.anchorTop;
const translate = Math.max(0, Math.min(offset, maxTranslate));
if (translate > 0) {
sticky.style.transform = `translateY(${translate}px)`;
sticky.style.willChange = 'transform';
} else {
clearmodelFloating(sticky);
}
}
function teardownmodelFloating() {
if (ckptFloating.scrollEl && ckptFloating.handler) {
if (ckptFloating.scrollEl === window) {
window.removeEventListener('scroll', ckptFloating.handler);
} else {
ckptFloating.scrollEl.removeEventListener('scroll', ckptFloating.handler);
}
}
if (ckptFloating.resizeHandler) {
window.removeEventListener('resize', ckptFloating.resizeHandler);
}
clearmodelFloating(ckptFloating.sticky);
if (ckptFloating.sticky) {
ckptFloating.sticky.style.position = '';
ckptFloating.sticky.style.top = '';
}
ckptFloating.sticky = null;
ckptFloating.scrollEl = null;
ckptFloating.handler = null;
ckptFloating.resizeHandler = null;
ckptFloating.anchorTop = 0;
}
function setupmodelFloating() {
const root = modelRoot();
const sticky = root.querySelector('.ckpt-view-sticky');
if (!sticky) return;
if (ckptFloating.sticky === sticky) {
updatemodelFloating();
return;
}
teardownmodelFloating();
ckptFloating.sticky = sticky;
sticky.style.position = 'relative';
sticky.style.top = '0px';
ckptFloating.scrollEl = findmodelScrollContainer(sticky);
ckptFloating.anchorTop = computemodelAnchorTop(sticky, ckptFloating.scrollEl);
ckptFloating.handler = () => updatemodelFloating();
if (ckptFloating.scrollEl === window) {
window.addEventListener('scroll', ckptFloating.handler, { passive: true });
} else {
ckptFloating.scrollEl.addEventListener('scroll', ckptFloating.handler, { passive: true });
}
ckptFloating.resizeHandler = () => {
ckptFloating.anchorTop = computemodelAnchorTop(sticky, ckptFloating.scrollEl);
updatemodelFloating();
};
window.addEventListener('resize', ckptFloating.resizeHandler);
updatemodelFloating();
}
function observemodelView() {
const root = modelRoot();
const column = root.querySelector('.ckpt-right-column');
if (!column || column.dataset.ckptObserved === '1') return;
column.dataset.ckptObserved = '1';
const observer = new MutationObserver(() => {
applymodelSticky();
setupmodelFloating();
setupmodelFilterEnter();
});
observer.observe(column, { childList: true, subtree: true });
}
let ckptTabWasActive = false;
let ckptRefreshBusy = false;
let ckptRefreshStartedAt = 0;
function waitForRefreshReady() {
const root = modelRoot();
const treeRoot = root.querySelector('#ckpt_tree_root');
if (treeRoot && treeRoot.dataset.status === 'ready') {
ckptRefreshBusy = false;
applymodelSticky();
setupmodelFloating();
setupmodelFilterEnter();
return;
}
if (ckptRefreshStartedAt && Date.now() - ckptRefreshStartedAt > 60000) {
ckptRefreshBusy = false;
applymodelSticky();
setupmodelFloating();
setupmodelFilterEnter();
return;
}
setTimeout(waitForRefreshReady, 400);
}
function startmodelBuild() {
if (ckptRefreshBusy) return;
ckptRefreshBusy = true;
ckptRefreshStartedAt = Date.now();
triggermodelRefresh();
setTimeout(waitForRefreshReady, 200);
}
function handleTabChange() {
const active = ismodelTabActive();
if (active && !ckptTabWasActive) {
startmodelBuild();
}
if (active) {
observemodelView();
setTimeout(applymodelSticky, 200);
setTimeout(setupmodelFloating, 260);
setTimeout(setupmodelFilterEnter, 280);
}
ckptTabWasActive = active;
}
function initmodelBridge() {
const root = modelRoot();
const tabs = root.querySelectorAll('button[role="tab"]');
if (!tabs || tabs.length === 0) {
setTimeout(initmodelBridge, 400);
return;
}
const tabObserver = new MutationObserver(handleTabChange);
tabObserver.observe(root, { attributes: true, subtree: true, attributeFilter: ['aria-selected', 'class'] });
handleTabChange();
}
initmodelBridge();
"""
def _show_loading(self):
    """Return the five panel outputs for the loading state.

    The tree slot receives the loading HTML; the view is cleared and hidden,
    and the refresh button, filter textbox and filter button are hidden.
    """
    return (
        self._build_loading_html(),
        gr.update(value="", visible=False),  # view HTML
        gr.update(visible=False),  # refresh button
        gr.update(visible=False),  # filter textbox
        gr.update(visible=False),  # filter button
    )
def _build_tree(self):
    """Rebuild the cache and return the outputs for the full, collapsed tree.

    Resets the expanded-node set and the filter state, renders the full tree
    and the empty view, and makes the view and the controls visible.
    """
    self._build_cache()

    # A rebuild always starts from the unfiltered tree with nothing expanded.
    self._current_filter_text = ""
    self._current_tree_mode = "full"
    self._expanded_nodes = set()

    rendered_tree = self._build_tree_html()
    empty_view = self._build_empty_view_html()
    return (
        rendered_tree,
        gr.update(value=empty_view, visible=True),
        gr.update(visible=True),  # refresh button
        gr.update(visible=True),  # filter textbox
        gr.update(visible=True),  # filter button
    )
def _build_filtered_tree(self, filter_text):
    """Rebuild the cache and return the outputs for a tree filtered by `filter_text`.

    The text is stripped first; with fewer than 3 characters an info message
    is shown and all five outputs are left unchanged.
    """
    needle = str(filter_text or "").strip()
    if len(needle) < 3:
        gr.Info("Please enter at least 3 characters to filter finetunes.")
        return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()

    self._build_cache()

    # Remember the filter so later node toggles re-render the same filtered tree.
    self._expanded_nodes = set()
    self._current_tree_mode = "filtered"
    self._current_filter_text = needle

    rendered_tree = self._build_filtered_tree_html(needle)
    empty_view = self._build_empty_view_html()
    return (
        rendered_tree,
        gr.update(value=empty_view, visible=True),
        gr.update(visible=True),  # refresh button
        gr.update(visible=True),  # filter textbox
        gr.update(visible=True),  # filter button
    )
def _handle_action(self, payload: str):
    """Dispatch a JSON action payload written by the injected JavaScript.

    Args:
        payload: JSON text expected to decode to an object with at least
            `action` and `node_id`. Supported actions:
            - "view": render the detail view of the node;
            - "toggle_node": record the node as expanded (`open` truthy) or
              collapsed, re-render the current tree (filtered or full) and,
              when `view` is truthy, also render the node's detail view;
            - "delete": delete the node's files (`delete_shared` selects
              unique-only vs. all files) and rebuild the whole tree.

    Returns:
        A 5-tuple for (tree HTML, view HTML, refresh button, filter textbox,
        filter button). Empty, malformed or unrecognised payloads leave all
        five outputs unchanged.
    """

    def unchanged():
        # Five no-op updates, one per output component.
        return gr.update(), gr.update(), gr.update(), gr.update(), gr.update()

    if not payload:
        return unchanged()
    try:
        data = json.loads(payload)
    except (json.JSONDecodeError, TypeError):
        return unchanged()
    # Valid JSON is not necessarily an object: a list, number or string would
    # make the `.get` calls below raise AttributeError.
    if not isinstance(data, dict):
        return unchanged()

    action = data.get("action")
    node_id = data.get("node_id")
    if not action or not node_id:
        return unchanged()

    # Actions can arrive before any build has populated the node map.
    if not self._node_map:
        self._build_cache()

    if action == "view":
        view_html = self._build_view_html(node_id)
        return gr.update(), gr.update(value=view_html, visible=True), gr.update(), gr.update(), gr.update()

    if action == "toggle_node":
        if node_id not in self._node_map:
            return unchanged()
        if data.get("open"):
            self._expanded_nodes.add(node_id)
        else:
            self._expanded_nodes.discard(node_id)
        # Re-render in whichever mode the tree is currently displayed.
        if self._current_tree_mode == "filtered" and len(self._current_filter_text) >= 3:
            tree_html = self._build_filtered_tree_html(self._current_filter_text)
        else:
            tree_html = self._build_tree_html()
        if data.get("view"):
            view_html = self._build_view_html(node_id)
            return (
                gr.update(value=tree_html),
                gr.update(value=view_html, visible=True),
                gr.update(),
                gr.update(),
                gr.update(),
            )
        return gr.update(value=tree_html), gr.update(), gr.update(), gr.update(), gr.update()

    if action == "delete":
        delete_shared = bool(data.get("delete_shared"))
        removed, missing, errors = self._delete_files_for_node(
            node_id, delete_shared=delete_shared
        )
        if errors:
            gr.Warning(f"Delete completed with errors: {len(errors)}")
        else:
            gr.Info(f"Deleted {len(removed)} files (missing: {len(missing)})")
        # Files changed on disk, so rescan and render from scratch.
        return self._build_tree()

    return unchanged()
def _delete_files_for_node(self, node_id, delete_shared=False):
node = self._node_map.get(node_id)
if node is None:
return [], [], []
removed = []
missing = []
errors = []
if delete_shared:
target_files = node.get("files", set())
else:
target_files = node.get("unique_files", set())
for path in sorted(target_files):
if not os.path.isfile(path):
missing.append(path)
continue
try:
os.remove(path)
removed.append(path)
except OSError as exc:
errors.append((path, str(exc)))
return removed, missing, errors
def _build_cache(self):
    """Rebuild every cache from scratch: variants, usage indexes, file info, stats, tree.

    Steps, in order:
    1. reset all cached containers;
    2. compute the model status maps and collect the variants of every model
       type (a failure for one model is recorded in `self._errors` and that
       model gets an empty entry);
    3. mark globally shared files and rebalance the shared variants;
    4. rebuild the variant <-> file indexes from the final variant lists;
    5. stat every referenced file, then compute the global statistics and
       the tree structure.
    """
    # 1. Reset.
    self._node_map = {}
    self._tree_data = []
    self._model_files = {}
    self._model_primary = {}
    self._model_variants = {}
    self._variant_to_model = {}
    self._model_base_label = {}
    self._model_family_label = {}
    self._model_display_label = {}
    self._model_direct_status_map = {}
    self._model_aggregated_status_map = {}
    self._model_usage_ids = {}
    self._usage_files = {}
    self._file_usage = defaultdict(set)
    self._file_usage_models = defaultdict(set)
    self._file_info = {}
    self._basename_index = {}
    self._errors = []
    self._stats_total = 0
    self._stats_by_drive = []

    # 2. Status maps and per-model variants.
    model_types = self._get_model_types()
    direct_map, aggregated_map = self._compute_model_status_maps(model_types)
    self._model_direct_status_map = direct_map
    self._model_aggregated_status_map = aggregated_map
    for model_type in model_types:
        try:
            info = self._collect_model_variants(model_type)
        except Exception as exc:
            self._errors.append(f"{model_type}: {exc}")
            info = {"variants": [], "primary_paths": [], "files": set()}
        files_of_model = info.get("files", set())
        self._model_variants[model_type] = info.get("variants", [])
        self._model_primary[model_type] = info.get("primary_paths", [])
        self._model_files[model_type] = files_of_model
        for path in files_of_model:
            self._file_usage_models[path].add(model_type)

    # 3. Post-processing that may alter the variant lists collected above.
    self._mark_global_shared_files()
    self._rebalance_other_shared_variants(model_types)

    # 4. Indexes are rebuilt only now, from the final variant lists.
    self._usage_files = {}
    self._file_usage = defaultdict(set)
    self._variant_to_model = {}
    self._model_usage_ids = {}
    for model_type in model_types:
        ids_of_model = set()
        for variant in self._model_variants.get(model_type, []):
            variant_id = variant["id"]
            variant_files = variant.get("files", set())
            ids_of_model.add(variant_id)
            self._variant_to_model[variant_id] = model_type
            self._usage_files[variant_id] = variant_files
            for path in variant_files:
                self._file_usage[path].add(variant_id)
        self._model_usage_ids[model_type] = ids_of_model

    # 5. File sizes; an unreadable file is recorded as absent with size 0.
    for path in self._file_usage_models:
        try:
            exists = os.path.isfile(path)
            size = os.path.getsize(path) if exists else 0
        except OSError:
            exists, size = False, 0
        self._file_info[path] = {"size": size, "exists": exists}

    self._compute_global_stats()
    self._build_tree_structure(model_types)
def _build_dropdown_deps(self, model_types):
if not model_types:
return None
fallback_type = model_types[0]
return model_dropdowns.DropdownDeps(
transformer_types=list(model_types),
displayed_model_types=list(model_types),
transformer_type=fallback_type,
three_levels_hierarchy=True,
families_infos=self.families_infos,
server_config=self.server_config,
transformer_quantization=self.transformer_quantization,
transformer_dtype_policy=self.transformer_dtype_policy,
text_encoder_quantization=self.text_encoder_quantization,
get_model_def=self.get_model_def,
get_model_recursive_prop=self.get_model_recursive_prop,
get_model_filename=self.get_model_filename,
get_local_model_filename=self._get_local_model_filename,
get_lora_dir=self.get_lora_dir,
get_parent_model_type=self.get_parent_model_type,
get_base_model_type=self.get_base_model_type,
get_model_family=self.get_model_family,
get_model_name=self.get_model_name,
get_transformer_dtype=self.get_transformer_dtype,
)
def _resolve_expected_entry_path(self, entry, model_type=None):
if not isinstance(entry, dict):
return None
local_path = entry.get("path", None)
if isinstance(local_path, str) and len(local_path) > 0:
return self._normalize_path(local_path)
filename = entry.get("filename", "")
extra_paths = entry.get("extra_paths", None)
return self._resolve_path(filename, force_folder=extra_paths, lora_dir=self._safe_get_lora_dir(model_type))
def _collect_expected_missing_files(self, model_type):
    """Return the set of expected file paths of `model_type` that are not on disk.

    Expected files come from the core and secondary entries reported by
    `model_dropdowns` and, when the model definition is a dict, from the
    handler file paths. If the entry lookup fails, the failure is recorded in
    `self._errors` and an empty set is returned; a handler failure is
    recorded too but does not discard the paths found so far.
    """
    deps = self._build_dropdown_deps([model_type])
    if deps is None:
        return set()

    absent = set()
    try:
        expected = [
            *model_dropdowns.get_expected_core_file_entries_for_status(deps, model_type),
            *model_dropdowns.get_expected_secondary_file_entries_for_status(deps, model_type),
        ]
    except Exception as exc:
        self._errors.append(f"Missing file check failed for {model_type}: {exc}")
        return set()

    model_def = self.get_model_def(model_type)
    if isinstance(model_def, dict):
        try:
            for handler_path in self._collect_handler_file_paths(model_type, model_def):
                if not handler_path or os.path.isfile(handler_path):
                    continue
                absent.add(handler_path)
        except Exception as exc:
            self._errors.append(f"Missing handler file check failed for {model_type}: {exc}")

    for entry in expected:
        entry_path = self._resolve_expected_entry_path(entry, model_type=model_type)
        if not entry_path or os.path.isfile(entry_path):
            continue
        absent.add(entry_path)
    return absent
def _compute_model_status_maps(self, model_types):
if not model_types:
return {}, {}
try:
deps = self._build_dropdown_deps(model_types)
if deps is None:
return {}, {}
direct_status_map, aggregated_parent_status_map = model_dropdowns.get_model_download_status_maps(
deps, dropdown_types=list(model_types)
)
return direct_status_map, aggregated_parent_status_map
except Exception as exc:
self._errors.append(f"Model status map failed: {exc}")
return {}, {}
def _decorate_status_label(self, label, status):
    """Return `label` decorated for `status` by `model_dropdowns.decorate_model_dropdown_label`."""
    decorated = model_dropdowns.decorate_model_dropdown_label(label, status)
    return decorated
def _compute_global_stats(self):
total = 0
by_drive = defaultdict(int)
for path, info in self._file_info.items():
size = info.get("size", 0)
if size <= 0:
continue
total += size
drive = self._get_drive_label(path)
by_drive[drive] += size
self._stats_total = total
self._stats_by_drive = sorted(
by_drive.items(), key=lambda item: (-item[1], item[0])
)
def _get_drive_label(self, path):
drive, _ = os.path.splitdrive(path)
if drive:
return drive.rstrip(":").upper()
if path.startswith("\\\\"):
return "UNC"
return "ROOT"
def _get_model_types(self):
types = []
if isinstance(getattr(self, "models_def", None), dict) and self.models_def:
types = sorted(self.models_def.keys())
elif isinstance(self.transformer_types, list) and self.transformer_types:
types = list(self.transformer_types)
elif isinstance(self.displayed_model_types, list):
types = list(self.displayed_model_types)
seen = set()
ordered = []
for model_type in types:
if model_type and model_type not in seen:
ordered.append(model_type)
seen.add(model_type)
return ordered
def _collect_model_variants(self, model_type):
model_def = self.get_model_def(model_type)
if model_def is None:
return {"variants": [], "primary_paths": [], "files": set()}
default_dtype_policy = self._resolve_default_dtype_policy(model_type, model_def)
transformer_variants = self._collect_transformer_variants(
model_type, model_def, default_dtype_policy
)
module_variants = self._collect_module_variants(
model_type, model_def, default_dtype_policy
)
text_encoder_variants = self._collect_text_encoder_variants(
model_type, model_def, default_dtype_policy
)
lora_variants = self._collect_lora_variants(model_type)
transformer_files = set()
module_files = set()
text_encoder_files = set()
lora_files = set()
model_primary_paths = []
for variant in transformer_variants:
transformer_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
for variant in module_variants:
module_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
for variant in text_encoder_variants:
text_encoder_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
for variant in lora_variants:
lora_files.update(variant["files"])
model_primary_paths.extend(variant.get("primary_paths", []))
model_primary_paths = self._dedupe_paths(model_primary_paths)
other_files, shared_files = self._collect_other_files(
model_type,
model_def,
transformer_files.union(module_files),
text_encoder_files,
lora_files,
)
variants = list(transformer_variants) + list(module_variants) + list(text_encoder_variants) + list(lora_variants)
if other_files:
other_id = self._make_variant_id(model_type, "other", "other")
variants.append(
{
"id": other_id,
"label": VARIANT_LABEL_OTHER_NON_SHARED,
"type": "variant",
"files": other_files,
"primary_paths": [],
}
)
if shared_files:
shared_id = self._make_variant_id(model_type, "shared", "shared")
variants.append(
{
"id": shared_id,
"label": VARIANT_LABEL_OTHER_SHARED,
"type": "variant",
"files": shared_files,
"primary_paths": [],
}
)
model_files = set()
for variant in variants:
model_files.update(variant.get("files", set()))
missing_files = self._collect_expected_missing_files(model_type)
if missing_files:
missing_id = self._make_variant_id(model_type, "missing", "missing")
variants.append(
{
"id": missing_id,
"label": VARIANT_LABEL_MISSING,
"type": "missing",
"files": set(),
"missing_files": missing_files,
"primary_paths": [],
}
)
return {
"variants": variants,
"primary_paths": model_primary_paths,
"files": model_files,
}
def _collect_transformer_variants(
    self, model_type, model_def, default_dtype_policy
):
    """Collect the transformer variants of a model, sorted by label.

    Candidates come from `_collect_variant_candidates`: plain dtype policies
    (`base_dtypes`) and quantization tokens with their dtype policies
    (`token_dtypes`). A candidate becomes a variant only when files are found
    for it and, for token candidates, when those files match the token. The
    label detected from the files takes precedence over the computed label.

    Each variant label, and therefore each variant id, is emitted at most
    once; later candidates resolving to an already-used label are skipped.

    Returns:
        A list of dicts with keys "id", "label", "type" ("variant"), "files"
        and "primary_paths", sorted case-insensitively by label.
    """
    choice_lists, modules = self._collect_transformer_choice_lists(model_type, model_def)
    base_dtypes, token_dtypes, token_labels = self._collect_variant_candidates(
        choice_lists, default_dtype_policy
    )
    variants = []
    used_labels = set()
    base_default = self._format_dtype_label(default_dtype_policy)

    if base_dtypes:
        for dtype_policy in sorted(base_dtypes):
            label = self._format_variant_label("", dtype_policy, base_default)
            files, primary_paths = self._collect_transformer_files_for_variant(
                model_type, model_def, modules, "", dtype_policy
            )
            if not files:
                continue
            file_label = self._detect_quant_label_from_files(files)
            label = file_label or label
            variant_label = f"Transformer {label}"
            # Two dtype policies can resolve to the same label; a second entry
            # would reuse the same variant id, so keep only the first one.
            if variant_label in used_labels:
                continue
            variant_id = self._make_variant_id(model_type, "transformer", label)
            variants.append(
                {
                    "id": variant_id,
                    "label": variant_label,
                    "type": "variant",
                    "files": files,
                    "primary_paths": primary_paths,
                }
            )
            used_labels.add(variant_label)

    for token, dtypes in sorted(token_dtypes.items()):
        quant_param = self._quant_param_from_token(token)
        quant_label = token_labels.get(token) or self._format_quant_label(token)
        for dtype_policy in sorted(dtypes):
            # The dtype is only spelled out when the token has several dtypes.
            label_suffix = ""
            if len(dtypes) > 1:
                label_suffix = f" {self._format_dtype_label(dtype_policy)}"
            files, primary_paths = self._collect_transformer_files_for_variant(
                model_type,
                model_def,
                modules,
                quant_param,
                dtype_policy,
                token_match=token,
            )
            if not files:
                continue
            file_label = self._detect_quant_label_from_files(files)
            # Per-iteration label: the token's own label must not be replaced
            # by what was detected for a previous dtype of the same token.
            effective_quant_label = file_label or quant_label
            if not self._files_match_token(files, token):
                continue
            if effective_quant_label:
                label = f"{effective_quant_label}{label_suffix}"
            else:
                label = self._format_dtype_label(dtype_policy)
            variant_label = f"Transformer {label}"
            if variant_label in used_labels:
                continue
            variant_id = self._make_variant_id(model_type, "transformer", label)
            variants.append(
                {
                    "id": variant_id,
                    "label": variant_label,
                    "type": "variant",
                    "files": files,
                    "primary_paths": primary_paths,
                }
            )
            used_labels.add(variant_label)

    return sorted(variants, key=lambda v: v["label"].lower())
def _collect_module_variants(self, model_type, model_def, default_dtype_policy):
    """Build the "Module ..." variant entries for every module of a model.

    Each entry from ``_expand_modules_with_ids`` is inspected: candidate
    dtype policies and quantization tokens are derived from its URL choice
    lists, the matching files are collected, and one variant dict is emitted
    per distinct label. ``model_def`` is not read by this method.

    Returns:
        The variant dicts, sorted case-insensitively by their label.
    """
    entries = self._expand_modules_with_ids(model_type)
    if not entries:
        return []
    collected = []
    seen_labels = set()
    default_label = self._format_dtype_label(default_dtype_policy)

    def emit(display_name, label, files):
        # A label is only emitted once per model; the first occurrence wins.
        full_label = f"Module {display_name} {label}".strip()
        if full_label in seen_labels:
            return
        collected.append(
            {
                "id": self._make_variant_id(
                    model_type, "module", f"{display_name} {label}"
                ),
                "label": full_label,
                "type": "variant",
                "files": files,
                "primary_paths": [],
            }
        )
        seen_labels.add(full_label)

    for position, entry in enumerate(entries):
        module = entry.get("module")
        display_name = self._get_module_display_name(
            model_type, module, entry.get("module_id", ""), position
        )
        plain_dtypes, dtypes_by_token, labels_by_token = self._collect_variant_candidates(
            self._collect_module_choice_lists(module), default_dtype_policy
        )

        # Unquantized candidates: one variant per dtype policy.
        for dtype_policy in sorted(plain_dtypes):
            fallback = self._format_variant_label("", dtype_policy, default_label)
            files = self._collect_module_files_for_variant(
                model_type, module, "", dtype_policy
            )
            if files:
                # A label detected from the resolved files overrides the dtype label.
                emit(
                    display_name,
                    self._detect_quant_label_from_files(files) or fallback,
                    files,
                )

        # Quantized candidates: one variant per (token, dtype policy).
        for token, dtypes in sorted(dtypes_by_token.items()):
            quantization = self._quant_param_from_token(token)
            token_label = labels_by_token.get(token) or self._format_quant_label(token)
            several_dtypes = len(dtypes) > 1
            for dtype_policy in sorted(dtypes):
                suffix = (
                    f" {self._format_dtype_label(dtype_policy)}" if several_dtypes else ""
                )
                files = self._collect_module_files_for_variant(
                    model_type,
                    module,
                    quantization,
                    dtype_policy,
                    token_match=token,
                )
                if not files or not self._files_match_token(files, token):
                    continue
                quant_label = self._detect_quant_label_from_files(files) or token_label
                if quant_label:
                    label = f"{quant_label}{suffix}"
                else:
                    label = self._format_dtype_label(dtype_policy)
                emit(display_name, label, files)

    collected.sort(key=lambda variant: variant["label"].lower())
    return collected
def _get_module_display_name(self, model_type, module, module_id, module_index):
if isinstance(module_id, str) and module_id:
return module_id
if isinstance(module, str):
module_type, _ = self._split_module_ref(module)
return module_type
return model_type
def _collect_module_choice_lists(self, module):
choice_lists = []
if isinstance(module, dict):
urls1 = module.get("URLs", [])
urls2 = module.get("URLs2", [])
if urls1:
choice_lists.append(urls1)
if urls2:
choice_lists.append(urls2)
elif isinstance(module, list):
if module:
choice_lists.append(module)
else:
module_type = module
sub_prop_name = "_list"
if isinstance(module_type, str) and "#" in module_type:
pos = module_type.rfind("#")
sub_prop_name = module_type[pos + 1 :]
module_type = module_type[:pos]
try:
mod_urls = self.get_model_recursive_prop(
module_type, "modules", sub_prop_name=sub_prop_name, return_list=True
)
except Exception:
mod_urls = []
if isinstance(mod_urls, list) and mod_urls:
if all(isinstance(item, list) for item in mod_urls):
for entry in mod_urls:
if entry:
choice_lists.append(entry)
else:
choice_lists.append(mod_urls)
return choice_lists
def _collect_module_files_for_variant(
self,
model_type,
module,
quantization,
dtype_policy,
token_match=None,
):
files = set()
is_exotic = quantization not in ("", "int8", "fp8")
if isinstance(module, dict):
for key in ("URLs", "URLs2"):
urls = module.get(key, [])
if not urls:
continue
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(urls, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=urls,
)
if filename:
self._add_file(files, filename)
elif isinstance(module, list):
filename = ""
if token_match and is_exotic:
filename = self._select_filename_by_token(module, token_match)
if not filename:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
URLs=module,
)
if filename:
self._add_file(files, filename)
else:
filename = ""
if token_match and is_exotic:
module_type = module
sub_prop_name = "_list"
if isinstance(module_type, str) and "#" in module_type:
pos = module_type.rfind("#")
sub_prop_name = module_type[pos + 1 :]
module_type = module_type[:pos]
try:
mod_urls = self.get_model_recursive_prop(
module_type,
"modules",
sub_prop_name=sub_prop_name,
return_list=True,
)
except Exception:
mod_urls = []
filename = self._select_filename_by_token(mod_urls, token_match)
if not filename:
try:
filename = self.get_model_filename(
model_type=model_type,
quantization=quantization,
dtype_policy=dtype_policy,
module_type=module,
)
except Exception:
filename = ""
if filename:
self._add_file(files, filename)
return files
def _collect_transformer_choice_lists(self, model_type, model_def):
    """Gather every URL choice list that can provide transformer files.

    The model's own ``URLs`` (and ``URLs2`` when that key is in
    ``model_def``) come first, followed by the lists of each expanded
    module. Module lists are skipped when ``model_def`` has
    ``module_source`` (``module_source2`` for a dict module's ``URLs2``).

    Returns:
        ``(choice_lists, modules)`` where ``modules`` is the result of
        ``_expand_modules``.
    """
    modules = self._expand_modules(model_type)
    choice_lists = []

    primary_urls = self.get_model_recursive_prop(model_type, "URLs", return_list=True)
    if primary_urls:
        choice_lists.append(primary_urls)
    if "URLs2" in model_def:
        secondary_urls = self.get_model_recursive_prop(
            model_type, "URLs2", return_list=True
        )
        if secondary_urls:
            choice_lists.append(secondary_urls)

    skip_first = "module_source" in model_def
    skip_second = "module_source2" in model_def
    for module in modules:
        if isinstance(module, dict):
            first = module.get("URLs", [])
            second = module.get("URLs2", [])
            if first and not skip_first:
                choice_lists.append(first)
            if second and not skip_second:
                choice_lists.append(second)
            continue
        if skip_first:
            continue
        if isinstance(module, list):
            if module:
                choice_lists.append(module)
            continue

        # Remaining case: a module reference resolved through the model definitions.
        owner, sub_prop = self._split_module_ref(module)
        try:
            resolved = self.get_model_recursive_prop(
                owner, "modules", sub_prop_name=sub_prop, return_list=True
            )
        except Exception:
            resolved = []
        if not isinstance(resolved, list) or not resolved:
            continue
        if all(isinstance(item, list) for item in resolved):
            choice_lists.extend(item for item in resolved if item)
        else:
            choice_lists.append(resolved)
    return choice_lists, modules
def _collect_transformer_files_for_variant(
    self,
    model_type,
    model_def,
    modules,
    quantization,
    dtype_policy,
    token_match=None,
):
    """Collect the transformer files (and primary paths) of one variant.

    The main model file, the optional second one (when ``"URLs2"`` is in
    ``model_def``) and one file per module are resolved. When
    ``token_match`` is set and ``quantization`` is not ``""``, ``"int8"``
    or ``"fp8"``, a filename is first searched by token in the relevant
    URLs; otherwise, or when that finds nothing, ``get_model_filename``
    decides. Module files are skipped when ``model_def`` has
    ``module_source`` (``module_source2`` for a dict module's ``URLs2``).

    Returns:
        ``(files, primary_paths)``; ``primary_paths`` holds the resolved
        paths of the main model files that ended up in ``files``.
    """
    files = set()
    primary_paths = []
    by_token = bool(token_match) and quantization not in ("", "int8", "fp8")

    def default_filename(**selector):
        return self.get_model_filename(
            model_type=model_type,
            quantization=quantization,
            dtype_policy=dtype_policy,
            **selector,
        )

    def filename_from(urls):
        picked = self._select_filename_by_token(urls, token_match) if by_token else ""
        return picked or default_filename(URLs=urls)

    def add_primary(prop, **selector):
        # Resolve one of the model's own files and remember its resolved path.
        picked = ""
        if by_token:
            own_urls = self.get_model_recursive_prop(model_type, prop, return_list=True)
            picked = self._select_filename_by_token(own_urls, token_match)
        if not picked:
            picked = default_filename(**selector)
        if not picked:
            return
        self._add_file(files, picked)
        resolved = self._resolve_path(picked)
        if resolved and resolved in files:
            primary_paths.append(resolved)

    add_primary("URLs")
    if "URLs2" in model_def:
        add_primary("URLs2", submodel_no=2)

    skip_first = "module_source" in model_def
    skip_second = "module_source2" in model_def
    for module in modules:
        if isinstance(module, dict):
            pairs = (
                (module.get("URLs", []), skip_first),
                (module.get("URLs2", []), skip_second),
            )
            for urls, skipped in pairs:
                if not urls or skipped:
                    continue
                filename = filename_from(urls)
                if filename:
                    self._add_file(files, filename)
            continue
        if skip_first:
            continue
        if isinstance(module, list):
            filename = filename_from(module)
            if filename:
                self._add_file(files, filename)
            continue

        # Remaining case: a module reference resolved through the model definitions.
        filename = ""
        if by_token:
            owner, sub_prop = self._split_module_ref(module)
            try:
                candidates = self.get_model_recursive_prop(
                    owner,
                    "modules",
                    sub_prop_name=sub_prop,
                    return_list=True,
                )
            except Exception:
                candidates = []
            filename = self._select_filename_by_token(candidates, token_match)
        if not filename:
            try:
                filename = default_filename(module_type=module)
            except Exception:
                filename = ""
        if filename:
            self._add_file(files, filename)
    return files, primary_paths
def _collect_text_encoder_variants(
    self, model_type, model_def, default_dtype_policy
):
    """Build the "Text Encoder ..." variant entries of a model.

    When the model declares ``text_encoder_URLs``, one variant is produced
    per unquantized dtype policy and per (quantization token, dtype policy)
    found in those URLs. Otherwise the model handler's
    ``get_text_encoder_filename`` (if it has one) is queried for each
    quantization returned by ``_text_encoder_quant_candidates``.

    A variant is dropped when its resolved path or its label was already
    emitted. Files are resolved with ``model_def["text_encoder_folder"]``
    passed to ``_add_file`` as ``force_folder``.

    Returns:
        The variant dicts, sorted case-insensitively by their label.
    """
    variants = []
    text_encoder_urls = self.get_model_recursive_prop(
        model_type, "text_encoder_URLs", return_list=True
    )
    text_encoder_folder = model_def.get("text_encoder_folder", None)
    used_paths = set()
    used_labels = set()
    base_default = self._format_dtype_label(default_dtype_policy)

    def resolve(filename):
        """Return the set filled by ``_add_file`` for a single filename."""
        resolved = set()
        self._add_file(resolved, filename, force_folder=text_encoder_folder)
        return resolved

    def emit(label, files, resolved_path):
        """Append a variant unless its label was already emitted."""
        variant_label = f"Text Encoder {label}"
        if variant_label in used_labels:
            return
        variants.append(
            {
                "id": self._make_variant_id(model_type, "text", label),
                "label": variant_label,
                "type": "variant",
                "files": files,
                "primary_paths": [resolved_path],
            }
        )
        used_labels.add(variant_label)

    if text_encoder_urls:
        base_dtypes, token_dtypes, token_labels = self._collect_variant_candidates(
            [text_encoder_urls], default_dtype_policy
        )
        for dtype_policy in sorted(base_dtypes):
            label = self._format_variant_label("", dtype_policy, base_default)
            filename = self.get_model_filename(
                model_type=model_type,
                quantization="",
                dtype_policy=dtype_policy,
                URLs=text_encoder_urls,
            )
            if not filename:
                continue
            files = resolve(filename)
            if not files:
                continue
            resolved_path = next(iter(files))
            if resolved_path in used_paths:
                continue
            used_paths.add(resolved_path)
            emit(label, files, resolved_path)

        for token, dtypes in sorted(token_dtypes.items()):
            quant_param = self._quant_param_from_token(token)
            quant_label = token_labels.get(token) or self._format_quant_label(token)
            for dtype_policy in sorted(dtypes):
                label_suffix = ""
                if len(dtypes) > 1:
                    label_suffix = f" {self._format_dtype_label(dtype_policy)}"
                filename = ""
                if quant_param not in ("", "int8", "fp8"):
                    filename = self._select_filename_by_token(text_encoder_urls, token)
                if not filename:
                    filename = self.get_model_filename(
                        model_type=model_type,
                        quantization=quant_param,
                        dtype_policy=dtype_policy,
                        URLs=text_encoder_urls,
                    )
                if not filename:
                    continue
                files = resolve(filename)
                if not files:
                    continue
                if not self._files_match_token(files, token):
                    continue
                # Keep the file-derived label local to this iteration so it does
                # not replace the token's label for the remaining dtype policies.
                effective_quant_label = (
                    self._detect_quant_label_from_files(files) or quant_label
                )
                resolved_path = next(iter(files))
                if resolved_path in used_paths:
                    continue
                used_paths.add(resolved_path)
                if effective_quant_label:
                    label = f"{effective_quant_label}{label_suffix}"
                else:
                    label = self._format_dtype_label(dtype_policy)
                emit(label, files, resolved_path)
    else:
        handler = self.get_model_handler(model_type)
        get_name = getattr(handler, "get_text_encoder_filename", None)
        if get_name is not None:
            for quant_param in self._text_encoder_quant_candidates():
                # A handler that fails for one quantization is skipped for it.
                try:
                    filename = get_name(quant_param)
                except Exception:
                    filename = None
                if not filename:
                    continue
                files = resolve(filename)
                if not files:
                    continue
                resolved_path = next(iter(files))
                if resolved_path in used_paths:
                    continue
                used_paths.add(resolved_path)
                quant_label = self._detect_quant_label_from_files(files)
                label = quant_label or self._format_variant_label(
                    "",
                    self._detect_dtype_policy_from_name(os.path.basename(filename))
                    or default_dtype_policy,
                    base_default,
                )
                emit(label, files, resolved_path)
    return sorted(variants, key=lambda v: v["label"].lower())
def _collect_lora_variants(self, model_type):
    """Build one "Lora ..." variant per lora entry declared for the model.

    Entries that are not non-empty strings, have an empty basename, do not
    resolve to a file through ``_add_file``, or resolve to an already seen
    path are skipped. Repeated labels get a ``" #n"`` suffix from the
    second occurrence on.

    Returns:
        The variant dicts sorted case-insensitively by label, or ``[]``
        when there is no lora directory or no ``loras`` property.
    """
    lora_dir = self._safe_get_lora_dir(model_type)
    if not lora_dir:
        return []
    declared = self.get_model_recursive_prop(model_type, "loras", return_list=True)
    if declared is None:
        return []

    result = []
    seen_paths = set()
    label_counts = {}
    for entry in self._ensure_list(declared):
        if not isinstance(entry, str) or not entry:
            continue
        # Only the text before the first "|" is used to derive the file name.
        basename = os.path.basename(entry.partition("|")[0])
        if not basename:
            continue
        files = set()
        if entry.startswith("http") and "|" in entry:
            self._add_file(files, entry, lora_dir=lora_dir)
        else:
            self._add_file(files, os.path.join(lora_dir, basename))
        if not files:
            continue
        resolved_path = next(iter(files))
        if resolved_path in seen_paths:
            continue
        seen_paths.add(resolved_path)

        base_label = f"Lora {os.path.splitext(basename)[0]}"
        occurrence = label_counts.get(base_label, 0) + 1
        label_counts[base_label] = occurrence
        if occurrence == 1:
            variant_label = base_label
        else:
            variant_label = f"{base_label} #{occurrence}"
        result.append(
            {
                "id": self._make_variant_id(model_type, "lora", basename),
                "label": variant_label,
                "type": "variant",
                "files": files,
                "primary_paths": [resolved_path],
            }
        )
    result.sort(key=lambda variant: variant["label"].lower())
    return result
def _collect_other_files(
    self,
    model_type,
    model_def,
    transformer_files,
    text_encoder_files,
    lora_files,
):
    """Collect the files of a model not covered by the other categories.

    ``preload_URLs`` entries go to the first set; ``VAE_URLs`` entries and
    the files reported by ``_collect_handler_files`` go to the second.
    Files already listed as transformer, text encoder or lora files are
    removed from both, and the second set never repeats the first.

    Returns:
        ``(other_files, shared_files)``.
    """
    lora_dir = self._safe_get_lora_dir(model_type)

    other_files = set()
    preload_urls = self.get_model_recursive_prop(
        model_type, "preload_URLs", return_list=True
    )
    for url in self._ensure_list(preload_urls):
        self._add_file(other_files, url, lora_dir=lora_dir)

    shared_files = set()
    for url in self._ensure_list(model_def.get("VAE_URLs", [])):
        self._add_file(shared_files, url, lora_dir=lora_dir)
    shared_files.update(self._collect_handler_files(model_type, model_def))

    other_files.difference_update(transformer_files, text_encoder_files, lora_files)
    shared_files.difference_update(
        transformer_files, text_encoder_files, lora_files, other_files
    )
    return other_files, shared_files
def _resolve_default_dtype_policy(self, model_type, model_def):
dtype = model_def.get("dtype")
if isinstance(dtype, str) and dtype.lower() in ("fp16", "bf16"):
return dtype.lower()
policy = self.transformer_dtype_policy
if isinstance(policy, str) and policy:
return policy.lower()
return "bf16"
def _format_dtype_label(self, dtype_policy):
return "FP16" if dtype_policy == "fp16" else "BF16"
def _format_quant_label(self, token):
if not token:
return ""
label = quant_router.get_quantization_label(token)
if label:
return label
return str(token).replace("_", " ").upper()
def _format_variant_label(self, token, dtype_policy, default_label):
if token:
return self._format_quant_label(token)
if dtype_policy:
return self._format_dtype_label(dtype_policy)
return default_label
def _detect_quant_info(self, name):
if not name:
return "", ""
label = quant_router.detect_quantization_label_from_filename(name) or ""
if label:
return self._label_to_token(label), label
kind = quant_router.detect_quantization_kind_for_file(name, verboseLevel=0)
if kind and kind != "none":
kind = str(kind).lower()
return kind, kind.upper()
lower = str(name).lower()
aliases = self._get_quant_aliases()
for token in aliases:
if token in ("bf16", "fp16", "bfloat16", "float16"):
continue
if token and token in lower:
return token, ""
return "", ""
def _label_to_token(self, label):
if not label:
return ""
raw = str(label).strip()
lower = raw.lower()
if lower.startswith("gguf-"):
return lower.split("-", 1)[1]
if lower == "gguf":
return "gguf"
return lower
def _detect_dtype_policy_from_name(self, name):
if not name:
return ""
lower = str(name).lower()
if "fp16" in lower or "float16" in lower:
return "fp16"
if "bf16" in lower or "bfloat16" in lower:
return "bf16"
return ""
def _collect_variant_candidates(self, choice_lists, default_dtype_policy):
base_dtypes = set()
token_dtypes = defaultdict(set)
token_labels = {}
for choice_list in choice_lists:
for entry in self._ensure_list(choice_list):
if not entry:
continue
entry_str = str(entry)
name = self._locator_basename(entry_str)
quant_source = self._resolve_path(entry_str) or self._locator_path(entry_str) or entry_str
token, label = self._detect_quant_info(quant_source)
dtype_policy = self._detect_dtype_policy_from_name(name) or default_dtype_policy
if token:
token = str(token).lower()
token_dtypes[token].add(dtype_policy)
if label:
token_labels.setdefault(token, label)
else:
base_dtypes.add(dtype_policy)
if not base_dtypes and not token_dtypes:
base_dtypes.add(default_dtype_policy)
return base_dtypes, token_dtypes, token_labels
def _quant_param_from_token(self, token):
if not token:
return ""
lower = str(token).lower()
if "int8" in lower:
return "int8"
if "fp8" in lower or "float8" in lower:
return "fp8"
return lower
def _files_match_token(self, files, token):
if not token:
return True
token = str(token).lower()
for path in files:
base = os.path.basename(path).lower()
if token in base:
return True
label = quant_router.detect_quantization_label_from_filename(path)
if label and self._label_to_token(label) == token:
return True
return False
def _detect_quant_label_from_files(self, files):
for path in files:
label = quant_router.detect_quantization_label_from_filename(path)
if label:
return label
return ""
def _select_filename_by_token(self, urls, token):
if not urls or not token:
return ""
token = str(token).lower()
for entry in self._ensure_list(urls):
if not entry:
continue
if isinstance(entry, list):
for item in entry:
if not item:
continue
base = self._locator_basename(item).lower()
if token in base:
return item
label = quant_router.detect_quantization_label_from_filename(item)
if label and self._label_to_token(label) == token:
return item
continue
base = self._locator_basename(entry).lower()
if token in base:
return entry
label = quant_router.detect_quantization_label_from_filename(entry)
if label and self._label_to_token(label) == token:
return entry
return ""
def _split_module_ref(self, module_ref, default_sub_prop="_list"):
module_type = module_ref
sub_prop_name = default_sub_prop
if isinstance(module_ref, str) and "#" in module_ref:
pos = module_ref.rfind("#")
module_type = module_ref[:pos]
sub_prop_name = module_ref[pos + 1 :]
return module_type, sub_prop_name
def _resolve_recursive_prop_owner(self, model_type, prop, sub_prop_name, stack):
if not isinstance(model_type, str) or not model_type:
return ""
if model_type in stack or len(stack) > 20:
return model_type
model_def = self.get_model_def(model_type)
if not isinstance(model_def, dict):
return model_type
prop_value = model_def.get(prop, None)
if prop_value is None:
return model_type
if sub_prop_name is not None:
if sub_prop_name == "_list":
if not isinstance(prop_value, list) or len(prop_value) != 1:
return model_type
prop_value = prop_value[0]
else:
if not isinstance(prop_value, dict) or sub_prop_name not in prop_value:
return model_type
prop_value = prop_value[sub_prop_name]
if isinstance(prop_value, str):
next_model_type, next_sub_prop_name = self._split_module_ref(
prop_value, default_sub_prop=sub_prop_name
)
if not next_model_type:
return model_type
return self._resolve_recursive_prop_owner(
next_model_type, prop, next_sub_prop_name, stack + [model_type]
)
return model_type
def _resolve_final_module_id(self, module_ref):
if not isinstance(module_ref, str) or not module_ref:
return ""
module_type, sub_prop_name = self._split_module_ref(module_ref)
resolved_id = self._resolve_recursive_prop_owner(
module_type, "modules", sub_prop_name, []
)
return resolved_id or module_type
def _expand_modules_with_ids(self, model_type):
    """List the modules of a model together with an id for each of them.

    String modules get the id computed by ``_resolve_final_module_id``;
    those without ``#`` are replaced by their own ``modules`` list when
    that lookup succeeds and is non-empty. Other modules keep their value
    and get ``model_type`` as id, suffixed with ``#<1-based position>``
    when the model has more than one module.

    Returns:
        A list of ``{"module": ..., "module_id": ...}`` dicts.
    """
    modules = self.get_model_recursive_prop(model_type, "modules", return_list=True)
    total = len(modules)
    result = []
    for position, module in enumerate(modules, start=1):
        if not isinstance(module, str):
            module_id = model_type if total <= 1 else f"{model_type}#{position}"
            result.append({"module": module, "module_id": module_id})
            continue

        module_id = self._resolve_final_module_id(module)
        payload = module
        if "#" not in module:
            try:
                nested = self.get_model_recursive_prop(
                    module, "modules", sub_prop_name="_list", return_list=True
                )
            except Exception:
                nested = []
            if nested:
                payload = nested
        result.append({"module": payload, "module_id": module_id})
    return result
def _expand_modules(self, model_type):
    """Return only the ``"module"`` values of ``_expand_modules_with_ids``."""
    modules = []
    for entry in self._expand_modules_with_ids(model_type):
        modules.append(entry.get("module"))
    return modules
def _make_variant_id(self, model_type, kind, label):
slug = "".join(ch.lower() if ch.isalnum() else "_" for ch in label).strip("_")
return f"variant::{model_type}::{kind}::{slug}"
def _dedupe_paths(self, paths):
seen = set()
ordered = []
for path in paths:
if not path or path in seen:
continue
ordered.append(path)
seen.add(path)
return ordered
def _get_quant_aliases(self):
if not hasattr(self, "_quant_alias_cache"):
aliases = set(quant_router.get_available_qtype_aliases() or [])
aliases.update({"int8", "int8n", "fp8"})
self._quant_alias_cache = sorted(
{alias for alias in aliases if alias}, key=len, reverse=True
)
return self._quant_alias_cache
def _text_encoder_quant_candidates(self):
    """Return the sorted quantization parameters to try for text encoders.

    ``""`` is always included; ``"int8"`` and ``"fp8"`` are added when some
    known alias contains ``int8``, respectively ``fp8`` or ``float8``.
    """
    found = {""}
    for alias in self._get_quant_aliases():
        if "int8" in alias:
            found.add("int8")
        is_fp8_alias = "fp8" in alias or "float8" in alias
        if is_fp8_alias:
            found.add("fp8")
    return sorted(found)
def _collect_text_encoder_files(self, model_type, model_def):
    """Collect the text encoder file for the configured quantization.

    With ``text_encoder_URLs`` declared, ``get_model_filename`` selects the
    file using ``self.text_encoder_quantization`` and
    ``self.transformer_dtype_policy``. Otherwise the model handler's
    ``get_text_encoder_filename`` is asked, if it exists; a failure there
    yields no file.

    Returns:
        The set filled by ``_add_file`` (empty when no filename was found).
    """
    urls = self.get_model_recursive_prop(
        model_type, "text_encoder_URLs", return_list=True
    )
    folder = model_def.get("text_encoder_folder", None)

    if urls:
        filename = self.get_model_filename(
            model_type=model_type,
            quantization=self.text_encoder_quantization,
            dtype_policy=self.transformer_dtype_policy,
            URLs=urls,
        )
    else:
        filename = None
        get_name = getattr(
            self.get_model_handler(model_type), "get_text_encoder_filename", None
        )
        if get_name is not None:
            try:
                filename = get_name(self.text_encoder_quantization)
            except Exception:
                filename = None

    files = set()
    if filename:
        self._add_file(files, filename, force_folder=folder)
    return files
def _collect_handler_file_paths(self, model_type, model_def):
    """Return the file paths the model handler declares through ``query_model_files``.

    A handler without that attribute gives an empty set. When the query
    raises, ``"<model_type>: <error>"`` is appended to ``self._errors`` and
    an empty set is returned.
    """
    query = getattr(self.get_model_handler(model_type), "query_model_files", None)
    if query is None:
        return set()
    base_type = self.get_base_model_type(model_type)
    try:
        download_defs = query(self._compute_list, base_type, model_def)
    except Exception as exc:
        self._errors.append(f"{model_type}: {exc}")
        return set()
    else:
        return self._collect_download_def_file_paths(download_defs)
def _collect_download_def_file_paths(self, download_defs):
if not download_defs:
return set()
if isinstance(download_defs, dict):
download_defs = [download_defs]
files = set()
for download_def in download_defs:
if not isinstance(download_def, dict):
continue
source_folders = download_def.get("sourceFolderList", [])
file_lists = download_def.get("fileList", [])
target_folders = download_def.get("targetFolderList")
if target_folders is None:
target_folders = [None] * len(source_folders)
for source_folder, file_list, target_folder in zip(
source_folders, file_lists, target_folders
):
if target_folder == "":
target_folder = None
if source_folder is None:
source_folder = ""
if not file_list:
continue
for filename in file_list:
if not filename:
continue
rel_path = self._combine_download_path(
target_folder, source_folder, filename
)
files.add(self._resolve_download_relpath(rel_path))
return {path for path in files if path}
def _collect_global_shared_file_paths(self):
query = getattr(self, "query_global_shared_model_files", None)
if query is None:
return set()
try:
return self._collect_download_def_file_paths(query())
except Exception as exc:
self._errors.append(f"Global shared files: {exc}")
return set()
def _mark_global_shared_files(self):
    """Add ``GLOBAL_SHARED_MODEL_TYPE`` as a user of every tracked global shared file.

    Paths absent from ``self._file_usage_models`` are left alone.
    """
    tracked = self._file_usage_models
    known_paths = (
        path for path in self._collect_global_shared_file_paths() if path in tracked
    )
    for path in known_paths:
        tracked[path].add(GLOBAL_SHARED_MODEL_TYPE)
def _collect_handler_files(self, model_type, model_def):
    """Return the handler-declared paths that are existing files on disk."""
    candidates = self._collect_handler_file_paths(model_type, model_def)
    return set(filter(os.path.isfile, candidates))
def _build_tree_structure(self, model_types):
    """Register every tree node and append the family trees to ``self._tree_data``.

    Model types are grouped by family. Families are ordered by the first
    item of their ``families_infos`` entry (999 when unknown). Inside a
    family, ``create_models_hierarchy`` provides base entries and their
    child models. A node is registered through ``_register_node`` for each
    family, base, model and variant. The base, family and display labels of
    each child model are also stored in the corresponding ``self._model_*``
    maps.
    """
    models_by_family = defaultdict(list)
    for model_type in model_types:
        family_key = self.get_model_family(model_type, for_ui=True) or "unknown"
        models_by_family[family_key].append(model_type)

    def family_info(key):
        # Families without an entry rank last (999) and use their key as label.
        return self.families_infos.get(key, (999, key))

    for family_key in sorted(models_by_family, key=lambda key: family_info(key)[0]):
        family_label = family_info(family_key)[1]
        members = models_by_family[family_key]

        rows = []
        for model_type in members:
            display_name = self.compact_name(
                family_label, self.get_model_name(model_type)
            )
            rows.append(
                (display_name, model_type, self.get_parent_model_type(model_type))
            )
        rows.sort(key=lambda row: row[0])
        parents, children_by_parent = self.create_models_hierarchy(rows)

        family_node_id = f"family::{family_key}"
        family_model_set = set(members)
        family_files = self._collect_files_for_usage_ids(
            self._collect_usage_ids_for_models(family_model_set)
        )
        self._register_node(
            family_node_id,
            family_label,
            "family",
            family_model_set,
            family_files,
            path_label=family_label,
        )

        base_nodes = []
        for base_label, base_id in parents:
            base_caption = self._decorate_status_label(
                base_label,
                self._model_aggregated_status_map.get(
                    base_id, model_dropdowns.MODEL_FILE_STATUS_MISSING
                ),
            )
            base_path = f"{family_label} / {base_label}"
            base_models = set()
            base_usage_ids = set()
            model_nodes = []

            for child_label, child_model in children_by_parent.get(base_id, []):
                child_caption = self._decorate_status_label(
                    child_label,
                    self._model_direct_status_map.get(
                        child_model, model_dropdowns.MODEL_FILE_STATUS_MISSING
                    ),
                )
                usage_ids = self._model_usage_ids.get(child_model, set())
                base_models.add(child_model)
                base_usage_ids.update(usage_ids)
                self._model_base_label[child_model] = base_label
                self._model_family_label[child_model] = family_label

                model_node_id = f"model::{child_model}"
                model_files = self._collect_files_for_usage_ids(usage_ids)
                model_path = f"{base_path} / {child_label}"
                self._model_display_label[child_model] = model_path
                self._register_node(
                    model_node_id,
                    child_caption,
                    "model",
                    {child_model},
                    model_files,
                    primary_paths=self._model_primary.get(child_model, []),
                    path_label=model_path,
                )

                variant_nodes = []
                for variant in self._ordered_variants_for_display(child_model):
                    variant_id = variant["id"]
                    variant_label = variant["label"]
                    variant_type = variant.get("type", "variant")
                    self._register_node(
                        variant_id,
                        variant_label,
                        variant_type,
                        {child_model},
                        variant.get("files", set()),
                        missing_files=variant.get("missing_files", set()),
                        primary_paths=variant.get("primary_paths", []),
                        path_label=f"{model_path} / {variant_label}",
                    )
                    variant_nodes.append(
                        {"id": variant_id, "label": variant_label, "type": variant_type}
                    )
                model_nodes.append(
                    {
                        "id": model_node_id,
                        "label": child_caption,
                        "type": "model",
                        "children": variant_nodes,
                    }
                )

            # The base node aggregates the files of all its child models.
            base_node_id = f"base::{family_key}::{base_id}"
            self._register_node(
                base_node_id,
                base_caption,
                "base",
                base_models,
                self._collect_files_for_usage_ids(base_usage_ids),
                path_label=base_path,
            )
            base_nodes.append(
                {
                    "id": base_node_id,
                    "label": base_caption,
                    "type": "base",
                    "children": model_nodes,
                }
            )

        self._tree_data.append(
            {
                "id": family_node_id,
                "label": family_label,
                "type": "family",
                "children": base_nodes,
            }
        )
def _collect_usage_ids_for_models(self, model_set):
usage_ids = set()
for model_type in model_set:
usage_ids.update(self._model_usage_ids.get(model_type, set()))
return usage_ids
def _collect_files_for_usage_ids(self, usage_ids):
files = set()
for usage_id in usage_ids:
files.update(self._usage_files.get(usage_id, set()))
return files
def _ordered_variants_for_display(self, model_type):
variants = list(self._model_variants.get(model_type, []))
if not variants:
return []
non_missing = [variant for variant in variants if variant.get("type") != "missing"]
missing = [variant for variant in variants if variant.get("type") == "missing"]
return non_missing + missing
def _register_node(
    self,
    node_id,
    label,
    node_type,
    model_set,
    files_set,
    missing_files=None,
    primary_paths=None,
    path_label=None,
):
    """Store the description of one tree node in ``self._node_map``.

    The node's files are split into unique and shared ones by
    ``_split_files_by_shared`` and sized by ``_sum_sizes``. A missing file
    is kept only when it is truthy, not part of ``files_set`` and not an
    existing file on disk. ``path_label`` defaults to ``label``.
    """
    present = set(files_set) if files_set else set()
    absent = {
        path
        for path in (missing_files or ())
        if path and path not in present and not os.path.isfile(path)
    }
    unique_files, shared_files = self._split_files_by_shared(present, model_set)
    unique_size = self._sum_sizes(unique_files)
    shared_size = self._sum_sizes(shared_files)

    node = {
        "id": node_id,
        "label": label,
        "type": node_type,
        "models": set(model_set),
        "files": set(present),
        "unique_files": unique_files,
        "shared_files": shared_files,
        "unique_size": unique_size,
        "shared_size": shared_size,
        "total_size": unique_size + shared_size,
        "missing_files": absent,
        "missing_count": len(absent),
        "primary_paths": primary_paths or [],
        "path_label": path_label or label,
    }
    self._node_map[node_id] = node
def _sum_sizes(self, files_set):
total = 0
for path in files_set:
total += self._file_info.get(path, {}).get("size", 0)
return total
def _split_files_by_shared(self, files_set, model_set):
unique_files = set()
shared_files = set()
for path in files_set:
used_by = self._file_usage_models.get(path, set())
if used_by.issubset(model_set):
unique_files.add(path)
else:
shared_files.add(path)
return unique_files, shared_files
def _rebalance_other_shared_variants(self, model_types):
    """Re-split each model's "other" and "shared" catch-all variants.

    For every model type in ``model_types`` the files of its "other" and
    "shared" variants are pooled and redistributed: a file whose users in
    ``self._file_usage_models`` are not all inside ``{model_type}`` goes to
    the shared variant, every other file goes to the "other" variant. A
    catch-all variant that ends up without files is dropped, and one that is
    needed but absent is created. The rebuilt list is written back to
    ``self._model_variants[model_type]`` with the catch-all variants placed
    after all other variants ("other" first, then "shared").
    """
    for model_type in model_types:
        variants = self._model_variants.get(model_type, [])
        if not variants:
            continue
        other_variant = None
        shared_variant = None
        # Locate the catch-all variants either by the kind segment of their
        # "::"-separated id or by their label. If several variants match,
        # the last one seen wins.
        for variant in variants:
            variant_id = str(variant.get("id", ""))
            parts = variant_id.split("::")
            # The kind is only read from ids with at least four segments.
            variant_kind = parts[2] if len(parts) >= 4 else ""
            label = str(variant.get("label", "")).lower()
            if variant_kind == "other" or label in ("other", "other non shared"):
                other_variant = variant
            elif variant_kind == "shared" or label in ("shared", "other shared"):
                shared_variant = variant
        if not other_variant and not shared_variant:
            continue
        # Pool the files of both catch-all variants before re-splitting them.
        combined = set()
        if other_variant:
            combined.update(other_variant.get("files", set()))
        if shared_variant:
            combined.update(shared_variant.get("files", set()))
        # Keep every variant except the two selected catch-alls (identity test).
        new_variants = []
        for variant in variants:
            if variant is other_variant or variant is shared_variant:
                continue
            new_variants.append(variant)
        if not combined:
            # Nothing left to distribute: both catch-all variants disappear.
            self._model_variants[model_type] = new_variants
            continue
        model_set = {model_type}
        # Shared = used by at least one model other than this one.
        shared_files = {
            path
            for path in combined
            if not self._file_usage_models.get(path, set()).issubset(model_set)
        }
        other_files = combined - shared_files
        if other_files:
            if other_variant is None:
                other_variant = {
                    "id": self._make_variant_id(model_type, "other", "other"),
                    "label": VARIANT_LABEL_OTHER_NON_SHARED,
                    "type": "variant",
                    "files": other_files,
                    "primary_paths": [],
                }
            else:
                # Reuse the existing dict (mutated in place); only its files change.
                other_variant["files"] = other_files
            new_variants.append(other_variant)
        if shared_files:
            if shared_variant is None:
                shared_variant = {
                    "id": self._make_variant_id(model_type, "shared", "shared"),
                    "label": VARIANT_LABEL_OTHER_SHARED,
                    "type": "variant",
                    "files": shared_files,
                    "primary_paths": [],
                }
            else:
                # Reuse the existing dict (mutated in place); only its files change.
                shared_variant["files"] = shared_files
            new_variants.append(shared_variant)
        self._model_variants[model_type] = new_variants
def _get_model_shared_label(self, model_type):
    """Return the label shown for ``model_type`` in "shared with" lists.

    The global shared pseudo-model maps to ``GLOBAL_SHARED_LABEL``. Otherwise
    the first non-empty value among ``self.get_model_name(model_type)``, the
    entry in ``self._model_display_label`` and ``model_type`` itself is used.
    """
    if model_type == GLOBAL_SHARED_MODEL_TYPE:
        return GLOBAL_SHARED_LABEL
    return (
        self.get_model_name(model_type)
        or self._model_display_label.get(model_type)
        or model_type
    )
def _shared_model_labels(self, path, exclude_models=None):
if exclude_models is None:
exclude_models = set()
labels = set()
for model_type in self._file_usage_models.get(path, set()):
if model_type in exclude_models:
continue
labels.add(self._get_model_shared_label(model_type))
return sorted(labels, key=lambda value: value.lower())
def _build_tree_html(self):
self._last_rendered_nodes_count = 0
css = """
<style>
.ckpt-wrap { display: flex; flex-direction: column; gap: 12px; }
.ckpt-panel { border: 1px solid var(--border-color-primary); border-radius: 14px; background: linear-gradient(135deg, rgba(255,255,255,0.02), rgba(0,0,0,0.02)); box-shadow: var(--shadow-drop-sm, 0 2px 8px rgba(0,0,0,0.08)); }
.ckpt-panel-inner { padding: 10px; }
.ckpt-tree { display: flex; flex-direction: column; gap: 8px; }
.ckpt-node { border: 1px solid var(--border-color-primary); border-radius: 10px; background: var(--background-fill-secondary); box-shadow: inset 0 0 0 1px rgba(255,255,255,0.02); }
.ckpt-node summary { list-style: none; cursor: pointer; padding: 8px 10px; }
.ckpt-node summary::-webkit-details-marker { display: none; }
.ckpt-row { display: flex; align-items: center; gap: 10px; border-radius: 8px; padding: 6px 6px; }
.ckpt-row.clickable { cursor: pointer; transition: background 0.15s ease; }
.ckpt-row.clickable:hover { background: rgba(127,127,127,0.08); }
.ckpt-label { flex: 1; font-weight: 600; color: inherit; }
.ckpt-label-missing { color: #c62828; }
.ckpt-row[data-node-type="missing"] .ckpt-label { color: #c62828 !important; font-weight: 700; }
.ckpt-sizes { display: flex; flex-direction: column; align-items: stretch; min-width: 128px; width: 128px; font-size: 0.85em; font-variant-numeric: tabular-nums; gap: 4px; }
.ckpt-size-unique { color: inherit; }
.ckpt-size-shared { color: inherit; }
.ckpt-sizes-missing { justify-content: center; }
.ckpt-size-missing { font-size: 0.82em; color: #c62828; text-align: center; font-weight: 600; }
.ckpt-badge { padding: 3px 8px; border-radius: 999px; font-weight: 600; font-size: 0.82em; font-variant-numeric: tabular-nums; min-width: 84px; width: 100%; display: inline-flex; align-items: center; justify-content: center; }
.ckpt-badge-unique { background: rgba(64, 120, 255, 0.18); color: #2f5bd4; border: 1px solid rgba(64, 120, 255, 0.4); }
.ckpt-badge-shared { background: rgba(240, 200, 40, 0.25); color: #a66a00; border: 1px solid rgba(240, 200, 40, 0.5); }
.ckpt-badge-zero { background: rgba(120, 120, 120, 0.12); color: #888; border: 1px solid rgba(120, 120, 120, 0.25); }
.ckpt-badge-missing { background: rgba(220, 60, 60, 0.18); color: #b71c1c; border: 1px solid rgba(220, 60, 60, 0.45); min-width: auto; width: auto; }
.ckpt-actions { display: flex; gap: 18px; }
.ckpt-action-btn { padding: 3px 12px; height: 30px; border: 1px solid var(--border-color-primary); border-radius: 6px; cursor: pointer; font-size: 0.85em; display: inline-flex; align-items: center; justify-content: center; font-weight: 600; background: transparent; color: inherit; }
.ckpt-action-btn:hover { box-shadow: var(--shadow-drop, 0 2px 6px rgba(0,0,0,0.12)); }
.ckpt-view-btn { background: rgba(60, 120, 255, 0.9) !important; color: #ffffff !important; border-color: rgba(60, 120, 255, 0.95) !important; }
.ckpt-view-btn:hover { background: rgba(60, 120, 255, 1) !important; }
.ckpt-delete { background: rgba(220, 60, 60, 0.95) !important; color: #ffffff !important; border-color: rgba(220, 60, 60, 0.95) !important; }
.ckpt-delete:hover { background: rgba(220, 60, 60, 1) !important; }
.ckpt-children { padding: 6px 8px 10px 18px; display: flex; flex-direction: column; gap: 6px; }
.ckpt-leaf { border: 1px solid var(--border-color-primary); border-radius: 8px; background: var(--background-fill-primary); padding: 6px 8px; }
.ckpt-right-column { align-self: flex-start; position: relative; overflow: visible !important; }
.ckpt-view-sticky { position: sticky; top: 16px; align-self: flex-start; }
.ckpt-view { border: 1px solid var(--border-color-primary); border-radius: 12px; background: linear-gradient(135deg, rgba(255,255,255,0.02), rgba(0,0,0,0.02)); padding: 12px; max-height: none; overflow: visible; }
.ckpt-view h4 { margin: 0 0 6px 0; }
.ckpt-view-summary { font-size: 0.85em; color: inherit; margin-bottom: 8px; display: flex; flex-wrap: wrap; gap: 12px; }
.ckpt-loading-message { font-size: 1.05em; font-weight: 600; color: inherit; padding: 16px 8px; }
.ckpt-muted { opacity: 0.75; }
.ckpt-stats { display: flex; flex-direction: column; gap: 6px; }
.ckpt-stat-main { font-weight: 600; font-size: 0.95em; }
.ckpt-stat-assumption { font-size: 0.82em; opacity: 0.85; }
.ckpt-drive-list { display: flex; flex-wrap: wrap; gap: 8px; }
.ckpt-drive { padding: 4px 8px; border-radius: 999px; background: var(--background-fill-primary); border: 1px solid var(--border-color-primary); font-size: 0.82em; }
.ckpt-file-tree { display: flex; flex-direction: column; gap: 6px; max-height: none; overflow: visible; }
.ckpt-folder { border: 1px solid var(--border-color-primary); border-radius: 8px; background: var(--background-fill-primary); }
.ckpt-folder.shared { border-color: rgba(240,200,40,0.45); }
.ckpt-folder.shared > summary { background: rgba(240,200,40,0.12); }
.ckpt-folder.unique { border-color: rgba(64, 120, 255, 0.35); }
.ckpt-folder.unique > summary { background: rgba(64, 120, 255, 0.12); }
.ckpt-folder summary { list-style: none; cursor: pointer; padding: 6px 8px; display: flex; align-items: center; gap: 8px; position: relative; }
.ckpt-folder summary::-webkit-details-marker { display: none; }
.ckpt-folder-name { flex: 1; font-weight: 600; }
.ckpt-folder-size { font-size: 0.8em; color: inherit; opacity: 0.75; font-variant-numeric: tabular-nums; }
.ckpt-folder-children { padding: 6px 8px 8px 18px; display: flex; flex-direction: column; gap: 4px; }
.ckpt-file-row { display: flex; gap: 8px; align-items: center; font-size: 0.85em; padding: 2px 0; }
.ckpt-file-row.shared { position: relative; }
.ckpt-file-row.shared { color: #a66a00; background: rgba(240, 200, 40, 0.12); border-radius: 6px; padding: 2px 6px; }
.ckpt-file-row.unique { background: rgba(64, 120, 255, 0.12); border-radius: 6px; padding: 2px 6px; }
.ckpt-file-row.missing-only { color: #b71c1c; background: rgba(220, 60, 60, 0.16); border-radius: 6px; padding: 2px 6px; }
.ckpt-file-row.missing { opacity: 0.6; }
.ckpt-file-name { flex: 1; word-break: break-all; }
.ckpt-file-size { min-width: 70px; text-align: right; color: inherit; opacity: 0.75; font-variant-numeric: tabular-nums; }
.ckpt-shared-tag { font-size: 0.72em; text-transform: uppercase; letter-spacing: 0.04em; border: 1px solid rgba(240,200,40,0.5); color: #a66a00; padding: 2px 6px; border-radius: 999px; background: rgba(240,200,40,0.12); }
.ckpt-folder-tag { margin-left: 6px; }
.ckpt-shared-tooltip { opacity: 0; visibility: hidden; pointer-events: none; position: absolute; top: 100%; left: 0; margin-top: 6px; padding: 6px 8px; background: var(--background-fill-primary); border: 1px solid var(--border-color-primary); border-radius: 8px; box-shadow: var(--shadow-drop, 0 2px 6px rgba(0,0,0,0.12)); font-size: 0.8em; z-index: 10; min-width: 160px; transform: translateY(4px); transition: opacity 0.15s ease, transform 0.15s ease; transition-delay: 0s; }
.ckpt-shared-tooltip-title { font-weight: 600; margin-bottom: 4px; }
.ckpt-file-row.shared:hover .ckpt-shared-tooltip { opacity: 1; visibility: visible; transform: translateY(0); transition-delay: 2s; }
.ckpt-folder.shared summary:hover .ckpt-shared-tooltip { opacity: 1; visibility: visible; transform: translateY(0); transition-delay: 2s; }
.ckpt-path { font-size: 0.82em; color: inherit; opacity: 0.8; margin-bottom: 6px; word-break: break-all; }
.ckpt-primary { font-size: 0.82em; color: inherit; opacity: 0.8; margin-bottom: 8px; word-break: break-all; }
.ckpt-missing-summary { margin: 10px 0; }
.ckpt-errors { padding: 8px 10px; border: 1px solid rgba(220, 80, 80, 0.4); background: var(--background-fill-primary); border-radius: 8px; font-size: 0.85em; }
.ckpt-modal-backdrop { position: fixed; inset: 0; background: rgba(0,0,0,0.45); display: none; align-items: center; justify-content: center; z-index: 9999; }
.ckpt-modal { width: min(420px, 90vw); background: var(--background-fill-primary); border: 1px solid var(--border-color-primary); border-radius: 14px; padding: 18px; box-shadow: 0 12px 28px rgba(0,0,0,0.25); display: flex; flex-direction: column; gap: 12px; }
.ckpt-modal-title { font-weight: 700; font-size: 1.05em; }
.ckpt-modal-body { font-size: 0.95em; color: inherit; opacity: 0.8; }
.ckpt-modal-actions { display: flex; gap: 12px; justify-content: flex-end; }
.ckpt-modal-btn { padding: 8px 14px; border-radius: 8px; border: 1px solid var(--border-color-primary); background: var(--button-secondary-background-fill, #f0f0f0); cursor: pointer; font-weight: 600; }
.ckpt-modal-btn.danger { background: rgba(220,80,80,0.15); border-color: rgba(220,80,80,0.5); color: #c62828; }
</style>
"""
tree_parts = [
css,
"<div id='ckpt_tree_root' data-status='ready' class='ckpt-wrap'>",
]
if self._errors:
errors_html = "<br>".join(html.escape(err) for err in self._errors[:10])
tree_parts.append(f"<div class='ckpt-errors'>{errors_html}</div>")
tree_parts.append("<div class='ckpt-panel'><div class='ckpt-panel-inner'>")
tree_parts.append(self._build_stats_html())
rendered_nodes_html = []
for family in self._tree_data:
rendered_nodes_html.append(self._render_node(family, is_family=True))
tree_parts.append("<div class='ckpt-tree'>")
tree_parts.extend(rendered_nodes_html)
tree_parts.append("</div></div></div></div>")
return "".join(tree_parts)
def _get_tree_css_block(self):
tree_html = self._build_tree_html()
start = tree_html.find("<style>")
if start < 0:
return ""
end = tree_html.find("</style>", start)
if end < 0:
return ""
return tree_html[start : end + len("</style>")]
def _build_filtered_tree_html(self, filter_text):
query = str(filter_text).strip()
self._current_tree_mode = "filtered"
self._current_filter_text = query
query_lower = query.casefold()
matches = []
for node_id, info in self._node_map.items():
if info.get("type") != "model":
continue
model_types = list(info.get("models", set()))
if len(model_types) != 1:
continue
model_type = model_types[0]
model_name = self.get_model_name(model_type)
if not isinstance(model_name, str):
continue
if query_lower not in model_name.casefold():
continue
status = self._model_direct_status_map.get(
model_type, model_dropdowns.MODEL_FILE_STATUS_MISSING
)
display_label = self._decorate_status_label(model_name, status)
matches.append((model_name, display_label, node_id, model_type))
matches.sort(key=lambda item: item[0].casefold())
css = self._get_tree_css_block()
self._last_rendered_nodes_count = 0
title = (
f"Filtered Finetunes for '{html.escape(query)}' "
f"<span class='ckpt-muted'>({len(matches)} match{'es' if len(matches) != 1 else ''})</span>"
)
transformer_quantization = self.server_config.get("transformer_quantization", self.transformer_quantization)
text_encoder_quantization = self.text_encoder_quantization
cfg_line = (
"Missing files assume config: "
f"transformer qtype={html.escape(str(transformer_quantization))}, "
f"text-encoder qtype={html.escape(str(text_encoder_quantization))}"
)
parts = [
css,
"<div id='ckpt_tree_root' data-status='ready' class='ckpt-wrap'>",
"<div class='ckpt-panel'><div class='ckpt-panel-inner'>",
f"<div class='ckpt-loading-message'>{title}</div>",
f"<div class='ckpt-stat-assumption'>{cfg_line}</div>",
"<div class='ckpt-tree'>",
]
if not matches:
parts.append(
"<div class='ckpt-leaf'><div class='ckpt-row'>"
"<div class='ckpt-label'>No finetune matches your filter.</div>"
"</div></div>"
)
for _, display_label, node_id, model_type in matches:
variant_children = [
{
"id": variant["id"],
"label": variant["label"],
"type": variant.get("type", "variant"),
"children": [],
}
for variant in self._ordered_variants_for_display(model_type)
]
model_node = {
"id": node_id,
"label": display_label,
"type": "model",
"children": variant_children,
}
parts.append(self._render_node(model_node))
parts.extend(["</div></div></div></div>"])
return "".join(parts)
def _build_stats_html(self):
total_label = self._format_gb(self._stats_total)
transformer_quantization = self.server_config.get("transformer_quantization", self.transformer_quantization)
text_encoder_quantization = self.text_encoder_quantization
cfg_line = (
"Missing files assume config: "
f"transformer qtype={html.escape(str(transformer_quantization))}, "
f"text-encoder qtype={html.escape(str(text_encoder_quantization))}"
)
drive_html = ""
if len(self._stats_by_drive) > 1:
drive_items = []
for drive, size in self._stats_by_drive:
drive_items.append(
f"<span class='ckpt-drive'>{html.escape(drive)}: {self._format_gb(size)}</span>"
)
drive_html = f"<div class='ckpt-drive-list'>{''.join(drive_items)}</div>"
return (
"<div class='ckpt-stats'>"
f"<div class='ckpt-stat-main'>Total Used: {total_label}</div>"
f"<div class='ckpt-stat-assumption'>{cfg_line}</div>"
f"{drive_html}</div>"
)
def _render_node(self, node, is_family=False):
self._last_rendered_nodes_count += 1
node_id = node["id"]
info = self._node_map.get(node_id, {})
label = node.get("label", info.get("label", node_id))
unique_size_value = info.get("unique_size", 0)
shared_size_value = info.get("shared_size", 0)
unique = self._format_gb(unique_size_value)
shared = self._format_gb(shared_size_value)
unique_label = self._format_size(unique_size_value)
shared_label = self._format_size(shared_size_value)
total_label = self._format_size(unique_size_value + shared_size_value)
missing_count = int(info.get("missing_count", 0) or 0)
node_type = info.get("type", node.get("type", ""))
children = node.get("children", [])
click_view = node_type in ("variant", "missing")
row_html = self._render_row(
label,
node_id,
unique,
shared,
unique_label,
shared_label,
total_label,
node_type,
missing_count=missing_count,
click_view=click_view,
)
if children:
node_id_attr = html.escape(node_id, quote=True)
is_open = node_id in self._expanded_nodes
open_attr = " open" if is_open else ""
child_html = ""
if is_open:
child_html = "".join(self._render_node(child) for child in children)
return (
f"<details class='ckpt-node' data-node-id=\"{node_id_attr}\"{open_attr}>"
f"<summary onclick='handlemodelSummaryClick(event, this);'>{row_html}</summary>"
f"<div class='ckpt-children'>{child_html}</div></details>"
)
return f"<div class='ckpt-leaf'>{row_html}</div>"
def _render_row(
self,
label,
node_id,
unique,
shared,
unique_label,
shared_label,
total_label,
node_type,
missing_count=0,
click_view=False,
):
label_display = html.escape(label)
label_attr = html.escape(label, quote=True)
node_id_attr = html.escape(node_id, quote=True)
type_attr = html.escape(node_type, quote=True)
row_class = "ckpt-row clickable" if click_view else "ckpt-row"
row_onclick = (
"onclick='handlemodelRowClick(event, this);'" if click_view else ""
)
label_class = "ckpt-label ckpt-label-missing" if node_type == "missing" else "ckpt-label"
if node_type == "missing":
file_count_label = f"{missing_count} file" if missing_count == 1 else f"{missing_count} files"
sizes_html = (
"<div class='ckpt-sizes ckpt-sizes-missing'>"
f"<div class='ckpt-size-missing'>{html.escape(file_count_label)}</div>"
"</div>"
)
else:
unique_badge_class = "ckpt-badge ckpt-badge-unique"
shared_badge_class = "ckpt-badge ckpt-badge-shared"
if unique == "0.00 GB":
unique_badge_class += " ckpt-badge-zero"
if shared == "0.00 GB":
shared_badge_class += " ckpt-badge-zero"
unique_badge = f"<span class='{unique_badge_class}'>{unique}</span>"
shared_badge = f"<span class='{shared_badge_class}'>{shared}</span>"
sizes_html = (
"<div class='ckpt-sizes'>"
f"<div class='ckpt-size-unique' title='Unique size'>{unique_badge}</div>"
f"<div class='ckpt-size-shared' title='Shared size'>{shared_badge}</div>"
"</div>"
)
show_view_button = node_type not in ("variant", "missing")
show_delete_button = node_type != "missing"
action_buttons = []
if show_view_button:
action_buttons.append(
f"<button class='ckpt-action-btn ckpt-view-btn' data-node-id=\"{node_id_attr}\" data-node-label=\"{label_attr}\" data-node-type=\"{type_attr}\" data-unique-size=\"{unique_label}\" data-shared-size=\"{shared_label}\" data-total-size=\"{total_label}\" onclick='event.preventDefault(); event.stopPropagation(); handlemodelAction(this, \"view\");'>View</button>"
)
if show_delete_button:
action_buttons.append(
f"<button class='ckpt-action-btn ckpt-delete' data-node-id=\"{node_id_attr}\" data-node-label=\"{label_attr}\" data-node-type=\"{type_attr}\" data-unique-size=\"{unique_label}\" data-shared-size=\"{shared_label}\" data-total-size=\"{total_label}\" onclick='event.preventDefault(); event.stopPropagation(); handlemodelAction(this, \"delete\");'>Delete</button>"
)
actions_html = f"<div class='ckpt-actions'>{''.join(action_buttons)}</div>"
return (
f"<div class='{row_class}' data-node-id=\"{node_id_attr}\" data-node-label=\"{label_attr}\" data-node-type=\"{type_attr}\" {row_onclick}>"
f"<div class='{label_class}'>{label_display}</div>"
f"{sizes_html}"
f"{actions_html}"
"</div>"
)
def _build_empty_view_html(self):
return (
"<div class='ckpt-view-sticky'><div class='ckpt-view'>"
"<h4>Files</h4>"
"<div class='ckpt-view-summary'>Select a branch to view its files.</div>"
"</div></div>"
)
def _build_loading_html(self):
return (
"<div id='ckpt_tree_root' data-status='loading' class='ckpt-wrap'>"
"<div class='ckpt-panel'><div class='ckpt-panel-inner'>"
"<div class='ckpt-loading-message'>Please wait while the models hierarchy is being built...</div>"
"</div></div></div>"
)
def _build_view_html(self, node_id):
node = self._node_map.get(node_id)
if node is None:
return self._build_empty_view_html()
if node.get("type") == "missing":
return self._build_missing_view_html(node)
files = sorted(node["files"])
model_set = set(node.get("models", set()))
shared_set = {
path
for path in files
if not self._file_usage_models.get(path, set()).issubset(node["models"])
}
shared_info = {}
for path in shared_set:
shared_labels = self._shared_model_labels(path, exclude_models=model_set)
if not shared_labels:
shared_labels = ["Shared with another model"]
shared_info[path] = shared_labels
file_tree = self._build_file_tree(files, shared_set, shared_info)
file_tree_html = self._render_file_tree(file_tree)
unique = self._format_gb(node["unique_size"])
shared = self._format_gb(node["shared_size"])
label_text = node["label"]
path_label = node.get("path_label") or label_text
header = html.escape(path_label)
unique_badge = f"<span class='ckpt-badge ckpt-badge-unique'>{unique}</span>"
shared_badge = f"<span class='ckpt-badge ckpt-badge-shared'>{shared}</span>"
shared_count = len(shared_set)
unique_count = max(0, len(files) - shared_count)
summary = (
f"<span class='ckpt-muted'>{unique_count} files</span> | unique {unique_badge} "
f"<span class='ckpt-muted'>{shared_count} files</span> | shared {shared_badge}"
)
missing_files = self._collect_missing_files_for_node(node)
primary_paths = node.get("primary_paths", [])
primary_html = ""
if primary_paths:
label = "model path" if len(primary_paths) == 1 else "model paths"
lines = "<br>".join(html.escape(path) for path in primary_paths)
primary_html = f"<div class='ckpt-primary'><strong>{label}:</strong><br>{lines}</div>"
missing_html = self._render_missing_files_block(missing_files) if node.get("type") == "model" and missing_files else ""
return (
"<div class='ckpt-view-sticky'><div class='ckpt-view'>"
f"<h4><strong>{header}</strong></h4>"
f"{primary_html}"
f"<div class='ckpt-view-summary'>{summary}</div>"
f"{file_tree_html}{missing_html}</div></div>"
)
def _build_missing_view_html(self, node):
missing_files = self._collect_missing_files_for_node(node)
label_text = node["label"]
path_label = node.get("path_label") or label_text
header = html.escape(path_label)
file_tree_html = self._render_missing_files_block(missing_files, show_title=True)
return (
"<div class='ckpt-view-sticky'><div class='ckpt-view'>"
f"<h4><strong>{header}</strong></h4>"
f"{file_tree_html}</div></div>"
)
def _collect_missing_files_for_node(self, node):
missing_files = set(node.get("missing_files", set()))
if node.get("type") == "model":
model_types = list(node.get("models", set()))
if len(model_types) == 1:
model_type = model_types[0]
for variant in self._ordered_variants_for_display(model_type):
missing_files.update(variant.get("missing_files", set()))
return sorted(missing_files, key=lambda path: self._display_path(path).lower())
def _render_missing_files_block(self, missing_files, show_title=True):
if not missing_files:
return "<div class='ckpt-file-tree'><div class='ckpt-muted'>No missing files.</div></div>"
file_rows = []
for path in missing_files:
display_path = html.escape(self._display_path(path))
full_path = html.escape(path)
file_rows.append(
f"<div class='ckpt-file-row missing-only' title='{full_path}'>"
f"<span class='ckpt-file-name'>{display_path}</span>"
"</div>"
)
title_html = (
f"<div class='ckpt-view-summary ckpt-missing-summary'><span class='ckpt-muted'>{len(missing_files)} files</span> | missing</div>"
if show_title
else ""
)
return f"{title_html}<div class='ckpt-file-tree'>{''.join(file_rows)}</div>"
def _build_file_tree(self, files, shared_set, shared_info=None):
if shared_info is None:
shared_info = {}
root = {
"children": {},
"files": [],
"size": 0,
"all_shared": False,
"all_unique": False,
"has_files": False,
"shared_with": set(),
"shared_count": 0,
"is_root": True,
}
for path in files:
info = self._file_info.get(path, {})
display = self._display_path(path)
parts = self._split_display_path(display)
name = parts[-1] if parts else display
shared_with = shared_info.get(path, [])
entry = {
"path": path,
"display": display,
"name": name,
"size": info.get("size", 0),
"exists": info.get("exists", False),
"shared": path in shared_set,
"shared_with": shared_with,
}
node = root
for part in parts[:-1]:
node = node["children"].setdefault(
part,
{
"children": {},
"files": [],
"size": 0,
"all_shared": False,
"all_unique": False,
"has_files": False,
"shared_with": set(),
"shared_count": 0,
"is_root": False,
},
)
node["files"].append(entry)
self._compute_folder_sizes(root)
return root
def _compute_folder_sizes(self, node):
total = sum(entry["size"] for entry in node["files"])
has_files = len(node["files"]) > 0
all_shared = all(entry["shared"] for entry in node["files"]) if node["files"] else True
all_unique = all(not entry["shared"] for entry in node["files"]) if node["files"] else True
shared_with = set()
shared_count = 0
for entry in node["files"]:
if entry.get("shared"):
shared_with.update(entry.get("shared_with", []))
for child in node["children"].values():
(
child_total,
child_has_files,
child_all_shared,
child_shared_with,
child_shared_count,
child_all_unique,
) = self._compute_folder_sizes(child)
total += child_total
has_files = has_files or child_has_files
all_shared = all_shared and child_all_shared
all_unique = all_unique and child_all_unique
shared_with.update(child_shared_with)
shared_count += child_shared_count
node["size"] = total
node["has_files"] = has_files
node["all_shared"] = has_files and all_shared
node["all_unique"] = has_files and all_unique
node["shared_with"] = shared_with
if node["all_shared"] and not node.get("is_root"):
shared_count += 1
node["shared_count"] = shared_count
return total, has_files, node["all_shared"], shared_with, shared_count, node["all_unique"]
def _render_file_tree(self, node):
open_shared_path = node.get("shared_count", 0) == 1
parts = []
for name in sorted(node["children"].keys(), key=lambda n: n.lower()):
parts.append(
self._render_folder(
name,
node["children"][name],
level=0,
open_shared_path=open_shared_path,
)
)
for entry in sorted(node["files"], key=lambda e: e["name"].lower()):
parts.append(self._render_file_entry(entry))
return "<div class='ckpt-file-tree'>" + "".join(parts) + "</div>"
def _render_folder(self, name, node, level, open_shared_path=False):
name, node = self._collapse_single_child_folder(name, node)
folder_name = html.escape(name)
size_label = self._format_size(node.get("size", 0))
shared_tag = ""
shared_tooltip = ""
folder_class = "ckpt-folder"
if node.get("all_shared"):
shared_tag = "<span class='ckpt-shared-tag ckpt-folder-tag'>Shared</span>"
folder_class += " shared"
shared_with = sorted(node.get("shared_with", []), key=lambda value: value.lower())
if shared_with:
lines = "".join(
f"<div>{html.escape(label)}</div>" for label in shared_with
)
shared_tooltip = (
"<div class='ckpt-shared-tooltip'>"
"<div class='ckpt-shared-tooltip-title'>Shared with</div>"
f"{lines}</div>"
)
elif node.get("all_unique"):
folder_class += " unique"
open_attr = ""
if level < 1 or (open_shared_path and node.get("shared_count", 0) == 1):
open_attr = " open"
children_html = []
for child_name in sorted(node["children"].keys(), key=lambda n: n.lower()):
children_html.append(
self._render_folder(
child_name,
node["children"][child_name],
level + 1,
open_shared_path=open_shared_path,
)
)
for entry in sorted(node["files"], key=lambda e: e["name"].lower()):
children_html.append(self._render_file_entry(entry))
return (
f"<details class='{folder_class}'{open_attr}>"
"<summary>"
f"<span class='ckpt-folder-name'>{folder_name}</span>"
f"{shared_tag}"
f"<span class='ckpt-folder-size'>{size_label}</span>"
f"{shared_tooltip}"
"</summary>"
"<div class='ckpt-folder-children'>"
+ "".join(children_html)
+ "</div></details>"
)
def _collapse_single_child_folder(self, name, node):
parts = [name]
current = node
while True:
if current.get("files"):
break
children = current.get("children", {})
if len(children) != 1:
break
child_name, child_node = next(iter(children.items()))
parts.append(child_name)
current = child_node
return os.sep.join(parts), current
def _render_file_entry(self, entry):
classes = ["ckpt-file-row"]
if entry.get("shared"):
classes.append("shared")
else:
classes.append("unique")
if not entry.get("exists"):
classes.append("missing")
class_attr = " ".join(classes)
name = html.escape(entry.get("name", ""))
size_label = self._format_size(entry.get("size", 0))
display = html.escape(entry.get("display", ""))
shared_tag = "<span class='ckpt-shared-tag'>Shared</span>" if entry.get("shared") else ""
shared_tooltip = ""
if entry.get("shared"):
shared_with = entry.get("shared_with", [])
if shared_with:
lines = "".join(
f"<div>{html.escape(label)}</div>" for label in shared_with
)
shared_tooltip = (
"<div class='ckpt-shared-tooltip'>"
"<div class='ckpt-shared-tooltip-title'>Shared with</div>"
f"{lines}</div>"
)
return (
f"<div class='{class_attr}' title='{display}'>"
f"<span class='ckpt-file-name'>{name}</span>"
f"{shared_tag}"
f"<span class='ckpt-file-size'>{size_label}</span>"
f"{shared_tooltip}"
"</div>"
)
def _split_display_path(self, display_path):
drive, rest = os.path.splitdrive(display_path)
if drive:
drive = drive.rstrip("\\/")
rest = rest.replace("\\", "/")
parts = [part for part in rest.split("/") if part]
if drive:
return [drive] + parts
if display_path.startswith("\\\\"):
return ["UNC"] + parts
return parts
def _add_file(self, files_set, value, force_folder=None, lora_dir=None):
path = self._resolve_path(value, force_folder=force_folder, lora_dir=lora_dir)
if path and os.path.isfile(path):
files_set.add(path)
def _download_root_name(self):
    """Return the first path component of a relative download location.

    ``""`` is returned when ``fl.get_download_location()`` is empty, is an
    absolute path, or normalizes to the current directory.
    """
    location = fl.get_download_location()
    if not location or os.path.isabs(location):
        return ""
    normalized = os.path.normpath(location)
    if normalized in ("", "."):
        return ""
    first_component, _, _ = normalized.partition(os.sep)
    return first_component
def _resolve_repo_relative_path(self, rel_path):
if not rel_path:
return None
rel_path = str(rel_path)
if rel_path.startswith("http"):
return None
root_name = self._download_root_name()
if not root_name:
return None
rel_norm = os.path.normpath(rel_path)
parts = rel_norm.split(os.sep)
if parts and parts[0].lower() == root_name.lower():
return self._normalize_path(os.path.join(self._repo_root, rel_norm))
return None
def _get_local_model_filename(self, model_filename, use_locator=True, extra_paths=None, lora_dir=None):
    """Best-effort wrapper around ``fl.get_local_model_filename``.

    Returns whatever the locator returns, or ``None`` when it raises.
    """
    try:
        located = fl.get_local_model_filename(
            model_filename,
            use_locator=use_locator,
            extra_paths=extra_paths,
            lora_dir=lora_dir,
        )
    except Exception:
        # Deliberately swallowed: callers treat None as "not found locally".
        located = None
    return located
def _locator_path(self, value, lora_dir=None):
if not value:
return ""
if not isinstance(value, str):
value = str(value)
try:
return fl.extract_alternate_path(value, lora_dir=lora_dir)
except Exception:
source_value = value.split("|", 1)[0]
return os.path.basename(source_value) if source_value.startswith("http") else source_value
def _locator_basename(self, value, lora_dir=None):
path = self._locator_path(value, lora_dir=lora_dir)
return os.path.basename(path) if path else ""
def _download_location(self, value, force_folder=None, lora_dir=None):
    """Return the download location for ``value`` via ``fl.get_download_location``.

    When the first lookup raises, it is retried without ``lora_dir`` using
    the part of ``value`` before the first ``"|"`` (reduced to its basename
    if it starts with ``"http"``). An error from the retry propagates.
    """
    try:
        location = fl.get_download_location(value, force_path=force_folder, lora_dir=lora_dir)
    except Exception:
        source, _, _ = str(value).partition("|")
        fallback_name = os.path.basename(source) if source.startswith("http") else source
        location = fl.get_download_location(fallback_name, force_path=force_folder)
    return location
def _resolve_path(self, value, force_folder=None, lora_dir=None):
if not value:
return None
if not isinstance(value, str):
value = str(value)
if os.path.isabs(value):
abs_path = self._normalize_path(value)
if os.path.isfile(abs_path):
return abs_path
basename = os.path.basename(abs_path)
return abs_path
repo_rel = self._resolve_repo_relative_path(value)
if repo_rel:
return repo_rel
local_path = self._get_local_model_filename(value, extra_paths=force_folder, lora_dir=lora_dir)
if local_path:
return self._normalize_path(local_path)
expected = self._download_location(value, force_folder=force_folder, lora_dir=lora_dir)
return self._normalize_path(expected)
def _resolve_download_relpath(self, rel_path):
if rel_path and os.path.isabs(rel_path):
return self._normalize_path(rel_path)
repo_rel = self._resolve_repo_relative_path(rel_path)
if repo_rel:
return repo_rel
local = fl.locate_file(rel_path, error_if_none=False)
if local:
return self._normalize_path(local)
return self._normalize_path(fl.get_download_location(rel_path))
def _resolve_folder_path(self, source_folder, target_folder):
parts = []
if target_folder:
parts.append(target_folder)
if source_folder:
parts.append(source_folder)
rel_path = os.path.join(*parts) if parts else ""
if rel_path and os.path.isabs(rel_path):
return self._normalize_path(rel_path)
repo_rel = self._resolve_repo_relative_path(rel_path)
if repo_rel:
return repo_rel
if not rel_path:
return self._normalize_path(fl.get_download_location())
folder = fl.locate_folder(rel_path, error_if_none=False)
if folder:
return self._normalize_path(folder)
return self._normalize_path(fl.get_download_location(rel_path))
def _list_folder_files(self, folder_path):
files = set()
if not folder_path or not os.path.isdir(folder_path):
return files
for root, _, filenames in os.walk(folder_path):
for name in filenames:
files.add(self._normalize_path(os.path.join(root, name)))
return files
def _combine_download_path(self, target_folder, source_folder, filename):
parts = []
if target_folder:
parts.append(target_folder)
if source_folder:
parts.append(source_folder)
parts.append(filename)
return os.path.join(*parts)
def _compute_list(self, filename):
if not filename:
return []
return [os.path.basename(str(filename))]
def _ensure_list(self, value):
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def _safe_get_lora_dir(self, model_type):
try:
return self.get_lora_dir(model_type)
except Exception:
return None
def _normalize_path(self, path):
return os.path.normcase(os.path.abspath(path))
def _format_gb(self, size_bytes):
size_gb = size_bytes / (1024 ** 3)
return f"{size_gb:.2f} GB"
def _format_size(self, size_bytes):
if size_bytes >= 1024 ** 3:
return f"{size_bytes / (1024 ** 3):.2f} GB"
if size_bytes >= 1024 ** 2:
return f"{size_bytes / (1024 ** 2):.1f} MB"
if size_bytes >= 1024:
return f"{size_bytes / 1024:.1f} KB"
return f"{size_bytes} B"
def _display_path(self, path):
try:
rel = os.path.relpath(path, self._repo_root)
if not rel.startswith(".."):
return rel
except Exception:
pass
return path
def _get_model_roots(self):
    """Return the locator's model folders as unique normalized paths.

    Entries come from ``fl._models_paths`` (treated as empty when missing
    or falsy). Falsy entries are skipped, relative entries are resolved
    against ``self._repo_root``, and first-seen order is preserved.
    """
    unique_roots = []
    configured = getattr(fl, "_models_paths", []) or []
    for entry in configured:
        if not entry:
            continue
        candidate = entry if os.path.isabs(entry) else os.path.join(self._repo_root, entry)
        candidate = self._normalize_path(candidate)
        if candidate in unique_roots:
            continue
        unique_roots.append(candidate)
    return unique_roots
def _build_basename_index(self, basenames, min_size=1):
basenames = {name.lower() for name in basenames if name}
if not basenames:
return {}
found = {}
remaining = set(basenames)
for root in self._get_model_roots():
if not remaining:
break
if not os.path.isdir(root):
continue
for name in list(remaining):
candidate = os.path.join(root, name)
if os.path.isfile(candidate):
try:
if os.path.getsize(candidate) < min_size:
continue
except OSError:
continue
found[name] = self._normalize_path(candidate)
remaining.discard(name)
if not remaining:
break
if os.path.normcase(root) == os.path.normcase(self._repo_root):
continue
for dirpath, _, filenames in os.walk(root):
for filename in filenames:
key = filename.lower()
if key in remaining:
full_path = os.path.join(dirpath, filename)
try:
if os.path.getsize(full_path) < min_size:
continue
except OSError:
continue
found[key] = self._normalize_path(full_path)
remaining.discard(key)
if not remaining:
break
if not remaining:
break
return found
def _find_in_model_roots(self, basename, min_size=1):
if not basename:
return None
key = basename.lower()
if key in self._basename_index:
return self._basename_index.get(key)
found = self._build_basename_index({basename}, min_size=min_size)
if found:
self._basename_index.update(found)
return self._basename_index.get(key)
self._basename_index[key] = None
return None
def _repair_missing_paths(self):
missing = []
for path in self._file_usage_models.keys():
try:
size = os.path.getsize(path) if os.path.isfile(path) else -1
except OSError:
size = -1
if size <= 0:
missing.append(path)
if not missing:
return
basenames = {os.path.basename(path) for path in missing}
found = self._build_basename_index(basenames, min_size=1)
if not found:
return
for old_path in list(missing):
key = os.path.basename(old_path).lower()
new_path = found.get(key)
if not new_path or new_path == old_path:
continue
self._swap_path(old_path, new_path)
def _swap_path(self, old_path, new_path):
usage_ids = self._file_usage.pop(old_path, set())
if usage_ids:
self._file_usage[new_path].update(usage_ids)
for usage_id in usage_ids:
files = self._usage_files.get(usage_id)
if files is not None:
files.discard(old_path)
files.add(new_path)
model_type = self._variant_to_model.get(usage_id)
if model_type:
for variant in self._model_variants.get(model_type, []):
if variant.get("id") != usage_id:
continue
files_set = variant.get("files")
if isinstance(files_set, set):
files_set.discard(old_path)
files_set.add(new_path)
primary_paths = variant.get("primary_paths", [])
variant["primary_paths"] = [
new_path if path == old_path else path for path in primary_paths
]
break
model_types = self._file_usage_models.pop(old_path, set())
if model_types:
self._file_usage_models[new_path].update(model_types)
for model_type in model_types:
files = self._model_files.get(model_type)
if files is not None:
files.discard(old_path)
files.add(new_path)
primary_paths = self._model_primary.get(model_type)
if primary_paths:
self._model_primary[model_type] = [
new_path if path == old_path else path for path in primary_paths
]