| """ |
| Component 10: Export, quantization, benchmarking, and packaging. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import shutil |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Any, Dict, Tuple |
|
|
| import torch |
| import torch.nn as nn |
| import yaml |
|

PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src.model_architecture.code_transformer import CodeTransformerLM, ModelConfig, get_model_presets
from src.tokenizer.code_tokenizer import CodeTokenizer
from src.evaluation_system.code_eval import restore_code_from_structured
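

# For reference, a sketch of the config this script consumes
# (configs/component10_export_config.yaml). The keys mirror the lookups in
# main() below; the concrete paths and values are illustrative assumptions,
# not shipped defaults:
#
#   model:
#     model_config_path: configs/model_config.yaml
#     tokenizer_dir: artifacts/tokenizer
#     source_checkpoint_path: checkpoints/model_step3200.pt
#   quantization:
#     quantized_output_path: exports/model_step3200_int8_state.pt
#   benchmark:
#     prompt: "Write a Python function to add two numbers."
#     max_new_tokens: 120
#   outputs:
#     benchmark_report_json: reports/component10_benchmark.json
#   package:
#     output_dir: dist/mindi_portable
#     app_port: 7861
#
# One coupling worth noting: write_portable_chat_files() hard-codes
# model/model_step3200_int8_state.pt in the portable app config, so the
# basename of quantization.quantized_output_path must match it.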


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Run Component 10 export/optimization.")
    parser.add_argument("--config", default="configs/component10_export_config.yaml")
    return parser.parse_args()


def load_yaml(path: Path) -> Dict[str, Any]:
    """Load a YAML mapping, tolerating a UTF-8 BOM."""
    if not path.exists():
        raise FileNotFoundError(f"Config not found: {path}")
    data = yaml.safe_load(path.read_text(encoding="utf-8-sig"))
    if not isinstance(data, dict):
        raise ValueError(f"Invalid YAML format (expected a mapping): {path}")
    return data


def build_model_config(path: Path) -> ModelConfig:
    """Build a ModelConfig; explicit `model:` keys override a named preset."""
    cfg = load_yaml(path)
    preset = cfg.get("preset")
    model_cfg = cfg.get("model", {})
    if preset:
        merged = get_model_presets()[preset].__dict__.copy()
        merged.update(model_cfg)
        return ModelConfig(**merged)
    return ModelConfig(**model_cfg)
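

# Merge semantics, with a hypothetical preset name: given
#   preset: mindi_420m
#   model: {dropout: 0.0}
# the preset supplies every ModelConfig field as a default and the explicit
# `dropout` key wins, so configs only need to spell out deliberate overrides.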


def prepare_prompt_ids(tokenizer: CodeTokenizer, prompt: str) -> list[int]:
    # Format the prompt exactly as training samples were formatted (with an
    # empty code body), then drop the structural newline markers.
    text = tokenizer.format_training_sample(prompt=prompt, code="", language="python")
    text = text.replace(" <NL>", "").strip()
    ids = tokenizer.encode(text)
    # Strip a trailing <EOS> so generation continues past the prompt.
    eos = tokenizer.special_token_ids.get("<EOS>")
    if eos is not None and len(ids) > 1 and ids[-1] == int(eos):
        ids = ids[:-1]
    return ids


@torch.no_grad()
def benchmark_tokens_per_sec(
    model: CodeTransformerLM,
    tokenizer: CodeTokenizer,
    prompt: str,
    max_new_tokens: int,
    device: torch.device,
) -> Dict[str, Any]:
    """Greedy-decode up to max_new_tokens and measure wall-clock throughput."""
    model.eval()
    ids = prepare_prompt_ids(tokenizer, prompt)
    input_ids = torch.tensor([ids], dtype=torch.long, device=device)

    eos_id = tokenizer.special_token_ids.get("<EOS>")

    if device.type == "cuda":
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()
        torch.cuda.synchronize()

    t0 = time.perf_counter()
    generated = 0
    # Naive decode loop: the full sequence is re-run through the model on every
    # step (no KV cache is passed), so this measures end-to-end throughput of
    # the plain forward path.
    for _ in range(max_new_tokens):
        out = model(input_ids=input_ids)
        next_id = torch.argmax(out["logits"][:, -1, :], dim=-1, keepdim=True)
        input_ids = torch.cat([input_ids, next_id], dim=1)
        generated += 1
        if eos_id is not None and int(next_id.item()) == int(eos_id):
            break

    if device.type == "cuda":
        torch.cuda.synchronize()
    dt = max(1e-6, time.perf_counter() - t0)

    decoded = tokenizer.decode(input_ids[0].tolist())
    code = restore_code_from_structured(decoded)

    peak_vram = float(torch.cuda.max_memory_allocated() / (1024**3)) if device.type == "cuda" else 0.0

    return {
        "generated_tokens": float(generated),
        "seconds": float(dt),
        "tokens_per_second": float(generated / dt),
        "peak_vram_gb": peak_vram,
        "preview_code": code[:300],
    }
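

# Standalone usage sketch for the helper above (paths are placeholders; a
# real run loads checkpoint weights first, as main() does):
#
#   tokenizer = CodeTokenizer.load("artifacts/tokenizer")
#   model = CodeTransformerLM(build_model_config(Path("configs/model_config.yaml")))
#   stats = benchmark_tokens_per_sec(
#       model=model, tokenizer=tokenizer,
#       prompt="Write a Python function to add two numbers.",
#       max_new_tokens=64, device=torch.device("cpu"),
#   )
#   print(stats["tokens_per_second"], stats["peak_vram_gb"])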


def bytes_to_gb(n: int) -> float:
    return float(n / (1024**3))


def write_portable_launcher(portable_dir: Path) -> None:
    bat = r"""@echo off
title MINDI 1.0 420M
setlocal
cd /d "%~dp0"
if not exist .venv (
    echo [setup] Creating virtual environment...
    py -3 -m venv .venv
)
call .venv\Scripts\activate.bat
python -m pip install --upgrade pip >nul
python -m pip install -r requirements_portable.txt
python app\launch_portable_chat.py --config app\portable_chat_config.yaml
endlocal
"""
    (portable_dir / "Start_MINDI.bat").write_text(bat, encoding="utf-8")
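

# Assumption worth flagging: the launcher relies on the Windows "py" launcher
# to create its virtual environment on first run, so a machine without py on
# PATH would need that line swapped for a direct python invocation.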


def write_portable_requirements(portable_dir: Path) -> None:
    req = """torch==2.5.1
tokenizers==0.20.1
pyyaml==6.0.2
gradio==5.5.0
pygments==2.19.2
"""
    (portable_dir / "requirements_portable.txt").write_text(req, encoding="utf-8")


def write_portable_chat_files(portable_dir: Path, port: int) -> None:
    app_dir = portable_dir / "app"
    app_dir.mkdir(parents=True, exist_ok=True)

    cfg = f"""model:
  model_config_path: app/model_config.yaml
  quantized_state_path: model/model_step3200_int8_state.pt
  tokenizer_dir: model/tokenizer

server:
  host: 127.0.0.1
  port: {port}
"""
    (app_dir / "portable_chat_config.yaml").write_text(cfg, encoding="utf-8")

    launch = r'''from __future__ import annotations

import argparse
import sys
from pathlib import Path

import gradio as gr
import torch
import yaml

PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src.model_architecture.code_transformer import CodeTransformerLM, ModelConfig, get_model_presets
from src.tokenizer.code_tokenizer import CodeTokenizer
from src.inference_engine.inference_engine import InferenceEngine, DecodingConfig


def load_yaml(path: Path):
    return yaml.safe_load(path.read_text(encoding="utf-8-sig"))


def build_model_config(path: Path) -> ModelConfig:
    cfg = load_yaml(path)
    preset = cfg.get("preset")
    model_cfg = cfg.get("model", {})
    if preset:
        merged = get_model_presets()[preset].__dict__.copy()
        merged.update(model_cfg)
        return ModelConfig(**merged)
    return ModelConfig(**model_cfg)


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--config", default="app/portable_chat_config.yaml")
    ap.add_argument("--self_test", action="store_true")
    args = ap.parse_args()

    cfg = load_yaml(PROJECT_ROOT / args.config)
    mcfg = build_model_config(PROJECT_ROOT / cfg["model"]["model_config_path"])

    tokenizer = CodeTokenizer.load(str(PROJECT_ROOT / cfg["model"]["tokenizer_dir"]))
    # Rebuild the FP32 skeleton, wrap it with dynamic INT8 quantization, and
    # only then load the quantized state dict saved by the exporter.
    model = CodeTransformerLM(mcfg).cpu().float()
    model = torch.quantization.quantize_dynamic(model, {torch.nn.Linear}, dtype=torch.qint8)
    state = torch.load(PROJECT_ROOT / cfg["model"]["quantized_state_path"], map_location="cpu")
    model.load_state_dict(state)

    engine = InferenceEngine(model=model, tokenizer=tokenizer, device=torch.device("cpu"))
    dcfg = DecodingConfig(max_new_tokens=220, min_tokens_before_stop_check=64)

    if args.self_test:
        out = engine.generate_with_retry("Write a Python function to add two numbers.", "python", dcfg)
        code = out["final"]["code"]
        print("portable_self_test_ok=", bool(code.strip()))
        return

    def respond(prompt, history):
        history = history or []
        p = (prompt or "").strip()
        if not p:
            return history, ""
        out = engine.generate_with_retry(p, "python", dcfg)
        history.append((p, out["final"]["code"]))
        return history, ""

    with gr.Blocks(title="MINDI 1.0 420M") as demo:
        gr.Markdown("## MINDI 1.0 420M (INT8 Portable)")
        chat = gr.Chatbot(height=520)
        box = gr.Textbox(label="Prompt", lines=4)
        btn = gr.Button("Generate")
        clear = gr.Button("Clear")
        btn.click(respond, [box, chat], [chat, box])
        box.submit(respond, [box, chat], [chat, box])
        clear.click(lambda: ([], ""), None, [chat, box])

    demo.launch(
        server_name=cfg["server"].get("host", "127.0.0.1"),
        server_port=int(cfg["server"].get("port", 7861)),
        share=False,
        inbrowser=False,
    )


if __name__ == "__main__":
    main()
'''
    (app_dir / "launch_portable_chat.py").write_text(launch, encoding="utf-8")
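

# The generated app can be exercised straight from the package root, mirroring
# the smoke test in main() below:
#
#   python app/launch_portable_chat.py --config app/portable_chat_config.yaml --self_test
#
# which loads the INT8 model on CPU and prints portable_self_test_ok= True/False;
# end users launch through Start_MINDI.bat instead.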


def copy_runtime_sources(portable_dir: Path) -> None:
    src_root = PROJECT_ROOT / "src"
    dst_root = portable_dir / "src"
    needed = [
        "__init__.py",
        "model_architecture/__init__.py",
        "model_architecture/code_transformer.py",
        "tokenizer/__init__.py",
        "tokenizer/code_tokenizer.py",
        "evaluation_system/__init__.py",
        "evaluation_system/code_eval.py",
        "inference_engine/__init__.py",
        "inference_engine/inference_engine.py",
    ]
    for rel in needed:
        src = src_root / rel
        dst = dst_root / rel
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)
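

# Package layout assembled by main() below (the top-level directory name comes
# from package.output_dir in the config):
#
#   <output_dir>/
#     Start_MINDI.bat              one-click launcher; creates .venv on first run
#     requirements_portable.txt    pinned runtime dependencies
#     app/                         chat app, its config, and model_config.yaml
#     model/                       INT8 state dict plus tokenizer files
#     src/                         runtime modules listed in copy_runtime_sources()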


def main() -> None:
    args = parse_args()
    cfg = load_yaml(PROJECT_ROOT / args.config)

    mcfg = build_model_config(PROJECT_ROOT / cfg["model"]["model_config_path"])
    tokenizer = CodeTokenizer.load(str(PROJECT_ROOT / cfg["model"]["tokenizer_dir"]))

    source_ckpt = PROJECT_ROOT / cfg["model"]["source_checkpoint_path"]
    if not source_ckpt.exists():
        raise FileNotFoundError(f"Source checkpoint not found: {source_ckpt}")

    # Baseline: FP16 on CUDA when available, otherwise FP32 on CPU.
    baseline_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    baseline = CodeTransformerLM(mcfg).to(baseline_device)
    payload = torch.load(source_ckpt, map_location=baseline_device)
    baseline.load_state_dict(payload["model_state"])
    if baseline_device.type == "cuda":
        baseline.half()

    bench_prompt = str(cfg["benchmark"].get("prompt", "Write a Python function to add two numbers."))
    max_new_tokens = int(cfg["benchmark"].get("max_new_tokens", 120))

    baseline_metrics = benchmark_tokens_per_sec(
        model=baseline,
        tokenizer=tokenizer,
        prompt=bench_prompt,
        max_new_tokens=max_new_tokens,
        device=baseline_device,
    )

    # Dynamic INT8 quantization: nn.Linear weights are stored as int8 and
    # activations are quantized on the fly at inference time; this path is
    # CPU-only, which is why the quantized benchmark below runs on CPU.
    quant_model = CodeTransformerLM(mcfg).cpu().float()
    payload_cpu = torch.load(source_ckpt, map_location="cpu")
    quant_model.load_state_dict(payload_cpu["model_state"])
    quant_model = torch.quantization.quantize_dynamic(quant_model, {nn.Linear}, dtype=torch.qint8)

    q_path = PROJECT_ROOT / cfg["quantization"]["quantized_output_path"]
    q_path.parent.mkdir(parents=True, exist_ok=True)
    torch.save(quant_model.state_dict(), q_path)

    quant_metrics = benchmark_tokens_per_sec(
        model=quant_model,
        tokenizer=tokenizer,
        prompt=bench_prompt,
        max_new_tokens=max_new_tokens,
        device=torch.device("cpu"),
    )
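
    # Note: the state dict saved to q_path above only loads back into a model
    # that has already been wrapped by quantize_dynamic (quantized Linear
    # modules keep their weights under packed-parameter keys), which is why
    # launch_portable_chat.py quantizes the freshly built model before calling
    # load_state_dict().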

    before_size_gb = bytes_to_gb(source_ckpt.stat().st_size)
    after_size_gb = bytes_to_gb(q_path.stat().st_size)

    report = {
        "source_checkpoint": str(source_ckpt),
        "quantized_checkpoint": str(q_path),
        "size_before_gb": before_size_gb,
        "size_after_gb": after_size_gb,
        "baseline_device": str(baseline_device),
        "baseline_tokens_per_second": baseline_metrics["tokens_per_second"],
        "quantized_tokens_per_second": quant_metrics["tokens_per_second"],
        "baseline_peak_vram_gb": baseline_metrics["peak_vram_gb"],
        "quantized_peak_vram_gb": quant_metrics["peak_vram_gb"],
        "baseline_generated_tokens": baseline_metrics["generated_tokens"],
        "quantized_generated_tokens": quant_metrics["generated_tokens"],
    }

    report_path = PROJECT_ROOT / cfg["outputs"]["benchmark_report_json"]
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(json.dumps(report, indent=2), encoding="utf-8")

    # Rebuild the portable package from scratch on every export.
    portable_dir = PROJECT_ROOT / cfg["package"]["output_dir"]
    if portable_dir.exists():
        shutil.rmtree(portable_dir)
    portable_dir.mkdir(parents=True, exist_ok=True)

    (portable_dir / "model").mkdir(parents=True, exist_ok=True)
    shutil.copy2(q_path, portable_dir / "model" / q_path.name)

    tok_src = PROJECT_ROOT / cfg["model"]["tokenizer_dir"]
    tok_dst = portable_dir / "model" / "tokenizer"
    shutil.copytree(tok_src, tok_dst)

    # write_portable_chat_files() creates app/ first; the model config is
    # copied in afterwards so the portable app can rebuild the architecture.
    write_portable_chat_files(portable_dir, port=int(cfg["package"].get("app_port", 7861)))
    shutil.copy2(PROJECT_ROOT / cfg["model"]["model_config_path"], portable_dir / "app" / "model_config.yaml")

    copy_runtime_sources(portable_dir)
    write_portable_requirements(portable_dir)
    write_portable_launcher(portable_dir)

    # Smoke-test the package using the project's own venv interpreter, if
    # present. The substring below matches the child's
    # print("portable_self_test_ok=", ...) output, where print's separator
    # renders as "portable_self_test_ok= True".
    py = PROJECT_ROOT / ".venv" / "Scripts" / "python.exe"
    if py.exists():
        import subprocess

        cmd = [
            str(py),
            str(portable_dir / "app" / "launch_portable_chat.py"),
            "--config", "app/portable_chat_config.yaml",
            "--self_test",
        ]
        try:
            proc = subprocess.run(cmd, cwd=str(portable_dir), capture_output=True, text=True, timeout=120)
            verify_ok = (proc.returncode == 0) and ("portable_self_test_ok= True" in (proc.stdout + proc.stderr))
        except subprocess.TimeoutExpired:
            verify_ok = False
    else:
        verify_ok = False

    print("Component 10 export completed.")
    print(f"INT8 model saved: {q_path}")
    print(f"Benchmark report: {report_path}")
    print(f"Portable package: {portable_dir}")
    print(f"Portable self-test ok: {verify_ok}")


if __name__ == "__main__":
    main()
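

# Typical invocation, assuming this file lives one level below the repo root
# (the script filename here is illustrative):
#
#   python scripts/component10_export.py --config configs/component10_export_config.yaml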