hugging-hat commited on
Commit
a05a9b8
·
verified ·
1 Parent(s): 6ba6d38

Upload anonymise.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. anonymise.py +188 -0
anonymise.py ADDED
@@ -0,0 +1,188 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ NERPA – Text anonymisation using the fine-tuned GLiNER2 model.
3
+
4
+ Usage:
5
+ python anonymise.py "My name is John Smith, born 15/03/1990. Email: john@example.com"
6
+ python anonymise.py --file input.txt
7
+ python anonymise.py --file input.txt --output anonymised.txt
8
+ """
9
+
10
import argparse
import sys
from typing import Dict, List, Optional, Tuple

import torch
from gliner2 import GLiNER2
16
+
17
+
18
# Entity types the model was fine-tuned to recognise, with descriptions
# that guide the bi-encoder towards better detection.
# Keys become the placeholder labels in the anonymised output (e.g. "[EMAIL]").
PII_ENTITIES = {
    "LOCATION": "Address, country, city, postcode, street, any other location",
    "AGE": "Age of a person",
    "DIGITAL_KEYS": "Digital keys, passwords, pins used to access anything like servers, banks, APIs, accounts etc",
    "BANK_ACCOUNT_DETAILS": "Bank account details such as number, IBAN, SWIFT, routing numbers etc",
    "CARD_DETAILS": "Debit or credit card details such as card number, CVV, expiration etc",
    "DATE_TIME": "Generic date and time",
    "DATE_OF_BIRTH": "Date of birth",
    "PERSONAL_ID_NUMBERS": "Common personal identification numbers such as passport numbers, driving licenses, taxpayer and insurance numbers",
    "TECHNICAL_ID_NUMBERS": "IP and MAC addresses, serial numbers and any other technical ID numbers",
    "EMAIL": "Email",
    "PERSON_NAME": "Person name",
    "BUSINESS_NAME": "Business name",
    "PHONE": "Any personal or other phone numbers",
    "URL": "Any short or full URL",
    "USERNAME": "Username",
    "VEHICLE_ID_NUMBERS": "Any vehicle numbers like license plates, vehicle identification numbers",
}

# Minimum model confidence for a detection to be kept.
CONFIDENCE_THRESHOLD = 0.25
# Characters per chunk fed to the model — presumably sized to the model's
# context budget; TODO confirm against the model config.
CHUNK_SIZE = 3000
# Characters shared between consecutive chunks so an entity that straddles
# a chunk boundary is still seen whole by at least one chunk.
CHUNK_OVERLAP = 100
42
+
43
+
44
def load_model(model_path: str = ".") -> GLiNER2:
    """Load the NERPA model onto the best available device.

    Device preference order is CUDA, then Apple MPS, then CPU.

    Args:
        model_path: Directory containing the fine-tuned model weights.

    Returns:
        The loaded GLiNER2 model, already moved to the selected device.
    """
    if torch.cuda.is_available():
        device_name = "cuda"
    elif torch.backends.mps.is_available():
        device_name = "mps"
    else:
        device_name = "cpu"

    nerpa = GLiNER2.from_pretrained(model_path)
    nerpa.to(torch.device(device_name))
    return nerpa
56
+
57
+
58
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> Tuple[List[str], List[int]]:
    """Split text into overlapping chunks.

    Args:
        text: The text to split; empty text yields no chunks.
        chunk_size: Maximum characters per chunk; must be greater than overlap.
        overlap: Characters shared between consecutive chunks so an entity
            spanning a boundary is fully contained in at least one chunk.

    Returns:
        A (chunks, starts) pair where starts[i] is the character offset of
        chunks[i] within the original text.

    Raises:
        ValueError: If overlap >= chunk_size — the scan step would be <= 0
            and the loop below would never advance.
    """
    if not text:
        return [], []
    if overlap >= chunk_size:
        # Guard against an infinite loop: with step <= 0 the position
        # would never move forward.
        raise ValueError(f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})")
    chunks, starts = [], []
    step = chunk_size - overlap
    pos = 0
    while pos < len(text):
        chunks.append(text[pos : pos + chunk_size])
        starts.append(pos)
        if pos + chunk_size >= len(text):
            break
        pos += step
    return chunks, starts
72
+
73
+
74
def detect_entities(
    model: GLiNER2,
    text: str,
    entities: Optional[Dict[str, str]] = None,
    threshold: float = CONFIDENCE_THRESHOLD,
    batch_size: int = 32,
) -> List[dict]:
    """
    Detect PII entities in text, returning a list of
    {"type": str, "start": int, "end": int, "score": float} dicts
    with character offsets into the original text.

    Args:
        model: The loaded GLiNER2 model.
        entities: Label -> description mapping; defaults to PII_ENTITIES.
        threshold: Minimum confidence for a detection to be kept.
        batch_size: Number of chunks sent to the model per call.

    Returns:
        Detections sorted by position, with overlapping spans merged
        (the highest-confidence label wins).
    """
    entities = entities or PII_ENTITIES

    # Always detect both date types so the model can disambiguate
    # DATE_TIME from DATE_OF_BIRTH, even if the caller asked for only one.
    detect = dict(entities)
    if "DATE_TIME" in detect and "DATE_OF_BIRTH" not in detect:
        detect["DATE_OF_BIRTH"] = PII_ENTITIES["DATE_OF_BIRTH"]
    elif "DATE_OF_BIRTH" in detect and "DATE_TIME" not in detect:
        detect["DATE_TIME"] = PII_ENTITIES["DATE_TIME"]

    chunks, offsets = chunk_text(text)

    all_chunk_results = []
    for batch_start in range(0, len(chunks), batch_size):
        batch = chunks[batch_start : batch_start + batch_size]
        results = model.batch_extract_entities(
            batch,
            detect,
            include_confidence=True,
            include_spans=True,
            threshold=threshold,
        )
        all_chunk_results.extend(results)

    # Merge results across chunks: identical spans can be reported by two
    # overlapping chunks — keep the highest-confidence detection per span.
    seen: Dict[Tuple[int, int], dict] = {}
    for chunk_result, chunk_offset in zip(all_chunk_results, offsets):
        for label, occurrences in chunk_result["entities"].items():
            for occ in occurrences:
                # Translate chunk-local offsets back into the full text.
                start = occ["start"] + chunk_offset
                end = occ["end"] + chunk_offset
                pos = (start, end)
                if pos not in seen or seen[pos]["score"] < occ["confidence"]:
                    seen[pos] = {"type": label, "score": occ["confidence"]}

    # Drop any auxiliary date type the caller did not request, then merge
    # overlapping spans, keeping the highest-confidence label.
    items = sorted(
        [(s, e, info) for (s, e), info in seen.items() if info["type"] in entities],
        key=lambda x: (x[0], x[1]),
    )
    if not items:
        return []

    merged = []
    cur_s, cur_e, cur_info = items[0]
    for s, e, info in items[1:]:
        if s < cur_e:  # overlapping
            cur_e = max(cur_e, e)
            if info["score"] > cur_info["score"]:
                cur_info = info
        else:
            merged.append({"type": cur_info["type"], "start": cur_s, "end": cur_e, "score": cur_info["score"]})
            cur_s, cur_e, cur_info = s, e, info
    merged.append({"type": cur_info["type"], "start": cur_s, "end": cur_e, "score": cur_info["score"]})

    return merged
140
+
141
+
142
def anonymise(text: str, detected: List[dict]) -> str:
    """Replace detected entities with placeholders like [PERSON_NAME].

    Args:
        text: The original text.
        detected: Entity dicts with "type", "start" and "end" keys, as
            produced by detect_entities.

    Returns:
        The text with each detected span replaced by its type placeholder.
    """
    # Walk the spans from the end of the text backwards so earlier
    # character offsets remain valid after each substitution.
    redacted = text
    for ent in sorted(detected, key=lambda e: e["start"], reverse=True):
        head = redacted[: ent["start"]]
        tail = redacted[ent["end"] :]
        redacted = head + f'[{ent["type"]}]' + tail
    return redacted
150
+
151
+
152
def main():
    """CLI entry point: parse arguments, detect PII, emit anonymised text."""
    parser = argparse.ArgumentParser(description="Anonymise PII in text using the NERPA model.")
    parser.add_argument("text", nargs="?", help="Text to anonymise (or use --file)")
    parser.add_argument("--file", "-f", help="Read text from a file instead")
    parser.add_argument("--output", "-o", help="Write anonymised text to file (default: stdout)")
    parser.add_argument("--model", "-m", default=".", help="Path to model directory (default: current dir)")
    parser.add_argument("--threshold", "-t", type=float, default=CONFIDENCE_THRESHOLD, help="Confidence threshold (default: 0.25)")
    parser.add_argument("--show-entities", action="store_true", help="Print detected entities before anonymised text")
    args = parser.parse_args()

    if args.file:
        # Read explicitly as UTF-8 so behaviour does not depend on the
        # platform locale (open() defaults to the locale encoding).
        with open(args.file, encoding="utf-8") as f:
            text = f.read()
    elif args.text is not None:
        # `is not None` rather than truthiness: an explicitly supplied empty
        # string is still valid input, not a missing argument.
        text = args.text
    else:
        parser.error("Provide text as an argument or use --file")

    model = load_model(args.model)
    detected = detect_entities(model, text, threshold=args.threshold)

    if args.show_entities:
        # The entity listing goes to stderr so stdout stays pure anonymised
        # text and remains safe to pipe or redirect.
        for e in detected:
            print(f' {e["type"]:25s} [{e["start"]:5d}:{e["end"]:5d}] (score={e["score"]:.2f}) "{text[e["start"]:e["end"]]}"', file=sys.stderr)
        print(file=sys.stderr)

    result = anonymise(text, detected)

    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(result)
    else:
        print(result)
185
+
186
+
187
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()