|
|
""" |
|
|
Hook BCryptDecrypt using ctypes in-process hooking via DLL detour. |
|
|
Instead of Frida, we directly hook BCryptDecrypt's IAT entry in oneocr.dll. |
|
|
""" |
|
|
import ctypes |
|
|
import ctypes.wintypes as wt |
|
|
from ctypes import ( |
|
|
c_int64, c_char_p, c_ubyte, POINTER, byref, Structure, |
|
|
c_void_p, c_ulong, c_int32, WINFUNCTYPE, CFUNCTYPE, c_uint8 |
|
|
) |
|
|
import os |
|
|
import sys |
|
|
import struct |
|
|
from pathlib import Path |
|
|
|
|
|
OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump") |
|
|
OUTPUT_DIR.mkdir(exist_ok=True) |
|
|
|
|
|
DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data" |
|
|
MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel") |
|
|
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4' |
|
|
|
|
|
|
|
|
intercepted_calls = [] |
|
|
decrypt_call_num = 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
BCRYPT_DECRYPT_TYPE = WINFUNCTYPE( |
|
|
c_ulong, |
|
|
c_void_p, |
|
|
c_void_p, |
|
|
c_ulong, |
|
|
c_void_p, |
|
|
c_void_p, |
|
|
c_ulong, |
|
|
c_void_p, |
|
|
c_ulong, |
|
|
POINTER(c_ulong), |
|
|
c_ulong, |
|
|
) |
|
|
|
|
|
|
|
|
original_bcrypt_decrypt = None |
|
|
|
|
|
|
|
|
def hooked_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV, |
|
|
pbOutput, cbOutput, pcbResult, dwFlags): |
|
|
"""Our hook that intercepts BCryptDecrypt calls.""" |
|
|
global decrypt_call_num |
|
|
|
|
|
call_num = decrypt_call_num |
|
|
decrypt_call_num += 1 |
|
|
|
|
|
|
|
|
iv_before = None |
|
|
if pbIV and cbIV > 0: |
|
|
try: |
|
|
iv_before = ctypes.string_at(pbIV, cbIV) |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
encrypted_input = None |
|
|
if pbInput and cbInput > 0: |
|
|
try: |
|
|
encrypted_input = ctypes.string_at(pbInput, min(cbInput, 64)) |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
status = original_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, |
|
|
pbIV, cbIV, pbOutput, cbOutput, |
|
|
pcbResult, dwFlags) |
|
|
|
|
|
|
|
|
result_size = 0 |
|
|
if pcbResult: |
|
|
result_size = pcbResult[0] |
|
|
|
|
|
|
|
|
iv_after = None |
|
|
if pbIV and cbIV > 0: |
|
|
try: |
|
|
iv_after = ctypes.string_at(pbIV, cbIV) |
|
|
except: |
|
|
pass |
|
|
|
|
|
info = { |
|
|
'call': call_num, |
|
|
'status': status, |
|
|
'cbInput': cbInput, |
|
|
'cbIV': cbIV, |
|
|
'cbOutput': result_size, |
|
|
'dwFlags': dwFlags, |
|
|
'iv_before': iv_before.hex() if iv_before else None, |
|
|
'iv_after': iv_after.hex() if iv_after else None, |
|
|
} |
|
|
|
|
|
print(f"[BCryptDecrypt #{call_num}] status={status:#x} " |
|
|
f"in={cbInput} out={result_size} iv_len={cbIV} flags={dwFlags}") |
|
|
if encrypted_input: |
|
|
print(f" Encrypted input[:32]: {encrypted_input[:32].hex()}") |
|
|
print(f" pbInput addr: {pbInput:#x}") |
|
|
if iv_before: |
|
|
print(f" IV before: {iv_before.hex()}") |
|
|
if iv_after and iv_after != iv_before: |
|
|
print(f" IV after: {iv_after.hex()}") |
|
|
|
|
|
|
|
|
if status == 0 and result_size > 0 and pbOutput: |
|
|
try: |
|
|
decrypted = ctypes.string_at(pbOutput, result_size) |
|
|
|
|
|
|
|
|
if len(decrypted) >= 4: |
|
|
magic = struct.unpack('<I', decrypted[:4])[0] |
|
|
info['magic'] = magic |
|
|
print(f" Magic: {magic} | First 32 bytes: {decrypted[:32].hex()}") |
|
|
|
|
|
if magic == 1: |
|
|
print(f" *** MAGIC NUMBER == 1 FOUND! ***") |
|
|
|
|
|
|
|
|
fname = OUTPUT_DIR / f"decrypt_{call_num}_in{cbInput}_out{result_size}.bin" |
|
|
fname.write_bytes(decrypted) |
|
|
print(f" -> Saved: {fname.name} ({result_size:,} bytes)") |
|
|
|
|
|
except Exception as e: |
|
|
print(f" Error reading output: {e}") |
|
|
|
|
|
intercepted_calls.append(info) |
|
|
return status |
|
|
|
|
|
|
|
|
def hook_iat(dll_handle, target_dll_name, target_func_name, hook_func): |
|
|
""" |
|
|
Hook a function by patching the Import Address Table (IAT) of a DLL. |
|
|
Returns the original function pointer. |
|
|
""" |
|
|
import pefile |
|
|
|
|
|
|
|
|
kernel32 = ctypes.windll.kernel32 |
|
|
buf = ctypes.create_unicode_buffer(260) |
|
|
h = ctypes.c_void_p(dll_handle) |
|
|
kernel32.GetModuleFileNameW(h, buf, 260) |
|
|
dll_path = buf.value |
|
|
|
|
|
print(f"Analyzing IAT of: {dll_path}") |
|
|
|
|
|
pe = pefile.PE(dll_path) |
|
|
|
|
|
|
|
|
base_addr = dll_handle |
|
|
if hasattr(dll_handle, '_handle'): |
|
|
base_addr = dll_handle._handle |
|
|
|
|
|
for entry in pe.DIRECTORY_ENTRY_IMPORT: |
|
|
import_name = entry.dll.decode('utf-8', errors='ignore').lower() |
|
|
if target_dll_name.lower() not in import_name: |
|
|
continue |
|
|
|
|
|
for imp in entry.imports: |
|
|
if imp.name and imp.name.decode('utf-8', errors='ignore') == target_func_name: |
|
|
|
|
|
iat_rva = imp.address - pe.OPTIONAL_HEADER.ImageBase |
|
|
iat_addr = base_addr + iat_rva |
|
|
|
|
|
print(f"Found {target_func_name} in IAT at RVA={iat_rva:#x}, " |
|
|
f"VA={iat_addr:#x}") |
|
|
|
|
|
|
|
|
original_ptr = ctypes.c_void_p() |
|
|
ctypes.memmove(ctypes.byref(original_ptr), iat_addr, 8) |
|
|
print(f"Original function pointer: {original_ptr.value:#x}") |
|
|
|
|
|
|
|
|
callback = BCRYPT_DECRYPT_TYPE(hook_func) |
|
|
callback_ptr = ctypes.cast(callback, c_void_p).value |
|
|
|
|
|
|
|
|
old_protect = c_ulong() |
|
|
PAGE_READWRITE = 0x04 |
|
|
kernel32.VirtualProtect( |
|
|
ctypes.c_void_p(iat_addr), 8, |
|
|
PAGE_READWRITE, ctypes.byref(old_protect) |
|
|
) |
|
|
|
|
|
|
|
|
new_ptr = ctypes.c_void_p(callback_ptr) |
|
|
ctypes.memmove(iat_addr, ctypes.byref(new_ptr), 8) |
|
|
|
|
|
|
|
|
kernel32.VirtualProtect( |
|
|
ctypes.c_void_p(iat_addr), 8, |
|
|
old_protect.value, ctypes.byref(old_protect) |
|
|
) |
|
|
|
|
|
print(f"IAT patched! New function pointer: {callback_ptr:#x}") |
|
|
|
|
|
|
|
|
original_func = BCRYPT_DECRYPT_TYPE(original_ptr.value) |
|
|
|
|
|
pe.close() |
|
|
return original_func, callback |
|
|
|
|
|
pe.close() |
|
|
return None, None |
|
|
|
|
|
|
|
|
def main(): |
|
|
global original_bcrypt_decrypt |
|
|
|
|
|
print("=" * 70) |
|
|
print("IN-PROCESS BCryptDecrypt HOOKING") |
|
|
print("=" * 70) |
|
|
|
|
|
|
|
|
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) |
|
|
kernel32.SetDllDirectoryW(DLL_DIR) |
|
|
|
|
|
dll_path = os.path.join(DLL_DIR, "oneocr.dll") |
|
|
print(f"Loading: {dll_path}") |
|
|
dll = ctypes.WinDLL(dll_path) |
|
|
|
|
|
|
|
|
dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)] |
|
|
dll.CreateOcrInitOptions.restype = c_int64 |
|
|
dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, c_ubyte] |
|
|
dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64 |
|
|
dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)] |
|
|
dll.CreateOcrPipeline.restype = c_int64 |
|
|
|
|
|
|
|
|
print("\n--- Setting up BCryptDecrypt hook ---") |
|
|
|
|
|
|
|
|
bcrypt_dll = ctypes.WinDLL("bcrypt") |
|
|
real_decrypt_addr = ctypes.cast( |
|
|
bcrypt_dll.BCryptDecrypt, c_void_p |
|
|
).value |
|
|
print(f"Real BCryptDecrypt address: {real_decrypt_addr:#x}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
import pefile |
|
|
print("pefile available, trying IAT hook...") |
|
|
|
|
|
original_bcrypt_decrypt_func, callback_ref = hook_iat( |
|
|
dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt |
|
|
) |
|
|
|
|
|
if original_bcrypt_decrypt_func: |
|
|
original_bcrypt_decrypt = original_bcrypt_decrypt_func |
|
|
print("IAT hook installed successfully!") |
|
|
else: |
|
|
raise Exception("IAT hook failed - function not found in imports") |
|
|
|
|
|
except ImportError: |
|
|
print("pefile not available, installing...") |
|
|
os.system("uv pip install pefile") |
|
|
import pefile |
|
|
|
|
|
original_bcrypt_decrypt_func, callback_ref = hook_iat( |
|
|
dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt |
|
|
) |
|
|
|
|
|
if original_bcrypt_decrypt_func: |
|
|
original_bcrypt_decrypt = original_bcrypt_decrypt_func |
|
|
else: |
|
|
print("ERROR: Could not hook BCryptDecrypt") |
|
|
return |
|
|
|
|
|
|
|
|
print("\n--- Creating OCR Pipeline (will trigger BCryptDecrypt) ---") |
|
|
|
|
|
init_options = c_int64() |
|
|
ret = dll.CreateOcrInitOptions(byref(init_options)) |
|
|
print(f"CreateOcrInitOptions: {ret}") |
|
|
|
|
|
ret = dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0) |
|
|
print(f"SetUseModelDelayLoad: {ret}") |
|
|
|
|
|
pipeline = c_int64() |
|
|
model_buf = ctypes.create_string_buffer(MODEL_PATH.encode()) |
|
|
key_buf = ctypes.create_string_buffer(KEY) |
|
|
|
|
|
print(f"\nCalling CreateOcrPipeline...") |
|
|
print(f"Model: {MODEL_PATH}") |
|
|
print(f"Key: {KEY}") |
|
|
print() |
|
|
|
|
|
ret = dll.CreateOcrPipeline(model_buf, key_buf, init_options, byref(pipeline)) |
|
|
|
|
|
print(f"\nCreateOcrPipeline returned: {ret}") |
|
|
print(f"Pipeline handle: {pipeline.value}") |
|
|
|
|
|
|
|
|
print() |
|
|
print("=" * 70) |
|
|
print("SUMMARY") |
|
|
print("=" * 70) |
|
|
print(f"Total BCryptDecrypt calls intercepted: {len(intercepted_calls)}") |
|
|
|
|
|
magic_1_files = [] |
|
|
for info in intercepted_calls: |
|
|
if info.get('magic') == 1: |
|
|
magic_1_files.append(info) |
|
|
|
|
|
if magic_1_files: |
|
|
print(f"\n*** Found {len(magic_1_files)} calls with magic_number == 1! ***") |
|
|
for info in magic_1_files: |
|
|
print(f" Call #{info['call']}: input={info['cbInput']:,}, " |
|
|
f"output={info['cbOutput']:,}") |
|
|
|
|
|
|
|
|
if OUTPUT_DIR.exists(): |
|
|
files = sorted(OUTPUT_DIR.glob("decrypt_*.bin")) |
|
|
if files: |
|
|
print(f"\nSaved {len(files)} decrypted buffers:") |
|
|
total = 0 |
|
|
for f in files: |
|
|
sz = f.stat().st_size |
|
|
total += sz |
|
|
header = open(f, 'rb').read(4) |
|
|
magic = struct.unpack('<I', header)[0] if len(header) >= 4 else -1 |
|
|
marker = " *** MAGIC=1 ***" if magic == 1 else "" |
|
|
print(f" {f.name}: {sz:,} bytes (magic={magic}){marker}") |
|
|
print(f"Total: {total:,} bytes ({total/1024/1024:.1f} MB)") |
|
|
|
|
|
print("\nDone!") |
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
main() |
|
|
|