# DeID / app.py
# Source: srijaydeshpande's Hugging Face Space (commit 8adb659, "Update app.py")
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer
from tqdm import tqdm
import re
import gradio as gr
import os
import accelerate
# import spaces
import subprocess
# from huggingface_hub import hf_hub_download, InferenceClient
# from llama_cpp import Llama
# from huggingface_hub import login
# login(token = os.getenv('HF_TOKEN'))
# repo_id = "srijaydeshpande/Deid-Fine-Tuned"
# model_id = "deid_finetuned.Q4_K_M.gguf"
# hf_hub_download(
# repo_id="srijaydeshpande/Deid-Fine-Tuned",
# filename="deid_finetuned.Q4_K_M.gguf",
# local_dir = "./models"
# )
def process_document(pdf_path):
    """Extract normalized text from every page of a PDF.

    Returns a dict mapping each pdfminer page id to that page's
    normalized text content (see process_page).
    """
    return {
        page.pageid: process_page(page)
        for page in tqdm(extract_pages(pdf_path))
    }
def process_page(extracted_page):
    """Join the normalized text of all text containers on a page.

    Elements are visited top-to-bottom (descending y1 coordinate); runs of
    blank lines in the result are collapsed to a single newline.
    """
    ordered = sorted(extracted_page._objs, key=lambda el: el.y1, reverse=True)
    pieces = [
        extract_text_and_normalize(el)
        for el in ordered
        if isinstance(el, LTTextContainer)
    ]
    return re.sub('\n+', '\n', ''.join(pieces))
def extract_text_and_normalize(element):
    """Normalize a pdfminer text element into a single flowing string.

    Each physical line is stripped and its internal whitespace collapsed to
    single spaces. A line whose final character is a word character, digit,
    comma, or hyphen is treated as continuing mid-sentence and is joined to
    the next line with a space; any other ending (e.g. '.', ':') keeps its
    newline. Blank lines are preserved as bare newlines.
    """
    line_texts = element.get_text().split('\n')
    norm_text = ''
    for line_text in line_texts:
        line_text = line_text.strip()
        if not line_text:
            line_text = '\n'
        else:
            # Raw strings fix invalid escape sequences ('\s', '\,') that
            # raise DeprecationWarning and will become syntax errors;
            # the compiled patterns are unchanged.
            line_text = re.sub(r'\s+', ' ', line_text)
            if not re.search(r'[\w\d\,\-]', line_text[-1]):
                line_text += '\n'
            else:
                line_text += ' '
        norm_text += line_text
    return norm_text
def txt_to_html(text):
    """Wrap each line of *text* in a <p> tag inside a minimal HTML document."""
    paragraphs = ''.join(
        "<p>{}</p>".format(line.strip()) for line in text.split('\n')
    )
    return "<html><body>" + paragraphs + "</body></html>"
def deidentify_doc(llm_type, pdftext, maxtokens, temperature, top_probability):
    """De-identify *pdftext* with the selected LLM backend.

    Generator: de-identified text is produced via ``yield`` so the caller
    (Gradio) can stream it. The 'Fine tuned LLama3' branch runs a local
    llama.cpp model twice (second pass re-cleans the first pass's output);
    any other value falls back to the hosted Zephyr-7B endpoint.
    """
    prompt = "In the following text, perform the following actions: 1. Replace only the calendar dates with term [date]. Example: if input is 'Date of birth: 15/5/1959 calculated BP (Systolic 158.00 mm, Diastolic 124.95 mm)' output should be 'Date of birth: [date] calculated BP (Systolic 158.00 mm, Diastolic 124.95 mm)' 2. Replace location or address, such as '3970 Longview Drive, CV36HE' with term [address]. Replace complete GP address, such as 'Phanton Medical Centre, Birmingham, CV36HE' with term [address]. It is important that all addresses are completely replaced with [address]. 3. Replace any person name with term [name]. It is important that all person names are replaced with term [name]. Remove any gender terms 'male' or 'female' if exists. 4. Replace the nhs number and the case note number with term [ID]. Replace Hospital number with [ID]. 4. Replace age of person with [age]. It is important that all age numbers are completely replaced with [age]."
    if (llm_type == 'Fine tuned LLama3'):
        # NOTE(review): `Llama` and `model_id` come from imports that are
        # commented out at the top of the file; this branch raises NameError
        # until they are restored.
        llm = Llama(
            model_path="models/" + model_id,
            flash_attn=True,
            n_gpu_layers=81,
            n_batch=1024,
            n_ctx=8192,
        )
        # NOTE(review): llama-cpp-python chat messages conventionally use
        # "role"/"content" keys; "from"/"value" looks template-specific —
        # confirm against the fine-tuned model's chat template.
        output = llm.create_chat_completion(
            messages=[
                {"from": "user", "value": prompt + ' Text: ' + pdftext},
            ],
            max_tokens=maxtokens,
            temperature=temperature,
        )
        output = output['choices'][0]['message']['content']
        # Strip any echoed header/footer: locate the first/last few words of
        # the source text inside the model output and trim around them.
        find_index = output.find(' '.join(pdftext.split()[:3]))
        if find_index != -1:
            output = output[find_index:].strip()
        last_index = output.rfind(' '.join(pdftext.split()[-3:]))
        if last_index != -1:
            output = output[:last_index].strip()
        # Second pass: feed the cleaned output back through the same prompt.
        output = llm.create_chat_completion(
            messages=[
                {"from": "user", "value": prompt + ' Text: ' + output},
            ],
            max_tokens=maxtokens,
            temperature=temperature,
        )
        output = output['choices'][0]['message']['content']
        find_index = output.find(' '.join(pdftext.split()[:3]))
        if find_index != -1:
            output = output[find_index:].strip()
        last_text_to_find = ' '.join(pdftext.split()[-2:])
        last_index = output.rfind(last_text_to_find)
        # Only trim if the match is in the latter half, to avoid cutting at a
        # coincidental early occurrence of the closing words.
        if last_index != -1 and last_index > (len(pdftext) / 2):
            output = output[:last_index + len(last_text_to_find)].strip()
        yield output
    else:
        # NOTE(review): `InferenceClient` is imported in a commented-out line
        # above; this branch raises NameError until it is restored.
        client = InferenceClient("HuggingFaceH4/zephyr-7b-beta")
        # NOTE(review): the instruction prompt is sent with role "assistant";
        # "system" is the conventional role for instructions — confirm intended.
        messages = [{"role": "assistant", "content": prompt}]
        messages.append({"role": "user", "content": pdftext})
        response = ""
        for message in client.chat_completion(
            messages,
            max_tokens=maxtokens,  # bug fix: was `max_tokens`, an unrelated global gr.Number
            temperature=temperature,
            top_p=top_probability,
        ):
            token = message['choices'][0]['message']['content']
            print('RESPONSE IS ', token)
            response += token
        # Bug fix: this function is a generator (see `yield output` above), so
        # `return response` would discard the value (it only sets
        # StopIteration.value). Yield it so the consumer receives it.
        yield response
# @spaces.GPU(duration=80)
def pdf_to_text(files, llm_type, maxtokens=2048, temperature=0, top_probability=0.95):
    """Extract and print the text of an uploaded PDF.

    Gradio handler: *files* is a single uploaded file path. Returns an error
    string when no file is supplied; otherwise returns None after printing
    the extracted page content. The de-identification call (which would use
    llm_type/maxtokens/temperature/top_probability) is currently disabled,
    so those parameters are accepted but unused.
    """
    for file in [files]:
        if not file:
            return 'Please provide a valid PDF'
        # Bug fix: the old check `file_name.split('.')[1] == 'pdf'` broke on
        # multi-dot names ("report.v2.pdf") and uppercase extensions (".PDF");
        # splitext + lower() handles both.
        extension = os.path.splitext(os.path.basename(file))[1].lower()
        if extension == '.pdf':
            page2content = process_document(file)
            original_pdf_text = ''
            for page_id in page2content:
                original_pdf_text += page2content[page_id] + '\n'
            print('Extracted Page Content Is ', original_pdf_text)
            print('------------------------------------------------------------')
# --- Gradio UI wiring (module-level side effects: builds and launches the app) ---
# NOTE(review): `css` and the components below (sliders, max_tokens, folder
# inputs, output widgets) are created but never passed to the Interface, so
# they have no effect on the rendered UI. The global `max_tokens` gr.Number is
# also what deidentify_doc's Zephyr branch accidentally reads — do not remove
# these bindings without checking that function.
css = ".gradio-container {background: 'logo.png'}"
temp_slider = gr.Slider(minimum=0, maximum=2, value=0.9, label="Temperature Value")
prob_slider = gr.Slider(minimum=0, maximum=1, value=0.95, label="Max Probability Value")
max_tokens = gr.Number(value=600, label="Max Tokens")
input_folder = gr.File(file_count='multiple')
input_folder_text = gr.Textbox(label='Enter output folder path')
output_text = gr.Textbox()
output_path_component = gr.File(label="Select Output Path")
llm_type = gr.Radio(["Fine tuned LLama3", "Zephyr-7B-β"])
# Only the file picker and the llm_type radio actually feed pdf_to_text.
iface = gr.Interface(
    fn=pdf_to_text,
    inputs=['file', llm_type],
    outputs=gr.Textbox(),
    title='Histofy EndoDeID (Endoscopy Report De-Identification)',
    description="This application assists to remove personal information from the uploaded clinical report",
    theme=gr.themes.Soft(),
)
iface.launch()