# mnemo-ocr-core / src/layout_analyzer.py
# Uploaded by MABobrov
# Commit message: Deploy updated core backend pipeline
# Commit: 7fb79e4
from __future__ import annotations
import copy
import hashlib
from dataclasses import dataclass
from itertools import count
from pathlib import Path
from typing import Any, Iterable
import cv2
import numpy as np
from PIL import Image
def _clamp(value: float, minimum: float, maximum: float) -> float:
    """Limit ``value`` to the range ``[minimum, maximum]``.

    The upper bound is applied first and the lower bound second, so if the
    bounds are inverted (``minimum > maximum``) the result is ``minimum``.
    """
    capped = value if value < maximum else maximum
    return capped if capped > minimum else minimum
def _round(value: float, digits: int = 3) -> float:
    """Coerce ``value`` to ``float`` and round it to ``digits`` decimal places."""
    as_float = float(value)
    return round(as_float, digits)
def _rect_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
    """Return the intersection-over-union of two ``(x, y, width, height)`` boxes.

    Boxes that do not overlap (or only touch along an edge) yield ``0.0``, as
    does a pair whose union area is not positive.
    """
    a_left, a_top, a_width, a_height = a
    b_left, b_top, b_width, b_height = b

    # Corners of the overlap rectangle, if there is one.
    left = max(a_left, b_left)
    top = max(a_top, b_top)
    right = min(a_left + a_width, b_left + b_width)
    bottom = min(a_top + a_height, b_top + b_height)
    if right <= left or bottom <= top:
        return 0.0

    intersection = float((right - left) * (bottom - top))
    union = float(a_width * a_height + b_width * b_height - intersection)
    if union > 0:
        return intersection / union
    return 0.0
def _contains(inner: tuple[int, int, int, int], outer: tuple[int, int, int, int], tolerance: int = 0) -> bool:
    """Tell whether ``inner`` lies within ``outer``, both ``(x, y, width, height)``.

    ``tolerance`` widens ``outer`` by that many units on every side before the
    comparison, so slightly protruding boxes still count as contained.
    """
    in_left, in_top, in_width, in_height = inner
    out_left, out_top, out_width, out_height = outer

    # Reject as soon as one side of ``inner`` sticks out too far.
    if in_left < out_left - tolerance:
        return False
    if in_top < out_top - tolerance:
        return False
    if in_left + in_width > out_left + out_width + tolerance:
        return False
    return in_top + in_height <= out_top + out_height + tolerance
def _bounds_to_points(box: tuple[int, int, int, int]) -> list[dict[str, float]]:
    """Expand an ``(x, y, width, height)`` box into its four corner points.

    Corners are listed as top-left, top-right, bottom-right, bottom-left, each
    as a ``{"x": ..., "y": ...}`` dict of floats.
    """
    left, top, width, height = box
    right = left + width
    bottom = top + height
    corners = ((left, top), (right, top), (right, bottom), (left, bottom))
    return [{"x": float(corner_x), "y": float(corner_y)} for corner_x, corner_y in corners]
def _dedupe_by_iou(items: list[dict[str, Any]], threshold: float) -> list[dict[str, Any]]:
    """Drop items whose box overlaps an already accepted, higher-scored item.

    Items are visited from highest to lowest ``"score"`` (a missing or falsy
    score counts as ``0.0``). An item is kept only if its IoU with every
    previously kept item does not exceed ``threshold``.
    """

    def _score_of(entry: dict[str, Any]) -> float:
        return float(entry.get("score") or 0.0)

    survivors: list[dict[str, Any]] = []
    for candidate in sorted(items, key=_score_of, reverse=True):
        candidate_box = tuple(candidate["box"])
        for winner in survivors:
            if _rect_iou(candidate_box, tuple(winner["box"])) > threshold:
                break
        else:
            # No accepted item overlaps this one too strongly.
            survivors.append(candidate)
    return survivors
def _prepare_bgr_image(image: Image.Image | np.ndarray) -> np.ndarray:
    """Return a BGR ``ndarray`` for either a PIL image or a NumPy array.

    * PIL images are converted to RGB and then reordered to BGR.
    * 2-D arrays are treated as grayscale and expanded to BGR.
    * Any other array is returned as a copy, so the caller's buffer is never
      shared with the result.
    """
    if not isinstance(image, np.ndarray):
        rgb_pixels = np.array(image.convert("RGB"))
        return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)
    if image.ndim == 2:
        return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    return image.copy()
# Character substitutions applied by `_normalize_binding_uid` before it keeps
# only digits: each key is replaced by the digit it maps to (presumably
# look-alike glyphs produced by OCR — confirm against real OCR output).
_UID_TRANSLATION = {
    "O": "0",
    "Q": "0",
    "D": "0",
    "I": "1",
    "L": "1",
    "|": "1",
    "S": "5",
    "Z": "2",
    "B": "8",
}
# `configs/config.yaml` located one directory above the directory that holds
# this module.
_DEFAULT_CONFIG_PATH = Path(__file__).resolve().parents[1] / "configs" / "config.yaml"
# Lazily filled by `_load_color_ranges`; `None` means "not loaded yet".
_COLOR_RANGES_CACHE: dict[str, Any] | None = None
# Results of `analyze_layout`, keyed by the string built in
# `_make_analyze_layout_cache_key`.
_ANALYZE_LAYOUT_CACHE: dict[str, dict[str, Any]] = {}
# Cache keys in insertion order (oldest first); used to pick eviction victims.
_ANALYZE_LAYOUT_CACHE_ORDER: list[str] = []
# Maximum number of cached layout results kept at once.
_ANALYZE_LAYOUT_CACHE_MAX = 8
def _normalize_binding_uid(value: Any) -> str:
    """Reduce ``value`` to a digits-only identifier string.

    The value is stringified, stripped and upper-cased, characters listed in
    ``_UID_TRANSLATION`` are replaced by their digit, and everything that is
    not a digit is discarded. Leading zeros are removed unless the identifier
    consists of zeros only. Falsy values and values without any digit give
    ``""``.
    """
    text = str(value or "").strip().upper()
    if not text:
        return ""
    translated = text.translate(str.maketrans(_UID_TRANSLATION))
    digits = "".join(filter(str.isdigit, translated))
    if not digits:
        return ""
    # An all-zero identifier is kept as-is instead of collapsing to "".
    return digits.lstrip("0") or digits
def _load_color_ranges() -> dict[str, Any]:
    """Return the ``colors`` section of the default config, loading it once.

    The first call imports ``src.config_loader.load_config`` and reads
    ``_DEFAULT_CONFIG_PATH``; the outcome is stored in ``_COLOR_RANGES_CACHE``
    and reused by later calls. Any failure (import error, unreadable config,
    unexpected structure) results in an empty dict, which is cached as well.
    """
    global _COLOR_RANGES_CACHE
    if _COLOR_RANGES_CACHE is None:
        try:
            from src.config_loader import load_config

            config = load_config(_DEFAULT_CONFIG_PATH)
            colors = config.get("colors") if isinstance(config, dict) else None
            loaded = dict(colors or {})
        except Exception:
            # Best effort: a missing or broken config means "no colour ranges".
            loaded = {}
        _COLOR_RANGES_CACHE = loaded
    return _COLOR_RANGES_CACHE
def _make_analyze_layout_cache_key(
    image_bgr: np.ndarray,
    *,
    sensitivity: int,
    image_correction: int,
    enhance_image: bool,
    ignore_lines: bool,
) -> str:
    """Build the cache key for one ``analyze_layout`` invocation.

    The key has the form ``"<width>x<height>:<sensitivity>:<image_correction>:
    <enhance 0/1>:<ignore_lines 0/1>:<digest>"`` where ``digest`` is a 16-byte
    BLAKE2b hash (hex) of the raw image bytes.
    """
    hasher = hashlib.blake2b(digest_size=16)
    hasher.update(image_bgr.tobytes())
    height = image_bgr.shape[0]
    width = image_bgr.shape[1]
    fields = [
        f"{width}x{height}",
        f"{sensitivity}",
        f"{image_correction}",
        f"{int(enhance_image)}",
        f"{int(ignore_lines)}",
        hasher.hexdigest(),
    ]
    return ":".join(fields)
def _cache_layout_result(key: str, result: dict[str, Any]) -> None:
    """Store a deep copy of ``result`` under ``key`` and evict old entries.

    A key that is already cached is moved to the newest position. Once more
    than ``_ANALYZE_LAYOUT_CACHE_MAX`` keys are tracked, the oldest ones are
    dropped from both the order list and the cache dict.
    """
    already_cached = key in _ANALYZE_LAYOUT_CACHE
    if already_cached:
        _ANALYZE_LAYOUT_CACHE_ORDER.remove(key)

    # Deep copy so later mutation of ``result`` by the caller cannot leak in.
    _ANALYZE_LAYOUT_CACHE[key] = copy.deepcopy(result)
    _ANALYZE_LAYOUT_CACHE_ORDER.append(key)

    overflow = len(_ANALYZE_LAYOUT_CACHE_ORDER) - _ANALYZE_LAYOUT_CACHE_MAX
    for _ in range(overflow):
        evicted_key = _ANALYZE_LAYOUT_CACHE_ORDER.pop(0)
        _ANALYZE_LAYOUT_CACHE.pop(evicted_key, None)
def _prepare_detection_mask(
    image_bgr: np.ndarray,
    *,
    sensitivity: int = 60,
    image_correction: int = 55,
    enhance_image: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
    """Produce the grayscale image and the binary mask used for contour search.

    Args:
        image_bgr: Source image in BGR channel order.
        sensitivity: Clamped to 0..100; controls the adaptive-threshold block
            size and constant.
        image_correction: Clamped to 0..100; controls the CLAHE clip limit.
        enhance_image: When true, CLAHE and a 3x3 sharpening kernel are
            applied to the grayscale image before thresholding.

    Returns:
        ``(gray, mask)`` where ``gray`` is the (optionally enhanced) grayscale
        image and ``mask`` is the thresholded, morphologically closed mask.
    """
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)

    if enhance_image:
        correction_level = max(0, min(100, image_correction))
        clahe = cv2.createCLAHE(
            clipLimit=1.6 + (correction_level / 45.0),
            tileGridSize=(8, 8),
        )
        gray = clahe.apply(gray)
        sharpen_kernel = np.array(
            [[0, -1, 0], [-1, 5, -1], [0, -1, 0]],
            dtype=np.float32,
        )
        gray = cv2.filter2D(gray, -1, sharpen_kernel)

    smoothed = cv2.GaussianBlur(gray, (3, 3), 0)

    sensitivity_level = max(0, min(100, sensitivity))
    block_size = 31 + (sensitivity_level // 20) * 2
    if block_size % 2 == 0:
        # adaptiveThreshold needs an odd neighbourhood size.
        block_size += 1
    threshold_constant = -7 - int((sensitivity_level - 60) / 12)

    # Threshold the inverted image.
    inverted = 255 - smoothed
    mask = cv2.adaptiveThreshold(
        inverted,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        block_size,
        threshold_constant,
    )
    closing_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, closing_kernel, iterations=1)
    return gray, mask
@dataclass
class _OverlayRecord:
    """One detected layout element in the form it is serialised to the client."""

    # Element type; also used as the prefix of the generated payload id.
    kind: str
    category: str
    label: str
    # Bounding box as (x, y, width, height).
    box: tuple[int, int, int, int]
    # Outline as a list of {"x": ..., "y": ...} points.
    points: list[dict[str, float]]
    score: float
    meta: dict[str, Any]
    binding_uid: str = ""
    note: str = ""
    context_path: str = ""
    widget_name_override: str = ""
    static_shape_name_override: str = ""

    def to_payload(self, index: int) -> dict[str, Any]:
        """Serialise the record; ``index`` becomes the zero-padded id suffix."""
        left, top, box_width, box_height = self.box
        bounds = {
            "x": int(left),
            "y": int(top),
            "width": int(box_width),
            "height": int(box_height),
        }
        payload: dict[str, Any] = {
            "id": f"{self.kind}-{index:03d}",
            "kind": self.kind,
            "category": self.category,
            "label": self.label,
            "confidence": _round(self.score, 3),
            "bounds": bounds,
            "points": self.points,
            "meta": self.meta,
            "bindingUid": self.binding_uid,
            "note": self.note,
            "contextPath": self.context_path,
            "widgetNameOverride": self.widget_name_override,
            "staticShapeNameOverride": self.static_shape_name_override,
        }
        return payload
def _collect_contour_metrics(
    gray: np.ndarray,
    mask: np.ndarray,
) -> list[dict[str, Any]]:
    """Extract contours from ``mask`` and describe each with shape metrics.

    Contours whose bounding box is smaller than 10x10 or covers fewer than
    180 pixels are skipped. For every remaining contour the result holds its
    bounding box, fill ratio, aspect ratio, grayscale mean/std inside the box,
    number of direct child contours, polygon vertex count, solidity, a
    "closed frame" flag derived from how far the contour runs along each side
    of its bounding box, and the simplified polygon points.
    """
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    if hierarchy is None or not len(contours):
        return []
    nodes = hierarchy[0]

    def _span_ratio(values: np.ndarray, extent: int) -> float:
        # Share of ``extent`` covered between the smallest and largest value.
        if values.size < 2 or extent <= 0:
            return 0.0
        return float(values.max() - values.min()) / float(extent)

    def _count_direct_children(parent_index: int) -> int:
        # Start at the node's entry at index 2 and follow index-0 links until
        # -1 (first child / next sibling per OpenCV's hierarchy layout).
        total = 0
        cursor = int(nodes[parent_index][2])
        while cursor != -1:
            total += 1
            cursor = int(nodes[cursor][0])
        return total

    metrics: list[dict[str, Any]] = []
    for index, contour in enumerate(contours):
        x, y, width, height = cv2.boundingRect(contour)
        area = width * height
        if area < 180 or width < 10 or height < 10:
            continue

        perimeter = cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, 0.02 * perimeter, True)
        contour_area = max(1.0, cv2.contourArea(contour))
        fill = contour_area / max(1.0, area)
        aspect = max(width / max(1.0, height), height / max(1.0, width))

        roi = gray[y : y + height, x : x + width]
        mean = float(roi.mean()) if roi.size else 255.0
        std = float(roi.std()) if roi.size else 0.0

        child_count = _count_direct_children(index)

        hull_area = max(1.0, cv2.contourArea(cv2.convexHull(contour)))
        solidity = contour_area / hull_area

        polygon = approx.reshape(-1, 2).tolist() if len(approx) >= 3 else []

        # Contour points lying within a thin band along each bounding-box side.
        contour_points = contour.reshape(-1, 2)
        xs = contour_points[:, 0]
        ys = contour_points[:, 1]
        edge_pad_x = max(3, int(round(width * 0.05)))
        edge_pad_y = max(3, int(round(height * 0.05)))
        top_span = _span_ratio(contour_points[ys <= y + edge_pad_y][:, 0], width)
        bottom_span = _span_ratio(contour_points[ys >= y + height - edge_pad_y][:, 0], width)
        left_span = _span_ratio(contour_points[xs <= x + edge_pad_x][:, 1], height)
        right_span = _span_ratio(contour_points[xs >= x + width - edge_pad_x][:, 1], height)
        closed_frame = (
            top_span >= 0.45
            and bottom_span >= 0.45
            and left_span >= 0.28
            and right_span >= 0.28
        )

        metrics.append(
            {
                "box": (int(x), int(y), int(width), int(height)),
                "area": int(area),
                "contour_area": contour_area,
                "fill": float(fill),
                "aspect": float(aspect),
                "mean": mean,
                "std": std,
                "children": child_count,
                "verts": len(approx),
                "solidity": float(solidity),
                "closed_frame": bool(closed_frame),
                "edge_spans": {
                    "top": round(top_span, 3),
                    "bottom": round(bottom_span, 3),
                    "left": round(left_span, 3),
                    "right": round(right_span, 3),
                },
                "points": [{"x": float(px), "y": float(py)} for px, py in polygon],
            }
        )
    return metrics
def _classify_cells(items: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Select contour items that look like value cells and score them.

    A candidate must have at most 8 polygon vertices, be 34..220 wide and
    16..90 high, have an aspect ratio of 1.1..9.0, a mean brightness above 120
    and a standard deviation above 15. Accepted items are tagged as
    ``kind="cell"`` and de-duplicated at an IoU threshold of 0.8.
    """
    candidates: list[dict[str, Any]] = []
    for item in items:
        _, _, width, height = item["box"]
        fill = float(item["fill"])
        aspect = float(item["aspect"])
        mean = float(item["mean"])
        std = float(item["std"])
        children = int(item["children"])
        verts = int(item["verts"])

        if verts > 8:
            continue
        size_ok = 34 <= width <= 220 and 16 <= height <= 90
        if not size_ok:
            continue
        if not (1.1 <= aspect <= 9.0):
            continue
        if mean <= 120 or std <= 15:
            continue

        # Terms are added in a fixed order: size, children, fill, aspect, contrast.
        score = (
            min(width, 220) / 220.0 * 0.2
            + min(height, 90) / 90.0 * 0.1
            + min(0.4, children * 0.05)
            + (1 - min(abs(fill - 0.65), 0.65) / 0.65) * 0.2
            + (1 - min(abs(aspect - 3.0), 3.0) / 3.0) * 0.1
            + min(std / 80.0, 1.0) * 0.2
        )
        tagged = dict(item)
        tagged.update(
            {
                "kind": "cell",
                "category": "value",
                "label": "Ячейка",
                "score": float(score),
            }
        )
        candidates.append(tagged)
    return _dedupe_by_iou(candidates, 0.8)
def _classify_groups(items: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    """Select contour items that look like closed frames ("forms").

    A candidate needs at most 12 polygon vertices, a size of at least 140x38
    with an area of at least 12000, an aspect ratio of at most 12, a mean
    brightness above 120 and the ``closed_frame`` flag. After IoU
    de-duplication (threshold 0.7), a frame is discarded when it contains two
    or more noticeably smaller frames that together cover at least 34% of it.
    """
    candidates: list[dict[str, Any]] = []
    for item in items:
        width = int(item["box"][2])
        height = int(item["box"][3])
        area = int(item["area"])
        fill = float(item["fill"])
        aspect = float(item["aspect"])
        mean = float(item["mean"])
        std = float(item["std"])
        children = int(item["children"])
        verts = int(item["verts"])
        closed_frame = bool(item.get("closed_frame"))

        if verts > 12:
            continue
        if width < 140 or height < 38 or area < 12000:
            continue
        if aspect > 12.0 or mean <= 120:
            continue
        if not closed_frame:
            continue

        score = (
            min(0.5, children * 0.09)
            + (1 - min(abs(fill - 0.75), 0.75) / 0.75) * 0.15
            + min(std / 80.0, 1.0) * 0.1
            + min(area / 120000.0, 1.0) * 0.25
        )
        meta = dict(item.get("meta") or {})
        meta["staticSubtype"] = "form"
        meta["staticFormType"] = "frame"
        candidates.append(
            {
                **item,
                "kind": "form",
                "category": "static",
                "label": "Форма",
                "score": float(score),
                "meta": meta,
            }
        )

    unique_frames = _dedupe_by_iou(candidates, 0.7)

    kept: list[dict[str, Any]] = []
    for frame in unique_frames:
        frame_box = tuple(frame["box"])
        frame_area = frame_box[2] * frame_box[3]
        # Other frames lying inside this one that are clearly smaller than it.
        inner_frames = [
            other
            for other in unique_frames
            if other is not frame
            and _contains(tuple(other["box"]), frame_box, tolerance=3)
            and (other["box"][2] * other["box"][3]) < frame_area * 0.82
        ]
        inner_area = sum(int(other["box"][2]) * int(other["box"][3]) for other in inner_frames)
        cover_ratio = inner_area / max(1.0, frame_area)
        is_wrapper = len(inner_frames) >= 2 and cover_ratio >= 0.34
        if not is_wrapper:
            kept.append(frame)
    return kept
def _classify_arrows(items: Iterable[dict[str, Any]], *, ignore_lines: bool) -> list[dict[str, Any]]:
    """Select contour items that look like small process arrows.

    A candidate must be at least 18x10, have an area of 120..5000, an aspect
    ratio of at most 6, 3..8 polygon vertices, a solidity of 0.35..0.95, a
    fill ratio of at most 0.75 and a standard deviation above 10. Accepted
    items are tagged as ``kind="arrow"`` and de-duplicated at IoU 0.5.
    """
    candidates: list[dict[str, Any]] = []
    for item in items:
        width = int(item["box"][2])
        height = int(item["box"][3])
        area = int(item["area"])
        fill = float(item["fill"])
        aspect = float(item["aspect"])
        std = float(item["std"])
        verts = int(item["verts"])
        solidity = float(item["solidity"])

        too_big_or_small = area > 5000 or width < 18 or height < 10
        if too_big_or_small:
            continue
        if area < 120 or aspect > 6.0:
            continue
        if not 3 <= verts <= 8:
            continue
        if not (0.35 <= solidity <= 0.95):
            continue
        if fill > 0.75 or std <= 10:
            continue
        # NOTE(review): the size guard above already rejects width < 18 and
        # height < 10, so ``min(width, height) <= 8`` can never hold here and
        # ``ignore_lines`` currently has no effect. Kept as-is; confirm the
        # intended thresholds before changing it.
        if ignore_lines and aspect > 5.6 and min(width, height) <= 8:
            continue

        vertex_weight = 1.0 if verts == 3 else 0.7
        score = (
            (1 - abs(solidity - 0.6) / 0.6) * 0.3
            + (1 - min(abs(aspect - 2.2), 2.2) / 2.2) * 0.2
            + min(std / 80.0, 1.0) * 0.2
            + vertex_weight * 0.2
        )
        meta = dict(item.get("meta") or {})
        meta["staticSubtype"] = "process_arrow"
        candidates.append(
            {
                **item,
                "kind": "arrow",
                "category": "static",
                "label": "Стрелка",
                "score": float(score),
                "meta": meta,
            }
        )
    return _dedupe_by_iou(candidates, 0.5)
def _detect_sensor_cells(image_bgr: np.ndarray) -> tuple[str, list[dict[str, Any]]]:
    """Run the colour-based sensor detector and the title OCR on the image.

    Returns ``(title, overlays)``. Without configured colour ranges nothing is
    attempted and ``("", [])`` is returned. Both the sensor detector and the
    title OCR are best effort: if either import or call fails, that part
    yields no regions / an empty title. A title equal (case-insensitively) to
    "титул не оцифрован" is treated as empty.
    """
    color_ranges = _load_color_ranges()
    if not color_ranges:
        return "", []

    try:
        from src.pipeline_hf import detect_sensor_regions

        positions = detect_sensor_regions(image_bgr, color_ranges)
    except Exception:
        positions = []

    try:
        from src.ocr_utils_demo import ocr_title

        _, image_width = image_bgr.shape[:2]
        # The title is read from the top 45 rows of the left part of the image.
        title_text = ocr_title(image_bgr[:45, : int(image_width / 2.4)])
    except Exception:
        title_text = ""

    overlays: list[dict[str, Any]] = []
    for raw_x, raw_y, raw_width, raw_height in positions:
        box = (
            int(raw_x or 0),
            int(raw_y or 0),
            int(raw_width or 0),
            int(raw_height or 0),
        )
        if box[2] <= 0 or box[3] <= 0:
            continue
        overlays.append(
            {
                "box": box,
                "points": _bounds_to_points(box),
                "score": 0.55,
                "kind": "widget",
                "category": "widget",
                "label": "Ячейка",
                "note": "",
                "bindingUid": "",
                "meta": {
                    "source": "src.pipeline_hf.detect_sensor_regions",
                },
            }
        )
    overlays = _dedupe_by_iou(overlays, 0.78)

    title = str(title_text or "").strip()
    if title.lower() == "титул не оцифрован":
        title = ""
    return title, overlays
def _merge_cells_with_sensor_ocr(
    cells: list[dict[str, Any]],
    sensor_cells: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Combine contour-based cells with regions from the sensor detector.

    Each cell is paired with the best related sensor region, where "related"
    means an IoU of at least 0.28 or one box containing the other (4 px
    tolerance). A paired cell takes the sensor's box, points, note, binding
    UID and meta, and keeps the higher of the two scores. Sensor regions that
    matched no cell are appended, and the merged list is de-duplicated at
    IoU 0.78. With no sensor regions, ``cells`` is returned unchanged.
    """
    if not sensor_cells:
        return cells

    combined: list[dict[str, Any]] = []
    matched_sensors: set[int] = set()

    for cell in cells:
        cell_box = tuple(cell["box"])
        winner = -1
        winner_score = -1.0
        for position, sensor in enumerate(sensor_cells):
            sensor_box = tuple(sensor["box"])
            overlap = _rect_iou(cell_box, sensor_box)
            related = (
                overlap >= 0.28
                or _contains(sensor_box, cell_box, tolerance=4)
                or _contains(cell_box, sensor_box, tolerance=4)
            )
            if not related:
                continue
            # Overlap dominates; the sensor's own score only breaks near-ties.
            ranking = overlap + float(sensor.get("score") or 0.0) * 0.15
            if ranking > winner_score:
                winner_score = ranking
                winner = position

        if winner < 0:
            combined.append(cell)
            continue

        sensor = sensor_cells[winner]
        matched_sensors.add(winner)
        sensor_box = tuple(sensor["box"])
        combined.append(
            {
                **cell,
                "box": sensor_box,
                "points": list(sensor.get("points") or _bounds_to_points(tuple(sensor["box"]))),
                "score": max(float(cell.get("score") or 0.0), float(sensor.get("score") or 0.0)),
                "note": str(sensor.get("note") or ""),
                "bindingUid": str(sensor.get("bindingUid") or ""),
                "meta": {
                    **dict(cell.get("meta") or {}),
                    **dict(sensor.get("meta") or {}),
                    "matchedBySensorOcr": True,
                },
            }
        )

    combined.extend(
        sensor
        for position, sensor in enumerate(sensor_cells)
        if position not in matched_sensors
    )
    return _dedupe_by_iou(combined, 0.78)
def _ocr_layout_cells(image_bgr: np.ndarray, cells: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Run OCR on every cell crop and attach the recognised text to the cell.

    Each cell box is padded (4% horizontally, 8% vertically, at least 2 px),
    clipped to the image and passed to ``src.ocr_utils_demo.ocr_sensors`` in
    one batch. For every cell the result adds ``cellOcrText``, ``cellOcrScore``
    and ``cellOcrCrop`` to ``meta``, fills ``note`` with the text when the cell
    has no note yet, and derives ``bindingUid`` from the text when the cell
    has none.

    OCR is best effort: if the backend cannot be imported or the call fails,
    ``cells`` is returned unchanged. Every input cell is present in the output
    even when the backend returns fewer results than crops (or ``None``);
    cells without a usable result are treated as having empty text and a
    score of ``0.0``.

    Args:
        image_bgr: Image the cell boxes refer to.
        cells: Cell dicts with a ``"box"`` of ``(x, y, width, height)``.

    Returns:
        A new list of enriched cell dicts (or ``cells`` itself on the
        fallback paths described above).
    """
    if not cells:
        return cells
    try:
        from src.ocr_utils_demo import ocr_sensors
    except Exception:
        # The OCR backend is optional; without it cells stay as detected.
        return cells

    image_height, image_width = image_bgr.shape[:2]
    rois: list[np.ndarray] = []
    crop_boxes: list[tuple[int, int, int, int]] = []
    for cell in cells:
        x, y, width, height = tuple(cell["box"])
        pad_x = max(2, int(round(width * 0.04)))
        pad_y = max(2, int(round(height * 0.08)))
        left = max(0, x - pad_x)
        top = max(0, y - pad_y)
        right = min(image_width, x + width + pad_x)
        bottom = min(image_height, y + height + pad_y)
        crop_boxes.append((left, top, right, bottom))
        if right <= left or bottom <= top:
            # Box lies outside the image: feed a 1x1 placeholder so the batch
            # stays aligned with ``cells``.
            rois.append(np.zeros((1, 1, 3), dtype=np.uint8))
        else:
            rois.append(image_bgr[top:bottom, left:right])

    try:
        raw_results = ocr_sensors(rois)
        ocr_results = list(raw_results) if raw_results is not None else []
    except Exception:
        return cells

    enriched: list[dict[str, Any]] = []
    for index, (cell, crop_box) in enumerate(zip(cells, crop_boxes)):
        # Index-based lookup instead of zipping with the results: a short
        # result list must not silently drop the remaining cells.
        result = ocr_results[index] if index < len(ocr_results) else None
        if not result or not hasattr(result, "get"):
            result = {}

        text = str(result.get("text") or "").strip()
        if text == "?":
            # "?" is treated as "nothing recognised".
            text = ""
        score = float(result.get("score") or 0.0)

        existing_note = str(cell.get("note") or "").strip()
        binding_uid = str(cell.get("bindingUid") or "").strip()
        if not binding_uid:
            binding_uid = _normalize_binding_uid(text)

        meta = {
            **dict(cell.get("meta") or {}),
            "cellOcrText": text,
            "cellOcrScore": score,
            "cellOcrCrop": {
                "x": crop_box[0],
                "y": crop_box[1],
                "width": max(0, crop_box[2] - crop_box[0]),
                "height": max(0, crop_box[3] - crop_box[1]),
            },
        }
        enriched.append(
            {
                **cell,
                "bindingUid": binding_uid,
                "note": existing_note or text,
                "meta": meta,
            }
        )
    return enriched
def _attach_group_metrics(groups: list[dict[str, Any]], cells: list[dict[str, Any]], arrows: list[dict[str, Any]]) -> None:
    """Record on each group how many cells and arrows lie inside its box.

    The counts are written in place to ``group["meta"]["cellCount"]`` and
    ``group["meta"]["arrowCount"]``; a missing ``meta`` dict is created.
    Containment is tested with a tolerance of 2.
    """

    def _count_inside(candidates: list[dict[str, Any]], frame: tuple) -> int:
        return sum(1 for candidate in candidates if _contains(tuple(candidate["box"]), frame, tolerance=2))

    for group in groups:
        frame = tuple(group["box"])
        cells_inside = _count_inside(cells, frame)
        arrows_inside = _count_inside(arrows, frame)
        meta = group.setdefault("meta", {})
        meta["cellCount"] = cells_inside
        meta["arrowCount"] = arrows_inside
def _build_overlay_records(groups: list[dict[str, Any]], cells: list[dict[str, Any]], arrows: list[dict[str, Any]]) -> list[_OverlayRecord]:
    """Convert classified groups, cells and arrows into overlay records.

    Groups become ``form`` records outlined by their bounding box. Cells
    become ``widget`` records and arrows become ``arrow`` records; both keep
    their own points when present and carry over the optional binding UID,
    note, context path and name overrides. The result is sorted by top edge,
    then left edge, then box area.
    """

    def _optional_fields(item: dict[str, Any]) -> dict[str, str]:
        # Free-form string fields shared by cell and arrow records.
        return {
            "binding_uid": str(item.get("bindingUid") or ""),
            "note": str(item.get("note") or ""),
            "context_path": str(item.get("contextPath") or ""),
            "widget_name_override": str(item.get("widgetNameOverride") or ""),
            "static_shape_name_override": str(item.get("staticShapeNameOverride") or ""),
        }

    def _reading_order(record: _OverlayRecord) -> tuple[int, int, int]:
        return (record.box[1], record.box[0], record.box[2] * record.box[3])

    records: list[_OverlayRecord] = []

    for group in groups:
        frame = tuple(group["box"])
        records.append(
            _OverlayRecord(
                kind="form",
                category="static",
                label="Форма",
                box=frame,
                points=_bounds_to_points(frame),
                score=float(group["score"]),
                meta=dict(group.get("meta") or {}),
            )
        )

    for cell in cells:
        frame = tuple(cell["box"])
        records.append(
            _OverlayRecord(
                kind="widget",
                category="widget",
                label="Ячейка",
                box=frame,
                points=list(cell.get("points") or _bounds_to_points(frame)),
                score=float(cell["score"]),
                meta=dict(cell.get("meta") or {}),
                **_optional_fields(cell),
            )
        )

    for arrow in arrows:
        frame = tuple(arrow["box"])
        # Unlike cells, arrow points are passed through without copying.
        outline = arrow.get("points") or _bounds_to_points(frame)
        records.append(
            _OverlayRecord(
                kind="arrow",
                category="static",
                label="Стрелка",
                box=frame,
                points=outline,
                score=float(arrow["score"]),
                meta=dict(arrow.get("meta") or {}),
                **_optional_fields(arrow),
            )
        )

    records.sort(key=_reading_order)
    return records
def analyze_layout(
    image: Image.Image | np.ndarray,
    *,
    sensitivity: int = 60,
    image_correction: int = 55,
    enhance_image: bool = True,
    ignore_lines: bool = True,
) -> dict[str, Any]:
    """Detect forms, cells and arrows on an image and return the layout payload.

    Args:
        image: PIL image or NumPy array to analyse.
        sensitivity: Forwarded to the detection-mask preparation.
        image_correction: Forwarded to the detection-mask preparation.
        enhance_image: Forwarded to the detection-mask preparation.
        ignore_lines: Forwarded to the arrow classifier.

    Returns:
        A dict with ``engine``, ``summary``, ``counts``, ``overlays``,
        ``title``, ``imageWidth`` and ``imageHeight``. Results are cached per
        image content and parameter set; callers always receive their own
        deep copy on a cache hit.
    """
    image_bgr = _prepare_bgr_image(image)
    cache_key = _make_analyze_layout_cache_key(
        image_bgr,
        sensitivity=sensitivity,
        image_correction=image_correction,
        enhance_image=enhance_image,
        ignore_lines=ignore_lines,
    )
    cached = _ANALYZE_LAYOUT_CACHE.get(cache_key)
    if cached is not None:
        return copy.deepcopy(cached)

    # Contour-based detection.
    gray, mask = _prepare_detection_mask(
        image_bgr,
        sensitivity=sensitivity,
        image_correction=image_correction,
        enhance_image=enhance_image,
    )
    contour_items = _collect_contour_metrics(gray, mask)
    cells = _classify_cells(contour_items)
    groups = _classify_groups(contour_items)
    arrows = _classify_arrows(contour_items, ignore_lines=ignore_lines)

    # Colour-based sensor detection and OCR enrichment of the cells.
    title_text, sensor_cells = _detect_sensor_cells(image_bgr)
    cells = _merge_cells_with_sensor_ocr(cells, sensor_cells)
    cells = _ocr_layout_cells(image_bgr, cells)

    _attach_group_metrics(groups, cells, arrows)
    overlays = _build_overlay_records(groups, cells, arrows)
    payloads = [record.to_payload(number) for number, record in enumerate(overlays, start=1)]

    # A cell counts as OCR-backed when it carries a note or a binding UID.
    ocr_cell_total = 0
    for cell in cells:
        has_note = str(cell.get("note") or "").strip()
        if has_note or str(cell.get("bindingUid") or "").strip():
            ocr_cell_total += 1

    group_total = len(groups)
    cell_total = len(cells)
    arrow_total = len(arrows)
    counts = {
        "widgets": cell_total,
        "statics": group_total + arrow_total,
        "forms": group_total,
        "shapes": 0,
        "groups": group_total,
        "cells": cell_total,
        "arrows": arrow_total,
        "ocrCells": ocr_cell_total,
        "total": len(payloads),
    }
    summary = (
        f"Распознано объектов: {counts['total']} "
        f"(виджеты: {counts['widgets']}, формы: {counts['forms']}, стрелки: {counts['arrows']}, OCR-ячейки: {counts['ocrCells']})."
    )
    result = {
        "engine": "src-hybrid-layout-v2",
        "summary": summary,
        "counts": counts,
        "overlays": payloads,
        "title": title_text,
        "imageWidth": int(image_bgr.shape[1]),
        "imageHeight": int(image_bgr.shape[0]),
    }
    _cache_layout_result(cache_key, result)
    return result