| | from abc import ABC, abstractmethod |
| | from pathlib import Path |
| | from urllib.parse import urlparse |
| | import requests |
| | import fitz |
| | import io |
| | import re |
| | import hashlib |
| | import os |
| |
|
| |
|
class FileTypeError(Exception):
    """Raised when a file's extension does not match the type a processor expects."""
| |
|
| |
|
class FileSchemeError(Exception):
    """Raised when a path uses a scheme that is neither local nor a supported URL."""
| |
|
| |
|
class FileProcessor(ABC):
    """Abstract base for processors that extract metadata and paragraphs from a file.

    Subclasses set ``type`` to the lowercase extension they accept (without
    the leading dot) and implement the abstract methods below.
    """

    # Expected file extension (lowercase, no dot); overridden by subclasses.
    type = None

    def __init__(self, path):
        """Store *path*, classify its scheme and validate its extension.

        Args:
            path: Local file path or URL, as a string.

        Raises:
            FileSchemeError: If the path carries an unsupported scheme.
            FileTypeError: If the extension differs from the class ``type``.
        """
        self.path = path
        self.file_scheme = self._get_file_scheme()
        self.__class__._check_file_type(path)

    @abstractmethod
    def get_file_data(self):
        """Return the data extracted from the file (defined by subclasses)."""

    @abstractmethod
    def _get_file_metadata(self):
        """Return the file's metadata (defined by subclasses)."""

    @abstractmethod
    def _get_file_paragraphs(self):
        """Return the file's paragraphs (defined by subclasses)."""

    @classmethod
    def _check_file_type(cls, path):
        """Ensure the extension of *path* equals ``cls.type``.

        Raises:
            FileTypeError: If the (case-insensitive) extension does not match.
        """
        parsed = urlparse(str(path))
        # For remote URLs only the path component names the file; a query
        # string or fragment ("report.pdf?download=1") must not leak into the
        # suffix and cause a false rejection.
        if parsed.scheme.lower() in ("http", "https", "ftp"):
            candidate = parsed.path
        else:
            candidate = path

        file_type = Path(candidate).suffix.lower()[1:]
        if file_type != cls.type:
            raise FileTypeError(
                f"Invalid file type. {cls.__name__} expects a {cls.type} file"
            )

    def _get_file_scheme(self):
        """Classify ``self.path`` as ``"local"`` or ``"url"``.

        Raises:
            FileSchemeError: If the scheme is neither local nor http/https/ftp.
        """
        scheme = urlparse(self.path).scheme.lower()
        # The isfile() check rescues existing paths whose prefix urlparse
        # mistakes for a scheme (e.g. a Windows drive letter).
        if not scheme or scheme == "file" or os.path.isfile(self.path):
            return "local"
        if scheme in ("http", "https", "ftp"):
            return "url"
        raise FileSchemeError("Unknown scheme")

    def _preprocess_text(self, text):
        """Collapse every whitespace run to one space and drop unencodable characters."""
        # Raw string: "\s" in a plain literal is an invalid escape sequence.
        # The pattern already covers newlines, so no separate replace is needed.
        text = re.sub(r"\s+", " ", text)
        # Round-trip through UTF-8 to discard characters that cannot be encoded.
        return text.encode("utf-8", "ignore").decode("utf-8", "ignore")

    def _generate_hash(self, string):
        """Return the hexadecimal MD5 digest of *string* (used as an identifier)."""
        return hashlib.md5(string.encode("utf-8", "ignore")).hexdigest()

    def generate_paragraphs(self):
        """Placeholder hook; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def generate_metadata(self):
        """Placeholder hook; always raises ``NotImplementedError``."""
        raise NotImplementedError
| |
|
| |
|
class PDFProcessor(FileProcessor):
    """Extract document metadata and text paragraphs from a PDF using PyMuPDF."""

    type = "pdf"

    # Seconds passed to ``requests.get`` as ``timeout`` when downloading a PDF.
    request_timeout = 30

    def __init__(self, path):
        """Validate *path* through the base-class scheme and extension checks."""
        super().__init__(path)

    def get_file_data(self, merge_length=200):
        """Open the PDF and return ``(file_metadata, file_paragraphs)``.

        Args:
            merge_length: Merge threshold forwarded to ``_get_file_paragraphs``.
        """
        file = self._open_file()
        try:
            file_metadata = self._get_file_metadata(file)
            file_paragraphs = self._get_file_paragraphs(
                file,
                file_metadata,
                start_page=1,
                end_page=None,
                merge_length=merge_length,
            )
        finally:
            # Release the document even when extraction fails part-way.
            file.close()

        return file_metadata, file_paragraphs

    def _file_name(self):
        """Return the last path component, ignoring any URL query or fragment."""
        if self.file_scheme == "url":
            return Path(urlparse(self.path).path).name
        return Path(self.path).name

    def _get_file_metadata(self, file):
        """Build the metadata dictionary for an opened PDF document.

        Args:
            file: Opened PyMuPDF document.

        Returns:
            dict with the document id (hash of file name + title), the PDF's
            own metadata fields, page count, location and empty placeholders
            for ``release_date``, ``report_type`` and ``source``.
        """
        # PyMuPDF documents metadata values (and, for locked documents, the
        # whole dict) as possibly None, so read defensively.
        metadata = file.metadata or {}
        title = metadata.get("title")
        file_name = self._file_name()

        if self.file_scheme == "local":
            url = str(Path(self.path).resolve())
        else:
            url = self.path

        return {
            # A missing title must not break the string concatenation.
            "id": self._generate_hash(file_name + (title or "")),
            "title": title,
            "author": metadata.get("author"),
            "subject": metadata.get("subject"),
            "creation_date": metadata.get("creationDate"),
            "modification_date": metadata.get("modDate"),
            "n_pages": file.page_count,
            "url": url,
            "file_name": file_name,
            "short_name": file_name,
            "release_date": "",
            "report_type": "",
            "source": "",
        }

    def _get_file_paragraphs(
        self, file, file_metadata, start_page=1, end_page=None, merge_length=200
    ):
        """Extract text blocks page by page and merge fragments into paragraphs.

        A block is appended to the previous paragraph when the combined length
        is below *merge_length* or when it starts with a lowercase letter
        (treated as the continuation of a sentence).

        Args:
            file: Opened PyMuPDF document.
            file_metadata: Dict providing ``"id"`` and ``"n_pages"``.
            start_page: First page to read, 1-based.
            end_page: Last page to read, 1-based inclusive; ``None`` means the
                final page.
            merge_length: Length threshold for merging short blocks.

        Returns:
            list of paragraph dictionaries as produced by ``_process_block``.
        """
        if end_page is None:
            end_page = file_metadata["n_pages"]

        file_paragraphs = []

        for page_num in range(start_page - 1, end_page):
            page = file.load_page(page_num)

            for block in page.get_text("blocks"):
                # page_num is the 0-based index, so the 1-based page number is
                # page_num + 1 regardless of where the range started.
                paragraph = self._process_block(
                    block, page, page_num + 1, file_metadata["id"]
                )
                if paragraph is None:
                    continue

                if not file_paragraphs:
                    file_paragraphs.append(paragraph)
                    continue

                previous = file_paragraphs[-1]
                content = paragraph["content"]
                # Slice instead of index so empty content cannot raise.
                first_char = content[:1]
                is_short = len(previous["content"]) + len(content) < merge_length
                is_continuation = first_char.islower() and first_char.isalpha()

                if is_short or is_continuation:
                    previous["content"] += " " + content
                    previous["length"] = len(previous["content"])
                else:
                    file_paragraphs.append(paragraph)

        return file_paragraphs

    def _open_file(self):
        """Open the PDF from a URL or from the local filesystem.

        Raises:
            requests.HTTPError: If a remote download answers with an error status.
        """
        if self.file_scheme == "url":
            # NOTE(review): ftp:// passes scheme validation, but requests is
            # expected to reject it here — confirm whether ftp should be supported.
            response = requests.get(self.path, timeout=self.request_timeout)
            # Fail on 4xx/5xx instead of handing an error page to the PDF parser.
            response.raise_for_status()
            return fitz.open(stream=io.BytesIO(response.content), filetype="pdf")
        return fitz.open(self.path)

    def _process_block(self, block, page, page_number, file_id):
        """Convert one PyMuPDF block tuple into a paragraph dictionary.

        Args:
            block: ``(x0, y0, x1, y1, content, block_no, block_type)`` tuple.
            page: Page the block belongs to (used for its dimensions).
            page_number: 1-based page number stored in the result.
            file_id: Identifier of the parent document.

        Returns:
            Paragraph dict, or ``None`` for image blocks (``block_type == 1``)
            and for empty or whitespace-only blocks.
        """
        x0, y0, x1, y1, content, block_no, block_type = block

        # strip() also catches the empty string, which isspace() does not.
        if block_type == 1 or not content.strip():
            return None

        content = self._preprocess_text(content)
        paragraph_id = self._generate_hash("_".join(map(str, block)))

        # Normalize to page-relative coordinates: x by width, y by height.
        width, height = page.rect.width, page.rect.height
        return {
            "id": paragraph_id,
            "document_id": file_id,
            "content_type": "text" if block_type == 0 else "image",
            "content": content,
            "length": len(content),
            "idx_block": block_no,
            "page_number": page_number,
            "x0": x0 / width,
            "y0": y0 / height,
            "x1": x1 / width,
            "y1": y1 / height,
        }
| |
|
| |
|
class HTMLProcessor(FileProcessor):
    """Processor for HTML files; extraction methods are still unimplemented stubs."""

    type = "html"

    # Seconds passed to ``requests.get`` as ``timeout`` when downloading a page.
    request_timeout = 30

    def __init__(self, path):
        """Validate *path* through the base-class scheme and extension checks."""
        super().__init__(path)

    def get_file_data(self):
        """Stub: not implemented yet, returns ``None``."""
        return None

    def _get_file_metadata(self):
        """Stub: not implemented yet, returns ``None``."""
        return None

    def _get_file_paragraphs(self):
        """Stub: not implemented yet, returns ``None``."""
        return None

    def _open_file(self):
        """Return the HTML document as text, from a URL or a local file.

        Raises:
            requests.HTTPError: If a remote download answers with an error status.
        """
        if self.file_scheme == "url":
            response = requests.get(self.path, timeout=self.request_timeout)
            # Fail on 4xx/5xx instead of returning the body of an error page.
            response.raise_for_status()
            return response.text
        # Context manager closes the handle instead of leaking it.
        with open(self.path, "r") as handle:
            return handle.read()
| |
|