File size: 5,744 Bytes
aa29d50
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
Helper methods for the Presidio Streamlit app
"""
from typing import List, Optional, Tuple
import logging
import streamlit as st
from presidio_analyzer import (
    AnalyzerEngine,
    RecognizerResult,
    RecognizerRegistry,
    PatternRecognizer,
    Pattern,
)
from presidio_analyzer.nlp_engine import NlpEngine
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

logger = logging.getLogger("presidio-streamlit")

@st.cache_resource
def nlp_engine_and_registry(
    model_family: str,
    model_path: str,
) -> Tuple[NlpEngine, RecognizerRegistry]:
    """Create the NLP engine and recognizer registry for the requested model.

    Cached by Streamlit so each (family, path) pair is loaded only once per
    session.

    :param model_family: One of "spacy", "flair" or "huggingface"
        (case-insensitive).
    :param model_path: Model name/path understood by the chosen library.
    :return: Tuple of the loaded model object and a configured
        ``RecognizerRegistry``.
    :raises ValueError: If ``model_family`` is not supported.

    NOTE(review): despite the annotation, the first element returned is the
    raw library model (spaCy ``Language``, Flair ``SequenceTagger`` or a
    transformers pipeline), not a presidio ``NlpEngine`` — confirm callers
    expect this.
    """
    registry = RecognizerRegistry()

    if model_family.lower() == "spacy":
        # Imported lazily so the app starts even if spaCy is absent and
        # a different family is selected.
        import spacy
        try:
            nlp = spacy.load(model_path)
            registry.load_predefined_recognizers()
            registry.add_recognizer_from_dict({
                "name": "spacy_recognizer",
                "supported_language": "en",
                "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME", "NRP"],
                "model": model_path,
                "package": "spacy",
            })
            return nlp, registry
        except Exception as e:
            logger.error(f"Failed to load spaCy model {model_path}: {str(e)}")
            raise
    elif model_family.lower() == "flair":
        from flair.models import SequenceTagger
        try:
            tagger = SequenceTagger.load(model_path)
            registry.load_predefined_recognizers()
            registry.add_recognizer_from_dict({
                "name": "flair_recognizer",
                "supported_language": "en",
                "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION"],
                "model": model_path,
                "package": "flair",
            })
            return tagger, registry
        except Exception as e:
            logger.error(f"Failed to load Flair model {model_path}: {str(e)}")
            raise
    elif model_family.lower() == "huggingface":
        from transformers import pipeline
        try:
            nlp = pipeline("ner", model=model_path, tokenizer=model_path)
            registry.load_predefined_recognizers()
            registry.add_recognizer_from_dict({
                "name": "huggingface_recognizer",
                "supported_language": "en",
                "supported_entities": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"],
                "model": model_path,
                "package": "transformers",
            })
            return nlp, registry
        except Exception as e:
            logger.error(f"Failed to load HuggingFace model {model_path}: {str(e)}")
            raise
    else:
        raise ValueError(f"Model family {model_family} not supported")

@st.cache_resource
def analyzer_engine(
    model_family: str,
    model_path: str,
) -> AnalyzerEngine:
    """Create the Analyzer Engine instance based on the requested model."""
    # Loads (or fetches from Streamlit's cache) the model and its registry.
    nlp_engine, registry = nlp_engine_and_registry(model_family, model_path)
    # NOTE(review): nlp_engine is discarded here — AnalyzerEngine is built
    # from the registry only, so it falls back to its default NLP engine.
    # Confirm whether the loaded model should be passed in (it is a raw
    # library object, not a presidio NlpEngine, so it cannot be forwarded
    # as-is).
    analyzer = AnalyzerEngine(registry=registry)
    return analyzer

@st.cache_data
def get_supported_entities(model_family: str, model_path: str) -> List[str]:
    """Return the entity types the selected model family can detect.

    Unknown families fall back to the basic PERSON/LOCATION/ORGANIZATION set.
    """
    entities_by_family = {
        "spacy": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME", "NRP"],
        "huggingface": ["PERSON", "LOCATION", "ORGANIZATION", "DATE_TIME"],
        "flair": ["PERSON", "LOCATION", "ORGANIZATION"],
    }
    default_entities = ["PERSON", "LOCATION", "ORGANIZATION"]
    return entities_by_family.get(model_family.lower(), default_entities)

def analyze(
    analyzer: AnalyzerEngine,
    text: str,
    entities: List[str],
    language: str,
    score_threshold: float,
    return_decision_process: bool,
    allow_list: List[str],
    deny_list: List[str],
) -> List[RecognizerResult]:
    """Analyze text for PHI entities, honoring the user's allow list.

    :param analyzer: Configured ``AnalyzerEngine`` instance.
    :param text: Text to scan.
    :param entities: Entity types to look for.
    :param language: Language code passed to the analyzer.
    :param score_threshold: Minimum confidence for a result to be returned.
    :param return_decision_process: Whether to include analysis explanations.
    :param allow_list: Words that must never be flagged; any result whose
        matched span contains one of them is dropped.
    :param deny_list: Accepted for interface compatibility; deny-list terms
        are detected via the ad-hoc recognizer built by
        ``create_ad_hoc_deny_list_recognizer``, not by post-filtering here.
    :return: Filtered recognizer results.
    """
    results = analyzer.analyze(
        text=text,
        entities=entities,
        language=language,
        score_threshold=score_threshold,
        return_decision_process=return_decision_process,
    )
    # BUG FIX: the previous implementation kept a result only if its span
    # contained a deny-list word (falling through only when deny_list was
    # empty), which silently discarded every genuine detection whenever a
    # deny list was supplied. Results are now kept unless allow-listed.
    allowed_words = [word.lower() for word in allow_list]
    filtered_results = []
    for result in results:
        text_snippet = text[result.start:result.end].lower()
        if any(word in text_snippet for word in allowed_words):
            continue  # user explicitly allowed this term
        filtered_results.append(result)
    return filtered_results

def anonymize(
    text: str,
    operator: str,
    analyze_results: List[RecognizerResult],
    mask_char: str = "*",
    number_of_chars: int = 15,
) -> dict:
    """Anonymize detected PHI entities in the text.

    :param text: The original text.
    :param operator: Anonymizer operator name (e.g. "replace", "mask").
    :param analyze_results: Detections produced by the analyzer.
    :param mask_char: Character used when ``operator == "mask"``.
    :param number_of_chars: Number of characters to mask for "mask".
    :return: Anonymizer engine result for the processed text.
    """
    # Only the "mask" operator takes parameters; all others run with an
    # empty config.
    if operator == "mask":
        operator_params = {
            "masking_char": mask_char,
            "chars_to_mask": number_of_chars,
        }
    else:
        operator_params = {}

    engine = AnonymizerEngine()
    return engine.anonymize(
        text=text,
        analyzer_results=analyze_results,
        operators={"DEFAULT": OperatorConfig(operator, operator_params)},
    )

def create_ad_hoc_deny_list_recognizer(
    deny_list: Optional[List[str]] = None,
) -> Optional[PatternRecognizer]:
    """Build a recognizer that flags deny-list terms as GENERIC_PII.

    :param deny_list: Terms the user wants treated as PII.
    :return: A ``PatternRecognizer`` over the terms, or ``None`` when the
        list is empty or missing.
    """
    if deny_list:
        return PatternRecognizer(
            supported_entity="GENERIC_PII", deny_list=deny_list
        )
    return None