# Source: ocr-engine/src/utils/_text_extractor.py
# Uploaded by kanha-upadhyay, commit "init" (e42e330), 16.8 kB.
import asyncio
import math
import multiprocessing
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List
import fitz
import numpy as np
from pdf2image import convert_from_path
class TextExtractor:
"""Async text extractor for extracting text with bounding boxes."""
def __init__(self, doctr_model):
self.doctr_model = doctr_model
self.noise_pattern = [
r"\b[A-Z]{6,}\b",
r"[\[\]\\\^\@\#\$\%\&\*]{2,}",
r"(\d)\1{5,}",
r"\b(?=[A-Za-z]*\d)(?=\d*[A-Za-z])[A-Za-z\d]{8,}\b",
]
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, traceback):
pass
def normalize_bbox(self, bbox, width: float, height: float) -> List[float]:
    """Scale an absolute ``(x0, y0, x1, y1)`` box by the page size.

    Args:
        bbox: Four coordinates in the order x0, y0, x1, y1.
        width: Page width the x coordinates are divided by.
        height: Page height the y coordinates are divided by.

    Returns:
        ``[x0 / width, y0 / height, x1 / width, y1 / height]``, each
        rounded to six decimal places.
    """
    left, top, right, bottom = bbox
    scaled = (left / width, top / height, right / width, bottom / height)
    return [round(value, 6) for value in scaled]
def remove_consecutive_items(self, line: List[str]) -> List[str]:
    """Collapse runs of equal neighbouring items down to a single item.

    Only adjacent duplicates are dropped; an item may still appear again
    later in the list. An empty (falsy) input is returned unchanged.
    """
    if not line:
        return line

    last_kept = line[0]
    collapsed = [last_kept]
    for candidate in line[1:]:
        if candidate != last_kept:
            collapsed.append(candidate)
            last_kept = candidate
    return collapsed
def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]:
    """Drop word records whose ``"word"`` repeats the previously kept one.

    Each record is a dict with at least a ``"word"`` key. When neighbours
    share the same word, the first record of the run is the one retained.
    An empty (falsy) input is returned unchanged.
    """
    if not word_data:
        return word_data

    kept = [word_data[0]]
    for record in word_data[1:]:
        if record["word"] != kept[-1]["word"]:
            kept.append(record)
    return kept
def shannon_entropy(self, text: str) -> float:
    """Return the Shannon entropy of ``text`` in bits per character.

    The entropy is computed over the character frequency distribution;
    an empty string yields ``0.0``.
    """
    if not text:
        return 0.0

    total = len(text)
    probabilities = [occurrences / total for occurrences in Counter(text).values()]
    return -sum(p * math.log2(p) for p in probabilities)
def reconstruct_line_from_bboxes(self, words, space_unit=5):
    """Rebuild a text line, spacing words according to their bounding boxes.

    Args:
        words: List of dicts with ``'word'`` and ``'bbox'``
            (``bbox = [x0, y0, x1, y1]``).
        space_unit: Horizontal distance that corresponds to one space
            character.

    Returns:
        The words in left-to-right order, each preceded by
        ``round(gap / space_unit)`` spaces. The gap for the first word is
        measured from x = 0, so its left offset becomes leading spaces.
    """
    left_to_right = sorted(words, key=lambda entry: entry["bbox"][0])

    pieces = []
    cursor_x = 0
    for entry in left_to_right:
        token = entry["word"]
        left_edge = entry["bbox"][0]
        if cursor_x is not None:
            # Overlapping boxes give a negative distance; clamp it to zero.
            gap = max(0, left_edge - cursor_x)
            pieces.append(" " * int(round(gap / space_unit)))
        pieces.append(token)
        cursor_x = entry["bbox"][2]  # right edge of the word just emitted
    return "".join(pieces)
def is_text_noisy(self, text: str) -> bool:
"""Check if text is noisy (contains special characters)."""
total_chars = len(text)
if total_chars < 50: # skip empty or small pages
return True
tokens = re.findall(r"\b\w+\b", text)
total_words = len(tokens)
# Symbol & digit density
digit_count = len(re.findall(r"\d", text))
symbol_count = len(
re.findall(r"[^\w\s]", text)
) # anything not a word char or whitespace
symbol_density = symbol_count / total_chars
digit_density = digit_count / total_chars
# Repeating char patterns like "22222222222" or "!!!!!!"
long_repeats = len(re.findall(r"(.)\1{5,}", text)) # any char repeated 6+ times
# Entropy: randomness of characters
entropy = self.shannon_entropy(text)
# Heuristics tuned for your sample
if (
entropy > 4.0
and symbol_density > 0.15
and digit_density > 0.15
and long_repeats > 1
and total_words > 30
):
return True
return False
async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0):
    """Extract text lines with word-level bounding boxes from a digital PDF.

    Words are read with PyMuPDF, filtered, lower-cased, stripped of ``$``
    and grouped into lines: a word joins the current line while its top
    coordinate is within ``y_threshold`` of the previously added word.
    The blocking work runs in the default executor.

    Args:
        pdf_path: Path of the PDF to read.
        y_threshold: Maximum vertical distance between consecutive words
            of the same line.

    Returns:
        One list per page. Each item is a dict with ``"line"`` (the joined
        text), ``"bbox"`` (``[x_start, y_start]``, the minimum x0 and y0 of
        the line) and ``"words"`` (word dicts with ``"word"`` and
        ``"bbox"``), with consecutive duplicate words removed.
    """

    def _finish_line(line_items, word_items, lines):
        """Append the accumulated words to ``lines`` as one line record."""
        if not line_items:
            return
        line_items.sort()
        clean_line = self.remove_consecutive_items(
            [item[2] for item in line_items]
        )
        clean_word_data = self.remove_consecutive_words(
            sorted(word_items, key=lambda w: w["bbox"][0])
        )
        if not clean_line:
            return
        text = " ".join(clean_line)
        # Skip lines that are nothing but a single run of 13+ digits.
        if re.sub(r"\d{13,}", "", text) == "":
            return
        lines.append(
            {
                "line": text,
                "bbox": [
                    min(item[0] for item in line_items),
                    min(item[1] for item in line_items),
                ],
                "words": clean_word_data,
            }
        )

    def _extract_page(page):
        """Return the line records for a single PDF page."""
        # Each entry: (x0, y0, x1, y1, word, block_no, line_no, word_no)
        words = page.get_text("words")
        words.sort(key=lambda w: (round(w[1], 1), w[0]))  # sort by y then x

        lines = []
        current_line = []
        current_word_data = []
        current_y = None
        for w in words:
            x0, y0, x1, y1, word = w[:5]
            if (
                word == "|"
                or not word
                or word == "."
                or word == "#"
                or re.sub(r"[^\w\s-]", "", word) == ""
                or re.sub(r"\d{19,}", "", word) == ""
            ):
                continue
            word = word.lower().replace("$", "")
            word_data = {"word": word.strip(), "bbox": (x0, y0, x1, y1)}

            if current_y is None or abs(y0 - current_y) < y_threshold:
                current_line.append((x0, y0, word))
                current_word_data.append(word_data)
            else:
                # Vertical jump: close the line in progress, start a new one.
                _finish_line(current_line, current_word_data, lines)
                current_line = [(x0, y0, word)]
                current_word_data = [word_data]
            current_y = y0

        _finish_line(current_line, current_word_data, lines)
        return lines

    def _extract_lines():
        doc = fitz.open(pdf_path)
        try:
            return [_extract_page(page) for page in doc]
        finally:
            # Release the file handle even if a page fails to parse.
            doc.close()

    return await asyncio.get_running_loop().run_in_executor(None, _extract_lines)
def create_page_chunks(self, num_pages: int, cpu_core: int):
    """Split pages ``1..num_pages`` into consecutive inclusive ranges.

    Each range is ``[first, last]`` (1-based, inclusive) and spans at most
    three pages. ``cpu_core`` is accepted but does not influence the
    result.
    """
    extra_pages = 2  # pages that follow the first page of each chunk
    return [
        [first, min(first + extra_pages, num_pages)]
        for first in range(1, num_pages + 1, extra_pages + 1)
    ]
def process_page_parallel_async(
self, pdf_path: str, page_range: List[int], instance
):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
self.process_pages_concurrently(pdf_path, page_range)
)
finally:
loop.close()
async def process_pages_concurrently(self, pdf_path: str, page_range: List[int]):
start_page = page_range[0]
end_page = page_range[1]
tasks = []
for page in range(start_page, end_page + 1):
tasks.append(self.process_page_parallel(pdf_path, page))
page_results = await asyncio.gather(*tasks)
page_results.sort(key=lambda x: x[0])
chunk_outputs = [output for page_num, output in page_results]
return page_range, chunk_outputs
async def process_page_parallel(self, pdf_path: str, i: int):
    """Render page ``i`` of the PDF at 300 dpi and run the OCR model on it.

    Returns:
        ``(i, output)`` where ``output`` is what ``self.doctr_model``
        returns for the rendered RGB image(s) of that page.
    """
    print(f"Processing page {i}")
    rendered = convert_from_path(pdf_path, dpi=300, first_page=i, last_page=i)
    rgb_arrays = [np.array(image.convert("RGB")) for image in rendered]
    return i, self.doctr_model(rgb_arrays)
async def extract_lines_with_bbox_from_scanned_pdf(
    self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False
):
    """Extract text lines with word-level bounding boxes from a scanned PDF.

    Pages are rendered at 300 dpi and passed to ``self.doctr_model``. The
    recognised words are cleaned, filtered and grouped into lines: a word
    joins the current line while its top coordinate is within
    ``y_threshold`` of the previously added word. The blocking work runs
    in the default executor.

    Args:
        pdf_path: Path of the PDF to process.
        y_threshold: Maximum vertical distance between consecutive words
            of the same line.
        first_page: When true, only the first (up to) three pages are
            OCR'd in a single model call and word boxes are scaled by the
            pixel size of the first rendered image. Otherwise every page
            is OCR'd in parallel page chunks and word boxes are scaled by
            the width/height of the PDF's first page.

    Returns:
        One list per OCR'd page. Each item is a dict with ``"line"`` (the
        joined text), ``"bbox"`` (``[x_start, y_start]``) and ``"words"``
        (dicts with ``"word"`` and ``"bbox"`` = ``[x0, y0, x1, y1]``).
    """

    def _finish_line(line_items, word_items, lines):
        """Append the accumulated words to ``lines`` as one line record."""
        if not line_items:
            return
        line_items.sort()
        clean_line = self.remove_consecutive_items(
            [item[2] for item in line_items]
        )
        clean_word_data = self.remove_consecutive_words(
            sorted(word_items, key=lambda w: w["bbox"][0])
        )
        if not clean_line:
            return
        text = " ".join(clean_line)
        # Skip lines that are nothing but a single run of 13+ digits.
        if re.sub(r"\d{13,}", "", text) == "":
            return
        lines.append(
            {
                "line": text,
                "bbox": [
                    min(item[0] for item in line_items),
                    min(item[1] for item in line_items),
                ],
                "words": clean_word_data,
            }
        )

    def _collect_words(page, img_width, img_height):
        """Return cleaned word dicts with absolute boxes for one OCR page."""
        words = []
        for block in page.blocks:
            for line in block.lines:
                for word in line.words:
                    # geometry is scaled by the page size below, so it is
                    # presumably relative ((x0, y0), (x1, y1)) — confirm
                    # against the OCR model's output format.
                    x0, y0 = word.geometry[0]
                    x1, y1 = word.geometry[1]
                    text = word.value.strip().lower()
                    text = re.sub(r"[#*]", " ", text)
                    text = re.sub(r"[$]", "", text)
                    text = text.strip()
                    if (
                        text == "|"
                        or not text
                        or text == "."
                        or text == "#"
                        or re.sub(r"[^\w\s-]", "", text) == ""
                        or re.sub(r"\d{19,}", "", text) == ""
                    ):
                        continue
                    words.append(
                        {
                            "word": text,
                            "bbox": [
                                x0 * img_width,
                                y0 * img_height,
                                x1 * img_width,
                                y1 * img_height,
                            ],
                        }
                    )
        return words

    def _group_into_lines(words):
        """Group word dicts into line records by vertical proximity."""
        # Sort words by y then x
        words.sort(key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0]))
        lines = []
        current_line = []
        current_word_data = []
        current_y = None
        for w in words:
            y0 = w["bbox"][1]
            if current_y is None or abs(y0 - current_y) < y_threshold:
                current_line.append((w["bbox"][0], y0, w["word"]))
                current_word_data.append(w)
            else:
                # Vertical jump: close the line in progress, start a new one.
                _finish_line(current_line, current_word_data, lines)
                current_line = [(w["bbox"][0], y0, w["word"])]
                current_word_data = [w]
            current_y = y0
        _finish_line(current_line, current_word_data, lines)
        return lines

    def _ocr_leading_pages():
        """OCR the first (up to) three pages with a single model call."""
        pdf = fitz.open(pdf_path)
        try:
            number_of_pages = pdf.page_count
        finally:
            pdf.close()
        pages = convert_from_path(
            pdf_path, dpi=300, first_page=1, last_page=min(number_of_pages, 3)
        )
        images = [np.array(page.convert("RGB")) for page in pages]
        ocr_output = self.doctr_model(images)
        if images:
            img_height, img_width = images[0].shape[0], images[0].shape[1]
        else:
            img_height = img_width = 0
        # Wrap the single model output so the caller can iterate it the
        # same way as the list of per-page outputs from the parallel path.
        return [ocr_output], img_width, img_height

    def _ocr_all_pages():
        """OCR every page, in page chunks handled by a thread pool."""
        pdf = fitz.open(pdf_path)
        try:
            num_pages = pdf.page_count
            page_width = pdf[0].rect.width
            page_height = pdf[0].rect.height
        finally:
            pdf.close()
        worker_count = multiprocessing.cpu_count()
        page_chunks = self.create_page_chunks(num_pages, worker_count)
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = [
                executor.submit(
                    self.process_page_parallel_async, pdf_path, chunk, self
                )
                for chunk in page_chunks
            ]
            chunk_results = [future.result() for future in futures]
        # Each chunk result is (page_range, outputs); restore page order.
        chunk_results.sort(key=lambda item: item[0][0])
        ocr_outputs = []
        for _, outputs in chunk_results:
            ocr_outputs.extend(outputs)
        return ocr_outputs, page_width, page_height

    def _extract_from_scanned():
        if first_page:
            ocr_outputs, img_width, img_height = _ocr_leading_pages()
        else:
            ocr_outputs, img_width, img_height = _ocr_all_pages()

        # NOTE(review): the same width/height is applied to every page, so
        # PDFs with mixed page sizes get boxes scaled by the first page.
        page_lines_with_bbox = []
        for ocr_output in ocr_outputs:
            for page in ocr_output.pages:
                words = _collect_words(page, img_width, img_height)
                page_lines_with_bbox.append(_group_into_lines(words))
        return page_lines_with_bbox

    return await asyncio.get_running_loop().run_in_executor(
        None, _extract_from_scanned
    )