|
|
|
|
|
""" |
|
|
OneOCR .onemodel Static Decryptor |
|
|
================================= |
|
|
Cross-platform tool to extract ONNX models and config data from |
|
|
Windows OneOCR's encrypted .onemodel container files. |
|
|
|
|
|
No Windows APIs, DLLs, or runtime hooking required. |
|
|
Only dependency: pycryptodome (pip install pycryptodome) |
|
|
|
|
|
Crypto scheme (fully reverse-engineered): |
|
|
- Algorithm: AES-256-CFB128 |
|
|
- Master Key: hardcoded 32-byte ASCII string |
|
|
- IV: "Copyright @ OneO" (16 bytes, same for all chunks) |
|
|
- DX index key: SHA256(master_key + file[8:24]) |
|
|
- Config key: SHA256(DX[48:64] + DX[32:48]) (sizes + checksum) |
|
|
- Per-chunk key: SHA256(chunk_header[16:32] + chunk_header[0:16]) |
|
|
- Chunk header in file: checksum(16) + size1(8) + size2(8) = 32 bytes |
|
|
- On-disk encrypted data follows immediately: size1 + 8 bytes |
|
|
|
|
|
File structure: |
|
|
[0:8] uint64 LE H (header value) |
|
|
[8:24] 16 bytes file_hash (used in DX key derivation) |
|
|
[24:H+12] encrypted DX index |
|
|
[H+12:H+16] 4 zero bytes (gap) |
|
|
[H+16:] payload chunks (checksum(16) + sizes(16) + encrypted_data) |
|
|
|
|
|
Usage: |
|
|
python onemodel_decrypt.py [onemodel_file] [output_dir] |
|
|
python onemodel_decrypt.py # uses defaults |
|
|
""" |
|
|
|
|
|
import struct |
|
|
import hashlib |
|
|
import sys |
|
|
import os |
|
|
from pathlib import Path |
|
|
|
|
|
try: |
|
|
from Crypto.Cipher import AES |
|
|
except ImportError: |
|
|
print("ERROR: pycryptodome is required. Install with: pip install pycryptodome") |
|
|
sys.exit(1) |
|
|
|
|
|
|
|
|
# Hardcoded 32-byte ASCII master key recovered from the OneOCR binary;
# 32 bytes makes it directly usable as AES-256 key material.
MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# Fixed 16-byte IV reused for every AES-CFB decryption in the container.
IV = b"Copyright @ OneO"
# 8-byte magic expected at the start of every decrypted chunk container.
CONTAINER_MAGIC = bytes.fromhex("4a1a082b25000000")
|
|
|
|
|
|
|
|
|
|
|
def aes_cfb128_decrypt(key: bytes, data: bytes) -> bytes:
    """Decrypt *data* with AES-256 in CFB mode using the shared global IV.

    segment_size=128 selects CFB128 (pycryptodome defaults to CFB8, which
    would not match the OneOCR scheme).
    """
    return AES.new(key, AES.MODE_CFB, iv=IV, segment_size=128).decrypt(data)
|
|
|
|
|
|
|
|
def derive_key(sha256_input: bytes) -> bytes:
    """Derive a 32-byte AES key as the SHA-256 digest of *sha256_input*."""
    hasher = hashlib.sha256()
    hasher.update(sha256_input)
    return hasher.digest()
|
|
|
|
|
|
|
|
|
|
|
def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode a protobuf base-128 varint starting at *pos*.

    Returns (value, position just past the last varint byte).  If *pos* is
    already at or past the end of *data*, returns (0, pos) unchanged; a
    truncated varint yields the partial value accumulated so far.
    """
    value = 0
    shift = 0
    end = len(data)
    while pos < end:
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if byte < 0x80:  # high bit clear -> final byte of the varint
            break
        shift += 7
    return value, pos
|
|
|
|
|
|
|
|
def measure_protobuf(data: bytes) -> int:
    """Walk ONNX ModelProto protobuf fields; return byte length of valid data.

    Scans field-by-field from offset 0 and stops at the first field whose
    number is not a valid ModelProto field (1-9, 14, 20), whose wire type is
    unsupported, or whose body runs past the end of *data*.  The returned
    offset is the start of the first invalid field (i.e. the exact length of
    the well-formed prefix).
    """
    valid_fields = frozenset(range(1, 10)) | {14, 20}
    total = len(data)
    pos = 0
    while pos < total:
        field_start = pos
        tag, pos = read_varint(data, pos)
        if pos > total:
            return field_start

        field_num, wire_type = tag >> 3, tag & 7
        if field_num not in valid_fields:
            return field_start

        if wire_type == 0:  # varint
            _, pos = read_varint(data, pos)
        elif wire_type == 1:  # fixed64
            pos += 8
        elif wire_type == 2:  # length-delimited
            body_len, pos = read_varint(data, pos)
            pos += body_len
        elif wire_type == 5:  # fixed32
            pos += 4
        else:  # groups / unknown wire types: treat as end of valid data
            return field_start

        if pos > total:
            return field_start
    return pos
|
|
|
|
|
|
|
|
|
|
|
class OneModelFile:
    """Parser for .onemodel encrypted containers.

    File layout (see module docstring):
        [0:8]        uint64 LE header value H
        [8:24]       16-byte file hash (mixed into DX key derivation)
        [24:H+12]    encrypted DX index
        [H+12:H+16]  4 zero bytes (gap)
        [H+16:]      payload chunks
    """

    def __init__(self, filepath: str):
        """Load the whole container into memory and parse the fixed header."""
        with open(filepath, "rb") as f:
            self.data = f.read()
        self.filepath = filepath

        # H determines where the DX index ends; payload starts at H+16.
        self.H = struct.unpack_from("<Q", self.data, 0)[0]
        # Per-file 16-byte hash used in the DX key derivation below.
        self.file_hash = self.data[8:24]

        self.dx_offset = 24
        # DX index spans [24 : H+12], i.e. H-12 encrypted bytes.
        self.dx_size = self.H - 12
        # First payload chunk follows the 4 zero-byte gap at [H+12 : H+16].
        self.payload_start = self.H + 16

    def decrypt_dx(self) -> bytes:
        """Decrypt the DX index.

        Key = SHA256(MASTER_KEY + file_hash); IV is the global constant.
        """
        key = derive_key(MASTER_KEY + self.file_hash)
        dx_enc = self.data[self.dx_offset : self.dx_offset + self.dx_size]
        return aes_cfb128_decrypt(key, dx_enc)

    def decrypt_config(self, dx: bytes) -> bytes:
        """Decrypt the config chunk embedded in the (already decrypted) DX.

        Key = SHA256(DX[48:64] + DX[32:48]) — the sizes field followed by the
        checksum.  Returns the raw decrypted container (8-byte magic + payload).
        """
        sha_input = dx[48:64] + dx[32:48]
        key = derive_key(sha_input)
        # DX[48:56] holds the config chunk's size1 as little-endian uint64.
        config_s1 = struct.unpack_from("<Q", dx, 48)[0]
        # Encrypted config data follows the 64-byte DX header: size1 + 8 bytes.
        config_enc = dx[64 : 64 + config_s1 + 8]
        return aes_cfb128_decrypt(key, config_enc)

    def iter_payload_chunks(self):
        """Iterate over all payload chunks, yielding (index, metadata, decrypted_payload).

        Each payload chunk in file:
            [16 bytes]       checksum
            [8 bytes]        uint64 LE size1 (data size excl. 8-byte container header)
            [8 bytes]        uint64 LE size2 (always size1 + 24)
            [size1+8 bytes]  encrypted data
        """
        off = self.payload_start
        idx = 0

        # Stop as soon as a complete 32-byte chunk header no longer fits.
        while off + 32 <= len(self.data):
            checksum = self.data[off : off + 16]
            s1, s2 = struct.unpack_from("<QQ", self.data, off + 16)

            # Sanity check: size2 must be size1 + 24 and size1 plausible;
            # otherwise we have walked past the last chunk (or hit corruption).
            if s2 != s1 + 24 or s1 == 0 or s1 > len(self.data):
                break

            enc_size = s1 + 8
            data_off = off + 32

            # Truncated final chunk: stop rather than decrypt partial data.
            if data_off + enc_size > len(self.data):
                break

            # Per-chunk key = SHA256(sizes(16 bytes) + checksum(16 bytes)).
            sha_input = self.data[off + 16 : off + 32] + checksum
            key = derive_key(sha_input)

            dec = aes_cfb128_decrypt(key, self.data[data_off : data_off + enc_size])

            # Decrypted data should begin with the 8-byte container magic;
            # warn but still yield (best-effort extraction).
            if dec[:8] != CONTAINER_MAGIC:
                print(f" WARNING: chunk#{idx} container magic mismatch!")

            payload = dec[8:]

            meta = {
                "index": idx,
                "file_offset": off,
                "size1": s1,
                "size2": s2,
                "checksum": checksum.hex(),
            }

            yield idx, meta, payload

            off = data_off + enc_size
            idx += 1
|
|
|
|
|
|
|
|
|
|
|
def classify_chunk(payload: bytes) -> str:
    """Classify a decrypted chunk payload by sniffing its leading bytes."""
    # ONNX ModelProto begins with field 1 (ir_version) as a varint:
    # tag byte 0x08 followed by the version value (6 or 7).
    if len(payload) > 100 and payload[0] == 0x08 and payload[1] in (0x06, 0x07):
        return "onnx"

    try:
        sample = payload[:100].decode("ascii")
    except (UnicodeDecodeError, ValueError):
        return "binary_data"

    printable = all(ch.isprintable() or ch in "\n\r\t" for ch in sample)
    if not printable:
        return "binary_data"

    if "<LogPrior>" in sample:
        return "rnn_info"
    if sample.startswith(("! ", '" ')):
        has_digit = any(ch.isdigit() for ch in sample[:20])
        return "char2ind" if has_digit else "char2inschar"
    if sample.startswith("0."):
        return "score_calibration"
    if "text_script" in sample:
        return "ocr_config"
    if "//" in sample[:5]:
        return "composite_chars"
    return "text_data"
|
|
|
|
|
|
|
|
def get_onnx_info(data: bytes) -> dict:
    """Extract basic ONNX ModelProto metadata from raw protobuf bytes.

    Scans at most the first 500 bytes for:
      - field 1 (varint):           ir_version
      - field 3 (length-delimited): producer_name
      - field 4 (length-delimited): producer_version

    Returns a dict containing whichever of "ir_version", "producer" and
    "producer_version" were found; stops early once both ir_version and
    producer are known.
    """
    info: dict = {}
    # Hoisted loop bound: only the model header region is interesting.
    limit = min(len(data), 500)
    pos = 0
    while pos < limit:
        tag, pos = read_varint(data, pos)
        field_num = tag >> 3
        wire_type = tag & 7

        if wire_type == 0:  # varint
            val, pos = read_varint(data, pos)
            if field_num == 1:
                info["ir_version"] = val
        elif wire_type == 2:  # length-delimited
            length, pos = read_varint(data, pos)
            payload_bytes = data[pos : pos + length]
            if field_num == 3:
                # Fix: was a bare `except:` — only a failed UTF-8 decode
                # should be ignored, not arbitrary exceptions.
                try:
                    info["producer"] = payload_bytes.decode("utf-8")
                except UnicodeDecodeError:
                    pass
            elif field_num == 4:
                try:
                    info["producer_version"] = payload_bytes.decode("utf-8")
                except UnicodeDecodeError:
                    pass
            pos += length
        elif wire_type == 5:  # fixed32
            pos += 4
        elif wire_type == 1:  # fixed64
            pos += 8
        else:
            # Unknown wire type: cannot skip safely, stop scanning.
            break

        if "producer" in info and "ir_version" in info:
            break

    return info
|
|
|
|
|
|
|
|
def extract_all(input_file: str, output_dir: str, verify: bool = True):
    """Extract all ONNX models and config chunks from a .onemodel file.

    Writes decrypted content into two subdirectories of *output_dir*:
    ``onnx_models/`` (trimmed .onnx files) and ``config_data/`` (the config
    manifest plus every non-ONNX chunk, named by detected type).

    Args:
        input_file: Path to the encrypted .onemodel container.
        output_dir: Directory to create output subdirectories in.
        verify: When True, attempt to validate the extracted models with the
            optional ``onnx`` / ``onnxruntime`` packages (silently skipped if
            those packages are not installed).

    Raises:
        AssertionError: If the DX or config magic bytes do not match,
            indicating a corrupt file or wrong key material.
    """
    model_file = OneModelFile(input_file)

    print(f"File: {input_file}")
    print(f"Size: {len(model_file.data):,} bytes")
    print(f"Header value: {model_file.H}")
    print(f"DX size: {model_file.dx_size:,} bytes")

    # Decrypt and sanity-check the DX index ("DX" magic at offset 0).
    dx = model_file.decrypt_dx()
    valid_size = struct.unpack_from("<Q", dx, 8)[0]
    print(f"DX valid size: {valid_size:,}")
    assert dx[:2] == b"DX", "DX magic mismatch!"

    # Decrypt the embedded config chunk and strip its 8-byte container magic.
    config_dec = model_file.decrypt_config(dx)
    assert config_dec[:8] == CONTAINER_MAGIC, "Config magic mismatch!"
    config_payload = config_dec[8:]

    out = Path(output_dir)
    onnx_dir = out / "onnx_models"
    config_dir = out / "config_data"
    onnx_dir.mkdir(parents=True, exist_ok=True)
    config_dir.mkdir(parents=True, exist_ok=True)

    config_path = config_dir / "manifest.bin"
    config_path.write_bytes(config_payload)
    print(f"\nConfig manifest saved: {config_path} ({len(config_payload):,} bytes)")

    onnx_models = []
    config_files = []

    # Output file extension per chunk type (hoisted: loop-invariant).
    ext_map = {
        "rnn_info": ".rnn_info",
        "char2ind": ".char2ind.txt",
        "char2inschar": ".char2inschar.txt",
        "score_calibration": ".calibration.txt",
        "ocr_config": ".config.txt",
        "composite_chars": ".composite.txt",
        "text_data": ".txt",
        "binary_data": ".bin",
    }

    print(f"\n{'='*70}")
    print(f"{'#':>4} {'Type':<18} {'Size':>12} {'Filename':<40}")
    print(f"{'='*70}")

    for idx, _meta, payload in model_file.iter_payload_chunks():
        chunk_type = classify_chunk(payload)

        if chunk_type == "onnx":
            # CFB decryption cannot detect trailing padding/garbage, so trim
            # the payload to the exact length of the valid protobuf stream.
            exact_size = measure_protobuf(payload)
            onnx_data = payload[:exact_size]

            info = get_onnx_info(onnx_data)
            ir = info.get("ir_version", "?")
            producer = info.get("producer", "unknown")
            size_kb = len(onnx_data) // 1024

            # Build a short producer tag for the output filename.
            # (producer.lower() hoisted out of the repeated tests.)
            producer_lc = producer.lower()
            if "quantize" in producer_lc or "onnx" in producer_lc:
                prod_tag = "onnx_quantize"
            elif "pytorch" in producer_lc or "torch" in producer_lc:
                prod_tag = "pytorch_small" if size_kb < 50 else "pytorch"
            else:
                prod_tag = producer.replace(" ", "_")

            onnx_idx = len(onnx_models)
            fname = f"model_{onnx_idx:02d}_ir{ir}_{prod_tag}_{size_kb}KB.onnx"
            fpath = onnx_dir / fname

            fpath.write_bytes(onnx_data)
            onnx_models.append(fpath)
            print(f"{idx:4d} {'ONNX':18s} {len(onnx_data):12,} {fname}")

        else:
            ext = ext_map.get(chunk_type, ".bin")
            fname = f"chunk_{idx:02d}_{chunk_type}{ext}"
            fpath = config_dir / fname

            fpath.write_bytes(payload)
            config_files.append(fpath)
            print(f"{idx:4d} {chunk_type:18s} {len(payload):12,} {fname}")

    print(f"\n{'='*70}")
    print(f"ONNX models extracted: {len(onnx_models)}")
    print(f"Config files extracted: {len(config_files)}")

    if verify:
        print(f"\n{'='*70}")
        print("ONNX Verification")
        print(f"{'='*70}")

        # Structural verification with the optional `onnx` package.
        try:
            import onnx
            onnx_ok = 0
            onnx_fail = 0
            for fpath in onnx_models:
                try:
                    model = onnx.load(str(fpath))
                    onnx.checker.check_model(model)
                    onnx_ok += 1
                    print(f" OK {fpath.name}")
                except Exception as e:
                    # Checker failure is non-fatal: a model that still loads
                    # is usable (e.g. it may rely on custom OneOCR operators).
                    try:
                        onnx.load(str(fpath))  # fix: drop unused rebinding of `model`
                        onnx_ok += 1
                        print(f" OK* {fpath.name} (loads but checker warning: {str(e)[:50]})")
                    except Exception as e2:
                        onnx_fail += 1
                        print(f" FAIL {fpath.name}: {e2}")
            print(f"\nVerification: {onnx_ok}/{len(onnx_models)} models load successfully")
        except ImportError:
            print(" (onnx package not installed, skipping verification)")

        # Best-effort runtime check with the optional `onnxruntime` package.
        try:
            import onnxruntime as ort
            rt_ok = 0
            rt_custom_ops = 0
            for fpath in onnx_models:
                try:
                    # fix: session object was bound to an unused local (`sess`).
                    ort.InferenceSession(str(fpath))
                    rt_ok += 1
                except Exception as e:
                    # Models needing OneOCR custom ops are expected to fail
                    # here; any other failure is deliberately ignored
                    # (fix: removed dead `else: pass`).
                    if "custom ops" in str(e).lower() or "oneocr" in str(e).lower():
                        rt_custom_ops += 1
            print(f" onnxruntime: {rt_ok} standard, {rt_custom_ops} need custom ops")
        except ImportError:
            pass

    print(f"\nDone! All files saved to: {out.resolve()}")
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Positional CLI arguments: [onemodel_file] [output_dir]; both optional.
    args = sys.argv[1:]
    input_file = args[0] if len(args) > 0 else "ocr_data/oneocr.onemodel"
    output_dir = args[1] if len(args) > 1 else "oneocr_extracted"

    if not os.path.exists(input_file):
        print(f"ERROR: Input file not found: {input_file}")
        sys.exit(1)

    extract_all(input_file, output_dir)
|
|
|