| import os
|
| import torch
|
| import numpy as np
|
| import gguf
|
| import argparse
|
| from tqdm import tqdm
|
|
|
| from safetensors.torch import load_file
|
|
|
# Tensors with at most this many elements stay in F32 inside handle_tensors
# when the source dtype is float32 or bfloat16.
QUANTIZATION_THRESHOLD = 1024

# Smallest element count for which handle_tensors considers reshaping a
# tensor into rows of 256 (only for architectures with shape_fix enabled).
REARRANGE_THRESHOLD = 512

# Longest tensor name handle_tensors accepts; longer names raise ValueError.
MAX_TENSOR_NAME_LENGTH = 127
|
|
|
|
|
class QuantError(Exception):
    """Raised when a tensor cannot be converted to the requested GGML type."""
|
|
|
class quants:
    """Minimal dtype-conversion helpers keyed by GGML quantization type."""

    @staticmethod
    def quantize(data, data_qtype):
        """Cast ``data`` to the NumPy dtype this helper associates with a type.

        Args:
            data: Object exposing ``astype`` (a NumPy ndarray is expected).
            data_qtype: Member of ``gguf.GGMLQuantizationType``.

        Returns:
            ``data`` cast to float16 for F16, or to float32 for BF16.

        Raises:
            QuantError: If ``data_qtype`` is neither F16 nor BF16.
        """
        # The module only does ``import gguf``; the enum must be reached
        # through the package, the bare name was never defined.
        qtypes = gguf.GGMLQuantizationType

        if data_qtype == qtypes.F16:
            return data.astype(np.float16)
        if data_qtype == qtypes.BF16:
            # NOTE(review): this yields float32 values, not packed bfloat16
            # bits. Kept as-is for existing callers; confirm this is what any
            # consumer writing the result with a BF16 raw dtype expects.
            return data.astype(np.float32)
        raise QuantError("Unsupported quantization type")
|
|
|
class ModelTemplate:
    """Base description of a model architecture and its detection rules."""

    # Architecture name; it is printed after detection and handed to
    # gguf.GGUFWriter as ``arch``.
    arch = "invalid"

    # When true, handle_tensors may reshape tensors into rows of 256.
    shape_fix = False

    # Each entry is a tuple of state-dict keys; is_model_arch reports a match
    # when every key of any single entry is present.
    keys_detect = []

    # If a matched state dict also contains one of these keys, is_model_arch
    # refuses the conversion.
    keys_banned = []
|
|
|
class ModelFlux(ModelTemplate):
    """Detection rules for Flux checkpoints."""

    # This string is written into the GGUF header as the architecture name.
    # It previously read "fluxz", which looks like a typo: every other
    # architecture here uses its plain short name.
    arch = "flux"

    # Any one of these key tuples being fully present identifies the model.
    keys_detect = [
        ("transformer_blocks.0.attn.norm_added_k.weight",),
        ("double_blocks.0.img_attn.proj.weight",),
    ]

    # This key also appears in keys_detect, so checkpoints containing it are
    # recognised first and then rejected by is_model_arch.
    keys_banned = ["transformer_blocks.0.attn.norm_added_k.weight"]
|
|
|
class ModelSD3(ModelTemplate):
    """Detection rules for SD3 checkpoints."""

    arch = "sd3"

    # Any one of these key tuples being fully present identifies the model.
    keys_detect = [
        ("transformer_blocks.0.attn.add_q_proj.weight",),
        ("joint_blocks.0.x_block.attn.qkv.weight",),
    ]

    # This key also appears in keys_detect, so checkpoints containing it are
    # recognised first and then rejected by is_model_arch.
    keys_banned = ["transformer_blocks.0.attn.add_q_proj.weight"]
|
|
|
class ModelSDXL(ModelTemplate):
    """Detection rules for SDXL checkpoints."""

    arch = "sdxl"

    # Allows handle_tensors to reshape eligible tensors into rows of 256.
    shape_fix = True

    # Any one of these key tuples being fully present identifies the model.
    keys_detect = [
        (
            "down_blocks.0.downsamplers.0.conv.weight",
            "add_embedding.linear_1.weight",
        ),
        (
            "input_blocks.3.0.op.weight",
            "input_blocks.6.0.op.weight",
            "output_blocks.2.2.conv.weight",
            "output_blocks.5.2.conv.weight",
        ),
        ("label_emb.0.0.weight",),
    ]
|
|
|
class ModelSD1(ModelTemplate):
    """Detection rules for SD1 checkpoints."""

    arch = "sd1"

    # Same value as the ModelTemplate default, spelled out explicitly.
    shape_fix = False

    # Any one of these key tuples being fully present identifies the model.
    keys_detect = [
        ("down_blocks.0.downsamplers.0.conv.weight",),
        (
            "input_blocks.3.0.op.weight",
            "input_blocks.6.0.op.weight",
            "input_blocks.9.0.op.weight",
            "output_blocks.2.1.conv.weight",
            "output_blocks.5.2.conv.weight",
            "output_blocks.8.2.conv.weight",
        ),
    ]
|
|
|
|
|
# Candidates are tried in this order by detect_arch, which stops at the first
# match. ModelSDXL must stay ahead of ModelSD1: the first SD1 key tuple is a
# subset of the first SDXL key tuple, so a state dict satisfying that SDXL
# rule satisfies the SD1 rule as well.
arch_list = [
    ModelSD3,
    ModelFlux,
    ModelSDXL,
    ModelSD1,
]
|
|
|
def is_model_arch(model, state_dict):
    """Return whether ``state_dict`` satisfies the detection rules of ``model``.

    Args:
        model: Object exposing ``keys_detect`` (iterable of key tuples) and
            ``keys_banned`` (iterable of keys).
        state_dict: Container of tensor names supporting the ``in`` operator.

    Returns:
        True if every key of at least one ``keys_detect`` entry is present,
        False otherwise.

    Raises:
        AssertionError: If a detection rule matches but the state dict also
            contains one of the banned keys.
    """
    for required_keys in model.keys_detect:
        if not all(key in state_dict for key in required_keys):
            continue
        # Only the first matching rule is evaluated; banned keys are checked
        # solely for state dicts that were recognised.
        if any(key in state_dict for key in model.keys_banned):
            # Raised explicitly instead of through ``assert`` so the check is
            # not stripped when Python runs with -O; the exception type and
            # message are the same as before.
            raise AssertionError(
                "Model architecture not allowed for conversion! (i.e. reference VS diffusers format)"
            )
        return True
    return False
|
|
|
def detect_arch(state_dict):
    """Return the first architecture class in ``arch_list`` matching the input.

    Args:
        state_dict: Container of tensor names, passed on to is_model_arch.

    Returns:
        The matching architecture class.

    Raises:
        AssertionError: If no architecture matches, or (from is_model_arch)
            if the matching layout is banned.
    """
    for candidate in arch_list:
        if is_model_arch(candidate, state_dict):
            return candidate
    # Raised explicitly instead of through ``assert`` so an unknown model can
    # never slip through as ``None`` when Python runs with -O.
    raise AssertionError("Unknown model architecture!")
|
|
|
def parse_args():
    """Parse the command line and check that the source file exists.

    Returns:
        The argparse namespace with ``src`` (required path) and ``dst``
        (optional output path, ``None`` when omitted).

    Raises:
        SystemExit: Via ``parser.error`` when arguments are invalid or
            ``--src`` does not point to an existing file.
    """
    parser = argparse.ArgumentParser(description="Generate F16 GGUF files from single UNET")
    parser.add_argument("--src", required=True, help="Source model ckpt file.")
    parser.add_argument("--dst", help="Output unet gguf file.")
    args = parser.parse_args()

    if not os.path.isfile(args.src):
        # --src is mandatory, so reaching this point means a path was given
        # but is not a file; say so and name the path.
        parser.error(f"Source file not found: {args.src!r}")

    return args
|
|
|
def load_state_dict(path):
    """Load a checkpoint and return its tensors keyed by name.

    Files ending in .ckpt/.pt/.bin/.pth go through ``torch.load`` (on CPU,
    ``weights_only=True``) and are unwrapped from a top-level ``"model"``
    entry when one exists; every other file goes through ``load_file``.

    If any key contains ``"model.diffusion_model."``, only keys containing it
    are kept and that text is removed from them; otherwise all keys are
    returned unchanged.

    Args:
        path: Path of the checkpoint file.

    Returns:
        A new dict mapping tensor names to tensors.
    """
    torch_suffixes = (".ckpt", ".pt", ".bin", ".pth")
    if path.endswith(torch_suffixes):
        loaded = torch.load(path, map_location="cpu", weights_only=True)
        loaded = loaded.get("model", loaded)
    else:
        loaded = load_file(path)

    marker = "model.diffusion_model."
    if not any(marker in name for name in loaded.keys()):
        return dict(loaded.items())

    return {
        name.replace(marker, ""): tensor
        for name, tensor in loaded.items()
        if marker in name
    }
|
|
|
def load_model(path):
    """Load a checkpoint, detect its architecture and create a GGUF writer.

    NOTE(review): a second ``load_model`` defined further down in this module
    rebinds the name, so this version is not the one in effect once the
    module has finished loading.

    Args:
        path: Path of the checkpoint file.

    Returns:
        A ``(writer, state_dict, model_arch)`` tuple; the writer is created
        with ``path=None``.
    """
    tensors = load_state_dict(path)
    detected = detect_arch(tensors)
    print(f"* Architecture detected from input: {detected.arch}")
    gguf_writer = gguf.GGUFWriter(path=None, arch=detected.arch)
    return (gguf_writer, tensors, detected)
|
|
|
def _cast_to_qtype(data, data_qtype):
    """Return ``data`` stored the way ``data_qtype`` declares it."""
    qtypes = gguf.GGMLQuantizationType
    if data_qtype == qtypes.F32:
        return data.astype(np.float32, copy=False)
    if data_qtype == qtypes.F16:
        return data.astype(np.float16, copy=False)
    # Remaining types (BF16 here) have no NumPy dtype; defer to gguf's encoder.
    return gguf.quants.quantize(data, data_qtype)


def handle_tensors(args, writer, state_dict, model_arch):
    """Convert every tensor in ``state_dict`` and register it with ``writer``.

    For each tensor the target GGML type is chosen (BF16 for bfloat16 sources,
    F16 otherwise, with float32/bfloat16 sources kept in F32 when they are
    1-D, small, or ``.weight`` tensors of a blacklisted layer), the optional
    256-column reshape is applied, and the array is cast to the chosen type
    before ``writer.add_tensor`` is called with that type as ``raw_dtype``.

    Args:
        args: Parsed command-line namespace; not used by this function.
        writer: Object providing ``add_array`` and ``add_tensor``.
        state_dict: Mapping of tensor name to ``torch.Tensor``.
        model_arch: Architecture class; only ``shape_fix`` is read.

    Raises:
        ValueError: If any tensor name is longer than MAX_TENSOR_NAME_LENGTH.
    """
    if not state_dict:
        return

    max_name_len = max(len(key) for key in state_dict)
    if max_name_len > MAX_TENSOR_NAME_LENGTH:
        # Longest names first, as in the original sorted listing.
        offenders = sorted(
            ((key, len(key)) for key in state_dict if len(key) > MAX_TENSOR_NAME_LENGTH),
            key=lambda item: item[1],
            reverse=True,
        )
        bad_list = ", ".join(f"{key!r} ({namelen})" for key, namelen in offenders)
        raise ValueError(f"Can only handle tensor names up to {MAX_TENSOR_NAME_LENGTH} characters. Tensors exceeding the limit: {bad_list}")

    # Loop-invariant lookups, built once instead of once per tensor.
    qtypes = gguf.GGMLQuantizationType
    # getattr with a string fallback keeps this working on torch builds that
    # lack the float8 dtypes: a dtype never compares equal to the string.
    fp8_dtypes = [
        getattr(torch, "float8_e4m3fn", "_invalid"),
        getattr(torch, "float8_e5m2", "_invalid"),
    ]
    blacklist = {
        "time_embedding.",
        "add_embedding.",
        "time_in.",
        "txt_in.",
        "vector_in.",
        "img_in.",
        "guidance_in.",
        "final_layer.",
    }

    for key, tensor in tqdm(state_dict.items()):
        old_dtype = tensor.dtype

        # NumPy has no bfloat16/float8, so those are widened before export.
        if old_dtype == torch.bfloat16:
            data = tensor.to(torch.float32).numpy()
        elif old_dtype in fp8_dtypes:
            data = tensor.to(torch.float16).numpy()
        else:
            data = tensor.numpy()

        n_dims = data.ndim
        n_params = data.size

        data_qtype = qtypes.BF16 if old_dtype == torch.bfloat16 else qtypes.F16
        if old_dtype in (torch.float32, torch.bfloat16):
            keep_f32 = (
                n_dims == 1
                or n_params <= QUANTIZATION_THRESHOLD
                or (".weight" in key and any(x in key for x in blacklist))
            )
            if keep_f32:
                data_qtype = qtypes.F32

        if (
            model_arch.shape_fix
            and n_dims > 1
            and n_params >= REARRANGE_THRESHOLD
            and n_params % 256 == 0
            and data.shape[-1] % 256 != 0
        ):
            # Record the real shape as metadata so it can be restored later.
            orig_shape = data.shape
            data = data.reshape(n_params // 256, 256)
            writer.add_array(f"comfy.gguf.orig_shape.{key}", tuple(int(dim) for dim in orig_shape))

        new_name = key

        shape_str = "{" + ", ".join(str(n) for n in reversed(data.shape)) + "}"
        tqdm.write(f"{new_name:<{max_name_len + 4}} {old_dtype} --> {data_qtype.name}, shape = {shape_str}")

        # The writer stores the bytes as given and labels them with raw_dtype,
        # so the array has to be cast to that type first. Without this, large
        # float32/bfloat16 tensors were written as float32 bytes while being
        # declared F16/BF16.
        data = _cast_to_qtype(data, data_qtype)
        writer.add_tensor(new_name, data, raw_dtype=data_qtype)
|
|
|
|
|
|
|
def load_model(path):
    """Load a checkpoint and detect which architecture it belongs to.

    Args:
        path: Path of the checkpoint file.

    Returns:
        A ``(state_dict, model_arch)`` tuple.
    """
    tensors = load_state_dict(path)
    detected = detect_arch(tensors)
    print(f"* Architecture detected from input: {detected.arch}")
    return tensors, detected
|
|
|
| ...
|
|
|
if __name__ == "__main__":
    args = parse_args()
    path = args.src
    state_dict, model_arch = load_model(path)

    # The default output sits next to the source file; its suffix follows the
    # dtype of the first tensor in the state dict.
    if next(iter(state_dict.values())).dtype == torch.bfloat16:
        out_path = f"{os.path.splitext(path)[0]}-BF16.gguf"
    else:
        out_path = f"{os.path.splitext(path)[0]}-F16.gguf"

    out_path = args.dst or out_path
    if os.path.isfile(out_path):
        input("Output exists enter to continue or ctrl+c to abort!")

    writer = gguf.GGUFWriter(path=out_path, arch=model_arch.arch)
    try:
        writer.add_quantization_version(1)

        handle_tensors(args, writer, state_dict, model_arch)
        writer.write_header_to_file()
        writer.write_kv_data_to_file()
        writer.write_tensors_to_file()
    finally:
        # Close the writer even when conversion or writing fails, so the
        # output handle is not left open.
        writer.close()
|
|
|