# File size: 9,686 Bytes
# 4e71548
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import asyncio
import fitz
import re
import numpy as np
from typing import List, Dict, Any, Optional
from pdf2image import convert_from_path
from src.config.config import settings
from src.models.account_models import LineData, WordData
from doctr.io import DocumentFile


class TextExtractor:
    """Async text extractor for extracting text with bounding boxes.

    Words are read either from a PDF's embedded text layer (PyMuPDF) or from
    the OCR result of the callable passed as ``doctr_model``, and are then
    grouped into visual lines by vertical proximity.  The blocking work is
    run in the event loop's default executor.

    Every extracted line is a plain dict of the form::

        {
            "line": "<words joined by single spaces>",
            "bbox": [x_min, y_min],   # smallest x0 / y0 among the words
            "words": [{"word": str, "bbox": <x0, y0, x1, y1>}, ...],
        }
    """

    # Removes every character that is neither a word character nor
    # whitespace.  A token that is empty afterwards ("|", ".", "#", "--", ...)
    # carries no text and is skipped.
    _NON_TEXT_RE = re.compile(r'[^\w\s]')

    def __init__(self, doctr_model):
        """Store the OCR predictor that is called on lists of page images."""
        self.doctr_model = doctr_model

    async def __aenter__(self):
        """Return the extractor itself; nothing is acquired on entry."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions propagate to the caller."""
        pass

    def normalize_bbox(self, bbox, width: float, height: float) -> List[float]:
        """Normalize bounding box (x0, y0, x1, y1) to range [0, 1].

        Args:
            bbox: Four coordinates ``(x0, y0, x1, y1)``.
            width: Value the x coordinates are divided by.
            height: Value the y coordinates are divided by.

        Returns:
            ``[x0 / width, y0 / height, x1 / width, y1 / height]``, each
            rounded to 6 decimal places.

        Raises:
            ZeroDivisionError: If ``width`` or ``height`` is zero.
        """
        x0, y0, x1, y1 = bbox
        return [
            round(x0 / width, 6),
            round(y0 / height, 6),
            round(x1 / width, 6),
            round(y1 / height, 6),
        ]

    def remove_consecutive_items(self, line: List[str]) -> List[str]:
        """Remove consecutive duplicate items from a list.

        Only adjacent repeats are collapsed; an item that re-appears later
        is kept.  A falsy input (empty list or ``None``) is returned as is.
        """
        if not line:
            return line
        result = [line[0]]
        for item in line[1:]:
            if item != result[-1]:
                result.append(item)
        return result

    def remove_consecutive_words(self, word_data: List[Dict]) -> List[Dict]:
        """Remove consecutive duplicate words from word data.

        Two neighbouring entries are duplicates when their ``"word"`` values
        are equal; the first entry of each run is kept.  A falsy input is
        returned as is.
        """
        if not word_data:
            return word_data
        result = [word_data[0]]
        for entry in word_data[1:]:
            if entry["word"] != result[-1]["word"]:
                result.append(entry)
        return result

    def _is_noise_token(self, token: str) -> bool:
        """Return True when ``token`` has no word or whitespace characters."""
        return not self._NON_TEXT_RE.sub('', token)

    def _build_line(self, entries: List[Dict]) -> Dict:
        """Turn the word entries of one visual line into a line dict.

        ``entries`` must be non-empty; each entry is
        ``{"word": str, "bbox": (x0, y0, x1, y1)}``.
        """
        # The joined text is ordered left to right, with y0 and the text
        # itself as tie-breakers so the result is deterministic.
        by_position = sorted(
            entries, key=lambda e: (e["bbox"][0], e["bbox"][1], e["word"])
        )
        tokens = self.remove_consecutive_items([e["word"] for e in by_position])
        # The per-word list is ordered by x0 only (stable for equal x0).
        words = self.remove_consecutive_words(
            sorted(entries, key=lambda e: e["bbox"][0])
        )
        return {
            "line": " ".join(tokens),
            "bbox": [
                min(e["bbox"][0] for e in entries),
                min(e["bbox"][1] for e in entries),
            ],
            "words": words,
        }

    def _group_words_into_lines(self, words: List[Dict], y_threshold: float) -> List[Dict]:
        """Group word entries, already sorted top to bottom, into lines.

        A word joins the current line while its y0 differs from the y0 of
        the previously seen word by less than ``y_threshold``; otherwise the
        current line is emitted and a new one is started.
        """
        lines = []
        current = []
        current_y = None
        for entry in words:
            y0 = entry["bbox"][1]
            same_line = current_y is None or abs(y0 - current_y) < y_threshold
            if not same_line:
                lines.append(self._build_line(current))
                current = []
            current.append(entry)
            # Compare against the most recent word, not the first of the line.
            current_y = y0
        if current:
            lines.append(self._build_line(current))
        return lines

    def _lines_from_ocr_page(self, page, image, y_threshold: float) -> List[Dict]:
        """Extract the lines of one OCR result page in pixel coordinates.

        Args:
            page: OCR result page exposing ``blocks -> lines -> words``,
                where every word has ``value`` and a relative ``geometry``
                whose first two items are ``(x0, y0)`` and ``(x1, y1)``.
            image: The image array this page was recognised from, laid out
                as ``(height, width, ...)``.
            y_threshold: Line-grouping tolerance in pixels.
        """
        img_height, img_width = image.shape[:2]
        words = []
        for block in page.blocks:
            for line in block.lines:
                for word in line.words:
                    x0, y0 = word.geometry[0]
                    x1, y1 = word.geometry[1]
                    text = word.value.strip().lower()
                    text = re.sub(r'[#*]', ' ', text).strip()
                    if self._is_noise_token(text):
                        continue
                    words.append({
                        "word": text,
                        "bbox": [
                            x0 * img_width,
                            y0 * img_height,
                            x1 * img_width,
                            y1 * img_height,
                        ],
                    })

        # Sort words by y then x
        words.sort(key=lambda w: (round(w["bbox"][1], 3), w["bbox"][0]))
        return self._group_words_into_lines(words, y_threshold)

    async def extract_lines_with_bbox(self, pdf_path: str, y_threshold: float = 3.0) -> "List[List[LineData]]":
        """Extract lines with bounding boxes from digital PDF.

        Args:
            pdf_path: Path of the PDF to open with PyMuPDF.
            y_threshold: Maximum y0 difference (in page units) between
                consecutive words of the same line.

        Returns:
            One list per page, each holding that page's line dicts (see the
            class docstring).  Words are lower-cased and stripped; tokens
            without any word character are dropped.
        """
        def _extract_lines():
            page_lines_with_bbox = []
            # The context manager closes the document even if a page fails.
            with fitz.open(pdf_path) as doc:
                for page in doc:
                    words = []
                    # Each item: (x0, y0, x1, y1, word, block_no, line_no, word_no)
                    for x0, y0, x1, y1, raw in (w[:5] for w in page.get_text("words")):
                        token = raw.lower().strip()
                        if self._is_noise_token(token):
                            continue
                        words.append({"word": token, "bbox": (x0, y0, x1, y1)})

                    # sort by y then x
                    words.sort(key=lambda w: (round(w["bbox"][1], 1), w["bbox"][0]))
                    page_lines_with_bbox.append(
                        self._group_words_into_lines(words, y_threshold)
                    )
            return page_lines_with_bbox

        return await asyncio.get_running_loop().run_in_executor(None, _extract_lines)

    async def extract_lines_with_bbox_from_scanned_pdf(self, pdf_path: str, y_threshold: float = 5.0, first_page: bool = False) -> "List[List[LineData]]":
        """Extract lines with bounding boxes from scanned PDF using OCR.

        Args:
            pdf_path: Path of the PDF to rasterise and recognise.
            y_threshold: Maximum y0 difference (in pixels) between
                consecutive words of the same line.
            first_page: When True only page 1 is rendered (at
                ``settings.dpi``) and recognised; otherwise all pages are
                loaded through ``DocumentFile.from_pdf``.

        Returns:
            One list per recognised page, each holding that page's line
            dicts with coordinates scaled to the pixel size of the page's
            own image.
        """
        def _extract_from_scanned():
            if first_page:
                rendered = convert_from_path(
                    pdf_path, dpi=settings.dpi, first_page=1, last_page=1
                )
                page_images = [np.array(rendered[0].convert("RGB"))]
            else:
                page_images = DocumentFile.from_pdf(pdf_path)

            result = self.doctr_model(page_images)

            # Pair every result page with the image it came from so relative
            # geometry is scaled by that page's own width and height.
            return [
                self._lines_from_ocr_page(page, image, y_threshold)
                for page, image in zip(result.pages, page_images)
            ]

        return await asyncio.get_running_loop().run_in_executor(None, _extract_from_scanned)