Spaces:
Paused
Paused
| from typing import List, Optional | |
| from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer | |
| from presidio_analyzer.nlp_engine import NlpEngineProvider | |
| from presidio_anonymizer import AnonymizerEngine | |
| from presidio_anonymizer.entities import OperatorConfig | |
def create_ssn_pattern_recognizer():
    """Build the recognizer that tags nine-digit identifiers as ``SSN``.

    The pattern accepts three groups of three digits, each pair of groups
    optionally separated by a single hyphen or space, bounded by word
    boundaries: ``111-111-111``, ``111 111 111`` and ``111111111``.

    Returns:
        A ``PatternRecognizer`` for the ``SSN`` entity with one pattern
        scored at 0.8.
    """
    # NOTE(review): a 3-3-3 grouping looks like the Canadian SIN layout rather
    # than the US 3-2-4 SSN layout - confirm the entity name is intentional.
    nine_digit_regex = r"\b\d{3}[- ]?\d{3}[- ]?\d{3}\b"
    patterns = [Pattern(name="ssn_pattern", regex=nine_digit_regex, score=0.8)]
    return PatternRecognizer(supported_entity="SSN", patterns=patterns)
def create_zip_code_pattern_recognizer():
    """Build the recognizer that tags postal codes as ``ZIP_CODE``.

    The pattern is letter-digit-letter, an optional single whitespace
    character, then digit-letter-digit, bounded by word boundaries
    (``A1A 1A1`` and ``A1A1A1``).

    Returns:
        A ``PatternRecognizer`` for the ``ZIP_CODE`` entity with one pattern
        scored at 0.8.
    """
    postal_code_regex = r"\b[A-Z]\d[A-Z]\s?\d[A-Z]\d\b"
    patterns = [
        Pattern(name="zip_code_pattern", regex=postal_code_regex, score=0.8)
    ]
    return PatternRecognizer(supported_entity="ZIP_CODE", patterns=patterns)
def create_street_pattern_recognizer():
    """Build the recognizer that tags street addresses as ``STREET_ADDRESS``.

    The regex is an alternation of two layouts that share the same list of
    French and English street-type keywords:

    * type first: number, street type, then one or more name words
      (e.g. ``12 rue Saint Denis``);
    * name first: optional number, one or more name words, then the street
      type followed by a word boundary (e.g. ``12 Main street``).

    Returns:
        A ``PatternRecognizer`` for the ``STREET_ADDRESS`` entity with one
        pattern scored at 0.8.
    """
    # Shared fragments; concatenated below into the full expression.
    street_type = (
        r"(?:rue|boul|boulevard|av|avenue|place|square|st|street|rd|road"
        r"|ave|blvd|lane|dr|drive)"
    )
    name_word = r"[A-ZÁÀÂÄÇÉÈÊËÍÎÏÓÔÖÚÛÜa-z]+"
    name_words = name_word + r"(?:\s+" + name_word + r")*"

    type_first = r"\d+\s+" + street_type + r"\s+" + name_words
    name_first = r"(?:\d+\s+)?" + name_words + r"\s+" + street_type + r"\b"
    bilingual_street_regex = type_first + "|" + name_first

    patterns = [
        Pattern(name="street_pattern", regex=bilingual_street_regex, score=0.8)
    ]
    return PatternRecognizer(
        supported_entity="STREET_ADDRESS", patterns=patterns
    )
class PromptSanitizer:
    """Singleton that detects PII with Presidio and replaces it with tags.

    The first ``PromptSanitizer()`` call builds the spaCy-backed analyzer,
    registers the custom SSN / ZIP code / street recognizers, creates the
    anonymizer and stores the masking rules. Every later call returns that
    same instance.
    """

    # Shared instance; stays None until initialisation has fully succeeded.
    _instance: Optional["PromptSanitizer"] = None
    analyzer: "AnalyzerEngine"
    anonymizer: "AnonymizerEngine"
    # Maps entity name -> OperatorConfig describing its replacement tag.
    operators: dict
    # Entity names passed to the analyzer (the keys of ``operators``).
    target_entities: List[str]

    def __new__(cls):
        """Return the shared instance, creating the engines on first use.

        The instance is assembled on a local variable and only published to
        ``cls._instance`` once every step has succeeded. If setup raises
        (for example because a spaCy model is missing) no half-initialised
        object is cached, and the next call retries from scratch.
        """
        if cls._instance is None:
            print("Initializing Presidio Engines (this should happen only once)...")
            instance = super().__new__(cls)

            # Define which models to use for which language
            configuration = {
                "nlp_engine_name": "spacy",
                "models": [
                    {"lang_code": "en", "model_name": "en_core_web_lg"},
                    {"lang_code": "fr", "model_name": "fr_core_news_lg"},
                ],
            }
            provider = NlpEngineProvider(nlp_configuration=configuration)
            nlp_engine = provider.create_engine()
            instance.analyzer = AnalyzerEngine(nlp_engine=nlp_engine)

            # Add the custom regex recognizers to the analyzer's registry.
            custom_recognizers = (
                create_ssn_pattern_recognizer(),
                create_zip_code_pattern_recognizer(),
                create_street_pattern_recognizer(),
            )
            for recognizer in custom_recognizers:
                instance.analyzer.registry.add_recognizer(recognizer)

            instance.anonymizer = AnonymizerEngine()

            # Define standard masking rules
            instance.operators = {
                "PERSON": OperatorConfig("replace", {"new_value": "[NAME]"}),
                "EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "[EMAIL]"}),
                "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "[PHONE]"}),
                "SSN": OperatorConfig("replace", {"new_value": "[SSN]"}),
                "CREDIT_CARD": OperatorConfig(
                    "replace", {"new_value": "[CREDIT_CARD]"}
                ),
                "LOCATION": OperatorConfig("replace", {"new_value": "[LOCATION]"}),
                "STREET_ADDRESS": OperatorConfig(
                    "replace", {"new_value": "[LOCATION]"}
                ),
                "ZIP_CODE": OperatorConfig("replace", {"new_value": "[LOCATION]"}),
            }
            instance.target_entities = list(instance.operators)

            # Publish only after everything above succeeded.
            cls._instance = instance
        return cls._instance

    def sanitize(self, text: str) -> str:
        """Analyzes and redacts PII from the given text.

        Args:
            text: The text to scan. Falsy input (e.g. ``""``) is returned
                unchanged without calling the engines.

        Returns:
            The text with every detected target entity replaced by the tag
            configured in ``self.operators``.
        """
        if not text:
            return text

        # NOTE(review): a French model is loaded in __new__, but analysis is
        # always requested with language "en" - confirm whether language
        # detection is still planned.
        # Detect PII
        results = self.analyzer.analyze(
            text=text, entities=self.target_entities, language="en"
        )

        # Redact PII
        anonymized_result = self.anonymizer.anonymize(
            text=text,
            analyzer_results=results,  # pyright: ignore[reportArgumentType]
            operators=self.operators,
        )
        return anonymized_result.text