# llm-scope / backend / architecture_parser.py
# Omar
# Upgrade transformers and add fallbacks
# 98e130a
"""
Architecture parser - produces LINEAR PIPELINE representation of transformer models.
Shows the sequential flow of data through the model as a flowchart.
"""
import re
from typing import Dict, Any, List, Optional, Tuple
from collections import OrderedDict
import torch
import torch.nn as nn
from transformers import AutoConfig, AutoModel, AutoModelForCausalLM, AutoModelForSeq2SeqLM
# Monkeypatch for transformers import issues in some environment/model combinations.
# If this transformers build does not expose `is_torch_fx_available`, install a stub
# that always reports False.
# NOTE(review): presumably needed by code that still imports that helper from
# transformers.utils.import_utils (e.g. remote model code) - confirm which models rely on it.
try:
    import transformers.utils.import_utils as import_utils
    if not hasattr(import_utils, "is_torch_fx_available"):
        import_utils.is_torch_fx_available = lambda: False
except (ImportError, AttributeError):
    # Best effort only: leave transformers untouched if the module cannot be patched.
    pass
def format_params(count: int) -> str:
    """Render a parameter count compactly, e.g. ``1500000`` -> ``"1.50M"``.

    Counts of at least one thousand are scaled to K / M / B / T with two
    decimal places; anything smaller is returned as its plain ``str()`` form.
    """
    # Largest unit first so the first threshold that fits wins.
    scales = ((1e12, "T"), (1e9, "B"), (1e6, "M"), (1e3, "K"))
    for threshold, suffix in scales:
        if count >= threshold:
            return f"{count / threshold:.2f}{suffix}"
    return str(count)
def get_module_type(module: nn.Module, name: str) -> str:
    """Classify a module into a coarse pipeline category.

    The lowercase class name is consulted first, then the lowercase attribute
    name the module is registered under. Returns one of ``'embedding'``,
    ``'attention'``, ``'mlp'``, ``'norm'``, ``'linear'``, ``'dropout'``,
    ``'pooler'``, ``'head'`` or the fallback ``'module'``.
    """
    cls = module.__class__.__name__.lower()
    attr = name.lower()

    def found(text, needles):
        return any(needle in text for needle in needles)

    # Model wrappers (class name contains "model" and either "for" or ends
    # with "model") are always plain modules, whatever else their name contains.
    if 'model' in cls and ('for' in cls or cls.endswith('model')):
        return 'module'

    # Class-name rules, evaluated in order; the first hit wins.
    class_rules = (
        (('embedding',), 'embedding'),
        (('attention', 'attn'), 'attention'),
        (('mlp', 'feedforward', 'ffn'), 'mlp'),
        (('layernorm', 'rmsnorm'), 'norm'),
        (('linear', 'conv'), 'linear'),  # conv layers are reported as 'linear'
        (('dropout',), 'dropout'),
        (('pool',), 'pooler'),
    )
    for needles, kind in class_rules:
        if found(cls, needles):
            return kind

    if 'head' in cls or 'lm_head' in attr:
        return 'head'
    # MoE / expert classes count as MLPs, but only for layer-like classes or
    # expert classes that are not themselves models.
    if found(cls, ('expert', 'moe')) and 'layer' in cls:
        return 'mlp'
    if 'expert' in cls and 'model' not in cls:
        return 'mlp'

    # Attribute-name rules, used only when the class name was inconclusive.
    name_rules = (
        (('embed',), 'embedding'),
        (('attn', 'attention'), 'attention'),
        (('mlp', 'fc', 'ffn'), 'mlp'),
        (('norm', 'ln'), 'norm'),
        (('head',), 'head'),
    )
    for needles, kind in name_rules:
        if found(attr, needles):
            return kind

    if 'expert' in attr and 'model' not in attr:
        return 'mlp'
    return 'module'
def count_parameters(module: nn.Module) -> int:
    """Return the total element count over everything ``module.parameters()`` yields."""
    total = 0
    for param in module.parameters():
        total += param.numel()
    return total
def humanize_name(name: str) -> str:
    """Convert a module attribute name to a human-readable label.

    Purely numeric names become ``"Layer <n>"``. Other names are converted
    from snake_case to Title Case, after which well-known abbreviations
    (``Wte``, ``Attn``, ``C Fc``, ...) are expanded.

    The abbreviations are expanded in a single regex pass with longer keys
    tried first. Chained ``str.replace`` calls would re-expand their own
    output (``c_fc`` -> ``Up Projection`` -> ``Up Projectionection``) and let
    a short key shadow a longer one (``c_attn`` -> ``C Attention``).
    """
    # Handle indexed names like "0", "1" etc
    if name.isdigit():
        return f"Layer {name}"

    # Common abbreviations, keyed by their Title Case spelling.
    replacements = {
        'Wte': 'Token Embedding',
        'Wpe': 'Position Embedding',
        'Ln F': 'Final LayerNorm',
        'Ln 1': 'LayerNorm 1',
        'Ln 2': 'LayerNorm 2',
        'Attn': 'Attention',
        'Mlp': 'MLP',
        'Lm Head': 'LM Head',
        'Q Proj': 'Query',
        'K Proj': 'Key',
        'V Proj': 'Value',
        'O Proj': 'Output',
        'Out Proj': 'Output',
        'C Attn': 'QKV Projection',
        'C Proj': 'Output Projection',
        'C Fc': 'Up Projection',
        'Up Proj': 'Up Projection',
        'Down Proj': 'Down Projection',
        'Gate Proj': 'Gate Projection',
    }

    # Convert snake_case to Title Case
    titled = name.replace('_', ' ').title()

    # Longest keys first so "C Attn" is preferred over "Attn" at the same spot;
    # a single pass guarantees replacement text is never scanned again.
    ordered_keys = sorted(replacements, key=len, reverse=True)
    pattern = re.compile('|'.join(re.escape(key) for key in ordered_keys))
    return pattern.sub(lambda match: replacements[match.group(0)], titled)
def is_modality_encoder(name: str, module: nn.Module) -> bool:
    """Decide whether a child looks like a separate modality encoder.

    Both the attribute name and the class name (lowercased) are searched.
    A modality keyword (vision, image, audio, ...) is mandatory; on top of
    that either a structural keyword (tower, model, encoder, backbone) must
    be present, or the attribute name must be exactly ``vision``, ``visual``
    or ``image``.
    """
    attr = name.lower()
    cls = module.__class__.__name__.lower()

    def mentions(keywords):
        return any(kw in attr or kw in cls for kw in keywords)

    # No modality keyword at all: not a modality encoder.
    if not mentions(('vision', 'image', 'audio', 'video', 'visual', 'pixel')):
        return False

    # A substantial module (tower, model, encoder, backbone) qualifies directly.
    if mentions(('tower', 'model', 'encoder', 'backbone')):
        return True

    # Otherwise only bare modality names qualify.
    return attr in ('vision', 'visual', 'image')
def extract_pipeline_steps(module: nn.Module, name: str, depth: int = 0, max_depth: int = 4, detect_parallel: bool = True) -> List[Dict[str, Any]]:
    """Turn the direct children of ``module`` into an ordered list of pipeline steps.

    Children without parameters are ignored. The remaining children are
    sorted into buckets and emitted in this order: modality encoders (or a
    single "parallel" step when both vision modules and a language model are
    present), embeddings, the repeated transformer layers (summarised by the
    first layer), other modules, norms, and finally output heads.

    Args:
        module: Module whose children are inspected.
        name: Name of ``module``; accepted for interface symmetry, not read here.
        depth: Current nesting depth (0 for the root call).
        max_depth: Passed through to ``build_step`` / ``extract_layer_internals``.
        detect_parallel: When true, modality encoders / projectors and the
            language model are looked for, but only while ``depth <= 1``.

    Returns:
        A list of step dictionaries; empty when ``module`` has no children.
    """
    steps: List[Dict[str, Any]] = []
    children = list(module.named_children())
    if not children:
        return steps

    def encoder_step(enc_name, enc_module):
        # An encoder is expanded into its own sub-pipeline when possible,
        # otherwise it is rendered as a single ordinary step.
        inner = extract_pipeline_steps(enc_module, enc_name, depth + 1, max_depth, detect_parallel=False)
        if not inner:
            return build_step(enc_module, enc_name, depth + 1, max_depth)
        return {
            "name": humanize_name(enc_name),
            "type": "encoder",
            "params": count_parameters(enc_module),
            "class": enc_module.__class__.__name__,
            "substeps": inner,
            "_collapsed": True,
        }

    def layers_step(layers):
        # The whole stack is summarised by its first layer's internals.
        first_layer = layers[0][1]
        summary = {
            "name": "Transformer Layers",
            "type": "layers",
            "params": sum(count_parameters(m) for _, m in layers),
            "class": first_layer.__class__.__name__,
            "count": len(layers),
            "substeps": extract_layer_internals(first_layer, depth + 2, max_depth),
            "_collapsed": False,
        }
        shape = get_layer_shape_info(first_layer)
        if shape:
            summary["shape"] = shape
        return summary

    # --- Categorize children -------------------------------------------------
    embeddings = []
    vision_modules = []      # vision tower, projector
    language_model = None    # main language model
    layer_container = None
    layer_list = []
    norms = []
    heads = []
    others = []

    look_for_modalities = detect_parallel and depth <= 1
    for child_name, child_module in children:
        if count_parameters(child_module) == 0:
            continue

        lowered_name = child_name.lower()
        lowered_class = child_module.__class__.__name__.lower()

        if look_for_modalities:
            is_projector = 'projector' in lowered_name or 'projector' in lowered_class
            if is_modality_encoder(child_name, child_module) or is_projector:
                vision_modules.append((child_name, child_module))
                continue
            if 'language_model' in lowered_name or 'text_model' in lowered_name:
                language_model = (child_name, child_module)
                continue

        entry = (child_name, child_module)
        kind = get_module_type(child_module, child_name)
        if kind == 'embedding':
            embeddings.append(entry)
        elif kind == 'norm':
            norms.append(entry)
        elif kind == 'head':
            heads.append(entry)
        elif child_name.isdigit():
            layer_list.append(entry)
        elif 'layer' in lowered_name or 'block' in lowered_name or lowered_name == 'h':
            # Only a container whose first child is numerically named is
            # treated as the layer stack.
            grandchildren = list(child_module.named_children())
            if grandchildren and grandchildren[0][0].isdigit():
                layer_container = entry
            else:
                others.append(entry)
        else:
            others.append(entry)

    # --- Multimodal handling -------------------------------------------------
    if vision_modules and language_model:
        vision_branch = {
            "name": "Vision Path",
            "type": "encoder",
            "params": sum(count_parameters(m) for _, m in vision_modules),
            "substeps": [encoder_step(n, m) for n, m in vision_modules],
            "_collapsed": False,
        }

        lm_name, lm_module = language_model
        lm_steps = extract_pipeline_steps(lm_module, lm_name, depth + 1, max_depth, detect_parallel=False)
        if not lm_steps:
            lm_steps = [build_step(lm_module, lm_name, depth + 1, max_depth)]
        lang_branch = {
            "name": "Language Model",
            "type": "module",
            "params": count_parameters(lm_module),
            "class": lm_module.__class__.__name__,
            "substeps": lm_steps,
            "_collapsed": False,
        }

        branches = [vision_branch, lang_branch]
        steps.append({
            "name": "Multimodal Processing",
            "type": "parallel",
            "params": sum(branch.get("params", 0) for branch in branches),
            "branches": branches,
            "_collapsed": False,
        })

        # Everything except the output heads is considered handled.
        embeddings, norms, layer_list, others = [], [], [], []
        layer_container = None
    elif vision_modules:
        # Vision modules without a separate language model: emit them inline.
        steps.extend(encoder_step(n, m) for n, m in vision_modules)

    # 1. Regular embeddings
    steps.extend(build_step(m, n, depth + 1, max_depth) for n, m in embeddings)

    # 2. Transformer layers (a container takes precedence over loose numbered children)
    if layer_container:
        populated = [(n, m) for n, m in layer_container[1].named_children() if count_parameters(m) > 0]
        if populated:
            steps.append(layers_step(populated))
    elif layer_list:
        steps.append(layers_step(layer_list))

    # 3. Other modules: generic modules are flattened into this level when
    #    they yield steps of their own.
    for child_name, child_module in others:
        flattened = None
        if get_module_type(child_module, child_name) == 'module':
            flattened = extract_pipeline_steps(child_module, child_name, depth + 1, max_depth, detect_parallel=detect_parallel)
        if flattened:
            steps.extend(flattened)
        else:
            steps.append(build_step(child_module, child_name, depth + 1, max_depth))

    # 4. Final norms, then 5. output heads
    for child_name, child_module in norms + heads:
        steps.append(build_step(child_module, child_name, depth + 1, max_depth))

    return steps
def extract_layer_internals(layer_module: nn.Module, depth: int, max_depth: int) -> List[Dict[str, Any]]:
    """List the steps inside one transformer layer in display order.

    Parameter-free children are skipped. The emitted order is: first norm,
    all attention modules, second norm, all MLP modules, any remaining
    norms, then everything else. This is a fixed presentation order, not a
    reconstruction of the layer's actual forward pass.
    """
    buckets: Dict[str, list] = {'norm': [], 'attention': [], 'mlp': []}
    leftovers = []
    for child_name, child_module in layer_module.named_children():
        if count_parameters(child_module) == 0:
            continue
        kind = get_module_type(child_module, child_name)
        buckets.get(kind, leftovers).append((child_name, child_module))

    norms = buckets['norm']
    # Slicing tolerates layers with fewer than two norms.
    ordered = (
        norms[:1]
        + buckets['attention']
        + norms[1:2]
        + buckets['mlp']
        + norms[2:]
        + leftovers
    )
    return [build_step(sub_module, sub_name, depth, max_depth) for sub_name, sub_module in ordered]
def get_module_shape(module: nn.Module) -> Optional[str]:
    """Describe a module's dimensions as a short display string.

    Attributes are probed in a fixed order and the first match wins:
    linear (``in_features``/``out_features``), embedding, norm, conv,
    attention head counts, MLP-like classes (first child exposing
    ``out_features``), and finally ``module.config.hidden_size``.

    Input and output sizes are separated with ``" → "`` so that, for example,
    a 768 -> 3072 projection reads ``"768 → 3072"`` rather than the ambiguous
    ``"7683072"``.

    Returns:
        The shape string, or ``None`` when nothing recognisable is found.
    """
    class_lower = module.__class__.__name__.lower()

    # Linear layers
    if hasattr(module, 'in_features') and hasattr(module, 'out_features'):
        return f"{module.in_features} → {module.out_features}"

    # Embedding layers
    if hasattr(module, 'num_embeddings') and hasattr(module, 'embedding_dim'):
        return f"{module.num_embeddings} × {module.embedding_dim}"

    # LayerNorm / RMSNorm - check multiple possible attribute names
    if hasattr(module, 'normalized_shape'):
        shape = module.normalized_shape
        if isinstance(shape, (list, tuple)):
            return f"dim={shape[0]}" if len(shape) == 1 else str(shape)
        return f"dim={shape}"

    # Norm classes without `normalized_shape`: fall back to the weight's length
    if 'rmsnorm' in class_lower or 'layernorm' in class_lower:
        if hasattr(module, 'weight') and module.weight is not None:
            return f"dim={module.weight.shape[0]}"

    # Conv layers
    if hasattr(module, 'in_channels') and hasattr(module, 'out_channels'):
        kernel = getattr(module, 'kernel_size', None)
        if kernel:
            return f"{module.in_channels} → {module.out_channels}, k={kernel}"
        return f"{module.in_channels} → {module.out_channels}"

    # Attention - try to get num_heads and head_dim
    for heads_attr in ('num_heads', 'num_attention_heads'):
        if hasattr(module, heads_attr):
            heads = getattr(module, heads_attr)
            head_dim = getattr(module, 'head_dim', None)
            if head_dim:
                return f"heads={heads}, dim={head_dim}"
            return f"heads={heads}"

    # MLP/FFN - report the output width of the first child that has one
    if 'mlp' in class_lower or 'feedforward' in class_lower:
        for _, child in module.named_children():
            if hasattr(child, 'out_features'):
                return f"→ {child.out_features}"

    # Try to get hidden_size from config stored on module
    if hasattr(module, 'config'):
        cfg = module.config
        if hasattr(cfg, 'hidden_size'):
            return f"hidden={cfg.hidden_size}"

    return None
def get_layer_shape_info(layer_module: nn.Module) -> Optional[str]:
    """Summarise a transformer layer as ``"d=<hidden>, ffn=<inter>, h=<heads>"``.

    Walks ``layer_module.named_modules()`` and keeps the first value found
    for each of hidden size, intermediate (FFN) size and head count. Parts
    that cannot be determined are omitted.

    Returns:
        The comma-separated summary, or ``None`` when nothing was found.
    """
    hidden_size = None
    intermediate_size = None
    num_heads = None

    # Name fragments of projections whose *output* width is the FFN size.
    # 'w2' is deliberately absent: in the w1/w2/w3 naming of LLaMA/Mistral-style
    # MLPs it is the down projection, whose out_features is the hidden size.
    ffn_name_hints = ('up', 'gate', 'fc1', 'w1', 'w3')

    for name, child in layer_module.named_modules():
        name_lower = name.lower()

        # Find num_heads
        if not num_heads:
            if hasattr(child, 'num_heads'):
                num_heads = child.num_heads
            elif hasattr(child, 'num_attention_heads'):
                num_heads = child.num_attention_heads

        # Find hidden_size from multiple sources; the first matching source wins
        if not hidden_size:
            if hasattr(child, 'num_heads') and hasattr(child, 'head_dim'):
                # From attention head_dim * num_heads
                hidden_size = child.num_heads * child.head_dim
            elif hasattr(child, 'hidden_size'):
                hidden_size = child.hidden_size
            elif hasattr(child, 'normalized_shape'):
                # From norm layers
                shape = child.normalized_shape
                hidden_size = shape[0] if isinstance(shape, (list, tuple)) else shape
            elif ('norm' in name_lower or 'ln' in name_lower) and hasattr(child, 'weight') and child.weight is not None:
                # From norm weight shape; ignore weights that have no usable
                # first dimension instead of swallowing every exception.
                try:
                    hidden_size = child.weight.shape[0]
                except (AttributeError, IndexError, TypeError):
                    pass
            elif ('q_proj' in name_lower or 'query' in name_lower) and hasattr(child, 'in_features'):
                # q_proj input width = hidden size
                hidden_size = child.in_features
            elif ('o_proj' in name_lower or 'out_proj' in name_lower) and hasattr(child, 'out_features'):
                # o_proj output width = hidden size
                hidden_size = child.out_features

        # Find intermediate size from MLP up/gate-style projections
        if not intermediate_size:
            if any(hint in name_lower for hint in ffn_name_hints) and hasattr(child, 'out_features'):
                intermediate_size = child.out_features

    parts = []
    if hidden_size:
        parts.append(f"d={hidden_size}")
    if intermediate_size:
        parts.append(f"ffn={intermediate_size}")
    if num_heads:
        parts.append(f"h={num_heads}")
    return ", ".join(parts) if parts else None
def build_step(module: nn.Module, name: str, depth: int, max_depth: int) -> Dict[str, Any]:
    """Describe one module as a pipeline step dictionary.

    The step always carries ``name``, ``type``, ``params`` and ``class``;
    ``shape`` is added when ``get_module_shape`` returns something. While
    ``depth`` is below ``max_depth``, children that own parameters are
    described recursively under ``substeps`` and the step is marked collapsed.
    """
    step: Dict[str, Any] = {
        "name": humanize_name(name),
        "type": get_module_type(module, name),
        "params": count_parameters(module),
        "class": module.__class__.__name__,
    }

    shape = get_module_shape(module)
    if shape:
        step["shape"] = shape

    # Stop descending once the depth budget is used up.
    if depth >= max_depth:
        return step

    nested = [
        build_step(sub_module, sub_name, depth + 1, max_depth)
        for sub_name, sub_module in module.named_children()
        if count_parameters(sub_module) > 0
    ]
    if nested:
        step["substeps"] = nested
        step["_collapsed"] = True
    return step
def build_pipeline(model: nn.Module, model_name: str = "Model") -> Dict[str, Any]:
    """Wrap a model's extracted pipeline steps in a top-level description.

    Returns a dictionary with the given ``model_name``, the model's total
    parameter count, its class name and the steps produced by
    ``extract_pipeline_steps`` starting at depth 0 with a depth limit of 4.
    """
    pipeline: Dict[str, Any] = {
        "name": model_name,
        "params": count_parameters(model),
        "class": model.__class__.__name__,
    }
    pipeline["steps"] = extract_pipeline_steps(model, model_name, depth=0, max_depth=4)
    return pipeline
def load_model_for_inspection(model_id: str) -> Tuple[Optional[nn.Module], Optional[AutoConfig]]:
    """Instantiate a Hub model's architecture on the meta device (no weights).

    Args:
        model_id: Hugging Face Hub repository id.

    Returns:
        ``(model, config)`` on success, or ``(None, None)`` when the repo
        ships ``params.json`` but no ``config.json``; callers should then
        fall back to ``parse_mistral_native_format``.

    Raises:
        ValueError: If none of the candidate auto-classes can build the model.
    """
    from huggingface_hub import list_repo_files

    # Check if this repo uses Mistral's native format (params.json instead of config.json)
    try:
        repo_files = list_repo_files(repo_id=model_id)
    except Exception:
        # Best effort: if the listing fails, assume a regular config.json repo
        # and let AutoConfig report the real problem.
        has_params_json = False
        has_config_json = True
    else:
        has_params_json = 'params.json' in repo_files
        has_config_json = 'config.json' in repo_files

    if has_params_json and not has_config_json:
        # Signal to use parse_mistral_native_format instead
        return None, None

    # NOTE(review): trust_remote_code=True executes code shipped in the repo;
    # only use with model ids you trust.
    config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)

    # Use meta device to avoid allocating actual memory for weights
    with torch.device('meta'):
        model = None
        errors = []

        # `architectures` may be present but None, so normalise to a list
        # before iterating over it.
        archs = getattr(config, "architectures", None) or []
        is_encoder_decoder = getattr(config, "is_encoder_decoder", False)

        # Determine order of AutoModel classes to try
        if is_encoder_decoder or any("Seq2Seq" in a or "ConditionalGeneration" in a for a in archs):
            model_classes = [
                (AutoModelForSeq2SeqLM, "Seq2SeqLM"),
                (AutoModelForCausalLM, "CausalLM"),
                (AutoModel, "AutoModel"),
            ]
        else:
            model_classes = [
                (AutoModelForCausalLM, "CausalLM"),
                (AutoModel, "AutoModel"),
                (AutoModelForSeq2SeqLM, "Seq2SeqLM"),
            ]

        # First class that builds successfully wins; failures are collected
        # so the final error can explain every attempt.
        for model_class, label in model_classes:
            try:
                model = model_class.from_config(config, trust_remote_code=True)
                if model is not None:
                    break
            except Exception as e:
                errors.append(f"{label}: {e}")

        if model is None:
            raise ValueError(f"Could not load model architecture. Errors: {errors}")

    return model, config
def parse_mistral_native_format(model_id: str) -> Dict[str, Any]:
    """Build a pipeline description from a repo's native ``params.json``.

    Downloads ``params.json`` with ``hf_hub_download`` and estimates
    parameter counts from the dimensions it contains, since no model is
    instantiated on this path. Handles dense and MoE feed-forward blocks,
    low-rank (``q_lora_rank``) attention and an optional ``vision_encoder``
    section.

    Args:
        model_id: Hugging Face Hub repository id containing ``params.json``.

    Returns:
        A pipeline dictionary with ``name``, ``params``, ``formatted_params``,
        ``model_type``, ``class`` and ``steps``.
    """
    from huggingface_hub import hf_hub_download
    import json

    params_path = hf_hub_download(repo_id=model_id, filename='params.json')
    # JSON is read as UTF-8 explicitly rather than with the platform default.
    with open(params_path, encoding="utf-8") as f:
        params = json.load(f)

    # Extract dimensions
    hidden_size = params.get('dim', 0)
    num_layers = params.get('n_layers', 0)
    num_heads = params.get('n_heads', 0)
    num_kv_heads = params.get('n_kv_heads', num_heads)
    vocab_size = params.get('vocab_size', 0)
    intermediate_size = params.get('hidden_dim', hidden_size * 4)
    head_dim = params.get('head_dim', hidden_size // num_heads if num_heads > 0 else 0)

    # Check for MoE. `or {}` also covers an explicit `"moe": null` entry.
    moe_config = params.get('moe') or {}
    num_experts = moe_config.get('num_experts', 0)
    num_experts_per_tok = moe_config.get('num_experts_per_tok', 2)
    expert_hidden_dim = moe_config.get('expert_hidden_dim', intermediate_size)
    num_shared_experts = moe_config.get('num_shared_experts', 0)
    first_k_dense = moe_config.get('first_k_dense_replace', 0)  # First K layers use dense MLP

    # Check for vision encoder
    vision_config = params.get('vision_encoder', None)

    # Calculate parameters
    embed_params = vocab_size * hidden_size

    # Attention params per layer (with potential LoRA/MLA components)
    q_lora_rank = params.get('q_lora_rank', 0)
    kv_lora_rank = params.get('kv_lora_rank', 0)
    v_head_dim = params.get('v_head_dim', head_dim)  # V may use a different head dim

    if q_lora_rank > 0:
        # Multi-head Latent Attention (MLA) - compressed projections
        # Q: down_proj + up_proj
        q_params = hidden_size * q_lora_rank + q_lora_rank * num_heads * head_dim
        # K: down_proj + up_proj
        k_params = hidden_size * kv_lora_rank + kv_lora_rank * num_kv_heads * head_dim
        # V: uses v_head_dim
        v_params = hidden_size * kv_lora_rank + kv_lora_rank * num_kv_heads * v_head_dim
        # O: output projection from v_head_dim back to hidden
        o_params = num_heads * v_head_dim * hidden_size
        attn_params = q_params + k_params + v_params + o_params
    else:
        q_params = hidden_size * num_heads * head_dim
        kv_params = hidden_size * num_kv_heads * head_dim
        attn_params = q_params + 2 * kv_params + num_heads * head_dim * hidden_size

    norm_params = hidden_size

    # MLP params - handle dense vs MoE layers
    dense_mlp_params = 3 * hidden_size * intermediate_size

    if num_experts > 0:
        # MoE: each expert has gate + up + down projections
        single_expert_params = 3 * hidden_size * expert_hidden_dim
        moe_mlp_params = num_experts * single_expert_params
        if num_shared_experts > 0:
            # Shared experts use same size as routed experts
            moe_mlp_params += num_shared_experts * single_expert_params
        moe_mlp_params += hidden_size * num_experts  # Router

        # Calculate layer params for dense and MoE layers separately
        num_dense_layers = min(first_k_dense, num_layers)
        num_moe_layers = num_layers - num_dense_layers

        dense_layer_params = attn_params + dense_mlp_params + 2 * norm_params
        moe_layer_params = attn_params + moe_mlp_params + 2 * norm_params

        total_layer_params = (dense_layer_params * num_dense_layers) + (moe_layer_params * num_moe_layers)
        mlp_params = moe_mlp_params  # For display purposes, show MoE params
    else:
        mlp_params = dense_mlp_params
        layer_params = attn_params + mlp_params + 2 * norm_params
        total_layer_params = layer_params * num_layers

    # Embeddings are treated as tied unless params.json says otherwise.
    lm_head_params = 0 if params.get('tied_embeddings', True) else vocab_size * hidden_size

    total_params = embed_params + total_layer_params + norm_params + lm_head_params

    # Vision encoder params
    vision_params = 0
    vision_steps = []
    if vision_config:
        v_hidden = vision_config.get('hidden_size', 0)
        v_layers = vision_config.get('num_hidden_layers', 0)
        v_intermediate = vision_config.get('intermediate_size', v_hidden * 4)
        v_heads = vision_config.get('num_attention_heads', 0)
        patch_size = vision_config.get('patch_size', 14)

        patch_embed_params = 3 * (patch_size ** 2) * v_hidden
        v_attn = 4 * v_hidden * v_hidden
        v_mlp = 2 * v_hidden * v_intermediate
        v_layer_params = v_attn + v_mlp + 2 * v_hidden
        vision_params = patch_embed_params + v_layer_params * v_layers

        vision_steps = [
            {
                "name": "Patch Embedding",
                "type": "embedding",
                "params": patch_embed_params,
                "shape": f"{patch_size}×{patch_size} patches → {v_hidden}",
                "class": "Conv2d"
            },
            {
                "name": "Vision Transformer Layers",
                "type": "layers",
                "params": v_layer_params * v_layers,
                "count": v_layers,
                "shape": f"d={v_hidden}, h={v_heads}",
                "class": "ViTBlock",
                "_collapsed": True
            }
        ]
        total_params += vision_params

    # Build pipeline
    steps = []

    # Embedding
    steps.append({
        "name": "Token Embedding",
        "type": "embedding",
        "params": embed_params,
        "shape": f"{vocab_size:,} × {hidden_size}",
        "class": "Embedding"
    })

    # Build layer substeps
    layer_substeps = [
        {
            "name": "Input LayerNorm",
            "type": "norm",
            "params": norm_params,
            "shape": f"dim={hidden_size}",
            "class": "RMSNorm"
        },
        {
            "name": "Self Attention",
            "type": "attention",
            "params": attn_params,
            "shape": f"heads={num_heads}, kv_heads={num_kv_heads}, dim={head_dim}",
            "class": "Attention",
            "_collapsed": True
        },
        {
            "name": "Post-Attention LayerNorm",
            "type": "norm",
            "params": norm_params,
            "shape": f"dim={hidden_size}",
            "class": "RMSNorm"
        }
    ]

    if num_experts > 0:
        layer_substeps.append({
            "name": "MoE",
            "type": "mlp",
            "params": mlp_params,
            "shape": f"{num_experts} experts, top-{num_experts_per_tok}",
            "class": "MixtureOfExperts",
            "_collapsed": True
        })
        layer_shape = f"d={hidden_size}, ffn={expert_hidden_dim}, h={num_heads}, experts={num_experts}"
    else:
        layer_substeps.append({
            "name": "MLP",
            "type": "mlp",
            "params": mlp_params,
            # Separate the three widths; concatenated digits are unreadable.
            "shape": f"{hidden_size} → {intermediate_size} → {hidden_size}",
            "class": "MLP",
            "_collapsed": True
        })
        layer_shape = f"d={hidden_size}, ffn={intermediate_size}, h={num_heads}"

    moe_label = " (MoE)" if num_experts > 0 else ""
    steps.append({
        "name": f"Transformer Layers{moe_label}",
        "type": "layers",
        "params": total_layer_params,
        "count": num_layers,
        "shape": layer_shape,
        "class": "TransformerBlock",
        "substeps": layer_substeps,
        "_collapsed": False
    })

    # Final norm
    steps.append({
        "name": "Final LayerNorm",
        "type": "norm",
        "params": norm_params,
        "shape": f"dim={hidden_size}",
        "class": "RMSNorm"
    })

    # LM Head (a tied head displays the embedding's size; it is not added to the total)
    steps.append({
        "name": "LM Head",
        "type": "head",
        "params": lm_head_params if lm_head_params > 0 else embed_params,
        "shape": f"{hidden_size} → {vocab_size:,}" + (" (tied)" if lm_head_params == 0 else ""),
        "class": "Linear"
    })

    # Wrap with vision if present
    if vision_config:
        vision_branch = {
            "name": "Vision Encoder",
            "type": "encoder",
            "params": vision_params,
            "substeps": vision_steps,
            "_collapsed": True
        }
        lang_branch = {
            "name": "Language Model",
            "type": "module",
            "params": total_params - vision_params,
            "substeps": steps,
            "_collapsed": False
        }
        steps = [{
            "name": "Multimodal Processing",
            "type": "parallel",
            "params": total_params,
            "branches": [vision_branch, lang_branch],
            "_collapsed": False
        }]

    model_type = "mistral"
    if num_experts > 0:
        model_type = "mistral_moe"

    return {
        "name": model_type.upper(),
        "params": total_params,
        "formatted_params": format_params(total_params),
        "model_type": model_type,
        "class": "MistralModel",
        "steps": steps
    }
def load_model_from_config(config_dict: Dict[str, Any]) -> Tuple[nn.Module, AutoConfig]:
    """Instantiate a model architecture on the meta device from a config dictionary.

    Args:
        config_dict: Keyword arguments forwarded to ``AutoConfig.for_model``.

    Returns:
        ``(model, config)`` for the first auto-class that can build the model.

    Raises:
        ValueError: If none of the candidate auto-classes can build the model.
    """
    config = AutoConfig.for_model(**config_dict)

    # Meta device: build the module structure without allocating weights.
    with torch.device('meta'):
        model = None
        errors = []

        # `architectures` may be present but None (typical for a hand-written
        # config dict), so normalise to a list before iterating over it.
        archs = getattr(config, "architectures", None) or []
        is_encoder_decoder = getattr(config, "is_encoder_decoder", False)

        # Determine order of AutoModel classes to try
        if is_encoder_decoder or any("Seq2Seq" in a or "ConditionalGeneration" in a for a in archs):
            model_classes = [
                (AutoModelForSeq2SeqLM, "Seq2SeqLM"),
                (AutoModelForCausalLM, "CausalLM"),
                (AutoModel, "AutoModel"),
            ]
        else:
            model_classes = [
                (AutoModelForCausalLM, "CausalLM"),
                (AutoModel, "AutoModel"),
                (AutoModelForSeq2SeqLM, "Seq2SeqLM"),
            ]

        # First class that builds successfully wins; failures are collected
        # so the final error can explain every attempt.
        for model_class, label in model_classes:
            try:
                model = model_class.from_config(config, trust_remote_code=True)
                if model is not None:
                    break
            except Exception as e:
                errors.append(f"{label}: {e}")

        if model is None:
            raise ValueError(f"Could not load model from config. Errors: {errors}")

    return model, config
def parse_model(model_id: str) -> Dict[str, Any]:
    """Return the pipeline description for a Hugging Face model id.

    When ``load_model_for_inspection`` yields ``(None, None)`` the repo is
    parsed through ``parse_mistral_native_format`` instead. Otherwise the
    instantiated model is converted with ``build_pipeline`` and annotated
    with its parameter totals and model type.
    """
    model, config = load_model_for_inspection(model_id)

    # (None, None) is the signal to use the native params.json path.
    if model is None and config is None:
        return parse_mistral_native_format(model_id)

    display_name = getattr(config, 'model_type', 'Model').upper()
    pipeline = build_pipeline(model, display_name)

    param_total = count_parameters(model)
    pipeline.update({
        "params": param_total,
        "formatted_params": format_params(param_total),
        "model_type": getattr(config, 'model_type', 'unknown'),
    })
    return pipeline
def parse_config(config_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Return the pipeline description for a model given as a config dictionary.

    The model is instantiated through ``load_model_from_config``, converted
    with ``build_pipeline`` and annotated with its parameter totals and
    model type.
    """
    model, config = load_model_from_config(config_dict)

    display_name = getattr(config, 'model_type', 'Model').upper()
    pipeline = build_pipeline(model, display_name)

    param_total = count_parameters(model)
    pipeline.update({
        "params": param_total,
        "formatted_params": format_params(param_total),
        "model_type": getattr(config, 'model_type', 'unknown'),
    })
    return pipeline