oneocr / _archive /analysis /decrypt_config.py
OneOCR Dev
OneOCR - reverse engineering complete, ONNX pipeline 53% match rate
ce847d4
"""Decrypt the config chunk from DX and analyze its protobuf structure.
Config = first encrypted payload inside DX index.
"""
import struct
import hashlib
from Crypto.Cipher import AES
# Static key material: hashed (SHA-256) together with 16 bytes taken from the
# model file header to derive the AES key that decrypts the DX block.
MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# Fixed 16-byte IV passed to every AES-CFB128 decryption in this script.
IV = b"Copyright @ OneO"
def aes_cfb128_decrypt(key: bytes, iv: bytes, data: bytes) -> bytes:
    """Decrypt *data* with AES in CFB mode using a 128-bit segment size.

    Args:
        key: AES key bytes.
        iv: Initialization vector for the cipher.
        data: Ciphertext to decrypt.

    Returns:
        The decrypted bytes.
    """
    # A fresh cipher object is built per call; nothing is shared between calls.
    return AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128).decrypt(data)
def decode_varint(data: bytes, offset: int) -> tuple[int, int]:
    """Read one protobuf base-128 varint from *data* starting at *offset*.

    Every byte contributes its low 7 bits, least-significant group first; a
    byte whose high bit is clear terminates the varint.  If *data* ends before
    a terminating byte is seen, the bits collected so far are returned.

    Returns:
        ``(value, new_offset)`` where *new_offset* is the index just past the
        last byte consumed (or *offset* itself when nothing could be read).
    """
    value = 0
    for group, pos in enumerate(range(offset, len(data))):
        byte = data[pos]
        value |= (byte & 0x7F) << (7 * group)
        if byte < 0x80:
            # High bit clear: this byte closes the varint.
            return value, pos + 1
    # Input exhausted without a terminating byte (or offset was already at or
    # past the end): report how far we got.
    return value, max(offset, len(data))
def decode_protobuf_fields(data: bytes, indent: int = 0, max_depth: int = 3, prefix: str = ""):
    """Heuristically dump *data* as a protobuf-like message to stdout.

    Fields are read one after another and printed, one line per field.
    Length-delimited payloads are shown as text when they decode as printable
    UTF-8; otherwise they are parsed recursively as a nested message (while
    ``indent < max_depth`` and the payload is 3..99999 bytes long) or shown as
    a hex prefix.

    Parsing of a message stops at the end of *data*, after 200 fields, at a
    field number of 0 or above 30, at a truncated field, or at an unsupported
    wire type.

    Args:
        data: Raw bytes of the message to dump.
        indent: Current nesting level; also the number of spaces used as the
            line prefix.
        max_depth: Nesting level at which recursion into sub-messages stops.
        prefix: Dotted field path of the enclosing message.  It is extended
            and forwarded on recursion but is not printed.

    Returns:
        None. All output goes to stdout.
    """
    pad = " " * indent
    off = 0
    # Cap the number of fields so arbitrary bytes cannot produce endless output.
    for _ in range(200):
        if off >= len(data):
            break
        # A tag is itself a varint (field_number << 3 | wire_type).  Reading a
        # single byte would misinterpret multi-byte tags (field numbers >= 16)
        # and desynchronise the rest of the parse.
        tag, after_tag = decode_varint(data, off)
        field_num = tag >> 3
        wire_type = tag & 0x07
        if field_num == 0 or field_num > 30:
            break
        off = after_tag
        if wire_type == 0:  # varint
            val, off = decode_varint(data, off)
            print(f"{pad}field {field_num} (varint): {val}")
        elif wire_type == 2:  # length-delimited
            length, off = decode_varint(data, off)
            if off + length > len(data):
                print(f"{pad}field {field_num} (bytes, len={length}): TRUNCATED at off={off}")
                break
            payload = data[off:off + length]
            # Try to decode as string
            try:
                s = payload.decode('utf-8')
                if not all(c.isprintable() or c in '\n\r\t' for c in s):
                    raise ValueError()
                if len(s) > 100:
                    print(f"{pad}field {field_num} (string, len={length}): {s[:100]}...")
                else:
                    print(f"{pad}field {field_num} (string, len={length}): {s}")
            except ValueError:
                # ValueError covers UnicodeDecodeError, the "not printable"
                # signal raised above, and UnicodeEncodeError from a console
                # that cannot render the text.
                if indent < max_depth and 2 < length < 100000:
                    # Try parsing as sub-message
                    print(f"{pad}field {field_num} (msg, len={length}):")
                    decode_protobuf_fields(payload, indent + 1, max_depth, prefix=f"{prefix}f{field_num}.")
                else:
                    print(f"{pad}field {field_num} (bytes, len={length}): {payload[:32].hex()}...")
            off += length
        elif wire_type == 5:  # 32-bit
            if off + 4 > len(data):
                break
            # Show the same four bytes both as an unsigned int and as a float.
            val = struct.unpack_from("<I", data, off)[0]
            fval = struct.unpack_from("<f", data, off)[0]
            off += 4
            print(f"{pad}field {field_num} (fixed32): {val} (0x{val:08x}, float={fval:.4f})")
        elif wire_type == 1:  # 64-bit
            if off + 8 > len(data):
                break
            val = struct.unpack_from("<Q", data, off)[0]
            off += 8
            print(f"{pad}field {field_num} (fixed64): {val}")
        else:
            print(f"{pad}field {field_num} (wire={wire_type}): unknown, stopping")
            break
# Load the whole model container into memory.
with open("ocr_data/oneocr.onemodel", "rb") as model_file:
    fdata = model_file.read()

# Step 1: Decrypt DX
# The DX key is SHA-256 over MASTER_KEY followed by the 16 bytes found at
# file offset 8.
file_header_hash = fdata[8:24]
key_hasher = hashlib.sha256(MASTER_KEY)
key_hasher.update(file_header_hash)
dx_key = key_hasher.digest()
# The encrypted DX block is read from file offset 24 with a fixed length.
dx_start = 24
dx_encrypted = fdata[dx_start:dx_start + 22624]
dx = aes_cfb128_decrypt(dx_key, IV, dx_encrypted)

# Print the fields found in the first 64 bytes of the decrypted DX block.
print("=== DX Header ===")
print(f"Magic: {dx[:8]}")
(valid_size,) = struct.unpack_from("<Q", dx, 8)
print(f"Valid size: {valid_size}")
print(f"Container magic: {dx[16:24].hex()}")
(total_value,) = struct.unpack_from("<Q", dx, 24)
print(f"DX[24] value: {total_value}")
checksum = dx[32:48]
print(f"Checksum: {checksum.hex()}")
s1, s2 = struct.unpack_from("<QQ", dx, 48)
print(f"Sizes: ({s1}, {s2})")
# Step 2: Decrypt config
# The config key is SHA-256 over DX bytes 48..63 (the two sizes) followed by
# DX bytes 32..47 (the checksum).
sha_input = dx[48:64] + dx[32:48]  # sizes + checksum
config_key = hashlib.sha256(sha_input).digest()
config_enc = dx[64:64 + 11920]
config_dec = aes_cfb128_decrypt(config_key, IV, config_enc)

# Save the decrypted chunk to disk.
with open("temp/config_decrypted.bin", "wb") as f:
    f.write(config_dec)
print(f"\nConfig decrypted: {len(config_dec)} bytes, saved to temp/config_decrypted.bin")

# Check container magic.  This is validation of decrypted data, so raise
# explicitly: an `assert` would be skipped when Python runs with -O and the
# script would go on to parse garbage.
magic = config_dec[:8]
print(f"Config container magic: {magic.hex()}")
if magic != bytes.fromhex("4a1a082b25000000"):
    raise ValueError("Container magic mismatch!")

# Strip 8-byte container header
config_data = config_dec[8:]
print(f"Config payload: {len(config_data)} bytes")
print("\n=== Config Protobuf Structure (top-level fields only) ===")
# Parse just top-level to see field patterns.  Every field is recorded in
# config_fields as a dict with "fn" (field number), "wt" (wire type), "off"
# and either "val" (scalar) or "len"/"data" (length-delimited payload).
# Note: for varints "off" is the offset just past the value; for the other
# wire types it is the offset at which the value starts.
off = 0
config_fields = []
while off < len(config_data):
    # The tag is a varint (field_number << 3 | wire_type); decoding it as a
    # single byte would misread multi-byte tags (field numbers >= 16).
    tag, after_tag = decode_varint(config_data, off)
    field_num = tag >> 3
    wire_type = tag & 0x07
    if field_num == 0 or field_num > 30:
        break
    off = after_tag
    if wire_type == 0:
        val, off = decode_varint(config_data, off)
        config_fields.append({"fn": field_num, "wt": wire_type, "val": val, "off": off})
    elif wire_type == 2:
        length, off = decode_varint(config_data, off)
        if off + length > len(config_data):
            break
        payload = config_data[off:off + length]
        # Try string: only a failed ASCII decode means "not text"; any other
        # exception is a real error and must not be swallowed.
        try:
            s = payload.decode('ascii')
        except UnicodeDecodeError:
            readable = False
        else:
            readable = all(c.isprintable() or c in '\n\r\t' for c in s)
        if readable and len(payload) < 200:
            print(f" field {field_num} (string, len={length}, off={off}): {payload[:80]}")
        else:
            # check first bytes for sub-message identification
            fbytes = payload[:16].hex()
            print(f" field {field_num} (msg/bytes, len={length}, off={off}): {fbytes}...")
        config_fields.append({"fn": field_num, "wt": wire_type, "len": length, "off": off, "data": payload})
        off += length
    elif wire_type == 5:
        if off + 4 > len(config_data):
            break
        val = struct.unpack_from("<I", config_data, off)[0]
        config_fields.append({"fn": field_num, "wt": wire_type, "val": val, "off": off})
        off += 4
    elif wire_type == 1:
        if off + 8 > len(config_data):
            break
        val = struct.unpack_from("<Q", config_data, off)[0]
        config_fields.append({"fn": field_num, "wt": wire_type, "val": val, "off": off})
        off += 8
    else:
        # Unsupported wire type: stop scanning.
        break
# Tally how often each top-level field number was seen.
from collections import Counter
field_counts = Counter()
for parsed_field in config_fields:
    field_counts[parsed_field["fn"]] += 1
print(f"\nField type counts: {dict(field_counts)}")
print(f"Total fields: {len(config_fields)}")
# Decode each field 1 (repeated message) to find model entries
print("\n=== Model entries (field 1) ===")
f1_entries = [f for f in config_fields if f["fn"] == 1 and "data" in f]
for i, entry in enumerate(f1_entries):
    data = entry["data"]
    # Parse sub-fields: sub-field 1 is read as the name, sub-field 2 (varint)
    # as the model type and sub-field 3 as the path; everything else is
    # skipped over.
    sub_off = 0
    name = ""
    model_type = -1
    onnx_path = ""
    while sub_off < len(data):
        # Decode the tag as a varint so that multi-byte tags are not misread
        # as a one-byte tag followed by data.
        tag, after_tag = decode_varint(data, sub_off)
        fn = tag >> 3
        wt = tag & 7
        if fn == 0 or fn > 20:
            break
        sub_off = after_tag
        if wt == 0:
            val, sub_off = decode_varint(data, sub_off)
            if fn == 2:
                model_type = val
        elif wt == 2:
            ln, sub_off = decode_varint(data, sub_off)
            if sub_off + ln > len(data):
                break
            p = data[sub_off:sub_off + ln]
            if fn == 1:
                # Fall back to hex only when the bytes are not ASCII.
                try:
                    name = p.decode('ascii')
                except UnicodeDecodeError:
                    name = p.hex()
            elif fn == 3:
                # errors='replace' never raises, so no fallback is needed.
                onnx_path = p.decode('ascii', errors='replace')
            sub_off += ln
        elif wt == 5:
            sub_off += 4
        elif wt == 1:
            sub_off += 8
        else:
            break
    print(f" [{i:02d}] name={name:20s} type={model_type}")
    if onnx_path:
        print(f" path={onnx_path[:80]}")
# Now look for checksums in the config payload bytes (a raw byte search, not
# a protobuf parse).
print("\n=== Searching ALL known checksums in config ===")
import json
with open("temp/crypto_log.json", encoding="utf-8") as f:
    log = json.load(f)
sha256s = [op for op in log if op["op"] == "sha256"]
# Only 32-byte SHA256 inputs are considered: the first 16 bytes are read as
# two little-endian 64-bit sizes, the last 16 bytes as the checksum.
checksums_found = 0
candidates = 0  # number of 32-byte inputs examined
for s in sha256s:
    inp = bytes.fromhex(s["input"])
    if len(inp) != 32:
        continue
    candidates += 1
    chk = inp[16:32]  # last 16 bytes = checksum
    pos = config_data.find(chk)
    if pos < 0:
        continue
    checksums_found += 1
    # Print details for the first five hits only.
    if checksums_found <= 5:
        sizes = struct.unpack_from("<QQ", inp, 0)
        print(f" FOUND checksum at config offset {pos}: sizes={sizes}")
print(f"Total checksums found in config: {checksums_found} / {candidates}")