"""Decrypt the config chunk from DX and analyze its protobuf structure. Config = first encrypted payload inside DX index. """ import struct import hashlib from Crypto.Cipher import AES MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4' IV = b"Copyright @ OneO" def aes_cfb128_decrypt(key: bytes, iv: bytes, data: bytes) -> bytes: cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128) return cipher.decrypt(data) def decode_varint(data: bytes, offset: int) -> tuple[int, int]: """Decode protobuf varint, return (value, new_offset).""" result = 0 shift = 0 while offset < len(data): b = data[offset] result |= (b & 0x7F) << shift offset += 1 if not (b & 0x80): break shift += 7 return result, offset def decode_protobuf_fields(data: bytes, indent: int = 0, max_depth: int = 3, prefix: str = ""): """Recursively decode protobuf-like structure.""" off = 0 field_idx = 0 pad = " " * indent while off < len(data) and field_idx < 200: if off >= len(data): break tag_byte = data[off] field_num = tag_byte >> 3 wire_type = tag_byte & 0x07 if field_num == 0 or field_num > 30: break off += 1 if wire_type == 0: # varint val, off = decode_varint(data, off) print(f"{pad}field {field_num} (varint): {val}") elif wire_type == 2: # length-delimited length, off = decode_varint(data, off) if off + length > len(data): print(f"{pad}field {field_num} (bytes, len={length}): TRUNCATED at off={off}") break payload = data[off:off+length] # Try to decode as string try: s = payload.decode('utf-8') if all(c.isprintable() or c in '\n\r\t' for c in s): if len(s) > 100: print(f"{pad}field {field_num} (string, len={length}): {s[:100]}...") else: print(f"{pad}field {field_num} (string, len={length}): {s}") else: raise ValueError() except (UnicodeDecodeError, ValueError): if indent < max_depth and length > 2 and length < 100000: # Try parsing as sub-message print(f"{pad}field {field_num} (msg, len={length}):") decode_protobuf_fields(payload, indent + 1, max_depth, prefix=f"{prefix}f{field_num}.") else: print(f"{pad}field {field_num} (bytes, len={length}): {payload[:32].hex()}...") off += length elif wire_type == 5: # 32-bit if off + 4 > len(data): break val = struct.unpack_from(" len(data): break val = struct.unpack_from("= len(config_data): break tag_byte = config_data[off] field_num = tag_byte >> 3 wire_type = tag_byte & 0x07 if field_num == 0 or field_num > 30: break off += 1 if wire_type == 0: val, off = decode_varint(config_data, off) config_fields.append({"fn": field_num, "wt": wire_type, "val": val, "off": off}) elif wire_type == 2: length, off = decode_varint(config_data, off) if off + length > len(config_data): break payload = config_data[off:off+length] # Try string try: s = payload.decode('ascii') readable = all(c.isprintable() or c in '\n\r\t' for c in s) except: readable = False if readable and len(payload) < 200: print(f" field {field_num} (string, len={length}, off={off}): {payload[:80]}") else: # check first bytes for sub-message identification fbytes = payload[:16].hex() print(f" field {field_num} (msg/bytes, len={length}, off={off}): {fbytes}...") config_fields.append({"fn": field_num, "wt": wire_type, "len": length, "off": off, "data": payload}) off += length elif wire_type == 5: if off + 4 > len(config_data): break val = struct.unpack_from(" len(config_data): break val = struct.unpack_from("> 3 wt = tag & 7 if fn == 0 or fn > 20: break sub_off += 1 if wt == 0: val, sub_off = decode_varint(data, sub_off) if fn == 2: model_type = val elif wt == 2: ln, sub_off = decode_varint(data, sub_off) if sub_off + ln > len(data): break p = data[sub_off:sub_off+ln] if fn == 1: try: name = p.decode('ascii') except: name = p.hex() elif fn == 3: try: onnx_path = p.decode('ascii', errors='replace') except: onnx_path = p.hex() sub_off += ln elif wt == 5: sub_off += 4 elif wt == 1: sub_off += 8 else: break print(f" [{i:02d}] name={name:20s} type={model_type}") if onnx_path: print(f" path={onnx_path[:80]}") # Now look for checksums in the ENTIRE config (not just protobuf) print("\n=== Searching ALL known checksums in config ===") import json with open("temp/crypto_log.json") as f: log = json.load(f) sha256s = [op for op in log if op["op"] == "sha256"] # Get all unique checksums from 32-byte SHA256 inputs checksums_found = 0 for s in sha256s: inp = bytes.fromhex(s["input"]) if len(inp) == 32: chk = inp[16:32] # last 16 bytes = checksum pos = config_data.find(chk) if pos >= 0: checksums_found += 1 if checksums_found <= 5: sizes = struct.unpack_from("= 0 and pos2 < 8: pass # In container header print(f"Total checksums found in config: {checksums_found} / {len([s for s in sha256s if len(bytes.fromhex(s['input'])) == 32])}")