temsa's picture
Fix standalone ContextPII helper imports
509e626 verified
#!/usr/bin/env python3
from __future__ import annotations
import math
import re
from functools import lru_cache
from pathlib import Path
import sys
from typing import Any
import unicodedata
import numpy as np
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
from base_common import (
dedupe_spans,
label_max_span_tokens_from_config,
label_min_nonspace_chars_from_config,
label_names_from_config,
load_onnx_session,
normalize_entity_name,
safe_auto_tokenizer,
)
def label_thresholds_from_config(config, default_threshold: float) -> dict[str, float]:
    """Build the per-label score threshold table for ``config``.

    Explicit entries come from ``config.span_label_thresholds`` (keys are
    passed through ``normalize_entity_name``); every label reported by
    ``label_names_from_config`` that has no explicit entry receives
    ``default_threshold``.
    """
    configured = getattr(config, "span_label_thresholds", None)
    thresholds: dict[str, float] = {}
    if configured:
        for name, cutoff in configured.items():
            thresholds[normalize_entity_name(name)] = float(cutoff)
    # Fill the gaps without overriding anything configured explicitly.
    for label in label_names_from_config(config):
        thresholds.setdefault(label, float(default_threshold))
    return thresholds
def valid_offset(offset: tuple[int, int]) -> bool:
    """Return True when ``offset`` is non-empty and its end lies after its start."""
    if not offset:
        return False
    finish = int(offset[1])
    begin = int(offset[0])
    return finish > begin
def nonspace_length(text: str, start: int, end: int) -> int:
    """Count the non-whitespace characters in ``text[start:end]``."""
    window = text[int(start) : int(end)]
    return sum(1 for ch in window if not ch.isspace())
def alnum_upper(text: str) -> str:
    """Upper-case ``text`` and keep only its alphanumeric characters."""
    return "".join(filter(str.isalnum, text.upper()))
@lru_cache(maxsize=16384)
def normalize_surface(text: str) -> str:
    """Return an accent-free, lower-case, single-spaced key for ``text``.

    The string is NFKD-decomposed, combining marks are dropped, no-break
    spaces become plain spaces, the result is trimmed and lower-cased, and
    every whitespace run is collapsed to one space.  Results are memoised.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    flattened = "".join(base_chars)
    for no_break_space in ("\u00A0", "\u202F"):
        flattened = flattened.replace(no_break_space, " ")
    return re.sub(r"\s+", " ", flattened.strip().lower())
# Accepted surface spellings of Irish city/town names: the English name, the
# Irish name, unaccented spellings, and variants carrying a lowercase
# initial-mutation prefix (e.g. "mBaile", "gCorcaigh", "hUaimh").
IRISH_CITY_FORMS = (
"Dublin",
"Baile Átha Cliath",
"mBaile Átha Cliath",
"mBaile Atha Cliath",
"Galway",
"Gaillimh",
"Cork",
"Cork City",
"Corcaigh",
"gCorcaigh",
"Limerick",
"Luimneach",
"Waterford",
"Port Láirge",
"Kilkenny",
"Cill Chainnigh",
"Carlow",
"Ceatharlach",
"Sligo",
"Sligeach",
"Tralee",
"Trá Lí",
"Ennis",
"Inis",
"Letterkenny",
"Leitir Ceanainn",
"Castlebar",
"Caisleán an Bharraigh",
"Caislean an Bharraigh",
"gCaisleán an Bharraigh",
"gCaislean an Bharraigh",
"Wexford",
"Loch Garman",
"Navan",
"Uaimh",
"An Uaimh",
"hUaimh",
"nUaimh",
"Dundalk",
"Dún Dealgan",
"Dun Dealgan",
"Mullingar",
"Muileann gCearr",
"An Muileann gCearr",
"Tullamore",
"Tulach Mhór",
"Tulach Mhor",
"dTulach Mhór",
"dTulach Mhor",
"Portlaoise",
"Port Laoise",
"bPort Laoise",
"Bray",
"Bré",
"Bre",
"mBré",
"mBre",
"Athlone",
"Baile Átha Luain",
"Baile Atha Luain",
"mBaile Átha Luain",
"mBaile Atha Luain",
)
# Normalised lookup keys for the forms above; CITY spans are accepted only
# when their normalised text is a member of this set.
IRISH_CITY_SURFACES = {normalize_surface(value) for value in IRISH_CITY_FORMS}
# Accepted surface spellings of Irish counties: "Co."/"County" in English and
# "Co."/"Contae"/"gContae" in Irish, with accented and unaccented variants
# where the Irish name carries a fada.
IRISH_COUNTY_FORMS = (
"Co. Dublin",
"County Dublin",
"Co. Bhaile Átha Cliath",
"Contae Bhaile Átha Cliath",
"gContae Bhaile Átha Cliath",
"Co. Galway",
"County Galway",
"Co. na Gaillimhe",
"Contae na Gaillimhe",
"gContae na Gaillimhe",
"Co. Cork",
"County Cork",
"Co. Chorcaí",
"Contae Chorcaí",
"gContae Chorcaí",
"Co. Limerick",
"County Limerick",
"Co. Luimnigh",
"Contae Luimnigh",
"gContae Luimnigh",
"Co. Waterford",
"County Waterford",
"Co. Phort Láirge",
"Contae Phort Láirge",
"gContae Phort Láirge",
"Co. Kilkenny",
"County Kilkenny",
"Co. Chill Chainnigh",
"Contae Chill Chainnigh",
"gContae Chill Chainnigh",
"Co. Carlow",
"County Carlow",
"Co. Cheatharlach",
"Contae Cheatharlach",
"gContae Cheatharlach",
"Co. Sligo",
"County Sligo",
"Co. Shligigh",
"Contae Shligigh",
"gContae Shligigh",
"Co. Kerry",
"County Kerry",
"Co. Chiarraí",
"Contae Chiarraí",
"gContae Chiarraí",
"Co. Clare",
"County Clare",
"Co. an Chláir",
"Contae an Chláir",
"gContae an Chláir",
"Co. Donegal",
"County Donegal",
"Co. Dhún na nGall",
"Co. Dhun na nGall",
"Contae Dhún na nGall",
"Contae Dhun na nGall",
"gContae Dhún na nGall",
"gContae Dhun na nGall",
"Co. Mayo",
"County Mayo",
"Co. Mhaigh Eo",
"Contae Mhaigh Eo",
"gContae Mhaigh Eo",
"Co. Wexford",
"County Wexford",
"Co. Loch Garman",
"Contae Loch Garman",
"gContae Loch Garman",
"Co. Meath",
"County Meath",
"Co. na Mí",
"Co. na Mi",
"Contae na Mí",
"Contae na Mi",
"gContae na Mí",
"gContae na Mi",
"Co. Louth",
"County Louth",
"Co. Lú",
"Co. Lu",
"Contae Lú",
"Contae Lu",
"gContae Lú",
"gContae Lu",
"Co. Westmeath",
"County Westmeath",
"Co. na hIarmhí",
"Co. na hIarmhi",
"Contae na hIarmhí",
"Contae na hIarmhi",
"gContae na hIarmhí",
"gContae na hIarmhi",
"Co. Offaly",
"County Offaly",
"Co. Uíbh Fhailí",
"Co. Uibh Fhaili",
"Contae Uíbh Fhailí",
"Contae Uibh Fhaili",
"gContae Uíbh Fhailí",
"gContae Uibh Fhaili",
"Co. Laois",
"County Laois",
"Contae Laoise",
"gContae Laoise",
"Co. Wicklow",
"County Wicklow",
"Co. Chill Mhantáin",
"Co. Chill Mhantain",
"Contae Chill Mhantáin",
"Contae Chill Mhantain",
"gContae Chill Mhantáin",
"gContae Chill Mhantain",
)
# Normalised lookup keys for the county forms above.
IRISH_COUNTY_SURFACES = {normalize_surface(value) for value in IRISH_COUNTY_FORMS}
# Normalised phrases that begin with "County" but name a building or body
# rather than a county; COUNTY spans equal to one of these are rejected.
COUNTY_STOP_SURFACES = {
normalize_surface(value)
for value in {
"County Hall",
"County House",
"County Council",
"County Offices",
"County Office",
}
}
# Lowercase letters that occur as mutation prefixes in IRISH_CITY_FORMS
# (nUaimh, gCorcaigh, mBaile, bPort, dTulach, hUaimh).
# NOTE(review): no use of this set is visible in this part of the file.
IRISH_CITY_PREFIX_CHARS = {"n", "g", "m", "b", "d", "h"}
# Whole-word street-type keyword (English or Irish), case-insensitive.
STREET_SUFFIX_RE = re.compile(
r"(?i)\b(street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)\b"
)
# Whole-word building-type keyword (house, lodge, business centre, Teach, ...).
BUILDING_SUFFIX_RE = re.compile(
r"(?i)\b(house|cottage|lodge|villa|apartments?|building|business\s+centre|community\s+centre|shopping\s+centre|retail\s+park|Teach(?:ín|in)?)\b"
)
# Entire string consists of phone characters only: starts with "+", "(", ")",
# "." or a digit, ends with a digit, and may contain spaces (including
# no-break spaces), "-" and "/" in between.
PHONE_SURFACE_RE = re.compile(r"^[+().\d][+().\d \-/\u00A0\u202F]*\d$")
# Entire string consists of digits, spaces (including no-break) and hyphens.
ACCOUNT_DIGIT_SURFACE_RE = re.compile(r"^[\d \-\u00A0\u202F]+$")
# Regex fragment (not compiled) matching an English or Irish month name, with
# accented and unaccented Irish spellings; interpolated into the date patterns.
MONTH_NAME_RE = (
r"(?:January|February|March|April|May|June|July|August|September|October|November|December|"
r"Eanáir|Eanair|Feabhra|Márta|Marta|Aibreán|Aibrean|Bealtaine|Meitheamh|Iúil|Iuil|Lúnasa|Lunasa|"
r"Meán\s+Fómhair|Mean\s+Fomhair|Deireadh\s+Fómhair|Deireadh\s+Fomhair|Samhain|Nollaig)"
)
# Anchored date shape: numeric d/m/y with "." "/" or "-" separators, ISO
# yyyy-mm-dd, "<day>[ordinal] <month name> <year>" (optionally led by "an"),
# or "<month name> <day>, <year>".  The whole candidate must match.
DATE_OF_BIRTH_RE = re.compile(
rf"(?i)^(?:\d{{1,2}}[./-]\d{{1,2}}[./-]\d{{2,4}}|\d{{4}}-\d{{2}}-\d{{2}}|(?:an\s+)?\d{{1,2}}(?:st|nd|rd|th|ú)?\s+{MONTH_NAME_RE}[,]?\s+\d{{2,4}}|{MONTH_NAME_RE}\s+\d{{1,2}},?\s+\d{{2,4}})$"
)
# Same date shapes as DATE_OF_BIRTH_RE, but unanchored for scanning running
# text; the value is group 1 and must not touch ASCII letters or digits.
# Unlike DATE_OF_BIRTH_RE this pattern carries no (?i) flag.
DATE_OF_BIRTH_VALUE_RE = re.compile(
rf"(?<![A-Za-z0-9])(\d{{1,2}}[./-]\d{{1,2}}[./-]\d{{2,4}}|\d{{4}}-\d{{2}}-\d{{2}}|(?:an\s+)?\d{{1,2}}(?:st|nd|rd|th|ú)?\s+{MONTH_NAME_RE}[,]?\s+\d{{2,4}}|{MONTH_NAME_RE}\s+\d{{1,2}},?\s+\d{{2,4}})(?![A-Za-z0-9])"
)
# Words that signal an age is being stated ("aged", "years old", "y/o", "aois", ...).
AGE_CONTEXT_RE = re.compile(r"(?i)\b(age|aged|years?\s+old|year\s+old|year-old|yrs?\s+old|y/?o|yo|aois|bliana\s+d['’]aois|mbliana\s+d['’]aois)\b")
# Suffix glued directly onto an age number ("-year-old", "yo", "y/o", "yrs").
AGE_INLINE_SUFFIX_RE = re.compile(r"(?i)^(?:-year-old\b|yo\b|y/o\b|yrs?\b)")
# A 1-3 digit number (group 1) not adjacent to ASCII letters or digits.
AGE_VALUE_RE = re.compile(r"(?<![A-Za-z0-9])(\d{1,3})(?![A-Za-z0-9])")
# Text ending in a first-person copula ("I am", "I'm", "im", "tá mé", "táim"),
# optionally followed by whitespace; applied to the text before an age.
AGE_SELF_PREFIX_RE = re.compile(r"(?i)(?:^|.*\b)(?:i\s+am|i['’]?m|im|t[áa]\s+m[ée]|t[áa]im)\s*$")
# Date-of-birth cue words searched for in the text preceding a date.
DOB_CONTEXT_RE = re.compile(
r"(?i)\b(dob|date\s+of\s+birth|born(?:\s+on)?|data\s+breithe|dáta\s+breithe|dhata\s+breithe|dháta\s+breithe|rugadh)\b"
)
# Date-of-birth cue words searched for in the text following a date.
DOB_SUFFIX_CONTEXT_RE = re.compile(
r"(?i)\b(?:my\s+date\s+of\s+birth|mo\s+(?:dáta|dháta|data|dhata)\s+breithe|dob|date\s+of\s+birth|rugadh)\b"
)
def has_dob_suffix_context(text: str, end: int, window: int = 40) -> bool:
    """Report whether a date-of-birth cue follows position ``end``.

    Only the next ``window`` characters are inspected, and the cue counts
    only when no comma, full stop, semicolon, colon or line break separates
    it from ``end``.
    """
    tail_start = int(end)
    tail = text[tail_start : min(len(text), tail_start + window)]
    cue = DOB_SUFFIX_CONTEXT_RE.search(tail)
    if cue is None:
        return False
    # A clause/line boundary before the cue means it belongs to other text.
    for ch in tail[: int(cue.start())]:
        if ch in ",.;:\n\r":
            return False
    return True
# Regex fragment (not compiled) for address field labels and lead-in phrases
# in English and Irish; interpolated into several address patterns.
ADDRESS_FIELD_CUE_PATTERN = r"(?:address(?:\s+line\s+\d+)?(?:\s+is)?|my\s+address\s+is|seoladh(?:\s+l[ií]nte?\s+\d+)?|is\s+[ée]\s+mo\s+sheoladh)"
# Numbered address-line label ("address line 2", "seoladh líne 2").
ADDRESS_LINE_CUE_RE = re.compile(r"(?i)\b(?:address\s+line\s+\d+|seoladh\s+l[ií]nte?\s+\d+)\b")
# Any address cue: field labels, centre names, and "live(s) at" / "located at"
# style phrases with their Irish counterparts.
ADDRESS_CUE_RE = re.compile(
rf"(?i)\b({ADDRESS_FIELD_CUE_PATTERN}|sheoladh|allocation\s+centre|intreo\s+centre|ionad\s+leithdh[aá]ilte|ionad\s+intreo|live\s+at|lives\s+at|living\s+at|located\s+at|i\s+mo\s+ch[oó]na[ií]\s+ag|t[áa]\s+m[ée]\s+i\s+mo\s+ch[oó]na[ií]\s+ag|t[áa]im\s+i\s+mo\s+ch[oó]na[ií]\s+ag|cónai\s+ag|chónai\s+ag|conai\s+ag|chonai\s+ag)\b"
)
# Text ending in a county word (optionally followed by "na"); when this
# precedes a CITY candidate the candidate is rejected as a city.
CITY_COUNTY_PREFIX_RE = re.compile(r"(?i)(?:county|co\.|contae|gcontae)(?:\s+na)?\s*$")
# Cue words for an Irish Personal Public Service Number (PPSN / UPSP).
PPSN_CUE_RE = re.compile(
r"(?i)\b(ppsn|upsp|personal public service(?:\s+number)?|uimhir\s+(?:mo\s+)?upsp|uimhir\s+(?:mo\s+)?ppsn)\b"
)
# Normalised words that must never be accepted as a FIRST_NAME/LAST_NAME
# span: form field labels, address unit words, month names and weekday names
# in English and Irish.
NAME_STOP_SURFACES = {
normalize_surface(value)
for value in {
"Address",
"Name",
"Phone",
"Email",
"Seoladh",
"Ainm",
"Teagmháil",
"Teagmhail",
"Ríomhphost",
"Riomhphost",
"Eirchód",
"Eirchod",
"Eircode",
"PPSN",
"UPSP",
"Call",
"Glao",
"Glaoigh",
"Rugadh",
"Ionad",
"Intreo",
"Cill",
"Sampla",
"Leithdháilte",
"Leithdhailte",
"Leithdháil",
"Leithdhail",
"Leithdh",
"Apartment",
"Flat",
"Unit",
"Suite",
"Árasán",
"Arasan",
"Aonad",
"County",
"Contae",
"Fón",
"Fon",
"January",
"February",
"March",
"April",
"May",
"June",
"July",
"August",
"September",
"October",
"November",
"December",
"Monday",
"Tuesday",
"Wednesday",
"Thursday",
"Friday",
"Saturday",
"Sunday",
"Eanáir",
"Feabhra",
"Márta",
"Aibreán",
"Aibrean",
"Bealtaine",
"Meitheamh",
"Iúil",
"Iuil",
"Lúnasa",
"Lunasa",
"Meán Fómhair",
"Mean Fomhair",
"Deireadh Fómhair",
"Deireadh Fomhair",
"Samhain",
"Nollaig",
"Luan",
"Máirt",
"Mairt",
"Céadaoin",
"Ceadaoin",
"Déardaoin",
"Deardaoin",
"Aoine",
"Satharn",
"Domhnach",
}
}
# Normalised surname particles (Ó, Ní, Nic, Mac, Mc, de, van, von) that are
# allowed inside a surname even when written in lowercase.
NAME_PARTICLE_SURFACES = {
normalize_surface(value)
for value in {"Ó", "O", "Ní", "Ni", "Nic", "Mac", "Mc", "de", "van", "von"}
}
# Normalised words that, when found after the street-type keyword of a
# STREET_ADDRESS candidate, cause the candidate to be rejected.
STREET_TRAILING_BLOCK_SURFACES = {
normalize_surface(value)
for value in {
"are",
"public",
"contact",
"details",
"website",
"open",
"before",
"visiting",
"roimh",
"chuairt",
"agus",
"and",
"the",
"is",
"ta",
}
}
# Address part that starts with a unit word (apartment, flat, unit, suite,
# árasán, aonad).
ADDRESS_UNIT_PREFIX_RE = re.compile(r"(?i)^(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\b")
# Address part that is entirely a house name: one to three words followed by
# house/cottage/lodge/villa, or an Irish "Teach ..." name.
HOUSE_NAME_PREFIX_RE = re.compile(
r"(?i)^(?:[A-ZÁÉÍÓÚ][\w'’.-]+(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]+){0,2}\s+(?:house|cottage|lodge|villa)|teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚ][\w'’.-]+)?)$"
)
# Scanner for a street address in running text (group 1): optional
# "<unit> <id>, " prefix, optional "<house name>, " prefix, optional house
# number, up to five words, a street-type keyword, and up to two trailing words.
STREET_ADDRESS_VALUE_RE = re.compile(
r"(?i)(?<![\w@])("
r"(?:(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\s+[A-Za-z0-9-]+,\s+)?"
r"(?:(?:[A-ZÁÉÍÓÚ][\w'’.-]+(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]+){0,2}\s+(?:house|cottage|lodge|villa)|teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚ][\w'’.-]+)?),\s+)?"
r"(?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)(?:\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*){0,2}"
r")"
)
def is_plausible_last_name_sequence(value: str) -> bool:
    """Check that every whitespace-separated token of ``value`` looks like a surname part.

    A token passes when it contains a letter, uses only name characters, and
    either starts with an uppercase letter, starts with a lowercase letter
    followed by an uppercase one (e.g. a mutation prefix), or is a known
    surname particle.
    """
    pieces = [piece for piece in re.split(r"\s+", value.strip()) if piece]
    if not pieces:
        return False

    def _piece_ok(piece: str) -> bool:
        letters = [ch for ch in piece if ch.isalpha()]
        if not letters:
            return False
        if not all(is_name_token_char(ch) for ch in piece):
            return False
        if letters[0].isupper():
            return True
        if len(letters) >= 2 and letters[0].islower() and letters[1].isupper():
            return True
        return normalize_surface(piece) in NAME_PARTICLE_SURFACES

    return all(_piece_ok(piece) for piece in pieces)
def is_reasonable_span_text(label: str, text: str, start: int, end: int) -> bool:
    """Decide whether ``text[start:end]`` is a believable value for ``label``.

    Each supported label has its own surface-form checks; several also look
    at the characters or cue words around the span.  Labels with no specific
    rule are accepted as long as the span is not blank.

    Args:
        label: Entity label of the candidate span (e.g. ``"PHONE_NUMBER"``).
        text: Full text the span was found in.
        start: Start offset of the span in ``text``.
        end: End offset (exclusive) of the span in ``text``.

    Returns:
        True when the span passes every check for its label, else False.
    """
    # Offsets may arrive as non-int numerics; convert once so every check
    # below works on plain ints.
    start = int(start)
    end = int(end)
    value = text[start:end].strip()
    if not value:
        return False
    if label in {"FIRST_NAME", "LAST_NAME"}:
        if not any(ch.isalpha() for ch in value):
            return False
        if any(ch.isdigit() for ch in value):
            return False
        # The span must not be cut out of the middle of a longer word.
        if start > 0 and text[start - 1].isalpha():
            return False
        if end < len(text) and text[end].isalpha():
            return False
        if normalize_surface(value) in NAME_STOP_SURFACES:
            return False
        if label == "FIRST_NAME" and any(ch.isspace() for ch in value):
            return False
        if any(ch in ".,;:/@()" for ch in value):
            return False
        if label == "FIRST_NAME":
            first_alpha = next((ch for ch in value if ch.isalpha()), "")
            if not first_alpha or not first_alpha.isupper():
                return False
        if label == "LAST_NAME" and not is_plausible_last_name_sequence(value):
            return False
        if start > 0 and text[start - 1].isdigit():
            return False
        return True
    if label == "EMAIL":
        if "@" not in value:
            return False
        local, _, domain = value.partition("@")
        return bool(local) and "." in domain
    if label == "PHONE_NUMBER":
        normalized = value.replace("\u00A0", " ").replace("\u202F", " ").strip()
        if any(ch.isalpha() for ch in normalized):
            return False
        if "@" in normalized:
            return False
        if start > 0 and text[start - 1].isalnum():
            return False
        if end < len(text) and text[end].isalnum():
            return False
        if not PHONE_SURFACE_RE.match(normalized):
            return False
        digits = "".join(ch for ch in value if ch.isdigit())
        if normalized.startswith("+353"):
            # Drop the country code and an optional trunk zero.
            tail = digits[3:]
            if tail.startswith("0"):
                tail = tail[1:]
            return 8 <= len(tail) <= 9
        # 1800 freephone numbers have no leading zero, so they have to be
        # recognised before the trunk-prefix requirement below; otherwise
        # this check can never be reached.
        if digits.startswith("1800"):
            return len(digits) == 10
        if not digits.startswith("0"):
            return False
        if digits.startswith("0818"):
            return len(digits) == 10
        if digits.startswith("08"):
            return len(digits) == 10
        if digits.startswith("01"):
            return len(digits) == 9
        return 9 <= len(digits) <= 10
    if label == "PPSN":
        upper = alnum_upper(value)
        # Seven digits followed by one or two letters.
        return bool(len(upper) in {8, 9} and upper[:7].isdigit() and upper[7:].isalpha())
    if label == "POSTCODE":
        compact = value.replace(" ", "").replace("\u00A0", "").replace("\u202F", "")
        if any(not (ch.isalnum() or ch.isspace()) for ch in value):
            return False
        if len(compact) != 7:
            return False
        routing = compact[:3]
        unique = compact[3:]
        # Routing key: letter + two digits, or the special key "D6W".
        routing_ok = bool(
            (routing[0].isalpha() and routing[1:].isdigit())
            or routing == "D6W"
        )
        unique_ok = bool(
            len(unique) == 4
            and unique[0].isalpha()
            and unique[1:].isalnum()
        )
        return routing_ok and unique_ok
    if label == "PASSPORT_NUMBER":
        return bool(re.fullmatch(r"[A-Z]{1,2}\s?\d{7}", value.strip()))
    if label == "BANK_ROUTING_NUMBER":
        digits = "".join(ch for ch in value if ch.isdigit())
        if len(digits) != 6:
            return False
        # Six digits alone are ambiguous; require a banking cue nearby.
        context = text[max(0, start - 32) : min(len(text), end + 24)]
        return bool(BANK_ROUTING_CONTEXT_RE.search(context))
    if label == "SWIFT_BIC":
        upper = alnum_upper(value)
        return len(upper) in {8, 11} and upper.isalnum()
    if label == "CREDIT_DEBIT_CARD":
        digits = "".join(ch for ch in value if ch.isdigit())
        return 12 <= len(digits) <= 19
    if label == "ACCOUNT_NUMBER":
        upper = alnum_upper(value)
        if upper.startswith("IE"):
            # Irish IBAN: "IE", two check digits, 18 alphanumerics.
            return bool(re.fullmatch(r"IE\d{2}[A-Z0-9]{18}", upper))
        if not ACCOUNT_DIGIT_SURFACE_RE.fullmatch(value.strip()):
            return False
        digits = "".join(ch for ch in value if ch.isdigit())
        return 6 <= len(digits) <= 34
    if label == "AGE":
        digits = "".join(ch for ch in value if ch.isdigit())
        if digits != value.strip():
            return False
        if not digits:
            return False
        if start > 0 and text[start - 1].isalnum():
            return False
        trailing = text[end : min(len(text), end + 12)]
        # A glued-on suffix such as "yo" or "-year-old" is the only thing
        # allowed to touch the number on the right.
        if end < len(text) and text[end].isalnum() and not AGE_INLINE_SUFFIX_RE.match(trailing):
            return False
        if start > 0 and text[start - 1] in "/-":
            return False
        if end < len(text) and text[end] in "/-" and not AGE_INLINE_SUFFIX_RE.match(trailing):
            return False
        age = int(digits)
        if not (0 < age <= 120):
            return False
        context = text[max(0, start - 32) : min(len(text), end + 24)]
        prefix = text[max(0, start - 24) : start]
        return bool(AGE_CONTEXT_RE.search(context) or AGE_SELF_PREFIX_RE.search(prefix))
    if label == "DATE_OF_BIRTH":
        if not any(ch.isdigit() for ch in value):
            return False
        if not DATE_OF_BIRTH_RE.match(value.strip()):
            return False
        # A date only counts as a birth date with a cue before or after it.
        prefix = text[max(0, start - 96) : start]
        return bool(DOB_CONTEXT_RE.search(prefix) or has_dob_suffix_context(text, end))
    if label == "CITY":
        if any(ch.isdigit() for ch in value):
            return False
        # "County Galway" etc.: the name belongs to a county, not a city.
        prefix = text[max(0, start - 20) : start]
        if CITY_COUNTY_PREFIX_RE.search(prefix):
            return False
        return normalize_surface(value) in IRISH_CITY_SURFACES
    if label == "COUNTY":
        if any(ch.isdigit() for ch in value):
            return False
        normalized = normalize_surface(value)
        if normalized in COUNTY_STOP_SURFACES:
            return False
        if normalized.startswith(("county hall", "county house", "county council", "county office", "county offices")):
            return False
        if normalized in IRISH_COUNTY_SURFACES:
            return True
        if normalized.startswith(("county ", "contae ", "gcontae ", "co. ")):
            tail = normalized.split(" ", 1)[1] if " " in normalized else ""
            if tail in {"hall", "house", "council", "office", "offices"}:
                return False
            return True
        return False
    if label == "STREET_ADDRESS":
        cleaned = value.strip()
        address_parts = [part.strip() for part in cleaned.split(",")]
        if len(address_parts) > 3:
            return False
        prefix_part = ""
        building_part = ""
        street_part = cleaned
        if len(address_parts) == 2:
            # "<unit or house name>, <street>"
            prefix_part, street_part = address_parts
            if not prefix_part or not street_part:
                return False
            if not (
                ADDRESS_UNIT_PREFIX_RE.match(prefix_part)
                or HOUSE_NAME_PREFIX_RE.match(prefix_part)
            ):
                return False
        elif len(address_parts) == 3:
            # "<unit>, <house name>, <street>"
            prefix_part, building_part, street_part = address_parts
            if not prefix_part or not building_part or not street_part:
                return False
            if not ADDRESS_UNIT_PREFIX_RE.match(prefix_part):
                return False
            if not HOUSE_NAME_PREFIX_RE.match(building_part):
                return False
        suffix_match = STREET_SUFFIX_RE.search(street_part)
        if not suffix_match:
            return False
        if any(ch in "@:;" for ch in cleaned):
            return False
        trailing = street_part[int(suffix_match.end()) :].strip()
        trailing_tokens = [token for token in re.split(r"\s+", trailing) if token]
        if len(trailing_tokens) > 3:
            return False
        if any(normalize_surface(token) in STREET_TRAILING_BLOCK_SURFACES for token in trailing_tokens):
            return False
        has_digit = any(ch.isdigit() for ch in street_part)
        # Digits are only accepted as a leading house number.
        if has_digit and not re.match(r"^\s*\d{1,4}\b", street_part):
            return False
        title_tokens = [token for token in re.split(r"\s+", street_part) if token]
        if not has_digit and not prefix_part:
            # A bare street name needs an address cue close by.
            context = text[max(0, start - 24) : min(len(text), end + 12)]
            if not ADDRESS_CUE_RE.search(context):
                return False
        return has_digit or len(title_tokens) >= 2
    return True
def spans_overlap(a: dict, b: dict) -> bool:
    """Return True when the half-open ranges of spans ``a`` and ``b`` intersect."""
    if int(a["start"]) >= int(b["end"]):
        return False
    return int(b["start"]) < int(a["end"])
def is_name_token_char(ch: str) -> bool:
    """Return True for characters allowed inside a name token: letters, hyphen, apostrophes."""
    if ch.isalpha():
        return True
    return ch in ("-", "'", "’")
def is_plausible_first_name(value: str) -> bool:
    """Check that ``value`` is a single capitalised name token.

    It must be non-empty, contain no whitespace, digits or ``,;:/@()``
    characters, consist only of name characters, and its first letter must
    be uppercase.
    """
    if not value:
        return False
    for ch in value:
        if ch.isspace() or ch.isdigit() or ch in ",;:/@()":
            return False
    leading_letter = next((ch for ch in value if ch.isalpha()), "")
    if not leading_letter:
        # No letters at all.
        return False
    if not leading_letter.isupper():
        return False
    return all(is_name_token_char(ch) for ch in value)
def is_plausible_cued_first_name(value: str) -> bool:
    """Check that ``value`` is a single name token, without requiring a capital.

    Same checks as for an uncued first name except that the first letter may
    be lowercase.
    """
    if not value:
        return False
    for ch in value:
        if ch.isspace() or ch.isdigit() or ch in ",;:/@()":
            return False
    has_letter = any(ch.isalpha() for ch in value)
    return has_letter and all(is_name_token_char(ch) for ch in value)
def is_plausible_cued_last_name_sequence(value: str) -> bool:
    """Check a surname candidate that was introduced by an explicit cue.

    Each whitespace-separated token must contain a letter and only name
    characters; its first letter must be cased (upper or lower) unless the
    token is a known surname particle.
    """
    pieces = [piece for piece in re.split(r"\s+", value.strip()) if piece]
    if not pieces:
        return False

    def _piece_ok(piece: str) -> bool:
        letters = [ch for ch in piece if ch.isalpha()]
        if not letters:
            return False
        if not all(is_name_token_char(ch) for ch in piece):
            return False
        head = letters[0]
        if head.isupper() or head.islower():
            return True
        return normalize_surface(piece) in NAME_PARTICLE_SURFACES

    return all(_piece_ok(piece) for piece in pieces)
def extract_name_tokens_after_cue(text: str, cue_end: int, max_tokens: int = 4) -> list[tuple[int, int, str]]:
    """Collect consecutive name tokens that follow position ``cue_end``.

    Scanning stops at ``,.;:`` punctuation, at a line break once a token has
    been collected, at a stop word (after the first token), at any character
    that is neither whitespace nor a name character, or when ``max_tokens``
    tokens have been gathered.

    Returns:
        A list of ``(start, end, token_text)`` tuples in text order.
    """
    size = len(text)
    pos = cue_end
    found: list[tuple[int, int, str]] = []
    while True:
        # Skip the gap before the next token, noting any line break in it.
        crossed_line_break = False
        while pos < size and text[pos].isspace():
            if text[pos] in "\r\n":
                crossed_line_break = True
            pos += 1
        if pos >= size:
            break
        if found and crossed_line_break:
            break
        if text[pos] in ",.;:\n":
            break
        begin = pos
        while pos < size and is_name_token_char(text[pos]):
            pos += 1
        if pos == begin:
            break
        word = text[begin:pos]
        surface = normalize_surface(word)
        # Stop words only end a name; they may still be its first token.
        if found and surface in NAME_CUE_STOP_SURFACES:
            break
        found.append((begin, pos, word))
        if len(found) >= max_tokens:
            break
        if pos < size and (text[pos] in ",.;:\n" or not text[pos].isspace()):
            break
    return found
def repair_name_particle_surnames(text: str, spans: list[dict]) -> list[dict]:
    """Add particle surnames (Ní/Ó/Nic/Mac/Mc + name) that directly follow a first name.

    For every FIRST_NAME span, the next 40 characters are checked for a
    particle surname on the same line.  A match replaces any overlapping
    FIRST_NAME/LAST_NAME spans with one LAST_NAME span scored 0.66.  The
    input list is not modified.
    """
    particle_surname = re.compile(
        r"^[ \t]*((?:Ní|Ni|Ó|O|Nic|Mac|Mc)[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*)?)"
    )
    name_labels = {"FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    given_names = [span for span in result if span["label"] == "FIRST_NAME"]
    for given in given_names:
        anchor = int(given["end"])
        found = particle_surname.match(text[anchor : min(len(text), anchor + 40)])
        if not found:
            continue
        begin = anchor + int(found.start(1))
        finish = anchor + int(found.end(1))
        surname = text[begin:finish]
        if not is_plausible_cued_last_name_sequence(surname):
            continue
        new_span = {
            "start": begin,
            "end": finish,
            "label": "LAST_NAME",
            "score": 0.66,
            "text": surname,
        }
        kept = []
        for existing in result:
            if spans_overlap(new_span, existing) and existing["label"] in name_labels:
                continue
            kept.append(existing)
        kept.append(new_span)
        result = kept
    return result
def repair_first_name_from_last_name(text: str, spans: list[dict]) -> list[dict]:
    """Recover a missing first name from the word just before a LAST_NAME span.

    A LAST_NAME that already has a FIRST_NAME ending at most two characters
    before it is left alone.  Otherwise the whitespace-separated word to its
    left is added as a FIRST_NAME (score = 0.6 x the surname's score) when it
    looks like a first name and overlaps no existing FIRST_NAME.  The input
    list is not modified.
    """
    result = list(spans)
    surnames = [span for span in result if span["label"] == "LAST_NAME"]
    for surname in surnames:
        already_paired = False
        for span in result:
            if (
                span["label"] == "FIRST_NAME"
                and int(span["end"]) <= int(surname["start"])
                and int(surname["start"]) - int(span["end"]) <= 2
            ):
                already_paired = True
                break
        if already_paired:
            continue
        idx = int(surname["start"]) - 1
        if idx < 0 or not text[idx].isspace():
            continue
        # Walk left over the gap, then over the preceding word.
        while idx >= 0 and text[idx].isspace():
            idx -= 1
        word_end = idx + 1
        while idx >= 0 and is_name_token_char(text[idx]):
            idx -= 1
        word_start = idx + 1
        if word_end <= word_start:
            continue
        given = text[word_start:word_end]
        if not is_plausible_first_name(given):
            continue
        new_span = {
            "start": word_start,
            "end": word_end,
            "label": "FIRST_NAME",
            "score": float(surname.get("score", 0.5)) * 0.6,
            "text": given,
        }
        clashes = any(
            spans_overlap(new_span, other)
            for other in result
            if other["label"] == "FIRST_NAME"
        )
        if not clashes:
            result.append(new_span)
    return result
def repair_contextual_name_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add first/last name spans after self-introduction cues ("my name is", "is mise", ...).

    A cue is skipped when a FIRST_NAME already starts within 4 characters and
    a LAST_NAME within 16 characters after it.  Otherwise up to four name
    tokens following the cue are read; the first becomes FIRST_NAME and the
    rest LAST_NAME (both scored 0.63), replacing overlapping name spans.  The
    input list is not modified.
    """
    name_labels = {"FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    size = len(text)
    for cue in NAME_SELF_CUE_RE.finditer(text):
        first_present = any(
            other["label"] == "FIRST_NAME"
            and 0 <= int(other["start"]) - cue.end() <= 4
            for other in result
        )
        if first_present and any(
            other["label"] == "LAST_NAME"
            and 0 <= int(other["start"]) - cue.end() <= 16
            for other in result
        ):
            continue
        pos = cue.end()
        while pos < size and text[pos].isspace():
            pos += 1
        region_start = pos
        # The candidate region is the run of name characters and whitespace.
        while pos < size and (is_name_token_char(text[pos]) or text[pos].isspace()):
            pos += 1
        region_end = pos
        region = text[region_start:region_end].strip()
        words: list[str] = []
        for piece in (part for part in re.split(r"\s+", region) if part):
            surface = normalize_surface(piece)
            if words and surface in LOWER_NAME_STOP_SURFACES:
                break
            if not all(is_name_token_char(ch) for ch in piece):
                break
            words.append(piece)
            if len(words) >= 4:
                break
        if len(words) < 2:
            continue
        given = words[0]
        family = " ".join(words[1:])
        if not is_plausible_cued_first_name(given):
            continue
        if not is_plausible_cued_last_name_sequence(family):
            continue
        # Map the token texts back to offsets inside the scanned region.
        given_start = text.find(given, region_start, region_end)
        if given_start < 0:
            continue
        given_end = given_start + len(given)
        family_start = text.find(family, given_end, region_end)
        if family_start < 0:
            continue
        family_end = family_start + len(family)
        given_span = {
            "start": given_start,
            "end": given_end,
            "label": "FIRST_NAME",
            "score": 0.63,
            "text": text[given_start:given_end],
        }
        family_span = {
            "start": family_start,
            "end": family_end,
            "label": "LAST_NAME",
            "score": 0.63,
            "text": text[family_start:family_end],
        }
        kept = []
        for other in result:
            if spans_overlap(given_span, other) and other["label"] in name_labels:
                continue
            if spans_overlap(family_span, other) and other["label"] in name_labels:
                continue
            kept.append(other)
        kept.extend([given_span, family_span])
        result = kept
    return result
def repair_role_name_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add first/last name spans after role or field cues ("Applicant:", "Name:", ...).

    At least two name tokens must follow the cue.  The first becomes
    FIRST_NAME and the remainder LAST_NAME (both scored 0.63), replacing any
    overlapping name spans.  The input list is not modified.
    """
    name_labels = {"FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for cue in NAME_ROLE_CUE_RE.finditer(text):
        bounds = extract_name_tokens_after_cue(text, cue.end())
        if len(bounds) < 2:
            continue
        given_start, given_end, given = bounds[0]
        family_start = bounds[1][0]
        family_end = bounds[-1][1]
        family = text[family_start:family_end]
        if not is_plausible_cued_first_name(given):
            continue
        if not is_plausible_cued_last_name_sequence(family):
            continue
        given_span = {
            "start": given_start,
            "end": given_end,
            "label": "FIRST_NAME",
            "score": 0.63,
            "text": text[given_start:given_end],
        }
        family_span = {
            "start": family_start,
            "end": family_end,
            "label": "LAST_NAME",
            "score": 0.63,
            "text": text[family_start:family_end],
        }
        kept = []
        for other in result:
            if spans_overlap(given_span, other) and other["label"] in name_labels:
                continue
            if spans_overlap(family_span, other) and other["label"] in name_labels:
                continue
            kept.append(other)
        kept.extend([given_span, family_span])
        result = kept
    return result
def repair_surname_field_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add a LAST_NAME span after surname cues ("surname", "last name", "sloinne", ...).

    All name tokens following the cue form one LAST_NAME span (score 0.64)
    that replaces any overlapping name spans.  The input list is not modified.
    """
    name_labels = {"FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for cue in SURNAME_CUE_RE.finditer(text):
        bounds = extract_name_tokens_after_cue(text, cue.end())
        if not bounds:
            continue
        begin = bounds[0][0]
        finish = bounds[-1][1]
        surname = text[begin:finish]
        if not is_plausible_cued_last_name_sequence(surname):
            continue
        surname_span = {
            "start": begin,
            "end": finish,
            "label": "LAST_NAME",
            "score": 0.64,
            "text": surname,
        }
        kept = []
        for other in result:
            if spans_overlap(surname_span, other) and other["label"] in name_labels:
                continue
            kept.append(other)
        kept.append(surname_span)
        result = kept
    return result
def repair_name_before_structured_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add name spans for a capitalised name followed by ", PPSN", ", DOB", ", Address", etc.

    The name is tokenised from the start of the matched group.  Its first
    token becomes FIRST_NAME and the rest LAST_NAME (both scored 0.64) when
    both pass the uncued plausibility checks, replacing overlapping name
    spans.  The input list is not modified.
    """
    name_labels = {"FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in NAME_BEFORE_STRUCTURED_CUE_RE.finditer(text):
        bounds = extract_name_tokens_after_cue(text, hit.start(1))
        if len(bounds) < 2:
            continue
        given_start, given_end, given = bounds[0]
        family_start = bounds[1][0]
        family_end = bounds[-1][1]
        family = text[family_start:family_end]
        if not is_plausible_first_name(given):
            continue
        if not is_plausible_last_name_sequence(family):
            continue
        given_span = {
            "start": given_start,
            "end": given_end,
            "label": "FIRST_NAME",
            "score": 0.64,
            "text": text[given_start:given_end],
        }
        family_span = {
            "start": family_start,
            "end": family_end,
            "label": "LAST_NAME",
            "score": 0.64,
            "text": text[family_start:family_end],
        }
        kept = []
        for other in result:
            if spans_overlap(given_span, other) and other["label"] in name_labels:
                continue
            if spans_overlap(family_span, other) and other["label"] in name_labels:
                continue
            kept.append(other)
        kept.extend([given_span, family_span])
        result = kept
    return result
# Passport cue words (English and Irish).  The pattern has no word
# boundaries, so it also matches inside longer words.
PASSPORT_CUE_RE = re.compile(
r"(?i)(passport(?:\s+number)?|phas|uimhir\s+(?:mo\s+)?phas)"
)
# Passport number (group 1): one or two capital letters, optional single
# whitespace, seven digits; not adjacent to ASCII letters or digits.
PASSPORT_VALUE_RE = re.compile(r"(?<![A-Za-z0-9])([A-Z]{1,2}\s?\d{7})(?![A-Za-z0-9])")
# E-mail address (group 1): a local part free of whitespace and ",;:()<>@",
# an "@", and a dotted domain ending in at least two ASCII letters.
EMAIL_EXTRACT_RE = re.compile(r"([^\s@,;:()<>]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,})", re.UNICODE)
# Phone cue words (English and Irish).
PHONE_CUE_RE = re.compile(
r"(?i)\b(phone|call|contact|reach\s+me|glaoigh\s+ar|teagmh[aá]il|uimhir|m['’]uimhir|f[oó]n|fon|teileaf[oó]n|telefon)\b"
)
# Self-introduction phrases ("my name is", "is mise", "is é mo ainm", ...).
NAME_SELF_CUE_RE = re.compile(
r"(?i)\b(my\s+name\s+is|is\s+mise|is\s+[ée]\s+m['’]?ainm|is\s+[ée]\s+mo\s+ainm)\b"
)
# Role words followed by ":" or "," (applicant, customer, ...), or name field
# labels followed by ":" ("ainm" only when not followed by "teaghlaigh").
NAME_ROLE_CUE_RE = re.compile(
r"(?i)(?:\b(?:applicant|customer|claimant|patient|an\s+t-iarratas[oó]ir|iarratas[oó]ir)\b\s*[:,]\s*|\b(?:full\s+name|name|ainm(?!\s+teaghlaigh))\b\s*:\s*)"
)
# Surname field cues, including an optional "is" and a trailing ":" "," or "-".
SURNAME_CUE_RE = re.compile(
r"(?i)\b(?:my\s+)?(?:surname|last\s+name|family\s+name|ainm\s+teaghlaigh|sloinne)\b(?:\s+is)?\s*[:,-]?\s*"
)
# Surname particles as whole words (Ní, Ó, Nic, Mac, Mc) or "O" + apostrophe.
# NOTE(review): no use of this pattern is visible in this part of the file.
NAME_PARTICLE_SURNAME_RE = re.compile(r"(?i)(?:\bN[ií]\b|\bÓ\b|\bNic\b|\bMac\b|\bMc\b|\bO['’])")
# Two to four capitalised words (group 1) immediately followed by a comma and
# a structured-field cue such as PPSN, DOB, Address, phone or email.
NAME_BEFORE_STRUCTURED_CUE_RE = re.compile(
r"(?<![A-Za-zÁÉÍÓÚáéíóú])([A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’.-]*(?:\s+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’.-]*){1,3})(?=\s*,\s*(?:PPSN|UPSP|DOB|Date\s+of\s+birth|D[áa]ta\s+breithe|Address|Seoladh|lives?\s+at|my\s+phone|phone|email|r-phost))"
)
# Normalised function words that end a name read after a self-introduction cue.
LOWER_NAME_STOP_SURFACES = {
normalize_surface(value)
for value in {"and", "agus", "is", "ta", "tá", "my", "mo", "an", "the"}
}
# Normalised words that end a name read after a role/field cue (verbs,
# pronouns and labels of other fields).
NAME_CUE_STOP_SURFACES = {
normalize_surface(value)
for value in {
"and",
"agus",
"submitted",
"provided",
"gave",
"her",
"his",
"their",
"she",
"he",
"email",
"phone",
"fón",
"fon",
"ppsn",
"upsp",
"address",
"seoladh",
"dob",
"age",
"aois",
"bank",
"iban",
"swift",
"chuir",
"isteach",
"sí",
"si",
"a",
"huimhir",
}
}
# Normalised organisation-type words (centre, clinic, hospital, ...).
# NOTE(review): presumably marks organisation names; its use is not visible here.
ORG_NAME_TRAILING_SURFACES = {
normalize_surface(value)
for value in {"centre", "center", "clinic", "hospital", "office", "service", "section", "unit", "council"}
}
# Text beginning with whitespace and an organisation/venue phrase
# ("Intreo Centre", "Retail Park", "Páirc Miondíola", ...).
ORG_CITY_TAIL_RE = re.compile(
r"(?i)^\s+(?:intreo\s+centre|business\s+centre|community\s+centre|shopping\s+centre|retail\s+park|p[áa]irc\s+miond[ií]ola)\b"
)
# Text ending in a venue phrase (retail park, business centre, ...).
ORG_NAME_PREFIX_RE = re.compile(
r"(?i)(?:retail\s+park|business\s+centre|community\s+centre|shopping\s+centre|p[áa]irc\s+miond[ií]ola)\s*$"
)
# The literal phrase "public contact details".
PUBLIC_CONTACT_DETAILS_RE = re.compile(r"(?i)\bpublic\s+contact\s+details\b")
# Cue words that suggest a nearby place name (address, centre, clinic, HSE, ...).
CITY_CUE_RE = re.compile(
r"(?i)\b(address|seoladh|located|suite|centre|center|ionad|intreo|clinic|hospital|ospid[eé]al|hse|fss)\b"
)
# Banking cue words required around a six-digit BANK_ROUTING_NUMBER candidate.
BANK_ROUTING_CONTEXT_RE = re.compile(
r"(?i)\b(sort\s+code|routing\s+number|bank\s+of\s+ireland|aib|cod\s+sort[aá]la|sort[aá]la)\b"
)
# Phone number (group 1): "+353" (optionally followed by a bracketed area
# code) or a leading "0", then digits with optional space/"-"/"."/"/"
# separators; or a bracketed "(0xx)" area code followed by digits.
PHONE_VALUE_RE = re.compile(
r"(?<![A-Za-z0-9])((?:\+353(?:\s*\((?:0)?\d{1,2}\))?[\s\-./]?|0)\d(?:[\s\-./]?\d){6,13}|\(\s*0\d{1,2}\s*\)(?:[\s\-./]?\d){6,10})(?![A-Za-z0-9])"
)
# PPSN (group 1): seven digits then one or two letters, each letter optionally
# preceded by whitespace or hyphens.
PPSN_VALUE_RE = re.compile(r"(?<![A-Za-z0-9])(\d{7}(?:[\s-]*[A-Za-z]){1,2})(?![A-Za-z0-9])")
# Eircode-shaped postcode (group 1): routing key (letter + two digits, or
# "D6W"), optional single space, then a letter and three alphanumerics.
POSTCODE_VALUE_RE = re.compile(
r"(?<![A-Za-z0-9])((?:[A-Za-z]\d{2}|D6W)[\s\u00A0\u202F]?[A-Za-z][A-Za-z0-9]{3})(?![A-Za-z0-9])"
)
# One to three capitalised words (group 1) followed by a comma, an optional
# "County X," part, and a postcode routing key.
CITY_BEFORE_POSTCODE_RE = re.compile(
r"(?<![A-Za-zÁÉÍÓÚáéíóú])([A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2})(?=\s*,\s*(?:(?:County|Contae|gContae|Co\.)\s+[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2}\s*,\s*)?(?:[A-Z]\d{2}|D6W))"
)
# One to four capitalised words (group 1) followed by a comma and a county name.
CITY_BEFORE_COUNTY_RE = re.compile(
r"(?<![A-Za-zÁÉÍÓÚáéíóú])([A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,3})(?=\s*,\s*(?:County|Contae|gContae|Co\.)\s+[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2}\b)"
)
# "<Name> Town" (group 1) followed by a comma, an optional "County X," part,
# and a postcode routing key.
CITY_TOWN_SUFFIX_RE = re.compile(
r"(?<![A-Za-zÁÉÍÓÚáéíóú])([A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2}\s+Town)(?=\s*,\s*(?:(?:County|Contae|gContae|Co\.)\s+[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2}\s*,\s*)?(?:[A-Z]\d{2}|D6W))"
)
# Line-initial city/town field label followed by its value (group 1, one to
# four words on the same line); the separator after the label is optional.
CITY_FIELD_VALUE_RE = re.compile(
r"(?im)(?:^|[\n\r])\s*(?:city(?:/town)?|town|cathair|baile)\b\s*[:,-]?\s*([A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){0,3})"
)
# City field label anywhere in a line; here a ":" "," or "-" separator is required.
INLINE_CITY_FIELD_VALUE_RE = re.compile(
r"(?i)\b(?:city(?:/town)?|cathair|baile)\b\s*[:,-]\s*([A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){0,3})"
)
# Line-initial county field label followed by its value (group 1), which may
# itself start with "Co.".
COUNTY_FIELD_VALUE_RE = re.compile(
r"(?im)(?:^|[\n\r])\s*(?:county|co\.|contae|gcontae)\b\s*[:,-]?\s*((?:Co\.[ \t]+)?[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){0,2})"
)
# County field label anywhere in a line; a separator is required.
INLINE_COUNTY_FIELD_VALUE_RE = re.compile(
r"(?i)\b(?:county|co\.|contae|gcontae)\b\s*[:,-]\s*((?:Co\.[ \t]+)?[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){0,2})"
)
# County phrase (group 1): a county word followed by one to three capitalised words.
COUNTY_VALUE_RE = re.compile(
r"(?<![A-Za-zÁÉÍÓÚáéíóú])((?:County|Contae|gContae|Co\.)(?:\s+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){1,3})(?![A-Za-zÁÉÍÓÚáéíóú])"
)
# Address cue followed by the first chunk of the address (group 1): everything
# up to the next comma, line break or full stop.
ADDRESS_BLOCK_VALUE_RE = re.compile(
rf"(?i)\b(?:{ADDRESS_FIELD_CUE_PATTERN}|live\s+at|lives\s+at|living\s+at|located\s+at|t[áa]\s+m[ée]\s+i\s+mo\s+ch[oó]na[ií]\s+ag|t[áa]im\s+i\s+mo\s+ch[oó]na[ií]\s+ag)\b\s*[:,-]?\s*([^,\n.]+)"
)
# A whole "Address line 1:" / "Seoladh líne 1:" line; group 1 is the value
# (optional unit prefix, then capitalised words or a "Teach ..." name).
ADDRESS_LINE_PREFIX_VALUE_RE = re.compile(
r"(?im)^(?:address\s+line\s+1|seoladh\s+l[ií]ne\s+1)\s*:\s*((?:(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\s+[A-Za-z0-9-]+,\s+)?(?:[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,5}|Teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]+)?))\s*$"
)
# Address cue followed by a named venue (group 1): optional unit prefix, then
# "<Name> business/community/shopping centre", "<Name> retail park" or
# "Páirc Miondíola <Name>".
ADDRESS_PLACE_VALUE_RE = re.compile(
rf"(?i)\b(?:{ADDRESS_FIELD_CUE_PATTERN}|live\s+at|lives\s+at|living\s+at|located\s+at|allocation\s+centre|intreo\s+centre)\b\s*[:,-]?\s*("
r"(?:(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\s+[A-Za-z0-9-]+,\s+)?"
r"(?:[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,4}\s+(?:business\s+centre|community\s+centre|shopping\s+centre|retail\s+park)|P[áa]irc\s+Miond[ií]ola(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*)?)"
r")"
)
# A ", <building name>" continuation at the start of the remaining text
# (group 1 is the building name).
ADDRESS_BUILDING_TAIL_RE = re.compile(
r"^\s*,\s*((?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:house|cottage|lodge|villa|apartments?|building|business\s+centre|community\s+centre|shopping\s+centre|retail\s+park)|Teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]+)?)",
flags=re.IGNORECASE,
)
# Character class of letters that must NOT sit directly beside a matched
# place-name form (so a form is never matched inside a longer word).
LOCATION_FORM_EDGE = r"[A-Za-zÁÉÍÓÚáéíóú]"
# Case-insensitive, letter-bounded pattern for every known city and county
# form, compiled once at import time and keyed by the form string.
_BOUNDED_FORM_PATTERNS = {
form: re.compile(
rf"(?<!{LOCATION_FORM_EDGE}){re.escape(form)}(?!{LOCATION_FORM_EDGE})",
flags=re.IGNORECASE,
)
for form in {*(IRISH_CITY_FORMS), *(IRISH_COUNTY_FORMS)}
}
def iter_bounded_form_matches(form: str, text: str):
pattern = _BOUNDED_FORM_PATTERNS.get(form)
if pattern is None:
pattern = re.compile(
rf"(?<!{LOCATION_FORM_EDGE}){re.escape(form)}(?!{LOCATION_FORM_EDGE})",
flags=re.IGNORECASE,
)
_BOUNDED_FORM_PATTERNS[form] = pattern
return pattern.finditer(text)
def repair_contextual_passport_numbers(text: str, spans: list[dict]) -> list[dict]:
    """Add PASSPORT_NUMBER spans for passport-shaped values that follow a cue.

    Every group-1 match of ``PASSPORT_VALUE_RE`` is considered. A match is
    skipped when an existing PASSPORT_NUMBER span already covers it, or when
    ``PASSPORT_CUE_RE`` finds nothing in the 32 characters before it.
    Otherwise overlapping PHONE_NUMBER, PPSN, ACCOUNT_NUMBER, AGE and
    PASSPORT_NUMBER spans are removed and the match is appended with a fixed
    score of 0.67.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"PHONE_NUMBER", "PPSN", "ACCOUNT_NUMBER", "AGE", "PASSPORT_NUMBER"}
    result = list(spans)
    for hit in PASSPORT_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        for existing in result:
            if (
                existing["label"] == "PASSPORT_NUMBER"
                and int(existing["start"]) <= begin
                and int(existing["end"]) >= stop
            ):
                break
        else:
            # Not yet covered: require a passport cue shortly before the value.
            if not PASSPORT_CUE_RE.search(text[max(0, begin - 32) : begin]):
                continue
            candidate = {
                "start": begin,
                "end": stop,
                "label": "PASSPORT_NUMBER",
                "score": 0.67,
                "text": text[begin:stop],
            }
            result = [
                existing
                for existing in result
                if not spans_overlap(candidate, existing)
                or existing["label"] not in displaced_labels
            ]
            result.append(candidate)
    return result
def repair_ppsn_variants(text: str, spans: list[dict]) -> list[dict]:
    """Add PPSN spans for values shaped like seven digits plus one or two letters.

    Each group-1 match of ``PPSN_VALUE_RE`` is compacted with ``alnum_upper``
    and kept only if it is 8 or 9 characters long, starts with seven digits
    and ends in letters. The score is 0.72 when ``PPSN_CUE_RE`` matches in
    the window from 32 characters before to 24 characters after the value,
    otherwise 0.58. Overlapping PHONE_NUMBER, PASSPORT_NUMBER,
    ACCOUNT_NUMBER, AGE, FIRST_NAME, LAST_NAME and PPSN spans are replaced.

    Returns a new list; ``spans`` itself is not modified.
    """
    replaceable = {
        "PHONE_NUMBER",
        "PASSPORT_NUMBER",
        "ACCOUNT_NUMBER",
        "AGE",
        "FIRST_NAME",
        "LAST_NAME",
        "PPSN",
    }
    result = list(spans)
    for hit in PPSN_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        raw = text[begin:stop]
        squeezed = alnum_upper(raw)
        well_formed = (
            len(squeezed) in {8, 9}
            and squeezed[:7].isdigit()
            and squeezed[7:].isalpha()
        )
        if not well_formed:
            continue
        context = text[max(0, begin - 32) : min(len(text), stop + 24)]
        candidate = {
            "start": begin,
            "end": stop,
            "label": "PPSN",
            "score": 0.72 if PPSN_CUE_RE.search(context) else 0.58,
            "text": raw,
        }
        survivors = []
        for existing in result:
            if spans_overlap(candidate, existing) and existing["label"] in replaceable:
                continue
            survivors.append(existing)
        survivors.append(candidate)
        result = survivors
    return result
def repair_contextual_date_of_birth(text: str, spans: list[dict]) -> list[dict]:
    """Add DATE_OF_BIRTH spans for date values that sit in a birth-date context.

    A group-1 match of ``DATE_OF_BIRTH_VALUE_RE`` is accepted when
    ``DOB_CONTEXT_RE`` matches in the 96 characters before it or
    ``has_dob_suffix_context`` accepts the text after it. Accepted values
    replace overlapping DATE_OF_BIRTH, PHONE_NUMBER, AGE, FIRST_NAME,
    LAST_NAME, ACCOUNT_NUMBER and CITY spans and get a score of 0.66.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {
        "DATE_OF_BIRTH",
        "PHONE_NUMBER",
        "AGE",
        "FIRST_NAME",
        "LAST_NAME",
        "ACCOUNT_NUMBER",
        "CITY",
    }
    result = list(spans)
    for hit in DATE_OF_BIRTH_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        preceding = text[max(0, begin - 96) : begin]
        cued = DOB_CONTEXT_RE.search(preceding) or has_dob_suffix_context(text, stop)
        if not cued:
            continue
        candidate = {
            "start": begin,
            "end": stop,
            "label": "DATE_OF_BIRTH",
            "score": 0.66,
            "text": text[begin:stop],
        }
        kept = []
        for existing in result:
            if spans_overlap(candidate, existing) and existing["label"] in displaced_labels:
                continue
            kept.append(existing)
        kept.append(candidate)
        result = kept
    return result
def repair_contextual_ages(text: str, spans: list[dict]) -> list[dict]:
    """Add AGE spans for values matched by ``AGE_VALUE_RE``.

    A match is skipped when ``is_reasonable_span_text`` rejects it for the
    AGE label or when it overlaps an existing DATE_OF_BIRTH span. Accepted
    values replace overlapping AGE, PHONE_NUMBER and ACCOUNT_NUMBER spans and
    get a score of 0.66.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"AGE", "PHONE_NUMBER", "ACCOUNT_NUMBER"}
    result = list(spans)
    for hit in AGE_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        if not is_reasonable_span_text("AGE", text, begin, stop):
            continue
        probe = {"start": begin, "end": stop}
        inside_birth_date = any(
            spans_overlap(probe, existing) and existing["label"] == "DATE_OF_BIRTH"
            for existing in result
        )
        if inside_birth_date:
            continue
        candidate = {
            **probe,
            "label": "AGE",
            "score": 0.66,
            "text": text[begin:stop],
        }
        result = [
            existing
            for existing in result
            if not spans_overlap(candidate, existing)
            or existing["label"] not in displaced_labels
        ]
        result.append(candidate)
    return result
# Case-insensitive cue phrases (English and Irish-language) that introduce a
# bank account number.
ACCOUNT_CUE_RE = re.compile(
    r"(?i)(account\s+number|bank\s+account|uimhir\s+chuntais|cuntas\s+bainc)"
)
# A run of 6 to 12 digits that is not adjacent to an ASCII letter or digit;
# group 1 is the digit run itself.
ACCOUNT_VALUE_RE = re.compile(r"(?<![A-Za-z0-9])(\d{6,12})(?![A-Za-z0-9])")
def repair_contextual_account_numbers(text: str, spans: list[dict]) -> list[dict]:
    """Add ACCOUNT_NUMBER spans for digit runs that follow an account cue.

    A group-1 match of ``ACCOUNT_VALUE_RE`` is added (score 0.51) only when
    no existing ACCOUNT_NUMBER span already covers it, ``ACCOUNT_CUE_RE``
    matches in the 40 characters before it, and it does not overlap a
    PHONE_NUMBER, BANK_ROUTING_NUMBER, PPSN, POSTCODE or PASSPORT_NUMBER
    span. Unlike the other repairs, existing spans are never removed here.

    Returns a new list; ``spans`` itself is not modified.
    """
    blocking_labels = {"PHONE_NUMBER", "BANK_ROUTING_NUMBER", "PPSN", "POSTCODE", "PASSPORT_NUMBER"}
    result = list(spans)
    for hit in ACCOUNT_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        for existing in result:
            if (
                existing["label"] == "ACCOUNT_NUMBER"
                and int(existing["start"]) <= begin
                and int(existing["end"]) >= stop
            ):
                break
        else:
            if not ACCOUNT_CUE_RE.search(text[max(0, begin - 40) : begin]):
                continue
            candidate = {
                "start": begin,
                "end": stop,
                "label": "ACCOUNT_NUMBER",
                "score": 0.51,
                "text": text[begin:stop],
            }
            for existing in result:
                if spans_overlap(candidate, existing) and existing["label"] in blocking_labels:
                    break
            else:
                result.append(candidate)
    return result
def repair_emails(text: str, spans: list[dict]) -> list[dict]:
    """Add an EMAIL span (score 0.74) for every group-1 match of ``EMAIL_EXTRACT_RE``.

    Overlapping EMAIL, FIRST_NAME and LAST_NAME spans are removed in favour
    of the regex match.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"EMAIL", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in EMAIL_EXTRACT_RE.finditer(text):
        begin, stop = hit.span(1)
        candidate = {
            "start": begin,
            "end": stop,
            "label": "EMAIL",
            "score": 0.74,
            "text": text[begin:stop],
        }
        kept = []
        for existing in result:
            if spans_overlap(candidate, existing) and existing["label"] in displaced_labels:
                continue
            kept.append(existing)
        kept.append(candidate)
        result = kept
    return result
def repair_phone_numbers(text: str, spans: list[dict]) -> list[dict]:
    """Add PHONE_NUMBER spans for values matched by ``PHONE_VALUE_RE``.

    A match is accepted when ``PHONE_CUE_RE`` matches in the window from 32
    characters before to 16 characters after it, or when it overlaps an
    existing PHONE_NUMBER span, and ``is_reasonable_span_text`` accepts it.
    Accepted values (score 0.69) replace overlapping PHONE_NUMBER, PPSN,
    ACCOUNT_NUMBER, BANK_ROUTING_NUMBER and CREDIT_DEBIT_CARD spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {
        "PHONE_NUMBER",
        "PPSN",
        "ACCOUNT_NUMBER",
        "BANK_ROUTING_NUMBER",
        "CREDIT_DEBIT_CARD",
    }
    result = list(spans)
    for hit in PHONE_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        candidate = {
            "start": begin,
            "end": stop,
            "label": "PHONE_NUMBER",
            "score": 0.69,
            "text": text[begin:stop],
        }
        window = text[max(0, begin - 32) : min(len(text), stop + 16)]
        cue_found = bool(PHONE_CUE_RE.search(window))
        already_tagged = any(
            spans_overlap(candidate, existing) and existing["label"] == "PHONE_NUMBER"
            for existing in result
        )
        if not cue_found and not already_tagged:
            continue
        if not is_reasonable_span_text("PHONE_NUMBER", text, begin, stop):
            continue
        result = [
            existing
            for existing in result
            if not spans_overlap(candidate, existing)
            or existing["label"] not in displaced_labels
        ]
        result.append(candidate)
    return result
def repair_postcodes(text: str, spans: list[dict]) -> list[dict]:
    """Add a POSTCODE span (score 0.71) for every group-1 match of ``POSTCODE_VALUE_RE``.

    Overlapping POSTCODE, PHONE_NUMBER, ACCOUNT_NUMBER, FIRST_NAME and
    LAST_NAME spans are removed in favour of the regex match.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"POSTCODE", "PHONE_NUMBER", "ACCOUNT_NUMBER", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in POSTCODE_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        candidate = {
            "start": begin,
            "end": stop,
            "label": "POSTCODE",
            "score": 0.71,
            "text": text[begin:stop],
        }
        kept = []
        for existing in result:
            if spans_overlap(candidate, existing) and existing["label"] in displaced_labels:
                continue
            kept.append(existing)
        kept.append(candidate)
        result = kept
    return result
def repair_city_spans(text: str, spans: list[dict]) -> list[dict]:
    """Add CITY spans for known city forms that appear in an address-like context.

    ``IRISH_CITY_FORMS`` are tried longest first. An occurrence is skipped
    when ``CITY_COUNTY_PREFIX_RE`` matches in the 20 characters before it or
    when the same character range was already handled. It is accepted when
    any of these holds, checked in this order:

    * a STREET_ADDRESS span ends at most 4 characters before it, or a
      COUNTY/POSTCODE span starts at most 6 characters after it;
    * the text right after it is a comma followed by "Co. ", a
      letter-plus-two-digits code or "D6W";
    * ``CITY_CUE_RE`` or ``ADDRESS_CUE_RE`` matches in the window from 40
      characters before to 32 characters after it.

    Accepted occurrences (score 0.64) replace overlapping CITY, FIRST_NAME
    and LAST_NAME spans. Returns a new list; ``spans`` is not modified.
    """
    displaced_labels = {"CITY", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    visited: set[tuple[int, int]] = set()
    for form in sorted(IRISH_CITY_FORMS, key=len, reverse=True):
        for hit in iter_bounded_form_matches(form, text):
            begin, stop = hit.span()
            if CITY_COUNTY_PREFIX_RE.search(text[max(0, begin - 20) : begin]):
                continue
            bounds = (begin, stop)
            if bounds in visited:
                continue
            visited.add(bounds)
            candidate = {
                "start": begin,
                "end": stop,
                "label": "CITY",
                "score": 0.64,
                "text": text[begin:stop],
            }
            anchored = False
            for neighbour in result:
                n_begin = int(neighbour["start"])
                n_stop = int(neighbour["end"])
                kind = neighbour["label"]
                follows_street = kind == "STREET_ADDRESS" and 0 <= begin - n_stop <= 4
                precedes_region = kind in {"COUNTY", "POSTCODE"} and 0 <= n_begin - stop <= 6
                if follows_street or precedes_region:
                    anchored = True
                    break
            if not anchored:
                anchored = bool(re.match(r"^\s*,\s*(?:Co\.\s+|[A-Z]\d{2}|D6W)", text[stop:]))
            if not anchored:
                window = text[max(0, begin - 40) : min(len(text), stop + 32)]
                anchored = bool(CITY_CUE_RE.search(window) or ADDRESS_CUE_RE.search(window))
            if not anchored:
                continue
            result = [
                existing
                for existing in result
                if not spans_overlap(candidate, existing)
                or existing["label"] not in displaced_labels
            ]
            result.append(candidate)
    return result
def repair_city_before_postcode(text: str, spans: list[dict]) -> list[dict]:
    """Add CITY spans for values that ``CITY_BEFORE_POSTCODE_RE`` captures.

    A group-1 match is skipped when it contains a digit or when
    ``CITY_COUNTY_PREFIX_RE`` matches in the 20 characters before it. It
    needs context: a STREET_ADDRESS span ending at most 4 characters before
    it, a POSTCODE span starting at most 6 characters after it, or a
    ``CITY_CUE_RE`` / ``ADDRESS_CUE_RE`` match in the window from 40
    characters before to 24 characters after it. A candidate is dropped when
    an overlapping CITY span of equal or greater length already exists;
    otherwise it (score 0.63) replaces overlapping CITY, FIRST_NAME and
    LAST_NAME spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"CITY", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in CITY_BEFORE_POSTCODE_RE.finditer(text):
        begin, stop = hit.span(1)
        surface = text[begin:stop]
        if any(ch.isdigit() for ch in surface):
            continue
        if CITY_COUNTY_PREFIX_RE.search(text[max(0, begin - 20) : begin]):
            continue
        anchored = False
        for neighbour in result:
            n_begin = int(neighbour["start"])
            n_stop = int(neighbour["end"])
            kind = neighbour["label"]
            follows_street = kind == "STREET_ADDRESS" and 0 <= begin - n_stop <= 4
            precedes_postcode = kind == "POSTCODE" and 0 <= n_begin - stop <= 6
            if follows_street or precedes_postcode:
                anchored = True
                break
        if not anchored:
            window = text[max(0, begin - 40) : min(len(text), stop + 24)]
            anchored = bool(CITY_CUE_RE.search(window) or ADDRESS_CUE_RE.search(window))
        if not anchored:
            continue
        candidate = {
            "start": begin,
            "end": stop,
            "label": "CITY",
            "score": 0.63,
            "text": surface,
        }
        length = stop - begin
        dominated = any(
            existing["label"] == "CITY"
            and spans_overlap(candidate, existing)
            and (int(existing["end"]) - int(existing["start"])) >= length
            for existing in result
        )
        if dominated:
            continue
        result = [
            existing
            for existing in result
            if not spans_overlap(candidate, existing)
            or existing["label"] not in displaced_labels
        ]
        result.append(candidate)
    return result
def repair_city_before_county(text: str, spans: list[dict]) -> list[dict]:
    """Add CITY spans captured by ``CITY_TOWN_SUFFIX_RE`` and ``CITY_BEFORE_COUNTY_RE``.

    A group-1 match is skipped when it contains a digit or when
    ``CITY_COUNTY_PREFIX_RE`` matches in the 20 characters before it. It is
    accepted when ``CITY_CUE_RE`` or ``ADDRESS_CUE_RE`` matches in the window
    from 40 characters before to 24 characters after it, or when the text
    right after it is a comma followed by a county word. A candidate is
    dropped when an overlapping CITY span of equal or greater length already
    exists; otherwise it (score 0.64) replaces overlapping CITY, FIRST_NAME
    and LAST_NAME spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"CITY", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for pattern in (CITY_TOWN_SUFFIX_RE, CITY_BEFORE_COUNTY_RE):
        for hit in pattern.finditer(text):
            begin, stop = hit.span(1)
            surface = text[begin:stop]
            if any(ch.isdigit() for ch in surface):
                continue
            if CITY_COUNTY_PREFIX_RE.search(text[max(0, begin - 20) : begin]):
                continue
            window = text[max(0, begin - 40) : min(len(text), stop + 24)]
            supported = (
                CITY_CUE_RE.search(window)
                or ADDRESS_CUE_RE.search(window)
                or re.search(r"^\s*,\s*(?:County|Contae|gContae|Co\.)\b", text[stop:])
            )
            if not supported:
                continue
            candidate = {
                "start": begin,
                "end": stop,
                "label": "CITY",
                "score": 0.64,
                "text": surface,
            }
            length = stop - begin
            dominated = any(
                existing["label"] == "CITY"
                and spans_overlap(candidate, existing)
                and (int(existing["end"]) - int(existing["start"])) >= length
                for existing in result
            )
            if dominated:
                continue
            result = [
                existing
                for existing in result
                if not spans_overlap(candidate, existing)
                or existing["label"] not in displaced_labels
            ]
            result.append(candidate)
    return result
def repair_city_field_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add CITY spans for values of explicit city form fields.

    Group-1 matches of ``CITY_FIELD_VALUE_RE`` and then
    ``INLINE_CITY_FIELD_VALUE_RE`` are used; a character range found by both
    is handled once. Each value (score 0.65) replaces overlapping CITY,
    COUNTY, FIRST_NAME and LAST_NAME spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"CITY", "COUNTY", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    handled: set[tuple[int, int]] = set()
    for pattern in (CITY_FIELD_VALUE_RE, INLINE_CITY_FIELD_VALUE_RE):
        for hit in pattern.finditer(text):
            bounds = hit.span(1)
            if bounds in handled:
                continue
            handled.add(bounds)
            begin, stop = bounds
            candidate = {
                "start": begin,
                "end": stop,
                "label": "CITY",
                "score": 0.65,
                "text": text[begin:stop],
            }
            kept = []
            for existing in result:
                if spans_overlap(candidate, existing) and existing["label"] in displaced_labels:
                    continue
                kept.append(existing)
            kept.append(candidate)
            result = kept
    return result
def repair_prefixed_city_spans(text: str, spans: list[dict]) -> list[dict]:
    """Widen CITY spans by one character to take in a leading prefix letter.

    A CITY span is widened when the character just before it is in
    ``IRISH_CITY_PREFIX_CHARS``, the character before that (if any) is not a
    letter, and the span's normalised text is in ``IRISH_CITY_SURFACES``.
    A widened span keeps its other fields and gets a score of at least 0.66.
    All other spans pass through unchanged.
    """

    def widen(span: dict) -> dict:
        if span["label"] != "CITY":
            return span
        begin = int(span["start"])
        stop = int(span["end"])
        if begin <= 0:
            return span
        before = begin - 1
        if text[before].lower() not in IRISH_CITY_PREFIX_CHARS:
            return span
        # The prefix letter must itself start a word.
        if before > 0 and text[before - 1].isalpha():
            return span
        if normalize_surface(span.get("text", "")) not in IRISH_CITY_SURFACES:
            return span
        return {
            **span,
            "start": before,
            "text": text[before:stop],
            "score": max(float(span.get("score", 0.0)), 0.66),
        }

    return [widen(span) for span in spans]
def prefer_long_city_spans(spans: list[dict]) -> list[dict]:
    """Drop CITY spans that sit inside a longer, sufficiently confident CITY span.

    A CITY span is removed when another CITY span with different bounds
    contains it and that other span's score is at least
    ``max(0.6, 0.6 * score_of_the_contained_span)``. Non-CITY spans are
    always kept. An empty input is returned as is.
    """
    if not spans:
        return spans

    def bounds(item: dict) -> tuple[int, int]:
        return int(item["start"]), int(item["end"])

    def is_shadowed(span: dict) -> bool:
        lo, hi = bounds(span)
        floor = max(0.6, float(span.get("score", 0.0)) * 0.6)
        for other in spans:
            if other is span or other["label"] != "CITY":
                continue
            other_lo, other_hi = bounds(other)
            contains = other_lo <= lo and other_hi >= hi
            if contains and (other_lo, other_hi) != (lo, hi):
                if float(other.get("score", 0.0)) >= floor:
                    return True
        return False

    return [span for span in spans if span["label"] != "CITY" or not is_shadowed(span)]
def repair_county_field_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add COUNTY spans for values of explicit county form fields.

    Group-1 matches of ``COUNTY_FIELD_VALUE_RE`` and then
    ``INLINE_COUNTY_FIELD_VALUE_RE`` are used; a character range found by
    both is handled once. Each value (score 0.66) replaces overlapping
    COUNTY, CITY, FIRST_NAME and LAST_NAME spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"COUNTY", "CITY", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    handled: set[tuple[int, int]] = set()
    for pattern in (COUNTY_FIELD_VALUE_RE, INLINE_COUNTY_FIELD_VALUE_RE):
        for hit in pattern.finditer(text):
            bounds = hit.span(1)
            if bounds in handled:
                continue
            handled.add(bounds)
            begin, stop = bounds
            candidate = {
                "start": begin,
                "end": stop,
                "label": "COUNTY",
                "score": 0.66,
                "text": text[begin:stop],
            }
            kept = []
            for existing in result:
                if spans_overlap(candidate, existing) and existing["label"] in displaced_labels:
                    continue
                kept.append(existing)
            kept.append(candidate)
            result = kept
    return result
def repair_county_spans(text: str, spans: list[dict]) -> list[dict]:
    """Add COUNTY spans from the known county forms and from "County X" phrases.

    Pass 1 walks ``IRISH_COUNTY_FORMS`` longest first. Each letter-bounded
    occurrence that ``is_reasonable_span_text`` accepts becomes a COUNTY span
    with score 0.74, raised to the best score of any COUNTY span it overlaps.

    Pass 2 walks ``COUNTY_VALUE_RE``. A match not seen in pass 1 is accepted
    when ``ADDRESS_CUE_RE`` or ``POSTCODE_VALUE_RE`` matches in the window
    from 40 characters before to 24 characters after it, or when a
    STREET_ADDRESS, CITY or POSTCODE span lies within 24 characters of it.

    In both passes the new span replaces overlapping COUNTY, CITY,
    FIRST_NAME and LAST_NAME spans. Returns a new list; ``spans`` itself is
    not modified.
    """
    displaced_labels = {"COUNTY", "CITY", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    visited: set[tuple[int, int]] = set()

    def install(candidate: dict) -> None:
        # ``result`` is this function's private copy, so editing it in place is safe.
        result[:] = [
            existing
            for existing in result
            if not spans_overlap(candidate, existing)
            or existing["label"] not in displaced_labels
        ]
        result.append(candidate)

    for form in sorted(IRISH_COUNTY_FORMS, key=len, reverse=True):
        for hit in iter_bounded_form_matches(form, text):
            begin, stop = hit.span()
            if (begin, stop) in visited:
                continue
            visited.add((begin, stop))
            candidate = {
                "start": begin,
                "end": stop,
                "label": "COUNTY",
                "score": 0.74,
                "text": text[begin:stop],
            }
            if not is_reasonable_span_text("COUNTY", text, begin, stop):
                continue
            prior_scores = [
                float(existing.get("score", 0.0))
                for existing in result
                if spans_overlap(candidate, existing) and existing["label"] == "COUNTY"
            ]
            if prior_scores:
                candidate["score"] = max(float(candidate["score"]), max(prior_scores))
            install(candidate)

    for hit in COUNTY_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        if (begin, stop) in visited:
            continue
        window = text[max(0, begin - 40) : min(len(text), stop + 24)]
        supported = bool(ADDRESS_CUE_RE.search(window) or POSTCODE_VALUE_RE.search(window))
        if not supported:
            for neighbour in result:
                n_begin = int(neighbour["start"])
                n_stop = int(neighbour["end"])
                nearby = abs(n_begin - stop) <= 24 or abs(begin - n_stop) <= 24
                if neighbour["label"] in {"STREET_ADDRESS", "CITY", "POSTCODE"} and nearby:
                    supported = True
                    break
        if not supported:
            continue
        visited.add((begin, stop))
        candidate = {
            "start": begin,
            "end": stop,
            "label": "COUNTY",
            "score": 0.74,
            "text": text[begin:stop],
        }
        if not is_reasonable_span_text("COUNTY", text, begin, stop):
            continue
        install(candidate)
    return result
def repair_street_addresses(text: str, spans: list[dict]) -> list[dict]:
    """Add STREET_ADDRESS spans for values matched by ``STREET_ADDRESS_VALUE_RE``.

    A group-1 match must pass ``is_reasonable_span_text`` and have context:
    ``ADDRESS_CUE_RE`` matching in the window from 32 characters before to 24
    characters after it, a CITY/COUNTY/POSTCODE span starting at most 16
    characters after it, or a FIRST_NAME/LAST_NAME span ending at most 24
    characters before it. Accepted values (score 0.65) replace overlapping
    STREET_ADDRESS, FIRST_NAME and LAST_NAME spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    handled: set[tuple[int, int]] = set()
    for hit in STREET_ADDRESS_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        if (begin, stop) in handled:
            continue
        handled.add((begin, stop))
        candidate = {
            "start": begin,
            "end": stop,
            "label": "STREET_ADDRESS",
            "score": 0.65,
            "text": text[begin:stop],
        }
        if not is_reasonable_span_text("STREET_ADDRESS", text, begin, stop):
            continue
        window = text[max(0, begin - 32) : min(len(text), stop + 24)]
        supported = bool(ADDRESS_CUE_RE.search(window))
        if not supported:
            for neighbour in result:
                n_begin = int(neighbour["start"])
                n_stop = int(neighbour["end"])
                kind = neighbour["label"]
                place_follows = kind in {"CITY", "COUNTY", "POSTCODE"} and 0 <= n_begin - stop <= 16
                name_precedes = kind in {"FIRST_NAME", "LAST_NAME"} and 0 <= begin - n_stop <= 24
                if place_follows or name_precedes:
                    supported = True
                    break
        if not supported:
            continue
        result = [
            existing
            for existing in result
            if not spans_overlap(candidate, existing)
            or existing["label"] not in displaced_labels
        ]
        result.append(candidate)
    return result
def repair_contextual_address_blocks(text: str, spans: list[dict]) -> list[dict]:
    """Add STREET_ADDRESS spans for the first segment that follows an address cue.

    Each group-1 match of ``ADDRESS_BLOCK_VALUE_RE`` (the text after the cue,
    up to the next comma, newline or full stop) is a candidate. It must have
    at least two whitespace-separated tokens and either contain a digit or
    start with a unit / house-name prefix. If it has no street suffix it also
    needs supporting context from the text that follows it (see the inline
    comments). Accepted candidates get a score of 0.68 and replace
    overlapping STREET_ADDRESS, FIRST_NAME and LAST_NAME spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    repaired = list(spans)
    for match in ADDRESS_BLOCK_VALUE_RE.finditer(text):
        start, end = match.span(1)
        value = text[start:end].strip()
        if not value:
            continue
        # The span keeps the unstripped bounds and text of group 1.
        candidate_span = {
            "start": start,
            "end": end,
            "label": "STREET_ADDRESS",
            "score": 0.68,
            "text": text[start:end],
        }
        tokens = [token for token in re.split(r"\s+", value) if token]
        if len(tokens) < 2:
            continue
        has_digit = any(ch.isdigit() for ch in value)
        has_prefix = bool(ADDRESS_UNIT_PREFIX_RE.match(value) or HOUSE_NAME_PREFIX_RE.match(value))
        has_street_suffix = bool(STREET_SUFFIX_RE.search(value))
        # Only the 40 characters before the value are searched for a line cue.
        cue_window = text[max(0, start - 40) : start]
        has_address_line_cue = bool(ADDRESS_LINE_CUE_RE.search(cue_window))
        if not (has_digit or has_prefix):
            continue
        if not has_street_suffix:
            # Without a street suffix, look at up to 48 characters after the value.
            tail_window = text[end : min(len(text), end + 48)]
            # Either the tail starts with ", <street-like phrase>" or it mentions
            # a county word or a letter-plus-two-digits / "D6W" code anywhere.
            has_following_address_context = bool(
                re.match(
                    r"^\s*,\s*((?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana))",
                    tail_window,
                    flags=re.IGNORECASE,
                )
                or re.search(r"(?:County|Contae|gContae|Co\.|(?:[A-Z]\d{2}|D6W))", tail_window)
            )
            building_tail_match = ADDRESS_BUILDING_TAIL_RE.match(tail_window)
            has_following_building_context = False
            if building_tail_match:
                # A building name counts only if what follows it is the end of the
                # window or a comma plus a street phrase, county word, code or letter.
                remaining_tail = tail_window[int(building_tail_match.end(1)) :]
                has_following_building_context = bool(
                    re.match(
                        r"^\s*(?:$|,\s*(?:(?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)|County|Contae|gContae|Co\.|(?:[A-Z]\d{2}|D6W)|[A-ZÁÉÍÓÚ]))",
                        remaining_tail,
                        flags=re.IGNORECASE,
                    )
                )
            looks_like_suffixless_address = has_digit and len(tokens) >= 3
            # Accept when a prefixed value has a line cue or building context, or when
            # a prefixed / digit-bearing 3+ token value is followed by address context.
            if not ((has_prefix and (has_address_line_cue or has_following_building_context)) or ((has_prefix or looks_like_suffixless_address) and has_following_address_context)):
                continue
        # Do not replace a strictly larger STREET_ADDRESS span that already covers this one.
        if any(
            other["label"] == "STREET_ADDRESS"
            and int(other["start"]) <= start
            and int(other["end"]) >= end
            and (int(other["start"]), int(other["end"])) != (start, end)
            for other in repaired
        ):
            continue
        conflicting_labels = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"}
        repaired = [
            other
            for other in repaired
            if not (
                spans_overlap(candidate_span, other)
                and other["label"] in conflicting_labels
            )
        ]
        repaired.append(candidate_span)
    return repaired
def repair_address_line_prefix_spans(text: str, spans: list[dict]) -> list[dict]:
    """Add STREET_ADDRESS spans for "address line 1" values with a unit or house prefix.

    Each group-1 match of ``ADDRESS_LINE_PREFIX_VALUE_RE`` is used when its
    stripped text is non-empty and starts with a match of
    ``ADDRESS_UNIT_PREFIX_RE`` or ``HOUSE_NAME_PREFIX_RE``. The span keeps
    the unstripped bounds, gets a score of 0.67 and replaces overlapping
    STREET_ADDRESS, FIRST_NAME and LAST_NAME spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in ADDRESS_LINE_PREFIX_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        trimmed = text[begin:stop].strip()
        if not trimmed:
            continue
        prefixed = ADDRESS_UNIT_PREFIX_RE.match(trimmed) or HOUSE_NAME_PREFIX_RE.match(trimmed)
        if not prefixed:
            continue
        candidate = {
            "start": begin,
            "end": stop,
            "label": "STREET_ADDRESS",
            "score": 0.67,
            "text": text[begin:stop],
        }
        kept = []
        for existing in result:
            if spans_overlap(candidate, existing) and existing["label"] in displaced_labels:
                continue
            kept.append(existing)
        kept.append(candidate)
        result = kept
    return result
def extend_prefixed_street_address_spans(text: str, spans: list[dict]) -> list[dict]:
    """Extend unit/house-prefixed STREET_ADDRESS spans over the segment that follows.

    A STREET_ADDRESS span whose stripped text starts with a match of
    ``ADDRESS_UNIT_PREFIX_RE`` or ``HOUSE_NAME_PREFIX_RE`` is extended when
    the text right after it is a comma followed by either a street phrase
    (optional house number, up to five words, a street suffix and up to two
    trailing words) or a building name matched by
    ``ADDRESS_BUILDING_TAIL_RE``. The street phrase is tried first. An
    extended span keeps its other fields and gets a score of at least 0.67.
    All other spans pass through unchanged.

    Returns a new list.
    """
    # NOTE(review): an earlier version compiled a "what follows the building
    # name" pattern and sliced the remaining tail but never used either, so
    # the building extension has always been unconditional. That dead work is
    # removed here; confirm whether a follow-context check was intended, as in
    # repair_contextual_address_blocks.
    tail_re = re.compile(
        r"^\s*,\s*((?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)(?:\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*){0,2})",
        flags=re.IGNORECASE,
    )
    repaired: list[dict] = []
    for span in spans:
        if span["label"] != "STREET_ADDRESS":
            repaired.append(span)
            continue
        value = span.get("text", "").strip()
        if not value or not (ADDRESS_UNIT_PREFIX_RE.match(value) or HOUSE_NAME_PREFIX_RE.match(value)):
            repaired.append(span)
            continue
        span_end = int(span["end"])
        following = text[span_end:]
        # Street phrase first, building name second; both capture the
        # extension in group 1 and produce the same kind of widened span.
        tail_match = tail_re.match(following) or ADDRESS_BUILDING_TAIL_RE.match(following)
        if not tail_match:
            repaired.append(span)
            continue
        extended_end = span_end + int(tail_match.end(1))
        repaired.append(
            {
                **span,
                "end": extended_end,
                "text": text[int(span["start"]) : extended_end],
                "score": max(float(span.get("score", 0.0)), 0.67),
            }
        )
    return repaired
def merge_adjacent_street_address_spans(text: str, spans: list[dict]) -> list[dict]:
    """Merge comma-separated runs of STREET_ADDRESS spans into single spans.

    Spans are sorted by (start, end, label). A STREET_ADDRESS span absorbs
    the next span in that order while the next span is also a STREET_ADDRESS,
    the text between them is just a comma with optional whitespace, and the
    merged text contains a digit and matches ``ADDRESS_UNIT_PREFIX_RE`` at
    its start or ``STREET_SUFFIX_RE`` / ``BUILDING_SUFFIX_RE`` anywhere. The
    merged span takes the higher of the two scores.

    Returns the sorted list; STREET_ADDRESS entries in it are copies. An
    empty input is returned as is.
    """
    if not spans:
        return spans
    queue = sorted(spans, key=lambda item: (int(item["start"]), int(item["end"]), item["label"]))
    total = len(queue)
    result: list[dict] = []
    cursor = 0
    while cursor < total:
        head = queue[cursor]
        cursor += 1
        if head["label"] != "STREET_ADDRESS":
            result.append(head)
            continue
        combined = dict(head)
        while cursor < total:
            follower = queue[cursor]
            if follower["label"] != "STREET_ADDRESS":
                break
            between = text[int(combined["end"]) : int(follower["start"])]
            if re.fullmatch(r"\s*,\s*", between) is None:
                break
            new_end = int(follower["end"])
            joined = text[int(combined["start"]) : new_end]
            if not any(ch.isdigit() for ch in joined):
                break
            address_like = (
                ADDRESS_UNIT_PREFIX_RE.match(joined)
                or STREET_SUFFIX_RE.search(joined)
                or BUILDING_SUFFIX_RE.search(joined)
            )
            if not address_like:
                break
            combined["end"] = new_end
            combined["text"] = joined
            combined["score"] = max(
                float(combined.get("score", 0.0)),
                float(follower.get("score", 0.0)),
            )
            cursor += 1
        result.append(combined)
    return result
def repair_contextual_address_place_blocks(text: str, spans: list[dict]) -> list[dict]:
    """Add STREET_ADDRESS spans for named places that follow an address cue.

    Every group-1 match of ``ADDRESS_PLACE_VALUE_RE`` becomes a
    STREET_ADDRESS span with score 0.69 and replaces overlapping
    STREET_ADDRESS, FIRST_NAME, LAST_NAME and CITY spans.

    Returns a new list; ``spans`` itself is not modified.
    """
    displaced_labels = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME", "CITY"}
    result = list(spans)
    for hit in ADDRESS_PLACE_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        candidate = {
            "start": begin,
            "end": stop,
            "label": "STREET_ADDRESS",
            "score": 0.69,
            "text": text[begin:stop],
        }
        kept = []
        for existing in result:
            if spans_overlap(candidate, existing) and existing["label"] in displaced_labels:
                continue
            kept.append(existing)
        kept.append(candidate)
        result = kept
    return result
def prefer_long_street_address_spans(spans: list[dict]) -> list[dict]:
    """Drop STREET_ADDRESS spans that sit inside a larger STREET_ADDRESS span.

    A STREET_ADDRESS span is removed when another STREET_ADDRESS span with
    different bounds contains it; scores are not consulted. Other labels are
    always kept. An empty input is returned as is.
    """
    if not spans:
        return spans

    def bounds(item: dict) -> tuple[int, int]:
        return int(item["start"]), int(item["end"])

    def is_shadowed(span: dict) -> bool:
        lo, hi = bounds(span)
        for other in spans:
            if other is span or other["label"] != "STREET_ADDRESS":
                continue
            other_lo, other_hi = bounds(other)
            if other_lo <= lo and other_hi >= hi and (other_lo, other_hi) != (lo, hi):
                return True
        return False

    return [
        span
        for span in spans
        if span["label"] != "STREET_ADDRESS" or not is_shadowed(span)
    ]
def drop_contextual_fragment_spans(spans: list[dict]) -> list[dict]:
    """Drop name and city spans that are fragments of a larger structured span.

    A FIRST_NAME or LAST_NAME span is removed when it overlaps a COUNTY,
    CITY, POSTCODE, STREET_ADDRESS or DATE_OF_BIRTH span that is at least as
    long. A CITY span is removed when it overlaps a COUNTY span that is at
    least as long. An empty input is returned as is.
    """
    if not spans:
        return spans
    name_suppressors = {"COUNTY", "CITY", "POSTCODE", "STREET_ADDRESS", "DATE_OF_BIRTH"}

    def width(item: dict) -> int:
        return int(item["end"]) - int(item["start"])

    def outweighed(span: dict, labels: set) -> bool:
        return any(
            other is not span
            and other["label"] in labels
            and spans_overlap(span, other)
            and width(other) >= width(span)
            for other in spans
        )

    survivors: list[dict] = []
    for span in spans:
        label = span["label"]
        if label in {"FIRST_NAME", "LAST_NAME"} and outweighed(span, name_suppressors):
            continue
        if label == "CITY" and outweighed(span, {"COUNTY"}):
            continue
        survivors.append(span)
    return survivors
def drop_public_contact_detail_spans(text: str, spans: list[dict]) -> list[dict]:
    """Remove location spans from text identified as public contact details.

    When ``PUBLIC_CONTACT_DETAILS_RE`` matches anywhere in ``text``, every
    STREET_ADDRESS, CITY and COUNTY span is dropped and a new list with the
    remaining spans is returned. When it does not match, ``spans`` is
    returned unchanged (the same list object).
    """
    if not PUBLIC_CONTACT_DETAILS_RE.search(text):
        return spans
    # The pattern is known to match at this point, so one label filter is
    # enough; there is no need to search the text again for every span.
    location_labels = {"STREET_ADDRESS", "CITY", "COUNTY"}
    return [span for span in spans if span["label"] not in location_labels]
def drop_org_like_name_spans(text: str, spans: list[dict]) -> list[dict]:
    """Drop FIRST_NAME / LAST_NAME spans that look like part of an organisation name.

    A name span is removed when the normalised last whitespace-separated
    token of its text is in ``ORG_NAME_TRAILING_SURFACES``, or when
    ``ORG_NAME_PREFIX_RE`` matches in the 32 characters before it. Spans
    with other labels are always kept.
    """

    def looks_like_org(span: dict) -> bool:
        words = [
            normalize_surface(piece)
            for piece in re.split(r"\s+", span.get("text", "").strip())
            if piece
        ]
        if words and words[-1] in ORG_NAME_TRAILING_SURFACES:
            return True
        begin = int(span["start"])
        return bool(ORG_NAME_PREFIX_RE.search(text[max(0, begin - 32) : begin]))

    return [
        span
        for span in spans
        if span["label"] not in {"FIRST_NAME", "LAST_NAME"} or not looks_like_org(span)
    ]
def drop_city_org_prefix_spans(text: str, spans: list[dict]) -> list[dict]:
    """Drop CITY spans whose following text matches ``ORG_CITY_TAIL_RE``.

    Only the 24 characters after each CITY span are examined, and the
    pattern must match at the start of that tail. Spans with other labels
    are always kept.
    """

    def followed_by_org_tail(span: dict) -> bool:
        stop = int(span["end"])
        return bool(ORG_CITY_TAIL_RE.match(text[stop : min(len(text), stop + 24)]))

    return [
        span
        for span in spans
        if span["label"] != "CITY" or not followed_by_org_tail(span)
    ]
def canonicalize_location_spans(text: str, spans: list[dict]) -> list[dict]:
    """Normalise the bounds of CITY and COUNTY spans.

    For a CITY span, the start moves back by one character when that
    character is in ``IRISH_CITY_PREFIX_CHARS``, is not preceded by a
    letter, and the widened text normalises to an entry of
    ``IRISH_CITY_SURFACES``. For both labels, trailing ".", ",", ";" and ":"
    characters are trimmed from the end. Each CITY/COUNTY span is returned
    as a new dict with updated start, end and text; other spans pass through
    unchanged.
    """
    trailing_punctuation = ".,;:"
    out: list[dict] = []
    for span in spans:
        label = span["label"]
        if label not in {"CITY", "COUNTY"}:
            out.append(span)
            continue
        begin, stop = int(span["start"]), int(span["end"])
        if label == "CITY" and begin > 0:
            before = begin - 1
            if text[before].lower() in IRISH_CITY_PREFIX_CHARS:
                inside_word = before > 0 and text[before - 1].isalpha()
                if not inside_word and normalize_surface(text[before:stop]) in IRISH_CITY_SURFACES:
                    begin = before
        while stop > begin and text[stop - 1] in trailing_punctuation:
            stop -= 1
        out.append({**span, "start": begin, "end": stop, "text": text[begin:stop]})
    return out
def canonicalize_street_address_spans(text: str, spans: list[dict]) -> list[dict]:
    """Snap STREET_ADDRESS spans to clean, single-line bounds.

    For each STREET_ADDRESS span:

    1. ``STREET_ADDRESS_VALUE_RE`` is run over a window of 48 characters on
       either side. Among group-1 matches that overlap the span and pass
       ``is_reasonable_span_text``, the longest wins; ties go to the match
       whose bounds are closest to the span's. If there is a winner, the
       span takes its bounds.
    2. The span is cut at the first line break ("\\n" or "\\r") inside it.
    3. Trailing ".", ",", ";" and ":" characters are trimmed.

    Each STREET_ADDRESS span is returned as a new dict with updated start,
    end and text; other spans pass through unchanged.
    """
    repaired: list[dict] = []
    for span in spans:
        if span["label"] != "STREET_ADDRESS":
            repaired.append(span)
            continue
        start = int(span["start"])
        end = int(span["end"])
        window_start = max(0, start - 48)
        window_end = min(len(text), end + 48)
        window = text[window_start:window_end]
        best_bounds = None
        best_key = None
        for match in STREET_ADDRESS_VALUE_RE.finditer(window):
            candidate_start = window_start + int(match.start(1))
            candidate_end = window_start + int(match.end(1))
            # Ignore regex matches that do not overlap the span at all.
            if candidate_end <= start or candidate_start >= end:
                continue
            if not is_reasonable_span_text("STREET_ADDRESS", text, candidate_start, candidate_end):
                continue
            key = (
                candidate_end - candidate_start,
                -(abs(candidate_start - start) + abs(candidate_end - end)),
            )
            if best_key is None or key > best_key:
                best_key = key
                best_bounds = (candidate_start, candidate_end)
        if best_bounds is not None:
            start, end = best_bounds
        # Cut at the earliest line break of either kind. Trying "\n" before
        # "\r" would leave a stray "\r" at the end of the span for CRLF text.
        line_breaks = [
            position
            for position in (text.find("\n", start, end), text.find("\r", start, end))
            if position != -1
        ]
        if line_breaks:
            end = min(line_breaks)
        while end > start and text[end - 1] in ".,;:":
            end -= 1
        repaired.append(
            {
                **span,
                "start": start,
                "end": end,
                "text": text[start:end],
            }
        )
    return repaired
def canonicalize_email_spans(text: str, spans: list[dict]) -> list[dict]:
    """Tighten EMAIL spans to the address that ``EMAIL_EXTRACT_RE`` finds inside them.

    The pattern is searched within the span's own text range; when it
    matches, the span is returned as a new dict whose start, end and text
    cover group 1 of the first match. EMAIL spans without a match and spans
    with other labels pass through unchanged.
    """
    out: list[dict] = []
    for span in spans:
        tightened = span
        if span["label"] == "EMAIL":
            base = int(span["start"])
            found = EMAIL_EXTRACT_RE.search(text[base : int(span["end"])])
            if found:
                begin = base + int(found.start(1))
                stop = base + int(found.end(1))
                tightened = {
                    **span,
                    "start": begin,
                    "end": stop,
                    "text": text[begin:stop],
                }
        out.append(tightened)
    return out
def drop_stacked_first_names(spans: list[dict]) -> list[dict]:
    """Drop a FIRST_NAME span that is directly followed by a first-name/last-name pair.

    A FIRST_NAME span is removed when another FIRST_NAME span starts after
    its start and no more than 2 characters after its end, and that other
    span is itself followed within 2 characters by the start of a LAST_NAME
    span. The input is returned as is when it is empty or has no FIRST_NAME
    or no LAST_NAME spans.
    """
    if not spans:
        return spans
    given = [span for span in spans if span["label"] == "FIRST_NAME"]
    family = [span for span in spans if span["label"] == "LAST_NAME"]
    if not (given and family):
        return spans

    def leads_into_surname(name: dict) -> bool:
        tail = int(name["end"])
        return any(0 <= int(surname["start"]) - tail <= 2 for surname in family)

    def is_stacked(span: dict) -> bool:
        for later in given:
            if later is span:
                continue
            if int(later["start"]) <= int(span["start"]):
                continue
            if int(later["start"]) - int(span["end"]) > 2:
                continue
            if leads_into_surname(later):
                return True
        return False

    return [
        span
        for span in spans
        if span["label"] != "FIRST_NAME" or not is_stacked(span)
    ]
def decode_span_matrix(
    text: str,
    offsets: list[tuple[int, int]],
    span_scores: np.ndarray,
    config,
    min_score: float,
) -> list[dict]:
    """Decode a per-label span score matrix into character spans and repair them.

    Args:
        text: The text the scores were computed for.
        offsets: One ``(start_char, end_char)`` pair per token; tokens for
            which ``valid_offset`` is false are never used as span ends.
        span_scores: Array of shape ``[num_labels, seq_len, seq_len]``,
            indexed as ``[label, start_token, end_token]``.
        config: Object passed to the ``label_*_from_config`` helpers to get
            label names, per-label thresholds, maximum span widths (in
            tokens) and minimum non-space character counts.
        min_score: Threshold used for labels without a configured one.

    Returns:
        The repaired spans after ``dedupe_spans``. Each span is a dict with
        ``start``, ``end``, ``label``, ``score`` and ``text`` keys.

    Raises:
        ValueError: If ``span_scores`` is not 3-dimensional.

    The candidate spans are first filtered by threshold, width, offset
    validity, non-space length and ``is_reasonable_span_text``. They then go
    through a fixed sequence of repair, drop and canonicalise passes, most
    of them gated on cheap hints computed from ``text``. The order of those
    passes matters because later passes see the output of earlier ones.
    """
    label_names = label_names_from_config(config)
    thresholds = label_thresholds_from_config(config, min_score)
    max_span_tokens = label_max_span_tokens_from_config(config)
    min_nonspace_chars = label_min_nonspace_chars_from_config(config)
    if span_scores.ndim != 3:
        raise ValueError(f"Expected [num_labels, seq_len, seq_len] span scores, got shape {span_scores.shape}")
    num_labels, seq_len, _ = span_scores.shape
    # Tokens beyond the end of ``offsets`` cannot be mapped to characters.
    # Clamping keeps the index arrays below in range instead of raising
    # IndexError when the score matrix is longer than the offsets list.
    seq_len = min(int(seq_len), len(offsets))
    valid = np.array([valid_offset(offset) for offset in offsets[:seq_len]], dtype=bool)
    start_chars = np.array([int(offset[0]) if valid[index] else -1 for index, offset in enumerate(offsets[:seq_len])], dtype=np.int32)
    end_chars = np.array([int(offset[1]) if valid[index] else -1 for index, offset in enumerate(offsets[:seq_len])], dtype=np.int32)
    # nonspace_prefix[i] = number of non-whitespace characters in text[:i],
    # so the count inside any character range is a single subtraction.
    nonspace_prefix = [0]
    for ch in text:
        nonspace_prefix.append(nonspace_prefix[-1] + (0 if ch.isspace() else 1))
    spans: list[dict] = []
    for label_index in range(min(num_labels, len(label_names))):
        label = label_names[label_index]
        threshold = thresholds.get(label, min_score)
        max_width = max(1, int(max_span_tokens.get(label, 8)))
        min_chars = max(1, int(min_nonspace_chars.get(label, 1)))
        label_scores = span_scores[label_index, :seq_len, :seq_len]
        start_indices, end_indices = np.where(label_scores >= threshold)
        if start_indices.size == 0:
            continue
        # Keep only forward spans of at most ``max_width`` tokens.
        width_mask = (end_indices >= start_indices) & ((end_indices - start_indices) < max_width)
        if not np.any(width_mask):
            continue
        start_indices = start_indices[width_mask]
        end_indices = end_indices[width_mask]
        # Both boundary tokens must have a usable character offset.
        valid_mask = valid[start_indices] & valid[end_indices]
        if not np.any(valid_mask):
            continue
        start_indices = start_indices[valid_mask]
        end_indices = end_indices[valid_mask]
        for start_idx, end_idx in zip(start_indices.tolist(), end_indices.tolist()):
            start_char = int(start_chars[start_idx])
            end_char = int(end_chars[end_idx])
            if end_char <= start_char:
                continue
            if (nonspace_prefix[end_char] - nonspace_prefix[start_char]) < min_chars:
                continue
            if not is_reasonable_span_text(label, text, start_char, end_char):
                continue
            spans.append(
                {
                    "start": start_char,
                    "end": end_char,
                    "label": label,
                    "score": float(label_scores[start_idx, end_idx]),
                    "text": text[start_char:end_char],
                }
            )
    spans = prefer_long_name_spans(spans, thresholds)
    spans = prefer_long_structured_spans(spans, thresholds)
    spans = repair_first_name_from_last_name(text, spans)
    # Cheap whole-text hints used to skip repair passes that cannot apply.
    text_lower = text.lower()
    has_digit = any(ch.isdigit() for ch in text)
    has_email_hint = "@" in text
    has_address_hint = bool(
        ADDRESS_CUE_RE.search(text)
        or ADDRESS_LINE_CUE_RE.search(text)
        or STREET_SUFFIX_RE.search(text)
        or HOUSE_NAME_PREFIX_RE.search(text)
        or ADDRESS_UNIT_PREFIX_RE.search(text)
    )
    has_city_hint = bool(
        has_address_hint
        or POSTCODE_VALUE_RE.search(text)
        or "city:" in text_lower
        or "city " in text_lower
        or "town:" in text_lower
        or "town " in text_lower
        or "city/town" in text_lower
        or "cathair" in text_lower
        or "baile" in text_lower
        or "county" in text_lower
        or "contae" in text_lower
        or "co." in text_lower
    )
    if has_email_hint:
        spans = repair_emails(text, spans)
    if has_digit:
        spans = repair_phone_numbers(text, spans)
        spans = repair_ppsn_variants(text, spans)
        spans = repair_postcodes(text, spans)
    if has_address_hint:
        spans = repair_street_addresses(text, spans)
    if NAME_SELF_CUE_RE.search(text):
        spans = repair_contextual_name_cues(text, spans)
    if NAME_ROLE_CUE_RE.search(text):
        spans = repair_role_name_cues(text, spans)
    if SURNAME_CUE_RE.search(text):
        spans = repair_surname_field_cues(text, spans)
    if NAME_BEFORE_STRUCTURED_CUE_RE.search(text):
        spans = repair_name_before_structured_cues(text, spans)
    if any(span["label"] in {"FIRST_NAME", "LAST_NAME"} for span in spans) and NAME_PARTICLE_SURNAME_RE.search(text):
        spans = repair_name_particle_surnames(text, spans)
    if has_address_hint:
        spans = repair_contextual_address_blocks(text, spans)
        spans = repair_address_line_prefix_spans(text, spans)
        spans = extend_prefixed_street_address_spans(text, spans)
        spans = repair_contextual_address_place_blocks(text, spans)
        spans = merge_adjacent_street_address_spans(text, spans)
        spans = prefer_long_street_address_spans(spans)
    if has_city_hint:
        spans = repair_county_spans(text, spans)
        spans = repair_city_spans(text, spans)
        spans = repair_city_before_postcode(text, spans)
        spans = repair_city_before_county(text, spans)
        spans = repair_city_field_cues(text, spans)
        spans = repair_prefixed_city_spans(text, spans)
        spans = prefer_long_city_spans(spans)
        spans = repair_county_field_cues(text, spans)
    if has_digit and DOB_CONTEXT_RE.search(text):
        spans = repair_contextual_date_of_birth(text, spans)
    if has_digit and AGE_CONTEXT_RE.search(text):
        spans = repair_contextual_ages(text, spans)
    if has_digit and PASSPORT_CUE_RE.search(text):
        spans = repair_contextual_passport_numbers(text, spans)
    if has_digit and ("iban" in text_lower or "account" in text_lower or "bank" in text_lower or "cuntas" in text_lower):
        spans = repair_contextual_account_numbers(text, spans)
    if "public contact details" in text_lower:
        spans = drop_public_contact_detail_spans(text, spans)
    if any(span["label"] in {"FIRST_NAME", "LAST_NAME"} for span in spans):
        spans = drop_org_like_name_spans(text, spans)
        spans = drop_stacked_first_names(spans)
    if any(span["label"] == "CITY" for span in spans):
        spans = drop_city_org_prefix_spans(text, spans)
    if any(span["label"] in {"FIRST_NAME", "LAST_NAME", "CITY"} for span in spans):
        spans = drop_contextual_fragment_spans(spans)
    if any(span["label"] in {"CITY", "COUNTY"} for span in spans):
        spans = canonicalize_location_spans(text, spans)
    if any(span["label"] == "STREET_ADDRESS" for span in spans):
        spans = canonicalize_street_address_spans(text, spans)
    if any(span["label"] == "EMAIL" for span in spans):
        spans = canonicalize_email_spans(text, spans)
    return dedupe_spans(spans)
def prefer_long_name_spans(spans: list[dict], thresholds: dict[str, float]) -> list[dict]:
    """Collapse FIRST_NAME / LAST_NAME candidates that share a start offset.

    Spans with any other label are passed through untouched. For each group
    of name spans with the same label and the same ``start``, a single span
    is kept: the longest one when its score clears both ``threshold + 0.15``
    and 70% of the group's best score, otherwise the best-scoring one. The
    survivors are then handed to ``prefer_same_end_extensions``.

    Args:
        spans: Span dicts; ``label``, ``start`` and ``end`` are read by key,
            ``score`` is optional and defaults to 0.0.
        thresholds: Per-label score thresholds; a missing label uses 0.5.

    Returns:
        The input itself when it is empty, otherwise the result of
        ``prefer_same_end_extensions`` applied to the collapsed list.
    """
    if not spans:
        return spans

    def score_of(candidate: dict) -> float:
        return float(candidate.get("score", 0.0))

    def length_then_score(candidate: dict) -> tuple:
        return (candidate["end"] - candidate["start"], score_of(candidate))

    name_labels = {"FIRST_NAME", "LAST_NAME"}
    kept: list[dict] = []
    handled: set[int] = set()

    for position, current in enumerate(spans):
        if position in handled:
            continue
        kind = current["label"]
        if kind not in name_labels:
            kept.append(current)
            continue

        # Indices of every not-yet-handled span competing for this start.
        rival_ids = [
            idx
            for idx, candidate in enumerate(spans)
            if idx not in handled
            and candidate["label"] == kind
            and candidate["start"] == current["start"]
        ]
        if len(rival_ids) == 1:
            kept.append(current)
            continue

        handled.update(rival_ids)
        rivals = [spans[idx] for idx in rival_ids]
        top_scored = max(rivals, key=score_of)
        widest = max(rivals, key=length_then_score)

        # The longer span wins only if it is confidently above threshold and
        # not dramatically weaker than the best-scoring alternative.
        floor = max(float(thresholds.get(kind, 0.5)) + 0.15, score_of(top_scored) * 0.7)
        kept.append(widest if score_of(widest) >= floor else top_scored)

    return prefer_same_end_extensions(kept, thresholds)
def prefer_same_end_extensions(spans: list[dict], thresholds: dict[str, float]) -> list[dict]:
    """Collapse FIRST_NAME / LAST_NAME / EMAIL candidates that share an end offset.

    Spans with any other label are passed through untouched. For each group
    of same-label spans ending at the same offset, exactly one is kept:

    * EMAIL: the longest span is kept when its text contains ``@`` or it is
      strictly longer than the best-scoring span, and its score is within
      0.02 of the best score.
    * Names: the longest span is kept when its stripped text has no space
      and its score is at least ``max(0.8 * threshold, 0.55 * best score)``.

    In every other case the best-scoring span of the group is kept.

    Args:
        spans: Span dicts; ``label``, ``start`` and ``end`` are read by key,
            ``score`` (default 0.0) and ``text`` (default "") are optional.
        thresholds: Per-label score thresholds; a missing label uses 0.5.

    Returns:
        The input itself when it is empty, otherwise a new list with one
        span per group, in order of each group's first occurrence.
    """
    if not spans:
        return spans

    def score_of(candidate: dict) -> float:
        return float(candidate.get("score", 0.0))

    def width_of(candidate: dict) -> int:
        return candidate["end"] - candidate["start"]

    eligible_labels = {"FIRST_NAME", "LAST_NAME", "EMAIL"}
    result: list[dict] = []
    handled: set[int] = set()

    for position, current in enumerate(spans):
        if position in handled:
            continue
        kind = current["label"]
        if kind not in eligible_labels:
            result.append(current)
            continue

        # Indices of every not-yet-handled span ending where this one ends.
        member_ids = [
            idx
            for idx, candidate in enumerate(spans)
            if idx not in handled
            and candidate["label"] == kind
            and candidate["end"] == current["end"]
        ]
        if len(member_ids) == 1:
            result.append(current)
            continue

        handled.update(member_ids)
        members = [spans[idx] for idx in member_ids]
        strongest = max(members, key=score_of)
        widest = max(members, key=lambda candidate: (width_of(candidate), score_of(candidate)))
        widest_score = score_of(widest)
        strongest_score = score_of(strongest)

        if kind == "EMAIL":
            # Accept the extension only if it looks like a fuller address and
            # costs at most a 0.02 drop in score.
            take_widest = (
                "@" in widest.get("text", "") or width_of(widest) > width_of(strongest)
            ) and widest_score >= strongest_score - 0.02
        else:
            # A name extension must stay a single token and keep a reasonable score.
            take_widest = " " not in widest.get("text", "").strip() and widest_score >= max(
                float(thresholds.get(kind, 0.5)) * 0.8, strongest_score * 0.55
            )

        result.append(widest if take_widest else strongest)

    return result
def prefer_long_structured_spans(spans: list[dict], thresholds: dict[str, float]) -> list[dict]:
    """Collapse overlapping STREET_ADDRESS / DATE_OF_BIRTH candidates.

    Spans with any other label are passed through untouched. For each
    not-yet-handled target span, every same-label span that
    ``spans_overlap`` reports as overlapping it forms a group, and one span
    is kept: the longest when its score is at least
    ``max(threshold, 0.75 * best score)``, otherwise the best-scoring one.

    Args:
        spans: Span dicts; ``label``, ``start`` and ``end`` are read by key,
            ``score`` is optional and defaults to 0.0.
        thresholds: Per-label score thresholds; a missing label uses 0.5.

    Returns:
        The input itself when it is empty, otherwise a new list with one
        span per group, in order of each group's first occurrence.
    """
    if not spans:
        return spans

    def score_of(candidate: dict) -> float:
        return float(candidate.get("score", 0.0))

    def length_then_score(candidate: dict) -> tuple:
        return (candidate["end"] - candidate["start"], score_of(candidate))

    structured_labels = {"STREET_ADDRESS", "DATE_OF_BIRTH"}
    chosen: list[dict] = []
    handled: set[int] = set()

    for position, current in enumerate(spans):
        if position in handled:
            continue
        kind = current["label"]
        if kind not in structured_labels:
            chosen.append(current)
            continue

        # Overlap is measured against the current span only, not transitively.
        cluster_ids = [
            idx
            for idx, candidate in enumerate(spans)
            if idx not in handled
            and candidate["label"] == kind
            and spans_overlap(current, candidate)
        ]
        if len(cluster_ids) == 1:
            chosen.append(current)
            continue

        handled.update(cluster_ids)
        cluster = [spans[idx] for idx in cluster_ids]
        strongest = max(cluster, key=score_of)
        widest = max(cluster, key=length_then_score)

        floor = max(float(thresholds.get(kind, 0.5)), score_of(strongest) * 0.75)
        chosen.append(widest if score_of(widest) >= floor else strongest)

    return chosen
def sigmoid_np(values: np.ndarray) -> np.ndarray:
    """Return the element-wise logistic sigmoid of ``values``.

    Inputs are clipped to [-60, 60] before exponentiation, which keeps
    ``np.exp`` from overflowing on large-magnitude values.
    """
    bounded = np.clip(values, -60.0, 60.0)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator
def run_onnx_span(session, encoded: dict[str, Any]) -> np.ndarray:
    """Run ``session`` on the encoded inputs and return its first output.

    Only entries of ``encoded`` whose key matches one of the names reported
    by ``session.get_inputs()`` are fed to the session; ``offset_mapping``
    is always left out, even if the session lists an input with that name.

    Args:
        session: Object exposing ``get_inputs()`` (items with a ``name``
            attribute) and ``run(output_names, feed)``.
        encoded: Mapping of input name to value.

    Returns:
        The first element of the sequence returned by ``session.run``.

    Raises:
        ValueError: If ``session.run`` returns an empty result.
    """
    accepted_names = {node.name for node in session.get_inputs()}
    feed = {
        name: tensor
        for name, tensor in encoded.items()
        if name != "offset_mapping" and name in accepted_names
    }
    outputs = session.run(None, feed)
    if not outputs:
        raise ValueError("ONNX session returned no outputs")
    return outputs[0]