"""Manually parse protobuf structure of extracted files.""" from pathlib import Path EXTRACT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\extracted_models") def read_varint(data, pos): val = 0 shift = 0 while pos < len(data): b = data[pos] pos += 1 val |= (b & 0x7f) << shift if not (b & 0x80): break shift += 7 return val, pos def parse_protobuf_fields(data, max_fields=10): """Parse protobuf wire format and return field info.""" pos = 0 fields = [] for _ in range(max_fields): if pos >= len(data): break tag_byte = data[pos] field_num = tag_byte >> 3 wire_type = tag_byte & 0x07 pos += 1 if wire_type == 0: # varint val, pos = read_varint(data, pos) fields.append((field_num, 'varint', val, None)) elif wire_type == 2: # length-delimited length, pos = read_varint(data, pos) if length > len(data) - pos or length < 0: fields.append((field_num, 'len-delim', length, 'OVERFLOW')) break preview = data[pos:pos+min(length, 100)] pos += length fields.append((field_num, 'len-delim', length, preview)) elif wire_type == 1: # 64-bit val = data[pos:pos+8] pos += 8 fields.append((field_num, '64bit', int.from_bytes(val, 'little'), None)) elif wire_type == 5: # 32-bit val = data[pos:pos+4] pos += 4 fields.append((field_num, '32bit', int.from_bytes(val, 'little'), None)) else: fields.append((field_num, f'wire{wire_type}', 0, 'UNKNOWN')) break return fields # Check top 10 largest heap files files = sorted( [f for f in EXTRACT_DIR.glob("*.bin") if "0x271a" in f.name], key=lambda f: f.stat().st_size, reverse=True ) print("=" * 70) print("PROTOBUF STRUCTURE ANALYSIS of largest heap files") print("=" * 70) for f in files[:10]: data = open(f, 'rb').read(2048) size = f.stat().st_size print(f"\n{f.name} ({size//1024}KB):") print(f" First 32 bytes: {data[:32].hex()}") fields = parse_protobuf_fields(data) for fn, wt, val, preview in fields: if wt == 'varint': print(f" field={fn} {wt} value={val}") elif wt == 'len-delim': if preview == 'OVERFLOW': print(f" field={fn} {wt} length={val} OVERFLOW!") elif val < 200 and preview: try: txt = preview.decode('utf-8', errors='replace') printable = all(c.isprintable() or c in '\n\r\t' for c in txt[:50]) if printable and len(txt) > 0: print(f" field={fn} {wt} length={val} text='{txt[:80]}'") else: print(f" field={fn} {wt} length={val} hex={preview[:40].hex()}") except: print(f" field={fn} {wt} length={val} hex={preview[:40].hex()}") else: if preview: print(f" field={fn} {wt} length={val} first_bytes={preview[:20].hex()}") else: print(f" field={fn} {wt} length={val}") else: print(f" field={fn} {wt} value={val}") # Also check a mid-sized file that might be a complete model print("\n" + "=" * 70) print("CHECKING MID-SIZED FILES (100KB - 2MB range)") print("=" * 70) mid_files = sorted( [f for f in EXTRACT_DIR.glob("*.bin") if "0x271a" in f.name and 100*1024 < f.stat().st_size < 2*1024*1024], key=lambda f: f.stat().st_size, reverse=True ) import onnx valid_count = 0 for f in mid_files[:100]: try: m = onnx.load(str(f)) valid_count += 1 print(f" VALID: {f.name} ({f.stat().st_size//1024}KB)") print(f" ir={m.ir_version} producer='{m.producer_name}' " f"graph='{m.graph.name}' nodes={len(m.graph.node)}") except: pass if valid_count == 0: print(" No valid ONNX models in mid-range files either.") # Check if the largest files might be a container/archive print("\n" + "=" * 70) print("CHECKING FOR INTERNAL ONNX BOUNDARIES IN LARGEST FILE") print("=" * 70) biggest = files[0] data = open(biggest, 'rb').read() print(f"File: {biggest.name}, total size: {len(data)} bytes") # Search for all occurrences of valid ONNX-like starts import re # Look for 0x08 [3-9] 0x12 pattern (ir_version + field2) pattern = re.compile(b'\\x08[\\x03-\\x09]\\x12') matches = [(m.start(), data[m.start()+1]) for m in pattern.finditer(data[:1000])] print(f"ONNX-like headers in first 1000 bytes: {len(matches)}") for offset, ir in matches[:10]: print(f" offset={offset}: ir_version={ir}") # Also search for "ONNX" string, "onnx" string, "graph" string for needle in [b'ONNX', b'onnx', b'graph', b'Conv', b'Relu', b'BatchNorm', b'MatMul']: positions = [m.start() for m in re.finditer(re.escape(needle), data[:50000])] if positions: print(f" Found '{needle.decode()}' at offsets: {positions[:5]}")