Spaces:
Runtime error
Runtime error
| import io | |
| import json | |
| import os | |
| from typing import List, Tuple | |
| import pandas as pd | |
| import re | |
| from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor | |
| from img2table.document import Image | |
| import fitz | |
| import PIL | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain.docstore.document import Document | |
| import torch | |
| # from kalbecdt_genomic_paper_extractor_proto.pdf_processor.tables_processor import detection_transform, device, outputs_to_objects, init | |
| class ProcessorPDF: | |
| def __init__(self): | |
| pass | |
| # self.ocr, self.model = init() | |
| def load_pdf(self, path: str, return_tables=False) -> Tuple[List[Document],pd.DataFrame]: | |
| loader = PyPDFLoader(path) | |
| # with ThreadPoolExecutor(max_workers=5) as exe: | |
| # result_tables = exe.submit(self.get_tables,path) | |
| # result_pages = exe.submit(loader.load) | |
| result_tables = None | |
| if return_tables: | |
| result_tables = self.get_tables(path) | |
| result_pages = loader.load() | |
| return result_pages, result_tables | |
| # attribution: KalbeDigitalLab/nutrigenme-paper-extractor | |
| # def get_tables(self, path) -> pd.DataFrame: | |
| # doc = fitz.open(path) | |
| # images = [] | |
| # for page in doc: | |
| # pix = page.get_pixmap() | |
| # images.append(PIL.Image.frombytes("RGB", [pix.width, pix.height], pix.samples)) | |
| # tables = [] | |
| # # Loop pages | |
| # for image in images: | |
| # pixel_values = detection_transform(image).unsqueeze(0).to(device) | |
| # with torch.no_grad(): | |
| # outputs = self.model(pixel_values) | |
| # id2label = self.model.config.id2label | |
| # id2label[len(self.model.config.id2label)] = "no object" | |
| # detected_tables = outputs_to_objects(outputs, image.size, id2label) | |
| # # Loop table in page (if any) | |
| # for idx in range(len(detected_tables)): | |
| # cropped_table = image.crop(detected_tables[idx]["bbox"]) | |
| # if detected_tables[idx]["label"] == 'table rotated': | |
| # cropped_table = cropped_table.rotate(270, expand=True) | |
| # # TODO: what is the perfect threshold? | |
| # if detected_tables[idx]['score'] > 0.9: | |
| # print(detected_tables[idx]) | |
| # tables.append(cropped_table) | |
| # df_result = [] | |
| # # Loop tables | |
| # for table in tables: | |
| # buffer = io.BytesIO() | |
| # table.save(buffer, format='PNG') | |
| # image = Image(buffer) | |
| # # Extract to dataframe | |
| # extracted_tables = image.extract_tables(ocr=self.ocr, implicit_rows=True, borderless_tables=True, min_confidence=0) | |
| # if len(extracted_tables) == 0: | |
| # continue | |
| # # Combine multiple dataframe | |
| # df_table = extracted_tables[0].df | |
| # for extracted_table in extracted_tables[1:]: | |
| # df_table = pd.concat([df_table, extracted_table.df]).reset_index(drop=True) | |
| # df_table = df_table.fillna('') | |
| # df_result.append(df_table) | |
| # return df_result |