File size: 11,861 Bytes
ce847d4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 |
"""
Hook BCryptDecrypt using ctypes in-process hooking via DLL detour.
Instead of Frida, we directly hook BCryptDecrypt's IAT entry in oneocr.dll.
"""
import ctypes
import ctypes.wintypes as wt
from ctypes import (
c_int64, c_char_p, c_ubyte, POINTER, byref, Structure,
c_void_p, c_ulong, c_int32, WINFUNCTYPE, CFUNCTYPE, c_uint8
)
import os
import sys
import struct
from pathlib import Path
OUTPUT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\frida_dump")
OUTPUT_DIR.mkdir(exist_ok=True)
DLL_DIR = r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\ocr_data"
MODEL_PATH = os.path.join(DLL_DIR, "oneocr.onemodel")
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# ── Globals to collect intercepted data ──
intercepted_calls = []
decrypt_call_num = 0
# ── BCryptDecrypt signature ──
# NTSTATUS BCryptDecrypt(BCRYPT_KEY_HANDLE, PUCHAR pbInput, ULONG cbInput,
# VOID* pPadding, PUCHAR pbIV, ULONG cbIV, PUCHAR pbOutput,
# ULONG cbOutput, ULONG* pcbResult, ULONG dwFlags)
BCRYPT_DECRYPT_TYPE = WINFUNCTYPE(
c_ulong, # NTSTATUS return
c_void_p, # hKey
c_void_p, # pbInput
c_ulong, # cbInput
c_void_p, # pPaddingInfo
c_void_p, # pbIV
c_ulong, # cbIV
c_void_p, # pbOutput
c_ulong, # cbOutput
POINTER(c_ulong), # pcbResult
c_ulong, # dwFlags
)
# Store original function
original_bcrypt_decrypt = None
def hooked_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding, pbIV, cbIV,
pbOutput, cbOutput, pcbResult, dwFlags):
"""Our hook that intercepts BCryptDecrypt calls."""
global decrypt_call_num
call_num = decrypt_call_num
decrypt_call_num += 1
# Read IV before the call (it may be modified)
iv_before = None
if pbIV and cbIV > 0:
try:
iv_before = ctypes.string_at(pbIV, cbIV)
except:
pass
# Read encrypted input BEFORE the call
encrypted_input = None
if pbInput and cbInput > 0:
try:
encrypted_input = ctypes.string_at(pbInput, min(cbInput, 64))
except:
pass
# Call original
status = original_bcrypt_decrypt(hKey, pbInput, cbInput, pPadding,
pbIV, cbIV, pbOutput, cbOutput,
pcbResult, dwFlags)
# Get result size
result_size = 0
if pcbResult:
result_size = pcbResult[0]
# Read IV after (CFB mode modifies the IV)
iv_after = None
if pbIV and cbIV > 0:
try:
iv_after = ctypes.string_at(pbIV, cbIV)
except:
pass
info = {
'call': call_num,
'status': status,
'cbInput': cbInput,
'cbIV': cbIV,
'cbOutput': result_size,
'dwFlags': dwFlags,
'iv_before': iv_before.hex() if iv_before else None,
'iv_after': iv_after.hex() if iv_after else None,
}
print(f"[BCryptDecrypt #{call_num}] status={status:#x} "
f"in={cbInput} out={result_size} iv_len={cbIV} flags={dwFlags}")
if encrypted_input:
print(f" Encrypted input[:32]: {encrypted_input[:32].hex()}")
print(f" pbInput addr: {pbInput:#x}")
if iv_before:
print(f" IV before: {iv_before.hex()}")
if iv_after and iv_after != iv_before:
print(f" IV after: {iv_after.hex()}")
# Save decrypted data
if status == 0 and result_size > 0 and pbOutput:
try:
decrypted = ctypes.string_at(pbOutput, result_size)
# Check for magic number
if len(decrypted) >= 4:
magic = struct.unpack('<I', decrypted[:4])[0]
info['magic'] = magic
print(f" Magic: {magic} | First 32 bytes: {decrypted[:32].hex()}")
if magic == 1:
print(f" *** MAGIC NUMBER == 1 FOUND! ***")
# Save to file
fname = OUTPUT_DIR / f"decrypt_{call_num}_in{cbInput}_out{result_size}.bin"
fname.write_bytes(decrypted)
print(f" -> Saved: {fname.name} ({result_size:,} bytes)")
except Exception as e:
print(f" Error reading output: {e}")
intercepted_calls.append(info)
return status
def hook_iat(dll_handle, target_dll_name, target_func_name, hook_func):
"""
Hook a function by patching the Import Address Table (IAT) of a DLL.
Returns the original function pointer.
"""
import pefile
# Get the DLL file path
kernel32 = ctypes.windll.kernel32
buf = ctypes.create_unicode_buffer(260)
h = ctypes.c_void_p(dll_handle)
kernel32.GetModuleFileNameW(h, buf, 260)
dll_path = buf.value
print(f"Analyzing IAT of: {dll_path}")
pe = pefile.PE(dll_path)
# Find the import
base_addr = dll_handle
if hasattr(dll_handle, '_handle'):
base_addr = dll_handle._handle
for entry in pe.DIRECTORY_ENTRY_IMPORT:
import_name = entry.dll.decode('utf-8', errors='ignore').lower()
if target_dll_name.lower() not in import_name:
continue
for imp in entry.imports:
if imp.name and imp.name.decode('utf-8', errors='ignore') == target_func_name:
# Found it! The IAT entry is at base_addr + imp.address - pe.OPTIONAL_HEADER.ImageBase
iat_rva = imp.address - pe.OPTIONAL_HEADER.ImageBase
iat_addr = base_addr + iat_rva
print(f"Found {target_func_name} in IAT at RVA={iat_rva:#x}, "
f"VA={iat_addr:#x}")
# Read current value (original function pointer)
original_ptr = ctypes.c_void_p()
ctypes.memmove(ctypes.byref(original_ptr), iat_addr, 8)
print(f"Original function pointer: {original_ptr.value:#x}")
# Create callback
callback = BCRYPT_DECRYPT_TYPE(hook_func)
callback_ptr = ctypes.cast(callback, c_void_p).value
# Make IAT page writable
old_protect = c_ulong()
PAGE_READWRITE = 0x04
kernel32.VirtualProtect(
ctypes.c_void_p(iat_addr), 8,
PAGE_READWRITE, ctypes.byref(old_protect)
)
# Patch IAT
new_ptr = ctypes.c_void_p(callback_ptr)
ctypes.memmove(iat_addr, ctypes.byref(new_ptr), 8)
# Restore protection
kernel32.VirtualProtect(
ctypes.c_void_p(iat_addr), 8,
old_protect.value, ctypes.byref(old_protect)
)
print(f"IAT patched! New function pointer: {callback_ptr:#x}")
# Create callable from original
original_func = BCRYPT_DECRYPT_TYPE(original_ptr.value)
pe.close()
return original_func, callback # Return both to prevent GC
pe.close()
return None, None
def main():
global original_bcrypt_decrypt
print("=" * 70)
print("IN-PROCESS BCryptDecrypt HOOKING")
print("=" * 70)
# Load DLL
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
kernel32.SetDllDirectoryW(DLL_DIR)
dll_path = os.path.join(DLL_DIR, "oneocr.dll")
print(f"Loading: {dll_path}")
dll = ctypes.WinDLL(dll_path)
# Setup function types
dll.CreateOcrInitOptions.argtypes = [POINTER(c_int64)]
dll.CreateOcrInitOptions.restype = c_int64
dll.OcrInitOptionsSetUseModelDelayLoad.argtypes = [c_int64, c_ubyte]
dll.OcrInitOptionsSetUseModelDelayLoad.restype = c_int64
dll.CreateOcrPipeline.argtypes = [c_char_p, c_char_p, c_int64, POINTER(c_int64)]
dll.CreateOcrPipeline.restype = c_int64
# Try approach 1: Direct BCryptDecrypt function pointer replacement
print("\n--- Setting up BCryptDecrypt hook ---")
# Get the real BCryptDecrypt
bcrypt_dll = ctypes.WinDLL("bcrypt")
real_decrypt_addr = ctypes.cast(
bcrypt_dll.BCryptDecrypt, c_void_p
).value
print(f"Real BCryptDecrypt address: {real_decrypt_addr:#x}")
# Instead of IAT patching, let's use a simpler approach:
# We'll call BCryptDecrypt ourselves to first get a "sizing" call,
# then intercept the actual decrypt.
# Actually, the simplest approach: use a manual detour
# But let's try IAT patching first if pefile is available
try:
import pefile
print("pefile available, trying IAT hook...")
original_bcrypt_decrypt_func, callback_ref = hook_iat(
dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt
)
if original_bcrypt_decrypt_func:
original_bcrypt_decrypt = original_bcrypt_decrypt_func
print("IAT hook installed successfully!")
else:
raise Exception("IAT hook failed - function not found in imports")
except ImportError:
print("pefile not available, installing...")
os.system("uv pip install pefile")
import pefile
original_bcrypt_decrypt_func, callback_ref = hook_iat(
dll._handle, 'bcrypt', 'BCryptDecrypt', hooked_bcrypt_decrypt
)
if original_bcrypt_decrypt_func:
original_bcrypt_decrypt = original_bcrypt_decrypt_func
else:
print("ERROR: Could not hook BCryptDecrypt")
return
# Now create the pipeline - this will trigger decryption via our hook
print("\n--- Creating OCR Pipeline (will trigger BCryptDecrypt) ---")
init_options = c_int64()
ret = dll.CreateOcrInitOptions(byref(init_options))
print(f"CreateOcrInitOptions: {ret}")
ret = dll.OcrInitOptionsSetUseModelDelayLoad(init_options, 0)
print(f"SetUseModelDelayLoad: {ret}")
pipeline = c_int64()
model_buf = ctypes.create_string_buffer(MODEL_PATH.encode())
key_buf = ctypes.create_string_buffer(KEY)
print(f"\nCalling CreateOcrPipeline...")
print(f"Model: {MODEL_PATH}")
print(f"Key: {KEY}")
print()
ret = dll.CreateOcrPipeline(model_buf, key_buf, init_options, byref(pipeline))
print(f"\nCreateOcrPipeline returned: {ret}")
print(f"Pipeline handle: {pipeline.value}")
# Summary
print()
print("=" * 70)
print("SUMMARY")
print("=" * 70)
print(f"Total BCryptDecrypt calls intercepted: {len(intercepted_calls)}")
magic_1_files = []
for info in intercepted_calls:
if info.get('magic') == 1:
magic_1_files.append(info)
if magic_1_files:
print(f"\n*** Found {len(magic_1_files)} calls with magic_number == 1! ***")
for info in magic_1_files:
print(f" Call #{info['call']}: input={info['cbInput']:,}, "
f"output={info['cbOutput']:,}")
# List saved files
if OUTPUT_DIR.exists():
files = sorted(OUTPUT_DIR.glob("decrypt_*.bin"))
if files:
print(f"\nSaved {len(files)} decrypted buffers:")
total = 0
for f in files:
sz = f.stat().st_size
total += sz
header = open(f, 'rb').read(4)
magic = struct.unpack('<I', header)[0] if len(header) >= 4 else -1
marker = " *** MAGIC=1 ***" if magic == 1 else ""
print(f" {f.name}: {sz:,} bytes (magic={magic}){marker}")
print(f"Total: {total:,} bytes ({total/1024/1024:.1f} MB)")
print("\nDone!")
if __name__ == '__main__':
main()
|