|
|
""" |
|
|
Hook BCrypt hash functions (CreateHash, HashData, FinishHash) to discover |
|
|
the key derivation scheme. Also hook GenerateSymmetricKey and BCryptDecrypt. |
|
|
""" |
|
|
import ctypes |
|
|
from ctypes import ( |
|
|
c_int64, c_char_p, c_ubyte, POINTER, byref, |
|
|
c_void_p, c_ulong, WINFUNCTYPE |
|
|
) |
|
|
import os |
|
|
import struct |
|
|
from pathlib import Path |
|
|
|
|
|
OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump") |
|
|
OUTPUT_DIR.mkdir(exist_ok=True) |
|
|
|
|
|
DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data" |
|
|
MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel") |
|
|
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4' |
|
|
|
|
|
|
|
|
decrypt_call_num = 0 |
|
|
_callback_refs = [] |
|
|
key_handle_to_material = {} |
|
|
hash_handle_to_data = {} |
|
|
alg_handle_to_name = {} |
|
|
|
|
|
|
|
|
DECRYPT_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_ulong, c_void_p, |
|
|
c_void_p, c_ulong, c_void_p, c_ulong, POINTER(c_ulong), c_ulong) |
|
|
OPEN_ALG_T = WINFUNCTYPE(c_ulong, POINTER(c_void_p), c_void_p, c_void_p, c_ulong) |
|
|
SET_PROP_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_void_p, c_ulong, c_ulong) |
|
|
GEN_KEY_T = WINFUNCTYPE(c_ulong, c_void_p, POINTER(c_void_p), c_void_p, c_ulong, |
|
|
c_void_p, c_ulong, c_ulong) |
|
|
|
|
|
|
|
|
|
|
|
CREATE_HASH_T = WINFUNCTYPE(c_ulong, c_void_p, POINTER(c_void_p), c_void_p, c_ulong, |
|
|
c_void_p, c_ulong, c_ulong) |
|
|
|
|
|
|
|
|
HASH_DATA_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_ulong, c_ulong) |
|
|
|
|
|
|
|
|
FINISH_HASH_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_ulong, c_ulong) |
|
|
|
|
|
|
|
|
orig = {} |
|
|
|
|
|
|
|
|
def read_wstr(ptr): |
|
|
if not ptr: |
|
|
return "<null>" |
|
|
try: |
|
|
return ctypes.wstring_at(ptr) |
|
|
except: |
|
|
return "<err>" |
|
|
|
|
|
|
|
|
def hooked_open_alg(phAlgorithm, pszAlgId, pszImplementation, dwFlags): |
|
|
alg_name = read_wstr(pszAlgId) |
|
|
status = orig['OpenAlgorithmProvider'](phAlgorithm, pszAlgId, pszImplementation, dwFlags) |
|
|
handle = phAlgorithm[0] if phAlgorithm else None |
|
|
if handle: |
|
|
h = handle.value if hasattr(handle, 'value') else handle |
|
|
alg_handle_to_name[h] = alg_name |
|
|
print(f"[OpenAlg] {alg_name!r} -> {status:#010x}") |
|
|
return status |
|
|
|
|
|
|
|
|
def hooked_set_prop(hObject, pszProperty, pbInput, cbInput, dwFlags): |
|
|
prop_name = read_wstr(pszProperty) |
|
|
value = "" |
|
|
if pbInput and cbInput > 0: |
|
|
raw = ctypes.string_at(pbInput, cbInput) |
|
|
try: |
|
|
value = raw.decode('utf-16-le').rstrip('\x00') |
|
|
except: |
|
|
value = raw.hex() |
|
|
if cbInput == 4: |
|
|
value += f" (dword={struct.unpack('<I', raw)[0]})" |
|
|
status = orig['SetProperty'](hObject, pszProperty, pbInput, cbInput, dwFlags) |
|
|
print(f"[SetProp] {prop_name!r} = {value!r} -> {status:#010x}") |
|
|
return status |
|
|
|
|
|
|
|
|
def hooked_create_hash(hAlgorithm, phHash, pbHashObject, cbHashObject, |
|
|
pbSecret, cbSecret, dwFlags): |
|
|
status = orig['CreateHash'](hAlgorithm, phHash, pbHashObject, cbHashObject, |
|
|
pbSecret, cbSecret, dwFlags) |
|
|
hash_handle = phHash[0] if phHash else None |
|
|
|
|
|
hmac_key = None |
|
|
if pbSecret and cbSecret > 0: |
|
|
hmac_key = ctypes.string_at(pbSecret, cbSecret) |
|
|
|
|
|
hh = hash_handle.value if hasattr(hash_handle, 'value') else hash_handle |
|
|
ah = hAlgorithm.value if hasattr(hAlgorithm, 'value') else hAlgorithm |
|
|
alg = alg_handle_to_name.get(ah, "?") |
|
|
|
|
|
hash_handle_to_data[hh] = { |
|
|
'alg': alg, |
|
|
'hmac_key': hmac_key, |
|
|
'data_chunks': [], |
|
|
'total_len': 0, |
|
|
} |
|
|
|
|
|
hmac_info = "" |
|
|
if hmac_key: |
|
|
hmac_info = f" HMAC_KEY={hmac_key.hex()} ({hmac_key!r})" |
|
|
|
|
|
print(f"[CreateHash] alg={alg} hash={hh:#x}{hmac_info} -> {status:#010x}") |
|
|
return status |
|
|
|
|
|
|
|
|
def hooked_hash_data(hHash, pbInput, cbInput, dwFlags): |
|
|
status = orig['HashData'](hHash, pbInput, cbInput, dwFlags) |
|
|
|
|
|
hh = hHash.value if hasattr(hHash, 'value') else hHash |
|
|
data_bytes = None |
|
|
if pbInput and cbInput > 0: |
|
|
data_bytes = ctypes.string_at(pbInput, cbInput) |
|
|
|
|
|
if hh in hash_handle_to_data and data_bytes: |
|
|
info = hash_handle_to_data[hh] |
|
|
info['data_chunks'].append(data_bytes) |
|
|
info['total_len'] += len(data_bytes) |
|
|
|
|
|
|
|
|
data_hex = data_bytes.hex() if data_bytes else "" |
|
|
data_ascii = "" |
|
|
if data_bytes: |
|
|
try: |
|
|
data_ascii = data_bytes.decode('ascii', errors='replace') |
|
|
except: |
|
|
pass |
|
|
preview = data_hex[:128] |
|
|
if len(data_hex) > 128: |
|
|
preview += "..." |
|
|
|
|
|
print(f"[HashData] hash={hh:#x} len={cbInput} data={preview}") |
|
|
if data_ascii and all(32 <= c < 127 or c in (10, 13) for c in (data_bytes or b"")): |
|
|
print(f" ASCII: {data_ascii!r}") |
|
|
return status |
|
|
|
|
|
|
|
|
def hooked_finish_hash(hHash, pbOutput, cbOutput, dwFlags): |
|
|
status = orig['FinishHash'](hHash, pbOutput, cbOutput, dwFlags) |
|
|
|
|
|
hh = hHash.value if hasattr(hHash, 'value') else hHash |
|
|
output = None |
|
|
if pbOutput and cbOutput > 0: |
|
|
output = ctypes.string_at(pbOutput, cbOutput) |
|
|
|
|
|
info = hash_handle_to_data.get(hh) |
|
|
all_data = b"" |
|
|
if info: |
|
|
all_data = b"".join(info['data_chunks']) |
|
|
|
|
|
print(f"[FinishHash] hash={hh:#x} output_len={cbOutput}") |
|
|
if output: |
|
|
print(f" Result: {output.hex()}") |
|
|
if info: |
|
|
print(f" Input was: {info['total_len']} bytes in {len(info['data_chunks'])} chunks") |
|
|
if info['total_len'] <= 256: |
|
|
print(f" Full input: {all_data.hex()}") |
|
|
try: |
|
|
print(f" Input ASCII: {all_data!r}") |
|
|
except: |
|
|
pass |
|
|
if info['hmac_key']: |
|
|
print(f" HMAC key: {info['hmac_key'].hex()}") |
|
|
|
|
|
return status |
|
|
|
|
|
|
|
|
def hooked_gen_key(hAlgorithm, phKey, pbKeyObject, cbKeyObject, |
|
|
pbSecret, cbSecret, dwFlags): |
|
|
secret = None |
|
|
if pbSecret and cbSecret > 0: |
|
|
secret = ctypes.string_at(pbSecret, cbSecret) |
|
|
|
|
|
status = orig['GenerateSymmetricKey'](hAlgorithm, phKey, pbKeyObject, cbKeyObject, |
|
|
pbSecret, cbSecret, dwFlags) |
|
|
|
|
|
key_handle = phKey[0] if phKey else None |
|
|
if key_handle and secret: |
|
|
kh = key_handle.value if hasattr(key_handle, 'value') else key_handle |
|
|
key_handle_to_material[kh] = secret |
|
|
|
|
|
print(f"[GenSymKey] secret_len={cbSecret} -> {status:#010x}") |
|
|
if secret: |
|
|
print(f" Secret: {secret.hex()}") |
|
|
return status |
|
|
|
|
|
|
|
|
def hooked_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV, |
|
|
pbOutput, cbOutput, pcbResult, dwFlags): |
|
|
global decrypt_call_num |
|
|
|
|
|
iv_before = None |
|
|
if pbIV and cbIV > 0: |
|
|
iv_before = ctypes.string_at(pbIV, cbIV) |
|
|
|
|
|
status = orig['Decrypt'](hKey, pbInput, cbInput, pPadding, |
|
|
pbIV, cbIV, pbOutput, cbOutput, pcbResult, dwFlags) |
|
|
|
|
|
result_size = pcbResult[0] if pcbResult else 0 |
|
|
|
|
|
|
|
|
if cbIV > 0: |
|
|
call_num = decrypt_call_num |
|
|
decrypt_call_num += 1 |
|
|
|
|
|
kh = hKey.value if hasattr(hKey, 'value') else hKey |
|
|
known_key = key_handle_to_material.get(kh) |
|
|
|
|
|
print(f"[Decrypt #{call_num}] in={cbInput} out={result_size} iv_len={cbIV}") |
|
|
if iv_before: |
|
|
print(f" IV: {iv_before.hex()}") |
|
|
if known_key: |
|
|
print(f" AES key: {known_key.hex()}") |
|
|
|
|
|
if status == 0 and result_size > 0 and pbOutput: |
|
|
decrypted = ctypes.string_at(pbOutput, result_size) |
|
|
print(f" Decrypted[:32]: {decrypted[:32].hex()}") |
|
|
fname = OUTPUT_DIR / f"decrypt_{call_num}_in{cbInput}_out{result_size}.bin" |
|
|
fname.write_bytes(decrypted) |
|
|
|
|
|
return status |
|
|
|
|
|
|
|
|
def hook_iat(dll_handle, target_dll, func_name, hook_func, func_type): |
|
|
import pefile |
|
|
kernel32 = ctypes.windll.kernel32 |
|
|
buf = ctypes.create_unicode_buffer(260) |
|
|
kernel32.GetModuleFileNameW(ctypes.c_void_p(dll_handle), buf, 260) |
|
|
pe = pefile.PE(buf.value) |
|
|
|
|
|
for entry in pe.DIRECTORY_ENTRY_IMPORT: |
|
|
if target_dll.lower() not in entry.dll.decode('utf-8', errors='ignore').lower(): |
|
|
continue |
|
|
for imp in entry.imports: |
|
|
if imp.name and imp.name.decode('utf-8', errors='ignore') == func_name: |
|
|
iat_rva = imp.address - pe.OPTIONAL_HEADER.ImageBase |
|
|
iat_addr = dll_handle + iat_rva |
|
|
|
|
|
original_ptr = ctypes.c_void_p() |
|
|
ctypes.memmove(ctypes.byref(original_ptr), iat_addr, 8) |
|
|
|
|
|
callback = func_type(hook_func) |
|
|
callback_ptr = ctypes.cast(callback, c_void_p).value |
|
|
|
|
|
old_protect = c_ulong() |
|
|
kernel32.VirtualProtect(ctypes.c_void_p(iat_addr), 8, 0x04, byref(old_protect)) |
|
|
new_ptr = ctypes.c_void_p(callback_ptr) |
|
|
ctypes.memmove(iat_addr, ctypes.byref(new_ptr), 8) |
|
|
kernel32.VirtualProtect(ctypes.c_void_p(iat_addr), 8, old_protect.value, byref(old_protect)) |
|
|
|
|
|
original_func = func_type(original_ptr.value) |
|
|
pe.close() |
|
|
_callback_refs.append(callback) |
|
|
return original_func |
|
|
|
|
|
pe.close() |
|
|
return None |
|
|
|
|
|
|
|
|
def main(): |
|
|
print("=" * 70) |
|
|
print("BCrypt HASH HOOK - Discover SHA256 key derivation input") |
|
|
print("=" * 70) |
|
|
|
|
|
for f in OUTPUT_DIR.glob("decrypt_*.bin"): |
|
|
f.unlink() |
|
|
|
|
|
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True) |
|
|
kernel32.SetDllDirectoryW(DLL_DIR) |
|
|
|
|
|
dll_path = os.path.join(DLL_DIR, "oneocr.dll") |
|
|
print(f"Loading: {dll_path}") |
|
|
dll = ctypes.WinDLL(dll_path) |
|
|
|
|
|
dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)] |
|
|
dll.CreateOcrInitOptions.restype = c_int64 |
|
|
dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, c_ubyte] |
|
|
dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64 |
|
|
dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)] |
|
|
dll.CreateOcrPipeline.restype = c_int64 |
|
|
|
|
|
import pefile |
|
|
|
|
|
hooks = [ |
|
|
('bcrypt', 'BCryptOpenAlgorithmProvider', hooked_open_alg, OPEN_ALG_T), |
|
|
('bcrypt', 'BCryptSetProperty', hooked_set_prop, SET_PROP_T), |
|
|
('bcrypt', 'BCryptCreateHash', hooked_create_hash, CREATE_HASH_T), |
|
|
('bcrypt', 'BCryptHashData', hooked_hash_data, HASH_DATA_T), |
|
|
('bcrypt', 'BCryptFinishHash', hooked_finish_hash, FINISH_HASH_T), |
|
|
('bcrypt', 'BCryptGenerateSymmetricKey', hooked_gen_key, GEN_KEY_T), |
|
|
('bcrypt', 'BCryptDecrypt', hooked_decrypt, DECRYPT_T), |
|
|
] |
|
|
|
|
|
print("\n--- Installing hooks ---") |
|
|
for target_dll, func_name, hook_func, func_type in hooks: |
|
|
o = hook_iat(dll._handle, target_dll, func_name, hook_func, func_type) |
|
|
if o: |
|
|
short = func_name.replace('BCrypt', '') |
|
|
orig[short] = o |
|
|
print(f" OK: {func_name}") |
|
|
else: |
|
|
print(f" FAIL: {func_name}") |
|
|
|
|
|
print("\n--- Creating OCR Pipeline (triggers crypto) ---") |
|
|
init_options = c_int64() |
|
|
dll.CreateOcrInitOptions(byref(init_options)) |
|
|
dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0) |
|
|
|
|
|
pipeline = c_int64() |
|
|
model_buf = ctypes.create_string_buffer(MODEL_PATH.encode()) |
|
|
key_buf = ctypes.create_string_buffer(KEY) |
|
|
|
|
|
print(f"Model: {MODEL_PATH}") |
|
|
print(f"Key: {KEY}") |
|
|
print() |
|
|
|
|
|
ret = dll.CreateOcrPipeline(model_buf, key_buf, init_options, byref(pipeline)) |
|
|
print(f"\nCreateOcrPipeline: {ret}") |
|
|
|
|
|
|
|
|
print("\n" + "=" * 70) |
|
|
print("KEY DERIVATION SUMMARY") |
|
|
print("=" * 70) |
|
|
print(f"Unique derived keys: {len(key_handle_to_material)}") |
|
|
print(f"Hash operations tracked: {len(hash_handle_to_data)}") |
|
|
print(f"Decrypted chunks: {decrypt_call_num}") |
|
|
print("\nDone!") |
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
main() |
|
|
|