| import json | |
| import os | |
| import shutil | |
| import torch | |
| from collections import defaultdict | |
| from safetensors.torch import load_file, save_file | |
| from tqdm import tqdm | |
def shared_pointers(tensors):
    """Return groups of names whose tensors report the same ``data_ptr()``.

    Args:
        tensors: Mapping of name -> object exposing a ``data_ptr()`` method.

    Returns:
        A list of name lists, one per pointer value that occurs under more
        than one name. Groups and the names inside them follow the iteration
        order of ``tensors``.
    """
    names_by_ptr = {}
    for name, tensor in tensors.items():
        names_by_ptr.setdefault(tensor.data_ptr(), []).append(name)

    aliased_groups = []
    for group in names_by_ptr.values():
        if len(group) > 1:
            aliased_groups.append(group)
    return aliased_groups
def check_file_size(sf_filename, pt_filename):
    """Raise if the SafeTensors file is more than 1% larger than the PyTorch file.

    Only growth is checked: a SafeTensors file that is the same size as, or
    smaller than, the source checkpoint always passes.

    Args:
        sf_filename: Path to the written SafeTensors file.
        pt_filename: Path to the source PyTorch checkpoint.

    Raises:
        RuntimeError: If the SafeTensors file is more than 1% larger than the
            PyTorch file (this includes any growth over an empty PyTorch file).
        OSError: If either file cannot be stat'ed.
    """
    sf_size = os.stat(sf_filename).st_size
    pt_size = os.stat(pt_filename).st_size

    growth = sf_size - pt_size
    if growth <= 0:
        return

    # Relative growth is undefined for an empty source file; report it as a
    # size mismatch instead of failing with ZeroDivisionError.
    if pt_size == 0 or growth / pt_size > 0.01:
        raise RuntimeError(f"File size difference exceeds 1% between {sf_filename} and {pt_filename}")
def convert_file(pt_filename, sf_filename, copy_add_data=True):
    """Convert one PyTorch checkpoint file into a half-precision SafeTensors file.

    The checkpoint is loaded on CPU, unwrapped from a ``"state_dict"`` entry
    when present, de-duplicated so that only the first name of each group of
    tensors sharing a data pointer is kept, cast to float16, saved, and then
    reloaded to verify that every tensor round-trips exactly.

    Args:
        pt_filename: Path to the source PyTorch checkpoint.
        sf_filename: Path of the SafeTensors file to write. Its parent folder
            is created if missing.
        copy_add_data: When true, also copy the non-weight files from the
            checkpoint's folder into the destination folder.

    Raises:
        RuntimeError: If the size check fails or a reloaded tensor differs
            from the tensor that was saved.
    """
    # dirname() is "" for a bare filename; fall back to the current directory
    # so makedirs()/listdir() are not handed an empty path.
    source_folder = os.path.dirname(pt_filename) or "."
    dest_folder = os.path.dirname(sf_filename) or "."

    # SECURITY: torch.load unpickles the file and can execute arbitrary code.
    # Only convert checkpoints from a trusted source.
    loaded = torch.load(pt_filename, map_location="cpu")
    loaded = loaded.get("state_dict", loaded)

    # Keep only the first name of every group of aliased tensors.
    for shared_weights in shared_pointers(loaded):
        for name in shared_weights[1:]:
            loaded.pop(name)

    # Cast only floating-point tensors to float16; integer and bool tensors
    # keep their dtype so their values are not corrupted by the cast.
    loaded = {
        k: (v.contiguous().half() if v.is_floating_point() else v.contiguous())
        for k, v in loaded.items()
    }

    os.makedirs(dest_folder, exist_ok=True)
    save_file(loaded, sf_filename, metadata={"format": "pt"})
    check_file_size(sf_filename, pt_filename)
    if copy_add_data:
        copy_additional_files(source_folder, dest_folder)

    # Round-trip check: what was written must load back bit-for-bit.
    reloaded = load_file(sf_filename)
    for k, v in loaded.items():
        if not torch.equal(v, reloaded[k]):
            raise RuntimeError(f"Mismatch in tensors for key {k}.")
def rename(pt_filename):
    """Map a PyTorch weight filename to its SafeTensors counterpart.

    Every occurrence of ``"pytorch_model"`` becomes ``"model"`` and every
    occurrence of ``".bin"`` becomes ``".safetensors"``, in that order.
    """
    with_new_stem = pt_filename.replace("pytorch_model", "model")
    with_new_suffix = with_new_stem.replace(".bin", ".safetensors")
    return with_new_suffix
def copy_additional_files(source_folder, dest_folder):
    """Copy the non-weight, non-script files of a folder into another folder.

    Regular files directly inside ``source_folder`` are copied unless their
    name ends with ``.bin`` or ``.py``. Sub-directories are not descended into.

    Args:
        source_folder: Folder to copy from.
        dest_folder: Folder to copy into; created if it does not exist.

    Raises:
        OSError: If ``source_folder`` cannot be listed or a copy fails.
    """
    # List first so a missing source folder fails before anything is created.
    entries = os.listdir(source_folder)

    # Without an existing directory, shutil.copy would write a *file* named
    # like dest_folder (or fail when its parent is missing).
    os.makedirs(dest_folder, exist_ok=True)

    # Copying a folder onto itself would raise shutil.SameFileError.
    if os.path.samefile(source_folder, dest_folder):
        return

    for file in entries:
        file_path = os.path.join(source_folder, file)
        if os.path.isfile(file_path) and not file.endswith(('.bin', '.py')):
            shutil.copy(file_path, dest_folder)
def find_index_file(source_folder):
    """Return the name of a ``*.bin.index.json`` file in ``source_folder``.

    The first matching entry in ``os.listdir`` order is returned (the bare
    file name, not a full path); ``None`` is returned when nothing matches.
    """
    matches = (
        entry
        for entry in os.listdir(source_folder)
        if entry.endswith('.bin.index.json')
    )
    return next(matches, None)
def convert_files(source_folder, dest_folder, delete_old):
    """Convert every shard listed in a folder's weight index to SafeTensors.

    Reads the ``*.bin.index.json`` file found in ``source_folder``, converts
    each distinct file named in its ``"weight_map"``, copies the remaining
    additional files, and writes ``model.safetensors.index.json`` into
    ``dest_folder`` with the weight map pointing at the renamed files.

    Args:
        source_folder: Folder holding the index file and the PyTorch shards.
        dest_folder: Folder that receives the SafeTensors shards and index.
        delete_old: When true, remove each PyTorch shard once it is converted.

    Raises:
        RuntimeError: If no index file is found in ``source_folder``.
    """
    index_name = find_index_file(source_folder)
    if not index_name:
        raise RuntimeError("Index file not found. Please ensure the correct folder is specified.")

    with open(os.path.join(source_folder, index_name)) as index_handle:
        index_data = json.load(index_handle)

    # Several tensors map to the same shard; convert each shard only once.
    shard_names = set(index_data["weight_map"].values())
    for shard_name in tqdm(shard_names):
        shard_path = os.path.join(source_folder, shard_name)
        target_path = os.path.join(dest_folder, rename(shard_name))
        convert_file(shard_path, target_path, copy_add_data=False)
        if delete_old:
            os.remove(shard_path)

    # Additional files are copied once here rather than once per shard.
    copy_additional_files(source_folder, dest_folder)

    new_index_path = os.path.join(dest_folder, "model.safetensors.index.json")
    with open(new_index_path, "w") as out_handle:
        updated_index = dict(index_data)
        updated_index["weight_map"] = {
            tensor_name: rename(old_shard)
            for tensor_name, old_shard in index_data["weight_map"].items()
        }
        json.dump(updated_index, out_handle, indent=4)
def main():
    """Prompt for folders and options, then run the matching conversion.

    A blank source folder means the directory containing this script. A blank
    destination means ``<source>/<source folder name>_safetensors``. When the
    source folder lists ``pytorch_model.bin`` that single file is converted;
    otherwise the folder is converted through its weight index.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))

    source_answer = input("Source folder for PyTorch files (leave blank for script's directory): ").strip()
    source_folder = source_answer if source_answer else script_dir

    dest_folder = input("Destination folder for SafeTensors files (leave blank for default): ").strip()
    if not dest_folder:
        # normpath drops a trailing separator so basename is never empty here.
        model_name = os.path.basename(os.path.normpath(source_folder))
        dest_folder = os.path.join(source_folder, f"{model_name}_safetensors")

    delete_answer = input("Delete old PyTorch files? (Y/N): ").strip().upper()
    delete_old = delete_answer == 'Y'

    if "pytorch_model.bin" not in os.listdir(source_folder):
        convert_files(source_folder, dest_folder, delete_old)
        return

    single_checkpoint = os.path.join(source_folder, "pytorch_model.bin")
    convert_file(
        single_checkpoint,
        os.path.join(dest_folder, "model.safetensors"),
        copy_add_data=True,
    )
    if delete_old:
        os.remove(single_checkpoint)
# Run the interactive converter only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()