Spaces:
Running
Running
| import re | |
| import logging | |
| from typing import Dict | |
| logger = logging.getLogger(__name__) | |
| class SensitiveDataGuard: | |
| """ | |
| Rygorystyczny mechanizm hybrydowy (RegEx + opcjonalnie LLM) do ukrywania PII | |
| oraz tajemnicy przedsiębiorstwa (know-how, telemetria, finanse). | |
| Warstwy: | |
| 1. RegEx — NIP, PESEL, KRS, IBAN, Email, Telefon, Imię+Nazwisko (heurystyka) | |
| 2. LLM (Bielik/Gemini) — semantyczne maskowanie know-how (opcjonalnie) | |
| Zgodność: RODO Art. 4, AI Act Art. 10 (data governance for high-risk AI). | |
| """ | |
| def __init__(self): | |
| self.mapping: Dict[str, str] = {} | |
| self.counter = 1 | |
| # ── Wzorce PII ────────────────────────────────────────────────────── | |
| self.patterns = { | |
| # Identyfikatory biznesowe | |
| "NIP": r"\b\d{3}[- ]?\d{3}[- ]?\d{2}[- ]?\d{2}\b", | |
| "PESEL": r"\b\d{11}\b", | |
| "KRS": r"\b(?:KRS[:\s]*)?\d{10}\b", | |
| "REGON": r"\b\d{9}(?:\d{5})?\b", | |
| # Dane kontaktowe | |
| "EMAIL": r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b", | |
| "TELEFON": r"(?:\+48\s?)?(?:\d{2,3}[- ]?){3,4}\d{2,3}", | |
| # Bankowe | |
| "IBAN": r"\bPL\d{2}[ ]?\d{4}[ ]?\d{4}[ ]?\d{4}[ ]?\d{4}[ ]?\d{4}[ ]?\d{4}\b", | |
| # Patenty / Finanse | |
| "PATENT": r"\b(?:Pat\.|Zgłoszenie P\.)\s*\d+\b", | |
| "FINANSE": r"\b\d{1,3}(?:[ .,]\d{3})*(?:,\d{2})?\s*(?:PLN|EUR|USD|zł)\b", | |
| # Imię + Nazwisko (heurystyka: 2 słowa z wielkich liter, pl-locale) | |
| "OSOBA": r"\b[A-ZŁŚŻŹĆĄÓĘŃ][a-złśżźćąóęń]{2,}\s+[A-ZŁŚŻŹĆĄÓĘŃ][a-złśżźćąóęń]{2,}\b", | |
| # Adresy | |
| "ADRES": r"\bul\.\s+[A-ZŁŚŻŹĆĄÓĘŃ][^\n,]{3,40},\s*\d{2}-\d{3}\s+[A-ZŁŚŻŹĆĄÓĘŃ][^\n,]{3,30}\b", | |
| } | |
| def anonymize_text(self, text: str) -> str: | |
| """ | |
| Zastępuje wrażliwe fragmenty na tokeny. | |
| Warstwa 1 (RegEx) + Warstwa 2 (LLM Bielik). | |
| """ | |
| if not text: | |
| return text | |
| anonymized = text | |
| for pii_type, pattern in self.patterns.items(): | |
| matches = set(re.findall(pattern, anonymized, flags=re.IGNORECASE)) | |
| for match in matches: | |
| if match not in self.mapping: | |
| token = f"<{pii_type}_{self.counter}>" | |
| self.mapping[match] = token | |
| self.counter += 1 | |
| anonymized = anonymized.replace(match, self.mapping[match]) | |
| # Warstwa 2: LLM (Bielik) do semantycznego maskowania Know-How | |
| try: | |
| import sys | |
| import os | |
| sys.path.append(os.path.join(os.path.dirname(__file__), "..", "..")) | |
| from backend.core.llm_router import get_llm | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| llm = get_llm(task_type="pii_anonymization") | |
| prompt = ( | |
| "Jesteś surowym strażnikiem RODO i tajemnic przedsiębiorstwa. " | |
| "Twoim zadaniem jest zamiana WŁAŚCIWYCH NAZW unikalnych technologii, algorytmów " | |
| "i autorskich rozwiązań know-how na tag <KNOW_HOW_X>.\n" | |
| "MUSISZ zachować całą resztę tekstu IDEALNIE nienaruszoną (co do znaku). " | |
| "Nie modyfikuj zdań, nie tłumacz, tylko podmień wybrane słowa." | |
| ) | |
| resp = llm.invoke( | |
| [SystemMessage(content=prompt), HumanMessage(content=anonymized)] | |
| ) | |
| if resp and resp.content and "<KNOW_HOW_" in resp.content.upper(): | |
| anonymized = resp.content | |
| except Exception as e: | |
| logger.warning( | |
| "Ollama z modelem Bielik jest niedostępna. De-identyfikacja odbywa się tylko na warstwie RegEx. Analizy prawne mogą być mniej precyzyjne." | |
| ) | |
| logger.debug(f"Szczegóły błędu: {e}") | |
| return anonymized | |
| def deanonymize_text(self, text: str) -> str: | |
| """Przywraca tokeny do pierwotnej postaci w odpowiedzi""" | |
| if not text: | |
| return text | |
| deanonymized = text | |
| reverse_mapping = {v: k for k, v in self.mapping.items()} | |
| for token, original_value in reverse_mapping.items(): | |
| deanonymized = deanonymized.replace(token, original_value) | |
| return deanonymized | |
| def reset_for_session(self): | |
| self.mapping = {} | |
| self.counter = 1 | |
| anonymizer = SensitiveDataGuard() | |