#!/usr/bin/env python3 from __future__ import annotations import math import re from functools import lru_cache from pathlib import Path import sys from typing import Any import unicodedata import numpy as np ROOT_DIR = Path(__file__).resolve().parents[2] if str(ROOT_DIR) not in sys.path: sys.path.insert(0, str(ROOT_DIR)) from base_common import ( dedupe_spans, label_max_span_tokens_from_config, label_min_nonspace_chars_from_config, label_names_from_config, load_onnx_session, normalize_entity_name, safe_auto_tokenizer, ) def label_thresholds_from_config(config, default_threshold: float) -> dict[str, float]: raw = getattr(config, "span_label_thresholds", None) or {} out = {normalize_entity_name(key): float(value) for key, value in raw.items()} for label in label_names_from_config(config): out.setdefault(label, float(default_threshold)) return out def valid_offset(offset: tuple[int, int]) -> bool: return bool(offset) and int(offset[1]) > int(offset[0]) def nonspace_length(text: str, start: int, end: int) -> int: return sum(0 if ch.isspace() else 1 for ch in text[int(start) : int(end)]) def alnum_upper(text: str) -> str: return "".join(ch for ch in text.upper() if ch.isalnum()) @lru_cache(maxsize=16384) def normalize_surface(text: str) -> str: value = unicodedata.normalize("NFKD", text) value = "".join(ch for ch in value if not unicodedata.combining(ch)) value = value.replace("\u00A0", " ").replace("\u202F", " ") value = re.sub(r"\s+", " ", value.strip().lower()) return value IRISH_CITY_FORMS = ( "Dublin", "Baile Átha Cliath", "mBaile Átha Cliath", "mBaile Atha Cliath", "Galway", "Gaillimh", "Cork", "Cork City", "Corcaigh", "gCorcaigh", "Limerick", "Luimneach", "Waterford", "Port Láirge", "Kilkenny", "Cill Chainnigh", "Carlow", "Ceatharlach", "Sligo", "Sligeach", "Tralee", "Trá Lí", "Ennis", "Inis", "Letterkenny", "Leitir Ceanainn", "Castlebar", "Caisleán an Bharraigh", "Caislean an Bharraigh", "gCaisleán an Bharraigh", "gCaislean an Bharraigh", "Wexford", "Loch Garman", "Navan", "Uaimh", "An Uaimh", "hUaimh", "nUaimh", "Dundalk", "Dún Dealgan", "Dun Dealgan", "Mullingar", "Muileann gCearr", "An Muileann gCearr", "Tullamore", "Tulach Mhór", "Tulach Mhor", "dTulach Mhór", "dTulach Mhor", "Portlaoise", "Port Laoise", "bPort Laoise", "Bray", "Bré", "Bre", "mBré", "mBre", "Athlone", "Baile Átha Luain", "Baile Atha Luain", "mBaile Átha Luain", "mBaile Atha Luain", ) IRISH_CITY_SURFACES = {normalize_surface(value) for value in IRISH_CITY_FORMS} IRISH_COUNTY_FORMS = ( "Co. Dublin", "County Dublin", "Co. Bhaile Átha Cliath", "Contae Bhaile Átha Cliath", "gContae Bhaile Átha Cliath", "Co. Galway", "County Galway", "Co. na Gaillimhe", "Contae na Gaillimhe", "gContae na Gaillimhe", "Co. Cork", "County Cork", "Co. Chorcaí", "Contae Chorcaí", "gContae Chorcaí", "Co. Limerick", "County Limerick", "Co. Luimnigh", "Contae Luimnigh", "gContae Luimnigh", "Co. Waterford", "County Waterford", "Co. Phort Láirge", "Contae Phort Láirge", "gContae Phort Láirge", "Co. Kilkenny", "County Kilkenny", "Co. Chill Chainnigh", "Contae Chill Chainnigh", "gContae Chill Chainnigh", "Co. Carlow", "County Carlow", "Co. Cheatharlach", "Contae Cheatharlach", "gContae Cheatharlach", "Co. Sligo", "County Sligo", "Co. Shligigh", "Contae Shligigh", "gContae Shligigh", "Co. Kerry", "County Kerry", "Co. Chiarraí", "Contae Chiarraí", "gContae Chiarraí", "Co. Clare", "County Clare", "Co. an Chláir", "Contae an Chláir", "gContae an Chláir", "Co. Donegal", "County Donegal", "Co. Dhún na nGall", "Co. Dhun na nGall", "Contae Dhún na nGall", "Contae Dhun na nGall", "gContae Dhún na nGall", "gContae Dhun na nGall", "Co. Mayo", "County Mayo", "Co. Mhaigh Eo", "Contae Mhaigh Eo", "gContae Mhaigh Eo", "Co. Wexford", "County Wexford", "Co. Loch Garman", "Contae Loch Garman", "gContae Loch Garman", "Co. Meath", "County Meath", "Co. na Mí", "Co. na Mi", "Contae na Mí", "Contae na Mi", "gContae na Mí", "gContae na Mi", "Co. Louth", "County Louth", "Co. Lú", "Co. Lu", "Contae Lú", "Contae Lu", "gContae Lú", "gContae Lu", "Co. Westmeath", "County Westmeath", "Co. na hIarmhí", "Co. na hIarmhi", "Contae na hIarmhí", "Contae na hIarmhi", "gContae na hIarmhí", "gContae na hIarmhi", "Co. Offaly", "County Offaly", "Co. Uíbh Fhailí", "Co. Uibh Fhaili", "Contae Uíbh Fhailí", "Contae Uibh Fhaili", "gContae Uíbh Fhailí", "gContae Uibh Fhaili", "Co. Laois", "County Laois", "Contae Laoise", "gContae Laoise", "Co. Wicklow", "County Wicklow", "Co. Chill Mhantáin", "Co. Chill Mhantain", "Contae Chill Mhantáin", "Contae Chill Mhantain", "gContae Chill Mhantáin", "gContae Chill Mhantain", ) IRISH_COUNTY_SURFACES = {normalize_surface(value) for value in IRISH_COUNTY_FORMS} COUNTY_STOP_SURFACES = { normalize_surface(value) for value in { "County Hall", "County House", "County Council", "County Offices", "County Office", } } IRISH_CITY_PREFIX_CHARS = {"n", "g", "m", "b", "d", "h"} STREET_SUFFIX_RE = re.compile( r"(?i)\b(street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)\b" ) BUILDING_SUFFIX_RE = re.compile( r"(?i)\b(house|cottage|lodge|villa|apartments?|building|business\s+centre|community\s+centre|shopping\s+centre|retail\s+park|Teach(?:ín|in)?)\b" ) PHONE_SURFACE_RE = re.compile(r"^[+().\d][+().\d \-/\u00A0\u202F]*\d$") ACCOUNT_DIGIT_SURFACE_RE = re.compile(r"^[\d \-\u00A0\u202F]+$") MONTH_NAME_RE = ( r"(?:January|February|March|April|May|June|July|August|September|October|November|December|" r"Eanáir|Eanair|Feabhra|Márta|Marta|Aibreán|Aibrean|Bealtaine|Meitheamh|Iúil|Iuil|Lúnasa|Lunasa|" r"Meán\s+Fómhair|Mean\s+Fomhair|Deireadh\s+Fómhair|Deireadh\s+Fomhair|Samhain|Nollaig)" ) DATE_OF_BIRTH_RE = re.compile( rf"(?i)^(?:\d{{1,2}}[./-]\d{{1,2}}[./-]\d{{2,4}}|\d{{4}}-\d{{2}}-\d{{2}}|(?:an\s+)?\d{{1,2}}(?:st|nd|rd|th|ú)?\s+{MONTH_NAME_RE}[,]?\s+\d{{2,4}}|{MONTH_NAME_RE}\s+\d{{1,2}},?\s+\d{{2,4}})$" ) DATE_OF_BIRTH_VALUE_RE = re.compile( rf"(? bool: suffix = text[int(end) : min(len(text), int(end) + window)] match = DOB_SUFFIX_CONTEXT_RE.search(suffix) if not match: return False return not any(ch in ",.;:\n\r" for ch in suffix[: int(match.start())]) ADDRESS_FIELD_CUE_PATTERN = r"(?:address(?:\s+line\s+\d+)?(?:\s+is)?|my\s+address\s+is|seoladh(?:\s+l[ií]nte?\s+\d+)?|is\s+[ée]\s+mo\s+sheoladh)" ADDRESS_LINE_CUE_RE = re.compile(r"(?i)\b(?:address\s+line\s+\d+|seoladh\s+l[ií]nte?\s+\d+)\b") ADDRESS_CUE_RE = re.compile( rf"(?i)\b({ADDRESS_FIELD_CUE_PATTERN}|sheoladh|allocation\s+centre|intreo\s+centre|ionad\s+leithdh[aá]ilte|ionad\s+intreo|live\s+at|lives\s+at|living\s+at|located\s+at|i\s+mo\s+ch[oó]na[ií]\s+ag|t[áa]\s+m[ée]\s+i\s+mo\s+ch[oó]na[ií]\s+ag|t[áa]im\s+i\s+mo\s+ch[oó]na[ií]\s+ag|cónai\s+ag|chónai\s+ag|conai\s+ag|chonai\s+ag)\b" ) CITY_COUNTY_PREFIX_RE = re.compile(r"(?i)(?:county|co\.|contae|gcontae)(?:\s+na)?\s*$") PPSN_CUE_RE = re.compile( r"(?i)\b(ppsn|upsp|personal public service(?:\s+number)?|uimhir\s+(?:mo\s+)?upsp|uimhir\s+(?:mo\s+)?ppsn)\b" ) NAME_STOP_SURFACES = { normalize_surface(value) for value in { "Address", "Name", "Phone", "Email", "Seoladh", "Ainm", "Teagmháil", "Teagmhail", "Ríomhphost", "Riomhphost", "Eirchód", "Eirchod", "Eircode", "PPSN", "UPSP", "Call", "Glao", "Glaoigh", "Rugadh", "Ionad", "Intreo", "Cill", "Sampla", "Leithdháilte", "Leithdhailte", "Leithdháil", "Leithdhail", "Leithdh", "Apartment", "Flat", "Unit", "Suite", "Árasán", "Arasan", "Aonad", "County", "Contae", "Fón", "Fon", "January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday", "Eanáir", "Feabhra", "Márta", "Aibreán", "Aibrean", "Bealtaine", "Meitheamh", "Iúil", "Iuil", "Lúnasa", "Lunasa", "Meán Fómhair", "Mean Fomhair", "Deireadh Fómhair", "Deireadh Fomhair", "Samhain", "Nollaig", "Luan", "Máirt", "Mairt", "Céadaoin", "Ceadaoin", "Déardaoin", "Deardaoin", "Aoine", "Satharn", "Domhnach", } } NAME_PARTICLE_SURFACES = { normalize_surface(value) for value in {"Ó", "O", "Ní", "Ni", "Nic", "Mac", "Mc", "de", "van", "von"} } STREET_TRAILING_BLOCK_SURFACES = { normalize_surface(value) for value in { "are", "public", "contact", "details", "website", "open", "before", "visiting", "roimh", "chuairt", "agus", "and", "the", "is", "ta", } } ADDRESS_UNIT_PREFIX_RE = re.compile(r"(?i)^(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\b") HOUSE_NAME_PREFIX_RE = re.compile( r"(?i)^(?:[A-ZÁÉÍÓÚ][\w'’.-]+(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]+){0,2}\s+(?:house|cottage|lodge|villa)|teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚ][\w'’.-]+)?)$" ) STREET_ADDRESS_VALUE_RE = re.compile( r"(?i)(? bool: tokens = [token for token in re.split(r"\s+", value.strip()) if token] if not tokens: return False for token in tokens: if not any(ch.isalpha() for ch in token): return False if not all(is_name_token_char(ch) for ch in token): return False alpha_chars = [ch for ch in token if ch.isalpha()] first_alpha = alpha_chars[0] if alpha_chars else "" if first_alpha.isupper(): continue if len(alpha_chars) >= 2 and alpha_chars[0].islower() and alpha_chars[1].isupper(): continue if normalize_surface(token) in NAME_PARTICLE_SURFACES: continue return False return True def is_reasonable_span_text(label: str, text: str, start: int, end: int) -> bool: value = text[int(start) : int(end)].strip() if not value: return False upper: str | None = None if label in {"FIRST_NAME", "LAST_NAME"}: if not any(ch.isalpha() for ch in value): return False if any(ch.isdigit() for ch in value): return False if int(start) > 0 and text[int(start) - 1].isalpha(): return False if int(end) < len(text) and text[int(end)].isalpha(): return False if normalize_surface(value) in NAME_STOP_SURFACES: return False if label == "FIRST_NAME" and any(ch.isspace() for ch in value): return False if any(ch in ".,;:/@()" for ch in value): return False if label == "FIRST_NAME": first_alpha = next((ch for ch in value if ch.isalpha()), "") if not first_alpha or not first_alpha.isupper(): return False if label == "LAST_NAME" and not is_plausible_last_name_sequence(value): return False if start > 0 and text[int(start) - 1].isdigit(): return False return True if label == "EMAIL": if "@" not in value: return False local, _, domain = value.partition("@") return bool(local) and "." in domain if label == "PHONE_NUMBER": normalized = value.replace("\u00A0", " ").replace("\u202F", " ").strip() if any(ch.isalpha() for ch in normalized): return False if "@" in normalized: return False if int(start) > 0 and text[int(start) - 1].isalnum(): return False if int(end) < len(text) and text[int(end)].isalnum(): return False if not PHONE_SURFACE_RE.match(normalized): return False digits = "".join(ch for ch in value if ch.isdigit()) if normalized.startswith("+353"): tail = digits[3:] if tail.startswith("0"): tail = tail[1:] return 8 <= len(tail) <= 9 if not digits.startswith("0"): return False if digits.startswith("0818") or digits.startswith("1800"): return len(digits) == 10 if digits.startswith("08"): return len(digits) == 10 if digits.startswith("01"): return len(digits) == 9 return 9 <= len(digits) <= 10 if label == "PPSN": upper = alnum_upper(value) return bool(len(upper) in {8, 9} and upper[:7].isdigit() and upper[7:].isalpha()) if label == "POSTCODE": compact = value.replace(" ", "").replace("\u00A0", "").replace("\u202F", "") if any(not (ch.isalnum() or ch.isspace()) for ch in value): return False if len(compact) != 7: return False routing = compact[:3] unique = compact[3:] routing_ok = bool( (routing[0].isalpha() and routing[1:].isdigit()) or routing == "D6W" ) unique_ok = bool( len(unique) == 4 and unique[0].isalpha() and unique[1:].isalnum() ) return routing_ok and unique_ok if label == "PASSPORT_NUMBER": return bool(re.fullmatch(r"[A-Z]{1,2}\s?\d{7}", value.strip())) if label == "BANK_ROUTING_NUMBER": digits = "".join(ch for ch in value if ch.isdigit()) if len(digits) != 6: return False context = text[max(0, int(start) - 32) : min(len(text), int(end) + 24)] return bool(BANK_ROUTING_CONTEXT_RE.search(context)) if label == "SWIFT_BIC": upper = alnum_upper(value) return len(upper) in {8, 11} and upper.isalnum() if label == "CREDIT_DEBIT_CARD": digits = "".join(ch for ch in value if ch.isdigit()) return 12 <= len(digits) <= 19 if label == "ACCOUNT_NUMBER": upper = alnum_upper(value) if upper.startswith("IE"): return bool(re.fullmatch(r"IE\d{2}[A-Z0-9]{18}", upper)) if not ACCOUNT_DIGIT_SURFACE_RE.fullmatch(value.strip()): return False digits = "".join(ch for ch in value if ch.isdigit()) return 6 <= len(digits) <= 34 if label == "AGE": digits = "".join(ch for ch in value if ch.isdigit()) if digits != value.strip(): return False if not digits: return False if int(start) > 0 and text[int(start) - 1].isalnum(): return False trailing = text[int(end) : min(len(text), int(end) + 12)] if int(end) < len(text) and text[int(end)].isalnum() and not AGE_INLINE_SUFFIX_RE.match(trailing): return False if int(start) > 0 and text[int(start) - 1] in "/-": return False if int(end) < len(text) and text[int(end)] in "/-" and not AGE_INLINE_SUFFIX_RE.match(trailing): return False age = int(digits) if not (0 < age <= 120): return False context = text[max(0, int(start) - 32) : min(len(text), int(end) + 24)] prefix = text[max(0, int(start) - 24) : int(start)] return bool(AGE_CONTEXT_RE.search(context) or AGE_SELF_PREFIX_RE.search(prefix)) if label == "DATE_OF_BIRTH": if not any(ch.isdigit() for ch in value): return False if not DATE_OF_BIRTH_RE.match(value.strip()): return False prefix = text[max(0, int(start) - 96) : int(start)] return bool(DOB_CONTEXT_RE.search(prefix) or has_dob_suffix_context(text, int(end))) if label == "CITY": if any(ch.isdigit() for ch in value): return False prefix = text[max(0, int(start) - 20) : int(start)] if CITY_COUNTY_PREFIX_RE.search(prefix): return False return normalize_surface(value) in IRISH_CITY_SURFACES if label == "COUNTY": if any(ch.isdigit() for ch in value): return False normalized = normalize_surface(value) if normalized in COUNTY_STOP_SURFACES: return False if normalized.startswith(("county hall", "county house", "county council", "county office", "county offices")): return False if normalized in IRISH_COUNTY_SURFACES: return True if normalized.startswith(("county ", "contae ", "gcontae ", "co. ")): tail = normalized.split(" ", 1)[1] if " " in normalized else "" if tail in {"hall", "house", "council", "office", "offices"}: return False return True return False if label == "STREET_ADDRESS": cleaned = value.strip() address_parts = [part.strip() for part in cleaned.split(",")] if len(address_parts) > 3: return False prefix_part = "" building_part = "" street_part = cleaned if len(address_parts) == 2: prefix_part, street_part = address_parts if not prefix_part or not street_part: return False if not ( ADDRESS_UNIT_PREFIX_RE.match(prefix_part) or HOUSE_NAME_PREFIX_RE.match(prefix_part) ): return False elif len(address_parts) == 3: prefix_part, building_part, street_part = address_parts if not prefix_part or not building_part or not street_part: return False if not ADDRESS_UNIT_PREFIX_RE.match(prefix_part): return False if not HOUSE_NAME_PREFIX_RE.match(building_part): return False suffix_match = STREET_SUFFIX_RE.search(street_part) if not suffix_match: return False if any(ch in "@:;" for ch in cleaned): return False trailing = street_part[int(suffix_match.end()) :].strip() trailing_tokens = [token for token in re.split(r"\s+", trailing) if token] if len(trailing_tokens) > 3: return False if any(normalize_surface(token) in STREET_TRAILING_BLOCK_SURFACES for token in trailing_tokens): return False has_digit = any(ch.isdigit() for ch in street_part) if has_digit and not re.match(r"^\s*\d{1,4}\b", street_part): return False title_tokens = [token for token in re.split(r"\s+", street_part) if token] if not has_digit and not prefix_part: context = text[max(0, int(start) - 24) : min(len(text), int(end) + 12)] if not ADDRESS_CUE_RE.search(context): return False return has_digit or len(title_tokens) >= 2 return True def spans_overlap(a: dict, b: dict) -> bool: return int(a["start"]) < int(b["end"]) and int(b["start"]) < int(a["end"]) def is_name_token_char(ch: str) -> bool: return ch.isalpha() or ch in {"-", "'", "’"} def is_plausible_first_name(value: str) -> bool: if not value: return False if any(ch.isspace() for ch in value): return False if any(ch.isdigit() for ch in value): return False if any(ch in ",;:/@()" for ch in value): return False if not any(ch.isalpha() for ch in value): return False first_alpha = next((ch for ch in value if ch.isalpha()), "") if not first_alpha or not first_alpha.isupper(): return False return all(is_name_token_char(ch) for ch in value) def is_plausible_cued_first_name(value: str) -> bool: if not value: return False if any(ch.isspace() for ch in value): return False if any(ch.isdigit() for ch in value): return False if any(ch in ",;:/@()" for ch in value): return False if not any(ch.isalpha() for ch in value): return False return all(is_name_token_char(ch) for ch in value) def is_plausible_cued_last_name_sequence(value: str) -> bool: tokens = [token for token in re.split(r"\s+", value.strip()) if token] if not tokens: return False for token in tokens: if not any(ch.isalpha() for ch in token): return False if not all(is_name_token_char(ch) for ch in token): return False alpha_chars = [ch for ch in token if ch.isalpha()] first_alpha = alpha_chars[0] if alpha_chars else "" if first_alpha.isupper() or first_alpha.islower(): continue if normalize_surface(token) in NAME_PARTICLE_SURFACES: continue return False return True def extract_name_tokens_after_cue(text: str, cue_end: int, max_tokens: int = 4) -> list[tuple[int, int, str]]: cursor = cue_end while cursor < len(text) and text[cursor].isspace(): cursor += 1 tokens: list[tuple[int, int, str]] = [] while cursor < len(text): saw_line_break = False while cursor < len(text) and text[cursor].isspace(): if text[cursor] in "\r\n": saw_line_break = True cursor += 1 if saw_line_break and tokens: break if cursor >= len(text) or text[cursor] in ",.;:\n": break token_start = cursor while cursor < len(text) and is_name_token_char(text[cursor]): cursor += 1 if token_start == cursor: break token = text[token_start:cursor] normalized = normalize_surface(token) if tokens and normalized in NAME_CUE_STOP_SURFACES: break if not all(is_name_token_char(ch) for ch in token): break tokens.append((token_start, cursor, token)) if len(tokens) >= max_tokens: break if cursor < len(text) and text[cursor] in ",.;:\n": break if cursor < len(text) and not text[cursor].isspace(): break return tokens def repair_name_particle_surnames(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) surname_re = re.compile( r"^[ \t]*((?:Ní|Ni|Ó|O|Nic|Mac|Mc)[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*)?)" ) first_names = [span for span in repaired if span["label"] == "FIRST_NAME"] for first_name in first_names: tail = text[int(first_name["end"]) : min(len(text), int(first_name["end"]) + 40)] match = surname_re.match(tail) if not match: continue start = int(first_name["end"]) + int(match.start(1)) end = int(first_name["end"]) + int(match.end(1)) candidate = text[start:end] if not is_plausible_cued_last_name_sequence(candidate): continue candidate_span = { "start": start, "end": end, "label": "LAST_NAME", "score": 0.66, "text": candidate, } repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"} ) ] repaired.append(candidate_span) return repaired def repair_first_name_from_last_name(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for last_name in [span for span in repaired if span["label"] == "LAST_NAME"]: if any( span["label"] == "FIRST_NAME" and int(span["end"]) <= int(last_name["start"]) and int(last_name["start"]) - int(span["end"]) <= 2 for span in repaired ): continue cursor = int(last_name["start"]) - 1 if cursor < 0 or not text[cursor].isspace(): continue while cursor >= 0 and text[cursor].isspace(): cursor -= 1 token_end = cursor + 1 while cursor >= 0 and is_name_token_char(text[cursor]): cursor -= 1 token_start = cursor + 1 if token_end <= token_start: continue candidate = text[token_start:token_end] if not is_plausible_first_name(candidate): continue candidate_span = { "start": token_start, "end": token_end, "label": "FIRST_NAME", "score": float(last_name.get("score", 0.5)) * 0.6, "text": candidate, } if any(spans_overlap(candidate_span, other) for other in repaired if other["label"] == "FIRST_NAME"): continue repaired.append(candidate_span) return repaired def repair_contextual_name_cues(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in NAME_SELF_CUE_RE.finditer(text): if any( other["label"] == "FIRST_NAME" and 0 <= int(other["start"]) - match.end() <= 4 for other in repaired ) and any( other["label"] == "LAST_NAME" and 0 <= int(other["start"]) - match.end() <= 16 for other in repaired ): continue cursor = match.end() while cursor < len(text) and text[cursor].isspace(): cursor += 1 start = cursor while cursor < len(text) and (is_name_token_char(text[cursor]) or text[cursor].isspace()): cursor += 1 candidate = text[start:cursor].strip() raw_tokens = [token for token in re.split(r"\s+", candidate) if token] tokens: list[str] = [] for token in raw_tokens: normalized = normalize_surface(token) if tokens and normalized in LOWER_NAME_STOP_SURFACES: break if not all(is_name_token_char(ch) for ch in token): break tokens.append(token) if len(tokens) >= 4: break if len(tokens) < 2: continue first_value = tokens[0] last_value = " ".join(tokens[1:]) if not is_plausible_cued_first_name(first_value): continue if not is_plausible_cued_last_name_sequence(last_value): continue first_start = text.find(first_value, start, cursor) if first_start < 0: continue first_end = first_start + len(first_value) last_start = text.find(last_value, first_end, cursor) if last_start < 0: continue last_end = last_start + len(last_value) first_span = { "start": first_start, "end": first_end, "label": "FIRST_NAME", "score": 0.63, "text": text[first_start:first_end], } last_span = { "start": last_start, "end": last_end, "label": "LAST_NAME", "score": 0.63, "text": text[last_start:last_end], } repaired = [ other for other in repaired if not ( spans_overlap(first_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"} ) and not ( spans_overlap(last_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"} ) ] repaired.extend([first_span, last_span]) return repaired def repair_role_name_cues(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in NAME_ROLE_CUE_RE.finditer(text): token_bounds = extract_name_tokens_after_cue(text, match.end()) if len(token_bounds) < 2: continue first_start, first_end, first_value = token_bounds[0] last_start = token_bounds[1][0] last_end = token_bounds[-1][1] last_value = text[last_start:last_end] if not is_plausible_cued_first_name(first_value): continue if not is_plausible_cued_last_name_sequence(last_value): continue first_span = { "start": first_start, "end": first_end, "label": "FIRST_NAME", "score": 0.63, "text": text[first_start:first_end], } last_span = { "start": last_start, "end": last_end, "label": "LAST_NAME", "score": 0.63, "text": text[last_start:last_end], } repaired = [ other for other in repaired if not ( spans_overlap(first_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"} ) and not ( spans_overlap(last_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"} ) ] repaired.extend([first_span, last_span]) return repaired def repair_surname_field_cues(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in SURNAME_CUE_RE.finditer(text): token_bounds = extract_name_tokens_after_cue(text, match.end()) if not token_bounds: continue start = token_bounds[0][0] end = token_bounds[-1][1] candidate = text[start:end] if not is_plausible_cued_last_name_sequence(candidate): continue last_span = { "start": start, "end": end, "label": "LAST_NAME", "score": 0.64, "text": candidate, } repaired = [ other for other in repaired if not ( spans_overlap(last_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"} ) ] repaired.append(last_span) return repaired def repair_name_before_structured_cues(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in NAME_BEFORE_STRUCTURED_CUE_RE.finditer(text): token_bounds = extract_name_tokens_after_cue(text, match.start(1)) if len(token_bounds) < 2: continue first_start, first_end, first_value = token_bounds[0] last_start = token_bounds[1][0] last_end = token_bounds[-1][1] last_value = text[last_start:last_end] if not is_plausible_first_name(first_value): continue if not is_plausible_last_name_sequence(last_value): continue first_span = { "start": first_start, "end": first_end, "label": "FIRST_NAME", "score": 0.64, "text": text[first_start:first_end], } last_span = { "start": last_start, "end": last_end, "label": "LAST_NAME", "score": 0.64, "text": text[last_start:last_end], } repaired = [ other for other in repaired if not ( spans_overlap(first_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"} ) and not ( spans_overlap(last_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"} ) ] repaired.extend([first_span, last_span]) return repaired PASSPORT_CUE_RE = re.compile( r"(?i)(passport(?:\s+number)?|phas|uimhir\s+(?:mo\s+)?phas)" ) PASSPORT_VALUE_RE = re.compile(r"(?]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,})", re.UNICODE) PHONE_CUE_RE = re.compile( r"(?i)\b(phone|call|contact|reach\s+me|glaoigh\s+ar|teagmh[aá]il|uimhir|m['’]uimhir|f[oó]n|fon|teileaf[oó]n|telefon)\b" ) NAME_SELF_CUE_RE = re.compile( r"(?i)\b(my\s+name\s+is|is\s+mise|is\s+[ée]\s+m['’]?ainm|is\s+[ée]\s+mo\s+ainm)\b" ) NAME_ROLE_CUE_RE = re.compile( r"(?i)(?:\b(?:applicant|customer|claimant|patient|an\s+t-iarratas[oó]ir|iarratas[oó]ir)\b\s*[:,]\s*|\b(?:full\s+name|name|ainm(?!\s+teaghlaigh))\b\s*:\s*)" ) SURNAME_CUE_RE = re.compile( r"(?i)\b(?:my\s+)?(?:surname|last\s+name|family\s+name|ainm\s+teaghlaigh|sloinne)\b(?:\s+is)?\s*[:,-]?\s*" ) NAME_PARTICLE_SURNAME_RE = re.compile(r"(?i)(?:\bN[ií]\b|\bÓ\b|\bNic\b|\bMac\b|\bMc\b|\bO['’])") NAME_BEFORE_STRUCTURED_CUE_RE = re.compile( r"(? list[dict]: repaired = list(spans) for match in PASSPORT_VALUE_RE.finditer(text): start, end = match.span(1) candidate_span = { "start": start, "end": end, "label": "PASSPORT_NUMBER", "score": 0.67, "text": text[start:end], } if any( other["label"] == "PASSPORT_NUMBER" and int(other["start"]) <= start and int(other["end"]) >= end for other in repaired ): continue cue_window = text[max(0, start - 32) : start] if not PASSPORT_CUE_RE.search(cue_window): continue conflicting_labels = {"PHONE_NUMBER", "PPSN", "ACCOUNT_NUMBER", "AGE", "PASSPORT_NUMBER"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_ppsn_variants(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in PPSN_VALUE_RE.finditer(text): start, end = match.span(1) value = text[start:end] compact = alnum_upper(value) if not (len(compact) in {8, 9} and compact[:7].isdigit() and compact[7:].isalpha()): continue cue_window = text[max(0, start - 32) : min(len(text), end + 24)] has_cue = bool(PPSN_CUE_RE.search(cue_window)) candidate_span = { "start": start, "end": end, "label": "PPSN", "score": 0.72 if has_cue else 0.58, "text": value, } conflicting_labels = {"PHONE_NUMBER", "PASSPORT_NUMBER", "ACCOUNT_NUMBER", "AGE", "FIRST_NAME", "LAST_NAME"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels.union({"PPSN"}) ) ] repaired.append(candidate_span) return repaired def repair_contextual_date_of_birth(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in DATE_OF_BIRTH_VALUE_RE.finditer(text): start, end = match.span(1) cue_window = text[max(0, start - 96) : start] if not (DOB_CONTEXT_RE.search(cue_window) or has_dob_suffix_context(text, end)): continue candidate_span = { "start": start, "end": end, "label": "DATE_OF_BIRTH", "score": 0.66, "text": text[start:end], } conflicting_labels = {"DATE_OF_BIRTH", "PHONE_NUMBER", "AGE", "FIRST_NAME", "LAST_NAME", "ACCOUNT_NUMBER", "CITY"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_contextual_ages(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in AGE_VALUE_RE.finditer(text): start, end = match.span(1) if not is_reasonable_span_text("AGE", text, start, end): continue if any( spans_overlap({"start": start, "end": end}, other) and other["label"] == "DATE_OF_BIRTH" for other in repaired ): continue candidate_span = { "start": start, "end": end, "label": "AGE", "score": 0.66, "text": text[start:end], } conflicting_labels = {"AGE", "PHONE_NUMBER", "ACCOUNT_NUMBER"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired ACCOUNT_CUE_RE = re.compile( r"(?i)(account\s+number|bank\s+account|uimhir\s+chuntais|cuntas\s+bainc)" ) ACCOUNT_VALUE_RE = re.compile(r"(? list[dict]: repaired = list(spans) for match in ACCOUNT_VALUE_RE.finditer(text): start, end = match.span(1) candidate_span = { "start": start, "end": end, "label": "ACCOUNT_NUMBER", "score": 0.51, "text": text[start:end], } if any( other["label"] == "ACCOUNT_NUMBER" and int(other["start"]) <= start and int(other["end"]) >= end for other in repaired ): continue cue_window = text[max(0, start - 40) : start] if not ACCOUNT_CUE_RE.search(cue_window): continue if any( spans_overlap(candidate_span, other) and other["label"] in {"PHONE_NUMBER", "BANK_ROUTING_NUMBER", "PPSN", "POSTCODE", "PASSPORT_NUMBER"} for other in repaired ): continue repaired.append(candidate_span) return repaired def repair_emails(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in EMAIL_EXTRACT_RE.finditer(text): start, end = match.span(1) candidate_span = { "start": start, "end": end, "label": "EMAIL", "score": 0.74, "text": text[start:end], } conflicting_labels = {"EMAIL", "FIRST_NAME", "LAST_NAME"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_phone_numbers(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in PHONE_VALUE_RE.finditer(text): start, end = match.span(1) candidate_span = { "start": start, "end": end, "label": "PHONE_NUMBER", "score": 0.69, "text": text[start:end], } cue_window = text[max(0, start - 32) : min(len(text), end + 16)] has_cue = bool(PHONE_CUE_RE.search(cue_window)) has_overlap = any(spans_overlap(candidate_span, other) and other["label"] == "PHONE_NUMBER" for other in repaired) if not (has_cue or has_overlap): continue if not is_reasonable_span_text("PHONE_NUMBER", text, start, end): continue conflicting_labels = {"PHONE_NUMBER", "PPSN", "ACCOUNT_NUMBER", "BANK_ROUTING_NUMBER", "CREDIT_DEBIT_CARD"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_postcodes(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in POSTCODE_VALUE_RE.finditer(text): start, end = match.span(1) candidate_span = { "start": start, "end": end, "label": "POSTCODE", "score": 0.71, "text": text[start:end], } conflicting_labels = {"POSTCODE", "PHONE_NUMBER", "ACCOUNT_NUMBER", "FIRST_NAME", "LAST_NAME"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_city_spans(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) seen: set[tuple[int, int]] = set() ordered_forms = sorted(IRISH_CITY_FORMS, key=len, reverse=True) for form in ordered_forms: for match in iter_bounded_form_matches(form, text): start, end = match.span() prefix = text[max(0, start - 20) : start] if CITY_COUNTY_PREFIX_RE.search(prefix): continue key = (start, end) if key in seen: continue seen.add(key) candidate_span = { "start": start, "end": end, "label": "CITY", "score": 0.64, "text": text[start:end], } has_context = False for other in repaired: other_start = int(other["start"]) other_end = int(other["end"]) if other["label"] == "STREET_ADDRESS" and 0 <= start - other_end <= 4: has_context = True break if other["label"] in {"COUNTY", "POSTCODE"} and 0 <= other_start - end <= 6: has_context = True break if not has_context and re.match(r"^\s*,\s*(?:Co\.\s+|[A-Z]\d{2}|D6W)", text[end:]): has_context = True if not has_context: cue_window = text[max(0, start - 40) : min(len(text), end + 32)] has_context = bool(CITY_CUE_RE.search(cue_window) or ADDRESS_CUE_RE.search(cue_window)) if not has_context: continue conflicting_labels = {"CITY", "FIRST_NAME", "LAST_NAME"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_city_before_postcode(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in CITY_BEFORE_POSTCODE_RE.finditer(text): start, end = match.span(1) value = text[start:end] if any(ch.isdigit() for ch in value): continue prefix = text[max(0, start - 20) : start] if CITY_COUNTY_PREFIX_RE.search(prefix): continue has_context = False for other in repaired: other_start = int(other["start"]) other_end = int(other["end"]) if other["label"] == "STREET_ADDRESS" and 0 <= start - other_end <= 4: has_context = True break if other["label"] == "POSTCODE" and 0 <= other_start - end <= 6: has_context = True break if not has_context: cue_window = text[max(0, start - 40) : min(len(text), end + 24)] has_context = bool(CITY_CUE_RE.search(cue_window) or ADDRESS_CUE_RE.search(cue_window)) if not has_context: continue candidate_span = { "start": start, "end": end, "label": "CITY", "score": 0.63, "text": value, } if any( other["label"] == "CITY" and spans_overlap(candidate_span, other) and (int(other["end"]) - int(other["start"])) >= (end - start) for other in repaired ): continue repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in {"CITY", "FIRST_NAME", "LAST_NAME"} ) ] repaired.append(candidate_span) return repaired def repair_city_before_county(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for regex in (CITY_TOWN_SUFFIX_RE, CITY_BEFORE_COUNTY_RE): for match in regex.finditer(text): start, end = match.span(1) value = text[start:end] if any(ch.isdigit() for ch in value): continue prefix = text[max(0, start - 20) : start] if CITY_COUNTY_PREFIX_RE.search(prefix): continue cue_window = text[max(0, start - 40) : min(len(text), end + 24)] if not ((CITY_CUE_RE.search(cue_window) or ADDRESS_CUE_RE.search(cue_window)) or re.search(r"^\s*,\s*(?:County|Contae|gContae|Co\.)\b", text[end:])): continue candidate_span = { "start": start, "end": end, "label": "CITY", "score": 0.64, "text": value, } if any( other["label"] == "CITY" and spans_overlap(candidate_span, other) and (int(other["end"]) - int(other["start"])) >= (end - start) for other in repaired ): continue repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in {"CITY", "FIRST_NAME", "LAST_NAME"} ) ] repaired.append(candidate_span) return repaired def repair_city_field_cues(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) seen: set[tuple[int, int]] = set() for regex in (CITY_FIELD_VALUE_RE, INLINE_CITY_FIELD_VALUE_RE): for match in regex.finditer(text): start, end = match.span(1) if (start, end) in seen: continue seen.add((start, end)) candidate_span = { "start": start, "end": end, "label": "CITY", "score": 0.65, "text": text[start:end], } repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in {"CITY", "COUNTY", "FIRST_NAME", "LAST_NAME"} ) ] repaired.append(candidate_span) return repaired def repair_prefixed_city_spans(text: str, spans: list[dict]) -> list[dict]: repaired: list[dict] = [] for span in spans: if span["label"] != "CITY": repaired.append(span) continue start = int(span["start"]) end = int(span["end"]) if start <= 0: repaired.append(span) continue prefix_char = text[start - 1] if prefix_char.lower() not in IRISH_CITY_PREFIX_CHARS: repaired.append(span) continue if start - 1 > 0 and text[start - 2].isalpha(): repaired.append(span) continue if normalize_surface(span.get("text", "")) not in IRISH_CITY_SURFACES: repaired.append(span) continue candidate_start = start - 1 candidate_text = text[candidate_start:end] repaired.append( { **span, "start": candidate_start, "text": candidate_text, "score": max(float(span.get("score", 0.0)), 0.66), } ) return repaired def prefer_long_city_spans(spans: list[dict]) -> list[dict]: if not spans: return spans keep: list[dict] = [] for span in spans: if span["label"] != "CITY": keep.append(span) continue shadowed = False for other in spans: if other is span or other["label"] != "CITY": continue if int(other["start"]) <= int(span["start"]) and int(other["end"]) >= int(span["end"]): if (int(other["start"]), int(other["end"])) != (int(span["start"]), int(span["end"])): if float(other.get("score", 0.0)) >= max(0.6, float(span.get("score", 0.0)) * 0.6): shadowed = True break if not shadowed: keep.append(span) return keep def repair_county_field_cues(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) seen: set[tuple[int, int]] = set() for regex in (COUNTY_FIELD_VALUE_RE, INLINE_COUNTY_FIELD_VALUE_RE): for match in regex.finditer(text): start, end = match.span(1) if (start, end) in seen: continue seen.add((start, end)) candidate_span = { "start": start, "end": end, "label": "COUNTY", "score": 0.66, "text": text[start:end], } repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in {"COUNTY", "CITY", "FIRST_NAME", "LAST_NAME"} ) ] repaired.append(candidate_span) return repaired def repair_county_spans(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) seen: set[tuple[int, int]] = set() ordered_forms = sorted(IRISH_COUNTY_FORMS, key=len, reverse=True) for form in ordered_forms: for match in iter_bounded_form_matches(form, text): start, end = match.span() key = (start, end) if key in seen: continue seen.add(key) candidate_span = { "start": start, "end": end, "label": "COUNTY", "score": 0.74, "text": text[start:end], } if not is_reasonable_span_text("COUNTY", text, start, end): continue overlapping_counties = [ other for other in repaired if spans_overlap(candidate_span, other) and other["label"] == "COUNTY" ] if overlapping_counties: candidate_span["score"] = max( float(candidate_span["score"]), max(float(other.get("score", 0.0)) for other in overlapping_counties), ) conflicting_labels = {"COUNTY", "CITY", "FIRST_NAME", "LAST_NAME"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) for match in COUNTY_VALUE_RE.finditer(text): start, end = match.span(1) key = (start, end) if key in seen: continue context = text[max(0, start - 40) : min(len(text), end + 24)] has_context = bool(ADDRESS_CUE_RE.search(context) or POSTCODE_VALUE_RE.search(context)) if not has_context: for other in repaired: other_start = int(other["start"]) other_end = int(other["end"]) if other["label"] in {"STREET_ADDRESS", "CITY", "POSTCODE"} and ( abs(other_start - end) <= 24 or abs(start - other_end) <= 24 ): has_context = True break if not has_context: continue seen.add(key) candidate_span = { "start": start, "end": end, "label": "COUNTY", "score": 0.74, "text": text[start:end], } if not is_reasonable_span_text("COUNTY", text, start, end): continue conflicting_labels = {"COUNTY", "CITY", "FIRST_NAME", "LAST_NAME"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_street_addresses(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) seen: set[tuple[int, int]] = set() for match in STREET_ADDRESS_VALUE_RE.finditer(text): start, end = match.span(1) key = (start, end) if key in seen: continue seen.add(key) candidate_span = { "start": start, "end": end, "label": "STREET_ADDRESS", "score": 0.65, "text": text[start:end], } if not is_reasonable_span_text("STREET_ADDRESS", text, start, end): continue context = text[max(0, start - 32) : min(len(text), end + 24)] has_context = bool(ADDRESS_CUE_RE.search(context)) if not has_context: for other in repaired: other_start = int(other["start"]) other_end = int(other["end"]) if other["label"] in {"CITY", "COUNTY", "POSTCODE"} and 0 <= other_start - end <= 16: has_context = True break if other["label"] in {"FIRST_NAME", "LAST_NAME"} and 0 <= start - other_end <= 24: has_context = True break if not has_context: continue conflicting_labels = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_contextual_address_blocks(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in ADDRESS_BLOCK_VALUE_RE.finditer(text): start, end = match.span(1) value = text[start:end].strip() if not value: continue candidate_span = { "start": start, "end": end, "label": "STREET_ADDRESS", "score": 0.68, "text": text[start:end], } tokens = [token for token in re.split(r"\s+", value) if token] if len(tokens) < 2: continue has_digit = any(ch.isdigit() for ch in value) has_prefix = bool(ADDRESS_UNIT_PREFIX_RE.match(value) or HOUSE_NAME_PREFIX_RE.match(value)) has_street_suffix = bool(STREET_SUFFIX_RE.search(value)) cue_window = text[max(0, start - 40) : start] has_address_line_cue = bool(ADDRESS_LINE_CUE_RE.search(cue_window)) if not (has_digit or has_prefix): continue if not has_street_suffix: tail_window = text[end : min(len(text), end + 48)] has_following_address_context = bool( re.match( r"^\s*,\s*((?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana))", tail_window, flags=re.IGNORECASE, ) or re.search(r"(?:County|Contae|gContae|Co\.|(?:[A-Z]\d{2}|D6W))", tail_window) ) building_tail_match = ADDRESS_BUILDING_TAIL_RE.match(tail_window) has_following_building_context = False if building_tail_match: remaining_tail = tail_window[int(building_tail_match.end(1)) :] has_following_building_context = bool( re.match( r"^\s*(?:$|,\s*(?:(?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)|County|Contae|gContae|Co\.|(?:[A-Z]\d{2}|D6W)|[A-ZÁÉÍÓÚ]))", remaining_tail, flags=re.IGNORECASE, ) ) looks_like_suffixless_address = has_digit and len(tokens) >= 3 if not ((has_prefix and (has_address_line_cue or has_following_building_context)) or ((has_prefix or looks_like_suffixless_address) and has_following_address_context)): continue if any( other["label"] == "STREET_ADDRESS" and int(other["start"]) <= start and int(other["end"]) >= end and (int(other["start"]), int(other["end"])) != (start, end) for other in repaired ): continue conflicting_labels = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"} repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in conflicting_labels ) ] repaired.append(candidate_span) return repaired def repair_address_line_prefix_spans(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in ADDRESS_LINE_PREFIX_VALUE_RE.finditer(text): start, end = match.span(1) candidate = text[start:end].strip() if not candidate: continue if not (ADDRESS_UNIT_PREFIX_RE.match(candidate) or HOUSE_NAME_PREFIX_RE.match(candidate)): continue candidate_span = { "start": start, "end": end, "label": "STREET_ADDRESS", "score": 0.67, "text": text[start:end], } repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"} ) ] repaired.append(candidate_span) return repaired def extend_prefixed_street_address_spans(text: str, spans: list[dict]) -> list[dict]: repaired: list[dict] = [] tail_re = re.compile( r"^\s*,\s*((?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)(?:\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*){0,2})", flags=re.IGNORECASE, ) building_follow_context_re = re.compile( r"^\s*(?:$|[.;]|,\s*(?:(?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)|County|Contae|gContae|Co\.|(?:[A-Z]\d{2}|D6W)|[A-ZÁÉÍÓÚ]))", flags=re.IGNORECASE, ) for span in spans: if span["label"] != "STREET_ADDRESS": repaired.append(span) continue value = span.get("text", "").strip() if not value or not (ADDRESS_UNIT_PREFIX_RE.match(value) or HOUSE_NAME_PREFIX_RE.match(value)): repaired.append(span) continue tail_match = tail_re.match(text[int(span["end"]) :]) if tail_match: extended_end = int(span["end"]) + int(tail_match.end(1)) repaired.append( { **span, "end": extended_end, "text": text[int(span["start"]) : extended_end], "score": max(float(span.get("score", 0.0)), 0.67), } ) continue building_tail_match = ADDRESS_BUILDING_TAIL_RE.match(text[int(span["end"]) :]) if not building_tail_match: repaired.append(span) continue remaining_tail = text[int(span["end"]) + int(building_tail_match.end(1)) :] extended_end = int(span["end"]) + int(building_tail_match.end(1)) repaired.append( { **span, "end": extended_end, "text": text[int(span["start"]) : extended_end], "score": max(float(span.get("score", 0.0)), 0.67), } ) return repaired def merge_adjacent_street_address_spans(text: str, spans: list[dict]) -> list[dict]: if not spans: return spans ordered = sorted(spans, key=lambda item: (int(item["start"]), int(item["end"]), item["label"])) merged: list[dict] = [] index = 0 while index < len(ordered): span = ordered[index] if span["label"] != "STREET_ADDRESS": merged.append(span) index += 1 continue current = dict(span) lookahead = index + 1 while lookahead < len(ordered): other = ordered[lookahead] if other["label"] != "STREET_ADDRESS": break gap = text[int(current["end"]) : int(other["start"])] if not re.fullmatch(r"\s*,\s*", gap): break candidate_end = int(other["end"]) merged_value = text[int(current["start"]) : candidate_end] if not ( any(ch.isdigit() for ch in merged_value) and ( ADDRESS_UNIT_PREFIX_RE.match(merged_value) or STREET_SUFFIX_RE.search(merged_value) or BUILDING_SUFFIX_RE.search(merged_value) ) ): break current["end"] = candidate_end current["text"] = text[int(current["start"]) : candidate_end] current["score"] = max(float(current.get("score", 0.0)), float(other.get("score", 0.0))) lookahead += 1 merged.append(current) index = lookahead return merged def repair_contextual_address_place_blocks(text: str, spans: list[dict]) -> list[dict]: repaired = list(spans) for match in ADDRESS_PLACE_VALUE_RE.finditer(text): start, end = match.span(1) candidate_span = { "start": start, "end": end, "label": "STREET_ADDRESS", "score": 0.69, "text": text[start:end], } repaired = [ other for other in repaired if not ( spans_overlap(candidate_span, other) and other["label"] in {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME", "CITY"} ) ] repaired.append(candidate_span) return repaired def prefer_long_street_address_spans(spans: list[dict]) -> list[dict]: if not spans: return spans keep: list[dict] = [] for span in spans: if span["label"] != "STREET_ADDRESS": keep.append(span) continue shadowed = False for other in spans: if other is span or other["label"] != "STREET_ADDRESS": continue if int(other["start"]) <= int(span["start"]) and int(other["end"]) >= int(span["end"]): if (int(other["start"]), int(other["end"])) != (int(span["start"]), int(span["end"])): shadowed = True break if not shadowed: keep.append(span) return keep def drop_contextual_fragment_spans(spans: list[dict]) -> list[dict]: if not spans: return spans suppressors = {"COUNTY", "CITY", "POSTCODE", "STREET_ADDRESS", "DATE_OF_BIRTH"} keep: list[dict] = [] for span in spans: if span["label"] in {"FIRST_NAME", "LAST_NAME"}: if any( other is not span and other["label"] in suppressors and spans_overlap(span, other) and (int(other["end"]) - int(other["start"])) >= (int(span["end"]) - int(span["start"])) for other in spans ): continue if span["label"] == "CITY": if any( other is not span and other["label"] == "COUNTY" and spans_overlap(span, other) and (int(other["end"]) - int(other["start"])) >= (int(span["end"]) - int(span["start"])) for other in spans ): continue keep.append(span) return keep def drop_public_contact_detail_spans(text: str, spans: list[dict]) -> list[dict]: if not PUBLIC_CONTACT_DETAILS_RE.search(text): return spans keep: list[dict] = [] for span in spans: if span["label"] not in {"STREET_ADDRESS", "CITY", "COUNTY"}: keep.append(span) continue keep.append(span) return [ span for span in keep if not (span["label"] in {"STREET_ADDRESS", "CITY", "COUNTY"} and PUBLIC_CONTACT_DETAILS_RE.search(text)) ] def drop_org_like_name_spans(text: str, spans: list[dict]) -> list[dict]: keep: list[dict] = [] for span in spans: if span["label"] not in {"FIRST_NAME", "LAST_NAME"}: keep.append(span) continue tokens = [normalize_surface(token) for token in re.split(r"\s+", span.get("text", "").strip()) if token] if tokens and tokens[-1] in ORG_NAME_TRAILING_SURFACES: continue prefix = text[max(0, int(span["start"]) - 32) : int(span["start"])] if ORG_NAME_PREFIX_RE.search(prefix): continue keep.append(span) return keep def drop_city_org_prefix_spans(text: str, spans: list[dict]) -> list[dict]: keep: list[dict] = [] for span in spans: if span["label"] != "CITY": keep.append(span) continue tail = text[int(span["end"]) : min(len(text), int(span["end"]) + 24)] if ORG_CITY_TAIL_RE.match(tail): continue keep.append(span) return keep def canonicalize_location_spans(text: str, spans: list[dict]) -> list[dict]: repaired: list[dict] = [] for span in spans: if span["label"] not in {"CITY", "COUNTY"}: repaired.append(span) continue start = int(span["start"]) end = int(span["end"]) if span["label"] == "CITY" and start > 0: prefix_char = text[start - 1] if prefix_char.lower() in IRISH_CITY_PREFIX_CHARS and not (start - 1 > 0 and text[start - 2].isalpha()): candidate_start = start - 1 candidate_text = text[candidate_start:end] if normalize_surface(candidate_text) in IRISH_CITY_SURFACES: start = candidate_start while end > start and text[end - 1] in ".,;:": end -= 1 repaired.append( { **span, "start": start, "end": end, "text": text[start:end], } ) return repaired def canonicalize_street_address_spans(text: str, spans: list[dict]) -> list[dict]: repaired: list[dict] = [] for span in spans: if span["label"] != "STREET_ADDRESS": repaired.append(span) continue start = int(span["start"]) end = int(span["end"]) window_start = max(0, start - 48) window_end = min(len(text), end + 48) window = text[window_start:window_end] best_match = None best_key = None for match in STREET_ADDRESS_VALUE_RE.finditer(window): candidate_start = window_start + int(match.start(1)) candidate_end = window_start + int(match.end(1)) if candidate_end <= start or candidate_start >= end: continue candidate_text = text[candidate_start:candidate_end] if not is_reasonable_span_text("STREET_ADDRESS", text, candidate_start, candidate_end): continue key = ( candidate_end - candidate_start, -(abs(candidate_start - start) + abs(candidate_end - end)), ) if best_key is None or key > best_key: best_key = key best_match = (candidate_start, candidate_end, candidate_text) if best_match is not None: start, end, _ = best_match for newline_char in ("\n", "\r"): newline_pos = text.find(newline_char, start, end) if newline_pos != -1: end = newline_pos break while end > start and text[end - 1] in ".,;:": end -= 1 repaired.append( { **span, "start": start, "end": end, "text": text[start:end], } ) return repaired def canonicalize_email_spans(text: str, spans: list[dict]) -> list[dict]: repaired: list[dict] = [] for span in spans: if span["label"] != "EMAIL": repaired.append(span) continue segment = text[int(span["start"]) : int(span["end"])] match = EMAIL_EXTRACT_RE.search(segment) if not match: repaired.append(span) continue start = int(span["start"]) + int(match.start(1)) end = int(span["start"]) + int(match.end(1)) repaired.append( { **span, "start": start, "end": end, "text": text[start:end], } ) return repaired def drop_stacked_first_names(spans: list[dict]) -> list[dict]: if not spans: return spans first_names = [span for span in spans if span["label"] == "FIRST_NAME"] last_names = [span for span in spans if span["label"] == "LAST_NAME"] if not first_names or not last_names: return spans keep: list[dict] = [] for span in spans: if span["label"] != "FIRST_NAME": keep.append(span) continue shadowed = False for other in first_names: if other is span: continue if int(other["start"]) <= int(span["start"]): continue if int(other["start"]) - int(span["end"]) > 2: continue if not any( int(last["start"]) >= int(other["end"]) and int(last["start"]) - int(other["end"]) <= 2 for last in last_names ): continue shadowed = True break if not shadowed: keep.append(span) return keep def decode_span_matrix( text: str, offsets: list[tuple[int, int]], span_scores: np.ndarray, config, min_score: float, ) -> list[dict]: label_names = label_names_from_config(config) thresholds = label_thresholds_from_config(config, min_score) max_span_tokens = label_max_span_tokens_from_config(config) min_nonspace_chars = label_min_nonspace_chars_from_config(config) if span_scores.ndim != 3: raise ValueError(f"Expected [num_labels, seq_len, seq_len] span scores, got shape {span_scores.shape}") num_labels, seq_len, _ = span_scores.shape valid = np.array([valid_offset(offset) for offset in offsets[:seq_len]], dtype=bool) start_chars = np.array([int(offset[0]) if valid[index] else -1 for index, offset in enumerate(offsets[:seq_len])], dtype=np.int32) end_chars = np.array([int(offset[1]) if valid[index] else -1 for index, offset in enumerate(offsets[:seq_len])], dtype=np.int32) nonspace_prefix = [0] for ch in text: nonspace_prefix.append(nonspace_prefix[-1] + (0 if ch.isspace() else 1)) spans: list[dict] = [] for label_index in range(min(num_labels, len(label_names))): label = label_names[label_index] threshold = thresholds.get(label, min_score) max_width = max(1, int(max_span_tokens.get(label, 8))) min_chars = max(1, int(min_nonspace_chars.get(label, 1))) label_scores = span_scores[label_index, :seq_len, :seq_len] start_indices, end_indices = np.where(label_scores >= threshold) if start_indices.size == 0: continue width_mask = (end_indices >= start_indices) & ((end_indices - start_indices) < max_width) if not np.any(width_mask): continue start_indices = start_indices[width_mask] end_indices = end_indices[width_mask] valid_mask = valid[start_indices] & valid[end_indices] if not np.any(valid_mask): continue start_indices = start_indices[valid_mask] end_indices = end_indices[valid_mask] for start_idx, end_idx in zip(start_indices.tolist(), end_indices.tolist()): start_char = int(start_chars[start_idx]) end_char = int(end_chars[end_idx]) if end_char <= start_char: continue if (nonspace_prefix[end_char] - nonspace_prefix[start_char]) < min_chars: continue if not is_reasonable_span_text(label, text, start_char, end_char): continue spans.append( { "start": start_char, "end": end_char, "label": label, "score": float(label_scores[start_idx, end_idx]), "text": text[start_char:end_char], } ) spans = prefer_long_name_spans(spans, thresholds) spans = prefer_long_structured_spans(spans, thresholds) spans = repair_first_name_from_last_name(text, spans) text_lower = text.lower() has_digit = any(ch.isdigit() for ch in text) has_alpha = any(ch.isalpha() for ch in text) has_email_hint = "@" in text has_address_hint = bool( ADDRESS_CUE_RE.search(text) or ADDRESS_LINE_CUE_RE.search(text) or STREET_SUFFIX_RE.search(text) or HOUSE_NAME_PREFIX_RE.search(text) or ADDRESS_UNIT_PREFIX_RE.search(text) ) has_city_hint = bool( has_address_hint or POSTCODE_VALUE_RE.search(text) or "city:" in text_lower or "city " in text_lower or "town:" in text_lower or "town " in text_lower or "city/town" in text_lower or "cathair" in text_lower or "baile" in text_lower or "county" in text_lower or "contae" in text_lower or "co." in text_lower ) if has_email_hint: spans = repair_emails(text, spans) if has_digit: spans = repair_phone_numbers(text, spans) spans = repair_ppsn_variants(text, spans) spans = repair_postcodes(text, spans) if has_address_hint: spans = repair_street_addresses(text, spans) if NAME_SELF_CUE_RE.search(text): spans = repair_contextual_name_cues(text, spans) if NAME_ROLE_CUE_RE.search(text): spans = repair_role_name_cues(text, spans) if SURNAME_CUE_RE.search(text): spans = repair_surname_field_cues(text, spans) if NAME_BEFORE_STRUCTURED_CUE_RE.search(text): spans = repair_name_before_structured_cues(text, spans) if any(span["label"] in {"FIRST_NAME", "LAST_NAME"} for span in spans) and NAME_PARTICLE_SURNAME_RE.search(text): spans = repair_name_particle_surnames(text, spans) if has_address_hint: spans = repair_contextual_address_blocks(text, spans) spans = repair_address_line_prefix_spans(text, spans) spans = extend_prefixed_street_address_spans(text, spans) spans = repair_contextual_address_place_blocks(text, spans) spans = merge_adjacent_street_address_spans(text, spans) spans = prefer_long_street_address_spans(spans) if has_city_hint: spans = repair_county_spans(text, spans) spans = repair_city_spans(text, spans) spans = repair_city_before_postcode(text, spans) spans = repair_city_before_county(text, spans) spans = repair_city_field_cues(text, spans) spans = repair_prefixed_city_spans(text, spans) spans = prefer_long_city_spans(spans) spans = repair_county_field_cues(text, spans) if has_digit and DOB_CONTEXT_RE.search(text): spans = repair_contextual_date_of_birth(text, spans) if has_digit and AGE_CONTEXT_RE.search(text): spans = repair_contextual_ages(text, spans) if has_digit and PASSPORT_CUE_RE.search(text): spans = repair_contextual_passport_numbers(text, spans) if has_digit and ("iban" in text_lower or "account" in text_lower or "bank" in text_lower or "cuntas" in text_lower): spans = repair_contextual_account_numbers(text, spans) if "public contact details" in text_lower: spans = drop_public_contact_detail_spans(text, spans) if any(span["label"] in {"FIRST_NAME", "LAST_NAME"} for span in spans): spans = drop_org_like_name_spans(text, spans) spans = drop_stacked_first_names(spans) if any(span["label"] == "CITY" for span in spans): spans = drop_city_org_prefix_spans(text, spans) if any(span["label"] in {"FIRST_NAME", "LAST_NAME", "CITY"} for span in spans): spans = drop_contextual_fragment_spans(spans) if any(span["label"] in {"CITY", "COUNTY"} for span in spans): spans = canonicalize_location_spans(text, spans) if any(span["label"] == "STREET_ADDRESS" for span in spans): spans = canonicalize_street_address_spans(text, spans) if any(span["label"] == "EMAIL" for span in spans): spans = canonicalize_email_spans(text, spans) return dedupe_spans(spans) def prefer_long_name_spans(spans: list[dict], thresholds: dict[str, float]) -> list[dict]: if not spans: return spans preferred: list[dict] = [] consumed: set[int] = set() for index, span in enumerate(spans): if index in consumed: continue label = span["label"] if label not in {"FIRST_NAME", "LAST_NAME"}: preferred.append(span) continue same_start = [ (other_index, other) for other_index, other in enumerate(spans) if other_index not in consumed and other["label"] == label and other["start"] == span["start"] ] if len(same_start) == 1: preferred.append(span) continue for other_index, _ in same_start: consumed.add(other_index) best_by_score = max(same_start, key=lambda item: float(item[1].get("score", 0.0)))[1] longest = max(same_start, key=lambda item: (item[1]["end"] - item[1]["start"], float(item[1].get("score", 0.0))))[1] threshold = float(thresholds.get(label, 0.5)) if float(longest.get("score", 0.0)) >= max(threshold + 0.15, float(best_by_score.get("score", 0.0)) * 0.7): preferred.append(longest) else: preferred.append(best_by_score) return prefer_same_end_extensions(preferred, thresholds) def prefer_same_end_extensions(spans: list[dict], thresholds: dict[str, float]) -> list[dict]: if not spans: return spans preferred: list[dict] = [] consumed: set[int] = set() for index, span in enumerate(spans): if index in consumed: continue label = span["label"] if label not in {"FIRST_NAME", "LAST_NAME", "EMAIL"}: preferred.append(span) continue same_end = [ (other_index, other) for other_index, other in enumerate(spans) if other_index not in consumed and other["label"] == label and other["end"] == span["end"] ] if len(same_end) == 1: preferred.append(span) continue for other_index, _ in same_end: consumed.add(other_index) best_by_score = max(same_end, key=lambda item: float(item[1].get("score", 0.0)))[1] longest = max(same_end, key=lambda item: (item[1]["end"] - item[1]["start"], float(item[1].get("score", 0.0))))[1] longest_score = float(longest.get("score", 0.0)) best_score = float(best_by_score.get("score", 0.0)) if label == "EMAIL": if "@" in longest.get("text", "") or longest["end"] - longest["start"] > best_by_score["end"] - best_by_score["start"]: if longest_score >= best_score - 0.02: preferred.append(longest) continue else: longest_text = longest.get("text", "") if " " not in longest_text.strip() and longest_score >= max(float(thresholds.get(label, 0.5)) * 0.8, best_score * 0.55): preferred.append(longest) continue preferred.append(best_by_score) return preferred def prefer_long_structured_spans(spans: list[dict], thresholds: dict[str, float]) -> list[dict]: if not spans: return spans preferred: list[dict] = [] consumed: set[int] = set() target_labels = {"STREET_ADDRESS", "DATE_OF_BIRTH"} for index, span in enumerate(spans): if index in consumed: continue label = span["label"] if label not in target_labels: preferred.append(span) continue overlapping = [ (other_index, other) for other_index, other in enumerate(spans) if other_index not in consumed and other["label"] == label and spans_overlap(span, other) ] if len(overlapping) == 1: preferred.append(span) continue for other_index, _ in overlapping: consumed.add(other_index) best_by_score = max(overlapping, key=lambda item: float(item[1].get("score", 0.0)))[1] longest = max( overlapping, key=lambda item: (item[1]["end"] - item[1]["start"], float(item[1].get("score", 0.0))), )[1] longest_score = float(longest.get("score", 0.0)) best_score = float(best_by_score.get("score", 0.0)) threshold = float(thresholds.get(label, 0.5)) if longest_score >= max(threshold, best_score * 0.75): preferred.append(longest) else: preferred.append(best_by_score) return preferred def sigmoid_np(values: np.ndarray) -> np.ndarray: clipped = np.clip(values, -60.0, 60.0) return 1.0 / (1.0 + np.exp(-clipped)) def run_onnx_span(session, encoded: dict[str, Any]) -> np.ndarray: feed = {} input_names = {item.name for item in session.get_inputs()} for key, value in encoded.items(): if key == "offset_mapping": continue if key in input_names: feed[key] = value outputs = session.run(None, feed) if not outputs: raise ValueError("ONNX session returned no outputs") return outputs[0]