Spaces:
Runtime error
Runtime error
File size: 9,686 Bytes
4e71548 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
import asyncio
import fitz
import re
import numpy as np
from typing import List, Dict, Any, Optional
from pdf2image import convert_from_path
from src.config.config import settings
from src.models.account_models import LineData, WordData
from doctr.io import DocumentFile
class TextExtractor:
    """Async text extractor for extracting text with bounding boxes.

    Two extraction paths are provided:

    * ``extract_lines_with_bbox`` reads the embedded text layer of a digital
      PDF through PyMuPDF (``fitz``).
    * ``extract_lines_with_bbox_from_scanned_pdf`` runs the injected docTR
      model on rendered page images.

    Both return one list per page; every entry of a page list is a dict with
    the keys ``"line"`` (space-joined, lower-cased text), ``"bbox"``
    (``[min_x0, min_y0]`` of the words on the line) and ``"words"`` (list of
    ``{"word": str, "bbox": ...}`` dicts ordered left to right).
    """

    # A token is kept only if it contains at least one word or whitespace
    # character; tokens such as "|", ".", "#" or "--" carry no text.
    _TEXT_CHAR = re.compile(r"[\w\s]")

    def __init__(self, doctr_model):
        """Store the OCR predictor used by the scanned-PDF path.

        Args:
            doctr_model: Callable that receives a list of page images and
                returns a result exposing ``.pages``.
        """
        self.doctr_model = doctr_model

    async def __aenter__(self):
        """Return the extractor itself; no resources are acquired."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions propagate to the caller."""
        return None

    def normalize_bbox(self, bbox, width: float, height: float) -> List[float]:
        """Normalize bounding box (x0, y0, x1, y1) to range [0, 1].

        Args:
            bbox: Four coordinates ``(x0, y0, x1, y1)``.
            width: Value the x coordinates are divided by.
            height: Value the y coordinates are divided by.

        Returns:
            ``[x0 / width, y0 / height, x1 / width, y1 / height]``, each
            rounded to six decimal places.

        Raises:
            ZeroDivisionError: If ``width`` or ``height`` is zero.
        """
        x0, y0, x1, y1 = bbox
        return [
            round(x0 / width, 6),
            round(y0 / height, 6),
            round(x1 / width, 6),
            round(y1 / height, 6),
        ]

    def remove_consecutive_items(self, line: List[str]) -> List[str]:
        """Remove consecutive duplicate items from a list.

        Only adjacent repeats are collapsed; a value may reappear later.
        An empty input is returned unchanged.
        """
        if not line:
            return line
        result = [line[0]]
        for item in line[1:]:
            if item != result[-1]:
                result.append(item)
        return result

    def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]:
        """Remove consecutive duplicate words from word data.

        Two adjacent entries are duplicates when their ``"word"`` values are
        equal; the first entry of each run is the one that is kept.
        """
        if not word_data:
            return word_data
        result = [word_data[0]]
        for entry in word_data[1:]:
            if entry["word"] != result[-1]["word"]:
                result.append(entry)
        return result

    def _is_noise(self, text: str) -> bool:
        """Return True when ``text`` has no word or whitespace character."""
        return self._TEXT_CHAR.search(text) is None

    def _build_line(self, positioned, word_data) -> Dict[str, Any]:
        """Assemble the output dict for one non-empty visual line.

        Args:
            positioned: ``(x0, y0, text)`` tuples of the words on the line.
            word_data: The matching ``{"word", "bbox"}`` dicts.
        """
        ordered = sorted(positioned)
        tokens = self.remove_consecutive_items([text for _, _, text in ordered])
        words = self.remove_consecutive_words(
            sorted(word_data, key=lambda w: w["bbox"][0])
        )
        return {
            "line": " ".join(tokens),
            "bbox": [
                min(x for x, _, _ in ordered),
                min(y for _, y, _ in ordered),
            ],
            "words": words,
        }

    def _group_words_into_lines(self, entries, y_threshold: float) -> List[Dict[str, Any]]:
        """Split y-sorted words into lines wherever the y gap is too large.

        Args:
            entries: ``(x0, y0, text, word_data)`` tuples, already sorted by
                y and then x.
            y_threshold: A word whose ``y0`` differs from the previous word's
                ``y0`` by this much or more starts a new line.

        Returns:
            One line dict (see ``_build_line``) per group, in input order.
        """
        lines = []
        positioned = []
        word_data = []
        current_y = None
        for x0, y0, text, data in entries:
            if current_y is not None and abs(y0 - current_y) >= y_threshold:
                lines.append(self._build_line(positioned, word_data))
                positioned = []
                word_data = []
            positioned.append((x0, y0, text))
            word_data.append(data)
            # The reference y follows the most recent word, so a line may
            # drift gradually without being split.
            current_y = y0
        if positioned:
            lines.append(self._build_line(positioned, word_data))
        return lines

    def _extract_digital(self, pdf_path: str, y_threshold: float):
        """Blocking worker for ``extract_lines_with_bbox``."""
        page_lines_with_bbox = []
        # The context manager closes the document even if a page fails.
        with fitz.open(pdf_path) as doc:
            for page in doc:
                # (x0, y0, x1, y1, word, block_no, line_no, word_no)
                words = page.get_text("words")
                words.sort(key=lambda w: (round(w[1], 1), w[0]))  # y then x
                entries = []
                for w in words:
                    x0, y0, x1, y1, word = w[:5]
                    if self._is_noise(word):
                        continue
                    word = word.lower()
                    data = {"word": word.strip(), "bbox": (x0, y0, x1, y1)}
                    entries.append((x0, y0, word, data))
                page_lines_with_bbox.append(
                    self._group_words_into_lines(entries, y_threshold)
                )
        return page_lines_with_bbox

    def _lines_from_ocr_page(self, page, img_width: float, img_height: float,
                             y_threshold: float) -> List[Dict[str, Any]]:
        """Convert one OCR result page into line dicts in pixel coordinates.

        ``word.geometry`` is read as ``((x0, y0), (x1, y1))`` and scaled by
        the image size, i.e. it is treated as fractions of the page.
        """
        words = []
        for block in page.blocks:
            for line in block.lines:
                for word in line.words:
                    x0, y0 = word.geometry[0]
                    x1, y1 = word.geometry[1]
                    text = re.sub(r'[#*]', ' ', word.value.strip().lower()).strip()
                    if self._is_noise(text):
                        continue
                    words.append({
                        "word": text,
                        "bbox": [
                            x0 * img_width,
                            y0 * img_height,
                            x1 * img_width,
                            y1 * img_height,
                        ],
                    })
        # Sort words by y then x
        words.sort(key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0]))
        entries = [(w["bbox"][0], w["bbox"][1], w["word"], w) for w in words]
        return self._group_words_into_lines(entries, y_threshold)

    def _extract_scanned(self, pdf_path: str, y_threshold: float, first_page: bool):
        """Blocking worker for ``extract_lines_with_bbox_from_scanned_pdf``."""
        if first_page:
            rendered = convert_from_path(
                pdf_path, dpi=settings.dpi, first_page=1, last_page=1
            )
            # Keep the single image inside a list so that it is indexed per
            # page exactly like the multi-page case below.
            images = [np.array(rendered[0].convert("RGB"))]
        else:
            images = DocumentFile.from_pdf(pdf_path)
        result = self.doctr_model(images)

        page_lines_with_bbox = []
        # Each OCR page is scaled with the size of its own image; pages of a
        # PDF are not guaranteed to share dimensions.
        for page, image in zip(result.pages, images):
            img_height, img_width = image.shape[0], image.shape[1]
            page_lines_with_bbox.append(
                self._lines_from_ocr_page(page, img_width, img_height, y_threshold)
            )
        return page_lines_with_bbox

    async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0) -> "List[List[LineData]]":
        """Extract lines with bounding boxes from digital PDF.

        The blocking PyMuPDF work runs in the event loop's default executor.

        Args:
            pdf_path: Path of the PDF to read.
            y_threshold: Minimum ``y0`` difference between consecutive words
                that starts a new line.

        Returns:
            One list of line dicts per page.
            NOTE(review): the entries are plain dicts; confirm ``LineData``
            is compatible with that shape.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._extract_digital, pdf_path, y_threshold
        )

    async def extract_lines_with_bbox_from_scanned_pdf(self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False) -> "List[List[LineData]]":
        """Extract lines with bounding boxes from scanned PDF using OCR.

        The blocking rendering and OCR work runs in the event loop's default
        executor.

        Args:
            pdf_path: Path of the PDF to read.
            y_threshold: Minimum ``y0`` difference (in image pixels) between
                consecutive words that starts a new line.
            first_page: When true, only page 1 is rendered (at
                ``settings.dpi``) and recognised.

        Returns:
            One list of line dicts per recognised page, with coordinates in
            pixels of the corresponding page image.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._extract_scanned, pdf_path, y_threshold, first_page
        )