# champ-chatbot/classes/prompt_sanitizer.py
# qyle's picture
# many fixes
# 720b664 verified
from typing import List, Optional
from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
def create_ssn_pattern_recognizer():
    """Build a recognizer for nine-digit identifiers grouped 3-3-3.

    The pattern accepts the three groups separated by a hyphen, a space,
    or nothing at all: ``111-111-111``, ``111 111 111`` and ``111111111``.

    Returns:
        A ``PatternRecognizer`` that reports matches as entity ``"SSN"``
        with a score of 0.8.
    """
    three_digits = r"\d{3}"
    optional_separator = "[- ]?"
    # Three digit groups joined by the optional separator, anchored on
    # word boundaries so longer digit runs are not partially matched.
    grouped_digits = optional_separator.join([three_digits] * 3)
    regex = r"\b" + grouped_digits + r"\b"

    pattern = Pattern(name="ssn_pattern", regex=regex, score=0.8)
    return PatternRecognizer(supported_entity="SSN", patterns=[pattern])
def create_zip_code_pattern_recognizer():
    """Build a recognizer for postal codes shaped ``A1A 1A1``.

    The pattern is letter-digit-letter, an optional single whitespace
    character, then digit-letter-digit, so both ``A1A 1A1`` and ``A1A1A1``
    match.

    Returns:
        A ``PatternRecognizer`` that reports matches as entity
        ``"ZIP_CODE"`` with a score of 0.8.
    """
    letter = "[A-Z]"
    digit = r"\d"
    first_half = letter + digit + letter
    second_half = digit + letter + digit
    # Word boundaries keep the code from matching inside longer tokens.
    regex = r"\b" + first_half + r"\s?" + second_half + r"\b"

    pattern = Pattern(name="zip_code_pattern", regex=regex, score=0.8)
    return PatternRecognizer(supported_entity="ZIP_CODE", patterns=[pattern])
def create_street_pattern_recognizer():
    """Build a recognizer for French- and English-style street addresses.

    Two layouts are accepted:

    * number, street-type keyword, then one or more name words
      (e.g. ``123 rue Sherbrooke``);
    * optional number, one or more name words, then a street-type keyword
      ending on a word boundary (e.g. ``123 Main Street``).

    Returns:
        A ``PatternRecognizer`` that reports matches as entity
        ``"STREET_ADDRESS"`` with a score of 0.8.
    """
    street_types = "|".join(
        (
            "rue",
            "boul",
            "boulevard",
            "av",
            "avenue",
            "place",
            "square",
            "st",
            "street",
            "rd",
            "road",
            "ave",
            "blvd",
            "lane",
            "dr",
            "drive",
        )
    )
    keyword = rf"(?:{street_types})"
    # A single name word: ASCII letters plus the listed accented capitals.
    word = "[A-ZÁÀÂÄÇÉÈÊËÍÎÏÓÔÖÚÛÜa-z]+"
    more_words = rf"(?:\s+{word})*"

    # Layout 1: "<number> <keyword> <name words...>"
    number_then_keyword = rf"\d+\s+{keyword}\s+{word}{more_words}"
    # Layout 2: "[<number>] <name words...> <keyword>"
    name_then_keyword = rf"(?:\d+\s+)?{word}{more_words}\s+{keyword}\b"

    pattern = Pattern(
        name="street_pattern",
        regex=f"{number_then_keyword}|{name_then_keyword}",
        score=0.8,
    )
    return PatternRecognizer(
        supported_entity="STREET_ADDRESS",
        patterns=[pattern],
    )
class PromptSanitizer:
    """Singleton that detects and masks PII in free text using Presidio.

    The first successful ``PromptSanitizer()`` call builds the analyzer
    (spaCy models ``en_core_web_lg`` and ``fr_core_news_lg`` plus the custom
    SSN, ZIP-code and street-address recognizers) and the anonymizer; every
    later call returns that same object.

    Attributes:
        analyzer: Presidio analyzer engine used to locate PII spans.
        anonymizer: Presidio anonymizer engine used to rewrite those spans.
        operators: Maps each entity name to the ``replace`` operator that
            substitutes its placeholder (e.g. ``PERSON`` -> ``[NAME]``).
        target_entities: Entity names requested from the analyzer; exactly
            the keys of ``operators``.
    """

    _instance: Optional["PromptSanitizer"] = None
    analyzer: "AnalyzerEngine"
    anonymizer: "AnonymizerEngine"
    operators: dict
    target_entities: List[str]

    def __new__(cls):
        """Return the shared instance, building it on first use.

        The instance is published to ``cls._instance`` only after every
        engine has been created. If setup raises (for example because a
        spaCy model is not installed), nothing is cached, so the next call
        retries instead of handing back a half-initialised object.
        """
        if cls._instance is not None:
            return cls._instance

        print("Initializing Presidio Engines (this should happen only once)...")
        instance = super().__new__(cls)

        # Define which models to use for which language
        configuration = {
            "nlp_engine_name": "spacy",
            "models": [
                {"lang_code": "en", "model_name": "en_core_web_lg"},
                {"lang_code": "fr", "model_name": "fr_core_news_lg"},
            ],
        }
        nlp_engine = NlpEngineProvider(
            nlp_configuration=configuration
        ).create_engine()
        instance.analyzer = AnalyzerEngine(nlp_engine=nlp_engine)

        # Add the project-specific pattern recognizers on top of the
        # analyzer's own registry.
        for recognizer in (
            create_ssn_pattern_recognizer(),
            create_zip_code_pattern_recognizer(),
            create_street_pattern_recognizer(),
        ):
            instance.analyzer.registry.add_recognizer(recognizer)

        instance.anonymizer = AnonymizerEngine()

        # Define standard masking rules. Street addresses and ZIP codes
        # share the generic [LOCATION] placeholder.
        instance.operators = {
            "PERSON": OperatorConfig("replace", {"new_value": "[NAME]"}),
            "EMAIL_ADDRESS": OperatorConfig("replace", {"new_value": "[EMAIL]"}),
            "PHONE_NUMBER": OperatorConfig("replace", {"new_value": "[PHONE]"}),
            "SSN": OperatorConfig("replace", {"new_value": "[SSN]"}),
            "CREDIT_CARD": OperatorConfig(
                "replace", {"new_value": "[CREDIT_CARD]"}
            ),
            "LOCATION": OperatorConfig("replace", {"new_value": "[LOCATION]"}),
            "STREET_ADDRESS": OperatorConfig(
                "replace", {"new_value": "[LOCATION]"}
            ),
            "ZIP_CODE": OperatorConfig("replace", {"new_value": "[LOCATION]"}),
        }
        instance.target_entities = list(instance.operators)

        # Only a fully built instance becomes the singleton.
        cls._instance = instance
        return instance

    def sanitize(self, text: str) -> str:
        """Analyze ``text`` and return it with detected PII masked.

        Args:
            text: The prompt to sanitize.

        Returns:
            ``text`` unchanged when it is empty or otherwise falsy;
            otherwise the anonymized text, with each detected entity
            replaced by its placeholder from ``self.operators``.
        """
        if not text:
            return text

        # NOTE(review): analysis always runs as English even though a French
        # spaCy model is loaded at construction time. Confirm whether
        # French input is meant to be analyzed with language="fr".
        lang = "en"

        # 1. Detect PII
        results = self.analyzer.analyze(
            text=text, entities=self.target_entities, language=lang
        )

        # 2. Redact PII
        anonymized_result = self.anonymizer.anonymize(
            text=text,
            analyzer_results=results,  # pyright: ignore[reportArgumentType]
            operators=self.operators,
        )
        return anonymized_result.text