#!/usr/bin/env python3
"""
Smart Importance-Based MoE Upcycler (v2.1 - Strict MoE Detection)

Updates:
- FIXED: Layer 0 (Dense) misidentification. Now distinguishes between SwiGLU
  gates and MoE Routers.
- ENFORCED: Model 2 (The Stack) strictly forbids Dense layers.

Usage:
    python smart_upcycle.py \
        --model_path inclusionAI/Ling-mini-2.0 \
        --output_path ./ling-mini-30L-upcycled \
        --target_layers 30 \
        --model1_ratio 0.55

Author: Claude (Anthropic)
"""
import argparse
import os
import shutil
import gc
import logging
from pathlib import Path
from typing import Dict, List, Tuple
from collections import defaultdict

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger("SmartUpcycler")

try:
    import torch
    import torch.nn as nn
    from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
    from safetensors.torch import save_file
    from datasets import load_dataset
    from tqdm import tqdm
except ImportError as e:
    logger.error(f"Missing dependency: {e}")
    logger.error("pip install torch transformers safetensors datasets tqdm accelerate")
    exit(1)


class LayerAnalyzer:
    """Analyzes model layers with strict MoE vs Dense differentiation."""

    def __init__(self, model, tokenizer, device='cuda'):
        self.model = model
        self.tokenizer = tokenizer
        # FIX: fall back to CPU when CUDA was requested but is unavailable.
        # The original hard-assumed CUDA and crashed on CPU-only machines
        # when calibration inputs were moved to the device.
        if device == 'cuda' and not torch.cuda.is_available():
            logger.warning("CUDA requested but not available; falling back to CPU.")
            device = 'cpu'
        self.device = device
        # layer index -> list of per-batch importance scores collected by hooks
        self.layer_data = defaultdict(list)
        self.hooks = []

    def get_layers(self):
        """Return the list of decoder layers for common HF model layouts.

        Raises:
            ValueError: if neither ``model.model.layers`` nor ``model.layers``
                exists.
        """
        if hasattr(self.model, 'model') and hasattr(self.model.model, 'layers'):
            return self.model.model.layers
        elif hasattr(self.model, 'layers'):
            return self.model.layers
        else:
            raise ValueError("Unsupported model architecture: cannot find .layers")

    def identify_layer_types(self) -> Tuple[List[int], List[int]]:
        """
        Scan the architecture with a heuristic specifically tuned to avoid
        SwiGLU false positives.

        Returns:
            (moe_indices, dense_indices)
        """
        moe_indices = []
        dense_indices = []
        layers = self.get_layers()
        for idx, layer in enumerate(layers):
            is_moe = False
            # 1. Find the MLP module.
            # Common names: mlp, block_sparse_moe, feed_forward
            candidates = ['mlp', 'block_sparse_moe', 'feed_forward', 'ffn']
            module = None
            for name in candidates:
                if hasattr(layer, name):
                    module = getattr(layer, name)
                    break
            if module is not None:
                # 2. Strict MoE check.
                # We do NOT check for 'gate' alone because SwiGLU has 'gate_proj'.
                has_experts_list = hasattr(module, 'experts') and len(module.experts) > 1
                has_num_experts = hasattr(module, 'num_experts') and module.num_experts > 1
                # Check class name for an explicit "MoE" marker.
                class_name = type(module).__name__.lower()
                name_is_moe = 'moe' in class_name or 'sparse' in class_name
                if has_experts_list or has_num_experts or name_is_moe:
                    is_moe = True
            if idx == 0:
                # Layer 0 is forced Dense by design (v2.1 fix).
                # FIX: the original warned on `0 in moe_indices` *after* this
                # override, which made the warning unreachable. Warn at the
                # moment the override actually fires instead.
                if is_moe:
                    logger.warning(
                        "Warning: Layer 0 identified as MoE. This is rare. "
                        "Verify model architecture."
                    )
                is_moe = False
            if is_moe:
                moe_indices.append(idx)
            else:
                dense_indices.append(idx)
        return moe_indices, dense_indices

    def compute_importance(self, calibration_data: List[str]) -> Dict[int, float]:
        """
        Calculate per-layer importance using cosine similarity.

        Score = 1.0 - CosSim(Input, Output); a layer that barely changes its
        input (high similarity) is considered unimportant.

        Args:
            calibration_data: raw text samples to run through the model.

        Returns:
            Mapping of layer index -> mean importance score.
        """
        logger.info(f"Computing importance using {len(calibration_data)} samples...")
        layers = self.get_layers()

        def get_activation_hook(idx):
            def hook(module, input, output):
                inp = input[0] if isinstance(input, tuple) else input
                out = output[0] if isinstance(output, tuple) else output
                with torch.no_grad():
                    # Flatten to [Batch * Seq, Hidden]
                    inp_flat = inp.view(-1, inp.size(-1)).float()
                    out_flat = out.view(-1, out.size(-1)).float()
                    # FIX: both tensors already live on the model's device.
                    # The original forced `.to("cuda")`, which crashed on
                    # CPU-only runs and was redundant on GPU runs.
                    cos = torch.nn.functional.cosine_similarity(
                        inp_flat, out_flat, dim=-1
                    )
                    # Higher similarity = lower importance.
                    score = 1.0 - cos.mean().item()
                    self.layer_data[idx].append(score)
            return hook

        for idx, layer in enumerate(layers):
            self.hooks.append(layer.register_forward_hook(get_activation_hook(idx)))

        self.model.eval()
        try:
            with torch.no_grad():
                for text in tqdm(calibration_data, desc="Calibrating"):
                    inputs = self.tokenizer(
                        text, return_tensors="pt", truncation=True, max_length=512
                    )
                    inputs = {k: v.to(self.device) for k, v in inputs.items()}
                    self.model(**inputs)
        finally:
            # FIX: always detach hooks, even if a forward pass raises, and
            # clear the handle list so repeated calls don't accumulate stale
            # handles (the original never cleared self.hooks).
            for h in self.hooks:
                h.remove()
            self.hooks.clear()

        final_scores = {
            idx: sum(scores) / len(scores)
            for idx, scores in self.layer_data.items()
        }
        self.layer_data.clear()
        return final_scores


class SmartUpcycler:
    """Builds an upcycled model by stacking selected layers of a source model."""

    def __init__(self, model_path: str, device: str = 'auto'):
        self.model_path = model_path
        self.device = device
        self.config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def load_model(self):
        """Load the source causal-LM in bf16 with accelerate device mapping."""
        logger.info(f"Loading model from {self.model_path}...")
        return AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16,
            device_map=self.device,
            trust_remote_code=True,
            low_cpu_mem_usage=True
        )

    def create_layer_plan(self, importance_scores: Dict[int, float],
                          moe_indices: List[int],
                          dense_indices: List[int],
                          total_original: int,
                          target_total: int,
                          m1_count: int,
                          m2_count: int) -> Tuple[List[int], List[int]]:
        """Select which original layers form Model 1 and Model 2.

        Model 1 (Base): first 2 + last 2 layers for stability, then the most
        important remaining layers (Dense OR MoE).
        Model 2 (Extension): STRICTLY MoE layers, most important first,
        recycling top MoE layers if there are not enough unique ones.

        Returns:
            (m1_layer_indices, m2_layer_indices), each sorted ascending
            (recycled M2 fillers are appended after the sorted unique picks).

        Raises:
            ValueError: if the model has no MoE layers at all.
        """
        # --- Model 1 (Base) ---
        structural_layers = {0, 1, total_original - 2, total_original - 1}
        m1_candidates = [i for i in range(total_original) if i not in structural_layers]
        # Sort by importance, best first.
        m1_candidates.sort(key=lambda x: importance_scores.get(x, 0), reverse=True)
        needed_m1 = m1_count - len(structural_layers)
        selected_m1 = list(structural_layers) + m1_candidates[:max(0, needed_m1)]
        selected_m1.sort()
        # FIX: surface (rather than silently ignore) a plan that cannot hit
        # the requested M1 size, e.g. m1_count < number of structural layers.
        if len(selected_m1) != m1_count:
            logger.warning(
                f"Model 1 plan has {len(selected_m1)} layers but {m1_count} "
                f"were requested. Check --target_layers / --model1_ratio."
            )

        # --- Model 2 (Extension) ---
        # Strategy: STRICTLY MoE layers only.
        if not moe_indices:
            raise ValueError("Model has no MoE layers! Cannot fulfill constraint.")
        m2_candidates = list(moe_indices)
        m2_candidates.sort(key=lambda x: importance_scores.get(x, 0), reverse=True)
        selected_m2 = m2_candidates[:m2_count]
        selected_m2.sort()

        # Handle shortage by duplication if necessary.
        if len(selected_m2) < m2_count:
            logger.warning(
                f"Not enough unique MoE layers (Found {len(selected_m2)}, "
                f"Needed {m2_count})."
            )
            logger.warning("Recycling top MoE layers to fill the gap.")
            while len(selected_m2) < m2_count:
                # Cycle through the best available MoE layers again.
                for candidate in m2_candidates:
                    selected_m2.append(candidate)
                    if len(selected_m2) == m2_count:
                        break
        return selected_m1, selected_m2

    def build_and_save(self, original_state_dict,
                       m1_layers: List[int],
                       m2_layers: List[int],
                       output_path: Path):
        """Assemble the new state dict from the layer plan and save it.

        Writes model.safetensors, config (with updated num_hidden_layers),
        tokenizer files, and a copy of this script for reproducibility.
        """
        logger.info("Constructing new state dictionary...")
        new_state_dict = {}

        def map_layer(src_idx, dst_idx):
            # Copy every tensor of source layer src_idx under the destination
            # layer prefix; clone so duplicated layers don't share storage.
            src_prefix = f"model.layers.{src_idx}."
            dst_prefix = f"model.layers.{dst_idx}."
            for key, tensor in original_state_dict.items():
                if key.startswith(src_prefix):
                    new_key = key.replace(src_prefix, dst_prefix)
                    new_state_dict[new_key] = tensor.clone()

        # 1. Copy non-layer weights (embeddings, norms, lm_head, ...).
        for key, tensor in original_state_dict.items():
            if "layers." not in key:
                new_state_dict[key] = tensor.clone()

        # 2. Stack Model 1.
        current_layer_idx = 0
        print(f"\n{'='*25} STACK PLAN {'='*25}")
        print(f"{'Order':<5} | {'Dest':<5} | {'Source':<6} | {'Type'}")
        print("-" * 50)
        for src in m1_layers:
            map_layer(src, current_layer_idx)
            print(f"{'M1':<5} | {current_layer_idx:<5} <- {src:<6} | {'Base Mixed'}")
            current_layer_idx += 1

        # 3. Stack Model 2.
        for src in m2_layers:
            map_layer(src, current_layer_idx)
            print(f"{'M2':<5} | {current_layer_idx:<5} <- {src:<6} | {'MoE ONLY'}")
            current_layer_idx += 1

        # 4. Save.
        output_path.mkdir(parents=True, exist_ok=True)
        self.config.num_hidden_layers = current_layer_idx
        self.config.save_pretrained(output_path)
        self.tokenizer.save_pretrained(output_path)
        logger.info(f"Saving model.safetensors to {output_path}...")
        save_file(new_state_dict, os.path.join(output_path, "model.safetensors"))
        # Keep a copy of this script next to the artifact for provenance.
        shutil.copy(__file__, output_path)


def load_calibration_samples(name='wikitext', split='train', n=128):
    """Load up to ``n`` long-ish text samples; fall back to dummy data offline."""
    try:
        data = load_dataset(name, 'wikitext-2-raw-v1', split=split,
                            trust_remote_code=True)
        samples = []
        for x in data:
            # Skip very short rows; they add noise to the cosine statistic.
            if len(x['text']) > 200:
                samples.append(x['text'])
            if len(samples) >= n:
                break
        return samples
    except Exception:
        logger.warning("Could not load wikitext. Using dummy data.")
        return ["Calibration string." * 50] * n


def main():
    parser = argparse.ArgumentParser(description="Smart MoE Upcycler")
    parser.add_argument('--model_path', type=str, required=True)
    parser.add_argument('--output_path', type=str, required=True)
    parser.add_argument('--target_layers', type=int, default=30)
    parser.add_argument('--model1_ratio', type=float, default=0.55)
    parser.add_argument('--no_calibration', action='store_true')
    args = parser.parse_args()

    # 1. Setup
    m1_count = int(args.target_layers * args.model1_ratio)
    m2_count = args.target_layers - m1_count
    logger.info(
        f"Target: {args.target_layers} Layers. "
        f"Split: M1={m1_count}, M2={m2_count} (Strict MoE)"
    )
    upcycler = SmartUpcycler(args.model_path)
    model = upcycler.load_model()

    # 2. Analyze
    analyzer = LayerAnalyzer(model, upcycler.tokenizer)
    moe_indices, dense_indices = analyzer.identify_layer_types()
    logger.info(
        f"Scan Results: {len(moe_indices)} MoE layers, "
        f"{len(dense_indices)} Dense layers."
    )
    if len(dense_indices) > 0:
        logger.info(f"Verified Dense Layers: {dense_indices}")

    # FIX: use the architecture-aware accessor instead of hard-coding
    # model.model.layers (which get_layers() exists to abstract over).
    total_orig = len(analyzer.get_layers())

    # 3. Compute Importance
    if args.no_calibration:
        logger.info("Skipping calibration. Using uniform importance.")
        scores = {i: 1.0 for i in range(total_orig)}
    else:
        samples = load_calibration_samples()
        scores = analyzer.compute_importance(samples)

    # 4. Plan
    m1_layers, m2_layers = upcycler.create_layer_plan(
        scores, moe_indices, dense_indices,
        total_orig, args.target_layers,
        m1_count, m2_count
    )

    # 5. Execute
    logger.info("Moving model to CPU...")
    model.cpu()
    state_dict = model.state_dict()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()
    upcycler.build_and_save(state_dict, m1_layers, m2_layers, Path(args.output_path))
    logger.info("Done.")


if __name__ == "__main__":
    main()