|
|
import json |
|
|
import pandas as pd |
|
|
|
|
|
def read_json(json_file):
    """Parse a UTF-8 encoded JSON file and return the decoded object.

    Args:
        json_file: Path of the file to open for reading.

    Returns:
        Whatever ``json.load`` produces for the file contents.
    """
    with open(json_file, mode='r', encoding='utf-8') as handle:
        parsed = json.load(handle)
    return parsed
|
|
|
|
|
def adjust_page_dimensions_and_bbox(modified_model_output_json, pdfminer_json):
    """Rescale model blocks in place to the page size reported by pdfminer.

    For each page key of ``modified_model_output_json`` that also exists in
    ``pdfminer_json``, the first pdfminer entry of that page supplies
    ``page_width`` / ``page_height``.  Every block on the page gets its
    ``page_img_width`` / ``page_img_height`` replaced by those values and its
    ``bbox`` multiplied by the matching width/height ratios.

    Pages missing from ``pdfminer_json`` are left untouched; pages whose
    pdfminer entry is empty only trigger a printed message.

    Returns:
        The same ``modified_model_output_json`` object, mutated in place.
    """
    for page_number, blocks in modified_model_output_json.items():
        if page_number not in pdfminer_json:
            continue

        page_entries = pdfminer_json[page_number]
        if not page_entries:
            print(f"Page {page_number} is empty.")
            continue

        # The first pdfminer entry of a page carries the page dimensions.
        reference = page_entries[0]
        target_width = reference['page_width']
        target_height = reference['page_height']

        for block in blocks:
            source_width = block['page_img_width']
            source_height = block['page_img_height']
            scale_x = target_width / source_width
            scale_y = target_height / source_height

            block['page_img_width'] = target_width
            block['page_img_height'] = target_height

            box = block['bbox']
            block['bbox'] = [
                box[0] * scale_x,
                box[1] * scale_y,
                box[2] * scale_x,
                box[3] * scale_y,
            ]

    return modified_model_output_json
|
|
|
|
|
def convert_to_dataframe(extracted_df):
    """Coerce an arbitrary extracted value into a ``pandas.DataFrame``.

    Conversion rules, checked in this order:

    * a DataFrame is returned unchanged (same object);
    * a dict whose values are all lists is treated as column data,
      any other dict becomes a single-row frame;
    * a list made only of dicts becomes one row per dict, any other
      list becomes a single ``'Value'`` column;
    * anything else becomes a one-row, one-column ``'Value'`` frame.
    """
    if isinstance(extracted_df, pd.DataFrame):
        return extracted_df

    if isinstance(extracted_df, dict):
        is_columnar = all(isinstance(column, list) for column in extracted_df.values())
        if is_columnar:
            return pd.DataFrame(extracted_df)
        return pd.DataFrame([extracted_df])

    if isinstance(extracted_df, list):
        only_records = all(isinstance(row, dict) for row in extracted_df)
        if only_records:
            return pd.DataFrame(extracted_df)
        return pd.DataFrame(extracted_df, columns=['Value'])

    return pd.DataFrame([extracted_df], columns=['Value'])
|
|
|
|
|
def calculate_centroid(bbox):
    """Return the midpoint ``(x, y)`` of a four-value bounding box.

    ``bbox`` is unpacked as ``(x1, y1, x2, y2)``; each coordinate of the
    result is the average of the corresponding pair.
    """
    left, top, right, bottom = bbox
    return ((left + right) / 2, (top + bottom) / 2)
|
|
|
|
|
def is_within_radius(text_block_bbox, header_bbox, radius=50):
    """Return True when the two boxes share a region of positive area.

    Both boxes are unpacked as ``(xmin, ymin, xmax, ymax)``.

    Note:
        ``radius`` is accepted for interface compatibility but is not
        consulted; the result depends only on whether the boxes intersect
        in both axes.
    """
    t_left, t_top, t_right, t_bottom = text_block_bbox
    h_left, h_top, h_right, h_bottom = header_bbox

    # Extent of the intersection along each axis, clamped at zero.
    shared_width = max(0, min(t_right, h_right) - max(t_left, h_left))
    shared_height = max(0, min(t_bottom, h_bottom) - max(t_top, h_top))

    return bool(shared_width > 0 and shared_height > 0)
|
|
|
|
|
def is_overlapped(text_block_bbox, header_bbox, threshold=0.20):
    """Tell whether two boxes overlap by more than ``threshold``.

    The overlap ratio is the intersection area divided by the area of the
    smaller of the two boxes.

    Args:
        text_block_bbox: ``(xmin, ymin, xmax, ymax)`` of the text block.
        header_bbox: ``(xmin, ymin, xmax, ymax)`` of the header block.
        threshold: Ratio that must be strictly exceeded for a match.

    Returns:
        True if the overlap ratio is greater than ``threshold``; False
        otherwise, including when either box has no positive area.
    """
    text_xmin, text_ymin, text_xmax, text_ymax = text_block_bbox
    header_xmin, header_ymin, header_xmax, header_ymax = header_bbox

    # Intersection extent along each axis, clamped at zero for disjoint boxes.
    overlap_x = max(0, min(text_xmax, header_xmax) - max(text_xmin, header_xmin))
    overlap_y = max(0, min(text_ymax, header_ymax) - max(text_ymin, header_ymin))
    overlap_area = overlap_x * overlap_y

    text_area = (text_xmax - text_xmin) * (text_ymax - text_ymin)
    header_area = (header_xmax - header_xmin) * (header_ymax - header_ymin)
    smaller_area = min(text_area, header_area)

    # A degenerate (zero-width/height) box would otherwise divide by zero;
    # such a box cannot meaningfully overlap anything.
    if smaller_area <= 0:
        return False

    return overlap_area / smaller_area > threshold
|
|
|
|
|
def detect_header(text_block_bbox, adjusted_model_output_json, page_number, next_header_index_in_model_udop):
    """Report whether a text block overlaps the expected next header block.

    Args:
        text_block_bbox: Bounding box of the text block being tested.
        adjusted_model_output_json: Mapping of page key (string) to the list
            of model blocks on that page.
        page_number: Page of the expected header; looked up as ``str(page_number)``.
        next_header_index_in_model_udop: Position of the header block inside
            that page's block list, or None when no header is expected.

    Returns:
        True when the page exists, an index is given, and ``is_overlapped``
        accepts the text block against that header's ``bbox``; otherwise False.
    """
    page_key = str(page_number)
    if page_key not in adjusted_model_output_json or next_header_index_in_model_udop is None:
        return False

    header_position = int(next_header_index_in_model_udop)
    header_block = adjusted_model_output_json[page_key][header_position]
    return bool(is_overlapped(text_block_bbox, header_block['bbox']))
|
|
|
|
|
def remove_header_from_start(first_row_text: str, first_row_header_text: str) -> str:
    """Drop as many leading characters as the header is long, then trim.

    The cut is purely length-based: the first ``len(first_row_header_text)``
    characters of ``first_row_text`` are discarded without checking that they
    equal the header, and surrounding whitespace is stripped from the rest.
    """
    remainder = first_row_text[len(first_row_header_text):]
    return remainder.strip()
|
|
|
|
|
def extract_last_header_index(all_blocks_with_indices):
    """Find the position of the final header block in the list.

    Scans from the end and returns the index of the first block whose
    ``label_name`` is ``'Page-header'`` or ``'Section-header'``; returns
    ``-1`` when the list holds no such block.
    """
    header_labels = ('Page-header', 'Section-header')
    position = len(all_blocks_with_indices) - 1
    while position >= 0:
        if all_blocks_with_indices[position]['label_name'] in header_labels:
            return position
        position -= 1
    return -1
|
|
|
|
|
def match_headers_with_text(adjusted_model_json, pdfminer_json): |
# Pairs header blocks from the model output with text blocks from pdfminer_json.
# adjusted_model_json: mapping of page key -> list of block dicts; pages are
#   visited in ascending int(page key) order.
# pdfminer_json: mapping of page key (str) -> list of text-block dicts; the keys
#   'bbox', 'text', 'element_id' and 'text_block_id' are read from each entry.
# Returns the tuple (matched_data, tree_format_matched_data), two lists of dicts
#   assembled below.
# NOTE(review): leading indentation is not visible in this copy of the file, so
#   the comments below describe individual statements only and make no claim
#   about which loop or branch encloses them — confirm against the original.
|
|
matched_data = [] |
|
|
tree_format_matched_data = [] |
|
|
current_header = None |
|
|
current_content = [] |
|
|
current_header_table_content = [] |
|
|
current_header_tree_structure = [] |
|
|
sorted_pages = sorted(adjusted_model_json.items(), key=lambda x: int(x[0])) |
|
|
|
|
|
# Collect header and table blocks from every page; each kept block is tagged
# with its position inside its page list as 'used_model_index' (mutates the input).
all_blocks_with_indices = [] |
|
|
for key, blocks in sorted_pages: |
|
|
for index, block in enumerate(blocks): |
|
|
if block['label_name'] in ['Page-header','Section-header','Table', "Portfolio-Company-Table"]: |
|
|
block['used_model_index'] = index |
|
|
all_blocks_with_indices.append(block) |
|
|
|
|
|
|
|
|
# NOTE(review): the loop variable `id` shadows the builtin of the same name.
for id,block in enumerate(all_blocks_with_indices): |
|
|
if block['label_name'] in ['Page-header','Section-header']: |
|
|
next_header_detect_flag = False |
|
|
# NOTE(review): current_header_index_in_model and current_header_centroid are
# assigned but not read again in this function.
current_header_index_in_model = block['used_model_index'] |
|
|
current_header_bbox = block['bbox'] |
|
|
current_header_type = block['label_name'] |
|
|
current_header_centroid = calculate_centroid(block['bbox']) |
|
|
current_header_page_number = block['pdf_page_id'] |
|
|
# Header text is the first entry of 'extracted_text', or "" when that is empty.
current_header_text = block['extracted_text'][0] if block['extracted_text'] else "" |
|
|
current_header_page_width = block['page_img_width'] |
|
|
current_header_page_height = block['page_img_height'] |
|
|
current_header_page_block_id = block['page_block_id'] |
|
|
current_header_pdf_name = block['pdf_name'] |
|
|
content_source_pages = [] |
|
|
new_start_index = id + 1 |
|
|
if new_start_index < len(all_blocks_with_indices): |
|
|
# Look ahead through the later collected blocks for a header block and record
# its index, bbox, page and text.
for next_id ,next_block in enumerate(all_blocks_with_indices[new_start_index:], start = new_start_index): |
|
|
if next_block['label_name'] in ['Page-header', 'Section-header']: |
|
|
next_header_index_in_model_udop = next_block['used_model_index'] |
|
|
next_header_bbox = next_block['bbox'] |
|
|
next_header_centroid = calculate_centroid(next_block['bbox']) |
|
|
next_header_page_number = next_block["pdf_page_id"] |
|
|
next_header_text = next_block['extracted_text'][0] if next_block['extracted_text'] else "" |
|
|
break |
|
|
|
|
|
|
|
|
else: |
|
|
next_header_bbox = None |
|
|
next_header_centroid = None |
|
|
next_header_page_number = None |
|
|
next_header_index_in_model_udop = None |
|
|
next_header_text = None |
|
|
|
|
|
|
|
|
# Clear the next-header variables when this block is the last collected block
# or is the last header in the collected list.
last_header_index = extract_last_header_index(all_blocks_with_indices) |
|
|
if id == len(all_blocks_with_indices) - 1 or id == last_header_index: |
|
|
next_header_bbox = None |
|
|
next_header_centroid = None |
|
|
next_header_page_number = None |
|
|
next_header_index_in_model_udop = None |
|
|
next_header_text = None |
|
|
|
|
|
|
|
|
if current_header_text: |
|
|
# A previous header exists: start fresh accumulators for the new one.
if current_header is not None: |
|
|
current_content = [] |
|
|
current_header_table_content = [] |
|
|
current_header_tree_structure = [] |
|
|
|
|
|
|
|
|
current_header = { |
|
|
"page_number": current_header_page_number, |
|
|
"header_text": current_header_text, |
|
|
"element_id": None, |
|
|
"text_block_id": None |
|
|
} |
|
|
new_start_index = id + 1 |
|
|
# Walk the later collected blocks: stop at a header block, and record every
# table block that is seen.
for new_id,new_block in enumerate(all_blocks_with_indices[new_start_index:], start = new_start_index): |
|
|
# NOTE(review): extracted_df_flag is written below but never read.
extracted_df_flag = False |
|
|
next_block = new_block |
|
|
if next_block and next_block['label_name'] in ['Page-header', 'Section-header']: |
|
|
extracted_df_flag = False |
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
if next_block and next_block['label_name'] in ['Table', "Portfolio-Company-Table"]: |
|
|
extracted_df_flag = True |
|
|
extracted_df = next_block['extracted_text'][0] |
|
|
if next_block["associated_table_header_info"] is not None: |
|
|
extracted_df_table_header = next_block["associated_table_header_info"]['extracted_text'][0] |
|
|
else: |
|
|
extracted_df_table_header = None |
|
|
|
|
|
|
|
|
extracted_df_new = convert_to_dataframe(extracted_df) |
|
|
extracted_df_new_column_headers = extracted_df_new.columns.tolist() |
|
|
# NOTE(review): despite its name this holds CSV text (to_csv), and it is not
# read again in this function.
extracted_df_markdown = extracted_df_new.to_csv(index=False) |
|
|
|
|
|
|
|
|
table_metadata = { 'pdf_name': next_block['pdf_name'] , |
|
|
'table_page_id': next_block['pdf_page_id'], |
|
|
'table_page_id_width' : next_block['page_img_width'], |
|
|
'table_page_id_height': next_block['page_img_height'], |
|
|
'table_bbox' : next_block['bbox'] |
|
|
} |
|
|
|
|
|
|
|
|
table_header_pair = { |
|
|
|
|
|
|
|
|
'label_name':next_block['label_name'], |
|
|
'table_header': extracted_df_table_header, |
|
|
'table_column_header' : extracted_df_new_column_headers, |
|
|
'table_info': extracted_df_new, |
|
|
'metadata' : table_metadata |
|
|
} |
|
|
|
|
|
|
|
|
# NOTE(review): tree_table_header_info is built but never appended or read;
# the tree structure receives next_block itself further down.
tree_table_header_info = { |
|
|
'label_name':'Table-header', |
|
|
|
|
|
|
|
|
'table_header_info': next_block["associated_table_header_info"], |
|
|
'table_column_header' : extracted_df_new_column_headers, |
|
|
'table_info': next_block |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
current_header_table_content.append(table_header_pair) |
|
|
current_header_tree_structure.append(next_block) |
|
|
|
|
|
|
|
|
# NOTE(review): uses the last key of pdfminer_json in insertion order as the
# final page number — presumably keys are inserted in page order; confirm.
last_pdf_page = int(list(pdfminer_json.keys())[-1]) |
|
|
first_append_flag = False |
|
|
first_append_text = " " |
|
|
# Scan pdfminer pages from the header's page through the last page.
for pdf_page_num in range(int(current_header_page_number), last_pdf_page + 1): |
|
|
text_blocks = pdfminer_json.get(str(pdf_page_num), []) |
|
|
start_index = 0 |
|
|
page_content_added = False |
|
|
# Header not yet located in the pdfminer text: find the first text block that
# overlaps the header bbox and start reading from it.
if current_header["element_id"] is None and current_header["text_block_id"] is None: |
|
|
for index, text_block in enumerate(text_blocks): |
|
|
if is_overlapped(text_block['bbox'],current_header_bbox): |
|
|
current_header["element_id"] = text_block["element_id"] |
|
|
current_header["text_block_id"] = text_block["text_block_id"] |
|
|
start_index = index |
|
|
first_append_flag = True |
|
|
break |
|
|
|
|
|
|
|
|
for next_header_index, text_block in enumerate(text_blocks[start_index:], start = start_index): |
|
|
# NOTE(review): last_text_reached_flag is assigned but never read.
last_text_reached_flag = False |
|
|
# The block that overlapped the header: keep only what follows the header
# text (length-based cut done by remove_header_from_start).
if first_append_flag: |
|
|
first_row_text = text_block['text'] |
|
|
first_row_header_text = current_header_text |
|
|
first_append_text = remove_header_from_start(first_row_text,first_row_header_text) |
|
|
current_content.append(first_append_text) |
|
|
page_content_added = True |
|
|
first_append_flag = False |
|
|
continue |
|
|
|
|
|
|
|
|
if next_header_text is not None and pdf_page_num == int(next_header_page_number): |
|
|
next_header_found_flag = False |
|
|
|
|
|
|
|
|
# The text block overlaps the next header: emit the record for the current
# header and stop collecting.
if detect_header(text_block['bbox'], adjusted_model_json, next_header_page_number,next_header_index_in_model_udop): |
|
|
next_header_found_flag = True |
|
|
matched_data.append({ |
|
|
"page_number": current_header["page_number"], |
|
|
"pdf_name" : current_header_pdf_name , |
|
|
"header": current_header["header_text"], |
|
|
"label_name": current_header_type, |
|
|
"content": " ".join(current_content), |
|
|
"table_content" : current_header_table_content, |
|
|
"all_source_pages": content_source_pages |
|
|
}) |
|
|
tree_format_matched_data.append({ |
|
|
"header_page_number": current_header["page_number"], |
|
|
"label_name":current_header_type, |
|
|
'page_block_id' : current_header_page_block_id, |
|
|
"header_bbox": current_header_bbox, |
|
|
"header_page_width":current_header_page_width, |
|
|
"header_page_height": current_header_page_height, |
|
|
"header": current_header["header_text"], |
|
|
"content": " ".join(current_content), |
|
|
'tree_table_content' : current_header_tree_structure |
|
|
}) |
|
|
current_content = [] |
|
|
# NOTE(review): `current_table_content` is assigned here and never read; the
# parallel reset further down assigns `current_header_table_content` instead.
current_table_content = [] |
|
|
current_header_tree_structure = [] |
|
|
next_header_detect_flag = True |
|
|
break |
|
|
|
|
|
|
|
|
# Reached the last text block of the page.
if next_header_index == len(text_blocks) - 1: |
|
|
last_text_block = text_block |
|
|
if not next_header_found_flag and last_text_block: |
|
|
matched_data.append({ |
|
|
"page_number": current_header["page_number"], |
|
|
"pdf_name" : current_header_pdf_name , |
|
|
"header": current_header["header_text"], |
|
|
"label_name": current_header_type, |
|
|
"content": " ".join(current_content), |
|
|
"table_content" : current_header_table_content, |
|
|
"all_source_pages": content_source_pages |
|
|
}) |
|
|
tree_format_matched_data.append({ |
|
|
"header_page_number": current_header["page_number"], |
|
|
# NOTE(review): `currentHeaderType` is not assigned anywhere in this function;
# the other two tree_format_matched_data entries use `current_header_type`.
# Unless the name exists at module level, evaluating this line raises NameError.
"label_name":currentHeaderType, |
|
|
'page_block_id' : current_header_page_block_id, |
|
|
"header_bbox": current_header_bbox, |
|
|
"header_page_width":current_header_page_width, |
|
|
"header_page_height": current_header_page_height, |
|
|
"header": current_header["header_text"], |
|
|
"content": " ".join(current_content), |
|
|
'tree_table_content' : current_header_tree_structure |
|
|
}) |
|
|
current_content = [] |
|
|
current_header_table_content = [] |
|
|
current_header_tree_structure = [] |
|
|
next_header_detect_flag = True |
|
|
next_header_found_flag = True |
|
|
break |
|
|
|
|
|
|
|
|
current_content.append(text_block['text']) |
|
|
page_content_added = True |
|
|
if next_header_detect_flag: |
|
|
break |
|
|
|
|
|
|
|
|
|
|
|
# Record each page that contributed text only once.
if page_content_added and pdf_page_num not in content_source_pages: |
|
|
content_source_pages.append(pdf_page_num) |
|
|
|
|
|
|
|
|
if next_header_detect_flag: |
|
|
break |
|
|
|
|
|
|
|
|
# No following header was recorded for this header.
if next_header_text is None and next_header_page_number is None: |
|
|
current_header = { |
|
|
"page_number": current_header_page_number, |
|
|
"header_text": current_header_text, |
|
|
"element_id": None, |
|
|
"text_block_id": None |
|
|
} |
|
|
|
|
|
|
|
|
for pdf_page_num in range(int(current_header_page_number), last_pdf_page + 1): |
|
|
text_blocks = pdfminer_json.get(str(pdf_page_num), []) |
|
|
start_index = 0 |
|
|
page_content_added = False |
|
|
if current_header["element_id"] is None and current_header["text_block_id"] is None: |
|
|
for index, text_block in enumerate(text_blocks): |
|
|
if is_overlapped(text_block['bbox'],current_header_bbox): |
|
|
current_header["element_id"] = text_block["element_id"] |
|
|
current_header["text_block_id"] = text_block["text_block_id"] |
|
|
start_index = index |
|
|
first_append_flag = True |
|
|
break |
|
|
|
|
|
|
|
|
# NOTE(review): the only append to current_content in this section is the
# first-row remainder below; no other text_block['text'] is appended here.
for no_header_index, text_block in enumerate(text_blocks[start_index:], start=start_index): |
|
|
if first_append_flag: |
|
|
first_row_text = text_block['text'] |
|
|
first_row_header_text = current_header_text |
|
|
first_append_text = remove_header_from_start(first_row_text,first_row_header_text) |
|
|
current_content.append(first_append_text) |
|
|
page_content_added = True |
|
|
first_append_flag = False |
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
if page_content_added and pdf_page_num not in content_source_pages: |
|
|
content_source_pages.append(pdf_page_num) |
|
|
|
|
|
|
|
|
matched_data.append({ |
|
|
"page_number": current_header["page_number"], |
|
|
"pdf_name" : current_header_pdf_name , |
|
|
"header": current_header["header_text"], |
|
|
"label_name": current_header_type, |
|
|
"content": " ".join(current_content), |
|
|
"table_content" : current_header_table_content, |
|
|
"all_source_pages": content_source_pages |
|
|
}) |
|
|
tree_format_matched_data.append({ |
|
|
"header_page_number": current_header["page_number"], |
|
|
"label_name": current_header_type, |
|
|
'page_block_id' : current_header_page_block_id, |
|
|
"header_bbox": current_header_bbox, |
|
|
"header_page_width":current_header_page_width, |
|
|
"header_page_height": current_header_page_height, |
|
|
"header": current_header["header_text"], |
|
|
"content": " ".join(current_content), |
|
|
'tree_table_content' : current_header_tree_structure |
|
|
}) |
|
|
|
|
|
|
|
|
return matched_data,tree_format_matched_data |
|
|
|
|
|
def main_header_pipeline(modified_udop_json, pdfminer_json):
    """Run bbox rescaling followed by header/text matching.

    The model output is first passed through
    ``adjust_page_dimensions_and_bbox`` and the result is handed to
    ``match_headers_with_text`` together with ``pdfminer_json``.

    Returns:
        A tuple ``(df, tree_format_matched_data)`` where ``df`` is a
        ``pandas.DataFrame`` built from the flat match records and the second
        element is the tree-format list returned by the matcher.
    """
    rescaled_json = adjust_page_dimensions_and_bbox(modified_udop_json, pdfminer_json)
    flat_records, tree_records = match_headers_with_text(rescaled_json, pdfminer_json)
    return pd.DataFrame(flat_records), tree_records
|
|
|
|
|
|
|
|
|