import logging
import os
import re
from functools import partial, reduce
from itertools import chain
from typing import Generator, List, Optional, Set
from uuid import uuid4

from docling.document_converter import DocumentConverter
from PyPDF2 import PdfReader, PdfWriter

logger = logging.getLogger(__name__)

tag_list = ["Sources:", "Source:", "Tags-", "Tags:", "CONTENTS", "ANNEX", "EXERCISES", "Project/Activity"]

try:
    converter = DocumentConverter()
except Exception as e:
    logger.error(f"Error initializing Docling DocumentConverter: {e}")
    raise  # the rest of the module cannot work without a converter

def split_pdf(input_pdf, output_pdf, start_page, end_page):
    """Write pages start_page..end_page (0-based, inclusive) of input_pdf to output_pdf."""
    reader = PdfReader(input_pdf)
    writer = PdfWriter()
    for i in range(start_page, end_page + 1):
        writer.add_page(reader.pages[i])
    with open(output_pdf, "wb") as output_file:
        writer.write(output_file)
    logger.info(f"PDF split successfully: {output_pdf}")

def get_texts(res):
    """Collect the text items of each page into a single string, keyed by page number."""
    page_texts = {pg: "" for pg in res['pages'].keys()}
    for item in res.get('texts', []):
        for prov in item['prov']:
            text = item['text']
            page_key = f"{prov['page_no']}"
            # Append with a separating space only if the page already has text.
            if page_texts.get(page_key):
                page_texts[page_key] += ' ' + text
            else:
                page_texts[page_key] = text
    return page_texts
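
# Example with a minimal docling-style dict (shape inferred from the code above):
# get_texts({"pages": {"1": {}},
#            "texts": [{"text": "Hello", "prov": [{"page_no": 1}]},
#                      {"text": "world", "prov": [{"page_no": 1}]}]})
# -> {"1": "Hello world"}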

def clean_the_text(text):
    """
    Cleans the extracted text by removing unnecessary characters and formatting issues.
    Args:
        text (str): The extracted text.
    Returns:
        str: The cleaned text.
    """
    try:
        text = re.sub(r'\n\s*\n', '\n', text)  # collapse runs of blank lines
        text = text.replace("\t", " ")
        text = text.replace("\f", " ")
        text = re.sub(r'\b(\w+\s*)\1{1,}', r'\1', text)  # drop immediately repeated words
        text = re.sub(r'[^a-zA-Z0-9\s@\-/,.\\]', ' ', text)  # replace uncommon symbols with spaces
        return text.strip()
    except Exception as e:
        logger.error(f"Error cleaning text: {e}")
        return text
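
# e.g. clean_the_text("Intro\n\n\nBody\ttext!!") -> "Intro\nBody text"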

def get_tables(res_json):
    """Collect the table grids of each page, keyed by page number."""
    page_tables = {pg: [] for pg in res_json['pages'].keys()}
    try:
        tables = res_json.get('tables', [])
        if not isinstance(tables, list):
            raise ValueError("Expected 'tables' to be a list.")
        for table in tables:
            try:
                # Ensure 'prov' exists and has the necessary structure
                prov = table.get('prov', [])
                if not prov or not isinstance(prov, list):
                    raise ValueError("Missing or invalid 'prov' structure in table.")
                page_no = prov[0].get('page_no')
                if page_no is None:
                    raise ValueError("Missing or invalid 'page_no' in 'prov'.")
                # Ensure 'data' and 'grid' exist
                data = table.get('data', {})
                grid = data.get('grid', [])
                if not isinstance(grid, list):
                    raise ValueError("Missing or invalid 'grid' structure in 'data'.")
                # Append the table grid to its page
                page_tables[f'{page_no}'].append(grid)
            except Exception as table_error:
                logger.error(f"Error processing table: {table_error}")
    except Exception as e:
        logger.error(f"Error processing tables: {e}")
    return page_tables
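
# Example return shape (one grid of cell dicts on page 1; shape inferred from the code above):
# {"1": [[[{"text": "Name"}, {"text": "Qty"}], [{"text": "Apples"}, {"text": "3"}]]], "2": []}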

def table_to_text_or_json(table, rtrn_type="text"):
    """
    Converts a table grid to a single comma-separated string.
    Args:
        table (list): The table grid (a list of rows of cell dicts) to convert.
        rtrn_type (str): The return type. Only "text" is currently implemented;
            any other value also falls back to text.
    Returns:
        str: The table rendered as text.
    """
    table_text = "Here is a Table : \n"
    for row in table:
        for col in row:
            val = col.get('text')
            table_text += f'{val} ,'
        table_text += '\n'
    return table_text
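
# e.g. table_to_text_or_json([[{"text": "Name"}, {"text": "Qty"}], [{"text": "Apples"}, {"text": "3"}]])
# -> "Here is a Table : \nName ,Qty ,\nApples ,3 ,\n"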

def clean_file_name(text: str):
    """
    Cleans the file name by removing any special characters.
    Args:
        text (str): The original file name.
    Returns:
        str: The cleaned file name.
    """
    try:
        text = re.sub(r'[^a-zA-Z0-9 \n.]', ' ', text)
        return text
    except Exception as e:
        logger.error(f"Error cleaning file name: {e}")
        return text
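
# e.g. clean_file_name("Q3_report(final).pdf") -> "Q3 report final .pdf"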

def find_and_remove_header_footer(
    text: str, n_chars: int, n_first_pages_to_ignore: int, n_last_pages_to_ignore: int
) -> str:
    """
    Heuristic to find footers and headers across different pages by searching for the longest common string.
    For headers we only search in the first n_chars characters (for footer: last n_chars).
    Note: This heuristic uses exact matches and therefore works well for footers like "Copyright 2019 by XXX",
    but won't detect "Page 3 of 4" or similar.
    :param text: pages as a single string, separated by form feeds ("\\f")
    :param n_chars: number of first/last characters where the header/footer shall be searched in
    :param n_first_pages_to_ignore: number of first pages to ignore (e.g. TOCs often don't contain footer/header)
    :param n_last_pages_to_ignore: number of last pages to ignore
    :return: the text with the detected header and footer removed from every page
    """
    pages = text.split("\f")
    # Use an explicit end index: "pages[a:-0]" would be an empty slice when
    # n_last_pages_to_ignore is 0, so compute the end position instead.
    last = len(pages) - n_last_pages_to_ignore
    # header
    start_of_pages = [p[:n_chars] for p in pages[n_first_pages_to_ignore:last]]
    found_header = find_longest_common_ngram(start_of_pages)
    if found_header:
        pages = [page.replace(found_header, "") for page in pages]
    # footer
    end_of_pages = [p[-n_chars:] for p in pages[n_first_pages_to_ignore:last]]
    found_footer = find_longest_common_ngram(end_of_pages)
    if found_footer:
        pages = [page.replace(found_footer, "") for page in pages]
    logger.debug(f"Removed header '{found_header}' and footer '{found_footer}' in document")
    text = "\f".join(pages)
    return text
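
# e.g. find_and_remove_header_footer("Annual Report 2019\nAlpha\fAnnual Report 2019\nBeta",
#                                    n_chars=18, n_first_pages_to_ignore=0, n_last_pages_to_ignore=0)
# -> "\nAlpha\f\nBeta"  (the shared header "Annual Report 2019" is stripped from both pages)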

def ngram(seq: str, n: int) -> Generator[str, None, None]:
    """
    Return ngrams (of tokens - currently split by whitespace).
    :param seq: str, string from which the ngrams shall be created
    :param n: int, n of ngram
    :return: generator of ngram strings
    """
    # In order to maintain the original whitespace, but still consider \n and \t for n-gram tokenization,
    # we add a space here and remove it after creation of the ngrams again (see below)
    seq = seq.replace("\n", " \n")
    seq = seq.replace("\t", " \t")
    words = seq.split(" ")
    ngrams = (
        " ".join(words[i : i + n]).replace(" \n", "\n").replace(" \t", "\t") for i in range(0, len(words) - n + 1)
    )
    return ngrams
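
# e.g. list(ngram("a b c d", 2)) -> ['a b', 'b c', 'c d']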

def allngram(seq: str, min_ngram: int, max_ngram: int) -> Set[str]:
    """Return the set of all ngrams of seq with lengths in [min_ngram, max_ngram)."""
    lengths = range(min_ngram, max_ngram) if max_ngram else range(min_ngram, len(seq))
    ngrams = map(partial(ngram, seq), lengths)
    res = set(chain.from_iterable(ngrams))
    return res
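
# e.g. allngram("a b c", min_ngram=1, max_ngram=3) -> {'a', 'b', 'c', 'a b', 'b c'}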

def find_longest_common_ngram(
    sequences: List[str], max_ngram: int = 30, min_ngram: int = 3
) -> Optional[str]:
    """
    Find the longest common ngram across different text sequences (e.g. start of pages).
    Considers all ngrams between the specified range. Helpful for finding footers, headers etc.
    :param sequences: list[str], list of strings that shall be searched for common n_grams
    :param max_ngram: int, maximum length of ngram to consider
    :param min_ngram: minimum length of ngram to consider
    :return: the longest ngram shared by all sequences, or None if there is none
    """
    sequences = [s for s in sequences if s]  # filter empty sequences
    if not sequences:
        return None
    seqs_ngrams = map(partial(allngram, min_ngram=min_ngram, max_ngram=max_ngram), sequences)
    intersection = reduce(set.intersection, seqs_ngrams)
    try:
        longest = max(intersection, key=len)
    except ValueError:
        # no common sequence found
        longest = ""
    return longest if longest.strip() else None
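
# e.g. find_longest_common_ngram(["Copyright 2019 Acme Corp page 1", "Copyright 2019 Acme Corp page 2"])
# -> "Copyright 2019 Acme Corp page"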

class PdfToSectionConverter():
    def __init__(self):
        """
        Initializes the PdfToSectionConverter class.
        """
        pass

    def convert(self, downloaded_pdf_path: str, file_title: str, doc_id: str = None, start_page_no: int = 0,
                end_page_no: int = 0):
        """
        Converts a PDF document to sections with metadata.
        Args:
            downloaded_pdf_path (str): Path to the downloaded PDF file.
            file_title (str): The title of the file.
            doc_id (str, optional): The document ID. Defaults to None.
            start_page_no (int, optional): The starting page number (0-based). Defaults to 0.
            end_page_no (int, optional): The ending page number (0-based, inclusive). Defaults to 0.
        Returns:
            list: A list of dictionaries containing sections and metadata.
        """
        try:
            logger.info(f"Splitting pdf from page {start_page_no + 1} to {end_page_no + 1}")
            output_path = "/tmp/splitted.pdf"
            split_pdf(downloaded_pdf_path, output_path, start_page_no, end_page_no)
            logger.info("OCR started ...")
            result = converter.convert(output_path)
            json_objects = result.document.export_to_dict()
            pages = list(json_objects['pages'].keys())
            texts = get_texts(json_objects)
            tables = get_tables(json_objects)
        except Exception as e:
            logger.error(f"Error getting JSON result from parser: {e}")
            return []
        output_doc_lst = []
        page_no = start_page_no
        try:
            for page in pages:
                if page_no > end_page_no:
                    break
                page_no += 1
                logger.info(f"Page number to be processed: {page_no}")
                meta = {"doc_id": doc_id, "page_no": page_no, "img_count": 0, "img_lst": []}
                meta_table = {"doc_id": doc_id, "page_no": page_no, "img_count": 0, "img_lst": "[]"}
                # Extract and clean the text of the page
                text_to_append = clean_the_text(texts[page])
                # Render any tables detected on the page as text
                tables_to_append = tables[page]
                if tables_to_append:
                    tables_to_append = [table_to_text_or_json(table=i, rtrn_type="text") for i in tables_to_append]
                # Add the processed section to the output list
                output_doc_lst.append(
                    {"doc_id": doc_id, "text": text_to_append, "vector_id": str(uuid4()),
                     "meta": meta, "content_type": 'text'})
                for table in tables_to_append:
                    output_doc_lst.append(
                        {"doc_id": doc_id, "text": table, "vector_id": str(uuid4()),
                         "meta": meta_table, "content_type": 'table'})
            # Post-process all sections at once to remove headers and footers
            text_to_append_list = "\f".join([i['text'] for i in output_doc_lst])
            text_to_append_list = find_and_remove_header_footer(text=text_to_append_list, n_chars=10,
                                                                n_first_pages_to_ignore=0,
                                                                n_last_pages_to_ignore=0).split("\f")
            for i in range(len(output_doc_lst)):
                output_doc_lst[i]['text'] = clean_file_name(file_title) + "\n" + text_to_append_list[i]
        except Exception as e:
            logger.error(f"Error converting PDF to sections: {e}")
        return output_doc_lst
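
# Minimal usage sketch; "sample.pdf", the title, and the doc id below are
# placeholders, not assets shipped with this module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    sections = PdfToSectionConverter().convert(
        downloaded_pdf_path="sample.pdf", file_title="Sample Report",
        doc_id="doc-001", start_page_no=0, end_page_no=4)
    for section in sections:
        print(section["meta"]["page_no"], section["content_type"], section["text"][:80])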