|
|
import io |
|
|
import os |
|
|
from azure.core.credentials import AzureKeyCredential |
|
|
from azure.ai.formrecognizer import DocumentAnalysisClient |
|
|
import pandas as pd |
|
|
from io import BytesIO |
|
|
|
|
|
def detect_tables(pdflist, pdfnames):
    """Extract every table from a batch of PDFs into one Excel worksheet.

    Each PDF is sent to Azure Form Recognizer (``prebuilt-layout`` model).
    Every detected table becomes a block of rows in a single worksheet named
    "Tables", with the extra columns ``page_number``, ``table_id`` and
    ``pdf_name`` appended to the right of the table's own columns. Two blank
    rows separate the blocks of consecutive PDFs.

    Configuration is read from the environment:

    * ``AZURE_FORM_RECOGNIZER_KEY`` (required) - the API key. It must never
      be committed to source control.
    * ``AZURE_FORM_RECOGNIZER_ENDPOINT`` (optional) - overrides the default
      endpoint.

    Args:
        pdflist: list/tuple of PDF contents (``bytes`` or ``bytearray``).
        pdfnames: list/tuple of strings; ``pdfnames[i]`` is the path or name
            of ``pdflist[i]``. Only the base name is written to the output.

    Returns:
        A ``BytesIO`` positioned at offset 0 containing the ``.xlsx`` file,
        or ``None`` when no tables were found (including when there is no
        valid input to analyse).

    Raises:
        ValueError: if either argument is not a list/tuple, or the two
            lengths differ.
        RuntimeError: if there is something to analyse but the API key is
            not configured.

    Note:
        Pairs whose content is not bytes-like or whose name is not a string
        are skipped silently rather than rejected.
    """
    if not isinstance(pdflist, (list, tuple)) or not isinstance(pdfnames, (list, tuple)):
        raise ValueError("Both pdflist and pdfnames must be lists (or tuples).")
    if len(pdflist) != len(pdfnames):
        raise ValueError("pdflist and pdfnames must have the same length.")

    # Filter first so that no credentials or network access are needed when
    # there is nothing valid to analyse.
    documents = [
        (pdf_bytes, os.path.basename(pdf_path))
        for pdf_bytes, pdf_path in zip(pdflist, pdfnames)
        if isinstance(pdf_bytes, (bytes, bytearray)) and isinstance(pdf_path, str)
    ]
    if not documents:
        return None

    endpoint = (
        os.environ.get("AZURE_FORM_RECOGNIZER_ENDPOINT")
        or "https://tabledetection2.cognitiveservices.azure.com/"
    )
    # SECURITY: the key used to be hard-coded here. It is now read from the
    # environment; the previously committed key must be rotated.
    key = os.environ.get("AZURE_FORM_RECOGNIZER_KEY")
    if not key:
        raise RuntimeError(
            "Azure Form Recognizer key is not configured; "
            "set the AZURE_FORM_RECOGNIZER_KEY environment variable."
        )
    client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))

    tables_by_pdf = []
    for pdf_bytes, pdf_name in documents:
        poller = client.begin_analyze_document(
            "prebuilt-layout", document=io.BytesIO(pdf_bytes)
        )
        result = poller.result()

        # table_id keeps the service's 1-based position even when an empty
        # table is skipped, so ids stay stable for a given document.
        per_pdf_tables = [
            _table_to_dataframe(table, table_idx, pdf_name)
            for table_idx, table in enumerate(result.tables or [], start=1)
            if table.cells
        ]
        if per_pdf_tables:
            tables_by_pdf.append((pdf_name, per_pdf_tables))

    if not tables_by_pdf:
        return None
    return _write_tables_workbook(tables_by_pdf)


def _table_to_dataframe(table, table_idx, pdf_name):
    """Convert one detected table (with at least one cell) into a tagged DataFrame."""
    cells = table.cells
    n_rows = max(cell.row_index for cell in cells) + 1
    n_cols = max(cell.column_index for cell in cells) + 1

    # Positions not covered by any cell stay as empty strings.
    grid = [[""] * n_cols for _ in range(n_rows)]
    for cell in cells:
        grid[cell.row_index][cell.column_index] = cell.content

    df = pd.DataFrame(grid)
    # Strip selection-mark tokens from the cell text. This runs before the
    # metadata columns are added so that pdf_name is never altered.
    df = df.replace(r':+(?:selected|unselected):*', '', regex=True)

    regions = table.bounding_regions
    df["page_number"] = regions[0].page_number if regions else None
    df["table_id"] = table_idx
    df["pdf_name"] = pdf_name
    return df


def _write_tables_workbook(tables_by_pdf):
    """Stack all tables into the "Tables" sheet and return the workbook as BytesIO."""
    excel_buffer = BytesIO()
    with pd.ExcelWriter(excel_buffer, engine="openpyxl") as writer:
        current_row = 0
        for _pdf_name, dfs in tables_by_pdf:
            for df in dfs:
                # Only the very first table gets a header row.
                # NOTE(review): tables of a different width than the first one
                # place their metadata columns under other header cells; the
                # layout is kept as-is for existing consumers.
                write_header = current_row == 0
                df.to_excel(
                    writer,
                    sheet_name="Tables",
                    index=False,
                    header=write_header,
                    startrow=current_row,
                )
                current_row += len(df) + (1 if write_header else 0)
            # Two blank rows between the blocks of consecutive PDFs.
            current_row += 2

    excel_buffer.seek(0)
    return excel_buffer
|
|
|