|
|
|
|
|
""" |
|
|
OneOCR Extraction Pipeline — Complete end-to-end extraction tool. |
|
|
================================================================= |
|
|
|
|
|
Single script that performs the entire extraction process: |
|
|
1. Decrypt .onemodel container (AES-256-CFB128) |
|
|
2. Extract 34 ONNX models + config data |
|
|
3. Unlock models 11-33 (replace OneOCRFeatureExtract custom op) |
|
|
4. Verify all models load in onnxruntime |
|
|
|
|
|
Usage: |
|
|
python tools/extract_pipeline.py # defaults |
|
|
python tools/extract_pipeline.py path/to/oneocr.onemodel # custom input |
|
|
python tools/extract_pipeline.py --verify-only # just verify |
|
|
|
|
|
Requirements: |
|
|
pip install pycryptodome onnx onnxruntime numpy |
|
|
|
|
|
Output structure: |
|
|
oneocr_extracted/ |
|
|
├── onnx_models/ # 34 raw ONNX models (11-33 have custom ops) |
|
|
├── onnx_models_unlocked/ # 23 unlocked models (11-33, standard ops) |
|
|
└── config_data/ # char maps, rnn_info, manifest, configs |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import argparse |
|
|
import copy |
|
|
import hashlib |
|
|
import struct |
|
|
import sys |
|
|
import time |
|
|
from pathlib import Path |
|
|
|
|
|
import numpy as np |
|
|
|
|
|
try: |
|
|
from Crypto.Cipher import AES |
|
|
except ImportError: |
|
|
print("ERROR: pycryptodome is required.") |
|
|
print(" pip install pycryptodome") |
|
|
sys.exit(1) |
|
|
|
|
|
try: |
|
|
import onnx |
|
|
from onnx import helper, numpy_helper |
|
|
except ImportError: |
|
|
print("ERROR: onnx is required.") |
|
|
print(" pip install onnx") |
|
|
sys.exit(1) |
|
|
|
|
|
try: |
|
|
import onnxruntime as ort |
|
|
except ImportError: |
|
|
ort = None |
|
|
print("WARNING: onnxruntime not installed — will skip runtime verification.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Fixed secret; decrypt_dx() concatenates it with the per-file hash and
# passes the result through derive_key() to obtain the DX index key.
MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# Initialisation vector handed to every AES-CFB decryption in this file.
IV = b"Copyright @ OneO"
# The first 8 bytes of a correctly decrypted config block / payload chunk
# are compared against this value before the remainder is used.
CONTAINER_MAGIC = bytes.fromhex("4a1a082b25000000")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def aes_cfb128_decrypt(key: bytes, data: bytes) -> bytes:
    """Return *data* decrypted with AES in CFB mode (128-bit segments).

    The cipher is keyed with *key* and always uses the module-level ``IV``.
    """
    cipher = AES.new(key, AES.MODE_CFB, iv=IV, segment_size=128)
    return cipher.decrypt(data)
|
|
|
|
|
|
|
|
def derive_key(sha_input: bytes) -> bytes:
    """Return the raw SHA-256 digest of *sha_input* (32 bytes)."""
    hasher = hashlib.sha256()
    hasher.update(sha_input)
    return hasher.digest()
|
|
|
|
|
|
|
|
def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode one base-128 varint from *data*, starting at index *pos*.

    Returns ``(value, next_pos)``. Each byte contributes its low 7 bits,
    least-significant group first; a byte without the high bit ends the
    number. If *data* runs out before a terminating byte is seen, the bits
    gathered so far are returned and ``next_pos`` equals ``len(data)``.
    """
    value = 0
    bit_offset = 0
    end = len(data)
    while pos < end:
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << bit_offset
        if (byte & 0x80) == 0:
            break
        bit_offset += 7
    return value, pos
|
|
|
|
|
|
|
|
def measure_protobuf(data: bytes) -> int:
    """Return how many leading bytes of *data* parse as accepted protobuf fields.

    Top-level fields are walked one at a time. Scanning stops, and the offset
    of the offending field is returned, as soon as a field number outside the
    accepted set appears, an unsupported wire type is met, or a field would
    extend past the end of *data*. If everything parses, the final offset is
    returned.
    """
    # Field numbers accepted at the top level (per the original author,
    # those of an ONNX ModelProto).
    accepted_fields = frozenset((1, 2, 3, 4, 5, 6, 7, 8, 9, 14, 20))
    total = len(data)
    cursor = 0
    while cursor < total:
        field_start = cursor
        tag, cursor = read_varint(data, cursor)
        if cursor > total:
            return field_start
        field_number = tag >> 3
        wire_type = tag & 7
        if field_number not in accepted_fields:
            return field_start
        if wire_type == 0:  # varint
            _, cursor = read_varint(data, cursor)
        elif wire_type == 1:  # fixed 64-bit
            cursor += 8
        elif wire_type == 2:  # length-delimited
            length, cursor = read_varint(data, cursor)
            cursor += length
        elif wire_type == 5:  # fixed 32-bit
            cursor += 4
        else:
            return field_start
        if cursor > total:
            return field_start
    return cursor
|
|
|
|
|
|
|
|
class OneModelFile:
    """Reader for an encrypted ``.onemodel`` container.

    Layout as consumed by this class:

    * bytes 0-7   little-endian uint64 ``H``
    * bytes 8-23  per-file hash mixed into the DX key
    * from byte 24, ``H - 12`` bytes of encrypted DX index
    * payload chunks start at offset ``H + 16``
    """

    def __init__(self, filepath: str | Path):
        """Load the whole file into memory and compute the region offsets."""
        path = Path(filepath)
        blob = path.read_bytes()
        (header_size,) = struct.unpack_from("<Q", blob, 0)

        self.filepath = path
        self.data = blob
        self.H = header_size
        self.file_hash = blob[8:24]
        self.dx_offset = 24
        self.dx_size = header_size - 12
        self.payload_start = header_size + 16

    def decrypt_dx(self) -> bytes:
        """Decrypt the DX index with a key derived from MASTER_KEY + file hash."""
        begin = self.dx_offset
        encrypted = self.data[begin:begin + self.dx_size]
        dx_key = derive_key(MASTER_KEY + self.file_hash)
        return aes_cfb128_decrypt(dx_key, encrypted)

    def decrypt_config(self, dx: bytes) -> bytes:
        """Decrypt the config block embedded in the decrypted DX index *dx*.

        The key is derived from ``dx[48:64] + dx[32:48]``; the little-endian
        uint64 at offset 48 gives the size, and ``size + 8`` bytes starting
        at offset 64 are decrypted.
        """
        config_key = derive_key(dx[48:64] + dx[32:48])
        (size,) = struct.unpack_from("<Q", dx, 48)
        begin = 64
        return aes_cfb128_decrypt(config_key, dx[begin:begin + size + 8])

    def iter_chunks(self):
        """Yield ``(index, decrypted_payload)`` for each payload chunk.

        Every chunk has a 32-byte header (16-byte checksum followed by two
        little-endian uint64 sizes) and then ``first_size + 8`` encrypted
        bytes. Iteration ends at the first header that fails the sanity
        checks or whose body would run past the end of the file. A chunk
        whose decrypted data does not start with CONTAINER_MAGIC is reported
        and skipped, but still consumes an index.
        """
        blob = self.data
        total = len(blob)
        cursor = self.payload_start
        index = 0
        while cursor + 32 <= total:
            sizes_at = cursor + 16
            body_at = cursor + 32
            checksum = blob[cursor:sizes_at]
            size_a, size_b = struct.unpack_from("<QQ", blob, sizes_at)

            header_ok = size_b == size_a + 24 and size_a != 0 and size_a <= total
            if not header_ok:
                break

            body_end = body_at + size_a + 8
            if body_end > total:
                break

            # The chunk key is the hash of the raw size fields plus the checksum.
            chunk_key = derive_key(blob[sizes_at:body_at] + checksum)
            plain = aes_cfb128_decrypt(chunk_key, blob[body_at:body_end])

            if plain[:8] != CONTAINER_MAGIC:
                print(f" WARNING: chunk#{index} magic mismatch — skipping")
            else:
                yield index, plain[8:]

            cursor = body_end
            index += 1
|
|
|
|
|
|
|
|
def classify_chunk(payload: bytes) -> str:
    """Guess what kind of data a decrypted chunk holds and return its label.

    A payload longer than 100 bytes whose first two bytes are ``08 06`` or
    ``08 07`` is labelled ``"onnx"``. Otherwise the first 100 bytes are
    inspected: if they are not printable ASCII (newline, carriage return and
    tab allowed) the chunk is ``"binary_data"``; if they are, the first
    matching textual marker decides, with ``"text_data"`` as the fallback.
    """
    looks_like_model = (
        len(payload) > 100
        and payload[0] == 0x08
        and payload[1] in (0x06, 0x07)
    )
    if looks_like_model:
        return "onnx"

    try:
        head = payload[:100].decode("ascii")
    except (UnicodeDecodeError, ValueError):
        return "binary_data"

    if not all(ch.isprintable() or ch in "\n\r\t" for ch in head):
        return "binary_data"

    if "<LogPrior>" in head:
        return "rnn_info"
    if head.startswith(("! ", '" ')):
        has_digit = any(ch.isdigit() for ch in head[:20])
        return "char2ind" if has_digit else "char2inschar"
    if head.startswith("0."):
        return "score_calibration"
    if "text_script" in head:
        return "ocr_config"
    if "//" in head[:5]:
        return "composite_chars"
    return "text_data"
|
|
|
|
|
|
|
|
def decrypt_and_extract(input_file: Path, output_dir: Path) -> dict:
    """Step 1: decrypt the ``.onemodel`` container and write out every chunk.

    ONNX chunks are trimmed to their measured protobuf length and written to
    ``output_dir/onnx_models``; every other chunk, plus the manifest taken
    from the DX config block, goes to ``output_dir/config_data``.

    Args:
        input_file: Path of the encrypted container.
        output_dir: Directory that receives the two output sub-directories
            (created if missing).

    Returns:
        ``{"onnx_models": [...], "config_files": [...]}`` with the paths of
        the files that were written.

    Raises:
        ValueError: If the decrypted DX index or config block does not start
            with the expected magic bytes.
    """
    print("=" * 70)
    print(" STEP 1: DECRYPT & EXTRACT")
    print("=" * 70)

    model_file = OneModelFile(input_file)
    print(f" Input: {input_file} ({len(model_file.data):,} bytes)")
    print(f" Output: {output_dir}")

    # Validate with real exceptions: ``assert`` disappears under ``python -O``
    # and would let garbage from a wrong key flow into the output files.
    dx = model_file.decrypt_dx()
    if dx[:2] != b"DX":
        raise ValueError("DX magic mismatch!")
    print(f" DX index decrypted ({len(dx):,} bytes)")

    config_dec = model_file.decrypt_config(dx)
    if config_dec[:8] != CONTAINER_MAGIC:
        raise ValueError("Config block magic mismatch!")
    config_payload = config_dec[8:]

    onnx_dir = output_dir / "onnx_models"
    config_dir = output_dir / "config_data"
    onnx_dir.mkdir(parents=True, exist_ok=True)
    config_dir.mkdir(parents=True, exist_ok=True)

    manifest_path = config_dir / "manifest.bin"
    manifest_path.write_bytes(config_payload)
    print(f" Manifest: {len(config_payload):,} bytes")

    onnx_models = []
    config_files = [manifest_path]

    # File extension used for each non-ONNX chunk type.
    ext_map = {
        "rnn_info": ".rnn_info",
        "char2ind": ".char2ind.txt",
        "char2inschar": ".char2inschar.txt",
        "score_calibration": ".calibration.txt",
        "ocr_config": ".config.txt",
        "composite_chars": ".composite.txt",
        "text_data": ".txt",
        "binary_data": ".bin",
    }

    print(f"\n {'#':>4} {'Type':18s} {'Size':>12} {'Filename'}")
    print(f" {'-'*66}")

    for idx, payload in model_file.iter_chunks():
        chunk_type = classify_chunk(payload)

        if chunk_type == "onnx":
            # The decrypted chunk may carry trailing bytes after the model.
            exact_size = measure_protobuf(payload)
            onnx_data = payload[:exact_size]
            info = _get_onnx_info(onnx_data)
            ir = info.get("ir_version", "?")
            # This string comes from inside the file; keep only characters
            # that are safe in a file name so it cannot inject path
            # separators or otherwise break the write below.
            prod = "".join(
                ch if ch.isalnum() or ch in "._-+" else "_"
                for ch in str(info.get("producer_version", "unknown"))
            )
            size_kb = len(onnx_data) // 1024
            onnx_idx = len(onnx_models)
            fname = f"model_{onnx_idx:02d}_ir{ir}_{prod}_{size_kb}KB.onnx"
            target = onnx_dir / fname
            target.write_bytes(onnx_data)
            onnx_models.append(target)
            print(f" {idx:4d} {'ONNX':18s} {len(onnx_data):12,} {fname}")
        else:
            ext = ext_map.get(chunk_type, ".bin")
            fname = f"chunk_{idx:02d}_{chunk_type}{ext}"
            target = config_dir / fname
            target.write_bytes(payload)
            config_files.append(target)
            print(f" {idx:4d} {chunk_type:18s} {len(payload):12,} {fname}")

    print(f"\n Extracted: {len(onnx_models)} ONNX models, {len(config_files)} config files")
    return {"onnx_models": onnx_models, "config_files": config_files}
|
|
|
|
|
|
|
|
def _get_onnx_info(data: bytes) -> dict:
    """Pull a few header values out of a serialized ONNX model.

    Only top-level fields that start within the first 500 bytes are examined.

    Args:
        data: Serialized model bytes.

    Returns:
        A dict that may contain ``"ir_version"`` (int, from varint field 1)
        and ``"producer_version"`` (str, from length-delimited field 4).
        Keys are simply absent when the field is not found or not decodable.
    """
    info = {}
    pos = 0
    scan_limit = min(len(data), 500)
    while pos < scan_limit:
        tag, pos = read_varint(data, pos)
        field, wire = tag >> 3, tag & 7
        if wire == 0:
            value, pos = read_varint(data, pos)
            if field == 1:
                info["ir_version"] = value
        elif wire == 2:
            length, pos = read_varint(data, pos)
            # NOTE(review): in onnx.proto, ModelProto field 4 is `domain` and
            # field 3 is `producer_version` — confirm field 4 is intended.
            if field == 4:
                # Slice only the field we keep; other length-delimited fields
                # (e.g. the graph) can be very large and are just skipped.
                try:
                    info["producer_version"] = data[pos:pos + length].decode()
                except UnicodeDecodeError:
                    pass  # best effort: leave the key unset
            pos += length
        elif wire == 5:
            pos += 4
        elif wire == 1:
            pos += 8
        else:
            break
        if "ir_version" in info and "producer_version" in info:
            break
    return info
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _extract_fe_weights(model) -> tuple[np.ndarray, np.ndarray, int, int]: |
|
|
"""Extract W, b from OneOCRFeatureExtract config blob.""" |
|
|
config_blob = None |
|
|
for init in model.graph.initializer: |
|
|
if init.name == "feature/config": |
|
|
config_blob = bytes(init.string_data[0] if init.string_data else init.raw_data) |
|
|
break |
|
|
if config_blob is None: |
|
|
raise ValueError("No feature/config initializer") |
|
|
|
|
|
be_arr = np.frombuffer(config_blob, dtype='>f4').copy() |
|
|
|
|
|
|
|
|
fe_node = next((n for n in model.graph.node if n.op_type == "OneOCRFeatureExtract"), None) |
|
|
if fe_node is None: |
|
|
raise ValueError("No OneOCRFeatureExtract node") |
|
|
|
|
|
in_dim = out_dim = None |
|
|
for i in range(len(be_arr) - 10, len(be_arr)): |
|
|
val = be_arr[i] |
|
|
if val == 21.0 and i + 1 < len(be_arr) and be_arr[i + 1] in [50.0, 51.0]: |
|
|
in_dim, out_dim = 21, int(be_arr[i + 1]) |
|
|
break |
|
|
|
|
|
if in_dim is None: |
|
|
for gi in model.graph.input: |
|
|
if gi.name == "data": |
|
|
shape = [d.dim_value for d in gi.type.tensor_type.shape.dim] |
|
|
if len(shape) >= 2 and shape[1] > 0: |
|
|
in_dim = shape[1] |
|
|
break |
|
|
|
|
|
if out_dim is None: |
|
|
fe_out = fe_node.output[0] |
|
|
for node in model.graph.node: |
|
|
if node.op_type == "Gemm" and fe_out in node.input: |
|
|
wn = node.input[1] |
|
|
for init in model.graph.initializer: |
|
|
if init.name == wn: |
|
|
W = numpy_helper.to_array(init) |
|
|
out_dim = W.shape[0] if len(W.shape) == 2 else W.shape[1] |
|
|
break |
|
|
|
|
|
if in_dim is None or out_dim is None: |
|
|
raise ValueError(f"Cannot determine dims: in={in_dim}, out={out_dim}") |
|
|
|
|
|
W = be_arr[:in_dim * out_dim].reshape(in_dim, out_dim).astype(np.float32) |
|
|
b = be_arr[in_dim * out_dim:in_dim * out_dim + out_dim].astype(np.float32) |
|
|
return W, b, in_dim, out_dim |
|
|
|
|
|
|
|
|
def unlock_gemm_model(model_path: Path, output_dir: Path) -> Path | None:
    """Rewrite a model so its OneOCRFeatureExtract node becomes a standard Gemm.

    The weights recovered from the ``feature/config`` initializer are stored
    as ``fe_weight`` (transposed, used with ``transB=1``) and ``fe_bias``.
    The ``feature/config`` initializer and graph input are removed, as is the
    ``com.microsoft.oneocr`` opset import. The result is saved as
    ``<stem>_unlocked.onnx`` in *output_dir*.

    Returns the path of the saved model, or ``None`` when the model contains
    no OneOCRFeatureExtract node.
    """
    custom_op = "OneOCRFeatureExtract"
    config_name = "feature/config"

    model = onnx.load(str(model_path))
    if all(n.op_type != custom_op for n in model.graph.node):
        return None

    W, b, in_dim, out_dim = _extract_fe_weights(model)
    unlocked = copy.deepcopy(model)
    graph = unlocked.graph

    # Initializers: drop the config blob, add the Gemm weight and bias.
    kept_inits = [t for t in graph.initializer if t.name != config_name]
    kept_inits.append(numpy_helper.from_array(W.T, name="fe_weight"))
    kept_inits.append(numpy_helper.from_array(b, name="fe_bias"))
    del graph.initializer[:]
    graph.initializer.extend(kept_inits)

    # Nodes: put a Gemm wherever the custom op was, keep everything else.
    fe_node = next(n for n in graph.node if n.op_type == custom_op)
    fe_in = fe_node.input[0]
    fe_out = fe_node.output[0]
    rebuilt_nodes = [
        helper.make_node("Gemm", [fe_in, "fe_weight", "fe_bias"],
                         [fe_out], alpha=1.0, beta=1.0, transB=1)
        if n.op_type == custom_op else n
        for n in graph.node
    ]
    del graph.node[:]
    graph.node.extend(rebuilt_nodes)

    # Graph inputs are re-populated from the original model, minus the blob.
    del graph.input[:]
    graph.input.extend([gi for gi in model.graph.input if gi.name != config_name])

    kept_opsets = [op for op in unlocked.opset_import if op.domain != "com.microsoft.oneocr"]
    del unlocked.opset_import[:]
    unlocked.opset_import.extend(kept_opsets)

    out_path = output_dir / (model_path.stem + "_unlocked.onnx")
    onnx.save(unlocked, str(out_path))
    return out_path
|
|
|
|
|
|
|
|
def unlock_conv_model(model_path: Path, output_dir: Path) -> Path | None:
    """Rewrite a model so its OneOCRFeatureExtract node becomes a 1x1 Conv.

    The ``feature/config`` blob is read as big-endian float32 values with a
    fixed layout of 256 input and 16 output channels: the first 256*16 values
    are the weights and the next 16 the bias. They are stored as
    ``fe_conv_weight`` (shape ``(16, 256, 1, 1)``) and ``fe_conv_bias``. The
    ``feature/config`` initializer and graph input are removed, as is the
    ``com.microsoft.oneocr`` opset import. The result is saved as
    ``<stem>_unlocked.onnx`` in *output_dir*.

    Returns the path of the saved model, or ``None`` when the model has no
    OneOCRFeatureExtract node or no ``feature/config`` initializer.
    """
    custom_op = "OneOCRFeatureExtract"
    config_name = "feature/config"

    model = onnx.load(str(model_path))
    if all(n.op_type != custom_op for n in model.graph.node):
        return None

    holder = next((t for t in model.graph.initializer if t.name == config_name), None)
    if holder is None:
        return None
    config_blob = bytes(holder.string_data[0] if holder.string_data else holder.raw_data)

    be_arr = np.frombuffer(config_blob, dtype=">f4").copy()
    in_ch, out_ch = 256, 16
    n_weights = in_ch * out_ch
    weight_matrix = be_arr[:n_weights].reshape(in_ch, out_ch)
    # Conv expects (out_channels, in_channels, kH, kW).
    W = weight_matrix.T.reshape(out_ch, in_ch, 1, 1).astype(np.float32)
    b = be_arr[n_weights:n_weights + out_ch].astype(np.float32)

    unlocked = copy.deepcopy(model)
    graph = unlocked.graph

    kept_inits = [t for t in graph.initializer if t.name != config_name]
    kept_inits.append(numpy_helper.from_array(W, name="fe_conv_weight"))
    kept_inits.append(numpy_helper.from_array(b, name="fe_conv_bias"))
    del graph.initializer[:]
    graph.initializer.extend(kept_inits)

    fe_node = next(n for n in graph.node if n.op_type == custom_op)
    fe_in = fe_node.input[0]
    fe_out = fe_node.output[0]
    rebuilt_nodes = [
        helper.make_node("Conv", [fe_in, "fe_conv_weight", "fe_conv_bias"],
                         [fe_out], kernel_shape=[1, 1], strides=[1, 1],
                         pads=[0, 0, 0, 0])
        if n.op_type == custom_op else n
        for n in graph.node
    ]
    del graph.node[:]
    graph.node.extend(rebuilt_nodes)

    # Graph inputs are re-populated from the original model, minus the blob.
    del graph.input[:]
    graph.input.extend([gi for gi in model.graph.input if gi.name != config_name])

    kept_opsets = [op for op in unlocked.opset_import if op.domain != "com.microsoft.oneocr"]
    del unlocked.opset_import[:]
    unlocked.opset_import.extend(kept_opsets)

    out_path = output_dir / (model_path.stem + "_unlocked.onnx")
    onnx.save(unlocked, str(out_path))
    return out_path
|
|
|
|
|
|
|
|
def unlock_all_models(onnx_dir: Path, output_dir: Path) -> dict:
    """Step 2: replace the custom op in models 11-33 and save the results.

    For each index the first file in *onnx_dir* matching ``model_NN_*`` is
    processed: index 33 goes through ``unlock_conv_model``, every other index
    through ``unlock_gemm_model``. *output_dir* is created if necessary.

    Returns a dict with three lists of model indices: ``"unlocked"``,
    ``"skipped"`` (the unlocker returned ``None``) and ``"failed"`` (file not
    found, or the unlocker raised).
    """
    rule = "=" * 70
    print("\n" + rule)
    print(" STEP 2: UNLOCK MODELS (replace OneOCRFeatureExtract)")
    print(rule)

    output_dir.mkdir(parents=True, exist_ok=True)
    results = {"unlocked": [], "skipped": [], "failed": []}

    for idx in range(11, 34):
        candidates = list(onnx_dir.glob(f"model_{idx:02d}_*"))
        if len(candidates) == 0:
            print(f" model_{idx:02d}: NOT FOUND")
            results["failed"].append(idx)
            continue

        source = candidates[0]
        try:
            # Index 33 uses the Conv replacement; all others use Gemm.
            unlocker = unlock_conv_model if idx == 33 else unlock_gemm_model
            out = unlocker(source, output_dir)

            if out is not None:
                results["unlocked"].append(idx)
                print(f" model_{idx:02d}: ✓ unlocked → {out.name}")
            else:
                results["skipped"].append(idx)
                print(f" model_{idx:02d}: skipped (no custom op)")
        except Exception as e:
            results["failed"].append(idx)
            print(f" model_{idx:02d}: ✗ FAILED — {e}")

    n = len(results["unlocked"])
    print(f"\n Unlocked: {n}/23 models")
    return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def verify_models(onnx_dir: Path, unlocked_dir: Path) -> dict:
    """Step 3: check that the extracted models load in onnxruntime.

    Models 0-10 (first ``model_NN_*`` match in *onnx_dir*) only need to load
    into an ``InferenceSession``. Models 11-33 (first match in
    *unlocked_dir*) are additionally run once on all-zero float32 inputs,
    with every non-positive or symbolic dimension replaced by 1. Indices with
    no matching file are silently ignored.

    Returns:
        ``{"status": "skipped"}`` when onnxruntime is not installed;
        otherwise a dict with lists of model indices under ``"ok"``,
        ``"custom_op"`` (load failed with an error that mentions custom ops
        or the OneOCR op/domain) and ``"failed"``.
    """
    print("\n" + "=" * 70)
    print(" STEP 3: VERIFY (onnxruntime inference test)")
    print("=" * 70)

    if ort is None:
        print(" ⚠ onnxruntime not installed — skipping verification")
        return {"status": "skipped"}

    results = {"ok": [], "custom_op": [], "failed": []}

    print("\n Core models (0-10):")
    for idx in range(11):
        matches = list(onnx_dir.glob(f"model_{idx:02d}_*"))
        if not matches:
            continue
        try:
            sess = ort.InferenceSession(str(matches[0]),
                                        providers=["CPUExecutionProvider"])
            shapes = {i.name: i.shape for i in sess.get_inputs()}
            results["ok"].append(idx)
            print(f" model_{idx:02d}: ✓ inputs={shapes}")
        except Exception as e:
            message = str(e)
            err = message[:60]  # shortened for display only
            # Classify on the FULL message: the informative part usually
            # comes after a long prefix and would be lost by truncating
            # first. Match the op/domain names rather than a bare "oneocr"
            # so a model path that contains that word does not count.
            lowered = message.lower()
            is_custom_op = (
                "custom ops" in lowered
                or "oneocrfeatureextract" in lowered
                or "com.microsoft.oneocr" in lowered
            )
            if is_custom_op:
                results["custom_op"].append(idx)
                print(f" model_{idx:02d}: ⚠ custom_op ({err})")
            else:
                results["failed"].append(idx)
                print(f" model_{idx:02d}: ✗ {err}")

    print("\n Unlocked models (11-33):")
    for idx in range(11, 34):
        matches = list(unlocked_dir.glob(f"model_{idx:02d}_*"))
        if not matches:
            continue
        try:
            sess = ort.InferenceSession(str(matches[0]),
                                        providers=["CPUExecutionProvider"])

            # Build a dummy feed: zeros, with unknown dimensions set to 1.
            feeds = {}
            for inp in sess.get_inputs():
                shape = [d if isinstance(d, int) and d > 0 else 1 for d in inp.shape]
                feeds[inp.name] = np.zeros(shape, dtype=np.float32)
            out = sess.run(None, feeds)
            results["ok"].append(idx)
            print(f" model_{idx:02d}: ✓ output_shapes={[o.shape for o in out]}")
        except Exception as e:
            results["failed"].append(idx)
            print(f" model_{idx:02d}: ✗ {str(e)[:60]}")

    ok = len(results["ok"])
    total = ok + len(results["custom_op"]) + len(results["failed"])
    print(f"\n Verification: {ok}/{total} models OK")
    return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Command-line entry point: parse arguments and run the pipeline steps.

    With ``--verify-only`` only the verification step runs on the existing
    output directories. Otherwise the input file must exist (the process
    exits with status 1 if it does not); extraction always runs, while the
    unlock and verify steps can each be skipped with their ``--skip-*`` flag.
    """
    parser = argparse.ArgumentParser(
        description="OneOCR extraction pipeline: decrypt → extract → unlock → verify")
    parser.add_argument(
        "input", nargs="?", default="ocr_data/oneocr.onemodel",
        help="Path to .onemodel file (default: ocr_data/oneocr.onemodel)")
    parser.add_argument(
        "--output", "-o", default="oneocr_extracted",
        help="Output directory (default: oneocr_extracted)")
    switches = (
        ("--verify-only", "Only verify existing extracted models"),
        ("--skip-unlock", "Skip model unlocking step"),
        ("--skip-verify", "Skip verification step"),
    )
    for flag, description in switches:
        parser.add_argument(flag, action="store_true", help=description)
    args = parser.parse_args()

    input_file = Path(args.input)
    output_dir = Path(args.output)
    onnx_dir = output_dir / "onnx_models"
    unlocked_dir = output_dir / "onnx_models_unlocked"

    print()
    for banner_line in (
        "╔══════════════════════════════════════════════════════════════════════╗",
        "║ OneOCR Extraction Pipeline ║",
        "║ Decrypt → Extract → Unlock → Verify ║",
        "╚══════════════════════════════════════════════════════════════════════╝",
    ):
        print(banner_line)

    t_start = time.perf_counter()

    if args.verify_only:
        verify_models(onnx_dir, unlocked_dir)
    else:
        if not input_file.exists():
            print(f"\n ERROR: Input file not found: {input_file}")
            print(" Place oneocr.onemodel in ocr_data/ directory")
            sys.exit(1)

        decrypt_and_extract(input_file, output_dir)

        if args.skip_unlock:
            print("\n (Skipping unlock step)")
        else:
            unlock_all_models(onnx_dir, unlocked_dir)

        if args.skip_verify:
            print("\n (Skipping verification)")
        else:
            verify_models(onnx_dir, unlocked_dir)

    elapsed = time.perf_counter() - t_start
    print(f"\n{'=' * 70}")
    print(f" DONE in {elapsed:.1f}s")
    print(f" Models: {onnx_dir}")
    print(f" Unlocked: {unlocked_dir}")
    print(f" Config: {output_dir / 'config_data'}")
    print(f"{'=' * 70}")


if __name__ == "__main__":
    main()
|
|
|