""" Architecture parser - produces LINEAR PIPELINE representation of transformer models. Shows the sequential flow of data through the model as a flowchart. """ import re from typing import Dict, Any, List, Optional, Tuple from collections import OrderedDict import torch import torch.nn as nn from transformers import AutoConfig, AutoModel, AutoModelForCausalLM, AutoModelForSeq2SeqLM # Monkeypatch for transformers import issues in some environment/model combinations try: import transformers.utils.import_utils as import_utils if not hasattr(import_utils, "is_torch_fx_available"): import_utils.is_torch_fx_available = lambda: False except (ImportError, AttributeError): pass def format_params(count: int) -> str: """Format parameter count in human-readable form.""" if count >= 1e12: return f"{count / 1e12:.2f}T" elif count >= 1e9: return f"{count / 1e9:.2f}B" elif count >= 1e6: return f"{count / 1e6:.2f}M" elif count >= 1e3: return f"{count / 1e3:.2f}K" else: return str(count) def get_module_type(module: nn.Module, name: str) -> str: """Infer module type from class name and module name.""" class_name = module.__class__.__name__.lower() name_lower = name.lower() # Check if this is a model wrapper (contains "model" in class name) - should be treated as module is_model_wrapper = 'model' in class_name and ('for' in class_name or class_name.endswith('model')) if is_model_wrapper: return 'module' if 'embedding' in class_name: return 'embedding' elif 'attention' in class_name or 'attn' in class_name: return 'attention' elif 'mlp' in class_name or 'feedforward' in class_name or 'ffn' in class_name: return 'mlp' elif 'layernorm' in class_name or 'rmsnorm' in class_name: return 'norm' elif 'linear' in class_name: return 'linear' elif 'conv' in class_name: return 'linear' elif 'dropout' in class_name: return 'dropout' elif 'pool' in class_name: return 'pooler' elif 'head' in class_name or 'lm_head' in name_lower: return 'head' # Check for MoE/expert - but only for actual MoE layers, not model wrappers elif ('expert' in class_name or 'moe' in class_name) and 'layer' in class_name: return 'mlp' elif 'expert' in class_name and 'model' not in class_name: return 'mlp' # Check name patterns if 'embed' in name_lower: return 'embedding' elif 'attn' in name_lower or 'attention' in name_lower: return 'attention' elif 'mlp' in name_lower or 'fc' in name_lower or 'ffn' in name_lower: return 'mlp' elif 'norm' in name_lower or 'ln' in name_lower: return 'norm' elif 'head' in name_lower: return 'head' elif 'expert' in name_lower and 'model' not in name_lower: return 'mlp' return 'module' def count_parameters(module: nn.Module) -> int: """Count all parameters in a module recursively.""" return sum(p.numel() for p in module.parameters()) def humanize_name(name: str) -> str: """Convert module name to human-readable format.""" # Handle indexed names like "0", "1" etc if name.isdigit(): return f"Layer {name}" # Convert snake_case to Title Case name = name.replace('_', ' ') # Handle common abbreviations replacements = { 'Wte': 'Token Embedding', 'Wpe': 'Position Embedding', 'Ln F': 'Final LayerNorm', 'Ln 1': 'LayerNorm 1', 'Ln 2': 'LayerNorm 2', 'Attn': 'Attention', 'Mlp': 'MLP', 'Lm Head': 'LM Head', 'Q Proj': 'Query', 'K Proj': 'Key', 'V Proj': 'Value', 'O Proj': 'Output', 'Out Proj': 'Output', 'C Attn': 'QKV Projection', 'C Proj': 'Output Projection', 'C Fc': 'Up Projection', 'Up Proj': 'Up Projection', 'Down Proj': 'Down Projection', 'Gate Proj': 'Gate Projection', } result = name.title() for old, new in replacements.items(): result = result.replace(old, new) return result def is_modality_encoder(name: str, module: nn.Module) -> bool: """ Check if a module is a separate MODALITY encoder (vision tower, audio encoder, etc.) This should only match top-level modality-specific encoders, not internal components. """ name_lower = name.lower() class_lower = module.__class__.__name__.lower() # Specific patterns for modality encoders (must have modality keyword) modality_keywords = ['vision', 'image', 'audio', 'video', 'visual', 'pixel'] # Must contain a modality keyword has_modality = any(kw in name_lower or kw in class_lower for kw in modality_keywords) if not has_modality: return False # And should be a substantial module (tower, model, encoder) structure_keywords = ['tower', 'model', 'encoder', 'backbone'] has_structure = any(kw in name_lower or kw in class_lower for kw in structure_keywords) # Or just "vision_tower", "image_encoder" style names return has_structure or name_lower in ['vision', 'visual', 'image'] def extract_pipeline_steps(module: nn.Module, name: str, depth: int = 0, max_depth: int = 4, detect_parallel: bool = True) -> List[Dict[str, Any]]: """ Extract pipeline steps from a module. Handles both linear and parallel (multimodal) architectures. Returns a list of steps where parallel branches are marked. detect_parallel: Only look for parallel modality encoders at top level (depth 0-1) """ steps = [] children = list(module.named_children()) if not children: return steps # Categorize children embeddings = [] vision_modules = [] # Vision tower, projector language_model = None # Main language model layer_container = None layer_list = [] norms = [] heads = [] others = [] for child_name, child_module in children: child_params = count_parameters(child_module) if child_params == 0: continue child_type = get_module_type(child_module, child_name) name_lower = child_name.lower() class_lower = child_module.__class__.__name__.lower() # Detect multimodal components at appropriate depth if detect_parallel and depth <= 1: # Vision tower or projector if is_modality_encoder(child_name, child_module) or 'projector' in name_lower or 'projector' in class_lower: vision_modules.append((child_name, child_module)) continue # Main language model (separate from vision) if 'language_model' in name_lower or 'text_model' in name_lower: language_model = (child_name, child_module) continue if child_type == 'embedding': embeddings.append((child_name, child_module)) elif child_type == 'norm': norms.append((child_name, child_module)) elif child_type == 'head': heads.append((child_name, child_module)) elif child_name.isdigit(): layer_list.append((child_name, child_module)) elif 'layer' in name_lower or 'block' in name_lower or name_lower == 'h': sub_children = list(child_module.named_children()) if sub_children and sub_children[0][0].isdigit(): layer_container = (child_name, child_module) else: others.append((child_name, child_module)) else: others.append((child_name, child_module)) # Handle multimodal: vision path + language model as parallel branches if vision_modules and language_model: parallel_branches = [] # Vision branch: vision_tower + projector in sequence vision_steps = [] for vm_name, vm_module in vision_modules: vm_substeps = extract_pipeline_steps(vm_module, vm_name, depth + 1, max_depth, detect_parallel=False) if vm_substeps: step = { "name": humanize_name(vm_name), "type": "encoder", "params": count_parameters(vm_module), "class": vm_module.__class__.__name__, "substeps": vm_substeps, "_collapsed": True, } else: step = build_step(vm_module, vm_name, depth + 1, max_depth) vision_steps.append(step) vision_branch = { "name": "Vision Path", "type": "encoder", "params": sum(count_parameters(m) for _, m in vision_modules), "substeps": vision_steps, "_collapsed": False, } parallel_branches.append(vision_branch) # Language model branch lm_name, lm_module = language_model lm_steps = extract_pipeline_steps(lm_module, lm_name, depth + 1, max_depth, detect_parallel=False) if not lm_steps: lm_steps = [build_step(lm_module, lm_name, depth + 1, max_depth)] lang_branch = { "name": "Language Model", "type": "module", "params": count_parameters(lm_module), "class": lm_module.__class__.__name__, "substeps": lm_steps, "_collapsed": False, } parallel_branches.append(lang_branch) steps.append({ "name": "Multimodal Processing", "type": "parallel", "params": sum(b.get("params", 0) for b in parallel_branches), "branches": parallel_branches, "_collapsed": False, }) # Skip normal processing - we handled everything embeddings = [] norms = [] layer_container = None layer_list = [] others = [] # Handle case where only vision modules exist (no separate language_model) elif vision_modules: for enc_name, enc_module in vision_modules: enc_steps = extract_pipeline_steps(enc_module, enc_name, depth + 1, max_depth, detect_parallel=False) if enc_steps: steps.append({ "name": humanize_name(enc_name), "type": "encoder", "params": count_parameters(enc_module), "class": enc_module.__class__.__name__, "substeps": enc_steps, "_collapsed": True, }) else: steps.append(build_step(enc_module, enc_name, depth + 1, max_depth)) # 1. Regular embeddings (if not already handled in parallel) for child_name, child_module in embeddings: step = build_step(child_module, child_name, depth + 1, max_depth) steps.append(step) # 2. Transformer layers if layer_container: container_name, container_module = layer_container layer_children = [(n, m) for n, m in container_module.named_children() if count_parameters(m) > 0] if layer_children: first_layer = layer_children[0][1] total_params = sum(count_parameters(m) for _, m in layer_children) layer_substeps = extract_layer_internals(first_layer, depth + 2, max_depth) layer_shape = get_layer_shape_info(first_layer) layer_step = { "name": f"Transformer Layers", "type": "layers", "params": total_params, "class": first_layer.__class__.__name__, "count": len(layer_children), "substeps": layer_substeps, "_collapsed": False, } if layer_shape: layer_step["shape"] = layer_shape steps.append(layer_step) elif layer_list: first_layer = layer_list[0][1] total_params = sum(count_parameters(m) for _, m in layer_list) layer_substeps = extract_layer_internals(first_layer, depth + 2, max_depth) layer_shape = get_layer_shape_info(first_layer) layer_step = { "name": f"Transformer Layers", "type": "layers", "params": total_params, "class": first_layer.__class__.__name__, "count": len(layer_list), "substeps": layer_substeps, "_collapsed": False, } if layer_shape: layer_step["shape"] = layer_shape steps.append(layer_step) # 3. Other modules for child_name, child_module in others: child_type = get_module_type(child_module, child_name) if child_type == 'module': sub_steps = extract_pipeline_steps(child_module, child_name, depth + 1, max_depth, detect_parallel=detect_parallel) if sub_steps: steps.extend(sub_steps) else: step = build_step(child_module, child_name, depth + 1, max_depth) steps.append(step) else: step = build_step(child_module, child_name, depth + 1, max_depth) steps.append(step) # 4. Final norms for child_name, child_module in norms: step = build_step(child_module, child_name, depth + 1, max_depth) steps.append(step) # 5. Output heads for child_name, child_module in heads: step = build_step(child_module, child_name, depth + 1, max_depth) steps.append(step) return steps def extract_layer_internals(layer_module: nn.Module, depth: int, max_depth: int) -> List[Dict[str, Any]]: """Extract the internal flow of a single transformer layer.""" steps = [] children = list(layer_module.named_children()) # Categorize norms = [] attentions = [] mlps = [] others = [] for child_name, child_module in children: child_params = count_parameters(child_module) if child_params == 0: continue child_type = get_module_type(child_module, child_name) if child_type == 'norm': norms.append((child_name, child_module)) elif child_type == 'attention': attentions.append((child_name, child_module)) elif child_type == 'mlp': mlps.append((child_name, child_module)) else: others.append((child_name, child_module)) # Typical transformer layer flow: norm1 -> attn -> norm2 -> mlp # But order depends on architecture (pre-norm vs post-norm) # For now, just order: attention first, then MLP, with norms interspersed norm_idx = 0 # Attention block if norms and norm_idx < len(norms): step = build_step(norms[norm_idx][1], norms[norm_idx][0], depth, max_depth) steps.append(step) norm_idx += 1 for child_name, child_module in attentions: step = build_step(child_module, child_name, depth, max_depth) steps.append(step) # MLP block if norms and norm_idx < len(norms): step = build_step(norms[norm_idx][1], norms[norm_idx][0], depth, max_depth) steps.append(step) norm_idx += 1 for child_name, child_module in mlps: step = build_step(child_module, child_name, depth, max_depth) steps.append(step) # Remaining norms while norm_idx < len(norms): step = build_step(norms[norm_idx][1], norms[norm_idx][0], depth, max_depth) steps.append(step) norm_idx += 1 # Others for child_name, child_module in others: step = build_step(child_module, child_name, depth, max_depth) steps.append(step) return steps def get_module_shape(module: nn.Module) -> Optional[str]: """Extract shape information from a module.""" class_name = module.__class__.__name__ # Linear layers if hasattr(module, 'in_features') and hasattr(module, 'out_features'): return f"{module.in_features} → {module.out_features}" # Embedding layers if hasattr(module, 'num_embeddings') and hasattr(module, 'embedding_dim'): return f"{module.num_embeddings} × {module.embedding_dim}" # LayerNorm / RMSNorm - check multiple possible attribute names if hasattr(module, 'normalized_shape'): shape = module.normalized_shape if isinstance(shape, (list, tuple)): return f"dim={shape[0]}" if len(shape) == 1 else str(shape) return f"dim={shape}" # RMSNorm often uses 'weight' shape if 'rmsnorm' in class_name.lower() or 'layernorm' in class_name.lower(): if hasattr(module, 'weight') and module.weight is not None: return f"dim={module.weight.shape[0]}" # Conv layers if hasattr(module, 'in_channels') and hasattr(module, 'out_channels'): kernel = getattr(module, 'kernel_size', None) if kernel: return f"{module.in_channels}→{module.out_channels}, k={kernel}" return f"{module.in_channels} → {module.out_channels}" # Attention - try to get num_heads and head_dim if hasattr(module, 'num_heads'): head_dim = getattr(module, 'head_dim', None) if head_dim: return f"heads={module.num_heads}, dim={head_dim}" return f"heads={module.num_heads}" if hasattr(module, 'num_attention_heads'): head_dim = getattr(module, 'head_dim', None) if head_dim: return f"heads={module.num_attention_heads}, dim={head_dim}" return f"heads={module.num_attention_heads}" # MLP/FFN - try to infer from children if 'mlp' in class_name.lower() or 'feedforward' in class_name.lower(): # Look for up/gate projection to get intermediate size for child_name, child in module.named_children(): if hasattr(child, 'out_features'): return f"→ {child.out_features}" # Try to get hidden_size from config stored on module if hasattr(module, 'config'): cfg = module.config if hasattr(cfg, 'hidden_size'): return f"hidden={cfg.hidden_size}" return None def get_layer_shape_info(layer_module: nn.Module) -> Optional[str]: """Extract shape info from a transformer layer by looking at its components.""" hidden_size = None intermediate_size = None num_heads = None for name, child in layer_module.named_modules(): name_lower = name.lower() # Find num_heads if not num_heads: if hasattr(child, 'num_heads'): num_heads = child.num_heads elif hasattr(child, 'num_attention_heads'): num_heads = child.num_attention_heads # Find hidden_size from multiple sources if not hidden_size: # From attention head_dim * num_heads if hasattr(child, 'num_heads') and hasattr(child, 'head_dim'): hidden_size = child.num_heads * child.head_dim # From hidden_size attribute elif hasattr(child, 'hidden_size'): hidden_size = child.hidden_size # From norm layers elif hasattr(child, 'normalized_shape'): shape = child.normalized_shape if isinstance(shape, (list, tuple)): hidden_size = shape[0] else: hidden_size = shape # From norm weight shape elif ('norm' in name_lower or 'ln' in name_lower) and hasattr(child, 'weight') and child.weight is not None: try: hidden_size = child.weight.shape[0] except: pass # From q_proj or similar linear layers (in_features = hidden_size) elif ('q_proj' in name_lower or 'query' in name_lower) and hasattr(child, 'in_features'): hidden_size = child.in_features # From o_proj output (out_features = hidden_size) elif ('o_proj' in name_lower or 'out_proj' in name_lower) and hasattr(child, 'out_features'): hidden_size = child.out_features # Find intermediate size from MLP if not intermediate_size: if ('up' in name_lower or 'gate' in name_lower or 'fc1' in name_lower or 'w1' in name_lower or 'w2' in name_lower) and hasattr(child, 'out_features'): intermediate_size = child.out_features parts = [] if hidden_size: parts.append(f"d={hidden_size}") if intermediate_size: parts.append(f"ffn={intermediate_size}") if num_heads: parts.append(f"h={num_heads}") return ", ".join(parts) if parts else None def build_step(module: nn.Module, name: str, depth: int, max_depth: int) -> Dict[str, Any]: """Build a single pipeline step from a module.""" params = count_parameters(module) module_type = get_module_type(module, name) display_name = humanize_name(name) step = { "name": display_name, "type": module_type, "params": params, "class": module.__class__.__name__, } # Add shape information shape = get_module_shape(module) if shape: step["shape"] = shape # Add substeps for complex modules (if not too deep) if depth < max_depth: children = list(module.named_children()) if children: substeps = [] for child_name, child_module in children: child_params = count_parameters(child_module) if child_params > 0: child_step = build_step(child_module, child_name, depth + 1, max_depth) substeps.append(child_step) if substeps: step["substeps"] = substeps step["_collapsed"] = True return step def build_pipeline(model: nn.Module, model_name: str = "Model") -> Dict[str, Any]: """ Build a linear pipeline structure from a PyTorch model. This shows the sequential flow of data through the model. """ total_params = count_parameters(model) # Extract pipeline steps steps = extract_pipeline_steps(model, model_name, depth=0, max_depth=4) return { "name": model_name, "params": total_params, "class": model.__class__.__name__, "steps": steps } def load_model_for_inspection(model_id: str) -> Tuple[nn.Module, AutoConfig]: """Load a model architecture without downloading weights.""" from huggingface_hub import hf_hub_download, list_repo_files import json # Check if this repo uses Mistral's native format (params.json instead of config.json) try: repo_files = list_repo_files(repo_id=model_id) has_params_json = 'params.json' in repo_files has_config_json = 'config.json' in repo_files except: has_params_json = False has_config_json = True if has_params_json and not has_config_json: # Load Mistral native format and convert to pipeline directly return None, None # Signal to use parse_mistral_params instead config = AutoConfig.from_pretrained(model_id, trust_remote_code=True) # Use meta device to avoid allocating actual memory for weights with torch.device('meta'): model = None errors = [] # Try to guess the model class from config archs = getattr(config, "architectures", []) is_encoder_decoder = getattr(config, "is_encoder_decoder", False) # Determine order of AutoModel classes to try if is_encoder_decoder or any("Seq2Seq" in a or "ConditionalGeneration" in a for a in archs): model_classes = [ (AutoModelForSeq2SeqLM, "Seq2SeqLM"), (AutoModelForCausalLM, "CausalLM"), (AutoModel, "AutoModel") ] else: model_classes = [ (AutoModelForCausalLM, "CausalLM"), (AutoModel, "AutoModel"), (AutoModelForSeq2SeqLM, "Seq2SeqLM") ] for model_class, label in model_classes: try: model = model_class.from_config(config, trust_remote_code=True) if model is not None: break except Exception as e: errors.append(f"{label}: {e}") if model is None: raise ValueError(f"Could not load model architecture. Errors: {errors}") return model, config def parse_mistral_native_format(model_id: str) -> Dict[str, Any]: """Parse Mistral's native params.json format.""" from huggingface_hub import hf_hub_download import json params_path = hf_hub_download(repo_id=model_id, filename='params.json') with open(params_path) as f: params = json.load(f) # Extract dimensions hidden_size = params.get('dim', 0) num_layers = params.get('n_layers', 0) num_heads = params.get('n_heads', 0) num_kv_heads = params.get('n_kv_heads', num_heads) vocab_size = params.get('vocab_size', 0) intermediate_size = params.get('hidden_dim', hidden_size * 4) head_dim = params.get('head_dim', hidden_size // num_heads if num_heads > 0 else 0) # Check for MoE moe_config = params.get('moe', {}) num_experts = moe_config.get('num_experts', 0) num_experts_per_tok = moe_config.get('num_experts_per_tok', 2) expert_hidden_dim = moe_config.get('expert_hidden_dim', intermediate_size) num_shared_experts = moe_config.get('num_shared_experts', 0) first_k_dense = moe_config.get('first_k_dense_replace', 0) # First K layers use dense MLP # Check for vision encoder vision_config = params.get('vision_encoder', None) # Calculate parameters embed_params = vocab_size * hidden_size # Attention params per layer (with potential LoRA/MLA components) q_lora_rank = params.get('q_lora_rank', 0) kv_lora_rank = params.get('kv_lora_rank', 0) v_head_dim = params.get('v_head_dim', head_dim) # V uses different head dim if q_lora_rank > 0: # Multi-head Latent Attention (MLA) - compressed projections # Q: down_proj + up_proj q_params = hidden_size * q_lora_rank + q_lora_rank * num_heads * head_dim # K: down_proj + up_proj (shared with V in latent space) k_params = hidden_size * kv_lora_rank + kv_lora_rank * num_kv_heads * head_dim # V: uses v_head_dim v_params = hidden_size * kv_lora_rank + kv_lora_rank * num_kv_heads * v_head_dim # O: output projection from v_head_dim back to hidden o_params = num_heads * v_head_dim * hidden_size attn_params = q_params + k_params + v_params + o_params else: q_params = hidden_size * num_heads * head_dim kv_params = hidden_size * num_kv_heads * head_dim attn_params = q_params + 2 * kv_params + num_heads * head_dim * hidden_size norm_params = hidden_size # MLP params - handle dense vs MoE layers dense_mlp_params = 3 * hidden_size * intermediate_size if num_experts > 0: # MoE: each expert has gate + up + down projections single_expert_params = 3 * hidden_size * expert_hidden_dim moe_mlp_params = num_experts * single_expert_params if num_shared_experts > 0: # Shared experts use same size as routed experts moe_mlp_params += num_shared_experts * single_expert_params moe_mlp_params += hidden_size * num_experts # Router # Calculate layer params for dense and MoE layers separately num_dense_layers = min(first_k_dense, num_layers) num_moe_layers = num_layers - num_dense_layers dense_layer_params = attn_params + dense_mlp_params + 2 * norm_params moe_layer_params = attn_params + moe_mlp_params + 2 * norm_params total_layer_params = (dense_layer_params * num_dense_layers) + (moe_layer_params * num_moe_layers) mlp_params = moe_mlp_params # For display purposes, show MoE params else: mlp_params = dense_mlp_params layer_params = attn_params + mlp_params + 2 * norm_params total_layer_params = layer_params * num_layers lm_head_params = 0 if params.get('tied_embeddings', True) else vocab_size * hidden_size total_params = embed_params + total_layer_params + norm_params + lm_head_params # Vision encoder params vision_params = 0 vision_steps = [] if vision_config: v_hidden = vision_config.get('hidden_size', 0) v_layers = vision_config.get('num_hidden_layers', 0) v_intermediate = vision_config.get('intermediate_size', v_hidden * 4) v_heads = vision_config.get('num_attention_heads', 0) patch_size = vision_config.get('patch_size', 14) patch_embed_params = 3 * (patch_size ** 2) * v_hidden v_attn = 4 * v_hidden * v_hidden v_mlp = 2 * v_hidden * v_intermediate v_layer_params = v_attn + v_mlp + 2 * v_hidden vision_params = patch_embed_params + v_layer_params * v_layers vision_steps = [ { "name": "Patch Embedding", "type": "embedding", "params": patch_embed_params, "shape": f"{patch_size}×{patch_size} patches → {v_hidden}", "class": "Conv2d" }, { "name": "Vision Transformer Layers", "type": "layers", "params": v_layer_params * v_layers, "count": v_layers, "shape": f"d={v_hidden}, h={v_heads}", "class": "ViTBlock", "_collapsed": True } ] total_params += vision_params # Build pipeline steps = [] # Embedding steps.append({ "name": "Token Embedding", "type": "embedding", "params": embed_params, "shape": f"{vocab_size:,} × {hidden_size}", "class": "Embedding" }) # Build layer substeps layer_substeps = [ { "name": "Input LayerNorm", "type": "norm", "params": norm_params, "shape": f"dim={hidden_size}", "class": "RMSNorm" }, { "name": "Self Attention", "type": "attention", "params": attn_params, "shape": f"heads={num_heads}, kv_heads={num_kv_heads}, dim={head_dim}", "class": "Attention", "_collapsed": True }, { "name": "Post-Attention LayerNorm", "type": "norm", "params": norm_params, "shape": f"dim={hidden_size}", "class": "RMSNorm" } ] if num_experts > 0: layer_substeps.append({ "name": "MoE", "type": "mlp", "params": mlp_params, "shape": f"{num_experts} experts, top-{num_experts_per_tok}", "class": "MixtureOfExperts", "_collapsed": True }) layer_shape = f"d={hidden_size}, ffn={expert_hidden_dim}, h={num_heads}, experts={num_experts}" else: layer_substeps.append({ "name": "MLP", "type": "mlp", "params": mlp_params, "shape": f"{hidden_size} → {intermediate_size} → {hidden_size}", "class": "MLP", "_collapsed": True }) layer_shape = f"d={hidden_size}, ffn={intermediate_size}, h={num_heads}" moe_label = " (MoE)" if num_experts > 0 else "" steps.append({ "name": f"Transformer Layers{moe_label}", "type": "layers", "params": total_layer_params, "count": num_layers, "shape": layer_shape, "class": "TransformerBlock", "substeps": layer_substeps, "_collapsed": False }) # Final norm steps.append({ "name": "Final LayerNorm", "type": "norm", "params": norm_params, "shape": f"dim={hidden_size}", "class": "RMSNorm" }) # LM Head steps.append({ "name": "LM Head", "type": "head", "params": lm_head_params if lm_head_params > 0 else embed_params, "shape": f"{hidden_size} → {vocab_size:,}" + (" (tied)" if lm_head_params == 0 else ""), "class": "Linear" }) # Wrap with vision if present if vision_config: vision_branch = { "name": "Vision Encoder", "type": "encoder", "params": vision_params, "substeps": vision_steps, "_collapsed": True } lang_branch = { "name": "Language Model", "type": "module", "params": total_params - vision_params, "substeps": steps, "_collapsed": False } steps = [{ "name": "Multimodal Processing", "type": "parallel", "params": total_params, "branches": [vision_branch, lang_branch], "_collapsed": False }] model_type = "mistral" if num_experts > 0: model_type = "mistral_moe" return { "name": model_type.upper(), "params": total_params, "formatted_params": format_params(total_params), "model_type": model_type, "class": "MistralModel", "steps": steps } def load_model_from_config(config_dict: Dict[str, Any]) -> Tuple[nn.Module, AutoConfig]: """Load a model architecture from a config dictionary.""" config = AutoConfig.for_model(**config_dict) with torch.device('meta'): model = None errors = [] # Try to guess the model class from config archs = getattr(config, "architectures", []) is_encoder_decoder = getattr(config, "is_encoder_decoder", False) # Determine order of AutoModel classes to try if is_encoder_decoder or any("Seq2Seq" in a or "ConditionalGeneration" in a for a in archs): model_classes = [ (AutoModelForSeq2SeqLM, "Seq2SeqLM"), (AutoModelForCausalLM, "CausalLM"), (AutoModel, "AutoModel") ] else: model_classes = [ (AutoModelForCausalLM, "CausalLM"), (AutoModel, "AutoModel"), (AutoModelForSeq2SeqLM, "Seq2SeqLM") ] for model_class, label in model_classes: try: model = model_class.from_config(config, trust_remote_code=True) if model is not None: break except Exception as e: errors.append(f"{label}: {e}") if model is None: raise ValueError(f"Could not load model from config. Errors: {errors}") return model, config def parse_model(model_id: str) -> Dict[str, Any]: """Parse a model from HuggingFace and return pipeline structure.""" model, config = load_model_for_inspection(model_id) # If model is None, it means we need to use Mistral native format if model is None and config is None: return parse_mistral_native_format(model_id) model_name = getattr(config, 'model_type', 'Model').upper() pipeline = build_pipeline(model, model_name) total_params = count_parameters(model) pipeline["params"] = total_params pipeline["formatted_params"] = format_params(total_params) pipeline["model_type"] = getattr(config, 'model_type', 'unknown') return pipeline def parse_config(config_dict: Dict[str, Any]) -> Dict[str, Any]: """Parse a model from config dict and return pipeline structure.""" model, config = load_model_from_config(config_dict) model_name = getattr(config, 'model_type', 'Model').upper() pipeline = build_pipeline(model, model_name) total_params = count_parameters(model) pipeline["params"] = total_params pipeline["formatted_params"] = format_params(total_params) pipeline["model_type"] = getattr(config, 'model_type', 'unknown') return pipeline