import json
import re

import pandas as pd
import streamlit as st
from presidio_analyzer import AnalyzerEngine, PatternRecognizer
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from unidecode import unidecode

from .web_utilities import st_cache_data_if, st_cache_resource_if, supported_cache


@st_cache_data_if(supported_cache, max_entries=5, ttl=3600)
def anonymize_analyzer(MarianText_letter, _analyzer, proper_noun, Last_name, First_name):
    """Run the Presidio analyzer on a letter and annotate what it finds.

    Returns the letter with Streamlit markdown highlighting (red = queued for
    anonymization, green = whitelisted), the entities to pass on to the
    anonymizer engine, and audit records for flagged and skipped words.
    """
    MarianText_anonymize_letter = MarianText_letter

    analyzer_results_keep = []
    analyzer_results_return = []
    analyzer_results_saved = []
    analyzer_results = _analyzer.analyze(
        text=MarianText_letter,
        language="en",
        entities=["DATE_TIME", "PERSON", "FRENCH_CITY"],
        allow_list=[
            "evening",
            "day",
            "the day",
            "the age of",
            "age",
            "years",
            "week",
            "years old",
            "months",
            "hours",
            "night",
            "noon",
            "nights",
            "tomorrow",
            "today",
            "yesterday",
        ],
    )

    # Keep the first detection for each start offset, then order detections
    # by position so the markup is inserted left to right.
    len_to_add = 0
    analyser_results_to_sort = {}
    detect_duplicated = []
    for i, element in enumerate(analyzer_results):
        if element.start not in detect_duplicated:
            analyser_results_to_sort[i] = element.start
            detect_duplicated.append(element.start)
    sorted_dict = dict(sorted(analyser_results_to_sort.items(), key=lambda x: x[1]))

    exception_list_presidio = ["age", "year", "month", "day", "hour", "week"]

    for element_raw in sorted_dict:
        element = analyzer_results[element_raw]
        word = MarianText_letter[element.start : element.end]
        exception_detected = [e for e in exception_list_presidio if e in word.lower()]
        # Tokens with exactly one slash or more than two are not dates
        # (fractions, codes, blood-pressure readings) and are skipped too.
        if word.count("/") == 1 or word.count("/") > 2:
            exception_detected.append("/ or ///")
        if len(exception_detected) == 0:
            if word.lower().strip() in proper_noun:
                # Whitelisted proper noun: show in green, do not anonymize.
                word_to_replace = (
                    "**:green[" + word + "]** `[" + element.entity_type + "]`"
                )
                MarianText_anonymize_letter = (
                    MarianText_anonymize_letter[: element.start + len_to_add]
                    + word_to_replace
                    + MarianText_anonymize_letter[element.end + len_to_add :]
                )
                analyzer_results_saved.append(
                    {
                        "name": Last_name,
                        "surname": First_name,
                        "type": "deidentification",
                        "value": word,
                        "correction": element.entity_type,
                        "lf_detected": False,
                        "manual_validation": False,
                    }
                )
            else:
                # Identifying token: show in red and queue it for anonymization.
                word_to_replace = (
                    "**:red[" + word + "]** `[" + element.entity_type + "]`"
                )
                MarianText_anonymize_letter = (
                    MarianText_anonymize_letter[: element.start + len_to_add]
                    + word_to_replace
                    + MarianText_anonymize_letter[element.end + len_to_add :]
                )
                analyzer_results_keep.append(
                    {
                        "name": Last_name,
                        "surname": First_name,
                        "type": "deidentification",
                        "value": word,
                        "correction": element.entity_type,
                        "lf_detected": True,
                        "manual_validation": True,
                    }
                )
                analyzer_results_return.append(element)
            # Both branches grow the annotated text, so shift later offsets.
            len_to_add = len_to_add + len(word_to_replace) - len(word)
        else:
            analyzer_results_saved.append(
                {
                    "name": Last_name,
                    "surname": First_name,
                    "type": "deidentification",
                    "value": word,
                    "correction": element.entity_type,
                    "lf_detected": False,
                    "manual_validation": False,
                }
            )

    del analyzer_results
    del len_to_add
    del exception_list_presidio
    del analyser_results_to_sort
    del sorted_dict

    return (
        MarianText_anonymize_letter,
        analyzer_results_return,
        analyzer_results_keep,
        analyzer_results_saved,
    )


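# A minimal usage sketch (illustrative only; the names "Jean"/"Dupont" and the
# sample sentence are fabricated). `analyzer` comes from config_deidentify()
# below and the whitelist from get_list_not_deidentify():
#
#     annotated, to_anonymize, flagged, skipped = anonymize_analyzer(
#         "Mr Dupont was seen in Paris on 12/01/2021.",
#         analyzer,
#         get_list_not_deidentify(),
#         "Dupont",
#         "Jean",
#     )
#     # `annotated` now carries Streamlit markdown such as
#     # "**:red[Dupont]** `[PERSON]`" for every flagged span.

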
@st_cache_data_if(supported_cache, max_entries=5, ttl=3600)
def anonymize_engine(MarianText_letter, _analyzer_results_return, _engine, _nlp):
    # Drop every queued entity from the letter (replace it with an empty
    # string), then reflow the result into report layout.
    result = _engine.anonymize(
        text=MarianText_letter,
        analyzer_results=_analyzer_results_return,
        operators={
            "PERSON": OperatorConfig("replace", {"new_value": ""}),
            "LOCATION": OperatorConfig("replace", {"new_value": ""}),
            "FRENCH_CITY": OperatorConfig("replace", {"new_value": ""}),
        },
    )
    return reformat_to_report(result.text, _nlp)


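# Sketch of what the operators above do, runnable on its own (the sample
# RecognizerResult is fabricated for illustration):
#
#     from presidio_analyzer import RecognizerResult
#     from presidio_anonymizer import AnonymizerEngine
#     from presidio_anonymizer.entities import OperatorConfig
#
#     engine = AnonymizerEngine()
#     out = engine.anonymize(
#         text="Seen in Lyon today.",
#         analyzer_results=[RecognizerResult("FRENCH_CITY", 8, 12, 0.9)],
#         operators={"FRENCH_CITY": OperatorConfig("replace", {"new_value": ""})},
#     )
#     # out.text == "Seen in  today."  (the city is replaced by an empty string)

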
@st_cache_data_if(supported_cache, max_entries=10, ttl=3600)
def add_space_to_comma(texte, _nlp):
    # Pad the sentence's final comma with spaces; the digit guards keep
    # decimal numbers such as "1,5" intact, and the backreference lookahead
    # restricts the match to the last comma of the sentence.
    text_list = []
    regex = r"(?<!\d)(\,)(?!\d)(?!.*\1)"
    for sentence in _nlp.process(texte).sentences:
        text_space = re.sub(regex, " , ", sentence.text.replace("\n", " "))
        text_space_no_db = text_space.replace("  ", " ")
        text_list.append(text_space_no_db)

    return " ".join(text_list)


@st_cache_data_if(supported_cache, max_entries=10, ttl=3600)
def add_space_to_endpoint(texte, _nlp):
    # Same pattern as add_space_to_comma, for the sentence's final period.
    text_list = []
    regex = r"(?<!\d)(\.)(?!\d)(?!.*\1)"
    for sentence in _nlp.process(texte).sentences:
        text_space = re.sub(regex, " . ", sentence.text.replace("\n", " "))
        text_space_no_db = text_space.replace("  ", " ")
        text_list.append(text_space_no_db)

    return " ".join(text_list)


@st_cache_data_if(supported_cache, max_entries=10, ttl=3600)
def add_space_to_leftp(texte, _nlp):
    # Same pattern, for the sentence's final opening parenthesis.
    text_list = []
    regex = r"(?<!\d)(\()(?!\d)(?!.*\1)"
    for sentence in _nlp.process(texte).sentences:
        text_space = re.sub(regex, " ( ", sentence.text.replace("\n", " "))
        text_space_no_db = text_space.replace("  ", " ")
        text_list.append(text_space_no_db)

    return " ".join(text_list)


@st_cache_data_if(supported_cache, max_entries=10, ttl=3600)
def add_space_to_rightp(texte, _nlp):
    # Same pattern, for the sentence's final closing parenthesis.
    text_list = []
    regex = r"(?<!\d)(\))(?!\d)(?!.*\1)"
    for sentence in _nlp.process(texte).sentences:
        text_space = re.sub(regex, " ) ", sentence.text.replace("\n", " "))
        text_space_no_db = text_space.replace("  ", " ")
        text_list.append(text_space_no_db)

    return " ".join(text_list)


@st_cache_data_if(supported_cache, max_entries=10, ttl=3600)
def add_space_to_stroph(texte, _nlp):
    # Same pattern, for the sentence's final apostrophe.
    text_list = []
    regex = r"(?<!\d)(')(?!\d)(?!.*\1)"
    for sentence in _nlp.process(texte).sentences:
        text_space = re.sub(regex, " ' ", sentence.text.replace("\n", " "))
        text_space_no_db = text_space.replace("  ", " ")
        text_list.append(text_space_no_db)

    return " ".join(text_list)


@st_cache_data_if(supported_cache, max_entries=10, ttl=3600)
def add_space_to_comma_endpoint(texte, _nlp):
    # Chain the padding helpers: commas, then periods, then both parentheses.
    text_fr_comma = add_space_to_comma(texte, _nlp)
    text_fr_comma_endpoint = add_space_to_endpoint(text_fr_comma, _nlp)
    text_fr_comma_endpoint_leftpc = add_space_to_leftp(text_fr_comma_endpoint, _nlp)
    text_fr_comma_endpoint_leftpc_right_pc = add_space_to_rightp(
        text_fr_comma_endpoint_leftpc, _nlp
    )
    del text_fr_comma
    del text_fr_comma_endpoint
    del text_fr_comma_endpoint_leftpc
    return text_fr_comma_endpoint_leftpc_right_pc


@st_cache_resource_if(supported_cache, max_entries=5, ttl=3600)
def get_abbreviation_dict_correction():
    # Load the French abbreviation-to-expansion table shipped with the app.
    with open("data/fr_abbreviations.json", "r") as infile:
        hpo_abbreviations = json.load(infile)
    return hpo_abbreviations


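# The JSON file above is expected to map abbreviations to their expansions.
# A hypothetical excerpt (the real entries live in the app's data/ folder):
#
#     {
#         "ATCD": "antécédents",
#         "HTA": "hypertension artérielle"
#     }

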
@st_cache_data_if(supported_cache, max_entries=10, ttl=3600)
def reformat_to_report(text, _nlp):
    # Undo the padding added by the add_space_to_* helpers and lay the text
    # out one sentence per line.
    cutsentence = []
    for sentence in _nlp.process(text).sentences:
        cutsentence.append(
            sentence.text.replace(" ,", ",")
            .replace(" .", ".")
            .replace(" )", ")")
            .replace(" (", "(")
            .replace(" '", "'")
        )
    return " \n".join(cutsentence)


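# The helpers above are two halves of a round trip: punctuation is padded so
# the NLP pipeline tokenizes cleanly, then reformat_to_report() strips the
# padding again. A sketch, assuming `nlp` is the stanza-style pipeline used
# throughout (its .process(...).sentences interface):
#
#     spaced = add_space_to_comma_endpoint("Seen today, doing well.", nlp)
#     # e.g. "Seen today , doing well ."
#     report = reformat_to_report(spaced, nlp)
#     # back to "Seen today, doing well.", one sentence per line

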
@st_cache_resource_if(supported_cache, max_entries=5, ttl=3600)
def get_cities_list():
    # Build the FRENCH_CITY deny list: every city in its raw spelling plus
    # capitalized, upper-case, and accent-stripped (unidecode) variants.
    cities = pd.read_csv("data/proper_noun_location_sort.csv")
    cities.columns = ["ville"]
    whole_cities_patterns = []
    list_cities = cities["ville"].to_list()
    for element in list_cities:
        whole_cities_patterns.append(element)
        whole_cities_patterns.append(element.lower().capitalize())
        whole_cities_patterns.append(element.upper())
        whole_cities_patterns.append(unidecode(element))
        whole_cities_patterns.append(unidecode(element).lower().capitalize())
        whole_cities_patterns.append(unidecode(element).upper())
    del cities
    del list_cities
    return whole_cities_patterns


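# For each city the deny list gets the raw spelling plus case and accent
# variants (duplicates included), e.g.:
#
#     "Besançon" -> ["Besançon", "Besançon", "BESANÇON",
#                    "Besancon", "Besancon", "BESANCON"]

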
@st_cache_resource_if(supported_cache, max_entries=5, ttl=3600)
def get_list_not_deidentify():
    proper_noun_data = pd.read_csv(
        "data/exception_list_anonymization.tsv", sep="\t", header=None
    ).astype(str)

    drug_data = pd.read_csv("data/drug_name.tsv", sep="\t", header=None).astype(str)

    gene_data = pd.read_csv("data/gene_name.tsv", sep="\t", header=None).astype(str)

    proper_noun_list = (
        proper_noun_data[0].to_list()
        + drug_data[0].to_list()
        + gene_data[0].to_list()
        + [
            "PN",
            "TN",
            "SD",
            "PCN",
            "cher",
            "chere",
            "CAS",
            "INDEX",
            "APGAR",
            "M",
            "Ms",
            "Mr",
            "Behçet",
            "hypoacousia",
        ]
    )
    proper_noun = [x.lower() for x in proper_noun_list]

    del proper_noun_data
    del drug_data
    del gene_data
    del proper_noun_list
    return proper_noun


@st_cache_resource_if(supported_cache, max_entries=5, ttl=3600)
def change_name_patient_abbreviations(Report, Last_name, First_name, abbreviations_dict):
    Report_name = Report

    dict_correction_name_abbreviations = {
        "M.": "M",
        "Mme.": "Mme",
        "Mlle.": "Mlle",
        "Dr.": "Docteur",
        "Dr": "Docteur",
        "Pr.": "Professeur",
        "Pr": "Professeur",
    }

    # Map every token of the patient's first and last names to the neutral
    # placeholders CAS and INDEX, then fold in the abbreviation table.
    for firstname in First_name.split():
        dict_correction_name_abbreviations[firstname] = "CAS"
    for lastname in Last_name.split():
        dict_correction_name_abbreviations[lastname] = "INDEX"
    for key, value in abbreviations_dict.items():
        dict_correction_name_abbreviations[key] = value

    list_replaced = []
    splitted_Report = Report_name.replace("\n", " ").split(" ")
    replaced_Report = []
    for i in splitted_Report:
        append_word = i
        replace_word = None
        for key, value in dict_correction_name_abbreviations.items():
            i_check = i.lower().strip().replace(",", "").replace(".", "")
            if i_check == key.lower().strip():
                to_replace = i.strip().replace(",", "").replace(".", "")
                replace_word = value
                if i_check == Last_name.lower() or i_check == First_name.lower():
                    list_replaced.append(
                        {
                            "name": Last_name,
                            "surname": First_name,
                            "type": "index_case",
                            "value": to_replace,
                            "correction": value,
                            "lf_detected": True,
                            "manual_validation": True,
                        }
                    )
                else:
                    list_replaced.append(
                        {
                            "name": Last_name,
                            "surname": First_name,
                            "type": "abbreviations",
                            "value": to_replace,
                            "correction": value,
                            "lf_detected": True,
                            "manual_validation": True,
                        }
                    )
        if replace_word:
            append_word = append_word.replace(to_replace, replace_word)
        replaced_Report.append(append_word)
    del dict_correction_name_abbreviations
    del splitted_Report
    return " ".join(replaced_Report), list_replaced


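# Illustrative call (names fabricated; the abbreviation dict could come from
# get_abbreviation_dict_correction()):
#
#     text, trace = change_name_patient_abbreviations(
#         "Dr Martin a vu Jean Dupont.", "Dupont", "Jean", {}
#     )
#     # text  -> "Docteur Martin a vu CAS INDEX."
#     # trace -> one audit entry per substitution ("index_case" for the
#     #          patient's names, "abbreviations" for everything else)

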
@st_cache_resource_if(supported_cache, max_entries=5, ttl=3600)
def config_deidentify(cities_list):
    # Spin up the Presidio stack: a spacy NLP engine (en_core_web_lg must be
    # installed), a custom FRENCH_CITY deny-list recognizer, and the
    # anonymizer engine.
    configuration = {
        "nlp_engine_name": "spacy",
        "models": [{"lang_code": "en", "model_name": "en_core_web_lg"}],
    }

    provider = NlpEngineProvider(nlp_configuration=configuration)
    nlp_engine = provider.create_engine()
    frcity_recognizer = PatternRecognizer(
        supported_entity="FRENCH_CITY", deny_list=cities_list
    )

    analyzer = AnalyzerEngine(nlp_engine=nlp_engine, supported_languages=["en"])
    analyzer.registry.add_recognizer(frcity_recognizer)
    engine = AnonymizerEngine()
    del configuration
    del provider
    del nlp_engine
    del frcity_recognizer
    return analyzer, engine
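

# A hedged end-to-end smoke test, assuming the data/ files exist, the spacy
# model en_core_web_lg is installed, and stanza (after stanza.download("en"))
# provides the tokenizer whose .process(...).sentences interface the helpers
# above rely on. Because of the relative import at the top, run it as a
# module (python -m <package>.<module>), not as a plain script.
if __name__ == "__main__":
    import stanza

    nlp = stanza.Pipeline(lang="en", processors="tokenize")
    letter = "Mr Dupont was seen in Paris on 12/01/2021."  # fabricated sample

    analyzer, engine = config_deidentify(get_cities_list())
    annotated, to_anonymize, flagged, skipped = anonymize_analyzer(
        letter, analyzer, get_list_not_deidentify(), "Dupont", "Jean"
    )
    print(annotated)  # review view with :red/:green markup
    print(anonymize_engine(letter, to_anonymize, engine, nlp))  # cleaned report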