ocr-engine-3 / src /utils /_text_extractor.py
kanha-upadhyay's picture
Enhance PDFProcessorService and TextExtractor with improved logging and error handling
2e2af5e
import asyncio
import math
import multiprocessing
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List
import fitz
import numpy as np
from loguru import logger
from pdf2image import convert_from_path
class TextExtractor:
    """Extract text lines with positional data from digital and scanned PDFs.

    Digital PDFs are read through ``fitz``; scanned PDFs are rasterised with
    ``pdf2image.convert_from_path`` and passed to the OCR callable supplied at
    construction time.  Both extraction paths return, for every page, a list
    of dicts shaped like
    ``{"line": str, "bbox": [x_start, y_start], "words": [{"word", "bbox"}]}``.
    """

    # Compiled once: these patterns run for every word / line of every page.
    _NON_WORD_CHARS = re.compile(r"[^\w\s-]")
    _LONG_DIGIT_TOKEN = re.compile(r"\d{19,}")
    _LONG_DIGIT_LINE = re.compile(r"\d{13,}")
    # Tokens that are always dropped.
    _JUNK_TOKENS = frozenset({"|", ".", "#"})
    # Rasterisation resolution handed to pdf2image.
    _OCR_DPI = 300
    # Maximum number of leading pages OCR'd when ``first_page=True``.
    _PREVIEW_PAGES = 3

    def __init__(self, doctr_model):
        """Store the OCR model and set up the noise patterns.

        Args:
            doctr_model: Callable invoked with a list of RGB page images
                (numpy arrays).  Its result must expose ``.pages`` whose
                items expose ``.blocks`` -> ``.lines`` -> ``.words``, each
                word carrying ``.value`` and ``.geometry``.
                NOTE(review): presumably a docTR predictor - confirm.
        """
        logger.info("Initializing TextExtractor")
        self.doctr_model = doctr_model
        # Not referenced by any method visible in this class; kept because
        # external code may read the attribute.
        self.noise_pattern = [
            r"\b[A-Z]{6,}\b",
            r"[\[\]\\\^\@\#\$\%\&\*]{2,}",
            r"(\d)\1{5,}",
            r"\b(?=[A-Za-z]*\d)(?=\d*[A-Za-z])[A-Za-z\d]{8,}\b",
        ]
        logger.debug(f"Initialized with {len(self.noise_pattern)} noise patterns")

    async def __aenter__(self):
        """Return the extractor itself; nothing is acquired on entry."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions propagate unchanged."""
        pass

    def normalize_bbox(self, bbox, width: float, height: float) -> List[float]:
        """Scale an ``(x0, y0, x1, y1)`` box by the page size.

        Args:
            bbox: Four coordinates ``(x0, y0, x1, y1)``.
            width: Divisor applied to the x coordinates.
            height: Divisor applied to the y coordinates.

        Returns:
            ``[x0 / width, y0 / height, x1 / width, y1 / height]``, each
            rounded to 6 decimal places.

        Raises:
            ZeroDivisionError: If ``width`` or ``height`` is zero.
        """
        x0, y0, x1, y1 = bbox
        normalized = [
            round(x0 / width, 6),
            round(y0 / height, 6),
            round(x1 / width, 6),
            round(y1 / height, 6),
        ]
        logger.debug(f"Normalized bbox from {bbox} to {normalized}")
        return normalized

    def remove_consecutive_items(self, line: List[str]) -> List[str]:
        """Collapse runs of equal adjacent items into a single item.

        Args:
            line: Items in reading order.

        Returns:
            A new list without adjacent duplicates; the input itself when it
            is empty.
        """
        if not line:
            return line
        result = [line[0]]
        for item in line[1:]:
            if item != result[-1]:
                result.append(item)
        logger.debug(f"Removed consecutive items: {len(line)} -> {len(result)} items")
        return result

    def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]:
        """Collapse runs of adjacent entries that share the same ``"word"``.

        Args:
            word_data: Dicts holding at least a ``"word"`` key.

        Returns:
            A new list keeping the first entry of every run; the input itself
            when it is empty.
        """
        if not word_data:
            return word_data
        result = [word_data[0]]
        for entry in word_data[1:]:
            if entry["word"] != result[-1]["word"]:
                result.append(entry)
        logger.debug(
            f"Removed consecutive words: {len(word_data)} -> {len(result)} words"
        )
        return result

    def shannon_entropy(self, text: str) -> float:
        """Return the per-character Shannon entropy of ``text`` in bits.

        Empty text yields ``0.0``.
        """
        if not text:
            return 0.0
        counts = Counter(text)
        length = len(text)
        return -sum(
            (count / length) * math.log2(count / length) for count in counts.values()
        )

    def reconstruct_line_from_bboxes(self, words, space_unit=5):
        """Rebuild a line of text, turning horizontal gaps into spaces.

        Words are ordered by their left edge.  The gap preceding each word
        (measured from x = 0 for the first word, so the left margin becomes
        leading spaces) is converted to ``round(gap / space_unit)`` spaces.

        Args:
            words: Dicts with ``"word"`` and ``"bbox"`` (``bbox[0]`` is the
                left edge, ``bbox[2]`` the right edge).
            space_unit: Horizontal distance represented by one space.

        Returns:
            The reconstructed line.

        Raises:
            ZeroDivisionError: If ``space_unit`` is zero and ``words`` is not
                empty.
        """
        logger.debug(
            f"Reconstructing line from {len(words)} words with space_unit={space_unit}"
        )
        ordered = sorted(words, key=lambda w: w["bbox"][0])
        parts = []
        prev_end_x = 0
        for word_info in ordered:
            gap = max(0, word_info["bbox"][0] - prev_end_x)
            parts.append(" " * int(round(gap / space_unit)))
            parts.append(word_info["word"])
            prev_end_x = word_info["bbox"][2]
        line = "".join(parts)
        logger.debug(f"Reconstructed line: '{line[:100]}...'")
        return line

    def is_text_noisy(self, text: str) -> bool:
        """Heuristically decide whether extracted text is unusable.

        Text shorter than 50 characters is always reported as noisy.  Longer
        text is noisy only when every one of these holds: entropy > 4.0,
        symbol density > 0.15, digit density > 0.15, more than one run of a
        character repeated 6+ times, and more than 30 word tokens.

        Args:
            text: Text to inspect.

        Returns:
            ``True`` when the text is considered noisy.
        """
        logger.debug(f"Checking if text is noisy: {len(text)} characters")
        total_chars = len(text)
        if total_chars < 50:
            logger.debug("Text too short, marking as noisy")
            return True
        total_words = len(re.findall(r"\b\w+\b", text))
        digit_density = len(re.findall(r"\d", text)) / total_chars
        symbol_density = len(re.findall(r"[^\w\s]", text)) / total_chars
        long_repeats = len(re.findall(r"(.)\1{5,}", text))
        entropy = self.shannon_entropy(text)
        is_noisy = (
            entropy > 4.0
            and symbol_density > 0.15
            and digit_density > 0.15
            and long_repeats > 1
            and total_words > 30
        )
        logger.debug(
            f"Noise analysis - entropy: {entropy:.2f}, symbol_density: {symbol_density:.2f}, "
            f"digit_density: {digit_density:.2f}, long_repeats: {long_repeats}, "
            f"total_words: {total_words}, is_noisy: {is_noisy}"
        )
        return is_noisy

    @classmethod
    def _is_junk_word(cls, word: str) -> bool:
        """Return True for tokens that must not enter a line.

        A token is junk when it is empty, is one of ``| . #``, contains
        nothing but characters outside ``\\w``, whitespace and ``-``, or
        consists solely of digit runs of 19+ digits.
        """
        if not word or word in cls._JUNK_TOKENS:
            return True
        if cls._NON_WORD_CHARS.sub("", word) == "":
            return True
        return cls._LONG_DIGIT_TOKEN.sub("", word) == ""

    def _build_line_record(self, line_words: List[Dict]):
        """Turn the words of one visual line into a line dict.

        Returns ``None`` when the joined text is empty once digit runs of
        13+ digits are removed.
        """
        # The text ordering sorts on (x0, y0, word); the word entries are
        # ordered by x0 only.  Both orderings are kept as they were.
        positioned = sorted(
            (w["bbox"][0], w["bbox"][1], w["word"]) for w in line_words
        )
        tokens = self.remove_consecutive_items([item[2] for item in positioned])
        word_entries = self.remove_consecutive_words(
            sorted(line_words, key=lambda w: w["bbox"][0])
        )
        if not tokens:
            return None
        text = " ".join(tokens)
        if self._LONG_DIGIT_LINE.sub("", text) == "":
            return None
        return {
            "line": text,
            "bbox": [
                min(item[0] for item in positioned),
                min(item[1] for item in positioned),
            ],
            "words": word_entries,
        }

    def _group_words_into_lines(
        self, words: List[Dict], y_threshold: float
    ) -> List[Dict]:
        """Split pre-sorted words into lines using their top coordinate.

        A word joins the current line while its ``bbox[1]`` differs from the
        previous word's by less than ``y_threshold``; the reference value
        follows the most recently added word.
        """
        lines = []
        pending = []
        last_y = None
        for entry in words:
            y0 = entry["bbox"][1]
            if last_y is None or abs(y0 - last_y) < y_threshold:
                pending.append(entry)
            else:
                record = self._build_line_record(pending)
                if record is not None:
                    lines.append(record)
                pending = [entry]
            last_y = y0
        if pending:
            record = self._build_line_record(pending)
            if record is not None:
                lines.append(record)
        return lines

    async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0):
        """Extract text lines from a digital (text-layer) PDF.

        The blocking work runs in the event loop's default executor.

        Args:
            pdf_path: Path of the PDF to read.
            y_threshold: Maximum difference in top coordinate for two
                consecutive words to stay on the same line.

        Returns:
            One list per page, each holding line dicts with ``"line"``,
            ``"bbox"`` (``[x_start, y_start]``) and ``"words"``.

        Raises:
            Exception: Any error raised while reading the PDF is logged and
                re-raised.
        """
        logger.info(f"Extracting lines with bbox from digital PDF: {pdf_path}")

        def _extract_lines():
            try:
                page_lines_with_bbox = []
                # Context manager closes the document even when a page fails.
                with fitz.open(pdf_path) as doc:
                    for page_num, page in enumerate(doc):
                        logger.debug(f"Processing page {page_num + 1}")
                        raw_words = sorted(
                            page.get_text("words"),
                            key=lambda w: (round(w[1], 1), w[0]),
                        )
                        words = []
                        for raw in raw_words:
                            x0, y0, x1, y1, word = raw[:5]
                            # The filter looks at the token as extracted,
                            # before lower-casing and "$" removal.
                            if self._is_junk_word(word):
                                continue
                            word = word.lower().replace("$", "").strip()
                            if not word:
                                continue
                            words.append({"word": word, "bbox": (x0, y0, x1, y1)})
                        lines = self._group_words_into_lines(words, y_threshold)
                        logger.debug(
                            f"Page {page_num + 1}: extracted {len(lines)} lines"
                        )
                        page_lines_with_bbox.append(lines)
                logger.info(
                    f"Successfully extracted lines from {len(page_lines_with_bbox)} pages"
                )
                return page_lines_with_bbox
            except Exception as e:
                logger.error(f"Error extracting lines from digital PDF: {e}")
                raise

        return await asyncio.get_running_loop().run_in_executor(None, _extract_lines)

    def create_page_chunks(self, num_pages: int, cpu_core: int):
        """Split pages ``1..num_pages`` into inclusive ``[start, end]`` ranges.

        Every range spans up to three pages.

        Args:
            num_pages: Total number of pages.
            cpu_core: Only logged; it does not influence the chunking.

        Returns:
            List of ``[start, end]`` pairs (1-based, inclusive).
        """
        logger.debug(
            f"Creating page chunks for {num_pages} pages using {cpu_core} CPU cores"
        )
        page_per_cpu = 2  # offset from a chunk's first page to its last page
        final_ranges = [
            [start, min(start + page_per_cpu, num_pages)]
            for start in range(1, num_pages + 1, page_per_cpu + 1)
        ]
        logger.debug(f"Created {len(final_ranges)} page chunks: {final_ranges}")
        return final_ranges

    def process_page_parallel_async(
        self, pdf_path: str, page_range: List[int], instance
    ):
        """Run ``process_pages_concurrently`` on a fresh event loop.

        Intended to be called from a worker thread: a new loop is created,
        installed as the current loop, and always closed afterwards.

        Args:
            pdf_path: Path of the PDF to process.
            page_range: Inclusive ``[start, end]`` page numbers.
            instance: Unused; retained for call compatibility.

        Returns:
            ``(page_range, outputs)`` as produced by
            ``process_pages_concurrently``.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(
                self.process_pages_concurrently(pdf_path, page_range)
            )
        finally:
            loop.close()

    async def process_pages_concurrently(self, pdf_path: str, page_range: List[int]):
        """OCR every page of an inclusive range and return outputs in order.

        Args:
            pdf_path: Path of the PDF to process.
            page_range: Inclusive ``[start, end]`` page numbers.

        Returns:
            ``(page_range, outputs)`` where ``outputs`` is ordered by page
            number.
        """
        start_page = page_range[0]
        end_page = page_range[1]
        logger.debug(f"Processing pages {start_page}-{end_page} concurrently")
        tasks = [
            self.process_page_parallel(pdf_path, page)
            for page in range(start_page, end_page + 1)
        ]
        page_results = await asyncio.gather(*tasks)
        page_results.sort(key=lambda x: x[0])
        chunk_outputs = [output for _page_num, output in page_results]
        logger.debug(f"Completed processing pages {start_page}-{end_page}")
        return page_range, chunk_outputs

    async def process_page_parallel(self, pdf_path: str, i: int):
        """Rasterise page ``i`` and run the OCR model on it.

        Args:
            pdf_path: Path of the PDF to process.
            i: Page number passed to ``convert_from_path`` as both
                ``first_page`` and ``last_page``.

        Returns:
            ``(i, output)`` where ``output`` is the OCR model's result.

        Raises:
            Exception: Any conversion or OCR error is logged and re-raised.
        """
        logger.debug(f"Processing page {i}")
        try:
            pages = convert_from_path(
                pdf_path, dpi=self._OCR_DPI, first_page=i, last_page=i
            )
            images = [np.array(page.convert("RGB")) for page in pages]
            output = self.doctr_model(images)
            logger.debug(f"Successfully processed page {i}")
            return i, output
        except Exception as e:
            logger.error(f"Error processing page {i}: {e}")
            raise

    def _ocr_leading_pages(self, pdf_path: str):
        """OCR at most the first three pages in a single model call.

        Returns ``(results, page_sizes)``: a one-element list holding the
        model result, and one ``(width, height)`` per rendered image taken
        from the image array shape.
        """
        with fitz.open(pdf_path) as pdf:
            number_of_pages = pdf.page_count
        logger.debug(f"Processing first page(s) only, total pages: {number_of_pages}")
        pages = convert_from_path(
            pdf_path,
            dpi=self._OCR_DPI,
            first_page=1,
            last_page=min(number_of_pages, self._PREVIEW_PAGES),
        )
        images = [np.array(page.convert("RGB")) for page in pages]
        result = self.doctr_model(images)
        page_sizes = [(img.shape[1], img.shape[0]) for img in images]
        # Wrapped in a list so the caller iterates results uniformly.
        return [result], page_sizes

    def _ocr_all_pages(self, pdf_path: str):
        """OCR every page using a thread pool over page chunks.

        Returns ``(results, page_sizes)``: model results in page order, and
        one ``(width, height)`` per page read from each page's ``rect``.
        """
        logger.debug("Processing all pages using parallel processing")
        with fitz.open(pdf_path) as pdf:
            num_pages = pdf.page_count
            page_sizes = [(page.rect.width, page.rect.height) for page in pdf]
        worker_count = multiprocessing.cpu_count()
        page_chunks = self.create_page_chunks(num_pages, worker_count)
        logger.info(f"Processing {num_pages} pages using {worker_count} CPU cores")
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = [
                executor.submit(
                    self.process_page_parallel_async, pdf_path, chunk, self
                )
                for chunk in page_chunks
            ]
            chunk_results = [future.result() for future in futures]
        # Each item is (page_range, outputs); order by the chunk's first page.
        chunk_results.sort(key=lambda item: item[0][0])
        results = []
        for _page_range, outputs in chunk_results:
            results.extend(outputs)
        return results, page_sizes

    def _collect_ocr_words(self, page, img_width: float, img_height: float):
        """Collect cleaned words of one OCR page with scaled boxes.

        ``word.geometry`` corners are multiplied by ``img_width`` /
        ``img_height`` (presumably relative coordinates - confirm against the
        OCR library).
        """
        words = []
        for block in page.blocks:
            for line in block.lines:
                for word in line.words:
                    x0, y0 = word.geometry[0]
                    x1, y1 = word.geometry[1]
                    text = word.value.strip().lower()
                    text = re.sub(r"[#*]", " ", text)
                    text = text.replace("$", "").strip()
                    if self._is_junk_word(text):
                        continue
                    words.append(
                        {
                            "word": text,
                            "bbox": [
                                x0 * img_width,
                                y0 * img_height,
                                x1 * img_width,
                                y1 * img_height,
                            ],
                        }
                    )
        return words

    async def extract_lines_with_bbox_from_scanned_pdf(
        self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False
    ):
        """Extract text lines from a scanned PDF via OCR.

        The blocking work runs in the event loop's default executor.

        Args:
            pdf_path: Path of the PDF to process.
            y_threshold: Maximum difference in top coordinate for two
                consecutive words to stay on the same line.
            first_page: When true, only the leading pages (at most three)
                are OCR'd and boxes are scaled by the rendered image size;
                otherwise all pages are OCR'd and boxes are scaled by each
                page's ``rect`` size.
                NOTE(review): the two modes therefore use different
                coordinate units for the same ``y_threshold``.

        Returns:
            One list per OCR page, each holding line dicts with ``"line"``,
            ``"bbox"`` (``[x_start, y_start]``) and ``"words"``.

        Raises:
            Exception: Any conversion or OCR error is logged and re-raised.
        """
        logger.info(
            f"Extracting lines from scanned PDF: {pdf_path} (first_page: {first_page})"
        )

        def _extract_from_scanned():
            try:
                if first_page:
                    results, page_sizes = self._ocr_leading_pages(pdf_path)
                else:
                    results, page_sizes = self._ocr_all_pages(pdf_path)

                page_lines_with_bbox = []
                page_index = 0
                for result_idx, result in enumerate(results):
                    logger.debug(
                        f"Processing OCR result {result_idx + 1}/{len(results)}"
                    )
                    for page in result.pages:
                        # Scale with this page's own size; fall back to the
                        # last known size if the model returned extra pages.
                        img_width, img_height = page_sizes[
                            min(page_index, len(page_sizes) - 1)
                        ]
                        page_index += 1
                        words = self._collect_ocr_words(page, img_width, img_height)
                        words.sort(
                            key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0])
                        )
                        page_lines_with_bbox.append(
                            self._group_words_into_lines(words, y_threshold)
                        )
                logger.info(
                    f"Successfully extracted lines from {len(page_lines_with_bbox)} scanned pages"
                )
                return page_lines_with_bbox
            except Exception as e:
                logger.error(f"Error extracting lines from scanned PDF: {e}")
                raise

        return await asyncio.get_running_loop().run_in_executor(
            None, _extract_from_scanned
        )