# table_test / post_processing_v2 (1).py
# Kushalguptaiitb's picture
# Upload post_processing_v2 (1).py
# 0538136 verified
import json
import pandas as pd
def read_json(json_file):
    """Load a UTF-8 encoded JSON file and return its parsed contents.

    Args:
        json_file: Path of the JSON file to read.

    Returns:
        The object produced by ``json.load`` for the file.
    """
    with open(json_file, mode='r', encoding='utf-8') as handle:
        parsed = json.load(handle)
    return parsed
def adjust_page_dimensions_and_bbox(modified_model_output_json, pdfminer_json):
    """Rescale model blocks to the page size reported in ``pdfminer_json``.

    For every page key present in both mappings, the page size is taken from
    the first pdfminer entry of that page and each block's ``bbox`` and
    ``page_img_width`` / ``page_img_height`` are rewritten to that size.
    Pages missing from ``pdfminer_json`` are left untouched; pages whose
    pdfminer entry list is empty are reported on stdout and left untouched.

    Args:
        modified_model_output_json: Mapping of page key to a list of block
            dicts with ``page_img_width``, ``page_img_height`` and ``bbox``.
        pdfminer_json: Mapping of page key to a list of dicts whose first
            element carries ``page_width`` and ``page_height``.

    Returns:
        The same ``modified_model_output_json`` object, modified in place.
    """
    for page_number, blocks in modified_model_output_json.items():
        if page_number not in pdfminer_json:
            continue
        page_entries = pdfminer_json[page_number]
        if not page_entries:
            print(f"Page {page_number} is empty.")
            continue

        first_entry = page_entries[0]
        target_width = first_entry['page_width']
        target_height = first_entry['page_height']

        for block in blocks:
            source_width = block['page_img_width']
            source_height = block['page_img_height']
            scale_x = target_width / source_width
            scale_y = target_height / source_height

            block['page_img_width'] = target_width
            block['page_img_height'] = target_height

            box = block['bbox']
            # x coordinates scale with the width ratio, y with the height ratio.
            block['bbox'] = [
                box[0] * scale_x,
                box[1] * scale_y,
                box[2] * scale_x,
                box[3] * scale_y,
            ]
    return modified_model_output_json
def convert_to_dataframe(extracted_df):
    """Coerce an extracted table payload into a ``pandas.DataFrame``.

    Conversion rules, checked in this order:

    * a ``DataFrame`` is returned unchanged (same object);
    * a dict whose values are all lists is treated as column -> values;
    * any other dict becomes a single-row frame;
    * a list whose items are all dicts becomes one row per dict;
    * any other list becomes a single ``'Value'`` column;
    * anything else becomes a one-row frame with a ``'Value'`` column.

    Args:
        extracted_df: The payload to convert.

    Returns:
        A ``pandas.DataFrame`` built according to the rules above.
    """
    if isinstance(extracted_df, pd.DataFrame):
        return extracted_df

    if isinstance(extracted_df, dict):
        columns_are_lists = all(
            isinstance(column, list) for column in extracted_df.values()
        )
        frame_source = extracted_df if columns_are_lists else [extracted_df]
        return pd.DataFrame(frame_source)

    if isinstance(extracted_df, list):
        only_records = all(isinstance(entry, dict) for entry in extracted_df)
        if only_records:
            return pd.DataFrame(extracted_df)
        return pd.DataFrame(extracted_df, columns=['Value'])

    return pd.DataFrame([extracted_df], columns=['Value'])
def calculate_centroid(bbox):
    """Return the centre point of a bounding box.

    Args:
        bbox: Four values unpacked as ``(x1, y1, x2, y2)``.

    Returns:
        A tuple ``(x_center, y_center)`` holding the midpoint of each axis.
    """
    left, top, right, bottom = bbox
    return ((left + right) / 2, (top + bottom) / 2)
def is_within_radius(text_block_bbox, header_bbox, radius=50):
    """Return True when the two boxes intersect with a positive area.

    Despite the name, the check is a plain rectangle-intersection test: the
    ``radius`` argument is accepted but does not take part in the computation.
    Boxes that merely touch along an edge or corner do not count.

    Args:
        text_block_bbox: Four values unpacked as ``(xmin, ymin, xmax, ymax)``.
        header_bbox: Four values unpacked as ``(xmin, ymin, xmax, ymax)``.
        radius: Unused.

    Returns:
        ``True`` if the boxes overlap on both axes, otherwise ``False``.
    """
    t_left, t_top, t_right, t_bottom = text_block_bbox
    h_left, h_top, h_right, h_bottom = header_bbox

    # Extent shared by the two boxes on each axis, clamped at zero.
    shared_width = max(0, min(t_right, h_right) - max(t_left, h_left))
    shared_height = max(0, min(t_bottom, h_bottom) - max(t_top, h_top))

    return bool(shared_width > 0 and shared_height > 0)
def is_overlapped(text_block_bbox, header_bbox, threshold=0.20):
    """Return True when two boxes overlap by more than ``threshold``.

    The overlap ratio is the intersection area divided by the area of the
    smaller of the two boxes, so a small box lying fully inside a large one
    scores 1.0.

    Args:
        text_block_bbox: Four values unpacked as ``(xmin, ymin, xmax, ymax)``.
        header_bbox: Four values unpacked as ``(xmin, ymin, xmax, ymax)``.
        threshold: Ratio that must be strictly exceeded for a match.

    Returns:
        ``True`` if the overlap ratio is greater than ``threshold``; ``False``
        otherwise, including when either box has no positive area.
    """
    text_xmin, text_ymin, text_xmax, text_ymax = text_block_bbox
    header_xmin, header_ymin, header_xmax, header_ymax = header_bbox

    # Extent shared by the two boxes on each axis, clamped at zero.
    overlap_x = max(0, min(text_xmax, header_xmax) - max(text_xmin, header_xmin))
    overlap_y = max(0, min(text_ymax, header_ymax) - max(text_ymin, header_ymin))
    overlap_area = overlap_x * overlap_y

    text_area = (text_xmax - text_xmin) * (text_ymax - text_ymin)
    header_area = (header_xmax - header_xmin) * (header_ymax - header_ymin)
    smaller_area = min(text_area, header_area)

    # A degenerate (zero-width / zero-height) box cannot overlap anything by a
    # meaningful ratio; bail out instead of dividing by zero.
    if smaller_area <= 0:
        return False

    return bool(overlap_area / smaller_area > threshold)
def detect_header(text_block_bbox, adjusted_model_output_json, page_number, next_header_index_in_model_udop):
    """Tell whether a text block overlaps a specific header block of a page.

    Args:
        text_block_bbox: Bounding box of the text block being examined.
        adjusted_model_output_json: Mapping of page key (string) to the list
            of model blocks on that page.
        page_number: Page holding the header; converted with ``str`` before
            the lookup.
        next_header_index_in_model_udop: Position of the header inside the
            page's block list, or ``None`` when there is no header to check.

    Returns:
        ``True`` if the page exists, an index was given, and ``is_overlapped``
        reports a match between the text block and that header's ``bbox``;
        ``False`` otherwise.
    """
    page_key = str(page_number)
    if page_key not in adjusted_model_output_json or next_header_index_in_model_udop is None:
        return False

    header_block = adjusted_model_output_json[page_key][int(next_header_index_in_model_udop)]
    return bool(is_overlapped(text_block_bbox, header_block['bbox']))
def remove_header_from_start(first_row_text: str, first_row_header_text: str) -> str:
    """Drop a header-sized prefix from a text and trim the remainder.

    The first ``len(first_row_header_text)`` characters are removed by
    position; the function does not check that the text actually begins with
    the header.

    Args:
        first_row_text: Text that is expected to start with the header.
        first_row_header_text: Header whose length determines the cut.

    Returns:
        The remaining text with leading and trailing whitespace stripped.
    """
    remainder = first_row_text[len(first_row_header_text):]
    return remainder.strip()
def extract_last_header_index(all_blocks_with_indices):
    """Find the position of the last header block in a list of blocks.

    Args:
        all_blocks_with_indices: Sequence of block dicts with a
            ``label_name`` entry.

    Returns:
        The index of the last block labelled ``'Page-header'`` or
        ``'Section-header'``, or ``-1`` when there is none.
    """
    header_labels = ('Page-header', 'Section-header')
    position = len(all_blocks_with_indices)
    # Walk backwards so the first hit is the last header in list order.
    while position > 0:
        position -= 1
        if all_blocks_with_indices[position]['label_name'] in header_labels:
            return position
    return -1
_HEADER_LABELS = ('Page-header', 'Section-header')
_TABLE_LABELS = ('Table', 'Portfolio-Company-Table')


def _first_extracted_text(block):
    """Return the first entry of ``block['extracted_text']``, or "" if it is empty."""
    return block['extracted_text'][0] if block['extracted_text'] else ""


def _build_table_entry(table_block):
    """Convert a table block into the flat table record stored under a header."""
    table_df = convert_to_dataframe(table_block['extracted_text'][0])
    header_info = table_block["associated_table_header_info"]
    table_header = header_info['extracted_text'][0] if header_info is not None else None
    return {
        'label_name': table_block['label_name'],
        'table_header': table_header,
        'table_column_header': table_df.columns.tolist(),
        'table_info': table_df,
        'metadata': {
            'pdf_name': table_block['pdf_name'],
            'table_page_id': table_block['pdf_page_id'],
            'table_page_id_width': table_block['page_img_width'],
            'table_page_id_height': table_block['page_img_height'],
            'table_bbox': table_block['bbox'],
        },
    }


def _build_section_records(header_block, header_text, content_parts,
                           table_entries, tree_tables, source_pages):
    """Build the (flat, tree) output records for one finished header section."""
    content = " ".join(content_parts)
    flat_record = {
        "page_number": header_block['pdf_page_id'],
        "pdf_name": header_block['pdf_name'],
        "header": header_text,
        "label_name": header_block['label_name'],
        "content": content,
        "table_content": table_entries,
        "all_source_pages": source_pages,
    }
    tree_record = {
        "header_page_number": header_block['pdf_page_id'],
        "label_name": header_block['label_name'],
        'page_block_id': header_block['page_block_id'],
        "header_bbox": header_block['bbox'],
        "header_page_width": header_block['page_img_width'],
        "header_page_height": header_block['page_img_height'],
        "header": header_text,
        "content": content,
        'tree_table_content': tree_tables,
    }
    return flat_record, tree_record


def match_headers_with_text(adjusted_model_json, pdfminer_json):
    """Attach pdfminer text and model tables to each detected header.

    Header and table blocks are collected from ``adjusted_model_json`` in
    ascending page order (page keys are compared with ``int``). For every
    header block with non-empty text:

    * table blocks listed between this header and the next header are
      converted and attached to it;
    * pdfminer text blocks are read page by page, starting at the text block
      that overlaps the header (whose header-length prefix is cut off), and
      appended to the section content;
    * the section is closed when a text block on the next header's page
      overlaps that header, or when the last text block of that page is
      reached without a match. The closing text block is not added;
    * the last header (no header follows) collects text up to the final
      pdfminer page and is then closed.

    Headers whose extracted text is empty are skipped. A header that has a
    successor but is never closed by the rules above yields no record.

    Side effect: every collected block gets a ``used_model_index`` key
    holding its position in its page's block list.

    Args:
        adjusted_model_json: Mapping of page key to a list of block dicts.
            Header blocks are read for ``label_name``, ``bbox``,
            ``pdf_page_id``, ``extracted_text``, ``page_img_width``,
            ``page_img_height``, ``page_block_id`` and ``pdf_name``; table
            blocks additionally for ``associated_table_header_info``.
        pdfminer_json: Mapping of page key to a list of text-block dicts with
            ``bbox`` and ``text``. The last key is taken as the final page
            number (assumes keys are in ascending page order; confirm with
            the producer of this JSON). An empty mapping means no text pages.

    Returns:
        A tuple ``(matched_data, tree_format_matched_data)`` of two parallel
        lists of dicts, one entry per closed header section.
    """
    matched_data = []
    tree_format_matched_data = []

    # Flatten headers and tables into a single list in page order, remembering
    # each block's index within its page for the later detect_header lookup.
    all_blocks_with_indices = []
    for _, blocks in sorted(adjusted_model_json.items(), key=lambda item: int(item[0])):
        for index, block in enumerate(blocks):
            label = block['label_name']
            if label in _HEADER_LABELS or label in _TABLE_LABELS:
                block['used_model_index'] = index
                all_blocks_with_indices.append(block)

    # Exclusive upper bound of the page scan; 0 yields an empty range when
    # there is no pdfminer data at all.
    page_stop = int(list(pdfminer_json)[-1]) + 1 if pdfminer_json else 0

    for position, block in enumerate(all_blocks_with_indices):
        if block['label_name'] not in _HEADER_LABELS:
            continue
        header_text = _first_extracted_text(block)
        if not header_text:
            continue

        header_bbox = block['bbox']
        content_parts = []
        table_entries = []
        tree_tables = []
        # Shared with the emitted record, so pages appended after the record
        # is built are still visible in it.
        source_pages = []

        # Everything up to the next header belongs to this header; the next
        # header itself marks where the text scan has to stop.
        next_header = None
        for following in all_blocks_with_indices[position + 1:]:
            if following['label_name'] in _HEADER_LABELS:
                next_header = following
                break
            if following['label_name'] in _TABLE_LABELS:
                table_entries.append(_build_table_entry(following))
                tree_tables.append(following)

        header_located = False
        strip_header_pending = False
        section_closed = False

        for pdf_page_num in range(int(block['pdf_page_id']), page_stop):
            text_blocks = pdfminer_json.get(str(pdf_page_num), [])
            start_index = 0
            page_content_added = False

            # Until the header's own text block is found, look for it on each
            # page and start reading from there.
            if not header_located:
                for index, text_block in enumerate(text_blocks):
                    if is_overlapped(text_block['bbox'], header_bbox):
                        header_located = True
                        strip_header_pending = True
                        start_index = index
                        break

            on_next_header_page = (
                next_header is not None
                and pdf_page_num == int(next_header['pdf_page_id'])
            )

            for text_index, text_block in enumerate(text_blocks[start_index:], start=start_index):
                if strip_header_pending:
                    # The block overlapping the header may also carry body
                    # text; keep what follows the header-length prefix.
                    content_parts.append(
                        remove_header_from_start(text_block['text'], header_text)
                    )
                    page_content_added = True
                    strip_header_pending = False
                    continue

                if on_next_header_page:
                    reached_next_header = detect_header(
                        text_block['bbox'],
                        adjusted_model_json,
                        next_header['pdf_page_id'],
                        next_header['used_model_index'],
                    )
                    # Fallback: if the next header is never matched, close the
                    # section at the last text block of its page.
                    if reached_next_header or text_index == len(text_blocks) - 1:
                        flat_record, tree_record = _build_section_records(
                            block, header_text, content_parts,
                            table_entries, tree_tables, source_pages,
                        )
                        matched_data.append(flat_record)
                        tree_format_matched_data.append(tree_record)
                        section_closed = True
                        break

                content_parts.append(text_block['text'])
                page_content_added = True

            if page_content_added and pdf_page_num not in source_pages:
                source_pages.append(pdf_page_num)
            if section_closed:
                break

        # The last header has no successor to close it; emit it once the
        # remaining pages have been read.
        if next_header is None:
            flat_record, tree_record = _build_section_records(
                block, header_text, content_parts,
                table_entries, tree_tables, source_pages,
            )
            matched_data.append(flat_record)
            tree_format_matched_data.append(tree_record)

    return matched_data, tree_format_matched_data
def main_header_pipeline(modified_udop_json, pdfminer_json):
    """Run the header post-processing: rescale blocks, then match text to headers.

    Args:
        modified_udop_json: Model output passed to
            ``adjust_page_dimensions_and_bbox`` and then to
            ``match_headers_with_text``.
        pdfminer_json: pdfminer output passed to both of those functions.

    Returns:
        A tuple ``(df, tree_format_matched_data)`` where ``df`` is a
        ``pandas.DataFrame`` built from the flat matched records and the
        second element is the tree-format list returned by
        ``match_headers_with_text``.
    """
    rescaled_json = adjust_page_dimensions_and_bbox(modified_udop_json, pdfminer_json)
    flat_records, tree_records = match_headers_with_text(rescaled_json, pdfminer_json)
    return pd.DataFrame(flat_records), tree_records