"""
Helper methods for the Presidio Streamlit app
"""
from typing import List, Optional, Tuple
import logging
import streamlit as st
from presidio_analyzer import (
AnalyzerEngine,
RecognizerResult,
RecognizerRegistry,
PatternRecognizer,
Pattern,
)
from presidio_analyzer.nlp_engine import NlpEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
logger = logging.getLogger("presidio-streamlit")
@st.cache_resource
def nlp_engine_and_registry(
    model_family: str,
    model_path: str,
) -> Tuple[NlpEngine, RecognizerRegistry]:
    """Load the requested NER model and build a recognizer registry for it.

    :param model_family: One of "spacy", "flair" or "huggingface"
        (case-insensitive).
    :param model_path: Model name or path understood by the selected
        framework's loader.
    :return: Tuple of (loaded model, configured RecognizerRegistry).
        NOTE(review): the first element is the framework-native model
        (spaCy ``Language``, Flair ``SequenceTagger`` or a transformers
        pipeline), not an ``NlpEngine`` as the annotation claims — confirm.
    :raises ValueError: If the model family is not supported.
    :raises Exception: Re-raises any model-loading failure after logging it.
    """
    family = model_family.lower()
    registry = RecognizerRegistry()

    # Each branch defers the heavy framework import until that family is
    # actually requested, then exposes a uniform loader + recognizer config.
    if family == "spacy":
        import spacy

        loader = spacy.load
        recognizer_config = {
            "name": "spacy_recognizer",
            "supported_language": "en",
            "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME", "NRP"],
            "model": model_path,
            "package": "spacy",
        }
    elif family == "flair":
        from flair.models import SequenceTagger

        loader = SequenceTagger.load
        recognizer_config = {
            "name": "flair_recognizer",
            "supported_language": "en",
            "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"],
            "model": model_path,
            "package": "flair",
        }
    elif family == "huggingface":
        from transformers import pipeline

        def loader(path):
            # One NER pipeline; tokenizer comes from the same checkpoint.
            return pipeline("ner", model=path, tokenizer=path)

        recognizer_config = {
            "name": "huggingface_recognizer",
            "supported_language": "en",
            "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"],
            "model": model_path,
            "package": "transformers",
        }
    else:
        raise ValueError(f"Model family {model_family} not supported")

    try:
        model = loader(model_path)
        registry.load_predefined_recognizers()
        registry.add_recognizer_from_dict(recognizer_config)
        return model, registry
    except Exception:
        # logger.exception records the traceback; lazy %-args avoid
        # formatting cost when the logger is disabled.
        logger.exception("Failed to load %s model %s", family, model_path)
        raise
@st.cache_resource
def analyzer_engine(
    model_family: str,
    model_path: str,
) -> AnalyzerEngine:
    """Build a cached AnalyzerEngine for the requested model.

    :param model_family: Model family name (e.g. "spacy", "flair",
        "huggingface").
    :param model_path: Model name/path for the selected family.
    :return: An AnalyzerEngine wired to the registry for that model.
    """
    # NOTE(review): the loaded NLP model itself is not handed to
    # AnalyzerEngine — only the registry is. Loading it still warms the
    # @st.cache_resource cache; confirm the default NLP engine is intended.
    _model, registry = nlp_engine_and_registry(model_family, model_path)
    return AnalyzerEngine(registry=registry)
@st.cache_data
def get_supported_entities(model_family: str, model_path: str) -> List[str]:
    """Return the entity types the selected model family can detect.

    :param model_family: Model family name (case-insensitive).
    :param model_path: Unused here; kept so the Streamlit cache key
        distinguishes models.
    :return: List of supported entity type names.
    """
    entities_by_family = {
        "spacy": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME", "NRP"],
        "huggingface": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"],
        "flair": ["PERSON", "LOCATION", "ORGANIZATION"],
    }
    # Unknown families fall back to the minimal common entity set.
    return entities_by_family.get(
        model_family.lower(), ["PERSON", "LOCATION", "ORGANIZATION"]
    )
def analyze(
    analyzer: AnalyzerEngine,
    text: str,
    entities: List[str],
    language: str,
    score_threshold: float,
    return_decision_process: bool,
    allow_list: List[str],
    deny_list: List[str],
) -> List[RecognizerResult]:
    """Analyze text for PHI entities, then drop allow-listed detections.

    :param analyzer: The AnalyzerEngine to run.
    :param text: Input text to scan.
    :param entities: Entity types to look for.
    :param language: Language code passed to the analyzer.
    :param score_threshold: Minimum confidence score for detections.
    :param return_decision_process: Whether to include decision-process info.
    :param allow_list: Terms that must NOT be reported as PII; detections
        whose exact span text matches (case-insensitive) are removed.
    :param deny_list: Kept for interface compatibility. Deny-list terms are
        detected via create_ad_hoc_deny_list_recognizer and must not be used
        to suppress unrelated detections here.
    :return: Filtered list of RecognizerResult.
    """
    results = analyzer.analyze(
        text=text,
        entities=entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
    )
    # Build the case-insensitive allow set once, outside the loop.
    allowed = {word.lower() for word in allow_list}
    # Bug fixes vs the original:
    # 1. The original tested substring containment, so an allow word like
    #    "on" would suppress a "London" detection; we now require the exact
    #    detected span to match an allow-list entry.
    # 2. The original, whenever deny_list was non-empty, dropped every
    #    detection that did NOT contain a deny term — silently suppressing
    #    legitimate results. Deny-list matches already arrive as ordinary
    #    results via the ad-hoc deny-list recognizer, so no deny-based
    #    filtering is done here.
    return [
        result
        for result in results
        if text[result.start:result.end].lower() not in allowed
    ]
def anonymize(
    text: str,
    operator: str,
    analyze_results: List[RecognizerResult],
    mask_char: str = "*",
    number_of_chars: int = 15,
) -> dict:
    """Anonymize detected PHI entities in the text.

    :param text: Original text containing the detections.
    :param operator: Presidio anonymizer operator name (e.g. "replace",
        "redact", "mask").
    :param analyze_results: Detections produced by the analyzer.
    :param mask_char: Character used when ``operator == "mask"``.
    :param number_of_chars: Number of characters to mask when masking.
    :return: The anonymizer engine's result object.
    """
    # Only the "mask" operator takes parameters; every entity type uses
    # the single DEFAULT operator configuration.
    if operator == "mask":
        params = {"masking_char": mask_char, "chars_to_mask": number_of_chars}
    else:
        params = {}
    engine = AnonymizerEngine()
    return engine.anonymize(
        text=text,
        analyzer_results=analyze_results,
        operators={"DEFAULT": OperatorConfig(operator, params)},
    )
def create_ad_hoc_deny_list_recognizer(
    deny_list: Optional[List[str]] = None,
) -> Optional[PatternRecognizer]:
    """Create a recognizer that flags deny-list terms as GENERIC_PII.

    :param deny_list: Terms that should always be reported as PII.
    :return: A PatternRecognizer for the terms, or ``None`` when the deny
        list is empty or not provided.
    """
    # Fix: the original return line ended with a stray "|" (a web-scrape
    # table artifact) that made the module a syntax error.
    if not deny_list:
        return None
    return PatternRecognizer(
        supported_entity="GENERIC_PII", deny_list=deny_list
    )