# Azure_api.py — extract tables from PDFs with Azure Form Recognizer
# (prebuilt-layout model) and export them to a single Excel workbook.
import io
import os
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer import DocumentAnalysisClient
import pandas as pd
from io import BytesIO
def _table_to_dataframe(table, table_idx, pdf_name):
    """Convert one Form Recognizer table into an annotated DataFrame.

    The table's sparse cell list is expanded into a dense grid, then the
    provenance columns 'page_number', 'table_id' and 'pdf_name' are appended.
    """
    # Grid dimensions come from the highest row/column index present; the
    # cell list may be sparse, so unfilled positions stay "".
    n_cols = max(cell.column_index for cell in table.cells) + 1
    n_rows = max(cell.row_index for cell in table.cells) + 1
    grid = [["" for _ in range(n_cols)] for _ in range(n_rows)]
    for cell in table.cells:
        grid[cell.row_index][cell.column_index] = cell.content
    df = pd.DataFrame(grid)
    df["page_number"] = table.bounding_regions[0].page_number
    df["table_id"] = table_idx
    df["pdf_name"] = pdf_name
    # Strip Azure's checkbox markers (":selected:" / ":unselected:") from
    # every cell; pattern kept exactly as in the original implementation.
    return df.replace(r':+(?:selected|unselected):*', '', regex=True)


def _write_workbook(tables_by_pdf):
    """Write [(pdf_name, [DataFrame, ...]), ...] into one in-memory .xlsx.

    All tables go on a single sheet named "Tables". Only the very first
    table carries a header row; each PDF's block of tables is followed by
    two blank rows. Returns a rewound BytesIO holding the workbook.
    """
    buffer = BytesIO()
    with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
        current_row = 0
        for _pdf_name, dfs in tables_by_pdf:
            for df in dfs:
                # current_row == 0 is true exactly once: before the first
                # table is written, so only that table gets a header.
                write_header = current_row == 0
                df.to_excel(
                    writer,
                    sheet_name="Tables",
                    index=False,
                    header=write_header,
                    startrow=current_row,
                )
                # Advance past the data rows (+1 if a header was emitted).
                current_row += df.shape[0] + (1 if write_header else 0)
            # Two blank rows separate consecutive PDFs' blocks.
            current_row += 2
    buffer.seek(0)
    return buffer


def detect_tables(pdflist, pdfnames):
    """
    Extract all tables from a batch of PDFs via Azure Form Recognizer
    (prebuilt-layout) and bundle them into one Excel workbook.

    Parameters
    ----------
    pdflist : list[bytes | bytearray]
        Raw PDF contents, one element per document.
    pdfnames : list[str]
        Path or name for the corresponding element of ``pdflist``.
        Must have the same length as ``pdflist``.

    Returns
    -------
    io.BytesIO | None
        In-memory .xlsx with a single "Tables" sheet containing every
        detected table (annotated with 'pdf_name', 'table_id' and
        'page_number'), with two blank rows between each PDF's block.
        ``None`` when no tables were found in any PDF.

    Raises
    ------
    ValueError
        If the inputs are not lists/tuples of equal length.
    RuntimeError
        If the Azure credentials are not configured in the environment.
    """
    # 1. Validate inputs.
    if not isinstance(pdflist, (list, tuple)) or not isinstance(pdfnames, (list, tuple)):
        raise ValueError("Both pdflist and pdfnames must be lists (or tuples).")
    if len(pdflist) != len(pdfnames):
        raise ValueError("pdflist and pdfnames must have the same length.")

    # 2. Set up the Azure Form Recognizer client.
    # SECURITY: the endpoint and API key must come from the environment —
    # the previous revision hard-coded a live key in source control; that
    # key is compromised and must be rotated.
    endpoint = os.environ.get("AZURE_FORM_RECOGNIZER_ENDPOINT")
    key = os.environ.get("AZURE_FORM_RECOGNIZER_KEY")
    if not endpoint or not key:
        raise RuntimeError(
            "Set the AZURE_FORM_RECOGNIZER_ENDPOINT and "
            "AZURE_FORM_RECOGNIZER_KEY environment variables "
            "before calling detect_tables()."
        )
    client = DocumentAnalysisClient(
        endpoint=endpoint, credential=AzureKeyCredential(key)
    )

    # 3. Analyze each PDF and collect its tables.
    tables_by_pdf = []
    for pdf_bytes, pdf_path in zip(pdflist, pdfnames):
        # Silently skip malformed entries (non-bytes payload or non-string
        # name) — preserves the original best-effort behavior.
        if not isinstance(pdf_bytes, (bytes, bytearray)) or not isinstance(pdf_path, str):
            continue
        pdf_name = os.path.basename(pdf_path)
        poller = client.begin_analyze_document(
            "prebuilt-layout", document=io.BytesIO(pdf_bytes)
        )
        result = poller.result()
        per_pdf_tables = [
            _table_to_dataframe(table, table_idx, pdf_name)
            for table_idx, table in enumerate(result.tables, start=1)
            # Guard: a table with no cells would make max() raise ValueError.
            if table.cells
        ]
        if per_pdf_tables:
            tables_by_pdf.append((pdf_name, per_pdf_tables))

    # 4. No tables anywhere -> None; otherwise build the workbook.
    if not tables_by_pdf:
        return None
    return _write_workbook(tables_by_pdf)