| | import io |
| | import os |
| | from azure.core.credentials import AzureKeyCredential |
| | from azure.ai.formrecognizer import DocumentAnalysisClient |
| | import pandas as pd |
| | from io import BytesIO |
| |
|
def _table_to_dataframe(table, table_idx, pdf_name):
    """Convert one analyzed table into a DataFrame; return None if it has no cells.

    The frame holds the table's cell text laid out by (row_index, column_index),
    followed by the metadata columns 'page_number', 'table_id' and 'pdf_name'.
    """
    cells = list(table.cells or [])
    if not cells:
        # max() over an empty sequence would raise ValueError; nothing to export.
        return None

    n_rows = max(cell.row_index for cell in cells) + 1
    n_cols = max(cell.column_index for cell in cells) + 1
    grid = [[""] * n_cols for _ in range(n_rows)]
    for cell in cells:
        grid[cell.row_index][cell.column_index] = cell.content

    df = pd.DataFrame(grid)
    # Strip selection-mark tokens (":selected:" / ":unselected:") from the cell
    # text *before* the metadata columns exist, so a file name that happens to
    # contain such a token is not rewritten in 'pdf_name'.
    df = df.replace(r':+(?:selected|unselected):*', '', regex=True)

    # bounding_regions may be missing/empty; leave the page blank in that case
    # instead of failing on [0].
    regions = table.bounding_regions
    df["page_number"] = regions[0].page_number if regions else None
    df["table_id"] = table_idx
    df["pdf_name"] = pdf_name
    return df


def detect_tables(pdflist, pdfnames, *, endpoint=None, key=None):
    """Extract every table from a batch of PDFs into a single Excel worksheet.

    Each PDF is analyzed with the Azure Form Recognizer "prebuilt-layout" model.
    Every detected table becomes a block of rows with the extra columns
    'page_number', 'table_id' (1-based index of the table within its PDF) and
    'pdf_name' (basename of the matching entry in ``pdfnames``). All blocks are
    written to one worksheet named "Tables"; a header row is written only above
    the first table, and two blank rows separate the blocks of consecutive PDFs.

    Args:
        pdflist: list or tuple of PDF contents (``bytes`` or ``bytearray``).
        pdfnames: list or tuple of strings; ``pdfnames[i]`` is the path or name
            of ``pdflist[i]``. Must have the same length as ``pdflist``.
        endpoint: optional service endpoint. Falls back to the
            ``AZURE_FORM_RECOGNIZER_ENDPOINT`` environment variable, then to
            the legacy built-in endpoint.
        key: optional API key. Falls back to the ``AZURE_FORM_RECOGNIZER_KEY``
            environment variable, then to the legacy built-in key.

    Returns:
        An ``io.BytesIO`` positioned at offset 0 containing the .xlsx data, or
        ``None`` when there is no valid input or no table was found.

    Raises:
        ValueError: if either argument is not a list/tuple, or the lengths
            differ.
    """
    import logging  # local import: keeps the file's top-level imports untouched

    log = logging.getLogger(__name__)

    if not isinstance(pdflist, (list, tuple)) or not isinstance(pdfnames, (list, tuple)):
        raise ValueError("Both pdflist and pdfnames must be lists (or tuples).")
    if len(pdflist) != len(pdfnames):
        raise ValueError("pdflist and pdfnames must have the same length.")

    # Filter the input up front so that no client is created (and no
    # credentials are touched) when there is nothing to analyze.
    documents = []
    for position, (pdf_bytes, pdf_path) in enumerate(zip(pdflist, pdfnames)):
        if not isinstance(pdf_bytes, (bytes, bytearray)) or not isinstance(pdf_path, str):
            # Best-effort behaviour is kept (the entry is skipped), but the
            # skip is no longer silent.
            log.warning(
                "detect_tables: skipping entry %d (expected bytes content and str name)",
                position,
            )
            continue
        documents.append((os.path.basename(pdf_path), pdf_bytes))

    if not documents:
        return None

    # SECURITY: the literal fallbacks below are credentials that were committed
    # to source. They are retained only so existing deployments keep working;
    # rotate the key and supply it via the `key` argument or the
    # AZURE_FORM_RECOGNIZER_KEY environment variable, then delete the literals.
    endpoint = (
        endpoint
        or os.environ.get("AZURE_FORM_RECOGNIZER_ENDPOINT")
        or "https://tabledetection2.cognitiveservices.azure.com/"
    )
    key = (
        key
        or os.environ.get("AZURE_FORM_RECOGNIZER_KEY")
        or "5lr94dODMJihbGOMw2Vdz29zXRBiqt528fSGoGmzSJHTrWtHSnRdJQQJ99BEACYeBjFXJ3w3AAALACOGBANH"
    )
    client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))

    tables_by_pdf = []
    for pdf_name, pdf_bytes in documents:
        poller = client.begin_analyze_document("prebuilt-layout", document=io.BytesIO(pdf_bytes))
        result = poller.result()

        per_pdf_tables = []
        # `tables` may be None when the document has no tables.
        for table_idx, table in enumerate(result.tables or [], start=1):
            df = _table_to_dataframe(table, table_idx, pdf_name)
            if df is not None:
                per_pdf_tables.append(df)

        if per_pdf_tables:
            tables_by_pdf.append((pdf_name, per_pdf_tables))

    if not tables_by_pdf:
        return None

    excel_buffer = io.BytesIO()
    with pd.ExcelWriter(excel_buffer, engine="openpyxl") as writer:
        current_row = 0
        for _pdf_name, frames in tables_by_pdf:
            for df in frames:
                # Only the very first table gets a header row; every written
                # frame has at least one row, so current_row is 0 exactly once.
                write_header = current_row == 0
                df.to_excel(
                    writer,
                    sheet_name="Tables",
                    index=False,
                    header=write_header,
                    startrow=current_row,
                )
                current_row += len(df) + (1 if write_header else 0)

            # Two blank rows between the blocks of consecutive PDFs.
            current_row += 2

    excel_buffer.seek(0)
    return excel_buffer
| |
|