# oneocr/tools/extract_pipeline.py
# OneOCR Dev — reverse engineering complete, ONNX pipeline 53% match rate
# (commit ce847d4)
#!/usr/bin/env python3
"""
OneOCR Extraction Pipeline — Complete end-to-end extraction tool.
=================================================================
Single script that performs the entire extraction process:
1. Decrypt .onemodel container (AES-256-CFB128)
2. Extract 34 ONNX models + config data
3. Unlock models 11-33 (replace OneOCRFeatureExtract custom op)
4. Verify all models load in onnxruntime
Usage:
python tools/extract_pipeline.py # defaults
python tools/extract_pipeline.py path/to/oneocr.onemodel # custom input
python tools/extract_pipeline.py --verify-only # just verify
Requirements:
pip install pycryptodome onnx onnxruntime numpy
Output structure:
oneocr_extracted/
├── onnx_models/ # 34 raw ONNX models (11-33 have custom ops)
├── onnx_models_unlocked/ # 23 unlocked models (11-33, standard ops)
└── config_data/ # char maps, rnn_info, manifest, configs
"""
from __future__ import annotations
import argparse
import copy
import hashlib
import struct
import sys
import time
from pathlib import Path
import numpy as np
try:
from Crypto.Cipher import AES
except ImportError:
print("ERROR: pycryptodome is required.")
print(" pip install pycryptodome")
sys.exit(1)
try:
import onnx
from onnx import helper, numpy_helper
except ImportError:
print("ERROR: onnx is required.")
print(" pip install onnx")
sys.exit(1)
try:
import onnxruntime as ort
except ImportError:
ort = None
print("WARNING: onnxruntime not installed — will skip runtime verification.")
# ═══════════════════════════════════════════════════════════════════════════════
# CONSTANTS
# ═══════════════════════════════════════════════════════════════════════════════
# Hard-coded AES-256 master key material; combined with the per-file hash in
# derive_key() to produce the DX-index decryption key.
MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# Fixed 16-byte IV used for every AES-CFB128 operation in the container.
IV = b"Copyright @ OneO"
# 8-byte magic that prefixes every correctly decrypted chunk payload.
CONTAINER_MAGIC = bytes.fromhex("4a1a082b25000000")
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 1: DECRYPTION
# ═══════════════════════════════════════════════════════════════════════════════
def aes_cfb128_decrypt(key: bytes, data: bytes) -> bytes:
    """Decrypt *data* with AES in CFB mode (128-bit segments) and the fixed IV."""
    cipher = AES.new(key, AES.MODE_CFB, iv=IV, segment_size=128)
    return cipher.decrypt(data)
def derive_key(sha_input: bytes) -> bytes:
    """Derive a 32-byte AES key as SHA-256 of *sha_input*."""
    digest = hashlib.sha256()
    digest.update(sha_input)
    return digest.digest()
def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode a protobuf base-128 varint at *pos*; return (value, next_pos)."""
    result = 0
    shift = 0
    while pos < len(data):
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if byte < 0x80:  # continuation bit clear → last byte
            break
        shift += 7
    return result, pos
def measure_protobuf(data: bytes) -> int:
    """Return the byte length of the valid ONNX ModelProto prefix of *data*.

    Walks top-level protobuf records and stops at the first record whose
    field number is not a known ModelProto field, whose wire type is
    unsupported, or that would run past the end of the buffer.
    """
    KNOWN_FIELDS = {1, 2, 3, 4, 5, 6, 7, 8, 9, 14, 20}
    pos = 0
    while pos < len(data):
        record_start = pos
        tag, pos = read_varint(data, pos)
        if pos > len(data):
            return record_start
        field_no = tag >> 3
        wire_type = tag & 7
        if field_no not in KNOWN_FIELDS:
            return record_start
        if wire_type == 0:        # varint payload
            _, pos = read_varint(data, pos)
        elif wire_type == 1:      # fixed64
            pos += 8
        elif wire_type == 2:      # length-delimited
            length, pos = read_varint(data, pos)
            pos += length
        elif wire_type == 5:      # fixed32
            pos += 4
        else:                     # groups / unknown wire types end the scan
            return record_start
        if pos > len(data):
            return record_start
    return pos
class OneModelFile:
    """Parser for .onemodel encrypted containers.

    Observed layout (integers little-endian):
      bytes 0-7    H          — size field; DX index spans [24, 24 + H - 12)
      bytes 8-23   file_hash  — 16 bytes mixed into the DX decryption key
      bytes 24-..  DX index   — AES-CFB encrypted index blob
      bytes H+16.. payload    — sequence of encrypted chunks (iter_chunks)
    """
    def __init__(self, filepath: str | Path):
        self.filepath = Path(filepath)
        self.data = self.filepath.read_bytes()
        # First u64 fixes where the DX index ends and the payload begins.
        self.H = struct.unpack_from("<Q", self.data, 0)[0]
        self.file_hash = self.data[8:24]
        self.dx_offset = 24
        self.dx_size = self.H - 12
        self.payload_start = self.H + 16
    def decrypt_dx(self) -> bytes:
        # DX key = SHA256(master key || per-file hash).
        key = derive_key(MASTER_KEY + self.file_hash)
        return aes_cfb128_decrypt(key, self.data[self.dx_offset:self.dx_offset + self.dx_size])
    def decrypt_config(self, dx: bytes) -> bytes:
        # Manifest key material and ciphertext both live inside the DX blob:
        # key = SHA256(dx[48:64] || dx[32:48]); s1 = payload size (u64 at 48).
        key = derive_key(dx[48:64] + dx[32:48])
        s1 = struct.unpack_from("<Q", dx, 48)[0]
        return aes_cfb128_decrypt(key, dx[64:64 + s1 + 8])
    def iter_chunks(self):
        """Yield (index, decrypted_payload) for each payload chunk.

        Each chunk: 32-byte header (16-byte checksum + u64 sizes s1, s2)
        followed by s1 + 8 bytes of ciphertext. Iteration stops at the
        first header failing the s2 == s1 + 24 sanity check or running
        past the end of the file.
        """
        off = self.payload_start
        idx = 0
        while off + 32 <= len(self.data):
            checksum = self.data[off:off + 16]
            s1, s2 = struct.unpack_from("<QQ", self.data, off + 16)
            # A size mismatch means we are past the last real chunk.
            if s2 != s1 + 24 or s1 == 0 or s1 > len(self.data): break
            enc_size = s1 + 8  # data plus the 8-byte container magic
            data_off = off + 32
            if data_off + enc_size > len(self.data): break
            # Per-chunk key = SHA256(size fields || checksum).
            key = derive_key(self.data[off + 16:off + 32] + checksum)
            dec = aes_cfb128_decrypt(key, self.data[data_off:data_off + enc_size])
            if dec[:8] == CONTAINER_MAGIC:
                yield idx, dec[8:]
            else:
                print(f" WARNING: chunk#{idx} magic mismatch — skipping")
            off = data_off + enc_size
            idx += 1
def classify_chunk(payload: bytes) -> str:
    """Heuristically identify the content type of a decrypted chunk."""
    # ONNX ModelProto starts with field 1 (ir_version) varint: 0x08 0x06/0x07.
    if len(payload) > 100 and payload[0] == 0x08 and payload[1] in (0x06, 0x07):
        return "onnx"
    try:
        head = payload[:100].decode("ascii")
    except (UnicodeDecodeError, ValueError):
        return "binary_data"
    if not all(ch.isprintable() or ch in "\n\r\t" for ch in head):
        return "binary_data"
    if "<LogPrior>" in head:
        return "rnn_info"
    if head.startswith(("! ", '" ')):
        return "char2ind" if any(ch.isdigit() for ch in head[:20]) else "char2inschar"
    if head.startswith("0."):
        return "score_calibration"
    if "text_script" in head:
        return "ocr_config"
    if "//" in head[:5]:
        return "composite_chars"
    return "text_data"
def decrypt_and_extract(input_file: Path, output_dir: Path) -> dict:
    """Step 1: Decrypt .onemodel and extract all chunks.
    Returns dict with 'onnx_models' and 'config_files' lists (Path objects).
    Raises AssertionError if the DX index or manifest magic does not match.
    """
    print("=" * 70)
    print(" STEP 1: DECRYPT & EXTRACT")
    print("=" * 70)
    model_file = OneModelFile(input_file)
    print(f" Input: {input_file} ({len(model_file.data):,} bytes)")
    print(f" Output: {output_dir}")
    # Decrypt DX index
    dx = model_file.decrypt_dx()
    assert dx[:2] == b"DX", "DX magic mismatch!"
    print(f" DX index decrypted ({len(dx):,} bytes)")
    # Decrypt manifest config
    config_dec = model_file.decrypt_config(dx)
    assert config_dec[:8] == CONTAINER_MAGIC
    config_payload = config_dec[8:]  # strip the 8-byte container magic
    # Prepare output directories (idempotent)
    onnx_dir = output_dir / "onnx_models"
    config_dir = output_dir / "config_data"
    onnx_dir.mkdir(parents=True, exist_ok=True)
    config_dir.mkdir(parents=True, exist_ok=True)
    # Save manifest
    manifest_path = config_dir / "manifest.bin"
    manifest_path.write_bytes(config_payload)
    print(f" Manifest: {len(config_payload):,} bytes")
    # Extract chunks; non-ONNX chunk types map to descriptive file extensions.
    onnx_models = []
    config_files = [manifest_path]
    EXT_MAP = {
        "rnn_info": ".rnn_info", "char2ind": ".char2ind.txt",
        "char2inschar": ".char2inschar.txt", "score_calibration": ".calibration.txt",
        "ocr_config": ".config.txt", "composite_chars": ".composite.txt",
        "text_data": ".txt", "binary_data": ".bin",
    }
    print(f"\n {'#':>4} {'Type':18s} {'Size':>12} {'Filename'}")
    print(f" {'-'*66}")
    for idx, payload in model_file.iter_chunks():
        chunk_type = classify_chunk(payload)
        if chunk_type == "onnx":
            # Trim trailing padding: keep only the measured ModelProto bytes.
            exact_size = measure_protobuf(payload)
            onnx_data = payload[:exact_size]
            info = _get_onnx_info(onnx_data)
            ir = info.get("ir_version", "?")
            prod = info.get("producer_version", "unknown")
            size_kb = len(onnx_data) // 1024
            onnx_idx = len(onnx_models)  # sequential numbering of ONNX chunks only
            fname = f"model_{onnx_idx:02d}_ir{ir}_{prod}_{size_kb}KB.onnx"
            (onnx_dir / fname).write_bytes(onnx_data)
            onnx_models.append(onnx_dir / fname)
            print(f" {idx:4d} {'ONNX':18s} {len(onnx_data):12,} {fname}")
        else:
            # Config chunks keep the original chunk index in their filename.
            ext = EXT_MAP.get(chunk_type, ".bin")
            fname = f"chunk_{idx:02d}_{chunk_type}{ext}"
            (config_dir / fname).write_bytes(payload)
            config_files.append(config_dir / fname)
            print(f" {idx:4d} {chunk_type:18s} {len(payload):12,} {fname}")
    print(f"\n Extracted: {len(onnx_models)} ONNX models, {len(config_files)} config files")
    return {"onnx_models": onnx_models, "config_files": config_files}
def _get_onnx_info(data: bytes) -> dict:
    """Extract basic ONNX model info from the protobuf header.

    Scans at most the first 500 bytes of a ModelProto, reading only the
    top-level fields 1 (ir_version, varint) and 4 (producer_version,
    length-delimited string). Returns a dict containing whichever of the
    two keys were found; never raises.
    """
    info: dict = {}
    pos = 0
    while pos < min(len(data), 500):
        tag, pos = read_varint(data, pos)
        field, wire = tag >> 3, tag & 7
        if wire == 0:  # varint
            val, pos = read_varint(data, pos)
            if field == 1:
                info["ir_version"] = val
        elif wire == 2:  # length-delimited
            l, pos = read_varint(data, pos)
            raw = data[pos:pos + l]
            pos += l
            if field == 4:
                # Fix: was a bare `except: pass`, which also swallowed
                # KeyboardInterrupt/SystemExit; only decode errors are expected.
                try:
                    info["producer_version"] = raw.decode()
                except UnicodeDecodeError:
                    pass  # non-UTF-8 producer string — leave the key unset
        elif wire == 5:  # fixed32
            pos += 4
        elif wire == 1:  # fixed64
            pos += 8
        else:  # unknown wire type — stop scanning
            break
        if "ir_version" in info and "producer_version" in info:
            break  # both fields found; no need to scan further
    return info
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 2: UNLOCK MODELS
# ═══════════════════════════════════════════════════════════════════════════════
def _extract_fe_weights(model) -> tuple[np.ndarray, np.ndarray, int, int]:
    """Extract W, b from OneOCRFeatureExtract config blob.

    The "feature/config" initializer holds big-endian float32 data laid out
    as W (in_dim * out_dim values) followed by b (out_dim values). The
    dimensions are recovered by three heuristics, tried in order:
      1. a (21.0, 50.0|51.0) marker pair within the last 10 floats;
      2. in_dim from the static shape of the "data" graph input;
      3. out_dim from the weight shape of the Gemm consuming the custom
         op's output.
    Returns (W, b, in_dim, out_dim); raises ValueError if the blob, the
    custom-op node, or the dimensions cannot be determined.
    """
    config_blob = None
    for init in model.graph.initializer:
        if init.name == "feature/config":
            # Blob may arrive as string_data or raw_data depending on export.
            config_blob = bytes(init.string_data[0] if init.string_data else init.raw_data)
            break
    if config_blob is None:
        raise ValueError("No feature/config initializer")
    be_arr = np.frombuffer(config_blob, dtype='>f4').copy()  # big-endian floats
    # Find dimensions from metadata or graph
    fe_node = next((n for n in model.graph.node if n.op_type == "OneOCRFeatureExtract"), None)
    if fe_node is None:
        raise ValueError("No OneOCRFeatureExtract node")
    in_dim = out_dim = None
    # Heuristic 1: dimension marker pair near the end of the blob.
    for i in range(len(be_arr) - 10, len(be_arr)):
        val = be_arr[i]
        if val == 21.0 and i + 1 < len(be_arr) and be_arr[i + 1] in [50.0, 51.0]:
            in_dim, out_dim = 21, int(be_arr[i + 1])
            break
    if in_dim is None:
        # Heuristic 2: static second dimension of the "data" graph input.
        for gi in model.graph.input:
            if gi.name == "data":
                shape = [d.dim_value for d in gi.type.tensor_type.shape.dim]
                if len(shape) >= 2 and shape[1] > 0:
                    in_dim = shape[1]
                break
    if out_dim is None:
        # Heuristic 3: infer out_dim from the downstream Gemm's weight shape.
        fe_out = fe_node.output[0]
        for node in model.graph.node:
            if node.op_type == "Gemm" and fe_out in node.input:
                wn = node.input[1]
                for init in model.graph.initializer:
                    if init.name == wn:
                        W = numpy_helper.to_array(init)
                        out_dim = W.shape[0] if len(W.shape) == 2 else W.shape[1]
                        break
    if in_dim is None or out_dim is None:
        raise ValueError(f"Cannot determine dims: in={in_dim}, out={out_dim}")
    # Blob layout: W row-major (in_dim, out_dim), immediately followed by b.
    W = be_arr[:in_dim * out_dim].reshape(in_dim, out_dim).astype(np.float32)
    b = be_arr[in_dim * out_dim:in_dim * out_dim + out_dim].astype(np.float32)
    return W, b, in_dim, out_dim
def unlock_gemm_model(model_path: Path, output_dir: Path) -> Path | None:
    """Unlock models 11-32: OneOCRFeatureExtract → Gemm."""
    model = onnx.load(str(model_path))
    has_custom_op = any(n.op_type == "OneOCRFeatureExtract" for n in model.graph.node)
    if not has_custom_op:
        return None
    W, b, in_dim, out_dim = _extract_fe_weights(model)
    patched = copy.deepcopy(model)
    # Swap the encrypted config blob for plain weight/bias initializers.
    kept_inits = [init for init in patched.graph.initializer if init.name != "feature/config"]
    kept_inits.append(numpy_helper.from_array(W.T, name="fe_weight"))
    kept_inits.append(numpy_helper.from_array(b, name="fe_bias"))
    del patched.graph.initializer[:]
    patched.graph.initializer.extend(kept_inits)
    # Rewrite the node list: the custom op becomes an equivalent Gemm
    # (transB=1 because fe_weight stores W transposed).
    fe = next(n for n in patched.graph.node if n.op_type == "OneOCRFeatureExtract")
    src, dst = fe.input[0], fe.output[0]
    rewritten = []
    for node in patched.graph.node:
        if node.op_type != "OneOCRFeatureExtract":
            rewritten.append(node)
            continue
        gemm = helper.make_node("Gemm", [src, "fe_weight", "fe_bias"],
                                [dst], alpha=1.0, beta=1.0, transB=1)
        rewritten.append(gemm)
    del patched.graph.node[:]
    patched.graph.node.extend(rewritten)
    # Drop the feature/config graph input and the custom-op opset import.
    del patched.graph.input[:]
    patched.graph.input.extend([i for i in model.graph.input if i.name != "feature/config"])
    kept_opsets = [op for op in patched.opset_import if op.domain != "com.microsoft.oneocr"]
    del patched.opset_import[:]
    patched.opset_import.extend(kept_opsets)
    out_path = output_dir / (model_path.stem + "_unlocked.onnx")
    onnx.save(patched, str(out_path))
    return out_path
def unlock_conv_model(model_path: Path, output_dir: Path) -> Path | None:
    """Unlock model 33 (LineLayout): OneOCRFeatureExtract → Conv1x1."""
    model = onnx.load(str(model_path))
    if not any(n.op_type == "OneOCRFeatureExtract" for n in model.graph.node):
        return None
    # Locate the encrypted weight blob; model 33 is fixed at 256 -> 16 channels.
    blob = None
    for init in model.graph.initializer:
        if init.name == "feature/config":
            blob = bytes(init.string_data[0] if init.string_data else init.raw_data)
            break
    if blob is None:
        return None
    floats = np.frombuffer(blob, dtype='>f4').copy()
    in_ch, out_ch = 256, 16
    n_weights = in_ch * out_ch
    # Flat (in_ch, out_ch) matrix becomes a Conv weight of shape (out, in, 1, 1).
    conv_w = floats[:n_weights].reshape(in_ch, out_ch).T.reshape(out_ch, in_ch, 1, 1).astype(np.float32)
    conv_b = floats[n_weights:n_weights + out_ch].astype(np.float32)
    patched = copy.deepcopy(model)
    kept_inits = [i for i in patched.graph.initializer if i.name != "feature/config"]
    kept_inits.append(numpy_helper.from_array(conv_w, name="fe_conv_weight"))
    kept_inits.append(numpy_helper.from_array(conv_b, name="fe_conv_bias"))
    del patched.graph.initializer[:]
    patched.graph.initializer.extend(kept_inits)
    # Replace the custom op with a pointwise Conv wired to the same tensors.
    fe = next(n for n in patched.graph.node if n.op_type == "OneOCRFeatureExtract")
    src, dst = fe.input[0], fe.output[0]
    rewritten = []
    for node in patched.graph.node:
        if node.op_type != "OneOCRFeatureExtract":
            rewritten.append(node)
            continue
        conv = helper.make_node("Conv", [src, "fe_conv_weight", "fe_conv_bias"],
                                [dst], kernel_shape=[1, 1], strides=[1, 1],
                                pads=[0, 0, 0, 0])
        rewritten.append(conv)
    del patched.graph.node[:]
    patched.graph.node.extend(rewritten)
    del patched.graph.input[:]
    patched.graph.input.extend([i for i in model.graph.input if i.name != "feature/config"])
    kept_opsets = [op for op in patched.opset_import if op.domain != "com.microsoft.oneocr"]
    del patched.opset_import[:]
    patched.opset_import.extend(kept_opsets)
    out_path = output_dir / (model_path.stem + "_unlocked.onnx")
    onnx.save(patched, str(out_path))
    return out_path
def unlock_all_models(onnx_dir: Path, output_dir: Path) -> dict:
    """Step 2: Unlock models 11-33 (replace custom ops).
    Returns dict with 'unlocked', 'skipped', 'failed' lists.
    """
    print("\n" + "=" * 70)
    print(" STEP 2: UNLOCK MODELS (replace OneOCRFeatureExtract)")
    print("=" * 70)
    output_dir.mkdir(parents=True, exist_ok=True)
    results: dict = {"unlocked": [], "skipped": [], "failed": []}
    for idx in range(11, 34):
        candidates = list(onnx_dir.glob(f"model_{idx:02d}_*"))
        if not candidates:
            print(f" model_{idx:02d}: NOT FOUND")
            results["failed"].append(idx)
            continue
        # Model 33 needs the Conv variant; all others use the Gemm variant.
        unlock = unlock_conv_model if idx == 33 else unlock_gemm_model
        try:
            out = unlock(candidates[0], output_dir)
        except Exception as e:
            results["failed"].append(idx)
            print(f" model_{idx:02d}: ✗ FAILED — {e}")
            continue
        if out is None:
            results["skipped"].append(idx)
            print(f" model_{idx:02d}: skipped (no custom op)")
        else:
            results["unlocked"].append(idx)
            print(f" model_{idx:02d}: ✓ unlocked → {out.name}")
    print(f"\n Unlocked: {len(results['unlocked'])}/23 models")
    return results
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 3: VERIFY
# ═══════════════════════════════════════════════════════════════════════════════
def verify_models(onnx_dir: Path, unlocked_dir: Path) -> dict:
    """Step 3: Verify all models load in onnxruntime.
    Returns dict with verification results.
    """
    print("\n" + "=" * 70)
    print(" STEP 3: VERIFY (onnxruntime inference test)")
    print("=" * 70)
    if ort is None:
        print(" ⚠ onnxruntime not installed — skipping verification")
        return {"status": "skipped"}
    results: dict = {"ok": [], "custom_op": [], "failed": []}
    # Core models (0-10): load-only check.
    print("\n Core models (0-10):")
    for idx in range(11):
        found = list(onnx_dir.glob(f"model_{idx:02d}_*"))
        if not found:
            continue
        try:
            sess = ort.InferenceSession(str(found[0]),
                                        providers=["CPUExecutionProvider"])
            shapes = {i.name: i.shape for i in sess.get_inputs()}
            results["ok"].append(idx)
            print(f" model_{idx:02d}: ✓ inputs={shapes}")
        except Exception as e:
            err = str(e)[:60]
            # Separate "unknown custom op" failures from genuine load errors.
            if "custom ops" in err.lower() or "oneocr" in err.lower():
                results["custom_op"].append(idx)
                print(f" model_{idx:02d}: ⚠ custom_op ({err})")
            else:
                results["failed"].append(idx)
                print(f" model_{idx:02d}: ✗ {err}")
    # Unlocked models (11-33): also run one inference pass on zeros.
    print("\n Unlocked models (11-33):")
    for idx in range(11, 34):
        found = list(unlocked_dir.glob(f"model_{idx:02d}_*"))
        if not found:
            continue
        try:
            sess = ort.InferenceSession(str(found[0]),
                                        providers=["CPUExecutionProvider"])
            feeds = {}
            for inp in sess.get_inputs():
                # Substitute 1 for any dynamic/symbolic dimension.
                dims = [d if isinstance(d, int) and d > 0 else 1 for d in inp.shape]
                feeds[inp.name] = np.zeros(dims, dtype=np.float32)
            out = sess.run(None, feeds)
            results["ok"].append(idx)
            print(f" model_{idx:02d}: ✓ output_shapes={[o.shape for o in out]}")
        except Exception as e:
            results["failed"].append(idx)
            print(f" model_{idx:02d}: ✗ {str(e)[:60]}")
    ok = len(results["ok"])
    total = ok + len(results["custom_op"]) + len(results["failed"])
    print(f"\n Verification: {ok}/{total} models OK")
    return results
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════════════════════
def main():
    """CLI entry point: decrypt → extract → unlock → verify.

    Exits with status 1 if the input .onemodel file is missing.
    (Fix: dropped unused `extract_result`/`unlock_result`/`verify_result`
    bindings and f-prefixes on format-free strings.)
    """
    parser = argparse.ArgumentParser(
        description="OneOCR extraction pipeline: decrypt → extract → unlock → verify")
    parser.add_argument("input", nargs="?", default="ocr_data/oneocr.onemodel",
                        help="Path to .onemodel file (default: ocr_data/oneocr.onemodel)")
    parser.add_argument("--output", "-o", default="oneocr_extracted",
                        help="Output directory (default: oneocr_extracted)")
    parser.add_argument("--verify-only", action="store_true",
                        help="Only verify existing extracted models")
    parser.add_argument("--skip-unlock", action="store_true",
                        help="Skip model unlocking step")
    parser.add_argument("--skip-verify", action="store_true",
                        help="Skip verification step")
    args = parser.parse_args()
    input_file = Path(args.input)
    output_dir = Path(args.output)
    onnx_dir = output_dir / "onnx_models"
    unlocked_dir = output_dir / "onnx_models_unlocked"
    print()
    print("╔══════════════════════════════════════════════════════════════════════╗")
    print("║ OneOCR Extraction Pipeline ║")
    print("║ Decrypt → Extract → Unlock → Verify ║")
    print("╚══════════════════════════════════════════════════════════════════════╝")
    t_start = time.perf_counter()
    if args.verify_only:
        verify_models(onnx_dir, unlocked_dir)
    else:
        # Step 1: Decrypt & Extract
        if not input_file.exists():
            print(f"\n ERROR: Input file not found: {input_file}")
            print(" Place oneocr.onemodel in ocr_data/ directory")
            sys.exit(1)
        decrypt_and_extract(input_file, output_dir)
        # Step 2: Unlock
        if not args.skip_unlock:
            unlock_all_models(onnx_dir, unlocked_dir)
        else:
            print("\n (Skipping unlock step)")
        # Step 3: Verify
        if not args.skip_verify:
            verify_models(onnx_dir, unlocked_dir)
        else:
            print("\n (Skipping verification)")
    elapsed = time.perf_counter() - t_start
    print(f"\n{'=' * 70}")
    print(f" DONE in {elapsed:.1f}s")
    print(f" Models: {onnx_dir}")
    print(f" Unlocked: {unlocked_dir}")
    print(f" Config: {output_dir / 'config_data'}")
    print(f"{'=' * 70}")
if __name__ == "__main__":
    main()