Spaces:
Build error
Build error
| from pdfminer.high_level import extract_pages | |
| from pdfminer.layout import LTTextContainer | |
| from tqdm import tqdm | |
| import re | |
| import gradio as gr | |
| import os | |
| import accelerate | |
| # import spaces | |
| import subprocess | |
| # from huggingface_hub import hf_hub_download, InferenceClient | |
| # from llama_cpp import Llama | |
| # from huggingface_hub import login | |
| # login(token = os.getenv('HF_TOKEN')) | |
| # repo_id = "srijaydeshpande/Deid-Fine-Tuned" | |
| # model_id = "deid_finetuned.Q4_K_M.gguf" | |
| # hf_hub_download( | |
| # repo_id="srijaydeshpande/Deid-Fine-Tuned", | |
| # filename="deid_finetuned.Q4_K_M.gguf", | |
| # local_dir = "./models" | |
| # ) | |
def process_document(pdf_path):
    """Extract the normalized text of every page of a PDF.

    Args:
        pdf_path: Value passed unchanged to pdfminer's ``extract_pages``.

    Returns:
        dict mapping each page's ``pageid`` to the string that
        ``process_page`` produces for that page.
    """
    page_layouts = extract_pages(pdf_path)
    # tqdm wraps the page iterable purely to show progress while iterating.
    return {
        layout.pageid: process_page(layout)
        for layout in tqdm(page_layouts)
    }
def process_page(extracted_page):
    """Concatenate the normalized text of all text containers on a page.

    The page's layout objects are visited in order of descending ``y1``;
    only ``LTTextContainer`` instances contribute text.

    Args:
        extracted_page: A page layout object exposing ``_objs``.

    Returns:
        str: The joined text with every run of newlines collapsed to one.
    """
    # sorted() copies the object list, so the page itself is left untouched.
    ordered = sorted(
        extracted_page._objs,
        key=lambda obj: obj.y1,
        reverse=True,
    )
    pieces = [
        extract_text_and_normalize(obj)
        for obj in ordered
        if isinstance(obj, LTTextContainer)
    ]
    return re.sub('\n+', '\n', ''.join(pieces))
def extract_text_and_normalize(element):
    """Return the text of a layout element with normalized whitespace.

    The element's text is split on newlines and each line is handled
    independently:

    * a blank line becomes a single newline;
    * otherwise inner whitespace runs collapse to one space, and the line
      is followed by a space when its last character is a word character,
      comma or hyphen (the sentence is assumed to continue on the next
      line), or by a newline for any other final character.

    Args:
        element: Object exposing ``get_text()`` that returns a string.

    Returns:
        str: The normalized text.
    """
    pieces = []
    for raw_line in element.get_text().split('\n'):
        line = raw_line.strip()
        if not line:
            pieces.append('\n')
            continue
        # Raw strings: '\s' and '\w' are invalid escapes in a plain literal.
        line = re.sub(r'\s+', ' ', line)
        if re.search(r'[\w\d\,\-]', line[-1]):
            pieces.append(line + ' ')
        else:
            pieces.append(line + '\n')
    return ''.join(pieces)
def txt_to_html(text):
    """Render plain text as a minimal HTML document, one ``<p>`` per line.

    Each line is stripped of surrounding whitespace and HTML-escaped so
    that characters such as ``<``, ``>`` and ``&`` coming from the source
    text are shown literally instead of being interpreted as markup.

    Args:
        text: Plain text; lines are separated by ``'\\n'``.

    Returns:
        str: ``<html><body>`` + one ``<p>…</p>`` per line + ``</body></html>``.
    """
    import html  # local import keeps this function self-contained

    paragraphs = (
        "<p>{}</p>".format(html.escape(line.strip(), quote=False))
        for line in text.split('\n')
    )
    return "<html><body>" + "".join(paragraphs) + "</body></html>"
def deidentify_doc(llm_type, pdftext, maxtokens, temperature, top_probability):
    """Yield a de-identified version of ``pdftext`` produced by an LLM.

    This is a generator: both branches yield exactly one string.

    Args:
        llm_type: ``'Fine tuned LLama3'`` selects the local llama.cpp model;
            any other value selects the hosted Zephyr model.
        pdftext: Text of one page to de-identify.
        maxtokens: Maximum number of tokens to generate per completion.
        temperature: Sampling temperature.
        top_probability: ``top_p`` value; only used by the hosted branch.

    Yields:
        str: The model output (trimmed to the span of the original text in
        the local-model branch).

    NOTE(review): ``Llama``, ``model_id`` and ``InferenceClient`` are only
    brought in by imports/assignments that are commented out at the top of
    this file; they must be restored before this function can run.
    """
    prompt = "In the following text, perform the following actions: 1. Replace only the calendar dates with term [date]. Example: if input is 'Date of birth: 15/5/1959 calculated BP (Systolic 158.00 mm, Diastolic 124.95 mm)' output should be 'Date of birth: [date] calculated BP (Systolic 158.00 mm, Diastolic 124.95 mm)' 2. Replace location or address, such as '3970 Longview Drive, CV36HE' with term [address]. Replace complete GP address, such as 'Phanton Medical Centre, Birmingham, CV36HE' with term [address]. It is important that all addresses are completely replaced with [address]. 3. Replace any person name with term [name]. It is important that all person names are replaced with term [name]. Remove any gender terms 'male' or 'female' if exists. 4. Replace the nhs number and the case note number with term [ID]. Replace Hospital number with [ID]. 4. Replace age of person with [age]. It is important that all age numbers are completely replaced with [age]."

    if llm_type == 'Fine tuned LLama3':
        llm = Llama(
            model_path="models/" + model_id,
            flash_attn=True,
            n_gpu_layers=81,
            n_batch=1024,
            n_ctx=8192,
        )
        words = pdftext.split()
        head = ' '.join(words[:3])

        # ---- First pass over the raw page text ----
        # NOTE(review): messages use "from"/"value" keys rather than
        # "role"/"content" — confirm the model's chat handler accepts them.
        output = llm.create_chat_completion(
            messages=[
                {"from": "user", "value": prompt + ' Text: ' + pdftext},
            ],
            max_tokens=maxtokens,
            temperature=temperature
        )
        output = output['choices'][0]['message']['content']
        # Drop anything the model emitted before the start of the document.
        find_index = output.find(head)
        if find_index != -1:
            output = output[find_index:].strip()
        # NOTE(review): this cut excludes the three trailing words
        # themselves; kept as in the original — confirm it is intended.
        last_index = output.rfind(' '.join(words[-3:]))
        if last_index != -1:
            output = output[:last_index].strip()

        # ---- Second pass over the first pass's output ----
        output = llm.create_chat_completion(
            messages=[
                {"from": "user", "value": prompt + ' Text: ' + output},
            ],
            max_tokens=maxtokens,
            temperature=temperature
        )
        output = output['choices'][0]['message']['content']
        find_index = output.find(head)
        if find_index != -1:
            output = output[find_index:].strip()
        # Cut after the document's last two words, but only when they are
        # found past the midpoint of the original text's length.
        last_text_to_find = ' '.join(words[-2:])
        last_index = output.rfind(last_text_to_find)
        if last_index != -1 and last_index > (len(pdftext) / 2):
            output = output[:last_index + len(last_text_to_find)].strip()
        yield output
    else:
        client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
        messages = [{"role": "assistant", "content": prompt}]
        messages.append({"role": "user", "content": pdftext})
        # Non-streaming call: the result is a single completion object, not
        # an iterable of messages, so read the content from it directly.
        # `maxtokens` is the parameter; the module-level `max_tokens` is a
        # Gradio component and must not be passed here.
        completion = client.chat_completion(
            messages,
            max_tokens=maxtokens,
            temperature=temperature,
            top_p=top_probability,
        )
        response = completion.choices[0].message.content
        print('RESPONSE IS ', response)
        # Yield (not return) so callers iterating this generator get the text.
        yield response
| # @spaces.GPU(duration=80) | |
def pdf_to_text(files, llm_type, maxtokens=2048, temperature=0, top_probability=0.95):
    """Extract the text of an uploaded PDF and print it to the console.

    The de-identification step is currently disabled, so ``llm_type``,
    ``maxtokens``, ``temperature`` and ``top_probability`` are accepted but
    not used.

    Args:
        files: Path of the uploaded file (a single path, not a list).
        llm_type: Name of the model selected in the UI.
        maxtokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        top_probability: Nucleus-sampling probability.

    Returns:
        ``'Please provide a valid PDF'`` when no file was supplied;
        otherwise ``None`` (files that are not PDFs are skipped).
    """
    for file in [files]:
        if not file:
            return 'Please provide a valid PDF'
        file_name = os.path.basename(file)
        # Test the final suffix only, so "report.v2.pdf" is accepted and
        # "report.pdf.txt" is not; the comparison ignores case.
        if not file_name.lower().endswith('.pdf'):
            continue
        page2content = process_document(file)
        original_pdf_text = ''.join(
            page_text + '\n' for page_text in page2content.values()
        )
        print('Extracted Page Content Is ', original_pdf_text)
        print('------------------------------------------------------------')
# ---- Gradio UI wiring ----
# Only `llm_type` is passed to gr.Interface below; `css` and the other
# components are created here but not referenced by the interface.
css = ".gradio-container {background: 'logo.png'}"

temp_slider = gr.Slider(
    minimum=0,
    maximum=2,
    value=0.9,
    label="Temperature Value",
)
prob_slider = gr.Slider(
    minimum=0,
    maximum=1,
    value=0.95,
    label="Max Probability Value",
)
max_tokens = gr.Number(value=600, label="Max Tokens")

input_folder = gr.File(file_count='multiple')
input_folder_text = gr.Textbox(label='Enter output folder path')
output_text = gr.Textbox()
output_path_component = gr.File(label="Select Output Path")

# Model selector shown next to the file upload.
llm_type = gr.Radio(["Fine tuned LLama3", "Zephyr-7B-β"])

iface = gr.Interface(
    fn=pdf_to_text,
    inputs=['file', llm_type],
    outputs=gr.Textbox(),
    title='Histofy EndoDeID (Endoscopy Report De-Identification)',
    description="This application assists to remove personal information from the uploaded clinical report",
    theme=gr.themes.Soft(),
)

# Start the web server as soon as the module is executed.
iface.launch()