Spaces:
Paused
Paused
| from __future__ import annotations | |
| import copy | |
| import hashlib | |
| from dataclasses import dataclass | |
| from itertools import count | |
| from pathlib import Path | |
| from typing import Any, Iterable | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| def _clamp(value: float, minimum: float, maximum: float) -> float: | |
| return max(minimum, min(maximum, value)) | |
| def _round(value: float, digits: int = 3) -> float: | |
| return round(float(value), digits) | |
| def _rect_iou(a: tuple[int, int, int, int], b: tuple[int, int, int, int]) -> float: | |
| ax, ay, aw, ah = a | |
| bx, by, bw, bh = b | |
| x1 = max(ax, bx) | |
| y1 = max(ay, by) | |
| x2 = min(ax + aw, bx + bw) | |
| y2 = min(ay + ah, by + bh) | |
| if x2 <= x1 or y2 <= y1: | |
| return 0.0 | |
| inter = float((x2 - x1) * (y2 - y1)) | |
| union = float(aw * ah + bw * bh - inter) | |
| return inter / union if union > 0 else 0.0 | |
| def _contains(inner: tuple[int, int, int, int], outer: tuple[int, int, int, int], tolerance: int = 0) -> bool: | |
| ix, iy, iw, ih = inner | |
| ox, oy, ow, oh = outer | |
| return ( | |
| ix >= ox - tolerance | |
| and iy >= oy - tolerance | |
| and ix + iw <= ox + ow + tolerance | |
| and iy + ih <= oy + oh + tolerance | |
| ) | |
| def _bounds_to_points(box: tuple[int, int, int, int]) -> list[dict[str, float]]: | |
| x, y, w, h = box | |
| return [ | |
| {"x": float(x), "y": float(y)}, | |
| {"x": float(x + w), "y": float(y)}, | |
| {"x": float(x + w), "y": float(y + h)}, | |
| {"x": float(x), "y": float(y + h)}, | |
| ] | |
| def _dedupe_by_iou(items: list[dict[str, Any]], threshold: float) -> list[dict[str, Any]]: | |
| kept: list[dict[str, Any]] = [] | |
| for item in sorted(items, key=lambda candidate: float(candidate.get("score") or 0.0), reverse=True): | |
| box = tuple(item["box"]) | |
| if any(_rect_iou(box, tuple(existing["box"])) > threshold for existing in kept): | |
| continue | |
| kept.append(item) | |
| return kept | |
| def _prepare_bgr_image(image: Image.Image | np.ndarray) -> np.ndarray: | |
| if isinstance(image, np.ndarray): | |
| if image.ndim == 2: | |
| return cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) | |
| return image.copy() | |
| rgb = np.array(image.convert("RGB")) | |
| return cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR) | |
| _UID_TRANSLATION = { | |
| "O": "0", | |
| "Q": "0", | |
| "D": "0", | |
| "I": "1", | |
| "L": "1", | |
| "|": "1", | |
| "S": "5", | |
| "Z": "2", | |
| "B": "8", | |
| } | |
| _DEFAULT_CONFIG_PATH = Path(__file__).resolve().parents[1] / "configs" / "config.yaml" | |
| _COLOR_RANGES_CACHE: dict[str, Any] | None = None | |
| _ANALYZE_LAYOUT_CACHE: dict[str, dict[str, Any]] = {} | |
| _ANALYZE_LAYOUT_CACHE_ORDER: list[str] = [] | |
| _ANALYZE_LAYOUT_CACHE_MAX = 8 | |
| def _normalize_binding_uid(value: Any) -> str: | |
| text = str(value or "").strip().upper() | |
| if not text: | |
| return "" | |
| translated = "".join(_UID_TRANSLATION.get(char, char) for char in text) | |
| digits = "".join(char for char in translated if char.isdigit()) | |
| if not digits: | |
| return "" | |
| stripped = digits.lstrip("0") | |
| return stripped or digits | |
| def _load_color_ranges() -> dict[str, Any]: | |
| global _COLOR_RANGES_CACHE | |
| if _COLOR_RANGES_CACHE is not None: | |
| return _COLOR_RANGES_CACHE | |
| try: | |
| from src.config_loader import load_config | |
| config = load_config(_DEFAULT_CONFIG_PATH) | |
| colors = config.get("colors") if isinstance(config, dict) else None | |
| _COLOR_RANGES_CACHE = dict(colors or {}) | |
| except Exception: | |
| _COLOR_RANGES_CACHE = {} | |
| return _COLOR_RANGES_CACHE | |
| def _make_analyze_layout_cache_key( | |
| image_bgr: np.ndarray, | |
| *, | |
| sensitivity: int, | |
| image_correction: int, | |
| enhance_image: bool, | |
| ignore_lines: bool, | |
| ) -> str: | |
| digest = hashlib.blake2b(image_bgr.tobytes(), digest_size=16).hexdigest() | |
| return ( | |
| f"{image_bgr.shape[1]}x{image_bgr.shape[0]}:{sensitivity}:{image_correction}:" | |
| f"{int(enhance_image)}:{int(ignore_lines)}:{digest}" | |
| ) | |
| def _cache_layout_result(key: str, result: dict[str, Any]) -> None: | |
| if key in _ANALYZE_LAYOUT_CACHE: | |
| _ANALYZE_LAYOUT_CACHE_ORDER.remove(key) | |
| _ANALYZE_LAYOUT_CACHE[key] = copy.deepcopy(result) | |
| _ANALYZE_LAYOUT_CACHE_ORDER.append(key) | |
| while len(_ANALYZE_LAYOUT_CACHE_ORDER) > _ANALYZE_LAYOUT_CACHE_MAX: | |
| oldest = _ANALYZE_LAYOUT_CACHE_ORDER.pop(0) | |
| _ANALYZE_LAYOUT_CACHE.pop(oldest, None) | |
| def _prepare_detection_mask( | |
| image_bgr: np.ndarray, | |
| *, | |
| sensitivity: int = 60, | |
| image_correction: int = 55, | |
| enhance_image: bool = True, | |
| ) -> tuple[np.ndarray, np.ndarray]: | |
| gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY) | |
| if enhance_image: | |
| clip_limit = 1.6 + (max(0, min(100, image_correction)) / 45.0) | |
| clahe = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8)) | |
| gray = clahe.apply(gray) | |
| sharpen = np.array( | |
| [[0, -1, 0], [-1, 5, -1], [0, -1, 0]], | |
| dtype=np.float32, | |
| ) | |
| gray = cv2.filter2D(gray, -1, sharpen) | |
| blur = cv2.GaussianBlur(gray, (3, 3), 0) | |
| block_size = 31 + (max(0, min(100, sensitivity)) // 20) * 2 | |
| if block_size % 2 == 0: | |
| block_size += 1 | |
| constant = -7 - int((max(0, min(100, sensitivity)) - 60) / 12) | |
| inv = 255 - blur | |
| mask = cv2.adaptiveThreshold( | |
| inv, | |
| 255, | |
| cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
| cv2.THRESH_BINARY, | |
| block_size, | |
| constant, | |
| ) | |
| kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) | |
| mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=1) | |
| return gray, mask | |
| class _OverlayRecord: | |
| kind: str | |
| category: str | |
| label: str | |
| box: tuple[int, int, int, int] | |
| points: list[dict[str, float]] | |
| score: float | |
| meta: dict[str, Any] | |
| binding_uid: str = "" | |
| note: str = "" | |
| context_path: str = "" | |
| widget_name_override: str = "" | |
| static_shape_name_override: str = "" | |
| def to_payload(self, index: int) -> dict[str, Any]: | |
| x, y, width, height = self.box | |
| return { | |
| "id": f"{self.kind}-{index:03d}", | |
| "kind": self.kind, | |
| "category": self.category, | |
| "label": self.label, | |
| "confidence": _round(self.score, 3), | |
| "bounds": { | |
| "x": int(x), | |
| "y": int(y), | |
| "width": int(width), | |
| "height": int(height), | |
| }, | |
| "points": self.points, | |
| "meta": self.meta, | |
| "bindingUid": self.binding_uid, | |
| "note": self.note, | |
| "contextPath": self.context_path, | |
| "widgetNameOverride": self.widget_name_override, | |
| "staticShapeNameOverride": self.static_shape_name_override, | |
| } | |
| def _collect_contour_metrics( | |
| gray: np.ndarray, | |
| mask: np.ndarray, | |
| ) -> list[dict[str, Any]]: | |
| contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) | |
| if hierarchy is None or not len(contours): | |
| return [] | |
| nodes = hierarchy[0] | |
| items: list[dict[str, Any]] = [] | |
| for index, contour in enumerate(contours): | |
| x, y, width, height = cv2.boundingRect(contour) | |
| area = width * height | |
| if area < 180 or width < 10 or height < 10: | |
| continue | |
| perimeter = cv2.arcLength(contour, True) | |
| approx = cv2.approxPolyDP(contour, 0.02 * perimeter, True) | |
| contour_area = max(1.0, cv2.contourArea(contour)) | |
| fill = contour_area / max(1.0, area) | |
| aspect = max(width / max(1.0, height), height / max(1.0, width)) | |
| roi = gray[y : y + height, x : x + width] | |
| mean = float(roi.mean()) if roi.size else 255.0 | |
| std = float(roi.std()) if roi.size else 0.0 | |
| child_count = 0 | |
| child = int(nodes[index][2]) | |
| while child != -1: | |
| child_count += 1 | |
| child = int(nodes[child][0]) | |
| hull = cv2.convexHull(contour) | |
| hull_area = max(1.0, cv2.contourArea(hull)) | |
| solidity = contour_area / hull_area | |
| points = approx.reshape(-1, 2).tolist() if len(approx) >= 3 else [] | |
| contour_points = contour.reshape(-1, 2) | |
| edge_pad_x = max(3, int(round(width * 0.05))) | |
| edge_pad_y = max(3, int(round(height * 0.05))) | |
| def _span_ratio(values: np.ndarray, extent: int) -> float: | |
| if values.size < 2 or extent <= 0: | |
| return 0.0 | |
| return float(values.max() - values.min()) / float(extent) | |
| top_x = contour_points[contour_points[:, 1] <= y + edge_pad_y][:, 0] | |
| bottom_x = contour_points[contour_points[:, 1] >= y + height - edge_pad_y][:, 0] | |
| left_y = contour_points[contour_points[:, 0] <= x + edge_pad_x][:, 1] | |
| right_y = contour_points[contour_points[:, 0] >= x + width - edge_pad_x][:, 1] | |
| top_span = _span_ratio(top_x, width) | |
| bottom_span = _span_ratio(bottom_x, width) | |
| left_span = _span_ratio(left_y, height) | |
| right_span = _span_ratio(right_y, height) | |
| closed_frame = ( | |
| top_span >= 0.45 | |
| and bottom_span >= 0.45 | |
| and left_span >= 0.28 | |
| and right_span >= 0.28 | |
| ) | |
| items.append( | |
| { | |
| "box": (int(x), int(y), int(width), int(height)), | |
| "area": int(area), | |
| "contour_area": contour_area, | |
| "fill": float(fill), | |
| "aspect": float(aspect), | |
| "mean": mean, | |
| "std": std, | |
| "children": child_count, | |
| "verts": len(approx), | |
| "solidity": float(solidity), | |
| "closed_frame": bool(closed_frame), | |
| "edge_spans": { | |
| "top": round(top_span, 3), | |
| "bottom": round(bottom_span, 3), | |
| "left": round(left_span, 3), | |
| "right": round(right_span, 3), | |
| }, | |
| "points": [{"x": float(px), "y": float(py)} for px, py in points], | |
| } | |
| ) | |
| return items | |
| def _classify_cells(items: Iterable[dict[str, Any]]) -> list[dict[str, Any]]: | |
| cells: list[dict[str, Any]] = [] | |
| for item in items: | |
| x, y, width, height = item["box"] | |
| fill = float(item["fill"]) | |
| aspect = float(item["aspect"]) | |
| mean = float(item["mean"]) | |
| std = float(item["std"]) | |
| children = int(item["children"]) | |
| verts = int(item["verts"]) | |
| rectish = verts <= 8 | |
| if not rectish: | |
| continue | |
| if not (34 <= width <= 220 and 16 <= height <= 90): | |
| continue | |
| if not (1.1 <= aspect <= 9.0): | |
| continue | |
| if mean <= 120 or std <= 15: | |
| continue | |
| score = 0.0 | |
| score += min(width, 220) / 220.0 * 0.2 | |
| score += min(height, 90) / 90.0 * 0.1 | |
| score += min(0.4, children * 0.05) | |
| score += (1 - min(abs(fill - 0.65), 0.65) / 0.65) * 0.2 | |
| score += (1 - min(abs(aspect - 3.0), 3.0) / 3.0) * 0.1 | |
| score += min(std / 80.0, 1.0) * 0.2 | |
| cells.append( | |
| { | |
| **item, | |
| "kind": "cell", | |
| "category": "value", | |
| "label": "Ячейка", | |
| "score": float(score), | |
| } | |
| ) | |
| return _dedupe_by_iou(cells, 0.8) | |
| def _classify_groups(items: Iterable[dict[str, Any]]) -> list[dict[str, Any]]: | |
| groups: list[dict[str, Any]] = [] | |
| for item in items: | |
| width = int(item["box"][2]) | |
| height = int(item["box"][3]) | |
| area = int(item["area"]) | |
| fill = float(item["fill"]) | |
| aspect = float(item["aspect"]) | |
| mean = float(item["mean"]) | |
| std = float(item["std"]) | |
| children = int(item["children"]) | |
| verts = int(item["verts"]) | |
| closed_frame = bool(item.get("closed_frame")) | |
| rectish = verts <= 12 | |
| if not rectish: | |
| continue | |
| if width < 140 or height < 38 or area < 12000: | |
| continue | |
| if aspect > 12.0 or mean <= 120: | |
| continue | |
| if not closed_frame: | |
| continue | |
| score = 0.0 | |
| score += min(0.5, children * 0.09) | |
| score += (1 - min(abs(fill - 0.75), 0.75) / 0.75) * 0.15 | |
| score += min(std / 80.0, 1.0) * 0.1 | |
| score += min(area / 120000.0, 1.0) * 0.25 | |
| groups.append( | |
| { | |
| **item, | |
| "kind": "form", | |
| "category": "static", | |
| "label": "Форма", | |
| "score": float(score), | |
| "meta": { | |
| **dict(item.get("meta") or {}), | |
| "staticSubtype": "form", | |
| "staticFormType": "frame", | |
| }, | |
| } | |
| ) | |
| groups = _dedupe_by_iou(groups, 0.7) | |
| filtered: list[dict[str, Any]] = [] | |
| for candidate in groups: | |
| candidate_box = tuple(candidate["box"]) | |
| nested_groups = [ | |
| other | |
| for other in groups | |
| if other is not candidate | |
| and _contains(tuple(other["box"]), candidate_box, tolerance=3) | |
| and (other["box"][2] * other["box"][3]) < (candidate_box[2] * candidate_box[3]) * 0.82 | |
| ] | |
| nested_area = sum(int(other["box"][2]) * int(other["box"][3]) for other in nested_groups) | |
| cover_ratio = nested_area / max(1.0, candidate_box[2] * candidate_box[3]) | |
| if len(nested_groups) >= 2 and cover_ratio >= 0.34: | |
| continue | |
| filtered.append(candidate) | |
| return filtered | |
| def _classify_arrows(items: Iterable[dict[str, Any]], *, ignore_lines: bool) -> list[dict[str, Any]]: | |
| arrows: list[dict[str, Any]] = [] | |
| for item in items: | |
| width = int(item["box"][2]) | |
| height = int(item["box"][3]) | |
| area = int(item["area"]) | |
| fill = float(item["fill"]) | |
| aspect = float(item["aspect"]) | |
| std = float(item["std"]) | |
| verts = int(item["verts"]) | |
| solidity = float(item["solidity"]) | |
| if area > 5000 or width < 18 or height < 10: | |
| continue | |
| if area < 120 or aspect > 6.0: | |
| continue | |
| if verts not in {3, 4, 5, 6, 7, 8}: | |
| continue | |
| if not (0.35 <= solidity <= 0.95): | |
| continue | |
| if fill > 0.75 or std <= 10: | |
| continue | |
| if ignore_lines and aspect > 5.6 and min(width, height) <= 8: | |
| continue | |
| score = 0.0 | |
| score += (1 - abs(solidity - 0.6) / 0.6) * 0.3 | |
| score += (1 - min(abs(aspect - 2.2), 2.2) / 2.2) * 0.2 | |
| score += min(std / 80.0, 1.0) * 0.2 | |
| score += (1.0 if verts == 3 else 0.7) * 0.2 | |
| arrows.append( | |
| { | |
| **item, | |
| "kind": "arrow", | |
| "category": "static", | |
| "label": "Стрелка", | |
| "score": float(score), | |
| "meta": { | |
| **dict(item.get("meta") or {}), | |
| "staticSubtype": "process_arrow", | |
| }, | |
| } | |
| ) | |
| return _dedupe_by_iou(arrows, 0.5) | |
| def _detect_sensor_cells(image_bgr: np.ndarray) -> tuple[str, list[dict[str, Any]]]: | |
| color_ranges = _load_color_ranges() | |
| if not color_ranges: | |
| return "", [] | |
| positions: list[tuple[int, int, int, int]] = [] | |
| title_text = "" | |
| try: | |
| from src.pipeline_hf import detect_sensor_regions | |
| positions = detect_sensor_regions(image_bgr, color_ranges) | |
| except Exception: | |
| positions = [] | |
| try: | |
| from src.ocr_utils_demo import ocr_title | |
| height, width = image_bgr.shape[:2] | |
| title_roi = image_bgr[:45, : int(width / 2.4)] | |
| title_text = ocr_title(title_roi) | |
| except Exception: | |
| title_text = "" | |
| overlays: list[dict[str, Any]] = [] | |
| for x, y, width, height in positions: | |
| x = int(x or 0) | |
| y = int(y or 0) | |
| width = int(width or 0) | |
| height = int(height or 0) | |
| if width <= 0 or height <= 0: | |
| continue | |
| box = (x, y, width, height) | |
| overlays.append( | |
| { | |
| "box": box, | |
| "points": _bounds_to_points(box), | |
| "score": 0.55, | |
| "kind": "widget", | |
| "category": "widget", | |
| "label": "Ячейка", | |
| "note": "", | |
| "bindingUid": "", | |
| "meta": { | |
| "source": "src.pipeline_hf.detect_sensor_regions", | |
| }, | |
| } | |
| ) | |
| overlays = _dedupe_by_iou(overlays, 0.78) | |
| normalized_title = str(title_text or "").strip() | |
| if normalized_title.lower() == "титул не оцифрован": | |
| normalized_title = "" | |
| return normalized_title, overlays | |
| def _merge_cells_with_sensor_ocr( | |
| cells: list[dict[str, Any]], | |
| sensor_cells: list[dict[str, Any]], | |
| ) -> list[dict[str, Any]]: | |
| if not sensor_cells: | |
| return cells | |
| merged: list[dict[str, Any]] = [] | |
| used_sensor_indexes: set[int] = set() | |
| for cell in cells: | |
| cell_box = tuple(cell["box"]) | |
| best_index = -1 | |
| best_score = -1.0 | |
| for index, sensor in enumerate(sensor_cells): | |
| sensor_box = tuple(sensor["box"]) | |
| overlap = _rect_iou(cell_box, sensor_box) | |
| if overlap < 0.28 and not _contains(sensor_box, cell_box, tolerance=4) and not _contains(cell_box, sensor_box, tolerance=4): | |
| continue | |
| candidate_score = overlap + float(sensor.get("score") or 0.0) * 0.15 | |
| if candidate_score > best_score: | |
| best_score = candidate_score | |
| best_index = index | |
| if best_index >= 0: | |
| sensor = sensor_cells[best_index] | |
| used_sensor_indexes.add(best_index) | |
| merged.append( | |
| { | |
| **cell, | |
| "box": tuple(sensor["box"]), | |
| "points": list(sensor.get("points") or _bounds_to_points(tuple(sensor["box"]))), | |
| "score": max(float(cell.get("score") or 0.0), float(sensor.get("score") or 0.0)), | |
| "note": str(sensor.get("note") or ""), | |
| "bindingUid": str(sensor.get("bindingUid") or ""), | |
| "meta": { | |
| **dict(cell.get("meta") or {}), | |
| **dict(sensor.get("meta") or {}), | |
| "matchedBySensorOcr": True, | |
| }, | |
| } | |
| ) | |
| continue | |
| merged.append(cell) | |
| for index, sensor in enumerate(sensor_cells): | |
| if index not in used_sensor_indexes: | |
| merged.append(sensor) | |
| return _dedupe_by_iou(merged, 0.78) | |
| def _ocr_layout_cells(image_bgr: np.ndarray, cells: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| if not cells: | |
| return cells | |
| try: | |
| from src.ocr_utils_demo import ocr_sensors | |
| except Exception: | |
| return cells | |
| rois: list[np.ndarray] = [] | |
| crop_boxes: list[tuple[int, int, int, int]] = [] | |
| image_height, image_width = image_bgr.shape[:2] | |
| for cell in cells: | |
| x, y, width, height = tuple(cell["box"]) | |
| pad_x = max(2, int(round(width * 0.04))) | |
| pad_y = max(2, int(round(height * 0.08))) | |
| left = max(0, x - pad_x) | |
| top = max(0, y - pad_y) | |
| right = min(image_width, x + width + pad_x) | |
| bottom = min(image_height, y + height + pad_y) | |
| if right <= left or bottom <= top: | |
| rois.append(np.zeros((1, 1, 3), dtype=np.uint8)) | |
| crop_boxes.append((left, top, right, bottom)) | |
| continue | |
| rois.append(image_bgr[top:bottom, left:right]) | |
| crop_boxes.append((left, top, right, bottom)) | |
| try: | |
| ocr_results = ocr_sensors(rois) | |
| except Exception: | |
| return cells | |
| enriched: list[dict[str, Any]] = [] | |
| for cell, result, crop_box in zip(cells, ocr_results, crop_boxes): | |
| text = str((result or {}).get("text") or "").strip() | |
| if text == "?": | |
| text = "" | |
| score = float((result or {}).get("score") or 0.0) | |
| existing_note = str(cell.get("note") or "").strip() | |
| binding_uid = str(cell.get("bindingUid") or "").strip() | |
| if not binding_uid: | |
| binding_uid = _normalize_binding_uid(text) | |
| meta = { | |
| **dict(cell.get("meta") or {}), | |
| "cellOcrText": text, | |
| "cellOcrScore": score, | |
| "cellOcrCrop": { | |
| "x": crop_box[0], | |
| "y": crop_box[1], | |
| "width": max(0, crop_box[2] - crop_box[0]), | |
| "height": max(0, crop_box[3] - crop_box[1]), | |
| }, | |
| } | |
| enriched.append( | |
| { | |
| **cell, | |
| "bindingUid": binding_uid, | |
| "note": existing_note or text, | |
| "meta": meta, | |
| } | |
| ) | |
| return enriched | |
| def _attach_group_metrics(groups: list[dict[str, Any]], cells: list[dict[str, Any]], arrows: list[dict[str, Any]]) -> None: | |
| for group in groups: | |
| group_box = tuple(group["box"]) | |
| cell_count = sum(1 for cell in cells if _contains(tuple(cell["box"]), group_box, tolerance=2)) | |
| arrow_count = sum(1 for arrow in arrows if _contains(tuple(arrow["box"]), group_box, tolerance=2)) | |
| group.setdefault("meta", {}) | |
| group["meta"]["cellCount"] = cell_count | |
| group["meta"]["arrowCount"] = arrow_count | |
| def _build_overlay_records(groups: list[dict[str, Any]], cells: list[dict[str, Any]], arrows: list[dict[str, Any]]) -> list[_OverlayRecord]: | |
| overlays: list[_OverlayRecord] = [] | |
| for item in groups: | |
| box = tuple(item["box"]) | |
| overlays.append( | |
| _OverlayRecord( | |
| kind="form", | |
| category="static", | |
| label="Форма", | |
| box=box, | |
| points=_bounds_to_points(box), | |
| score=float(item["score"]), | |
| meta=dict(item.get("meta") or {}), | |
| ) | |
| ) | |
| for item in cells: | |
| box = tuple(item["box"]) | |
| overlays.append( | |
| _OverlayRecord( | |
| kind="widget", | |
| category="widget", | |
| label="Ячейка", | |
| box=box, | |
| points=list(item.get("points") or _bounds_to_points(box)), | |
| score=float(item["score"]), | |
| meta=dict(item.get("meta") or {}), | |
| binding_uid=str(item.get("bindingUid") or ""), | |
| note=str(item.get("note") or ""), | |
| context_path=str(item.get("contextPath") or ""), | |
| widget_name_override=str(item.get("widgetNameOverride") or ""), | |
| static_shape_name_override=str(item.get("staticShapeNameOverride") or ""), | |
| ) | |
| ) | |
| for item in arrows: | |
| box = tuple(item["box"]) | |
| points = item.get("points") or _bounds_to_points(box) | |
| overlays.append( | |
| _OverlayRecord( | |
| kind="arrow", | |
| category="static", | |
| label="Стрелка", | |
| box=box, | |
| points=points, | |
| score=float(item["score"]), | |
| meta=dict(item.get("meta") or {}), | |
| binding_uid=str(item.get("bindingUid") or ""), | |
| note=str(item.get("note") or ""), | |
| context_path=str(item.get("contextPath") or ""), | |
| widget_name_override=str(item.get("widgetNameOverride") or ""), | |
| static_shape_name_override=str(item.get("staticShapeNameOverride") or ""), | |
| ) | |
| ) | |
| overlays.sort(key=lambda item: (item.box[1], item.box[0], item.box[2] * item.box[3])) | |
| return overlays | |
| def analyze_layout( | |
| image: Image.Image | np.ndarray, | |
| *, | |
| sensitivity: int = 60, | |
| image_correction: int = 55, | |
| enhance_image: bool = True, | |
| ignore_lines: bool = True, | |
| ) -> dict[str, Any]: | |
| image_bgr = _prepare_bgr_image(image) | |
| cache_key = _make_analyze_layout_cache_key( | |
| image_bgr, | |
| sensitivity=sensitivity, | |
| image_correction=image_correction, | |
| enhance_image=enhance_image, | |
| ignore_lines=ignore_lines, | |
| ) | |
| cached = _ANALYZE_LAYOUT_CACHE.get(cache_key) | |
| if cached is not None: | |
| return copy.deepcopy(cached) | |
| gray, mask = _prepare_detection_mask( | |
| image_bgr, | |
| sensitivity=sensitivity, | |
| image_correction=image_correction, | |
| enhance_image=enhance_image, | |
| ) | |
| contour_items = _collect_contour_metrics(gray, mask) | |
| cells = _classify_cells(contour_items) | |
| groups = _classify_groups(contour_items) | |
| arrows = _classify_arrows(contour_items, ignore_lines=ignore_lines) | |
| title_text, sensor_cells = _detect_sensor_cells(image_bgr) | |
| cells = _merge_cells_with_sensor_ocr(cells, sensor_cells) | |
| cells = _ocr_layout_cells(image_bgr, cells) | |
| _attach_group_metrics(groups, cells, arrows) | |
| overlays = _build_overlay_records(groups, cells, arrows) | |
| payloads = [overlay.to_payload(index) for index, overlay in zip(count(1), overlays)] | |
| counts = { | |
| "widgets": len(cells), | |
| "statics": len(groups) + len(arrows), | |
| "forms": len(groups), | |
| "shapes": 0, | |
| "groups": len(groups), | |
| "cells": len(cells), | |
| "arrows": len(arrows), | |
| "ocrCells": sum(1 for cell in cells if str(cell.get("note") or "").strip() or str(cell.get("bindingUid") or "").strip()), | |
| "total": len(payloads), | |
| } | |
| result = { | |
| "engine": "src-hybrid-layout-v2", | |
| "summary": ( | |
| f"Распознано объектов: {counts['total']} " | |
| f"(виджеты: {counts['widgets']}, формы: {counts['forms']}, стрелки: {counts['arrows']}, OCR-ячейки: {counts['ocrCells']})." | |
| ), | |
| "counts": counts, | |
| "overlays": payloads, | |
| "title": title_text, | |
| "imageWidth": int(image_bgr.shape[1]), | |
| "imageHeight": int(image_bgr.shape[0]), | |
| } | |
| _cache_layout_result(cache_key, result) | |
| return result | |