Spaces:
Running
Running
| """ | |
| Architecture parser - produces LINEAR PIPELINE representation of transformer models. | |
| Shows the sequential flow of data through the model as a flowchart. | |
| """ | |
| import re | |
| from typing import Dict, Any, List, Optional, Tuple | |
| from collections import OrderedDict | |
| import torch | |
| import torch.nn as nn | |
| from transformers import AutoConfig, AutoModel, AutoModelForCausalLM, AutoModelForSeq2SeqLM | |
# Monkeypatch for transformers import issues in some environment/model combinations:
# some transformers builds probe torch.fx support via is_torch_fx_available();
# if the attribute is missing in this version, stub it out so model-class
# imports do not fail. Best-effort: any import/attribute problem is ignored.
try:
    import transformers.utils.import_utils as import_utils
    if not hasattr(import_utils, "is_torch_fx_available"):
        import_utils.is_torch_fx_available = lambda: False
except (ImportError, AttributeError):
    pass
def format_params(count: int) -> str:
    """Render a raw parameter count as a short human-readable string (e.g. "1.23B")."""
    # Largest-first so the first matching threshold wins.
    for threshold, suffix in ((1e12, "T"), (1e9, "B"), (1e6, "M"), (1e3, "K")):
        if count >= threshold:
            return f"{count / threshold:.2f}{suffix}"
    return str(count)
def get_module_type(module: nn.Module, name: str) -> str:
    """Infer a coarse module category from the class name and attribute name.

    Class-name keywords are checked first, then the attribute name; the
    fallback category is the generic 'module'.
    """
    cls = module.__class__.__name__.lower()
    key = name.lower()
    # Model wrappers ("...ForCausalLM", "...Model") are generic containers,
    # not leaf components - short-circuit before any keyword matching.
    if 'model' in cls and ('for' in cls or cls.endswith('model')):
        return 'module'
    # Keyword tables preserve the original first-match precedence.
    class_rules = (
        (('embedding',), 'embedding'),
        (('attention', 'attn'), 'attention'),
        (('mlp', 'feedforward', 'ffn'), 'mlp'),
        (('layernorm', 'rmsnorm'), 'norm'),
        (('linear',), 'linear'),
        (('conv',), 'linear'),  # convolutions displayed as linear projections
        (('dropout',), 'dropout'),
        (('pool',), 'pooler'),
    )
    for keywords, kind in class_rules:
        if any(kw in cls for kw in keywords):
            return kind
    if 'head' in cls or 'lm_head' in key:
        return 'head'
    # MoE/expert - only actual MoE layers, never model wrappers.
    if ('expert' in cls or 'moe' in cls) and 'layer' in cls:
        return 'mlp'
    if 'expert' in cls and 'model' not in cls:
        return 'mlp'
    # Class name was inconclusive: fall back to the attribute name.
    name_rules = (
        (('embed',), 'embedding'),
        (('attn', 'attention'), 'attention'),
        (('mlp', 'fc', 'ffn'), 'mlp'),
        (('norm', 'ln'), 'norm'),
        (('head',), 'head'),
    )
    for keywords, kind in name_rules:
        if any(kw in key for kw in keywords):
            return kind
    if 'expert' in key and 'model' not in key:
        return 'mlp'
    return 'module'
def count_parameters(module: nn.Module) -> int:
    """Total number of parameter elements in *module*, including all children."""
    total = 0
    for param in module.parameters():
        total += param.numel()
    return total
def humanize_name(name: str) -> str:
    """Convert a raw module attribute name to a human-readable label.

    Pure numeric names (ModuleList indices) become "Layer <n>"; otherwise
    snake_case is title-cased and common transformer abbreviations are
    expanded (e.g. "c_attn" -> "QKV Projection", "q_proj" -> "Query").
    """
    # Handle indexed names like "0", "1" etc
    if name.isdigit():
        return f"Layer {name}"
    # Convert snake_case to Title Case
    name = name.replace('_', ' ')
    # Handle common abbreviations
    replacements = {
        'Wte': 'Token Embedding',
        'Wpe': 'Position Embedding',
        'Ln F': 'Final LayerNorm',
        'Ln 1': 'LayerNorm 1',
        'Ln 2': 'LayerNorm 2',
        'Attn': 'Attention',
        'Mlp': 'MLP',
        'Lm Head': 'LM Head',
        'Q Proj': 'Query',
        'K Proj': 'Key',
        'V Proj': 'Value',
        'O Proj': 'Output',
        'Out Proj': 'Output',
        'C Attn': 'QKV Projection',
        'C Proj': 'Output Projection',
        'C Fc': 'Up Projection',
        'Up Proj': 'Up Projection',
        'Down Proj': 'Down Projection',
        'Gate Proj': 'Gate Projection',
    }
    result = name.title()
    # Apply longest keys first so specific patterns like "C Attn" are matched
    # before shorter substrings ("Attn") can rewrite part of them; with the
    # plain dict order, "c_attn" would wrongly become "C Attention".
    for old, new in sorted(replacements.items(), key=lambda kv: len(kv[0]), reverse=True):
        result = result.replace(old, new)
    return result
def is_modality_encoder(name: str, module: nn.Module) -> bool:
    """
    Check if a module is a separate MODALITY encoder (vision tower, audio
    encoder, etc.). Matches only top-level modality-specific encoders,
    not internal components.
    """
    lowered_name = name.lower()
    lowered_class = module.__class__.__name__.lower()

    def mentions(keywords):
        # True when the attribute name or class name contains any keyword.
        return any(kw in lowered_name or kw in lowered_class for kw in keywords)

    # Must reference a modality at all.
    if not mentions(('vision', 'image', 'audio', 'video', 'visual', 'pixel')):
        return False
    # Substantial structure (tower/model/encoder/backbone) qualifies...
    if mentions(('tower', 'model', 'encoder', 'backbone')):
        return True
    # ...as do bare "vision"/"visual"/"image" attribute names.
    return lowered_name in ['vision', 'visual', 'image']
def extract_pipeline_steps(module: nn.Module, name: str, depth: int = 0, max_depth: int = 4, detect_parallel: bool = True) -> List[Dict[str, Any]]:
    """
    Extract pipeline steps from a module.

    Handles both linear and parallel (multimodal) architectures. Returns a
    list of step dicts; parallel branches are emitted as a single step of
    type "parallel" with a "branches" list.

    detect_parallel: Only look for parallel modality encoders at top level
    (depth 0-1); recursion into branches passes False to avoid re-detecting.
    """
    steps = []
    children = list(module.named_children())
    if not children:
        # Leaf module: nothing to decompose.
        return steps
    # Categorize children into pipeline roles first, so the emitted order is
    # embeddings -> transformer layers -> others -> final norms -> heads.
    embeddings = []
    vision_modules = []  # Vision tower, projector
    language_model = None  # Main language model
    layer_container = None  # module (e.g. "layers"/"h") holding digit-named blocks
    layer_list = []  # digit-named blocks found directly among children
    norms = []
    heads = []
    others = []
    for child_name, child_module in children:
        child_params = count_parameters(child_module)
        if child_params == 0:
            # Skip parameter-free children (dropout, activations, rotary caches).
            continue
        child_type = get_module_type(child_module, child_name)
        name_lower = child_name.lower()
        class_lower = child_module.__class__.__name__.lower()
        # Detect multimodal components at appropriate depth only.
        if detect_parallel and depth <= 1:
            # Vision tower or projector
            if is_modality_encoder(child_name, child_module) or 'projector' in name_lower or 'projector' in class_lower:
                vision_modules.append((child_name, child_module))
                continue
            # Main language model (separate from vision)
            if 'language_model' in name_lower or 'text_model' in name_lower:
                language_model = (child_name, child_module)
                continue
        if child_type == 'embedding':
            embeddings.append((child_name, child_module))
        elif child_type == 'norm':
            norms.append((child_name, child_module))
        elif child_type == 'head':
            heads.append((child_name, child_module))
        elif child_name.isdigit():
            # Direct ModuleList child: an individual transformer block.
            layer_list.append((child_name, child_module))
        elif 'layer' in name_lower or 'block' in name_lower or name_lower == 'h':
            # Possible container of repeated blocks (children named "0", "1", ...).
            sub_children = list(child_module.named_children())
            if sub_children and sub_children[0][0].isdigit():
                layer_container = (child_name, child_module)
            else:
                others.append((child_name, child_module))
        else:
            others.append((child_name, child_module))
    # Handle multimodal: vision path + language model as parallel branches.
    if vision_modules and language_model:
        parallel_branches = []
        # Vision branch: vision_tower + projector in sequence.
        vision_steps = []
        for vm_name, vm_module in vision_modules:
            vm_substeps = extract_pipeline_steps(vm_module, vm_name, depth + 1, max_depth, detect_parallel=False)
            if vm_substeps:
                step = {
                    "name": humanize_name(vm_name),
                    "type": "encoder",
                    "params": count_parameters(vm_module),
                    "class": vm_module.__class__.__name__,
                    "substeps": vm_substeps,
                    "_collapsed": True,
                }
            else:
                step = build_step(vm_module, vm_name, depth + 1, max_depth)
            vision_steps.append(step)
        vision_branch = {
            "name": "Vision Path",
            "type": "encoder",
            "params": sum(count_parameters(m) for _, m in vision_modules),
            "substeps": vision_steps,
            "_collapsed": False,
        }
        parallel_branches.append(vision_branch)
        # Language model branch.
        lm_name, lm_module = language_model
        lm_steps = extract_pipeline_steps(lm_module, lm_name, depth + 1, max_depth, detect_parallel=False)
        if not lm_steps:
            lm_steps = [build_step(lm_module, lm_name, depth + 1, max_depth)]
        lang_branch = {
            "name": "Language Model",
            "type": "module",
            "params": count_parameters(lm_module),
            "class": lm_module.__class__.__name__,
            "substeps": lm_steps,
            "_collapsed": False,
        }
        parallel_branches.append(lang_branch)
        steps.append({
            "name": "Multimodal Processing",
            "type": "parallel",
            "params": sum(b.get("params", 0) for b in parallel_branches),
            "branches": parallel_branches,
            "_collapsed": False,
        })
        # Skip normal processing below - the branches already cover everything.
        embeddings = []
        norms = []
        layer_container = None
        layer_list = []
        others = []
    # Handle case where only vision modules exist (no separate language_model).
    elif vision_modules:
        for enc_name, enc_module in vision_modules:
            enc_steps = extract_pipeline_steps(enc_module, enc_name, depth + 1, max_depth, detect_parallel=False)
            if enc_steps:
                steps.append({
                    "name": humanize_name(enc_name),
                    "type": "encoder",
                    "params": count_parameters(enc_module),
                    "class": enc_module.__class__.__name__,
                    "substeps": enc_steps,
                    "_collapsed": True,
                })
            else:
                steps.append(build_step(enc_module, enc_name, depth + 1, max_depth))
    # 1. Regular embeddings (if not already handled in parallel).
    for child_name, child_module in embeddings:
        step = build_step(child_module, child_name, depth + 1, max_depth)
        steps.append(step)
    # 2. Transformer layers: collapse the repeated blocks into one "layers"
    # step described by the FIRST block (assumes homogeneous blocks).
    if layer_container:
        container_name, container_module = layer_container
        layer_children = [(n, m) for n, m in container_module.named_children() if count_parameters(m) > 0]
        if layer_children:
            first_layer = layer_children[0][1]
            total_params = sum(count_parameters(m) for _, m in layer_children)
            layer_substeps = extract_layer_internals(first_layer, depth + 2, max_depth)
            layer_shape = get_layer_shape_info(first_layer)
            layer_step = {
                "name": f"Transformer Layers",
                "type": "layers",
                "params": total_params,
                "class": first_layer.__class__.__name__,
                "count": len(layer_children),
                "substeps": layer_substeps,
                "_collapsed": False,
            }
            if layer_shape:
                layer_step["shape"] = layer_shape
            steps.append(layer_step)
    elif layer_list:
        # Same collapsing, for blocks that were direct children of this module.
        first_layer = layer_list[0][1]
        total_params = sum(count_parameters(m) for _, m in layer_list)
        layer_substeps = extract_layer_internals(first_layer, depth + 2, max_depth)
        layer_shape = get_layer_shape_info(first_layer)
        layer_step = {
            "name": f"Transformer Layers",
            "type": "layers",
            "params": total_params,
            "class": first_layer.__class__.__name__,
            "count": len(layer_list),
            "substeps": layer_substeps,
            "_collapsed": False,
        }
        if layer_shape:
            layer_step["shape"] = layer_shape
        steps.append(layer_step)
    # 3. Other modules: generic containers are flattened via recursion.
    for child_name, child_module in others:
        child_type = get_module_type(child_module, child_name)
        if child_type == 'module':
            sub_steps = extract_pipeline_steps(child_module, child_name, depth + 1, max_depth, detect_parallel=detect_parallel)
            if sub_steps:
                steps.extend(sub_steps)
            else:
                step = build_step(child_module, child_name, depth + 1, max_depth)
                steps.append(step)
        else:
            step = build_step(child_module, child_name, depth + 1, max_depth)
            steps.append(step)
    # 4. Final norms.
    for child_name, child_module in norms:
        step = build_step(child_module, child_name, depth + 1, max_depth)
        steps.append(step)
    # 5. Output heads.
    for child_name, child_module in heads:
        step = build_step(child_module, child_name, depth + 1, max_depth)
        steps.append(step)
    return steps
def extract_layer_internals(layer_module: nn.Module, depth: int, max_depth: int) -> List[Dict[str, Any]]:
    """Extract the internal flow of a single transformer layer.

    Emits a best-effort ordering: first norm -> attention modules ->
    second norm -> MLP modules -> any remaining norms -> everything else.
    The true pre-/post-norm order is architecture-dependent and not inferred.
    """
    # Bucket parameterized children by coarse type.
    norms: List[Tuple[str, nn.Module]] = []
    attentions: List[Tuple[str, nn.Module]] = []
    mlps: List[Tuple[str, nn.Module]] = []
    remainder: List[Tuple[str, nn.Module]] = []
    buckets = {'norm': norms, 'attention': attentions, 'mlp': mlps}
    for child_name, child_module in layer_module.named_children():
        if count_parameters(child_module) == 0:
            continue  # dropout / activations carry no parameters
        kind = get_module_type(child_module, child_name)
        buckets.get(kind, remainder).append((child_name, child_module))
    # Interleave: norm, attention block, norm, MLP block, leftovers.
    ordered: List[Tuple[str, nn.Module]] = []
    if norms:
        ordered.append(norms[0])
    ordered.extend(attentions)
    if len(norms) > 1:
        ordered.append(norms[1])
    ordered.extend(mlps)
    ordered.extend(norms[2:])
    ordered.extend(remainder)
    return [build_step(child, child_name, depth, max_depth) for child_name, child in ordered]
def get_module_shape(module: nn.Module) -> Optional[str]:
    """Return a human-readable shape string for *module*, or None if unknown.

    Checks well-known attribute pairs in priority order: linear, embedding,
    norm, conv, attention, then MLP/config fallbacks.
    """
    cls_lower = module.__class__.__name__.lower()
    # Linear-like: in/out feature counts.
    if hasattr(module, 'in_features') and hasattr(module, 'out_features'):
        return f"{module.in_features} → {module.out_features}"
    # Embedding: vocabulary x dimension.
    if hasattr(module, 'num_embeddings') and hasattr(module, 'embedding_dim'):
        return f"{module.num_embeddings} × {module.embedding_dim}"
    # LayerNorm-style normalized_shape (int or 1-tuple).
    if hasattr(module, 'normalized_shape'):
        shape = module.normalized_shape
        if isinstance(shape, (list, tuple)):
            return f"dim={shape[0]}" if len(shape) == 1 else str(shape)
        return f"dim={shape}"
    # RMSNorm variants usually only expose a weight vector.
    if 'rmsnorm' in cls_lower or 'layernorm' in cls_lower:
        if getattr(module, 'weight', None) is not None:
            return f"dim={module.weight.shape[0]}"
    # Convolutions: channel counts plus kernel when available.
    if hasattr(module, 'in_channels') and hasattr(module, 'out_channels'):
        kernel = getattr(module, 'kernel_size', None)
        if kernel:
            return f"{module.in_channels}→{module.out_channels}, k={kernel}"
        return f"{module.in_channels} → {module.out_channels}"
    # Attention: head count (two common attribute spellings) and head dim.
    for heads_attr in ('num_heads', 'num_attention_heads'):
        if hasattr(module, heads_attr):
            heads = getattr(module, heads_attr)
            head_dim = getattr(module, 'head_dim', None)
            if head_dim:
                return f"heads={heads}, dim={head_dim}"
            return f"heads={heads}"
    # MLP/FFN: infer output width from the first linear child.
    if 'mlp' in cls_lower or 'feedforward' in cls_lower:
        for _, child in module.named_children():
            if hasattr(child, 'out_features'):
                return f"→ {child.out_features}"
    # Last resort: hidden_size from an attached config object.
    cfg = getattr(module, 'config', None)
    if cfg is not None and hasattr(cfg, 'hidden_size'):
        return f"hidden={cfg.hidden_size}"
    return None
def get_layer_shape_info(layer_module: nn.Module) -> Optional[str]:
    """Extract shape info from a transformer layer by looking at its components.

    Scans all submodules once and keeps the first value found for each of:
    hidden size (d), FFN intermediate size (ffn), and attention head count (h).
    Returns a compact string like "d=4096, ffn=11008, h=32", or None when
    nothing could be inferred.
    """
    hidden_size = None
    intermediate_size = None
    num_heads = None
    for name, child in layer_module.named_modules():
        name_lower = name.lower()
        # Find num_heads (two common attribute spellings).
        if not num_heads:
            if hasattr(child, 'num_heads'):
                num_heads = child.num_heads
            elif hasattr(child, 'num_attention_heads'):
                num_heads = child.num_attention_heads
        # Find hidden_size from multiple sources, most reliable first.
        if not hidden_size:
            # From attention head_dim * num_heads
            if hasattr(child, 'num_heads') and hasattr(child, 'head_dim'):
                hidden_size = child.num_heads * child.head_dim
            # From an explicit hidden_size attribute
            elif hasattr(child, 'hidden_size'):
                hidden_size = child.hidden_size
            # From norm layers (normalized_shape is an int or 1-tuple)
            elif hasattr(child, 'normalized_shape'):
                shape = child.normalized_shape
                if isinstance(shape, (list, tuple)):
                    hidden_size = shape[0]
                else:
                    hidden_size = shape
            # From a norm layer's weight vector length
            elif ('norm' in name_lower or 'ln' in name_lower) and hasattr(child, 'weight') and child.weight is not None:
                # Narrowed from a bare except: only shape-access errors are
                # expected here (e.g. scalar/odd weight tensors).
                try:
                    hidden_size = child.weight.shape[0]
                except Exception:
                    pass
            # From q_proj-style linear layers (in_features == hidden_size)
            elif ('q_proj' in name_lower or 'query' in name_lower) and hasattr(child, 'in_features'):
                hidden_size = child.in_features
            # From o_proj-style output projections (out_features == hidden_size)
            elif ('o_proj' in name_lower or 'out_proj' in name_lower) and hasattr(child, 'out_features'):
                hidden_size = child.out_features
        # Find the FFN intermediate size from up/gate projections.
        if not intermediate_size:
            if ('up' in name_lower or 'gate' in name_lower or 'fc1' in name_lower or 'w1' in name_lower or 'w2' in name_lower) and hasattr(child, 'out_features'):
                intermediate_size = child.out_features
    parts = []
    if hidden_size:
        parts.append(f"d={hidden_size}")
    if intermediate_size:
        parts.append(f"ffn={intermediate_size}")
    if num_heads:
        parts.append(f"h={num_heads}")
    return ", ".join(parts) if parts else None
def build_step(module: nn.Module, name: str, depth: int, max_depth: int) -> Dict[str, Any]:
    """Build a single pipeline-step dict describing *module*.

    Includes humanized name, inferred type, parameter count, class name,
    optional shape string, and (until max_depth) collapsed substeps for
    parameterized children.
    """
    step: Dict[str, Any] = {
        "name": humanize_name(name),
        "type": get_module_type(module, name),
        "params": count_parameters(module),
        "class": module.__class__.__name__,
    }
    # Attach shape information when it can be inferred.
    shape = get_module_shape(module)
    if shape:
        step["shape"] = shape
    # Recurse into children (rendered collapsed) unless already too deep.
    if depth < max_depth:
        substeps = [
            build_step(child, child_name, depth + 1, max_depth)
            for child_name, child in module.named_children()
            if count_parameters(child) > 0
        ]
        if substeps:
            step["substeps"] = substeps
            step["_collapsed"] = True
    return step
def build_pipeline(model: nn.Module, model_name: str = "Model") -> Dict[str, Any]:
    """
    Build a linear pipeline structure from a PyTorch model, showing the
    sequential flow of data through it.
    """
    return {
        "name": model_name,
        "params": count_parameters(model),
        "class": model.__class__.__name__,
        "steps": extract_pipeline_steps(model, model_name, depth=0, max_depth=4),
    }
def load_model_for_inspection(model_id: str) -> Tuple[nn.Module, AutoConfig]:
    """Load a model architecture on the meta device without downloading weights.

    Returns (model, config). Returns (None, None) when the repo ships
    Mistral's native ``params.json`` instead of ``config.json`` — the caller
    should fall back to parse_mistral_native_format().

    Raises:
        ValueError: if no AutoModel class can instantiate the config.
    """
    from huggingface_hub import list_repo_files

    # Check if this repo uses Mistral's native format (params.json instead of config.json)
    try:
        repo_files = list_repo_files(repo_id=model_id)
        has_params_json = 'params.json' in repo_files
        has_config_json = 'config.json' in repo_files
    except Exception:
        # Hub listing failed (network/auth/missing repo): narrowed from a bare
        # except so KeyboardInterrupt/SystemExit still propagate. Assume the
        # standard transformers layout.
        has_params_json = False
        has_config_json = True
    if has_params_json and not has_config_json:
        # Load Mistral native format and convert to pipeline directly
        return None, None  # Signal to use parse_mistral_params instead
    config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
    # Use meta device to avoid allocating actual memory for weights
    with torch.device('meta'):
        model = None
        errors = []
        # Try to guess the model class from config
        archs = getattr(config, "architectures", [])
        is_encoder_decoder = getattr(config, "is_encoder_decoder", False)
        # Determine order of AutoModel classes to try
        if is_encoder_decoder or any("Seq2Seq" in a or "ConditionalGeneration" in a for a in archs):
            model_classes = [
                (AutoModelForSeq2SeqLM, "Seq2SeqLM"),
                (AutoModelForCausalLM, "CausalLM"),
                (AutoModel, "AutoModel"),
            ]
        else:
            model_classes = [
                (AutoModelForCausalLM, "CausalLM"),
                (AutoModel, "AutoModel"),
                (AutoModelForSeq2SeqLM, "Seq2SeqLM"),
            ]
        for model_class, label in model_classes:
            try:
                model = model_class.from_config(config, trust_remote_code=True)
                if model is not None:
                    break
            except Exception as e:
                errors.append(f"{label}: {e}")
    if model is None:
        raise ValueError(f"Could not load model architecture. Errors: {errors}")
    return model, config
def parse_mistral_native_format(model_id: str) -> Dict[str, Any]:
    """Parse Mistral's native params.json format.

    Builds the pipeline dict analytically from the hyperparameters (no model
    is instantiated): parameter counts are computed from the published
    dimensions, including MoE and MLA variants and an optional vision
    encoder. Formulas are estimates; biases are assumed absent (RMSNorm +
    bias-free projections, standard for Mistral-family models).
    """
    from huggingface_hub import hf_hub_download
    import json
    params_path = hf_hub_download(repo_id=model_id, filename='params.json')
    with open(params_path) as f:
        params = json.load(f)
    # Extract dimensions
    hidden_size = params.get('dim', 0)
    num_layers = params.get('n_layers', 0)
    num_heads = params.get('n_heads', 0)
    num_kv_heads = params.get('n_kv_heads', num_heads)
    vocab_size = params.get('vocab_size', 0)
    intermediate_size = params.get('hidden_dim', hidden_size * 4)
    head_dim = params.get('head_dim', hidden_size // num_heads if num_heads > 0 else 0)
    # Check for MoE
    moe_config = params.get('moe', {})
    num_experts = moe_config.get('num_experts', 0)
    num_experts_per_tok = moe_config.get('num_experts_per_tok', 2)
    expert_hidden_dim = moe_config.get('expert_hidden_dim', intermediate_size)
    num_shared_experts = moe_config.get('num_shared_experts', 0)
    first_k_dense = moe_config.get('first_k_dense_replace', 0)  # First K layers use dense MLP
    # Check for vision encoder
    vision_config = params.get('vision_encoder', None)
    # Calculate parameters
    embed_params = vocab_size * hidden_size
    # Attention params per layer (with potential LoRA/MLA components)
    q_lora_rank = params.get('q_lora_rank', 0)
    kv_lora_rank = params.get('kv_lora_rank', 0)
    v_head_dim = params.get('v_head_dim', head_dim)  # V uses different head dim
    if q_lora_rank > 0:
        # Multi-head Latent Attention (MLA) - compressed projections
        # Q: down_proj + up_proj
        q_params = hidden_size * q_lora_rank + q_lora_rank * num_heads * head_dim
        # K: down_proj + up_proj (shared with V in latent space)
        k_params = hidden_size * kv_lora_rank + kv_lora_rank * num_kv_heads * head_dim
        # V: uses v_head_dim
        v_params = hidden_size * kv_lora_rank + kv_lora_rank * num_kv_heads * v_head_dim
        # O: output projection from v_head_dim back to hidden
        o_params = num_heads * v_head_dim * hidden_size
        attn_params = q_params + k_params + v_params + o_params
    else:
        # Standard (possibly grouped-query) attention: Q + K + V + O projections.
        q_params = hidden_size * num_heads * head_dim
        kv_params = hidden_size * num_kv_heads * head_dim
        attn_params = q_params + 2 * kv_params + num_heads * head_dim * hidden_size
    norm_params = hidden_size  # RMSNorm weight vector
    # MLP params - handle dense vs MoE layers.
    # 3x: gate + up + down projections (SwiGLU-style MLP).
    dense_mlp_params = 3 * hidden_size * intermediate_size
    if num_experts > 0:
        # MoE: each expert has gate + up + down projections
        single_expert_params = 3 * hidden_size * expert_hidden_dim
        moe_mlp_params = num_experts * single_expert_params
        if num_shared_experts > 0:
            # Shared experts use same size as routed experts
            moe_mlp_params += num_shared_experts * single_expert_params
        moe_mlp_params += hidden_size * num_experts  # Router
        # Calculate layer params for dense and MoE layers separately
        num_dense_layers = min(first_k_dense, num_layers)
        num_moe_layers = num_layers - num_dense_layers
        dense_layer_params = attn_params + dense_mlp_params + 2 * norm_params
        moe_layer_params = attn_params + moe_mlp_params + 2 * norm_params
        total_layer_params = (dense_layer_params * num_dense_layers) + (moe_layer_params * num_moe_layers)
        mlp_params = moe_mlp_params  # For display purposes, show MoE params
    else:
        mlp_params = dense_mlp_params
        layer_params = attn_params + mlp_params + 2 * norm_params
        total_layer_params = layer_params * num_layers
    # Tied embeddings share the input-embedding matrix with the LM head.
    lm_head_params = 0 if params.get('tied_embeddings', True) else vocab_size * hidden_size
    total_params = embed_params + total_layer_params + norm_params + lm_head_params
    # Vision encoder params
    vision_params = 0
    vision_steps = []
    if vision_config:
        v_hidden = vision_config.get('hidden_size', 0)
        v_layers = vision_config.get('num_hidden_layers', 0)
        v_intermediate = vision_config.get('intermediate_size', v_hidden * 4)
        v_heads = vision_config.get('num_attention_heads', 0)
        patch_size = vision_config.get('patch_size', 14)
        # Conv patch embedding: 3 input channels x patch^2 x hidden.
        patch_embed_params = 3 * (patch_size ** 2) * v_hidden
        v_attn = 4 * v_hidden * v_hidden  # Q, K, V, O projections
        v_mlp = 2 * v_hidden * v_intermediate  # up + down projections
        v_layer_params = v_attn + v_mlp + 2 * v_hidden
        vision_params = patch_embed_params + v_layer_params * v_layers
        vision_steps = [
            {
                "name": "Patch Embedding",
                "type": "embedding",
                "params": patch_embed_params,
                "shape": f"{patch_size}×{patch_size} patches → {v_hidden}",
                "class": "Conv2d"
            },
            {
                "name": "Vision Transformer Layers",
                "type": "layers",
                "params": v_layer_params * v_layers,
                "count": v_layers,
                "shape": f"d={v_hidden}, h={v_heads}",
                "class": "ViTBlock",
                "_collapsed": True
            }
        ]
        total_params += vision_params
    # Build pipeline
    steps = []
    # Embedding
    steps.append({
        "name": "Token Embedding",
        "type": "embedding",
        "params": embed_params,
        "shape": f"{vocab_size:,} × {hidden_size}",
        "class": "Embedding"
    })
    # Build layer substeps (norm -> attention -> norm -> MLP/MoE)
    layer_substeps = [
        {
            "name": "Input LayerNorm",
            "type": "norm",
            "params": norm_params,
            "shape": f"dim={hidden_size}",
            "class": "RMSNorm"
        },
        {
            "name": "Self Attention",
            "type": "attention",
            "params": attn_params,
            "shape": f"heads={num_heads}, kv_heads={num_kv_heads}, dim={head_dim}",
            "class": "Attention",
            "_collapsed": True
        },
        {
            "name": "Post-Attention LayerNorm",
            "type": "norm",
            "params": norm_params,
            "shape": f"dim={hidden_size}",
            "class": "RMSNorm"
        }
    ]
    if num_experts > 0:
        layer_substeps.append({
            "name": "MoE",
            "type": "mlp",
            "params": mlp_params,
            "shape": f"{num_experts} experts, top-{num_experts_per_tok}",
            "class": "MixtureOfExperts",
            "_collapsed": True
        })
        layer_shape = f"d={hidden_size}, ffn={expert_hidden_dim}, h={num_heads}, experts={num_experts}"
    else:
        layer_substeps.append({
            "name": "MLP",
            "type": "mlp",
            "params": mlp_params,
            "shape": f"{hidden_size} → {intermediate_size} → {hidden_size}",
            "class": "MLP",
            "_collapsed": True
        })
        layer_shape = f"d={hidden_size}, ffn={intermediate_size}, h={num_heads}"
    moe_label = " (MoE)" if num_experts > 0 else ""
    steps.append({
        "name": f"Transformer Layers{moe_label}",
        "type": "layers",
        "params": total_layer_params,
        "count": num_layers,
        "shape": layer_shape,
        "class": "TransformerBlock",
        "substeps": layer_substeps,
        "_collapsed": False
    })
    # Final norm
    steps.append({
        "name": "Final LayerNorm",
        "type": "norm",
        "params": norm_params,
        "shape": f"dim={hidden_size}",
        "class": "RMSNorm"
    })
    # LM Head (tied heads display the embedding count instead of 0)
    steps.append({
        "name": "LM Head",
        "type": "head",
        "params": lm_head_params if lm_head_params > 0 else embed_params,
        "shape": f"{hidden_size} → {vocab_size:,}" + (" (tied)" if lm_head_params == 0 else ""),
        "class": "Linear"
    })
    # Wrap with vision if present: vision + language become parallel branches
    if vision_config:
        vision_branch = {
            "name": "Vision Encoder",
            "type": "encoder",
            "params": vision_params,
            "substeps": vision_steps,
            "_collapsed": True
        }
        lang_branch = {
            "name": "Language Model",
            "type": "module",
            "params": total_params - vision_params,
            "substeps": steps,
            "_collapsed": False
        }
        steps = [{
            "name": "Multimodal Processing",
            "type": "parallel",
            "params": total_params,
            "branches": [vision_branch, lang_branch],
            "_collapsed": False
        }]
    model_type = "mistral"
    if num_experts > 0:
        model_type = "mistral_moe"
    return {
        "name": model_type.upper(),
        "params": total_params,
        "formatted_params": format_params(total_params),
        "model_type": model_type,
        "class": "MistralModel",
        "steps": steps
    }
def load_model_from_config(config_dict: Dict[str, Any]) -> Tuple[nn.Module, AutoConfig]:
    """Instantiate a model architecture (on the meta device) from a config dict.

    Tries AutoModel classes in a config-dependent order; raises ValueError
    with the collected errors when none succeeds.
    """
    config = AutoConfig.for_model(**config_dict)
    # Meta device: shapes only, no weight memory allocated.
    with torch.device('meta'):
        model = None
        errors = []
        archs = getattr(config, "architectures", [])
        # Seq2seq configs get the Seq2SeqLM class first; decoder-only get CausalLM.
        prefers_seq2seq = getattr(config, "is_encoder_decoder", False) or any(
            "Seq2Seq" in a or "ConditionalGeneration" in a for a in archs
        )
        if prefers_seq2seq:
            candidates = [
                (AutoModelForSeq2SeqLM, "Seq2SeqLM"),
                (AutoModelForCausalLM, "CausalLM"),
                (AutoModel, "AutoModel"),
            ]
        else:
            candidates = [
                (AutoModelForCausalLM, "CausalLM"),
                (AutoModel, "AutoModel"),
                (AutoModelForSeq2SeqLM, "Seq2SeqLM"),
            ]
        for model_class, label in candidates:
            try:
                model = model_class.from_config(config, trust_remote_code=True)
                if model is not None:
                    break
            except Exception as exc:
                errors.append(f"{label}: {exc}")
    if model is None:
        raise ValueError(f"Could not load model from config. Errors: {errors}")
    return model, config
def parse_model(model_id: str) -> Dict[str, Any]:
    """Parse a HuggingFace model id and return its pipeline structure."""
    model, config = load_model_for_inspection(model_id)
    # (None, None) signals a repo with Mistral's native params.json layout.
    if model is None and config is None:
        return parse_mistral_native_format(model_id)
    pipeline = build_pipeline(model, getattr(config, 'model_type', 'Model').upper())
    total = count_parameters(model)
    pipeline["params"] = total
    pipeline["formatted_params"] = format_params(total)
    pipeline["model_type"] = getattr(config, 'model_type', 'unknown')
    return pipeline
def parse_config(config_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Parse a model from a config dict and return its pipeline structure."""
    model, config = load_model_from_config(config_dict)
    pipeline = build_pipeline(model, getattr(config, 'model_type', 'Model').upper())
    total = count_parameters(model)
    pipeline["params"] = total
    pipeline["formatted_params"] = format_params(total)
    pipeline["model_type"] = getattr(config, 'model_type', 'unknown')
    return pipeline