#####################################################
### DOCUMENT PROCESSOR [PDF READER UTILITIES]
#####################################################
# Jonathan Wang
# ABOUT:
# This project creates an app to chat with PDFs.
# This is the PDF READER UTILITIES.
# It defines helper functions for the PDF reader,
# such as getting Keywords or finding Contact Info.
#####################################################
### TODO Board:
# Better summarizer than T5, which has been stripped out?
# Better keywords than the RAKE+YAKE fusion we're currently using?
# Consider using GPE/GSP tagging with spacy to confirm mailing addresses?
# Handle FigureCaption somehow.
# Skip Header if it has a "Page X" or other page-number construction.
# Detect images that are substantially overlapping according to coordinates.
#     https://stackoverflow.com/questions/49897531/detect-overlapping-images-in-pil
#     Keep them in the following preference order: no confidence score, larger image, higher confidence score.
# Detect nodes whose text is substantially repeated at either the top or bottom of the page.
#     Utilize the coordinates to ignore the text on the top and bottom two lines.
# Fix OCR issues with spell checking?
# Remove images that are too small in size and overlapping with text boxes.
# Convert the List[BaseNode] -> List[BaseNode] functions into TransformComponents
#     (see the sketch after clean_abbreviations below).
#####################################################
### Imports
from __future__ import annotations

import difflib
import re
from collections import defaultdict
from copy import deepcopy
from typing import TYPE_CHECKING, TypeVar

import rapidfuzz
import regex
from llama_index.core.schema import (
    BaseNode,
    NodeRelationship,
    RelatedNodeInfo,
)

if TYPE_CHECKING:
    from unstructured.documents import elements

#####################################################
### CODE

GenericNode = TypeVar("GenericNode", bound=BaseNode)


def clean_pdf_chunk(pdf_chunk: elements.Element) -> elements.Element:
    """Given a single element of text from a PDF read by Unstructured, clean its text."""
    ### NOTE: Don't think it's worth making this a separate TransformComponent.
    # We'd still need to clean bad characters from the reader.
    chunk_text = pdf_chunk.text
    if (len(chunk_text) > 0):
        # Clean any control characters which break the language detection for other parts of the reader.
        re_bad_chars = regex.compile(r"[\p{Cc}\p{Cs}]+")
        chunk_text = re_bad_chars.sub("", chunk_text)
        # Remove PDF character-ID artifacts, i.e. matches of (cid:###).
        chunk_text = re.sub(r"\(cid:\d+\)", "", chunk_text)
        # Clean whitespace and broken paragraphs
        # chunk_text = clean_extra_whitespace(chunk_text)
        # chunk_text = group_broken_paragraphs(chunk_text)
        # Save cleaned text.
        pdf_chunk.text = chunk_text
    return pdf_chunk
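

# Example (illustrative, with a hypothetical Unstructured Text element):
# >>> from unstructured.documents.elements import Text
# >>> clean_pdf_chunk(Text("Hello\x00 world (cid:42)")).text
# 'Hello world '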


def clean_abbreviations(pdf_chunks: list[GenericNode]) -> list[GenericNode]:
    """Remove any common abbreviations in the text which can confuse the sentence model.

    Args:
        pdf_chunks (List[GenericNode]): List of llama-index nodes.

    Returns:
        List[GenericNode]: The nodes with cleaned text, abbreviations replaced.
    """
    for pdf_chunk in pdf_chunks:
        text = getattr(pdf_chunk, "text", "")
        if (text == ""):
            continue
        # No. -> Number (keep the space the pattern consumes)
        text = re.sub(r"\bNo\b\.\s", "Number ", text, flags=re.IGNORECASE)
        # Fig. -> Figure
        text = re.sub(r"\bFig\b\.", "Figure", text, flags=re.IGNORECASE)
        # Eq. -> Equation
        text = re.sub(r"\bEq\b\.", "Equation", text, flags=re.IGNORECASE)
        # Mr. -> Mr
        text = re.sub(r"\bMr\b\.", "Mr", text, flags=re.IGNORECASE)
        # Mrs. -> Mrs
        text = re.sub(r"\bMrs\b\.", "Mrs", text, flags=re.IGNORECASE)
        # Dr. -> Dr
        text = re.sub(r"\bDr\b\.", "Dr", text, flags=re.IGNORECASE)
        # Jr. -> Jr
        text = re.sub(r"\bJr\b\.", "Jr", text, flags=re.IGNORECASE)
        # etc. -> etc
        text = re.sub(r"\betc\b\.", "etc", text, flags=re.IGNORECASE)
        pdf_chunk.text = text
    return pdf_chunks
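

# A minimal sketch for the TODO above ("convert the List[BaseNode] -> List[BaseNode]
# functions into TransformComponents"), so the cleaners can slot into a llama-index
# IngestionPipeline. The class name is an assumption, not part of the existing pipeline.
from llama_index.core.schema import TransformComponent  # noqa: E402


class CleanAbbreviations(TransformComponent):
    """Sketch: TransformComponent wrapper around clean_abbreviations."""

    def __call__(self, nodes: list[BaseNode], **kwargs) -> list[BaseNode]:
        return clean_abbreviations(nodes)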


def _remove_chunk(
    pdf_chunks: list[GenericNode],
    chunk_index: int | None = None,
    chunk_id: str | None = None,
) -> list[GenericNode]:
    """Given a list of chunks, remove the chunk at the given index or with the given id.

    Args:
        pdf_chunks (List[GenericNode]): The list of chunks.
        chunk_index (Optional[int]): The index of the chunk to remove.
        chunk_id (Optional[str]): The id of the chunk to remove.

    Returns:
        List[GenericNode]: The updated list of chunks, without the removed chunk.
    """
    if (chunk_index is None and chunk_id is None):
        msg = "_remove_chunk: Either chunk_index or chunk_id must be set."
        raise ValueError(msg)
    # Convert chunk_id to chunk_index
    elif (chunk_index is None):
        chunk = next((c for c in pdf_chunks if c.node_id == chunk_id), None)
        if chunk is not None:
            chunk_index = pdf_chunks.index(chunk)
        else:
            msg = f"_remove_chunk: No chunk found with id {chunk_id}."
            raise ValueError(msg)
    elif (chunk_index < 0 or chunk_index >= len(pdf_chunks)):
        msg = f"_remove_chunk: Chunk {chunk_index} is out of range. Maximum index is {len(pdf_chunks) - 1}."
        raise ValueError(msg)

    # Update the previous-next node relationships around that index
    def _node_rel_prev_next(prev_node: GenericNode, next_node: GenericNode) -> tuple[GenericNode, GenericNode]:
        """Update prev-next node relationships between two nodes."""
        prev_node.relationships[NodeRelationship.NEXT] = RelatedNodeInfo(
            node_id=next_node.node_id,
            metadata={"filename": next_node.metadata["filename"]},
        )
        next_node.relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo(
            node_id=prev_node.node_id,
            metadata={"filename": prev_node.metadata["filename"]},
        )
        return (prev_node, next_node)

    if (chunk_index > 0 and chunk_index < len(pdf_chunks) - 1):
        pdf_chunks[chunk_index - 1], pdf_chunks[chunk_index + 1] = _node_rel_prev_next(
            prev_node=pdf_chunks[chunk_index - 1], next_node=pdf_chunks[chunk_index + 1]
        )

    popped_chunk = pdf_chunks.pop(chunk_index)
    chunk_id = chunk_id or popped_chunk.node_id
    # Remove any references to the removed chunk in node relationships or metadata:
    # drop scalar metadata values equal to the removed id, and filter the id out of list values.
    for node in pdf_chunks:
        node.relationships = {k: v for k, v in node.relationships.items() if v.node_id != chunk_id}
        node.metadata = {
            k: ([item for item in v if item != chunk_id] if isinstance(v, list) else v)
            for k, v in node.metadata.items()
            if (isinstance(v, list) or v != chunk_id)
        }
    return pdf_chunks


def _clean_overlap_text(
    text1: str,
    text2: str,
    combining_text: str = " ",
    min_length: int | None = 1,
    max_length: int | None = 50,
    overlap_threshold: float = 0.9,
) -> str:
    r"""Remove any overlapping text between two strings.

    Args:
        text1 (str): The first string.
        text2 (str): The second string.
        combining_text (str, optional): The text to combine the two strings with. Defaults to space (' '). Can also be \n.
        min_length (int, optional): The minimum length of the overlap. Defaults to 1. None is no minimum.
        max_length (int, optional): The maximum length of the overlap. Defaults to 50. None is no maximum.
        overlap_threshold (float, optional): The similarity threshold for being an overlap. Defaults to 0.9.

    Returns:
        str: The strings combined, keeping only one copy of the overlap.
    """
    for overlap_len in range(min(len(text1), len(text2), (max_length or len(text1))), ((min_length or 1) - 1), -1):
        end_substring = text1[-overlap_len:]
        start_substring = text2[:overlap_len]
        similarity = difflib.SequenceMatcher(None, end_substring, start_substring).ratio()
        if (similarity >= overlap_threshold):
            # Keep a single copy of the overlapping span (the one leading text2).
            return combining_text.join([text1[:-overlap_len].rstrip(), text2.lstrip()]).strip()
    return combining_text.join([text1, text2]).strip()
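

# Example (illustrative): the shared "para" is kept only once.
# >>> _clean_overlap_text("The broken para", "para continues here.")
# 'The broken para continues here.'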


def _combine_chunks(c1: GenericNode, c2: GenericNode) -> GenericNode:
    """Combine two chunks into one.

    Args:
        c1 (GenericNode): The first chunk.
        c2 (GenericNode): The second chunk.

    Returns:
        GenericNode: The combined chunk.
    """
    # Metadata merging
    # Type merging
    text_types = ["NarrativeText", "ListItem", "Formula", "UncategorizedText", "Composite-TextOnly"]
    image_types = ["FigureCaption", "Image"]  # things that make Image nodes.

    def _combine_chunks_type(c1_type: str, c2_type: str) -> str:
        """Combine the types of two chunks.

        Args:
            c1_type (str): The type of the first chunk.
            c2_type (str): The type of the second chunk.

        Returns:
            str: The type of the combined chunk.
        """
        if (c1_type == c2_type):
            return c1_type
        elif (c1_type in text_types and c2_type in text_types):
            return "Composite-TextOnly"
        elif (c1_type in image_types and c2_type in image_types):
            return "Image"  # Add caption to image
        else:
            return "Composite"

    c1_type = c1.metadata["type"]
    c2_type = c2.metadata["type"]
    c1.metadata["type"] = _combine_chunks_type(c1_type, c2_type)
    # All other metadata merging
    for k, v in c2.metadata.items():
        if k not in c1.metadata:
            c1.metadata[k] = v
        # Merge list-like fields, deduping values.
        elif k in ["page_number", "page_name", "languages", "emphasized_text_contents", "link_texts", "link_urls"]:
            if not isinstance(c1.metadata[k], list):
                c1.metadata[k] = [c1.metadata[k]]
            new_values = v if isinstance(v, list) else [v]
            c1.metadata[k] = sorted(set(c1.metadata[k]).union(new_values))
    # Text merging
    c1_text = getattr(c1, "text", "")
    c2_text = getattr(c2, "text", "")
    if (c1_text == c2_text):
        # No duplicates.
        return c1
    if (c1_text == "" or c2_text == ""):
        c1.text = c1_text + c2_text
        return c1
    # Check if a sentence has been split between two chunks
    # Option 1: letters
    c1_text_last = c1_text[-1]
    # Check if c1_text_last is a lowercase letter, digit, or punctuation that doesn't end a sentence
    if (re.search(r'[\da-z\[\]\(\)\{\}\<\>\%\^\&\"\'\:\;\,\/\-\_\+\= \t\n\r]', c1_text_last)):
        # We can probably combine these two texts as if they were on the same line.
        c1.text = _clean_overlap_text(c1_text, c2_text, combining_text=" ")
    else:
        # We'll treat these as if they were on separate lines.
        c1.text = _clean_overlap_text(c1_text, c2_text, combining_text="\n")
    # NOTE: Relationships merging is handled in other functions, because it requires looking back at prior prior chunks.
    return c1
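

# Illustrative outcomes of the type-merging rules above:
#   NarrativeText + ListItem -> "Composite-TextOnly"
#   FigureCaption + Image    -> "Image"
#   NarrativeText + Table    -> "Composite"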


def dedupe_title_chunks(pdf_chunks: list[GenericNode]) -> list[GenericNode]:
    """Given a list of chunks, return a list of chunks without any title duplicates.

    Args:
        pdf_chunks (List[BaseNode]): The list of chunks to have titles deduped.

    Returns:
        List[BaseNode]: The deduped list of chunks.
    """
    index = 0
    while (index < len(pdf_chunks)):
        if (
            (pdf_chunks[index].metadata["type"] == "Title")  # is title
            and (index > 0)  # is not first chunk
            and (pdf_chunks[index - 1].metadata["type"] == "Title")  # previous chunk is also title
        ):
            pdf_chunks[index] = _combine_chunks(pdf_chunks[index - 1], pdf_chunks[index])
            # NOTE: We'll remove the PRIOR title, since duplicates AND child relationships are built on the CURRENT title.
            # There shouldn't be any PARENT/CHILD relationships to the title that we are deleting, so this seems fine.
            pdf_chunks = _remove_chunk(pdf_chunks=pdf_chunks, chunk_index=index - 1)
            # NOTE: don't need to shift index because we removed an element.
        else:
            # We don't care about any situations other than consecutive title chunks.
            index += 1
    return (pdf_chunks)


def combine_listitem_chunks(pdf_chunks: list[GenericNode]) -> list[GenericNode]:
    """Given a list of chunks, combine any adjacent chunks which are ListItems into one List.

    Args:
        pdf_chunks (List[GenericNode]): The list of chunks to combine.

    Returns:
        List[GenericNode]: The list of chunks with ListItems combined into one List chunk.
    """
    index = 0
    while (index < len(pdf_chunks)):
        if (
            (pdf_chunks[index].metadata["type"] == "ListItem")  # is list item
            and (index > 0)  # is not first chunk
            and (pdf_chunks[index - 1].metadata["type"] == "ListItem")  # previous chunk is also list item
        ):
            # Okay, we have a consecutive list item. Combine into one list.
            # NOTE: We'll remove the PRIOR list item, since duplicates AND child relationships are built on the CURRENT list item.
            # 1. Merge the prior list item's text and metadata into the current list item.
            pdf_chunks[index] = _combine_chunks(pdf_chunks[index - 1], pdf_chunks[index])
            # 2. Remove PRIOR list item. The combined chunk now sits at index - 1.
            pdf_chunks.pop(index - 1)
            # 3. Point the prior prior node's NEXT relationship at the combined list item, if a prior prior node exists.
            if (index - 2 >= 0):
                pdf_chunks[index - 2].relationships[NodeRelationship.NEXT] = RelatedNodeInfo(
                    node_id=pdf_chunks[index - 1].node_id,
                    metadata={"filename": pdf_chunks[index - 1].metadata["filename"]},
                )
                # 4. Point the combined list item's PREVIOUS relationship at the prior prior node.
                pdf_chunks[index - 1].relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo(
                    node_id=pdf_chunks[index - 2].node_id,
                    metadata={"filename": pdf_chunks[index - 2].metadata["filename"]},
                )
            # NOTE: the PARENT/CHILD relationships should be the same as the previous list item, so this seems fine.
        else:
            # We don't care about any situations other than consecutive list item chunks.
            index += 1
    return (pdf_chunks)


def remove_header_footer_repeated(
    pdf_chunks_input: list[GenericNode],
    window_size: int = 3,
    fuzz_threshold: int = 80,
) -> list[GenericNode]:
    """Given a list of chunks, remove any header/footer chunks that are repeated across pages.

    Args:
        pdf_chunks_input (List[GenericNode]): The list of chunks to process.
        window_size (int): The number of chunks to consider at the beginning and end of each page.
        fuzz_threshold (int): The threshold for fuzzy matching of chunk texts.

    Returns:
        List[GenericNode]: The list of chunks with header/footer chunks removed.
    """
    nodes_to_remove = set()  # ids to remove.
    pdf_chunks = deepcopy(pdf_chunks_input)

    # Build a dictionary of chunks by page number
    chunks_by_page = defaultdict(list)
    for chunk in pdf_chunks:
        chunk_page_number = min(chunk.metadata["page_number"]) if isinstance(chunk.metadata["page_number"], list) else chunk.metadata["page_number"]
        chunks_by_page[chunk_page_number].append(chunk)

    # Get the first window_size and last window_size chunks on each page
    header_candidates = defaultdict(set)  # map from chunk text to the set of chunk ids with that text.
    footer_candidates = defaultdict(set)  # map from chunk text to the set of chunk ids with that text.
    page_number_regex = re.compile(r"(?:-|\( ?)?\b(?:page|p\.?(?:[pg](?:\b|\.)?)?)? ?(?:\d+|\b[ivxm]+\b)\.?(?: ?-|\))?\b", re.IGNORECASE)

    for chunks in chunks_by_page.values():
        header_chunks = chunks[:window_size]
        footer_chunks = chunks[-window_size:]
        for chunk in header_chunks:
            chunk_text = getattr(chunk, "text", "")
            if chunk.metadata["type"] == "Header" and len(chunk_text) > 0:
                chunk_text_is_pagenum_only = page_number_regex.match(chunk_text)
                if chunk_text_is_pagenum_only and (len(chunk_text_is_pagenum_only.group(0)) == len(chunk_text)):
                    # Full match!
                    chunk.text = "Page Number Only"
                    nodes_to_remove.add(chunk.node_id)
                elif chunk_text_is_pagenum_only and len(chunk_text_is_pagenum_only.group(0)) > 0:
                    # Remove the page number content from the chunk text for this exercise
                    chunk_text = page_number_regex.sub("", chunk_text)
                    chunk.text = chunk_text
            if chunk.metadata["type"] not in ("Image", "Table") and len(chunk_text) > 0:
                header_candidates[chunk_text].add(chunk.node_id)
        for chunk in footer_chunks:
            chunk_text = getattr(chunk, "text", "")
            if chunk.metadata["type"] == "Footer" and len(chunk_text) > 0:
                chunk_text_is_pagenum_only = page_number_regex.match(chunk_text)
                if chunk_text_is_pagenum_only and (len(chunk_text_is_pagenum_only.group(0)) == len(chunk_text)):
                    # Full match!
                    chunk.text = "Page Number Only"
                    nodes_to_remove.add(chunk.node_id)
                elif chunk_text_is_pagenum_only and len(chunk_text_is_pagenum_only.group(0)) > 0:
                    # Remove the page number content from the chunk text for this exercise
                    chunk_text = page_number_regex.sub("", chunk_text)
                    chunk.text = chunk_text
            if chunk.metadata["type"] not in ("Image", "Table") and len(chunk_text) > 0:
                footer_candidates[chunk_text].add(chunk.node_id)

    # Identify any texts which are too similar to other header texts.
    header_texts = list(header_candidates.keys())
    header_distance_matrix = rapidfuzz.process.cdist(header_texts, header_texts, scorer=rapidfuzz.fuzz.ratio, score_cutoff=fuzz_threshold)
    footer_texts = list(footer_candidates.keys())
    footer_distance_matrix = rapidfuzz.process.cdist(footer_texts, footer_texts, scorer=rapidfuzz.fuzz.ratio, score_cutoff=fuzz_threshold)
    # Combine header candidates which are too similar to each other in the distance matrix
    for i in range(len(header_distance_matrix) - 1):
        for j in range(i + 1, len(header_distance_matrix)):
            if header_distance_matrix[i][j] >= fuzz_threshold:
                header_candidates[header_texts[i]].update(header_candidates[header_texts[j]])
                header_candidates[header_texts[j]].update(header_candidates[header_texts[i]])
    for i in range(len(footer_distance_matrix) - 1):
        for j in range(i + 1, len(footer_distance_matrix)):
            if footer_distance_matrix[i][j] >= fuzz_threshold:
                footer_candidates[footer_texts[i]].update(footer_candidates[footer_texts[j]])
                footer_candidates[footer_texts[j]].update(footer_candidates[footer_texts[i]])

    # Any candidate text shared by more than one chunk is treated as a repeated header/footer.
    headers_to_remove = set()
    for chunk_ids in header_candidates.values():
        if len(chunk_ids) > 1:
            headers_to_remove.update(chunk_ids)
    footers_to_remove = set()
    for chunk_ids in footer_candidates.values():
        if len(chunk_ids) > 1:
            footers_to_remove.update(chunk_ids)

    nodes_to_remove = nodes_to_remove.union(headers_to_remove, footers_to_remove)
    for node_id in nodes_to_remove:
        pdf_chunks = _remove_chunk(pdf_chunks=pdf_chunks, chunk_id=node_id)
    return pdf_chunks
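

# Example (sketch): a looser matching pass for noisy OCR output, where `chunks` is a
# hypothetical list of nodes from the reader:
# cleaned = remove_header_footer_repeated(chunks, window_size=2, fuzz_threshold=75)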


def remove_overlap_images(pdf_chunks: list[GenericNode]) -> list[GenericNode]:
    # TODO(Jonathan Wang): Implement this function to remove images which are completely overlapping each other.
    # OR... get a better dang reader!
    raise NotImplementedError
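

# A minimal sketch of the geometry test that remove_overlap_images needs, assuming each
# image node's metadata carries a `coordinates` dict with corner `points` (as Unstructured
# emits them); the exact key names depend on the reader, so treat them as assumptions.
def _bbox_from_points(points: list[tuple[float, float]]) -> tuple[float, float, float, float]:
    """Collapse a polygon's corner points into an axis-aligned (x0, y0, x1, y1) box."""
    xs = [point[0] for point in points]
    ys = [point[1] for point in points]
    return (min(xs), min(ys), max(xs), max(ys))


def _bbox_overlap_fraction(
    box_a: tuple[float, float, float, float],
    box_b: tuple[float, float, float, float],
) -> float:
    """Fraction of the smaller box covered by the intersection (1.0 means fully contained)."""
    overlap_x = max(0.0, min(box_a[2], box_b[2]) - max(box_a[0], box_b[0]))
    overlap_y = max(0.0, min(box_a[3], box_b[3]) - max(box_a[1], box_b[1]))
    intersection = overlap_x * overlap_y
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    smaller_area = min(area_a, area_b)
    return (intersection / smaller_area) if smaller_area > 0 else 0.0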


def chunk_by_header(
    pdf_chunks_in: list[GenericNode],
    combine_text_under_n_chars: int = 1024,
    multipage_sections: bool = True,
) -> list[GenericNode]:
    """Combine chunks together that are part of the same header and have similar meaning.

    Args:
        pdf_chunks_in (List[GenericNode]): List of chunks to be combined.
        combine_text_under_n_chars (int): Text chunks at or below this length get merged into a prior section rather than standing alone.
        multipage_sections (bool): Whether sections may span multiple pages (currently unused).

    Returns:
        List[GenericNode]: List of combined chunks.
    """
    # TODO(Jonathan Wang): Handle semantic chunking between elements within a Header chunk.
    # TODO(Jonathan Wang): Handle splitting element chunks if they are over `max_characters` in length (does this ever really happen?)
    # TODO(Jonathan Wang): Handle relationships between nodes.
    pdf_chunks = deepcopy(pdf_chunks_in)
    output = []
    id_to_index = {}
    index = 0

    # Pass 1: Combine chunks together that are part of the same title chunk.
    while (index < len(pdf_chunks)):
        chunk = pdf_chunks[index]
        if (chunk.metadata["type"] in ["Header", "Footer", "Image", "Table"]):
            # These go immediately into the semantic title chunks and also reset the new node.
            # Let's add a newline to distinguish from any other content.
            if (chunk.metadata["type"] in ["Header", "Footer", "Table"]):
                chunk.text = getattr(chunk, "text", "") + "\n"
            output.append(chunk)
            index += 1
            continue
        # Make a new node if we have a new title (or if we don't have a title).
        if (chunk.metadata["type"] == "Title"):
            # We're good, this node can stay as a TitleChunk.
            chunk.metadata["type"] = "Composite"
            # Let's add a newline to distinguish the title from the content.
            chunk.text = getattr(chunk, "text", "") + "\n"
            output.append(chunk)
            id_to_index[chunk.id_] = len(output) - 1
            index += 1
            continue
        elif (chunk.metadata.get("parent_id", None) in id_to_index):
            # This chunk is part of the same title as a prior chunk.
            # Add this text into the prior title node.
            jndex = id_to_index[chunk.metadata["parent_id"]]
            output[jndex] = _combine_chunks(output[jndex], chunk)
            pdf_chunks.pop(index)  # remove the merged chunk; index now points at the next chunk.
            continue
        elif (
            (chunk.metadata.get("parent_id", None) is None)
            and (
                len(getattr(chunk, "text", "")) > combine_text_under_n_chars  # big enough text section to stand alone
                or (len(id_to_index.keys()) <= 0)  # no prior title
            )
        ):
            # Okay, so either we don't have a title, or it was interrupted by an image / table.
            # This chunk can stay as a TextChunk.
            chunk.metadata["type"] = "Composite-TextOnly"
            output.append(chunk)
            id_to_index[chunk.id_] = len(output) - 1
            index += 1
            continue
        else:
            # Add the text to the prior node that isn't a table or image.
            jndex = len(output) - 1
            while (
                (jndex >= 0)
                and (output[jndex].metadata["type"] in ["Table", "Image"])
            ):
                jndex -= 1
            if (jndex < 0):
                msg = f"""Prior title chunk not found: {index}, {chunk.metadata.get('parent_id', None)}"""
                raise ValueError(msg)
            # Add this text into the prior title node.
            output[jndex] = _combine_chunks(output[jndex], chunk)
            pdf_chunks.pop(index)  # remove the merged chunk; index now points at the next chunk.
            # TODO: Update relationships between nodes.
            continue
    return (output)
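

# One plausible ordering of these cleaners (a sketch; `nodes` here is a hypothetical
# list of llama-index nodes produced by the PDF reader):
# nodes = clean_abbreviations(nodes)
# nodes = dedupe_title_chunks(nodes)
# nodes = combine_listitem_chunks(nodes)
# nodes = remove_header_footer_repeated(nodes)
# sections = chunk_by_header(nodes)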


### TODO:
# Merge images together that are substantially overlapping.
# Favour the image with no confidence score (these come straight from the PDF).
# Favour the larger image over the smaller one.
# Favour the image with the higher confidence score.
def merge_images() -> None:
    pass
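

# A minimal sketch of the preference rule in the TODO above, for choosing which of two
# substantially-overlapping image nodes to keep. It assumes Unstructured-style metadata:
# an optional "detection_class_prob" confidence and a "coordinates" dict usable with the
# `_bbox_from_points` helper sketched earlier; both key names are assumptions about the reader.
def _prefer_image(img_a: GenericNode, img_b: GenericNode) -> GenericNode:
    """Pick which of two substantially-overlapping image nodes to keep."""
    prob_a = img_a.metadata.get("detection_class_prob")
    prob_b = img_b.metadata.get("detection_class_prob")
    # 1. Favour the image with no confidence score (it came straight from the PDF).
    if (prob_a is None) != (prob_b is None):
        return img_a if prob_a is None else img_b

    def _area(node: GenericNode) -> float:
        points = (node.metadata.get("coordinates") or {}).get("points")
        if not points:
            return 0.0
        x0, y0, x1, y1 = _bbox_from_points(points)
        return (x1 - x0) * (y1 - y0)

    # 2. Favour the larger image (by bounding-box area).
    if _area(img_a) != _area(img_b):
        return img_a if _area(img_a) > _area(img_b) else img_b
    # 3. Favour the image with the higher confidence score.
    return img_a if (prob_a or 0.0) >= (prob_b or 0.0) else img_b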