|
|
""" |
|
|
OneOCR Model Extraction via Runtime Memory Dump. |
|
|
|
|
|
Strategy: Load the OCR pipeline (which decrypts the model internally), |
|
|
then scan our own process memory for ONNX/protobuf patterns and dump them. |
|
|
|
|
|
Since oneocr.dll decrypts and decompresses models into memory during |
|
|
CreateOcrPipeline, we can capture them by scanning process memory. |
|
|
""" |
|
|
|
|
|
import ctypes |
|
|
import ctypes.wintypes as wintypes |
|
|
import struct |
|
|
import os |
|
|
import sys |
|
|
import time |
|
|
from pathlib import Path |
|
|
from collections import Counter |
|
|
import math |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Configuration -----------------------------------------------------------

# Directory holding the OneOCR runtime files (DLLs + encrypted model container).
OCR_DATA_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data")

DLL_PATH = str(OCR_DATA_DIR / "oneocr.dll")            # OCR pipeline DLL
ORT_DLL_PATH = str(OCR_DATA_DIR / "onnxruntime.dll")   # dependency of oneocr.dll
MODEL_PATH = str(OCR_DATA_DIR / "oneocr.onemodel")     # encrypted model container

# Decryption key handed to CreateOcrPipeline along with the model path.
KEY = 'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'

# Destination directory for dumped memory regions / ONNX candidates.
OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\extracted_models")

# kernel32 handle used for VirtualQuery / ReadProcessMemory below.
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
|
|
|
|
|
class MEMORY_BASIC_INFORMATION(ctypes.Structure):
    """ctypes mirror of the Win32 MEMORY_BASIC_INFORMATION structure.

    Filled in by kernel32.VirtualQuery for each region of the address space.
    NOTE(review): newer SDK headers insert a WORD PartitionId after
    AllocationProtect on x64; alignment padding should keep this layout
    compatible anyway — confirm if fields past AllocationProtect ever look
    wrong.
    """

    _fields_ = [
        ("BaseAddress", ctypes.c_void_p),        # start of the region
        ("AllocationBase", ctypes.c_void_p),     # base of the original allocation
        ("AllocationProtect", wintypes.DWORD),   # protection at allocation time
        ("RegionSize", ctypes.c_size_t),         # size of the region in bytes
        ("State", wintypes.DWORD),               # MEM_COMMIT / MEM_RESERVE / MEM_FREE
        ("Protect", wintypes.DWORD),             # current page protection
        ("Type", wintypes.DWORD),                # MEM_IMAGE / MEM_MAPPED / MEM_PRIVATE
    ]
|
|
|
|
|
# Win32 memory constants (winnt.h).
MEM_COMMIT = 0x1000     # State: pages are committed (backed by physical/pagefile)
PAGE_NOACCESS = 0x01    # Protect: any access faults
PAGE_GUARD = 0x100      # Protect modifier: first touch raises a guard exception
|
|
|
|
|
|
|
|
def entropy(data: bytes) -> float:
    """Return the Shannon entropy of *data* in bits per byte (0.0 for empty)."""
    size = len(data)
    if size == 0:
        return 0.0
    bits = 0.0
    for count in Counter(data).values():
        p = count / size
        bits -= p * math.log2(p)
    return bits
|
|
|
|
|
|
|
|
def scan_memory_regions():
    """Enumerate all committed, readable memory regions of this process.

    Walks the user-mode address space with kernel32.VirtualQuery and keeps
    every region that is committed (MEM_COMMIT) and not unreadable
    (PAGE_NOACCESS) or guarded (PAGE_GUARD).

    Returns:
        list[tuple[int, int]]: (base_address, region_size) pairs.
    """
    regions = []
    # NOTE: VirtualQuery always queries the *current* process, so no process
    # handle is needed (the previous GetCurrentProcess() call was unused).
    mbi = MEMORY_BASIC_INFORMATION()
    address = 0
    # 47-bit limit of the canonical user-mode address range on x64 Windows.
    max_addr = (1 << 47) - 1

    while address < max_addr:
        result = kernel32.VirtualQuery(
            ctypes.c_void_p(address),
            ctypes.byref(mbi),
            ctypes.sizeof(mbi)
        )
        if result == 0:
            # Query failed: we walked past the end of usable address space.
            break

        base_addr = mbi.BaseAddress or 0
        region_size = mbi.RegionSize or 0

        if region_size == 0:
            # Defensive: a zero-size region would stall the walk forever.
            break

        # Keep only committed regions whose pages we can actually read.
        if (mbi.State == MEM_COMMIT and
                mbi.Protect not in (0, PAGE_NOACCESS, PAGE_GUARD) and
                not (mbi.Protect & PAGE_GUARD)):
            regions.append((base_addr, region_size))

        new_address = base_addr + region_size
        if new_address <= address:
            # Defensive: guarantee forward progress.
            break
        address = new_address
    return regions
|
|
|
|
|
|
|
|
def read_mem(address, size):
    """Read `size` bytes at `address` in our own process; None on failure.

    Fast path: ctypes.string_at, since this is our own address space.
    NOTE(review): this relies on ctypes translating access violations into
    Python exceptions on Windows; a fault on a truly unmapped address is
    expected to raise OSError rather than crash — confirm on the target
    interpreter. The fallback uses ReadProcessMemory, which reports how many
    bytes were actually copied and so tolerates partially-readable ranges.
    """
    try:
        # Direct in-process read.
        return ctypes.string_at(address, size)
    except Exception:

        try:
            buf = (ctypes.c_ubyte * size)()
            n = ctypes.c_size_t(0)   # receives the number of bytes copied
            handle = kernel32.GetCurrentProcess()
            ok = kernel32.ReadProcessMemory(
                handle, ctypes.c_void_p(address), buf, size, ctypes.byref(n)
            )
            if ok and n.value > 0:
                # May be a partial read; return only what was copied.
                return bytes(buf[:n.value])
        except Exception:
            pass
    return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print("=" * 80)
print("OneOCR Model Extraction via Runtime Memory Dump")
print("=" * 80)

# --- Step 1: baseline snapshot of the address space BEFORE the OCR DLLs load.
# Any region whose base address appears only after the load is attributed to
# the OCR pipeline (e.g. the decrypted model buffers).
print("\n[1/5] Memory snapshot BEFORE OCR load...")
before = set()       # base addresses of regions that exist pre-load
before_data = {}     # base -> hash(content) for small regions
                     # NOTE(review): before_data is never read later; kept as-is
for base, size in scan_memory_regions():
    before.add(base)

    # Hash only small regions to keep the snapshot cheap.
    if size <= 65536:
        d = read_mem(base, size)
        if d:
            before_data[base] = hash(d)
print(f" {len(before)} regions before")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Step 2: load the native DLLs. onnxruntime.dll is loaded first so the
# loader can resolve it when oneocr.dll is brought in.
print("\n[2/5] Loading DLLs...")
os.add_dll_directory(str(OCR_DATA_DIR))
# Also prepend to PATH as a fallback for dependency resolution.
os.environ["PATH"] = str(OCR_DATA_DIR) + ";" + os.environ.get("PATH", "")

ort_dll = ctypes.WinDLL(ORT_DLL_PATH)
print(f" OK: onnxruntime.dll")

ocr_dll = ctypes.WinDLL(DLL_PATH)
print(f" OK: oneocr.dll")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Step 3: drive the oneocr.dll C API far enough that it decrypts and
# decompresses the model into this process's memory.
print("\n[3/5] Creating OCR pipeline (decrypts model)...")

# int64 CreateOcrInitOptions(int64* ctx_out)
CreateOcrInitOptions = ocr_dll.CreateOcrInitOptions
CreateOcrInitOptions.restype = ctypes.c_int64
CreateOcrInitOptions.argtypes = [ctypes.POINTER(ctypes.c_int64)]

# int64 OcrInitOptionsSetUseModelDelayLoad(int64 ctx, char flag)
OcrInitOptionsSetUseModelDelayLoad = ocr_dll.OcrInitOptionsSetUseModelDelayLoad
OcrInitOptionsSetUseModelDelayLoad.restype = ctypes.c_int64
OcrInitOptionsSetUseModelDelayLoad.argtypes = [ctypes.c_int64, ctypes.c_char]

# int64 CreateOcrPipeline(int64 model_path, int64 key, int64 ctx, int64* pipeline_out)
# Pointer arguments are declared as c_int64, so raw addresses are passed below.
CreateOcrPipeline = ocr_dll.CreateOcrPipeline
CreateOcrPipeline.restype = ctypes.c_int64
CreateOcrPipeline.argtypes = [ctypes.c_int64, ctypes.c_int64, ctypes.c_int64, ctypes.POINTER(ctypes.c_int64)]

ctx = ctypes.c_int64(0)   # receives the init-options handle
res = CreateOcrInitOptions(ctypes.byref(ctx))
assert res == 0, f"CreateOcrInitOptions failed: {res}"

# flag=0: disable delay-load so the model is processed right away during
# CreateOcrPipeline instead of lazily on first recognition.
res = OcrInitOptionsSetUseModelDelayLoad(ctx, ctypes.c_char(0))
assert res == 0, f"SetUseModelDelayLoad failed: {res}"

model_path_c = ctypes.c_char_p(MODEL_PATH.encode("utf-8"))
key_c = ctypes.c_char_p(KEY.encode("utf-8"))

pipeline = ctypes.c_int64(0)   # receives the pipeline handle
# Cast the char pointers to raw integer addresses to match the argtypes above.
res = CreateOcrPipeline(
    ctypes.cast(model_path_c, ctypes.c_void_p).value,
    ctypes.cast(key_c, ctypes.c_void_p).value,
    ctx.value,
    ctypes.byref(pipeline)
)

if res != 0:
    print(f" ERROR: CreateOcrPipeline returned {res}")
    sys.exit(1)

print(f" Pipeline OK! handle=0x{pipeline.value:x}")
# Brief pause in case initialization finishes asynchronously.
time.sleep(0.5)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Step 4: diff the address space against the baseline and search the new
# regions for bytes that look like serialized model data.
print("\n[4/5] Scanning process memory for ONNX models...")

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

after_regions = scan_memory_regions()
# Regions whose base address was not present before the DLLs loaded.
new_regions = [(b, s) for b, s in after_regions if b not in before]
print(f" Total regions after: {len(after_regions)}")
print(f" New regions: {len(new_regions)}")

new_large = [(b, s) for b, s in new_regions if s >= 1024*1024]
new_total = sum(s for _, s in new_regions)
print(f" New large regions (>1MB): {len(new_large)}")
print(f" Total new memory: {new_total/1024/1024:.1f} MB")

found = []   # candidate hits; each entry is a dict built in the loop below

# Byte patterns that mark plausible starts of model data:
#   b"\x08\x0N\x12" / b"\x08\x0N\x1a" — protobuf: field 1 varint (value N)
#     followed by a length-delimited field, matching the head of an ONNX
#     ModelProto (N = ir_version, here 3..9)
#   b"ORTM" — ORT-format model magic
#   b"ONNX" — literal marker string
PATTERNS = [
    b"\x08\x07\x12",
    b"\x08\x08\x12",
    b"\x08\x06\x12",
    b"\x08\x05\x12",
    b"\x08\x04\x12",
    b"\x08\x03\x12",
    b"\x08\x09\x12",
    b"ORTM",
    b"ONNX",
    b"\x08\x07\x1a",
    b"\x08\x08\x1a",
]

# Scan the largest new regions first.
for ridx, (base, size) in enumerate(sorted(new_regions, key=lambda x: x[1], reverse=True)):
    if size < 4096:
        continue   # too small to hold a model

    # Cap per-region reads at 200 MB.
    read_size = min(size, 200 * 1024 * 1024)
    data = read_mem(base, read_size)
    if not data:
        continue

    # Quick characterization of the region head (entropy + byte diversity).
    ent = entropy(data[:4096])
    uniq = len(set(data[:4096]))

    if size >= 100000:
        # Log only sizeable regions to keep output readable.
        print(f" Region 0x{base:x} size={size:,} ent={ent:.2f} uniq={uniq}/256 first={data[:16].hex()}")

    # Search every pattern at every occurrence within this region.
    for pattern in PATTERNS:
        offset = 0
        while True:
            idx = data.find(pattern, offset)
            if idx < 0:
                break

            # Sample up to 4 KB after the hit; decrypted/decompressed model
            # bytes should show sub-random entropy, unlike ciphertext.
            chunk = data[idx:idx+min(4096, len(data)-idx)]
            chunk_ent = entropy(chunk[:1024]) if len(chunk) >= 1024 else entropy(chunk)

            # Accept only lower-entropy hits (< 7.5 bits/byte) with some
            # substance behind them; near-8.0 means still encrypted/compressed.
            if chunk_ent < 7.5 and len(chunk) > 64:
                addr = base + idx
                remaining = len(data) - idx   # bytes from hit to region end
                found.append({
                    "addr": addr,
                    "base": base,
                    "offset": idx,
                    "size": remaining,
                    "pattern": pattern.hex(),
                    "ent": chunk_ent,
                    "first_32": data[idx:idx+32].hex(),
                })
                print(f" ★ ONNX candidate at 0x{addr:x}: pattern={pattern.hex()} "
                      f"ent={chunk_ent:.2f} remaining={remaining:,}")
                print(f" First 32: {data[idx:idx+32].hex()}")

            offset = idx + len(pattern)   # continue past this hit

print(f"\n Found {len(found)} ONNX candidates total")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Step 5: write candidates to disk; if nothing matched, fall back to
# dumping every large new region wholesale for offline analysis.
print("\n[5/5] Dumping models...")

if found:
    # Skip duplicate hits that resolved to the same absolute address.
    seen = set()
    for i, m in enumerate(found):
        if m["addr"] in seen:
            continue
        seen.add(m["addr"])

        # Dump from the hit to the end of its region, capped at 100 MB.
        dump_size = min(m["size"], 100 * 1024 * 1024)
        data = read_mem(m["addr"], dump_size)
        if data:
            fname = f"onnx_{i}_0x{m['addr']:x}_{dump_size//1024}KB.bin"
            out = OUTPUT_DIR / fname
            with open(out, "wb") as f:
                f.write(data)
            print(f" Saved: {fname} ({len(data):,} bytes)")
else:
    print(" No ONNX patterns found. Dumping ALL large new regions (>1MB)...")

    for i, (base, size) in enumerate(new_large):
        # Same 200 MB per-region read cap as the scan pass.
        data = read_mem(base, min(size, 200*1024*1024))
        if data:
            ent = entropy(data[:4096])   # entropy recorded in the file name
            fname = f"region_{i}_0x{base:x}_{size//1024//1024}MB_ent{ent:.1f}.bin"
            out = OUTPUT_DIR / fname
            with open(out, "wb") as f:
                f.write(data)
            print(f" Saved: {fname} ({len(data):,} bytes, ent={ent:.2f})")
|
|
|
|
|
|
|
|
# --- Summary: list everything that ended up in OUTPUT_DIR (including files
# from previous runs, since the directory is not cleared).
print("\n" + "=" * 80)
print("RESULTS")
print("=" * 80)

if OUTPUT_DIR.exists():
    files = sorted(OUTPUT_DIR.iterdir())
    if files:
        total_size = sum(f.stat().st_size for f in files)
        print(f"\nExtracted {len(files)} files ({total_size/1024/1024:.1f} MB):")
        for f in files:
            sz = f.stat().st_size

            # Show the leading bytes so file types can be identified at a glance.
            with open(f, "rb") as fh:
                header = fh.read(32)
            print(f" {f.name}: {sz:,} bytes | first_16={header[:16].hex()}")
    else:
        print("\nNo files extracted.")
|
|
|