Spaces:
Paused
Paused
| import logging | |
| import re | |
| from typing import Optional | |
| from gliner import GLiNER | |
| logger = logging.getLogger("uvicorn") | |
# Entity labels requested from the GLiNER model (see PIIFilter.sanitize).
LABELS = ["email", "date_of_birth", "last_name", "street_address"]

# Replacement text written into the output for each entity label.
# "phone_number" and "ssn" are not in LABELS: they are detected by the
# regular expressions below rather than by the model.
LABELS_PLACEHOLDERS = dict(
    email="an email",
    phone_number="a phone number",
    date_of_birth="a date of birth",
    last_name="a last name",
    street_address="a location",
    ssn="a ssn",
)

# Nine digits in three groups of three, each group optionally separated by a
# hyphen or a space.
# NOTE(review): the 3-3-3 grouping looks like a Canadian SIN rather than a
# US SSN (3-2-4) - confirm the intended format.
RE_SSN = r"\b\d{3}[- ]?\d{3}[- ]?\d{3}\b"

# Letter-digit-letter, optional space, digit-letter-digit, upper case only.
# NOTE(review): presumably a Canadian postal code - confirm. Matches are
# redacted under the "street_address" label by PIIFilter.sanitize.
RE_ZIP = r"\b[ABCEGHJKLMNPRSTVXY]\d[ABCEGHJKLMNPRSTVWXYZ][ ]?\d[ABCEGHJKLMNPRSTVWXYZ]\d\b"

# Optional country prefix (1-3 digits, optional "+"), then 3-3-4 digits with
# optional "-", whitespace or "." separators and optional parentheses around
# the first group. The pattern has no word-boundary anchors.
RE_PHONE = r"(?:\+?\d{1,3}[-\s.]?)?\(?\d{3}\)?[-\s.]?\d{3}[-\s.]?\d{4}"
def clean_backslashes(txt: str) -> str:
    r"""Strip the backslash from every escaped apostrophe in *txt*.

    Only the two-character sequence ``\'`` is rewritten (to ``'``); any other
    backslash is left untouched. For example, ``It\'s not for everyone``
    becomes ``It's not for everyone``.

    Backslashes next to names or locations confuse the PII filter.

    Args:
        txt (str): String to clean

    Returns:
        str: Cleaned string
    """
    escaped_apostrophe = "\\'"
    # Cutting the text at each escaped apostrophe and gluing the pieces back
    # together with a plain apostrophe is equivalent to a left-to-right
    # replace of every occurrence.
    pieces = txt.split(escaped_apostrophe)
    return "'".join(pieces)
def chunk_text(text: str, max_chars: int = 1000) -> list[tuple[str, int]]:
    """Split *text* into consecutive chunks of at most *max_chars* characters.

    The text is sometimes too large for the model, so it is cut into pieces
    that can be passed to the model one by one. The chunks are contiguous and
    non-overlapping: concatenating them in order gives back *text* exactly.

    Args:
        text: The text to split.
        max_chars: Maximum length of a chunk. The original author's note
            estimates about 1000 characters as roughly 250-300 tokens.

    Returns:
        A list of ``(chunk, start)`` pairs, where ``start`` is the index of
        the chunk's first character in *text*. Empty for empty *text*.

    Raises:
        ValueError: If *max_chars* is less than 1. Without this check the
            cursor would never move forward and the loop would not terminate.
    """
    if max_chars < 1:
        raise ValueError(f"max_chars must be a positive integer, got {max_chars!r}")

    chunks: list[tuple[str, int]] = []
    text_len = len(text)
    start = 0
    while start < text_len:
        end = start + max_chars
        if end < text_len:
            # To avoid cutting a word in half, back up to the last space in
            # the window. rfind returns -1 when there is no space, and a
            # space sitting exactly at `start` would give an empty chunk; in
            # both cases keep the hard cut at start + max_chars.
            last_space = text.rfind(" ", start, end)
            if last_space > start:
                end = last_space
        chunks.append((text[start:end], start))
        # Advance the cursor; the next chunk begins at the space we cut on.
        # (An overlap between chunks could be introduced here if needed.)
        start = end
    return chunks
class PIIFilter:
    """Singleton wrapper around a GLiNER model that redacts PII from text.

    ``PIIFilter()`` always returns the same instance. The model is loaded
    from ``"nvidia/gliner-PII"`` the first time the class is instantiated.
    """

    # The shared instance, or None until the model has loaded successfully.
    _instance: Optional["PIIFilter"] = None
    # Object returned by GLiNER.from_pretrained; assigned in __new__.
    model: "GLiNER"

    def __new__(cls):
        """Return the shared instance, loading the model on first use."""
        if cls._instance is None:
            logger.info("Loading the PII filter into memory...")
            instance = super().__new__(cls)
            # TODO (carried over from the original): manual SSN detection.
            # NOTE(review): sanitize() already applies RE_SSN - confirm
            # whether this TODO is still open.
            instance.model = GLiNER.from_pretrained("nvidia/gliner-PII")
            # Publish the instance only once the model is attached. If the
            # load raises, _instance stays None and the next call retries
            # instead of handing out an instance that has no model.
            cls._instance = instance
        return cls._instance

    def sanitize(self, text: str) -> str:
        """Return *text* with every detected PII span replaced by a placeholder.

        Escaped apostrophes are cleaned first, then entities found by the
        model and by the regular expressions are merged and replaced with
        the matching entry of ``LABELS_PLACEHOLDERS``.

        Args:
            text: The text to redact.

        Returns:
            The redacted text. Falsy input (e.g. ``""``) is returned as is.
        """
        if not text:
            return text

        text = clean_backslashes(text)
        entities = self._detect_entities(text)
        spans = self._merge_overlaps(entities)
        return self._redact(text, spans)

    def _detect_entities(self, text: str) -> list[dict]:
        """Collect model and regex detections with offsets relative to *text*."""
        entities = []

        # 1. Model detections, chunk by chunk. Per the original author's
        # note, max_chars=1000 is meant to stay under the model's 384-token
        # limit. Offsets reported per chunk are shifted back to positions in
        # the full text.
        for chunk, offset in chunk_text(text, max_chars=1000):
            for ent in self.model.predict_entities(chunk, LABELS, threshold=0.6):
                entities.append(
                    {
                        "start": ent["start"] + offset,
                        "end": ent["end"] + offset,
                        "label": ent["label"],
                    }
                )

        # 2. Regex detections, run over the whole text.
        regex_rules = (
            (RE_SSN, "ssn"),
            (RE_ZIP, "street_address"),
            (RE_PHONE, "phone_number"),
        )
        for pattern, label in regex_rules:
            entities.extend(
                {"start": match.start(), "end": match.end(), "label": label}
                for match in re.finditer(pattern, text)
            )
        return entities

    @staticmethod
    def _merge_overlaps(entities: list[dict]) -> list[dict]:
        """Merge overlapping entities into sorted, non-overlapping spans.

        A merged span covers the union of its members and keeps the label of
        the member that starts first (ties keep detection order, because the
        sort is stable). Spans that merely touch are not merged.
        """
        merged: list[dict] = []
        for ent in sorted(entities, key=lambda item: item["start"]):
            if merged and ent["start"] < merged[-1]["end"]:
                # Overlap: widen the current span to the maximal envelope.
                # This is also where a label priority could be decided.
                merged[-1]["end"] = max(merged[-1]["end"], ent["end"])
            else:
                # Copy so that widening a span never mutates the input.
                merged.append(dict(ent))
        return merged

    @staticmethod
    def _redact(text: str, spans: list[dict]) -> str:
        """Replace each span of *text* with the placeholder for its label.

        *spans* must be sorted by start and non-overlapping, as returned by
        ``_merge_overlaps``.
        """
        pieces = []
        cursor = 0
        for span in spans:
            pieces.append(text[cursor : span["start"]])
            pieces.append(LABELS_PLACEHOLDERS[span["label"]])
            cursor = span["end"]
        pieces.append(text[cursor:])
        # A single join avoids rebuilding the whole string once per entity.
        return "".join(pieces)