import asyncio
import re
from typing import Any, Dict, List, Optional

import fitz
import numpy as np
from doctr.io import DocumentFile
from pdf2image import convert_from_path

from src.config.config import settings
from src.models.account_models import LineData, WordData

# Matches every character that is neither a word character nor whitespace.
# A token that is empty once these are removed carries no text ("|", ".", "#", ...).
_NON_WORD_CHARS = re.compile(r"[^\w\s]")
# Marker characters that are blanked out of OCR tokens before filtering.
_OCR_MARKERS = re.compile(r"[#*]")


class TextExtractor:
    """Async text extractor for extracting text with bounding boxes."""

    def __init__(self, doctr_model):
        # Callable OCR predictor; it is invoked with a list of page images.
        self.doctr_model = doctr_model

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Nothing is held open between calls, so there is nothing to release.
        pass

    def normalize_bbox(self, bbox, width: float, height: float) -> List[float]:
        """Normalize bounding box (x0, y0, x1, y1) to range [0, 1].

        Args:
            bbox: Four coordinates ``(x0, y0, x1, y1)``.
            width: Value the x coordinates are divided by.
            height: Value the y coordinates are divided by.

        Returns:
            ``[x0 / width, y0 / height, x1 / width, y1 / height]``, each
            rounded to 6 decimal places.

        Raises:
            ZeroDivisionError: If ``width`` or ``height`` is zero.
        """
        x0, y0, x1, y1 = bbox
        return [
            round(x0 / width, 6),
            round(y0 / height, 6),
            round(x1 / width, 6),
            round(y1 / height, 6),
        ]

    def remove_consecutive_items(self, line: List[str]) -> List[str]:
        """Remove consecutive duplicate items from a list.

        Only adjacent repeats are collapsed; an item that reappears later in
        the list is kept. An empty input is returned unchanged.
        """
        if not line:
            return line
        result = [line[0]]
        for item in line[1:]:
            if item != result[-1]:
                result.append(item)
        return result

    def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]:
        """Remove consecutive duplicate words from word data.

        Two adjacent entries are duplicates when their ``"word"`` values are
        equal; the first entry of each run (and its bbox) is the one kept.
        An empty input is returned unchanged.
        """
        if not word_data:
            return word_data
        result = [word_data[0]]
        for entry in word_data[1:]:
            if entry["word"] != result[-1]["word"]:
                result.append(entry)
        return result

    @staticmethod
    def _is_noise(text: str) -> bool:
        """Return True when ``text`` has nothing left after removing punctuation.

        This covers the empty string and tokens such as "|", "." and "#".
        """
        return _NON_WORD_CHARS.sub("", text) == ""

    def _build_line(self, tokens: List[tuple], members: List[Dict]) -> Dict[str, Any]:
        """Assemble one output line from the words collected for it.

        Args:
            tokens: ``(x0, y0, text)`` tuples of the words on the line.
            members: The word dicts (``"word"``/``"bbox"``) for the same words.

        Returns:
            A dict with the joined line text under ``"line"``, the minimum x0
            and y0 of the line under ``"bbox"`` and the left-to-right word
            dicts under ``"words"``. Adjacent duplicate words are collapsed in
            both the text and the word list.
        """
        # Tuples sort by x0 first, which yields left-to-right reading order.
        ordered_texts = [token[2] for token in sorted(tokens)]
        line_text = " ".join(self.remove_consecutive_items(ordered_texts))
        ordered_members = sorted(members, key=lambda member: member["bbox"][0])
        return {
            "line": line_text,
            "bbox": [
                min(token[0] for token in tokens),
                min(token[1] for token in tokens),
            ],
            "words": self.remove_consecutive_words(ordered_members),
        }

    def _group_words_into_lines(self, entries: List[tuple], y_threshold: float) -> List[Dict[str, Any]]:
        """Group words, already sorted by (y, x), into visual lines.

        Args:
            entries: ``(line_text, word_dict)`` pairs. ``word_dict["bbox"]``
                must start with ``x0, y0``.
            y_threshold: A word stays on the current line while its y0 differs
                from the previous word's y0 by less than this value.

        Returns:
            One line dict per detected line, in input order.
        """
        lines: List[Dict[str, Any]] = []
        tokens: List[tuple] = []
        members: List[Dict] = []
        previous_y = None

        for text, member in entries:
            x0, y0 = member["bbox"][0], member["bbox"][1]
            # The reference y follows the most recent word, so a line may drift
            # gradually as long as each step stays below the threshold.
            starts_new_line = previous_y is not None and not (
                abs(y0 - previous_y) < y_threshold
            )
            if starts_new_line:
                lines.append(self._build_line(tokens, members))
                tokens, members = [], []
            tokens.append((x0, y0, text))
            members.append(member)
            previous_y = y0

        if tokens:
            lines.append(self._build_line(tokens, members))
        return lines

    async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0) -> List[List[LineData]]:
        """Extract lines with bounding boxes from digital PDF.

        The blocking PDF work runs in the event loop's default executor.

        Args:
            pdf_path: Path of the PDF to read.
            y_threshold: Maximum y0 difference between neighbouring words for
                them to be treated as part of the same line.

        Returns:
            One list per page; each element is a dict with ``"line"`` (lower
            cased text), ``"bbox"`` (``[x_start, y_start]``) and ``"words"``
            (dicts with ``"word"`` and a ``(x0, y0, x1, y1)`` ``"bbox"``).
            NOTE(review): the annotation names ``LineData`` but plain dicts are
            produced here - confirm the model accepts this shape.
        """

        def _extract_lines():
            page_lines_with_bbox = []
            # The context manager closes the document even if a page fails.
            with fitz.open(pdf_path) as doc:
                for page in doc:
                    # Each item: (x0, y0, x1, y1, word, block_no, line_no, word_no)
                    raw_words = page.get_text("words")
                    raw_words.sort(key=lambda w: (round(w[1], 1), w[0]))  # y, then x

                    entries = []
                    for raw in raw_words:
                        x0, y0, x1, y1, word = raw[:5]
                        if self._is_noise(word):
                            continue
                        word = word.lower()
                        entries.append(
                            (word, {"word": word.strip(), "bbox": (x0, y0, x1, y1)})
                        )

                    page_lines_with_bbox.append(
                        self._group_words_into_lines(entries, y_threshold)
                    )
            return page_lines_with_bbox

        return await asyncio.get_running_loop().run_in_executor(None, _extract_lines)

    async def extract_lines_with_bbox_from_scanned_pdf(self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False) -> List[List[LineData]]:
        """Extract lines with bounding boxes from scanned PDF using OCR.

        The blocking rendering and OCR work runs in the event loop's default
        executor.

        Args:
            pdf_path: Path of the PDF to read.
            y_threshold: Maximum y0 difference (in image pixels) between
                neighbouring words for them to share a line.
            first_page: When True only page 1 is rendered (at ``settings.dpi``)
                and recognised; otherwise every page is processed.

        Returns:
            One list per processed page, with the same line-dict layout as
            ``extract_lines_with_bbox``; word bboxes are
            ``[x0, y0, x1, y1]`` lists scaled to the page image size.
            NOTE(review): the annotation names ``LineData`` but plain dicts are
            produced here - confirm the model accepts this shape.
        """

        def _extract_from_scanned():
            if first_page:
                rendered = convert_from_path(
                    pdf_path, dpi=settings.dpi, first_page=1, last_page=1
                )
                # Keep a one-element list so the image lines up with
                # result.pages exactly as in the multi-page branch.
                images = [np.array(rendered[0].convert("RGB"))]
            else:
                images = DocumentFile.from_pdf(pdf_path)
            result = self.doctr_model(images)

            page_lines_with_bbox = []
            for page, image in zip(result.pages, images):
                # Image arrays are (height, width, channels); scale each page
                # by its own size rather than the first page's.
                img_height, img_width = image.shape[0], image.shape[1]

                words = []
                for block in page.blocks:
                    for line in block.lines:
                        for word in line.words:
                            # Geometry is relative; convert to pixel coordinates.
                            x0, y0 = word.geometry[0]
                            x1, y1 = word.geometry[1]
                            text = word.value.strip().lower()
                            text = _OCR_MARKERS.sub(" ", text).strip()
                            if self._is_noise(text):
                                continue
                            words.append({
                                "word": text,
                                "bbox": [
                                    x0 * img_width,
                                    y0 * img_height,
                                    x1 * img_width,
                                    y1 * img_height,
                                ],
                            })

                # Sort words by y then x
                words.sort(key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0]))
                entries = [(w["word"], w) for w in words]
                page_lines_with_bbox.append(
                    self._group_words_into_lines(entries, y_threshold)
                )
            return page_lines_with_bbox

        return await asyncio.get_running_loop().run_in_executor(None, _extract_from_scanned)