Spaces:
Runtime error
Runtime error
| import asyncio | |
| import fitz | |
| import re | |
| import numpy as np | |
| from typing import List, Dict, Any, Optional | |
| from pdf2image import convert_from_path | |
| from src.config.config import settings | |
| from src.models.account_models import LineData, WordData | |
| from doctr.io import DocumentFile | |
class TextExtractor:
    """Async text extractor for extracting text with bounding boxes.

    Two extraction paths are offered:

    * ``extract_lines_with_bbox`` reads the words embedded in a digital PDF
      through PyMuPDF (``fitz``).
    * ``extract_lines_with_bbox_from_scanned_pdf`` runs the OCR model passed
      to the constructor over page images.

    Both paths group words into text lines by vertical proximity and return,
    per page, a list of dicts shaped as
    ``{"line": str, "bbox": [x_start, y_start], "words": [{"word", "bbox"}]}``.
    NOTE(review): the methods are annotated as returning ``LineData`` while
    the code builds plain dicts -- confirm ``LineData`` is dict-compatible.
    """

    # A token that is empty once everything except word characters and
    # whitespace is removed (e.g. "|", ".", "#", "--") carries no text.
    _NON_TEXT_RE = re.compile(r"[^\w\s]")
    # Characters replaced by a space in OCR output before filtering.
    _OCR_ARTIFACT_RE = re.compile(r"[#*]")

    def __init__(self, doctr_model):
        """Store the OCR predictor used by the scanned-PDF path.

        Args:
            doctr_model: Callable invoked with a list of page images; its
                result must expose ``.pages`` -> ``.blocks`` -> ``.lines``
                -> ``.words`` with ``value`` and ``geometry`` on each word.
        """
        self.doctr_model = doctr_model

    async def __aenter__(self):
        """Return the extractor itself; nothing is acquired on entry."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions are not suppressed."""
        pass

    def normalize_bbox(self, bbox, width: float, height: float) -> List[float]:
        """Normalize bounding box (x0, y0, x1, y1) to range [0, 1].

        Args:
            bbox: Four coordinates ``(x0, y0, x1, y1)``.
            width: Value the x coordinates are divided by.
            height: Value the y coordinates are divided by.

        Returns:
            ``[x0 / width, y0 / height, x1 / width, y1 / height]``, each
            rounded to 6 decimal places.

        Raises:
            ZeroDivisionError: If ``width`` or ``height`` is zero.
        """
        x0, y0, x1, y1 = bbox
        return [
            round(x0 / width, 6),
            round(y0 / height, 6),
            round(x1 / width, 6),
            round(y1 / height, 6),
        ]

    def remove_consecutive_items(self, line: List[str]) -> List[str]:
        """Remove consecutive duplicate items from a list.

        Only adjacent repeats are collapsed; an item that reappears later
        is kept. An empty input is returned unchanged.
        """
        if not line:
            return line
        result = [line[0]]
        for item in line[1:]:
            if item != result[-1]:
                result.append(item)
        return result

    def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]:
        """Remove consecutive duplicate words from word data.

        Entries are compared on their ``"word"`` key only; the first entry
        of each run of equal words is kept. An empty input is returned
        unchanged.
        """
        if not word_data:
            return word_data
        result = [word_data[0]]
        for entry in word_data[1:]:
            if entry["word"] != result[-1]["word"]:
                result.append(entry)
        return result

    @classmethod
    def _is_noise(cls, text: str) -> bool:
        """Return True when ``text`` has no word characters or whitespace left."""
        return cls._NON_TEXT_RE.sub("", text) == ""

    def _build_line(self, entries: List[Dict]) -> Optional[Dict]:
        """Turn the word entries of one text line into a line record.

        Returns ``None`` when ``entries`` is empty.
        """
        if not entries:
            return None
        # Line text is ordered by (x0, y0, word); word data by x0 only
        # (stable), matching how the two were ordered historically.
        text_order = sorted(
            (entry["bbox"][0], entry["bbox"][1], entry["word"]) for entry in entries
        )
        tokens = self.remove_consecutive_items([item[2] for item in text_order])
        word_order = sorted(entries, key=lambda entry: entry["bbox"][0])
        return {
            "line": " ".join(tokens),
            "bbox": [
                min(item[0] for item in text_order),
                min(item[1] for item in text_order),
            ],
            "words": self.remove_consecutive_words(word_order),
        }

    def _group_words_into_lines(
        self, words: List[Dict], y_threshold: float, y_precision: int
    ) -> List[Dict]:
        """Group word entries into lines by the top edge (y0) of their bbox.

        Words are sorted by ``(round(y0, y_precision), x0)``. A word joins
        the current line while its y0 is within ``y_threshold`` of the
        previous word's y0; otherwise the current line is emitted and a new
        one started. The input list is not modified.
        """
        ordered = sorted(
            words,
            key=lambda entry: (round(entry["bbox"][1], y_precision), entry["bbox"][0]),
        )
        lines = []
        pending = []
        current_y = None
        for entry in ordered:
            y0 = entry["bbox"][1]
            same_line = current_y is None or abs(y0 - current_y) < y_threshold
            if not same_line:
                lines.append(self._build_line(pending))
                pending = []
            pending.append(entry)
            # The reference y follows the latest word, so a line may drift
            # vertically as long as each step stays under the threshold.
            current_y = y0
        final_line = self._build_line(pending)
        if final_line is not None:
            lines.append(final_line)
        return lines

    async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0) -> List[List["LineData"]]:
        """Extract lines with bounding boxes from digital PDF.

        The blocking PyMuPDF work runs in the event loop's default executor.

        Args:
            pdf_path: Path of the PDF to open with ``fitz.open``.
            y_threshold: Maximum difference in y0 (in the coordinates
                returned by ``page.get_text("words")``) for two successive
                words to be placed on the same line.

        Returns:
            One list per page, each holding the line records of that page.
            Word text is lowercased and stripped; tokens made only of
            punctuation are dropped.
        """

        def _extract_lines():
            page_lines_with_bbox = []
            # Context manager closes the document even if extraction fails.
            with fitz.open(pdf_path) as doc:
                for page in doc:
                    words = []
                    # Each item: (x0, y0, x1, y1, word, block_no, line_no, word_no)
                    for raw in page.get_text("words"):
                        x0, y0, x1, y1, token = raw[:5]
                        text = token.lower().strip()
                        if self._is_noise(text):
                            continue
                        words.append({"word": text, "bbox": (x0, y0, x1, y1)})
                    page_lines_with_bbox.append(
                        self._group_words_into_lines(words, y_threshold, 1)
                    )
            return page_lines_with_bbox

        return await asyncio.get_running_loop().run_in_executor(None, _extract_lines)

    async def extract_lines_with_bbox_from_scanned_pdf(self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False) -> List[List["LineData"]]:
        """Extract lines with bounding boxes from scanned PDF using OCR.

        The blocking rendering and OCR work runs in the event loop's default
        executor.

        Args:
            pdf_path: Path of the PDF to render and recognise.
            y_threshold: Maximum difference in y0, in pixels of the page
                image, for two successive words to share a line.
            first_page: When True, only page 1 is rendered (via
                ``convert_from_path`` at ``settings.dpi``) and recognised;
                otherwise all pages are loaded with
                ``DocumentFile.from_pdf``.

        Returns:
            One list per recognised page, each holding that page's line
            records. Word boxes are ``[x0, y0, x1, y1]`` in pixels of the
            image the page was recognised from.
        """

        def _extract_from_scanned():
            if first_page:
                rendered = convert_from_path(
                    pdf_path, dpi=settings.dpi, first_page=1, last_page=1
                )
                images = [np.array(rendered[0].convert("RGB"))]
            else:
                images = DocumentFile.from_pdf(pdf_path)
            result = self.doctr_model(images)

            page_lines_with_bbox = []
            # Pair every OCR page with its own image so relative geometry is
            # scaled by that page's size (arrays are height x width x channels).
            for page, image in zip(result.pages, images):
                img_height, img_width = image.shape[0], image.shape[1]
                words = []
                for block in page.blocks:
                    for line in block.lines:
                        for word in line.words:
                            (x0, y0), (x1, y1) = word.geometry[0], word.geometry[1]
                            text = word.value.strip().lower()
                            text = self._OCR_ARTIFACT_RE.sub(" ", text).strip()
                            if self._is_noise(text):
                                continue
                            words.append({
                                "word": text,
                                "bbox": [
                                    x0 * img_width,
                                    y0 * img_height,
                                    x1 * img_width,
                                    y1 * img_height,
                                ],
                            })
                page_lines_with_bbox.append(
                    self._group_words_into_lines(words, y_threshold, 3)
                )
            return page_lines_with_bbox

        return await asyncio.get_running_loop().run_in_executor(None, _extract_from_scanned)