File size: 5,164 Bytes
ce847d4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
"""Manually parse protobuf structure of extracted files."""
from pathlib import Path
EXTRACT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\extracted_models")
def read_varint(data, pos):
    """Decode a protobuf base-128 (LEB128-style) varint from *data* at *pos*.

    Returns ``(value, next_pos)``. If *pos* is already at or past the end
    of *data*, returns ``(0, pos)`` unchanged.
    """
    val = 0
    shift = 0
    while pos < len(data):
        b = data[pos]
        pos += 1
        val |= (b & 0x7f) << shift
        if not (b & 0x80):
            break
        shift += 7
    return val, pos


def parse_protobuf_fields(data, max_fields=10):
    """Parse protobuf wire format and return up to *max_fields* field infos.

    Each entry is ``(field_num, wire_type_name, value, extra)``:
    *extra* is a bytes preview (first 100 bytes) for length-delimited
    fields, the marker string 'OVERFLOW'/'UNKNOWN' on malformed input,
    or None for scalar wire types.
    """
    pos = 0
    fields = []
    for _ in range(max_fields):
        if pos >= len(data):
            break
        # BUGFIX: the tag is itself a varint, not a single byte. Field
        # numbers >= 16 produce multi-byte tags, so reading one byte
        # here used to misparse them (continuation byte leaked into the
        # value). Decode the tag with read_varint instead.
        tag, pos = read_varint(data, pos)
        field_num = tag >> 3
        wire_type = tag & 0x07
        if wire_type == 0:  # varint
            val, pos = read_varint(data, pos)
            fields.append((field_num, 'varint', val, None))
        elif wire_type == 2:  # length-delimited
            length, pos = read_varint(data, pos)
            # (varints decode unsigned, so length < 0 is impossible)
            if length > len(data) - pos:
                fields.append((field_num, 'len-delim', length, 'OVERFLOW'))
                break
            preview = data[pos:pos + min(length, 100)]
            pos += length
            fields.append((field_num, 'len-delim', length, preview))
        elif wire_type == 1:  # 64-bit fixed
            val = data[pos:pos + 8]
            pos += 8
            fields.append((field_num, '64bit', int.from_bytes(val, 'little'), None))
        elif wire_type == 5:  # 32-bit fixed
            val = data[pos:pos + 4]
            pos += 4
            fields.append((field_num, '32bit', int.from_bytes(val, 'little'), None))
        else:  # wire types 3/4 (groups) and invalid 6/7: stop parsing
            fields.append((field_num, f'wire{wire_type}', 0, 'UNKNOWN'))
            break
    return fields
# Check top 10 largest heap files
files = sorted(
    [f for f in EXTRACT_DIR.glob("*.bin") if "0x271a" in f.name],
    key=lambda f: f.stat().st_size,
    reverse=True
)
print("=" * 70)
print("PROTOBUF STRUCTURE ANALYSIS of largest heap files")
print("=" * 70)
for f in files[:10]:
    # Read only the 2 KiB prefix; use a context manager so the handle is
    # closed deterministically instead of leaking until GC.
    with f.open('rb') as fh:
        data = fh.read(2048)
    size = f.stat().st_size
    print(f"\n{f.name} ({size//1024}KB):")
    print(f" First 32 bytes: {data[:32].hex()}")
    fields = parse_protobuf_fields(data)
    for fn, wt, val, preview in fields:
        if wt == 'varint':
            print(f" field={fn} {wt} value={val}")
        elif wt == 'len-delim':
            if preview == 'OVERFLOW':
                print(f" field={fn} {wt} length={val} OVERFLOW!")
            elif val < 200 and preview:
                try:
                    txt = preview.decode('utf-8', errors='replace')
                    printable = all(c.isprintable() or c in '\n\r\t' for c in txt[:50])
                    if printable and len(txt) > 0:
                        print(f" field={fn} {wt} length={val} text='{txt[:80]}'")
                    else:
                        print(f" field={fn} {wt} length={val} hex={preview[:40].hex()}")
                # Narrowed from a bare except: a bare clause would also
                # trap SystemExit/KeyboardInterrupt.
                except Exception:
                    print(f" field={fn} {wt} length={val} hex={preview[:40].hex()}")
            else:
                if preview:
                    print(f" field={fn} {wt} length={val} first_bytes={preview[:20].hex()}")
                else:
                    print(f" field={fn} {wt} length={val}")
        else:
            print(f" field={fn} {wt} value={val}")
# Also check a mid-sized file that might be a complete model
print("\n" + "=" * 70)
print("CHECKING MID-SIZED FILES (100KB - 2MB range)")
print("=" * 70)
mid_files = sorted(
    [f for f in EXTRACT_DIR.glob("*.bin")
     if "0x271a" in f.name and 100*1024 < f.stat().st_size < 2*1024*1024],
    key=lambda f: f.stat().st_size,
    reverse=True
)
import onnx
valid_count = 0
for f in mid_files[:100]:
    # Keep the try body minimal: only the load can legitimately fail
    # (most candidates are not valid ONNX protobufs at all).
    try:
        m = onnx.load(str(f))
    # Narrowed from a bare except: a bare clause would also swallow
    # KeyboardInterrupt, making a long scan impossible to abort.
    except Exception:
        continue
    valid_count += 1
    print(f" VALID: {f.name} ({f.stat().st_size//1024}KB)")
    print(f" ir={m.ir_version} producer='{m.producer_name}' "
          f"graph='{m.graph.name}' nodes={len(m.graph.node)}")
if valid_count == 0:
    print(" No valid ONNX models in mid-range files either.")
# Check if the largest files might be a container/archive
print("\n" + "=" * 70)
print("CHECKING FOR INTERNAL ONNX BOUNDARIES IN LARGEST FILE")
print("=" * 70)
biggest = files[0]
# read_bytes() opens and closes the handle itself -- no fd leak, unlike
# an unclosed open(...).read().
data = biggest.read_bytes()
print(f"File: {biggest.name}, total size: {len(data)} bytes")
# Search for all occurrences of valid ONNX-like starts
import re
# Look for 0x08 [3-9] 0x12 pattern (ir_version + field2). Raw bytes
# literal yields the identical regex as the original escaped form.
pattern = re.compile(rb'\x08[\x03-\x09]\x12')
matches = [(m.start(), data[m.start()+1]) for m in pattern.finditer(data[:1000])]
print(f"ONNX-like headers in first 1000 bytes: {len(matches)}")
for offset, ir in matches[:10]:
    print(f" offset={offset}: ir_version={ir}")
# Also search for "ONNX" string, "onnx" string, "graph" string
for needle in [b'ONNX', b'onnx', b'graph', b'Conv', b'Relu', b'BatchNorm', b'MatMul']:
    positions = [m.start() for m in re.finditer(re.escape(needle), data[:50000])]
    if positions:
        print(f" Found '{needle.decode()}' at offsets: {positions[:5]}")
|