import functools
import logging
import signal
import time
from io import BytesIO
from itertools import islice

import pdfplumber
import PyPDF2
import pytesseract
import requests
from pdf2image import convert_from_bytes
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTFigure, LTTextContainer

# Seconds `requests` may wait for the server before giving up; without a
# timeout a stalled server would block the constructor forever.
REQUEST_TIMEOUT_SECONDS = 120


class ModuleException(Exception):
    """Exception to be raised in this module."""


class TimeoutError(ModuleException):
    """To be raised if the function takes too much time.

    Notes
    -----
    This intentionally keeps its historical name even though it shadows the
    builtin ``TimeoutError`` inside this module.
    """


class BadUrlException(ModuleException):
    """Raised when the url is not valid."""

    def __init__(self, message):
        self.message = message
        super().__init__(self.message)


def timeout(seconds_before_timeout):
    """Decorator factory that aborts a call once it runs for too long.

    The decorated function raises this module's ``TimeoutError`` once it has
    been running for more than ``seconds_before_timeout`` seconds.

    Parameters
    ----------
    seconds_before_timeout : int
        Whole number of seconds the decorated function may run.

    Returns
    -------
    callable
        A decorator to apply to the function that should be time-limited.

    Notes
    -----
    The implementation relies on ``signal.SIGALRM``; see the ``signal``
    module documentation for platform and threading restrictions.
    """

    def wrapper_wrapper(func):
        def handler(signum, frame):
            raise TimeoutError()

        @functools.wraps(func)
        def wrapper_function(*args, **kwargs):
            old_handler = signal.signal(signal.SIGALRM, handler)
            old_time_left = signal.alarm(seconds_before_timeout)
            if 0 < old_time_left < seconds_before_timeout:
                # Never lengthen a timer that an outer caller already set.
                signal.alarm(old_time_left)
            start_time = time.time()
            try:
                return func(*args, **kwargs)
            finally:
                signal.signal(signal.SIGALRM, old_handler)
                if old_time_left > 0:
                    # Deduct this call's run time from the saved outer timer.
                    # signal.alarm() only accepts integers, and 0 would
                    # cancel the outer timer, so clamp to at least 1 second.
                    elapsed = time.time() - start_time
                    signal.alarm(max(1, int(old_time_left - elapsed)))
                else:
                    signal.alarm(0)

        return wrapper_function

    return wrapper_wrapper


def get_simple_logger(name, level="info"):
    """Create a simple logger that writes to a ``logging.StreamHandler``.

    Handlers already attached to the named logger are removed first, so
    calling this repeatedly does not duplicate output.

    Parameters
    ----------
    name : str
        Name of the logger.
    level : str or int, optional
        Either a ``logging`` level integer or one of "debug", "info",
        "warning", "error", "critical" (case-insensitive), by default "info"

    Returns
    -------
    logging.Logger
        The configured logger.

    Raises
    ------
    KeyError
        If ``level`` is a string that is not one of the names above.
    """
    level_to_int_map = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    if isinstance(level, str):
        level = level_to_int_map[level.lower()]

    logger = logging.getLogger(name)
    logger.setLevel(level)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if logger.hasHandlers():
        logger.handlers.clear()

    handler = logging.StreamHandler()
    handler.setLevel(level)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger


class PDFExtractor:
    """A class to extract pdf information.

    The information that are extracted from the class are:
    - Textual Information
    - Tabular Information
    - Image Information
    """

    # Upper bound on the number of figures per page whose OCR text is kept.
    MAX_IMAGES_PER_PAGE = 2

    def __init__(
        self,
        file_path,
        min_characters=5,
        maximum_pages=3,
        is_url=True,
    ) -> None:
        """A class that can be used to extract pdf information from a file path or url

        Parameters
        ----------
        file_path : str
            The path to the file or the url
        min_characters : int, optional
            A text must be longer than this many characters (line breaks
            excluded) to be considered a relevant text, by default 5
        maximum_pages : int, optional
            The maximum number of pages that are processed, by default 3
        is_url : bool, optional
            Whether the file_path is a url or not, by default True

        Raises
        ------
        BadUrlException
            If the content cannot be loaded (for example the url answers with
            a status code other than 200, or the local file cannot be read)

        Examples
        --------
        >>> extractor = PDFExtractor(file_path="file_path", min_characters=5, maximum_pages=5, is_url=False)
        >>> extractor.extract_pages()
        """
        self.file_path = file_path
        self.min_characters = min_characters
        self.maximum_pages = maximum_pages
        self.logger = get_simple_logger(name="pdf_extractor", level="info")

        try:
            self.byte_file = self._create_byte_object(self.file_path, is_url)
        except Exception as e:
            self.logger.error(e)
            raise BadUrlException(str(e)) from e

        # PyPDF2 object
        self.pypdf_object = PyPDF2.PdfReader(self.byte_file)
        self.pdf_plumber_object = pdfplumber.open(self.byte_file)
        # Pages from pdfminer. extract_pages() itself builds a fresh page
        # stream, so this attribute is kept only for backward compatibility.
        self.pdfminer_pages = extract_pages(self.byte_file)

        self.page_contents = []
        self.final_content = ""
        self.one_page_contents = self._new_page_contents()

    @staticmethod
    def _new_page_contents():
        """Return an empty per-page result container."""
        return {
            "contents": [],
            "images": [],
            "tables": [],
        }

    def _create_byte_object(self, path, is_url):
        """Creates the byte file based on if it is url or not

        Parameters
        ----------
        path : str
            The path to the file or the url
        is_url : bool
            Whether the path is a url or not

        Returns
        -------
        BytesIO
            The byte file object

        Raises
        ------
        BadUrlException
            If the url raises a status code other than 200
        """
        if not is_url:
            # Read the file fully and close it right away so that no file
            # handle is leaked; callers get an in-memory stream either way.
            with open(path, "rb") as pdf_file:
                return BytesIO(pdf_file.read())

        res = requests.get(path, timeout=REQUEST_TIMEOUT_SECONDS)
        if res.status_code != 200:
            raise BadUrlException(f"The url raised status code: {res.status_code}")
        return BytesIO(res.content)

    def __check_for_relevant_text(self, text):
        """Checks if the text is a relevant text or not

        Parameters
        ----------
        text : str
            The text to be checked

        Returns
        -------
        bool
            True if the text, with line breaks removed, is longer than
            ``self.min_characters``; False otherwise
        """
        # Remove the line breaker from the text
        text_ = text.replace("\n", "")
        if len(text_) > self.min_characters:
            self.logger.debug("Text: %s is a relevant text.", text_)
            return True
        self.logger.debug("Text: %s is not a relevant text.", text_)
        return False

    def _handle_text(self, component):
        """Handles the extraction of textual information

        Parameters
        ----------
        component : pdfminer.layout.LTTextContainer
            The layout element to extract the text from; it must provide
            ``get_text()``

        Returns
        -------
        bool
            True if the text was relevant and stored under the "contents" key
            of ``one_page_contents``, False otherwise
        """
        text = component.get_text()
        if self.__check_for_relevant_text(text):
            self.one_page_contents["contents"].append(text)
            return True
        return False

    def __table_converter(self, table):
        """Converts a table as given by pdfplumber into a readable string

        Every row becomes one line of the form ``|cell|cell|``. Line breaks
        inside a cell are replaced by a single space and ``None`` cells are
        rendered as the text ``None``.

        Parameters
        ----------
        table : list
            The table to be converted, as a list of rows where each row is a
            list of cells (``str`` or ``None``)

        Returns
        -------
        str
            The converted table as a string, without a trailing line break.
            An empty table gives an empty string.

        Examples
        --------
        ``[["Name", "Age"], ["John", "30"], [None, "25"]]`` is converted to::

            |Name|Age|
            |John|30|
            |None|25|
        """
        rendered_rows = []
        for row in table:
            cleaned_row = [
                "None" if item is None else item.replace("\n", " ")
                for item in row
            ]
            rendered_rows.append("|" + "|".join(cleaned_row) + "|")
        return "\n".join(rendered_rows)

    def _handle_table(self, page_number):
        """Handles the extraction of tabular information

        Stores the converted tables of the page under the "tables" key of
        ``one_page_contents``. If the extraction fails, a single placeholder
        table ``|NA|NA|`` is stored instead.

        Parameters
        ----------
        page_number : int
            The page number to extract the table from

        Returns
        -------
        None
        """
        page = self.pdf_plumber_object.pages[page_number]
        try:
            tables = page.extract_tables()
        except Exception as exc:
            self.logger.warning(
                "Table extraction failed on page %s: %s", page_number, exc
            )
            # One table containing one row; the nesting must match what
            # extract_tables() returns (tables -> rows -> cells).
            tables = [[["NA", "NA"]]]
        self.one_page_contents["tables"] = [
            self.__table_converter(table) for table in tables
        ]

    def __crop_image(self, element, page_number):
        """Crops the pdf and creates a new pdf with only the cropped area.

        This will later be converted into image and then OCRed using
        pytesseract

        Parameters
        ----------
        element : pdfminer.layout.LTFigure
            The element to crop from the pdf; its ``x0``, ``y0``, ``x1`` and
            ``y1`` attributes define the crop rectangle
        page_number : int
            The page number to crop the element from

        Returns
        -------
        bytes
            The cropped pdf as a byte object

        Notes
        -----
        The media box of the page held by ``self.pypdf_object`` is modified
        in place and is not restored afterwards.
        """
        pypdf_page = self.pypdf_object.pages[page_number]
        # The rectangle is given by two diagonally opposite corners of the
        # element: (x0, y1) and (x1, y0).
        # NOTE(review): this corner pairing is kept exactly as in the
        # original code; confirm it matches the intended PDF orientation.
        pypdf_page.mediabox.lower_left = (element.x0, element.y1)
        pypdf_page.mediabox.upper_right = (element.x1, element.y0)

        # Save the cropped page to a new in-memory PDF
        cropped_pdf_writer = PyPDF2.PdfWriter()
        cropped_pdf_writer.add_page(pypdf_page)
        cropped_pdf_stream = BytesIO()
        cropped_pdf_writer.write(cropped_pdf_stream)
        return cropped_pdf_stream.getvalue()

    def _convert_to_images(self, pdf_byte):
        """Converts the pdf byte object to an image

        Parameters
        ----------
        pdf_byte : bytes
            The pdf byte object to be converted

        Returns
        -------
        PIL.Image
            The first image produced by ``convert_from_bytes``
        """
        images = convert_from_bytes(pdf_byte)
        return images[0]

    def _image_to_text(self, image):
        """Extracts text from image using pytesseract

        Parameters
        ----------
        image : PIL.Image
            The image to extract text from

        Returns
        -------
        str
            The extracted text from the image
        """
        text = pytesseract.image_to_string(image)
        self.logger.debug("Extracted %s from the image.", text)
        return text

    def _handle_image(self, element, page_number):
        """Handles the extraction of image information

        The element is cropped out of the page, converted into an image and
        OCRed with pytesseract. If the recognised text is relevant, it is
        appended under the "images" key of ``one_page_contents``.

        Parameters
        ----------
        element : pdfminer.layout.LTFigure
            The element to extract the image from
        page_number : int
            The page number to extract the image from

        Returns
        -------
        bool
            True if the recognised text was relevant and stored, False
            otherwise
        """
        cropped_pdf = self.__crop_image(element, page_number)
        image = self._convert_to_images(pdf_byte=cropped_pdf)
        extracted_text = self._image_to_text(image)
        if self.__check_for_relevant_text(extracted_text):
            self.one_page_contents["images"].append(extracted_text)
            return True
        return False

    def extract_one_page(self, page_number, pdfminer_page):
        """Extracts information from one page of the pdf

        The result is appended to ``self.page_contents``.

        Parameters
        ----------
        page_number : int
            The page number to extract the information from
        pdfminer_page : pdfminer.layout.LTPage
            The pdfminer page object to extract the information from

        Returns
        -------
        None
        """
        self.one_page_contents = self._new_page_contents()
        self._handle_table(page_number=page_number)

        image_number = 0
        # Iterating the page yields its layout elements.
        for element_number, element in enumerate(pdfminer_page):
            self.logger.debug(
                "Handling Page: %s, Element: %s Type: %s",
                page_number,
                element_number,
                type(element),
            )
            if isinstance(element, LTTextContainer):
                self._handle_text(element)
            elif (
                isinstance(element, LTFigure)
                and image_number < self.MAX_IMAGES_PER_PAGE
            ):
                added = self._handle_image(
                    element=element,
                    page_number=page_number,
                )
                # Only figures that produced relevant text count towards
                # the per-page limit.
                if added:
                    image_number += 1

        self.page_contents.append(self.one_page_contents)

    def create_final_text_content(self):
        """Creates the final text using the information extracted so far.

        The final text has all the textual information, image information
        and tabular information of every page in ``self.page_contents``. The
        result is also stored in ``self.final_content``.

        Returns
        -------
        str
            The final text content of the pdf. This text can be directly
            used for machine learning.
        """
        parts = []
        for page_number, content in enumerate(self.page_contents):
            parts.append(f"PAGE {page_number}\n")
            parts.append("\n".join(content["contents"]))
            parts.extend(
                f"IMAGE {i}\n{image.strip()}\nIMAGE {i} ENDS\n"
                for i, image in enumerate(content["images"])
            )
            parts.extend(
                f"TABLE {i}\n{table.strip()}\nTABLE {i} ENDS\n"
                for i, table in enumerate(content["tables"])
            )
            parts.append(f"PAGE {page_number} ENDS\n")

        self.final_content = "".join(parts)
        return self.final_content

    def extract_pages(self):
        """Extracts information from all the pages of the pdf.

        This is the final method to be used. At most ``self.maximum_pages``
        pages are processed. Results of a previous call are discarded, so the
        method can be called repeatedly without duplicating pages.

        Returns
        -------
        str
            The final text content of the pdf. This text can be directly
            used for machine learning.
        """
        self.page_contents = []
        # Defensive rewind: the stream is shared with PyPDF2 and pdfplumber.
        self.byte_file.seek(0)

        page_limit = max(self.maximum_pages, 0)
        # islice stops pulling from the lazy page generator at the limit, so
        # no page beyond the limit is parsed.
        pages = islice(extract_pages(self.byte_file), page_limit)
        for page_number, page in enumerate(pages):
            self.logger.info("Working on the page: %s", page_number)
            self.extract_one_page(page_number, page)

        if len(self.page_contents) >= page_limit:
            self.logger.info("Maximum page limit reached (%s pages).", page_limit)

        return self.create_final_text_content()