"""
Frida-based hooking of the BCryptDecrypt calls made by oneocr.dll, to intercept decrypted ONNX models.

Strategy:
1. Load oneocr.dll in a child process
2. Hook BCryptDecrypt in bcrypt.dll to capture decrypted output
3. Call CreateOcrPipeline which triggers model decryption
4. Save all decrypted buffers
"""

import ctypes
import json
import os
import struct
import subprocess
import sys
import threading
import time
from pathlib import Path

import frida


# Directory that receives every buffer dumped by the Frida hooks.
OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump")
# parents=True so that a missing parent directory tree does not abort the
# import with FileNotFoundError.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


# JavaScript agent that is loaded into the spawned loader process.
# It hooks three bcrypt.dll exports and reports back through send():
#   - BCryptDecrypt: one 'decrypt_call' message per call, plus a
#     'decrypt_data' message carrying the plaintext when the call succeeded
#     and produced at least MIN_SIZE bytes.
#   - BCryptGenerateSymmetricKey: a 'key_generated' message with the secret.
#   - BCryptSetProperty: a 'set_property' message with name/value/size.
# The IV is copied in onEnter (not onLeave) because BCryptDecrypt modifies the
# pbIV buffer in place while it runs.
FRIDA_SCRIPT = """
'use strict';

var MIN_SIZE = 100;
var decryptCallNum = 0;

// Hook BCryptDecrypt
var bcryptDecrypt = Module.findExportByName('bcrypt.dll', 'BCryptDecrypt');
if (bcryptDecrypt) {
    Interceptor.attach(bcryptDecrypt, {
        onEnter: function(args) {
            this.pbInput = args[1];
            this.cbInput = args[2].toInt32();
            this.pbIV = args[4];
            this.cbIV = args[5].toInt32();
            this.pbOutput = args[6];
            this.cbOutput = args[7].toInt32();
            this.pcbResult = args[8];
            this.dwFlags = args[9].toInt32();
            this.callNum = decryptCallNum++;

            // Copy the IV now: the callee overwrites this buffer, so reading
            // it after the call would report the wrong IV.
            this.iv = null;
            if (this.cbIV > 0 && !this.pbIV.isNull()) {
                try {
                    var ivArr = new Uint8Array(this.pbIV.readByteArray(this.cbIV));
                    this.iv = [];
                    for (var k = 0; k < ivArr.length; k++) this.iv.push(ivArr[k]);
                } catch(e) {
                    this.iv = null;
                }
            }
        },
        onLeave: function(retval) {
            var status = retval.toInt32();
            var cbResult = 0;
            try {
                if (!this.pcbResult.isNull()) {
                    cbResult = this.pcbResult.readU32();
                }
            } catch(e) {}

            var info = {
                call: this.callNum,
                status: status,
                inputSize: this.cbInput,
                ivSize: this.cbIV,
                outputSize: cbResult,
                flags: this.dwFlags
            };

            if (this.iv !== null) {
                info.iv = this.iv;
            }

            send({type: 'decrypt_call', info: info});

            if (status === 0 && cbResult >= MIN_SIZE && !this.pbOutput.isNull()) {
                try {
                    var data = this.pbOutput.readByteArray(cbResult);
                    send({type: 'decrypt_data', call: this.callNum, size: cbResult}, data);
                } catch(e) {
                    send({type: 'log', msg: 'Read output failed: ' + e});
                }
            }
        }
    });
    send({type: 'log', msg: 'Hooked BCryptDecrypt at ' + bcryptDecrypt});
} else {
    send({type: 'log', msg: 'ERROR: BCryptDecrypt not found'});
}

// Hook BCryptGenerateSymmetricKey
var bcryptGenKey = Module.findExportByName('bcrypt.dll', 'BCryptGenerateSymmetricKey');
if (bcryptGenKey) {
    Interceptor.attach(bcryptGenKey, {
        onEnter: function(args) {
            this.pbSecret = args[3];
            this.cbSecret = args[4].toInt32();
        },
        onLeave: function(retval) {
            if (retval.toInt32() === 0 && this.cbSecret > 0) {
                try {
                    var keyBuf = this.pbSecret.readByteArray(this.cbSecret);
                    var keyArr = new Uint8Array(keyBuf);
                    var arr = [];
                    for (var i = 0; i < keyArr.length; i++) arr.push(keyArr[i]);
                    send({type: 'key_generated', size: this.cbSecret, key: arr});
                } catch(e) {}
            }
        }
    });
    send({type: 'log', msg: 'Hooked BCryptGenerateSymmetricKey'});
}

// Hook BCryptSetProperty
var bcryptSetProp = Module.findExportByName('bcrypt.dll', 'BCryptSetProperty');
if (bcryptSetProp) {
    Interceptor.attach(bcryptSetProp, {
        onEnter: function(args) {
            try {
                var propName = args[1].readUtf16String();
                var cbInput = args[3].toInt32();
                var propValue = null;
                if (cbInput > 0 && cbInput < 256 && !args[2].isNull()) {
                    try { propValue = args[2].readUtf16String(); } catch(e2) {}
                }
                send({type: 'set_property', name: propName, value: propValue, size: cbInput});
            } catch(e) {}
        }
    });
    send({type: 'log', msg: 'Hooked BCryptSetProperty'});
}

send({type: 'log', msg: 'All hooks installed. Ready.'});
"""


def create_loader_script(loader_path=None):
    """Write the loader script that is run inside the hooked child process.

    The generated script loads oneocr.dll from its hard-coded DLL_DIR, creates
    OCR init options, calls OcrInitOptionsSetUseModelDelayLoad with 0, then
    calls CreateOcrPipeline with the model path and key. It exits with status 1
    if CreateOcrPipeline returns non-zero; otherwise it sleeps 5 seconds before
    finishing.

    Args:
        loader_path: Destination of the script (str or Path). When omitted,
            the hard-coded ``frida_loader.py`` in the project directory is
            used.

    Returns:
        Path of the script that was written.
    """
    script = r'''
import ctypes
from ctypes import c_int64, c_char_p, POINTER, byref
import time
import sys
import os

DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data"
MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel")
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'

# Load DLLs
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
kernel32.SetDllDirectoryW(DLL_DIR)
dll = ctypes.WinDLL(os.path.join(DLL_DIR, "oneocr.dll"))

# Setup function types
dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)]
dll.CreateOcrInitOptions.restype = c_int64
dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, ctypes.c_char]
dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64
dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)]
dll.CreateOcrPipeline.restype = c_int64

# Create init options
init_options = c_int64()
ret = dll.CreateOcrInitOptions(byref(init_options))
print(f"LOADER: CreateOcrInitOptions -> {ret}", flush=True)
assert ret == 0

ret = dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0)
print(f"LOADER: SetUseModelDelayLoad -> {ret}", flush=True)
assert ret == 0

# Create pipeline (this triggers decryption!)
pipeline = c_int64()
model_buf = ctypes.create_string_buffer(MODEL_PATH.encode())
key_buf = ctypes.create_string_buffer(KEY)

print("LOADER: Creating OCR pipeline (triggers decryption)...", flush=True)
ret = dll.CreateOcrPipeline(model_buf, key_buf, init_options, byref(pipeline))
print(f"LOADER: CreateOcrPipeline returned {ret}, pipeline={pipeline.value}", flush=True)

if ret != 0:
    print(f"LOADER: ERROR - return code {ret}", flush=True)
    sys.exit(1)

print("LOADER: Pipeline created successfully! Waiting...", flush=True)
time.sleep(5)
print("LOADER: Done.", flush=True)
'''
    if loader_path is None:
        loader_path = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_loader.py")
    loader_path = Path(loader_path)
    # Create the destination directory so a fresh checkout does not fail with
    # FileNotFoundError.
    loader_path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: do not depend on the locale's default code page.
    loader_path.write_text(script, encoding="utf-8")
    return loader_path


def on_message(message, data):
    """Handle one message delivered by the Frida script.

    Messages of type ``'send'`` carry the payload the agent passed to
    ``send()``; they are dispatched on ``payload['type']``. Payloads of type
    ``decrypt_data`` and ``uncompress_data`` are written to ``OUTPUT_DIR``;
    every other payload type is only printed. Messages of type ``'error'``
    are printed together with their stack, if present. Any other message type
    is ignored.

    Args:
        message: Message dict from Frida (``type`` plus ``payload`` or
            ``description``/``stack``).
        data: Binary blob attached to the message, or ``None``.
    """
    if message['type'] == 'error':
        print(f"[FRIDA ERROR] {message['description']}")
        if 'stack' in message:
            print(message['stack'])
        return
    if message['type'] != 'send':
        return

    payload = message['payload']
    if not isinstance(payload, dict):
        # A bare value has no 'type' to dispatch on; show it as-is.
        print(f"[MSG] {payload}")
        return
    msg_type = payload.get('type', '')

    if msg_type == 'log':
        print(f"[FRIDA] {payload['msg']}")

    elif msg_type == 'decrypt_call':
        info = payload['info']
        iv_hex = bytes(info['iv']).hex() if 'iv' in info else ''
        print(f"[DECRYPT #{info['call']}] status={info['status']} "
              f"in={info['inputSize']} out={info['outputSize']} "
              f"iv_size={info['ivSize']} iv={iv_hex[:32]}... flags={info['flags']}")

    elif msg_type == 'decrypt_data':
        call_num = payload['call']
        if data is None:
            # Nothing to save; writing or measuring None would raise.
            print(f"[FRIDA] decrypt_data #{call_num} has no binary payload; skipped")
            return
        size = payload['size']
        fname = OUTPUT_DIR / f"decrypt_{call_num}_{size}bytes.bin"
        fname.write_bytes(data)

        # First four bytes interpreted as a little-endian unsigned int.
        magic = struct.unpack('<I', data[:4])[0] if len(data) >= 4 else 0
        first_16 = data[:16].hex()
        print(f" -> Saved {fname.name} | magic={magic} | first_16={first_16}")

        if magic == 1:
            print(" *** MAGIC NUMBER == 1 FOUND! This is the decrypted model container! ***")

    elif msg_type == 'key_generated':
        key_bytes = bytes(payload['key'])
        print(f"[KEY] size={payload['size']} key={key_bytes}")
        # errors='replace' cannot raise, so no exception handling is needed.
        print(f" ASCII: {key_bytes.decode('ascii', errors='replace')}")

    elif msg_type == 'set_property':
        print(f"[PROPERTY] {payload['name']} = {payload['value']} (size={payload['size']})")

    elif msg_type == 'uncompress':
        print(f"[UNCOMPRESS] sourceLen={payload['sourceLen']} -> destLen={payload['destLen']}")

    elif msg_type == 'uncompress_data':
        size = payload['size']
        if data is None:
            print(f"[FRIDA] uncompress_data ({size} bytes) has no binary payload; skipped")
            return
        fname = OUTPUT_DIR / f"uncompressed_{size}bytes.bin"
        fname.write_bytes(data)
        first_32 = data[:32].hex()
        print(f" -> Saved {fname.name} | first_32={first_32}")

    elif msg_type == 'ort_export':
        print(f"[ORT] {payload['name']} @ {payload['addr']}")

    else:
        print(f"[MSG] {payload}")


def main():
    """Run the hooking session and print a summary of the dumped files.

    Steps: write the loader script, spawn it suspended under Frida with the
    project's venv interpreter, attach and load FRIDA_SCRIPT, resume the
    process, wait (at most 60 seconds) for the session to detach, then list
    every file found in OUTPUT_DIR with its size and leading bytes.

    Exits with status 1 if the venv interpreter does not exist.
    """
    print("=" * 70)
    print("FRIDA HOOKING: Intercepting OneOCR model decryption")
    print("=" * 70)

    loader_path = create_loader_script()
    print(f"Loader script: {loader_path}")

    venv_python = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\.venv\Scripts\python.exe")
    if not venv_python.exists():
        print("ERROR: Python venv not found")
        sys.exit(1)

    print(f"Spawning: {venv_python} {loader_path}")
    pid = frida.spawn([str(venv_python), str(loader_path)])
    print(f"Process spawned, PID={pid}")

    session = frida.attach(pid)
    print("Attached to process")

    # Frida emits 'detached' when the session ends; register before resuming
    # so the notification cannot be missed.
    detached = threading.Event()
    session.on('detached', lambda *_args: detached.set())

    script = session.create_script(FRIDA_SCRIPT)
    script.on('message', on_message)
    script.load()
    print("Script loaded, resuming process...")

    frida.resume(pid)

    print("Waiting for process to complete...")
    try:
        # At most 120 * 0.5 s = 60 s, polled in short slices.
        for _ in range(120):
            if detached.wait(0.5):
                print("Process terminated")
                break
    except KeyboardInterrupt:
        print("\nInterrupted by user")

    print()
    print("=" * 70)
    print("RESULTS")
    print("=" * 70)

    if OUTPUT_DIR.exists():
        # Only regular files: a sub-directory cannot be opened for its header.
        files = sorted(p for p in OUTPUT_DIR.iterdir() if p.is_file())
        if files:
            print(f"Dumped {len(files)} files:")
            for f in files:
                size = f.stat().st_size
                print(f" {f.name}: {size:,} bytes")
                if size >= 4:
                    with open(f, 'rb') as fh:
                        header = fh.read(16)
                    magic = struct.unpack('<I', header[:4])[0]
                    print(f" magic={magic}, first_16={header.hex()}")
        else:
            print("No files dumped.")

    print("\nDone!")


# Start the hooking session only when this file is executed as a script.
if __name__ == "__main__":
    main()