# Gradio Space: analyze a Hugging Face model's KV-cache footprint and
# attention-mechanism optimizations (GQA / MLA / sliding window).
import math
from typing import Any, Dict, Optional, Tuple

import gradio as gr
from transformers import AutoConfig
def get_model_config(model_id: str) -> AutoConfig:
    """Fetch a model's configuration from the Hugging Face Hub.

    Args:
        model_id: Hub repository id, e.g. "deepseek-ai/DeepSeek-R1-0528".

    Returns:
        The resolved ``AutoConfig`` instance for the model.

    Raises:
        Exception: if the configuration cannot be downloaded or parsed.
    """
    try:
        # AutoConfig resolves the per-architecture config class for us;
        # trust_remote_code supports models that ship custom config code.
        config = AutoConfig.from_pretrained(
            model_id,
            trust_remote_code=True,
            revision="main",
        )
        return config
    except Exception as e:
        # Chain the original error (`from e`) so the root cause stays
        # visible in tracebacks instead of being swallowed.
        raise Exception(f"无法获取模型配置: {str(e)}") from e
def analyze_attention_mechanism(config: AutoConfig) -> Dict[str, Any]:
    """Classify the attention-optimization techniques implied by a config.

    Args:
        config: A resolved transformers configuration object.

    Returns:
        Dict with boolean flags ``uses_gqa``, ``uses_mla``,
        ``uses_sliding_window`` and a human-readable ``attention_type`` label.
    """
    model_type = getattr(config, "model_type", "").lower()
    # `architectures` is often present but explicitly None on real configs;
    # normalize to a list so iteration below can never raise TypeError.
    architectures = getattr(config, "architectures", None) or []

    attention_info = {
        "uses_gqa": False,
        "uses_mla": False,
        "uses_sliding_window": False,
        "attention_type": "Multi-Head Attention (MHA)"
    }

    # GQA (Grouped Query Attention): fewer key/value heads than query heads.
    num_attention_heads = getattr(config, "num_attention_heads", getattr(config, "n_head", 0))
    num_key_value_heads = getattr(config, "num_key_value_heads", num_attention_heads)
    if 0 < num_key_value_heads < num_attention_heads:
        attention_info["uses_gqa"] = True
        attention_info["attention_type"] = "Grouped Query Attention (GQA)"

    # MLA (Multi-head Latent Attention), used by the DeepSeek-V2 family:
    # signalled by the low-rank KV/Q projection ranks in the config.
    if "deepseek" in model_type or any("deepseek" in str(arch).lower() for arch in architectures):
        if hasattr(config, "kv_lora_rank") or hasattr(config, "q_lora_rank"):
            attention_info["uses_mla"] = True
            attention_info["attention_type"] = "Multi-head Latent Attention (MLA)"

    # Sliding-window (local) attention, e.g. Mistral-style.
    if hasattr(config, "sliding_window") or hasattr(config, "attention_window_size"):
        attention_info["uses_sliding_window"] = True

    # Friendlier labels for well-known model families.
    if "llama" in model_type:
        attention_info["attention_type"] = "RoPE + GQA" if attention_info["uses_gqa"] else "RoPE + MHA"
    elif "mistral" in model_type:
        attention_info["attention_type"] = "Sliding Window + GQA" if attention_info["uses_gqa"] else "Sliding Window + MHA"
    elif "qwen" in model_type:
        attention_info["attention_type"] = "QWen Attention (GQA)" if attention_info["uses_gqa"] else "QWen Attention"

    return attention_info
def calculate_kv_cache_size(config: AutoConfig, sequence_length: int = 2048, batch_size: int = 1) -> Dict[str, Any]:
    """Estimate a model's KV-cache footprint at FP16 precision.

    Args:
        config: A resolved transformers configuration object.
        sequence_length: Context length to size the cache for.
        batch_size: Number of concurrent sequences.

    Returns:
        Dict of the model dimensions used plus formatted and raw cache sizes.
    """

    def format_bytes(bytes_val):
        # Human-readable size using binary (1024-based) units.
        if bytes_val < 1024:
            return f"{bytes_val} B"
        if bytes_val < 1024**2:
            return f"{bytes_val/1024:.2f} KB"
        if bytes_val < 1024**3:
            return f"{bytes_val/(1024**2):.2f} MB"
        return f"{bytes_val/(1024**3):.2f} GB"

    # Core dimensions, falling back through the attribute aliases that
    # different config families use (GPT-2 style, T5 style, ...).
    num_layers = getattr(config, "num_hidden_layers", getattr(config, "n_layer", getattr(config, "num_layers", 0)))
    num_attention_heads = getattr(config, "num_attention_heads", getattr(config, "n_head", 0))
    num_key_value_heads = getattr(config, "num_key_value_heads", num_attention_heads)
    hidden_size = getattr(config, "hidden_size", getattr(config, "n_embd", getattr(config, "d_model", 0)))

    head_dim = hidden_size // num_attention_heads if num_attention_heads > 0 else 0

    # MLA (DeepSeek-style) caches a low-rank latent instead of full K/V heads,
    # so its effective per-token KV width is the LoRA rank.
    kv_lora_rank = getattr(config, "kv_lora_rank", 0)
    effective_kv_dim = kv_lora_rank if kv_lora_rank > 0 else head_dim * num_key_value_heads

    bytes_per_element = 2  # FP16: 2 bytes per element
    per_layer_token_bytes = 2 * effective_kv_dim * bytes_per_element  # key + value
    total_cache_bytes = per_layer_token_bytes * num_layers * sequence_length * batch_size

    return {
        "num_layers": num_layers,
        "num_attention_heads": num_attention_heads,
        "num_key_value_heads": num_key_value_heads,
        "head_dim": head_dim,
        "hidden_size": hidden_size,
        "effective_kv_dim": effective_kv_dim,
        "kv_size_per_token": format_bytes(per_layer_token_bytes * num_layers),
        "total_kv_cache": format_bytes(total_cache_bytes),
        "total_kv_cache_bytes": total_cache_bytes,
        "kv_lora_rank": kv_lora_rank
    }
def analyze_model(model_id: str, sequence_length: int = 2048, batch_size: int = 1) -> str:
    """Analyze a model and render a markdown report.

    Args:
        model_id: Hub repository id to analyze.
        sequence_length: Context length for the KV-cache estimate.
        batch_size: Batch size for the KV-cache estimate.

    Returns:
        Markdown report string, or an error message prefixed with ❌ on failure.
    """
    try:
        # Gradio Number components deliver floats (e.g. 2048.0); coerce to int
        # so byte math stays integral and the report shows whole numbers.
        sequence_length = int(sequence_length)
        batch_size = int(batch_size)

        config = get_model_config(model_id)
        attention_info = analyze_attention_mechanism(config)
        kv_info = calculate_kv_cache_size(config, sequence_length, batch_size)

        result = f"""
## 模型信息分析 - {model_id}
### 基本参数
- **模型类型**: {getattr(config, 'model_type', 'Unknown')}
- **层数**: {kv_info['num_layers']}
- **隐藏层大小**: {kv_info['hidden_size']}
- **注意力头数**: {kv_info['num_attention_heads']}
- **KV头数**: {kv_info['num_key_value_heads']}
- **每个头的维度**: {kv_info['head_dim']}
### 注意力机制优化
- **注意力类型**: {attention_info['attention_type']}
- **使用GQA**: {'✅ 是' if attention_info['uses_gqa'] else '❌ 否'}
- **使用MLA**: {'✅ 是' if attention_info['uses_mla'] else '❌ 否'}
- **滑动窗口**: {'✅ 是' if attention_info['uses_sliding_window'] else '❌ 否'}
### KV Cache 存储分析
- **序列长度**: {sequence_length}
- **批量大小**: {batch_size}
- **有效KV维度**: {kv_info['effective_kv_dim']}
- **每个token的KV存储**: {kv_info['kv_size_per_token']}
- **总KV Cache大小**: {kv_info['total_kv_cache']}
### 优化效果分析
"""
        # Quantify the memory saved by GQA's reduced KV-head count.
        if attention_info['uses_gqa']:
            original_kv_heads = kv_info['num_attention_heads']
            actual_kv_heads = kv_info['num_key_value_heads']
            memory_reduction = (1 - actual_kv_heads / original_kv_heads) * 100
            result += f"- **GQA内存节省**: {memory_reduction:.1f}% (KV头数从{original_kv_heads}减少到{actual_kv_heads})\n"

        # Note the MLA latent dimension when present.
        if attention_info['uses_mla']:
            result += f"- **MLA压缩**: KV维度被压缩到{kv_info['kv_lora_rank']}维\n"

        # Tiered hardware guidance based on total cache size.
        total_gb = kv_info['total_kv_cache_bytes'] / (1024**3)
        if total_gb > 8:
            result += f"\n⚠️ **内存警告**: KV Cache需要{total_gb:.2f}GB内存,建议使用高端GPU"
        elif total_gb > 4:
            result += f"\n💡 **内存提示**: KV Cache需要{total_gb:.2f}GB内存,中等配置可运行"
        else:
            result += f"\n✅ **内存友好**: KV Cache仅需{total_gb:.2f}GB内存"

        return result
    except Exception as e:
        # UI boundary: surface any failure as a message instead of crashing
        # the Gradio callback.
        return f"❌ 分析失败: {str(e)}"
def create_interface():
    """Build and return the Gradio Blocks UI for the analyzer."""
    with gr.Blocks(title="Hugging Face模型KV Cache分析器", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🤗 Hugging Face模型KV Cache分析器")
        gr.Markdown("输入模型ID来分析其KV cache大小和注意力机制优化情况")

        # Input row: model id (wide) plus sequence-length and batch-size numbers.
        with gr.Row():
            with gr.Column(scale=3):
                model_input = gr.Textbox(
                    label="模型ID",
                    placeholder="例如: deepseek-ai/DeepSeek-R1-0528",
                    value="deepseek-ai/DeepSeek-R1-0528",
                )
            with gr.Column(scale=1):
                seq_len_input = gr.Number(
                    label="序列长度",
                    value=2048,
                    minimum=1,
                    maximum=131072,
                )
            with gr.Column(scale=1):
                batch_size_input = gr.Number(
                    label="批量大小",
                    value=1,
                    minimum=1,
                    maximum=128,
                )

        analyze_btn = gr.Button("🔍 分析模型", variant="primary", size="lg")
        output = gr.Markdown(label="分析结果")

        # Clickable presets for popular models.
        gr.Markdown("### 💡 热门模型示例")
        gr.Examples(
            examples=[
                ["deepseek-ai/DeepSeek-V3-0324", 32768, 1],
                ["deepseek-ai/DeepSeek-R1-0528", 32768, 1],
                ["Qwen/Qwen3-8B", 32768, 1],
            ],
            inputs=[model_input, seq_len_input, batch_size_input],
            outputs=output,
            fn=analyze_model,
            cache_examples=False,
        )

        analyze_btn.click(
            fn=analyze_model,
            inputs=[model_input, seq_len_input, batch_size_input],
            outputs=output,
        )

        gr.Markdown("""
    ### 📖 说明
    - **GQA**: Grouped Query Attention,通过减少KV头数来节省内存
    - **MLA**: Multi-head Latent Attention,通过低秩分解压缩KV cache
    - **滑动窗口**: 限制注意力范围来减少计算和内存使用
    - KV Cache大小计算基于FP16精度 (每个元素2字节)
    - 使用 `transformers.AutoConfig` 获取配置,支持自定义模型
    ### 🛠️ 安装依赖
    ```bash
    pip install gradio transformers torch
    ```
    """)

    return demo
if __name__ == "__main__":
    # Launch the Gradio app with a public share link when run as a script.
    create_interface().launch(share=True)