File size: 6,710 Bytes
2e79922
 
 
a7b8c18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from src.models.table_detector import TableDetector
from src.models.text_recognizer import TextRecognizer
from src.table_creator.data_structures import TableStructure
import pandas as pd
import re

class TableExtraction:
    """Detect tables in an image, OCR them and arrange the words into columns.

    The class wires together three project components created in
    ``__init__``: a ``TableDetector`` (table detection and box-overlap
    computation), a ``TextRecognizer`` (OCR) and a ``TableStructure``
    (turns per-column word lists into a table).

    Bounding boxes are indexed as ``[0], [1], [2], [3]``; the code treats
    indices 0/2 as the low/high edge on one axis and 1/3 as the low/high
    edge on the other (presumably ``[x_min, y_min, x_max, y_max]`` --
    TODO confirm against the OCR output).
    """

    # Thresholds compared with the value returned by
    # ``TableDetector._calculate_overlap`` (presumably a percentage --
    # TODO confirm against that helper).
    _COLUMN_OVERLAP_MIN = 10          # word vs. known column (strictly greater)
    _UNKNOWN_COLUMN_OVERLAP_MIN = 30  # word vs. unknown column (strictly greater)
    _SAME_LINE_OVERLAP_MIN = 30       # word vs. previous word (greater or equal)

    # Suffix appended to the keys of unknown columns (``<word>__<index>__``).
    _UNKNOWN_KEY_SUFFIX = re.compile(r'__\d+__')

    def __init__(self) -> None:
        self._table_detection = TableDetector()
        self._document_ocr = TextRecognizer()
        self._linklist = TableStructure()

    def _merge_words(self, prev_obj, word, word_bb):
        """Merge ``word`` into the previous ``(text, bounding_box)`` entry.

        The merge is unconditional; callers decide beforehand whether the two
        words belong together.

        Args:
            prev_obj: ``(text, bounding_box)`` of the previous word.
            word: Text of the current word.
            word_bb: Bounding box of the current word.

        Returns:
            ``(merged_text, merged_bb)`` where the text is the two words joined
            by a single space and the box is the union of both boxes, so it
            always encloses both words.
        """
        prev_text, prev_bb = prev_obj[0], prev_obj[1]
        merged_bb = [
            min(prev_bb[0], word_bb[0]),
            min(prev_bb[1], word_bb[1]),
            max(prev_bb[2], word_bb[2]),
            max(prev_bb[3], word_bb[3]),
        ]
        return (prev_text + ' ' + word, merged_bb)

    def _append_or_merge(self, entries, word, word_bb):
        """Add a word to ``entries``, merging it into the last entry if they line up.

        The previous entry's box is compared with a probe box that keeps the
        previous entry's 0/2 extent but takes the new word's 1/3 extent; when
        the overlap reaches ``_SAME_LINE_OVERLAP_MIN`` the two are merged.
        ``entries`` is modified in place.
        """
        if entries:
            prev_obj = entries[-1]
            prev_bb = prev_obj[1]
            probe = [prev_bb[0], word_bb[1], prev_bb[2], word_bb[3]]
            prev_overlap = self._table_detection._calculate_overlap(prev_bb, probe)
            if prev_overlap >= self._SAME_LINE_OVERLAP_MIN:
                entries[-1] = self._merge_words(prev_obj, word, word_bb)
                return
        entries.append((word, word_bb))

    def _assign_to_column(self, word, word_bb, columns, df, debug=False):
        """Assign a word to the first known column it overlaps.

        Args:
            word: Text of the word.
            word_bb: Bounding box of the word.
            columns: Mapping of column key to column bounding box. The entry
                of the matched column may be replaced (see below).
            df: Mapping of column key to a list of ``(text, bounding_box)``
                tuples; the matched list is modified in place.
            debug: Accepted for interface compatibility; currently unused.

        Returns:
            ``True`` if the word was placed in a column, ``False`` otherwise.
        """
        for key, col_bb in columns.items():
            # Stretch the word over the column's full 1/3 extent so only the
            # 0/2 extent decides whether the word falls into this column.
            probe = [word_bb[0], col_bb[1], word_bb[2], col_bb[3]]
            overlap = self._table_detection._calculate_overlap(probe, col_bb)

            if overlap > self._COLUMN_OVERLAP_MIN:
                entries = df[key]
                was_empty = not entries
                self._append_or_merge(entries, word, word_bb)
                if was_empty:
                    # Widen the column box to fit the word.
                    # NOTE(review): this only happens for the first word of a
                    # column; later words never widen it -- confirm intended.
                    columns[key] = [
                        min(word_bb[0], col_bb[0]), col_bb[1],
                        max(word_bb[2], col_bb[2]), col_bb[3]
                    ]
                return True
        return False

    def _get_normalized_bounding_box(self, imgsz: str, bb: list) -> pd.DataFrame:
        """Placeholder: not implemented yet.

        NOTE(review): despite the return annotation this method currently
        does nothing and returns ``None``.
        """
        return None

    def get_words_in_column(self, cords: dict, df_word: pd.DataFrame, merge=True, debug=False):
        """Distribute words into columns based on bounding-box overlap.

        Words that overlap none of the columns in ``cords`` are grouped into
        "unknown" columns keyed ``<word>__<row index>__``; a new unknown column
        is opened whenever a word overlaps no existing unknown column.

        Args:
            cords: Mapping of column key to column bounding box. May be empty,
                in which case every column is an unknown column. Entries may
                be updated by ``_assign_to_column``.
            df_word: DataFrame with a ``text`` and a ``boundingBox`` column,
                one row per word.
            merge: When true, unknown columns are included in the first
                return value.
            debug: When true, print each word as it is processed.

        Returns:
            A tuple ``(columns, unknown_data, unknown_columns)``:
            ``columns`` maps column key to a DataFrame with ``text`` and
            ``boundingBox`` columns; ``unknown_data`` maps unknown key to a
            list of ``(text, bounding_box)`` tuples; ``unknown_columns`` maps
            unknown key to the bounding box of the word that opened it.
        """
        df = {key: [] for key in cords}
        unknown_columns = {}
        unknown_data = {}

        for index, row in df_word.iterrows():
            word, word_bb = row['text'], list(map(int, row['boundingBox']))
            if debug:
                print(f"\nProcessing word: '{word}'")

            if self._assign_to_column(word, word_bb, cords, df, debug):
                continue

            # The word matched no known column: try the unknown ones.
            for key, col_bb in unknown_columns.items():
                overlap = self._table_detection._calculate_overlap(
                    col_bb, [word_bb[0], col_bb[1], word_bb[2], col_bb[3]]
                )
                if overlap > self._UNKNOWN_COLUMN_OVERLAP_MIN:
                    self._append_or_merge(unknown_data[key], word, word_bb)
                    break
            else:
                # No unknown column matched either: open a new one.
                unknown_key = f'{word}__{index}__'
                unknown_columns[unknown_key] = word_bb
                unknown_data[unknown_key] = [(word, word_bb)]

        if merge:
            df.update(unknown_data)

        # Convert lists to DataFrames
        df = {key: pd.DataFrame(val, columns=['text', 'boundingBox']) for key, val in df.items()}
        return df, unknown_data, unknown_columns

    @staticmethod
    def _as_text(series):
        """Return ``series`` as strings with missing values replaced by ``''``."""
        return series.astype(object).where(series.notna(), '').astype(str)

    @classmethod
    def _join_text(cls, left, right):
        """Join two columns cell by cell with a space, ignoring missing cells.

        A cell stays missing only when it is missing on both sides. The
        result keeps the name of ``left``.
        """
        both_missing = left.isna() & right.isna()
        joined = (cls._as_text(left) + ' ' + cls._as_text(right)).str.strip()
        joined = joined.where(~both_missing)
        joined.name = left.name
        return joined

    def postprocess(self, parsed_df: pd.DataFrame, columns=None):
        """Clean a parsed table and optionally regroup its columns by header.

        Steps:
          1. Drop rows that are entirely missing and reset the index.
          2. Every column whose first row is missing is merged into the column
             to its left (cell texts joined by a space) and then removed.
             Columns are addressed by position, so duplicate column labels are
             handled correctly and missing cells no longer blank out the
             neighbouring column.
             NOTE(review): a headerless *first* column has no left neighbour
             and is dropped, as before -- confirm this is intended.
          3. If ``columns`` is given, every column whose label contains a
             header is joined (row-wise, space separated) into one column named
             after that header; columns matching no header are appended
             unchanged.

        Args:
            parsed_df: Table whose first row holds the header texts.
            columns: Optional iterable of header strings to group by.

        Returns:
            The processed DataFrame. On any error the error is printed and the
            table as processed so far is returned (best effort).
        """
        try:
            parsed_df = parsed_df.dropna(how='all').reset_index(drop=True)

            # Merge columns with an empty header cell into their left neighbour.
            header_missing = parsed_df.iloc[:1].isna().all().tolist()
            empty_positions = [pos for pos, missing in enumerate(header_missing) if missing]
            if empty_positions:
                pieces = [parsed_df.iloc[:, pos] for pos in range(parsed_df.shape[1])]
                # Right to left, so runs of empty columns collapse into the
                # nearest column that has a header.
                for pos in reversed(empty_positions):
                    if pos > 0:
                        pieces[pos - 1] = self._join_text(pieces[pos - 1], pieces[pos])
                dropped = set(empty_positions)
                kept = [piece for pos, piece in enumerate(pieces) if pos not in dropped]
                parsed_df = pd.concat(kept, axis=1) if kept else parsed_df.iloc[:, []]

            if not columns:
                return parsed_df

            new_df = pd.DataFrame(index=parsed_df.index)
            used_indices = set()
            for header in columns:
                match_indices = [
                    pos for pos, label in enumerate(parsed_df.columns) if header in str(label)
                ]
                if match_indices:
                    used_indices.update(match_indices)
                    new_df[header] = parsed_df.iloc[:, match_indices].apply(
                        lambda cells: ' '.join(self._as_text(cells).str.strip()), axis=1
                    )

            # Include unused columns (by position, so duplicate labels are safe).
            unused_positions = [
                pos for pos in range(parsed_df.shape[1]) if pos not in used_indices
            ]
            return pd.concat([new_df, parsed_df.iloc[:, unused_positions]], axis=1)
        except Exception as e:
            # Deliberate best effort: report the problem and hand back what we have.
            print(f"Error in postprocess: {e}")
            return parsed_df

    def detect(self, image_path: str):
        """Detect tables in an image and extract the data of the first one.

        Args:
            image_path: Path of the image to analyse.

        Returns:
            ``((df, df_postp), cords)`` where ``df`` is the first table with
            generic ``column N`` labels, ``df_postp`` is the same table after
            ``postprocess`` (labels derived from the header words) and
            ``cords`` is whatever ``TableDetector.detect`` returned.

        Raises:
            IndexError: If no table was found in the image.
        """
        cords = self._table_detection.detect(image_path)
        all_table_df = self._document_ocr.recognize(image_path, cords)

        # NOTE(review): every table is processed but only the first one is
        # returned; kept as-is in case build_structure carries state.
        table_data = []
        for table in all_table_df:
            # No known columns are supplied, so all columns are "unknown" ones.
            column_data, _, _ = self.get_words_in_column({}, table)
            # Order columns by the first box coordinate of their first word.
            ordered_columns = sorted(
                column_data, key=lambda col: column_data[col].iloc[0]['boundingBox'][0]
            )
            dictword = {col: column_data[col] for col in ordered_columns}

            df = self._linklist.build_structure(dictword)
            df = df.loc[:, ordered_columns]
            # Strip the ``__<index>__`` suffix added to unknown column keys.
            df = df.rename(
                columns=lambda col: self._UNKNOWN_KEY_SUFFIX.sub('', str(col)).strip()
            )
            df_postp = self.postprocess(df)

            # Assign generic column names
            df.columns = [f"column {i+1}" for i in range(df.shape[1])]
            table_data.append((df, df_postp))

        if not table_data:
            raise IndexError(f"No tables detected in {image_path!r}")
        return table_data[0], cords