MosaHosseini committed on
Commit
7f196ee
·
verified ·
1 Parent(s): f6bc566

Upload 2 files

Browse files

Updated personnummer masking; also improved recognition of names.

Files changed (2) hide show
  1. anonymize.py +140 -127
  2. app.py +64 -33
anonymize.py CHANGED
@@ -1,128 +1,141 @@
1
- import regex as re
2
- from typing import List, Tuple, Dict, Set
3
- from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
4
-
5
-
6
- class SwedishTextMasker:
7
- def __init__(self, model_name: str = "RecordedFuture/Swedish-NER", threshold: float = 0.85):
8
- self.tokenizer = AutoTokenizer.from_pretrained(model_name)
9
- self.model = AutoModelForTokenClassification.from_pretrained(model_name)
10
- self.ner_pipeline = pipeline("ner", model=self.model, tokenizer=self.tokenizer, aggregation_strategy="simple")
11
- self.threshold = threshold
12
-
13
- def _reconstruct_entities(self, tokens_with_labels: List[Tuple[str, str, float]]) -> List[Tuple[str, str]]:
14
- words = []
15
- current_word = ''
16
- current_label = ''
17
- scores = []
18
-
19
- for token, label, score in tokens_with_labels:
20
- if token.startswith('##'):
21
- current_word += token[2:]
22
- scores.append(score)
23
- else:
24
- if current_word:
25
- words.append((current_word, current_label , sum(scores) / len(scores)))
26
- current_word, current_label = token, label
27
- scores = [score]
28
-
29
- if current_word:
30
- words.append((current_word, current_label , sum(scores) / len(scores)))
31
-
32
- result = self._in_order_ent_list(words)
33
- print("\n\n\n the result of inorder ent list : \n\n" , result)
34
- return result
35
-
36
-
37
- def _in_order_ent_list(self , all_ents_list):
38
- threshold_ents = [ent for ent in all_ents_list if ent[2]>=self.threshold]
39
- threshold_ents_word = {ent[0] for ent in threshold_ents}
40
- result = [(ent[0] , ent[1]) for ent in all_ents_list if ent[0] in threshold_ents_word and len(ent[0]) >=2]
41
- return result
42
-
43
- def _get_chunks(self, text_list: List[str], chunk_size: int = 100) -> List[List[str]]:
44
- return [text_list[i:i + chunk_size] for i in range(0, len(text_list), chunk_size)]
45
-
46
- def _retrieve_ner(self, text: str) -> List[Tuple[str, str, float]]:
47
- results = self.ner_pipeline(text)
48
- return [
49
- (ent["word"], ent["entity_group"], ent["score"])
50
- for ent in results
51
- if ent["entity_group"] in {"ORG", "PER", "TIT"}
52
- ]
53
-
54
- def _get_entities(self, text: str) -> List[Tuple[str, str, float]]:
55
- tokens = text.split()
56
- if len(tokens) > 100:
57
- chunks = self._get_chunks(tokens)
58
- all_ents = []
59
- for chunk in chunks:
60
- chunk_text = " ".join(chunk)
61
- all_ents.extend(self._retrieve_ner(chunk_text))
62
- return all_ents
63
- else:
64
- return self._retrieve_ner(text)
65
-
66
- def _get_entity_dicts(self, entities: List[Tuple[str, str, float]]) -> Tuple[Dict[str, str], Dict[str, str], Set[str]]:
67
- persons = [ent[0] for ent in entities if ent[1] == "PER"]
68
- companies = [ent[0] for ent in entities if ent[1] == "ORG"]
69
- titles = {ent[0] for ent in entities if ent[1] == "TIT"}
70
-
71
- person_dict = {name: f"Person {chr(ord('A') + i)}" for i, name in enumerate(dict.fromkeys(persons))}
72
- company_dict = {name: f"ORG_COMPANY {chr(ord('A') + i)}" for i, name in enumerate(dict.fromkeys(companies))}
73
-
74
- return person_dict, company_dict, titles
75
-
76
- @staticmethod
77
- def mask_digits(text: str) -> str:
78
- return re.sub(r'\d', 'x', text)
79
-
80
- def mask_phone_numbers(self, text: str) -> str:
81
- phone_regex = re.compile(r'(?:\+|00)?\d[\d\s\-()]{5,}\d')
82
- return phone_regex.sub(lambda m: self.mask_digits(m.group()), text)
83
-
84
- def mask_org_numbers(self, text: str) -> str:
85
- org_regex = re.compile(r'\b\d{6}-?\d{4}\b')
86
- return org_regex.sub(lambda m: self.mask_digits(m.group()), text)
87
-
88
- def mask_emails(self, text: str) -> str:
89
- email_regex = re.compile(r'\b([a-zA-Z0-9._%+-]+)@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b')
90
- def email_masker(match):
91
- local, domain = match.groups()
92
- return f"{re.sub(r'[a-zA-Z0-9]', 'x', local)}@{re.sub(r'[a-zA-Z0-9]', 'x', domain)}"
93
- return email_regex.sub(email_masker, text)
94
-
95
- def mask_addresses(self, text: str) -> str:
96
- address_regex = re.compile(
97
- r'\b(?:[A-ZÅÄÖa-zåäöéÉèÈçÇß0-9\-]+\s)+\d{1,4}\s*,?\s*\d{3}\s?\d{2}\s+[A-ZÅÄÖa-zåäö\-]+',
98
- re.UNICODE
99
- )
100
- return address_regex.sub('[ADDRESS]', text)
101
-
102
- def mask_entities(self, text: str, entity_dict: Dict[str, str], tag: str) -> str:
103
- for original, masked in entity_dict.items():
104
- text = re.sub(re.escape(original), f"[{masked}]", text)
105
- return text
106
-
107
- def mask_titles(self, text: str, titles: Set[str]) -> str:
108
- for title in titles:
109
- text = re.sub(re.escape(title), "[Person_Title]", text)
110
- return text
111
-
112
- def mask_all(self, text: str) -> str:
113
- old_text_backup = text
114
- print("Original Text: \n\n" , text )
115
- text = self.mask_phone_numbers(text)
116
- text = self.mask_org_numbers(text)
117
- text = self.mask_emails(text)
118
- text = self.mask_addresses(text)
119
-
120
- ents_raw = self._get_entities(old_text_backup)
121
- ents = self._reconstruct_entities(ents_raw)
122
- person_dict, company_dict, title_set = self._get_entity_dicts(ents)
123
-
124
- text = self.mask_entities(text, company_dict, "ORG")
125
- text = self.mask_entities(text, person_dict, "PER")
126
- text = self.mask_titles(text, title_set)
127
-
 
 
 
 
 
 
 
 
 
 
 
 
 
128
  return text
 
1
+ import regex as re
2
+ from typing import List, Tuple, Dict, Set
3
+ from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
4
+
5
+
6
class SwedishTextMasker:
    """Mask personal and organisational identifiers in Swedish text.

    Two techniques are combined:

    * regular expressions for identity/organisation numbers, phone numbers,
      e-mail addresses and postal addresses;
    * a Hugging Face token-classification pipeline whose ``PER``, ``ORG`` and
      ``TIT`` entities are replaced by placeholders.
    """

    # Entity groups returned by the NER pipeline that are used for masking.
    _MASKED_LABELS = frozenset({"ORG", "PER", "TIT"})

    # Patterns are compiled once at class-definition time instead of on
    # every call.
    _HYPHEN_SPACING = re.compile(r'\s*-\s*')
    _DIGIT = re.compile(r'\d')
    _ALNUM = re.compile(r'[a-zA-Z0-9]')
    _ID_NUMBER = re.compile(
        r"""
        (?<!\d)
        (?:
        \d{6,8}[\-\s]?\d{4} |
        \d{5}[\-\s]?\d{4}
        )
        (?!\d)
        """,
        flags=re.VERBOSE,
    )
    _PHONE = re.compile(r'(?:\+|00)?\d[\d\s\-()]{5,}\d')
    _EMAIL = re.compile(r'\b([a-zA-Z0-9._%+-]+)@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\b')
    _ADDRESS = re.compile(
        r'\b(?:[A-ZÅÄÖa-zåäöéÉèÈçÇß0-9\-]+\s)+\d{1,4}\s*,?\s*\d{3}\s?\d{2}\s+[A-ZÅÄÖa-zåäö\-]+',
        re.UNICODE
    )

    def __init__(self, model_name: str = "RecordedFuture/Swedish-NER", threshold: float = 0.85):
        """Load the tokenizer/model and build the NER pipeline.

        Args:
            model_name: Model identifier passed to ``from_pretrained``.
            threshold: Minimum averaged score an entity word must reach at
                least once to be used for masking.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForTokenClassification.from_pretrained(model_name)
        self.ner_pipeline = pipeline("ner", model=self.model, tokenizer=self.tokenizer, aggregation_strategy="simple")
        self.threshold = threshold

    @staticmethod
    def _letter_label(index: int) -> str:
        """Return a spreadsheet-style label: 0 -> A, 25 -> Z, 26 -> AA, ..."""
        label = ""
        index += 1
        while index > 0:
            index, remainder = divmod(index - 1, 26)
            label = chr(ord('A') + remainder) + label
        return label

    def _reconstruct_entities(self, tokens_with_labels: List[Tuple[str, str, float]]) -> List[Tuple[str, str]]:
        """Merge ``##`` sub-word pieces into whole words and filter by score.

        A token starting with ``##`` is appended to the word being built;
        any other token starts a new word. The score of a word is the mean
        of the scores of its pieces. Spaces around hyphens are removed
        before the threshold filter is applied.

        Returns:
            ``(word, label)`` pairs as produced by ``_in_order_ent_list``.
        """
        # Nothing is printed here: the tokens are the very names that are
        # about to be masked and must not end up in stdout/logs.
        words = []
        current_word = ''
        current_label = ''
        scores: List[float] = []

        for token, label, score in tokens_with_labels:
            if token.startswith('##'):
                current_word += token[2:]
                scores.append(score)
                continue
            if current_word:
                words.append((current_word, current_label, sum(scores) / len(scores)))
            current_word, current_label = token, label
            scores = [score]

        if current_word:
            words.append((current_word, current_label, sum(scores) / len(scores)))

        words = [(self._HYPHEN_SPACING.sub('-', word), label, score) for word, label, score in words]
        return self._in_order_ent_list(words)

    def _in_order_ent_list(self, all_ents_list):
        """Keep, in input order, every entity whose word passed the threshold.

        A word qualifies if *any* of its occurrences scores at least
        ``self.threshold``; all occurrences of a qualifying word are then
        kept. Words shorter than two characters are dropped.
        """
        confident_words = {ent[0] for ent in all_ents_list if ent[2] >= self.threshold}
        return [
            (ent[0], ent[1])
            for ent in all_ents_list
            if ent[0] in confident_words and len(ent[0]) >= 2
        ]

    def _get_chunks(self, text_list: List[str], chunk_size: int = 100) -> List[List[str]]:
        """Split ``text_list`` into consecutive slices of at most ``chunk_size`` items."""
        return [text_list[i:i + chunk_size] for i in range(0, len(text_list), chunk_size)]

    def _retrieve_ner(self, text: str) -> List[Tuple[str, str, float]]:
        """Run the NER pipeline and return ``(word, entity_group, score)`` for masked labels."""
        results = self.ner_pipeline(text)
        return [
            (ent["word"], ent["entity_group"], ent["score"])
            for ent in results
            if ent["entity_group"] in self._MASKED_LABELS
        ]

    def _get_entities(self, text: str) -> List[Tuple[str, str, float]]:
        """Collect entities, feeding long texts to the pipeline in chunks.

        Texts of more than 100 whitespace-separated tokens are split into
        100-token chunks that are re-joined with single spaces.
        """
        tokens = text.split()
        if len(tokens) <= 100:
            return self._retrieve_ner(text)

        all_ents = []
        for chunk in self._get_chunks(tokens):
            all_ents.extend(self._retrieve_ner(" ".join(chunk)))
        return all_ents

    def _get_entity_dicts(self, entities: List[Tuple[str, str]]) -> Tuple[Dict[str, str], Dict[str, str], Set[str]]:
        """Build placeholder maps for persons and companies plus the title set.

        Placeholders are numbered in order of first appearance: ``Person A``,
        ``Person B``, ... and ``ORG_COMPANY A``, ... After ``Z`` the labels
        continue with ``AA``, ``AB``, ... instead of running into
        punctuation characters.

        Returns:
            ``(person_dict, company_dict, titles)``.
        """
        persons = [ent[0] for ent in entities if ent[1] == "PER"]
        companies = [ent[0] for ent in entities if ent[1] == "ORG"]
        titles = {ent[0] for ent in entities if ent[1] == "TIT"}

        # dict.fromkeys de-duplicates while preserving first-seen order.
        person_dict = {
            name: f"Person {self._letter_label(i)}"
            for i, name in enumerate(dict.fromkeys(persons))
        }
        company_dict = {
            name: f"ORG_COMPANY {self._letter_label(i)}"
            for i, name in enumerate(dict.fromkeys(companies))
        }

        return person_dict, company_dict, titles

    def remove_personnummer(self, text):
        """Replace identity/organisation-number-like digit groups with a placeholder.

        Matches 6-8 digits or 5 digits, an optional hyphen or whitespace
        character, then 4 digits, not adjacent to further digits. Any other
        number of that shape is replaced as well.
        """
        return self._ID_NUMBER.sub('[person/org nummer]', text)

    @staticmethod
    def mask_digits(text: str) -> str:
        """Replace every digit in ``text`` with ``x``."""
        return SwedishTextMasker._DIGIT.sub('x', text)

    def mask_phone_numbers(self, text: str) -> str:
        """Replace the digits of phone-number-like sequences with ``x``; separators are kept."""
        return self._PHONE.sub(lambda m: self.mask_digits(m.group()), text)

    # The former mask_org_numbers step is no longer applied; ten-digit
    # numbers are handled by remove_personnummer.

    def mask_emails(self, text: str) -> str:
        """Replace letters and digits of e-mail addresses with ``x``, keeping ``@`` and punctuation."""
        def email_masker(match):
            local, domain = match.groups()
            return f"{self._ALNUM.sub('x', local)}@{self._ALNUM.sub('x', domain)}"
        return self._EMAIL.sub(email_masker, text)

    def mask_addresses(self, text: str) -> str:
        """Replace "street number, postal code city" sequences with ``[ADDRESS]``."""
        return self._ADDRESS.sub('[ADDRESS]', text)

    def mask_entities(self, text: str, entity_dict: Dict[str, str], tag: str) -> str:
        """Replace every occurrence of each key in ``entity_dict`` with ``[value]``.

        Args:
            text: Text to mask.
            entity_dict: Maps original entity text to its placeholder label.
            tag: Unused; kept so existing callers keep working.
        """
        # Longest names first: otherwise "Anna" would be replaced inside
        # "Anna Svensson" and the surname would stay in the output.
        # sorted() is stable, so equal-length names keep insertion order.
        for original in sorted(entity_dict, key=len, reverse=True):
            if not original:
                continue
            # Plain replacement: the placeholder is literal text, not a
            # regex template.
            text = text.replace(original, f"[{entity_dict[original]}]")
        return text

    def mask_titles(self, text: str, titles: Set[str]) -> str:
        """Replace every occurrence of each title with ``[Person_Title]``."""
        # Sets have no defined order; sort (longest first) so the result is
        # deterministic and longer titles are not broken up by shorter ones.
        for title in sorted(titles, key=lambda t: (-len(t), t)):
            if not title:
                continue
            text = text.replace(title, "[Person_Title]")
        return text

    def mask_all(self, text: str) -> str:
        """Apply all masking steps and return the anonymized text.

        Regex-based masks run first. Entities are detected on the original,
        unmasked text and then replaced in the already-masked text:
        companies, then persons, then titles.
        """
        old_text_backup = text
        text = self.remove_personnummer(text)
        text = self.mask_phone_numbers(text)
        text = self.mask_emails(text)
        text = self.mask_addresses(text)

        ents_raw = self._get_entities(old_text_backup)
        ents = self._reconstruct_entities(ents_raw)
        person_dict, company_dict, title_set = self._get_entity_dicts(ents)

        text = self.mask_entities(text, company_dict, "ORG")
        text = self.mask_entities(text, person_dict, "PER")
        text = self.mask_titles(text, title_set)

        return text
app.py CHANGED
@@ -1,33 +1,64 @@
1
- import fitz # PyMuPDF
2
- import gradio as gr
3
- from anonymize import SwedishTextMasker
4
-
5
- # Instantiate once, globally
6
- text_anonymizer = SwedishTextMasker(threshold= 0.9)
7
-
8
- def extract_text_from_pdf(pdf_file):
9
- if pdf_file is None:
10
- return "No file uploa9999ded."
11
-
12
- # Approach 1: open via file path (usually safer)
13
- with fitz.open(pdf_file.name) as doc:
14
- text_output = ""
15
- for page in doc:
16
- text_output += page.get_text()
17
-
18
- raw_text = text_output.strip()
19
- anonymized_text = text_anonymizer.mask_all(raw_text)
20
- return anonymized_text
21
-
22
- # Gradio interface
23
- with gr.Blocks(title="PDF -> Anonymized Text") as demo:
24
- gr.Markdown("### 📄 PDF Anonymizer (text only, skips images)")
25
- with gr.Row():
26
- pdf_input = gr.File(label="Upload a PDF", file_types=[".pdf"])
27
- text_output = gr.Textbox(label="Anonymized Output", lines=20, interactive=False)
28
-
29
- extract_button = gr.Button("Anonymize Text")
30
- extract_button.click(fn=extract_text_from_pdf, inputs=pdf_input, outputs=text_output)
31
-
32
- if __name__ == "__main__":
33
- demo.launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import fitz # PyMuPDF
2
+ import gradio as gr
3
+ from anonymize import SwedishTextMasker
4
+
5
# Instantiate once, globally: constructing SwedishTextMasker loads the
# tokenizer and model, so it should not be repeated per request.
# threshold is the minimum averaged NER score an entity word must reach
# to be used for masking.
text_anonymizer = SwedishTextMasker(threshold= 0.5)
7
+
8
def join_short_lines(text, min_length=30):
    """Merge short lines of ``text`` into the line that follows them.

    A line counts as short when its stripped length is below ``min_length``
    and it does not end in one of ``. : ; ? !``. Short lines are collected
    and joined with single spaces; the run is closed by the next line that
    is not short (which is included in it), by a blank line, or by the end
    of the text. Blank lines are kept as empty lines and every emitted line
    is stripped of surrounding whitespace.
    """
    terminators = ('.', ':', ';', '?', '!')
    merged = []
    pending = []  # stripped fragments of the run currently being built

    def flush():
        # Emit the collected run, if any, as one space-joined line.
        if pending:
            merged.append(" ".join(pending))
            pending.clear()

    for raw_line in text.split('\n'):
        piece = raw_line.strip()
        if piece == "":
            flush()
            merged.append("")  # preserve empty lines
        elif len(piece) >= min_length or piece.endswith(terminators):
            # A long or terminated line ends the current run.
            pending.append(piece)
            flush()
        else:
            pending.append(piece)

    flush()
    return "\n".join(merged)
35
+
36
+
37
def extract_text_from_pdf(pdf_file):
    """Extract the text of an uploaded PDF and return it anonymized.

    Args:
        pdf_file: The uploaded file, either an object exposing the path as
            ``.name`` or the path itself; ``None`` when nothing was uploaded.

    Returns:
        The anonymized text, or ``"No file uploaded."`` when ``pdf_file``
        is ``None``.
    """
    if pdf_file is None:
        return "No file uploaded."

    # Open via file path. Fall back to the value itself when the upload is
    # already a plain path string without a ``.name`` attribute.
    pdf_path = getattr(pdf_file, "name", pdf_file)

    with fitz.open(pdf_path) as doc:
        # NOTE(review): flags=1 looks like PyMuPDF's TEXT_PRESERVE_LIGATURES
        # on its own — confirm against the PyMuPDF text-flag constants.
        page_texts = [page.get_text(flags=1) for page in doc]

    raw_text = "".join(page_texts).strip()
    # raw_text = join_short_lines(raw_text)  # optional pre-processing, currently disabled
    # The raw text is deliberately not printed: it is the un-anonymized
    # document and must not reach stdout/logs.
    return text_anonymizer.mask_all(raw_text)
52
+
53
# Gradio interface: a file upload and a read-only text box side by side,
# plus a button that runs the extraction/anonymization.
with gr.Blocks(title="PDF -> Anonymized Text") as demo:
    gr.Markdown("### 📄 PDF Anonymizer (text only, skips images)")
    with gr.Row():
        pdf_input = gr.File(label="Upload a PDF", file_types=[".pdf"])
        text_output = gr.Textbox(label="Anonymized Output", lines=20, interactive=False)

    extract_button = gr.Button("Anonymize Text")
    # Clicking passes the uploaded file to extract_text_from_pdf and shows
    # its return value in the output text box.
    extract_button.click(fn=extract_text_from_pdf, inputs=pdf_input, outputs=text_output)

# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()