# oneocr/_archive/analysis/analyze_extracted.py
# OneOCR Dev branch
# OneOCR - reverse engineering complete, ONNX pipeline 53% match rate
# commit ce847d4
"""Manually parse protobuf structure of extracted files."""
from pathlib import Path
EXTRACT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\extracted_models")
def read_varint(data, pos):
    """Decode a base-128 protobuf varint from *data* starting at *pos*.

    Returns a ``(value, new_pos)`` tuple where ``new_pos`` points just past
    the last byte consumed. If the buffer ends while the continuation bit
    is still set, the partial value accumulated so far is returned
    (best-effort parsing for truncated dumps).
    """
    value = 0
    shift = 0
    end = len(data)
    while pos < end:
        byte = data[pos]
        pos += 1
        # Low 7 bits carry payload, least-significant group first.
        value |= (byte & 0x7F) << shift
        if byte & 0x80 == 0:  # continuation bit clear -> final byte
            break
        shift += 7
    return value, pos
def parse_protobuf_fields(data, max_fields=10):
    """Parse protobuf wire format and return field info.

    Walks up to *max_fields* top-level fields of *data* and returns a list
    of ``(field_number, wire_type_label, value, extra)`` tuples:

    * varint        -> ``(n, 'varint', value, None)``
    * len-delimited -> ``(n, 'len-delim', length, preview_bytes)`` with the
      preview capped at 100 bytes, or ``(n, 'len-delim', length, 'OVERFLOW')``
      (and parsing stops) when the declared length runs past the buffer end
    * 64-bit/32-bit -> ``(n, '64bit'/'32bit', little-endian int, None)``
    * other         -> ``(n, 'wireN', 0, 'UNKNOWN')`` and parsing stops,
      since group/unknown wire types have no self-describing length.

    Bug fixed vs. the original: the field tag is itself a varint
    (``(field_number << 3) | wire_type``), so field numbers > 15 occupy
    more than one tag byte; the old single-byte tag read desynchronized
    the stream for those fields. Behavior for single-byte tags is unchanged.
    """
    def _varint(buf, at):
        # Local copy of the base-128 decode so this parser is self-contained.
        value = 0
        shift = 0
        while at < len(buf):
            b = buf[at]
            at += 1
            value |= (b & 0x7f) << shift
            if not (b & 0x80):
                break
            shift += 7
        return value, at

    pos = 0
    fields = []
    for _ in range(max_fields):
        if pos >= len(data):
            break
        # Tag is a varint, not a single byte (fix for field numbers > 15).
        tag, pos = _varint(data, pos)
        field_num = tag >> 3
        wire_type = tag & 0x07
        if wire_type == 0:  # varint
            val, pos = _varint(data, pos)
            fields.append((field_num, 'varint', val, None))
        elif wire_type == 2:  # length-delimited
            length, pos = _varint(data, pos)
            # Varints are unsigned, so the original `length < 0` test was dead.
            if length > len(data) - pos:
                fields.append((field_num, 'len-delim', length, 'OVERFLOW'))
                break
            preview = data[pos:pos + min(length, 100)]
            pos += length
            fields.append((field_num, 'len-delim', length, preview))
        elif wire_type == 1:  # 64-bit fixed
            val = data[pos:pos + 8]
            pos += 8
            fields.append((field_num, '64bit', int.from_bytes(val, 'little'), None))
        elif wire_type == 5:  # 32-bit fixed
            val = data[pos:pos + 4]
            pos += 4
            fields.append((field_num, '32bit', int.from_bytes(val, 'little'), None))
        else:  # groups (3/4) or garbage: length is unknowable, stop here
            fields.append((field_num, f'wire{wire_type}', 0, 'UNKNOWN'))
            break
    return fields
# Check top 10 largest heap files
files = sorted(
    [f for f in EXTRACT_DIR.glob("*.bin") if "0x271a" in f.name],
    key=lambda f: f.stat().st_size,
    reverse=True
)
print("=" * 70)
print("PROTOBUF STRUCTURE ANALYSIS of largest heap files")
print("=" * 70)
for f in files[:10]:
    # Only the leading 2KB is needed to inspect the first protobuf fields.
    # Fixed: close the handle (original leaked it via open(f, 'rb').read(2048)).
    with f.open('rb') as fh:
        data = fh.read(2048)
    size = f.stat().st_size
    print(f"\n{f.name} ({size//1024}KB):")
    print(f" First 32 bytes: {data[:32].hex()}")
    fields = parse_protobuf_fields(data)
    for fn, wt, val, preview in fields:
        if wt == 'varint':
            print(f" field={fn} {wt} value={val}")
        elif wt == 'len-delim':
            if preview == 'OVERFLOW':
                print(f" field={fn} {wt} length={val} OVERFLOW!")
            elif val < 200 and preview:
                # Short payloads: show as text when the first 50 chars look
                # printable, otherwise fall back to a hex dump.
                try:
                    txt = preview.decode('utf-8', errors='replace')
                    printable = all(c.isprintable() or c in '\n\r\t' for c in txt[:50])
                    if printable and len(txt) > 0:
                        print(f" field={fn} {wt} length={val} text='{txt[:80]}'")
                    else:
                        print(f" field={fn} {wt} length={val} hex={preview[:40].hex()}")
                except Exception:  # fixed: was a bare except hiding SystemExit et al.
                    print(f" field={fn} {wt} length={val} hex={preview[:40].hex()}")
            else:
                if preview:
                    print(f" field={fn} {wt} length={val} first_bytes={preview[:20].hex()}")
                else:
                    print(f" field={fn} {wt} length={val}")
        else:
            print(f" field={fn} {wt} value={val}")
# Also check a mid-sized file that might be a complete model
print("\n" + "=" * 70)
print("CHECKING MID-SIZED FILES (100KB - 2MB range)")
print("=" * 70)
mid_files = sorted(
    [f for f in EXTRACT_DIR.glob("*.bin")
     if "0x271a" in f.name and 100*1024 < f.stat().st_size < 2*1024*1024],
    key=lambda f: f.stat().st_size,
    reverse=True
)
# Imported here (not at top of file) so the protobuf analysis above still
# runs on machines without onnx installed.
import onnx
valid_count = 0
for f in mid_files[:100]:
    # Most candidates are not valid models; load failures are expected.
    try:
        m = onnx.load(str(f))
        valid_count += 1
        print(f" VALID: {f.name} ({f.stat().st_size//1024}KB)")
        print(f" ir={m.ir_version} producer='{m.producer_name}' "
              f"graph='{m.graph.name}' nodes={len(m.graph.node)}")
    except Exception:  # fixed: bare except also swallowed KeyboardInterrupt
        pass
if valid_count == 0:
    print(" No valid ONNX models in mid-range files either.")
# Check if the largest files might be a container/archive
print("\n" + "=" * 70)
print("CHECKING FOR INTERNAL ONNX BOUNDARIES IN LARGEST FILE")
print("=" * 70)
# NOTE(review): raises IndexError if no matching .bin files exist — the
# original behaved the same; confirm the extract dir is always populated.
biggest = files[0]
# Fixed: read via pathlib and close the handle (original leaked open(...)).
data = biggest.read_bytes()
print(f"File: {biggest.name}, total size: {len(data)} bytes")
# Search for all occurrences of valid ONNX-like starts
import re
# Look for 0x08 [3-9] 0x12 pattern (ir_version + field2)
# Raw bytes literal replaces the original double-escaped b'\\x08...' form;
# both hand the regex engine the same \x08[\x03-\x09]\x12 pattern.
pattern = re.compile(rb'\x08[\x03-\x09]\x12')
matches = [(m.start(), data[m.start()+1]) for m in pattern.finditer(data[:1000])]
print(f"ONNX-like headers in first 1000 bytes: {len(matches)}")
for offset, ir in matches[:10]:
    print(f" offset={offset}: ir_version={ir}")
# Also search for "ONNX" string, "onnx" string, "graph" string
for needle in [b'ONNX', b'onnx', b'graph', b'Conv', b'Relu', b'BatchNorm', b'MatMul']:
    positions = [m.start() for m in re.finditer(re.escape(needle), data[:50000])]
    if positions:
        print(f" Found '{needle.decode()}' at offsets: {positions[:5]}")