Spaces:
Sleeping
Sleeping
| """ | |
| Helper methods for the Presidio Streamlit app | |
| """ | |
| from typing import List, Optional | |
| import spacy | |
| import streamlit as st | |
| from presidio_analyzer import AnalyzerEngine, RecognizerResult, RecognizerRegistry | |
| from presidio_analyzer.nlp_engine import NlpEngineProvider | |
| from presidio_anonymizer import AnonymizerEngine | |
| from presidio_anonymizer.entities import OperatorConfig | |
| from flair_recognizer import FlairRecognizer | |
| from openai_fake_data_generator import ( | |
| set_openai_key, | |
| call_completion_model, | |
| create_prompt, | |
| ) | |
| from transformers_rec import ( | |
| STANFORD_COFIGURATION, | |
| TransformersRecognizer, | |
| BERT_DEID_CONFIGURATION, | |
| ) | |
def analyzer_engine(model_path: str):
    """Return an AnalyzerEngine configured for the requested NER model.

    :param model_path: Which model to use for NER:
        "StanfordAIMI/stanford-deidentifier-base",
        "obi/deid_roberta_i2b2",
        "en_core_web_lg"
    """
    recognizer_registry = RecognizerRegistry()
    recognizer_registry.load_predefined_recognizers()

    # Shared NLP-engine config: a small spaCy pipeline used whenever the heavy
    # NER lifting is done by an external recognizer (Flair / transformers).
    small_spacy_config = {
        "nlp_engine_name": "spacy",
        "models": [{"lang_code": "en", "model_name": "en_core_web_sm"}],
    }

    if model_path == "en_core_web_lg":
        # Pure spaCy setup: make sure the large model is installed locally.
        if not spacy.util.is_package("en_core_web_lg"):
            spacy.cli.download("en_core_web_lg")
        nlp_config = {
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
        }
    elif model_path == "flair/ner-english-large":
        # Flair does the NER; drop the built-in spaCy NER recognizer so the
        # two don't produce duplicate findings.
        recognizer_registry.add_recognizer(FlairRecognizer())
        recognizer_registry.remove_recognizer("SpacyRecognizer")
        nlp_config = small_spacy_config
    else:
        # Hugging Face transformers model used as a recognizer (not as the
        # NlpEngine), backed by a small spaCy pipeline.
        if not spacy.util.is_package("en_core_web_sm"):
            spacy.cli.download("en_core_web_sm")
        hf_recognizer = TransformersRecognizer(model_path=model_path)
        recognizer_registry.remove_recognizer("SpacyRecognizer")
        if model_path == "StanfordAIMI/stanford-deidentifier-base":
            hf_recognizer.load_transformer(**STANFORD_COFIGURATION)
        elif model_path == "obi/deid_roberta_i2b2":
            hf_recognizer.load_transformer(**BERT_DEID_CONFIGURATION)
        nlp_config = small_spacy_config
        recognizer_registry.add_recognizer(hf_recognizer)

    nlp_engine = NlpEngineProvider(nlp_configuration=nlp_config).create_engine()
    return AnalyzerEngine(nlp_engine=nlp_engine, registry=recognizer_registry)
def anonymizer_engine():
    """Return a fresh Presidio AnonymizerEngine instance."""
    engine = AnonymizerEngine()
    return engine
def get_supported_entities(st_model: str):
    """Return the entity types the Analyzer Engine supports for *st_model*."""
    engine = analyzer_engine(st_model)
    return engine.get_supported_entities()
def analyze(st_model: str, **kwargs):
    """Analyze input using Analyzer engine and input arguments (kwargs).

    :param st_model: Model path/name used to build the AnalyzerEngine.
    :param kwargs: Arguments forwarded to ``AnalyzerEngine.analyze``
        (e.g. ``text``, ``language``, ``entities``).
    """
    # "All" (or no explicit selection) means: do not restrict entity types.
    # ``kwargs.get(...) or []`` also guards against an explicit entities=None,
    # which would otherwise make the ``in`` membership test raise a TypeError.
    if "entities" not in kwargs or "All" in (kwargs.get("entities") or []):
        kwargs["entities"] = None
    return analyzer_engine(st_model).analyze(**kwargs)
def anonymize(
    text: str,
    operator: str,
    analyze_results: List[RecognizerResult],
    mask_char: Optional[str] = None,
    number_of_chars: Optional[int] = None,
    encrypt_key: Optional[str] = None,
):
    """Anonymize identified input using Presidio Anonymizer.

    :param text: Full text
    :param operator: Operator name ("mask", "encrypt", "highlight",
        "synthesize", or any operator Presidio supports natively)
    :param analyze_results: list of results from presidio analyzer engine
    :param mask_char: Mask char (for mask operator)
    :param number_of_chars: Number of characters to mask (for mask operator)
    :param encrypt_key: Encryption key (for encrypt operator)
    """
    # Define operator config per operator type.
    if operator == "mask":
        operator_config = {
            "type": "mask",
            "masking_char": mask_char,
            "chars_to_mask": number_of_chars,
            "from_end": False,
        }
    elif operator == "encrypt":
        operator_config = {"key": encrypt_key}
    elif operator == "highlight":
        # Identity lambda keeps the original span text so callers can
        # highlight entities without altering them.
        operator_config = {"lambda": lambda x: x}
    else:
        operator_config = None

    # Map UI-level operator names onto operators Presidio actually exposes.
    if operator == "highlight":
        operator = "custom"
    elif operator == "synthesize":
        operator = "replace"

    res = anonymizer_engine().anonymize(
        text,
        analyze_results,
        operators={"DEFAULT": OperatorConfig(operator, operator_config)},
    )
    return res
def annotate(text: str, analyze_results: List[RecognizerResult]):
    """Highlight the identified PII entities on the original text.

    Returns a token list suitable for st-annotated-text: plain strings for
    unrecognized spans and ``(span_text, entity_type)`` tuples for entities.

    :param text: Full text
    :param analyze_results: list of results from presidio analyzer engine
    """
    tokens = []

    # Use the anonymizer (identity "highlight" operator) to resolve
    # overlapping recognizer results before slicing the text.
    results = anonymize(
        text=text,
        operator="highlight",
        analyze_results=analyze_results,
    )

    # sort by start index so spans are emitted left-to-right
    results = sorted(results.items, key=lambda x: x.start)

    # No entities found: return the untouched text instead of an empty list,
    # so the UI still shows something.
    if not results:
        return [text]

    for i, res in enumerate(results):
        if i == 0:
            tokens.append(text[: res.start])
        # append entity text and entity type
        tokens.append((text[res.start : res.end], res.entity_type))
        # if another entity coming i.e. we're not at the last results element, add text up to next entity
        if i != len(results) - 1:
            tokens.append(text[res.end : results[i + 1].start])
        # if no more entities coming, add all remaining text
        else:
            tokens.append(text[res.end :])
    return tokens
def create_fake_data(
    text: str,
    analyze_results: List[RecognizerResult],
    openai_key: str,
    openai_model_name: str,
):
    """Creates a synthetic version of the text using OpenAI APIs"""
    if not openai_key:
        return "Please provide your OpenAI key"
    # First replace every detected entity with a placeholder, then ask the
    # completion model to fill the placeholders with realistic fake values.
    anonymized = anonymize(text=text, operator="replace", analyze_results=analyze_results)
    set_openai_key(openai_key)
    return call_openai_api(create_prompt(anonymized.text), openai_model_name)
def call_openai_api(prompt: str, openai_model_name: str) -> str:
    """Send *prompt* to the completion model and return the generated text."""
    return call_completion_model(prompt, model=openai_model_name)