"""
OneOCR Model Extraction via Runtime Memory Dump.

Strategy: Load the OCR pipeline (which decrypts the model internally),
then scan our own process memory for ONNX/protobuf patterns and dump them.

Since oneocr.dll decrypts and decompresses models into memory during
CreateOcrPipeline, we can capture them by scanning process memory.
"""

import ctypes
import ctypes.wintypes as wintypes
import struct
import os
import sys
import time
from pathlib import Path
from collections import Counter
import math

# ═══════════════════════════════════════════════════════════════
# Constants
# ═══════════════════════════════════════════════════════════════

OCR_DATA_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data")
DLL_PATH = str(OCR_DATA_DIR / "oneocr.dll")
ORT_DLL_PATH = str(OCR_DATA_DIR / "onnxruntime.dll")
MODEL_PATH = str(OCR_DATA_DIR / "oneocr.onemodel")
KEY = 'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'

OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\extracted_models")

# ═══════════════════════════════════════════════════════════════
# Windows API
# ═══════════════════════════════════════════════════════════════

kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)


class MEMORY_BASIC_INFORMATION(ctypes.Structure):
    """ctypes layout of the record that VirtualQuery fills in."""

    _fields_ = [
        ("BaseAddress", ctypes.c_void_p),
        ("AllocationBase", ctypes.c_void_p),
        ("AllocationProtect", wintypes.DWORD),
        ("RegionSize", ctypes.c_size_t),
        ("State", wintypes.DWORD),
        ("Protect", wintypes.DWORD),
        ("Type", wintypes.DWORD),
    ]


# Declare the prototypes explicitly.  Without them ctypes treats every return
# value as a C int and converts Python ints to C ints, which is the wrong
# width for the HANDLE and SIZE_T values used below on 64-bit Windows.
kernel32.GetCurrentProcess.argtypes = []
kernel32.GetCurrentProcess.restype = wintypes.HANDLE
kernel32.VirtualQuery.argtypes = [
    wintypes.LPCVOID,
    ctypes.POINTER(MEMORY_BASIC_INFORMATION),
    ctypes.c_size_t,
]
kernel32.VirtualQuery.restype = ctypes.c_size_t
kernel32.ReadProcessMemory.argtypes = [
    wintypes.HANDLE,
    wintypes.LPCVOID,
    wintypes.LPVOID,
    ctypes.c_size_t,
    ctypes.POINTER(ctypes.c_size_t),
]
kernel32.ReadProcessMemory.restype = wintypes.BOOL

MEM_COMMIT = 0x1000
PAGE_NOACCESS = 0x01
PAGE_GUARD = 0x100


def entropy(data: bytes) -> float:
    """Return the Shannon entropy of *data* in bits per byte (0.0 if empty)."""
    if not data:
        return 0.0
    freq = Counter(data)
    total = len(data)
    return -sum((c / total) * math.log2(c / total) for c in freq.values())


def scan_memory_regions() -> "list[tuple[int, int]]":
    """Enumerate all committed, readable memory regions.

    Walks the address space of the current process with VirtualQuery, from
    address 0 up to the 47-bit limit, and keeps every region that is
    committed and whose protection is neither zero nor flagged
    PAGE_NOACCESS / PAGE_GUARD.

    Returns:
        A list of ``(base_address, region_size)`` tuples in ascending
        address order.
    """
    regions = []
    mbi = MEMORY_BASIC_INFORMATION()
    address = 0
    max_addr = (1 << 47) - 1

    while address < max_addr:
        result = kernel32.VirtualQuery(
            ctypes.c_void_p(address), ctypes.byref(mbi), ctypes.sizeof(mbi)
        )
        if result == 0:
            break

        # c_void_p fields read back as None when the pointer is NULL.
        base_addr = mbi.BaseAddress or 0
        region_size = mbi.RegionSize or 0
        if region_size == 0:
            break

        protect = mbi.Protect
        # Test the flags as a bitmask so that PAGE_NOACCESS / PAGE_GUARD are
        # rejected even when combined with other modifier bits.
        readable = protect != 0 and not protect & (PAGE_NOACCESS | PAGE_GUARD)
        if mbi.State == MEM_COMMIT and readable:
            regions.append((base_addr, region_size))

        new_address = base_addr + region_size
        if new_address <= address:
            # No forward progress; stop instead of looping forever.
            break
        address = new_address

    return regions


def read_mem(address: int, size: int) -> "bytes | None":
    """Read memory from current process - direct access since it's our own memory.

    Tries a direct ``ctypes.string_at`` copy first and falls back to
    ReadProcessMemory on the current process if that raises.

    Args:
        address: Start address of the range to read.
        size: Number of bytes to read.

    Returns:
        The bytes read, or ``None`` when both attempts fail or the fallback
        copied nothing.
    """
    try:
        return ctypes.string_at(address, size)
    except Exception:
        # Deliberately best-effort: any failure just moves on to the fallback.
        pass

    # Fallback to ReadProcessMemory
    try:
        # A char buffer exposes .raw, so the result is sliced as bytes
        # directly instead of going through a per-byte Python list.
        buf = ctypes.create_string_buffer(size)
        n = ctypes.c_size_t(0)
        ok = kernel32.ReadProcessMemory(
            kernel32.GetCurrentProcess(), address, buf, size, ctypes.byref(n)
        )
    except Exception:
        return None
    if ok and n.value > 0:
        return buf.raw[:n.value]
    return None


# ═══════════════════════════════════════════════════════════════
# Step 1: Snapshot BEFORE loading OCR
# ═══════════════════════════════════════════════════════════════

print("=" * 80)
print("OneOCR Model Extraction via Runtime Memory Dump")
print("=" * 80)

print("\n[1/5] Memory snapshot BEFORE OCR load...")
before = set()
# NOTE(review): before_data is filled here but nothing in the visible script
# reads it back, so no "changed region" detection actually takes place.
before_data = {}
for base, size in scan_memory_regions():
    before.add(base)
    # Store hash of small regions for change detection
    if size <= 65536:
        d = read_mem(base, size)
        if d:
            before_data[base] = hash(d)
print(f" {len(before)} regions before")

# ═══════════════════════════════════════════════════════════════
# Step 2: Load DLLs
# ═══════════════════════════════════════════════════════════════

print("\n[2/5] Loading DLLs...")
# Make the DLL directory visible both to the loader and through PATH.
os.add_dll_directory(str(OCR_DATA_DIR))
os.environ["PATH"] = str(OCR_DATA_DIR) + ";" + os.environ.get("PATH", "")

ort_dll = ctypes.WinDLL(ORT_DLL_PATH)
print(" OK: onnxruntime.dll")

ocr_dll = ctypes.WinDLL(DLL_PATH)
print(" OK: oneocr.dll")

# ═══════════════════════════════════════════════════════════════
# Step 3: Init OCR pipeline (triggers decryption)
# ═══════════════════════════════════════════════════════════════

print("\n[3/5] Creating OCR pipeline (decrypts model)...")

# Prototypes for the three oneocr.dll entry points used below.
CreateOcrInitOptions = ocr_dll.CreateOcrInitOptions
CreateOcrInitOptions.restype = ctypes.c_int64
CreateOcrInitOptions.argtypes = [ctypes.POINTER(ctypes.c_int64)]

OcrInitOptionsSetUseModelDelayLoad = ocr_dll.OcrInitOptionsSetUseModelDelayLoad
OcrInitOptionsSetUseModelDelayLoad.restype = ctypes.c_int64
OcrInitOptionsSetUseModelDelayLoad.argtypes = [ctypes.c_int64, ctypes.c_char]

CreateOcrPipeline = ocr_dll.CreateOcrPipeline
CreateOcrPipeline.restype = ctypes.c_int64
CreateOcrPipeline.argtypes = [
    ctypes.c_int64,
    ctypes.c_int64,
    ctypes.c_int64,
    ctypes.POINTER(ctypes.c_int64),
]

# Native return codes are checked explicitly rather than with `assert`, which
# is stripped under `python -O`; failures exit the same way as the
# CreateOcrPipeline check below.
ctx = ctypes.c_int64(0)
res = CreateOcrInitOptions(ctypes.byref(ctx))
if res != 0:
    print(f" ERROR: CreateOcrInitOptions failed: {res}")
    sys.exit(1)

# Disable delay load → load ALL models immediately
res = OcrInitOptionsSetUseModelDelayLoad(ctx, ctypes.c_char(0))
if res != 0:
    print(f" ERROR: SetUseModelDelayLoad failed: {res}")
    sys.exit(1)

# Keep the c_char_p objects referenced so the buffers they point at stay
# alive while their raw addresses are handed to the DLL as integers.
model_path_c = ctypes.c_char_p(MODEL_PATH.encode("utf-8"))
key_c = ctypes.c_char_p(KEY.encode("utf-8"))

pipeline = ctypes.c_int64(0)
res = CreateOcrPipeline(
    ctypes.cast(model_path_c, ctypes.c_void_p).value,
    ctypes.cast(key_c, ctypes.c_void_p).value,
    ctx.value,
    ctypes.byref(pipeline),
)

if res != 0:
    print(f" ERROR: CreateOcrPipeline returned {res}")
    sys.exit(1)

print(f" Pipeline OK! handle=0x{pipeline.value:x}")
time.sleep(0.5)

# ═══════════════════════════════════════════════════════════════
# Step 4: Find new/changed memory regions & search for ONNX
# ═══════════════════════════════════════════════════════════════

print("\n[4/5] Scanning process memory for ONNX models...")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

after_regions = scan_memory_regions()
# A region counts as "new" when its base address was absent from the
# pre-load snapshot.
new_regions = [(b, s) for b, s in after_regions if b not in before]
print(f" Total regions after: {len(after_regions)}")
print(f" New regions: {len(new_regions)}")

# Size distribution of new regions
new_large = [(b, s) for b, s in new_regions if s >= 1024*1024]
new_total = sum(s for _, s in new_regions)
print(f" New large regions (>1MB): {len(new_large)}")
print(f" Total new memory: {new_total/1024/1024:.1f} MB")

found = []

# ONNX protobuf field patterns for start of file
# ir_version (field 1, varint) followed by opset_import (field 2, len-delimited)
# or producer_name (field 2, len-delimited) etc.

# Search patterns
PATTERNS = [
    b"\x08\x07\x12",  # ir_v=7, then field 2
    b"\x08\x08\x12",  # ir_v=8
    b"\x08\x06\x12",  # ir_v=6
    b"\x08\x05\x12",  # ir_v=5
    b"\x08\x04\x12",  # ir_v=4
    b"\x08\x03\x12",  # ir_v=3
    b"\x08\x09\x12",  # ir_v=9
    b"ORTM",  # ORT model format
    b"ONNX",  # Just in case
    b"\x08\x07\x1a",  # ir_v=7, field 3
    b"\x08\x08\x1a",  # ir_v=8, field 3
]

# Scan every new region of at least 4096 bytes, largest first.
for base, size in sorted(new_regions, key=lambda r: r[1], reverse=True):
    if size < 4096:
        continue

    # Never read more than 200 MB from a single region.
    read_size = min(size, 200 * 1024 * 1024)
    data = read_mem(base, read_size)
    if not data:
        continue

    if size >= 100000:
        # Log large regions regardless; the statistics cover the first 4KB
        # and are only computed when they are actually printed.
        head = data[:4096]
        ent = entropy(head)
        uniq = len(set(head))
        print(f" Region 0x{base:x} size={size:,} ent={ent:.2f} uniq={uniq}/256 first={data[:16].hex()}")

    # Search for patterns
    for pattern in PATTERNS:
        offset = 0
        while True:
            idx = data.find(pattern, offset)
            if idx < 0:
                break

            # Validate: check surrounding context.  Slicing clamps at the end
            # of the buffer, so this is at most the 1024 bytes from the match.
            remaining = len(data) - idx
            chunk_ent = entropy(data[idx:idx + 1024])

            # Valid models should have moderate entropy (not encrypted high-entropy)
            if chunk_ent < 7.5 and remaining > 64:
                addr = base + idx
                found.append({
                    "addr": addr,
                    "base": base,
                    "offset": idx,
                    "size": remaining,
                    "pattern": pattern.hex(),
                    "ent": chunk_ent,
                    "first_32": data[idx:idx+32].hex(),
                })
                print(f" ★ ONNX candidate at 0x{addr:x}: pattern={pattern.hex()} "
                      f"ent={chunk_ent:.2f} remaining={remaining:,}")
                print(f" First 32: {data[idx:idx+32].hex()}")

            offset = idx + len(pattern)

print(f"\n Found {len(found)} ONNX candidates total")

# ═══════════════════════════════════════════════════════════════
# Step 5: Dump candidates
# ═══════════════════════════════════════════════════════════════

print("\n[5/5] Dumping models...")

if found:
    # Deduplicate by address
    seen = set()
    for i, m in enumerate(found):
        if m["addr"] in seen:
            continue
        seen.add(m["addr"])

        # Each candidate dump is capped at 100 MB.
        dump_size = min(m["size"], 100 * 1024 * 1024)
        data = read_mem(m["addr"], dump_size)
        if data:
            fname = f"onnx_{i}_0x{m['addr']:x}_{dump_size//1024}KB.bin"
            out = OUTPUT_DIR / fname
            with open(out, "wb") as f:
                f.write(data)
            print(f" Saved: {fname} ({len(data):,} bytes)")
else:
    print(" No ONNX patterns found. Dumping ALL large new regions (>1MB)...")

    for i, (base, size) in enumerate(new_large):
        data = read_mem(base, min(size, 200*1024*1024))
        if data:
            ent = entropy(data[:4096])
            fname = f"region_{i}_0x{base:x}_{size//1024//1024}MB_ent{ent:.1f}.bin"
            out = OUTPUT_DIR / fname
            with open(out, "wb") as f:
                f.write(data)
            print(f" Saved: {fname} ({len(data):,} bytes, ent={ent:.2f})")

# Summary
print("\n" + "=" * 80)
print("RESULTS")
print("=" * 80)

if OUTPUT_DIR.exists():
    # Only regular files are listed: opening a sub-directory would raise.
    files = sorted(p for p in OUTPUT_DIR.iterdir() if p.is_file())
    if files:
        total_size = sum(f.stat().st_size for f in files)
        print(f"\nExtracted {len(files)} files ({total_size/1024/1024:.1f} MB):")
        for f in files:
            sz = f.stat().st_size
            # Quick check if it's ONNX
            with open(f, "rb") as fh:
                header = fh.read(32)
            print(f" {f.name}: {sz:,} bytes | first_16={header[:16].hex()}")
    else:
        print("\nNo files extracted.")