import asyncio import math import multiprocessing import re from collections import Counter from concurrent.futures import ThreadPoolExecutor from typing import Dict, List import fitz import numpy as np from loguru import logger from pdf2image import convert_from_path class TextExtractor: def __init__(self, doctr_model): logger.info("Initializing TextExtractor") self.doctr_model = doctr_model self.noise_pattern = [ r"\b[A-Z]{6,}\b", r"[\[\]\\\^\@\#\$\%\&\*]{2,}", r"(\d)\1{5,}", r"\b(?=[A-Za-z]*\d)(?=\d*[A-Za-z])[A-Za-z\d]{8,}\b", ] logger.debug(f"Initialized with {len(self.noise_pattern)} noise patterns") async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): pass def normalize_bbox(self, bbox, width: float, height: float) -> List[float]: x0, y0, x1, y1 = bbox normalized = [ round(x0 / width, 6), round(y0 / height, 6), round(x1 / width, 6), round(y1 / height, 6), ] logger.debug(f"Normalized bbox from {bbox} to {normalized}") return normalized def remove_consecutive_items(self, line: List[str]) -> List[str]: if not line: return line result = [line[0]] for item in line[1:]: if item != result[-1]: result.append(item) logger.debug(f"Removed consecutive items: {len(line)} -> {len(result)} items") return result def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]: if not word_data: return word_data result = [word_data[0]] for i in range(1, len(word_data)): if word_data[i]["word"] != result[-1]["word"]: result.append(word_data[i]) logger.debug( f"Removed consecutive words: {len(word_data)} -> {len(result)} words" ) return result def shannon_entropy(self, text: str) -> float: if not text: return 0.0 counts = Counter(text) length = len(text) return -sum( (count / length) * math.log2(count / length) for count in counts.values() ) def reconstruct_line_from_bboxes(self, words, space_unit=5): logger.debug( f"Reconstructing line from {len(words)} words with space_unit={space_unit}" ) words = sorted(words, key=lambda w: w["bbox"][0]) line = "" prev_end_x = 0 for word_info in words: word = word_info["word"] start_x = word_info["bbox"][0] if prev_end_x is not None: gap = max(0, start_x - prev_end_x) num_spaces = int(round(gap / space_unit)) line += " " * num_spaces line += word prev_end_x = word_info["bbox"][2] logger.debug(f"Reconstructed line: '{line[:100]}...'") return line def is_text_noisy(self, text: str) -> bool: logger.debug(f"Checking if text is noisy: {len(text)} characters") total_chars = len(text) if total_chars < 50: logger.debug("Text too short, marking as noisy") return True tokens = re.findall(r"\b\w+\b", text) total_words = len(tokens) digit_count = len(re.findall(r"\d", text)) symbol_count = len(re.findall(r"[^\w\s]", text)) symbol_density = symbol_count / total_chars digit_density = digit_count / total_chars long_repeats = len(re.findall(r"(.)\1{5,}", text)) entropy = self.shannon_entropy(text) is_noisy = ( entropy > 4.0 and symbol_density > 0.15 and digit_density > 0.15 and long_repeats > 1 and total_words > 30 ) logger.debug( f"Noise analysis - entropy: {entropy:.2f}, symbol_density: {symbol_density:.2f}, " f"digit_density: {digit_density:.2f}, long_repeats: {long_repeats}, " f"total_words: {total_words}, is_noisy: {is_noisy}" ) return is_noisy async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0): logger.info(f"Extracting lines with bbox from digital PDF: {pdf_path}") def _extract_lines(): try: doc = fitz.open(pdf_path) page_lines_with_bbox = [] for page_num, page in enumerate(doc): logger.debug(f"Processing page {page_num + 1}") words = page.get_text("words") words.sort(key=lambda w: (round(w[1], 1), w[0])) lines = [] current_line = [] current_y = None current_word_data = [] for w in words: x0, y0, x1, y1, word = w[:5] if ( word == "|" or not word or word == "." or word == "#" or re.sub(r"[^\w\s-]", "", word) == "" or re.sub(r"\d{19,}", "", word) == "" ): continue word = word.lower() word = word.replace("$", "") word_data = {"word": word.strip(), "bbox": (x0, y0, x1, y1)} if current_y is None or abs(y0 - current_y) < y_threshold: current_line.append((x0, y0, word)) current_y = y0 current_word_data.append(word_data) else: current_line.sort() line_words = [w[2] for w in current_line] clean_line = self.remove_consecutive_items(line_words) current_word_data = sorted( current_word_data, key=lambda w: w["bbox"][0] ) clean_word_data = self.remove_consecutive_words( current_word_data ) if clean_line: x_start = min([w[0] for w in current_line]) y_start = min([w[1] for w in current_line]) if re.sub(r"\d{13,}", "", " ".join(clean_line)) != "": lines.append( { "line": " ".join(clean_line), "bbox": [x_start, y_start], "words": clean_word_data, } ) current_line = [(x0, y0, word)] current_y = y0 current_word_data = [word_data] if current_line: current_line.sort() line_words = [w[2] for w in current_line] clean_line = self.remove_consecutive_items(line_words) current_word_data = sorted( current_word_data, key=lambda w: w["bbox"][0] ) clean_word_data = self.remove_consecutive_words( current_word_data ) if clean_line: x_start = min([w[0] for w in current_line]) y_start = min([w[1] for w in current_line]) if re.sub(r"\d{13,}", "", " ".join(clean_line)) != "": lines.append( { "line": " ".join(clean_line), "bbox": [x_start, y_start], "words": clean_word_data, } ) logger.debug(f"Page {page_num + 1}: extracted {len(lines)} lines") page_lines_with_bbox.append(lines) logger.info( f"Successfully extracted lines from {len(page_lines_with_bbox)} pages" ) return page_lines_with_bbox except Exception as e: logger.error(f"Error extracting lines from digital PDF: {e}") raise return await asyncio.get_event_loop().run_in_executor(None, _extract_lines) def create_page_chunks(self, num_pages: int, cpu_core: int): logger.debug( f"Creating page chunks for {num_pages} pages using {cpu_core} CPU cores" ) final_ranges = [] page_per_cpu = 2 for i in range(1, num_pages + 1, page_per_cpu + 1): final_ranges.append([i, min(i + page_per_cpu, num_pages)]) logger.debug(f"Created {len(final_ranges)} page chunks: {final_ranges}") return final_ranges def process_page_parallel_async( self, pdf_path: str, page_range: List[int], instance ): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete( self.process_pages_concurrently(pdf_path, page_range) ) finally: loop.close() async def process_pages_concurrently(self, pdf_path: str, page_range: List[int]): start_page = page_range[0] end_page = page_range[1] logger.debug(f"Processing pages {start_page}-{end_page} concurrently") tasks = [] for page in range(start_page, end_page + 1): tasks.append(self.process_page_parallel(pdf_path, page)) page_results = await asyncio.gather(*tasks) page_results.sort(key=lambda x: x[0]) chunk_outputs = [output for page_num, output in page_results] logger.debug(f"Completed processing pages {start_page}-{end_page}") return page_range, chunk_outputs async def process_page_parallel(self, pdf_path: str, i: int): logger.debug(f"Processing page {i}") try: pages = convert_from_path(pdf_path, dpi=300, first_page=i, last_page=i) page_imgs = [page.convert("RGB") for page in pages] output = self.doctr_model([np.array(img) for img in page_imgs]) logger.debug(f"Successfully processed page {i}") return i, output except Exception as e: logger.error(f"Error processing page {i}: {e}") raise async def extract_lines_with_bbox_from_scanned_pdf( self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False ): logger.info( f"Extracting lines from scanned PDF: {pdf_path} (first_page: {first_page})" ) def _extract_from_scanned(): try: result = None doc = None if first_page: number_of_pages = fitz.open(pdf_path).page_count logger.debug( f"Processing first page(s) only, total pages: {number_of_pages}" ) if number_of_pages < 3: pages = convert_from_path( pdf_path, dpi=300, first_page=1, last_page=number_of_pages ) else: pages = convert_from_path( pdf_path, dpi=300, first_page=1, last_page=3 ) first_page_img = [page.convert("RGB") for page in pages] result = self.doctr_model([np.array(img) for img in first_page_img]) doc = [np.array(img) for img in first_page_img] else: logger.debug("Processing all pages using parallel processing") pdf = fitz.open(pdf_path) num_pages = pdf.page_count page_witdh_f = pdf[0].rect.width page_height_f = pdf[0].rect.height page_chunks = self.create_page_chunks( num_pages, multiprocessing.cpu_count() ) logger.info( f"Processing {num_pages} pages using {multiprocessing.cpu_count()} CPU cores" ) with ThreadPoolExecutor( max_workers=multiprocessing.cpu_count() ) as executor: futures = [] for chunk in page_chunks: futures.append( executor.submit( self.process_page_parallel_async, pdf_path, chunk, self, ) ) results = [f.result() for f in futures] results.sort(key=lambda x: x[0][0]) result = [] for r in results: result.extend(r[1]) results = result page_lines_with_bbox = [] for result_idx, result in enumerate(results): logger.debug( f"Processing OCR result {result_idx + 1}/{len(results)}" ) for page in result.pages: if first_page: img_width, img_height = doc[0].shape[1], doc[0].shape[0] else: img_width, img_height = page_witdh_f, page_height_f words = [] for block in page.blocks: for line in block.lines: for word in line.words: x0, y0 = word.geometry[0] x1, y1 = word.geometry[1] abs_x0 = x0 * img_width abs_y0 = y0 * img_height abs_x1 = x1 * img_width abs_y1 = y1 * img_height text = word.value.strip().lower() text = re.sub(r"[#*]", " ", text) text = re.sub(f"[$]", "", text) text = text.strip() if ( text == "|" or not text or text == "." or text == "#" or re.sub(r"[^\w\s-]", "", text) == "" or re.sub(r"\d{19,}", "", text) == "" ): continue words.append( { "word": text, "bbox": [abs_x0, abs_y0, abs_x1, abs_y1], } ) words.sort(key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0])) lines = [] current_line = [] current_word_data = [] current_y = None for w in words: y0 = w["bbox"][1] if current_y is None or abs(y0 - current_y) < y_threshold: current_line.append((w["bbox"][0], y0, w["word"])) current_word_data.append(w) current_y = y0 else: current_line.sort() line_words = [x[2] for x in current_line] clean_line = self.remove_consecutive_items(line_words) current_word_data = sorted( current_word_data, key=lambda w: w["bbox"][0] ) clean_word_data = self.remove_consecutive_words( current_word_data ) if clean_line: x_start = min(x[0] for x in current_line) y_start = min(x[1] for x in current_line) if re.sub(r"\d{13,}", "", " ".join(clean_line)) != "": lines.append( { "line": " ".join(clean_line), "bbox": [x_start, y_start], "words": clean_word_data, } ) current_line = [(w["bbox"][0], y0, w["word"])] current_word_data = [w] current_y = y0 if current_line: current_line.sort() line_words = [x[2] for x in current_line] clean_line = self.remove_consecutive_items(line_words) current_word_data = sorted( current_word_data, key=lambda w: w["bbox"][0] ) clean_word_data = self.remove_consecutive_words( current_word_data ) if clean_line: x_start = min(x[0] for x in current_line) y_start = min(x[1] for x in current_line) if re.sub(r"\d{13,}", "", " ".join(clean_line)) != "": lines.append( { "line": " ".join(clean_line), "bbox": [x_start, y_start], "words": clean_word_data, } ) page_lines_with_bbox.append(lines) logger.info( f"Successfully extracted lines from {len(page_lines_with_bbox)} scanned pages" ) return page_lines_with_bbox except Exception as e: logger.error(f"Error extracting lines from scanned PDF: {e}") raise return await asyncio.get_event_loop().run_in_executor( None, _extract_from_scanned )