# 30l-imp / smart_upcycle.py
# semran1's picture
# Upload folder using huggingface_hub
# bb7ba3e verified
#!/usr/bin/env python3
"""
Smart Importance-Based MoE Upcycler (v2.1 - Strict MoE Detection)
Updates:
- FIXED: Layer 0 (Dense) misidentification. Now distinguishes between SwiGLU gates and MoE Routers.
- ENFORCED: Model 2 (The Stack) strictly forbids Dense layers.
Usage:
    python smart_upcycle.py \
        --model_path inclusionAI/Ling-mini-2.0 \
        --output_path ./ling-mini-30L-upcycled \
        --target_layers 30 \
        --model1_ratio 0.55
Author: Claude (Anthropic)
"""
import argparse
import os
import shutil
import gc
import logging
from pathlib import Path
from typing import Dict, List, Tuple
from collections import defaultdict
# Root logging setup: INFO threshold, records stamped with wall-clock time only.
logging.basicConfig(
    datefmt='%H:%M:%S',
    format='%(asctime)s - [%(levelname)s] - %(message)s',
    level=logging.INFO,
)
# Named logger shared by every class and function in this script.
logger = logging.getLogger("SmartUpcycler")
# Third-party dependencies are imported in one guarded group so that a missing
# package produces an installation hint instead of a bare traceback.
try:
    import torch
    import torch.nn as nn
    from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
    from safetensors.torch import save_file
    from datasets import load_dataset
    from tqdm import tqdm
except ImportError as e:
    logger.error(f"Missing dependency: {e}")
    logger.error("pip install torch transformers safetensors datasets tqdm accelerate")
    # `exit()` is a convenience injected by the `site` module for interactive
    # sessions and is not guaranteed to exist (e.g. `python -S`); raising
    # SystemExit gives the same exit status without that dependency.
    raise SystemExit(1)
class LayerAnalyzer:
    """Analyzes model layers with strict MoE vs Dense differentiation.

    The analyzer offers two services:

    * ``identify_layer_types`` classifies every decoder layer as MoE or Dense
      using structural hints on its feed-forward module.
    * ``compute_importance`` runs calibration text through the model and
      scores each layer by how much it changes its input.
    """

    def __init__(self, model, tokenizer, device='cuda'):
        """Store the model/tokenizer pair and the device tokenized inputs are sent to."""
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        # layer index -> list of per-sample importance scores
        self.layer_data = defaultdict(list)
        # handles of the forward hooks that are currently registered
        self.hooks = []

    def get_layers(self):
        """Return the decoder layer container (``model.model.layers`` or ``model.layers``).

        Raises:
            ValueError: if neither attribute path exists on the model.
        """
        if hasattr(self.model, 'model') and hasattr(self.model.model, 'layers'):
            return self.model.model.layers
        if hasattr(self.model, 'layers'):
            return self.model.layers
        raise ValueError("Unsupported model architecture: cannot find .layers")

    @staticmethod
    def _looks_like_moe(module) -> bool:
        """Return True when a feed-forward module shows any MoE marker.

        A bare ``gate`` attribute is deliberately NOT used as a marker because
        SwiGLU MLPs expose ``gate_proj``.
        """
        experts = getattr(module, 'experts', None)
        try:
            has_experts_list = experts is not None and len(experts) > 1
        except TypeError:
            # `experts` exists but is not a sized container; no evidence here.
            has_experts_list = False

        num_experts = getattr(module, 'num_experts', None)
        has_num_experts = isinstance(num_experts, int) and num_experts > 1

        # Explicit "MoE"/"sparse" wording in the class name also counts.
        class_name = type(module).__name__.lower()
        name_is_moe = 'moe' in class_name or 'sparse' in class_name

        return has_experts_list or has_num_experts or name_is_moe

    def identify_layer_types(self) -> Tuple[List[int], List[int]]:
        """Scan the architecture with a heuristic tuned to avoid SwiGLU false positives.

        Layer 0 is always reported as Dense, whatever the heuristic says
        (the v2.1 workaround for layer-0 misidentification). Because of that
        override, index 0 can never appear in the MoE list, so the former
        post-loop "layer 0 is MoE" warning was unreachable and has been dropped.

        Returns:
            ``(moe_indices, dense_indices)``, each in ascending layer order.
        """
        moe_indices = []
        dense_indices = []
        # Attribute names under which a layer may expose its feed-forward module.
        candidates = ('mlp', 'block_sparse_moe', 'feed_forward', 'ffn')

        for idx, layer in enumerate(self.get_layers()):
            # First candidate attribute present on the layer wins.
            module = next(
                (getattr(layer, name) for name in candidates if hasattr(layer, name)),
                None,
            )
            is_moe = module is not None and self._looks_like_moe(module)
            if idx == 0:
                is_moe = False

            if is_moe:
                moe_indices.append(idx)
            else:
                dense_indices.append(idx)

        return moe_indices, dense_indices

    def compute_importance(self, calibration_data: List[str]) -> Dict[int, float]:
        """Calculate layer importance using cosine similarity.

        For every forward pass, each layer contributes
        ``1.0 - mean(CosSim(input, output))`` over all token positions; the
        returned value per layer is the average of those scores over the
        calibration samples. Higher similarity means lower importance.

        Args:
            calibration_data: raw text samples; each one is tokenized
                (truncated to 512 tokens) and run through the model once.

        Returns:
            Mapping of layer index to mean importance score. Empty when
            ``calibration_data`` is empty.
        """
        logger.info(f"Computing importance using {len(calibration_data)} samples...")
        layers = self.get_layers()

        # Fall back to CPU when CUDA was requested but does not exist, instead
        # of failing on the first `.to(...)` call.
        input_device = self.device
        if str(input_device).startswith('cuda') and not torch.cuda.is_available():
            logger.warning("CUDA requested but not available; sending calibration inputs to CPU.")
            input_device = 'cpu'

        def get_activation_hook(idx):
            def hook(module, args, output):
                # assumes the hidden states are the first positional argument
                # and the first element of a tuple output — TODO confirm for
                # architectures that pass hidden states by keyword.
                inp = args[0] if isinstance(args, tuple) else args
                out = output[0] if isinstance(output, tuple) else output
                with torch.no_grad():
                    # Flatten to [Batch * Seq, Hidden]; reshape (not view) so
                    # non-contiguous activations are accepted as well.
                    inp_flat = inp.reshape(-1, inp.size(-1)).float()
                    out_flat = out.reshape(-1, out.size(-1)).float()
                    # Compare on whatever device the activations already live
                    # on rather than forcing a transfer to "cuda".
                    cos = torch.nn.functional.cosine_similarity(
                        inp_flat, out_flat.to(inp_flat.device), dim=-1
                    )
                    # Higher similarity = lower importance
                    score = 1.0 - cos.mean().item()
                self.layer_data[idx].append(score)
            return hook

        try:
            for idx, layer in enumerate(layers):
                self.hooks.append(layer.register_forward_hook(get_activation_hook(idx)))

            self.model.eval()
            with torch.no_grad():
                for text in tqdm(calibration_data, desc="Calibrating"):
                    inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
                    inputs = {k: v.to(input_device) for k, v in inputs.items()}
                    self.model(**inputs)

            final_scores = {
                idx: sum(scores) / len(scores)
                for idx, scores in self.layer_data.items()
            }
        finally:
            # Always detach the hooks, even when calibration fails, so the
            # model is not left instrumented and a later call starts clean.
            for handle in self.hooks:
                handle.remove()
            self.hooks.clear()
            self.layer_data.clear()

        return final_scores
class SmartUpcycler:
    """Plans and writes a deeper checkpoint built from re-stacked source layers.

    The new model is "Model 1" (a base selection that may mix Dense and MoE
    layers) followed by "Model 2" (an extension made of MoE layers only).
    """

    def __init__(self, model_path: str, device: str = 'auto'):
        """Load the config and tokenizer for ``model_path``; ``device`` is used as ``device_map``."""
        self.model_path = model_path
        self.device = device
        self.config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def load_model(self):
        """Load the causal LM in bfloat16 using ``self.device`` as the device map."""
        logger.info(f"Loading model from {self.model_path}...")
        return AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16,
            device_map=self.device,
            trust_remote_code=True,
            low_cpu_mem_usage=True
        )

    def create_layer_plan(self,
                          importance_scores: Dict[int, float],
                          moe_indices: List[int],
                          dense_indices: List[int],
                          total_original: int,
                          target_total: int,
                          m1_count: int,
                          m2_count: int) -> Tuple[List[int], List[int]]:
        """Choose the source layers for both stacks.

        Args:
            importance_scores: layer index -> importance; missing layers score 0.
            moe_indices: indices of the MoE layers in the source model.
            dense_indices: accepted for interface compatibility; not used.
            total_original: number of layers in the source model.
            target_total: accepted for interface compatibility; not used.
            m1_count: desired number of layers in Model 1.
            m2_count: desired number of layers in Model 2.

        Returns:
            ``(selected_m1, selected_m2)``. Model 1 is sorted ascending.
            Model 2 is sorted ascending, followed by any recycled duplicates
            in descending-importance order.

        Raises:
            ValueError: if ``moe_indices`` is empty.
        """
        def by_importance(indices):
            # Stable sort: ties keep their original (ascending index) order.
            return sorted(indices, key=lambda x: importance_scores.get(x, 0), reverse=True)

        # --- Model 1 (Base) ---
        # Strategy: keep the first 2 and last 2 layers (stability), then fill
        # with the best remaining layers (Dense OR MoE). Indices outside the
        # model are discarded so tiny models cannot yield negative layers.
        structural_layers = {
            i for i in (0, 1, total_original - 2, total_original - 1)
            if 0 <= i < total_original
        }
        m1_candidates = by_importance(
            i for i in range(total_original) if i not in structural_layers
        )
        needed_m1 = m1_count - len(structural_layers)
        if needed_m1 < 0:
            logger.warning(
                f"Requested {m1_count} base layers but {len(structural_layers)} structural "
                "layers are always kept; Model 1 will be larger than requested."
            )
        selected_m1 = sorted(list(structural_layers) + m1_candidates[:max(0, needed_m1)])

        # --- Model 2 (Extension) ---
        # Strategy: STRICTLY MoE layers only.
        if not moe_indices:
            raise ValueError("Model has no MoE layers! Cannot fulfill constraint.")

        m2_candidates = by_importance(moe_indices)
        # max(0, ...) keeps a negative count from slicing off the tail instead.
        selected_m2 = sorted(m2_candidates[:max(0, m2_count)])

        # Handle shortage by duplication if necessary
        if len(selected_m2) < m2_count:
            logger.warning(f"Not enough unique MoE layers (Found {len(selected_m2)}, Needed {m2_count}).")
            logger.warning("Recycling top MoE layers to fill the gap.")
            while len(selected_m2) < m2_count:
                # Cycle through the best available MoE layers again
                for candidate in m2_candidates:
                    selected_m2.append(candidate)
                    if len(selected_m2) == m2_count:
                        break

        return selected_m1, selected_m2

    def build_and_save(self,
                       original_state_dict,
                       m1_layers: List[int],
                       m2_layers: List[int],
                       output_path: Path):
        """Assemble the re-stacked state dict and write the checkpoint directory.

        Writes the updated config, the tokenizer, ``model.safetensors`` and a
        copy of this script into ``output_path``, and prints the stacking plan.

        Args:
            original_state_dict: source weights; layer tensors are expected
                under keys of the form ``model.layers.<idx>.<rest>``.
            m1_layers: source layer indices for Model 1, in stacking order.
            m2_layers: source layer indices for Model 2, stacked after Model 1.
            output_path: destination directory (created if missing).
        """
        logger.info("Constructing new state dictionary...")
        layer_prefix = "model.layers."
        new_state_dict = {}
        # source layer index (as text) -> [(key suffix, tensor), ...]
        tensors_by_layer = {}

        # Single pass over the weights: copy the non-layer tensors and bucket
        # the per-layer ones, instead of rescanning everything per layer.
        for key, tensor in original_state_dict.items():
            if key.startswith(layer_prefix):
                src_idx, sep, suffix = key[len(layer_prefix):].partition(".")
                if sep:
                    tensors_by_layer.setdefault(src_idx, []).append((suffix, tensor))
            elif "layers." not in key:
                new_state_dict[key] = tensor.clone()

        stack_plan = [('M1', src, 'Base Mixed') for src in m1_layers]
        stack_plan += [('M2', src, 'MoE ONLY') for src in m2_layers]

        print(f"\n{'='*25} STACK PLAN {'='*25}")
        print(f"{'Order':<5} | {'Dest':<5} | {'Source':<6} | {'Type'}")
        print("-" * 50)

        for dst_idx, (stage, src, kind) in enumerate(stack_plan):
            layer_tensors = tensors_by_layer.get(str(src), ())
            if not layer_tensors:
                logger.warning(f"Source layer {src} has no tensors under '{layer_prefix}{src}.'.")
            dst_prefix = f"{layer_prefix}{dst_idx}."
            for suffix, tensor in layer_tensors:
                # clone() so layers reused several times do not share storage.
                new_state_dict[dst_prefix + suffix] = tensor.clone()
            print(f"{stage:<5} | {dst_idx:<5} <- {src:<6} | {kind}")

        # Save
        output_path.mkdir(parents=True, exist_ok=True)
        self.config.num_hidden_layers = len(stack_plan)
        self.config.save_pretrained(output_path)
        self.tokenizer.save_pretrained(output_path)

        logger.info(f"Saving model.safetensors to {output_path}...")
        # The "format" metadata entry marks the file as a PyTorch checkpoint;
        # the transformers safetensors loader inspects it when loading.
        save_file(
            new_state_dict,
            os.path.join(output_path, "model.safetensors"),
            metadata={"format": "pt"},
        )
        shutil.copy(__file__, output_path)
def load_calibration_samples(name='wikitext', split='train', n=128):
    """Return up to ``n`` calibration texts, falling back to dummy data.

    Rows of the ``wikitext-2-raw-v1`` configuration whose ``text`` field is
    longer than 200 characters are collected in dataset order. If the dataset
    cannot be loaded, or no row is long enough, ``n`` copies of a synthetic
    string are returned so that calibration can still run.

    Args:
        name: dataset identifier passed to ``load_dataset``.
        split: dataset split to read.
        n: maximum number of samples to return.

    Returns:
        A list of strings.
    """
    # NOTE(review): trust_remote_code=True lets the dataset repository run its
    # own loading code — confirm this is still wanted for the chosen `name`.
    try:
        data = load_dataset(name, 'wikitext-2-raw-v1', split=split, trust_remote_code=True)
        samples = []
        for row in data:
            if len(row['text']) > 200:
                samples.append(row['text'])
            if len(samples) >= n:
                break
    except Exception as exc:
        # Deliberately best-effort, but report why the real data was skipped.
        logger.warning("Could not load wikitext. Using dummy data.")
        logger.warning(f"Dataset loading error: {exc!r}")
        samples = []
    else:
        if not samples:
            logger.warning("No calibration text longer than 200 characters found. Using dummy data.")

    if samples:
        return samples
    return ["Calibration string." * 50] * n
def main():
    """Command-line entry point: analyze, plan, and write the upcycled model.

    Steps: parse arguments, load the model, classify layers as MoE/Dense,
    score layer importance (or use uniform scores with ``--no_calibration``),
    build the two-stack layer plan, then write the new checkpoint.
    """
    parser = argparse.ArgumentParser(description="Smart MoE Upcycler")
    parser.add_argument('--model_path', type=str, required=True)
    parser.add_argument('--output_path', type=str, required=True)
    parser.add_argument('--target_layers', type=int, default=30)
    parser.add_argument('--model1_ratio', type=float, default=0.55)
    parser.add_argument('--no_calibration', action='store_true')
    args = parser.parse_args()

    # Reject values that would give a negative or empty layer split.
    if args.target_layers < 1:
        parser.error("--target_layers must be a positive integer")
    if not 0.0 <= args.model1_ratio <= 1.0:
        parser.error("--model1_ratio must be between 0 and 1")

    # 1. Setup
    m1_count = int(args.target_layers * args.model1_ratio)
    m2_count = args.target_layers - m1_count
    logger.info(f"Target: {args.target_layers} Layers. Split: M1={m1_count}, M2={m2_count} (Strict MoE)")

    upcycler = SmartUpcycler(args.model_path)
    model = upcycler.load_model()

    # 2. Analyze
    analyzer = LayerAnalyzer(model, upcycler.tokenizer)
    moe_indices, dense_indices = analyzer.identify_layer_types()
    # Use the analyzer's lookup so both supported layouts
    # (model.model.layers and model.layers) are counted the same way.
    total_orig = len(analyzer.get_layers())

    logger.info(f"Scan Results: {len(moe_indices)} MoE layers, {len(dense_indices)} Dense layers.")
    if dense_indices:
        logger.info(f"Verified Dense Layers: {dense_indices}")

    # 3. Compute Importance
    if args.no_calibration:
        logger.info("Skipping calibration. Using uniform importance.")
        scores = {i: 1.0 for i in range(total_orig)}
    else:
        samples = load_calibration_samples()
        scores = analyzer.compute_importance(samples)

    # 4. Plan
    m1_layers, m2_layers = upcycler.create_layer_plan(
        scores,
        moe_indices,
        dense_indices,
        total_orig,
        args.target_layers,
        m1_count,
        m2_count
    )

    # 5. Execute
    logger.info("Moving model to CPU...")
    model.cpu()
    state_dict = model.state_dict()

    # Collect garbage first so the allocator cache can actually be released.
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    upcycler.build_and_save(state_dict, m1_layers, m2_layers, Path(args.output_path))
    logger.info("Done.")


if __name__ == "__main__":
    main()