File size: 10,784 Bytes
ce847d4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
"""
Static decryptor for OneOCR .onemodel files using BCrypt CNG API.
Finds chunk boundaries by re-encrypting known plaintext patterns.
Works on Windows only (BCrypt CNG). For Linux, use the hook-based approach.
Usage: python static_decrypt.py [model_path] [-o output_dir]
"""
import ctypes
import ctypes.wintypes as wt
from ctypes import c_void_p, c_ulong, POINTER, byref
import struct
import sys
import os
from pathlib import Path
# ═══════════════════════════════════════════════════════════════
# CRYPTO PARAMETERS (discovered via IAT hook interception)
# ═══════════════════════════════════════════════════════════════
# 32-byte secret that setup_bcrypt() wraps in a key blob and hands to BCrypt.
KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# 16-byte initialization vector; bcrypt_op() starts every call from a copy of it.
IV = b"Copyright @ OneO"
# 8-byte magic that main() expects at the start of each decrypted chunk.
CONTAINER_HEADER = bytes.fromhex("4a1a082b25000000")
# Protobuf field numbers that measure_onnx() accepts while walking a payload.
# Presumably the top-level fields of an ONNX ModelProto -- TODO confirm.
ONNX_VALID_FIELDS = {1, 2, 3, 4, 5, 6, 7, 8, 9, 14, 20}
# BCrypt constants
# NUL-terminated UTF-16-LE strings, passed to the BCrypt calls in setup_bcrypt().
BCRYPT_AES = "AES\0".encode('utf-16-le')
BCRYPT_CHAINING_MODE = "ChainingMode\0".encode('utf-16-le')
BCRYPT_CHAIN_MODE_CFB = "ChainingModeCFB\0".encode('utf-16-le')
# Binds bcrypt.dll at import time; ctypes.windll is only available on Windows,
# so importing this module on another platform fails on this line.
bcrypt = ctypes.windll.bcrypt
class BCRYPT_KEY_DATA_BLOB_HEADER(ctypes.Structure):
    """Header that precedes raw key bytes in a BCrypt key-data blob.

    The Win32 structure consists of three 32-bit ULONG members.  Fixed-width
    ``c_uint32`` fields are used instead of ``c_ulong`` so the header is
    always 12 bytes, independent of the width of the platform's C
    ``unsigned long``.
    """

    _fields_ = [
        ("dwMagic", ctypes.c_uint32),    # blob magic (setup_bcrypt passes 0x4d42444b)
        ("dwVersion", ctypes.c_uint32),  # blob version (setup_bcrypt passes 1)
        ("cbKeyData", ctypes.c_uint32),  # number of key bytes following the header
    ]
def setup_bcrypt():
    """Open the BCrypt AES provider in CFB mode and create the key object.

    Returns:
        tuple: ``(hAlg, hKey)`` as ``ctypes.c_void_p`` handles.  The caller is
        responsible for releasing them with ``BCryptDestroyKey`` and
        ``BCryptCloseAlgorithmProvider``.

    Raises:
        OSError: if any BCrypt call returns a non-zero NTSTATUS.
    """

    def check(status, api_name):
        # The calls must not live inside ``assert``: under ``python -O`` the
        # whole assert expression (including the call) would be skipped.
        if status != 0:
            raise OSError(f"{api_name} failed: {status & 0xFFFFFFFF:#010x}")

    hAlg = c_void_p()
    check(
        bcrypt.BCryptOpenAlgorithmProvider(byref(hAlg), BCRYPT_AES, None, 0),
        "BCryptOpenAlgorithmProvider",
    )
    try:
        check(
            bcrypt.BCryptSetProperty(
                hAlg,
                BCRYPT_CHAINING_MODE,
                BCRYPT_CHAIN_MODE_CFB,
                len(BCRYPT_CHAIN_MODE_CFB),
                0,
            ),
            "BCryptSetProperty",
        )
        header = BCRYPT_KEY_DATA_BLOB_HEADER(
            dwMagic=0x4d42444b, dwVersion=1, cbKeyData=len(KEY)
        )
        # NOTE(review): the secret passed below is header + KEY, not KEY alone.
        # Kept exactly as in the original; confirm against the hooked call.
        blob = bytes(header) + KEY
        hKey = c_void_p()
        check(
            bcrypt.BCryptGenerateSymmetricKey(
                hAlg, byref(hKey), None, 0, blob, len(blob), 0
            ),
            "BCryptGenerateSymmetricKey",
        )
    except Exception:
        # Do not leak the provider handle when a later step fails.
        bcrypt.BCryptCloseAlgorithmProvider(hAlg, 0)
        raise
    return hAlg, hKey
def bcrypt_op(hKey, data, encrypt=False):
    """Encrypt or decrypt *data* with the BCrypt key, starting from a fresh IV.

    Args:
        hKey: BCrypt key handle as returned by ``setup_bcrypt``.
        data: bytes to transform.
        encrypt: ``True`` to call ``BCryptEncrypt``, ``False`` (default) to
            call ``BCryptDecrypt``.

    Returns:
        bytes: the transformed data (``b""`` for empty input).

    Raises:
        OSError: if BCrypt reports a non-zero NTSTATUS.
    """
    if not data:
        # Nothing to transform; avoid calling BCrypt with a zero-length buffer.
        return b""

    if encrypt:
        func, op_name = bcrypt.BCryptEncrypt, "BCryptEncrypt"
    else:
        func, op_name = bcrypt.BCryptDecrypt, "BCryptDecrypt"
    size = len(data)

    # First call with no output buffer asks BCrypt for the required size.
    result_size = c_ulong(0)
    status = func(hKey, data, size, None, None, 0, None, 0, byref(result_size), 0)
    if status != 0:
        raise OSError(f"{op_name} size query failed: {status & 0xFFFFFFFF:#010x}")

    # BCrypt updates the IV buffer in place, so every call gets its own copy.
    iv_buf = (ctypes.c_ubyte * len(IV)).from_buffer_copy(IV)
    output = (ctypes.c_ubyte * result_size.value)()
    actual = c_ulong(0)
    status = func(hKey, data, size, None,
                  iv_buf, len(IV),
                  output, result_size.value, byref(actual), 0)
    if status != 0:
        raise OSError(f"BCrypt op failed: {status & 0xFFFFFFFF:#010x}")

    # Copy through the buffer protocol; slicing the ctypes array directly
    # would first build a Python list with one int object per byte.
    return bytes(memoryview(output)[:actual.value])
def read_varint(data, pos):
    """Decode a base-128 varint from *data* starting at index *pos*.

    Each byte contributes its low seven bits, least-significant group first;
    a clear high bit marks the final byte.

    Returns:
        tuple: ``(value, next_pos)``.  If *data* ends before a terminating
        byte is seen, the bits gathered so far are returned and ``next_pos``
        equals ``len(data)``; if *pos* is already at or past the end, the
        result is ``(0, pos)``.
    """
    result = 0
    bit_offset = 0
    end = len(data)
    while pos < end:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7f) << bit_offset
        if (byte & 0x80) == 0:
            # High bit clear: this was the last byte of the varint.
            return result, pos
        bit_offset += 7
    return result, pos
def measure_onnx(data):
    """Return the length of the leading run of well-formed protobuf fields.

    Walks *data* field by field and stops at the first field whose number is
    not in ``ONNX_VALID_FIELDS``, whose wire type is not 0, 1, 2 or 5, or
    whose encoding runs past the end of *data* (including a varint that is
    cut off mid-way).

    Args:
        data: bytes-like object to scan from offset 0.

    Returns:
        int: offset just past the last complete, accepted field (0 if none).
    """
    size = len(data)

    def varint_truncated(before, after):
        # read_varint never advances past the end of data; an incomplete
        # varint shows up as "nothing consumed" or as a last consumed byte
        # that still carries the continuation bit.
        return after == before or bool(data[after - 1] & 0x80)

    pos = 0
    last = 0
    while pos < size:
        start = pos
        tag, pos = read_varint(data, pos)
        if varint_truncated(start, pos):
            return start
        field_number = tag >> 3
        wire_type = tag & 7
        if field_number not in ONNX_VALID_FIELDS:
            return start
        if wire_type == 0:  # varint value
            value_start = pos
            _, pos = read_varint(data, pos)
            if varint_truncated(value_start, pos):
                return start
        elif wire_type == 1:  # fixed 64-bit value
            pos += 8
        elif wire_type == 2:  # length-delimited value
            length_start = pos
            length, pos = read_varint(data, pos)
            if varint_truncated(length_start, pos):
                return start
            pos += length
        elif wire_type == 5:  # fixed 32-bit value
            pos += 4
        else:
            return start
        if pos > size:
            # The field claims more bytes than are available.
            return start
        last = pos
    return last
def _clear_output_files(output_dir):
    """Delete files directly inside *output_dir*; subdirectories are left alone."""
    for entry in output_dir.glob("*"):
        if entry.is_file() or entry.is_symlink():
            entry.unlink()


def _find_chunk_starts(hKey, data, search_start, magic_encrypted):
    """Return offsets in *data* where a trial decryption yields the container magic."""
    starts = []
    limit = len(data) - 16
    while search_start < limit:
        idx = data.find(magic_encrypted[:8], search_start)
        if idx < 0:
            break
        # Verify the candidate by decrypting 16 bytes at that offset.
        if bcrypt_op(hKey, data[idx:idx + 16])[:8] == CONTAINER_HEADER:
            starts.append(idx)
        search_start = idx + 1
    return starts


def _decrypt_chunks(hKey, data, chunk_starts):
    """Decrypt each span between consecutive chunk starts (last one runs to EOF)."""
    bounds = sorted(chunk_starts) + [len(data)]
    chunks = []
    for start, end in zip(bounds, bounds[1:]):
        try:
            chunks.append(bcrypt_op(hKey, data[start:end]))
        except Exception as exc:
            # Best effort: report the chunk and carry on with the others.
            print(f"  Warning: could not decrypt chunk at offset {start}: {exc}")
    return chunks


def _split_on_magic(dec):
    """Split decrypted bytes into pieces that each begin at a container magic.

    Bytes preceding the first magic (or all bytes, when no magic is present)
    form their own piece.  Occurrences are searched without overlap.
    """
    cuts = []
    pos = dec.find(CONTAINER_HEADER)
    while pos >= 0:
        cuts.append(pos)
        pos = dec.find(CONTAINER_HEADER, pos + 8)
    if not cuts:
        return [dec] if dec else []
    pieces = []
    if cuts[0] > 0:
        pieces.append(dec[:cuts[0]])
    bounds = cuts + [len(dec)]
    pieces.extend(dec[a:b] for a, b in zip(bounds, bounds[1:]))
    return pieces


def _export_chunks(chunks_data, output_dir):
    """Write every decrypted chunk to *output_dir* as an ONNX model or a data file.

    Returns:
        tuple: ``(models, data_files)``, two lists of ``{'name', 'size'}`` dicts.
    """
    models = []
    data_files = []
    for chunk in chunks_data:
        payload = chunk[8:] if chunk[:8] == CONTAINER_HEADER else chunk
        # Payloads starting with byte 0x08 followed by a value in 1..12 are
        # treated as ONNX models; the second byte is reported as the IR version.
        if len(payload) >= 2 and payload[0] == 0x08 and 1 <= payload[1] <= 12:
            valid_len = measure_onnx(payload)
            if valid_len < 100:  # Too small to be a real model
                continue
            head = payload[:100]
            if b"PyTorch" in head:
                producer = "pytorch"
            elif b"onnx.quantize" in head:
                producer = "onnx_quantize"
            elif b"pytorch" in head:
                producer = "pytorch_small"
            else:
                producer = "unknown"
            ir = payload[1]
            idx = len(models)
            fname = f"model_{idx:02d}_ir{ir}_{producer}_{valid_len//1024}KB.onnx"
            (output_dir / fname).write_bytes(payload[:valid_len])
            models.append({'name': fname, 'size': valid_len})
            print(f"  ONNX: {fname} ({valid_len:,} bytes)")
        elif len(payload) > 100:
            preview = payload[:30].decode('utf-8', errors='replace')
            idx = len(data_files)
            fname = f"data_{idx:02d}_{len(payload)}B.bin"
            (output_dir / fname).write_bytes(payload)
            data_files.append({'name': fname, 'size': len(payload)})
            print(f"  Data: {fname} ({len(payload):,} bytes) {preview[:30]!r}")
    return models, data_files


def _count_loadable(models, output_dir):
    """Return how many models ``onnx.load`` accepts, or ``None`` without ``onnx``."""
    try:
        import onnx
    except ImportError:
        return None
    ok = 0
    for m in models:
        try:
            onnx.load(str(output_dir / m['name']))
        except Exception:
            # A model that fails to load simply does not count as verified.
            continue
        ok += 1
    return ok


def main():
    """Command-line entry point: decrypt a .onemodel file and extract its parts.

    Steps:
      1. Decrypt the DX index (hard-coded offset 24, size 22624) and save it.
      2. Locate encrypted chunks in the payload area by searching for the
         encrypted form of the container magic; if none are found, decrypt
         the payload in one pass and split it on the decrypted magic.
      3. Write each chunk out as an ONNX model or a raw data file.
      4. Print a summary and, when the ``onnx`` package is installed, report
         how many extracted models load.

    Raises:
        ValueError: if the decrypted DX index does not start with ``b'DX'``.
    """
    import argparse
    parser = argparse.ArgumentParser(description="OneOCR .onemodel decryptor (Windows BCrypt)")
    parser.add_argument("model_path", nargs="?", default="ocr_data/oneocr.onemodel")
    parser.add_argument("-o", "--output", default="onnx_models_static")
    args = parser.parse_args()
    model_path = Path(args.model_path)
    output_dir = Path(args.output)

    # Read the model before touching the output directory so that a wrong
    # model path does not wipe the results of an earlier run.
    data = model_path.read_bytes()
    output_dir.mkdir(exist_ok=True, parents=True)
    _clear_output_files(output_dir)

    print(f"{'='*70}")
    print("OneOCR Static Decryptor (BCrypt CNG)")
    print(f"{'='*70}")
    print(f"File: {model_path} ({len(data):,} bytes)")

    hAlg, hKey = setup_bcrypt()
    try:
        print("AES-256-CFB initialized")

        # Step 1: Decrypt DX index (offset 24, size 22624)
        dx_offset = 24
        dx_size = 22624
        dx_dec = bcrypt_op(hKey, data[dx_offset:dx_offset + dx_size])
        print(f"\nDX index: starts with {dx_dec[:2].hex()}")
        if dx_dec[:2] != b'DX':
            raise ValueError(f"DX header not found! Got: {dx_dec[:8].hex()}")
        (output_dir / "dx_index.bin").write_bytes(dx_dec)

        # Step 2: Find chunk boundaries in the payload area.
        # Payload starts after DX (offset 22648) + 36 bytes gap = 22684
        payload_start = dx_offset + dx_size + 36
        print("\n--- Scanning payload for encrypted chunks ---")
        print(f"Payload starts at offset {payload_start}")
        print("\nSearching for chunk boundaries by trial decryption...")
        # Every chunk is decrypted from the same fresh IV, so the container
        # magic at the start of a chunk always encrypts to the same bytes.
        magic_encrypted = bcrypt_op(hKey, CONTAINER_HEADER, encrypt=True)
        print(f"Container magic encrypted: {magic_encrypted.hex()}")
        chunk_starts = _find_chunk_starts(hKey, data, payload_start, magic_encrypted)
        print(f"Found {len(chunk_starts)} potential chunk starts")

        if chunk_starts:
            chunks_data = _decrypt_chunks(hKey, data, chunk_starts)
        else:
            # Fallback: decrypt the whole payload in one pass and split on
            # the decrypted container magic.
            print("No chunk starts found via magic pattern. Trying sequential...")
            chunks_data = _split_on_magic(bcrypt_op(hKey, data[payload_start:]))
            print(f"Found {len(chunks_data)} chunks in sequential decryption")

        # Step 3: Extract models from chunks
        print("\n--- Extracting ONNX models ---")
        models, data_files = _export_chunks(chunks_data, output_dir)

        # Summary
        print(f"\n{'='*70}")
        print("EXTRACTION COMPLETE")
        print(f"{'='*70}")
        print(f"ONNX models: {len(models)}")
        print(f"Data files: {len(data_files)}")
        if models:
            total = sum(m['size'] for m in models)
            print(f"Total ONNX: {total:,} bytes ({total/1024/1024:.1f} MB)")
            # Verify (skipped silently when the onnx package is unavailable)
            ok = _count_loadable(models, output_dir)
            if ok is not None:
                print(f"Verified with onnx.load: {ok}/{len(models)}")
    finally:
        # Release the BCrypt handles even when extraction fails part-way.
        bcrypt.BCryptDestroyKey(hKey)
        bcrypt.BCryptCloseAlgorithmProvider(hAlg, 0)
# Run the decryptor only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|