| |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import time |
| from pathlib import Path |
|
|
| import msgpack |
|
|
|
|
# Directory that holds this script, resolved to an absolute path.
ROOT = Path(__file__).resolve().parents[0]
# Default location of the control and malicious .msgpack sample files.
ARTIFACTS = ROOT.joinpath("artifacts")
|
|
|
|
def vmhwm_kb() -> int | None:
    """Return this process's ``VmHWM`` value from ``/proc/self/status``.

    ``VmHWM`` is the kernel's peak resident-set-size counter ("high water
    mark") and is reported in kB. It only grows during the life of the
    process, so a later reading can never be lower than an earlier one.

    Returns:
        The integer from the first ``VmHWM:`` line, or ``None`` when the
        status file contains no such line.
    """
    with open("/proc/self/status", "r", encoding="utf-8", errors="ignore") as status:
        # Take the whitespace-split fields of the first matching line only.
        fields = next(
            (row.split() for row in status if row.startswith("VmHWM:")),
            None,
        )
    return None if fields is None else int(fields[1])
|
|
|
|
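def parse_one(path: Path) -> dict:
    """Decode one MessagePack file and record timing and peak-RSS readings.

    The file is read fully into memory first, so the ``vmhwm_before_kb``
    reading already includes the raw bytes; the difference between the two
    readings is therefore attributable to decoding rather than to file I/O.

    Args:
        path: MessagePack file to decode.

    Returns:
        A dict with the keys ``path``, ``elapsed`` (seconds spent inside
        ``msgpack.unpackb``), ``vmhwm_before_kb``, ``vmhwm_after_kb``,
        ``python_type`` (type name of the decoded top-level object) and
        ``length`` (``len()`` of that object, or ``None`` when the decoded
        value has no length, e.g. a top-level integer or nil).
    """
    blob = path.read_bytes()
    before = vmhwm_kb()
    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # when the wall clock is adjusted and would skew the elapsed ratio.
    # The timer brackets only the decode call, not the /proc reads.
    start = time.perf_counter()
    # NOTE(security): no explicit max_*_len limits are passed, so the
    # library defaults apply; that default behaviour is what this harness
    # measures. Code that decodes untrusted MessagePack should pass limits
    # sized to its schema (max_array_len, max_map_len, max_str_len,
    # max_bin_len) so a small input cannot declare a huge container.
    obj = msgpack.unpackb(blob, raw=False)
    elapsed = time.perf_counter() - start
    after = vmhwm_kb()
    try:
        length = len(obj)
    except TypeError:
        # A valid MessagePack document may decode to an unsized scalar;
        # report that instead of crashing the measurement run.
        length = None
    return {
        "path": str(path),
        "elapsed": elapsed,
        "vmhwm_before_kb": before,
        "vmhwm_after_kb": after,
        "python_type": type(obj).__name__,
        "length": length,
    }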
|
|
|
|
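def main() -> int:
    """Compare decode cost of a control file against a container-heavy file.

    Both inputs are decoded with ``parse_one`` and a JSON summary is printed
    (and optionally written to ``--json-out``). Judging by the default file
    names, the control is a single bin32 blob and the second input is an
    array32 of 20,000,000 empty strings with the same file size; the summary
    reports how much more time and peak memory the array costs.

    ``repro_ok`` is true only when the control decoded to ``bytes``, the
    second input decoded to a 20,000,000-element ``list``, and peak RSS grew
    by at least 120,000 kB between the two decodes.

    Returns:
        Always ``0``; the outcome is carried in the JSON ``repro_ok`` field.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--control",
        type=Path,
        default=ARTIFACTS / "control_bin32_same_size.msgpack",
    )
    parser.add_argument(
        "--malicious",
        type=Path,
        default=ARTIFACTS / "malicious_array32_empty_strings_20000000.msgpack",
    )
    parser.add_argument("--json-out", type=Path)
    args = parser.parse_args()

    # Order matters: VmHWM is a per-process high-water mark that never
    # decreases, so the control must be decoded first for the delta between
    # the two "after" readings to reflect the second decode.
    control = parse_one(args.control)
    malicious = parse_one(args.malicious)

    # vmhwm_kb() may return None when /proc/self/status has no VmHWM line;
    # subtracting None would raise TypeError after the expensive decode and
    # lose the whole report, so the delta is reported as null instead.
    control_peak = control["vmhwm_after_kb"]
    malicious_peak = malicious["vmhwm_after_kb"]
    if control_peak is None or malicious_peak is None:
        vmhwm_delta_kb = None
    else:
        vmhwm_delta_kb = malicious_peak - control_peak

    summary = {
        "format": "MessagePack (.msgpack)",
        "msgpack_version": msgpack.__version__,
        "entrypoint": "msgpack.unpackb(..., raw=False)",
        "control": control,
        "malicious": malicious,
        "delta": {
            # Floor the denominator so a near-instant control decode cannot
            # cause a division by zero.
            "elapsed_ratio": malicious["elapsed"] / max(control["elapsed"], 1e-9),
            "vmhwm_delta_kb": vmhwm_delta_kb,
            "same_size_files": args.control.stat().st_size == args.malicious.stat().st_size,
            "repro_ok": (
                control["python_type"] == "bytes"
                and malicious["python_type"] == "list"
                and malicious["length"] == 20_000_000
                and vmhwm_delta_kb is not None
                and vmhwm_delta_kb >= 120_000
            ),
        },
    }
    report = json.dumps(summary, indent=2)
    if args.json_out:
        args.json_out.write_text(report + "\n", encoding="utf-8")
    print(report)
    return 0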
|
|
|
|
# Run the harness only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|
|