# Streamlit app: redact personal information from uploaded PDF/DOCX/TXT files
# using a fine-tuned seq2seq masking model.
import re

import fitz  # PyMuPDF
import nltk
import streamlit as st
from docx import Document
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

# Fetch the Punkt sentence-tokenizer data. quiet=True: Streamlit re-executes
# this script on every interaction, so an unsilenced download spams the log.
nltk.download('punkt', quiet=True)
def sentence_tokenize(text):
    """Split *text* into a list of sentences with NLTK's Punkt tokenizer."""
    return nltk.sent_tokenize(text)
model_dir_large = 'edithram23/Redaction_Personal_info_v1'


@st.cache_resource
def _load_redaction_model(model_dir):
    """Load tokenizer and model once per process.

    Streamlit reruns this whole script on every user interaction; without
    caching, the (large) model would be re-downloaded/re-instantiated each
    time. st.cache_resource memoizes the pair across reruns.
    """
    return (AutoTokenizer.from_pretrained(model_dir),
            AutoModelForSeq2SeqLM.from_pretrained(model_dir))


tokenizer_large, model_large = _load_redaction_model(model_dir_large)
| # model_dir_small = 'edithram23/Redaction' | |
| # tokenizer_small = AutoTokenizer.from_pretrained(model_dir_small) | |
| # model_small = AutoModelForSeq2SeqLM.from_pretrained(model_dir_small) | |
| # def small(text, model=model_small, tokenizer=tokenizer_small): | |
| # inputs = ["Mask Generation: " + text.lower() + '.'] | |
| # inputs = tokenizer(inputs, max_length=256, truncation=True, return_tensors="pt") | |
| # output = model.generate(**inputs, num_beams=8, do_sample=True, max_length=len(text)) | |
| # decoded_output = tokenizer.batch_decode(output, skip_special_tokens=True)[0] | |
| # predicted_title = decoded_output.strip() | |
| # pattern = r'\[.*?\]' | |
| # redacted_text = re.sub(pattern, '[redacted]', predicted_title) | |
| # return redacted_text | |
def mask_generation(text, model=model_large, tokenizer=tokenizer_large):
    """Run the seq2seq redaction model on one sentence.

    Returns the model's output with every bracketed mask it emitted
    (e.g. ``[NAME]``) normalized to the literal token ``[redacted]``.
    """
    # Pad very short inputs with a terminator; presumably this mirrors the
    # model's fine-tuning data -- TODO confirm (note the prompt below appends
    # another '.' unconditionally, so short inputs end up with two).
    if len(text) < 90:
        text = text + '.'
        # return small(text)
    inputs = ["Mask Generation: " + text.lower() + '.']
    inputs = tokenizer(inputs, max_length=512, truncation=True, return_tensors="pt")
    # NOTE(review): max_length is measured in tokens while len(text) is in
    # characters (chars >= tokens, so it acts as a loose upper bound), and
    # do_sample=True makes the output non-deterministic across runs.
    output = model.generate(**inputs, num_beams=8, do_sample=True, max_length=len(text))
    decoded_output = tokenizer.batch_decode(output, skip_special_tokens=True)[0]
    predicted_title = decoded_output.strip()
    # Collapse any bracketed span the model produced to a single marker.
    pattern = r'\[.*?\]'
    redacted_text = re.sub(pattern, '[redacted]', predicted_title)
    return redacted_text
def find_surrounding_words(text, target="[redacted]"):
    """Find the word immediately before and after each occurrence of *target*.

    Returns a list with one dict per occurrence, with keys
    ``before_word``, ``after_word``, ``before_index``, ``after_index``.
    A neighbor is ``None`` (with a ``None`` index) when the occurrence sits at
    the very start/end of *text* or no adjacent word exists; otherwise the
    index is the character offset of the neighbor word within *text*.
    """
    # "Word" here is a run of alphanumerics and common punctuation (no spaces).
    word = r'([A-Za-z0-9_@#\$%\^&*\(\)\[\]\{\}\.\,]+)?'
    pattern = re.compile(word + r'\s*' + re.escape(target) + r'\s*' + word)
    results = []
    for match in pattern.finditer(text):
        before, after = match.group(1), match.group(2)

        before_word = before_index = None
        # A match anchored at position 0 has no usable "before" neighbor.
        if before and match.start() != 0:
            # Keep only non-empty comma-separated pieces. A neighbor token made
            # solely of commas used to produce an empty list and an IndexError
            # on parts[0]; the `if parts:` guard fixes that. (The tokens cannot
            # contain whitespace, so strip() only trims stray commas' residue.)
            parts = [p for p in before.split(',') if p.strip()]
            if parts:
                before_word = parts[0].strip()
                before_index = match.start(1)

        after_word = after_index = None
        # A match that runs to the end of the string has no "after" neighbor.
        if after and match.end() != len(text):
            parts = [p for p in after.split(',') if p.strip()]
            if parts:
                after_word = parts[0].strip()
                after_index = match.start(2)

        results.append({
            "before_word": before_word,
            "after_word": after_word,
            "before_index": before_index,
            "after_index": after_index,
        })
    return results
def redact_text(page, text):
    """Black out every occurrence of *text* on a PyMuPDF *page*."""
    for rect in page.search_for(text):
        page.add_redact_annot(rect, fill=(0, 0, 0))
    # Burn all annotations added above into the page content.
    page.apply_redactions()
def read_pdf(file):
    """Extract all text from an uploaded PDF.

    Returns a (full_text, fitz_document) pair; the document is kept open so
    the caller can redact and save it later.
    """
    document = fitz.open(stream=file.read(), filetype="pdf")
    full_text = "".join(pg.get_text() for pg in document)
    return full_text, document
def read_docx(file):
    """Return the text of an uploaded .docx file, one paragraph per line."""
    document = Document(file)
    paragraphs = [paragraph.text for paragraph in document.paragraphs]
    return "\n".join(paragraphs)
def read_txt(file):
    """Decode an uploaded binary text file as UTF-8 and return the string."""
    return file.read().decode("utf-8")
def process_file(file):
    """Dispatch an uploaded file to the right reader by MIME type.

    Returns a (text, pdf_document) pair; pdf_document is None for every
    non-PDF input, including the unsupported-type fallback.
    """
    mime = file.type
    if mime == "application/pdf":
        return read_pdf(file)
    if mime == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
        return read_docx(file), None
    if mime == "text/plain":
        return read_txt(file), None
    return "Unsupported file type.", None
st.title("Redaction")

uploaded_file = st.file_uploader("Upload a file", type=["pdf", "docx", "txt"])

if uploaded_file is not None:
    file_contents, pdf_document = process_file(uploaded_file)
    if pdf_document:
        # PDF path: run the model sentence-by-sentence, then map each
        # [redacted] marker in the model output back to the original substring
        # so it can be located and blacked out on the page.
        spans_to_redact = []
        for page in pdf_document:
            for sentence in sentence_tokenize(page.get_text()):
                sentence_lower = sentence.lower()
                masked = mask_generation(sentence)
                for w in find_surrounding_words(masked):
                    before, after = w['before_word'], w['after_word']
                    if before is None and after is None:
                        # The model masked the entire sentence. (The original
                        # code hit this case with `None in str` -> TypeError.)
                        spans_to_redact.append(sentence)
                    elif after is None:
                        # Marker at the end: redact from after the anchor word.
                        if before in sentence_lower:
                            start = sentence_lower.index(before) + len(before)
                            spans_to_redact.append(sentence[start:])
                    elif before is None:
                        # Marker at the start: redact up to the anchor word.
                        if after in sentence_lower:
                            spans_to_redact.append(sentence[:sentence_lower.index(after)])
                    elif before in sentence_lower and after in sentence_lower:
                        # Marker between two anchors: redact the span between them.
                        start = sentence_lower.index(before) + len(before)
                        end = sentence_lower.index(after)
                        spans_to_redact.append(sentence[start:end])
        for page in pdf_document:
            for span in spans_to_redact:
                # Anchors that abut can leave an empty/whitespace span; nothing
                # to search for in that case.
                if span.strip():
                    redact_text(page, span)
        output_pdf = "output_redacted.pdf"
        pdf_document.save(output_pdf)
        with open(output_pdf, "rb") as fh:
            st.download_button(
                label="Download Processed PDF",
                data=fh,
                file_name="processed_file.pdf",
                mime="application/pdf",
            )
    else:
        # DOCX/TXT path: show and offer the masked text directly.
        processed_text = ''.join(
            mask_generation(sentence) + '\n'
            for sentence in sentence_tokenize(file_contents)
        )
        st.text_area("OUTPUT", processed_text, height=400)
        st.download_button(
            label="Download Processed File",
            data=processed_text,
            file_name="processed_file.txt",
            mime="text/plain",
        )