# document_redaction/tools/aws_textract.py
import io
import json
import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Tuple
import boto3
import pandas as pd
import pikepdf
from tools.config import (
AWS_ACCESS_KEY,
AWS_REGION,
AWS_SECRET_KEY,
PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS,
RUN_AWS_FUNCTIONS,
SPLIT_PUNCTUATION_FROM_WORDS,
)
from tools.custom_image_analyser_engine import CustomImageRecognizerResult, OCRResult
from tools.helper_functions import _generate_unique_ids
from tools.secure_path_utils import secure_file_read
def extract_textract_metadata(response: object):
    """Return a string summary of the RequestId and page count from a Textract response."""
    summary = {
        "RequestId": response["ResponseMetadata"]["RequestId"],
        "Pages": response["DocumentMetadata"]["Pages"],
    }
    return str(summary)
def analyse_page_with_textract(
    pdf_page_bytes: object,
    page_no: int,
    client: str = "",
    handwrite_signature_checkbox: List[str] = None,
    textract_output_found: bool = False,
    aws_access_question_textbox: str = AWS_ACCESS_KEY,
    aws_secret_question_textbox: str = AWS_SECRET_KEY,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
    PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS: bool = PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS,
):
    """
    Analyzes a single page of a document using AWS Textract to extract text and other features.

    Args:
        pdf_page_bytes (object): The content of the PDF page or image as bytes.
        page_no (int): The page number being analyzed.
        client (str, optional): An optional pre-initialized AWS Textract client. If not provided,
            the function will attempt to create one based on configuration. Defaults to "".
        handwrite_signature_checkbox (List[str], optional): A list of feature types to extract
            from the document. Options include "Extract handwriting", "Extract signatures",
            "Extract forms", "Extract layout", "Extract tables".
            Defaults to ["Extract handwriting"] when None is passed.
        textract_output_found (bool, optional): A flag indicating whether existing Textract output
            for the document has been found. This can prevent unnecessary API calls.
            Defaults to False.
        aws_access_question_textbox (str, optional): AWS access key provided by the user, if not
            using SSO or environment variables. Defaults to AWS_ACCESS_KEY.
        aws_secret_question_textbox (str, optional): AWS secret key provided by the user, if not
            using SSO or environment variables. Defaults to AWS_SECRET_KEY.
        RUN_AWS_FUNCTIONS (bool, optional): Configuration flag to enable or disable AWS functions.
            Defaults to RUN_AWS_FUNCTIONS.
        PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS (bool, optional): Configuration flag to prioritize
            AWS SSO credentials over environment variables. Defaults to config value.

    Returns:
        Tuple[Dict, str]: A tuple containing:
            - A dict of the form {"page_no": page_no, "data": <raw Textract response>},
              where each block in the response has had a "Page" attribute injected.
            - A string containing metadata about the Textract request (RequestId, Pages).

    Raises:
        Exception: If no Textract client could be created or connected to.
    """
    # Avoid the shared-mutable-default pitfall: the previous signature used a
    # list literal as the default, which is shared across all calls.
    if handwrite_signature_checkbox is None:
        handwrite_signature_checkbox = ["Extract handwriting"]

    if client == "":
        try:
            # Try to connect to AWS Textract Client if using that text extraction method
            if RUN_AWS_FUNCTIONS and PRIORITISE_SSO_OVER_AWS_ENV_ACCESS_KEYS:
                print("Connecting to Textract via existing SSO connection")
                client = boto3.client("textract", region_name=AWS_REGION)
            elif aws_access_question_textbox and aws_secret_question_textbox:
                print(
                    "Connecting to Textract using AWS access key and secret key from user input."
                )
                # boto3 expects the kwargs aws_access_key_id / aws_secret_access_key;
                # the previous names (aws_access_question_id / aws_secret_access_question)
                # were invalid and raised at client construction.
                client = boto3.client(
                    "textract",
                    aws_access_key_id=aws_access_question_textbox,
                    aws_secret_access_key=aws_secret_question_textbox,
                    region_name=AWS_REGION,
                )
            elif RUN_AWS_FUNCTIONS is True:
                print("Connecting to Textract via existing SSO connection")
                client = boto3.client("textract", region_name=AWS_REGION)
            elif AWS_ACCESS_KEY and AWS_SECRET_KEY:
                print("Getting Textract credentials from environment variables.")
                client = boto3.client(
                    "textract",
                    aws_access_key_id=AWS_ACCESS_KEY,
                    aws_secret_access_key=AWS_SECRET_KEY,
                    region_name=AWS_REGION,
                )
            elif textract_output_found is True:
                print(
                    "Existing Textract data found for file, no need to connect to AWS Textract"
                )
                client = boto3.client("textract", region_name=AWS_REGION)
            else:
                client = ""
                out_message = "Cannot connect to AWS Textract service."
                print(out_message)
                raise Exception(out_message)
        except Exception as e:
            out_message = "Cannot connect to AWS Textract"
            print(out_message, "due to:", e)
            raise Exception(out_message)
            # NOTE: the old unreachable `return [], ""` after this raise was removed.

    # Map requested checkbox options to Textract FeatureTypes (order preserved).
    feature_map = {
        "Extract signatures": "SIGNATURES",
        "Extract forms": "FORMS",
        "Extract layout": "LAYOUT",
        "Extract tables": "TABLES",
    }
    feature_types = [
        feature
        for option, feature in feature_map.items()
        if option in handwrite_signature_checkbox
    ]

    if feature_types:
        # analyze_document is required for signatures/forms/layout/tables;
        # retry once after a short pause on transient failures.
        try:
            response = client.analyze_document(
                Document={"Bytes": pdf_page_bytes}, FeatureTypes=feature_types
            )
        except Exception as e:
            print("Textract call failed due to:", e, "trying again in 3 seconds.")
            time.sleep(3)
            response = client.analyze_document(
                Document={"Bytes": pdf_page_bytes}, FeatureTypes=feature_types
            )
    else:
        # No extra features requested: plain text extraction is sufficient.
        try:
            response = client.detect_document_text(Document={"Bytes": pdf_page_bytes})
        except Exception as e:
            print("Textract call failed due to:", e, "trying again in 5 seconds.")
            time.sleep(5)
            response = client.detect_document_text(Document={"Bytes": pdf_page_bytes})

    # Inject the page number into each block so downstream code can group by page.
    if "Blocks" in response:
        for block in response["Blocks"]:
            block["Page"] = page_no

    # Wrap the response with the page number in the desired format.
    wrapped_response = {"page_no": page_no, "data": response}

    request_metadata = extract_textract_metadata(
        response
    )  # Metadata comes out as a string

    return (
        wrapped_response,
        request_metadata,
    )
def convert_pike_pdf_page_to_bytes(pdf: object, page_num: int):
    """
    Extract a single page from a pikepdf PDF object and return it as PDF bytes.

    Args:
        pdf (object): An open pikepdf.Pdf object.
        page_num (int): 0-based index of the page to extract.

    Returns:
        bytes: The bytes of a new single-page PDF containing only that page.
    """
    # Create a new empty PDF
    new_pdf = pikepdf.Pdf.new()

    # Extract the requested page and add it to the new PDF.
    # (The previous implementation overwrote page_num with 0, so the
    # parameter was silently ignored and the first page was always used.)
    new_pdf.pages.append(pdf.pages[page_num])

    # Save the new PDF to a bytes buffer
    buffer = io.BytesIO()
    new_pdf.save(buffer)

    # getvalue() returns the buffer contents; the previous call to the
    # non-existent getanswer() raised AttributeError.
    pdf_bytes = buffer.getvalue()
    buffer.close()

    return pdf_bytes
def split_word_with_punctuation(
    word_text: str,
    bounding_box: Tuple[int, int, int, int],
    confidence: float,
) -> List[Dict[str, Any]]:
    """
    Split a word that may contain punctuation into separate word entries.

    Only separates punctuation at the start and end of words. Punctuation in
    the middle (e.g., in email addresses like user@example.com) is kept as
    part of the word.

    Args:
        word_text: The text of the word (may contain punctuation)
        bounding_box: Tuple of (left, top, right, bottom) in pixels
        confidence: Confidence score for the original word

    Returns:
        List of word dictionaries, each with text, confidence and bounding_box.
        Leading and trailing punctuation become separate entries, while the
        middle part (which may contain internal punctuation) remains intact.
    """
    if not word_text:
        return []

    # Leading punctuation: one or more non-word, non-space chars at the start
    leading_punct_match = re.match(r"^([^\w\s]+)", word_text)
    leading_punct = leading_punct_match.group(1) if leading_punct_match else ""

    # Trailing punctuation: one or more non-word, non-space chars at the end
    trailing_punct_match = re.search(r"([^\w\s]+)$", word_text)
    trailing_punct = trailing_punct_match.group(1) if trailing_punct_match else ""

    # Middle part: everything between leading and trailing punctuation. It may
    # itself contain punctuation (like @ or . in email addresses) which we keep.
    start_idx = len(leading_punct)
    end_idx = len(word_text) - len(trailing_punct) if trailing_punct else len(word_text)
    middle_part = word_text[start_idx:end_idx] if start_idx < end_idx else ""

    # BUG FIX: for punctuation-only words (e.g. "...") the leading and trailing
    # regexes both match the ENTIRE string, so the old code emitted the word
    # twice (once as "leading", once as "trailing"). Return it once, unsplit.
    if not middle_part:
        return [
            {
                "text": word_text,
                "confidence": confidence,
                "bounding_box": bounding_box,
            }
        ]

    # Build ordered list of parts: leading punct, middle, trailing punct
    parts = []
    if leading_punct:
        parts.append(leading_punct)
    parts.append(middle_part)
    if trailing_punct:
        parts.append(trailing_punct)

    # No leading/trailing punctuation: nothing to split, return as-is
    if len(parts) == 1:
        return [
            {
                "text": word_text,
                "confidence": confidence,
                "bounding_box": bounding_box,
            }
        ]

    # Bounding box dimensions (height is unchanged across parts)
    left, top, right, bottom = bounding_box
    width = right - left

    total_chars = len(word_text)
    if total_chars == 0:
        return []

    # Punctuation glyphs are typically narrower than alphanumeric characters,
    # so scale their share of the width down when distributing the box.
    PUNCTUATION_WIDTH_SCALE = (
        0.5  # Punctuation is approximately 50% the width of alphanumeric chars
    )

    # First pass: compute effective character widths for each part.
    # Alphanumeric parts get full width, punctuation parts get scaled width.
    total_effective_chars = 0
    part_info = []
    for part in parts:
        if not part:
            continue
        # Punctuation-only means no word characters at all
        is_punctuation_only = not bool(re.search(r"[\w]", part))
        if is_punctuation_only:
            effective_length = len(part) * PUNCTUATION_WIDTH_SCALE
        else:
            effective_length = len(part)
        part_info.append(
            {
                "text": part,
                "length": len(part),
                "effective_length": effective_length,
                "is_punctuation": is_punctuation_only,
            }
        )
        total_effective_chars += effective_length

    if total_effective_chars == 0:
        return []

    # Width of one "effective" character
    effective_char_width = width / total_effective_chars

    # Second pass: assign each part a proportional horizontal slice of the box
    word_entries = []
    current_pos = 0
    for info in part_info:
        part_width = info["effective_length"] * effective_char_width
        part_left = left + current_pos
        part_right = part_left + part_width
        word_entries.append(
            {
                "text": info["text"],
                "confidence": confidence,
                "bounding_box": (
                    int(part_left),
                    int(top),
                    int(part_right),
                    int(bottom),
                ),
            }
        )
        # Advance by the width consumed by this part
        current_pos += part_width

    return word_entries
def json_to_ocrresult(
    json_data: dict, page_width: float, page_height: float, page_no: int
):
    """
    Convert Textract JSON to structured OCR, handling lines, words, signatures,
    selection elements (associating them with lines), and question-answer form data.
    The question-answer data is sorted in a top-to-bottom, left-to-right reading order.

    Args:
        json_data (dict): The raw JSON output from AWS Textract for a specific page.
            Accepted in two shapes: either with a top-level "Blocks" list, or the
            app's restructured per-page form {"page_no": ..., "data": {"Blocks": [...]}}.
        page_width (float): The width of the page in pixels or points.
        page_height (float): The height of the page in pixels or points.
        page_no (int): The 1-based page number being processed.

    Returns:
        Tuple of seven items:
        - all_ocr_results_with_page (dict): {"page": page_no, "results": [OCRResult, ...]}
        - signature_or_handwriting_recogniser_results (list): combined signature + handwriting results
        - signature_recogniser_results (list): CustomImageRecognizerResult per SIGNATURE block
        - handwriting_recogniser_results (list): CustomImageRecognizerResult per handwritten word part
        - ocr_results_with_words_with_page (dict): per-line word-level detail keyed "text_line_N"
        - selection_element_results (list): all checkbox/radio selection elements with boxes
        - question_answer_results (list): sorted question/answer pairs from KEY_VALUE_SET blocks
    """
    # --- STAGE 1: Block Mapping & Initial Data Collection ---
    # text_blocks = json_data.get("Blocks", [])
    # Find the specific page data: support both the raw Textract shape and the
    # app's wrapped per-page shape.
    page_json_data = json_data  # next((page for page in json_data["pages"] if page["page_no"] == page_no), None)

    if "Blocks" in page_json_data:
        # Access the data for the specific page
        text_blocks = page_json_data["Blocks"]  # Access the Blocks within the page data
    # This is a new page
    elif "page_no" in page_json_data:
        text_blocks = page_json_data["data"]["Blocks"]
    else:
        text_blocks = []

    # Index blocks by Id for O(1) lookups when resolving CHILD/VALUE relationships.
    block_map = {block["Id"]: block for block in text_blocks}

    lines_data = list()
    selections_data = list()
    signature_or_handwriting_recogniser_results = list()
    signature_recogniser_results = list()
    handwriting_recogniser_results = list()

    def _get_text_from_block(block, b_map):
        # Reconstruct a block's text by joining its WORD children; selection
        # elements are rendered as "[<SelectionStatus>]" markers.
        text_parts = list()
        if "Relationships" in block:
            for rel in block["Relationships"]:
                if rel["Type"] == "CHILD":
                    for child_id in rel["Ids"]:
                        child = b_map.get(child_id)
                        if child:
                            if child["BlockType"] == "WORD":
                                text_parts.append(child["Text"])
                            elif child["BlockType"] == "SELECTION_ELEMENT":
                                text_parts.append(f"[{child['SelectionStatus']}]")
        return " ".join(text_parts)

    # text_line_number = 1
    for block in text_blocks:
        block_type = block.get("BlockType")

        if block_type == "LINE":
            # Normalised (0-1) Textract geometry is scaled to pixel coordinates here.
            bbox = block["Geometry"]["BoundingBox"]
            line_info = {
                "id": block["Id"],
                "text": block.get("Text", ""),
                "confidence": round(block.get("Confidence", 0.0), 0),
                "words": [],
                "geometry": {
                    "left": int(bbox["Left"] * page_width),
                    "top": int(bbox["Top"] * page_height),
                    "width": int(bbox["Width"] * page_width),
                    "height": int(bbox["Height"] * page_height),
                },
            }
            if "Relationships" in block:
                for rel in block.get("Relationships", []):
                    if rel["Type"] == "CHILD":
                        for child_id in rel["Ids"]:
                            word_block = block_map.get(child_id)
                            if word_block and word_block["BlockType"] == "WORD":
                                w_bbox = word_block["Geometry"]["BoundingBox"]
                                word_text = word_block.get("Text", "")
                                word_confidence = round(
                                    word_block.get("Confidence", 0.0), 0
                                )
                                # Word box as (left, top, right, bottom) pixels.
                                original_bounding_box = (
                                    int(w_bbox["Left"] * page_width),
                                    int(w_bbox["Top"] * page_height),
                                    int(
                                        (w_bbox["Left"] + w_bbox["Width"]) * page_width
                                    ),
                                    int(
                                        (w_bbox["Top"] + w_bbox["Height"]) * page_height
                                    ),
                                )

                                # Conditionally split word into alphanumeric parts and punctuation
                                # (controlled by the SPLIT_PUNCTUATION_FROM_WORDS config flag).
                                if SPLIT_PUNCTUATION_FROM_WORDS:
                                    split_words = split_word_with_punctuation(
                                        word_text,
                                        original_bounding_box,
                                        word_confidence,
                                    )
                                else:
                                    # Original behavior: keep word as-is
                                    split_words = [
                                        {
                                            "text": word_text,
                                            "confidence": word_confidence,
                                            "bounding_box": original_bounding_box,
                                        }
                                    ]

                                # Add all word parts to the line
                                for split_word in split_words:
                                    line_info["words"].append(split_word)

                                # Handle handwriting - check if original word was handwriting
                                if word_block.get("TextType") == "HANDWRITING":
                                    # For handwriting, create recognition results for each split part
                                    for split_word in split_words:
                                        split_bbox = split_word["bounding_box"]
                                        rec_res = CustomImageRecognizerResult(
                                            entity_type="HANDWRITING",
                                            text=split_word["text"],
                                            score=split_word["confidence"],
                                            start=0,
                                            end=len(split_word["text"]),
                                            left=split_bbox[0],
                                            top=split_bbox[1],
                                            width=split_bbox[2] - split_bbox[0],
                                            height=split_bbox[3] - split_bbox[1],
                                        )
                                        handwriting_recogniser_results.append(rec_res)
                                        signature_or_handwriting_recogniser_results.append(
                                            rec_res
                                        )
            lines_data.append(line_info)

        elif block_type == "SELECTION_ELEMENT":
            # Checkboxes / radio buttons; associated with a line in STAGE 3.
            bbox = block["Geometry"]["BoundingBox"]
            selections_data.append(
                {
                    "id": block["Id"],
                    "status": block.get("SelectionStatus", "UNKNOWN"),
                    "confidence": round(block.get("Confidence", 0.0), 0),
                    "geometry": {
                        "left": int(bbox["Left"] * page_width),
                        "top": int(bbox["Top"] * page_height),
                        "width": int(bbox["Width"] * page_width),
                        "height": int(bbox["Height"] * page_height),
                    },
                }
            )

        elif block_type == "SIGNATURE":
            # Signatures become recognizer results directly (text is the literal
            # "SIGNATURE" placeholder, hence end=9 == len("SIGNATURE")).
            bbox = block["Geometry"]["BoundingBox"]
            rec_res = CustomImageRecognizerResult(
                entity_type="SIGNATURE",
                text="SIGNATURE",
                score=round(block.get("Confidence", 0.0), 0),
                start=0,
                end=9,
                left=int(bbox["Left"] * page_width),
                top=int(bbox["Top"] * page_height),
                width=int(bbox["Width"] * page_width),
                height=int(bbox["Height"] * page_height),
            )
            signature_recogniser_results.append(rec_res)
            signature_or_handwriting_recogniser_results.append(rec_res)

    # --- STAGE 2: Question-Answer Pair Extraction & Sorting ---
    def _create_question_answer_results_object(text_blocks):
        # Pair KEY blocks with their VALUE blocks from Textract's FORMS output.
        question_answer_results = list()
        key_blocks = [
            b
            for b in text_blocks
            if b.get("BlockType") == "KEY_VALUE_SET"
            and "KEY" in b.get("EntityTypes", [])
        ]
        for question_block in key_blocks:
            answer_block = next(
                (
                    block_map.get(rel["Ids"][0])
                    for rel in question_block.get("Relationships", [])
                    if rel["Type"] == "VALUE"
                ),
                None,
            )
            # The check for value_block now happens BEFORE we try to access its properties.
            if answer_block:
                question_bbox = question_block["Geometry"]["BoundingBox"]
                # We also get the answer_bbox safely inside this block.
                answer_bbox = answer_block["Geometry"]["BoundingBox"]
                question_answer_results.append(
                    {
                        # Data for final output (geometry kept normalised, 5 d.p.)
                        "Page": page_no,
                        "Question": _get_text_from_block(question_block, block_map),
                        "Answer": _get_text_from_block(answer_block, block_map),
                        "Confidence Score % (Question)": round(
                            question_block.get("Confidence", 0.0), 0
                        ),
                        "Confidence Score % (Answer)": round(
                            answer_block.get("Confidence", 0.0), 0
                        ),
                        "Question_left": round(question_bbox["Left"], 5),
                        "Question_top": round(question_bbox["Top"], 5),
                        "Question_width": round(question_bbox["Width"], 5),
                        "Question_height": round(question_bbox["Height"], 5),
                        "Answer_left": round(answer_bbox["Left"], 5),
                        "Answer_top": round(answer_bbox["Top"], 5),
                        "Answer_width": round(answer_bbox["Width"], 5),
                        "Answer_height": round(answer_bbox["Height"], 5),
                    }
                )
        # Sort top-to-bottom, then left-to-right (reading order).
        question_answer_results.sort(
            key=lambda item: (item["Question_top"], item["Question_left"])
        )
        return question_answer_results

    question_answer_results = _create_question_answer_results_object(text_blocks)

    # --- STAGE 3: Association of Selection Elements to Lines ---
    # Attach each selection element to the horizontally nearest line whose
    # vertical center is within one line-height; otherwise keep it unmatched.
    unmatched_selections = list()
    for selection in selections_data:
        best_match_line = None
        min_dist = float("inf")
        sel_geom = selection["geometry"]
        sel_y_center = sel_geom["top"] + sel_geom["height"] / 2
        for line in lines_data:
            line_geom = line["geometry"]
            line_y_center = line_geom["top"] + line_geom["height"] / 2
            if abs(sel_y_center - line_y_center) < line_geom["height"]:
                # Horizontal gap between selection and line (0 if overlapping)
                dist = 0
                if sel_geom["left"] > (line_geom["left"] + line_geom["width"]):
                    dist = sel_geom["left"] - (line_geom["left"] + line_geom["width"])
                elif line_geom["left"] > (sel_geom["left"] + sel_geom["width"]):
                    dist = line_geom["left"] - (sel_geom["left"] + sel_geom["width"])
                if dist < min_dist:
                    min_dist = dist
                    best_match_line = line
        # Accept the match only if within 5 line-heights horizontally.
        if best_match_line and min_dist < (best_match_line["geometry"]["height"] * 5):
            selection_as_word = {
                "text": f"[{selection['status']}]",
                "confidence": round(selection["confidence"], 0),
                "bounding_box": (
                    sel_geom["left"],
                    sel_geom["top"],
                    sel_geom["left"] + sel_geom["width"],
                    sel_geom["top"] + sel_geom["height"],
                ),
            }
            best_match_line["words"].append(selection_as_word)
            # Keep the line's words in left-to-right order after insertion.
            best_match_line["words"].sort(key=lambda w: w["bounding_box"][0])
        else:
            unmatched_selections.append(selection)

    # --- STAGE 4: Final Output Generation ---
    all_ocr_results = list()
    ocr_results_with_words = dict()
    selection_element_results = list()

    for i, line in enumerate(lines_data):
        line_num = i + 1
        line_geom = line["geometry"]
        # Rebuild line text from its (possibly augmented/split) words.
        reconstructed_text = " ".join(w["text"] for w in line["words"])
        all_ocr_results.append(
            OCRResult(
                reconstructed_text,
                line_geom["left"],
                line_geom["top"],
                line_geom["width"],
                line_geom["height"],
                round(line["confidence"], 0),
                line_num,
            )
        )
        ocr_results_with_words[f"text_line_{line_num}"] = {
            "line": line_num,
            "text": reconstructed_text,
            "confidence": line["confidence"],
            "bounding_box": (
                line_geom["left"],
                line_geom["top"],
                line_geom["left"] + line_geom["width"],
                line_geom["top"] + line_geom["height"],
            ),
            "words": line["words"],
            "page": page_no,
        }

    # Unmatched selection elements become standalone OCR results (line -1).
    for selection in unmatched_selections:
        sel_geom = selection["geometry"]
        sel_text = f"[{selection['status']}]"
        all_ocr_results.append(
            OCRResult(
                sel_text,
                sel_geom["left"],
                sel_geom["top"],
                sel_geom["width"],
                sel_geom["height"],
                round(selection["confidence"], 0),
                -1,
            )
        )

    # All selection elements (matched or not) are also reported separately.
    for selection in selections_data:
        sel_geom = selection["geometry"]
        selection_element_results.append(
            {
                "status": selection["status"],
                "confidence": round(selection["confidence"], 0),
                "bounding_box": (
                    sel_geom["left"],
                    sel_geom["top"],
                    sel_geom["left"] + sel_geom["width"],
                    sel_geom["top"] + sel_geom["height"],
                ),
                "page": page_no,
            }
        )

    all_ocr_results_with_page = {"page": page_no, "results": all_ocr_results}
    ocr_results_with_words_with_page = {
        "page": page_no,
        "results": ocr_results_with_words,
    }

    return (
        all_ocr_results_with_page,
        signature_or_handwriting_recogniser_results,
        signature_recogniser_results,
        handwriting_recogniser_results,
        ocr_results_with_words_with_page,
        selection_element_results,
        question_answer_results,
    )
def load_and_convert_textract_json(
    textract_json_file_path: str,
    log_files_output_paths: List[str],
    page_sizes_df: pd.DataFrame,
):
    """
    Loads Textract JSON from a file, detects if conversion is needed, and converts if necessary.

    Args:
        textract_json_file_path (str): The file path to the Textract JSON output.
        log_files_output_paths (List[str]): A list of log file paths, used for tracking;
            the Textract JSON path is appended to it if not already present.
        page_sizes_df (pd.DataFrame): A DataFrame containing page size information for the document.

    Returns:
        Tuple of (textract_data, failed_flag, log_files_output_paths):
        - textract_data (dict): the loaded (and possibly restructured) Textract data,
          or {} on failure.
        - failed_flag (bool): True when the file is missing, unparsable, or conversion failed.
        - log_files_output_paths: the (possibly updated) list of log file paths.
    """
    if not os.path.exists(textract_json_file_path):
        print("No existing Textract results file found.")
        return (
            {},
            True,
            log_files_output_paths,
        )  # Return empty dict and flag indicating missing file

    print("Found existing Textract json results file.")

    # Track log files (mutates the caller's list in place)
    if textract_json_file_path not in log_files_output_paths:
        log_files_output_paths.append(textract_json_file_path)

    try:
        # Split the path into base directory and filename for security
        # (secure_file_read guards against path traversal outside base_dir)
        textract_json_file_path_obj = Path(textract_json_file_path)
        base_dir = textract_json_file_path_obj.parent
        filename = textract_json_file_path_obj.name
        json_content = secure_file_read(base_dir, filename, encoding="utf-8")
        textract_data = json.loads(json_content)
    except json.JSONDecodeError:
        print("Error: Failed to parse Textract JSON file. Returning empty data.")
        return {}, True, log_files_output_paths  # Indicate failure

    # Check if conversion is needed: the app format has a top-level "pages" key,
    # while raw bulk Textract output has a top-level "Blocks" list.
    if "pages" in textract_data:
        print("JSON already in the correct format for app. No changes needed.")
        return textract_data, False, log_files_output_paths  # No conversion required

    if "Blocks" in textract_data:
        print("Need to convert Textract JSON to app format.")
        try:
            textract_data = restructure_textract_output(textract_data, page_sizes_df)
            return (
                textract_data,
                False,
                log_files_output_paths,
            )  # Successfully converted
        except Exception as e:
            print("Failed to convert JSON data to app format due to:", e)
            return {}, True, log_files_output_paths  # Conversion failed
    else:
        print("Invalid Textract JSON format: 'Blocks' missing.")
        # print("textract data:", textract_data)
        return (
            {},
            True,
            log_files_output_paths,
        )  # Return empty data if JSON is not recognized
def restructure_textract_output(textract_output: dict, page_sizes_df: pd.DataFrame):
    """
    Reorganise Textract output from the bulk Textract analysis option on AWS
    into a format that works in this redaction app, reducing size.

    Blocks are grouped per page under {"page_no": str, "data": {"Blocks": [...]}},
    geometry is converted from CropBox-relative to MediaBox-relative normalized
    coordinates when the two differ, and only essential block fields are kept.

    Args:
        textract_output (dict): The raw JSON output from AWS Textract.
        page_sizes_df (pd.DataFrame): A Pandas DataFrame containing page size
            information, including cropbox and mediabox dimensions and offsets
            for each page (indexed or indexable by 'page').

    Returns:
        dict: {"DocumentMetadata": ..., "pages": [per-page dicts sorted by page number]}
    """
    pages_dict = dict()

    # Extract total pages from DocumentMetadata
    document_metadata = textract_output.get("DocumentMetadata", {})

    # For efficient lookup, set 'page' as index if it's not already
    if "page" in page_sizes_df.columns:
        page_sizes_df = page_sizes_df.set_index("page")

    for block in textract_output.get("Blocks", []):
        page_no = block.get("Page", 1)  # Default to 1 if missing

        # --- Geometry Conversion Logic ---
        try:
            page_info = page_sizes_df.loc[page_no]
            cb_width = page_info["cropbox_width"]
            cb_height = page_info["cropbox_height"]
            mb_width = page_info["mediabox_width"]
            mb_height = page_info["mediabox_height"]
            cb_x_offset = page_info["cropbox_x_offset"]
            cb_y_offset_top = page_info["cropbox_y_offset_from_top"]

            # Conversion is only needed when crop and media boxes differ;
            # also guard against division by zero.
            needs_conversion = (
                (abs(cb_width - mb_width) > 1e-6 or abs(cb_height - mb_height) > 1e-6)
                and mb_width > 1e-6
                and mb_height > 1e-6
            )

            if needs_conversion and "Geometry" in block:
                geometry = block["Geometry"]  # Work directly on the block's geometry

                # --- Convert BoundingBox ---
                if "BoundingBox" in geometry:
                    bbox = geometry["BoundingBox"]
                    old_left = bbox["Left"]
                    old_top = bbox["Top"]
                    old_width = bbox["Width"]
                    old_height = bbox["Height"]

                    # Absolute coordinates within the CropBox
                    abs_cb_x = old_left * cb_width
                    abs_cb_y = old_top * cb_height
                    abs_cb_width = old_width * cb_width
                    abs_cb_height = old_height * cb_height

                    # Absolute coordinates relative to MediaBox top-left
                    abs_mb_x = cb_x_offset + abs_cb_x
                    abs_mb_y = cb_y_offset_top + abs_cb_y

                    # Back to normalized coordinates relative to MediaBox
                    bbox["Left"] = abs_mb_x / mb_width
                    bbox["Top"] = abs_mb_y / mb_height
                    bbox["Width"] = abs_cb_width / mb_width
                    bbox["Height"] = abs_cb_height / mb_height
        except KeyError:
            print(
                f"Warning: Page number {page_no} not found in page_sizes_df. Skipping coordinate conversion for this block."
            )
            # Block is still kept; only the coordinate conversion is skipped.
        except ZeroDivisionError:
            print(
                f"Warning: MediaBox width or height is zero for page {page_no}. Skipping coordinate conversion for this block."
            )

        # Initialise page structure if not already present
        if page_no not in pages_dict:
            pages_dict[page_no] = {"page_no": str(page_no), "data": {"Blocks": []}}

        # Keep only essential fields to reduce size
        filtered_block = {
            field: block[field]
            for field in [
                "BlockType",
                "Confidence",
                "Text",
                "Geometry",
                "Page",
                "Id",
                "Relationships",
            ]
            if field in block
        }
        pages_dict[page_no]["data"]["Blocks"].append(filtered_block)

    # Convert pages dictionary to a list sorted by page number.
    # BUG FIX: the previous code called pages_dict.questions(), which is not a
    # dict method (a corrupted rename of .keys()) and raised AttributeError.
    structured_output = {
        "DocumentMetadata": document_metadata,  # Store metadata separately
        "pages": [pages_dict[page] for page in sorted(pages_dict)],
    }

    return structured_output
def convert_question_answer_to_dataframe(
    question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame
) -> pd.DataFrame:
    """
    Convert question-answer results to DataFrame format matching convert_annotation_data_to_dataframe.

    Each Question and Answer will be on separate lines in the resulting dataframe.
    The 'image' column is looked up from page_sizes_df; when the page has no
    recorded image path, a placeholder name f'placeholder_image_{page}.png' is used.

    Args:
        question_answer_results: List of question-answer dictionaries from
            _create_question_answer_results_object
        page_sizes_df: DataFrame containing page sizes (must include 'page' and
            'image_path' columns)

    Returns:
        pd.DataFrame: DataFrame with columns ["image", "page", "label", "color",
        "xmin", "xmax", "ymin", "ymax", "text", "id"]
    """
    required_columns = [
        "image",
        "page",
        "label",
        "color",
        "xmin",
        "xmax",
        "ymin",
        "ymax",
        "text",
        "id",
    ]

    if not question_answer_results:
        # Return empty DataFrame with expected schema
        return pd.DataFrame(columns=required_columns)

    # Normalise the page column ONCE, on a copy. The previous implementation
    # re-ran to_numeric/dropna on every loop iteration and mutated the
    # caller's DataFrame in place (dropna(..., inplace=True)) — a side effect
    # the sibling convert_question_answer_to_annotation_json deliberately avoids.
    page_sizes_df = page_sizes_df.copy()
    page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
    page_sizes_df.dropna(subset=["page"], inplace=True)
    if not page_sizes_df.empty:
        page_sizes_df["page"] = page_sizes_df["page"].astype(int)
    else:
        print("Warning: Page sizes DataFrame became empty after processing.")

    rows = list()
    existing_ids = set()

    for i, qa_result in enumerate(question_answer_results):
        page_num = int(qa_result.get("Page", 1))

        # Look up the page image; fall back to a placeholder when the page is
        # missing from page_sizes_df (previously raised IndexError) or is NA.
        image_matches = page_sizes_df.loc[
            page_sizes_df["page"] == page_num, "image_path"
        ]
        if image_matches.empty or pd.isna(image_matches.iloc[0]):
            image_name = f"placeholder_image_{page_num}.png"
        else:
            image_name = image_matches.iloc[0]

        # Create Question row (blue)
        question_bbox = {
            "Question_left": qa_result.get("Question_left", 0),
            "Question_top": qa_result.get("Question_top", 0),
            "Question_width": qa_result.get("Question_width", 0),
            "Question_height": qa_result.get("Question_height", 0),
        }
        question_row = {
            "image": image_name,
            "page": page_num,
            "label": f"Question {i+1}",
            "color": "(0,0,255)",
            "xmin": question_bbox["Question_left"],
            "xmax": question_bbox["Question_left"] + question_bbox["Question_width"],
            "ymin": question_bbox["Question_top"],
            "ymax": question_bbox["Question_top"] + question_bbox["Question_height"],
            "text": qa_result.get("Question", ""),
            "id": None,  # Will be filled after generating IDs
        }

        # Create Answer row (green)
        answer_bbox = {
            "Answer_left": qa_result.get("Answer_left", 0),
            "Answer_top": qa_result.get("Answer_top", 0),
            "Answer_width": qa_result.get("Answer_width", 0),
            "Answer_height": qa_result.get("Answer_height", 0),
        }
        answer_row = {
            "image": image_name,
            "page": page_num,
            "label": f"Answer {i+1}",
            "color": "(0,255,0)",
            "xmin": answer_bbox["Answer_left"],
            "xmax": answer_bbox["Answer_left"] + answer_bbox["Answer_width"],
            "ymin": answer_bbox["Answer_top"],
            "ymax": answer_bbox["Answer_top"] + answer_bbox["Answer_height"],
            "text": qa_result.get("Answer", ""),
            "id": None,  # Will be filled after generating IDs
        }

        rows.extend([question_row, answer_row])

    # Generate unique IDs for all rows and assign them
    unique_ids = _generate_unique_ids(len(rows), existing_ids)
    for i, row in enumerate(rows):
        row["id"] = unique_ids[i]

    df = pd.DataFrame(rows)

    # Ensure all required columns are present and in correct order
    for col in required_columns:
        if col not in df.columns:
            df[col] = pd.NA
    return df.reindex(columns=required_columns, fill_value=pd.NA)
def convert_question_answer_to_annotation_json(
    question_answer_results: List[Dict[str, Any]], page_sizes_df: pd.DataFrame
) -> List[Dict]:
    """
    Convert question-answer results directly to Gradio Annotation JSON format.

    This function combines the functionality of convert_question_answer_to_dataframe
    and convert_review_df_to_annotation_json to directly convert question-answer
    results to the annotation JSON format without the intermediate DataFrame step.

    Args:
        question_answer_results: List of question-answer dictionaries from
            _create_question_answer_results_object
        page_sizes_df: DataFrame containing page sizes with columns
            ['page', 'image_path', 'image_width', 'image_height']

    Returns:
        List of dictionaries suitable for Gradio Annotation output, one dict per
        image/page. Each dict has structure:
        {"image": image_path, "boxes": [list of annotation boxes]}

    Raises:
        ValueError: If page_sizes_df lacks any of the required columns.
    """
    if not question_answer_results:
        # Return empty structure based on page_sizes_df: one entry per page,
        # each with no boxes.
        json_data = list()
        for _, row in page_sizes_df.iterrows():
            json_data.append(
                {
                    "image": row.get(
                        "image_path", f"placeholder_image_{row.get('page', 1)}.png"
                    ),
                    "boxes": [],
                }
            )
        return json_data

    # Validate required columns in page_sizes_df
    required_ps_cols = {"page", "image_path", "image_width", "image_height"}
    if not required_ps_cols.issubset(page_sizes_df.columns):
        missing = required_ps_cols - set(page_sizes_df.columns)
        raise ValueError(f"page_sizes_df is missing required columns: {missing}")

    # Convert page sizes columns to appropriate numeric types
    page_sizes_df = page_sizes_df.copy()  # Work with a copy to avoid modifying original
    page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
    page_sizes_df["image_width"] = pd.to_numeric(
        page_sizes_df["image_width"], errors="coerce"
    )
    page_sizes_df["image_height"] = pd.to_numeric(
        page_sizes_df["image_height"], errors="coerce"
    )
    page_sizes_df["page"] = page_sizes_df["page"].astype("Int64")

    # Prepare (page_num, box) pairs for every question and answer.
    # NOTE: a dead per-page image_path lookup whose result was discarded
    # (page_row["image_path"].iloc[0] / else: pass) has been removed — image
    # paths are taken from page_sizes_df when building the final JSON below.
    rows = list()
    existing_ids = set()

    for i, qa_result in enumerate(question_answer_results):
        page_num = int(qa_result.get("Page", 1))

        # Create Question box (blue)
        question_bbox = {
            "Question_left": qa_result.get("Question_left", 0),
            "Question_top": qa_result.get("Question_top", 0),
            "Question_width": qa_result.get("Question_width", 0),
            "Question_height": qa_result.get("Question_height", 0),
        }
        question_box = {
            "label": f"Question {i+1}",
            "color": (0, 0, 255),  # Blue for questions
            "xmin": question_bbox["Question_left"],
            "xmax": question_bbox["Question_left"] + question_bbox["Question_width"],
            "ymin": question_bbox["Question_top"],
            "ymax": question_bbox["Question_top"] + question_bbox["Question_height"],
            "text": qa_result.get("Question", ""),
            "id": None,  # Will be filled after generating IDs
        }

        # Create Answer box (green)
        answer_bbox = {
            "Answer_left": qa_result.get("Answer_left", 0),
            "Answer_top": qa_result.get("Answer_top", 0),
            "Answer_width": qa_result.get("Answer_width", 0),
            "Answer_height": qa_result.get("Answer_height", 0),
        }
        answer_box = {
            "label": f"Answer {i+1}",
            "color": (0, 255, 0),  # Green for answers
            "xmin": answer_bbox["Answer_left"],
            "xmax": answer_bbox["Answer_left"] + answer_bbox["Answer_width"],
            "ymin": answer_bbox["Answer_top"],
            "ymax": answer_bbox["Answer_top"] + answer_bbox["Answer_height"],
            "text": qa_result.get("Answer", ""),
            "id": None,  # Will be filled after generating IDs
        }

        rows.extend([(page_num, question_box), (page_num, answer_box)])

    # Generate unique IDs for all boxes and assign them
    unique_ids = _generate_unique_ids(len(rows), existing_ids)
    for i, (page_num, box) in enumerate(rows):
        box["id"] = unique_ids[i]
        rows[i] = (page_num, box)

    # Group boxes by page
    boxes_by_page = {}
    for page_num, box in rows:
        if page_num not in boxes_by_page:
            boxes_by_page[page_num] = list()
        boxes_by_page[page_num].append(box)

    # Build JSON structure based on page_sizes: one entry per page, with any
    # boxes that belong to that page.
    json_data = list()
    for _, row in page_sizes_df.iterrows():
        page_num = row["page"]
        pdf_image_path = row["image_path"]
        annotation_boxes = boxes_by_page.get(page_num, [])
        json_data.append({"image": pdf_image_path, "boxes": annotation_boxes})

    return json_data
def convert_page_question_answer_to_custom_image_recognizer_results(
    question_answer_results: List[Dict[str, Any]],
    page_sizes_df: pd.DataFrame,
    reported_page_number: int,
) -> List["CustomImageRecognizerResult"]:
    """
    Convert question-answer results to a list of CustomImageRecognizerResult objects.

    Textract bounding boxes are normalised (0-1), so each box is scaled by the
    page's image dimensions, falling back to the PDF mediabox dimensions when
    image sizes are missing or invalid. Question boxes are blue, answer boxes
    are green.

    Args:
        question_answer_results: List of question-answer dictionaries, each
            holding "Question"/"Answer" text, a "Page" number, normalised
            bounding-box fields ("Question_left", "Answer_width", ...) and
            per-item confidence scores.
        page_sizes_df: DataFrame with at least ['page', 'mediabox_width',
            'mediabox_height'] and optionally ['image_width', 'image_height'].
            NOTE: mutated in place — the 'page' column is coerced to int and
            rows with non-numeric page values are dropped.
        reported_page_number: Only entries whose "Page" matches this number
            are converted.

    Returns:
        List of CustomImageRecognizerResult objects (question then answer for
        each matching entry); empty list when there are no results or the
        requested page cannot be found.
    """
    if not question_answer_results:
        return list()

    # Normalise the 'page' column once; drop rows whose page is not numeric.
    page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce")
    page_sizes_df.dropna(subset=["page"], inplace=True)
    if page_sizes_df.empty:
        print("Warning: Page sizes DataFrame became empty after processing.")
        return list()
    page_sizes_df["page"] = page_sizes_df["page"].astype(int)

    page_row = page_sizes_df.loc[page_sizes_df["page"] == int(reported_page_number)]
    if page_row.empty:
        print(
            f"Warning: Page {reported_page_number} not found in page_sizes_df. Skipping this entry."
        )
        return list()  # Return empty list if page not found

    # Deferred project import (kept function-local as in the original,
    # presumably to avoid a circular import at module load time).
    from tools.custom_image_analyser_engine import CustomImageRecognizerResult

    def _resolve_dimension(image_col: str, mediabox_col: str) -> float:
        """Prefer a valid (>0, non-NaN) image dimension; else use the mediabox value."""
        try:
            if image_col in page_sizes_df.columns:
                value = page_row[image_col].iloc[0]
                if pd.notna(value) and value > 0:
                    return value
            return page_row[mediabox_col].iloc[0]
        except (KeyError, IndexError):
            return page_row[mediabox_col].iloc[0]

    # page_row is loop-invariant, so resolve the scaling dimensions once.
    image_width = _resolve_dimension("image_width", "mediabox_width")
    image_height = _resolve_dimension("image_height", "mediabox_height")

    def _confidence(qa: Dict[str, Any], role: str) -> float:
        """Read a confidence score, tolerating the legacy key form with
        embedded single quotes (e.g. "'Confidence Score % (Question)'")."""
        plain_key = f"Confidence Score % ({role})"
        quoted_key = f"'{plain_key}'"
        return float(qa.get(plain_key, qa.get(quoted_key, 0.0)))

    results = list()
    for i, qa_result in enumerate(question_answer_results):
        current_page = int(qa_result.get("Page", 1))
        if current_page != int(reported_page_number):
            # Skip entries for other pages; i still counts skipped entries so
            # the QUESTION/ANSWER labels stay aligned with the source list.
            continue

        question_text = qa_result.get("Question", "")
        answer_text = qa_result.get("Answer", "")

        # --- Question bounding box (scaled from normalized coordinates) ---
        results.append(
            CustomImageRecognizerResult(
                entity_type=f"QUESTION {i+1}",
                start=0,
                end=len(question_text),
                score=_confidence(qa_result, "Question"),
                left=float(qa_result.get("Question_left", 0) * image_width),
                top=float(qa_result.get("Question_top", 0) * image_height),
                width=float(qa_result.get("Question_width", 0) * image_width),
                height=float(qa_result.get("Question_height", 0) * image_height),
                text=question_text,
                color=(0, 0, 255),  # blue for questions
            )
        )

        # --- Answer bounding box (scaled from normalized coordinates) ---
        results.append(
            CustomImageRecognizerResult(
                entity_type=f"ANSWER {i+1}",
                start=0,
                end=len(answer_text),
                score=_confidence(qa_result, "Answer"),
                left=float(qa_result.get("Answer_left", 0) * image_width),
                top=float(qa_result.get("Answer_top", 0) * image_height),
                width=float(qa_result.get("Answer_width", 0) * image_width),
                height=float(qa_result.get("Answer_height", 0) * image_height),
                text=answer_text,
                color=(0, 255, 0),  # green for answers
            )
        )
    return results