|
|
|
|
|
""" |
|
|
Smart Importance-Based MoE Upcycler (v2.1 - Strict MoE Detection) |
|
|
|
|
|
Updates: |
|
|
- FIXED: Layer 0 (Dense) misidentification. Now distinguishes between SwiGLU gates and MoE Routers. |
|
|
- ENFORCED: Model 2 (The Stack) strictly forbids Dense layers. |
|
|
|
|
|
Usage: |
|
|
python smart_upcycle.py \ |
|
|
--model_path inclusionAI/Ling-mini-2.0 \ |
|
|
--output_path ./ling-mini-30L-upcycled \ |
|
|
--target_layers 30 \ |
|
|
--model1_ratio 0.55 |
|
|
|
|
|
Author: Claude (Anthropic) |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import os |
|
|
import shutil |
|
|
import gc |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Tuple |
|
|
from collections import defaultdict |
|
|
|
|
|
|
|
|
# Console logging: timestamped (HH:MM:SS), level-tagged messages.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - %(message)s',
    datefmt='%H:%M:%S'
)
# Module-wide logger used by all classes and functions below.
logger = logging.getLogger("SmartUpcycler")
|
|
|
|
|
# Heavy third-party dependencies are imported behind a guard so a missing
# package yields an actionable install hint instead of a raw traceback.
try:
    import torch
    import torch.nn as nn
    from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
    from safetensors.torch import save_file
    from datasets import load_dataset
    from tqdm import tqdm
except ImportError as e:
    logger.error(f"Missing dependency: {e}")
    logger.error("pip install torch transformers safetensors datasets tqdm accelerate")
    exit(1)
|
|
|
|
|
class LayerAnalyzer:
    """Analyzes model layers with strict MoE vs Dense differentiation."""

    def __init__(self, model, tokenizer, device='cuda'):
        # device is where calibration inputs are sent; it should match the
        # model's input-embedding device (default assumes a single GPU).
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.layer_data = defaultdict(list)  # layer idx -> per-sample scores
        self.hooks = []                      # live forward hooks (removed after calibration)

    def get_layers(self):
        """Return the decoder layer list, supporting the two common layouts.

        Raises:
            ValueError: if neither ``model.model.layers`` nor ``model.layers`` exists.
        """
        if hasattr(self.model, 'model') and hasattr(self.model.model, 'layers'):
            return self.model.model.layers
        elif hasattr(self.model, 'layers'):
            return self.model.layers
        else:
            raise ValueError("Unsupported model architecture: cannot find .layers")

    def identify_layer_types(self) -> Tuple[List[int], List[int]]:
        """
        Scans architecture with heuristic specifically tuned to avoid SwiGLU
        false positives: only a populated expert list, an explicit expert
        count, or an MoE-ish class name count as MoE evidence (a SwiGLU gate
        projection alone does not).

        Returns: (moe_indices, dense_indices)
        """
        moe_indices: List[int] = []
        dense_indices: List[int] = []
        layers = self.get_layers()

        for idx, layer in enumerate(layers):
            # Locate the feed-forward sub-module under its common names.
            module = None
            for name in ('mlp', 'block_sparse_moe', 'feed_forward', 'ffn'):
                if hasattr(layer, name):
                    module = getattr(layer, name)
                    break

            is_moe = False
            if module is not None:
                has_experts_list = hasattr(module, 'experts') and len(module.experts) > 1
                has_num_experts = hasattr(module, 'num_experts') and module.num_experts > 1
                class_name = type(module).__name__.lower()
                name_is_moe = 'moe' in class_name or 'sparse' in class_name
                is_moe = has_experts_list or has_num_experts or name_is_moe

            # Layer 0 is treated as Dense by design (the v2.1 fix): an MoE
            # verdict there is almost always a SwiGLU-gate false positive.
            # FIX: the original warned via an unreachable `0 in moe_indices`
            # check *after* forcing Dense; warn at the actual override point.
            if is_moe and idx == 0:
                logger.warning("Warning: Layer 0 identified as MoE. This is rare. Verify model architecture.")
                is_moe = False

            if is_moe:
                moe_indices.append(idx)
            else:
                dense_indices.append(idx)

        return moe_indices, dense_indices

    def compute_importance(self, calibration_data: List[str]) -> Dict[int, float]:
        """
        Calculates layer importance using Cosine Similarity.
        Score = 1.0 - CosSim(Input, Output): a higher score means the layer
        perturbs the residual stream more, i.e. is more important.

        Returns: {layer_index: mean score over calibration samples}
        """
        logger.info(f"Computing importance using {len(calibration_data)} samples...")
        layers = self.get_layers()

        def get_activation_hook(idx):
            def hook(module, input, output):
                inp = input[0] if isinstance(input, tuple) else input
                out = output[0] if isinstance(output, tuple) else output
                with torch.no_grad():
                    inp_flat = inp.view(-1, inp.size(-1)).float()
                    out_flat = out.view(-1, out.size(-1)).float()
                    # FIX: compare on the tensors' own device instead of a
                    # hard-coded "cuda", so CPU-only and sharded (device_map)
                    # runs do not crash. Input and output of the same layer
                    # share a device, so no transfer is needed.
                    cos = torch.nn.functional.cosine_similarity(inp_flat, out_flat, dim=-1)
                    score = 1.0 - cos.mean().item()
                self.layer_data[idx].append(score)
            return hook

        for idx, layer in enumerate(layers):
            self.hooks.append(layer.register_forward_hook(get_activation_hook(idx)))

        try:
            self.model.eval()
            with torch.no_grad():
                for text in tqdm(calibration_data, desc="Calibrating"):
                    inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
                    inputs = {k: v.to(self.device) for k, v in inputs.items()}
                    self.model(**inputs)

            final_scores = {idx: sum(scores) / len(scores)
                            for idx, scores in self.layer_data.items()}
        finally:
            # FIX: always detach hooks, even if a forward pass raises, so a
            # failed calibration does not leave the model instrumented.
            for h in self.hooks:
                h.remove()
            self.hooks.clear()
            self.layer_data.clear()

        return final_scores
|
|
|
|
|
class SmartUpcycler:
    """Builds a shortened, re-stacked checkpoint from an existing MoE model."""

    def __init__(self, model_path: str, device: str = 'auto'):
        self.model_path = model_path
        self.device = device  # passed straight to device_map
        self.config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def load_model(self):
        """Load the source model in bf16 with accelerate-style placement."""
        logger.info(f"Loading model from {self.model_path}...")
        return AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16,
            device_map=self.device,
            trust_remote_code=True,
            low_cpu_mem_usage=True
        )

    def create_layer_plan(self,
                          importance_scores: Dict[int, float],
                          moe_indices: List[int],
                          dense_indices: List[int],
                          total_original: int,
                          target_total: int,
                          m1_count: int,
                          m2_count: int) -> Tuple[List[int], List[int]]:
        """
        Select source layers for the two halves of the new stack.

        Model 1 ("base"): always keeps the structural layers (first two and
        last two), then fills the remainder with the highest-importance layers
        of any type. Model 2 ("the stack"): strictly MoE layers, by importance;
        top MoE layers are recycled when there are not enough unique ones.

        Returns: (m1_source_indices, m2_source_indices), each sorted ascending.
        Raises: ValueError if the model has no MoE layers at all.
        """
        # Structural layers anchor embedding/logit behavior and are always kept.
        # NOTE(review): if m1_count < 4, M1 still keeps all structural layers
        # and so exceeds m1_count — same as the original behavior.
        structural_layers = {0, 1, total_original - 2, total_original - 1}
        m1_candidates = [i for i in range(total_original) if i not in structural_layers]
        m1_candidates.sort(key=lambda i: importance_scores.get(i, 0), reverse=True)

        needed_m1 = m1_count - len(structural_layers)
        selected_m1 = sorted(list(structural_layers) + m1_candidates[:max(0, needed_m1)])

        # Model 2 strictly forbids Dense layers by design.
        if not moe_indices:
            raise ValueError("Model has no MoE layers! Cannot fulfill constraint.")

        m2_candidates = sorted(moe_indices,
                               key=lambda i: importance_scores.get(i, 0),
                               reverse=True)
        selected_m2 = m2_candidates[:m2_count]

        if len(selected_m2) < m2_count:
            logger.warning(f"Not enough unique MoE layers (Found {len(selected_m2)}, Needed {m2_count}).")
            logger.warning("Recycling top MoE layers to fill the gap.")
            # Round-robin over the importance-ranked candidates — same fill
            # sequence as the original nested while/for, but explicit.
            i = 0
            while len(selected_m2) < m2_count:
                selected_m2.append(m2_candidates[i % len(m2_candidates)])
                i += 1

        # FIX: sort *after* possible recycling. The original sorted before
        # appending extras, leaving recycled duplicates out of ascending order.
        selected_m2 = sorted(selected_m2)

        return selected_m1, selected_m2

    def build_and_save(self,
                       original_state_dict,
                       m1_layers: List[int],
                       m2_layers: List[int],
                       output_path: Path):
        """Assemble the new layer stack and write config/tokenizer/weights."""
        logger.info("Constructing new state dictionary...")
        new_state_dict = {}

        def map_layer(src_idx, dst_idx):
            # Copy every tensor of source layer src_idx under the new index.
            # .clone() breaks storage sharing: recycled layers would otherwise
            # alias the same tensor, which safetensors' save_file rejects.
            src_prefix = f"model.layers.{src_idx}."
            dst_prefix = f"model.layers.{dst_idx}."
            for key, tensor in original_state_dict.items():
                if key.startswith(src_prefix):
                    new_key = key.replace(src_prefix, dst_prefix)
                    new_state_dict[new_key] = tensor.clone()

        # Non-layer weights (embeddings, final norm, lm_head) pass through.
        for key, tensor in original_state_dict.items():
            if "layers." not in key:
                new_state_dict[key] = tensor.clone()

        current_layer_idx = 0
        print(f"\n{'='*25} STACK PLAN {'='*25}")
        print(f"{'Order':<5} | {'Dest':<5} | {'Source':<6} | {'Type'}")
        print("-" * 50)

        # M1 (mixed base) goes first, then M2 (MoE-only stack).
        for src in m1_layers:
            map_layer(src, current_layer_idx)
            print(f"{'M1':<5} | {current_layer_idx:<5} <- {src:<6} | {'Base Mixed'}")
            current_layer_idx += 1

        for src in m2_layers:
            map_layer(src, current_layer_idx)
            print(f"{'M2':<5} | {current_layer_idx:<5} <- {src:<6} | {'MoE ONLY'}")
            current_layer_idx += 1

        output_path.mkdir(parents=True, exist_ok=True)
        self.config.num_hidden_layers = current_layer_idx
        self.config.save_pretrained(output_path)
        self.tokenizer.save_pretrained(output_path)

        logger.info(f"Saving model.safetensors to {output_path}...")
        save_file(new_state_dict, os.path.join(output_path, "model.safetensors"))
        # Keep a copy of this script next to the artifact for reproducibility.
        shutil.copy(__file__, output_path)
|
|
|
|
|
def load_calibration_samples(name='wikitext', split='train', n=128):
    """Fetch up to *n* long text samples (>200 chars) for calibration.

    Falls back to synthetic repeated text when the dataset is unavailable.
    """
    try:
        dataset = load_dataset(name, 'wikitext-2-raw-v1', split=split, trust_remote_code=True)
        texts = []
        for record in dataset:
            text = record['text']
            # Skip headers/blank rows; only substantial passages calibrate well.
            if len(text) > 200:
                texts.append(text)
                if len(texts) >= n:
                    break
        return texts
    except Exception:
        logger.warning("Could not load wikitext. Using dummy data.")
        return ["Calibration string." * 50] * n
|
|
|
|
|
def main():
    """CLI entry point: scan layer types, score importance, plan, and save."""
    parser = argparse.ArgumentParser(description="Smart MoE Upcycler")
    parser.add_argument('--model_path', type=str, required=True)
    parser.add_argument('--output_path', type=str, required=True)
    parser.add_argument('--target_layers', type=int, default=30)
    parser.add_argument('--model1_ratio', type=float, default=0.55)
    parser.add_argument('--no_calibration', action='store_true')
    args = parser.parse_args()

    # Split the target depth between the mixed base (M1) and MoE-only stack (M2).
    m1_count = int(args.target_layers * args.model1_ratio)
    m2_count = args.target_layers - m1_count

    logger.info(f"Target: {args.target_layers} Layers. Split: M1={m1_count}, M2={m2_count} (Strict MoE)")

    upcycler = SmartUpcycler(args.model_path)
    model = upcycler.load_model()

    analyzer = LayerAnalyzer(model, upcycler.tokenizer)
    moe_indices, dense_indices = analyzer.identify_layer_types()

    logger.info(f"Scan Results: {len(moe_indices)} MoE layers, {len(dense_indices)} Dense layers.")
    if dense_indices:
        logger.info(f"Verified Dense Layers: {dense_indices}")

    # FIX: use the analyzer's architecture-aware accessor instead of assuming
    # model.model.layers — LayerAnalyzer.get_layers handles both layouts.
    total_orig = len(analyzer.get_layers())

    if args.no_calibration:
        logger.info("Skipping calibration. Using uniform importance.")
        scores = {i: 1.0 for i in range(total_orig)}
    else:
        samples = load_calibration_samples()
        scores = analyzer.compute_importance(samples)

    m1_layers, m2_layers = upcycler.create_layer_plan(
        scores,
        moe_indices,
        dense_indices,
        total_orig,
        args.target_layers,
        m1_count,
        m2_count
    )

    # Free GPU memory before materializing the full state dict on CPU.
    logger.info("Moving model to CPU...")
    model.cpu()
    state_dict = model.state_dict()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()

    upcycler.build_and_save(state_dict, m1_layers, m2_layers, Path(args.output_path))
    logger.info("Done.")
|
|
|
|
|
# Script entry point.
if __name__ == "__main__":
    main()
|
|
|