import asyncio
import math
import multiprocessing
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

import fitz
import numpy as np
from pdf2image import convert_from_path


class TextExtractor:
    """Async text extractor that pulls text lines (with bounding boxes) out of
    both digital and scanned PDFs.

    Digital PDFs are read directly through PyMuPDF (``fitz``); scanned PDFs are
    rasterized with ``pdf2image`` and run through the supplied OCR model.
    Blocking work is pushed onto executor threads so the async interface never
    blocks the event loop.
    """

    def __init__(self, doctr_model):
        """
        Parameters:
        - doctr_model: OCR callable taking a list of np.ndarray page images and
          returning a doctr-style result (with .pages / .blocks / .lines / .words).
        """
        self.doctr_model = doctr_model
        # Regexes describing "noisy" token shapes.
        # NOTE(review): not referenced inside this class — presumably consumed
        # by callers; confirm before removing.
        self.noise_pattern = [
            r"\b[A-Z]{6,}\b",                                     # long all-caps runs
            r"[\[\]\\\^\@\#\$\%\&\*]{2,}",                        # symbol clusters
            r"(\d)\1{5,}",                                        # repeated-digit runs
            r"\b(?=[A-Za-z]*\d)(?=\d*[A-Za-z])[A-Za-z\d]{8,}\b",  # long letter/digit mixes
        ]

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # No resources are held across calls; nothing to release.
        pass

    def normalize_bbox(self, bbox, width: float, height: float) -> List[float]:
        """Normalize bounding box (x0, y0, x1, y1) to range [0, 1]."""
        x0, y0, x1, y1 = bbox
        return [
            round(x0 / width, 6),
            round(y0 / height, 6),
            round(x1 / width, 6),
            round(y1 / height, 6),
        ]

    def remove_consecutive_items(self, line: List[str]) -> List[str]:
        """Remove consecutive duplicate items from a list (keeps first of each run)."""
        if not line:
            return line
        result = [line[0]]
        for item in line[1:]:
            if item != result[-1]:
                result.append(item)
        return result

    def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]:
        """Remove consecutive entries whose 'word' value repeats the previous one."""
        if not word_data:
            return word_data
        result = [word_data[0]]
        for entry in word_data[1:]:
            if entry["word"] != result[-1]["word"]:
                result.append(entry)
        return result

    def shannon_entropy(self, text: str) -> float:
        """Return the Shannon entropy of *text* in bits per character (0.0 if empty)."""
        if not text:
            return 0.0
        counts = Counter(text)
        length = len(text)
        return -sum(
            (count / length) * math.log2(count / length)
            for count in counts.values()
        )

    def reconstruct_line_from_bboxes(self, words, space_unit=5):
        """Reconstruct a text line with spacing inferred from word bounding boxes.

        Parameters:
        - words: list of dicts with 'word' and 'bbox' (bbox = [x0, y0, x1, y1])
        - space_unit: roughly how many pixels correspond to one space character

        Returns:
        - str: the reconstructed line. The first word is indented proportionally
          to its distance from x = 0, so left margins are preserved.
        """
        # Sort words left to right by x0.
        words = sorted(words, key=lambda w: w["bbox"][0])
        line = ""
        prev_end_x = 0  # start at the left edge so leading indent is kept
        for word_info in words:
            start_x = word_info["bbox"][0]
            # fix: dropped the always-true `prev_end_x is not None` guard
            # (prev_end_x starts at 0 and is never None).
            gap = max(0, start_x - prev_end_x)
            line += " " * int(round(gap / space_unit))
            line += word_info["word"]
            prev_end_x = word_info["bbox"][2]  # x1 of current word
        return line

    def is_text_noisy(self, text: str) -> bool:
        """Heuristically classify *text* as OCR noise.

        Pages shorter than 50 characters are treated as noisy outright;
        otherwise a page is flagged only when ALL of these hold: high character
        entropy, high symbol density, high digit density, multiple
        repeated-character runs, and a non-trivial word count.
        """
        total_chars = len(text)
        if total_chars < 50:  # skip empty or very small pages
            return True

        tokens = re.findall(r"\b\w+\b", text)
        total_words = len(tokens)

        # Symbol & digit density relative to total characters.
        digit_count = len(re.findall(r"\d", text))
        symbol_count = len(
            re.findall(r"[^\w\s]", text)
        )  # anything not a word char or whitespace
        symbol_density = symbol_count / total_chars
        digit_density = digit_count / total_chars

        # Runs like "22222222222" or "!!!!!!" (any char repeated 6+ times).
        long_repeats = len(re.findall(r"(.)\1{5,}", text))

        # Entropy: randomness of characters.
        entropy = self.shannon_entropy(text)

        # Thresholds tuned empirically on sample documents.
        return (
            entropy > 4.0
            and symbol_density > 0.15
            and digit_density > 0.15
            and long_repeats > 1
            and total_words > 30
        )

    def _finalize_line(self, current_line, current_word_data, lines):
        """Sort, de-duplicate and append one accumulated line to *lines*.

        *current_line* holds (x0, y0, word) tuples; *current_word_data* holds
        the matching {'word', 'bbox'} dicts. Lines consisting solely of a very
        long digit run are discarded. (Shared by both extract paths; the
        original duplicated this logic four times.)
        """
        current_line.sort()
        clean_line = self.remove_consecutive_items([w[2] for w in current_line])
        clean_word_data = self.remove_consecutive_words(
            sorted(current_word_data, key=lambda w: w["bbox"][0])
        )
        if clean_line:
            joined = " ".join(clean_line)
            # Drop lines that are nothing but a 13+ digit run.
            if re.sub(r"\d{13,}", "", joined) != "":
                lines.append(
                    {
                        "line": joined,
                        "bbox": [
                            min(w[0] for w in current_line),
                            min(w[1] for w in current_line),
                        ],
                        "words": clean_word_data,
                    }
                )

    async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0):
        """Extract lines with bounding boxes from a digital (text-layer) PDF.

        Parameters:
        - pdf_path: path to the PDF file
        - y_threshold: max vertical distance (points) for words to share a line

        Returns:
        - list (one entry per page) of lists of
          {'line': str, 'bbox': [x_start, y_start], 'words': [...]}
        """

        def _extract_lines():
            page_lines_with_bbox = []
            doc = fitz.open(pdf_path)
            try:  # fix: the original never closed the document handle
                for page in doc:
                    # (x0, y0, x1, y1, word, block_no, line_no, word_no)
                    words = page.get_text("words")
                    words.sort(key=lambda w: (round(w[1], 1), w[0]))  # sort by y then x

                    lines = []
                    current_line = []
                    current_y = None
                    current_word_data = []

                    for w in words:
                        x0, y0, x1, y1, word = w[:5]
                        # Skip separators, punctuation-only tokens and huge digit runs.
                        if (
                            word == "|"
                            or not word
                            or word == "."
                            or word == "#"
                            or re.sub(r"[^\w\s-]", "", word) == ""
                            or re.sub(r"\d{19,}", "", word) == ""
                        ):
                            continue

                        word = word.lower().replace("$", "")
                        word_data = {"word": word.strip(), "bbox": (x0, y0, x1, y1)}

                        if current_y is None or abs(y0 - current_y) < y_threshold:
                            # Same visual line: keep accumulating.
                            current_line.append((x0, y0, word))
                            current_y = y0
                            current_word_data.append(word_data)
                        else:
                            # New line started: flush the previous one.
                            self._finalize_line(current_line, current_word_data, lines)
                            current_line = [(x0, y0, word)]
                            current_y = y0
                            current_word_data = [word_data]

                    # Flush the trailing line, if any.
                    if current_line:
                        self._finalize_line(current_line, current_word_data, lines)

                    page_lines_with_bbox.append(lines)
            finally:
                doc.close()
            return page_lines_with_bbox

        # fix: get_event_loop() inside a coroutine is deprecated since 3.10.
        return await asyncio.get_running_loop().run_in_executor(None, _extract_lines)

    def create_page_chunks(self, num_pages: int, cpu_core: int):
        """Split pages 1..num_pages into inclusive [start, end] ranges.

        NOTE(review): with page_per_cpu = 2 the range step is 3, so each chunk
        actually covers up to THREE pages ([1,3], [4,6], ...) despite the name;
        behavior kept as-is since all pages are covered exactly once.
        *cpu_core* is accepted for interface compatibility but unused.
        """
        final_ranges = []
        page_per_cpu = 2
        for start in range(1, num_pages + 1, page_per_cpu + 1):
            final_ranges.append([start, min(start + page_per_cpu, num_pages)])
        return final_ranges

    def process_page_parallel_async(
        self, pdf_path: str, page_range: List[int], instance
    ):
        """Thread-pool entry point: run one page chunk on a private event loop.

        A fresh loop is created because this runs on an executor worker thread,
        which has no running loop. *instance* is unused (kept for interface
        compatibility).
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(
                self.process_pages_concurrently(pdf_path, page_range)
            )
        finally:
            loop.close()

    async def process_pages_concurrently(self, pdf_path: str, page_range: List[int]):
        """OCR all pages in the inclusive [start, end] *page_range* concurrently.

        Returns (page_range, outputs) with outputs ordered by page number.
        """
        start_page, end_page = page_range[0], page_range[1]
        tasks = [
            self.process_page_parallel(pdf_path, page)
            for page in range(start_page, end_page + 1)
        ]
        page_results = await asyncio.gather(*tasks)
        page_results.sort(key=lambda x: x[0])  # restore page order
        chunk_outputs = [output for page_num, output in page_results]
        return page_range, chunk_outputs

    async def process_page_parallel(self, pdf_path: str, i: int):
        """Rasterize page *i* (1-based) at 300 DPI and OCR it.

        Returns (i, ocr_output) so callers can re-sort by page number.
        """
        print(f"Processing page {i}")
        pages = convert_from_path(pdf_path, dpi=300, first_page=i, last_page=i)
        page_imgs = [page.convert("RGB") for page in pages]
        output = self.doctr_model([np.array(img) for img in page_imgs])
        return i, output

    async def extract_lines_with_bbox_from_scanned_pdf(
        self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False
    ):
        """Extract lines with bounding boxes from a scanned PDF using OCR.

        Parameters:
        - pdf_path: path to the PDF file
        - y_threshold: max vertical distance (pixels/points) for one line
        - first_page: if True, OCR only the first up-to-3 pages in-process;
          otherwise OCR every page via a thread pool of chunk workers

        Returns:
        - list (one entry per OCR'd page) of lists of
          {'line': str, 'bbox': [x_start, y_start], 'words': [...]}
        """

        def _extract_from_scanned():
            doc = None
            if first_page:
                meta = fitz.open(pdf_path)
                try:  # fix: original leaked this handle
                    number_of_pages = meta.page_count
                finally:
                    meta.close()
                last = number_of_pages if number_of_pages < 3 else 3
                pages = convert_from_path(
                    pdf_path, dpi=300, first_page=1, last_page=last
                )
                first_page_img = [page.convert("RGB") for page in pages]
                doc = [np.array(img) for img in first_page_img]
                # fix: the original never assigned ``results`` in this branch,
                # so first_page=True raised NameError at the loop below.
                results = [self.doctr_model(doc)]
            else:
                pdf = fitz.open(pdf_path)
                try:  # fix: original leaked this handle
                    num_pages = pdf.page_count
                    page_width_f = pdf[0].rect.width
                    page_height_f = pdf[0].rect.height
                finally:
                    pdf.close()

                page_chunks = self.create_page_chunks(
                    num_pages, multiprocessing.cpu_count()
                )
                with ThreadPoolExecutor(
                    max_workers=multiprocessing.cpu_count()
                ) as executor:
                    futures = [
                        executor.submit(
                            self.process_page_parallel_async, pdf_path, chunk, self
                        )
                        for chunk in page_chunks
                    ]
                    chunk_results = [f.result() for f in futures]
                # Each entry is (page_range, outputs); order by chunk start page.
                chunk_results.sort(key=lambda x: x[0][0])
                results = []
                for _page_range, outputs in chunk_results:
                    results.extend(outputs)

            page_lines_with_bbox = []
            for ocr_result in results:
                for page in ocr_result.pages:
                    if first_page:
                        # NOTE(review): preview pages assumed same size — dims
                        # always come from the first rendered image; confirm.
                        img_width, img_height = doc[0].shape[1], doc[0].shape[0]
                    else:
                        img_width, img_height = page_width_f, page_height_f

                    words = []
                    for block in page.blocks:
                        for line in block.lines:
                            for word in line.words:
                                # doctr geometry is normalized [0, 1]; scale to
                                # absolute page coordinates.
                                x0, y0 = word.geometry[0]
                                x1, y1 = word.geometry[1]
                                text = word.value.strip().lower()
                                text = re.sub(r"[#*]", " ", text)
                                # fix: pattern was the f-string f"[$]"
                                text = re.sub(r"[$]", "", text)
                                text = text.strip()
                                if (
                                    text == "|"
                                    or not text
                                    or text == "."
                                    or text == "#"
                                    or re.sub(r"[^\w\s-]", "", text) == ""
                                    or re.sub(r"\d{19,}", "", text) == ""
                                ):
                                    continue
                                words.append(
                                    {
                                        "word": text,
                                        "bbox": [
                                            x0 * img_width,
                                            y0 * img_height,
                                            x1 * img_width,
                                            y1 * img_height,
                                        ],
                                    }
                                )

                    # Sort words by y then x.
                    words.sort(key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0]))

                    lines = []
                    current_line = []
                    current_word_data = []
                    current_y = None
                    for w in words:
                        y0 = w["bbox"][1]
                        if current_y is None or abs(y0 - current_y) < y_threshold:
                            current_line.append((w["bbox"][0], y0, w["word"]))
                            current_word_data.append(w)
                            current_y = y0
                        else:
                            self._finalize_line(current_line, current_word_data, lines)
                            current_line = [(w["bbox"][0], y0, w["word"])]
                            current_word_data = [w]
                            current_y = y0

                    # Flush the trailing line, if any.
                    if current_line:
                        self._finalize_line(current_line, current_word_data, lines)

                    page_lines_with_bbox.append(lines)
            return page_lines_with_bbox

        # fix: get_event_loop() inside a coroutine is deprecated since 3.10.
        return await asyncio.get_running_loop().run_in_executor(
            None, _extract_from_scanned
        )