|
|
"""Manually parse protobuf structure of extracted files.""" |
|
|
from pathlib import Path |
|
|
|
|
|
EXTRACT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\extracted_models") |
|
|
|
|
|
def read_varint(data, pos):
    """Decode a protobuf varint from *data* starting at *pos*.

    Returns a ``(value, new_position)`` tuple.  Decoding stops at the
    first byte whose continuation bit (0x80) is clear, or silently at
    end-of-buffer (in which case the partial value is returned).
    """
    result = 0
    shift = 0
    while pos < len(data):
        byte = data[pos]
        pos += 1
        # Low 7 bits carry payload, little-endian across bytes.
        result |= (byte & 0x7F) << shift
        if byte & 0x80 == 0:
            break
        shift += 7
    return result, pos
|
|
|
|
|
def parse_protobuf_fields(data, max_fields=10):
    """Parse protobuf wire format and return field info.

    Returns a list of ``(field_number, wire_type_name, value, extra)``
    tuples.  For length-delimited fields *value* is the declared length
    and *extra* is a preview (first <=100 bytes) or ``'OVERFLOW'``.
    Parsing stops after *max_fields* fields, at end of data, or at the
    first malformed/unsupported field.
    """
    pos = 0
    fields = []
    for _ in range(max_fields):
        if pos >= len(data):
            break
        # BUGFIX: the tag itself is a varint, not a single byte --
        # field numbers > 15 encode to multi-byte tags and were
        # previously mis-parsed.
        tag, pos = read_varint(data, pos)
        field_num = tag >> 3
        wire_type = tag & 0x07

        if wire_type == 0:  # varint
            val, pos = read_varint(data, pos)
            fields.append((field_num, 'varint', val, None))
        elif wire_type == 2:  # length-delimited (bytes/string/submessage)
            length, pos = read_varint(data, pos)
            # Varints are non-negative, so only the overrun check is needed.
            if length > len(data) - pos:
                fields.append((field_num, 'len-delim', length, 'OVERFLOW'))
                break
            preview = data[pos:pos + min(length, 100)]
            pos += length
            fields.append((field_num, 'len-delim', length, preview))
        elif wire_type == 1:  # fixed64
            if len(data) - pos < 8:
                break  # truncated: int.from_bytes on a short slice would misread
            val = data[pos:pos + 8]
            pos += 8
            fields.append((field_num, '64bit', int.from_bytes(val, 'little'), None))
        elif wire_type == 5:  # fixed32
            if len(data) - pos < 4:
                break  # truncated: stop rather than decode garbage
            val = data[pos:pos + 4]
            pos += 4
            fields.append((field_num, '32bit', int.from_bytes(val, 'little'), None))
        else:
            # Deprecated groups (3/4) and invalid wire types: bail out.
            fields.append((field_num, f'wire{wire_type}', 0, 'UNKNOWN'))
            break
    return fields
|
|
|
|
|
|
|
|
# Largest heap-extracted blobs first: a serialized model is most likely
# to be one of the biggest allocations.
files = sorted(
    [f for f in EXTRACT_DIR.glob("*.bin") if "0x271a" in f.name],
    key=lambda f: f.stat().st_size,
    reverse=True
)

print("=" * 70)
print("PROTOBUF STRUCTURE ANALYSIS of largest heap files")
print("=" * 70)

for f in files[:10]:
    # Only the leading 2KB is needed to inspect the first fields.
    # BUGFIX: use a context manager so the handle is closed promptly
    # instead of leaking until GC.
    with f.open('rb') as fh:
        data = fh.read(2048)
    size = f.stat().st_size
    print(f"\n{f.name} ({size//1024}KB):")
    print(f" First 32 bytes: {data[:32].hex()}")

    fields = parse_protobuf_fields(data)
    for fn, wt, val, preview in fields:
        if wt == 'varint':
            print(f" field={fn} {wt} value={val}")
        elif wt == 'len-delim':
            if preview == 'OVERFLOW':
                print(f" field={fn} {wt} length={val} OVERFLOW!")
            elif val < 200 and preview:
                try:
                    txt = preview.decode('utf-8', errors='replace')
                    # Heuristic: only show as text if the first 50 chars
                    # are printable; otherwise dump hex.
                    printable = all(c.isprintable() or c in '\n\r\t' for c in txt[:50])
                    if printable and len(txt) > 0:
                        print(f" field={fn} {wt} length={val} text='{txt[:80]}'")
                    else:
                        print(f" field={fn} {wt} length={val} hex={preview[:40].hex()}")
                except Exception:
                    # BUGFIX: was a bare `except:` -- keep the best-effort
                    # hex fallback but stop swallowing SystemExit and
                    # KeyboardInterrupt.
                    print(f" field={fn} {wt} length={val} hex={preview[:40].hex()}")
            else:
                if preview:
                    print(f" field={fn} {wt} length={val} first_bytes={preview[:20].hex()}")
                else:
                    print(f" field={fn} {wt} length={val}")
        else:
            print(f" field={fn} {wt} value={val}")
|
|
|
|
print("\n" + "=" * 70)
print("CHECKING MID-SIZED FILES (100KB - 2MB range)")
print("=" * 70)

# A full OCR model is unlikely to be tiny or enormous; probe the
# 100KB-2MB band separately.
mid_files = sorted(
    [f for f in EXTRACT_DIR.glob("*.bin")
     if "0x271a" in f.name and 100*1024 < f.stat().st_size < 2*1024*1024],
    key=lambda f: f.stat().st_size,
    reverse=True
)

import onnx

valid_count = 0
for f in mid_files[:100]:
    try:
        m = onnx.load(str(f))
        valid_count += 1
        print(f" VALID: {f.name} ({f.stat().st_size//1024}KB)")
        print(f" ir={m.ir_version} producer='{m.producer_name}' "
              f"graph='{m.graph.name}' nodes={len(m.graph.node)}")
    except Exception:
        # BUGFIX: was a bare `except: pass`. Most candidates are NOT
        # valid ONNX, so failures are expected and deliberately ignored,
        # but we no longer swallow KeyboardInterrupt/SystemExit.
        pass

if valid_count == 0:
    print(" No valid ONNX models in mid-range files either.")
|
|
|
|
|
|
|
|
print("\n" + "=" * 70)
print("CHECKING FOR INTERNAL ONNX BOUNDARIES IN LARGEST FILE")
print("=" * 70)

biggest = files[0]
# BUGFIX: `open(...).read()` leaked the file handle; read_bytes()
# opens and closes the file itself.
data = biggest.read_bytes()
print(f"File: {biggest.name}, total size: {len(data)} bytes")

import re

# ONNX ModelProto typically starts with field 1 (ir_version) as a
# varint 3..9, followed by the tag 0x12 (field 2, len-delim producer).
pattern = re.compile(rb'\x08[\x03-\x09]\x12')
matches = [(m.start(), data[m.start()+1]) for m in pattern.finditer(data[:1000])]
print(f"ONNX-like headers in first 1000 bytes: {len(matches)}")
for offset, ir in matches[:10]:
    print(f" offset={offset}: ir_version={ir}")

# Cheap plaintext probe: operator and metadata names that would appear
# in a serialized graph.
for needle in [b'ONNX', b'onnx', b'graph', b'Conv', b'Relu', b'BatchNorm', b'MatMul']:
    positions = [m.start() for m in re.finditer(re.escape(needle), data[:50000])]
    if positions:
        print(f" Found '{needle.decode()}' at offsets: {positions[:5]}")