|
|
"""Decrypt the config chunk from DX and analyze its protobuf structure. |
|
|
Config = first encrypted payload inside DX index. |
|
|
""" |
|
|
import struct |
|
|
import hashlib |
|
|
from Crypto.Cipher import AES |
|
|
|
|
|
MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4' |
|
|
IV = b"Copyright @ OneO" |
|
|
|
|
|
def aes_cfb128_decrypt(key: bytes, iv: bytes, data: bytes) -> bytes: |
|
|
cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128) |
|
|
return cipher.decrypt(data) |
|
|
|
|
|
def decode_varint(data: bytes, offset: int) -> tuple[int, int]: |
|
|
"""Decode protobuf varint, return (value, new_offset).""" |
|
|
result = 0 |
|
|
shift = 0 |
|
|
while offset < len(data): |
|
|
b = data[offset] |
|
|
result |= (b & 0x7F) << shift |
|
|
offset += 1 |
|
|
if not (b & 0x80): |
|
|
break |
|
|
shift += 7 |
|
|
return result, offset |
|
|
|
|
|
def decode_protobuf_fields(data: bytes, indent: int = 0, max_depth: int = 3, prefix: str = ""): |
|
|
"""Recursively decode protobuf-like structure.""" |
|
|
off = 0 |
|
|
field_idx = 0 |
|
|
pad = " " * indent |
|
|
while off < len(data) and field_idx < 200: |
|
|
if off >= len(data): |
|
|
break |
|
|
tag_byte = data[off] |
|
|
field_num = tag_byte >> 3 |
|
|
wire_type = tag_byte & 0x07 |
|
|
|
|
|
if field_num == 0 or field_num > 30: |
|
|
break |
|
|
|
|
|
off += 1 |
|
|
|
|
|
if wire_type == 0: |
|
|
val, off = decode_varint(data, off) |
|
|
print(f"{pad}field {field_num} (varint): {val}") |
|
|
elif wire_type == 2: |
|
|
length, off = decode_varint(data, off) |
|
|
if off + length > len(data): |
|
|
print(f"{pad}field {field_num} (bytes, len={length}): TRUNCATED at off={off}") |
|
|
break |
|
|
payload = data[off:off+length] |
|
|
|
|
|
try: |
|
|
s = payload.decode('utf-8') |
|
|
if all(c.isprintable() or c in '\n\r\t' for c in s): |
|
|
if len(s) > 100: |
|
|
print(f"{pad}field {field_num} (string, len={length}): {s[:100]}...") |
|
|
else: |
|
|
print(f"{pad}field {field_num} (string, len={length}): {s}") |
|
|
else: |
|
|
raise ValueError() |
|
|
except (UnicodeDecodeError, ValueError): |
|
|
if indent < max_depth and length > 2 and length < 100000: |
|
|
|
|
|
print(f"{pad}field {field_num} (msg, len={length}):") |
|
|
decode_protobuf_fields(payload, indent + 1, max_depth, prefix=f"{prefix}f{field_num}.") |
|
|
else: |
|
|
print(f"{pad}field {field_num} (bytes, len={length}): {payload[:32].hex()}...") |
|
|
off += length |
|
|
elif wire_type == 5: |
|
|
if off + 4 > len(data): |
|
|
break |
|
|
val = struct.unpack_from("<I", data, off)[0] |
|
|
off += 4 |
|
|
|
|
|
fval = struct.unpack_from("<f", data, off-4)[0] |
|
|
print(f"{pad}field {field_num} (fixed32): {val} (0x{val:08x}, float={fval:.4f})") |
|
|
elif wire_type == 1: |
|
|
if off + 8 > len(data): |
|
|
break |
|
|
val = struct.unpack_from("<Q", data, off)[0] |
|
|
off += 8 |
|
|
print(f"{pad}field {field_num} (fixed64): {val}") |
|
|
else: |
|
|
print(f"{pad}field {field_num} (wire={wire_type}): unknown, stopping") |
|
|
break |
|
|
field_idx += 1 |
|
|
|
|
|
|
|
|
with open("ocr_data/oneocr.onemodel", "rb") as f: |
|
|
fdata = f.read() |
|
|
|
|
|
|
|
|
file_header_hash = fdata[8:24] |
|
|
dx_key = hashlib.sha256(MASTER_KEY + file_header_hash).digest() |
|
|
dx_encrypted = fdata[24:24+22624] |
|
|
dx = aes_cfb128_decrypt(dx_key, IV, dx_encrypted) |
|
|
|
|
|
print("=== DX Header ===") |
|
|
print(f"Magic: {dx[:8]}") |
|
|
valid_size = struct.unpack_from("<Q", dx, 8)[0] |
|
|
print(f"Valid size: {valid_size}") |
|
|
print(f"Container magic: {dx[16:24].hex()}") |
|
|
total_value = struct.unpack_from("<Q", dx, 24)[0] |
|
|
print(f"DX[24] value: {total_value}") |
|
|
checksum = dx[32:48] |
|
|
print(f"Checksum: {checksum.hex()}") |
|
|
s1, s2 = struct.unpack_from("<QQ", dx, 48) |
|
|
print(f"Sizes: ({s1}, {s2})") |
|
|
|
|
|
|
|
|
sha_input = dx[48:64] + dx[32:48] |
|
|
config_key = hashlib.sha256(sha_input).digest() |
|
|
config_enc = dx[64:64+11920] |
|
|
config_dec = aes_cfb128_decrypt(config_key, IV, config_enc) |
|
|
|
|
|
|
|
|
with open("temp/config_decrypted.bin", "wb") as f: |
|
|
f.write(config_dec) |
|
|
print(f"\nConfig decrypted: {len(config_dec)} bytes, saved to temp/config_decrypted.bin") |
|
|
|
|
|
|
|
|
magic = config_dec[:8] |
|
|
print(f"Config container magic: {magic.hex()}") |
|
|
assert magic == bytes.fromhex("4a1a082b25000000"), "Container magic mismatch!" |
|
|
|
|
|
|
|
|
config_data = config_dec[8:] |
|
|
print(f"Config payload: {len(config_data)} bytes") |
|
|
|
|
|
print("\n=== Config Protobuf Structure (top-level fields only) ===") |
|
|
|
|
|
off = 0 |
|
|
config_fields = [] |
|
|
while off < len(config_data): |
|
|
if off >= len(config_data): |
|
|
break |
|
|
tag_byte = config_data[off] |
|
|
field_num = tag_byte >> 3 |
|
|
wire_type = tag_byte & 0x07 |
|
|
if field_num == 0 or field_num > 30: |
|
|
break |
|
|
off += 1 |
|
|
if wire_type == 0: |
|
|
val, off = decode_varint(config_data, off) |
|
|
config_fields.append({"fn": field_num, "wt": wire_type, "val": val, "off": off}) |
|
|
elif wire_type == 2: |
|
|
length, off = decode_varint(config_data, off) |
|
|
if off + length > len(config_data): |
|
|
break |
|
|
payload = config_data[off:off+length] |
|
|
|
|
|
try: |
|
|
s = payload.decode('ascii') |
|
|
readable = all(c.isprintable() or c in '\n\r\t' for c in s) |
|
|
except: |
|
|
readable = False |
|
|
if readable and len(payload) < 200: |
|
|
print(f" field {field_num} (string, len={length}, off={off}): {payload[:80]}") |
|
|
else: |
|
|
|
|
|
fbytes = payload[:16].hex() |
|
|
print(f" field {field_num} (msg/bytes, len={length}, off={off}): {fbytes}...") |
|
|
config_fields.append({"fn": field_num, "wt": wire_type, "len": length, "off": off, "data": payload}) |
|
|
off += length |
|
|
elif wire_type == 5: |
|
|
if off + 4 > len(config_data): |
|
|
break |
|
|
val = struct.unpack_from("<I", config_data, off)[0] |
|
|
config_fields.append({"fn": field_num, "wt": wire_type, "val": val, "off": off}) |
|
|
off += 4 |
|
|
elif wire_type == 1: |
|
|
if off + 8 > len(config_data): |
|
|
break |
|
|
val = struct.unpack_from("<Q", config_data, off)[0] |
|
|
config_fields.append({"fn": field_num, "wt": wire_type, "val": val, "off": off}) |
|
|
off += 8 |
|
|
else: |
|
|
break |
|
|
|
|
|
|
|
|
from collections import Counter |
|
|
field_counts = Counter(f["fn"] for f in config_fields) |
|
|
print(f"\nField type counts: {dict(field_counts)}") |
|
|
print(f"Total fields: {len(config_fields)}") |
|
|
|
|
|
|
|
|
print("\n=== Model entries (field 1) ===") |
|
|
f1_entries = [f for f in config_fields if f["fn"] == 1 and "data" in f] |
|
|
for i, entry in enumerate(f1_entries): |
|
|
data = entry["data"] |
|
|
|
|
|
sub_off = 0 |
|
|
name = "" |
|
|
model_type = -1 |
|
|
onnx_path = "" |
|
|
while sub_off < len(data): |
|
|
tag = data[sub_off] |
|
|
fn = tag >> 3 |
|
|
wt = tag & 7 |
|
|
if fn == 0 or fn > 20: |
|
|
break |
|
|
sub_off += 1 |
|
|
if wt == 0: |
|
|
val, sub_off = decode_varint(data, sub_off) |
|
|
if fn == 2: |
|
|
model_type = val |
|
|
elif wt == 2: |
|
|
ln, sub_off = decode_varint(data, sub_off) |
|
|
if sub_off + ln > len(data): |
|
|
break |
|
|
p = data[sub_off:sub_off+ln] |
|
|
if fn == 1: |
|
|
try: |
|
|
name = p.decode('ascii') |
|
|
except: |
|
|
name = p.hex() |
|
|
elif fn == 3: |
|
|
try: |
|
|
onnx_path = p.decode('ascii', errors='replace') |
|
|
except: |
|
|
onnx_path = p.hex() |
|
|
sub_off += ln |
|
|
elif wt == 5: |
|
|
sub_off += 4 |
|
|
elif wt == 1: |
|
|
sub_off += 8 |
|
|
else: |
|
|
break |
|
|
print(f" [{i:02d}] name={name:20s} type={model_type}") |
|
|
if onnx_path: |
|
|
print(f" path={onnx_path[:80]}") |
|
|
|
|
|
|
|
|
print("\n=== Searching ALL known checksums in config ===") |
|
|
import json |
|
|
with open("temp/crypto_log.json") as f: |
|
|
log = json.load(f) |
|
|
sha256s = [op for op in log if op["op"] == "sha256"] |
|
|
|
|
|
|
|
|
checksums_found = 0 |
|
|
for s in sha256s: |
|
|
inp = bytes.fromhex(s["input"]) |
|
|
if len(inp) == 32: |
|
|
chk = inp[16:32] |
|
|
pos = config_data.find(chk) |
|
|
if pos >= 0: |
|
|
checksums_found += 1 |
|
|
if checksums_found <= 5: |
|
|
sizes = struct.unpack_from("<QQ", inp, 0) |
|
|
print(f" FOUND checksum at config offset {pos}: sizes={sizes}") |
|
|
pos2 = config_dec.find(chk) |
|
|
if pos2 >= 0 and pos2 < 8: |
|
|
pass |
|
|
|
|
|
print(f"Total checksums found in config: {checksums_found} / {len([s for s in sha256s if len(bytes.fromhex(s['input'])) == 32])}") |
|
|
|