edwinrajeev's picture
Rename extractor.py to app.py
9df5754 verified
import gradio as gr
import boto3
from pdf2image import convert_from_path
from io import BytesIO
import time
from aws import get_region, get_key_id, get_access_key
from openpyxl import load_workbook
import os
textract = boto3.client('textract',
region_name=get_region(),
aws_access_key_id=get_key_id(),
aws_secret_access_key=get_access_key())
def analyze_document_local(image_bytes):
"""Calls Textract's analyze_document API using the local image bytes."""
response = textract.analyze_document(
Document={'Bytes': image_bytes},
FeatureTypes=['LAYOUT', 'TABLES']
)
return response
def get_text_from_cell(cell, page):
"""Extracts the text from a table cell by looking up child WORD blocks."""
text = ''
for relationship in cell.get('Relationships', []):
if relationship['Type'] == 'CHILD':
for child_id in relationship['Ids']:
# Find the child block in the page
word_block = next((b for b in page['Blocks'] if b['Id'] == child_id), None)
if word_block and word_block['BlockType'] == 'WORD':
text += word_block.get('Text', '') + ' '
return text.strip()
def extract_table_data(page_response):
"""
Extracts table data from a single Textract response (representing one image/page).
Returns a list of tables (each table is represented as a list of rows).
"""
blocks = page_response['Blocks']
tables_data = []
for block in blocks:
if block['BlockType'] == 'TABLE':
table = []
rows = {}
# Get each cell in the table
for relationship in block.get('Relationships', []):
if relationship['Type'] == 'CHILD':
for child_id in relationship['Ids']:
cell = next((b for b in blocks if b['Id'] == child_id), None)
if cell and cell['BlockType'] == 'CELL':
row_index = cell['RowIndex']
col_index = cell['ColumnIndex']
cell_text = get_text_from_cell(cell, page_response)
if row_index not in rows:
rows[row_index] = {}
rows[row_index][col_index] = cell_text
# Create a sorted table by row and column indices
for row in sorted(rows.keys()):
sorted_row = [rows[row].get(col, '') for col in sorted(rows[row].keys())]
table.append(sorted_row)
tables_data.append(table)
return tables_data
def ocr_to_excel(pdf_file, template_file):
"""
Given a PDF file and an Excel template file, converts the PDF pages to images,
extracts table data using AWS Textract (skipping the first two rows of each table),
then creates a new Excel file by copying the template sheet for each table
and writing the filtered data starting at cell A7.
The output file is saved with the same base name as the PDF but with a .xlsx extension.
"""
pages = convert_from_path(pdf_file, dpi=200,
poppler_path=r"C:\Users\sshivlani\Projects\Inspection-Form-Transcriber\poppler-23.07.0\Library\bin") # Adjust later
all_tables = []
for i, page_image in enumerate(pages):
buffer = BytesIO()
# Save image as JPEG (quality=90)
page_image.save(buffer, format='JPEG')
image_bytes = buffer.getvalue()
# Call Textract synchronously on this image
response = analyze_document_local(image_bytes)
# Delay to avoid throttling
time.sleep(1)
tables = extract_table_data(response)
if tables:
all_tables.extend(tables)
# Load the Excel template workbook
wb = load_workbook(template_file)
# Use the first worksheet as the template sheet
template_sheet = wb.worksheets[0]
if (template_file == 'Transect Datasheets.xlsx'):
for idx, table in enumerate(all_tables, start=1):
new_sheet = wb.copy_worksheet(template_sheet)
new_sheet.title = f"Page_{idx}"
start_row = 12 # Data will start at row 7 (i.e. cell A7)
# Exclude the first two rows of the extracted table data
data_rows = table[1:]
for r_idx, row in enumerate(data_rows):
for c_idx, value in enumerate(row):
new_sheet.cell(row=start_row + r_idx, column=c_idx + 1, value=value)
if (template_file == 'Line Intercept-BB Transect.xlsx' or template_file == 'SAV Survey Datasheet.xlsx'):
for idx, table in enumerate(all_tables, start=1):
new_sheet = wb.copy_worksheet(template_sheet)
new_sheet.title = f"Page_{idx}"
start_row = 7 # Data will start at row 7 (i.e. cell A7)
# Exclude the first two rows of the extracted table data
data_rows = table[2:]
for r_idx, row in enumerate(data_rows):
for c_idx, value in enumerate(row):
new_sheet.cell(row=start_row + r_idx, column=c_idx + 1, value=value)
if (template_file == 'Blank Quad Datasheets.xlsx'):
for idx, table in enumerate(all_tables, start=1):
new_sheet = wb.copy_worksheet(template_sheet)
new_sheet.title = f"Page_{idx}"
start_row = 5 # Data will start at row 7 (i.e. cell A7)
# Exclude the first two rows of the extracted table data
data_rows = table[1:]
for r_idx, row in enumerate(data_rows):
for c_idx, value in enumerate(row):
new_sheet.cell(row=start_row + r_idx, column=c_idx + 1, value=value)
wb.remove(template_sheet)
# Construct output filename: same as PDF's base name with .xlsx extension
base_name = os.path.splitext(os.path.basename(pdf_file))[0]
output_path = base_name + ".xlsx"
wb.save(output_path)
return output_path
iface = gr.Interface(
fn=ocr_to_excel,
inputs=[
gr.File(label="PDF for OCR"),
gr.Dropdown(
choices=["Transect Datasheets.xlsx", "Line Intercept-BB Transect.xlsx", "SAV Survey Datasheet.xlsx", "Blank Quad Datasheets.xlsx"],
label="Excel Template"
)
],
outputs=gr.File(label="Output Excel File"),
title="Handwritten Datasheets to Excel File (.xlsx)",
description="Upload a PDF file and select an Excel template. The OCR data will be appended to duplicated copies of the template sheet, and the resulting Excel file will be returned."
)
iface.launch()