"""Gradio playground that walks through layers of Hugging Face text-model deployment.

Each "lesson" selects one way of driving the same causal LM: plain generation,
a single forward pass that exposes logits and the KV cache, or a forward pass
that also returns hidden states and (optionally) attention maps.
"""

from functools import lru_cache
from pathlib import Path
from time import perf_counter

import gradio as gr
import spaces
import torch
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoTokenizer,
    GenerationConfig,
)

DEFAULT_MODEL_ID = "dphn/dolphin-2.9.4-llama3.1-8b"
ROOT = Path(__file__).parent
TEXT_DIR = ROOT / "text"
THEORY_PATH = ROOT / "notes" / "deployment_interview_theory.md"

# Lesson names that the code branches on. They are also used as dictionary keys
# below so the dropdown, the notes and the dispatch logic cannot drift apart.
LESSON_PIPELINE = "00 pipeline 自动配置"
LESSON_FORWARD = "04 forward + logits + KV cache"
LESSON_INTERMEDIATES = "05 架构 + 中间态输出"
DEFAULT_LESSON = LESSON_PIPELINE

# Number of (id, token) pairs shown in the token preview.
TOKEN_PREVIEW_LIMIT = 80

# Lesson display name -> source file shown in the "代码" tab.
LESSONS = {
    LESSON_PIPELINE: "00_pipeline_auto.py",
    "01 tokenizer + model": "01_auto_tokenizer_model.py",
    "02 模块分开配置": "02_split_configuration.py",
    "03 streaming + stopping": "03_streaming_and_stopping.py",
    LESSON_FORWARD: "04_forward_logits_kv_cache.py",
    LESSON_INTERMEDIATES: "05_architecture_and_intermediates.py",
}

# Lesson display name -> Markdown note rendered next to the controls.
LESSON_NOTES = {
    LESSON_PIPELINE: """
`pipeline("text-generation")` 是最高层封装。它会自动加载 tokenizer、model、generation config,并完成 tokenize、generate、decode。

面试表达:pipeline 适合快速验证;生产部署通常需要把 tokenizer、model、生成参数、设备和流式输出拆开控制。
""",
    "01 tokenizer + model": """
这一层显式拆开 `AutoTokenizer` 和 `AutoModelForCausalLM`。

关键链路:messages -> chat template -> token ids -> model.generate -> new token ids -> decode。
""",
    "02 模块分开配置": """
这一层把架构配置、分词器配置、模型加载配置、生成配置分开。

面试表达:拆开配置后才能精确控制 dtype、device、max_new_tokens、temperature、top_p、pad/eos token 等部署关键项。
""",
    "03 streaming + stopping": """
streaming 不是减少总计算量,而是让用户更早看到输出。

停止条件通常来自 `eos_token_id`、stop string、`max_new_tokens` 或自定义 `StoppingCriteria`。
""",
    LESSON_FORWARD: """
不调用 `generate()`,直接看一次 forward。

模型输出 logits,解码策略从 logits 中选下一个 token。`past_key_values` 就是 KV cache,是长上下文和高并发部署里的显存大头之一。
""",
    LESSON_INTERMEDIATES: """
这一层用 `print(model)`、`output_hidden_states=True` 和可选的 `output_attentions=True` 看模型内部。

`hidden_states` 能看到 embedding 和每层 decoder block 后的表示;`attentions` 能看到每层每个 head 对输入 token 的注意力矩阵,但它的显存开销是 seq_len 平方级,长 prompt 时要谨慎开启。
""",
}

# Intro text for the page header. The model id is interpolated so the text
# cannot go stale when DEFAULT_MODEL_ID changes.
INTRO_MARKDOWN = f"""
# 大模型文本部署实验台

从 Hugging Face `pipeline` 自动配置开始,逐步拆到 tokenizer、model、generation config、streaming、logits 和 KV cache。

默认模型:`{DEFAULT_MODEL_ID}`。这是一个基于 Llama 3.1 8B 的 Dolphin ChatML 模型,适合在 ZeroGPU 上观察 8B 级别文本模型部署链路。
"""


def read_text(path, fallback):
    """Return the UTF-8 contents of ``path``, or ``fallback`` if it is not a file.

    ``is_file()`` is used instead of ``exists()`` so that a directory at
    ``path`` yields the fallback rather than raising ``IsADirectoryError``.
    """
    if path.is_file():
        return path.read_text(encoding="utf-8")
    return fallback


TEXT_README = read_text(TEXT_DIR / "README.md", "文本教程文件还没有生成。")
THEORY_TEXT = read_text(THEORY_PATH, "理论笔记文件还没有生成。")


def device_summary():
    """Return a short label for the best available torch device."""
    if torch.cuda.is_available():
        name = torch.cuda.get_device_name(0)
        return f"cuda:0 ({name})"
    if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        return "mps"
    return "cpu"


def model_load_kwargs():
    """Return dtype/device keyword arguments for ``from_pretrained``.

    On CUDA, bfloat16 is used when supported (float16 otherwise) together with
    ``device_map="auto"``; without CUDA the model is loaded in float32.
    """
    if torch.cuda.is_available():
        dtype = torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
        return {"torch_dtype": dtype, "device_map": "auto"}
    return {"torch_dtype": torch.float32}


@lru_cache(maxsize=1)
def load_components(model_id):
    """Load and cache ``(tokenizer, model, config)`` for ``model_id``.

    Only the most recently requested model stays cached (``maxsize=1``), so
    switching model ids triggers a fresh load.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    config = AutoConfig.from_pretrained(model_id)

    # generate() needs a pad token id; reuse EOS when the tokenizer has none.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        config=config,
        low_cpu_mem_usage=True,
        **model_load_kwargs(),
    )

    # A model without hf_device_map was not placed by device_map, so move it
    # to an accelerator here if one is available.
    if not hasattr(model, "hf_device_map"):
        if torch.cuda.is_available():
            model = model.to("cuda")
        elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
            model = model.to("mps")

    model.eval()
    return tokenizer, model, config


def model_device(model):
    """Return the device of the model's first parameter."""
    return next(model.parameters()).device


def build_messages(system_prompt, user_prompt):
    """Build the chat message list; a blank system prompt is omitted."""
    messages = []
    if system_prompt.strip():
        messages.append({"role": "system", "content": system_prompt.strip()})
    messages.append({"role": "user", "content": user_prompt.strip()})
    return messages


def build_prompt(tokenizer, messages):
    """Render ``messages`` into one prompt string.

    Uses the tokenizer's chat template when it has one; otherwise falls back
    to plain ``role: content`` lines followed by an ``assistant:`` cue.
    """
    if tokenizer.chat_template:
        return tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )

    rendered = [f"{message['role']}: {message['content']}" for message in messages]
    rendered.append("assistant:")
    return "\n".join(rendered)


def _encode_prompt(tokenizer, prompt):
    """Tokenize a rendered prompt without duplicating the BOS token.

    Some chat templates already emit the BOS token in the rendered text;
    tokenizing that text with the default ``add_special_tokens=True`` would
    prepend a second one. Special tokens are therefore only added when the
    prompt does not already start with the tokenizer's BOS token.
    """
    bos_token = getattr(tokenizer, "bos_token", None)
    already_has_bos = bool(bos_token) and prompt.startswith(bos_token)
    return tokenizer(
        prompt,
        return_tensors="pt",
        add_special_tokens=not already_has_bos,
    )


def generation_config(tokenizer, max_new_tokens, do_sample, temperature, top_p):
    """Build a ``GenerationConfig`` from the UI controls.

    ``temperature`` and ``top_p`` are only set when sampling is enabled.
    """
    config = {
        "max_new_tokens": int(max_new_tokens),
        "do_sample": bool(do_sample),
        "repetition_penalty": 1.05,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }
    if do_sample:
        config["temperature"] = float(temperature)
        config["top_p"] = float(top_p)
    return GenerationConfig(**config)


def inspect_config(config, tokenizer, model):
    """Return ``key: value`` lines for the main config fields and runtime device."""
    rows = [
        ("model_type", getattr(config, "model_type", "unknown")),
        ("hidden_size", getattr(config, "hidden_size", "unknown")),
        ("num_hidden_layers", getattr(config, "num_hidden_layers", "unknown")),
        ("num_attention_heads", getattr(config, "num_attention_heads", "unknown")),
        ("num_key_value_heads", getattr(config, "num_key_value_heads", "unknown")),
        ("vocab_size", getattr(config, "vocab_size", "unknown")),
        ("pad_token_id", tokenizer.pad_token_id),
        ("eos_token_id", tokenizer.eos_token_id),
        ("runtime_device", str(model_device(model))),
    ]
    return "\n".join(f"{key}: {value}" for key, value in rows)


def tensor_summary(value):
    """Describe ``value`` by shape/dtype/device, or by type name if it has no shape."""
    if value is None:
        return "None"
    if hasattr(value, "shape"):
        return (
            f"shape={tuple(value.shape)}, dtype={value.dtype}, device={value.device}"
        )
    return type(value).__name__


def architecture_summary(config, model):
    """Return a text overview of the model: key config fields, modules and repr."""
    lines = [
        f"model class: {type(model).__name__}",
        f"model_type: {getattr(config, 'model_type', 'unknown')}",
        f"hidden_size: {getattr(config, 'hidden_size', 'unknown')}",
        f"num_hidden_layers: {getattr(config, 'num_hidden_layers', 'unknown')}",
        f"num_attention_heads: {getattr(config, 'num_attention_heads', 'unknown')}",
        f"num_key_value_heads: {getattr(config, 'num_key_value_heads', 'unknown')}",
        f"vocab_size: {getattr(config, 'vocab_size', 'unknown')}",
        "",
        "top-level modules:",
    ]
    lines.extend(
        f"- {name}: {type(module).__name__}" for name, module in model.named_children()
    )

    # Only models exposing ``model.model.layers`` get the per-layer breakdown.
    if hasattr(model, "model") and hasattr(model.model, "layers"):
        layers = model.model.layers
        lines.extend(["", f"decoder layers: {len(layers)}"])
        if layers:
            lines.append("first decoder layer modules:")
            lines.extend(
                f"- {name}: {type(module).__name__}"
                for name, module in layers[0].named_children()
            )

    lines.extend(["", "full model repr:", str(model)])
    return "\n".join(lines)


def summarize_tensor_sequence(name, values):
    """Return one summary line per element of ``values``, or ``"<name>: None"``."""
    if values is None:
        return f"{name}: None"
    lines = [f"{name} count: {len(values)}"]
    lines.extend(
        f"{name}[{index:02d}]: {tensor_summary(value)}"
        for index, value in enumerate(values)
    )
    return "\n".join(lines)


def summarize_cache(cache):
    """Describe a KV cache: type, layer count, sequence length and layer-0 tensors.

    Three layouts are recognised for the layer-0 tensors: objects exposing
    ``key_cache``/``value_cache`` lists, objects exposing a ``layers`` list
    whose items carry ``keys``/``values``, and legacy tuple/list caches of
    ``(key, value)`` pairs. Unknown layouts simply omit the layer-0 lines.
    """
    if cache is None:
        return "KV cache: None"

    lines = [f"KV cache type: {type(cache).__name__}"]
    try:
        lines.append(f"KV cache layers: {len(cache)}")
    except TypeError:
        lines.append("KV cache layers: unknown")

    if hasattr(cache, "get_seq_length"):
        lines.append(f"KV cache sequence length: {cache.get_seq_length()}")

    if hasattr(cache, "key_cache") and getattr(cache, "key_cache"):
        lines.append(f"layer0 key: {tensor_summary(cache.key_cache[0])}")
        lines.append(f"layer0 value: {tensor_summary(cache.value_cache[0])}")
    elif getattr(cache, "layers", None):
        # Layer-object layout; getattr keeps this best-effort if names differ.
        first_layer = cache.layers[0]
        lines.append(f"layer0 key: {tensor_summary(getattr(first_layer, 'keys', None))}")
        lines.append(
            f"layer0 value: {tensor_summary(getattr(first_layer, 'values', None))}"
        )
    elif isinstance(cache, (tuple, list)) and cache:
        first_layer = cache[0]
        if isinstance(first_layer, (tuple, list)) and len(first_layer) >= 2:
            lines.append(f"layer0 key: {tensor_summary(first_layer[0])}")
            lines.append(f"layer0 value: {tensor_summary(first_layer[1])}")

    return "\n".join(lines)


def explain_tokens(tokenizer, prompt):
    """Return the token count and a preview of the first (id, token) pairs.

    The prompt is encoded with the same helper used for the model input, so
    the preview matches what the model actually receives.
    """
    encoded = _encode_prompt(tokenizer, prompt)
    ids = encoded["input_ids"][0].tolist()
    tokens = tokenizer.convert_ids_to_tokens(ids)
    preview = zip(ids[:TOKEN_PREVIEW_LIMIT], tokens[:TOKEN_PREVIEW_LIMIT])

    lines = [f"token_count: {len(ids)}", "", "first_tokens:"]
    lines.extend(f"{token_id:>8} {token}" for token_id, token in preview)
    if len(ids) > TOKEN_PREVIEW_LIMIT:
        lines.append(f"... {len(ids) - TOKEN_PREVIEW_LIMIT} more tokens")
    return "\n".join(lines)


def _greedy_next_token(tokenizer, last_token_logits):
    """Return ``(token_id_tensor, decoded_text)`` for the argmax of the logits."""
    next_token_id = last_token_logits.argmax(dim=-1)
    return next_token_id, tokenizer.decode(next_token_id)


def _forward_observation(tokenizer, model, inputs, input_len):
    """Run one forward pass and report the greedy next token and KV cache facts."""
    with torch.no_grad():
        outputs = model(**inputs, use_cache=True)

    logits = outputs.logits[:, -1, :]
    next_token_id, next_token = _greedy_next_token(tokenizer, logits)

    cache = outputs.past_key_values
    try:
        cache_layers = len(cache)
    except TypeError:
        cache_layers = "unknown"
    if hasattr(cache, "get_seq_length"):
        cache_seq_len = cache.get_seq_length()
    else:
        # Caches without get_seq_length: report the prompt length instead.
        cache_seq_len = input_len

    return (
        "这是一次 forward 观察,不是完整生成。\n\n"
        f"greedy next token id: {next_token_id.item()}\n"
        f"greedy next token: {next_token!r}\n"
        f"logits shape: {tuple(logits.shape)}\n"
        f"KV cache type: {type(cache).__name__}\n"
        f"KV cache layers: {cache_layers}\n"
        f"KV cache sequence length: {cache_seq_len}"
    )


def _intermediates_observation(tokenizer, model, inputs, output_attentions):
    """Run a forward pass that returns hidden states and, optionally, attentions.

    Returns ``(answer_text, details)`` where ``details`` holds the logits shape
    and the hidden-state / attention counts for the metrics panel.

    Raises:
        gr.Error: if the forward pass fails while ``output_attentions`` is on.
            Failures with it off are re-raised unchanged.
    """
    try:
        with torch.no_grad():
            outputs = model(
                **inputs,
                use_cache=True,
                output_hidden_states=True,
                output_attentions=bool(output_attentions),
                return_dict=True,
            )
    except Exception as exc:
        if output_attentions:
            raise gr.Error(
                "开启 output_attentions 时 forward 失败。attention 矩阵显存开销较大,"
                "可以先关闭 output_attentions 再观察 hidden_states 和 KV cache。"
            ) from exc
        raise

    logits = outputs.logits
    last_token_logits = logits[:, -1, :]
    next_token_id, next_token = _greedy_next_token(tokenizer, last_token_logits)

    overview = "\n".join(
        [
            f"outputs type: {type(outputs).__name__}",
            f"outputs keys: {list(outputs.keys())}",
            f"logits: {tensor_summary(logits)}",
            f"last token logits: {tensor_summary(last_token_logits)}",
            f"greedy next token id: {next_token_id.item()}",
            f"greedy next token: {next_token!r}",
        ]
    )
    answer = "\n\n".join(
        [
            "这是一次架构和中间态观察,不是完整生成。",
            overview,
            summarize_tensor_sequence("hidden_states", outputs.hidden_states),
            summarize_tensor_sequence("attentions", outputs.attentions),
            summarize_cache(outputs.past_key_values),
        ]
    )
    details = {
        "logits_shape": list(logits.shape),
        "hidden_state_count": len(outputs.hidden_states)
        if outputs.hidden_states is not None
        else 0,
        "attention_count": len(outputs.attentions)
        if outputs.attentions is not None
        else 0,
    }
    return answer, details


@spaces.GPU(duration=120)
def run_experiment(
    lesson_name,
    model_id,
    system_prompt,
    user_prompt,
    max_new_tokens,
    do_sample,
    temperature,
    top_p,
    output_attentions,
):
    """Run the selected lesson and return ``(answer, config_text, token_text, metrics)``.

    The forward-pass lesson and the intermediates lesson each run a single
    forward pass; every other lesson runs ``model.generate`` with the sampling
    settings from the UI. An empty ``model_id`` falls back to
    ``DEFAULT_MODEL_ID``.

    Raises:
        gr.Error: if ``user_prompt`` is blank, or if the intermediates forward
            pass fails while ``output_attentions`` is enabled.
    """
    if not user_prompt.strip():
        raise gr.Error("请输入 user prompt。")

    # The timer deliberately includes model loading and tokenization.
    start = perf_counter()
    tokenizer, model, config = load_components(model_id.strip() or DEFAULT_MODEL_ID)
    prompt = build_prompt(tokenizer, build_messages(system_prompt, user_prompt))
    inputs = _encode_prompt(tokenizer, prompt).to(model_device(model))
    input_len = inputs["input_ids"].shape[-1]

    if lesson_name == LESSON_FORWARD:
        answer = _forward_observation(tokenizer, model, inputs, input_len)
        elapsed = perf_counter() - start
        metrics = {
            "lesson": lesson_name,
            "input_tokens": input_len,
            "new_tokens": 1,
            "elapsed_seconds": round(elapsed, 3),
            "device": str(model_device(model)),
        }
        return (
            answer,
            inspect_config(config, tokenizer, model),
            explain_tokens(tokenizer, prompt),
            metrics,
        )

    if lesson_name == LESSON_INTERMEDIATES:
        answer, details = _intermediates_observation(
            tokenizer, model, inputs, output_attentions
        )
        elapsed = perf_counter() - start
        metrics = {
            "lesson": lesson_name,
            "input_tokens": input_len,
            "elapsed_seconds": round(elapsed, 3),
            **details,
            "output_attentions": bool(output_attentions),
            "device": str(model_device(model)),
        }
        return (
            answer,
            architecture_summary(config, model),
            explain_tokens(tokenizer, prompt),
            metrics,
        )

    gen_config = generation_config(
        tokenizer,
        max_new_tokens=max_new_tokens,
        do_sample=do_sample,
        temperature=temperature,
        top_p=top_p,
    )
    with torch.no_grad():
        outputs = model.generate(**inputs, generation_config=gen_config)

    # generate() returns prompt + continuation; keep only the new tokens.
    new_token_ids = outputs[0][input_len:]
    answer = tokenizer.decode(new_token_ids, skip_special_tokens=True).strip()
    elapsed = perf_counter() - start
    new_token_count = int(new_token_ids.shape[-1])
    metrics = {
        "lesson": lesson_name,
        "input_tokens": input_len,
        "new_tokens": new_token_count,
        "elapsed_seconds": round(elapsed, 3),
        "tokens_per_second": round(float(new_token_count) / elapsed, 3)
        if elapsed > 0
        else None,
        "device": str(model_device(model)),
    }
    return (
        answer,
        inspect_config(config, tokenizer, model),
        explain_tokens(tokenizer, prompt),
        metrics,
    )


def lesson_note(lesson_name):
    """Return the Markdown note for a lesson, or an empty string if unknown."""
    return LESSON_NOTES.get(lesson_name, "")


def lesson_code(lesson_name):
    """Return the source text of a lesson's file, or a placeholder if missing."""
    file_name = LESSONS.get(lesson_name)
    if not file_name:
        return ""
    return read_text(TEXT_DIR / file_name, "代码文件不存在。")


def lesson_file_name(lesson_name):
    """Return the file name registered for a lesson, or an empty string."""
    return LESSONS.get(lesson_name, "")


with gr.Blocks(title="大模型文本部署实验台") as demo:
    gr.Markdown(INTRO_MARKDOWN)

    with gr.Tabs():
        with gr.Tab("实验"):
            with gr.Row():
                with gr.Column(scale=4):
                    lesson = gr.Dropdown(
                        choices=list(LESSONS.keys()),
                        value=DEFAULT_LESSON,
                        label="实验层级",
                    )
                    model_id = gr.Textbox(
                        value=DEFAULT_MODEL_ID,
                        label="模型",
                    )
                    system_prompt = gr.Textbox(
                        value="你是一个讲解大模型部署的老师,回答要清晰、简洁、面试友好。",
                        label="system prompt",
                        lines=2,
                    )
                    user_prompt = gr.Textbox(
                        value="用三句话解释 tokenizer 在文本大模型部署中的作用。",
                        label="user prompt",
                        lines=4,
                    )
                with gr.Column(scale=3):
                    max_new_tokens = gr.Slider(
                        minimum=16,
                        maximum=256,
                        value=120,
                        step=8,
                        label="max_new_tokens",
                    )
                    do_sample = gr.Checkbox(value=False, label="do_sample")
                    temperature = gr.Slider(
                        minimum=0.1,
                        maximum=1.5,
                        value=0.7,
                        step=0.1,
                        label="temperature",
                    )
                    top_p = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=0.9,
                        step=0.05,
                        label="top_p",
                    )
                    output_attentions = gr.Checkbox(
                        value=False,
                        label="output_attentions",
                    )
                    run_btn = gr.Button("运行", variant="primary")

            with gr.Row():
                lesson_summary = gr.Markdown(value=lesson_note(DEFAULT_LESSON))
                runtime = gr.Textbox(
                    value="ZeroGPU 会在点击运行后为 @spaces.GPU 函数临时分配 GPU。",
                    label="运行环境",
                    interactive=False,
                )

            with gr.Row():
                answer = gr.Textbox(label="模型输出", lines=12)
                metrics = gr.JSON(label="指标")

            with gr.Accordion("token、配置与架构观察", open=False):
                with gr.Row():
                    config_view = gr.Textbox(label="模型与配置", lines=10)
                    token_view = gr.Textbox(label="token 预览", lines=10)

        with gr.Tab("理论"):
            gr.Markdown(TEXT_README)
            gr.Markdown(THEORY_TEXT)

        with gr.Tab("代码"):
            code_selector = gr.Dropdown(
                choices=list(LESSONS.keys()),
                value=DEFAULT_LESSON,
                label="代码文件",
            )
            code_file = gr.Textbox(
                value=lesson_file_name(DEFAULT_LESSON),
                label="文件名",
                interactive=False,
            )
            code_view = gr.Code(
                value=lesson_code(DEFAULT_LESSON),
                language="python",
                label="源码",
            )

    lesson.change(lesson_note, inputs=lesson, outputs=lesson_summary)
    run_btn.click(
        run_experiment,
        inputs=[
            lesson,
            model_id,
            system_prompt,
            user_prompt,
            max_new_tokens,
            do_sample,
            temperature,
            top_p,
            output_attentions,
        ],
        outputs=[answer, config_view, token_view, metrics],
    )
    code_selector.change(lesson_file_name, inputs=code_selector, outputs=code_file)
    code_selector.change(lesson_code, inputs=code_selector, outputs=code_view)


if __name__ == "__main__":
    demo.queue(max_size=16).launch(ssr_mode=False)