from __future__ import annotations

import copy
import hashlib
from dataclasses import dataclass
from itertools import count
from pathlib import Path
from typing import Any, Iterable

import cv2
import numpy as np
from PIL import Image


def _clamp(value: float, minimum: float, maximum: float) -> float:
    return max(minimum, min(maximum, value))


def _round(value: float, digits: int = 3) -> float:
    return round(float(value), digits)


def _rect_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float:
    """Intersection-over-union of two (x, y, width, height) boxes."""
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    x1 = max(ax, bx)
    y1 = max(ay, by)
    x2 = min(ax + aw, bx + bw)
    y2 = min(ay + ah, by + bh)
    if x2 <= x1 or y2 <= y1:
        return 0.0
    inter = float((x2 - x1) * (y2 - y1))
    union = float(aw * ah + bw * bh - inter)
    return inter / union if union > 0 else 0.0


def _contains(
    inner: tuple[int, int, int, int],
    outer: tuple[int, int, int, int],
    tolerance: int = 0,
) -> bool:
    """True if `inner` lies within `outer`, allowing `tolerance` pixels of slack."""
    ix, iy, iw, ih = inner
    ox, oy, ow, oh = outer
    return (
        ix >= ox - tolerance
        and iy >= oy - tolerance
        and ix + iw <= ox + ow + tolerance
        and iy + ih <= oy + oh + tolerance
    )


def _bounds_to_points(box: tuple[int, int, int, int]) -> list[dict[str, float]]:
    x, y, w, h = box
    return [
        {"x": float(x), "y": float(y)},
        {"x": float(x + w), "y": float(y)},
        {"x": float(x + w), "y": float(y + h)},
        {"x": float(x), "y": float(y + h)},
    ]


def _dedupe_by_iou(items: list[dict[str, Any]], threshold: float) -> list[dict[str, Any]]:
    """Greedy suppression: keep the highest-scoring item of each overlapping cluster."""
    kept: list[dict[str, Any]] = []
    for item in sorted(items, key=lambda candidate: float(candidate.get("score") or 0.0), reverse=True):
        box = tuple(item["box"])
        if any(_rect_iou(box, tuple(existing["box"])) > threshold for existing in kept):
            continue
        kept.append(item)
    return kept


def _prepare_bgr_image(image: Image.Image | np.ndarray) -> np.ndarray:
    if isinstance(image, np.ndarray):
        if image.ndim == 2:
            return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
        return image.copy()
    rgb = np.array(image.convert("RGB"))
    return cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)


# Characters that OCR commonly confuses with digits.
_UID_TRANSLATION = {
    "O": "0",
    "Q": "0",
    "D": "0",
    "I": "1",
    "L": "1",
    "|": "1",
    "S": "5",
    "Z": "2",
    "B": "8",
}

_DEFAULT_CONFIG_PATH = Path(__file__).resolve().parents[1] / "configs" / "config.yaml"
_COLOR_RANGES_CACHE: dict[str, Any] | None = None
_ANALYZE_LAYOUT_CACHE: dict[str, dict[str, Any]] = {}
_ANALYZE_LAYOUT_CACHE_ORDER: list[str] = []
_ANALYZE_LAYOUT_CACHE_MAX = 8


def _normalize_binding_uid(value: Any) -> str:
    """Map look-alike characters to digits, keep only digits, strip leading zeros.

    An all-zero value is returned unchanged rather than as an empty string.
    """
    text = str(value or "").strip().upper()
    if not text:
        return ""
    translated = "".join(_UID_TRANSLATION.get(char, char) for char in text)
    digits = "".join(char for char in translated if char.isdigit())
    if not digits:
        return ""
    stripped = digits.lstrip("0")
    return stripped or digits


def _load_color_ranges() -> dict[str, Any]:
    global _COLOR_RANGES_CACHE
    if _COLOR_RANGES_CACHE is not None:
        return _COLOR_RANGES_CACHE
    try:
        from src.config_loader import load_config

        config = load_config(_DEFAULT_CONFIG_PATH)
        colors = config.get("colors") if isinstance(config, dict) else None
        _COLOR_RANGES_CACHE = dict(colors or {})
    except Exception:
        _COLOR_RANGES_CACHE = {}
    return _COLOR_RANGES_CACHE


def _make_analyze_layout_cache_key(
    image_bgr: np.ndarray,
    *,
    sensitivity: int,
    image_correction: int,
    enhance_image: bool,
    ignore_lines: bool,
) -> str:
    digest = hashlib.blake2b(image_bgr.tobytes(), digest_size=16).hexdigest()
    return (
        f"{image_bgr.shape[1]}x{image_bgr.shape[0]}:{sensitivity}:{image_correction}:"
        f"{int(enhance_image)}:{int(ignore_lines)}:{digest}"
    )


def _cache_layout_result(key: str, result: dict[str, Any]) -> None:
    """Store a deep copy of `result`, evicting the oldest entries beyond the cache limit."""
    if key in _ANALYZE_LAYOUT_CACHE:
        _ANALYZE_LAYOUT_CACHE_ORDER.remove(key)
    _ANALYZE_LAYOUT_CACHE[key] = copy.deepcopy(result)
    _ANALYZE_LAYOUT_CACHE_ORDER.append(key)
    while len(_ANALYZE_LAYOUT_CACHE_ORDER) > _ANALYZE_LAYOUT_CACHE_MAX:
        oldest = _ANALYZE_LAYOUT_CACHE_ORDER.pop(0)
        _ANALYZE_LAYOUT_CACHE.pop(oldest, None)


def _prepare_detection_mask(
    image_bgr: np.ndarray,
    *,
    sensitivity: int = 60,
    image_correction: int = 55,
    enhance_image: bool = True,
) -> tuple[np.ndarray,
           np.ndarray]:
    """Return the (optionally enhanced) grayscale image and a binary mask
    of pixels that are darker than their neighbourhood."""
    gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
    if enhance_image:
        clip_limit = 1.6 + (max(0, min(100, image_correction)) / 45.0)
        clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8))
        gray = clahe.apply(gray)
        sharpen = np.array(
            [[0, -1, 0], [-1, 5, -1], [0, -1, 0]],
            dtype=np.float32,
        )
        gray = cv2.filter2D(gray, -1, sharpen)
    blur = cv2.GaussianBlur(gray, (3, 3), 0)
    block_size = 31 + (max(0, min(100, sensitivity)) // 20) * 2
    if block_size % 2 == 0:
        block_size += 1
    constant = -7 - int((max(0, min(100, sensitivity)) - 60) / 12)
    inv = 255 - blur
    mask = cv2.adaptiveThreshold(
        inv,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        block_size,
        constant,
    )
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=1)
    return gray, mask


@dataclass
class _OverlayRecord:
    kind: str
    category: str
    label: str
    box: tuple[int, int, int, int]
    points: list[dict[str, float]]
    score: float
    meta: dict[str, Any]
    binding_uid: str = ""
    note: str = ""
    context_path: str = ""
    widget_name_override: str = ""
    static_shape_name_override: str = ""

    def to_payload(self, index: int) -> dict[str, Any]:
        x, y, width, height = self.box
        return {
            "id": f"{self.kind}-{index:03d}",
            "kind": self.kind,
            "category": self.category,
            "label": self.label,
            "confidence": _round(self.score, 3),
            "bounds": {
                "x": int(x),
                "y": int(y),
                "width": int(width),
                "height": int(height),
            },
            "points": self.points,
            "meta": self.meta,
            "bindingUid": self.binding_uid,
            "note": self.note,
            "contextPath": self.context_path,
            "widgetNameOverride": self.widget_name_override,
            "staticShapeNameOverride": self.static_shape_name_override,
        }


def _collect_contour_metrics(
    gray: np.ndarray,
    mask: np.ndarray,
) -> list[dict[str, Any]]:
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    if hierarchy is None or not len(contours):
        return []
    nodes = hierarchy[0]
    items: list[dict[str, Any]] = []
    for index, contour in enumerate(contours):
        x, y, width, height = cv2.boundingRect(contour)
        area = width * height
        if area < 180 or width < 10 or height < 10:
            continue
        perimeter = cv2.arcLength(contour, True)
        approx = cv2.approxPolyDP(contour, 0.02 * perimeter, True)
        contour_area = max(1.0, cv2.contourArea(contour))
        fill = contour_area / max(1.0, area)
        aspect = max(width / max(1.0, height), height / max(1.0, width))
        roi = gray[y : y + height, x : x + width]
        mean = float(roi.mean()) if roi.size else 255.0
        std = float(roi.std()) if roi.size else 0.0

        # Count direct children: hierarchy rows are [next, previous, first_child, parent].
        child_count = 0
        child = int(nodes[index][2])
        while child != -1:
            child_count += 1
            child = int(nodes[child][0])

        hull = cv2.convexHull(contour)
        hull_area = max(1.0, cv2.contourArea(hull))
        solidity = contour_area / hull_area
        points = approx.reshape(-1, 2).tolist() if len(approx) >= 3 else []
        contour_points = contour.reshape(-1, 2)
        edge_pad_x = max(3, int(round(width * 0.05)))
        edge_pad_y = max(3, int(round(height * 0.05)))

        def _span_ratio(values: np.ndarray, extent: int) -> float:
            if values.size < 2 or extent <= 0:
                return 0.0
            return float(values.max() - values.min()) / float(extent)

        # Contour points lying near each side of the bounding box.
        top_x = contour_points[contour_points[:, 1] <= y + edge_pad_y][:, 0]
        bottom_x = contour_points[contour_points[:, 1] >= y + height - edge_pad_y][:, 0]
        left_y = contour_points[contour_points[:, 0] <= x + edge_pad_x][:, 1]
        right_y = contour_points[contour_points[:, 0] >= x + width - edge_pad_x][:, 1]
        top_span = _span_ratio(top_x, width)
        bottom_span = _span_ratio(bottom_x, width)
        left_span = _span_ratio(left_y, height)
        right_span = _span_ratio(right_y, height)
        closed_frame = (
            top_span >= 0.45
            and bottom_span >= 0.45
            and left_span >= 0.28
            and right_span >= 0.28
        )
        items.append(
            {
                "box": (int(x), int(y), int(width), int(height)),
                "area": int(area),
                "contour_area": contour_area,
                "fill": float(fill),
                "aspect": float(aspect),
                "mean": mean,
                "std": std,
                "children": child_count,
                "verts": len(approx),
                "solidity": float(solidity),
                "closed_frame": bool(closed_frame),
                "edge_spans": {
                    "top": round(top_span, 3),
                    "bottom": round(bottom_span, 3),
                    "left": round(left_span, 3),
                    "right": round(right_span, 3),
                },
                "points": [{"x": float(px), "y": float(py)} for px, py in points],
            }
        )
    return items


def _classify_cells(items: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    cells: list[dict[str, Any]] = []
    for item in items:
        x, y, width, height = item["box"]
        fill = float(item["fill"])
        aspect = float(item["aspect"])
        mean = float(item["mean"])
        std = float(item["std"])
        children = int(item["children"])
        verts = int(item["verts"])
        rectish = verts <= 8
        if not rectish:
            continue
        if not (34 <= width <= 220 and 16 <= height <= 90):
            continue
        if not (1.1 <= aspect <= 9.0):
            continue
        if mean <= 120 or std <= 15:
            continue
        score = 0.0
        score += min(width, 220) / 220.0 * 0.2
        score += min(height, 90) / 90.0 * 0.1
        score += min(0.4, children * 0.05)
        score += (1 - min(abs(fill - 0.65), 0.65) / 0.65) * 0.2
        score += (1 - min(abs(aspect - 3.0), 3.0) / 3.0) * 0.1
        score += min(std / 80.0, 1.0) * 0.2
        cells.append(
            {
                **item,
                "kind": "cell",
                "category": "value",
                "label": "Ячейка",  # Russian UI label: "Cell"
                "score": float(score),
            }
        )
    return _dedupe_by_iou(cells, 0.8)


def _classify_groups(items: Iterable[dict[str, Any]]) -> list[dict[str, Any]]:
    groups: list[dict[str, Any]] = []
    for item in items:
        width = int(item["box"][2])
        height = int(item["box"][3])
        area = int(item["area"])
        fill = float(item["fill"])
        aspect = float(item["aspect"])
        mean = float(item["mean"])
        std = float(item["std"])
        children = int(item["children"])
        verts = int(item["verts"])
        closed_frame = bool(item.get("closed_frame"))
        rectish = verts <= 12
        if not rectish:
            continue
        if width < 140 or height < 38 or area < 12000:
            continue
        if aspect > 12.0 or mean <= 120:
            continue
        if not closed_frame:
            continue
        score = 0.0
        score += min(0.5, children * 0.09)
        score += (1 - min(abs(fill - 0.75), 0.75) / 0.75) * 0.15
        score += min(std / 80.0, 1.0) * 0.1
        score += min(area / 120000.0, 1.0) * 0.25
        groups.append(
            {
                **item,
                "kind": "form",
                "category": "static",
                "label": "Форма",  # Russian UI label: "Form"
                "score": float(score),
                "meta": {
                    **dict(item.get("meta") or {}),
                    "staticSubtype": "form",
                    "staticFormType": "frame",
                },
            }
        )
    groups = _dedupe_by_iou(groups, 0.7)

    # Drop a frame when at least two smaller frames nested inside it cover
    # a substantial share of its area.
    filtered: list[dict[str, Any]] = []
    for candidate in groups:
        candidate_box = tuple(candidate["box"])
        nested_groups = [
            other
            for other in groups
            if other is not candidate
            and _contains(tuple(other["box"]), candidate_box, tolerance=3)
            and (other["box"][2] * other["box"][3]) < (candidate_box[2] * candidate_box[3]) * 0.82
        ]
        nested_area = sum(int(other["box"][2]) * int(other["box"][3]) for other in nested_groups)
        cover_ratio = nested_area / max(1.0, candidate_box[2] * candidate_box[3])
        if len(nested_groups) >= 2 and cover_ratio >= 0.34:
            continue
        filtered.append(candidate)
    return filtered


def _classify_arrows(items: Iterable[dict[str, Any]], *, ignore_lines: bool) -> list[dict[str, Any]]:
    arrows: list[dict[str, Any]] = []
    for item in items:
        width = int(item["box"][2])
        height = int(item["box"][3])
        area = int(item["area"])
        fill = float(item["fill"])
        aspect = float(item["aspect"])
        std = float(item["std"])
        verts = int(item["verts"])
        solidity = float(item["solidity"])
        if area > 5000 or width < 18 or height < 10:
            continue
        if area < 120 or aspect > 6.0:
            continue
        if verts not in {3, 4, 5, 6, 7, 8}:
            continue
        if not (0.35 <= solidity <= 0.95):
            continue
        if fill > 0.75 or std <= 10:
            continue
        if ignore_lines and aspect > 5.6 and min(width, height) <= 8:
            continue
        score = 0.0
        score += (1 - abs(solidity - 0.6) / 0.6) * 0.3
        score += (1 - min(abs(aspect - 2.2), 2.2) / 2.2) * 0.2
        score += min(std / 80.0, 1.0) * 0.2
        score += (1.0 if verts == 3 else 0.7) * 0.2
        arrows.append(
            {
                **item,
                "kind": "arrow",
                "category": "static",
                "label": "Стрелка",  # Russian UI label: "Arrow"
                "score": float(score),
                "meta": {
                    **dict(item.get("meta") or {}),
                    "staticSubtype": "process_arrow",
                },
            }
        )
    return _dedupe_by_iou(arrows, 0.5)


def _detect_sensor_cells(image_bgr: np.ndarray) -> tuple[str, list[dict[str, Any]]]:
    color_ranges = _load_color_ranges()
    if not color_ranges:
        return "", []
    positions: list[tuple[int, int, int, int]] = []
    title_text = ""
    try:
        from src.pipeline_hf import detect_sensor_regions

        positions = detect_sensor_regions(image_bgr, color_ranges)
    except Exception:
        positions = []
    try:
        from src.ocr_utils_demo import ocr_title

        height, width = image_bgr.shape[:2]
        title_roi = image_bgr[:45, : int(width / 2.4)]
        title_text = ocr_title(title_roi)
    except Exception:
        title_text = ""
    overlays: list[dict[str, Any]] = []
    for x, y, width, height in positions:
        x = int(x or 0)
        y = int(y or 0)
        width = int(width or 0)
        height = int(height or 0)
        if width <= 0 or height <= 0:
            continue
        box = (x, y, width, height)
        overlays.append(
            {
                "box": box,
                "points": _bounds_to_points(box),
                "score": 0.55,
                "kind": "widget",
                "category": "widget",
                "label": "Ячейка",  # Russian UI label: "Cell"
                "note": "",
                "bindingUid": "",
                "meta": {
                    "source": "src.pipeline_hf.detect_sensor_regions",
                },
            }
        )
    overlays = _dedupe_by_iou(overlays, 0.78)
    normalized_title = str(title_text or "").strip()
    # Compared against the Russian placeholder "title not digitized";
    # a matching title is treated as empty.
    if normalized_title.lower() == "титул не оцифрован":
        normalized_title = ""
    return normalized_title, overlays


def _merge_cells_with_sensor_ocr(
    cells: list[dict[str, Any]],
    sensor_cells: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    if not sensor_cells:
        return cells
    merged: list[dict[str, Any]] = []
    used_sensor_indexes: set[int] = set()
    for cell in cells:
        cell_box = tuple(cell["box"])
        best_index = -1
        best_score = -1.0
        for index, sensor in enumerate(sensor_cells):
            sensor_box = tuple(sensor["box"])
            overlap = _rect_iou(cell_box, sensor_box)
            # Skip sensors that neither overlap enough nor nest with the cell.
            if (
                overlap < 0.28
                and not _contains(sensor_box, cell_box, tolerance=4)
                and not _contains(cell_box, sensor_box, tolerance=4)
            ):
                continue
            candidate_score = overlap + float(sensor.get("score") or 0.0) * 0.15
            if candidate_score > best_score:
                best_score = candidate_score
                best_index = index
        if best_index >= 0:
            sensor = sensor_cells[best_index]
            used_sensor_indexes.add(best_index)
            merged.append(
                {
                    **cell,
                    "box": tuple(sensor["box"]),
                    "points": list(
                        sensor.get("points") or
                        _bounds_to_points(tuple(sensor["box"]))
                    ),
                    "score": max(float(cell.get("score") or 0.0), float(sensor.get("score") or 0.0)),
                    "note": str(sensor.get("note") or ""),
                    "bindingUid": str(sensor.get("bindingUid") or ""),
                    "meta": {
                        **dict(cell.get("meta") or {}),
                        **dict(sensor.get("meta") or {}),
                        "matchedBySensorOcr": True,
                    },
                }
            )
            continue
        merged.append(cell)
    # Keep sensor regions that matched no contour-based cell.
    for index, sensor in enumerate(sensor_cells):
        if index not in used_sensor_indexes:
            merged.append(sensor)
    return _dedupe_by_iou(merged, 0.78)


def _ocr_layout_cells(image_bgr: np.ndarray, cells: list[dict[str, Any]]) -> list[dict[str, Any]]:
    if not cells:
        return cells
    try:
        from src.ocr_utils_demo import ocr_sensors
    except Exception:
        return cells
    rois: list[np.ndarray] = []
    crop_boxes: list[tuple[int, int, int, int]] = []
    image_height, image_width = image_bgr.shape[:2]
    for cell in cells:
        x, y, width, height = tuple(cell["box"])
        pad_x = max(2, int(round(width * 0.04)))
        pad_y = max(2, int(round(height * 0.08)))
        left = max(0, x - pad_x)
        top = max(0, y - pad_y)
        right = min(image_width, x + width + pad_x)
        bottom = min(image_height, y + height + pad_y)
        if right <= left or bottom <= top:
            # Degenerate crop: pass a 1x1 placeholder so results stay aligned with cells.
            rois.append(np.zeros((1, 1, 3), dtype=np.uint8))
            crop_boxes.append((left, top, right, bottom))
            continue
        rois.append(image_bgr[top:bottom, left:right])
        crop_boxes.append((left, top, right, bottom))
    try:
        ocr_results = ocr_sensors(rois)
    except Exception:
        return cells
    enriched: list[dict[str, Any]] = []
    for cell, result, crop_box in zip(cells, ocr_results, crop_boxes):
        text = str((result or {}).get("text") or "").strip()
        if text == "?":
            text = ""
        score = float((result or {}).get("score") or 0.0)
        existing_note = str(cell.get("note") or "").strip()
        binding_uid = str(cell.get("bindingUid") or "").strip()
        if not binding_uid:
            binding_uid = _normalize_binding_uid(text)
        meta = {
            **dict(cell.get("meta") or {}),
            "cellOcrText": text,
            "cellOcrScore": score,
            "cellOcrCrop": {
                "x": crop_box[0],
                "y": crop_box[1],
                "width": max(0, crop_box[2] - crop_box[0]),
                "height": max(0, crop_box[3] - crop_box[1]),
            },
        }
        enriched.append(
            {
                **cell,
                "bindingUid": binding_uid,
                "note": existing_note or text,
                "meta": meta,
            }
        )
    return enriched


def _attach_group_metrics(
    groups: list[dict[str, Any]],
    cells: list[dict[str, Any]],
    arrows: list[dict[str, Any]],
) -> None:
    for group in groups:
        group_box = tuple(group["box"])
        cell_count = sum(1 for cell in cells if _contains(tuple(cell["box"]), group_box, tolerance=2))
        arrow_count = sum(1 for arrow in arrows if _contains(tuple(arrow["box"]), group_box, tolerance=2))
        group.setdefault("meta", {})
        group["meta"]["cellCount"] = cell_count
        group["meta"]["arrowCount"] = arrow_count


def _build_overlay_records(
    groups: list[dict[str, Any]],
    cells: list[dict[str, Any]],
    arrows: list[dict[str, Any]],
) -> list[_OverlayRecord]:
    overlays: list[_OverlayRecord] = []
    for item in groups:
        box = tuple(item["box"])
        overlays.append(
            _OverlayRecord(
                kind="form",
                category="static",
                label="Форма",  # Russian UI label: "Form"
                box=box,
                points=_bounds_to_points(box),
                score=float(item["score"]),
                meta=dict(item.get("meta") or {}),
            )
        )
    for item in cells:
        box = tuple(item["box"])
        overlays.append(
            _OverlayRecord(
                kind="widget",
                category="widget",
                label="Ячейка",  # Russian UI label: "Cell"
                box=box,
                points=list(item.get("points") or _bounds_to_points(box)),
                score=float(item["score"]),
                meta=dict(item.get("meta") or {}),
                binding_uid=str(item.get("bindingUid") or ""),
                note=str(item.get("note") or ""),
                context_path=str(item.get("contextPath") or ""),
                widget_name_override=str(item.get("widgetNameOverride") or ""),
                static_shape_name_override=str(item.get("staticShapeNameOverride") or ""),
            )
        )
    for item in arrows:
        box = tuple(item["box"])
        points = item.get("points") or _bounds_to_points(box)
        overlays.append(
            _OverlayRecord(
                kind="arrow",
                category="static",
                label="Стрелка",  # Russian UI label: "Arrow"
                box=box,
                points=points,
                score=float(item["score"]),
                meta=dict(item.get("meta") or {}),
                binding_uid=str(item.get("bindingUid") or ""),
                note=str(item.get("note") or ""),
                context_path=str(item.get("contextPath") or ""),
                widget_name_override=str(item.get("widgetNameOverride") or ""),
                static_shape_name_override=str(item.get("staticShapeNameOverride") or ""),
            )
        )
    # Order overlays top-to-bottom, then left-to-right, then by area.
    overlays.sort(key=lambda item: (item.box[1], item.box[0], item.box[2] * item.box[3]))
    return overlays


def analyze_layout(
    image: Image.Image | np.ndarray,
    *,
    sensitivity: int = 60,
    image_correction: int = 55,
    enhance_image: bool = True,
    ignore_lines: bool = True,
) -> dict[str, Any]:
    image_bgr = _prepare_bgr_image(image)
    cache_key = _make_analyze_layout_cache_key(
        image_bgr,
        sensitivity=sensitivity,
        image_correction=image_correction,
        enhance_image=enhance_image,
        ignore_lines=ignore_lines,
    )
    cached = _ANALYZE_LAYOUT_CACHE.get(cache_key)
    if cached is not None:
        return copy.deepcopy(cached)

    gray, mask = _prepare_detection_mask(
        image_bgr,
        sensitivity=sensitivity,
        image_correction=image_correction,
        enhance_image=enhance_image,
    )
    contour_items = _collect_contour_metrics(gray, mask)
    cells = _classify_cells(contour_items)
    groups = _classify_groups(contour_items)
    arrows = _classify_arrows(contour_items, ignore_lines=ignore_lines)
    title_text, sensor_cells = _detect_sensor_cells(image_bgr)
    cells = _merge_cells_with_sensor_ocr(cells, sensor_cells)
    cells = _ocr_layout_cells(image_bgr, cells)
    _attach_group_metrics(groups, cells, arrows)
    overlays = _build_overlay_records(groups, cells, arrows)
    payloads = [overlay.to_payload(index) for index, overlay in zip(count(1), overlays)]
    counts = {
        "widgets": len(cells),
        "statics": len(groups) + len(arrows),
        "forms": len(groups),
        "shapes": 0,
        "groups": len(groups),
        "cells": len(cells),
        "arrows": len(arrows),
        "ocrCells": sum(
            1
            for cell in cells
            if str(cell.get("note") or "").strip() or str(cell.get("bindingUid") or "").strip()
        ),
        "total": len(payloads),
    }
    result = {
        "engine": "src-hybrid-layout-v2",
        # Russian UI summary: "Objects recognized: N (widgets: ..., forms: ...,
        # arrows: ..., OCR cells: ...)."
        "summary": (
            f"Распознано объектов: {counts['total']} "
            f"(виджеты: {counts['widgets']}, формы: {counts['forms']}, стрелки: {counts['arrows']}, OCR-ячейки: {counts['ocrCells']})."
        ),
        "counts": counts,
        "overlays": payloads,
        "title": title_text,
        "imageWidth": int(image_bgr.shape[1]),
        "imageHeight": int(image_bgr.shape[0]),
    }
    _cache_layout_result(cache_key, result)
    return result