|
|
""" |
|
|
Static decryptor for OneOCR .onemodel files using the BCrypt CNG API.
Finds chunk boundaries by re-encrypting known plaintext patterns.
Works on Windows only (BCrypt CNG). For Linux, use the hook-based approach.

Usage: python static_decrypt.py [model_path] [-o output_dir]
|
|
""" |
|
|
import ctypes |
|
|
import ctypes.wintypes as wt |
|
|
from ctypes import c_void_p, c_ulong, POINTER, byref |
|
|
import struct |
|
|
import sys |
|
|
import os |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Key material handed to BCrypt when the symmetric key is created.
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'

# Initialisation vector; every encrypt/decrypt call starts from a fresh copy.
IV = b"Copyright @ OneO"

# Plaintext magic expected in the first 8 bytes of every decrypted chunk.
CONTAINER_HEADER = bytes((0x4A, 0x1A, 0x08, 0x2B, 0x25, 0x00, 0x00, 0x00))

# Protobuf field numbers that measure_onnx() accepts as top-level fields.
ONNX_VALID_FIELDS = set(range(1, 10)) | {14, 20}
|
|
|
|
|
|
|
|
# NUL-terminated UTF-16-LE identifiers, passed to BCrypt as wide strings.
BCRYPT_AES, BCRYPT_CHAINING_MODE, BCRYPT_CHAIN_MODE_CFB = (
    (identifier + "\0").encode('utf-16-le')
    for identifier in ("AES", "ChainingMode", "ChainingModeCFB")
)
|
|
|
|
|
# Handle to the CNG library (bcrypt.dll). `ctypes.windll` only exists on
# Windows, so importing this module on any other platform fails right here.
bcrypt = ctypes.windll.bcrypt
|
|
|
|
|
|
|
|
class BCRYPT_KEY_DATA_BLOB_HEADER(ctypes.Structure):
    """Three consecutive ``c_ulong`` fields that prefix the raw key bytes.

    Fields, in order: ``dwMagic``, ``dwVersion``, ``cbKeyData`` (the number
    of key bytes that follow the header).
    """

    _fields_ = [
        (field_name, c_ulong)
        for field_name in ("dwMagic", "dwVersion", "cbKeyData")
    ]
|
|
|
|
|
|
|
|
def setup_bcrypt():
    """Open the BCrypt AES provider in CFB chaining mode and create the key.

    Returns:
        tuple: ``(hAlg, hKey)`` as ``c_void_p`` handles. The caller owns both
        and releases them with ``BCryptDestroyKey(hKey)`` followed by
        ``BCryptCloseAlgorithmProvider(hAlg, 0)``.

    Raises:
        OSError: If any BCrypt call returns a non-zero status.
    """

    def check(status, api_name):
        # Explicit check instead of `assert`: under `python -O` an assert is
        # removed together with the API call it wraps, which would silently
        # leave the handles NULL.
        if status != 0:
            raise OSError(f"{api_name} failed: {status & 0xFFFFFFFF:#010x}")

    hAlg = c_void_p()
    check(bcrypt.BCryptOpenAlgorithmProvider(byref(hAlg), BCRYPT_AES, None, 0),
          "BCryptOpenAlgorithmProvider")
    try:
        check(bcrypt.BCryptSetProperty(hAlg, BCRYPT_CHAINING_MODE,
                                       BCRYPT_CHAIN_MODE_CFB,
                                       len(BCRYPT_CHAIN_MODE_CFB), 0),
              "BCryptSetProperty")

        # Header + key bytes, laid out like a BCRYPT_KEY_DATA_BLOB.
        # NOTE(review): BCryptGenerateSymmetricKey documents pbSecret as raw
        # key material, while a key-data blob is the input format of
        # BCryptImportKey -- confirm that passing the blob here is intended.
        header = BCRYPT_KEY_DATA_BLOB_HEADER(dwMagic=0x4d42444b, dwVersion=1,
                                             cbKeyData=len(KEY))
        blob = bytes(header) + KEY

        hKey = c_void_p()
        check(bcrypt.BCryptGenerateSymmetricKey(hAlg, byref(hKey), None, 0,
                                                blob, len(blob), 0),
              "BCryptGenerateSymmetricKey")
    except BaseException:
        # Do not leak the provider handle when a later step fails.
        bcrypt.BCryptCloseAlgorithmProvider(hAlg, 0)
        raise
    return hAlg, hKey
|
|
|
|
|
|
|
|
def bcrypt_op(hKey, data, encrypt=False):
    """Encrypt or decrypt *data* with the BCrypt key, starting from a fresh IV.

    Args:
        hKey: Key handle as returned by ``setup_bcrypt()``.
        data: Input bytes.
        encrypt: ``True`` to call ``BCryptEncrypt``; the default ``False``
            calls ``BCryptDecrypt``.

    Returns:
        bytes: The bytes BCrypt reported as written to the output buffer.

    Raises:
        OSError: If the encrypt/decrypt call returns a non-zero status.
    """
    if encrypt:
        func, api_name = bcrypt.BCryptEncrypt, "BCryptEncrypt"
    else:
        func, api_name = bcrypt.BCryptDecrypt, "BCryptDecrypt"

    # First call without an output buffer only asks for the required size.
    # Its status is deliberately not checked: if the query fails the size
    # stays 0 and the real call below reports the error.
    result_size = c_ulong(0)
    func(hKey, data, len(data), None, None, 0, None, 0, byref(result_size), 0)

    # A new IV buffer per call, because BCrypt updates the IV buffer in place.
    iv_buffer = (ctypes.c_ubyte * len(IV)).from_buffer_copy(IV)
    output = (ctypes.c_ubyte * result_size.value)()
    actual = c_ulong(0)
    status = func(hKey, data, len(data), None,
                  iv_buffer, len(IV),
                  output, result_size.value, byref(actual), 0)
    if status != 0:
        # Raised explicitly so the check survives `python -O`.
        raise OSError(f"BCrypt op failed ({api_name}): {status & 0xFFFFFFFF:#010x}")

    # string_at copies the buffer directly; slicing a c_ubyte array would
    # first build a Python list with one int per byte.
    return ctypes.string_at(output, actual.value)
|
|
|
|
|
|
|
|
def read_varint(data, pos):
    """Decode a little-endian base-128 varint from *data* starting at *pos*.

    Args:
        data: Indexable sequence of byte values.
        pos: Index of the first byte of the varint.

    Returns:
        tuple: ``(value, next_pos)``. When *data* ends before a byte without
        the continuation bit is seen, the bits gathered so far are returned
        and ``next_pos`` equals ``len(data)``. When *pos* is already at or
        past the end, ``(0, pos)`` is returned.
    """
    value = 0
    bit_offset = 0
    limit = len(data)
    while pos < limit:
        byte = data[pos]
        pos += 1
        # Low seven bits carry payload, least-significant group first.
        value |= (byte & 0x7f) << bit_offset
        if (byte & 0x80) == 0:
            # High bit clear: this was the last byte of the varint.
            return value, pos
        bit_offset += 7
    return value, pos
|
|
|
|
|
|
|
|
def measure_onnx(data):
    """Measure how many leading bytes of *data* parse as accepted protobuf fields.

    Fields are walked one after another. Parsing stops at the first field
    whose number is not in ``ONNX_VALID_FIELDS``, whose wire type is not
    0, 1, 2 or 5, or whose payload would run past the end of *data*.

    Args:
        data: Bytes to scan from offset 0.

    Returns:
        int: Offset just past the last fully accepted field (0 if none).
    """
    # Payload sizes of the two fixed-width wire types.
    fixed_width = {1: 8, 5: 4}

    total = len(data)
    cursor = 0
    valid_end = 0
    while cursor < total:
        field_start = cursor
        tag, cursor = read_varint(data, cursor)
        if cursor > total:
            break

        field_number = tag >> 3
        wire_type = tag & 7
        if field_number not in ONNX_VALID_FIELDS:
            return field_start

        if wire_type == 0:
            # Varint payload.
            _, cursor = read_varint(data, cursor)
        elif wire_type == 2:
            # Length-delimited payload: varint length, then that many bytes.
            length, cursor = read_varint(data, cursor)
            cursor += length
        elif wire_type in fixed_width:
            cursor += fixed_width[wire_type]
        else:
            return field_start

        if cursor > total:
            # The field claims more bytes than are available.
            return field_start
        valid_end = cursor
    return valid_end
|
|
|
|
|
|
|
|
def _find_chunk_starts(hKey, data, payload_start):
    """Return ascending offsets in *data* where an encrypted chunk header starts.

    The container magic is encrypted once and its first 8 bytes are searched
    for from *payload_start*; each hit is confirmed by decrypting 16 bytes.
    """
    magic_encrypted = bcrypt_op(hKey, CONTAINER_HEADER, encrypt=True)
    print(f"Container magic encrypted: {magic_encrypted.hex()}")

    needle = magic_encrypted[:8]
    chunk_starts = []
    search_start = payload_start
    while search_start < len(data) - 16:
        idx = data.find(needle, search_start)
        if idx < 0:
            break
        if bcrypt_op(hKey, data[idx:idx + 16])[:8] == CONTAINER_HEADER:
            chunk_starts.append(idx)
        search_start = idx + 1
    return chunk_starts


def _split_on_header(dec):
    """Split decrypted bytes into pieces that each begin at a CONTAINER_HEADER.

    Bytes preceding the first header (if any) form a piece of their own; when
    no header exists the whole non-empty input is returned as a single piece.
    """
    first = dec.find(CONTAINER_HEADER)
    if first < 0:
        return [dec] if dec else []

    pieces = [dec[:first]] if first > 0 else []
    pos = first
    while True:
        # Skip the 8 header bytes so the current header is not found again.
        next_idx = dec.find(CONTAINER_HEADER, pos + 8)
        if next_idx < 0:
            pieces.append(dec[pos:])
            return pieces
        pieces.append(dec[pos:next_idx])
        pos = next_idx


def _decrypt_chunks(hKey, data, output_dir):
    """Decrypt the DX index (saved as dx_index.bin) and all payload chunks.

    Returns:
        list[bytes]: Decrypted chunks in file order.

    Raises:
        ValueError: If the decrypted index does not start with ``b'DX'``.
    """
    dx_offset = 24
    dx_size = 22624
    dx_dec = bcrypt_op(hKey, data[dx_offset:dx_offset + dx_size])
    print(f"\nDX index: starts with {dx_dec[:2].hex()}")
    if dx_dec[:2] != b'DX':
        # Explicit raise: an `assert` would be skipped under `python -O`.
        raise ValueError(f"DX header not found! Got: {dx_dec[:8].hex()}")
    (output_dir / "dx_index.bin").write_bytes(dx_dec)

    # The payload is taken to begin 36 bytes after the index region.
    payload_start = dx_offset + dx_size + 36

    print("\n--- Scanning payload for encrypted chunks ---")
    print(f"Payload starts at offset {payload_start}")
    print("\nSearching for chunk boundaries by trial decryption...")

    chunk_starts = _find_chunk_starts(hKey, data, payload_start)
    print(f"Found {len(chunk_starts)} potential chunk starts")

    if not chunk_starts:
        print("No chunk starts found via magic pattern. Trying sequential...")
        chunks_data = _split_on_header(bcrypt_op(hKey, data[payload_start:]))
        print(f"Found {len(chunks_data)} chunks in sequential decryption")
        return chunks_data

    # Each chunk runs up to the next chunk start (or the end of the file).
    chunk_ends = chunk_starts[1:] + [len(data)]
    chunks_data = []
    for start, end in zip(chunk_starts, chunk_ends):
        try:
            chunks_data.append(bcrypt_op(hKey, data[start:end]))
        except Exception as exc:
            # Best effort: report the chunk and continue with the others.
            print(f"  WARNING: could not decrypt chunk at offset {start}: {exc}")
    return chunks_data


def _extract_chunks(chunks_data, output_dir):
    """Write each decrypted chunk to *output_dir* as an ONNX model or data file.

    Returns:
        tuple: ``(models, data_files)``, two lists of ``{'name', 'size'}`` dicts.
    """
    models = []
    data_files = []
    for chunk in chunks_data:
        payload = chunk[8:] if chunk[:8] == CONTAINER_HEADER else chunk

        # ONNX candidate: byte 0 is 0x08 and byte 1 (used as the IR version
        # in the file name) lies in 1..12.
        if len(payload) >= 2 and payload[0] == 0x08 and 1 <= payload[1] <= 12:
            valid_len = measure_onnx(payload)
            if valid_len < 100:
                continue

            head = payload[:100]
            if b"PyTorch" in head:
                producer = "pytorch"
            elif b"onnx.quantize" in head:
                producer = "onnx_quantize"
            elif b"pytorch" in head:
                producer = "pytorch_small"
            else:
                producer = "unknown"

            ir = payload[1]
            fname = f"model_{len(models):02d}_ir{ir}_{producer}_{valid_len//1024}KB.onnx"
            (output_dir / fname).write_bytes(payload[:valid_len])
            models.append({'name': fname, 'size': valid_len})
            print(f"  ONNX: {fname} ({valid_len:,} bytes)")
        elif len(payload) > 100:
            preview = payload[:30].decode('utf-8', errors='replace')
            fname = f"data_{len(data_files):02d}_{len(payload)}B.bin"
            (output_dir / fname).write_bytes(payload)
            data_files.append({'name': fname, 'size': len(payload)})
            print(f"  Data: {fname} ({len(payload):,} bytes) {preview[:30]!r}")
    return models, data_files


def _count_loadable_models(models, output_dir):
    """Return how many extracted models ``onnx.load`` accepts, or None without onnx."""
    try:
        import onnx
    except ImportError:
        return None

    ok = 0
    for m in models:
        try:
            onnx.load(str(output_dir / m['name']))
        except Exception as exc:
            print(f"  WARNING: onnx.load failed for {m['name']}: {exc}")
        else:
            ok += 1
    return ok


def main():
    """Command-line entry point: decrypt a .onemodel file and extract its parts.

    Reads the model, empties the output directory, decrypts the DX index and
    payload chunks, writes ONNX models and other data files, and prints a
    summary. If the ``onnx`` package is importable, every extracted model is
    additionally loaded as a sanity check.

    Raises:
        ValueError: If the decrypted index does not start with ``b'DX'``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="OneOCR .onemodel decryptor (Windows BCrypt)")
    parser.add_argument("model_path", nargs="?", default="ocr_data/oneocr.onemodel")
    parser.add_argument("-o", "--output", default="onnx_models_static")
    args = parser.parse_args()

    model_path = Path(args.model_path)
    output_dir = Path(args.output)

    # Read the input before touching the output directory, so a missing or
    # unreadable model does not destroy the results of a previous run.
    data = model_path.read_bytes()

    output_dir.mkdir(exist_ok=True, parents=True)
    for old in output_dir.glob("*"):
        # unlink() cannot remove directories; leave them alone.
        if old.is_file():
            old.unlink()

    print("=" * 70)
    print("OneOCR Static Decryptor (BCrypt CNG)")
    print("=" * 70)
    print(f"File: {model_path} ({len(data):,} bytes)")

    hAlg, hKey = setup_bcrypt()
    try:
        print("AES-256-CFB initialized")
        chunks_data = _decrypt_chunks(hKey, data, output_dir)
    finally:
        # Release the BCrypt handles even when decryption fails.
        bcrypt.BCryptDestroyKey(hKey)
        bcrypt.BCryptCloseAlgorithmProvider(hAlg, 0)

    print("\n--- Extracting ONNX models ---")
    models, data_files = _extract_chunks(chunks_data, output_dir)

    print(f"\n{'='*70}")
    print("EXTRACTION COMPLETE")
    print(f"{'='*70}")
    print(f"ONNX models: {len(models)}")
    print(f"Data files: {len(data_files)}")
    if models:
        total = sum(m['size'] for m in models)
        print(f"Total ONNX: {total:,} bytes ({total/1024/1024:.1f} MB)")

    ok = _count_loadable_models(models, output_dir)
    if ok is not None:
        print(f"Verified with onnx.load: {ok}/{len(models)}")
|
|
|
|
|
|
|
|
# Run the decryptor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|