import gradio as gr
import requests
import json
from transformers import AutoConfig
from typing import Dict, Tuple
class LLMMemoryCalculator:
    def __init__(self):
        # Bytes per parameter at each precision (int4 packs two parameters per byte).
        self.precision_bytes = {
            'fp32': 4,
            'fp16': 2,
            'bf16': 2,
            'int8': 1,
            'int4': 0.5
        }
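    # The table above is not referenced elsewhere in this file yet; a minimal
    # sketch of how it could turn a parameter count into a weight footprint
    # (`total_params` is a hypothetical input, e.g. from the estimator below):
    #
    #   total_params = 7e9  # hypothetical 7B-parameter model
    #   for precision, bytes_per_param in self.precision_bytes.items():
    #       size_gb = total_params * bytes_per_param / (1024 ** 3)
    #       # fp16 -> ~13.0 GB, int8 -> ~6.5 GB, int4 -> ~3.3 GB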
    # -------------------------------------------------
    # 📥 Basic utilities
    # -------------------------------------------------
    def get_model_config(self, model_id: str):
        """Fetch the model configuration from the Hugging Face Hub."""
        try:
            config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
            return config
        except Exception as e:
            raise Exception(f"Could not fetch model config: {str(e)}")
    def get_file_size_from_url(self, model_id: str, filename: str) -> int:
        """Get a file's size via a HEAD request (fallback path)."""
        try:
            url = f"https://huggingface.co/{model_id}/resolve/main/{filename}"
            # Follow redirects: LFS files 302-redirect to the CDN, and
            # requests.head() does not follow redirects by default.
            response = requests.head(url, timeout=10, allow_redirects=True)
            if response.status_code == 200:
                content_length = response.headers.get('Content-Length')
                if content_length:
                    return int(content_length)
            return 0
        except requests.RequestException:
            return 0
    # -------------------------------------------------
    # 📦 Model weight size lookup
    # -------------------------------------------------
    def get_model_size_from_hf(self, model_id: str) -> Tuple[float, str]:
        """Prefer metadata.total_size from the *.index.json shard index; fall back to the file list / HEAD requests."""
        try:
            # 1. Try the shard index files (safetensors first, then pytorch).
            for index_name, tag in [
                ("model.safetensors.index.json", "safetensors_index"),
                ("pytorch_model.bin.index.json", "pytorch_index")
            ]:
                url = f"https://huggingface.co/{model_id}/resolve/main/{index_name}"
                resp = requests.get(url, timeout=10)
                if resp.status_code == 200:
                    try:
                        data = resp.json()
                    except ValueError:
                        # Not valid JSON; try the next candidate.
                        continue
                    total_bytes = data.get("metadata", {}).get("total_size", 0)
                    if total_bytes > 0:
                        return total_bytes / (1024 ** 3), tag
            # 2. Query the Hub API for the file list; blobs=true asks for
            # per-file sizes in the siblings entries.
            api_url = f"https://huggingface.co/api/models/{model_id}?blobs=true"
            response = requests.get(api_url, timeout=10)
            if response.status_code != 200:
                raise Exception(f"API request failed: {response.status_code}")
            model_info = response.json()
            # 2a. Sum the .safetensors files that report a size.
            safetensors_files = [f for f in model_info.get('siblings', [])
                                 if f['rfilename'].endswith('.safetensors') and 'size' in f]
            if safetensors_files:
                total_size = sum(f['size'] for f in safetensors_files)
                return total_size / (1024 ** 3), "safetensors_files"
            # 2b. Fill in missing .safetensors sizes via HEAD requests.
            safetensors_no_size = [f for f in model_info.get('siblings', [])
                                   if f['rfilename'].endswith('.safetensors')]
            if safetensors_no_size:
                total_size = 0
                for f in safetensors_no_size:
                    total_size += self.get_file_size_from_url(model_id, f['rfilename'])
                if total_size > 0:
                    return total_size / (1024 ** 3), "safetensors_head"
            # 2c. Same treatment for pytorch_model-*.bin shards.
            pytorch_files = [f for f in model_info.get('siblings', [])
                             if f['rfilename'].endswith('.bin') and 'size' in f]
            if pytorch_files:
                total_size = sum(f['size'] for f in pytorch_files)
                return total_size / (1024 ** 3), "pytorch_files"
            pytorch_no_size = [f for f in model_info.get('siblings', [])
                               if f['rfilename'].endswith('.bin')]
            if pytorch_no_size:
                total_size = 0
                for f in pytorch_no_size:
                    total_size += self.get_file_size_from_url(model_id, f['rfilename'])
                if total_size > 0:
                    return total_size / (1024 ** 3), "pytorch_head"
            # 3. Size still unknown; fall through to estimation.
            raise Exception("No weight size information found")
        except Exception:
            # Fall back to a parameter-count estimate from config.json.
            return self.estimate_model_size_from_config(model_id)
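    # A possible alternative to the raw HTTP calls above (a sketch, assuming the
    # huggingface_hub client is installed): HfApi.model_info with
    # files_metadata=True returns sibling sizes in a single call.
    #
    #   from huggingface_hub import HfApi
    #   info = HfApi().model_info(model_id, files_metadata=True)
    #   total = sum(s.size or 0 for s in info.siblings
    #               if s.rfilename.endswith('.safetensors'))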
    # -------------------------------------------------
    # 📐 Estimation logic (unchanged from the original)
    # -------------------------------------------------
    def estimate_model_size_from_config(self, model_id: str) -> Tuple[float, str]:
        """Estimate the model size (FP16) from config.json."""
        try:
            config = self.get_model_config(model_id)
            vocab_size = getattr(config, 'vocab_size', 50000)
            hidden_size = getattr(config, 'hidden_size', getattr(config, 'd_model', 4096))
            num_layers = getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 32))
            intermediate_size = getattr(config, 'intermediate_size', hidden_size * 4)
            # Embedding table
            embedding_params = vocab_size * hidden_size
            # Per transformer layer: Q/K/V/O projections, a 2-matrix FFN, two layer norms
            attention_params = 4 * hidden_size * hidden_size
            ffn_params = 2 * hidden_size * intermediate_size
            ln_params = 2 * hidden_size
            params_per_layer = attention_params + ffn_params + ln_params
            total_params = embedding_params + num_layers * params_per_layer
            if hasattr(config, 'tie_word_embeddings') and not config.tie_word_embeddings:
                total_params += vocab_size * hidden_size  # separate LM head
            model_size_gb = (total_params * 2) / (1024 ** 3)  # assume fp16, 2 bytes/param
            return model_size_gb, "estimated"
        except Exception as e:
            raise Exception(f"Could not estimate model size: {str(e)}")
    # -------------------------------------------------
    # 🗄️ KV cache calculation (original logic kept)
    # -------------------------------------------------
    def calculate_kv_cache_size(self, config, context_length: int, batch_size: int = 1) -> Dict[str, float]:
        try:
            num_layers = getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 32))
            hidden_size = getattr(config, 'hidden_size', getattr(config, 'd_model', 4096))
            num_attention_heads = getattr(config, 'num_attention_heads', getattr(config, 'num_heads', 32))
            num_key_value_heads = getattr(config, 'num_key_value_heads', num_attention_heads)
            is_mla = hasattr(config, 'kv_lora_rank') and config.kv_lora_rank is not None
            head_dim = hidden_size // num_attention_heads
            if is_mla:
                # MLA (e.g. DeepSeek) caches a compressed latent per token.
                kv_lora_rank = getattr(config, 'kv_lora_rank', 512)
                kv_cache_per_token = kv_lora_rank * 2
                attention_type = "MLA"
            elif num_key_value_heads < num_attention_heads:
                # GQA: only the KV heads are cached (K and V, hence the * 2).
                kv_cache_per_token = num_key_value_heads * head_dim * 2
                attention_type = "GQA"
            else:
                kv_cache_per_token = num_attention_heads * head_dim * 2
                attention_type = "MHA"
            # elements/token * tokens * layers * batch * 2 bytes (fp16), in GB
            total_kv_cache = (kv_cache_per_token * context_length * num_layers * batch_size * 2) / (1024 ** 3)
            return {
                'size_gb': total_kv_cache,
                'attention_type': attention_type,
                'num_kv_heads': num_key_value_heads,
                'num_attention_heads': num_attention_heads,
                'head_dim': head_dim
            }
        except Exception as e:
            raise Exception(f"KV cache calculation failed: {str(e)}")
    # -------------------------------------------------
    # 🧮 Overall memory requirement calculation (unchanged)
    # -------------------------------------------------
    def calculate_memory_requirements(self, model_id: str, gpu_memory_gb: float, num_gpus: int,
                                      context_length: int, utilization_rate: float = 0.9) -> Dict:
        try:
            config = self.get_model_config(model_id)
            model_size_gb, size_source = self.get_model_size_from_hf(model_id)
            kv_info = self.calculate_kv_cache_size(config, context_length)
            available_memory = gpu_memory_gb * num_gpus * utilization_rate
            # Rough 10%-of-weights allowance for activations and runtime buffers.
            other_overhead = model_size_gb * 0.1
            total_memory_needed = model_size_gb + kv_info['size_gb'] + other_overhead
            is_feasible = total_memory_needed <= available_memory
            memory_margin = available_memory - total_memory_needed
            memory_per_gpu = total_memory_needed / num_gpus
            return {
                'model_id': model_id,
                'model_size_gb': round(model_size_gb, 2),
                'size_source': size_source,
                'kv_cache_gb': round(kv_info['size_gb'], 2),
                'attention_type': kv_info['attention_type'],
                'other_overhead_gb': round(other_overhead, 2),
                'total_memory_needed_gb': round(total_memory_needed, 2),
                'available_memory_gb': round(available_memory, 2),
                'memory_margin_gb': round(memory_margin, 2),
                'memory_per_gpu_gb': round(memory_per_gpu, 2),
                'is_feasible': is_feasible,
                'utilization_per_gpu': round((memory_per_gpu / gpu_memory_gb) * 100, 1),
                'config_info': {
                    'num_layers': getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 'N/A')),
                    'hidden_size': getattr(config, 'hidden_size', getattr(config, 'd_model', 'N/A')),
                    'num_attention_heads': kv_info['num_attention_heads'],
                    'num_kv_heads': kv_info['num_kv_heads'],
                    'head_dim': kv_info['head_dim']
                }
            }
        except Exception as e:
            return {'error': str(e)}
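    # Example of the feasibility arithmetic for a hypothetical 8B fp16 model
    # (~16 GB of weights) on one 24 GB GPU at 90% utilization:
    #   available = 24 * 1 * 0.9            = 21.6 GB
    #   needed    = 16 + 2 (KV) + 1.6 (10%) = 19.6 GB  -> feasible, 2.0 GB margin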
# -------------------------------------------------
# 🌟 Gradio interface (original logic kept)
# -------------------------------------------------
def create_gradio_interface():
    calculator = LLMMemoryCalculator()

    def calculate_memory(model_id, gpu_memory, num_gpus, context_length, utilization_rate):
        if not model_id.strip():
            return "Please enter a model ID"
        try:
            result = calculator.calculate_memory_requirements(
                model_id.strip(),
                float(gpu_memory),
                int(num_gpus),
                int(context_length),
                float(utilization_rate) / 100
            )
            if 'error' in result:
                return f"❌ Error: {result['error']}"
            status = "✅ Can run" if result['is_feasible'] else "❌ Not enough GPU memory"
            output = f"""
## Model Analysis Result
**Model**: {result['model_id']}
**Status**: {status}
### 📊 Memory Breakdown
- **Model size**: {result['model_size_gb']} GB ({result['size_source']})
- **KV cache**: {result['kv_cache_gb']} GB
- **Other overhead**: {result['other_overhead_gb']} GB
- **Total required**: {result['total_memory_needed_gb']} GB
- **Available memory**: {result['available_memory_gb']} GB
- **Remaining memory**: {result['memory_margin_gb']} GB
### 🔧 Model Configuration
- **Attention type**: {result['attention_type']}
- **Layers**: {result['config_info']['num_layers']}
- **Hidden size**: {result['config_info']['hidden_size']}
- **Attention heads**: {result['config_info']['num_attention_heads']}
- **KV heads**: {result['config_info']['num_kv_heads']}
- **Head dim**: {result['config_info']['head_dim']}
### 💾 GPU Usage
- **Memory per GPU**: {result['memory_per_gpu_gb']} GB
- **Utilization per GPU**: {result['utilization_per_gpu']}%
### 💡 Recommendations
"""
            if result['is_feasible']:
                output += f"✅ The current configuration can run this model, with {result['memory_margin_gb']} GB to spare."
            else:
                needed_extra = abs(result['memory_margin_gb'])
                output += f"❌ An extra {needed_extra} GB of GPU memory is needed.\nSuggestions:\n- Add more GPUs\n- Use GPUs with more memory\n- Reduce the context length\n- Quantize the model (e.g. int8/int4)"
            return output
        except Exception as e:
            return f"❌ Calculation error: {str(e)}"
    with gr.Blocks(title="LLM GPU Memory Calculator", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🚀 LLM GPU Memory Requirement Calculator")
        gr.Markdown("Enter a model and your hardware configuration to check whether the LLM will fit")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## 📝 Input Parameters")
                model_id = gr.Textbox(label="🤗 Hugging Face model ID",
                                      placeholder="e.g. deepseek-ai/DeepSeek-R1-0528-Qwen3-8B",
                                      value="deepseek-ai/DeepSeek-R1-0528-Qwen3-8B")
                with gr.Row():
                    gpu_memory = gr.Number(label="💾 Memory per GPU (GB)", value=24, minimum=1, maximum=1000)
                    num_gpus = gr.Number(label="🔢 Number of GPUs", value=1, minimum=1, maximum=64, precision=0)
                with gr.Row():
                    context_length = gr.Number(label="📏 Context length", value=16384, minimum=512, maximum=1000000, precision=0)
                    utilization_rate = gr.Slider(label="⚡ Memory utilization (%)", minimum=50, maximum=95, value=90, step=5)
                calculate_btn = gr.Button("🔍 Calculate memory requirements", variant="primary")
            with gr.Column(scale=2):
                gr.Markdown("## 📊 Results")
                output = gr.Markdown("Click the button to start the analysis...")
        calculate_btn.click(fn=calculate_memory,
                            inputs=[model_id, gpu_memory, num_gpus, context_length, utilization_rate],
                            outputs=output)
        gr.Markdown("""
## 📚 Example Models
**Small**: `microsoft/DialoGPT-medium`
**Medium**: `microsoft/DialoGPT-large`
**Large**: `meta-llama/Llama-2-7b-hf`
**Extra large**: `meta-llama/Llama-2-13b-hf`
Note: some models require requesting access first.
""")
    return demo
if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(share=True, debug=True)
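# Programmatic usage sketch (no UI), assuming network access to huggingface.co;
# the keyword names match calculate_memory_requirements() above:
#
#   calc = LLMMemoryCalculator()
#   result = calc.calculate_memory_requirements(
#       "meta-llama/Llama-2-7b-hf", gpu_memory_gb=24, num_gpus=1,
#       context_length=4096, utilization_rate=0.9)
#   print(result["is_feasible"], result["total_memory_needed_gb"])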