"""
MiniMind Export Utilities

Export models to ONNX, GGUF (llama.cpp), and other formats.
"""

import json
import struct
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import torch
import torch.nn as nn


@dataclass
class ExportConfig:
    """Configuration for model export."""

    # ONNX settings
    opset_version: int = 17
    use_external_data: bool = False
    optimize_for_mobile: bool = True

    # GGUF settings
    gguf_quant_type: str = "Q4_K_M"  # target quant type; GGUFWriter currently writes F32
    gguf_use_mmap: bool = True

    # Tracing defaults
    max_seq_len: int = 2048
    batch_size: int = 1
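
# A minimal usage sketch (illustrative values): a config tuned for on-device
# export, trading context length for a smaller traced graph.
#
#     mobile_config = ExportConfig(max_seq_len=512, batch_size=1)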


def export_to_onnx(
    model: nn.Module,
    output_path: str,
    config: Optional[ExportConfig] = None,
    sample_input: Optional[torch.Tensor] = None,
) -> str:
    """
    Export model to ONNX format.

    Args:
        model: PyTorch model to export
        output_path: Path to save ONNX model
        config: Export configuration
        sample_input: Sample input tensor for tracing

    Returns:
        Path to exported model
    """
    config = config or ExportConfig()
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    model.eval()
    device = next(model.parameters()).device

    # Random token ids are fine for tracing; only shapes and dtypes matter.
    if sample_input is None:
        sample_input = torch.randint(
            0, 1000,
            (config.batch_size, config.max_seq_len),
            dtype=torch.long,
            device=device,
        )

    # Mark batch and sequence dims as dynamic so the exported graph accepts
    # inputs of any size, not just the traced shape.
    dynamic_axes = {
        "input_ids": {0: "batch_size", 1: "sequence_length"},
        "logits": {0: "batch_size", 1: "sequence_length"},
    }

    # The underlying model returns a 4-tuple; ONNX needs a single logits output.
    class ONNXWrapper(nn.Module):
        def __init__(self, model):
            super().__init__()
            self.model = model

        def forward(self, input_ids):
            _, logits, _, _ = self.model(input_ids)
            return logits

    wrapped_model = ONNXWrapper(model)

    torch.onnx.export(
        wrapped_model,
        (sample_input,),
        str(output_path),
        opset_version=config.opset_version,
        input_names=["input_ids"],
        output_names=["logits"],
        dynamic_axes=dynamic_axes,
        do_constant_folding=True,
    )

    print(f"ONNX model exported to {output_path}")

    if config.optimize_for_mobile:
        try:
            import onnx
            import onnxoptimizer  # onnx.optimizer was removed from onnx in 1.9

            optimized_path = output_path.with_suffix(".optimized.onnx")
            onnx_model = onnx.load(str(output_path))

            # Generic graph cleanups; the conv/bn fusion pass is a no-op for
            # pure transformer graphs but harmless to request.
            passes = ["fuse_bn_into_conv", "fuse_consecutive_transposes"]
            optimized_model = onnxoptimizer.optimize(onnx_model, passes)
            onnx.save(optimized_model, str(optimized_path))

            print(f"Optimized ONNX model saved to {optimized_path}")
        except ImportError:
            print("Note: install onnx and onnxoptimizer for graph optimization")

    return str(output_path)
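

# Post-export sanity check: a minimal sketch, assuming `onnxruntime` and
# `numpy` are installed (neither is required by the exporter itself). Runs
# the exported graph on random token ids and verifies the logits shape.
def verify_onnx_export(onnx_path: str, vocab_size: int, seq_len: int = 8) -> bool:
    import numpy as np
    import onnxruntime as ort

    session = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
    input_ids = np.random.randint(0, vocab_size, size=(1, seq_len), dtype=np.int64)
    (logits,) = session.run(["logits"], {"input_ids": input_ids})
    # export_to_onnx emits a single "logits" output of shape (batch, seq, vocab)
    return logits.shape == (1, seq_len, vocab_size)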


# GGUF format constants (llama.cpp / ggml)
GGUF_MAGIC = 0x46554747  # "GGUF" when packed little-endian
GGUF_VERSION = 3

# Metadata value types (distinct from the ggml *tensor* types below)
GGUF_TYPE_UINT8 = 0
GGUF_TYPE_INT8 = 1
GGUF_TYPE_UINT16 = 2
GGUF_TYPE_INT16 = 3
GGUF_TYPE_UINT32 = 4
GGUF_TYPE_INT32 = 5
GGUF_TYPE_FLOAT32 = 6
GGUF_TYPE_BOOL = 7
GGUF_TYPE_STRING = 8
GGUF_TYPE_ARRAY = 9
GGUF_TYPE_UINT64 = 10
GGUF_TYPE_INT64 = 11
GGUF_TYPE_FLOAT64 = 12

# ggml tensor data type for unquantized float32 tensors
GGML_TYPE_F32 = 0


class GGUFWriter:
    """Writer for GGUF format (llama.cpp compatible)."""

    ALIGNMENT = 32  # GGUF default tensor-data alignment

    def __init__(self, output_path: str):
        self.output_path = Path(output_path)
        self.metadata: Dict[str, Any] = {}
        self.tensors: List[Dict[str, Any]] = []

    def add_metadata(self, key: str, value: Any, value_type: Optional[int] = None):
        """Add a metadata key-value pair."""
        self.metadata[key] = {"value": value, "type": value_type}

    def add_tensor(self, name: str, tensor: torch.Tensor, quant_type: str = "F32"):
        """Add a tensor to be written.

        Note: quantization is not implemented; every tensor is serialized
        as float32 regardless of `quant_type`.
        """
        self.tensors.append({
            "name": name,
            # Upcast so .numpy() also works for fp16/bf16 weights.
            "data": tensor.detach().float().cpu().numpy(),
            "quant_type": quant_type,
        })

    def _write_string(self, f, s: str):
        """Write a string in GGUF format: uint64 byte length + UTF-8 bytes."""
        encoded = s.encode("utf-8")
        f.write(struct.pack("<Q", len(encoded)))
        f.write(encoded)

    def _write_metadata_value(self, f, value: Any, value_type: int):
        """Write a typed metadata value."""
        f.write(struct.pack("<I", value_type))

        if value_type == GGUF_TYPE_UINT32:
            f.write(struct.pack("<I", value))
        elif value_type == GGUF_TYPE_INT32:
            f.write(struct.pack("<i", value))
        elif value_type == GGUF_TYPE_FLOAT32:
            f.write(struct.pack("<f", value))
        elif value_type == GGUF_TYPE_STRING:
            self._write_string(f, value)
        elif value_type == GGUF_TYPE_BOOL:
            f.write(struct.pack("<?", value))
        else:
            # Fail loudly rather than emit a type tag with no payload.
            raise ValueError(f"Unsupported metadata value type: {value_type}")

    def write(self):
        """Write the GGUF file."""
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

        with open(self.output_path, "wb") as f:
            # Header: magic, version, tensor count, metadata key-value count
            f.write(struct.pack("<I", GGUF_MAGIC))
            f.write(struct.pack("<I", GGUF_VERSION))
            f.write(struct.pack("<Q", len(self.tensors)))
            f.write(struct.pack("<Q", len(self.metadata)))

            # Metadata section
            for key, meta in self.metadata.items():
                self._write_string(f, key)
                self._write_metadata_value(f, meta["value"], meta["type"])

            # Tensor info section; offsets are relative to the start of the
            # tensor-data section and must be ALIGNMENT-aligned.
            data_offset = 0
            for tensor_info in self.tensors:
                self._write_string(f, tensor_info["name"])
                data = tensor_info["data"]

                # Number of dimensions
                f.write(struct.pack("<I", len(data.shape)))

                # GGUF stores dimensions in ggml order (innermost first),
                # i.e. reversed relative to the numpy/PyTorch shape.
                for dim in reversed(data.shape):
                    f.write(struct.pack("<Q", dim))

                # Tensor data type (unquantized float32)
                f.write(struct.pack("<I", GGML_TYPE_F32))

                # Offset of this tensor within the tensor-data section
                f.write(struct.pack("<Q", data_offset))
                data_offset += data.nbytes + (-data.nbytes) % self.ALIGNMENT

            # Pad so the tensor-data section starts on an aligned boundary
            f.write(b"\x00" * ((-f.tell()) % self.ALIGNMENT))

            # Tensor data, each tensor padded out to ALIGNMENT
            for tensor_info in self.tensors:
                data = tensor_info["data"]
                f.write(data.tobytes())
                f.write(b"\x00" * ((-data.nbytes) % self.ALIGNMENT))

        print(f"GGUF model written to {self.output_path}")


def export_to_gguf(
    model: nn.Module,
    output_path: str,
    model_config: Any,
    config: Optional[ExportConfig] = None,
) -> str:
    """
    Export model to GGUF format for llama.cpp.

    Args:
        model: PyTorch model to export
        output_path: Path to save GGUF model
        model_config: Model configuration
        config: Export configuration

    Returns:
        Path to exported model
    """
    config = config or ExportConfig()
    writer = GGUFWriter(output_path)

    # Architecture metadata
    writer.add_metadata("general.architecture", "mind2", GGUF_TYPE_STRING)
    writer.add_metadata("general.name", model_config.model_name, GGUF_TYPE_STRING)
    writer.add_metadata("mind2.context_length", model_config.max_position_embeddings, GGUF_TYPE_UINT32)
    writer.add_metadata("mind2.embedding_length", model_config.hidden_size, GGUF_TYPE_UINT32)
    writer.add_metadata("mind2.block_count", model_config.num_hidden_layers, GGUF_TYPE_UINT32)
    writer.add_metadata("mind2.attention.head_count", model_config.num_attention_heads, GGUF_TYPE_UINT32)
    writer.add_metadata("mind2.attention.head_count_kv", model_config.num_key_value_heads, GGUF_TYPE_UINT32)
    writer.add_metadata("mind2.rope.freq_base", model_config.rope_theta, GGUF_TYPE_FLOAT32)
    writer.add_metadata("mind2.expert_count", model_config.num_experts, GGUF_TYPE_UINT32)
    writer.add_metadata("mind2.expert_used_count", model_config.num_experts_per_tok, GGUF_TYPE_UINT32)

    # Tokenizer metadata
    writer.add_metadata("tokenizer.ggml.model", "gpt2", GGUF_TYPE_STRING)

    # Map PyTorch parameter names to llama.cpp-style tensor names
    state_dict = model.state_dict()
    tensor_name_map = {
        "model.embed_tokens.weight": "token_embd.weight",
        "model.norm.weight": "output_norm.weight",
        "lm_head.weight": "output.weight",
    }

    for name, tensor in state_dict.items():
        gguf_name = tensor_name_map.get(name, name)

        # Per-layer tensors: names look like "model.layers.<idx>.<module>..."
        if "layers." in name:
            parts = name.split(".")
            layer_idx = parts[2]

            if "self_attn.q_proj" in name:
                gguf_name = f"blk.{layer_idx}.attn_q.weight"
            elif "self_attn.k_proj" in name:
                gguf_name = f"blk.{layer_idx}.attn_k.weight"
            elif "self_attn.v_proj" in name:
                gguf_name = f"blk.{layer_idx}.attn_v.weight"
            elif "self_attn.o_proj" in name:
                gguf_name = f"blk.{layer_idx}.attn_output.weight"
            elif "input_layernorm" in name:
                gguf_name = f"blk.{layer_idx}.attn_norm.weight"
            elif "post_attention_layernorm" in name:
                gguf_name = f"blk.{layer_idx}.ffn_norm.weight"
            elif "mlp.gate" in name:
                gguf_name = f"blk.{layer_idx}.ffn_gate.weight"
            elif "experts" in name:
                # Names look like "...experts.<expert_idx>.<proj>.weight";
                # locate the index relative to the "experts" component.
                expert_idx = parts[parts.index("experts") + 1]
                if "gate_proj" in name:
                    gguf_name = f"blk.{layer_idx}.ffn_gate_exps.{expert_idx}.weight"
                elif "up_proj" in name:
                    gguf_name = f"blk.{layer_idx}.ffn_up_exps.{expert_idx}.weight"
                elif "down_proj" in name:
                    gguf_name = f"blk.{layer_idx}.ffn_down_exps.{expert_idx}.weight"

        writer.add_tensor(gguf_name, tensor)

    writer.write()
    return str(output_path)
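
# Usage sketch (hypothetical attribute values; the real ones come from the
# model's own config object):
#
#     from types import SimpleNamespace
#     cfg = SimpleNamespace(
#         model_name="minimind", max_position_embeddings=2048,
#         hidden_size=512, num_hidden_layers=8,
#         num_attention_heads=8, num_key_value_heads=2,
#         rope_theta=10000.0, num_experts=4, num_experts_per_tok=2,
#     )
#     export_to_gguf(model, "out/minimind.gguf", cfg)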


def export_for_android(
    model: nn.Module,
    output_dir: str,
    model_config: Any,
    export_formats: Optional[List[str]] = None,
) -> Dict[str, str]:
    """
    Export model in formats suitable for Android deployment.

    Args:
        model: PyTorch model
        output_dir: Output directory
        model_config: Model configuration
        export_formats: List of formats to export (defaults to ["onnx", "gguf"])

    Returns:
        Dictionary mapping format to output path
    """
    # Avoid a mutable default argument; resolve the format list here.
    export_formats = export_formats or ["onnx", "gguf"]
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    outputs = {}

    # A shorter context keeps the traced graph and memory footprint mobile-friendly.
    config = ExportConfig(
        optimize_for_mobile=True,
        max_seq_len=512,
    )

    if "onnx" in export_formats:
        onnx_path = output_dir / f"{model_config.model_name}.onnx"
        outputs["onnx"] = export_to_onnx(model, str(onnx_path), config)

    if "gguf" in export_formats:
        gguf_path = output_dir / f"{model_config.model_name}.gguf"
        outputs["gguf"] = export_to_gguf(model, str(gguf_path), model_config, config)

    # Write a manifest describing the model and the exported artifacts
    model_info = {
        "model_name": model_config.model_name,
        "vocab_size": model_config.vocab_size,
        "hidden_size": model_config.hidden_size,
        "num_layers": model_config.num_hidden_layers,
        "num_heads": model_config.num_attention_heads,
        "max_seq_len": config.max_seq_len,
        "exports": {k: str(v) for k, v in outputs.items()},
    }

    info_path = output_dir / "model_info.json"
    with open(info_path, "w") as f:
        json.dump(model_info, f, indent=2)

    print(f"Model info saved to {info_path}")
    outputs["info"] = str(info_path)

    return outputs
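
# End-to-end sketch (the loader name is hypothetical; substitute the actual
# MiniMind model class):
#
#     model = MiniMindForCausalLM.from_pretrained("./checkpoint")  # hypothetical
#     paths = export_for_android(model, "out/android", model.config)
#     # paths == {"onnx": "...", "gguf": "...", "info": "..."}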