Spaces:
Build error
Build error
| # Copyright (c) 2022, Lawrence Livermore National Security, LLC. | |
| # All rights reserved. | |
| # See the top-level LICENSE and NOTICE files for details. | |
| # LLNL-CODE-838964 | |
| # SPDX-License-Identifier: Apache-2.0-with-LLVM-exception | |
| import json | |
| from tokenizers.pre_tokenizers import Whitespace | |
| import base_utils | |
| import spacy | |
def guess_sentences(tokens, text):
    """Group pre-tokenized words into sentences.

    A sentence boundary is assumed when a token ends with a delimiter
    ('.', '?', ').', '!') AND the next token does not start immediately
    after it in the original text (i.e. there is whitespace between
    them). This avoids splitting on periods inside tokens like "3.5".

    Args:
        tokens: sequence of (word, (start_char, end_char)) pairs, as
            produced by tokenizers' Whitespace pre-tokenizer.
        text: original text (currently unused; kept for interface
            compatibility with callers).

    Returns:
        List of sentences, each a list of (word, (start, end)) tokens.
    """
    sentence_delems = ('.', '?', ').', '!')
    sentences = []
    sentence = []
    maybe_delem = None
    for token in tokens:
        # If the previous token looked like a sentence end, only split
        # when there is a gap (whitespace) before the current token.
        if maybe_delem is not None:
            if maybe_delem[1][1] < token[1][0]:
                sentences.append(sentence)
                sentence = []
            maybe_delem = None
        sentence.append(token)
        if token[0] in sentence_delems:
            maybe_delem = token
    # Flush the trailing sentence (text may not end with a delimiter).
    if sentence:
        sentences.append(sentence)
    return sentences
def spacey_sentences(text):
    """Split *text* into sentences with spaCy's rule-based sentencizer.

    Builds a blank English pipeline (no trained model needed) carrying
    only the lightweight 'sentencizer' component.

    Returns:
        List of sentence strings.
    """
    pipeline = spacy.blank('en')
    pipeline.add_pipe('sentencizer')
    return [span.text for span in pipeline(text).sents]
def add_coords(sentences, all_coords):
    """Attach a bounding box to every token of every sentence.

    Each token's box is the union of the per-character boxes covering
    its character span [start, end) in *all_coords*.

    Args:
        sentences: list of sentences, each a list of
            (word, (start_char, end_char)) tokens.
        all_coords: per-character bounding boxes, indexed by character
            position within the section text.

    Returns:
        Sentences in the same shape, with each token extended to
        (word, (start_char, end_char), bbox).
    """
    result = []
    for sent in sentences:
        boxed_tokens = []
        for token in sent:
            span = token[1]
            start, end = span
            box = all_coords[start]
            # Fold the remaining character boxes into one token box.
            for idx in range(start + 1, end):
                box = base_utils.union(box, all_coords[idx])
            boxed_tokens.append((token[0], span, box))
        result.append(boxed_tokens)
    return result
def sentence_extract(document):
    """
    Convert extracted .PDF result JSON into tokens with a max length of
    384 tokens, separated on sentence delimiter boundaries such as .!?

    Args:
        document: path to the JSON file produced by the PDF extractor
            (mapping page number -> list of section dicts; sections may
            carry 'text' and optionally 'subelements' with per-char
            [bbox, char] pairs).

    Returns:
        Dict mapping page number to
        {'text_sections': [...], 'word_sections': [...]}, where each
        word section is a list of (word, (start_char, end_char), bbox)
        tokens.
    """
    max_tokens = 384
    # Context manager closes the handle promptly (the previous
    # json.load(open(...)) leaked it until garbage collection).
    with open(document, 'r') as fp:
        document_tree = json.load(fp)
    sections_per_page = {}
    for page_num, page in document_tree.items():
        # Tokenize per section (rectangular block that was detected by DIT)
        word_sections = []
        text_sections = []
        for section in page:
            text_sections.append(section['text'])
            all_text = ''
            all_coord = []
            # NOTE(review): sections without 'subelements' still append to
            # text_sections but never to word_sections, so the two lists
            # can misalign downstream — confirm this is intended.
            if 'subelements' not in section:
                continue
            for subelement in section['subelements']:
                for char in subelement:
                    all_text += char[1]
                    all_coord.append(char[0])
                    # Irregular LTChar from pdfminer, e.g. "(cid:206)",
                    # "ff", "fi": the "character" is a multi-char string.
                    # Duplicate its bbox so text and coordinates stay
                    # aligned one entry per character.
                    if len(char[1]) > 1:
                        all_coord.extend([char[0]] * (len(char[1]) - 1))
            pre_tokenizer = Whitespace()
            sentences_pre_tok = spacey_sentences(all_text)
            sentences = [pre_tokenizer.pre_tokenize_str(s) for s in sentences_pre_tok]
            sentences = add_coords(sentences, all_coord)
            # Pack whole sentences into sections of at most max_tokens.
            word_section = []
            t = 0
            for sentence in sentences:
                t += len(sentence)
                if t <= max_tokens:
                    # Token char indices restart at 0 for each sentence;
                    # shift them to continue after the words already in
                    # this section (+1 for the joining space).
                    if len(word_section) > 0:
                        last_word_obj = word_section[-1]
                        _, (_, char_idx_offset), _ = last_word_obj
                        sentence = [
                            (w, (sc + char_idx_offset + 1, ec + char_idx_offset + 1), bbox)
                            for w, (sc, ec), bbox in sentence
                        ]
                    word_section += sentence
                else:
                    # Sentence would overflow: close the current section
                    # and start a fresh one with this sentence.
                    word_sections.append(word_section)
                    word_section = sentence
                    t = len(sentence)
            word_sections.append(word_section)
        sections = {'text_sections': text_sections, 'word_sections': word_sections}
        sections_per_page[page_num] = sections
    return sections_per_page
def format_output_contexts(sections_per_page):
    """Flatten per-page sections into a single contexts mapping.

    Args:
        sections_per_page: mapping page index -> {'text_sections': [...],
            'word_sections': [...]} as produced by sentence_extract;
            each word is a (text, char_indices, bbox) triple.

    Returns:
        Dict mapping 'context_N' ids (N increasing across all pages) to
        rows with the raw section text, the whitespace-joined token
        text, the integer page index, and per-word info dicts.
    """
    all_contexts = {}
    # Iterate items() rather than keys() + repeated indexing.
    for page_idx, page_sections in sections_per_page.items():
        text_sections = page_sections['text_sections']
        word_sections = page_sections['word_sections']
        # NOTE(review): zip silently truncates if the two lists differ in
        # length (possible when a section lacks 'subelements') — confirm
        # the pairing is intended.
        for text_section, word_section in zip(text_sections, word_sections):
            whitespaced_text = ' '.join(word[0] for word in word_section)
            words_info = [
                {'word_text': word[0], 'char_indices': word[1], 'word_bbox': word[2]}
                for word in word_section
            ]
            context_row = {
                'text': text_section,
                'whitespaced_text': whitespaced_text,
                'page_idx': int(page_idx),
                'words_info': words_info,
            }
            context_id = 'context_{0}'.format(len(all_contexts))
            all_contexts[context_id] = context_row
    return all_contexts
def get_contexts(json_input):
    """Run the full extraction pipeline and write the contexts JSON.

    Reads the extractor output at *json_input*, builds the context rows,
    and writes them to 'contexts_<json_input>' as UTF-8 JSON.
    """
    json_output = 'contexts_{0}'.format(json_input)
    all_contexts = format_output_contexts(sentence_extract(json_input))
    with open(json_output, 'w', encoding='utf8') as json_out:
        json.dump(all_contexts, json_out, ensure_ascii=False, indent=4)