from transformers import pipeline
from transformers import AutoTokenizer
from transformers import AutoModelForSeq2SeqLM
import streamlit as st
import fitz  # PyMuPDF
from docx import Document
import re
import nltk
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, RecognizerResult, Pattern

nltk.download('punkt')

def sentence_tokenize(text):
    sentences = nltk.sent_tokenize(text)
    return sentences
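
# A quick illustration (assuming the 'punkt' data downloaded above):
#   sentence_tokenize("My name is John. I live nearby.")
#   -> ['My name is John.', 'I live nearby.']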

model_dir_large = 'edithram23/Redaction_Personal_info_v1'
tokenizer_large = AutoTokenizer.from_pretrained(model_dir_large)
model_large = AutoModelForSeq2SeqLM.from_pretrained(model_dir_large)

pipe1 = pipeline("token-classification", model="edithram23/new-bert-v2")
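
# Two models are loaded here: the seq2seq model rewrites whole sentences with
# PII replaced (used for the DOCX/TXT path below), while the BERT
# token-classification pipeline tags individual PII tokens (used to locate
# exact strings to black out in PDFs).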

# model_dir_small = 'edithram23/Redaction'
# tokenizer_small = AutoTokenizer.from_pretrained(model_dir_small)
# model_small = AutoModelForSeq2SeqLM.from_pretrained(model_dir_small)

# def small(text, model=model_small, tokenizer=tokenizer_small):
#     inputs = ["Mask Generation: " + text.lower() + '.']
#     inputs = tokenizer(inputs, max_length=256, truncation=True, return_tensors="pt")
#     output = model.generate(**inputs, num_beams=8, do_sample=True, max_length=len(text))
#     decoded_output = tokenizer.batch_decode(output, skip_special_tokens=True)[0]
#     predicted_title = decoded_output.strip()
#     pattern = r'\[.*?\]'
#     redacted_text = re.sub(pattern, '[redacted]', predicted_title)
#     return redacted_text

# Initialize the analyzer engine
analyzer = AnalyzerEngine()

# Define a custom address recognizer using a regex pattern
address_pattern = Pattern(
    name="address",
    regex=r"\d+\s\w+\s(?:street|st|road|rd|avenue|ave|lane|ln|drive|dr|blvd|boulevard)\s*\w*",
    score=0.5,
)
address_recognizer = PatternRecognizer(supported_entity="ADDRESS", patterns=[address_pattern])

# Add the custom address recognizer to the analyzer
analyzer.registry.add_recognizer(address_recognizer)
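
# The pattern above should match simple street addresses such as
# "42 main street" or "10 park ave". It is written in lowercase; depending on
# the Presidio version the regex may or may not be applied case-insensitively,
# and more complex address formats will be missed, so treat it as a heuristic.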

# Merge adjacent WordPiece sub-tokens emitted by the token-classification
# pipeline back into whole words, using their character offsets.
def combine_words(entities):
    combined_entities = []
    current_entity = None
    for entity in entities:
        if current_entity:
            if current_entity['end'] == entity['start']:
                # Combine the words without a space
                current_entity['word'] += entity['word'].replace('##', '')
                current_entity['end'] = entity['end']
            elif current_entity['end'] + 1 == entity['start']:
                # Combine the words with a space
                current_entity['word'] += ' ' + entity['word'].replace('##', '')
                current_entity['end'] = entity['end']
            else:
                # Add the previous combined entity to the list
                combined_entities.append(current_entity)
                # Start a new entity
                current_entity = entity.copy()
                current_entity['word'] = current_entity['word'].replace('##', '')
        else:
            # Initialize the first entity
            current_entity = entity.copy()
            current_entity['word'] = current_entity['word'].replace('##', '')
    # Add the last entity
    if current_entity:
        combined_entities.append(current_entity)
    return combined_entities
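
# Illustrative only -- the actual sub-token splits depend on the model's
# vocabulary:
#   combine_words([
#       {'entity': 'NAME', 'word': 'Jo',   'start': 0, 'end': 2},
#       {'entity': 'NAME', 'word': '##hn', 'start': 2, 'end': 4},
#   ])
#   would yield a single entity with word == 'John' spanning (0, 4).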

def words_red_bert(text):
    # Collect the PII words flagged by the BERT token-classification pipeline.
    final = []
    sentences = sentence_tokenize(text)
    for sentence in sentences:
        tokens = pipe1(sentence)
        m = combine_words(tokens)
        for j in m:
            # Keep anything carrying a PII label, skipping punctuation noise.
            if j['entity'] != 'none' and len(j['word']) > 1 and j['word'] != ', ':
                final.append(j['word'])
    return final
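
# For example, words_red_bert("John lives in Paris") might return
# ['John', 'Paris'] -- the exact labels and spans depend on how
# edithram23/new-bert-v2 was trained.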

def extract_entities(text):
    entities = {
        "NAME": [],
        "PHONE_NUMBER": [],
        "EMAIL": [],
        "ADDRESS": [],
        "LOCATION": [],
        "IN_AADHAAR": [],
    }
    output = []
    # Map Presidio entity types to the keys used above
    type_to_key = {
        "PERSON": "NAME",
        "PHONE_NUMBER": "PHONE_NUMBER",
        "EMAIL_ADDRESS": "EMAIL",
        "ADDRESS": "ADDRESS",
        "LOCATION": "LOCATION",
        "IN_AADHAAR": "IN_AADHAAR",
    }
    # Analyze the text for PII
    results = analyzer.analyze(text=text, language='en')
    for result in results:
        key = type_to_key.get(result.entity_type)
        if key:
            span = text[result.start:result.end]
            entities[key].append(span)
            output.append(span)
    return entities, output
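
# Rough sketch of the expected shape (actual hits depend on Presidio's NLP
# engine and the custom recognizer registered above):
#   extract_entities("Reach John Doe at john@example.com")
#   -> ({'NAME': ['John Doe'], 'EMAIL': ['john@example.com'], ...},
#       ['John Doe', 'john@example.com'])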

def mask_generation(text, model=model_large, tokenizer=tokenizer_large):
    if len(text) < 90:
        text = text + '.'
        # return small(text)
    inputs = ["Mask Generation: " + text.lower() + '.']
    inputs = tokenizer(inputs, max_length=512, truncation=True, return_tensors="pt")
    # max_length is measured in tokens; len(text) is a loose character-based cap
    output = model.generate(**inputs, num_beams=8, do_sample=True, max_length=len(text))
    decoded_output = tokenizer.batch_decode(output, skip_special_tokens=True)[0]
    predicted_title = decoded_output.strip()
    # Normalise whatever bracketed placeholder the model emits to '[redacted]'
    pattern = r'\[.*?\]'
    redacted_text = re.sub(pattern, '[redacted]', predicted_title)
    return redacted_text
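
# Model-dependent, but the intended behaviour is along these lines:
#   mask_generation("my name is john doe and i live in chennai")
#   -> 'my name is [redacted] and i live in [redacted].'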

def redact_text(page, text):
    text_instances = page.search_for(text)
    for inst in text_instances:
        page.add_redact_annot(inst, fill=(0, 0, 0))
    page.apply_redactions()
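
# PyMuPDF's search_for() returns the bounding rectangles of every occurrence;
# add_redact_annot() marks each one and apply_redactions() physically removes
# the underlying text, so the black boxes are not merely drawn on top of it.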

def read_pdf(file):
    pdf_document = fitz.open(stream=file.read(), filetype="pdf")
    text = ""
    for page_num in range(len(pdf_document)):
        page = pdf_document.load_page(page_num)
        text += page.get_text()
    return text, pdf_document

def read_docx(file):
    doc = Document(file)
    text = "\n".join([para.text for para in doc.paragraphs])
    return text

def read_txt(file):
    text = file.read().decode("utf-8")
    return text

def process_file(file):
    if file.type == "application/pdf":
        return read_pdf(file)
    elif file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
        return read_docx(file), None
    elif file.type == "text/plain":
        return read_txt(file), None
    else:
        return "Unsupported file type.", None
| st.title("Redaction") | |
| uploaded_file = st.file_uploader("Upload a file", type=["pdf", "docx", "txt"]) | |
| if uploaded_file is not None: | |
| file_contents, pdf_document = process_file(uploaded_file) | |
| if pdf_document: | |
| redacted_text = '' | |
| for pg in pdf_document: | |
| text = pg.get_text() | |
| sentences = sentence_tokenize(text) | |
| for sent in sentences: | |
| # x = mask_generation(sent) | |
| # sent_n_q_c=[] | |
| # sent_n = list(set(sent.lower().replace('.',' ').split("\n"))) | |
| # for i in sent_n: | |
| # for j in i.split(" "): | |
| # sent_n_q_c+=j.split(',') | |
| # x_q = x.lower().replace('.',' ').split(' ') | |
| # e=[] | |
| # for i in x_q: | |
| # e+=i.split(',') | |
| # t5_words=set(sent_n_q_c).difference(set(e)) | |
| entities,words_out = extract_entities(sent) | |
| # print("\nwords_out:",words_out) | |
| # print("\nT5",t5_words) | |
| # print("X:",x,"\nsent:",sent,"\nx_q:",x_q,"\nsent_n:",sent_n,"\ne:",e,"\nsent_n_q_c:",sent_n_q_c,'\nt5_words',t5_words) | |
| bert_words = words_red_bert(sent) | |
| # print("\nbert:",bert_words) | |
| new=[] | |
| for w in words_out: | |
| new+=w.split('\n') | |
| # words_out+=t5_words | |
| new+=bert_words | |
| words_out = [i for i in new if len(i)>3] | |
| # print("\nfinal:",words_out) | |
| words_out=sorted(words_out, key=len,reverse=True) | |
| for i in words_out: | |
| redact_text(pg,i) | |
| # st.text_area(redacted_text) | |
| output_pdf = "output_redacted.pdf" | |
| pdf_document.save(output_pdf) | |
| with open(output_pdf, "rb") as file: | |
| st.download_button( | |
| label="Download Processed PDF", | |
| data=file, | |
| file_name="processed_file.pdf", | |
| mime="application/pdf", | |
| ) | |
    else:
        # DOCX/TXT path: rewrite each sentence with the seq2seq masking model.
        token = sentence_tokenize(file_contents)
        final = ''
        for i in range(0, len(token)):
            final += mask_generation(token[i]) + '\n'
        processed_text = final
        st.text_area("OUTPUT", processed_text, height=400)
        st.download_button(
            label="Download Processed File",
            data=processed_text,
            file_name="processed_file.txt",
            mime="text/plain",
        )
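
# To try the app locally (assuming this file is saved as app.py and the
# dependencies imported above are installed):
#   streamlit run app.py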