#!/usr/bin/env python3 """ OneOCR Extraction Pipeline — Complete end-to-end extraction tool. ================================================================= Single script that performs the entire extraction process: 1. Decrypt .onemodel container (AES-256-CFB128) 2. Extract 34 ONNX models + config data 3. Unlock models 11-33 (replace OneOCRFeatureExtract custom op) 4. Verify all models load in onnxruntime Usage: python tools/extract_pipeline.py # defaults python tools/extract_pipeline.py path/to/oneocr.onemodel # custom input python tools/extract_pipeline.py --verify-only # just verify Requirements: pip install pycryptodome onnx onnxruntime numpy Output structure: oneocr_extracted/ ├── onnx_models/ # 34 raw ONNX models (11-33 have custom ops) ├── onnx_models_unlocked/ # 23 unlocked models (11-33, standard ops) └── config_data/ # char maps, rnn_info, manifest, configs """ from __future__ import annotations import argparse import copy import hashlib import struct import sys import time from pathlib import Path import numpy as np try: from Crypto.Cipher import AES except ImportError: print("ERROR: pycryptodome is required.") print(" pip install pycryptodome") sys.exit(1) try: import onnx from onnx import helper, numpy_helper except ImportError: print("ERROR: onnx is required.") print(" pip install onnx") sys.exit(1) try: import onnxruntime as ort except ImportError: ort = None print("WARNING: onnxruntime not installed — will skip runtime verification.") # ═══════════════════════════════════════════════════════════════════════════════ # CONSTANTS # ═══════════════════════════════════════════════════════════════════════════════ MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4' IV = b"Copyright @ OneO" CONTAINER_MAGIC = bytes.fromhex("4a1a082b25000000") # ═══════════════════════════════════════════════════════════════════════════════ # STEP 1: DECRYPTION # ═══════════════════════════════════════════════════════════════════════════════ def aes_cfb128_decrypt(key: bytes, data: bytes) -> bytes: """Decrypt with AES-256-CFB128.""" return AES.new(key, AES.MODE_CFB, iv=IV, segment_size=128).decrypt(data) def derive_key(sha_input: bytes) -> bytes: """SHA256 key derivation.""" return hashlib.sha256(sha_input).digest() def read_varint(data: bytes, pos: int) -> tuple[int, int]: """Read protobuf varint.""" val = shift = 0 while pos < len(data): b = data[pos]; pos += 1 val |= (b & 0x7F) << shift if not (b & 0x80): break shift += 7 return val, pos def measure_protobuf(data: bytes) -> int: """Measure valid ONNX ModelProto protobuf length.""" VALID = {1, 2, 3, 4, 5, 6, 7, 8, 9, 14, 20} pos = 0 while pos < len(data): start = pos tag, pos = read_varint(data, pos) if pos > len(data): return start field, wire = tag >> 3, tag & 7 if field not in VALID: return start if wire == 0: _, pos = read_varint(data, pos) elif wire == 1: pos += 8 elif wire == 2: l, pos = read_varint(data, pos); pos += l elif wire == 5: pos += 4 else: return start if pos > len(data): return start return pos class OneModelFile: """Parser for .onemodel encrypted containers.""" def __init__(self, filepath: str | Path): self.filepath = Path(filepath) self.data = self.filepath.read_bytes() self.H = struct.unpack_from(" bytes: key = derive_key(MASTER_KEY + self.file_hash) return aes_cfb128_decrypt(key, self.data[self.dx_offset:self.dx_offset + self.dx_size]) def decrypt_config(self, dx: bytes) -> bytes: key = derive_key(dx[48:64] + dx[32:48]) s1 = struct.unpack_from(" len(self.data): break enc_size = s1 + 8 data_off = off + 32 if data_off + enc_size > len(self.data): break key = derive_key(self.data[off + 16:off + 32] + checksum) dec = aes_cfb128_decrypt(key, self.data[data_off:data_off + enc_size]) if dec[:8] == CONTAINER_MAGIC: yield idx, dec[8:] else: print(f" WARNING: chunk#{idx} magic mismatch — skipping") off = data_off + enc_size idx += 1 def classify_chunk(payload: bytes) -> str: """Classify decrypted chunk type.""" if len(payload) > 100 and payload[0] == 0x08 and payload[1] in (0x06, 0x07): return "onnx" try: sample = payload[:100].decode("ascii") if all(c.isprintable() or c in "\n\r\t" for c in sample): if "" in sample: return "rnn_info" if sample.startswith("! ") or sample.startswith('" '): return "char2ind" if any(c.isdigit() for c in sample[:20]) else "char2inschar" if sample.startswith("0."): return "score_calibration" if "text_script" in sample: return "ocr_config" if "//" in sample[:5]: return "composite_chars" return "text_data" except (UnicodeDecodeError, ValueError): pass return "binary_data" def decrypt_and_extract(input_file: Path, output_dir: Path) -> dict: """Step 1: Decrypt .onemodel and extract all chunks. Returns dict with 'onnx_models' and 'config_files' lists. """ print("=" * 70) print(" STEP 1: DECRYPT & EXTRACT") print("=" * 70) model_file = OneModelFile(input_file) print(f" Input: {input_file} ({len(model_file.data):,} bytes)") print(f" Output: {output_dir}") # Decrypt DX index dx = model_file.decrypt_dx() assert dx[:2] == b"DX", "DX magic mismatch!" print(f" DX index decrypted ({len(dx):,} bytes)") # Decrypt manifest config config_dec = model_file.decrypt_config(dx) assert config_dec[:8] == CONTAINER_MAGIC config_payload = config_dec[8:] # Prepare output onnx_dir = output_dir / "onnx_models" config_dir = output_dir / "config_data" onnx_dir.mkdir(parents=True, exist_ok=True) config_dir.mkdir(parents=True, exist_ok=True) # Save manifest manifest_path = config_dir / "manifest.bin" manifest_path.write_bytes(config_payload) print(f" Manifest: {len(config_payload):,} bytes") # Extract chunks onnx_models = [] config_files = [manifest_path] EXT_MAP = { "rnn_info": ".rnn_info", "char2ind": ".char2ind.txt", "char2inschar": ".char2inschar.txt", "score_calibration": ".calibration.txt", "ocr_config": ".config.txt", "composite_chars": ".composite.txt", "text_data": ".txt", "binary_data": ".bin", } print(f"\n {'#':>4} {'Type':18s} {'Size':>12} {'Filename'}") print(f" {'-'*66}") for idx, payload in model_file.iter_chunks(): chunk_type = classify_chunk(payload) if chunk_type == "onnx": exact_size = measure_protobuf(payload) onnx_data = payload[:exact_size] info = _get_onnx_info(onnx_data) ir = info.get("ir_version", "?") prod = info.get("producer_version", "unknown") size_kb = len(onnx_data) // 1024 onnx_idx = len(onnx_models) fname = f"model_{onnx_idx:02d}_ir{ir}_{prod}_{size_kb}KB.onnx" (onnx_dir / fname).write_bytes(onnx_data) onnx_models.append(onnx_dir / fname) print(f" {idx:4d} {'ONNX':18s} {len(onnx_data):12,} {fname}") else: ext = EXT_MAP.get(chunk_type, ".bin") fname = f"chunk_{idx:02d}_{chunk_type}{ext}" (config_dir / fname).write_bytes(payload) config_files.append(config_dir / fname) print(f" {idx:4d} {chunk_type:18s} {len(payload):12,} {fname}") print(f"\n Extracted: {len(onnx_models)} ONNX models, {len(config_files)} config files") return {"onnx_models": onnx_models, "config_files": config_files} def _get_onnx_info(data: bytes) -> dict: """Extract basic ONNX info from protobuf header.""" info = {}; pos = 0 while pos < min(len(data), 500): tag, pos = read_varint(data, pos) field, wire = tag >> 3, tag & 7 if wire == 0: val, pos = read_varint(data, pos) if field == 1: info["ir_version"] = val elif wire == 2: l, pos = read_varint(data, pos) raw = data[pos:pos + l]; pos += l try: if field == 4: info["producer_version"] = raw.decode() except: pass elif wire == 5: pos += 4 elif wire == 1: pos += 8 else: break if "ir_version" in info and "producer_version" in info: break return info # ═══════════════════════════════════════════════════════════════════════════════ # STEP 2: UNLOCK MODELS # ═══════════════════════════════════════════════════════════════════════════════ def _extract_fe_weights(model) -> tuple[np.ndarray, np.ndarray, int, int]: """Extract W, b from OneOCRFeatureExtract config blob.""" config_blob = None for init in model.graph.initializer: if init.name == "feature/config": config_blob = bytes(init.string_data[0] if init.string_data else init.raw_data) break if config_blob is None: raise ValueError("No feature/config initializer") be_arr = np.frombuffer(config_blob, dtype='>f4').copy() # Find dimensions from metadata or graph fe_node = next((n for n in model.graph.node if n.op_type == "OneOCRFeatureExtract"), None) if fe_node is None: raise ValueError("No OneOCRFeatureExtract node") in_dim = out_dim = None for i in range(len(be_arr) - 10, len(be_arr)): val = be_arr[i] if val == 21.0 and i + 1 < len(be_arr) and be_arr[i + 1] in [50.0, 51.0]: in_dim, out_dim = 21, int(be_arr[i + 1]) break if in_dim is None: for gi in model.graph.input: if gi.name == "data": shape = [d.dim_value for d in gi.type.tensor_type.shape.dim] if len(shape) >= 2 and shape[1] > 0: in_dim = shape[1] break if out_dim is None: fe_out = fe_node.output[0] for node in model.graph.node: if node.op_type == "Gemm" and fe_out in node.input: wn = node.input[1] for init in model.graph.initializer: if init.name == wn: W = numpy_helper.to_array(init) out_dim = W.shape[0] if len(W.shape) == 2 else W.shape[1] break if in_dim is None or out_dim is None: raise ValueError(f"Cannot determine dims: in={in_dim}, out={out_dim}") W = be_arr[:in_dim * out_dim].reshape(in_dim, out_dim).astype(np.float32) b = be_arr[in_dim * out_dim:in_dim * out_dim + out_dim].astype(np.float32) return W, b, in_dim, out_dim def unlock_gemm_model(model_path: Path, output_dir: Path) -> Path | None: """Unlock models 11-32: OneOCRFeatureExtract → Gemm.""" model = onnx.load(str(model_path)) if not any(n.op_type == "OneOCRFeatureExtract" for n in model.graph.node): return None W, b, in_dim, out_dim = _extract_fe_weights(model) new_model = copy.deepcopy(model) # Replace initializers new_inits = [i for i in new_model.graph.initializer if i.name != "feature/config"] new_inits.append(numpy_helper.from_array(W.T, name="fe_weight")) new_inits.append(numpy_helper.from_array(b, name="fe_bias")) del new_model.graph.initializer[:] new_model.graph.initializer.extend(new_inits) # Replace node fe_node = next(n for n in new_model.graph.node if n.op_type == "OneOCRFeatureExtract") fe_in, fe_out = fe_node.input[0], fe_node.output[0] new_nodes = [] for node in new_model.graph.node: if node.op_type == "OneOCRFeatureExtract": new_nodes.append(helper.make_node("Gemm", [fe_in, "fe_weight", "fe_bias"], [fe_out], alpha=1.0, beta=1.0, transB=1)) else: new_nodes.append(node) del new_model.graph.node[:] new_model.graph.node.extend(new_nodes) # Cleanup del new_model.graph.input[:] new_model.graph.input.extend([i for i in model.graph.input if i.name != "feature/config"]) new_opsets = [op for op in new_model.opset_import if op.domain != "com.microsoft.oneocr"] del new_model.opset_import[:] new_model.opset_import.extend(new_opsets) out_path = output_dir / (model_path.stem + "_unlocked.onnx") onnx.save(new_model, str(out_path)) return out_path def unlock_conv_model(model_path: Path, output_dir: Path) -> Path | None: """Unlock model 33 (LineLayout): OneOCRFeatureExtract → Conv1x1.""" model = onnx.load(str(model_path)) if not any(n.op_type == "OneOCRFeatureExtract" for n in model.graph.node): return None # Model 33: in_ch=256, out_ch=16 config_blob = None for init in model.graph.initializer: if init.name == "feature/config": config_blob = bytes(init.string_data[0] if init.string_data else init.raw_data) break if config_blob is None: return None be_arr = np.frombuffer(config_blob, dtype='>f4').copy() in_ch, out_ch = 256, 16 W = be_arr[:in_ch * out_ch].reshape(in_ch, out_ch).T.reshape(out_ch, in_ch, 1, 1).astype(np.float32) b = be_arr[in_ch * out_ch:in_ch * out_ch + out_ch].astype(np.float32) new_model = copy.deepcopy(model) new_inits = [i for i in new_model.graph.initializer if i.name != "feature/config"] new_inits.append(numpy_helper.from_array(W, name="fe_conv_weight")) new_inits.append(numpy_helper.from_array(b, name="fe_conv_bias")) del new_model.graph.initializer[:] new_model.graph.initializer.extend(new_inits) fe_node = next(n for n in new_model.graph.node if n.op_type == "OneOCRFeatureExtract") fe_in, fe_out = fe_node.input[0], fe_node.output[0] new_nodes = [] for node in new_model.graph.node: if node.op_type == "OneOCRFeatureExtract": new_nodes.append(helper.make_node("Conv", [fe_in, "fe_conv_weight", "fe_conv_bias"], [fe_out], kernel_shape=[1, 1], strides=[1, 1], pads=[0, 0, 0, 0])) else: new_nodes.append(node) del new_model.graph.node[:] new_model.graph.node.extend(new_nodes) del new_model.graph.input[:] new_model.graph.input.extend([i for i in model.graph.input if i.name != "feature/config"]) new_opsets = [op for op in new_model.opset_import if op.domain != "com.microsoft.oneocr"] del new_model.opset_import[:] new_model.opset_import.extend(new_opsets) out_path = output_dir / (model_path.stem + "_unlocked.onnx") onnx.save(new_model, str(out_path)) return out_path def unlock_all_models(onnx_dir: Path, output_dir: Path) -> dict: """Step 2: Unlock models 11-33 (replace custom ops). Returns dict with 'unlocked', 'skipped', 'failed' lists. """ print("\n" + "=" * 70) print(" STEP 2: UNLOCK MODELS (replace OneOCRFeatureExtract)") print("=" * 70) output_dir.mkdir(parents=True, exist_ok=True) results = {"unlocked": [], "skipped": [], "failed": []} for idx in range(11, 34): matches = list(onnx_dir.glob(f"model_{idx:02d}_*")) if not matches: print(f" model_{idx:02d}: NOT FOUND") results["failed"].append(idx) continue model_path = matches[0] try: if idx == 33: out = unlock_conv_model(model_path, output_dir) else: out = unlock_gemm_model(model_path, output_dir) if out is None: results["skipped"].append(idx) print(f" model_{idx:02d}: skipped (no custom op)") else: results["unlocked"].append(idx) print(f" model_{idx:02d}: ✓ unlocked → {out.name}") except Exception as e: results["failed"].append(idx) print(f" model_{idx:02d}: ✗ FAILED — {e}") n = len(results["unlocked"]) print(f"\n Unlocked: {n}/23 models") return results # ═══════════════════════════════════════════════════════════════════════════════ # STEP 3: VERIFY # ═══════════════════════════════════════════════════════════════════════════════ def verify_models(onnx_dir: Path, unlocked_dir: Path) -> dict: """Step 3: Verify all models load in onnxruntime. Returns dict with verification results. """ print("\n" + "=" * 70) print(" STEP 3: VERIFY (onnxruntime inference test)") print("=" * 70) if ort is None: print(" ⚠ onnxruntime not installed — skipping verification") return {"status": "skipped"} results = {"ok": [], "custom_op": [], "failed": []} # Verify core models (0-10) print("\n Core models (0-10):") for idx in range(11): matches = list(onnx_dir.glob(f"model_{idx:02d}_*")) if not matches: continue try: sess = ort.InferenceSession(str(matches[0]), providers=["CPUExecutionProvider"]) inputs = sess.get_inputs() shapes = {i.name: i.shape for i in inputs} results["ok"].append(idx) print(f" model_{idx:02d}: ✓ inputs={shapes}") except Exception as e: err = str(e)[:60] if "custom ops" in err.lower() or "oneocr" in err.lower(): results["custom_op"].append(idx) print(f" model_{idx:02d}: ⚠ custom_op ({err})") else: results["failed"].append(idx) print(f" model_{idx:02d}: ✗ {err}") # Verify unlocked models (11-33) print("\n Unlocked models (11-33):") for idx in range(11, 34): matches = list(unlocked_dir.glob(f"model_{idx:02d}_*")) if not matches: continue try: sess = ort.InferenceSession(str(matches[0]), providers=["CPUExecutionProvider"]) # Quick zero-input test feeds = {} for inp in sess.get_inputs(): shape = [d if isinstance(d, int) and d > 0 else 1 for d in inp.shape] feeds[inp.name] = np.zeros(shape, dtype=np.float32) out = sess.run(None, feeds) results["ok"].append(idx) print(f" model_{idx:02d}: ✓ output_shapes={[o.shape for o in out]}") except Exception as e: results["failed"].append(idx) print(f" model_{idx:02d}: ✗ {str(e)[:60]}") ok = len(results["ok"]) total = ok + len(results["custom_op"]) + len(results["failed"]) print(f"\n Verification: {ok}/{total} models OK") return results # ═══════════════════════════════════════════════════════════════════════════════ # MAIN # ═══════════════════════════════════════════════════════════════════════════════ def main(): parser = argparse.ArgumentParser( description="OneOCR extraction pipeline: decrypt → extract → unlock → verify") parser.add_argument("input", nargs="?", default="ocr_data/oneocr.onemodel", help="Path to .onemodel file (default: ocr_data/oneocr.onemodel)") parser.add_argument("--output", "-o", default="oneocr_extracted", help="Output directory (default: oneocr_extracted)") parser.add_argument("--verify-only", action="store_true", help="Only verify existing extracted models") parser.add_argument("--skip-unlock", action="store_true", help="Skip model unlocking step") parser.add_argument("--skip-verify", action="store_true", help="Skip verification step") args = parser.parse_args() input_file = Path(args.input) output_dir = Path(args.output) onnx_dir = output_dir / "onnx_models" unlocked_dir = output_dir / "onnx_models_unlocked" print() print("╔══════════════════════════════════════════════════════════════════════╗") print("║ OneOCR Extraction Pipeline ║") print("║ Decrypt → Extract → Unlock → Verify ║") print("╚══════════════════════════════════════════════════════════════════════╝") t_start = time.perf_counter() if args.verify_only: verify_models(onnx_dir, unlocked_dir) else: # Step 1: Decrypt & Extract if not input_file.exists(): print(f"\n ERROR: Input file not found: {input_file}") print(f" Place oneocr.onemodel in ocr_data/ directory") sys.exit(1) extract_result = decrypt_and_extract(input_file, output_dir) # Step 2: Unlock if not args.skip_unlock: unlock_result = unlock_all_models(onnx_dir, unlocked_dir) else: print("\n (Skipping unlock step)") # Step 3: Verify if not args.skip_verify: verify_result = verify_models(onnx_dir, unlocked_dir) else: print("\n (Skipping verification)") elapsed = time.perf_counter() - t_start print(f"\n{'=' * 70}") print(f" DONE in {elapsed:.1f}s") print(f" Models: {onnx_dir}") print(f" Unlocked: {unlocked_dir}") print(f" Config: {output_dir / 'config_data'}") print(f"{'=' * 70}") if __name__ == "__main__": main()