Spaces:
Running
Running
| """ | |
| وحدة استخراج الجداول (Table Extraction Module) | |
| ================================================== | |
| استخراج بيانات الجداول المهيكلة من مناطق الجداول المكتشفة: | |
| - كشف الخلايا باستخدام تحليل المحيطات وتقاطعات الخطوط | |
| - إعادة بناء الصفوف والأعمدة | |
| - التعرف على محتوى الخلايا | |
| - إخراج بيانات مهيكلة (قائمة قوائم) | |
| Extracts structured table data from detected table regions using | |
| contour analysis and line intersection detection. | |
| المصدر: دمج من مشروع arabic-ocr-pro | |
| Source: Merged from arabic-ocr-pro project | |
| OmniFile AI Processor - وحدة معالجة الملفات الذكية | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Optional | |
| import cv2 | |
| import numpy as np | |
| from modules.core.structure import BBox | |
| logger = logging.getLogger(__name__) | |
| class TableExtractor: | |
| """مستخرج البيانات المهيكلة من مناطق الجداول في صور المستندات. | |
| يستخدم مزيجاً من كشف الخطوط وتحليل المحيطات | |
| لتحديد خلايا الجدول وصفوفه وأعمدته. | |
| Extracts structured data from table regions using a combination | |
| of line detection and contour analysis. | |
| Attributes: | |
| ocr_engine: محرك OCR لقراءة محتوى الخلايا / OCR engine for cell contents | |
| min_cell_area: الحد الأدنى لمساحة الخلية / Minimum cell area in pixels | |
| cell_padding: حشوة حول الخلايا للتعرف الأفضل / Padding around cell boundaries | |
| """ | |
| def __init__( | |
| self, | |
| ocr_engine: Optional[object] = None, | |
| min_cell_area: int = 200, | |
| cell_padding: int = 2, | |
| ) -> None: | |
| """تهيئة مستخرج الجداول / Initialize the table extractor. | |
| Args: | |
| ocr_engine: محرك OCR (إذا None، تُستخرج الخلايا فقط) / OCR engine instance | |
| min_cell_area: الحد الأدنى لمساحة الخلية بالبكسل / Minimum cell area in pixels | |
| cell_padding: حشوة بالبكسل حول حدود الخلية / Padding in pixels around cells | |
| """ | |
| self.ocr_engine = ocr_engine | |
| self.min_cell_area = min_cell_area | |
| self.cell_padding = cell_padding | |
| def extract_table( | |
| self, | |
| image: np.ndarray, | |
| table_bbox: BBox, | |
| ) -> list[list[str]]: | |
| """استخراج بيانات الجدول من منطقة جدول / Extract table data from a table region. | |
| يكتشف بنية الشبكة ويحدد الخلايا ويشغل OCR على كل خلية. | |
| Args: | |
| image: صورة المستند الكاملة / Full document image | |
| table_bbox: مربع إحاطة منطقة الجدول / Bounding box of the table region | |
| Returns: | |
| قائمة صفوف، كل صف قائمة نصوص خلايا / List of rows with cell text strings | |
| """ | |
| h, w = image.shape[:2] | |
| # قص منطقة الجدول | |
| x1 = max(0, table_bbox.x) | |
| y1 = max(0, table_bbox.y) | |
| x2 = min(w, table_bbox.x + table_bbox.width) | |
| y2 = min(h, table_bbox.y + table_bbox.height) | |
| if x1 >= x2 or y1 >= y2: | |
| return [] | |
| table_image = image[y1:y2, x1:x2].copy() | |
| gray = self._to_grayscale(table_image) | |
| # كشف الخطوط الأفقية والعمودية | |
| h_lines = self._detect_lines(gray, orientation="horizontal") | |
| v_lines = self._detect_lines(gray, orientation="vertical") | |
| # إذا وجدنا خطوط شبكة، نستخدم كشف الخلايا بالخطوط | |
| if h_lines and v_lines: | |
| cells = self._detect_cells_from_lines(table_image, h_lines, v_lines) | |
| else: | |
| # الرجوع لكشف الخلايا بالمحيطات | |
| cells = self._detect_cells_from_contours(table_image) | |
| if not cells: | |
| logger.debug("No cells detected in table region") | |
| return [] | |
| # تنظيم الخلايا في صفوف | |
| rows = self._organize_into_rows(cells) | |
| # التعرف على كل خلية | |
| if self.ocr_engine is not None: | |
| result = self._ocr_cells(table_image, rows) | |
| else: | |
| result = [["" for _ in row] for row in rows] | |
| logger.debug(f"Extracted table: {len(result)} rows x {max(len(r) for r in result) if result else 0} cols") | |
| return result | |
| def _to_grayscale(self, image: np.ndarray) -> np.ndarray: | |
| """تحويل الصورة إلى تدرج رمادي / Convert image to grayscale.""" | |
| if len(image.shape) == 2: | |
| return image.copy() | |
| return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) | |
| def _detect_lines( | |
| self, | |
| gray: np.ndarray, | |
| orientation: str = "horizontal", | |
| min_line_length_ratio: float = 0.2, | |
| ) -> list[tuple[int, int, int, int]]: | |
| """كشف الخطوط في اتجاه معين / Detect lines in a specific orientation.""" | |
| h, w = gray.shape | |
| binary = cv2.adaptiveThreshold( | |
| gray, 255, | |
| cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
| cv2.THRESH_BINARY_INV, 15, 5, | |
| ) | |
| if orientation == "horizontal": | |
| kernel_length = int(w * 0.3) | |
| if kernel_length % 2 == 0: | |
| kernel_length += 1 | |
| kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (kernel_length, 1)) | |
| min_length = int(w * min_line_length_ratio) | |
| else: | |
| kernel_length = int(h * 0.2) | |
| if kernel_length % 2 == 0: | |
| kernel_length += 1 | |
| kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, kernel_length)) | |
| min_length = int(h * min_line_length_ratio) | |
| lines_mask = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel, iterations=2) | |
| edges = cv2.Canny(lines_mask, 50, 150) | |
| lines = cv2.HoughLinesP( | |
| edges, 1, np.pi / 180, | |
| threshold=50, | |
| minLineLength=min_length, | |
| maxLineGap=10, | |
| ) | |
| result: list[tuple[int, int, int, int]] = [] | |
| if lines is not None: | |
| for line in lines: | |
| x1_l, y1_l, x2_l, y2_l = line[0] | |
| result.append((int(x1_l), int(y1_l), int(x2_l), int(y2_l))) | |
| return result | |
| def _detect_cells_from_lines( | |
| self, | |
| image: np.ndarray, | |
| h_lines: list[tuple[int, int, int, int]], | |
| v_lines: list[tuple[int, int, int, int]], | |
| ) -> list[BBox]: | |
| """كشف خلايا الجدول من خطوط الشبكة / Detect table cells from grid lines.""" | |
| h, w = image.shape[:2] | |
| h_y_values: list[int] = [] | |
| for x1, y1, x2, y2 in h_lines: | |
| h_y_values.extend([y1, y2]) | |
| h_y_values = sorted(set(h_y_values)) | |
| v_x_values: list[int] = [] | |
| for x1, y1, x2, y2 in v_lines: | |
| v_x_values.extend([x1, x2]) | |
| v_x_values = sorted(set(v_x_values)) | |
| # إضافة حدود الصورة | |
| h_y_values = [0] + h_y_values + [h] | |
| v_x_values = [0] + v_x_values + [w] | |
| # دمج الإحداثيات المتقاربة | |
| h_y_values = self._merge_close_values(h_y_values, threshold=5) | |
| v_x_values = self._merge_close_values(v_x_values, threshold=5) | |
| # إنشاء مربعات إحاطة الخلايا | |
| cells: list[BBox] = [] | |
| for i in range(len(h_y_values) - 1): | |
| for j in range(len(v_x_values) - 1): | |
| y1 = h_y_values[i] | |
| y2 = h_y_values[i + 1] | |
| x1 = v_x_values[j] | |
| x2 = v_x_values[j + 1] | |
| cw = x2 - x1 | |
| ch = y2 - y1 | |
| if cw > 5 and ch > 5 and cw * ch >= self.min_cell_area: | |
| cells.append(BBox(x=x1, y=y1, width=cw, height=ch)) | |
| return cells | |
| def _detect_cells_from_contours(self, image: np.ndarray) -> list[BBox]: | |
| """كشف خلايا الجدول باستخدام تحليل المحيطات / Detect cells using contour analysis.""" | |
| gray = self._to_grayscale(image) | |
| _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) | |
| contours, _ = cv2.findContours(binary, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE) | |
| cells: list[BBox] = [] | |
| for contour in contours: | |
| area = cv2.contourArea(contour) | |
| if area < self.min_cell_area: | |
| continue | |
| x, y, w_c, h_c = cv2.boundingRect(contour) | |
| aspect = h_c / max(w_c, 1) | |
| if 0.1 < aspect < 10.0: | |
| cells.append(BBox(x=x, y=y, width=w_c, height=h_c)) | |
| return cells | |
| def _organize_into_rows(self, cells: list[BBox]) -> list[list[BBox]]: | |
| """تنظيم الخلايا المكتشفة في صفوف / Organize cells into rows.""" | |
| if not cells: | |
| return [] | |
| sorted_cells = sorted(cells, key=lambda c: c.y) | |
| rows: list[list[BBox]] = [] | |
| current_row: list[BBox] = [sorted_cells[0]] | |
| for cell in sorted_cells[1:]: | |
| row_y_center = sum(c.center[1] for c in current_row) / len(current_row) | |
| row_height = max(c.height for c in current_row) | |
| if abs(cell.center[1] - row_y_center) < row_height * 0.6: | |
| current_row.append(cell) | |
| else: | |
| current_row.sort(key=lambda c: c.x, reverse=True) | |
| rows.append(current_row) | |
| current_row = [cell] | |
| if current_row: | |
| current_row.sort(key=lambda c: c.x, reverse=True) | |
| rows.append(current_row) | |
| return rows | |
| def _ocr_cells( | |
| self, | |
| table_image: np.ndarray, | |
| rows: list[list[BBox]], | |
| ) -> list[list[str]]: | |
| """تشغيل OCR على كل خلية من الجدول / Run OCR on each table cell.""" | |
| result: list[list[str]] = [] | |
| h, w = table_image.shape[:2] | |
| for row in rows: | |
| row_texts: list[str] = [] | |
| for cell_bbox in row: | |
| pad = self.cell_padding | |
| x1 = max(0, cell_bbox.x - pad) | |
| y1 = max(0, cell_bbox.y - pad) | |
| x2 = min(w, cell_bbox.x + cell_bbox.width + pad) | |
| y2 = min(h, cell_bbox.y + cell_bbox.height + pad) | |
| cell_image = table_image[y1:y2, x1:x2] | |
| try: | |
| tokens = self.ocr_engine.recognize(cell_image) | |
| text = " ".join(t.text for t in tokens).strip() | |
| except Exception as exc: | |
| logger.debug(f"Cell OCR failed: {exc}") | |
| text = "" | |
| row_texts.append(text) | |
| result.append(row_texts) | |
| return result | |
| def _merge_close_values( | |
| values: list[int], | |
| threshold: int = 5, | |
| ) -> list[int]: | |
| """دمج القيم المتقاربة / Merge values that are close to each other.""" | |
| if not values: | |
| return [] | |
| merged: list[int] = [values[0]] | |
| for val in values[1:]: | |
| if val - merged[-1] <= threshold: | |
| merged[-1] = (merged[-1] + val) // 2 | |
| else: | |
| merged.append(val) | |
| return merged | |