| | from spacy.cli import download |
| | import spacy |
| | from tools.presidio_analyzer_custom import analyze_dict |
| | |
| | from typing import List |
| | from unstructured.documents.elements import Element |
| |
|
| | spacy.prefer_gpu() |
| |
|
def spacy_model_installed(model_name):
    """Ensure the requested spaCy model is available, downloading it if needed.

    Args:
        model_name: Name of the spaCy pipeline package (e.g. "en_core_web_lg").

    Returns:
        The loaded spaCy Language pipeline.
    """
    try:
        # Fix: load the model named by the parameter instead of a hardcoded
        # `import en_core_web_lg`, and catch only the "model missing" error
        # rather than a bare except.
        nlp = spacy.load(model_name)
        print("Successfully imported spaCy model")
    except OSError:
        # spacy.load raises OSError when the package is not installed:
        # download it, then load.
        download(model_name)
        nlp = spacy.load(model_name)
        print("Successfully imported spaCy model")
    return nlp
| |
|
| |
|
| | |
| | |
# Ensure the large English spaCy pipeline is installed before any
# Presidio analysis runs below (Presidio's default NLP engine uses spaCy).
model_name = "en_core_web_lg"
spacy_model_installed(model_name)
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | import re |
| | import secrets |
| | import base64 |
| | import time |
| |
|
| | import pandas as pd |
| |
|
| | from faker import Faker |
| |
|
| | from presidio_analyzer import AnalyzerEngine, BatchAnalyzerEngine, PatternRecognizer |
| | from presidio_anonymizer import AnonymizerEngine, BatchAnonymizerEngine |
| | from presidio_anonymizer.entities import OperatorConfig |
| |
|
| |
|
| |
|
def anon_consistent_names(df):
    """Replace detected PERSON names in *df* with consistent fake first names.

    Each unique real name maps to the same fake name everywhere it appears,
    so cross-row referential consistency is preserved.

    Args:
        df: pandas DataFrame whose text columns may contain personal names.

    Returns:
        A copy of *df* with each detected person name replaced by its
        assigned fake name.
    """
    df_dict = df.to_dict(orient="list")

    analyzer = AnalyzerEngine()
    batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)

    analyzer_results = batch_analyzer.analyze_dict(df_dict, language="en")
    analyzer_results = list(analyzer_results)

    # NOTE(review): index 3 hard-codes which column's results are used —
    # presumably the main free-text column; confirm against the input schema.
    text = analyzer_results[3].value
    recognizer_result = str(analyzer_results[3].recognizer_results)

    # Recognizer results are parsed back out of their string repr, e.g.
    # "[[type: PERSON, start: 0, end: 5, score: 0.85], [...]]".
    data_str = recognizer_result
    list_strs = data_str[1:-1].split('], [')

    def parse_dict(s):
        # Convert one "key: value, key: value" fragment into a dict,
        # coercing numeric fields to their proper types.
        s = s.strip('[]')
        items = s.split(', ')
        d = {}
        for item in items:
            key, value = item.split(': ')
            if key == 'score':
                d[key] = float(value)
            elif key in ['start', 'end']:
                d[key] = int(value)
            else:
                d[key] = value
        return d

    result = []
    for lst_str in list_strs:
        # Each entity repr begins with "type: ..."; re-split on that marker.
        dict_strs = lst_str.split(', type: ')
        dict_strs = [dict_strs[0]] + ['type: ' + s for s in dict_strs[1:]]
        dicts = [parse_dict(d) for d in dict_strs]
        result.append(dicts)

    # Collect the PERSON spans found in each paragraph.
    names = []
    for idx, paragraph in enumerate(text):
        paragraph_texts = []
        for dictionary in result[idx]:
            if dictionary['type'] == 'PERSON':
                paragraph_texts.append(paragraph[dictionary['start']:dictionary['end']])
        names.append(paragraph_texts)

    unique_names = list(set(name for sublist in names for name in sublist))

    # Fix: build the fake-name generator locally. The previous code called a
    # module-level fake_first_name that does not exist at this scope (it is
    # only defined inside anonymise_script), raising NameError.
    fake = Faker("en_UK")

    def fake_first_name(x):
        return fake.first_name()

    fake_names = pd.Series(unique_names).apply(fake_first_name)

    mapping_df = pd.DataFrame(data={"Unique names": unique_names,
                                    "Fake names": fake_names})

    # Fix: escape each real name so regex metacharacters within a name
    # cannot corrupt the word-boundary replacement patterns.
    name_map = {r'\b' + re.escape(k) + r'\b': v
                for k, v in zip(mapping_df['Unique names'], mapping_df['Fake names'])}

    scrubbed_df_consistent_names = df.replace(name_map, regex=True)

    return scrubbed_df_consistent_names
| |
|
def detect_file_type(filename):
    """Detect the file type based on its extension.

    Args:
        filename: Path or name of the input file.

    Returns:
        One of 'csv', 'xlsx', or 'parquet'. Compressed CSVs ('.csv.gz',
        '.zip') are reported as 'csv' since pandas reads them directly.

    Raises:
        ValueError: If the extension is not one of the supported types.
    """
    # str.endswith accepts a tuple of suffixes — clearer than the previous
    # bitwise `|` chaining of boolean expressions.
    if filename.endswith(('.csv', '.csv.gz', '.zip')):
        return 'csv'
    elif filename.endswith('.xlsx'):
        return 'xlsx'
    elif filename.endswith('.parquet'):
        return 'parquet'
    else:
        raise ValueError("Unsupported file type.")
| |
|
def read_file(filename):
    """Read *filename* into a pandas DataFrame based on its detected type."""
    # Dispatch table keyed on the detected type; detect_file_type raises
    # ValueError for unsupported extensions, so the lookup is always safe.
    readers = {
        'csv': lambda path: pd.read_csv(path, low_memory=False),
        'xlsx': pd.read_excel,
        'parquet': pd.read_parquet,
    }
    return readers[detect_file_type(filename)](filename)
| |
|
def anonymise_script(text_list:List[Element], anon_strat:str, nlp_analyser=None):
    """Anonymise personal data detected in a list of text elements.

    Args:
        text_list: Text elements to scrub.
        anon_strat: One of "replace", "redact", "hash", "mask", "encrypt",
            or "fake_first_name".
        nlp_analyser: Optional pre-configured Presidio AnalyzerEngine; a
            default engine is created when omitted.

    Returns:
        Tuple of (pandas Series of anonymised text, status message). For
        "encrypt", the message includes the generated decryption key.

    Raises:
        ValueError: If *anon_strat* is not a recognised strategy.
    """
    df_dict = pd.DataFrame(data={"text":text_list}).to_dict(orient="list")

    analyzer = nlp_analyser if nlp_analyser else AnalyzerEngine()

    # Also flag honorific titles, which the default recognisers miss.
    titles_recognizer = PatternRecognizer(supported_entity="TITLE",
                                          deny_list=["Mr","Mrs","Miss", "Ms", "mr", "mrs", "miss", "ms"])
    analyzer.registry.add_recognizer(titles_recognizer)

    batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)
    anonymizer = AnonymizerEngine()
    batch_anonymizer = BatchAnonymizerEngine(anonymizer_engine=anonymizer)

    print("Identifying personal data")
    analyse_tic = time.perf_counter()
    analyzer_results = analyze_dict(batch_analyzer, df_dict, language="en")
    analyzer_results = list(analyzer_results)
    analyse_toc = time.perf_counter()
    analyse_time_out = f"Analysing the text took {analyse_toc - analyse_tic:0.1f} seconds."
    print(analyse_time_out)

    # Fresh symmetric key for the "encrypt" strategy (secrets, not random).
    key = secrets.token_bytes(16)
    key_string = base64.b64encode(key).decode('utf-8')

    # NOTE(review): "en_UK" is a non-standard Faker locale ("en_GB" is the
    # documented British English locale) — confirm against the pinned
    # faker version before changing.
    fake = Faker("en_UK")

    def fake_first_name(x):
        return fake.first_name()

    # Fix: operator configs are now plain dict literals — the previous
    # eval() round-trip on hardcoded strings added nothing and is an
    # eval-on-string anti-pattern.
    strategy_configs = {
        "replace": {"DEFAULT": OperatorConfig("replace")},
        "redact": {"DEFAULT": OperatorConfig("redact")},
        "hash": {"DEFAULT": OperatorConfig("hash")},
        "mask": {"DEFAULT": OperatorConfig("mask", {"masking_char": "*", "chars_to_mask": 100, "from_end": True})},
        "encrypt": {"PERSON": OperatorConfig("encrypt", {"key": key_string})},
        "fake_first_name": {"PERSON": OperatorConfig("custom", {"lambda": fake_first_name})},
    }

    try:
        # Fix: an unrecognised strategy previously left chosen_mask_config
        # unbound and crashed later with NameError; fail fast and clearly.
        chosen_mask_config = strategy_configs[anon_strat]
    except KeyError:
        raise ValueError(f"Unknown anonymisation strategy: {anon_strat}")

    combined_config = {**chosen_mask_config}

    print("Anonymising personal data")
    anonymizer_results = batch_anonymizer.anonymize_dict(analyzer_results, operators=combined_config)

    scrubbed_df = pd.DataFrame(data={"text":anonymizer_results["text"]})
    scrubbed_series = scrubbed_df["text"]

    out_message = "Successfully anonymised"
    if anon_strat == "encrypt":
        out_message = out_message + ". Your decryption key is " + key_string + "."

    return scrubbed_series, out_message
| |
|
def do_anonymise(in_file:str, anon_strat:str, chosen_cols:List[str]):
    """Load one or more CSV files, anonymise the chosen columns, and export.

    Args:
        in_file: Iterable of uploaded file objects (each exposing a .name path).
        anon_strat: Anonymisation strategy passed through to anonymise_script.
        chosen_cols: Column names to anonymise; other columns pass through
            unchanged.

    Returns:
        Tuple of (status message, path of the exported anonymised CSV).
    """
    anon_df = pd.DataFrame()

    # Fix: with no input the original fell through to unbound names
    # (match_file / out_message) and crashed with NameError; return early.
    if not in_file:
        return "No input files provided.", ""

    for match_file in in_file:
        match_temp_file = pd.read_csv(match_file.name, delimiter=",", low_memory=False)
        anon_df = pd.concat([anon_df, match_temp_file])

    # Split into the columns to scrub and those to keep untouched.
    all_cols_original_order = list(anon_df.columns)
    anon_df_part = anon_df[chosen_cols]
    anon_df_remain = anon_df.drop(chosen_cols, axis=1)

    anon_df_part_out, out_message = anonymise_script(anon_df_part, anon_strat)

    # Reassemble in the original column order.
    anon_df_out = pd.concat([anon_df_part_out, anon_df_remain], axis=1)
    anon_df_out = anon_df_out[all_cols_original_order]

    # NOTE(review): the export name is derived from the *last* file in the
    # loop even though all inputs were concatenated — confirm this is
    # intended when multiple files are uploaded.
    out_file_part = re.sub(r'\.csv', '', match_file.name)
    anon_export_file_name = out_file_part + "_anon_" + anon_strat + ".csv"
    anon_df_out.to_csv(anon_export_file_name, index=None)

    return out_message, anon_export_file_name
| |
|