MosaHosseini commited on
Commit
378b61e
·
verified ·
1 Parent(s): 833a0d6

Upload 3 files

Browse files

app.py provides the Gradio interface and the PDF-to-text extraction; anonymize.py implements the text masker.

Files changed (3) hide show
  1. anonymize.py +128 -0
  2. app.py +33 -0
  3. requirements.txt +5 -0
anonymize.py ADDED
@@ -0,0 +1,128 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import regex as re
2
+ from typing import List, Tuple, Dict, Set
3
+ from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
4
+
5
+
6
class SwedishTextMasker:
    """Mask personally identifiable information (PII) in Swedish text.

    Combines regex-based masking (digits, phone numbers, organisation
    numbers, emails, street addresses) with NER-based masking of persons,
    organisations and titles using a Swedish token-classification model.
    """

    def __init__(self, model_name: str = "RecordedFuture/Swedish-NER", threshold: float = 0.85):
        """Load the tokenizer/model and build the NER pipeline.

        :param model_name: Hugging Face model id for Swedish NER.
        :param threshold: minimum entity score required for a word to be masked.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForTokenClassification.from_pretrained(model_name)
        self.ner_pipeline = pipeline(
            "ner", model=self.model, tokenizer=self.tokenizer, aggregation_strategy="simple"
        )
        self.threshold = threshold

    def _reconstruct_entities(self, tokens_with_labels: List[Tuple[str, str, float]]) -> List[Tuple[str, str]]:
        """Merge WordPiece sub-tokens ('##...') back into whole words.

        A merged word keeps the label of its first sub-token and the mean
        score of all of its sub-tokens, then the result is filtered through
        :meth:`_in_order_ent_list`.
        """
        words: List[Tuple[str, str, float]] = []
        current_word = ''
        current_label = ''
        scores: List[float] = []

        for token, label, score in tokens_with_labels:
            if token.startswith('##'):
                # Continuation sub-token: glue onto the word in progress.
                current_word += token[2:]
                scores.append(score)
            else:
                if current_word:
                    words.append((current_word, current_label, sum(scores) / len(scores)))
                current_word, current_label = token, label
                scores = [score]

        # Flush the last word in progress.
        if current_word:
            words.append((current_word, current_label, sum(scores) / len(scores)))

        # NOTE: removed leftover debug print of the intermediate result.
        return self._in_order_ent_list(words)

    def _in_order_ent_list(self, all_ents_list: List[Tuple[str, str, float]]) -> List[Tuple[str, str]]:
        """Keep entities (in original order) whose surface word scored at or
        above the threshold at least once and is >= 2 characters long."""
        threshold_words = {ent[0] for ent in all_ents_list if ent[2] >= self.threshold}
        return [
            (word, label)
            for word, label, _score in all_ents_list
            if word in threshold_words and len(word) >= 2
        ]

    def _get_chunks(self, text_list: List[str], chunk_size: int = 100) -> List[List[str]]:
        """Split a token list into consecutive chunks of at most ``chunk_size``."""
        return [text_list[i:i + chunk_size] for i in range(0, len(text_list), chunk_size)]

    def _retrieve_ner(self, text: str) -> List[Tuple[str, str, float]]:
        """Run the NER pipeline and keep ORG/PER/TIT entities as (word, group, score)."""
        results = self.ner_pipeline(text)
        return [
            (ent["word"], ent["entity_group"], ent["score"])
            for ent in results
            if ent["entity_group"] in {"ORG", "PER", "TIT"}
        ]

    def _get_entities(self, text: str) -> List[Tuple[str, str, float]]:
        """Extract entities, chunking texts longer than 100 whitespace tokens
        so each pipeline call stays short."""
        tokens = text.split()
        if len(tokens) <= 100:
            return self._retrieve_ner(text)

        all_ents: List[Tuple[str, str, float]] = []
        for chunk in self._get_chunks(tokens):
            all_ents.extend(self._retrieve_ner(" ".join(chunk)))
        return all_ents

    def _get_entity_dicts(self, entities: List[Tuple[str, str]]) -> Tuple[Dict[str, str], Dict[str, str], Set[str]]:
        """Map person/company names to anonymous labels ('Person A', ...).

        Note: receives the (word, label) 2-tuples produced by
        :meth:`_reconstruct_entities` (the original annotation wrongly said
        3-tuples; only ent[0]/ent[1] are read).
        """
        persons = [ent[0] for ent in entities if ent[1] == "PER"]
        companies = [ent[0] for ent in entities if ent[1] == "ORG"]
        titles = {ent[0] for ent in entities if ent[1] == "TIT"}

        # dict.fromkeys deduplicates while preserving first-seen order, so
        # label letters (A, B, C, ...) follow order of appearance.
        person_dict = {name: f"Person {chr(ord('A') + i)}" for i, name in enumerate(dict.fromkeys(persons))}
        company_dict = {name: f"ORG_COMPANY {chr(ord('A') + i)}" for i, name in enumerate(dict.fromkeys(companies))}

        return person_dict, company_dict, titles

    @staticmethod
    def mask_digits(text: str) -> str:
        """Replace every digit in ``text`` with 'x'."""
        return re.sub(r'\d', 'x', text)

    def mask_phone_numbers(self, text: str) -> str:
        """Mask the digits inside phone-number-like sequences (optionally
        prefixed with '+' or '00'); separators are kept."""
        phone_regex = re.compile(r'(?:\+|00)?\d[\d\s\-()]{5,}\d')
        return phone_regex.sub(lambda m: self.mask_digits(m.group()), text)

    def mask_org_numbers(self, text: str) -> str:
        """Mask the digits of Swedish organisation numbers (NNNNNN-NNNN)."""
        org_regex = re.compile(r'\b\d{6}-?\d{4}\b')
        return org_regex.sub(lambda m: self.mask_digits(m.group()), text)

    def mask_emails(self, text: str) -> str:
        """Mask alphanumerics in both the local part and the domain of an
        email address, keeping '@', dots and other separators."""
        email_regex = re.compile(r'\b([a-zA-Z0-9._%+-]+)@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b')

        def email_masker(match):
            local, domain = match.groups()
            return f"{re.sub(r'[a-zA-Z0-9]', 'x', local)}@{re.sub(r'[a-zA-Z0-9]', 'x', domain)}"

        return email_regex.sub(email_masker, text)

    def mask_addresses(self, text: str) -> str:
        """Replace street-address-like spans (words + house number + Swedish
        postcode + city) with the literal tag ``[ADDRESS]``."""
        address_regex = re.compile(
            r'\b(?:[A-ZÅÄÖa-zåäöéÉèÈçÇß0-9\-]+\s)+\d{1,4}\s*,?\s*\d{3}\s?\d{2}\s+[A-ZÅÄÖa-zåäö\-]+',
            re.UNICODE
        )
        return address_regex.sub('[ADDRESS]', text)

    def mask_entities(self, text: str, entity_dict: Dict[str, str], tag: str) -> str:
        """Replace each entity occurrence with its anonymous label in brackets.

        ``tag`` is currently unused but kept for interface compatibility.
        """
        for original, masked in entity_dict.items():
            text = re.sub(re.escape(original), f"[{masked}]", text)
        return text

    def mask_titles(self, text: str, titles: Set[str]) -> str:
        """Replace every detected title with the ``[Person_Title]`` tag."""
        for title in titles:
            text = re.sub(re.escape(title), "[Person_Title]", text)
        return text

    def mask_all(self, text: str) -> str:
        """Apply all masking steps and return the anonymized text.

        NER runs on the ORIGINAL text so that regex masking cannot hide
        entities from the model; entity replacement then runs on the
        regex-masked text.
        """
        original_text = text
        # NOTE: removed leftover debug print of the original text.
        text = self.mask_phone_numbers(text)
        text = self.mask_org_numbers(text)
        text = self.mask_emails(text)
        text = self.mask_addresses(text)

        ents_raw = self._get_entities(original_text)
        ents = self._reconstruct_entities(ents_raw)
        person_dict, company_dict, title_set = self._get_entity_dicts(ents)

        text = self.mask_entities(text, company_dict, "ORG")
        text = self.mask_entities(text, person_dict, "PER")
        text = self.mask_titles(text, title_set)

        return text
app.py ADDED
@@ -0,0 +1,33 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import fitz # PyMuPDF
2
+ import gradio as gr
3
+ from anonymize import SwedishTextMasker
4
+
5
# Instantiate the masker once at module import time so the (slow) model
# download/load happens on startup, not once per request.
text_anonymizer = SwedishTextMasker(threshold= 0.9)
7
+
8
def extract_text_from_pdf(pdf_file):
    """Extract the text of an uploaded PDF and return it anonymized.

    :param pdf_file: Gradio file object (has a ``.name`` path), or ``None``
        when nothing was uploaded.
    :return: the anonymized text, or an error message for missing input.
    """
    if pdf_file is None:
        # Fixed typo in the user-facing message ("uploa9999ded" -> "uploaded").
        return "No file uploaded."

    # Open via the file path (usually safer than passing a stream for
    # gradio uploads); join page texts in one pass instead of += in a loop.
    with fitz.open(pdf_file.name) as doc:
        raw_text = "".join(page.get_text() for page in doc).strip()

    anonymized_text = text_anonymizer.mask_all(raw_text)
    return anonymized_text
21
+
22
# Gradio interface: single row with a PDF upload on the left and the
# anonymized-text output on the right, plus a button that triggers the
# extraction + anonymization pipeline.
# NOTE(review): the "??" in the heading looks like a garbled emoji from a
# copy/paste or encoding loss — confirm the intended character.
with gr.Blocks(title="PDF -> Anonymized Text") as demo:
    gr.Markdown("### ?? PDF Anonymizer (text only, skips images)")
    with gr.Row():
        pdf_input = gr.File(label="Upload a PDF", file_types=[".pdf"])
        text_output = gr.Textbox(label="Anonymized Output", lines=20, interactive=False)

    extract_button = gr.Button("Anonymize Text")
    # Wire the button to the extraction function defined above.
    extract_button.click(fn=extract_text_from_pdf, inputs=pdf_input, outputs=text_output)

if __name__ == "__main__":
    demo.launch()
requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ transformers==4.52.4
2
+ torch==2.7.0
3
+ regex==2024.11.6
4
+ gradio==5.32.1
5
+ PyMuPDF==1.26.3