""" AbstractionLayer: Non-differentiable entity abstraction pipeline. Replaces sensitive entities with typed abstract tokens and produces structural graphs. This runs OUTSIDE the model — raw text never enters the transformer. """ import re import hashlib from dataclasses import dataclass from typing import Dict, List, Optional, Tuple, Any from collections import defaultdict @dataclass class AbstractDocument: """Output of the abstraction layer.""" abstract_text: str amr_graph: Dict[str, Any] vault_id: str schema_version: str = "1.0" def _hash_vault(mapping: Dict[str, Any]) -> str: """Generate deterministic vault ID from entity mapping.""" serialized = str(sorted(mapping.items())) return hashlib.sha256(serialized.encode()).hexdigest()[:24] def _regex_based_ner(text: str) -> List[Dict[str, Any]]: """Fallback regex-based entity detection (no external NER model needed).""" entities = [] email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}' for match in re.finditer(email_pattern, text): entities.append({"entity_group": "EMAIL", "word": match.group(), "start": match.start(), "end": match.end()}) phone_pattern = r'(?:\+1[-.\s]?)?(?:\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4})' for match in re.finditer(phone_pattern, text): entities.append({"entity_group": "PHONE", "word": match.group(), "start": match.start(), "end": match.end()}) ssn_pattern = r'\b\d{3}[-.\s]?\d{2}[-.\s]?\d{4}\b' for match in re.finditer(ssn_pattern, text): entities.append({"entity_group": "SSN", "word": match.group(), "start": match.start(), "end": match.end()}) money_pattern = r'(?:\$|USD|GBP|EUR|JPY|CNY)\s*(?:\d{1,3}(?:,\d{3})*|\d+)(?:\.\d{2})?' for match in re.finditer(money_pattern, text): entities.append({"entity_group": "MONEY", "word": match.group(), "start": match.start(), "end": match.end()}) date_pattern = (r'\b(?:January|February|March|April|May|June|July|August|' r'September|October|November|December|Jan|Feb|Mar|Apr|Jun|' r'Jul|Aug|Sep|Oct|Nov|Dec)\.?\s+\d{1,2},?\s+(?:\d{4}|\d{2})\b') for match in re.finditer(date_pattern, text): entities.append({"entity_group": "DATE", "word": match.group(), "start": match.start(), "end": match.end()}) url_pattern = r'https?://[^\s]+|www\.[^\s]+' for match in re.finditer(url_pattern, text): entities.append({"entity_group": "ID", "word": match.group(), "start": match.start(), "end": match.end()}) return entities def _parse_amr_fallback(abstract_text: str) -> Dict[str, Any]: """Simplified AMR parse — dependency-like graph from token relationships.""" tokens = abstract_text.split() nodes = [] edges = [] for i, tok in enumerate(tokens): nodes.append({"id": i, "concept": tok, "is_entity": tok.startswith("<") and tok.endswith(">"), "type": _extract_entity_type(tok) if tok.startswith("<") else "WORD"}) if i > 0: edges.append({"source": i - 1, "target": i, "relation": ":next"}) return {"nodes": nodes, "edges": edges, "root": 0} def _extract_entity_type(token: str) -> str: if token.startswith(" str: entities = sorted(entities, key=lambda e: e["start"]) result = "" last_end = 0 type_counter = defaultdict(int) for ent in entities: result += text[last_end:ent["start"]] ent_type = ent["entity_group"] type_counter[ent_type] += 1 idx = type_counter[ent_type] token = f"<{ent_type}_{idx}>" result += token last_end = ent["end"] result += text[last_end:] result = re.sub(r'\s+', ' ', result).strip() return result class AbstractionLayer: """Privacy abstraction pipeline. Converts raw text → abstract document with entity vault.""" ENTITY_SCHEMA = { "PERSON": "", "ORG": "", "LOC": "", "GPE": "", "MONEY": "", "DATE": "", "PHONE": "", "EMAIL": "", "SSN": "", "ID": "", "PRODUCT": "", "EVENT": "", "MISC": "", } def __init__(self, use_ner_model: bool = False, ner_model_name: Optional[str] = None): self.use_ner_model = use_ner_model if use_ner_model: try: from transformers import pipeline model_name = ner_model_name or "dslim/bert-base-NER" self.ner_pipeline = pipeline("ner", model=model_name, aggregation_strategy="simple") except ImportError: self.use_ner_model = False def abstract(self, text: str) -> AbstractDocument: if self.use_ner_model: raw = self.ner_pipeline(text) entities = [{"entity_group": e["entity_group"], "word": e["word"], "start": e["start"], "end": e["end"]} for e in raw] else: entities = _regex_based_ner(text) abstract_text = _replace_spans(text, entities) type_counter = defaultdict(int) vault = {} for ent in entities: ent_type = ent["entity_group"] type_counter[ent_type] += 1 token = f"<{ent_type}_{type_counter[ent_type]}>" vault[token] = ent vault_id = _hash_vault(vault) self._store_vault(vault_id, vault) amr_graph = _parse_amr_fallback(abstract_text) return AbstractDocument( abstract_text=abstract_text, amr_graph=amr_graph, vault_id=vault_id, schema_version="1.0" ) def _store_vault(self, vault_id: str, vault: Dict[str, Any]): if not hasattr(self, "_vault_store"): self._vault_store = {} self._vault_store[vault_id] = vault def retrieve_vault(self, vault_id: str) -> Dict[str, Any]: if not hasattr(self, "_vault_store"): return {} return self._vault_store.get(vault_id, {}) def is_secure(self, text: str) -> bool: if self.use_ner_model: entities = self.ner_pipeline(text) return len(entities) == 0 else: entities = _regex_based_ner(text) return len(entities) == 0