File size: 2,467 Bytes
ea9ca44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import re
from transformers import pipeline
from src.preprocess.cleaner import postprocess_extracted_text

# Build the Hugging Face token-classification (NER) pipeline once, at import
# time, so that every call in this module reuses the same pipeline object
# instead of constructing a new one per request.
# NOTE(review): with aggregation_strategy="simple" the pipeline is expected to
# return grouped entities keyed by "entity_group" (see the transformers docs);
# extract_name_and_mask reads that key first and falls back to "entity".
ner_pipeline = pipeline(
    "token-classification",
    model="dslim/distilbert-NER",
    aggregation_strategy="simple"
)

def extract_name_and_mask(text: str) -> dict:
    """
    Mask every person-name (PER) entity found in *text*.

    Runs the module-level NER pipeline over the text, merges PER spans that
    overlap or are separated by at most one character (e.g. the space between
    a first and last name), and replaces each merged span with the
    placeholder "[PER]". The masked text is then passed through
    postprocess_extracted_text before being returned.

    Names are NOT extracted anymore: "candidate_name" is always None and the
    key is kept only so existing callers that read it keep working.

    Returns:
    {
        "masked_text": str,
        "candidate_name": None
    }
    """
    # Blank input cannot contain a name; skip the model call entirely.
    entities = ner_pipeline(text) if text.strip() else []

    # Keep only the character offsets of PER entities, ordered by position.
    # "entity_group" is the aggregated label; "entity" is the fallback key.
    per_spans = sorted(
        (ent["start"], ent["end"])
        for ent in entities
        if (ent.get("entity_group") or ent.get("entity")) == "PER"
    )

    # Merge overlapping spans and spans separated by at most one character,
    # so a multi-token name collapses into a single placeholder.
    merged = []
    for start, end in per_spans:
        if merged and start <= merged[-1][1] + 1:
            merged[-1][1] = max(end, merged[-1][1])
        else:
            merged.append([start, end])

    # Assemble the masked text in one left-to-right pass: copy the text
    # between spans verbatim and emit a placeholder for each span.
    pieces = []
    cursor = 0
    for start, end in merged:
        pieces.append(text[cursor:start])
        pieces.append("[PER]")
        cursor = end
    pieces.append(text[cursor:])
    masked_text = "".join(pieces)

    return {
        "masked_text": postprocess_extracted_text(masked_text),
        "candidate_name": None,  # name extraction is intentionally disabled
    }

def remove_pii(text: str) -> str:
    """
    Deprecated wrapper kept for backward compatibility.

    Delegates to extract_name_and_mask and returns only its "masked_text"
    entry. New code should call extract_name_and_mask directly.
    """
    result = extract_name_and_mask(text)
    masked = result["masked_text"]
    return masked