Spaces:
Running on Zero
Running on Zero
| from functools import lru_cache | |
| from pathlib import Path | |
| from time import perf_counter | |
| import gradio as gr | |
| import spaces | |
| import torch | |
| from transformers import ( | |
| AutoConfig, | |
| AutoModelForCausalLM, | |
| AutoTokenizer, | |
| GenerationConfig, | |
| ) | |
# Hub model id used as the textbox default and as the fallback when the
# model textbox is left blank.
DEFAULT_MODEL_ID = "dphn/dolphin-2.9.4-llama3.1-8b"
# Directory containing this file; every path below is resolved against it.
ROOT = Path(__file__).parent
# Folder holding the lesson scripts and the README rendered in the UI.
TEXT_DIR = ROOT / "text"
# Markdown notes rendered in the theory tab.
THEORY_PATH = ROOT / "notes" / "deployment_interview_theory.md"
# Lesson label (exactly as shown in the dropdowns) -> script file name under
# TEXT_DIR.
LESSONS = {
    "00 pipeline 自动配置": "00_pipeline_auto.py",
    "01 tokenizer + model": "01_auto_tokenizer_model.py",
    "02 模块分开配置": "02_split_configuration.py",
    "03 streaming + stopping": "03_streaming_and_stopping.py",
    "04 forward + logits + KV cache": "04_forward_logits_kv_cache.py",
    "05 架构 + 中间态输出": "05_architecture_and_intermediates.py",
}
# Lesson label -> Markdown blurb shown when that lesson is selected. The keys
# mirror those of LESSONS; the labels for lessons 04 and 05 are also compared
# literally when choosing which experiment to run.
LESSON_NOTES = {
    "00 pipeline 自动配置": """
`pipeline("text-generation")` 是最高层封装。它会自动加载 tokenizer、model、generation config,并完成 tokenize、generate、decode。
面试表达:pipeline 适合快速验证;生产部署通常需要把 tokenizer、model、生成参数、设备和流式输出拆开控制。
""",
    "01 tokenizer + model": """
这一层显式拆开 `AutoTokenizer` 和 `AutoModelForCausalLM`。
关键链路:messages -> chat template -> token ids -> model.generate -> new token ids -> decode。
""",
    "02 模块分开配置": """
这一层把架构配置、分词器配置、模型加载配置、生成配置分开。
面试表达:拆开配置后才能精确控制 dtype、device、max_new_tokens、temperature、top_p、pad/eos token 等部署关键项。
""",
    "03 streaming + stopping": """
streaming 不是减少总计算量,而是让用户更早看到输出。
停止条件通常来自 `eos_token_id`、stop string、`max_new_tokens` 或自定义 `StoppingCriteria`。
""",
    "04 forward + logits + KV cache": """
不调用 `generate()`,直接看一次 forward。
模型输出 logits,解码策略从 logits 中选下一个 token。`past_key_values` 就是 KV cache,是长上下文和高并发部署里的显存大头之一。
""",
    "05 架构 + 中间态输出": """
这一层用 `print(model)`、`output_hidden_states=True` 和可选的 `output_attentions=True` 看模型内部。
`hidden_states` 能看到 embedding 和每层 decoder block 后的表示;`attentions` 能看到每层每个 head 对输入 token 的注意力矩阵,但它的显存开销是 seq_len 平方级,长 prompt 时要谨慎开启。
""",
}
def read_text(path, fallback):
    """Return the UTF-8 text stored at ``path``, or ``fallback`` if it is absent.

    Args:
        path: Path-like object exposing ``exists()`` and ``read_text()``.
        fallback: Value handed back unchanged when ``path`` does not exist.

    Returns:
        The decoded file contents, or ``fallback``.
    """
    return path.read_text(encoding="utf-8") if path.exists() else fallback
# Both documents are read once at import time; when a file is missing the
# placeholder message is rendered in its place.
TEXT_README = read_text(TEXT_DIR / "README.md", "文本教程文件还没有生成。")
THEORY_TEXT = read_text(THEORY_PATH, "理论笔记文件还没有生成。")
def device_summary():
    """Describe the torch backend that would be preferred on this machine.

    Returns:
        ``"cuda:0 (<device name>)"`` when CUDA is available, ``"mps"`` when
        the MPS backend is available, otherwise ``"cpu"``.
    """
    if torch.cuda.is_available():
        return f"cuda:0 ({torch.cuda.get_device_name(0)})"
    # Older torch builds have no `torch.backends.mps` attribute at all.
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        return "mps"
    return "cpu"
def model_load_kwargs():
    """Choose the dtype/placement keyword arguments for loading the model.

    Returns:
        ``{"torch_dtype": torch.float32}`` without CUDA. With CUDA, a dict
        with ``device_map="auto"`` and bfloat16 when the GPU reports bf16
        support, float16 otherwise.
    """
    if not torch.cuda.is_available():
        return {"torch_dtype": torch.float32}
    half_dtype = torch.float16
    if torch.cuda.is_bf16_supported():
        half_dtype = torch.bfloat16
    return {"torch_dtype": half_dtype, "device_map": "auto"}
@lru_cache(maxsize=1)
def load_components(model_id):
    """Load (and memoize) the tokenizer, model and config for ``model_id``.

    The result is cached so that repeated runs with the same model id reuse
    the already-loaded weights instead of reloading the checkpoint on every
    click. Only the most recent model is kept (``maxsize=1``) so that
    switching models does not keep two sets of weights alive.

    Args:
        model_id: Hugging Face Hub id or local path understood by
            ``from_pretrained``. Must be hashable (a ``str``).

    Returns:
        A ``(tokenizer, model, config)`` tuple. The model is in eval mode.
        The same objects are returned on cache hits, so callers should not
        mutate them.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    config = AutoConfig.from_pretrained(model_id)
    # The generation config built later passes pad_token_id explicitly; reuse
    # EOS when the tokenizer does not define a pad token.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        config=config,
        low_cpu_mem_usage=True,
        **model_load_kwargs(),
    )
    # NOTE(review): `hf_device_map` is presumably present only when
    # `device_map` already placed the weights; otherwise move them manually.
    if not hasattr(model, "hf_device_map"):
        if torch.cuda.is_available():
            model = model.to("cuda")
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            model = model.to("mps")
    model.eval()
    return tokenizer, model, config
def model_device(model):
    """Return the device that holds the model's first parameter."""
    first_parameter = next(model.parameters())
    return first_parameter.device
def build_messages(system_prompt, user_prompt):
    """Build the chat message list for one system/user exchange.

    Args:
        system_prompt: Optional system instruction; skipped when blank.
        user_prompt: User text; always included (stripped).

    Returns:
        A list of ``{"role", "content"}`` dicts: an optional system message
        followed by the user message.
    """
    system_text = system_prompt.strip()
    user_turn = {"role": "user", "content": user_prompt.strip()}
    if not system_text:
        return [user_turn]
    return [{"role": "system", "content": system_text}, user_turn]
def build_prompt(tokenizer, messages):
    """Render ``messages`` into a single prompt string.

    Uses the tokenizer's chat template when it has one. Otherwise falls back
    to plain ``role: content`` lines followed by a trailing ``assistant:``
    cue.

    Args:
        tokenizer: Object with a ``chat_template`` attribute and, when that
            is truthy, an ``apply_chat_template`` method.
        messages: Iterable of dicts with ``"role"`` and ``"content"`` keys.

    Returns:
        The rendered prompt text (not tokenized).
    """
    if not tokenizer.chat_template:
        turns = [f"{turn['role']}: {turn['content']}" for turn in messages]
        turns.append("assistant:")
        return "\n".join(turns)
    return tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
def generation_config(tokenizer, max_new_tokens, do_sample, temperature, top_p):
    """Assemble the ``GenerationConfig`` used for ``model.generate``.

    Args:
        tokenizer: Supplies ``pad_token_id`` and ``eos_token_id``.
        max_new_tokens: Upper bound on generated tokens (coerced to ``int``).
        do_sample: Whether to sample; coerced to ``bool``.
        temperature: Sampling temperature; only applied when sampling.
        top_p: Nucleus sampling threshold; only applied when sampling.

    Returns:
        A ``GenerationConfig`` with a fixed ``repetition_penalty`` of 1.05.
    """
    settings = dict(
        max_new_tokens=int(max_new_tokens),
        do_sample=bool(do_sample),
        repetition_penalty=1.05,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    # Sampling knobs are left out entirely for greedy decoding.
    if do_sample:
        settings.update(temperature=float(temperature), top_p=float(top_p))
    return GenerationConfig(**settings)
def inspect_config(config, tokenizer, model):
    """Format key model/tokenizer settings as ``key: value`` lines.

    Args:
        config: Model config; missing attributes are reported as ``unknown``.
        tokenizer: Supplies ``pad_token_id`` and ``eos_token_id``.
        model: Loaded model, used only to report its runtime device.

    Returns:
        A newline-joined string with one ``key: value`` pair per line.
    """
    config_fields = (
        "model_type",
        "hidden_size",
        "num_hidden_layers",
        "num_attention_heads",
        "num_key_value_heads",
        "vocab_size",
    )
    entries = [(field, getattr(config, field, "unknown")) for field in config_fields]
    entries.append(("pad_token_id", tokenizer.pad_token_id))
    entries.append(("eos_token_id", tokenizer.eos_token_id))
    entries.append(("runtime_device", str(model_device(model))))
    return "\n".join(f"{label}: {setting}" for label, setting in entries)
def tensor_summary(value):
    """Return a one-line description of ``value``.

    Returns:
        ``"None"`` for ``None``; ``shape=…, dtype=…, device=…`` for anything
        exposing a ``shape`` attribute; otherwise the value's type name.
    """
    if value is None:
        return "None"
    if not hasattr(value, "shape"):
        return type(value).__name__
    dims = tuple(value.shape)
    return f"shape={dims}, dtype={value.dtype}, device={value.device}"
def architecture_summary(config, model):
    """Build a text report of the model's architecture.

    The report lists the model class, selected config fields (``unknown``
    when absent), the model's direct child modules, the children of the
    first decoder layer when ``model.model.layers`` exists, and finally the
    full ``str(model)`` dump.

    Args:
        config: Model config object.
        model: Module exposing ``named_children()``.

    Returns:
        The report as a single newline-joined string.
    """
    config_fields = (
        "model_type",
        "hidden_size",
        "num_hidden_layers",
        "num_attention_heads",
        "num_key_value_heads",
        "vocab_size",
    )
    report = [f"model class: {type(model).__name__}"]
    report += [
        f"{field}: {getattr(config, field, 'unknown')}" for field in config_fields
    ]
    report += ["", "top-level modules:"]
    report += [
        f"- {child_name}: {type(child).__name__}"
        for child_name, child in model.named_children()
    ]
    if hasattr(model, "model") and hasattr(model.model, "layers"):
        decoder_layers = model.model.layers
        report += ["", f"decoder layers: {len(decoder_layers)}"]
        if decoder_layers:
            report.append("first decoder layer modules:")
            report += [
                f"- {part_name}: {type(part).__name__}"
                for part_name, part in decoder_layers[0].named_children()
            ]
    report += ["", "full model repr:", str(model)]
    return "\n".join(report)
def summarize_tensor_sequence(name, values):
    """Describe every element of a sequence of tensors.

    Args:
        name: Label used as the prefix of every line.
        values: Sequence of tensors, or ``None``.

    Returns:
        ``"<name>: None"`` when ``values`` is ``None``; otherwise a count
        line followed by one ``<name>[NN]: <summary>`` line per element.
    """
    if values is None:
        return f"{name}: None"
    header = f"{name} count: {len(values)}"
    entries = [
        f"{name}[{position:02d}]: {tensor_summary(item)}"
        for position, item in enumerate(values)
    ]
    return "\n".join([header, *entries])
def summarize_cache(cache):
    """Describe a KV cache returned by a forward pass.

    Three cache layouts are recognised when reporting the layer-0 tensors:

    * objects exposing ``key_cache`` / ``value_cache`` lists;
    * objects exposing a ``layers`` list whose entries carry ``keys`` /
      ``values`` tensors;
    * legacy tuples/lists of ``(key, value)`` pairs.

    Unknown layouts still get the type, layer-count and sequence-length
    lines; only the layer-0 details are omitted.

    Args:
        cache: Cache object, legacy tuple/list, or ``None``.

    Returns:
        A newline-joined description, or ``"KV cache: None"``.
    """
    if cache is None:
        return "KV cache: None"
    lines = [f"KV cache type: {type(cache).__name__}"]
    try:
        lines.append(f"KV cache layers: {len(cache)}")
    except TypeError:
        # Not every cache implementation defines __len__.
        lines.append("KV cache layers: unknown")
    if hasattr(cache, "get_seq_length"):
        lines.append(f"KV cache sequence length: {cache.get_seq_length()}")
    if getattr(cache, "key_cache", None):
        lines.append(f"layer0 key: {tensor_summary(cache.key_cache[0])}")
        lines.append(f"layer0 value: {tensor_summary(cache.value_cache[0])}")
    elif getattr(cache, "layers", None):
        # NOTE(review): newer cache classes appear to keep per-layer objects
        # in `layers` instead of `key_cache`/`value_cache` — confirm against
        # the installed transformers version.
        first_layer = cache.layers[0]
        layer_keys = getattr(first_layer, "keys", None)
        layer_values = getattr(first_layer, "values", None)
        # Require tensor-like attributes so that e.g. dict.keys/dict.values
        # methods are never mistaken for cached tensors.
        if hasattr(layer_keys, "shape") and hasattr(layer_values, "shape"):
            lines.append(f"layer0 key: {tensor_summary(layer_keys)}")
            lines.append(f"layer0 value: {tensor_summary(layer_values)}")
    elif isinstance(cache, (tuple, list)) and cache:
        first_layer = cache[0]
        if isinstance(first_layer, (tuple, list)) and len(first_layer) >= 2:
            lines.append(f"layer0 key: {tensor_summary(first_layer[0])}")
            lines.append(f"layer0 value: {tensor_summary(first_layer[1])}")
    return "\n".join(lines)
def explain_tokens(tokenizer, prompt):
    """Tokenize ``prompt`` and list its first tokens for display.

    Args:
        tokenizer: Callable returning a mapping with ``"input_ids"`` and
            offering ``convert_ids_to_tokens``.
        prompt: Text to tokenize.

    Returns:
        A report with the total token count and up to 80 ``id token`` rows,
        plus a trailer line when more tokens were omitted.
    """
    preview_limit = 80
    token_ids = tokenizer(prompt, return_tensors="pt")["input_ids"][0].tolist()
    pieces = tokenizer.convert_ids_to_tokens(token_ids)
    report = [f"token_count: {len(token_ids)}", "", "first_tokens:"]
    for token_id, piece in zip(token_ids[:preview_limit], pieces[:preview_limit]):
        report.append(f"{token_id:>8} {piece}")
    omitted = len(token_ids) - preview_limit
    if omitted > 0:
        report.append(f"... {omitted} more tokens")
    return "\n".join(report)
# On ZeroGPU hardware a GPU is only attached while a @spaces.GPU-decorated
# function is running, so the whole experiment (model loading included) must
# run inside one. The duration is raised above the default because loading
# the checkpoint happens inside this call.
@spaces.GPU(duration=120)
def run_experiment(
    lesson_name,
    model_id,
    system_prompt,
    user_prompt,
    max_new_tokens,
    do_sample,
    temperature,
    top_p,
    output_attentions,
):
    """Run the selected lesson against the chosen model and collect UI outputs.

    Behaviour depends on ``lesson_name``:

    * ``"04 forward + logits + KV cache"``: one forward pass; reports the
      greedy next token and basic KV-cache statistics.
    * ``"05 架构 + 中间态输出"``: one forward pass with hidden states (and
      optionally attentions); reports tensor summaries and the architecture.
    * any other lesson: a full ``model.generate`` call.

    Args:
        lesson_name: Lesson label selected in the UI.
        model_id: Hub id or path; blank falls back to ``DEFAULT_MODEL_ID``.
        system_prompt: Optional system message.
        user_prompt: Required user message.
        max_new_tokens: Generation length limit (generate path only).
        do_sample: Enable sampling (generate path only).
        temperature: Sampling temperature (generate path only).
        top_p: Nucleus sampling threshold (generate path only).
        output_attentions: Request attention matrices (lesson 05 only).

    Returns:
        A 4-tuple ``(answer, config_text, token_preview, metrics)`` where
        ``config_text`` is the architecture report for lesson 05 and the
        config summary otherwise, and ``metrics`` is a JSON-serialisable
        dict.

    Raises:
        gr.Error: If ``user_prompt`` is blank, or if the lesson-05 forward
            pass fails while ``output_attentions`` is enabled.
    """
    if not user_prompt.strip():
        raise gr.Error("请输入 user prompt。")

    # The timer starts before loading, so `elapsed_seconds` (and the derived
    # tokens_per_second) include model load and tokenization time.
    start = perf_counter()
    tokenizer, model, config = load_components(model_id.strip() or DEFAULT_MODEL_ID)
    messages = build_messages(system_prompt, user_prompt)
    prompt = build_prompt(tokenizer, messages)
    inputs = tokenizer(prompt, return_tensors="pt").to(model_device(model))
    input_len = inputs["input_ids"].shape[-1]

    if lesson_name == "04 forward + logits + KV cache":
        with torch.no_grad():
            outputs = model(**inputs, use_cache=True)
        # Only the last position's logits predict the next token.
        logits = outputs.logits[:, -1, :]
        next_token_id = logits.argmax(dim=-1)
        next_token = tokenizer.decode(next_token_id)
        cache = outputs.past_key_values
        try:
            cache_layers = len(cache)
        except TypeError:
            cache_layers = "unknown"
        if hasattr(cache, "get_seq_length"):
            cache_seq_len = cache.get_seq_length()
        else:
            # Without a length accessor, report the prompt length that was
            # just fed through the model.
            cache_seq_len = input_len
        elapsed = perf_counter() - start
        answer = (
            "这是一次 forward 观察,不是完整生成。\n\n"
            f"greedy next token id: {next_token_id.item()}\n"
            f"greedy next token: {next_token!r}\n"
            f"logits shape: {tuple(logits.shape)}\n"
            f"KV cache type: {type(cache).__name__}\n"
            f"KV cache layers: {cache_layers}\n"
            f"KV cache sequence length: {cache_seq_len}"
        )
        metrics = {
            "lesson": lesson_name,
            "input_tokens": input_len,
            "new_tokens": 1,
            "elapsed_seconds": round(elapsed, 3),
            "device": str(model_device(model)),
        }
        return answer, inspect_config(config, tokenizer, model), explain_tokens(tokenizer, prompt), metrics

    if lesson_name == "05 架构 + 中间态输出":
        try:
            with torch.no_grad():
                outputs = model(
                    **inputs,
                    use_cache=True,
                    output_hidden_states=True,
                    output_attentions=bool(output_attentions),
                    return_dict=True,
                )
        except Exception as exc:
            # With attentions enabled, turn any failure into a UI hint to
            # retry without them; other failures propagate unchanged.
            if output_attentions:
                raise gr.Error(
                    "开启 output_attentions 时 forward 失败。attention 矩阵显存开销较大,"
                    "可以先关闭 output_attentions 再观察 hidden_states 和 KV cache。"
                ) from exc
            raise
        logits = outputs.logits
        last_token_logits = logits[:, -1, :]
        next_token_id = last_token_logits.argmax(dim=-1)
        next_token = tokenizer.decode(next_token_id)
        elapsed = perf_counter() - start
        answer = "\n\n".join(
            [
                "这是一次架构和中间态观察,不是完整生成。",
                "\n".join(
                    [
                        f"outputs type: {type(outputs).__name__}",
                        f"outputs keys: {list(outputs.keys())}",
                        f"logits: {tensor_summary(logits)}",
                        f"last token logits: {tensor_summary(last_token_logits)}",
                        f"greedy next token id: {next_token_id.item()}",
                        f"greedy next token: {next_token!r}",
                    ]
                ),
                summarize_tensor_sequence("hidden_states", outputs.hidden_states),
                summarize_tensor_sequence("attentions", outputs.attentions),
                summarize_cache(outputs.past_key_values),
            ]
        )
        metrics = {
            "lesson": lesson_name,
            "input_tokens": input_len,
            "elapsed_seconds": round(elapsed, 3),
            "logits_shape": list(logits.shape),
            "hidden_state_count": len(outputs.hidden_states)
            if outputs.hidden_states is not None
            else 0,
            "attention_count": len(outputs.attentions)
            if outputs.attentions is not None
            else 0,
            "output_attentions": bool(output_attentions),
            "device": str(model_device(model)),
        }
        return answer, architecture_summary(config, model), explain_tokens(tokenizer, prompt), metrics

    # Every remaining lesson shares the plain generate() path.
    gen_config = generation_config(
        tokenizer,
        max_new_tokens=max_new_tokens,
        do_sample=do_sample,
        temperature=temperature,
        top_p=top_p,
    )
    with torch.no_grad():
        outputs = model.generate(**inputs, generation_config=gen_config)
    # generate() returns prompt + continuation; keep only the continuation.
    new_token_ids = outputs[0][input_len:]
    answer = tokenizer.decode(new_token_ids, skip_special_tokens=True).strip()
    elapsed = perf_counter() - start
    metrics = {
        "lesson": lesson_name,
        "input_tokens": input_len,
        "new_tokens": int(new_token_ids.shape[-1]),
        "elapsed_seconds": round(elapsed, 3),
        "tokens_per_second": round(float(new_token_ids.shape[-1]) / elapsed, 3)
        if elapsed > 0
        else None,
        "device": str(model_device(model)),
    }
    return answer, inspect_config(config, tokenizer, model), explain_tokens(tokenizer, prompt), metrics
def lesson_note(lesson_name):
    """Return the Markdown note for ``lesson_name``, or ``""`` if unknown."""
    if lesson_name in LESSON_NOTES:
        return LESSON_NOTES[lesson_name]
    return ""
def lesson_code(lesson_name):
    """Return the source text of the script behind ``lesson_name``.

    Returns:
        ``""`` for an unknown lesson, a placeholder message when the script
        file does not exist, otherwise the file contents.
    """
    script_name = LESSONS.get(lesson_name)
    if script_name:
        return read_text(TEXT_DIR / script_name, "代码文件不存在。")
    return ""
def lesson_file_name(lesson_name):
    """Return the script file name for ``lesson_name``, or ``""`` if unknown."""
    if lesson_name in LESSONS:
        return LESSONS[lesson_name]
    return ""
# Build the Gradio UI: an experiment tab (inputs, run button, outputs), a
# theory tab rendering the two Markdown documents, and a code tab showing the
# script behind each lesson.
with gr.Blocks(title="大模型文本部署实验台") as demo:
    gr.Markdown(
        """
# 大模型文本部署实验台
从 Hugging Face `pipeline` 自动配置开始,逐步拆到 tokenizer、model、generation config、streaming、logits 和 KV cache。
默认模型:`dphn/dolphin-2.9.4-llama3.1-8b`。这是一个基于 Llama 3.1 8B 的 Dolphin ChatML 模型,适合在 ZeroGPU 上观察 8B 级别文本模型部署链路。
"""
    )
    with gr.Tabs():
        with gr.Tab("实验"):
            with gr.Row():
                # Left column: what to run and with which prompts.
                with gr.Column(scale=4):
                    lesson = gr.Dropdown(
                        choices=list(LESSONS.keys()),
                        value="00 pipeline 自动配置",
                        label="实验层级",
                    )
                    model_id = gr.Textbox(
                        value=DEFAULT_MODEL_ID,
                        label="模型",
                    )
                    system_prompt = gr.Textbox(
                        value="你是一个讲解大模型部署的老师,回答要清晰、简洁、面试友好。",
                        label="system prompt",
                        lines=2,
                    )
                    user_prompt = gr.Textbox(
                        value="用三句话解释 tokenizer 在文本大模型部署中的作用。",
                        label="user prompt",
                        lines=4,
                    )
                # Right column: generation parameters and the run button.
                with gr.Column(scale=3):
                    max_new_tokens = gr.Slider(
                        minimum=16,
                        maximum=256,
                        value=120,
                        step=8,
                        label="max_new_tokens",
                    )
                    do_sample = gr.Checkbox(value=False, label="do_sample")
                    temperature = gr.Slider(
                        minimum=0.1,
                        maximum=1.5,
                        value=0.7,
                        step=0.1,
                        label="temperature",
                    )
                    top_p = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.9,
                        step=0.05,
                        label="top_p",
                    )
                    # Only consulted by the lesson-05 branch of run_experiment.
                    output_attentions = gr.Checkbox(
                        value=False,
                        label="output_attentions",
                    )
                    run_btn = gr.Button("运行", variant="primary")
            with gr.Row():
                lesson_summary = gr.Markdown(value=lesson_note("00 pipeline 自动配置"))
                # Static informational text; no event handler updates it.
                runtime = gr.Textbox(
                    value="ZeroGPU 会在点击运行后为 @spaces.GPU 函数临时分配 GPU。",
                    label="运行环境",
                    interactive=False,
                )
            with gr.Row():
                answer = gr.Textbox(label="模型输出", lines=12)
                metrics = gr.JSON(label="指标")
            with gr.Accordion("token、配置与架构观察", open=False):
                with gr.Row():
                    config_view = gr.Textbox(label="模型与配置", lines=10)
                    token_view = gr.Textbox(label="token 预览", lines=10)
        with gr.Tab("理论"):
            gr.Markdown(TEXT_README)
            gr.Markdown(THEORY_TEXT)
        with gr.Tab("代码"):
            code_selector = gr.Dropdown(
                choices=list(LESSONS.keys()),
                value="00 pipeline 自动配置",
                label="代码文件",
            )
            code_file = gr.Textbox(
                value=lesson_file_name("00 pipeline 自动配置"),
                label="文件名",
                interactive=False,
            )
            code_view = gr.Code(
                value=lesson_code("00 pipeline 自动配置"),
                language="python",
                label="源码",
            )
    # Event wiring. The order of `inputs` must match run_experiment's
    # positional parameters, and `outputs` must match its 4-tuple return.
    lesson.change(lesson_note, inputs=lesson, outputs=lesson_summary)
    run_btn.click(
        run_experiment,
        inputs=[
            lesson,
            model_id,
            system_prompt,
            user_prompt,
            max_new_tokens,
            do_sample,
            temperature,
            top_p,
            output_attentions,
        ],
        outputs=[answer, config_view, token_view, metrics],
    )
    # Two handlers on the same event: one refreshes the file name, the other
    # the displayed source.
    code_selector.change(lesson_file_name, inputs=code_selector, outputs=code_file)
    code_selector.change(lesson_code, inputs=code_selector, outputs=code_view)
# Launch only when executed as a script; the request queue is capped at 16
# pending jobs.
if __name__ == "__main__":
    demo.queue(max_size=16).launch(ssr_mode=False)