import io import json import os import re import time from pathlib import Path from typing import Any, Dict, List, Tuple import boto3 import pandas as pd import pikepdf from tools.config import ( AWS_ACCESS_KEY, AWS_REGION, AWS_SECRET_KEY, PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS, RUN_AWS_FUNCTIONS, SPLIT_PUNCTUATION_FROM_WORDS, ) from tools.custom_image_analyser_engine import CustomImageRecognizerResult, OCRResult from tools.helper_functions import _generate_unique_ids from tools.secure_path_utils import secure_file_read def extract_textract_metadata(response: object): """Extracts metadata from an AWS Textract response.""" request_id = response["ResponseMetadata"]["RequestId"] pages = response["DocumentMetadata"]["Pages"] return str({"RequestId": request_id, "Pages": pages}) def analyse_page_with_textract( pdf_page_bytes: object, page_no: int, client: str = "", handwrite_signature_checkbox: List[str] = ["Extract handwriting"], textract_output_found: bool = False, aws_access_question_textbox: str = AWS_ACCESS_KEY, aws_secret_question_textbox: str = AWS_SECRET_KEY, RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS, PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS: bool = PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS, ): """ Analyzes a single page of a document using AWS Textract to extract text and other features. Args: pdf_page_bytes (object): The content of the PDF page or image as bytes. page_no (int): The page number being analyzed. client (str, optional): An optional pre-initialized AWS Textract client. If not provided, the function will attempt to create one based on configuration. Defaults to "". handwrite_signature_checkbox (List[str], optional): A list of feature types to extract from the document. Options include "Extract handwriting", "Extract signatures", "Extract forms", "Extract layout", "Extract tables". Defaults to ["Extract handwriting"]. textract_output_found (bool, optional): A flag indicating whether existing Textract output for the document has been found. This can prevent unnecessary API calls. Defaults to False. aws_access_question_textbox (str, optional): AWS access question provided by the user, if not using SSO or environment variables. Defaults to AWS_ACCESS_KEY. aws_secret_question_textbox (str, optional): AWS secret question provided by the user, if not using SSO or environment variables. Defaults to AWS_SECRET_KEY. RUN_AWS_FUNCTIONS (bool, optional): Configuration flag to enable or disable AWS functions. Defaults to RUN_AWS_FUNCTIONS. PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS (bool, optional): Configuration flag (e.g., True or False) to prioritize AWS SSO credentials over environment variables. Defaults to True. Returns: Tuple[List[Dict], str]: A tuple containing: - A list of dictionaries, where each dictionary represents a Textract block (e.g., LINE, WORD, FORM, TABLE). - A string containing metadata about the Textract request. """ # print("handwrite_signature_checkbox in analyse_page_with_textract:", handwrite_signature_checkbox) if client == "": try: # Try to connect to AWS Textract Client if using that text extraction method if RUN_AWS_FUNCTIONS and PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS: print("Connecting to Textract via existing SSO connection") client = boto3.client("textract", region_name=AWS_REGION) elif aws_access_question_textbox and aws_secret_question_textbox: print( "Connecting to Textract using AWS access question and secret questions from user input." ) client = boto3.client( "textract", aws_access_question_id=aws_access_question_textbox, aws_secret_access_question=aws_secret_question_textbox, region_name=AWS_REGION, ) elif RUN_AWS_FUNCTIONS is True: print("Connecting to Textract via existing SSO connection") client = boto3.client("textract", region_name=AWS_REGION) elif AWS_ACCESS_KEY and AWS_SECRET_KEY: print("Getting Textract credentials from environment variables.") client = boto3.client( "textract", aws_access_question_id=AWS_ACCESS_KEY, aws_secret_access_question=AWS_SECRET_KEY, region_name=AWS_REGION, ) elif textract_output_found is True: print( "Existing Textract data found for file, no need to connect to AWS Textract" ) client = boto3.client("textract", region_name=AWS_REGION) else: client = "" out_message = "Cannot connect to AWS Textract service." print(out_message) raise Exception(out_message) except Exception as e: out_message = "Cannot connect to AWS Textract" print(out_message, "due to:", e) raise Exception(out_message) return [], "" # Return an empty list and an empty string # Redact signatures if specified feature_types = list() if ( "Extract signatures" in handwrite_signature_checkbox or "Extract forms" in handwrite_signature_checkbox or "Extract layout" in handwrite_signature_checkbox or "Extract tables" in handwrite_signature_checkbox ): if "Extract signatures" in handwrite_signature_checkbox: feature_types.append("SIGNATURES") if "Extract forms" in handwrite_signature_checkbox: feature_types.append("FORMS") if "Extract layout" in handwrite_signature_checkbox: feature_types.append("LAYOUT") if "Extract tables" in handwrite_signature_checkbox: feature_types.append("TABLES") try: response = client.analyze_document( Document={"Bytes": pdf_page_bytes}, FeatureTypes=feature_types ) except Exception as e: print("Textract call failed due to:", e, "trying again in 3 seconds.") time.sleep(3) response = client.analyze_document( Document={"Bytes": pdf_page_bytes}, FeatureTypes=feature_types ) if ( "Extract signatures" not in handwrite_signature_checkbox and "Extract forms" not in handwrite_signature_checkbox and "Extract layout" not in handwrite_signature_checkbox and "Extract tables" not in handwrite_signature_checkbox ): # Call detect_document_text to extract plain text try: response = client.detect_document_text(Document={"Bytes": pdf_page_bytes}) except Exception as e: print("Textract call failed due to:", e, "trying again in 5 seconds.") time.sleep(5) response = client.detect_document_text(Document={"Bytes": pdf_page_bytes}) # Add the 'Page' attribute to each block if "Blocks" in response: for block in response["Blocks"]: block["Page"] = page_no # Inject the page number into each block # Wrap the response with the page number in the desired format wrapped_response = {"page_no": page_no, "data": response} request_metadata = extract_textract_metadata( response ) # Metadata comes out as a string # Return a list containing the wrapped response and the metadata return ( wrapped_response, request_metadata, ) # Return as a list to match the desired structure def convert_pike_pdf_page_to_bytes(pdf: object, page_num: int): # Create a new empty PDF new_pdf = pikepdf.Pdf.new() # Specify the page number you want to extract (0-based index) page_num = 0 # Example: first page # Extract the specific page and add it to the new PDF new_pdf.pages.append(pdf.pages[page_num]) # Save the new PDF to a bytes buffer buffer = io.BytesIO() new_pdf.save(buffer) # Get the PDF bytes pdf_bytes = buffer.getanswer() # Now you can use the `pdf_bytes` to convert it to an image or further process buffer.close() return pdf_bytes def split_word_with_punctuation( word_text: str, bounding_box: Tuple[int, int, int, int], confidence: float, ) -> List[Dict[str, Any]]: """ Split a word that may contain punctuation into separate word entries. Only separates punctuation at the start and end of words. Punctuation in the middle (e.g., in email addresses like user@example.com) is kept as part of the word. Args: word_text: The text of the word (may contain punctuation) bounding_box: Tuple of (left, top, right, bottom) in pixels confidence: Confidence score for the original word Returns: List of word dictionaries, each with text and bounding_box. Leading and trailing punctuation become separate entries, while the middle part (which may contain internal punctuation) remains intact. """ if not word_text: return [] # Extract leading punctuation (at the start of the word) leading_punct_match = re.match(r"^([^\w\s]+)", word_text) leading_punct = leading_punct_match.group(1) if leading_punct_match else "" # Extract trailing punctuation (at the end of the word) trailing_punct_match = re.search(r"([^\w\s]+)$", word_text) trailing_punct = trailing_punct_match.group(1) if trailing_punct_match else "" # Get the middle part (everything between leading and trailing punctuation) # This may contain punctuation (like @ or . in email addresses) which we keep start_idx = len(leading_punct) end_idx = len(word_text) - len(trailing_punct) if trailing_punct else len(word_text) middle_part = word_text[start_idx:end_idx] if start_idx < end_idx else "" # Build list of parts (leading punct, middle, trailing punct) parts = [] if leading_punct: parts.append(leading_punct) if middle_part: parts.append(middle_part) if trailing_punct: parts.append(trailing_punct) # If no parts to split, return original word if len(parts) == 0: return [ { "text": word_text, "confidence": confidence, "bounding_box": bounding_box, } ] # If only one part (no leading/trailing punctuation), return as-is if len(parts) == 1: return [ { "text": word_text, "confidence": confidence, "bounding_box": bounding_box, } ] # Calculate bounding box dimensions left, top, right, bottom = bounding_box width = right - left bottom - top # Calculate character width (assuming proportional distribution based on text length) total_chars = len(word_text) if total_chars == 0: return [] # Punctuation characters are typically narrower than alphanumeric characters # Use a scaling factor to make punctuation boxes thinner PUNCTUATION_WIDTH_SCALE = ( 0.5 # Punctuation is approximately 50% the width of alphanumeric chars ) # First pass: calculate effective character widths for each part # Alphanumeric parts get full width, punctuation parts get scaled width total_effective_chars = 0 part_info = [] for part in parts: if not part: continue # Check if part is punctuation-only (no alphanumeric characters) is_punctuation_only = not bool(re.search(r"[\w]", part)) if is_punctuation_only: effective_length = len(part) * PUNCTUATION_WIDTH_SCALE else: effective_length = len(part) part_info.append( { "text": part, "length": len(part), "effective_length": effective_length, "is_punctuation": is_punctuation_only, } ) total_effective_chars += effective_length if total_effective_chars == 0: return [] # Calculate base character width based on effective character count effective_char_width = width / total_effective_chars # Build separate word entries word_entries = [] current_pos = 0 for info in part_info: # Calculate actual width for this part based on effective length # (punctuation parts already have reduced effective_length) part_width = info["effective_length"] * effective_char_width # Calculate bounding box for this part part_left = left + current_pos part_right = part_left + part_width word_entries.append( { "text": info["text"], "confidence": confidence, "bounding_box": ( int(part_left), int(top), int(part_right), int(bottom), ), } ) # Move position forward by the effective width used current_pos += part_width return word_entries def json_to_ocrresult( json_data: dict, page_width: float, page_height: float, page_no: int ): """ Convert Textract JSON to structured OCR, handling lines, words, signatures, selection elements (associating them with lines), and question-answer form data. The question-answer data is sorted in a top-to-bottom, left-to-right reading order. Args: json_data (dict): The raw JSON output from AWS Textract for a specific page. page_width (float): The width of the page in pixels or points. page_height (float): The height of the page in pixels or points. page_no (int): The 1-based page number being processed. """ # --- STAGE 1: Block Mapping & Initial Data Collection --- # text_blocks = json_data.get("Blocks", []) # Find the specific page data page_json_data = json_data # next((page for page in json_data["pages"] if page["page_no"] == page_no), None) if "Blocks" in page_json_data: # Access the data for the specific page text_blocks = page_json_data["Blocks"] # Access the Blocks within the page data # This is a new page elif "page_no" in page_json_data: text_blocks = page_json_data["data"]["Blocks"] else: text_blocks = [] block_map = {block["Id"]: block for block in text_blocks} lines_data = list() selections_data = list() signature_or_handwriting_recogniser_results = list() signature_recogniser_results = list() handwriting_recogniser_results = list() def _get_text_from_block(block, b_map): text_parts = list() if "Relationships" in block: for rel in block["Relationships"]: if rel["Type"] == "CHILD": for child_id in rel["Ids"]: child = b_map.get(child_id) if child: if child["BlockType"] == "WORD": text_parts.append(child["Text"]) elif child["BlockType"] == "SELECTION_ELEMENT": text_parts.append(f"[{child['SelectionStatus']}]") return " ".join(text_parts) # text_line_number = 1 for block in text_blocks: block_type = block.get("BlockType") if block_type == "LINE": bbox = block["Geometry"]["BoundingBox"] line_info = { "id": block["Id"], "text": block.get("Text", ""), "confidence": round(block.get("Confidence", 0.0), 0), "words": [], "geometry": { "left": int(bbox["Left"] * page_width), "top": int(bbox["Top"] * page_height), "width": int(bbox["Width"] * page_width), "height": int(bbox["Height"] * page_height), }, } if "Relationships" in block: for rel in block.get("Relationships", []): if rel["Type"] == "CHILD": for child_id in rel["Ids"]: word_block = block_map.get(child_id) if word_block and word_block["BlockType"] == "WORD": w_bbox = word_block["Geometry"]["BoundingBox"] word_text = word_block.get("Text", "") word_confidence = round( word_block.get("Confidence", 0.0), 0 ) original_bounding_box = ( int(w_bbox["Left"] * page_width), int(w_bbox["Top"] * page_height), int( (w_bbox["Left"] + w_bbox["Width"]) * page_width ), int( (w_bbox["Top"] + w_bbox["Height"]) * page_height ), ) # Conditionally split word into alphanumeric parts and punctuation if SPLIT_PUNCTUATION_FROM_WORDS: split_words = split_word_with_punctuation( word_text, original_bounding_box, word_confidence, ) else: # Original behavior: keep word as-is split_words = [ { "text": word_text, "confidence": word_confidence, "bounding_box": original_bounding_box, } ] # Add all word parts to the line for split_word in split_words: line_info["words"].append(split_word) # Handle handwriting - check if original word was handwriting if word_block.get("TextType") == "HANDWRITING": # For handwriting, create recognition results for each split part for split_word in split_words: split_bbox = split_word["bounding_box"] rec_res = CustomImageRecognizerResult( entity_type="HANDWRITING", text=split_word["text"], score=split_word["confidence"], start=0, end=len(split_word["text"]), left=split_bbox[0], top=split_bbox[1], width=split_bbox[2] - split_bbox[0], height=split_bbox[3] - split_bbox[1], ) handwriting_recogniser_results.append(rec_res) signature_or_handwriting_recogniser_results.append( rec_res ) lines_data.append(line_info) elif block_type == "SELECTION_ELEMENT": bbox = block["Geometry"]["BoundingBox"] selections_data.append( { "id": block["Id"], "status": block.get("SelectionStatus", "UNKNOWN"), "confidence": round(block.get("Confidence", 0.0), 0), "geometry": { "left": int(bbox["Left"] * page_width), "top": int(bbox["Top"] * page_height), "width": int(bbox["Width"] * page_width), "height": int(bbox["Height"] * page_height), }, } ) elif block_type == "SIGNATURE": bbox = block["Geometry"]["BoundingBox"] rec_res = CustomImageRecognizerResult( entity_type="SIGNATURE", text="SIGNATURE", score=round(block.get("Confidence", 0.0), 0), start=0, end=9, left=int(bbox["Left"] * page_width), top=int(bbox["Top"] * page_height), width=int(bbox["Width"] * page_width), height=int(bbox["Height"] * page_height), ) signature_recogniser_results.append(rec_res) signature_or_handwriting_recogniser_results.append(rec_res) # --- STAGE 2: Question-Answer Pair Extraction & Sorting --- def _create_question_answer_results_object(text_blocks): question_answer_results = list() key_blocks = [ b for b in text_blocks if b.get("BlockType") == "KEY_VALUE_SET" and "KEY" in b.get("EntityTypes", []) ] for question_block in key_blocks: answer_block = next( ( block_map.get(rel["Ids"][0]) for rel in question_block.get("Relationships", []) if rel["Type"] == "VALUE" ), None, ) # The check for value_block now happens BEFORE we try to access its properties. if answer_block: question_bbox = question_block["Geometry"]["BoundingBox"] # We also get the answer_bbox safely inside this block. answer_bbox = answer_block["Geometry"]["BoundingBox"] question_answer_results.append( { # Data for final output "Page": page_no, "Question": _get_text_from_block(question_block, block_map), "Answer": _get_text_from_block(answer_block, block_map), "Confidence Score % (Question)": round( question_block.get("Confidence", 0.0), 0 ), "Confidence Score % (Answer)": round( answer_block.get("Confidence", 0.0), 0 ), "Question_left": round(question_bbox["Left"], 5), "Question_top": round(question_bbox["Top"], 5), "Question_width": round(question_bbox["Width"], 5), "Question_height": round(question_bbox["Height"], 5), "Answer_left": round(answer_bbox["Left"], 5), "Answer_top": round(answer_bbox["Top"], 5), "Answer_width": round(answer_bbox["Width"], 5), "Answer_height": round(answer_bbox["Height"], 5), } ) question_answer_results.sort( key=lambda item: (item["Question_top"], item["Question_left"]) ) return question_answer_results question_answer_results = _create_question_answer_results_object(text_blocks) # --- STAGE 3: Association of Selection Elements to Lines --- unmatched_selections = list() for selection in selections_data: best_match_line = None min_dist = float("inf") sel_geom = selection["geometry"] sel_y_center = sel_geom["top"] + sel_geom["height"] / 2 for line in lines_data: line_geom = line["geometry"] line_y_center = line_geom["top"] + line_geom["height"] / 2 if abs(sel_y_center - line_y_center) < line_geom["height"]: dist = 0 if sel_geom["left"] > (line_geom["left"] + line_geom["width"]): dist = sel_geom["left"] - (line_geom["left"] + line_geom["width"]) elif line_geom["left"] > (sel_geom["left"] + sel_geom["width"]): dist = line_geom["left"] - (sel_geom["left"] + sel_geom["width"]) if dist < min_dist: min_dist = dist best_match_line = line if best_match_line and min_dist < (best_match_line["geometry"]["height"] * 5): selection_as_word = { "text": f"[{selection['status']}]", "confidence": round(selection["confidence"], 0), "bounding_box": ( sel_geom["left"], sel_geom["top"], sel_geom["left"] + sel_geom["width"], sel_geom["top"] + sel_geom["height"], ), } best_match_line["words"].append(selection_as_word) best_match_line["words"].sort(key=lambda w: w["bounding_box"][0]) else: unmatched_selections.append(selection) # --- STAGE 4: Final Output Generation --- all_ocr_results = list() ocr_results_with_words = dict() selection_element_results = list() for i, line in enumerate(lines_data): line_num = i + 1 line_geom = line["geometry"] reconstructed_text = " ".join(w["text"] for w in line["words"]) all_ocr_results.append( OCRResult( reconstructed_text, line_geom["left"], line_geom["top"], line_geom["width"], line_geom["height"], round(line["confidence"], 0), line_num, ) ) ocr_results_with_words[f"text_line_{line_num}"] = { "line": line_num, "text": reconstructed_text, "confidence": line["confidence"], "bounding_box": ( line_geom["left"], line_geom["top"], line_geom["left"] + line_geom["width"], line_geom["top"] + line_geom["height"], ), "words": line["words"], "page": page_no, } for selection in unmatched_selections: sel_geom = selection["geometry"] sel_text = f"[{selection['status']}]" all_ocr_results.append( OCRResult( sel_text, sel_geom["left"], sel_geom["top"], sel_geom["width"], sel_geom["height"], round(selection["confidence"], 0), -1, ) ) for selection in selections_data: sel_geom = selection["geometry"] selection_element_results.append( { "status": selection["status"], "confidence": round(selection["confidence"], 0), "bounding_box": ( sel_geom["left"], sel_geom["top"], sel_geom["left"] + sel_geom["width"], sel_geom["top"] + sel_geom["height"], ), "page": page_no, } ) all_ocr_results_with_page = {"page": page_no, "results": all_ocr_results} ocr_results_with_words_with_page = { "page": page_no, "results": ocr_results_with_words, } return ( all_ocr_results_with_page, signature_or_handwriting_recogniser_results, signature_recogniser_results, handwriting_recogniser_results, ocr_results_with_words_with_page, selection_element_results, question_answer_results, ) def load_and_convert_textract_json( textract_json_file_path: str, log_files_output_paths: str, page_sizes_df: pd.DataFrame, ): """ Loads Textract JSON from a file, detects if conversion is needed, and converts if necessary. Args: textract_json_file_path (str): The file path to the Textract JSON output. log_files_output_paths (str): A list of paths to log files, used for tracking. page_sizes_df (pd.DataFrame): A DataFrame containing page size information for the document. """ if not os.path.exists(textract_json_file_path): print("No existing Textract results file found.") return ( {}, True, log_files_output_paths, ) # Return empty dict and flag indicating missing file print("Found existing Textract json results file.") # Track log files if textract_json_file_path not in log_files_output_paths: log_files_output_paths.append(textract_json_file_path) try: # Split the path into base directory and filename for security textract_json_file_path_obj = Path(textract_json_file_path) base_dir = textract_json_file_path_obj.parent filename = textract_json_file_path_obj.name json_content = secure_file_read(base_dir, filename, encoding="utf-8") textract_data = json.loads(json_content) except json.JSONDecodeError: print("Error: Failed to parse Textract JSON file. Returning empty data.") return {}, True, log_files_output_paths # Indicate failure # Check if conversion is needed if "pages" in textract_data: print("JSON already in the correct format for app. No changes needed.") return textract_data, False, log_files_output_paths # No conversion required if "Blocks" in textract_data: print("Need to convert Textract JSON to app format.") try: textract_data = restructure_textract_output(textract_data, page_sizes_df) return ( textract_data, False, log_files_output_paths, ) # Successfully converted except Exception as e: print("Failed to convert JSON data to app format due to:", e) return {}, True, log_files_output_paths # Conversion failed else: print("Invalid Textract JSON format: 'Blocks' missing.") # print("textract data:", textract_data) return ( {}, True, log_files_output_paths, ) # Return empty data if JSON is not recognized def restructure_textract_output(textract_output: dict, page_sizes_df: pd.DataFrame): """ Reorganise Textract output from the bulk Textract analysis option on AWS into a format that works in this redaction app, reducing size. Args: textract_output (dict): The raw JSON output from AWS Textract. page_sizes_df (pd.DataFrame): A Pandas DataFrame containing page size information, including cropbox and mediabox dimensions and offsets for each page. """ pages_dict = dict() # Extract total pages from DocumentMetadata document_metadata = textract_output.get("DocumentMetadata", {}) # For efficient lookup, set 'page' as index if it's not already if "page" in page_sizes_df.columns: page_sizes_df = page_sizes_df.set_index("page") for block in textract_output.get("Blocks", []): page_no = block.get("Page", 1) # Default to 1 if missing # --- Geometry Conversion Logic --- try: page_info = page_sizes_df.loc[page_no] cb_width = page_info["cropbox_width"] cb_height = page_info["cropbox_height"] mb_width = page_info["mediabox_width"] mb_height = page_info["mediabox_height"] cb_x_offset = page_info["cropbox_x_offset"] cb_y_offset_top = page_info["cropbox_y_offset_from_top"] # Check if conversion is needed (and avoid division by zero) needs_conversion = ( (abs(cb_width - mb_width) > 1e-6 or abs(cb_height - mb_height) > 1e-6) and mb_width > 1e-6 and mb_height > 1e-6 ) # Avoid division by zero if needs_conversion and "Geometry" in block: geometry = block["Geometry"] # Work directly on the block's geometry # --- Convert BoundingBox --- if "BoundingBox" in geometry: bbox = geometry["BoundingBox"] old_left = bbox["Left"] old_top = bbox["Top"] old_width = bbox["Width"] old_height = bbox["Height"] # Calculate absolute coordinates within CropBox abs_cb_x = old_left * cb_width abs_cb_y = old_top * cb_height abs_cb_width = old_width * cb_width abs_cb_height = old_height * cb_height # Calculate absolute coordinates relative to MediaBox top-left abs_mb_x = cb_x_offset + abs_cb_x abs_mb_y = cb_y_offset_top + abs_cb_y # Convert back to normalized coordinates relative to MediaBox bbox["Left"] = abs_mb_x / mb_width bbox["Top"] = abs_mb_y / mb_height bbox["Width"] = abs_cb_width / mb_width bbox["Height"] = abs_cb_height / mb_height except KeyError: print( f"Warning: Page number {page_no} not found in page_sizes_df. Skipping coordinate conversion for this block." ) # Decide how to handle missing page info: skip conversion, raise error, etc. except ZeroDivisionError: print( f"Warning: MediaBox width or height is zero for page {page_no}. Skipping coordinate conversion for this block." ) # Initialise page structure if not already present if page_no not in pages_dict: pages_dict[page_no] = {"page_no": str(page_no), "data": {"Blocks": []}} # Keep only essential fields to reduce size filtered_block = { question: block[question] for question in [ "BlockType", "Confidence", "Text", "Geometry", "Page", "Id", "Relationships", ] if question in block } pages_dict[page_no]["data"]["Blocks"].append(filtered_block) # Convert pages dictionary to a sorted list structured_output = { "DocumentMetadata": document_metadata, # Store metadata separately "pages": [pages_dict[page] for page in sorted(pages_dict.questions())], } return structured_output def convert_question_answer_to_dataframe( question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame ) -> pd.DataFrame: """ Convert question-answer results to DataFrame format matching convert_annotation_data_to_dataframe. Each Question and Answer will be on separate lines in the resulting dataframe. The 'image' column will be populated with the page number as f'placeholder_image_page{i}.png'. Args: question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object page_sizes_df: DataFrame containing page sizes Returns: pd.DataFrame: DataFrame with columns ["image", "page", "label", "color", "xmin", "xmax", "ymin", "ymax", "text", "id"] """ if not question_answer_results: # Return empty DataFrame with expected schema return pd.DataFrame( columns=[ "image", "page", "label", "color", "xmin", "xmax", "ymin", "ymax", "text", "id", ] ) # Prepare data for DataFrame rows = list() existing_ids = set() for i, qa_result in enumerate(question_answer_results): page_num = int(qa_result.get("Page", 1)) page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") page_sizes_df.dropna(subset=["page"], inplace=True) if not page_sizes_df.empty: page_sizes_df["page"] = page_sizes_df["page"].astype(int) else: print("Warning: Page sizes DataFrame became empty after processing.") image_name = page_sizes_df.loc[ page_sizes_df["page"] == page_num, "image_path" ].iloc[0] if pd.isna(image_name): image_name = f"placeholder_image_{page_num}.png" # Create Question row question_bbox = { "Question_left": qa_result.get("Question_left", 0), "Question_top": qa_result.get("Question_top", 0), "Question_width": qa_result.get("Question_width", 0), "Question_height": qa_result.get("Question_height", 0), } question_row = { "image": image_name, "page": page_num, "label": f"Question {i+1}", "color": "(0,0,255)", "xmin": question_bbox["Question_left"], "xmax": question_bbox["Question_left"] + question_bbox["Question_width"], "ymin": question_bbox["Question_top"], "ymax": question_bbox["Question_top"] + question_bbox["Question_height"], "text": qa_result.get("Question", ""), "id": None, # Will be filled after generating IDs } # Create Answer row answer_bbox = { "Answer_left": qa_result.get("Answer_left", 0), "Answer_top": qa_result.get("Answer_top", 0), "Answer_width": qa_result.get("Answer_width", 0), "Answer_height": qa_result.get("Answer_height", 0), } answer_row = { "image": image_name, "page": page_num, "label": f"Answer {i+1}", "color": "(0,255,0)", "xmin": answer_bbox["Answer_left"], "xmax": answer_bbox["Answer_left"] + answer_bbox["Answer_width"], "ymin": answer_bbox["Answer_top"], "ymax": answer_bbox["Answer_top"] + answer_bbox["Answer_height"], "text": qa_result.get("Answer", ""), "id": None, # Will be filled after generating IDs } rows.extend([question_row, answer_row]) # Generate unique IDs for all rows num_ids_needed = len(rows) unique_ids = _generate_unique_ids(num_ids_needed, existing_ids) # Assign IDs to rows for i, row in enumerate(rows): row["id"] = unique_ids[i] # Create DataFrame df = pd.DataFrame(rows) # Ensure all required columns are present and in correct order required_columns = [ "image", "page", "label", "color", "xmin", "xmax", "ymin", "ymax", "text", "id", ] for col in required_columns: if col not in df.columns: df[col] = pd.NA # Reorder columns to match expected format df = df.reindex(columns=required_columns, fill_value=pd.NA) return df def convert_question_answer_to_annotation_json( question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame ) -> List[Dict]: """ Convert question-answer results directly to Gradio Annotation JSON format. This function combines the functionality of convert_question_answer_to_dataframe and convert_review_df_to_annotation_json to directly convert question-answer results to the annotation JSON format without the intermediate DataFrame step. Args: question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object page_sizes_df: DataFrame containing page sizes with columns ['page', 'image_path', 'image_width', 'image_height'] Returns: List of dictionaries suitable for Gradio Annotation output, one dict per image/page. Each dict has structure: {"image": image_path, "boxes": [list of annotation boxes]} """ if not question_answer_results: # Return empty structure based on page_sizes_df json_data = list() for _, row in page_sizes_df.iterrows(): json_data.append( { "image": row.get( "image_path", f"placeholder_image_{row.get('page', 1)}.png" ), "boxes": [], } ) return json_data # Validate required columns in page_sizes_df required_ps_cols = {"page", "image_path", "image_width", "image_height"} if not required_ps_cols.issubset(page_sizes_df.columns): missing = required_ps_cols - set(page_sizes_df.columns) raise ValueError(f"page_sizes_df is missing required columns: {missing}") # Convert page sizes columns to appropriate numeric types page_sizes_df = page_sizes_df.copy() # Work with a copy to avoid modifying original page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") page_sizes_df["image_width"] = pd.to_numeric( page_sizes_df["image_width"], errors="coerce" ) page_sizes_df["image_height"] = pd.to_numeric( page_sizes_df["image_height"], errors="coerce" ) page_sizes_df["page"] = page_sizes_df["page"].astype("Int64") # Prepare data for processing rows = list() existing_ids = set() for i, qa_result in enumerate(question_answer_results): page_num = int(qa_result.get("Page", 1)) # Get image path for this page page_row = page_sizes_df[page_sizes_df["page"] == page_num] if not page_row.empty: page_row["image_path"].iloc[0] else: pass # Create Question box. question_bbox = { "Question_left": qa_result.get("Question_left", 0), "Question_top": qa_result.get("Question_top", 0), "Question_width": qa_result.get("Question_width", 0), "Question_height": qa_result.get("Question_height", 0), } question_box = { "label": f"Question {i+1}", "color": (0, 0, 255), # Blue for questions "xmin": question_bbox["Question_left"], "xmax": question_bbox["Question_left"] + question_bbox["Question_width"], "ymin": question_bbox["Question_top"], "ymax": question_bbox["Question_top"] + question_bbox["Question_height"], "text": qa_result.get("Question", ""), "id": None, # Will be filled after generating IDs } # Create Answer box answer_bbox = { "Answer_left": qa_result.get("Answer_left", 0), "Answer_top": qa_result.get("Answer_top", 0), "Answer_width": qa_result.get("Answer_width", 0), "Answer_height": qa_result.get("Answer_height", 0), } answer_box = { "label": f"Answer {i+1}", "color": (0, 255, 0), # Green for answers "xmin": answer_bbox["Answer_left"], "xmax": answer_bbox["Answer_left"] + answer_bbox["Answer_width"], "ymin": answer_bbox["Answer_top"], "ymax": answer_bbox["Answer_top"] + answer_bbox["Answer_height"], "text": qa_result.get("Answer", ""), "id": None, # Will be filled after generating IDs } rows.extend([(page_num, question_box), (page_num, answer_box)]) # Generate unique IDs for all boxes num_ids_needed = len(rows) unique_ids = _generate_unique_ids(num_ids_needed, existing_ids) # Assign IDs to boxes for i, (page_num, box) in enumerate(rows): box["id"] = unique_ids[i] rows[i] = (page_num, box) # Group boxes by page boxes_by_page = {} for page_num, box in rows: if page_num not in boxes_by_page: boxes_by_page[page_num] = list() boxes_by_page[page_num].append(box) # Build JSON structure based on page_sizes json_data = list() for _, row in page_sizes_df.iterrows(): page_num = row["page"] pdf_image_path = row["image_path"] # Get boxes for this page annotation_boxes = boxes_by_page.get(page_num, []) # Append the structured data for this image/page json_data.append({"image": pdf_image_path, "boxes": annotation_boxes}) return json_data def convert_page_question_answer_to_custom_image_recognizer_results( question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame, reported_page_number: int, ) -> List["CustomImageRecognizerResult"]: """ Convert question-answer results to a list of CustomImageRecognizerResult objects. Args: question_answer_results: List of question-answer dictionaries from _create_question_answer_results_object page_sizes_df: DataFrame containing page sizes with columns ['page', 'image_path', 'image_width', 'image_height'] reported_page_number: The page number reported by the user Returns: List of CustomImageRecognizerResult objects for questions and answers """ from tools.custom_image_analyser_engine import CustomImageRecognizerResult if not question_answer_results: return list() results = list() # Pre-process page_sizes_df once for efficiency page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") page_sizes_df.dropna(subset=["page"], inplace=True) if not page_sizes_df.empty: page_sizes_df["page"] = page_sizes_df["page"].astype(int) else: print("Warning: Page sizes DataFrame became empty after processing.") return list() # Return empty list if no page sizes are available page_row = page_sizes_df.loc[page_sizes_df["page"] == int(reported_page_number)] if page_row.empty: print( f"Warning: Page {reported_page_number} not found in page_sizes_df. Skipping this entry." ) return list() # Return empty list if page not found for i, qa_result in enumerate(question_answer_results): current_page = int(qa_result.get("Page", 1)) if current_page != int(reported_page_number): continue # Skip this entry if page number does not match reported page number # Get image dimensions safely # Textract coordinates are normalized (0-1) relative to MediaBox # We need to convert to image coordinates, not PDF page coordinates # Try to get image dimensions first, fallback to mediabox if not available try: if "image_width" in page_sizes_df.columns: image_width_val = page_row["image_width"].iloc[0] if pd.notna(image_width_val) and image_width_val > 0: image_width = image_width_val else: image_width = page_row["mediabox_width"].iloc[0] else: image_width = page_row["mediabox_width"].iloc[0] except (KeyError, IndexError): image_width = page_row["mediabox_width"].iloc[0] try: if "image_height" in page_sizes_df.columns: image_height_val = page_row["image_height"].iloc[0] if pd.notna(image_height_val) and image_height_val > 0: image_height = image_height_val else: image_height = page_row["mediabox_height"].iloc[0] else: image_height = page_row["mediabox_height"].iloc[0] except (KeyError, IndexError): image_height = page_row["mediabox_height"].iloc[0] # Get question and answer text safely question_text = qa_result.get("Question", "") answer_text = qa_result.get("Answer", "") # Get scores and handle potential type issues question_score = float(qa_result.get("'Confidence Score % (Question)'", 0.0)) answer_score = float(qa_result.get("'Confidence Score % (Answer)'", 0.0)) # --- Process Question Bounding Box --- question_bbox = { "left": qa_result.get("Question_left", 0) * image_width, "top": qa_result.get("Question_top", 0) * image_height, "width": qa_result.get("Question_width", 0) * image_width, "height": qa_result.get("Question_height", 0) * image_height, } question_result = CustomImageRecognizerResult( entity_type=f"QUESTION {i+1}", start=0, end=len(question_text), score=question_score, left=float(question_bbox.get("left", 0)), top=float(question_bbox.get("top", 0)), width=float(question_bbox.get("width", 0)), height=float(question_bbox.get("height", 0)), text=question_text, color=(0, 0, 255), ) results.append(question_result) # --- Process Answer Bounding Box --- answer_bbox = { "left": qa_result.get("Answer_left", 0) * image_width, "top": qa_result.get("Answer_top", 0) * image_height, "width": qa_result.get("Answer_width", 0) * image_width, "height": qa_result.get("Answer_height", 0) * image_height, } answer_result = CustomImageRecognizerResult( entity_type=f"ANSWER {i+1}", start=0, end=len(answer_text), score=answer_score, left=float(answer_bbox.get("left", 0)), top=float(answer_bbox.get("top", 0)), width=float(answer_bbox.get("width", 0)), height=float(answer_bbox.get("height", 0)), text=answer_text, color=(0, 255, 0), ) results.append(answer_result) return results