File size: 12,969 Bytes
bb7ba3e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
#!/usr/bin/env python3
"""
Smart Importance-Based MoE Upcycler (v2.1 - Strict MoE Detection)
Updates:
- FIXED: Layer 0 (Dense) misidentification. Now distinguishes between SwiGLU gates and MoE Routers.
- ENFORCED: Model 2 (The Stack) strictly forbids Dense layers.
Usage:
python smart_upcycle.py \
--model_path inclusionAI/Ling-mini-2.0 \
--output_path ./ling-mini-30L-upcycled \
--target_layers 30 \
--model1_ratio 0.55
Author: Claude (Anthropic)
"""
import argparse
import os
import shutil
import gc
import logging
from pathlib import Path
from typing import Dict, List, Tuple
from collections import defaultdict
# Configure logging
# Timestamped, level-tagged console output shared by every class and helper below.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - [%(levelname)s] - %(message)s',
    datefmt='%H:%M:%S'
)
# Module-wide logger; referenced throughout this script.
logger = logging.getLogger("SmartUpcycler")
# Guarded third-party imports: fail fast with an actionable install hint
# instead of a raw traceback when a dependency is missing.
try:
    import torch
    import torch.nn as nn
    from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer
    from safetensors.torch import save_file
    from datasets import load_dataset
    from tqdm import tqdm
except ImportError as e:
    logger.error(f"Missing dependency: {e}")
    logger.error("pip install torch transformers safetensors datasets tqdm accelerate")
    # FIX: raise SystemExit instead of calling the site-injected exit() builtin,
    # which is not guaranteed to exist (e.g. under `python -S` or when embedded).
    raise SystemExit(1)
class LayerAnalyzer:
    """Analyzes model layers with strict MoE vs Dense differentiation.

    Two responsibilities:
    - `identify_layer_types`: classify each decoder layer as MoE or Dense using
      heuristics that deliberately avoid SwiGLU false positives (a plain
      `gate_proj` must NOT count as a router).
    - `compute_importance`: score each layer by how much it transforms its
      input (1 - cosine similarity), gathered via forward hooks over a small
      calibration corpus.
    """

    def __init__(self, model, tokenizer, device='cuda'):
        self.model = model
        self.tokenizer = tokenizer
        # Device the calibration inputs are moved to before the forward pass.
        self.device = device
        # layer index -> list of per-batch importance scores.
        self.layer_data = defaultdict(list)
        # Live forward-hook handles; populated and removed in compute_importance.
        self.hooks = []

    def get_layers(self):
        """Return the decoder layer list, handling both wrapped (`model.model.layers`)
        and bare (`model.layers`) architectures.

        Raises:
            ValueError: if no `.layers` attribute can be located.
        """
        if hasattr(self.model, 'model') and hasattr(self.model.model, 'layers'):
            return self.model.model.layers
        elif hasattr(self.model, 'layers'):
            return self.model.layers
        else:
            raise ValueError("Unsupported model architecture: cannot find .layers")

    def identify_layer_types(self) -> Tuple[List[int], List[int]]:
        """
        Scans architecture with heuristic specifically tuned to avoid SwiGLU false positives.
        Returns: (moe_indices, dense_indices)
        """
        moe_indices = []
        dense_indices = []
        layers = self.get_layers()
        # Common FFN attribute names across architectures.
        candidates = ['mlp', 'block_sparse_moe', 'feed_forward', 'ffn']
        for idx, layer in enumerate(layers):
            # 1. Locate the MLP/FFN module for this layer.
            module = None
            for name in candidates:
                if hasattr(layer, name):
                    module = getattr(layer, name)
                    break
            # 2. Strict MoE check. We do NOT test for 'gate' alone because
            #    SwiGLU blocks also carry 'gate_proj'.
            is_moe = False
            if module is not None:
                has_experts_list = hasattr(module, 'experts') and len(module.experts) > 1
                has_num_experts = hasattr(module, 'num_experts') and module.num_experts > 1
                class_name = type(module).__name__.lower()
                name_is_moe = 'moe' in class_name or 'sparse' in class_name
                if has_experts_list or has_num_experts or name_is_moe:
                    is_moe = True
            # Layer 0 is always treated as Dense (see module docstring).
            # BUG FIX: the original warned only `if 0 in moe_indices` AFTER
            # forcing idx 0 to Dense, so the warning was dead code. Warn at
            # detection time instead, then apply the override.
            if idx == 0 and is_moe:
                logger.warning("Warning: Layer 0 identified as MoE. This is rare. Verify model architecture.")
                is_moe = False
            if is_moe:
                moe_indices.append(idx)
            else:
                dense_indices.append(idx)
        return moe_indices, dense_indices

    def compute_importance(self, calibration_data: List[str]) -> Dict[int, float]:
        """
        Calculates layer importance using Cosine Similarity.
        Score = 1.0 - CosSim(Input, Output); higher means the layer changes
        the hidden states more and is therefore more important to keep.
        """
        logger.info(f"Computing importance using {len(calibration_data)} samples...")
        layers = self.get_layers()

        def get_activation_hook(idx):
            def hook(module, input, output):
                # HF decoder layers pass/return tuples; hidden states come first.
                inp = input[0] if isinstance(input, tuple) else input
                out = output[0] if isinstance(output, tuple) else output
                with torch.no_grad():
                    # Flatten to [Batch * Seq, Hidden].
                    inp_flat = inp.view(-1, inp.size(-1)).float()
                    out_flat = out.view(-1, out.size(-1)).float()
                    # BUG FIX: the original forced `.to("cuda")` here, crashing
                    # CPU-only runs. Both tensors already live on the layer's
                    # device, so no transfer is needed.
                    cos = torch.nn.functional.cosine_similarity(inp_flat, out_flat, dim=-1)
                    # Higher similarity = Lower importance.
                    self.layer_data[idx].append(1.0 - cos.mean().item())
            return hook

        for idx, layer in enumerate(layers):
            self.hooks.append(layer.register_forward_hook(get_activation_hook(idx)))
        try:
            self.model.eval()
            with torch.no_grad():
                for text in tqdm(calibration_data, desc="Calibrating"):
                    inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
                    inputs = {k: v.to(self.device) for k, v in inputs.items()}
                    self.model(**inputs)
        finally:
            # BUG FIX: always detach hooks, even if a forward pass raises;
            # otherwise they leak and corrupt any later use of the model.
            for h in self.hooks:
                h.remove()
            self.hooks = []
        final_scores = {idx: sum(scores) / len(scores) for idx, scores in self.layer_data.items()}
        self.layer_data.clear()
        return final_scores
class SmartUpcycler:
    """Builds a shorter 'upcycled' checkpoint by re-stacking selected layers.

    Workflow: load the source model, plan which layers go into the base stack
    (Model 1, mixed) and the extension stack (Model 2, strictly MoE), then
    re-key and save the weights plus config/tokenizer.
    """

    def __init__(self, model_path: str, device: str = 'auto'):
        self.model_path = model_path
        self.device = device
        self.config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def load_model(self):
        """Load the source model in bf16 with accelerate-style device placement."""
        logger.info(f"Loading model from {self.model_path}...")
        return AutoModelForCausalLM.from_pretrained(
            self.model_path,
            torch_dtype=torch.bfloat16,
            device_map=self.device,
            trust_remote_code=True,
            low_cpu_mem_usage=True
        )

    def create_layer_plan(self,
                          importance_scores: Dict[int, float],
                          moe_indices: List[int],
                          dense_indices: List[int],
                          total_original: int,
                          target_total: int,
                          m1_count: int,
                          m2_count: int) -> Tuple[List[int], List[int]]:
        """Select source-layer indices for the two stacks.

        Returns:
            (selected_m1, selected_m2) — each sorted ascending; selected_m2 may
            contain duplicates when there are fewer unique MoE layers than needed.
        Raises:
            ValueError: if the model has no MoE layers at all.
        Note: `dense_indices` and `target_total` are kept for interface
        stability; the plan is fully determined by the other arguments.
        """
        # --- Model 1 (Base) ---
        # Keep first 2 and last 2 for stability, then fill with the highest-
        # importance remaining layers (Dense OR MoE).
        # NOTE(review): assumes total_original >= 4 so this set has 4 members.
        structural_layers = {0, 1, total_original - 2, total_original - 1}
        m1_candidates = [i for i in range(total_original) if i not in structural_layers]
        # Most important first.
        m1_candidates.sort(key=lambda x: importance_scores.get(x, 0), reverse=True)
        needed_m1 = m1_count - len(structural_layers)
        selected_m1 = sorted(list(structural_layers) + m1_candidates[:max(0, needed_m1)])

        # --- Model 2 (Extension): STRICTLY MoE layers only. ---
        if not moe_indices:
            raise ValueError("Model has no MoE layers! Cannot fulfill constraint.")
        m2_candidates = sorted(moe_indices, key=lambda x: importance_scores.get(x, 0), reverse=True)
        selected_m2 = sorted(m2_candidates[:m2_count])
        # Handle shortage by recycling the best MoE layers.
        if len(selected_m2) < m2_count:
            logger.warning(f"Not enough unique MoE layers (Found {len(selected_m2)}, Needed {m2_count}).")
            logger.warning("Recycling top MoE layers to fill the gap.")
            # FIX: bounded modulo cycling replaces the original nested
            # while/for loop — same output order, provably terminating.
            deficit = m2_count - len(selected_m2)
            selected_m2.extend(m2_candidates[i % len(m2_candidates)] for i in range(deficit))
        return selected_m1, selected_m2

    def build_and_save(self,
                       original_state_dict,
                       m1_layers: List[int],
                       m2_layers: List[int],
                       output_path: Path):
        """Re-key selected layers into a contiguous stack and save everything.

        Writes config.json (with updated num_hidden_layers), tokenizer files,
        and model.safetensors into `output_path`.
        """
        logger.info("Constructing new state dictionary...")
        new_state_dict = {}

        def map_layer(src_idx, dst_idx):
            # Copy every tensor of source layer src_idx under destination index dst_idx.
            src_prefix = f"model.layers.{src_idx}."
            dst_prefix = f"model.layers.{dst_idx}."
            for key, tensor in original_state_dict.items():
                if key.startswith(src_prefix):
                    # clone() breaks storage sharing, which safetensors requires.
                    new_state_dict[key.replace(src_prefix, dst_prefix)] = tensor.clone()

        # 1. Copy non-layer weights (embeddings, final norm, lm_head).
        for key, tensor in original_state_dict.items():
            if "layers." not in key:
                new_state_dict[key] = tensor.clone()

        # 2. Stack Model 1, then 3. Model 2, logging the plan as a table.
        current_layer_idx = 0
        print(f"\n{'='*25} STACK PLAN {'='*25}")
        print(f"{'Order':<5} | {'Dest':<5} | {'Source':<6} | {'Type'}")
        print("-" * 50)
        for src in m1_layers:
            map_layer(src, current_layer_idx)
            print(f"{'M1':<5} | {current_layer_idx:<5} <- {src:<6} | {'Base Mixed'}")
            current_layer_idx += 1
        for src in m2_layers:
            map_layer(src, current_layer_idx)
            print(f"{'M2':<5} | {current_layer_idx:<5} <- {src:<6} | {'MoE ONLY'}")
            current_layer_idx += 1

        # 4. Save artifacts.
        output_path.mkdir(parents=True, exist_ok=True)
        self.config.num_hidden_layers = current_layer_idx
        self.config.save_pretrained(output_path)
        self.tokenizer.save_pretrained(output_path)
        logger.info(f"Saving model.safetensors to {output_path}...")
        save_file(new_state_dict, os.path.join(output_path, "model.safetensors"))
        # FIX: archiving this script is best-effort — __file__ may be missing
        # (interactive use) or unreadable; don't fail after the checkpoint is
        # already written.
        try:
            shutil.copy(__file__, output_path)
        except (NameError, OSError) as e:
            logger.warning(f"Could not archive build script: {e}")
def load_calibration_samples(name='wikitext', split='train', n=128):
    """Load up to `n` calibration texts (wikitext-2 paragraphs > 200 chars).

    Falls back to deterministic dummy strings if the dataset cannot be
    loaded (offline, missing package, etc.), so calibration can still run.
    """
    try:
        data = load_dataset(name, 'wikitext-2-raw-v1', split=split, trust_remote_code=True)
        samples = []
        for x in data:
            # Skip headings/blank rows; only keep substantial paragraphs.
            if len(x['text']) > 200:
                samples.append(x['text'])
                if len(samples) >= n:
                    break
        return samples
    except Exception as e:
        # FIX: the original swallowed the failure reason entirely; surface it
        # so users can tell a missing package from a network error.
        logger.warning(f"Could not load wikitext ({e}). Using dummy data.")
        return ["Calibration string." * 50] * n
def main():
    """CLI entry point: analyze the source model, plan the layer stacks,
    and write the upcycled checkpoint."""
    parser = argparse.ArgumentParser(description="Smart MoE Upcycler")
    parser.add_argument('--model_path', type=str, required=True)
    parser.add_argument('--output_path', type=str, required=True)
    parser.add_argument('--target_layers', type=int, default=30)
    parser.add_argument('--model1_ratio', type=float, default=0.55)
    parser.add_argument('--no_calibration', action='store_true')
    args = parser.parse_args()

    # 1. Setup: split the target depth between the two stacks.
    m1_count = int(args.target_layers * args.model1_ratio)
    m2_count = args.target_layers - m1_count
    logger.info(f"Target: {args.target_layers} Layers. Split: M1={m1_count}, M2={m2_count} (Strict MoE)")
    upcycler = SmartUpcycler(args.model_path)
    model = upcycler.load_model()

    # 2. Analyze architecture.
    analyzer = LayerAnalyzer(model, upcycler.tokenizer)
    moe_indices, dense_indices = analyzer.identify_layer_types()
    logger.info(f"Scan Results: {len(moe_indices)} MoE layers, {len(dense_indices)} Dense layers.")
    if dense_indices:
        logger.info(f"Verified Dense Layers: {dense_indices}")

    # CONSISTENCY FIX: use the analyzer's architecture-aware accessor instead
    # of hard-coding model.model.layers (which breaks for bare-decoder models
    # that get_layers() explicitly supports).
    total_orig = len(analyzer.get_layers())

    # 3. Compute importance scores.
    if args.no_calibration:
        logger.info("Skipping calibration. Using uniform importance.")
        scores = {i: 1.0 for i in range(total_orig)}
    else:
        samples = load_calibration_samples()
        scores = analyzer.compute_importance(samples)

    # 4. Plan the layer selection.
    m1_layers, m2_layers = upcycler.create_layer_plan(
        scores,
        moe_indices,
        dense_indices,
        total_orig,
        args.target_layers,
        m1_count,
        m2_count
    )

    # 5. Execute: move weights to CPU, free GPU memory, and save.
    logger.info("Moving model to CPU...")
    model.cpu()
    state_dict = model.state_dict()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    gc.collect()
    upcycler.build_and_save(state_dict, m1_layers, m2_layers, Path(args.output_path))
    logger.info("Done.")
# Script entry point: only run the pipeline when executed directly,
# not when imported as a module.
if __name__ == "__main__":
    main()
|