oneocr / _archive /hooks /hook_full_log.py
OneOCR Dev
OneOCR - reverse engineering complete, ONNX pipeline 53% match rate
ce847d4
"""
Full BCrypt hash hook - saves all hash inputs and AES keys to JSON for analysis.
"""
import ctypes
from ctypes import (
c_int64, c_char_p, c_ubyte, POINTER, byref,
c_void_p, c_ulong, WINFUNCTYPE
)
import os
import struct
import json
from pathlib import Path
OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump")
OUTPUT_DIR.mkdir(exist_ok=True)
DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data"
MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel")
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# Globals
decrypt_call_num = 0
_callback_refs = []
key_handle_to_material = {}
hash_handle_to_data = {}
alg_handle_to_name = {}
# Collect all crypto operations for JSON output
crypto_log = []
DECRYPT_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_ulong, c_void_p,
c_void_p, c_ulong, c_void_p, c_ulong, POINTER(c_ulong), c_ulong)
OPEN_ALG_T = WINFUNCTYPE(c_ulong, POINTER(c_void_p), c_void_p, c_void_p, c_ulong)
SET_PROP_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_void_p, c_ulong, c_ulong)
GEN_KEY_T = WINFUNCTYPE(c_ulong, c_void_p, POINTER(c_void_p), c_void_p, c_ulong,
c_void_p, c_ulong, c_ulong)
CREATE_HASH_T = WINFUNCTYPE(c_ulong, c_void_p, POINTER(c_void_p), c_void_p, c_ulong,
c_void_p, c_ulong, c_ulong)
HASH_DATA_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_ulong, c_ulong)
FINISH_HASH_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_ulong, c_ulong)
ENCRYPT_T = WINFUNCTYPE(c_ulong, c_void_p, c_void_p, c_ulong, c_void_p,
c_void_p, c_ulong, c_void_p, c_ulong, POINTER(c_ulong), c_ulong)
orig = {}
def read_wstr(ptr):
if not ptr:
return "<null>"
try:
return ctypes.wstring_at(ptr)
except:
return "<err>"
def hooked_open_alg(phAlgorithm, pszAlgId, pszImplementation, dwFlags):
alg_name = read_wstr(pszAlgId)
status = orig['OpenAlgorithmProvider'](phAlgorithm, pszAlgId, pszImplementation, dwFlags)
handle = phAlgorithm[0] if phAlgorithm else None
if handle:
h = handle.value if hasattr(handle, 'value') else handle
alg_handle_to_name[h] = alg_name
return status
def hooked_set_prop(hObject, pszProperty, pbInput, cbInput, dwFlags):
return orig['SetProperty'](hObject, pszProperty, pbInput, cbInput, dwFlags)
def hooked_create_hash(hAlgorithm, phHash, pbHashObject, cbHashObject,
pbSecret, cbSecret, dwFlags):
status = orig['CreateHash'](hAlgorithm, phHash, pbHashObject, cbHashObject,
pbSecret, cbSecret, dwFlags)
hash_handle = phHash[0] if phHash else None
hmac_key = None
if pbSecret and cbSecret > 0:
hmac_key = ctypes.string_at(pbSecret, cbSecret)
hh = hash_handle.value if hasattr(hash_handle, 'value') else hash_handle
hash_handle_to_data[hh] = {'hmac_key': hmac_key, 'data_chunks': []}
return status
def hooked_hash_data(hHash, pbInput, cbInput, dwFlags):
status = orig['HashData'](hHash, pbInput, cbInput, dwFlags)
hh = hHash.value if hasattr(hHash, 'value') else hHash
if pbInput and cbInput > 0:
data = ctypes.string_at(pbInput, cbInput)
if hh in hash_handle_to_data:
hash_handle_to_data[hh]['data_chunks'].append(data)
return status
def hooked_finish_hash(hHash, pbOutput, cbOutput, dwFlags):
status = orig['FinishHash'](hHash, pbOutput, cbOutput, dwFlags)
hh = hHash.value if hasattr(hHash, 'value') else hHash
output = None
if pbOutput and cbOutput > 0:
output = ctypes.string_at(pbOutput, cbOutput)
info = hash_handle_to_data.get(hh)
if info and output:
all_data = b"".join(info['data_chunks'])
crypto_log.append({
'op': 'sha256',
'input': all_data.hex(),
'input_len': len(all_data),
'output': output.hex(),
})
return status
def hooked_gen_key(hAlgorithm, phKey, pbKeyObject, cbKeyObject,
pbSecret, cbSecret, dwFlags):
secret = None
if pbSecret and cbSecret > 0:
secret = ctypes.string_at(pbSecret, cbSecret)
status = orig['GenerateSymmetricKey'](hAlgorithm, phKey, pbKeyObject, cbKeyObject,
pbSecret, cbSecret, dwFlags)
key_handle = phKey[0] if phKey else None
if key_handle and secret:
kh = key_handle.value if hasattr(key_handle, 'value') else key_handle
key_handle_to_material[kh] = secret
return status
def hooked_encrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
pbOutput, cbOutput, pcbResult, dwFlags):
status = orig['Encrypt'](hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
pbOutput, cbOutput, pcbResult, dwFlags)
result_size = pcbResult[0] if pcbResult else 0
if cbIV > 0:
iv = ctypes.string_at(pbIV, cbIV) if pbIV else None
enc_in = ctypes.string_at(pbInput, min(cbInput, 32)) if pbInput and cbInput > 0 else None
enc_out = ctypes.string_at(pbOutput, min(result_size, 32)) if pbOutput and result_size > 0 else None
kh = hKey.value if hasattr(hKey, 'value') else hKey
crypto_log.append({
'op': 'encrypt',
'input_size': cbInput,
'output_size': result_size,
'aes_key': key_handle_to_material.get(kh, b'').hex(),
'input_preview': enc_in.hex() if enc_in else None,
'output_preview': enc_out.hex() if enc_out else None,
})
return status
def hooked_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
pbOutput, cbOutput, pcbResult, dwFlags):
global decrypt_call_num
status = orig['Decrypt'](hKey, pbInput, cbInput, pPadding,
pbIV, cbIV, pbOutput, cbOutput, pcbResult, dwFlags)
result_size = pcbResult[0] if pcbResult else 0
if cbIV > 0:
call_num = decrypt_call_num
decrypt_call_num += 1
kh = hKey.value if hasattr(hKey, 'value') else hKey
aes_key = key_handle_to_material.get(kh, b'').hex()
dec_data = None
if status == 0 and result_size > 0 and pbOutput:
dec_data = ctypes.string_at(pbOutput, result_size)
fname = OUTPUT_DIR / f"decrypt_{call_num}_in{cbInput}_out{result_size}.bin"
fname.write_bytes(dec_data)
crypto_log.append({
'op': 'decrypt',
'call_num': call_num,
'input_size': cbInput,
'output_size': result_size,
'aes_key': aes_key,
'first_bytes': dec_data[:32].hex() if dec_data else None,
})
return status
def hook_iat(dll_handle, target_dll, func_name, hook_func, func_type):
import pefile
kernel32 = ctypes.windll.kernel32
buf = ctypes.create_unicode_buffer(260)
kernel32.GetModuleFileNameW(ctypes.c_void_p(dll_handle), buf, 260)
pe = pefile.PE(buf.value)
for entry in pe.DIRECTORY_ENTRY_IMPORT:
if target_dll.lower() not in entry.dll.decode('utf-8', errors='ignore').lower():
continue
for imp in entry.imports:
if imp.name and imp.name.decode('utf-8', errors='ignore') == func_name:
iat_rva = imp.address - pe.OPTIONAL_HEADER.ImageBase
iat_addr = dll_handle + iat_rva
original_ptr = ctypes.c_void_p()
ctypes.memmove(ctypes.byref(original_ptr), iat_addr, 8)
callback = func_type(hook_func)
callback_ptr = ctypes.cast(callback, c_void_p).value
old_protect = c_ulong()
kernel32.VirtualProtect(ctypes.c_void_p(iat_addr), 8, 0x04, byref(old_protect))
new_ptr = ctypes.c_void_p(callback_ptr)
ctypes.memmove(iat_addr, ctypes.byref(new_ptr), 8)
kernel32.VirtualProtect(ctypes.c_void_p(iat_addr), 8, old_protect.value, byref(old_protect))
original_func = func_type(original_ptr.value)
pe.close()
_callback_refs.append(callback)
return original_func
pe.close()
return None
def main():
print("BCrypt Full Hook - collecting all crypto operations to JSON...")
for f in OUTPUT_DIR.glob("decrypt_*.bin"):
f.unlink()
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
kernel32.SetDllDirectoryW(DLL_DIR)
dll = ctypes.WinDLL(os.path.join(DLL_DIR, "oneocr.dll"))
dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)]
dll.CreateOcrInitOptions.restype = c_int64
dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, c_ubyte]
dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64
dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)]
dll.CreateOcrPipeline.restype = c_int64
import pefile # noqa
hooks = [
('bcrypt', 'BCryptOpenAlgorithmProvider', hooked_open_alg, OPEN_ALG_T),
('bcrypt', 'BCryptSetProperty', hooked_set_prop, SET_PROP_T),
('bcrypt', 'BCryptCreateHash', hooked_create_hash, CREATE_HASH_T),
('bcrypt', 'BCryptHashData', hooked_hash_data, HASH_DATA_T),
('bcrypt', 'BCryptFinishHash', hooked_finish_hash, FINISH_HASH_T),
('bcrypt', 'BCryptGenerateSymmetricKey', hooked_gen_key, GEN_KEY_T),
('bcrypt', 'BCryptEncrypt', hooked_encrypt, ENCRYPT_T),
('bcrypt', 'BCryptDecrypt', hooked_decrypt, DECRYPT_T),
]
for target_dll, func_name, hook_func, func_type in hooks:
o = hook_iat(dll._handle, target_dll, func_name, hook_func, func_type)
if o:
orig[func_name.replace('BCrypt', '')] = o
init_options = c_int64()
dll.CreateOcrInitOptions(byref(init_options))
dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0)
pipeline = c_int64()
ret = dll.CreateOcrPipeline(
ctypes.create_string_buffer(MODEL_PATH.encode()),
ctypes.create_string_buffer(KEY),
init_options, byref(pipeline)
)
print(f"CreateOcrPipeline: {ret}")
print(f"Total crypto ops: {len(crypto_log)}")
print(f"Decrypted chunks: {decrypt_call_num}")
# Save crypto log
out_path = Path("temp/crypto_log.json")
out_path.parent.mkdir(exist_ok=True)
out_path.write_text(json.dumps(crypto_log, indent=2))
print(f"Saved crypto log to {out_path}")
if __name__ == '__main__':
main()