|
|
""" |
|
|
Extended BCrypt hook - intercepts ALL BCrypt functions to capture the full |
|
|
crypto setup: algorithm provider, properties, and actual key material. |
|
|
""" |
|
|
import ctypes |
|
|
import ctypes.wintypes as wt |
|
|
from ctypes import ( |
|
|
c_int64, c_char_p, c_ubyte, POINTER, byref, Structure, |
|
|
c_void_p, c_ulong, c_int32, WINFUNCTYPE, CFUNCTYPE, c_uint8 |
|
|
) |
|
|
import os |
|
|
import sys |
|
|
import struct |
|
|
from pathlib import Path |
|
|
|
|
|
# Folder that receives the buffers dumped by the BCryptDecrypt hook.
OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump")
# parents=True: do not fail with FileNotFoundError when an intermediate
# folder of the dump path has not been created yet.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Folder that holds oneocr.dll and the model file loaded by main().
DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data"
MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel")
# Key bytes passed to CreateOcrPipeline together with MODEL_PATH.
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'

# NOTE(review): nothing in the visible code appends to this list.
intercepted_bcrypt = []
# Running index of BCryptDecrypt calls; used to number the dump files.
decrypt_call_num = 0
|
|
|
|
|
|
|
|
|
|
|
# ctypes prototypes for the BCrypt imports that get hooked.  Each one mirrors,
# position by position, the parameter list of the matching hooked_* function.
_STATUS = c_ulong               # value every hook returns and prints as "status"
_ULONG = c_ulong                # sizes and flag words
_PVOID = c_void_p               # handles and untyped buffers
_PULONG = POINTER(c_ulong)      # out-parameter receiving a byte count
_PHANDLE = POINTER(c_void_p)    # out-parameter receiving a handle

# (hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
#  pbOutput, cbOutput, pcbResult, dwFlags)
BCRYPT_DECRYPT_TYPE = WINFUNCTYPE(
    _STATUS,
    _PVOID, _PVOID, _ULONG, _PVOID,
    _PVOID, _ULONG, _PVOID, _ULONG, _PULONG, _ULONG,
)

# (phAlgorithm, pszAlgId, pszImplementation, dwFlags)
BCRYPT_OPEN_ALG_TYPE = WINFUNCTYPE(
    _STATUS,
    _PHANDLE, _PVOID, _PVOID, _ULONG,
)

# (hObject, pszProperty, pbInput, cbInput, dwFlags)
BCRYPT_SET_PROP_TYPE = WINFUNCTYPE(
    _STATUS,
    _PVOID, _PVOID, _PVOID, _ULONG, _ULONG,
)

# (hObject, pszProperty, pbOutput, cbOutput, pcbResult, dwFlags)
BCRYPT_GET_PROP_TYPE = WINFUNCTYPE(
    _STATUS,
    _PVOID, _PVOID, _PVOID, _ULONG, _PULONG, _ULONG,
)

# (hAlgorithm, phKey, pbKeyObject, cbKeyObject, pbSecret, cbSecret, dwFlags)
BCRYPT_GEN_KEY_TYPE = WINFUNCTYPE(
    _STATUS,
    _PVOID, _PHANDLE, _PVOID, _ULONG,
    _PVOID, _ULONG, _ULONG,
)

# (hAlgorithm, hImportKey, pszBlobType, phKey,
#  pbKeyObject, cbKeyObject, pbInput, cbInput, dwFlags)
BCRYPT_IMPORT_KEY_TYPE = WINFUNCTYPE(
    _STATUS,
    _PVOID, _PVOID, _PVOID, _PHANDLE,
    _PVOID, _ULONG, _PVOID, _ULONG, _ULONG,
)

# Same parameter layout as BCRYPT_DECRYPT_TYPE.
BCRYPT_ENCRYPT_TYPE = WINFUNCTYPE(
    _STATUS,
    _PVOID, _PVOID, _ULONG, _PVOID,
    _PVOID, _ULONG, _PVOID, _ULONG, _PULONG, _ULONG,
)
|
|
|
|
|
|
|
|
# Callables for the real BCrypt functions.  They stay None until main() has
# patched the IAT and stored what hook_iat_generic() returned; every hooked_*
# function forwards its arguments through the matching entry.
orig_decrypt = orig_open_alg = orig_set_prop = orig_get_prop = None
orig_gen_key = orig_import_key = orig_encrypt = None

# main() appends each installed hook's ctypes callback object here, which
# keeps a reference to it for the lifetime of the process.
_callback_refs = list()

# key handle -> secret bytes, filled by hooked_gen_key and read by
# hooked_bcrypt_decrypt and the summary in main().
key_handle_to_material = dict()
# algorithm handle -> algorithm name, filled by hooked_open_alg.
alg_handle_to_name = dict()
|
|
|
|
|
|
|
|
def read_wstr(ptr):
    """Read a null-terminated wide-character string from a raw pointer.

    Args:
        ptr: Address of the string, or a falsy value (``None`` / ``0``)
            for a NULL pointer.

    Returns:
        The string found at ``ptr``, ``"<null>"`` when ``ptr`` is falsy, or
        ``"<err>"`` when the string could not be read.
    """
    if not ptr:
        return "<null>"
    try:
        return ctypes.wstring_at(ptr)
    except Exception:
        # Best effort: this is only used for log output inside hooks, so a
        # bad pointer must not abort the hook.  Unlike a bare ``except`` this
        # still lets KeyboardInterrupt / SystemExit through.
        return "<err>"
|
|
|
|
|
|
|
|
def hooked_open_alg(phAlgorithm, pszAlgId, pszImplementation, dwFlags):
    """Hook for BCryptOpenAlgorithmProvider: forward the call and log it.

    The algorithm and implementation names are read before the real function
    is called.  When the call succeeds, the returned handle is stored in
    ``alg_handle_to_name`` so later hooks can print which algorithm a handle
    belongs to.

    Returns:
        The status returned by the real function, unchanged.
    """
    alg_name = read_wstr(pszAlgId)
    impl = read_wstr(pszImplementation)

    status = orig_open_alg(phAlgorithm, pszAlgId, pszImplementation, dwFlags)

    handle = phAlgorithm[0] if phAlgorithm else None
    # Record the handle only for status 0: after a failed call the slot may
    # still hold an old value that would be mapped to the wrong name.
    if status == 0 and handle:
        alg_handle_to_name[getattr(handle, "value", handle)] = alg_name

    print(f"[BCryptOpenAlgorithmProvider] alg={alg_name!r} impl={impl!r} "
          f"flags={dwFlags:#x} -> handle={handle} status={status:#010x}")
    return status
|
|
|
|
|
|
|
|
def hooked_set_prop(hObject, pszProperty, pbInput, cbInput, dwFlags):
    """Hook for BCryptSetProperty: forward the call and log name and value.

    The value buffer is rendered as UTF-16-LE text when it decodes, otherwise
    as hex.  A 4-byte value additionally gets its little-endian integer
    reading appended.

    Returns:
        The status returned by the real function, unchanged.
    """
    prop_name = read_wstr(pszProperty)

    value_repr = ""
    if pbInput and cbInput > 0:
        try:
            raw = ctypes.string_at(pbInput, cbInput)
            try:
                value_repr = raw.decode("utf-16-le").rstrip("\x00")
            except UnicodeDecodeError:
                value_repr = raw.hex()
            if cbInput == 4:
                dword_val = struct.unpack("<I", raw)[0]
                value_repr = f"{value_repr} (dword={dword_val})"
        except Exception:
            # Logging is best effort; an unreadable buffer must not keep the
            # real call below from running.
            value_repr = "<err>"

    status = orig_set_prop(hObject, pszProperty, pbInput, cbInput, dwFlags)

    # A NULL handle arrives as None; fall back to 0 so the ``#x`` format
    # below cannot raise after the real call has already been made.
    h = getattr(hObject, "value", hObject) or 0
    alg = alg_handle_to_name.get(h, "?")
    print(f"[BCryptSetProperty] obj={h:#x} ({alg}) prop={prop_name!r} "
          f"value={value_repr!r} size={cbInput} flags={dwFlags:#x} "
          f"-> status={status:#010x}")
    return status
|
|
|
|
|
|
|
|
def hooked_get_prop(hObject, pszProperty, pbOutput, cbOutput, pcbResult, dwFlags):
    """Hook for BCryptGetProperty: forward the call and log the result.

    After a call that returned status 0, the output buffer is rendered as:
    a little-endian integer when it is 4 bytes long, UTF-16-LE text (or hex
    when it does not decode) up to 64 bytes, and just its size when larger.

    Returns:
        The status returned by the real function, unchanged.
    """
    prop_name = read_wstr(pszProperty)
    status = orig_get_prop(hObject, pszProperty, pbOutput, cbOutput, pcbResult, dwFlags)

    result_size = pcbResult[0] if pcbResult else 0
    value_repr = ""
    if status == 0 and pbOutput and result_size > 0:
        try:
            raw = ctypes.string_at(pbOutput, result_size)
        except Exception:
            # Make a failed read visible instead of printing an empty value.
            value_repr = "<err>"
        else:
            if result_size == 4:
                value_repr = f"dword={struct.unpack('<I', raw)[0]}"
            elif result_size <= 64:
                try:
                    value_repr = raw.decode("utf-16-le").rstrip("\x00")
                except UnicodeDecodeError:
                    value_repr = raw.hex()
            else:
                value_repr = f"{result_size} bytes"

    print(f"[BCryptGetProperty] prop={prop_name!r} -> {value_repr!r} "
          f"({result_size} bytes) status={status:#010x}")
    return status
|
|
|
|
|
|
|
|
def hooked_gen_key(hAlgorithm, phKey, pbKeyObject, cbKeyObject,
                   pbSecret, cbSecret, dwFlags):
    """Hook for BCryptGenerateSymmetricKey: forward the call and log the secret.

    The secret bytes are copied before the real function runs.  When the call
    returns status 0, they are stored in ``key_handle_to_material`` under the
    new key handle so ``hooked_bcrypt_decrypt`` can show which key was used.

    Returns:
        The status returned by the real function, unchanged.
    """
    secret = None
    if pbSecret and cbSecret > 0:
        try:
            secret = ctypes.string_at(pbSecret, cbSecret)
        except Exception:
            # Best effort: an unreadable secret only costs us log output.
            secret = None

    status = orig_gen_key(hAlgorithm, phKey, pbKeyObject, cbKeyObject,
                          pbSecret, cbSecret, dwFlags)

    key_handle = phKey[0] if phKey else None
    alg_h = getattr(hAlgorithm, "value", hAlgorithm)
    alg = alg_handle_to_name.get(alg_h, "?")

    print(f"[BCryptGenerateSymmetricKey] alg={alg} secret_len={cbSecret} "
          f"keyObjSize={cbKeyObject} flags={dwFlags:#x} "
          f"-> key={key_handle} status={status:#010x}")
    if secret:
        print(f" Secret bytes: {secret.hex()}")
        print(f" Secret ASCII: {secret!r}")
        # Track the key only for status 0: after a failed call the handle
        # slot may hold a value unrelated to this secret.
        if status == 0 and key_handle:
            kh = getattr(key_handle, "value", key_handle)
            key_handle_to_material[kh] = secret

    return status
|
|
|
|
|
|
|
|
def hooked_import_key(hAlgorithm, hImportKey, pszBlobType, phKey,
                      pbKeyObject, cbKeyObject, pbInput, cbInput, dwFlags):
    """Hook for BCryptImportKey: forward the call and log the key blob.

    The blob is copied before the real function runs and printed in hex.  If
    it is longer than 12 bytes, its first three little-endian 32-bit words
    are printed as magic / version / key length, followed by that many key
    bytes.  When the magic is the key-data-blob magic and the call returned
    status 0, the key bytes are also stored in ``key_handle_to_material``,
    as ``hooked_gen_key`` does, so ``hooked_bcrypt_decrypt`` can show them.

    Returns:
        The status returned by the real function, unchanged.
    """
    # BCRYPT_KEY_DATA_BLOB_MAGIC ("KDBM") from bcrypt.h.
    key_data_blob_magic = 0x4D42444B

    blob_type = read_wstr(pszBlobType)

    blob_data = None
    if pbInput and cbInput > 0:
        try:
            blob_data = ctypes.string_at(pbInput, cbInput)
        except Exception:
            # Best effort: an unreadable blob only costs us log output.
            blob_data = None

    status = orig_import_key(hAlgorithm, hImportKey, pszBlobType, phKey,
                             pbKeyObject, cbKeyObject, pbInput, cbInput, dwFlags)

    key_handle = phKey[0] if phKey else None
    print(f"[BCryptImportKey] blob_type={blob_type!r} blob_size={cbInput} "
          f"flags={dwFlags:#x} -> key={key_handle} status={status:#010x}")
    if blob_data:
        print(f" Blob: {blob_data.hex()}")
        if cbInput > 12:
            magic, ver, key_len = struct.unpack("<III", blob_data[:12])
            key_bytes = blob_data[12:12 + key_len]
            print(f" Magic={magic:#x} Ver={ver} KeyLen={key_len}")
            print(f" Key: {key_bytes.hex()}")
            print(f" Key ASCII: {key_bytes!r}")
            # Only trust the parsed key when the header really is a key-data
            # blob and the blob holds all the bytes the header announces.
            if (status == 0 and key_handle and key_bytes
                    and magic == key_data_blob_magic
                    and len(key_bytes) == key_len):
                kh = getattr(key_handle, "value", key_handle)
                key_handle_to_material[kh] = key_bytes

    return status
|
|
|
|
|
|
|
|
def hooked_encrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
                   pbOutput, cbOutput, pcbResult, dwFlags):
    """Hook for BCryptEncrypt: forward the call unchanged and log its sizes.

    Returns:
        The status returned by the real function, unchanged.
    """
    outcome = orig_encrypt(
        hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
        pbOutput, cbOutput, pcbResult, dwFlags,
    )

    # Byte count reported through the out-parameter, or 0 when it is NULL.
    if pcbResult:
        written = pcbResult[0]
    else:
        written = 0

    summary = (
        f"[BCryptEncrypt] in={cbInput} out={written} iv_len={cbIV} "
        f"flags={dwFlags:#x} status={outcome:#010x}"
    )
    print(summary)
    return outcome
|
|
|
|
|
|
|
|
def _peek_bytes(ptr, size, limit=None):
    """Best-effort copy of the bytes at ``ptr``; None if NULL, empty or unreadable."""
    if not ptr or size <= 0:
        return None
    if limit is not None:
        size = min(size, limit)
    try:
        return ctypes.string_at(ptr, size)
    except Exception:
        # Only log output depends on this copy, so a bad pointer must not
        # stop the hook from forwarding the call.
        return None


def hooked_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
                          pbOutput, cbOutput, pcbResult, dwFlags):
    """Hook for BCryptDecrypt: forward the call, log it, and dump the output.

    Before the real call, the IV and the first 64 input bytes are copied,
    because the real function may overwrite either buffer.  After it, the
    call is logged together with any key material recorded for ``hKey``.
    When the call returned status 0 and produced output, the plaintext is
    written to ``OUTPUT_DIR`` as
    ``decrypt_<call#>_in<cbInput>_out<result size>.bin``.

    Returns:
        The status returned by the real function, unchanged.
    """
    global decrypt_call_num
    call_num = decrypt_call_num
    decrypt_call_num += 1

    iv_before = _peek_bytes(pbIV, cbIV)
    encrypted_input = _peek_bytes(pbInput, cbInput, limit=64)

    status = orig_decrypt(hKey, pbInput, cbInput, pPadding,
                          pbIV, cbIV, pbOutput, cbOutput, pcbResult, dwFlags)

    result_size = pcbResult[0] if pcbResult else 0
    iv_after = _peek_bytes(pbIV, cbIV)

    kh = getattr(hKey, "value", hKey)
    known_key = key_handle_to_material.get(kh)

    print(f"[BCryptDecrypt #{call_num}] status={status:#x} "
          f"in={cbInput} out={result_size} iv_len={cbIV} flags={dwFlags}")
    if known_key:
        print(f" Key material: {known_key.hex()}")
    if encrypted_input:
        print(f" Enc input[:32]: {encrypted_input[:32].hex()}")
    if iv_before:
        print(f" IV before: {iv_before.hex()}")
    # Print the IV again only when the call changed it.
    if iv_after and iv_after != iv_before:
        print(f" IV after: {iv_after.hex()}")

    if status == 0 and result_size > 0 and pbOutput:
        try:
            decrypted = ctypes.string_at(pbOutput, result_size)
            print(f" Decrypted[:32]: {decrypted[:32].hex()}")
            fname = OUTPUT_DIR / f"decrypt_{call_num}_in{cbInput}_out{result_size}.bin"
            fname.write_bytes(decrypted)
            print(f" -> Saved: {fname.name}")
        except Exception as e:
            print(f" Error: {e}")

    return status
|
|
|
|
|
|
|
|
def hook_iat_generic(dll_handle, target_dll_name, target_func_name, hook_func, func_type):
    """Redirect one imported function of a loaded DLL by patching its IAT slot.

    The DLL's file on disk is parsed with ``pefile`` to find the import
    entry, whose address is rebased onto ``dll_handle``.  The slot is made
    writable, overwritten with a ctypes callback wrapping ``hook_func``, and
    its previous page protection is restored.

    Args:
        dll_handle: Base address of the loaded module (e.g. ``dll._handle``).
        target_dll_name: Substring matched case-insensitively against the
            imported DLL's name.
        target_func_name: Exact name of the imported function to replace.
        hook_func: Python callable to install.
        func_type: ctypes function prototype used for both the callback and
            the returned original.

    Returns:
        ``(original_func, callback)`` on success, where ``original_func``
        calls the function that was in the slot before patching, and
        ``callback`` must be kept referenced by the caller for as long as the
        hook is installed.  ``(None, None)`` when the import is not found.

    Raises:
        OSError: If the module path cannot be queried or the IAT page cannot
            be made writable.
    """
    import pefile

    PAGE_READWRITE = 0x04
    # The slot is one pointer wide; do not assume a 64-bit process.
    ptr_size = ctypes.sizeof(c_void_p)

    # Private kernel32 instance: argtypes set here do not leak into the
    # shared ctypes.windll.kernel32, and use_last_error gives reliable codes.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    kernel32.GetModuleFileNameW.argtypes = [c_void_p, ctypes.c_wchar_p, c_ulong]
    kernel32.GetModuleFileNameW.restype = c_ulong
    kernel32.VirtualProtect.argtypes = [c_void_p, ctypes.c_size_t, c_ulong, POINTER(c_ulong)]
    kernel32.VirtualProtect.restype = ctypes.c_int

    path_buf = ctypes.create_unicode_buffer(32768)
    if not kernel32.GetModuleFileNameW(dll_handle, path_buf, len(path_buf)):
        raise ctypes.WinError(ctypes.get_last_error())

    pe = pefile.PE(path_buf.value)
    try:
        wanted_dll = target_dll_name.lower()
        # A PE without an import directory has no DIRECTORY_ENTRY_IMPORT.
        for entry in getattr(pe, "DIRECTORY_ENTRY_IMPORT", ()):
            import_name = entry.dll.decode('utf-8', errors='ignore').lower()
            if wanted_dll not in import_name:
                continue

            for imp in entry.imports:
                if not imp.name:
                    continue
                if imp.name.decode('utf-8', errors='ignore') != target_func_name:
                    continue

                # imp.address is relative to the preferred image base;
                # rebase it onto where the module is actually loaded.
                iat_rva = imp.address - pe.OPTIONAL_HEADER.ImageBase
                iat_addr = dll_handle + iat_rva

                original_ptr = c_void_p()
                ctypes.memmove(ctypes.byref(original_ptr), iat_addr, ptr_size)

                callback = func_type(hook_func)
                callback_ptr = ctypes.cast(callback, c_void_p).value

                old_protect = c_ulong()
                if not kernel32.VirtualProtect(iat_addr, ptr_size, PAGE_READWRITE,
                                               ctypes.byref(old_protect)):
                    raise ctypes.WinError(ctypes.get_last_error())
                try:
                    new_ptr = c_void_p(callback_ptr)
                    ctypes.memmove(iat_addr, ctypes.byref(new_ptr), ptr_size)
                finally:
                    # Restore the page protection even if the write failed.
                    ignored = c_ulong()
                    kernel32.VirtualProtect(iat_addr, ptr_size, old_protect.value,
                                            ctypes.byref(ignored))

                original_func = func_type(original_ptr.value)
                print(f" Hooked {target_func_name} at IAT RVA={iat_rva:#x}")
                return original_func, callback

        return None, None
    finally:
        pe.close()
|
|
|
|
|
|
|
|
def main():
    """Load oneocr.dll, hook its BCrypt imports, and create an OCR pipeline.

    Steps:
      1. Delete ``decrypt_*.bin`` files left in ``OUTPUT_DIR`` by earlier runs.
      2. Load ``oneocr.dll`` from ``DLL_DIR`` and declare the three exports
         that are called.
      3. Patch the DLL's IAT for each BCrypt function listed in ``hooks`` and
         store the original entry point in its ``orig_*`` module global.
      4. Call ``CreateOcrInitOptions``, ``OcrInitOptionsSetUseModelDelayLoad``
         and ``CreateOcrPipeline`` with ``MODEL_PATH`` and ``KEY``; the hooks
         print and dump whatever crypto calls happen during that.
      5. Print the tracked key material and the size of the dumped buffers.

    Returns early, after printing a FATAL message, when BCryptDecrypt could
    not be hooked.
    """
    print("=" * 70)
    print("EXTENDED BCrypt HOOK - Capturing ALL crypto setup")
    print("=" * 70)

    # Start from a clean dump folder so the summary only counts this run.
    for f in OUTPUT_DIR.glob("decrypt_*.bin"):
        f.unlink()

    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    kernel32.SetDllDirectoryW(DLL_DIR)

    dll_path = os.path.join(DLL_DIR, "oneocr.dll")
    print(f"Loading: {dll_path}")
    dll = ctypes.WinDLL(dll_path)

    dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)]
    dll.CreateOcrInitOptions.restype = c_int64
    dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, c_ubyte]
    dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64
    dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)]
    dll.CreateOcrPipeline.restype = c_int64

    # (import DLL, function name, hook, prototype, module global that the
    #  hook reads to reach the original function)
    hooks = [
        ('bcrypt', 'BCryptOpenAlgorithmProvider', hooked_open_alg, BCRYPT_OPEN_ALG_TYPE, 'orig_open_alg'),
        ('bcrypt', 'BCryptSetProperty', hooked_set_prop, BCRYPT_SET_PROP_TYPE, 'orig_set_prop'),
        ('bcrypt', 'BCryptGetProperty', hooked_get_prop, BCRYPT_GET_PROP_TYPE, 'orig_get_prop'),
        ('bcrypt', 'BCryptGenerateSymmetricKey', hooked_gen_key, BCRYPT_GEN_KEY_TYPE, 'orig_gen_key'),
        ('bcrypt', 'BCryptImportKey', hooked_import_key, BCRYPT_IMPORT_KEY_TYPE, 'orig_import_key'),
        ('bcrypt', 'BCryptEncrypt', hooked_encrypt, BCRYPT_ENCRYPT_TYPE, 'orig_encrypt'),
        ('bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt, BCRYPT_DECRYPT_TYPE, 'orig_decrypt'),
    ]

    originals = {}
    print("\n--- Installing IAT hooks ---")
    for target_dll, func_name, hook_func, func_type, global_name in hooks:
        orig, cb = hook_iat_generic(dll._handle, target_dll, func_name, hook_func, func_type)
        if not orig:
            print(f" WARNING: {func_name} not found in IAT (may not be imported)")
            continue
        # Publish the original right away: the hook is already live in the
        # IAT and would call None if it fired before its orig_* was set.
        globals()[global_name] = orig
        originals[func_name] = orig
        _callback_refs.append(cb)

    if 'BCryptDecrypt' not in originals:
        print("FATAL: Could not hook BCryptDecrypt!")
        return

    print("\n--- Creating OCR Pipeline ---")
    init_options = c_int64()
    ret = dll.CreateOcrInitOptions(byref(init_options))
    print(f"CreateOcrInitOptions: {ret}")

    ret = dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0)
    print(f"SetUseModelDelayLoad: {ret}")

    pipeline = c_int64()
    model_buf = ctypes.create_string_buffer(MODEL_PATH.encode())
    key_buf = ctypes.create_string_buffer(KEY)

    print("\nCalling CreateOcrPipeline...")
    print(f"Model: {MODEL_PATH}")
    print(f"Key: {KEY}")
    print()

    ret = dll.CreateOcrPipeline(model_buf, key_buf, init_options, byref(pipeline))

    print(f"\nCreateOcrPipeline returned: {ret}")
    print(f"Pipeline handle: {pipeline.value}")

    print()
    print("=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Key handles tracked: {len(key_handle_to_material)}")
    for kh, mat in key_handle_to_material.items():
        print(f" Handle {kh:#x}: {mat.hex()}")
        print(f" ASCII: {mat!r}")
        print(f" Length: {len(mat)}")

    files = sorted(OUTPUT_DIR.glob("decrypt_*.bin"))
    if files:
        print(f"\nSaved {len(files)} decrypted buffers")
        total = sum(f.stat().st_size for f in files)
        print(f"Total: {total:,} bytes ({total/1024/1024:.1f} MB)")

    print("\nDone!")


if __name__ == '__main__':
    main()
|
|
|