oneocr / _archive /hooks /hook_decrypt.py
OneOCR Dev
OneOCR - reverse engineering complete, ONNX pipeline 53% match rate
ce847d4
"""
Hook BCryptDecrypt using ctypes in-process hooking via DLL detour.
Instead of Frida, we directly hook BCryptDecrypt's IAT entry in oneocr.dll.
"""
import ctypes
import ctypes.wintypes as wt
from ctypes import (
c_int64, c_char_p, c_ubyte, POINTER, byref, Structure,
c_void_p, c_ulong, c_int32, WINFUNCTYPE, CFUNCTYPE, c_uint8
)
import os
import sys
import struct
from pathlib import Path
OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump")
OUTPUT_DIR.mkdir(exist_ok=True)
DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data"
MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel")
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# ── Globals to collect intercepted data ──
intercepted_calls = []
decrypt_call_num = 0
# ── BCryptDecrypt signature ──
# NTSTATUS BCryptDecrypt(BCRYPT_KEY_HANDLE, PUCHAR pbInput, ULONG cbInput,
# VOID* pPadding, PUCHAR pbIV, ULONG cbIV, PUCHAR pbOutput,
# ULONG cbOutput, ULONG* pcbResult, ULONG dwFlags)
BCRYPT_DECRYPT_TYPE = WINFUNCTYPE(
c_ulong, # NTSTATUS return
c_void_p, # hKey
c_void_p, # pbInput
c_ulong, # cbInput
c_void_p, # pPaddingInfo
c_void_p, # pbIV
c_ulong, # cbIV
c_void_p, # pbOutput
c_ulong, # cbOutput
POINTER(c_ulong), # pcbResult
c_ulong, # dwFlags
)
# Store original function
original_bcrypt_decrypt = None
def hooked_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
pbOutput, cbOutput, pcbResult, dwFlags):
"""Our hook that intercepts BCryptDecrypt calls."""
global decrypt_call_num
call_num = decrypt_call_num
decrypt_call_num += 1
# Read IV before the call (it may be modified)
iv_before = None
if pbIV and cbIV > 0:
try:
iv_before = ctypes.string_at(pbIV, cbIV)
except:
pass
# Read encrypted input BEFORE the call
encrypted_input = None
if pbInput and cbInput > 0:
try:
encrypted_input = ctypes.string_at(pbInput, min(cbInput, 64))
except:
pass
# Call original
status = original_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding,
pbIV, cbIV, pbOutput, cbOutput,
pcbResult, dwFlags)
# Get result size
result_size = 0
if pcbResult:
result_size = pcbResult[0]
# Read IV after (CFB mode modifies the IV)
iv_after = None
if pbIV and cbIV > 0:
try:
iv_after = ctypes.string_at(pbIV, cbIV)
except:
pass
info = {
'call': call_num,
'status': status,
'cbInput': cbInput,
'cbIV': cbIV,
'cbOutput': result_size,
'dwFlags': dwFlags,
'iv_before': iv_before.hex() if iv_before else None,
'iv_after': iv_after.hex() if iv_after else None,
}
print(f"[BCryptDecrypt #{call_num}] status={status:#x} "
f"in={cbInput} out={result_size} iv_len={cbIV} flags={dwFlags}")
if encrypted_input:
print(f" Encrypted input[:32]: {encrypted_input[:32].hex()}")
print(f" pbInput addr: {pbInput:#x}")
if iv_before:
print(f" IV before: {iv_before.hex()}")
if iv_after and iv_after != iv_before:
print(f" IV after: {iv_after.hex()}")
# Save decrypted data
if status == 0 and result_size > 0 and pbOutput:
try:
decrypted = ctypes.string_at(pbOutput, result_size)
# Check for magic number
if len(decrypted) >= 4:
magic = struct.unpack('<I', decrypted[:4])[0]
info['magic'] = magic
print(f" Magic: {magic} | First 32 bytes: {decrypted[:32].hex()}")
if magic == 1:
print(f" *** MAGIC NUMBER == 1 FOUND! ***")
# Save to file
fname = OUTPUT_DIR / f"decrypt_{call_num}_in{cbInput}_out{result_size}.bin"
fname.write_bytes(decrypted)
print(f" -> Saved: {fname.name} ({result_size:,} bytes)")
except Exception as e:
print(f" Error reading output: {e}")
intercepted_calls.append(info)
return status
def hook_iat(dll_handle, target_dll_name, target_func_name, hook_func):
"""
Hook a function by patching the Import Address Table (IAT) of a DLL.
Returns the original function pointer.
"""
import pefile
# Get the DLL file path
kernel32 = ctypes.windll.kernel32
buf = ctypes.create_unicode_buffer(260)
h = ctypes.c_void_p(dll_handle)
kernel32.GetModuleFileNameW(h, buf, 260)
dll_path = buf.value
print(f"Analyzing IAT of: {dll_path}")
pe = pefile.PE(dll_path)
# Find the import
base_addr = dll_handle
if hasattr(dll_handle, '_handle'):
base_addr = dll_handle._handle
for entry in pe.DIRECTORY_ENTRY_IMPORT:
import_name = entry.dll.decode('utf-8', errors='ignore').lower()
if target_dll_name.lower() not in import_name:
continue
for imp in entry.imports:
if imp.name and imp.name.decode('utf-8', errors='ignore') == target_func_name:
# Found it! The IAT entry is at base_addr + imp.address - pe.OPTIONAL_HEADER.ImageBase
iat_rva = imp.address - pe.OPTIONAL_HEADER.ImageBase
iat_addr = base_addr + iat_rva
print(f"Found {target_func_name} in IAT at RVA={iat_rva:#x}, "
f"VA={iat_addr:#x}")
# Read current value (original function pointer)
original_ptr = ctypes.c_void_p()
ctypes.memmove(ctypes.byref(original_ptr), iat_addr, 8)
print(f"Original function pointer: {original_ptr.value:#x}")
# Create callback
callback = BCRYPT_DECRYPT_TYPE(hook_func)
callback_ptr = ctypes.cast(callback, c_void_p).value
# Make IAT page writable
old_protect = c_ulong()
PAGE_READWRITE = 0x04
kernel32.VirtualProtect(
ctypes.c_void_p(iat_addr), 8,
PAGE_READWRITE, ctypes.byref(old_protect)
)
# Patch IAT
new_ptr = ctypes.c_void_p(callback_ptr)
ctypes.memmove(iat_addr, ctypes.byref(new_ptr), 8)
# Restore protection
kernel32.VirtualProtect(
ctypes.c_void_p(iat_addr), 8,
old_protect.value, ctypes.byref(old_protect)
)
print(f"IAT patched! New function pointer: {callback_ptr:#x}")
# Create callable from original
original_func = BCRYPT_DECRYPT_TYPE(original_ptr.value)
pe.close()
return original_func, callback # Return both to prevent GC
pe.close()
return None, None
def main():
global original_bcrypt_decrypt
print("=" * 70)
print("IN-PROCESS BCryptDecrypt HOOKING")
print("=" * 70)
# Load DLL
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
kernel32.SetDllDirectoryW(DLL_DIR)
dll_path = os.path.join(DLL_DIR, "oneocr.dll")
print(f"Loading: {dll_path}")
dll = ctypes.WinDLL(dll_path)
# Setup function types
dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)]
dll.CreateOcrInitOptions.restype = c_int64
dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, c_ubyte]
dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64
dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)]
dll.CreateOcrPipeline.restype = c_int64
# Try approach 1: Direct BCryptDecrypt function pointer replacement
print("\n--- Setting up BCryptDecrypt hook ---")
# Get the real BCryptDecrypt
bcrypt_dll = ctypes.WinDLL("bcrypt")
real_decrypt_addr = ctypes.cast(
bcrypt_dll.BCryptDecrypt, c_void_p
).value
print(f"Real BCryptDecrypt address: {real_decrypt_addr:#x}")
# Instead of IAT patching, let's use a simpler approach:
# We'll call BCryptDecrypt ourselves to first get a "sizing" call,
# then intercept the actual decrypt.
# Actually, the simplest approach: use a manual detour
# But let's try IAT patching first if pefile is available
try:
import pefile
print("pefile available, trying IAT hook...")
original_bcrypt_decrypt_func, callback_ref = hook_iat(
dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt
)
if original_bcrypt_decrypt_func:
original_bcrypt_decrypt = original_bcrypt_decrypt_func
print("IAT hook installed successfully!")
else:
raise Exception("IAT hook failed - function not found in imports")
except ImportError:
print("pefile not available, installing...")
os.system("uv pip install pefile")
import pefile
original_bcrypt_decrypt_func, callback_ref = hook_iat(
dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt
)
if original_bcrypt_decrypt_func:
original_bcrypt_decrypt = original_bcrypt_decrypt_func
else:
print("ERROR: Could not hook BCryptDecrypt")
return
# Now create the pipeline - this will trigger decryption via our hook
print("\n--- Creating OCR Pipeline (will trigger BCryptDecrypt) ---")
init_options = c_int64()
ret = dll.CreateOcrInitOptions(byref(init_options))
print(f"CreateOcrInitOptions: {ret}")
ret = dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0)
print(f"SetUseModelDelayLoad: {ret}")
pipeline = c_int64()
model_buf = ctypes.create_string_buffer(MODEL_PATH.encode())
key_buf = ctypes.create_string_buffer(KEY)
print(f"\nCalling CreateOcrPipeline...")
print(f"Model: {MODEL_PATH}")
print(f"Key: {KEY}")
print()
ret = dll.CreateOcrPipeline(model_buf, key_buf, init_options, byref(pipeline))
print(f"\nCreateOcrPipeline returned: {ret}")
print(f"Pipeline handle: {pipeline.value}")
# Summary
print()
print("=" * 70)
print("SUMMARY")
print("=" * 70)
print(f"Total BCryptDecrypt calls intercepted: {len(intercepted_calls)}")
magic_1_files = []
for info in intercepted_calls:
if info.get('magic') == 1:
magic_1_files.append(info)
if magic_1_files:
print(f"\n*** Found {len(magic_1_files)} calls with magic_number == 1! ***")
for info in magic_1_files:
print(f" Call #{info['call']}: input={info['cbInput']:,}, "
f"output={info['cbOutput']:,}")
# List saved files
if OUTPUT_DIR.exists():
files = sorted(OUTPUT_DIR.glob("decrypt_*.bin"))
if files:
print(f"\nSaved {len(files)} decrypted buffers:")
total = 0
for f in files:
sz = f.stat().st_size
total += sz
header = open(f, 'rb').read(4)
magic = struct.unpack('<I', header)[0] if len(header) >= 4 else -1
marker = " *** MAGIC=1 ***" if magic == 1 else ""
print(f" {f.name}: {sz:,} bytes (magic={magic}){marker}")
print(f"Total: {total:,} bytes ({total/1024/1024:.1f} MB)")
print("\nDone!")
if __name__ == '__main__':
main()