"""OCR engine — ctypes wrapper for Windows 11 SnippingTool's oneocr.dll.

Provides offline OCR capability using Microsoft's AI model from Snipping Tool.
Requires oneocr.dll, oneocr.onemodel, and onnxruntime.dll in the ocr_data/ directory.

Usage:
    from src.services.ocr.engine import OcrEngine
    engine = OcrEngine()
    result = engine.recognize_pil(pil_image)
    print(result.text)
"""

from __future__ import annotations

import ctypes
import os
from collections.abc import Callable
from contextlib import ExitStack, contextmanager
from ctypes import (
    POINTER,
    Structure,
    byref,
    c_char_p,
    c_float,
    c_int32,
    c_int64,
    c_ubyte,
)
from pathlib import Path
from typing import TYPE_CHECKING

from ocr.models import BoundingRect, OcrLine, OcrResult, OcrWord

# File names that OcrEngine looks up inside its OCR data directory.
OCR_DLL_NAME = "oneocr.dll"
OCR_MODEL_NAME = "oneocr.onemodel"
# Key passed to CreateOcrPipeline together with the model path.
OCR_MODEL_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# Value passed to OcrProcessOptionsSetMaxRecognitionLineCount.
OCR_MAX_LINES = 200

if TYPE_CHECKING:
    # Only needed for annotations; PIL is imported lazily at runtime.
    from PIL import Image

# Pointer types used in the DLL function signatures and image structure.
c_int64_p = POINTER(c_int64)
c_float_p = POINTER(c_float)
c_ubyte_p = POINTER(c_ubyte)


class _ImageStructure(Structure):
    """Image data structure for oneocr.dll (CV_8UC4 format).

    An instance is passed by reference to ``RunOcrPipeline``. The order and
    types of ``_fields_`` define the binary layout, so they must not be
    reordered.
    """

    _fields_ = [
        ("type", c_int32),  # format code; OcrEngine._process_image passes 3
        ("width", c_int32),  # image width (pixels)
        ("height", c_int32),  # image height (pixels)
        ("_reserved", c_int32),  # always set to 0 by this module
        ("step_size", c_int64),  # row stride; callers pass width * 4
        ("data_ptr", c_ubyte_p),  # pointer to the pixel buffer
    ]


class _BoundingBox(Structure):
    """Bounding box coordinates from DLL.

    Four (x, y) points stored as eight consecutive C floats. The field order
    defines the binary layout and must not be changed.
    """

    _fields_ = [
        ("x1", c_float), ("y1", c_float),
        ("x2", c_float), ("y2", c_float),
        ("x3", c_float), ("y3", c_float),
        ("x4", c_float), ("y4", c_float),
    ]


# Pointer type filled in by GetOcrLineBoundingBox / GetOcrWordBoundingBox.
_BoundingBox_p = POINTER(_BoundingBox)


# Signature table for the oneocr.dll exports used by this module.
# Each entry is (export name, argtypes, restype); OcrEngine._load_dll applies
# it to the loaded DLL. A restype of None marks the Release* functions, whose
# return value is not used.
_DLL_FUNCTIONS: list[tuple[str, list[type], type | None]] = [
    ("CreateOcrInitOptions", [c_int64_p], c_int64),
    ("OcrInitOptionsSetUseModelDelayLoad", [c_int64, ctypes.c_char], c_int64),
    ("CreateOcrPipeline", [c_char_p, c_char_p, c_int64, c_int64_p], c_int64),
    ("CreateOcrProcessOptions", [c_int64_p], c_int64),
    ("OcrProcessOptionsSetMaxRecognitionLineCount", [c_int64, c_int64], c_int64),
    ("RunOcrPipeline", [c_int64, POINTER(_ImageStructure), c_int64, c_int64_p], c_int64),
    ("GetImageAngle", [c_int64, c_float_p], c_int64),
    ("GetOcrLineCount", [c_int64, c_int64_p], c_int64),
    ("GetOcrLine", [c_int64, c_int64, c_int64_p], c_int64),
    ("GetOcrLineContent", [c_int64, POINTER(c_char_p)], c_int64),
    ("GetOcrLineBoundingBox", [c_int64, POINTER(_BoundingBox_p)], c_int64),
    ("GetOcrLineWordCount", [c_int64, c_int64_p], c_int64),
    ("GetOcrWord", [c_int64, c_int64, c_int64_p], c_int64),
    ("GetOcrWordContent", [c_int64, POINTER(c_char_p)], c_int64),
    ("GetOcrWordBoundingBox", [c_int64, POINTER(_BoundingBox_p)], c_int64),
    ("GetOcrWordConfidence", [c_int64, c_float_p], c_int64),
    ("ReleaseOcrResult", [c_int64], None),
    ("ReleaseOcrInitOptions", [c_int64], None),
    ("ReleaseOcrPipeline", [c_int64], None),
    ("ReleaseOcrProcessOptions", [c_int64], None),
]


@contextmanager
def _suppress_output():
    """Suppress stdout/stderr during DLL initialization (it prints to console).

    File descriptors 1 and 2 are pointed at ``os.devnull`` for the duration of
    the ``with`` body and restored afterwards, so output written directly to
    the descriptors by native code is silenced as well.

    Every descriptor opened here is registered for cleanup as soon as it
    exists, so a failure partway through setup, or an exception raised inside
    the ``with`` body, cannot leak descriptors or leave the streams redirected.
    """
    with ExitStack() as cleanup:
        devnull = os.open(os.devnull, os.O_WRONLY)
        cleanup.callback(os.close, devnull)

        # Keep duplicates of the real streams so they can be restored later.
        saved_stdout = os.dup(1)
        cleanup.callback(os.close, saved_stdout)
        saved_stderr = os.dup(2)
        cleanup.callback(os.close, saved_stderr)

        # Callbacks run in reverse order: both streams are restored before
        # the saved duplicates and the devnull descriptor are closed.
        os.dup2(devnull, 1)
        cleanup.callback(os.dup2, saved_stdout, 1)
        os.dup2(devnull, 2)
        cleanup.callback(os.dup2, saved_stderr, 2)

        yield


class OcrEngine:
    """Offline OCR engine using Windows 11 SnippingTool's oneocr.dll.

    Construction loads the DLL and creates the native init options, pipeline
    and process options. Those handles are released in ``__del__``.

    Args:
        ocr_data_dir: Path to directory containing oneocr.dll, oneocr.onemodel,
            onnxruntime.dll. Defaults to the ``ocr_data`` directory one level
            above the directory that contains this file.

    Raises:
        RuntimeError: If the DLL cannot be loaded or a native setup call
            returns a non-zero code.
        FileNotFoundError: If the model file is missing.
    """

    def __init__(self, ocr_data_dir: str | Path | None = None) -> None:
        if ocr_data_dir is None:
            ocr_data_dir = Path(__file__).resolve().parent.parent / "ocr_data"
        self._data_dir = str(Path(ocr_data_dir).resolve())

        # Native handles start at 0 and are filled in by _initialize_pipeline.
        self._dll: ctypes.WinDLL | None = None
        self._init_options = c_int64()
        self._pipeline = c_int64()
        self._process_options = c_int64()

        self._load_dll()
        self._initialize_pipeline()

    def __del__(self) -> None:
        """Release the native handles that were actually created.

        Safe on a partially constructed instance (``__init__`` may have raised
        before any attribute was set) and safe to call more than once.
        """
        dll = getattr(self, "_dll", None)
        if dll is None:
            return

        releases = (
            ("ReleaseOcrProcessOptions", "_process_options"),
            ("ReleaseOcrPipeline", "_pipeline"),
            ("ReleaseOcrInitOptions", "_init_options"),
        )
        for func_name, attr_name in releases:
            handle = getattr(self, attr_name, None)
            # A zero handle was never created (or was already released).
            if handle is None or not handle.value:
                continue
            try:
                getattr(dll, func_name)(handle)
            except Exception:
                # Best effort: a destructor must not raise, and one failed
                # release should not prevent the remaining ones.
                pass
            handle.value = 0

    def recognize_pil(self, image: Image.Image) -> OcrResult:
        """Run OCR on a PIL Image.

        Args:
            image: PIL Image object (any mode — will be converted to RGBA/BGRA).

        Returns:
            OcrResult with recognized text, lines, words, and confidence values.
            If either dimension is outside 50-10000 px, an OcrResult carrying
            only an error message is returned instead.
        """
        from PIL import Image as PILImage

        if any(side < 50 or side > 10000 for side in image.size):
            return OcrResult(error="Unsupported image size (must be 50-10000px)")

        if image.mode != "RGBA":
            image = image.convert("RGBA")

        # Swap the red and blue channels so the raw bytes are in B, G, R, A order.
        r, g, b, a = image.split()
        bgra_image = PILImage.merge("RGBA", (b, g, r, a))

        return self._process_image(
            width=bgra_image.width,
            height=bgra_image.height,
            step=bgra_image.width * 4,  # 4 bytes per pixel, rows tightly packed
            data=bgra_image.tobytes(),
        )

    def recognize_bytes(self, image_bytes: bytes) -> OcrResult:
        """Run OCR on raw image bytes (PNG/JPEG/etc).

        Args:
            image_bytes: Raw image file bytes.

        Returns:
            OcrResult.
        """
        from io import BytesIO

        from PIL import Image

        # The context manager closes the decoded image once OCR has finished.
        with Image.open(BytesIO(image_bytes)) as img:
            return self.recognize_pil(img)

    def _load_dll(self) -> None:
        """Load oneocr.dll and bind function signatures.

        Raises:
            RuntimeError: If the DLL file is missing, cannot be loaded, lacks
                an expected export, or ``ctypes.WinDLL`` is unavailable on
                this platform.
        """
        try:
            kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
            if hasattr(kernel32, "SetDllDirectoryW"):
                # Add the data directory to the DLL search path before loading.
                kernel32.SetDllDirectoryW(self._data_dir)

            dll_path = os.path.join(self._data_dir, OCR_DLL_NAME)
            if not os.path.exists(dll_path):
                raise FileNotFoundError(f"OCR DLL not found: {dll_path}")

            self._dll = ctypes.WinDLL(dll_path)

            for name, argtypes, restype in _DLL_FUNCTIONS:
                func = getattr(self._dll, name)
                func.argtypes = argtypes
                func.restype = restype

        except (OSError, RuntimeError, AttributeError) as e:
            # AttributeError covers a missing export and a missing ctypes.WinDLL.
            raise RuntimeError(f"Failed to load OCR DLL from {self._data_dir}: {e}") from e

    def _initialize_pipeline(self) -> None:
        """Create OCR init options, pipeline, and process options.

        Raises:
            RuntimeError: If any native call returns a non-zero code.
            FileNotFoundError: If the model file does not exist.
        """
        assert self._dll is not None

        self._check(
            self._dll.CreateOcrInitOptions(byref(self._init_options)),
            "CreateOcrInitOptions failed",
        )
        self._check(
            self._dll.OcrInitOptionsSetUseModelDelayLoad(self._init_options, 0),
            "OcrInitOptionsSetUseModelDelayLoad failed",
        )

        model_path = os.path.join(self._data_dir, OCR_MODEL_NAME)
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"OCR model not found: {model_path}")

        model_buf = ctypes.create_string_buffer(model_path.encode())
        key_buf = ctypes.create_string_buffer(OCR_MODEL_KEY)

        # Pipeline creation prints to the console; silence it for this call.
        with _suppress_output():
            result = self._dll.CreateOcrPipeline(
                model_buf, key_buf, self._init_options, byref(self._pipeline)
            )
        self._check(result, "CreateOcrPipeline failed (wrong key or corrupted model?)")

        self._check(
            self._dll.CreateOcrProcessOptions(byref(self._process_options)),
            "CreateOcrProcessOptions failed",
        )
        self._check(
            self._dll.OcrProcessOptionsSetMaxRecognitionLineCount(
                self._process_options, OCR_MAX_LINES
            ),
            "OcrProcessOptionsSetMaxRecognitionLineCount failed",
        )

    def _process_image(self, width: int, height: int, step: int, data: bytes) -> OcrResult:
        """Create image structure and run OCR pipeline.

        Args:
            width: Image width in pixels.
            height: Image height in pixels.
            step: Number of bytes per image row.
            data: Raw pixel buffer covering at least ``step * height`` bytes.

        Returns:
            The parsed OcrResult, or an OcrResult carrying an error message if
            the buffer is too small or the pipeline returns a non-zero code.
        """
        assert self._dll is not None

        # Refuse a short buffer rather than let native code read past its end.
        if len(data) < step * height:
            return OcrResult(error="Image buffer is smaller than step * height")

        pixel_buffer = (c_ubyte * len(data)).from_buffer_copy(data)

        img_struct = _ImageStructure(
            type=3,
            width=width,
            height=height,
            _reserved=0,
            step_size=step,
            data_ptr=pixel_buffer,
        )

        ocr_result = c_int64()
        status = self._dll.RunOcrPipeline(
            self._pipeline, byref(img_struct), self._process_options, byref(ocr_result)
        )
        if status != 0:
            return OcrResult(error="RunOcrPipeline returned non-zero")

        try:
            return self._parse_results(ocr_result)
        finally:
            # Release the native result even if parsing raised.
            self._dll.ReleaseOcrResult(ocr_result)

    def _parse_results(self, ocr_result: c_int64) -> OcrResult:
        """Extract text, lines, words from DLL result handle.

        Lines that cannot be read are skipped. The text angle is left as
        ``None`` when ``GetImageAngle`` reports failure.
        """
        assert self._dll is not None

        line_count = c_int64()
        if self._dll.GetOcrLineCount(ocr_result, byref(line_count)) != 0:
            return OcrResult(error="GetOcrLineCount failed")

        lines: list[OcrLine] = []
        for idx in range(line_count.value):
            line = self._parse_line(ocr_result, idx)
            if line is not None:
                lines.append(line)

        text_angle_val = c_float()
        text_angle: float | None = None
        if self._dll.GetImageAngle(ocr_result, byref(text_angle_val)) == 0:
            text_angle = text_angle_val.value

        # Lines with empty text are kept in `lines` but left out of the joined text.
        full_text = "\n".join(line.text for line in lines if line.text)

        return OcrResult(text=full_text, text_angle=text_angle, lines=lines)

    def _parse_line(self, ocr_result: c_int64, line_index: int) -> OcrLine | None:
        """Parse a single line from OCR result.

        Returns:
            The parsed OcrLine, or ``None`` if the line handle could not be
            obtained or is null.
        """
        assert self._dll is not None

        line_handle = c_int64()
        if self._dll.GetOcrLine(ocr_result, line_index, byref(line_handle)) != 0:
            return None
        if not line_handle.value:
            return None

        content = c_char_p()
        line_text = ""
        if self._dll.GetOcrLineContent(line_handle, byref(content)) == 0 and content.value:
            line_text = content.value.decode("utf-8", errors="ignore")

        line_bbox = self._get_bbox(line_handle, self._dll.GetOcrLineBoundingBox)

        word_count = c_int64()
        words: list[OcrWord] = []
        if self._dll.GetOcrLineWordCount(line_handle, byref(word_count)) == 0:
            for word_index in range(word_count.value):
                word = self._parse_word(line_handle, word_index)
                if word is not None:
                    words.append(word)

        return OcrLine(text=line_text, bounding_rect=line_bbox, words=words)

    def _parse_word(self, line_handle: c_int64, word_index: int) -> OcrWord | None:
        """Parse a single word.

        Returns:
            The parsed OcrWord, or ``None`` if the word handle could not be
            obtained or is null. Confidence defaults to 0.0 when it cannot be
            read.
        """
        assert self._dll is not None

        word_handle = c_int64()
        if self._dll.GetOcrWord(line_handle, word_index, byref(word_handle)) != 0:
            return None
        # Same null-handle guard as _parse_line: never query a null handle.
        if not word_handle.value:
            return None

        content = c_char_p()
        word_text = ""
        if self._dll.GetOcrWordContent(word_handle, byref(content)) == 0 and content.value:
            word_text = content.value.decode("utf-8", errors="ignore")

        word_bbox = self._get_bbox(word_handle, self._dll.GetOcrWordBoundingBox)

        confidence_val = c_float()
        confidence = 0.0
        if self._dll.GetOcrWordConfidence(word_handle, byref(confidence_val)) == 0:
            confidence = confidence_val.value

        return OcrWord(text=word_text, bounding_rect=word_bbox, confidence=confidence)

    @staticmethod
    def _get_bbox(handle: c_int64, bbox_fn: Callable[..., int]) -> BoundingRect | None:
        """Extract bounding box from a handle.

        Args:
            handle: Line or word handle to query.
            bbox_fn: DLL function that takes the handle and a reference to a
                bounding-box pointer and returns 0 on success.

        Returns:
            A BoundingRect built from the eight coordinates, or ``None`` if
            the call fails or leaves the pointer null.
        """
        bbox_ptr = _BoundingBox_p()
        if bbox_fn(handle, byref(bbox_ptr)) != 0 or not bbox_ptr:
            return None

        bb = bbox_ptr.contents
        return BoundingRect(
            x1=bb.x1, y1=bb.y1, x2=bb.x2, y2=bb.y2,
            x3=bb.x3, y3=bb.y3, x4=bb.x4, y4=bb.y4,
        )

    @staticmethod
    def _check(result_code: int, msg: str) -> None:
        """Raise RuntimeError with ``msg`` if ``result_code`` is non-zero."""
        if result_code != 0:
            raise RuntimeError(f"{msg} (code: {result_code})")