import streamlit as st import torch import os import re import json from transformers import AutoTokenizer, AutoModelForTokenClassification from sentence_transformers import SentenceTransformer, util from collections import defaultdict import os MODEL_PATH = "liaad/Citilink-XLMR-Anonymization-pt" MODEL_REL_PATH = "liaad/Citilink-mpnet-Entity-Linker-pt" st.set_page_config( page_title="️ Text Anonymization Demo", page_icon="🛡️", layout="wide", initial_sidebar_state="expanded" ) st.markdown(""" """, unsafe_allow_html=True) @st.cache_resource def load_models(): try: tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, add_prefix_space=True) model_ner = AutoModelForTokenClassification.from_pretrained(MODEL_PATH) rel_model = SentenceTransformer(MODEL_REL_PATH) return tokenizer, model_ner, rel_model, None except Exception as e: return None, None, None, str(e) def process_anonymization(text, threshold, tokenizer, model_ner, rel_model): if not text.strip(): return "Por favor, insira um texto.", {} id2label = model_ner.config.id2label inputs = tokenizer( text, truncation=True, max_length=512, stride=164, return_overflowing_tokens=True, return_offsets_mapping=True, padding=True, return_tensors="pt" ) all_predictions = [] offset_mapping_all = inputs.pop("offset_mapping") overflow_to_sample = inputs.pop("overflow_to_sample_mapping") input_ids_all = inputs["input_ids"] with torch.no_grad(): logits = model_ner(input_ids=input_ids_all).logits all_predictions = torch.argmax(logits, dim=2).tolist() entidades_brutas = [] id2label = model_ner.config.id2label SPACE_PREFIXES = [" ", "▁", "Ġ"] for window_idx, (predictions, offsets) in enumerate(zip(all_predictions, offset_mapping_all)): temp_entity = {"tokens": [], "start": None, "end": None, "label": None} for idx, (pred_id, offset) in enumerate(zip(predictions, offsets)): label_name = id2label[pred_id] start_char, end_char = int(offset[0]), int(offset[1]) if start_char == end_char: continue tag_base = label_name.split('-', 1)[1] if '-' in label_name else None token_id = input_ids_all[window_idx][idx].item() token = tokenizer.convert_ids_to_tokens(token_id) comeca_nova_palavra = any(token.startswith(p) for p in SPACE_PREFIXES) if label_name.startswith("B-"): if temp_entity["label"]: chars_to_remove = ".,;:!?" if temp_entity["label"] == "PERSONAL-PositionDepartment": chars_to_remove = ",;:!?" while (temp_entity["end"] - temp_entity["start"]) > 0 and text[ temp_entity["end"] - 1] in chars_to_remove: temp_entity["end"] -= 1 entidades_brutas.append(temp_entity) temp_entity = {"start": start_char, "end": end_char, "label": tag_base, "tokens": [token]} elif label_name.startswith("I-") and temp_entity["label"] == tag_base: temp_entity["tokens"].append(token) temp_entity["end"] = end_char elif not comeca_nova_palavra and temp_entity["label"] is not None: temp_entity["tokens"].append(token) temp_entity["end"] = end_char else: if temp_entity["label"]: chars_to_remove = ".,;:!?" if temp_entity["label"] == "PERSONAL-PositionDepartment": chars_to_remove = ",;:!?" # Removemos o ponto (.) da lista de limpeza while (temp_entity["end"] - temp_entity["start"]) > 0 and text[ temp_entity["end"] - 1] in chars_to_remove: temp_entity["end"] -= 1 entidades_brutas.append(temp_entity) temp_entity = {"tokens": [], "start": None, "end": None, "label": None} entidades_brutas.sort(key=lambda x: x["start"]) entidades_finais = [] for atual in entidades_brutas: if not entidades_finais: entidades_finais.append(atual) continue ultima = entidades_finais[-1] distancia = atual["start"] - ultima["end"] if distancia <= 1 and atual["label"] == ultima["label"]: ultima["end"] = atual["end"] ultima["tokens"].extend(atual["tokens"]) else: adicionar = True for i, selecionada in enumerate(entidades_finais): interseccao = max(0, min(atual["end"], selecionada["end"]) - max(atual["start"], selecionada["start"])) if interseccao > 0: # Se houver sobreposição, mantém a maior if (atual["end"] - atual["start"]) > (selecionada["end"] - selecionada["start"]): entidades_finais[i] = atual adicionar = False break if adicionar: entidades_finais.append(atual) labels_modelo = ["Name", "Address", "Company", "Vehicle", "PositionDepartment"] labels_com_id = ["Name", "AdministrativeInformation", "PositionDepartment", "Address", "PersonalDocument", "Company", "LicensePlate", "Vehicle"] known_entities = {} known_embeddings = {} id_counters = defaultdict(int) entidades_finais.sort(key=lambda x: x["start"]) for ent in entidades_finais: tag_limpa = ent["label"].replace("PERSONAL-", "") texto_original = text[ent["start"]:ent["end"]].strip() texto_key = texto_original.lower() assigned_id = None if tag_limpa in labels_com_id: if (tag_limpa, texto_key) in known_entities: assigned_id = known_entities[(tag_limpa, texto_key)] elif tag_limpa in labels_modelo: emb_atual = rel_model.encode(texto_key, convert_to_tensor=True) best_prob, best_match_id = 0.0, None candidatos = [(tk, tid) for (lbl, tk), tid in known_entities.items() if lbl == tag_limpa] for prev_text_key, prev_id in candidatos: emb_prev = known_embeddings.get(prev_text_key) if emb_prev is None: emb_prev = rel_model.encode(prev_text_key, convert_to_tensor=True) known_embeddings[prev_text_key] = emb_prev score = util.cos_sim(emb_atual, emb_prev).item() if score > best_prob: best_prob, best_match_id = score, prev_id if best_prob > threshold: assigned_id = best_match_id known_embeddings[texto_key] = emb_atual if assigned_id is None: id_counters[tag_limpa] += 1 assigned_id = id_counters[tag_limpa] known_entities[(tag_limpa, texto_key)] = assigned_id ent["entity_id"] = assigned_id # Output Construction entidades_para_substituir = sorted(entidades_finais, key=lambda x: x["start"], reverse=True) texto_anon = text relatorio_json = [] for ent in entidades_para_substituir: tag_limpa = ent["label"].replace("PERSONAL-", "") id_part = f"-{ent['entity_id']}" if ent.get('entity_id') else "" texto_original = text[ent["start"]:ent["end"]].strip() # 1. FORMATO DO ITEM JSON QUE PEDISTE relatorio_json.append({ "category": ent["label"], "text": texto_original, "start": ent["start"], "end": ent["end"], "id": ent.get("entity_id") }) placeholder = f' <{tag_limpa}{id_part}> ' texto_anon = texto_anon[:ent["start"]] + placeholder + texto_anon[ent["end"]:] relatorio_json.reverse() return re.sub(r' +', ' ', texto_anon).strip(), relatorio_json @st.cache_data def load_example_texts(): json_path = os.path.join(os.path.dirname(__file__), 'example_text.json') try: with open(json_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception: return {"Custom Text": "", "1º Portuguese Meeting Minute": "", "2º Portuguese Meeting Minute": "", "3º Portuguese Meeting Minute": ""} def main(): st.markdown('
🛡️ PID: Text Anonymization Demo
', unsafe_allow_html=True) st.markdown("""Automatic text anonymization for city council minutes and administrative documents
""", unsafe_allow_html=True) tokenizer, model_ner, rel_model, error = load_models() if error: st.error(f"Erro ao carregar modelos: {error}") st.stop() st.sidebar.header("⚙️ Configuration") st.sidebar.write("---") example_texts = load_example_texts() selected_example = st.sidebar.selectbox( "Choose an example:", options=list(example_texts.keys()), index=0 ) st.sidebar.markdown("Supported Entities:
Note: Entities marked with (ID) support consistent linking across the document.