| """ | |
| Model Adapter Layer | |
| Abstracts architecture differences to provide unified interface for visualizations | |
| """ | |
| from abc import ABC, abstractmethod | |
| from typing import Dict, Any, Optional | |
| import torch | |
| import numpy as np | |
| import logging | |
| from .model_config import get_model_config, ModelConfig | |
| logger = logging.getLogger(__name__) | |
class ModelAdapter(ABC):
    """
    Abstract base class for model-specific adaptations
    Provides a unified interface for extracting internal states across different architectures
    """

    def __init__(self, model: Any, tokenizer: Any, config: ModelConfig):
        self.model = model
        self.tokenizer = tokenizer
        self.config = config
        self.model_id = None

    @abstractmethod
    def get_num_layers(self) -> int:
        """Get total number of transformer layers"""

    @abstractmethod
    def get_num_heads(self) -> int:
        """Get number of attention heads (Q heads for GQA)"""

    @abstractmethod
    def get_num_kv_heads(self) -> Optional[int]:
        """Get number of KV heads (None for MHA, < num_heads for GQA)"""

    # Properties for convenience access
    @property
    def num_layers(self) -> int:
        """Convenience property for get_num_layers()"""
        return self.get_num_layers()

    @property
    def num_heads(self) -> int:
        """Convenience property for get_num_heads()"""
        return self.get_num_heads()

    def model_dimension(self) -> int:
        """Get model hidden dimension from the HuggingFace model config"""
        # Try common attribute names for the hidden dimension
        if hasattr(self.model.config, 'hidden_size'):
            return self.model.config.hidden_size
        elif hasattr(self.model.config, 'n_embd'):
            return self.model.config.n_embd
        elif hasattr(self.model.config, 'd_model'):
            return self.model.config.d_model
        # Fallback to a common default
        return 768
    @abstractmethod
    def get_layer_module(self, layer_idx: int):
        """Get the transformer layer module at the given index"""

    @abstractmethod
    def get_attention_module(self, layer_idx: int):
        """Get the attention sub-module for a layer"""

    @abstractmethod
    def get_ffn_module(self, layer_idx: int):
        """Get the feed-forward network sub-module for a layer"""

    @abstractmethod
    def get_qkv_projections(self, layer_idx: int):
        """
        Get Q, K, V projection modules for a layer

        Returns:
            Tuple of (q_proj, k_proj, v_proj) modules
        """
    def extract_attention(self, outputs: Any, layer_idx: int, tokens: Optional[list] = None) -> Dict[str, Any]:
        """
        Extract attention weights in a normalized format

        Args:
            outputs: Model outputs with attentions
            layer_idx: Layer index to extract from
            tokens: Optional list of token strings

        Returns:
            Dict with 'weights', 'tokens', 'num_heads' keys
        """
        if not hasattr(outputs, 'attentions') or not outputs.attentions:
            raise ValueError("Model outputs do not contain attention weights")

        layer_attention = outputs.attentions[layer_idx]
        # Shape: (batch_size, num_heads, seq_len, seq_len)
        # Average across all heads for visualization
        # HuggingFace already expands GQA to the full head count
        avg_attention = layer_attention[0].mean(dim=0).detach().cpu().numpy()

        # Subsample if the matrix is too large; sort the sampled indices so
        # the matrix rows/columns stay aligned with the token labels
        if avg_attention.shape[0] > 100:
            indices = np.sort(np.random.choice(avg_attention.shape[0], 100, replace=False))
            avg_attention = avg_attention[indices][:, indices]
            if tokens:
                tokens = [tokens[i] for i in indices]

        return {
            "weights": avg_attention,
            "tokens": tokens,
            "num_heads": layer_attention.shape[1]
        }
    def normalize_config(self) -> Dict[str, Any]:
        """
        Return standardized model configuration
        """
        return {
            "model_id": self.model_id,
            "display_name": self.config["display_name"],
            "architecture": self.config["architecture"],
            "num_layers": self.get_num_layers(),
            "num_heads": self.get_num_heads(),
            "num_kv_heads": self.get_num_kv_heads(),
            "vocab_size": self.model.config.vocab_size,
            "context_length": self.config["context_length"],
            "attention_type": self.config["attention_type"]
        }
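

# --- Illustrative example (not part of the original API) -------------------
# A minimal sketch of how the unified interface above can drive a generic
# forward hook, independent of the underlying architecture. The helper name
# `capture_ffn_output` is hypothetical, added only for illustration.
def capture_ffn_output(adapter: ModelAdapter, layer_idx: int, input_ids: torch.Tensor) -> torch.Tensor:
    """Run one forward pass and capture the FFN output at `layer_idx`."""
    captured = {}

    def hook(module, inputs, output):
        # Some sub-modules return tuples; keep only the hidden states
        captured["ffn"] = output[0] if isinstance(output, tuple) else output

    handle = adapter.get_ffn_module(layer_idx).register_forward_hook(hook)
    try:
        with torch.no_grad():
            adapter.model(input_ids)
    finally:
        handle.remove()
    return captured["ffn"]
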
class CodeGenAdapter(ModelAdapter):
    """
    Adapter for the Salesforce CodeGen / GPT-NeoX architecture
    Standard multi-head attention
    """

    def get_num_layers(self) -> int:
        return self.model.config.n_layer

    def get_num_heads(self) -> int:
        return self.model.config.n_head

    def get_num_kv_heads(self) -> Optional[int]:
        return None  # Standard MHA - all heads have separate K, V

    def get_layer_module(self, layer_idx: int):
        """
        CodeGen structure: model.transformer.h[layer_idx]
        """
        return self.model.transformer.h[layer_idx]

    def get_attention_module(self, layer_idx: int):
        """
        CodeGen attention: model.transformer.h[layer_idx].attn
        """
        return self.model.transformer.h[layer_idx].attn

    def get_ffn_module(self, layer_idx: int):
        """
        CodeGen FFN: model.transformer.h[layer_idx].mlp
        """
        return self.model.transformer.h[layer_idx].mlp

    def get_qkv_projections(self, layer_idx: int):
        """
        CodeGen Q, K, V projections
        CodeGen uses a combined QKV projection that needs to be split
        """
        attn = self.get_attention_module(layer_idx)
        # CodeGen typically has qkv_proj; check which structure exists
        if hasattr(attn, 'qkv_proj'):
            # Combined projection - will need to be split in the extractor
            return (attn.qkv_proj, attn.qkv_proj, attn.qkv_proj)
        else:
            # Separate projections (fallback)
            return (getattr(attn, 'q_proj', None),
                    getattr(attn, 'k_proj', None),
                    getattr(attn, 'v_proj', None))
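

# --- Illustrative example (not part of the original API) -------------------
# Sketch of splitting a combined QKV projection, as mentioned in
# CodeGenAdapter.get_qkv_projections. This assumes a simple concatenated
# [Q; K; V] weight layout; actual CodeGen checkpoints interleave the fused
# projection in blocks (see `mp_num` in the HF CodeGen implementation), so
# a real extractor must follow that layout instead.
def split_combined_qkv_weight(qkv_proj: torch.nn.Linear):
    """Split a fused (3 * d_model, d_model) weight into Q, K, V chunks."""
    q_w, k_w, v_w = qkv_proj.weight.chunk(3, dim=0)
    return q_w, k_w, v_w
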
class CodeLlamaAdapter(ModelAdapter):
    """
    Adapter for the Meta Code-Llama / LLaMA architecture
    Uses Grouped Query Attention (GQA)
    """

    def get_num_layers(self) -> int:
        return self.model.config.num_hidden_layers

    def get_num_heads(self) -> int:
        return self.model.config.num_attention_heads

    def get_num_kv_heads(self) -> Optional[int]:
        """
        LLaMA uses GQA - fewer KV heads than Q heads
        """
        return getattr(self.model.config, 'num_key_value_heads', None)

    def get_layer_module(self, layer_idx: int):
        """
        LLaMA structure: model.model.layers[layer_idx]
        Note: extra .model nesting from the CausalLM wrapper
        """
        return self.model.model.layers[layer_idx]

    def get_attention_module(self, layer_idx: int):
        """
        LLaMA attention: model.model.layers[layer_idx].self_attn
        """
        return self.model.model.layers[layer_idx].self_attn

    def get_ffn_module(self, layer_idx: int):
        """
        LLaMA FFN: model.model.layers[layer_idx].mlp
        """
        return self.model.model.layers[layer_idx].mlp

    def get_qkv_projections(self, layer_idx: int):
        """
        LLaMA Q, K, V projections
        LLaMA has separate q_proj, k_proj, v_proj modules
        Note: K and V use GQA (fewer heads than Q)
        """
        attn = self.get_attention_module(layer_idx)
        return (attn.q_proj, attn.k_proj, attn.v_proj)
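

# --- Illustrative example (not part of the original API) -------------------
# Sketch of the grouped-query relationship noted above: each KV head is
# shared by num_heads // num_kv_heads query heads. HuggingFace models do
# this expansion internally (often called `repeat_kv`); it is shown here
# only to make the head accounting concrete.
def expand_kv_heads(kv: torch.Tensor, num_heads: int, num_kv_heads: int) -> torch.Tensor:
    """Expand (batch, num_kv_heads, seq, head_dim) to (batch, num_heads, seq, head_dim)."""
    group_size = num_heads // num_kv_heads
    return kv.repeat_interleave(group_size, dim=1)
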
def create_adapter(model: Any, tokenizer: Any, model_id: str) -> ModelAdapter:
    """
    Factory function to create the appropriate adapter for a model

    Args:
        model: Loaded transformer model
        tokenizer: Model tokenizer
        model_id: Model identifier (e.g., "codegen-350m")

    Returns:
        ModelAdapter instance

    Raises:
        ValueError: If model_id is not supported
    """
    config = get_model_config(model_id)
    if not config:
        raise ValueError(f"Unknown model ID: {model_id}")

    architecture = config["architecture"]
    if architecture == "gpt_neox":
        logger.info(f"Creating CodeGen adapter for {model_id}")
        adapter = CodeGenAdapter(model, tokenizer, config)
    elif architecture == "llama":
        logger.info(f"Creating Code-Llama adapter for {model_id}")
        adapter = CodeLlamaAdapter(model, tokenizer, config)
    else:
        raise ValueError(f"Unsupported architecture: {architecture}")

    adapter.model_id = model_id
    return adapter
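

# --- Usage sketch (not part of the original module) -------------------------
# Assumptions: the HF repo name below is an example, and only model ids
# registered in model_config.py will resolve; "codegen-350m" is the example
# id from create_adapter's docstring. Because this module uses a relative
# import, run it as part of its package (python -m ...) rather than directly.
if __name__ == "__main__":
    from transformers import AutoModelForCausalLM, AutoTokenizer

    hf_name = "Salesforce/codegen-350M-mono"  # assumed checkpoint for illustration
    model = AutoModelForCausalLM.from_pretrained(hf_name)
    tokenizer = AutoTokenizer.from_pretrained(hf_name)

    adapter = create_adapter(model, tokenizer, "codegen-350m")
    print(adapter.normalize_config())

    inputs = tokenizer("def add(a, b):", return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs, output_attentions=True)

    attn = adapter.extract_attention(
        outputs,
        layer_idx=0,
        tokens=tokenizer.convert_ids_to_tokens(inputs["input_ids"][0]),
    )
    print(attn["weights"].shape, attn["num_heads"])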