import json

import pandas as pd

# Labels of layout-model blocks that open a section, and of blocks that carry
# table payloads. Only blocks with one of these labels take part in matching.
_HEADER_LABELS = ('Page-header', 'Section-header')
_TABLE_LABELS = ('Table', 'Portfolio-Company-Table')


def read_json(json_file):
    """Load and return the JSON document stored at *json_file* (read as UTF-8)."""
    with open(json_file, 'r', encoding='utf-8') as file:
        return json.load(file)


def adjust_page_dimensions_and_bbox(modified_model_output_json, pdfminer_json):
    """Rescale model blocks to the page size reported by pdfminer.

    For every page key present in both inputs, the page width/height is read
    from the first pdfminer text block of that page, and each model block's
    ``bbox``, ``page_img_width`` and ``page_img_height`` are rewritten in
    place to that coordinate system.

    Args:
        modified_model_output_json: Mapping of page key -> list of model
            blocks; each block has ``bbox``, ``page_img_width`` and
            ``page_img_height``.
        pdfminer_json: Mapping of page key -> list of text blocks; the first
            block of a page has ``page_width`` and ``page_height``.

    Returns:
        The same ``modified_model_output_json`` object, mutated in place.
    """
    for page_number, blocks in modified_model_output_json.items():
        if page_number not in pdfminer_json:
            continue

        page_text_blocks = pdfminer_json[page_number]
        if not page_text_blocks:
            print(f"Page {page_number} is empty.")
            continue

        page_info = page_text_blocks[0]
        page_width = page_info['page_width']
        page_height = page_info['page_height']

        for block in blocks:
            width_scale = page_width / block['page_img_width']
            height_scale = page_height / block['page_img_height']
            bbox = block['bbox']

            block['page_img_width'] = page_width
            block['page_img_height'] = page_height
            block['bbox'] = [
                bbox[0] * width_scale,
                bbox[1] * height_scale,
                bbox[2] * width_scale,
                bbox[3] * height_scale,
            ]

    return modified_model_output_json


def convert_to_dataframe(extracted_df):
    """Coerce an extracted table payload into a ``pandas.DataFrame``.

    * A DataFrame is returned unchanged.
    * A dict whose values are all lists becomes a column-oriented frame; any
      other dict becomes a single-row frame.
    * A list whose items are all dicts becomes a row-oriented frame; any other
      list becomes a single ``'Value'`` column.
    * Anything else becomes a one-row, one-column ``'Value'`` frame.
    """
    if isinstance(extracted_df, pd.DataFrame):
        return extracted_df

    if isinstance(extracted_df, dict):
        column_oriented = all(
            isinstance(value, list) for value in extracted_df.values()
        )
        return pd.DataFrame(extracted_df if column_oriented else [extracted_df])

    if isinstance(extracted_df, list):
        if all(isinstance(item, dict) for item in extracted_df):
            return pd.DataFrame(extracted_df)
        return pd.DataFrame(extracted_df, columns=['Value'])

    return pd.DataFrame([extracted_df], columns=['Value'])


def calculate_centroid(bbox):
    """Return the ``(x, y)`` centre of an ``[x1, y1, x2, y2]`` bounding box."""
    x1, y1, x2, y2 = bbox
    return ((x1 + x2) / 2, (y1 + y2) / 2)


def is_within_radius(text_block_bbox, header_bbox, radius=50):
    """Return True when the two boxes overlap with positive area.

    NOTE: ``radius`` is accepted for interface compatibility but is not used;
    the check is a plain rectangle-intersection test.
    """
    text_xmin, text_ymin, text_xmax, text_ymax = text_block_bbox
    header_xmin, header_ymin, header_xmax, header_ymax = header_bbox

    overlap_x = max(0, min(text_xmax, header_xmax) - max(text_xmin, header_xmin))
    overlap_y = max(0, min(text_ymax, header_ymax) - max(text_ymin, header_ymin))
    return overlap_x > 0 and overlap_y > 0


def is_overlapped(text_block_bbox, header_bbox, threshold=0.20):
    """Return True when the boxes overlap by more than *threshold*.

    The overlap ratio is the intersection area divided by the area of the
    smaller of the two boxes.

    Args:
        text_block_bbox: ``[xmin, ymin, xmax, ymax]`` of the text block.
        header_bbox: ``[xmin, ymin, xmax, ymax]`` of the header block.
        threshold: Ratio that must be strictly exceeded.

    Returns:
        bool. Boxes with a non-positive area never overlap (previously a
        zero-area box raised ``ZeroDivisionError``).
    """
    text_xmin, text_ymin, text_xmax, text_ymax = text_block_bbox
    header_xmin, header_ymin, header_xmax, header_ymax = header_bbox

    overlap_x = max(0, min(text_xmax, header_xmax) - max(text_xmin, header_xmin))
    overlap_y = max(0, min(text_ymax, header_ymax) - max(text_ymin, header_ymin))
    overlap_area = overlap_x * overlap_y

    text_area = (text_xmax - text_xmin) * (text_ymax - text_ymin)
    header_area = (header_xmax - header_xmin) * (header_ymax - header_ymin)
    smaller_area = min(text_area, header_area)

    # A degenerate (zero-area or inverted) box cannot meaningfully overlap.
    if smaller_area <= 0:
        return False

    return overlap_area / smaller_area > threshold


def detect_header(text_block_bbox, adjusted_model_output_json, page_number, next_header_index_in_model_udop):
    """Tell whether a text block overlaps a specific model header block.

    Args:
        text_block_bbox: Bounding box of the pdfminer text block.
        adjusted_model_output_json: Mapping of page key (str) -> model blocks.
        page_number: Page of the header block; looked up as ``str(page_number)``.
        next_header_index_in_model_udop: Index of the header block within its
            page's block list, or ``None``.

    Returns:
        True when the page exists, an index was given, and the text block
        overlaps that header block according to ``is_overlapped``.
    """
    if next_header_index_in_model_udop is None:
        return False

    page_key = str(page_number)
    if page_key not in adjusted_model_output_json:
        return False

    header_block = adjusted_model_output_json[page_key][int(next_header_index_in_model_udop)]
    return is_overlapped(text_block_bbox, header_block['bbox'])


def remove_header_from_start(first_row_text: str, first_row_header_text: str) -> str:
    """Drop the first ``len(header)`` characters of the text and strip the rest.

    The header is removed by length only; the function does not check that the
    text actually starts with the header.
    """
    return first_row_text[len(first_row_header_text):].strip()


def extract_last_header_index(all_blocks_with_indices):
    """Return the index of the last header block in the list, or -1 if none."""
    for index in range(len(all_blocks_with_indices) - 1, -1, -1):
        if all_blocks_with_indices[index]['label_name'] in _HEADER_LABELS:
            return index
    return -1


def _first_extracted_text(block):
    """Return the first ``extracted_text`` entry of *block*, or "" if empty."""
    texts = block['extracted_text']
    return texts[0] if texts else ""


def _last_page_number(pdfminer_json):
    """Return the highest numeric page key of *pdfminer_json*, or -1 if none."""
    numeric_pages = [int(key) for key in pdfminer_json if str(key).isdigit()]
    return max(numeric_pages, default=-1)


def _collect_tables(following_blocks):
    """Gather table blocks that appear before the next header block.

    Returns a ``(table_content, tree_structure)`` pair: the flat per-table
    summaries and the raw table blocks, in document order.
    """
    table_content = []
    tree_structure = []

    for candidate in following_blocks:
        label = candidate['label_name']
        if label in _HEADER_LABELS:
            break
        if label not in _TABLE_LABELS:
            continue

        header_info = candidate["associated_table_header_info"]
        table_header = None if header_info is None else header_info['extracted_text'][0]
        table_df = convert_to_dataframe(candidate['extracted_text'][0])

        table_content.append({
            'label_name': label,
            'table_header': table_header,
            'table_column_header': table_df.columns.tolist(),
            'table_info': table_df,
            'metadata': {
                'pdf_name': candidate['pdf_name'],
                'table_page_id': candidate['pdf_page_id'],
                'table_page_id_width': candidate['page_img_width'],
                'table_page_id_height': candidate['page_img_height'],
                'table_bbox': candidate['bbox'],
            },
        })
        tree_structure.append(candidate)

    return table_content, tree_structure


def _collect_section_text(header_block, header_text, next_header, adjusted_model_json, pdfminer_json, last_pdf_page):
    """Collect the pdfminer text that belongs to one header's section.

    Pages are scanned from the header's page up to *last_pdf_page*. Scanning
    starts at the text block overlapping the header (its text is added with
    the header's length removed) and stops at the next header.

    Returns:
        ``(content, source_pages, closed)`` where *content* is the list of
        text fragments, *source_pages* the pages that contributed text, and
        *closed* tells whether the end of the section was reached on the next
        header's page.
    """
    header_bbox = header_block['bbox']
    content = []
    source_pages = []
    closed = False

    # Identity of the pdfminer block matched to the header; the header is
    # searched for again on each page until one of them is set.
    header_element_id = None
    header_text_block_id = None
    strip_header_pending = False

    next_header_text = None
    next_header_page = None
    next_header_index = None
    if next_header is not None:
        next_header_text = _first_extracted_text(next_header)
        next_header_page = next_header['pdf_page_id']
        next_header_index = next_header['used_model_index']

    for pdf_page_num in range(int(header_block['pdf_page_id']), last_pdf_page + 1):
        text_blocks = pdfminer_json.get(str(pdf_page_num), [])
        start_index = 0
        page_content_added = False

        if header_element_id is None and header_text_block_id is None:
            for index, text_block in enumerate(text_blocks):
                if is_overlapped(text_block['bbox'], header_bbox):
                    header_element_id = text_block["element_id"]
                    header_text_block_id = text_block["text_block_id"]
                    start_index = index
                    strip_header_pending = True
                    break

        on_next_header_page = (
            next_header_text is not None
            and pdf_page_num == int(next_header_page)
        )
        last_index = len(text_blocks) - 1

        for index, text_block in enumerate(text_blocks[start_index:], start=start_index):
            if strip_header_pending:
                # First block of the section: keep only what follows the header.
                content.append(remove_header_from_start(text_block['text'], header_text))
                page_content_added = True
                strip_header_pending = False
                continue

            if on_next_header_page:
                next_header_hit = detect_header(
                    text_block['bbox'], adjusted_model_json,
                    next_header_page, next_header_index,
                )
                # NOTE(review): when the next header is never located on its
                # page, the section is closed at the page's last text block and
                # that block's text is not added - confirm this is intended.
                if next_header_hit or index == last_index:
                    closed = True
                    break

            content.append(text_block['text'])
            page_content_added = True

        if page_content_added and pdf_page_num not in source_pages:
            source_pages.append(pdf_page_num)
        if closed:
            break

    return content, source_pages, closed


def _build_section_records(header_block, header_text, content, table_content, tree_structure, source_pages):
    """Build the flat record and the tree record describing one section."""
    joined_content = " ".join(content)
    flat_record = {
        "page_number": header_block['pdf_page_id'],
        "pdf_name": header_block['pdf_name'],
        "header": header_text,
        "label_name": header_block['label_name'],
        "content": joined_content,
        "table_content": table_content,
        "all_source_pages": source_pages,
    }
    tree_record = {
        "header_page_number": header_block['pdf_page_id'],
        "label_name": header_block['label_name'],
        'page_block_id': header_block['page_block_id'],
        "header_bbox": header_block['bbox'],
        "header_page_width": header_block['page_img_width'],
        "header_page_height": header_block['page_img_height'],
        "header": header_text,
        "content": joined_content,
        'tree_table_content': tree_structure,
    }
    return flat_record, tree_record


def match_headers_with_text(adjusted_model_json, pdfminer_json):
    """Pair every header block with the text and tables that follow it.

    Header and table blocks are taken from *adjusted_model_json* in page
    order; each selected block gets a ``used_model_index`` key (its position
    within its page) written onto it. For each header with non-empty text,
    the tables up to the next header and the pdfminer text up to the next
    header are gathered into one section.

    Args:
        adjusted_model_json: Mapping of page key -> model blocks, with bboxes
            in the same coordinate system as *pdfminer_json*.
        pdfminer_json: Mapping of page key -> pdfminer text blocks (each with
            ``bbox``, ``text``, ``element_id`` and ``text_block_id``).

    Returns:
        ``(matched_data, tree_format_matched_data)``: two parallel lists of
        dicts, a flat record and a tree record per emitted section.
    """
    matched_data = []
    tree_format_matched_data = []

    pages_in_order = sorted(adjusted_model_json.items(), key=lambda item: int(item[0]))
    all_blocks_with_indices = []
    for _page_key, blocks in pages_in_order:
        for index, block in enumerate(blocks):
            if block['label_name'] in _HEADER_LABELS or block['label_name'] in _TABLE_LABELS:
                block['used_model_index'] = index
                all_blocks_with_indices.append(block)

    last_pdf_page = _last_page_number(pdfminer_json)

    for position, block in enumerate(all_blocks_with_indices):
        if block['label_name'] not in _HEADER_LABELS:
            continue

        header_text = _first_extracted_text(block)
        if not header_text:
            # Headers without extracted text do not open a section.
            continue

        following_blocks = all_blocks_with_indices[position + 1:]
        next_header = next(
            (later for later in following_blocks if later['label_name'] in _HEADER_LABELS),
            None,
        )

        table_content, tree_structure = _collect_tables(following_blocks)
        content, source_pages, closed = _collect_section_text(
            block, header_text, next_header,
            adjusted_model_json, pdfminer_json, last_pdf_page,
        )

        # NOTE(review): a section that has a following header but was never
        # closed (e.g. the next header's page has no text blocks) is not
        # emitted, as before - confirm whether it should be.
        if closed or next_header is None:
            flat_record, tree_record = _build_section_records(
                block, header_text, content,
                table_content, tree_structure, source_pages,
            )
            matched_data.append(flat_record)
            tree_format_matched_data.append(tree_record)

    return matched_data, tree_format_matched_data


def main_header_pipeline(modified_udop_json, pdfminer_json):
    """Run bbox rescaling followed by header/text matching.

    Returns:
        ``(df, tree_format_matched_data)`` where *df* is a DataFrame built
        from the flat section records.
    """
    modified_udop_json = adjust_page_dimensions_and_bbox(modified_udop_json, pdfminer_json)
    matched_data, tree_format_matched_data = match_headers_with_text(modified_udop_json, pdfminer_json)
    df = pd.DataFrame(matched_data)
    return df, tree_format_matched_data