# Soon_Merger_Toolkit / merge_utils3.py
import os
import yaml
import gc
import torch
import shutil
from pathlib import Path
# --- CRITICAL PATCH: MUST RUN BEFORE MERGEKIT IMPORTS ---
# Globally allow arbitrary (non-pydantic) field types so that pydantic models
# defined downstream (e.g. mergekit's) accept objects like torch tensors
from pydantic import BaseModel, ConfigDict

BaseModel.model_config = ConfigDict(arbitrary_types_allowed=True)

try:
    from mergekit.config import MergeConfiguration
    from mergekit.merge import run_merge
    from mergekit.options import MergeOptions
except ImportError:
    print("MergeKit not found. Please add 'mergekit' to requirements.txt")
    raise

def execute_mergekit_config(config_dict, out_path, shard_gb, device="cpu"):
    """
    Executes a MergeKit run from a dictionary config.
    Optimized for CPU execution with aggressive sharding.
    """
    # Round-trip through YAML so validation follows the same path as a
    # hand-written config file
    config_yaml = yaml.dump(config_dict)
    print("--- Generated MergeKit Config ---")
    print(config_yaml)
    print("---------------------------------")
    conf = MergeConfiguration.model_validate(yaml.safe_load(config_yaml))
    # run_merge expects its runtime flags bundled in a MergeOptions object
    run_merge(
        conf,
        out_path=out_path,
        options=MergeOptions(
            cuda=(device != "cpu"),
            low_cpu_memory=True,
            copy_tokenizer=True,
            lazy_unpickle=True,
            out_shard_size=int(shard_gb * 1024**3),
        ),
    )
    # Force cleanup of merge buffers (and the GPU cache if one was used)
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
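
# Illustrative usage sketch (model IDs and the output path are placeholders,
# not part of the toolkit):
#
#   cfg = build_full_merge_config(
#       method="linear",
#       models=["org/model-a", "org/model-b"],
#       base_model="",
#       weights="0.5, 0.5",
#       density=0.5,
#       dtype="bfloat16",
#       tokenizer_source="base",
#       layer_ranges="",
#   )
#   execute_mergekit_config(cfg, out_path="./merged-model", shard_gb=2.0, device="cpu")
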
def build_full_merge_config(
    method, models, base_model, weights, density,
    dtype, tokenizer_source, layer_ranges
):
    """
    Constructs the config dictionary for general merging (Linear, SLERP, TIES, etc.)
    """
    # Basic config; fall back to the first model as the base if none was given
    config = {
        "merge_method": method.lower(),
        "base_model": base_model if base_model else models[0],
        "dtype": dtype,
        "tokenizer_source": tokenizer_source,
        "models": []
    }
    # Parse the comma-separated weights string safely
    w_list = []
    if weights:
        try:
            w_list = [float(x.strip()) for x in weights.split(',')]
        except ValueError:
            print("Warning: Could not parse weights, defaulting to 1.0")
    # Per-model parameter injection
    method_l = method.lower()
    for i, m in enumerate(models):
        entry = {"model": m, "parameters": {}}
        if method_l in ["ties", "dare_ties", "dare_linear"]:
            entry["parameters"]["weight"] = w_list[i] if i < len(w_list) else 1.0
            entry["parameters"]["density"] = density
        elif method_l == "linear":
            entry["parameters"]["weight"] = w_list[i] if i < len(w_list) else 1.0
        if not entry["parameters"]:
            del entry["parameters"]
        config["models"].append(entry)
    if method_l == "slerp":
        # SLERP interpolates between exactly two models via a single global
        # 't' parameter (0 = base model, 1 = the other model) rather than
        # per-model weights; layer-wise 't' curves or slice configs can be
        # supplied through the raw JSON override below
        config["parameters"] = {"t": w_list[0] if w_list else 0.5}
    # Inject slices/layer ranges if provided (raw JSON override)
    if layer_ranges and layer_ranges.strip():
        try:
            extra_params = yaml.safe_load(layer_ranges)
            if isinstance(extra_params, dict):
                config.update(extra_params)
        except Exception as e:
            print(f"Error parsing layer ranges JSON: {e}")
    return config
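
# Illustrative sketch of the dict this produces for a two-model TIES merge
# (model IDs are placeholders):
#
#   build_full_merge_config(
#       "ties", ["org/model-a", "org/model-b"], "org/base-model",
#       "0.6, 0.4", 0.5, "bfloat16", "base", "",
#   )
#   # -> {
#   #      "merge_method": "ties",
#   #      "base_model": "org/base-model",
#   #      "dtype": "bfloat16",
#   #      "tokenizer_source": "base",
#   #      "models": [
#   #        {"model": "org/model-a", "parameters": {"weight": 0.6, "density": 0.5}},
#   #        {"model": "org/model-b", "parameters": {"weight": 0.4, "density": 0.5}},
#   #      ],
#   #    }
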
def build_moe_config(
    base_model, experts, gate_mode, dtype,
    tokenizer_source, positive_prompts=None
):
    """
    Constructs the config dictionary for a Mixture of Experts (MoE) merge
    """
    config = {
        "base_model": base_model,
        "gate_mode": gate_mode,
        "dtype": dtype,
        "tokenizer_source": tokenizer_source,
        "experts": []
    }
    # One entry per expert; use caller-supplied routing prompts when given,
    # otherwise fall back to a generic per-expert placeholder
    for i, exp in enumerate(experts):
        prompts = (
            positive_prompts[i]
            if positive_prompts and i < len(positive_prompts)
            else [f"expert_{i}"]
        )
        expert_entry = {
            "source_model": exp,
            "positive_prompts": prompts if isinstance(prompts, list) else [prompts]
        }
        config["experts"].append(expert_entry)
    return config
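
# Illustrative sketch of a 2-expert MoE config (model IDs and prompts are
# placeholders; "hidden" is one of mergekit-moe's gate_mode options):
#
#   moe_cfg = build_moe_config(
#       base_model="org/base-model",
#       experts=["org/math-model", "org/code-model"],
#       gate_mode="hidden",
#       dtype="bfloat16",
#       tokenizer_source="base",
#       positive_prompts=[["solve this equation"], ["write a python function"]],
#   )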