Spaces:
Sleeping
Sleeping
File size: 2,467 Bytes
# ea9ca44
import re
from transformers import pipeline
from src.preprocess.cleaner import postprocess_extracted_text
# Build the Hugging Face NER pipeline a single time, when this module is
# imported, so every call below reuses the same loaded model.
ner_pipeline = pipeline(
    task="token-classification",
    model="dslim/distilbert-NER",
    aggregation_strategy="simple",
)
_PERSON_LABEL = "PER"


def _person_spans(entities) -> list:
    """Return the ``(start, end)`` character offsets of every person entity."""
    spans = []
    for ent in entities:
        label = ent.get("entity_group") or ent.get("entity") or ""
        # Un-aggregated pipeline output tags tokens as "B-PER" / "I-PER";
        # drop the one-letter scheme prefix so both output shapes match.
        if label[1:2] == "-":
            label = label[2:]
        if label != _PERSON_LABEL:
            continue
        start, end = ent.get("start"), ent.get("end")
        # Offsets may be missing or None (e.g. tokenizer without offset
        # support). Slicing with None would select the whole text, so skip.
        if start is None or end is None:
            continue
        spans.append((start, end))
    return spans


def _merge_spans(spans) -> list:
    """Merge spans that overlap or are separated by at most one character."""
    merged = []
    for start, end in sorted(spans):
        if merged and start <= merged[-1][1] + 1:
            merged[-1][1] = max(end, merged[-1][1])
        else:
            merged.append([start, end])
    return merged


def extract_name_and_mask(text: str) -> dict:
    """
    Mask every person name (PER entity) found in ``text``.

    Runs the module-level ``ner_pipeline`` over ``text``, merges person
    spans that overlap or sit at most one character apart (so a first and
    last name split by a single space become one span), and replaces each
    merged span with the placeholder ``[PER]``. The masked text is then
    passed through ``postprocess_extracted_text``.

    Names are no longer extracted: ``candidate_name`` is always ``None``
    and is kept only so existing callers that read the key keep working.

    Returns:
    {
    "masked_text": str,
    "candidate_name": None
    }
    """
    # Blank input cannot contain a name, so skip the model call entirely.
    entities = ner_pipeline(text) if text.strip() else []

    # Merged spans are sorted and non-overlapping, so the masked text can be
    # assembled left to right in one pass instead of re-slicing per span.
    placeholder = f"[{_PERSON_LABEL}]"
    pieces = []
    cursor = 0
    for start, end in _merge_spans(_person_spans(entities)):
        pieces.append(text[cursor:start])
        pieces.append(placeholder)
        cursor = end
    pieces.append(text[cursor:])

    return {
        "masked_text": postprocess_extracted_text("".join(pieces)),
        "candidate_name": None,
    }
def remove_pii(text: str) -> str:
    """
    Deprecated: return only the masked text for ``text``.

    Thin wrapper around ``extract_name_and_mask``; new code should call
    that function directly.
    """
    result = extract_name_and_mask(text)
    return result["masked_text"]
|