Spaces:
Running
Running
| import asyncio | |
| import math | |
| import multiprocessing | |
| import re | |
| from collections import Counter | |
| from concurrent.futures import ThreadPoolExecutor | |
| from typing import Dict, List | |
| import fitz | |
| import numpy as np | |
| from loguru import logger | |
| from pdf2image import convert_from_path | |
class TextExtractor:
    """Extracts text lines (with bounding boxes) from digital and scanned PDFs."""

    def __init__(self, doctr_model):
        """Keep a reference to the OCR predictor and the noise-token regexes.

        Args:
            doctr_model: Callable docTR OCR predictor applied to page images.
        """
        logger.info("Initializing TextExtractor")
        self.doctr_model = doctr_model
        # Regexes describing tokens that look like OCR noise. NOTE(review):
        # defined here but not referenced by the methods visible in this module.
        noise_regexes = (
            r"\b[A-Z]{6,}\b",  # long all-caps runs
            r"[\[\]\\\^\@\#\$\%\&\*]{2,}",  # clusters of punctuation symbols
            r"(\d)\1{5,}",  # a single digit repeated 6+ times
            r"\b(?=[A-Za-z]*\d)(?=\d*[A-Za-z])[A-Za-z\d]{8,}\b",  # long letter/digit mixes
        )
        self.noise_pattern = list(noise_regexes)
        logger.debug(f"Initialized with {len(self.noise_pattern)} noise patterns")
    async def __aenter__(self):
        """Enter the async context manager; no setup is needed."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Exit the async context manager; there are no resources to release.

        Returning None (implicitly) lets any in-flight exception propagate.
        """
        pass
| def normalize_bbox(self, bbox, width: float, height: float) -> List[float]: | |
| x0, y0, x1, y1 = bbox | |
| normalized = [ | |
| round(x0 / width, 6), | |
| round(y0 / height, 6), | |
| round(x1 / width, 6), | |
| round(y1 / height, 6), | |
| ] | |
| logger.debug(f"Normalized bbox from {bbox} to {normalized}") | |
| return normalized | |
| def remove_consecutive_items(self, line: List[str]) -> List[str]: | |
| if not line: | |
| return line | |
| result = [line[0]] | |
| for item in line[1:]: | |
| if item != result[-1]: | |
| result.append(item) | |
| logger.debug(f"Removed consecutive items: {len(line)} -> {len(result)} items") | |
| return result | |
| def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]: | |
| if not word_data: | |
| return word_data | |
| result = [word_data[0]] | |
| for i in range(1, len(word_data)): | |
| if word_data[i]["word"] != result[-1]["word"]: | |
| result.append(word_data[i]) | |
| logger.debug( | |
| f"Removed consecutive words: {len(word_data)} -> {len(result)} words" | |
| ) | |
| return result | |
| def shannon_entropy(self, text: str) -> float: | |
| if not text: | |
| return 0.0 | |
| counts = Counter(text) | |
| length = len(text) | |
| return -sum( | |
| (count / length) * math.log2(count / length) for count in counts.values() | |
| ) | |
| def reconstruct_line_from_bboxes(self, words, space_unit=5): | |
| logger.debug( | |
| f"Reconstructing line from {len(words)} words with space_unit={space_unit}" | |
| ) | |
| words = sorted(words, key=lambda w: w["bbox"][0]) | |
| line = "" | |
| prev_end_x = 0 | |
| for word_info in words: | |
| word = word_info["word"] | |
| start_x = word_info["bbox"][0] | |
| if prev_end_x is not None: | |
| gap = max(0, start_x - prev_end_x) | |
| num_spaces = int(round(gap / space_unit)) | |
| line += " " * num_spaces | |
| line += word | |
| prev_end_x = word_info["bbox"][2] | |
| logger.debug(f"Reconstructed line: '{line[:100]}...'") | |
| return line | |
| def is_text_noisy(self, text: str) -> bool: | |
| logger.debug(f"Checking if text is noisy: {len(text)} characters") | |
| total_chars = len(text) | |
| if total_chars < 50: | |
| logger.debug("Text too short, marking as noisy") | |
| return True | |
| tokens = re.findall(r"\b\w+\b", text) | |
| total_words = len(tokens) | |
| digit_count = len(re.findall(r"\d", text)) | |
| symbol_count = len(re.findall(r"[^\w\s]", text)) | |
| symbol_density = symbol_count / total_chars | |
| digit_density = digit_count / total_chars | |
| long_repeats = len(re.findall(r"(.)\1{5,}", text)) | |
| entropy = self.shannon_entropy(text) | |
| is_noisy = ( | |
| entropy > 4.0 | |
| and symbol_density > 0.15 | |
| and digit_density > 0.15 | |
| and long_repeats > 1 | |
| and total_words > 30 | |
| ) | |
| logger.debug( | |
| f"Noise analysis - entropy: {entropy:.2f}, symbol_density: {symbol_density:.2f}, " | |
| f"digit_density: {digit_density:.2f}, long_repeats: {long_repeats}, " | |
| f"total_words: {total_words}, is_noisy: {is_noisy}" | |
| ) | |
| return is_noisy | |
    async def extract_lines_with_bbox(
        self, pdf_path: str, y_threshold: float = 3.0
    ) -> List[List[Dict]]:
        """Extract text lines (with coordinates) from a digital (text-layer) PDF.

        Words are read via PyMuPDF, filtered for junk tokens, lowercased, and
        grouped into lines by vertical proximity (|y0 delta| < y_threshold).

        Args:
            pdf_path: Path to the PDF on disk.
            y_threshold: Max vertical distance between words on the same line.

        Returns:
            One list per page; each entry is a dict with "line" (joined text),
            "bbox" ([x_start, y_start] of the line) and "words" (per-word dicts
            with "word" and absolute "bbox").

        Raises:
            Re-raises any PyMuPDF/processing error after logging it.
        """
        logger.info(f"Extracting lines with bbox from digital PDF: {pdf_path}")

        def _extract_lines():
            # Runs in a thread via run_in_executor because fitz I/O is blocking.
            try:
                doc = fitz.open(pdf_path)
                page_lines_with_bbox = []
                for page_num, page in enumerate(doc):
                    logger.debug(f"Processing page {page_num + 1}")
                    # Each entry is (x0, y0, x1, y1, text, ...); sort top-to-bottom
                    # (y rounded to damp jitter), then left-to-right.
                    words = page.get_text("words")
                    words.sort(key=lambda w: (round(w[1], 1), w[0]))
                    lines = []
                    current_line = []
                    current_y = None
                    current_word_data = []
                    for w in words:
                        x0, y0, x1, y1, word = w[:5]
                        # Skip separators, empty tokens, punctuation-only tokens,
                        # and tokens that are just a 19+ digit number.
                        if (
                            word == "|"
                            or not word
                            or word == "."
                            or word == "#"
                            or re.sub(r"[^\w\s-]", "", word) == ""
                            or re.sub(r"\d{19,}", "", word) == ""
                        ):
                            continue
                        word = word.lower()
                        word = word.replace("$", "")
                        word_data = {"word": word.strip(), "bbox": (x0, y0, x1, y1)}
                        if current_y is None or abs(y0 - current_y) < y_threshold:
                            # Same line: keep accumulating.
                            current_line.append((x0, y0, word))
                            current_y = y0
                            current_word_data.append(word_data)
                        else:
                            # New line started: flush the accumulated one.
                            current_line.sort()
                            line_words = [w[2] for w in current_line]
                            clean_line = self.remove_consecutive_items(line_words)
                            current_word_data = sorted(
                                current_word_data, key=lambda w: w["bbox"][0]
                            )
                            clean_word_data = self.remove_consecutive_words(
                                current_word_data
                            )
                            if clean_line:
                                x_start = min([w[0] for w in current_line])
                                y_start = min([w[1] for w in current_line])
                                # Drop lines that are nothing but a 13+ digit number.
                                if re.sub(r"\d{13,}", "", " ".join(clean_line)) != "":
                                    lines.append(
                                        {
                                            "line": " ".join(clean_line),
                                            "bbox": [x_start, y_start],
                                            "words": clean_word_data,
                                        }
                                    )
                            current_line = [(x0, y0, word)]
                            current_y = y0
                            current_word_data = [word_data]
                    # Flush the last accumulated line of the page (same logic as above).
                    if current_line:
                        current_line.sort()
                        line_words = [w[2] for w in current_line]
                        clean_line = self.remove_consecutive_items(line_words)
                        current_word_data = sorted(
                            current_word_data, key=lambda w: w["bbox"][0]
                        )
                        clean_word_data = self.remove_consecutive_words(
                            current_word_data
                        )
                        if clean_line:
                            x_start = min([w[0] for w in current_line])
                            y_start = min([w[1] for w in current_line])
                            if re.sub(r"\d{13,}", "", " ".join(clean_line)) != "":
                                lines.append(
                                    {
                                        "line": " ".join(clean_line),
                                        "bbox": [x_start, y_start],
                                        "words": clean_word_data,
                                    }
                                )
                    logger.debug(f"Page {page_num + 1}: extracted {len(lines)} lines")
                    page_lines_with_bbox.append(lines)
                logger.info(
                    f"Successfully extracted lines from {len(page_lines_with_bbox)} pages"
                )
                return page_lines_with_bbox
            except Exception as e:
                logger.error(f"Error extracting lines from digital PDF: {e}")
                raise

        return await asyncio.get_event_loop().run_in_executor(None, _extract_lines)
| def create_page_chunks(self, num_pages: int, cpu_core: int): | |
| logger.debug( | |
| f"Creating page chunks for {num_pages} pages using {cpu_core} CPU cores" | |
| ) | |
| final_ranges = [] | |
| page_per_cpu = 2 | |
| for i in range(1, num_pages + 1, page_per_cpu + 1): | |
| final_ranges.append([i, min(i + page_per_cpu, num_pages)]) | |
| logger.debug(f"Created {len(final_ranges)} page chunks: {final_ranges}") | |
| return final_ranges | |
    def process_page_parallel_async(
        self, pdf_path: str, page_range: List[int], instance
    ):
        """Synchronous entry point for worker threads: OCR one page range.

        Worker threads have no running event loop, so a private loop is
        created, installed, and driven to completion here.

        Args:
            pdf_path: Path to the PDF being processed.
            page_range: Inclusive [start, end] page numbers to OCR.
            instance: Unused; callers pass the TextExtractor itself —
                NOTE(review): looks droppable, but removing it would break the
                existing executor.submit call site; confirm before changing.

        Returns:
            The (page_range, chunk_outputs) tuple from
            process_pages_concurrently.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(
                self.process_pages_concurrently(pdf_path, page_range)
            )
        finally:
            # Always close the loop so per-thread resources are released.
            loop.close()
| async def process_pages_concurrently(self, pdf_path: str, page_range: List[int]): | |
| start_page = page_range[0] | |
| end_page = page_range[1] | |
| logger.debug(f"Processing pages {start_page}-{end_page} concurrently") | |
| tasks = [] | |
| for page in range(start_page, end_page + 1): | |
| tasks.append(self.process_page_parallel(pdf_path, page)) | |
| page_results = await asyncio.gather(*tasks) | |
| page_results.sort(key=lambda x: x[0]) | |
| chunk_outputs = [output for page_num, output in page_results] | |
| logger.debug(f"Completed processing pages {start_page}-{end_page}") | |
| return page_range, chunk_outputs | |
| async def process_page_parallel(self, pdf_path: str, i: int): | |
| logger.debug(f"Processing page {i}") | |
| try: | |
| pages = convert_from_path(pdf_path, dpi=300, first_page=i, last_page=i) | |
| page_imgs = [page.convert("RGB") for page in pages] | |
| output = self.doctr_model([np.array(img) for img in page_imgs]) | |
| logger.debug(f"Successfully processed page {i}") | |
| return i, output | |
| except Exception as e: | |
| logger.error(f"Error processing page {i}: {e}") | |
| raise | |
| async def extract_lines_with_bbox_from_scanned_pdf( | |
| self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False | |
| ): | |
| logger.info( | |
| f"Extracting lines from scanned PDF: {pdf_path} (first_page: {first_page})" | |
| ) | |
| def _extract_from_scanned(): | |
| try: | |
| result = None | |
| doc = None | |
| if first_page: | |
| number_of_pages = fitz.open(pdf_path).page_count | |
| logger.debug( | |
| f"Processing first page(s) only, total pages: {number_of_pages}" | |
| ) | |
| if number_of_pages < 3: | |
| pages = convert_from_path( | |
| pdf_path, dpi=300, first_page=1, last_page=number_of_pages | |
| ) | |
| else: | |
| pages = convert_from_path( | |
| pdf_path, dpi=300, first_page=1, last_page=3 | |
| ) | |
| first_page_img = [page.convert("RGB") for page in pages] | |
| result = self.doctr_model([np.array(img) for img in first_page_img]) | |
| doc = [np.array(img) for img in first_page_img] | |
| else: | |
| logger.debug("Processing all pages using parallel processing") | |
| pdf = fitz.open(pdf_path) | |
| num_pages = pdf.page_count | |
| page_witdh_f = pdf[0].rect.width | |
| page_height_f = pdf[0].rect.height | |
| page_chunks = self.create_page_chunks( | |
| num_pages, multiprocessing.cpu_count() | |
| ) | |
| logger.info( | |
| f"Processing {num_pages} pages using {multiprocessing.cpu_count()} CPU cores" | |
| ) | |
| with ThreadPoolExecutor( | |
| max_workers=multiprocessing.cpu_count() | |
| ) as executor: | |
| futures = [] | |
| for chunk in page_chunks: | |
| futures.append( | |
| executor.submit( | |
| self.process_page_parallel_async, | |
| pdf_path, | |
| chunk, | |
| self, | |
| ) | |
| ) | |
| results = [f.result() for f in futures] | |
| results.sort(key=lambda x: x[0][0]) | |
| result = [] | |
| for r in results: | |
| result.extend(r[1]) | |
| results = result | |
| page_lines_with_bbox = [] | |
| for result_idx, result in enumerate(results): | |
| logger.debug( | |
| f"Processing OCR result {result_idx + 1}/{len(results)}" | |
| ) | |
| for page in result.pages: | |
| if first_page: | |
| img_width, img_height = doc[0].shape[1], doc[0].shape[0] | |
| else: | |
| img_width, img_height = page_witdh_f, page_height_f | |
| words = [] | |
| for block in page.blocks: | |
| for line in block.lines: | |
| for word in line.words: | |
| x0, y0 = word.geometry[0] | |
| x1, y1 = word.geometry[1] | |
| abs_x0 = x0 * img_width | |
| abs_y0 = y0 * img_height | |
| abs_x1 = x1 * img_width | |
| abs_y1 = y1 * img_height | |
| text = word.value.strip().lower() | |
| text = re.sub(r"[#*]", " ", text) | |
| text = re.sub(f"[$]", "", text) | |
| text = text.strip() | |
| if ( | |
| text == "|" | |
| or not text | |
| or text == "." | |
| or text == "#" | |
| or re.sub(r"[^\w\s-]", "", text) == "" | |
| or re.sub(r"\d{19,}", "", text) == "" | |
| ): | |
| continue | |
| words.append( | |
| { | |
| "word": text, | |
| "bbox": [abs_x0, abs_y0, abs_x1, abs_y1], | |
| } | |
| ) | |
| words.sort(key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0])) | |
| lines = [] | |
| current_line = [] | |
| current_word_data = [] | |
| current_y = None | |
| for w in words: | |
| y0 = w["bbox"][1] | |
| if current_y is None or abs(y0 - current_y) < y_threshold: | |
| current_line.append((w["bbox"][0], y0, w["word"])) | |
| current_word_data.append(w) | |
| current_y = y0 | |
| else: | |
| current_line.sort() | |
| line_words = [x[2] for x in current_line] | |
| clean_line = self.remove_consecutive_items(line_words) | |
| current_word_data = sorted( | |
| current_word_data, key=lambda w: w["bbox"][0] | |
| ) | |
| clean_word_data = self.remove_consecutive_words( | |
| current_word_data | |
| ) | |
| if clean_line: | |
| x_start = min(x[0] for x in current_line) | |
| y_start = min(x[1] for x in current_line) | |
| if re.sub(r"\d{13,}", "", " ".join(clean_line)) != "": | |
| lines.append( | |
| { | |
| "line": " ".join(clean_line), | |
| "bbox": [x_start, y_start], | |
| "words": clean_word_data, | |
| } | |
| ) | |
| current_line = [(w["bbox"][0], y0, w["word"])] | |
| current_word_data = [w] | |
| current_y = y0 | |
| if current_line: | |
| current_line.sort() | |
| line_words = [x[2] for x in current_line] | |
| clean_line = self.remove_consecutive_items(line_words) | |
| current_word_data = sorted( | |
| current_word_data, key=lambda w: w["bbox"][0] | |
| ) | |
| clean_word_data = self.remove_consecutive_words( | |
| current_word_data | |
| ) | |
| if clean_line: | |
| x_start = min(x[0] for x in current_line) | |
| y_start = min(x[1] for x in current_line) | |
| if re.sub(r"\d{13,}", "", " ".join(clean_line)) != "": | |
| lines.append( | |
| { | |
| "line": " ".join(clean_line), | |
| "bbox": [x_start, y_start], | |
| "words": clean_word_data, | |
| } | |
| ) | |
| page_lines_with_bbox.append(lines) | |
| logger.info( | |
| f"Successfully extracted lines from {len(page_lines_with_bbox)} scanned pages" | |
| ) | |
| return page_lines_with_bbox | |
| except Exception as e: | |
| logger.error(f"Error extracting lines from scanned PDF: {e}") | |
| raise | |
| return await asyncio.get_event_loop().run_in_executor( | |
| None, _extract_from_scanned | |
| ) | |