""" Hook BCryptDecrypt using ctypes in-process hooking via DLL detour. Instead of Frida, we directly hook BCryptDecrypt's IAT entry in oneocr.dll. """ import ctypes import ctypes.wintypes as wt from ctypes import ( c_int64, c_char_p, c_ubyte, POINTER, byref, Structure, c_void_p, c_ulong, c_int32, WINFUNCTYPE, CFUNCTYPE, c_uint8 ) import os import sys import struct from pathlib import Path OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump") OUTPUT_DIR.mkdir(exist_ok=True) DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data" MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel") KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4' # ── Globals to collect intercepted data ── intercepted_calls = [] decrypt_call_num = 0 # ── BCryptDecrypt signature ── # NTSTATUS BCryptDecrypt(BCRYPT_KEY_HANDLE, PUCHAR pbInput, ULONG cbInput, # VOID* pPadding, PUCHAR pbIV, ULONG cbIV, PUCHAR pbOutput, # ULONG cbOutput, ULONG* pcbResult, ULONG dwFlags) BCRYPT_DECRYPT_TYPE = WINFUNCTYPE( c_ulong, # NTSTATUS return c_void_p, # hKey c_void_p, # pbInput c_ulong, # cbInput c_void_p, # pPaddingInfo c_void_p, # pbIV c_ulong, # cbIV c_void_p, # pbOutput c_ulong, # cbOutput POINTER(c_ulong), # pcbResult c_ulong, # dwFlags ) # Store original function original_bcrypt_decrypt = None def hooked_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV, pbOutput, cbOutput, pcbResult, dwFlags): """Our hook that intercepts BCryptDecrypt calls.""" global decrypt_call_num call_num = decrypt_call_num decrypt_call_num += 1 # Read IV before the call (it may be modified) iv_before = None if pbIV and cbIV > 0: try: iv_before = ctypes.string_at(pbIV, cbIV) except: pass # Read encrypted input BEFORE the call encrypted_input = None if pbInput and cbInput > 0: try: encrypted_input = ctypes.string_at(pbInput, min(cbInput, 64)) except: pass # Call original status = original_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV, pbOutput, cbOutput, pcbResult, dwFlags) # Get result size result_size = 0 if pcbResult: result_size = pcbResult[0] # Read IV after (CFB mode modifies the IV) iv_after = None if pbIV and cbIV > 0: try: iv_after = ctypes.string_at(pbIV, cbIV) except: pass info = { 'call': call_num, 'status': status, 'cbInput': cbInput, 'cbIV': cbIV, 'cbOutput': result_size, 'dwFlags': dwFlags, 'iv_before': iv_before.hex() if iv_before else None, 'iv_after': iv_after.hex() if iv_after else None, } print(f"[BCryptDecrypt #{call_num}] status={status:#x} " f"in={cbInput} out={result_size} iv_len={cbIV} flags={dwFlags}") if encrypted_input: print(f" Encrypted input[:32]: {encrypted_input[:32].hex()}") print(f" pbInput addr: {pbInput:#x}") if iv_before: print(f" IV before: {iv_before.hex()}") if iv_after and iv_after != iv_before: print(f" IV after: {iv_after.hex()}") # Save decrypted data if status == 0 and result_size > 0 and pbOutput: try: decrypted = ctypes.string_at(pbOutput, result_size) # Check for magic number if len(decrypted) >= 4: magic = struct.unpack(' Saved: {fname.name} ({result_size:,} bytes)") except Exception as e: print(f" Error reading output: {e}") intercepted_calls.append(info) return status def hook_iat(dll_handle, target_dll_name, target_func_name, hook_func): """ Hook a function by patching the Import Address Table (IAT) of a DLL. Returns the original function pointer. """ import pefile # Get the DLL file path kernel32 = ctypes.windll.kernel32 buf = ctypes.create_unicode_buffer(260) h = ctypes.c_void_p(dll_handle) kernel32.GetModuleFileNameW(h, buf, 260) dll_path = buf.value print(f"Analyzing IAT of: {dll_path}") pe = pefile.PE(dll_path) # Find the import base_addr = dll_handle if hasattr(dll_handle, '_handle'): base_addr = dll_handle._handle for entry in pe.DIRECTORY_ENTRY_IMPORT: import_name = entry.dll.decode('utf-8', errors='ignore').lower() if target_dll_name.lower() not in import_name: continue for imp in entry.imports: if imp.name and imp.name.decode('utf-8', errors='ignore') == target_func_name: # Found it! The IAT entry is at base_addr + imp.address - pe.OPTIONAL_HEADER.ImageBase iat_rva = imp.address - pe.OPTIONAL_HEADER.ImageBase iat_addr = base_addr + iat_rva print(f"Found {target_func_name} in IAT at RVA={iat_rva:#x}, " f"VA={iat_addr:#x}") # Read current value (original function pointer) original_ptr = ctypes.c_void_p() ctypes.memmove(ctypes.byref(original_ptr), iat_addr, 8) print(f"Original function pointer: {original_ptr.value:#x}") # Create callback callback = BCRYPT_DECRYPT_TYPE(hook_func) callback_ptr = ctypes.cast(callback, c_void_p).value # Make IAT page writable old_protect = c_ulong() PAGE_READWRITE = 0x04 kernel32.VirtualProtect( ctypes.c_void_p(iat_addr), 8, PAGE_READWRITE, ctypes.byref(old_protect) ) # Patch IAT new_ptr = ctypes.c_void_p(callback_ptr) ctypes.memmove(iat_addr, ctypes.byref(new_ptr), 8) # Restore protection kernel32.VirtualProtect( ctypes.c_void_p(iat_addr), 8, old_protect.value, ctypes.byref(old_protect) ) print(f"IAT patched! New function pointer: {callback_ptr:#x}") # Create callable from original original_func = BCRYPT_DECRYPT_TYPE(original_ptr.value) pe.close() return original_func, callback # Return both to prevent GC pe.close() return None, None def main(): global original_bcrypt_decrypt print("=" * 70) print("IN-PROCESS BCryptDecrypt HOOKING") print("=" * 70) # Load DLL kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) kernel32.SetDllDirectoryW(DLL_DIR) dll_path = os.path.join(DLL_DIR, "oneocr.dll") print(f"Loading: {dll_path}") dll = ctypes.WinDLL(dll_path) # Setup function types dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)] dll.CreateOcrInitOptions.restype = c_int64 dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, c_ubyte] dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64 dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)] dll.CreateOcrPipeline.restype = c_int64 # Try approach 1: Direct BCryptDecrypt function pointer replacement print("\n--- Setting up BCryptDecrypt hook ---") # Get the real BCryptDecrypt bcrypt_dll = ctypes.WinDLL("bcrypt") real_decrypt_addr = ctypes.cast( bcrypt_dll.BCryptDecrypt, c_void_p ).value print(f"Real BCryptDecrypt address: {real_decrypt_addr:#x}") # Instead of IAT patching, let's use a simpler approach: # We'll call BCryptDecrypt ourselves to first get a "sizing" call, # then intercept the actual decrypt. # Actually, the simplest approach: use a manual detour # But let's try IAT patching first if pefile is available try: import pefile print("pefile available, trying IAT hook...") original_bcrypt_decrypt_func, callback_ref = hook_iat( dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt ) if original_bcrypt_decrypt_func: original_bcrypt_decrypt = original_bcrypt_decrypt_func print("IAT hook installed successfully!") else: raise Exception("IAT hook failed - function not found in imports") except ImportError: print("pefile not available, installing...") os.system("uv pip install pefile") import pefile original_bcrypt_decrypt_func, callback_ref = hook_iat( dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt ) if original_bcrypt_decrypt_func: original_bcrypt_decrypt = original_bcrypt_decrypt_func else: print("ERROR: Could not hook BCryptDecrypt") return # Now create the pipeline - this will trigger decryption via our hook print("\n--- Creating OCR Pipeline (will trigger BCryptDecrypt) ---") init_options = c_int64() ret = dll.CreateOcrInitOptions(byref(init_options)) print(f"CreateOcrInitOptions: {ret}") ret = dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0) print(f"SetUseModelDelayLoad: {ret}") pipeline = c_int64() model_buf = ctypes.create_string_buffer(MODEL_PATH.encode()) key_buf = ctypes.create_string_buffer(KEY) print(f"\nCalling CreateOcrPipeline...") print(f"Model: {MODEL_PATH}") print(f"Key: {KEY}") print() ret = dll.CreateOcrPipeline(model_buf, key_buf, init_options, byref(pipeline)) print(f"\nCreateOcrPipeline returned: {ret}") print(f"Pipeline handle: {pipeline.value}") # Summary print() print("=" * 70) print("SUMMARY") print("=" * 70) print(f"Total BCryptDecrypt calls intercepted: {len(intercepted_calls)}") magic_1_files = [] for info in intercepted_calls: if info.get('magic') == 1: magic_1_files.append(info) if magic_1_files: print(f"\n*** Found {len(magic_1_files)} calls with magic_number == 1! ***") for info in magic_1_files: print(f" Call #{info['call']}: input={info['cbInput']:,}, " f"output={info['cbOutput']:,}") # List saved files if OUTPUT_DIR.exists(): files = sorted(OUTPUT_DIR.glob("decrypt_*.bin")) if files: print(f"\nSaved {len(files)} decrypted buffers:") total = 0 for f in files: sz = f.stat().st_size total += sz header = open(f, 'rb').read(4) magic = struct.unpack('= 4 else -1 marker = " *** MAGIC=1 ***" if magic == 1 else "" print(f" {f.name}: {sz:,} bytes (magic={magic}){marker}") print(f"Total: {total:,} bytes ({total/1024/1024:.1f} MB)") print("\nDone!") if __name__ == '__main__': main()