|
|
|
|
|
"""串行执行切分后的 AXModel 子图以复现完整推理路径。""" |
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import json |
|
|
import logging |
|
|
import sys |
|
|
import types |
|
|
from pathlib import Path |
|
|
from typing import Dict, List |
|
|
|
|
|
import numpy as np |
|
|
import onnx |
|
|
from axengine import InferenceSession |
|
|
|
|
|
# Folder containing this script; helper modules live in its "scripts" child.
PROJECT_ROOT = Path(__file__).resolve().parent
SCRIPTS_DIR = PROJECT_ROOT.joinpath("scripts")

# Prepend the scripts folder to the import path (only once) so that the
# split_onnx_by_subconfigs helpers imported further down can be resolved.
if SCRIPTS_DIR.as_posix() not in sys.path:
    sys.path[:0] = [SCRIPTS_DIR.as_posix()]
|
|
|
|
|
from split_onnx_by_subconfigs import ( |
|
|
SubGraphSpec, |
|
|
build_graph_index, |
|
|
derive_interface, |
|
|
ordered_specs, |
|
|
sanitize, |
|
|
trace_nodes_between, |
|
|
untouched_components, |
|
|
) |
|
|
|
|
|
|
|
|
def _ensure_numpy_core_alias() -> None:
    """Register ``numpy._core`` aliases in ``sys.modules`` when they are absent.

    Bridges the pickle module-path difference between data saved with
    numpy>=2 and read back with numpy<2: pickles written by the newer
    release reference ``numpy._core.*``, which older releases only expose
    as ``numpy.core.*``.

    The function is a no-op when ``numpy._core`` is already registered, and
    silently gives up when ``numpy.core`` cannot be imported.
    """
    # Function-scope import keeps this helper self-contained.
    import importlib

    if "numpy._core" in sys.modules:
        return

    try:
        # importlib.import_module returns the ``numpy.core`` submodule itself;
        # the builtin __import__("numpy.core") would return the top-level
        # ``numpy`` package and the alias would mirror the wrong namespace.
        core_module = sys.modules.get("numpy.core") or importlib.import_module("numpy.core")
    except Exception:
        # Deliberately best-effort: without numpy.core there is nothing to alias.
        return

    # Mirror the whole numpy.core namespace under the new name.
    alias = types.ModuleType("numpy._core")
    alias.__dict__.update(core_module.__dict__)
    sys.modules["numpy._core"] = alias

    # Also register the submodules this script aliases, skipping any that
    # numpy.core does not expose as an attribute.
    for name in ("multiarray", "umath", "numerictypes", "_multiarray_umath"):
        module = getattr(core_module, name, None)
        if module is not None:
            sys.modules[f"numpy._core.{name}"] = module
|
|
|
|
|
|
|
|
def load_specs(config_path: Path, onnx_path: Path) -> List[SubGraphSpec]:
    """Assemble the ordered sub-graph specs for a split model.

    Reads ``compiler.sub_configs`` from the JSON file at ``config_path`` and
    builds a graph index from the ONNX model at ``onnx_path``. Each config
    entry becomes a ``cfg_XX`` spec whose nodes are traced between its start
    and end tensors. Components not covered by any config entry become
    ``auto_XX`` specs, except those for which no end tensors are derived.

    Raises:
        ValueError: if ``compiler.sub_configs`` is missing or empty, or if an
            entry has no non-empty start or end tensor names.
    """
    with config_path.open("r", encoding="utf-8") as handle:
        config = json.load(handle)

    sub_configs = config.get("compiler", {}).get("sub_configs", [])
    if not sub_configs:
        raise ValueError("配置文件中缺少 compiler.sub_configs")

    index = build_graph_index(onnx.load(onnx_path.as_posix()))

    specs: List[SubGraphSpec] = []
    covered_nodes = set()

    # Pass 1: sub-graphs explicitly described by the configuration file.
    for position, entry in enumerate(sub_configs):
        # Drop empty / falsy tensor names before validating the entry.
        start_names = list(filter(None, entry.get("start_tensor_names", [])))
        end_names = list(filter(None, entry.get("end_tensor_names", [])))
        if not (start_names and end_names):
            raise ValueError(f"sub_config[{position}] 缺少 tensor 名称")

        config_spec = SubGraphSpec(
            label=f"cfg_{position:02d}",
            start=start_names,
            end=end_names,
            node_names=set(),
            source="config",
        )
        traced = trace_nodes_between(config_spec, index)
        config_spec.node_names = traced
        covered_nodes.update(traced)
        specs.append(config_spec)

    # Pass 2: whatever the configuration did not cover.
    leftovers = untouched_components(index.node_order, covered_nodes, index)
    for position, component in enumerate(leftovers):
        inputs, outputs = derive_interface(component, index)
        if outputs:
            # The label keeps the enumeration index even when earlier
            # components were skipped for having no outputs.
            specs.append(
                SubGraphSpec(
                    label=f"auto_{position:02d}",
                    start=inputs,
                    end=outputs,
                    node_names=component,
                    source="auto",
                )
            )

    return ordered_specs(specs, index)
|
|
|
|
|
|
|
|
def expected_model_path(spec: SubGraphSpec, model_dir: Path) -> Path:
    """Return the path of the compiled ``.axmodel`` file belonging to ``spec``.

    The file name is ``{label}_{head}_to_{tail}_{source}.axmodel``, where
    ``head`` / ``tail`` are the sanitized first start / end tensor names, or
    the placeholders ``"const"`` / ``"out"`` when the spec has none.

    Raises:
        FileNotFoundError: if the file does not exist inside ``model_dir``.
    """
    if spec.start:
        head = sanitize(spec.start[0])
    else:
        head = "const"

    if spec.end:
        tail = sanitize(spec.end[0])
    else:
        tail = "out"

    candidate = model_dir / f"{spec.label}_{head}_to_{tail}_{spec.source}.axmodel"
    if candidate.exists():
        return candidate
    raise FileNotFoundError(f"未找到模型文件: {candidate}")
|
|
|
|
|
|
|
|
def run_pipeline(
    specs: List[SubGraphSpec],
    model_dir: Path,
    feed_dict: Dict[str, np.ndarray],
) -> Dict[str, np.ndarray]:
    """Run every sub-graph model in order, chaining outputs into later inputs.

    A working copy of ``feed_dict`` acts as the tensor store. For each spec
    the matching model is opened, its start tensors are looked up in the
    store, and the results are written back under the spec's end tensor names.

    Returns:
        The tensor store: the original feeds plus every tensor produced.

    Raises:
        KeyError: if a start tensor of some spec is absent from the store
            (or stored as ``None``).
    """
    tensor_store: Dict[str, np.ndarray] = dict(feed_dict)

    for spec in specs:
        model_path = expected_model_path(spec, model_dir)
        # The session is opened before inputs are validated, as in the
        # original control flow.
        session = InferenceSession(model_path.as_posix())

        missing = [name for name in spec.start if tensor_store.get(name) is None]
        if missing:
            raise KeyError(f"模型 {model_path.name} 缺少输入张量: {missing}")
        inputs = {name: tensor_store[name] for name in spec.start}

        results = session.run(spec.end, inputs)
        if not isinstance(results, (list, tuple)):
            # Normalise a bare result into a one-element list.
            results = [results]

        # Publish the outputs first, then gather their shapes for the log line.
        tensor_store.update(zip(spec.end, results))
        shapes = {}
        for out_name, value in zip(spec.end, results):
            shapes[out_name] = tuple(value.shape)
        logging.info("完成 %s: %s", model_path.name, shapes)

    return tensor_store
|
|
|
|
|
|
|
|
def load_input_file(path: Path, single_name: str | None) -> Dict[str, np.ndarray]:
    """Load the initial feed tensors from an ``.npz`` or ``.npy`` file.

    * ``.npz``: every array in the archive is returned under its key.
    * ``.npy`` holding a pickled ``dict``: each item becomes one tensor,
      with keys converted to ``str``.
    * any other ``.npy``: the content is returned as a single tensor named
      ``single_name``.

    Args:
        path: Input file; the format is chosen from its case-insensitive suffix.
        single_name: Tensor name to use for a single-tensor ``.npy`` file.

    Raises:
        ValueError: if a single-tensor ``.npy`` is given without
            ``single_name``, or if the suffix is neither ``.npz`` nor ``.npy``.
    """
    suffix = path.suffix.lower()

    if suffix == ".npz":
        # np.load returns an NpzFile that keeps the archive open; the context
        # manager closes it once the arrays have been materialised.
        with np.load(path, allow_pickle=False) as data:
            return {key: np.asarray(data[key]) for key in data.files}

    if suffix == ".npy":
        _ensure_numpy_core_alias()
        # SECURITY NOTE: allow_pickle=True unpickles arbitrary objects and can
        # execute code embedded in the file; only load trusted inputs.
        blob = np.load(path, allow_pickle=True)

        # An object-dtype array wraps a pickled Python object; unwrap it.
        is_object = isinstance(blob, np.ndarray) and blob.dtype == object
        payload = blob.item() if is_object else blob
        if is_object and isinstance(payload, dict):
            return {str(k): np.asarray(v) for k, v in payload.items()}

        if single_name is None:
            raise ValueError("单一 .npy 输入需要通过 --single-input-name 指定张量名")
        return {single_name: np.asarray(payload)}

    raise ValueError(f"暂不支持的输入文件格式: {path}")
|
|
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the split-model runner.

    Required path options: ``--onnx``, ``--config``, ``--model-dir`` and
    ``--input-file``. Optional: ``--single-input-name``, ``--dump-npz`` and
    ``--log`` (defaults to ``"INFO"``).
    """
    parser = argparse.ArgumentParser(description="串行执行切分后的 AXModel")

    # Mandatory Path-typed options, registered in this order.
    required_paths = (
        ("--onnx", "原始未切分 ONNX"),
        ("--config", "sub_config 配置文件"),
        ("--model-dir", ".axmodel 所在目录"),
        ("--input-file", "包含原始输入张量的 npz 或 npy 文件"),
    )
    for flag, description in required_paths:
        parser.add_argument(flag, required=True, type=Path, help=description)

    parser.add_argument(
        "--single-input-name",
        type=str,
        help="当 --input-file 是单张量 .npy 文件时指定对应的输入名",
    )
    parser.add_argument("--dump-npz", type=Path, help="可选,将最终输出保存为 npz")
    parser.add_argument("--log", default="INFO", help="日志等级")

    return parser.parse_args()
|
|
|
|
|
|
|
|
def main() -> None:
    """Command-line entry point: load specs and inputs, run the pipeline, report.

    The tensors named by the last spec's end list are treated as the final
    outputs; their shapes are logged and, when ``--dump-npz`` is given, they
    are saved to that file.
    """
    args = parse_args()

    # Unknown level names fall back to INFO.
    level = getattr(logging, args.log.upper(), logging.INFO)
    logging.basicConfig(level=level)

    specs = load_specs(args.config, args.onnx)
    feed = load_input_file(args.input_file, args.single_input_name)
    store = run_pipeline(specs, args.model_dir, feed)

    # Collect every final tensor before logging anything.
    final_outputs = {}
    for name in specs[-1].end:
        final_outputs[name] = store[name]

    for name, value in final_outputs.items():
        logging.info("最终输出 %s: shape=%s", name, value.shape)

    if not args.dump_npz:
        return

    np.savez(args.dump_npz, **final_outputs)
    logging.info("已保存输出到 %s", args.dump_npz)
|
|
|
|
|
|
|
|
# Example invocation:
#
#   python3 infer_split_axmodels.py \
#       --onnx onnx-models/transformer.onnx \
#       --config pulsar2_configs/transformers_subgraph.json \
#       --model-dir compiled_slice_quant_onnx \
#       --input-file onnx-calibration-no-controlnet/transformer_inputs_prompt000_step00.npy \
#       --single-input-name timestep
if __name__ == "__main__":
    main()
|
|
|