# Bank-Scrubber: src/ocr/text_extractor.py
# Author: Aryan Jain
# Bank scrubber Streamlit application
# Commit: 4e71548
import asyncio
import fitz
import re
import numpy as np
from typing import List, Dict, Any, Optional
from pdf2image import convert_from_path
from src.config.config import settings
from src.models.account_models import LineData, WordData
from doctr.io import DocumentFile
class TextExtractor:
    """Async text extractor that pulls text lines with bounding boxes from
    both digital PDFs (via PyMuPDF) and scanned PDFs (via a doctr OCR model).

    Extraction runs in a thread-pool executor so the event loop stays free.
    Each page is returned as a list of line dicts:
    ``{"line": str, "bbox": [x_start, y_start], "words": [{"word", "bbox"}, ...]}``.
    """

    # A token that becomes empty after stripping everything except word
    # characters and whitespace is punctuation-only noise ("|", ".", "#", ...).
    _NON_WORD = re.compile(r"[^\w\s]")
    # OCR artifacts replaced by spaces before filtering.
    _HASH_STAR = re.compile(r"[#*]")

    def __init__(self, doctr_model):
        # doctr OCR predictor; only required for scanned-PDF extraction.
        self.doctr_model = doctr_model

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # No resources are held between calls, so nothing to release.
        pass

    def normalize_bbox(self, bbox, width: float, height: float) -> List[float]:
        """Normalize bounding box (x0, y0, x1, y1) to the range [0, 1].

        Args:
            bbox: Absolute (x0, y0, x1, y1) coordinates.
            width: Page/image width in the same units as the bbox.
            height: Page/image height in the same units as the bbox.

        Returns:
            [x0, y0, x1, y1] scaled to [0, 1], rounded to 6 decimals.
        """
        x0, y0, x1, y1 = bbox
        return [
            round(x0 / width, 6),
            round(y0 / height, 6),
            round(x1 / width, 6),
            round(y1 / height, 6),
        ]

    def remove_consecutive_items(self, line: List[str]) -> List[str]:
        """Remove consecutive duplicate items from a list (keeps first of a run)."""
        if not line:
            return line
        result = [line[0]]
        for item in line[1:]:
            if item != result[-1]:
                result.append(item)
        return result

    def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]:
        """Remove consecutive duplicates from word data, compared by "word" text."""
        if not word_data:
            return word_data
        result = [word_data[0]]
        for entry in word_data[1:]:
            if entry["word"] != result[-1]["word"]:
                result.append(entry)
        return result

    def _build_line(self, current_line: List[tuple], current_word_data: List[Dict]) -> Optional[Dict]:
        """Sort a buffered line left-to-right, drop consecutive duplicate
        words, and package it as a line dict.

        Args:
            current_line: (x0, y0, word) tuples buffered for one visual line.
            current_word_data: matching {"word", "bbox"} dicts.

        Returns:
            ``{"line", "bbox", "words"}`` or None when nothing survives dedup.
        """
        current_line.sort()
        clean_line = self.remove_consecutive_items([t[2] for t in current_line])
        if not clean_line:
            return None
        ordered_words = sorted(current_word_data, key=lambda w: w["bbox"][0])
        return {
            "line": " ".join(clean_line),
            # Top-left corner of the whole line.
            "bbox": [
                min(t[0] for t in current_line),
                min(t[1] for t in current_line),
            ],
            "words": self.remove_consecutive_words(ordered_words),
        }

    async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0) -> "List[List[LineData]]":
        """Extract lines with bounding boxes from a digital (text-layer) PDF.

        Args:
            pdf_path: Path to the PDF file.
            y_threshold: Max vertical distance (points) for two words to be
                grouped into the same visual line.

        Returns:
            One list of line dicts per page.
        """
        def _extract_lines():
            doc = fitz.open(pdf_path)
            try:
                page_lines_with_bbox = []
                for page in doc:
                    # Each entry: (x0, y0, x1, y1, word, block_no, line_no, word_no)
                    words = page.get_text("words")
                    words.sort(key=lambda w: (round(w[1], 1), w[0]))  # y, then x
                    lines = []
                    current_line = []
                    current_word_data = []
                    current_y = None
                    for w in words:
                        x0, y0, x1, y1, word = w[:5]
                        # Skip empty / punctuation-only noise tokens.
                        if not word or self._NON_WORD.sub("", word) == "":
                            continue
                        word = word.lower()
                        word_data = {"word": word.strip(), "bbox": (x0, y0, x1, y1)}
                        if current_y is None or abs(y0 - current_y) < y_threshold:
                            current_line.append((x0, y0, word))
                            current_word_data.append(word_data)
                        else:
                            # Vertical gap exceeded: flush the buffered line.
                            line = self._build_line(current_line, current_word_data)
                            if line:
                                lines.append(line)
                            current_line = [(x0, y0, word)]
                            current_word_data = [word_data]
                        current_y = y0
                    # Flush the final buffered line.
                    if current_line:
                        line = self._build_line(current_line, current_word_data)
                        if line:
                            lines.append(line)
                    page_lines_with_bbox.append(lines)
                return page_lines_with_bbox
            finally:
                # fitz documents hold OS resources; close even on error.
                doc.close()
        return await asyncio.get_running_loop().run_in_executor(None, _extract_lines)

    async def extract_lines_with_bbox_from_scanned_pdf(self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False) -> "List[List[LineData]]":
        """Extract lines with bounding boxes from a scanned PDF using OCR.

        Args:
            pdf_path: Path to the PDF file.
            y_threshold: Max vertical distance (pixels) for same-line grouping.
            first_page: If True, OCR only page 1 (rendered via pdf2image at
                ``settings.dpi``); otherwise OCR every page.

        Returns:
            One list of line dicts per OCR'd page, in absolute pixel coords.
        """
        def _extract_from_scanned():
            if first_page:
                pages = convert_from_path(pdf_path, dpi=settings.dpi, first_page=1, last_page=1)
                # Keep a *list* of page images so indexing below is uniform.
                page_images = [np.array(pages[0].convert("RGB"))]
            else:
                page_images = DocumentFile.from_pdf(pdf_path)
            result = self.doctr_model(page_images)
            page_lines_with_bbox = []
            for page_idx, page in enumerate(result.pages):
                # BUG FIX: use the matching page's dimensions. The previous code
                # always read doc[0]: multi-page PDFs reused page 1's size, and
                # in the first_page branch doc[0] was the first pixel *row* of a
                # single ndarray, yielding width == 3 (the channel count).
                img_height, img_width = page_images[page_idx].shape[:2]
                words = []
                for block in page.blocks:
                    for line in block.lines:
                        for word in line.words:
                            # doctr geometry is relative ((x0, y0), (x1, y1)).
                            (x0, y0), (x1, y1) = word.geometry[0], word.geometry[1]
                            text = word.value.strip().lower()
                            text = self._HASH_STAR.sub(" ", text).strip()
                            # Skip empty / punctuation-only noise tokens.
                            if not text or self._NON_WORD.sub("", text) == "":
                                continue
                            words.append({
                                "word": text,
                                "bbox": [
                                    x0 * img_width,
                                    y0 * img_height,
                                    x1 * img_width,
                                    y1 * img_height,
                                ],
                            })
                # Sort words by y then x before line grouping.
                words.sort(key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0]))
                lines = []
                current_line = []
                current_word_data = []
                current_y = None
                for w in words:
                    x0, y0 = w["bbox"][0], w["bbox"][1]
                    if current_y is None or abs(y0 - current_y) < y_threshold:
                        current_line.append((x0, y0, w["word"]))
                        current_word_data.append(w)
                    else:
                        # Vertical gap exceeded: flush the buffered line.
                        line = self._build_line(current_line, current_word_data)
                        if line:
                            lines.append(line)
                        current_line = [(x0, y0, w["word"])]
                        current_word_data = [w]
                    current_y = y0
                # Flush the final buffered line.
                if current_line:
                    line = self._build_line(current_line, current_word_data)
                    if line:
                        lines.append(line)
                page_lines_with_bbox.append(lines)
            return page_lines_with_bbox
        return await asyncio.get_running_loop().run_in_executor(None, _extract_from_scanned)