import ast
import datetime
import json
import logging
import os
import time
from typing import List

import boto3
import gradio as gr
import pandas as pd
import pymupdf
from botocore.exceptions import (
    ClientError,
    NoCredentialsError,
    PartialCredentialsError,
    TokenRetrievalError,
)
from gradio import FileData

from tools.aws_functions import download_file_from_s3
from tools.config import (
    AWS_REGION,
    DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS,
    DOCUMENT_REDACTION_BUCKET,
    INPUT_FOLDER,
    LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
    OUTPUT_FOLDER,
    RUN_AWS_FUNCTIONS,
    TEXTRACT_JOBS_LOCAL_LOC,
    TEXTRACT_JOBS_S3_LOC,
    TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET,
)
from tools.file_conversion import get_input_file_names
from tools.helper_functions import get_file_name_without_type, get_textract_file_suffix
from tools.secure_path_utils import (
    secure_basename,
    secure_file_write,
    secure_join,
)


def analyse_document_with_textract_api(
    local_pdf_path: str,
    s3_input_prefix: str,
    s3_output_prefix: str,
    job_df: pd.DataFrame,
    s3_bucket_name: str = TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET,
    local_output_dir: str = OUTPUT_FOLDER,
    handwrite_signature_checkbox: List[str] = list(),
    successful_job_number: int = 0,
    total_document_page_count: int = 1,
    general_s3_bucket_name: str = DOCUMENT_REDACTION_BUCKET,
    aws_region: str = AWS_REGION,  # Optional: specify region if not default
):
| """ | |
| Uploads a local PDF to S3, starts a Textract analysis job (detecting text & signatures), | |
| waits for completion, and downloads the output JSON from S3 to a local directory. | |
| Args: | |
| local_pdf_path (str): Path to the local PDF file. | |
| s3_bucket_name (str): Name of the S3 bucket to use. | |
| s3_input_prefix (str): S3 prefix (folder) to upload the input PDF. | |
| s3_output_prefix (str): S3 prefix (folder) where Textract should write output. | |
| job_df (pd.DataFrame): Dataframe containing information from previous Textract API calls. | |
| s3_bucket_name (str, optional): S3 bucket in which to save API call outputs. | |
| local_output_dir (str, optional): Local directory to save the downloaded JSON results. | |
| handwrite_signature_checkbox (List[str], optional): List of feature types to extract from the document. | |
| successful_job_number (int): The number of successful jobs that have been submitted in this session. | |
| total_document_page_count (int): The number of pages in the document | |
| aws_region (str, optional): AWS region name. Defaults to boto3 default region. | |
| Returns: | |
| str: Path to the downloaded local JSON output file, or None if failed. | |
| Raises: | |
| FileNotFoundError: If the local_pdf_path does not exist. | |
| boto3.exceptions.NoCredentialsError: If AWS credentials are not found. | |
| Exception: For other AWS errors or job failures. | |
| """ | |
    # This is a variable that is written to logs to indicate that a Textract API call was made
    is_a_textract_api_call = True
    task_textbox = "textract"

    # Keep only the latest pdf path if a list was supplied
    if isinstance(local_pdf_path, list):
        local_pdf_path = local_pdf_path[-1]

    if not os.path.exists(local_pdf_path):
        raise FileNotFoundError(f"Input document not found: {local_pdf_path}")

    file_extension = os.path.splitext(local_pdf_path)[1].lower()

    # Load the pdf to get a page count if one was not provided
    if not total_document_page_count and file_extension in [".pdf"]:
        print("Page count not provided. Loading PDF to get page count")
        try:
            pymupdf_doc = pymupdf.open(local_pdf_path)
            total_document_page_count = pymupdf_doc.page_count
            pymupdf_doc.close()
            print("Page count:", total_document_page_count)
        except Exception as e:
            print("Failed to load PDF to get page count:", e, "- setting page count to 1")
            total_document_page_count = 1
            # raise Exception(f"Failed to load PDF to get page count: {e}")
    elif not total_document_page_count:
        # Fall back to a single page only when no count was provided and the file is not a PDF
        total_document_page_count = 1
    if not os.path.exists(local_output_dir):
        os.makedirs(local_output_dir)
        log_message = f"Created local output directory: {local_output_dir}"
        print(log_message)
        # logging.info(log_message)

    # Initialize boto3 clients
    session = boto3.Session(region_name=aws_region)
    s3_client = session.client("s3")
    textract_client = session.client("textract")
    # --- 1. Upload PDF to S3 ---
    pdf_filename = secure_basename(local_pdf_path)
    s3_input_key = secure_join(s3_input_prefix, pdf_filename).replace(
        "\\", "/"
    )  # Ensure forward slashes for S3

    log_message = (
        f"Uploading '{local_pdf_path}' to 's3://{s3_bucket_name}/{s3_input_key}'..."
    )
    print(log_message)
    # logging.info(log_message)

    try:
        s3_client.upload_file(local_pdf_path, s3_bucket_name, s3_input_key)
        log_message = "Upload successful."
        print(log_message)
        # logging.info(log_message)
    except Exception as e:
        log_message = f"Failed to upload PDF to S3: {e}"
        print(log_message)
        # logging.error(log_message)
        raise
    # Keep only rows where the analysis date falls within the last DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS days
    job_df["job_date_time"] = pd.to_datetime(job_df["job_date_time"], errors="coerce")

    if not job_df.empty:
        job_df = job_df.loc[
            job_df["job_date_time"]
            > (
                datetime.datetime.now()
                - datetime.timedelta(days=DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS)
            ),
            :,
        ]

    # If a matching recent job already exists, point the user at it instead of re-analysing
    if not job_df.empty and "file_name" in job_df.columns:
        matching_mask = (job_df["file_name"] == pdf_filename) & (
            job_df["signature_extraction"].astype(str)
            == str(handwrite_signature_checkbox)
        )
        matching_job_id_file_names = job_df.loc[matching_mask, "file_name"]
        matching_job_id_file_names_dates = job_df.loc[matching_mask, "job_date_time"]
        matching_job_id = job_df.loc[matching_mask, "job_id"]
        matching_handwrite_signature = job_df.loc[matching_mask, "signature_extraction"]

        if (
            len(matching_job_id_file_names) > 0
            and len(matching_handwrite_signature) > 0
        ):
            out_message = (
                f"Existing Textract outputs found for file {pdf_filename} from date "
                f"{matching_job_id_file_names_dates.iloc[0]}. No need to re-analyse. "
                f"Please download the existing results from the list with job ID "
                f"{matching_job_id.iloc[0]}"
            )
            print(out_message)
            raise Exception(out_message)
    # --- 2. Start Textract Document Analysis ---
    message = "Starting Textract document analysis job..."
    print(message)

    try:
        if (
            "Extract signatures" in handwrite_signature_checkbox
            or "Extract forms" in handwrite_signature_checkbox
            or "Extract layout" in handwrite_signature_checkbox
            or "Extract tables" in handwrite_signature_checkbox
        ):
            feature_types = list()
            if "Extract signatures" in handwrite_signature_checkbox:
                feature_types.append("SIGNATURES")
            if "Extract forms" in handwrite_signature_checkbox:
                feature_types.append("FORMS")
            if "Extract layout" in handwrite_signature_checkbox:
                feature_types.append("LAYOUT")
            if "Extract tables" in handwrite_signature_checkbox:
                feature_types.append("TABLES")

            response = textract_client.start_document_analysis(
                DocumentLocation={
                    "S3Object": {"Bucket": s3_bucket_name, "Name": s3_input_key}
                },
                FeatureTypes=feature_types,  # Analyse only the selected features
                OutputConfig={"S3Bucket": s3_bucket_name, "S3Prefix": s3_output_prefix},
            )
            job_type = "document_analysis"
        else:
            # No extra features selected, so plain text detection is sufficient
            response = textract_client.start_document_text_detection(
                DocumentLocation={
                    "S3Object": {"Bucket": s3_bucket_name, "Name": s3_input_key}
                },
                OutputConfig={"S3Bucket": s3_bucket_name, "S3Prefix": s3_output_prefix},
            )
            job_type = "document_text_detection"

        job_id = response["JobId"]
        print(f"Textract job started with JobId: {job_id}")
        # Prepare the job log entry
        log_csv_key_location = f"{s3_output_prefix}/textract_document_jobs.csv"
        log_df = pd.DataFrame(
            [
                {
                    "job_id": job_id,
                    "file_name": pdf_filename,
                    "job_type": job_type,
                    "signature_extraction": handwrite_signature_checkbox,
                    "job_date_time": datetime.datetime.now().strftime(
                        "%Y-%m-%d %H:%M:%S"
                    ),
                }
            ]
        )

        # Local log file path
        log_file_path = secure_join(local_output_dir, "textract_document_jobs.csv")

        # Write the latest job ID to a local text file
        secure_file_write(
            local_output_dir,
            pdf_filename + "_textract_document_jobs_job_id.txt",
            job_id,
        )

        # Append to the CSV log if it exists, otherwise write it with a header
        file_exists = os.path.exists(log_file_path)
        log_df.to_csv(log_file_path, mode="a", index=False, header=not file_exists)

        # Upload the log file to S3 (overwrites any existing copy)
        s3_client.upload_file(
            log_file_path, general_s3_bucket_name, log_csv_key_location
        )
        print(f"Job ID written to {log_csv_key_location}")
        # logging.info(f"Job ID written to s3://{s3_bucket_name}/{s3_output_prefix}/textract_document_jobs.csv")
    except Exception as e:
        error = f"Failed to start Textract job: {e}"
        print(error)
        # logging.error(error)
        raise

    successful_job_number += 1
    total_number_of_textract_page_calls = total_document_page_count

    return (
        f"Textract analysis job submitted, job ID: {job_id}",
        job_id,
        job_type,
        successful_job_number,
        is_a_textract_api_call,
        total_number_of_textract_page_calls,
        task_textbox,
    )
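
# A minimal usage sketch (illustrative only, not executed): the paths, prefixes and
# options below are placeholder assumptions, and valid AWS credentials plus access
# to the configured buckets are assumed.
#
#   job_df = load_in_textract_job_details()
#   (out_message, job_id, job_type, job_count, is_api_call, page_calls, task) = (
#       analyse_document_with_textract_api(
#           local_pdf_path="input/example.pdf",
#           s3_input_prefix="textract/input",
#           s3_output_prefix="textract/output",
#           job_df=job_df,
#           handwrite_signature_checkbox=["Extract signatures"],
#       )
#   )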


def return_job_status(
    job_id: str,
    response: dict,
    attempts: int,
    poll_interval_seconds: int = 0,
    max_polling_attempts: int = 1,
):
| """ | |
| Polls the AWS Textract service to retrieve the current status of an asynchronous document analysis job. | |
| This function checks the job status from the provided response and logs relevant information or errors. | |
| Args: | |
| job_id (str): The unique identifier of the Textract job. | |
| response (dict): The response dictionary received from Textract's `get_document_analysis` or `get_document_text_detection` call. | |
| attempts (int): The current polling attempt number. | |
| poll_interval_seconds (int, optional): The time in seconds to wait before the next poll (currently unused in this function, but kept for context). Defaults to 0. | |
| max_polling_attempts (int, optional): The maximum number of polling attempts allowed (currently unused in this function, but kept for context). Defaults to 1. | |
| Returns: | |
| str: The current status of the Textract job (e.g., 'IN_PROGRESS', 'SUCCEEDED'). | |
| Raises: | |
| Exception: If the Textract job status is 'FAILED' or 'PARTIAL_SUCCESS', or if an unexpected status is encountered. | |
| """ | |
| job_status = response["JobStatus"] | |
| logging.info( | |
| f"Polling attempt {attempts}/{max_polling_attempts}. Job status: {job_status}" | |
| ) | |
| if job_status == "IN_PROGRESS": | |
| pass | |
| # time.sleep(poll_interval_seconds) | |
| elif job_status == "SUCCEEDED": | |
| logging.info("Textract job succeeded.") | |
| elif job_status in ["FAILED", "PARTIAL_SUCCESS"]: | |
| status_message = response.get("StatusMessage", "No status message provided.") | |
| warnings = response.get("Warnings", []) | |
| logging.error( | |
| f"Textract job ended with status: {job_status}. Message: {status_message}" | |
| ) | |
| if warnings: | |
| logging.warning(f"Warnings: {warnings}") | |
| # Decide if PARTIAL_SUCCESS should proceed or raise error | |
| # For simplicity here, we raise for both FAILED and PARTIAL_SUCCESS | |
| raise Exception( | |
| f"Textract job {job_id} failed or partially failed. Status: {job_status}. Message: {status_message}" | |
| ) | |
| else: | |
| # Should not happen based on documentation, but handle defensively | |
| raise Exception(f"Unexpected Textract job status: {job_status}") | |
| return job_status | |
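
# Sketch of how this helper slots into a polling loop; 'response' is the standard
# payload returned by get_document_analysis / get_document_text_detection:
#
#   response = textract_client.get_document_analysis(JobId=job_id)
#   job_status = return_job_status(job_id, response, attempts=1)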


def download_textract_job_files(
    s3_client,
    s3_bucket_name: str,
    s3_output_key_prefix: str,
    pdf_filename: str,
    job_id: str,
    local_output_dir: str,
    handwrite_signature_checkbox: List[str] = list(),
):
| """ | |
| Download and combine output job files from AWS Textract for a given job. | |
| Args: | |
| s3_client (boto3.client): The Boto3 S3 client to interact with AWS S3. | |
| s3_bucket_name (str): Name of the S3 bucket where Textract job outputs are stored. | |
| s3_output_key_prefix (str): S3 prefix (folder path) under which job output files are located (usually ends with job_id/). | |
| pdf_filename (str): The name of the PDF file related to this Textract job (used for local naming or logging, not S3 lookup). | |
| job_id (str): The AWS Textract job ID whose outputs are being fetched. | |
| local_output_dir (str): The local directory in which to save downloaded and combined results. | |
| handwrite_signature_checkbox (List[str], optional): List indicating user options regarding post-processing for handwriting/signature (used for filtering or downstream handling). | |
| Returns: | |
| str: The local file path to the combined output JSON file. | |
| Raises: | |
| Exception: If no output files are found, or if an error occurs during download or processing. | |
| """ | |
    list_response = s3_client.list_objects_v2(
        Bucket=s3_bucket_name, Prefix=s3_output_key_prefix
    )
    output_files = list_response.get("Contents", [])

    if not output_files:
        # Retry the listing once in case the output was not yet visible
        list_response = s3_client.list_objects_v2(
            Bucket=s3_bucket_name, Prefix=s3_output_key_prefix
        )
        output_files = list_response.get("Contents", [])

    if not output_files:
        out_message = (
            f"No output files found in s3://{s3_bucket_name}/{s3_output_key_prefix}"
        )
        print(out_message)
        raise Exception(out_message)
    # Textract may split its output across several JSON files. Filter out the
    # prefix marker, any directory markers, and access-check objects, then merge
    # the blocks from every remaining file.
    json_files_to_download = [
        f
        for f in output_files
        if f["Key"] != s3_output_key_prefix
        and not f["Key"].endswith("/")
        and "access_check" not in f["Key"]
    ]
    # print("json_files_to_download:", json_files_to_download)

    if not json_files_to_download:
        error = f"No JSON files found (only prefix marker?) in s3://{s3_bucket_name}/{s3_output_key_prefix}"
        print(error)
        # logging.error(error)
        raise FileNotFoundError(error)

    combined_blocks = []

    for f in sorted(
        json_files_to_download, key=lambda x: x["Key"]
    ):  # Sort to ensure a consistent order
        obj = s3_client.get_object(Bucket=s3_bucket_name, Key=f["Key"])
        data = json.loads(obj["Body"].read())

        # Assuming Textract-style output with a "Blocks" key
        if "Blocks" in data:
            combined_blocks.extend(data["Blocks"])
        else:
            logging.warning(f"No 'Blocks' key in file: {f['Key']}")

    # Build the final combined JSON structure
    combined_output = {
        "DocumentMetadata": {
            "Pages": len(set(block.get("Page", 1) for block in combined_blocks))
        },
        "Blocks": combined_blocks,
        "JobStatus": "SUCCEEDED",
    }

    output_filename_base = os.path.basename(pdf_filename)
    output_filename_base_no_ext = os.path.splitext(output_filename_base)[0]

    # Generate a suffix for the output file name based on the selected extraction options
    textract_suffix = get_textract_file_suffix(handwrite_signature_checkbox)
    local_output_filename = (
        f"{output_filename_base_no_ext}{textract_suffix}_textract.json"
    )
    local_output_path = secure_join(local_output_dir, local_output_filename)

    secure_file_write(
        local_output_dir, local_output_filename, json.dumps(combined_output)
    )
    print(f"Combined Textract output written to {local_output_path}")

    downloaded_file_path = local_output_path

    return downloaded_file_path
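
# Illustrative call (the bucket name and prefix are placeholder assumptions).
# Textract writes its output as one or more JSON chunks under
# <s3_output_prefix>/<job_id>/, which this function merges into a single
# '<file><suffix>_textract.json' in local_output_dir:
#
#   s3_client = boto3.Session(region_name=AWS_REGION).client("s3")
#   local_json = download_textract_job_files(
#       s3_client,
#       "my-textract-bucket",
#       f"textract/output/{job_id}/",
#       "example.pdf",
#       job_id,
#       OUTPUT_FOLDER,
#   )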


def check_for_provided_job_id(job_id: str):
    if not job_id:
        raise Exception("Please provide a job ID.")
    return


def load_pdf_job_file_from_s3(
    load_s3_jobs_input_loc: str,
    pdf_filename: str,
    local_output_dir: str,
    s3_bucket_name: str,
    RUN_AWS_FUNCTIONS: bool = RUN_AWS_FUNCTIONS,
) -> tuple:
| """ | |
| Downloads a PDF job file from S3 and saves it locally. | |
| Args: | |
| load_s3_jobs_input_loc (str): S3 prefix/location where the PDF job file is stored. | |
| pdf_filename (str): The name of the PDF file (without .pdf extension). | |
| local_output_dir (str): Directory to which the file should be saved locally. | |
| s3_bucket_name (str): The S3 bucket name. | |
| RUN_AWS_FUNCTIONS (bool, optional): Whether to run AWS functions (download from S3). Defaults to RUN_AWS_FUNCTIONS. | |
| Returns: | |
| tuple: (pdf_file_location (list of str), doc_file_name_no_extension_textbox (str)) | |
| """ | |
    try:
        pdf_file_location = ""
        doc_file_name_no_extension_textbox = ""

        s3_input_key_prefix = secure_join(load_s3_jobs_input_loc, pdf_filename).replace(
            "\\", "/"
        )
        s3_input_key_prefix = s3_input_key_prefix + ".pdf"

        local_input_file_path = secure_join(local_output_dir, pdf_filename)
        local_input_file_path = local_input_file_path + ".pdf"

        download_file_from_s3(
            s3_bucket_name,
            s3_input_key_prefix,
            local_input_file_path,
            RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS,
        )

        pdf_file_location = [local_input_file_path]
        doc_file_name_no_extension_textbox = get_file_name_without_type(pdf_filename)
    except Exception as e:
        print("Could not download PDF job file from S3 due to:", e)

    return pdf_file_location, doc_file_name_no_extension_textbox
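
# Example sketch (bucket and prefix are placeholders): re-fetch the original PDF
# that a previous Textract job was run against, so it can be redisplayed in the UI.
#
#   pdf_paths, file_stem = load_pdf_job_file_from_s3(
#       "textract/input", "example", OUTPUT_FOLDER, "my-textract-bucket"
#   )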


def replace_existing_pdf_input_for_whole_document_outputs(
    load_s3_jobs_input_loc: str,
    pdf_filename: str,
    local_output_dir: str,
    s3_bucket_name: str,
    in_doc_files: FileData = [],
    input_folder: str = INPUT_FOLDER,
    RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS,
    progress=gr.Progress(track_tqdm=True),
):
| """ | |
| Ensures the PDF input for whole document outputs is loaded from S3 unless an identical PDF is already supplied. | |
| Args: | |
| load_s3_jobs_input_loc (str): The S3 input prefix/location for the PDF job file. | |
| pdf_filename (str): The PDF file name (without extension). | |
| local_output_dir (str): The local directory for saving the file. | |
| s3_bucket_name (str): The S3 bucket name. | |
| in_doc_files (FileData, optional): List of Gradio FileData objects or paths that may already contain the PDF file. Defaults to []. | |
| input_folder (str, optional): Input folder path on disk. Defaults to INPUT_FOLDER. | |
| RUN_AWS_FUNCTIONS (bool, optional): Whether to run AWS-related operations. Defaults to RUN_AWS_FUNCTIONS global. | |
| progress (gr.Progress, optional): Gradio Progress object for reporting progress. Defaults to a tqdm-enabled progress tracker. | |
| Returns: | |
| Returns the downloaded file location and associated file name information for downstream use. | |
| """ | |
| progress(0.1, "Loading PDF from s3") | |
| if in_doc_files: | |
| ( | |
| doc_file_name_no_extension_textbox, | |
| doc_file_name_with_extension_textbox, | |
| doc_full_file_name_textbox, | |
| doc_file_name_textbox_list, | |
| total_pdf_page_count, | |
| ) = get_input_file_names(in_doc_files) | |
| if pdf_filename == doc_file_name_no_extension_textbox: | |
| print("Existing loaded PDF file has same name as file from S3") | |
| doc_file_name_no_extension_textbox = pdf_filename | |
| downloaded_pdf_file_location = in_doc_files | |
| else: | |
| downloaded_pdf_file_location, doc_file_name_no_extension_textbox = ( | |
| load_pdf_job_file_from_s3( | |
| load_s3_jobs_input_loc, | |
| pdf_filename, | |
| local_output_dir, | |
| s3_bucket_name, | |
| RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS, | |
| ) | |
| ) | |
| ( | |
| doc_file_name_no_extension_textbox, | |
| doc_file_name_with_extension_textbox, | |
| doc_full_file_name_textbox, | |
| doc_file_name_textbox_list, | |
| total_pdf_page_count, | |
| ) = get_input_file_names(downloaded_pdf_file_location) | |
| else: | |
| downloaded_pdf_file_location, doc_file_name_no_extension_textbox = ( | |
| load_pdf_job_file_from_s3( | |
| load_s3_jobs_input_loc, | |
| pdf_filename, | |
| local_output_dir, | |
| s3_bucket_name, | |
| RUN_AWS_FUNCTIONS=RUN_AWS_FUNCTIONS, | |
| ) | |
| ) | |
| ( | |
| doc_file_name_no_extension_textbox, | |
| doc_file_name_with_extension_textbox, | |
| doc_full_file_name_textbox, | |
| doc_file_name_textbox_list, | |
| total_pdf_page_count, | |
| ) = get_input_file_names(downloaded_pdf_file_location) | |
| return ( | |
| downloaded_pdf_file_location, | |
| doc_file_name_no_extension_textbox, | |
| doc_file_name_with_extension_textbox, | |
| doc_full_file_name_textbox, | |
| doc_file_name_textbox_list, | |
| total_pdf_page_count, | |
| ) | |
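
# Design note: if the user has already uploaded a file whose name stem matches the
# S3 job's file name, the existing upload is reused and nothing is downloaded;
# otherwise the PDF is pulled from S3 and its file-name metadata is recomputed.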


def poll_whole_document_textract_analysis_progress_and_download(
    job_id: str,
    job_type_dropdown: str,
    s3_output_prefix: str,
    pdf_filename: str,
    job_df: pd.DataFrame,
    s3_bucket_name: str = TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET,
    local_output_dir: str = OUTPUT_FOLDER,
    load_s3_jobs_loc: str = TEXTRACT_JOBS_S3_LOC,
    load_local_jobs_loc: str = TEXTRACT_JOBS_LOCAL_LOC,
    aws_region: str = AWS_REGION,  # Optional: specify region if not default
    load_jobs_from_s3: str = LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
    poll_interval_seconds: int = 1,
    max_polling_attempts: int = 1,
    DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS: int = DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS,
    progress=gr.Progress(track_tqdm=True),
):
| """ | |
| Polls AWS Textract for the status of a document analysis job and, once finished, downloads and combines the output into a local JSON file for further processing. | |
| Args: | |
| job_id (str): The AWS Textract job ID to check for completion. | |
| job_type_dropdown (str): The Textract operation type to use ('document_analysis' or 'document_text_detection'). | |
| s3_output_prefix (str): The S3 prefix (folder path) where the job's output files are located. | |
| pdf_filename (str): The name of the PDF document associated with this job. | |
| job_df (pd.DataFrame): DataFrame containing information from previous Textract API calls. | |
| s3_bucket_name (str, optional): S3 bucket containing the job outputs. Defaults to TEXTRACT_WHOLE_DOCUMENT_ANALYSIS_BUCKET. | |
| local_output_dir (str, optional): Local directory to which output JSON results will be saved. Defaults to OUTPUT_FOLDER. | |
| load_s3_jobs_loc (str, optional): S3 location for previously saved Textract jobs metadata. Defaults to TEXTRACT_JOBS_S3_LOC. | |
| load_local_jobs_loc (str, optional): Local location for previously saved Textract jobs metadata. Defaults to TEXTRACT_JOBS_LOCAL_LOC. | |
| aws_region (str, optional): AWS region for API calls. Defaults to AWS_REGION. | |
| load_jobs_from_s3 (str, optional): Whether to load previous jobs from S3 or local. Defaults to LOAD_PREVIOUS_TEXTRACT_JOBS_S3. | |
| poll_interval_seconds (int, optional): Seconds between polling attempts. Defaults to 1. | |
| max_polling_attempts (int, optional): How many times to check the job's status before timing out. Defaults to 1. | |
| DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS (int, optional): How many days back to display finished jobs. Defaults to DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS. | |
| progress (gr.Progress, optional): Gradio Progress object for tracking progress in a UI. | |
| Returns: | |
| [function output not explicitly documented here; see function logic for details] | |
| Raises: | |
| Exception: If job fails, polling times out, or download fails. | |
| """ | |
| progress(0.1, "Querying AWS Textract for status of document analysis job") | |
| if job_id: | |
| # Initialize boto3 clients | |
| session = boto3.Session(region_name=aws_region) | |
| s3_client = session.client("s3") | |
| textract_client = session.client("textract") | |
| # --- 3. Poll for Job Completion --- | |
| job_status = "IN_PROGRESS" | |
| attempts = 0 | |
| message = "Polling Textract for job completion status..." | |
| print(message) | |
| # logging.info("Polling Textract for job completion status...") | |
| # Update Textract document history df | |
| try: | |
| job_df = load_in_textract_job_details( | |
| load_s3_jobs=load_jobs_from_s3, | |
| load_s3_jobs_loc=load_s3_jobs_loc, | |
| load_local_jobs_loc=load_local_jobs_loc, | |
| ) | |
| except Exception as e: | |
| print(f"Failed to update job details dataframe: {e}") | |
| while job_status == "IN_PROGRESS" and attempts <= max_polling_attempts: | |
| attempts += 1 | |
| try: | |
| if job_type_dropdown == "document_analysis": | |
| response = textract_client.get_document_analysis(JobId=job_id) | |
| job_status = return_job_status( | |
| job_id, | |
| response, | |
| attempts, | |
| poll_interval_seconds, | |
| max_polling_attempts, | |
| ) | |
| elif job_type_dropdown == "document_text_detection": | |
| response = textract_client.get_document_text_detection(JobId=job_id) | |
| job_status = return_job_status( | |
| job_id, | |
| response, | |
| attempts, | |
| poll_interval_seconds, | |
| max_polling_attempts, | |
| ) | |
| else: | |
| error = "Unknown job type, cannot poll job" | |
| print(error) | |
| logging.error(error) | |
| raise Exception(error) | |
| except textract_client.exceptions.InvalidJobIdException: | |
| error_message = f"Invalid JobId: {job_id}. This might happen if the job expired (older than {DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS} days) or never existed." | |
| print(error_message) | |
| logging.error(error_message) | |
| raise Exception(error_message) | |
| except Exception as e: | |
| error_message = ( | |
| f"Error while polling Textract status for job {job_id}: {e}" | |
| ) | |
| print(error_message) | |
| logging.error(error_message) | |
| raise Exception(error_message) | |
        downloaded_file_path = None

        if job_status == "SUCCEEDED":
            progress(0.5, "Document analysis task outputs found. Downloading from S3")

            # 3b - Replace the PDF file name if it exists in the job dataframe, and
            # recover the signature_extraction options used (needed for file naming)
            handwrite_signature_checkbox = list()
            if not job_df.empty:
                if "signature_extraction" in job_df.columns:
                    matching_signature_extraction = job_df.loc[
                        job_df["job_id"] == job_id, "signature_extraction"
                    ]
                    if not matching_signature_extraction.empty:
                        signature_extraction_str = matching_signature_extraction.iloc[0]
                        # Handle both string representations like "['Extract signatures']" and actual lists
                        if isinstance(signature_extraction_str, str):
                            try:
                                handwrite_signature_checkbox = ast.literal_eval(
                                    signature_extraction_str
                                )
                            except (ValueError, SyntaxError):
                                # If parsing fails, wrap the raw string in a list
                                handwrite_signature_checkbox = [
                                    signature_extraction_str
                                ]
                        elif isinstance(signature_extraction_str, list):
                            handwrite_signature_checkbox = signature_extraction_str

                if "file_name" in job_df.columns:
                    matching_job_id_file_names = job_df.loc[
                        job_df["job_id"] == job_id, "file_name"
                    ]

                    if pdf_filename and not matching_job_id_file_names.empty:
                        if pdf_filename == matching_job_id_file_names.iloc[0]:
                            out_message = f"Existing Textract outputs found for file {pdf_filename}. No need to re-download."
                            gr.Warning(out_message)
                            raise Exception(out_message)

                    if not matching_job_id_file_names.empty:
                        pdf_filename = matching_job_id_file_names.iloc[0]
                    else:
                        pdf_filename = "unknown_file"
            # --- 4. Download Output JSON from S3 ---
            # Textract typically creates output under s3_output_prefix/job_id/.
            # There may be multiple JSON files if pagination occurred during writing,
            # so list the objects and combine whatever JSON is found.
            s3_output_key_prefix = (
                secure_join(s3_output_prefix, job_id).replace("\\", "/") + "/"
            )
            logging.info(
                f"Searching for output files in s3://{s3_bucket_name}/{s3_output_key_prefix}"
            )

            try:
                downloaded_file_path = download_textract_job_files(
                    s3_client,
                    s3_bucket_name,
                    s3_output_key_prefix,
                    pdf_filename,
                    job_id,
                    local_output_dir,
                    handwrite_signature_checkbox,
                )
            except Exception as e:
                out_message = (
                    f"Failed to download or process Textract output from S3. Error: {e}"
                )
                print(out_message)
                raise Exception(out_message)
    else:
        raise Exception("No Job ID provided.")

    output_pdf_filename = get_file_name_without_type(pdf_filename)

    return downloaded_file_path, job_status, job_df, output_pdf_filename
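
# Usage sketch (assumes a job submitted earlier in this session; the prefix is a
# placeholder):
#
#   downloaded_path, status, job_df, out_name = (
#       poll_whole_document_textract_analysis_progress_and_download(
#           job_id, "document_analysis", "textract/output", "example.pdf", job_df
#       )
#   )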


def load_in_textract_job_details(
    load_s3_jobs: str = LOAD_PREVIOUS_TEXTRACT_JOBS_S3,
    load_s3_jobs_loc: str = TEXTRACT_JOBS_S3_LOC,
    load_local_jobs_loc: str = TEXTRACT_JOBS_LOCAL_LOC,
    document_redaction_bucket: str = DOCUMENT_REDACTION_BUCKET,
    aws_region: str = AWS_REGION,
    DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS: int = DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS,
):
| """ | |
| Load in a dataframe of jobs previous submitted to the Textract API service. | |
| """ | |
    job_df = pd.DataFrame(
        columns=[
            "job_id",
            "file_name",
            "job_type",
            "signature_extraction",
            "job_date_time",
        ]
    )

    # Initialize boto3 clients
    session = boto3.Session(region_name=aws_region)
    s3_client = session.client("s3")

    local_output_path = f"{load_local_jobs_loc}/textract_document_jobs.csv"

    if load_s3_jobs == "True":
        s3_output_key = f"{load_s3_jobs_loc}/textract_document_jobs.csv"

        try:
            s3_client.head_object(Bucket=document_redaction_bucket, Key=s3_output_key)
            # print(f"File exists. Downloading from '{s3_output_key}' to '{local_output_path}'...")
            s3_client.download_file(
                document_redaction_bucket, s3_output_key, local_output_path
            )
            # print("Download successful.")
        except ClientError as e:
            if e.response["Error"]["Code"] == "404":
                print("Log file does not exist in S3.")
            else:
                print(f"Unexpected error occurred: {e}")
        except (NoCredentialsError, PartialCredentialsError, TokenRetrievalError) as e:
            print(f"AWS credential issue encountered: {e}")
            print("Skipping S3 log file download.")

    # If the log file exists locally, load it in
    if os.path.exists(local_output_path):
        print("Found Textract job list log file in local path")

        job_df = pd.read_csv(local_output_path)

        if "job_date_time" in job_df.columns:
            job_df["job_date_time"] = pd.to_datetime(
                job_df["job_date_time"], errors="coerce"
            )
            # Keep only jobs that have been completed in the last 'DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS' days
            cutoff_time = pd.Timestamp.now() - pd.Timedelta(
                days=DAYS_TO_DISPLAY_WHOLE_DOCUMENT_JOBS
            )
            job_df = job_df.loc[job_df["job_date_time"] > cutoff_time, :]

        try:
            job_df = job_df[
                [
                    "job_id",
                    "file_name",
                    "job_type",
                    "signature_extraction",
                    "job_date_time",
                ]
            ]
        except Exception as e:
            print(
                "Could not find one or more columns in Textract job list log file.",
                f"Error: {e}",
            )

    return job_df
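
# The job log is a flat CSV with one row per submission; an illustrative row
# (values are made up):
#
#   job_id,file_name,job_type,signature_extraction,job_date_time
#   0123456789abcdef,example.pdf,document_analysis,"['Extract signatures']",2024-01-01 12:00:00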


def download_textract_output(
    job_id: str, output_bucket: str, output_prefix: str, local_folder: str
):
    """
    Checks the status of a Textract job and downloads the output ZIP file if the job is complete.

    Args:
        job_id (str): The Textract job ID.
        output_bucket (str): The S3 bucket where the output is stored.
        output_prefix (str): The prefix (folder path) in S3 where the output file is stored.
        local_folder (str): The local directory where the ZIP file should be saved.
    """
| textract_client = boto3.client("textract") | |
| s3_client = boto3.client("s3") | |
| # Check job status | |
| while True: | |
| response = textract_client.get_document_analysis(JobId=job_id) | |
| status = response["JobStatus"] | |
| if status == "SUCCEEDED": | |
| print("Job completed successfully.") | |
| break | |
| elif status == "FAILED": | |
| print( | |
| "Job failed:", | |
| response.get("StatusMessage", "No error message provided."), | |
| ) | |
| return | |
| else: | |
| print(f"Job is still {status}.") | |
| # time.sleep(10) # Wait before checking again | |
| # Find output ZIP file in S3 | |
| output_file_key = f"{output_prefix}/{job_id}.zip" | |
| local_file_path = secure_join(local_folder, f"{job_id}.zip") | |
| # Download file | |
| try: | |
| s3_client.download_file(output_bucket, output_file_key, local_file_path) | |
| print(f"Output file downloaded to: {local_file_path}") | |
| except Exception as e: | |
| print(f"Error downloading file: {e}") | |


def check_textract_outputs_exist(textract_output_found_checkbox):
    if textract_output_found_checkbox is True:
        print("Textract outputs found")
        return
    else:
        raise Exception(
            "Relevant Textract outputs not found. Please ensure you have selected the correct results output and that you have uploaded the relevant document file in 'Choose document or image file...' above"
        )