# NOTE: removed non-source artifacts (web file-viewer chrome: "Spaces/Sleeping",
# file size, commit hash, and line-number gutter) that were fused into this file
# during extraction.
from sklearn.cluster import DBSCAN
import numpy as np
from itertools import islice
from collections import Counter
import logging
# Module-wide logging: configure an INFO-level root handler at import time.
# NOTE(review): fine for a script; a library would normally leave handler
# configuration to the application and only call getLogger here.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DBSCAN_helper:
    """Cluster text blocks with DBSCAN over (x0, y0, x1, y1, len(text)) features.

    After ``run()`` the instance exposes:
        n_clusters: number of clusters found (noise excluded).
        labels: numpy array of per-block cluster labels (-1 marks noise).
    """

    def __init__(self, blocks, eps=0.5, min_samples=2, metric='euclidean'):
        """Store the blocks and clustering parameters.

        Args:
            blocks: iterable of (x0, y0, x1, y1, text) tuples.
            eps: DBSCAN neighborhood radius. Defaults to 0.5, the value that
                was previously hard-coded.
            min_samples: minimum points to form a dense region (default 2,
                previously hard-coded).
            metric: distance metric passed through to sklearn's DBSCAN.
        """
        self.blocks = blocks
        self.eps = eps
        self.min_samples = min_samples
        self.metric = metric

    def run(self):
        """Fit DBSCAN on the block features, setting n_clusters and labels.

        Never raises: on empty input or any failure, ``n_clusters`` is 0 and
        ``labels`` marks every block as noise (-1) or is empty.
        """
        try:
            if not self.blocks:
                logger.warning("No blocks provided to DBSCAN_helper")
                # Return default values
                self.n_clusters = 0
                self.labels = np.array([])
                return
            # Feature vector per block: geometry plus text length.
            X = np.array(
                [(x0, y0, x1, y1, len(text)) for x0, y0, x1, y1, text in self.blocks]
            )
            if X.size == 0:
                logger.warning("Empty feature array for DBSCAN")
                self.n_clusters = 0
                self.labels = np.array([])
                return
            dbscan = DBSCAN(eps=self.eps, min_samples=self.min_samples,
                            metric=self.metric)
            dbscan.fit(X)
            labels = dbscan.labels_
            # Noise points are labeled -1 and do not count as a cluster.
            self.n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
            self.labels = labels
            logger.info("%d clusters for %d blocks", self.n_clusters, len(self.blocks))
        except Exception as e:
            logger.error("Error in DBSCAN_helper: %s", e)
            # Degrade gracefully: no clusters, everything flagged as noise.
            self.n_clusters = 0
            self.labels = np.array([-1] * len(self.blocks)) if self.blocks else np.array([])
class Decomposer:
def __init__(self, pdf_document=None):
if not pdf_document:
raise ValueError("PDF document must be provided")
self.pdf_doc = pdf_document
def calc_rect_center(self, rect, reverse_y=False):
try:
if reverse_y:
x0, y0, x1, y1 = rect[0], -rect[1], rect[2], -rect[3]
else:
x0, y0, x1, y1 = rect
x_center = (x0 + x1) / 2
y_center = (y0 + y1) / 2
return (x_center, y_center)
except Exception as e:
logger.error(f"Error calculating rectangle center: {str(e)}")
return (0, 0) # Return default values on error
def get_rect_labels(self):
try:
rect_centers = []
rects = []
visual_label_texts = []
categorize_vectors = []
for page_idx, page in islice(enumerate(self.pdf_doc), len(self.pdf_doc)):
try:
blocks = page.get_text("blocks")
page_cnt = page_idx + 1
logger.debug(f"=== Start Page {page_cnt}: {len(blocks)} blocks ===")
block_cnt = 0
for block in blocks:
try:
block_rect = block[:4] # (x0,y0,x1,y1)
x0, y0, x1, y1 = block_rect
rects.append(block_rect)
# Handle possible encoding issues with block text
block_text = block[4]
if isinstance(block_text, bytes):
block_text = block_text.decode('utf-8', errors='ignore')
block_num = block[5]
block_cnt = block_num + 1
rect_center = self.calc_rect_center(block_rect, reverse_y=True)
rect_centers.append(rect_center)
visual_label_text = f"({page_cnt}.{block_cnt})"
visual_label_texts.append(visual_label_text)
#block_type = "text" if block[6] == 0 else "image"
categorize_vectors.append((*block_rect, block_text))
except Exception as block_error:
logger.warning(f"Error processing block {block_cnt} on page {page_cnt}: {str(block_error)}")
continue
except Exception as page_error:
logger.warning(f"Error processing page {page_idx + 1}: {str(page_error)}")
continue
if not categorize_vectors:
logger.warning("No categorize vectors generated")
return []
categorizer = DBSCAN_helper(categorize_vectors)
categorizer.run()
# Make sure the lengths match
if len(rects) != len(categorizer.labels):
logger.warning(f"Length mismatch: rects={len(rects)}, labels={len(categorizer.labels)}")
# Handle mismatch by creating default labels
if categorizer.labels.size == 0: # If labels array is empty
result = [(rect, -1) for rect in rects] # Assign all to noise (-1)
else:
# Truncate to shorter length
min_len = min(len(rects), len(categorizer.labels))
result = [(rects[i], categorizer.labels[i]) for i in range(min_len)]
return result
return [(rects[i], categorizer.labels[i]) for i in range(len(rects))]
except Exception as e:
logger.error(f"Error in get_rect_labels: {str(e)}")
return [] # Return empty result on error
def get_page_stats(self, res):
try:
if not res:
logger.warning("Empty input to get_page_stats")
return None, None, None # Handle empty input
x_counter = Counter(x for _, x in res)
y_diffs = Counter(i[3] - i[1] for i, _ in res)
# Handle empty counters
if not x_counter or not y_diffs:
logger.warning("Empty counters in get_page_stats")
return None, None, None
most_common_x = x_counter.most_common(1)[0][0]
threshold = float('inf')
min_x = float('inf')
for i, x in res:
min_x = min(i[0], min_x)
if x != most_common_x and i[0] < threshold:
threshold = i[0]
if threshold == float('inf'): # Fallback
threshold = min_x
min_y, max_y = float('inf'), -float('inf') # Changed from 0 to -inf
for i, x in res:
if x == -1 and i[0] <= threshold:
min_y = min(min_y, i[1])
max_y = max(max_y, i[-1])
single_y = y_diffs.most_common(1)[0][0] if y_diffs else 0
# Additional validity checks
if min_y == float('inf'):
min_y = None
if max_y == -float('inf'):
max_y = None
# Ensure single_y is positive
single_y = abs(single_y) if single_y else 0
return min_y, max_y, single_y
except Exception as e:
logger.error(f"Error in get_page_stats: {str(e)}")
return None, None, None # Return default values on error
def run(self):
try:
rect_labels = self.get_rect_labels()
stats = self.get_page_stats(rect_labels)
logger.info(f"Page stats: min_y={stats[0]}, max_y={stats[1]}, single_y={stats[2]}")
return stats
except Exception as e:
logger.error(f"Error in Decomposer.run: {str(e)}")
return None, None, None # Return default values on error |