| |
| from __future__ import annotations |
|
|
| import math |
| import re |
| from functools import lru_cache |
| from pathlib import Path |
| import sys |
| from typing import Any |
| import unicodedata |
|
|
| import numpy as np |
|
|
# Directory two levels above this file's own directory.  It is pushed to the
# front of ``sys.path`` (once) before the ``base_common`` import that follows.
# NOTE(review): presumably this is the repository root that contains
# ``base_common`` -- confirm against the project layout.
ROOT_DIR = Path(__file__).resolve().parents[2]
if str(ROOT_DIR) not in sys.path:
    sys.path.insert(0, str(ROOT_DIR))
|
|
| from base_common import ( |
| dedupe_spans, |
| label_max_span_tokens_from_config, |
| label_min_nonspace_chars_from_config, |
| label_names_from_config, |
| load_onnx_session, |
| normalize_entity_name, |
| safe_auto_tokenizer, |
| ) |
|
|
|
|
def label_thresholds_from_config(config, default_threshold: float) -> dict[str, float]:
    """Build the per-label threshold table for *config*.

    Explicit overrides are read from ``config.span_label_thresholds`` (a
    mapping of label name to threshold; missing or falsy means "none") and
    keyed by ``normalize_entity_name``.  Every label reported by
    ``label_names_from_config`` that has no override receives
    ``default_threshold``.

    Args:
        config: Object that may carry a ``span_label_thresholds`` attribute.
        default_threshold: Fallback threshold for labels without an override.

    Returns:
        Mapping of label name to threshold as ``float``.
    """
    overrides = getattr(config, "span_label_thresholds", None)
    thresholds: dict[str, float] = {}
    if overrides:
        for name, cutoff in overrides.items():
            key = normalize_entity_name(name)
            thresholds[key] = float(cutoff)
    for label in label_names_from_config(config):
        thresholds.setdefault(label, float(default_threshold))
    return thresholds
|
|
|
|
def valid_offset(offset: tuple[int, int]) -> bool:
    """Return ``True`` when *offset* is a non-empty ``(start, end)`` pair with ``end > start``."""
    if not offset:
        return False
    finish = int(offset[1])
    begin = int(offset[0])
    return finish > begin
|
|
|
|
def nonspace_length(text: str, start: int, end: int) -> int:
    """Count the non-whitespace characters in ``text[start:end]``."""
    segment = text[int(start) : int(end)]
    return len([ch for ch in segment if not ch.isspace()])
|
|
|
|
def alnum_upper(text: str) -> str:
    """Upper-case *text* and drop every character that is not alphanumeric."""
    return "".join(filter(str.isalnum, text.upper()))
|
|
|
|
@lru_cache(maxsize=16384)
def normalize_surface(text: str) -> str:
    """Return a comparison key for *text*.

    The key is produced by NFKD-decomposing the input, removing combining
    marks (so accented and unaccented spellings compare equal), mapping
    U+00A0 / U+202F to a plain space, trimming, lower-casing and collapsing
    whitespace runs to one space.  Results are memoised by ``lru_cache``.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    kept = [ch for ch in decomposed if not unicodedata.combining(ch)]
    value = "".join(kept)
    for special_space in ("\u00A0", "\u202F"):
        value = value.replace(special_space, " ")
    trimmed = value.strip().lower()
    return re.sub(r"\s+", " ", trimmed)
|
|
|
|
# Accepted surface spellings for the CITY label.  The tuple mixes English
# names, Irish names, accent-free spellings and prefixed variants (for
# example "mBaile ...", "gCorcaigh", "hUaimh").
IRISH_CITY_FORMS = (
    "Dublin",
    "Baile Átha Cliath",
    "mBaile Átha Cliath",
    "mBaile Atha Cliath",
    "Galway",
    "Gaillimh",
    "Cork",
    "Cork City",
    "Corcaigh",
    "gCorcaigh",
    "Limerick",
    "Luimneach",
    "Waterford",
    "Port Láirge",
    "Kilkenny",
    "Cill Chainnigh",
    "Carlow",
    "Ceatharlach",
    "Sligo",
    "Sligeach",
    "Tralee",
    "Trá Lí",
    "Ennis",
    "Inis",
    "Letterkenny",
    "Leitir Ceanainn",
    "Castlebar",
    "Caisleán an Bharraigh",
    "Caislean an Bharraigh",
    "gCaisleán an Bharraigh",
    "gCaislean an Bharraigh",
    "Wexford",
    "Loch Garman",
    "Navan",
    "Uaimh",
    "An Uaimh",
    "hUaimh",
    "nUaimh",
    "Dundalk",
    "Dún Dealgan",
    "Dun Dealgan",
    "Mullingar",
    "Muileann gCearr",
    "An Muileann gCearr",
    "Tullamore",
    "Tulach Mhór",
    "Tulach Mhor",
    "dTulach Mhór",
    "dTulach Mhor",
    "Portlaoise",
    "Port Laoise",
    "bPort Laoise",
    "Bray",
    "Bré",
    "Bre",
    "mBré",
    "mBre",
    "Athlone",
    "Baile Átha Luain",
    "Baile Atha Luain",
    "mBaile Átha Luain",
    "mBaile Atha Luain",
)
# Lookup set of the forms above after ``normalize_surface`` (lower-cased,
# accents removed, whitespace collapsed); CITY spans are matched against it.
IRISH_CITY_SURFACES = {normalize_surface(value) for value in IRISH_CITY_FORMS}
|
|
# Accepted surface spellings for the COUNTY label: "Co." / "County" /
# "Contae" / "gContae" prefixes combined with English and Irish county
# names, with and without accents.
IRISH_COUNTY_FORMS = (
    "Co. Dublin",
    "County Dublin",
    "Co. Bhaile Átha Cliath",
    "Contae Bhaile Átha Cliath",
    "gContae Bhaile Átha Cliath",
    "Co. Galway",
    "County Galway",
    "Co. na Gaillimhe",
    "Contae na Gaillimhe",
    "gContae na Gaillimhe",
    "Co. Cork",
    "County Cork",
    "Co. Chorcaí",
    "Contae Chorcaí",
    "gContae Chorcaí",
    "Co. Limerick",
    "County Limerick",
    "Co. Luimnigh",
    "Contae Luimnigh",
    "gContae Luimnigh",
    "Co. Waterford",
    "County Waterford",
    "Co. Phort Láirge",
    "Contae Phort Láirge",
    "gContae Phort Láirge",
    "Co. Kilkenny",
    "County Kilkenny",
    "Co. Chill Chainnigh",
    "Contae Chill Chainnigh",
    "gContae Chill Chainnigh",
    "Co. Carlow",
    "County Carlow",
    "Co. Cheatharlach",
    "Contae Cheatharlach",
    "gContae Cheatharlach",
    "Co. Sligo",
    "County Sligo",
    "Co. Shligigh",
    "Contae Shligigh",
    "gContae Shligigh",
    "Co. Kerry",
    "County Kerry",
    "Co. Chiarraí",
    "Contae Chiarraí",
    "gContae Chiarraí",
    "Co. Clare",
    "County Clare",
    "Co. an Chláir",
    "Contae an Chláir",
    "gContae an Chláir",
    "Co. Donegal",
    "County Donegal",
    "Co. Dhún na nGall",
    "Co. Dhun na nGall",
    "Contae Dhún na nGall",
    "Contae Dhun na nGall",
    "gContae Dhún na nGall",
    "gContae Dhun na nGall",
    "Co. Mayo",
    "County Mayo",
    "Co. Mhaigh Eo",
    "Contae Mhaigh Eo",
    "gContae Mhaigh Eo",
    "Co. Wexford",
    "County Wexford",
    "Co. Loch Garman",
    "Contae Loch Garman",
    "gContae Loch Garman",
    "Co. Meath",
    "County Meath",
    "Co. na Mí",
    "Co. na Mi",
    "Contae na Mí",
    "Contae na Mi",
    "gContae na Mí",
    "gContae na Mi",
    "Co. Louth",
    "County Louth",
    "Co. Lú",
    "Co. Lu",
    "Contae Lú",
    "Contae Lu",
    "gContae Lú",
    "gContae Lu",
    "Co. Westmeath",
    "County Westmeath",
    "Co. na hIarmhí",
    "Co. na hIarmhi",
    "Contae na hIarmhí",
    "Contae na hIarmhi",
    "gContae na hIarmhí",
    "gContae na hIarmhi",
    "Co. Offaly",
    "County Offaly",
    "Co. Uíbh Fhailí",
    "Co. Uibh Fhaili",
    "Contae Uíbh Fhailí",
    "Contae Uibh Fhaili",
    "gContae Uíbh Fhailí",
    "gContae Uibh Fhaili",
    "Co. Laois",
    "County Laois",
    "Contae Laoise",
    "gContae Laoise",
    "Co. Wicklow",
    "County Wicklow",
    "Co. Chill Mhantáin",
    "Co. Chill Mhantain",
    "Contae Chill Mhantáin",
    "Contae Chill Mhantain",
    "gContae Chill Mhantáin",
    "gContae Chill Mhantain",
)
# Normalised lookup set of the county forms above.
IRISH_COUNTY_SURFACES = {normalize_surface(value) for value in IRISH_COUNTY_FORMS}
# Normalised "County <building/organisation>" phrases that must never be
# accepted as a COUNTY span.
COUNTY_STOP_SURFACES = {
    normalize_surface(value)
    for value in {
        "County Hall",
        "County House",
        "County Council",
        "County Offices",
        "County Office",
    }
}
# Single lower-case letters; not referenced in the visible part of the file.
# NOTE(review): presumably the prefix letters seen on the prefixed city forms
# ("nUaimh", "gCorcaigh", ...) -- confirm against the caller.
IRISH_CITY_PREFIX_CHARS = {"n", "g", "m", "b", "d", "h"}
|
|
# Case-insensitive street-type word (English and Irish spellings) as a whole word.
STREET_SUFFIX_RE = re.compile(
    r"(?i)\b(street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)\b"
)
# Case-insensitive building-type word or phrase as a whole word.
BUILDING_SUFFIX_RE = re.compile(
    r"(?i)\b(house|cottage|lodge|villa|apartments?|building|business\s+centre|community\s+centre|shopping\s+centre|retail\s+park|Teach(?:ín|in)?)\b"
)
# Whole-string shape of a phone number: starts with "+", "(", ")", "." or a
# digit, continues with digits / separators, and ends with a digit.
PHONE_SURFACE_RE = re.compile(r"^[+().\d][+().\d \-/\u00A0\u202F]*\d$")
# Whole-string shape of a digits-only account number (digits, spaces, hyphens,
# U+00A0 and U+202F).
ACCOUNT_DIGIT_SURFACE_RE = re.compile(r"^[\d \-\u00A0\u202F]+$")
# Pattern *fragment* (a plain string, not a compiled regex) matching English
# and Irish month names; it is interpolated into the date patterns below.
MONTH_NAME_RE = (
    r"(?:January|February|March|April|May|June|July|August|September|October|November|December|"
    r"Eanáir|Eanair|Feabhra|Márta|Marta|Aibreán|Aibrean|Bealtaine|Meitheamh|Iúil|Iuil|Lúnasa|Lunasa|"
    r"Meán\s+Fómhair|Mean\s+Fomhair|Deireadh\s+Fómhair|Deireadh\s+Fomhair|Samhain|Nollaig)"
)
# Anchored, case-insensitive date shapes: numeric d/m/y with "." "/" "-",
# ISO yyyy-mm-dd, "<day> <month> <year>" and "<month> <day>, <year>".
DATE_OF_BIRTH_RE = re.compile(
    rf"(?i)^(?:\d{{1,2}}[./-]\d{{1,2}}[./-]\d{{2,4}}|\d{{4}}-\d{{2}}-\d{{2}}|(?:an\s+)?\d{{1,2}}(?:st|nd|rd|th|ú)?\s+{MONTH_NAME_RE}[,]?\s+\d{{2,4}}|{MONTH_NAME_RE}\s+\d{{1,2}},?\s+\d{{2,4}})$"
)
# Same date shapes for searching inside running text; guarded by
# alphanumeric look-arounds.  Unlike DATE_OF_BIRTH_RE this pattern carries no
# ``(?i)`` flag, so month names are matched case-sensitively here.
DATE_OF_BIRTH_VALUE_RE = re.compile(
    rf"(?<![A-Za-z0-9])(\d{{1,2}}[./-]\d{{1,2}}[./-]\d{{2,4}}|\d{{4}}-\d{{2}}-\d{{2}}|(?:an\s+)?\d{{1,2}}(?:st|nd|rd|th|ú)?\s+{MONTH_NAME_RE}[,]?\s+\d{{2,4}}|{MONTH_NAME_RE}\s+\d{{1,2}},?\s+\d{{2,4}})(?![A-Za-z0-9])"
)
# Cue words that indicate a nearby number is an age (English and Irish).
AGE_CONTEXT_RE = re.compile(r"(?i)\b(age|aged|years?\s+old|year\s+old|year-old|yrs?\s+old|y/?o|yo|aois|bliana\s+d['’]aois|mbliana\s+d['’]aois)\b")
# Suffix allowed to follow an age number without a space ("-year-old", "yo", "y/o", "yrs").
AGE_INLINE_SUFFIX_RE = re.compile(r"(?i)^(?:-year-old\b|yo\b|y/o\b|yrs?\b)")
# Stand-alone 1-3 digit number not glued to ASCII letters or digits.
AGE_VALUE_RE = re.compile(r"(?<![A-Za-z0-9])(\d{1,3})(?![A-Za-z0-9])")
# Text ending in a first-person "I am" phrase (English or Irish).
AGE_SELF_PREFIX_RE = re.compile(r"(?i)(?:^|.*\b)(?:i\s+am|i['’]?m|im|t[áa]\s+m[ée]|t[áa]im)\s*$")
# Date-of-birth cue words searched for in the text preceding a date.
DOB_CONTEXT_RE = re.compile(
    r"(?i)\b(dob|date\s+of\s+birth|born(?:\s+on)?|data\s+breithe|dáta\s+breithe|dhata\s+breithe|dháta\s+breithe|rugadh)\b"
)
# Date-of-birth cue words searched for in the text following a date.
DOB_SUFFIX_CONTEXT_RE = re.compile(
    r"(?i)\b(?:my\s+date\s+of\s+birth|mo\s+(?:dáta|dháta|data|dhata)\s+breithe|dob|date\s+of\s+birth|rugadh)\b"
)
|
|
|
|
def has_dob_suffix_context(text: str, end: int, window: int = 40) -> bool:
    """Report whether a date-of-birth cue follows position *end*.

    Only the next *window* characters are inspected.  A cue found by
    ``DOB_SUFFIX_CONTEXT_RE`` counts only when none of ``,`` ``.`` ``;`` ``:``
    or a line break appears between *end* and the start of the cue.
    """
    stop = min(len(text), int(end) + window)
    lookahead = text[int(end) : stop]
    cue = DOB_SUFFIX_CONTEXT_RE.search(lookahead)
    if cue is None:
        return False
    gap = lookahead[: int(cue.start())]
    for ch in gap:
        if ch in ",.;:\n\r":
            return False
    return True
# Pattern *fragment* (plain string) for address field labels and
# "my address is" phrases in English and Irish; embedded in ADDRESS_CUE_RE.
ADDRESS_FIELD_CUE_PATTERN = r"(?:address(?:\s+line\s+\d+)?(?:\s+is)?|my\s+address\s+is|seoladh(?:\s+l[ií]nte?\s+\d+)?|is\s+[ée]\s+mo\s+sheoladh)"
# Numbered address-line labels ("Address line 1", "Seoladh líne 1").
ADDRESS_LINE_CUE_RE = re.compile(r"(?i)\b(?:address\s+line\s+\d+|seoladh\s+l[ií]nte?\s+\d+)\b")
# Any phrase treated as an address cue: the field cues above plus office-type
# phrases and "live(s) at" / "located at" style phrases (English and Irish).
ADDRESS_CUE_RE = re.compile(
    rf"(?i)\b({ADDRESS_FIELD_CUE_PATTERN}|sheoladh|allocation\s+centre|intreo\s+centre|ionad\s+leithdh[aá]ilte|ionad\s+intreo|live\s+at|lives\s+at|living\s+at|located\s+at|i\s+mo\s+ch[oó]na[ií]\s+ag|t[áa]\s+m[ée]\s+i\s+mo\s+ch[oó]na[ií]\s+ag|t[áa]im\s+i\s+mo\s+ch[oó]na[ií]\s+ag|cónai\s+ag|chónai\s+ag|conai\s+ag|chonai\s+ag)\b"
)
# Text that ends in a county prefix ("County", "Co.", "Contae", "gContae",
# optionally followed by "na"); used to reject a CITY span that directly
# follows such a prefix.
CITY_COUNTY_PREFIX_RE = re.compile(r"(?i)(?:county|co\.|contae|gcontae)(?:\s+na)?\s*$")
# PPSN / UPSP cue words.
PPSN_CUE_RE = re.compile(
    r"(?i)\b(ppsn|upsp|personal public service(?:\s+number)?|uimhir\s+(?:mo\s+)?upsp|uimhir\s+(?:mo\s+)?ppsn)\b"
)
# Normalised words that must never be accepted as a FIRST_NAME / LAST_NAME
# span: form-field labels, address unit words, month names and weekday names
# in English and Irish.
NAME_STOP_SURFACES = {
    normalize_surface(value)
    for value in {
        "Address",
        "Name",
        "Phone",
        "Email",
        "Seoladh",
        "Ainm",
        "Teagmháil",
        "Teagmhail",
        "Ríomhphost",
        "Riomhphost",
        "Eirchód",
        "Eirchod",
        "Eircode",
        "PPSN",
        "UPSP",
        "Call",
        "Glao",
        "Glaoigh",
        "Rugadh",
        "Ionad",
        "Intreo",
        "Cill",
        "Sampla",
        "Leithdháilte",
        "Leithdhailte",
        "Leithdháil",
        "Leithdhail",
        "Leithdh",
        "Apartment",
        "Flat",
        "Unit",
        "Suite",
        "Árasán",
        "Arasan",
        "Aonad",
        "County",
        "Contae",
        "Fón",
        "Fon",
        "January",
        "February",
        "March",
        "April",
        "May",
        "June",
        "July",
        "August",
        "September",
        "October",
        "November",
        "December",
        "Monday",
        "Tuesday",
        "Wednesday",
        "Thursday",
        "Friday",
        "Saturday",
        "Sunday",
        "Eanáir",
        "Feabhra",
        "Márta",
        "Aibreán",
        "Aibrean",
        "Bealtaine",
        "Meitheamh",
        "Iúil",
        "Iuil",
        "Lúnasa",
        "Lunasa",
        "Meán Fómhair",
        "Mean Fomhair",
        "Deireadh Fómhair",
        "Deireadh Fomhair",
        "Samhain",
        "Nollaig",
        "Luan",
        "Máirt",
        "Mairt",
        "Céadaoin",
        "Ceadaoin",
        "Déardaoin",
        "Deardaoin",
        "Aoine",
        "Satharn",
        "Domhnach",
    }
}
# Normalised surname particles; a surname token equal to one of these is
# accepted even when it does not start with an upper-case letter.
NAME_PARTICLE_SURFACES = {
    normalize_surface(value)
    for value in {"Ó", "O", "Ní", "Ni", "Nic", "Mac", "Mc", "de", "van", "von"}
}
# Normalised words that, when found after the street-type word of a
# STREET_ADDRESS span, cause the span to be rejected.
STREET_TRAILING_BLOCK_SURFACES = {
    normalize_surface(value)
    for value in {
        "are",
        "public",
        "contact",
        "details",
        "website",
        "open",
        "before",
        "visiting",
        "roimh",
        "chuairt",
        "agus",
        "and",
        "the",
        "is",
        "ta",
    }
}
# Address part that starts with a unit word (apartment / flat / unit / suite
# and Irish equivalents).
ADDRESS_UNIT_PREFIX_RE = re.compile(r"(?i)^(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\b")
# Whole address part that is a house name: one to three words followed by
# house / cottage / lodge / villa, or an Irish "Teach ..." name.
HOUSE_NAME_PREFIX_RE = re.compile(
    r"(?i)^(?:[A-ZÁÉÍÓÚ][\w'’.-]+(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]+){0,2}\s+(?:house|cottage|lodge|villa)|teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚ][\w'’.-]+)?)$"
)
# Street address inside running text: optional unit part, optional house-name
# part, optional house number, up to five leading words, a street-type word
# and up to two trailing words.
STREET_ADDRESS_VALUE_RE = re.compile(
    r"(?i)(?<![\w@])("
    r"(?:(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\s+[A-Za-z0-9-]+,\s+)?"
    r"(?:(?:[A-ZÁÉÍÓÚ][\w'’.-]+(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]+){0,2}\s+(?:house|cottage|lodge|villa)|teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚ][\w'’.-]+)?),\s+)?"
    r"(?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)(?:\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*){0,2}"
    r")"
)
|
|
|
|
def is_plausible_last_name_sequence(value: str) -> bool:
    """Check that every whitespace-separated token of *value* looks like part of a surname.

    A token qualifies when it contains a letter, consists only of name
    characters (see ``is_name_token_char``) and additionally either

    * starts (first letter) with an upper-case letter, or
    * has a lower-case first letter followed by an upper-case second letter, or
    * normalises to one of ``NAME_PARTICLE_SURFACES``.

    An empty or whitespace-only *value* is rejected.
    """
    parts = [piece for piece in re.split(r"\s+", value.strip()) if piece]
    if not parts:
        return False

    def token_ok(token: str) -> bool:
        letters = [ch for ch in token if ch.isalpha()]
        if not letters:
            return False
        if not all(is_name_token_char(ch) for ch in token):
            return False
        if letters[0].isupper():
            return True
        if len(letters) >= 2 and letters[0].islower() and letters[1].isupper():
            return True
        return normalize_surface(token) in NAME_PARTICLE_SURFACES

    return all(token_ok(part) for part in parts)
|
|
|
|
def is_reasonable_span_text(label: str, text: str, start: int, end: int) -> bool:
    """Decide whether ``text[start:end]`` is a plausible surface for *label*.

    Each supported label applies its own shape checks (character classes,
    digit counts, neighbouring characters and, for some labels, cue words in
    the surrounding text).  A label without a dedicated rule is accepted as
    long as the stripped span is non-empty.

    Args:
        label: Entity label of the candidate span (e.g. ``"PHONE_NUMBER"``).
        text: Full text that the span offsets refer to.
        start: Inclusive start offset of the span in *text*.
        end: Exclusive end offset of the span in *text*.

    Returns:
        ``True`` when the span passes the checks for its label, else ``False``.
    """
    # Coerce once so every comparison and index below uses plain ints.
    start = int(start)
    end = int(end)
    value = text[start:end].strip()
    if not value:
        return False

    if label in {"FIRST_NAME", "LAST_NAME"}:
        if not any(ch.isalpha() for ch in value):
            return False
        if any(ch.isdigit() for ch in value):
            return False
        # The span must not cut a word in half on either side.
        if start > 0 and text[start - 1].isalpha():
            return False
        if end < len(text) and text[end].isalpha():
            return False
        if normalize_surface(value) in NAME_STOP_SURFACES:
            return False
        if label == "FIRST_NAME" and any(ch.isspace() for ch in value):
            return False
        if any(ch in ".,;:/@()" for ch in value):
            return False
        if label == "FIRST_NAME":
            first_alpha = next((ch for ch in value if ch.isalpha()), "")
            if not first_alpha or not first_alpha.isupper():
                return False
        if label == "LAST_NAME" and not is_plausible_last_name_sequence(value):
            return False
        if start > 0 and text[start - 1].isdigit():
            return False
        return True

    if label == "EMAIL":
        if "@" not in value:
            return False
        local, _, domain = value.partition("@")
        return bool(local) and "." in domain

    if label == "PHONE_NUMBER":
        normalized = value.replace("\u00A0", " ").replace("\u202F", " ").strip()
        if any(ch.isalpha() for ch in normalized):
            return False
        if "@" in normalized:
            return False
        if start > 0 and text[start - 1].isalnum():
            return False
        if end < len(text) and text[end].isalnum():
            return False
        if not PHONE_SURFACE_RE.match(normalized):
            return False
        digits = "".join(ch for ch in value if ch.isdigit())
        if normalized.startswith("+353"):
            # Drop the country code and an optional trunk "0" before counting.
            tail = digits[3:]
            if tail.startswith("0"):
                tail = tail[1:]
            return 8 <= len(tail) <= 9
        # The 0818 / 1800 prefixes must be tested before the leading-zero
        # requirement, otherwise a number starting with 1800 can never pass.
        if digits.startswith(("0818", "1800")):
            return len(digits) == 10
        if not digits.startswith("0"):
            return False
        if digits.startswith("08"):
            return len(digits) == 10
        if digits.startswith("01"):
            return len(digits) == 9
        return 9 <= len(digits) <= 10

    if label == "PPSN":
        upper = alnum_upper(value)
        return bool(len(upper) in {8, 9} and upper[:7].isdigit() and upper[7:].isalpha())

    if label == "POSTCODE":
        compact = value.replace(" ", "").replace("\u00A0", "").replace("\u202F", "")
        if any(not (ch.isalnum() or ch.isspace()) for ch in value):
            return False
        if len(compact) != 7:
            return False
        routing = compact[:3]
        unique = compact[3:]
        # Routing keys are letter + two digits; "D6W" is the one exception
        # and is compared case-insensitively, like every other routing key.
        routing_ok = bool(
            (routing[0].isalpha() and routing[1:].isdigit())
            or routing.upper() == "D6W"
        )
        unique_ok = bool(
            len(unique) == 4
            and unique[0].isalpha()
            and unique[1:].isalnum()
        )
        return routing_ok and unique_ok

    if label == "PASSPORT_NUMBER":
        return bool(re.fullmatch(r"[A-Z]{1,2}\s?\d{7}", value.strip()))

    if label == "BANK_ROUTING_NUMBER":
        digits = "".join(ch for ch in value if ch.isdigit())
        if len(digits) != 6:
            return False
        # A six-digit number only counts when a bank cue appears nearby.
        context = text[max(0, start - 32) : min(len(text), end + 24)]
        return bool(BANK_ROUTING_CONTEXT_RE.search(context))

    if label == "SWIFT_BIC":
        upper = alnum_upper(value)
        return len(upper) in {8, 11} and upper.isalnum()

    if label == "CREDIT_DEBIT_CARD":
        digits = "".join(ch for ch in value if ch.isdigit())
        return 12 <= len(digits) <= 19

    if label == "ACCOUNT_NUMBER":
        upper = alnum_upper(value)
        if upper.startswith("IE"):
            return bool(re.fullmatch(r"IE\d{2}[A-Z0-9]{18}", upper))
        if not ACCOUNT_DIGIT_SURFACE_RE.fullmatch(value.strip()):
            return False
        digits = "".join(ch for ch in value if ch.isdigit())
        return 6 <= len(digits) <= 34

    if label == "AGE":
        digits = "".join(ch for ch in value if ch.isdigit())
        if digits != value.strip():
            return False
        if not digits:
            return False
        # ``str.isdigit`` also accepts characters such as superscripts that
        # ``int()`` cannot parse; reject those instead of raising ValueError.
        if not digits.isdecimal():
            return False
        if start > 0 and text[start - 1].isalnum():
            return False
        trailing = text[end : min(len(text), end + 12)]
        if end < len(text) and text[end].isalnum() and not AGE_INLINE_SUFFIX_RE.match(trailing):
            return False
        # A number glued to "/" or "-" is more likely part of a date or range.
        if start > 0 and text[start - 1] in "/-":
            return False
        if end < len(text) and text[end] in "/-" and not AGE_INLINE_SUFFIX_RE.match(trailing):
            return False
        age = int(digits)
        if not (0 < age <= 120):
            return False
        context = text[max(0, start - 32) : min(len(text), end + 24)]
        prefix = text[max(0, start - 24) : start]
        return bool(AGE_CONTEXT_RE.search(context) or AGE_SELF_PREFIX_RE.search(prefix))

    if label == "DATE_OF_BIRTH":
        if not any(ch.isdigit() for ch in value):
            return False
        if not DATE_OF_BIRTH_RE.match(value.strip()):
            return False
        # A date only counts as a date of birth with a cue before or after it.
        prefix = text[max(0, start - 96) : start]
        return bool(DOB_CONTEXT_RE.search(prefix) or has_dob_suffix_context(text, end))

    if label == "CITY":
        if any(ch.isdigit() for ch in value):
            return False
        # A name directly after "County" / "Co." / "Contae" is not a city.
        prefix = text[max(0, start - 20) : start]
        if CITY_COUNTY_PREFIX_RE.search(prefix):
            return False
        return normalize_surface(value) in IRISH_CITY_SURFACES

    if label == "COUNTY":
        if any(ch.isdigit() for ch in value):
            return False
        normalized = normalize_surface(value)
        if normalized in COUNTY_STOP_SURFACES:
            return False
        if normalized.startswith(("county hall", "county house", "county council", "county office", "county offices")):
            return False
        if normalized in IRISH_COUNTY_SURFACES:
            return True
        if normalized.startswith(("county ", "contae ", "gcontae ", "co. ")):
            tail = normalized.split(" ", 1)[1] if " " in normalized else ""
            if tail in {"hall", "house", "council", "office", "offices"}:
                return False
            return True
        return False

    if label == "STREET_ADDRESS":
        cleaned = value.strip()
        address_parts = [part.strip() for part in cleaned.split(",")]
        if len(address_parts) > 3:
            return False
        prefix_part = ""
        building_part = ""
        street_part = cleaned
        if len(address_parts) == 2:
            # "<unit or house name>, <street>"
            prefix_part, street_part = address_parts
            if not prefix_part or not street_part:
                return False
            if not (
                ADDRESS_UNIT_PREFIX_RE.match(prefix_part)
                or HOUSE_NAME_PREFIX_RE.match(prefix_part)
            ):
                return False
        elif len(address_parts) == 3:
            # "<unit>, <house name>, <street>"
            prefix_part, building_part, street_part = address_parts
            if not prefix_part or not building_part or not street_part:
                return False
            if not ADDRESS_UNIT_PREFIX_RE.match(prefix_part):
                return False
            if not HOUSE_NAME_PREFIX_RE.match(building_part):
                return False
        suffix_match = STREET_SUFFIX_RE.search(street_part)
        if not suffix_match:
            return False
        if any(ch in "@:;" for ch in cleaned):
            return False
        trailing = street_part[int(suffix_match.end()) :].strip()
        trailing_tokens = [token for token in re.split(r"\s+", trailing) if token]
        if len(trailing_tokens) > 3:
            return False
        if any(normalize_surface(token) in STREET_TRAILING_BLOCK_SURFACES for token in trailing_tokens):
            return False
        has_digit = any(ch.isdigit() for ch in street_part)
        # Digits are only allowed as a leading house number.
        if has_digit and not re.match(r"^\s*\d{1,4}\b", street_part):
            return False
        title_tokens = [token for token in re.split(r"\s+", street_part) if token]
        if not has_digit and not prefix_part:
            # A bare street name needs an address cue close by.
            context = text[max(0, start - 24) : min(len(text), end + 12)]
            if not ADDRESS_CUE_RE.search(context):
                return False
        return has_digit or len(title_tokens) >= 2

    return True
|
|
|
|
def spans_overlap(a: dict, b: dict) -> bool:
    """Return ``True`` when the half-open ranges ``[start, end)`` of *a* and *b* intersect."""
    if int(a["start"]) >= int(b["end"]):
        return False
    return int(b["start"]) < int(a["end"])
|
|
|
|
def is_name_token_char(ch: str) -> bool:
    """Return ``True`` for letters and for ``-``, ``'`` and ``’``."""
    if ch.isalpha():
        return True
    return ch in ("-", "'", "’")
|
|
|
|
def is_plausible_first_name(value: str) -> bool:
    """Check that *value* looks like a single capitalised given name.

    The value must be non-empty, contain no whitespace, digits or any of
    ``,;:/@()``, have an upper-case first letter, and consist solely of
    name characters (see ``is_name_token_char``).
    """
    if not value:
        return False
    if any(ch.isspace() or ch.isdigit() or ch in ",;:/@()" for ch in value):
        return False
    leading_letter = next((ch for ch in value if ch.isalpha()), None)
    if leading_letter is None or not leading_letter.isupper():
        return False
    return all(map(is_name_token_char, value))
|
|
|
|
def is_plausible_cued_first_name(value: str) -> bool:
    """Check that *value* looks like a single given name, ignoring letter case.

    Same rules as ``is_plausible_first_name`` except that the first letter
    need not be upper-case: non-empty, no whitespace, no digits, none of
    ``,;:/@()``, at least one letter, and only name characters.
    """
    if not value:
        return False
    if any(ch.isspace() or ch.isdigit() or ch in ",;:/@()" for ch in value):
        return False
    if not any(ch.isalpha() for ch in value):
        return False
    return all(map(is_name_token_char, value))
|
|
|
|
def is_plausible_cued_last_name_sequence(value: str) -> bool:
    """Check that every token of *value* can be part of a surname, ignoring letter case.

    Each whitespace-separated token must contain a letter and consist only
    of name characters.  Its first letter must be cased (upper or lower);
    a token whose first letter has no case is accepted only when it
    normalises to one of ``NAME_PARTICLE_SURFACES``.  Empty input is rejected.
    """
    parts = [piece for piece in re.split(r"\s+", value.strip()) if piece]
    if not parts:
        return False
    for part in parts:
        letters = [ch for ch in part if ch.isalpha()]
        if not letters:
            return False
        if not all(is_name_token_char(ch) for ch in part):
            return False
        lead = letters[0]
        if lead.isupper() or lead.islower():
            continue
        if normalize_surface(part) not in NAME_PARTICLE_SURFACES:
            return False
    return True
|
|
|
|
def extract_name_tokens_after_cue(text: str, cue_end: int, max_tokens: int = 4) -> list[tuple[int, int, str]]:
    """Collect the run of name-like tokens that starts at or after *cue_end*.

    Leading whitespace (including line breaks) is skipped.  Scanning then
    stops at the first of: a line break after at least one token, one of
    ``,.;:``, a character that cannot start a name token, a non-first token
    whose normalised form is in ``NAME_CUE_STOP_SURFACES``, a token that is
    followed directly by a non-space character, or *max_tokens* tokens.

    Returns:
        ``(start, end, token_text)`` triples in text order; possibly empty.
    """
    size = len(text)
    pos = cue_end
    while pos < size and text[pos].isspace():
        pos += 1

    found: list[tuple[int, int, str]] = []
    while pos < size:
        crossed_line = False
        while pos < size and text[pos].isspace():
            if text[pos] in "\r\n":
                crossed_line = True
            pos += 1
        # A name does not continue onto the next line once it has started.
        if crossed_line and found:
            break
        if pos >= size or text[pos] in ",.;:\n":
            break

        begin = pos
        while pos < size and is_name_token_char(text[pos]):
            pos += 1
        if pos == begin:
            break

        word = text[begin:pos]
        surface = normalize_surface(word)
        if found and surface in NAME_CUE_STOP_SURFACES:
            break
        if any(not is_name_token_char(ch) for ch in word):
            break
        found.append((begin, pos, word))

        if len(found) >= max_tokens:
            break
        if pos < size and (text[pos] in ",.;:\n" or not text[pos].isspace()):
            break
    return found
|
|
|
|
def repair_name_particle_surnames(text: str, spans: list[dict]) -> list[dict]:
    """Add LAST_NAME spans for particle surnames that directly follow a FIRST_NAME.

    For every FIRST_NAME span in *spans*, the next 40 characters are checked
    for ``<particle> <Capitalised word> [<Capitalised word>]`` on the same
    line, where the particle is one of Ní / Ni / Ó / O / Nic / Mac / Mc.
    A plausible match becomes a LAST_NAME span with score 0.66 and replaces
    any FIRST_NAME / LAST_NAME span it overlaps.

    Returns:
        A new list; *spans* itself is not modified.
    """
    particle_surname = re.compile(
        r"^[ \t]*((?:Ní|Ni|Ó|O|Nic|Mac|Mc)[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*)?)"
    )
    result = list(spans)
    # Snapshot of the given names so that edits to ``result`` do not affect the iteration.
    anchors = [span for span in result if span["label"] == "FIRST_NAME"]
    for anchor in anchors:
        anchor_end = int(anchor["end"])
        window = text[anchor_end : min(len(text), anchor_end + 40)]
        hit = particle_surname.match(window)
        if hit is None:
            continue
        begin = anchor_end + int(hit.start(1))
        finish = anchor_end + int(hit.end(1))
        surname = text[begin:finish]
        if not is_plausible_cued_last_name_sequence(surname):
            continue
        replacement = {
            "start": begin,
            "end": finish,
            "label": "LAST_NAME",
            "score": 0.66,
            "text": surname,
        }
        kept = []
        for other in result:
            if spans_overlap(replacement, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"}:
                continue
            kept.append(other)
        kept.append(replacement)
        result = kept
    return result
|
|
|
|
def repair_first_name_from_last_name(text: str, spans: list[dict]) -> list[dict]:
    """Add a FIRST_NAME span for the word immediately before an unpaired LAST_NAME.

    A LAST_NAME is treated as already paired when some FIRST_NAME span ends
    at most two characters before it.  Otherwise the name-character token
    separated from the surname by whitespace is taken as the candidate; it
    is added when ``is_plausible_first_name`` accepts it and it does not
    overlap an existing FIRST_NAME.  The new span's score is 0.6 times the
    surname's score (0.5 when the surname has none).

    Returns:
        A new list; *spans* itself is not modified.
    """
    result = list(spans)
    surnames = [span for span in result if span["label"] == "LAST_NAME"]
    for surname in surnames:
        already_paired = False
        for span in result:
            if (
                span["label"] == "FIRST_NAME"
                and int(span["end"]) <= int(surname["start"])
                and int(surname["start"]) - int(span["end"]) <= 2
            ):
                already_paired = True
                break
        if already_paired:
            continue

        pos = int(surname["start"]) - 1
        if pos < 0 or not text[pos].isspace():
            continue
        # Walk left over the separating whitespace, then over the token itself.
        while pos >= 0 and text[pos].isspace():
            pos -= 1
        word_end = pos + 1
        while pos >= 0 and is_name_token_char(text[pos]):
            pos -= 1
        word_start = pos + 1
        if word_end <= word_start:
            continue

        given = text[word_start:word_end]
        if not is_plausible_first_name(given):
            continue
        addition = {
            "start": word_start,
            "end": word_end,
            "label": "FIRST_NAME",
            "score": float(surname.get("score", 0.5)) * 0.6,
            "text": given,
        }
        clashes = any(
            spans_overlap(addition, other)
            for other in result
            if other["label"] == "FIRST_NAME"
        )
        if not clashes:
            result.append(addition)
    return result
|
|
|
|
def repair_contextual_name_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add FIRST_NAME / LAST_NAME spans after self-introduction cues.

    For every ``NAME_SELF_CUE_RE`` match, the following run of name
    characters and whitespace is split into at most four tokens (stopping at
    a non-first token found in ``LOWER_NAME_STOP_SURFACES``).  With at least
    two tokens, the first becomes a FIRST_NAME and the rest a LAST_NAME, both
    scored 0.63, replacing any overlapping name spans.  A cue is skipped when
    a FIRST_NAME already starts within 4 characters and a LAST_NAME within 16
    characters after it.

    Returns:
        A new list; *spans* itself is not modified.
    """
    name_labels = {"FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    text_len = len(text)
    for cue in NAME_SELF_CUE_RE.finditer(text):
        cue_end = cue.end()

        def starts_within(label: str, reach: int) -> bool:
            return any(
                other["label"] == label and 0 <= int(other["start"]) - cue_end <= reach
                for other in result
            )

        if starts_within("FIRST_NAME", 4) and starts_within("LAST_NAME", 16):
            continue

        region_start = cue_end
        while region_start < text_len and text[region_start].isspace():
            region_start += 1
        region_end = region_start
        while region_end < text_len and (
            is_name_token_char(text[region_end]) or text[region_end].isspace()
        ):
            region_end += 1

        words: list[str] = []
        for word in re.split(r"\s+", text[region_start:region_end].strip()):
            if not word:
                continue
            surface = normalize_surface(word)
            if words and surface in LOWER_NAME_STOP_SURFACES:
                break
            if not all(is_name_token_char(ch) for ch in word):
                break
            words.append(word)
            if len(words) >= 4:
                break
        if len(words) < 2:
            continue

        given = words[0]
        family = " ".join(words[1:])
        if not is_plausible_cued_first_name(given):
            continue
        if not is_plausible_cued_last_name_sequence(family):
            continue

        # Locate both parts in the original text; the surname lookup fails
        # (and the cue is skipped) when its tokens are not separated by
        # exactly one space.
        given_start = text.find(given, region_start, region_end)
        if given_start < 0:
            continue
        given_end = given_start + len(given)
        family_start = text.find(family, given_end, region_end)
        if family_start < 0:
            continue
        family_end = family_start + len(family)

        first_span = {
            "start": given_start,
            "end": given_end,
            "label": "FIRST_NAME",
            "score": 0.63,
            "text": text[given_start:given_end],
        }
        last_span = {
            "start": family_start,
            "end": family_end,
            "label": "LAST_NAME",
            "score": 0.63,
            "text": text[family_start:family_end],
        }
        result = [
            other
            for other in result
            if not (
                (spans_overlap(first_span, other) or spans_overlap(last_span, other))
                and other["label"] in name_labels
            )
        ]
        result.append(first_span)
        result.append(last_span)
    return result
|
|
|
|
def repair_role_name_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add FIRST_NAME / LAST_NAME spans after role or name-field cues.

    For every ``NAME_ROLE_CUE_RE`` match, the tokens returned by
    ``extract_name_tokens_after_cue`` are split into a first token (given
    name) and the remaining tokens (surname).  When both parts pass the
    case-insensitive plausibility checks they are added with score 0.63,
    replacing any FIRST_NAME / LAST_NAME spans they overlap.

    Returns:
        A new list; *spans* itself is not modified.
    """
    name_labels = {"FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for cue in NAME_ROLE_CUE_RE.finditer(text):
        bounds = extract_name_tokens_after_cue(text, cue.end())
        if len(bounds) < 2:
            continue
        given_start, given_end, given = bounds[0]
        family_start = bounds[1][0]
        family_end = bounds[-1][1]
        family = text[family_start:family_end]
        if not is_plausible_cued_first_name(given):
            continue
        if not is_plausible_cued_last_name_sequence(family):
            continue
        first_span = {
            "start": given_start,
            "end": given_end,
            "label": "FIRST_NAME",
            "score": 0.63,
            "text": text[given_start:given_end],
        }
        last_span = {
            "start": family_start,
            "end": family_end,
            "label": "LAST_NAME",
            "score": 0.63,
            "text": text[family_start:family_end],
        }
        result = [
            other
            for other in result
            if not (
                (spans_overlap(first_span, other) or spans_overlap(last_span, other))
                and other["label"] in name_labels
            )
        ]
        result.append(first_span)
        result.append(last_span)
    return result
|
|
|
|
def repair_surname_field_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add a LAST_NAME span after each surname-field cue.

    For every ``SURNAME_CUE_RE`` match, all tokens returned by
    ``extract_name_tokens_after_cue`` are taken together as the surname.  A
    plausible candidate is added with score 0.64 and replaces any
    FIRST_NAME / LAST_NAME span it overlaps.

    Returns:
        A new list; *spans* itself is not modified.
    """
    result = list(spans)
    for cue in SURNAME_CUE_RE.finditer(text):
        bounds = extract_name_tokens_after_cue(text, cue.end())
        if not bounds:
            continue
        begin = bounds[0][0]
        finish = bounds[-1][1]
        surname = text[begin:finish]
        if not is_plausible_cued_last_name_sequence(surname):
            continue
        last_span = {
            "start": begin,
            "end": finish,
            "label": "LAST_NAME",
            "score": 0.64,
            "text": surname,
        }
        kept = []
        for other in result:
            if spans_overlap(last_span, other) and other["label"] in {"FIRST_NAME", "LAST_NAME"}:
                continue
            kept.append(other)
        kept.append(last_span)
        result = kept
    return result
|
|
|
|
def repair_name_before_structured_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add FIRST_NAME / LAST_NAME spans for names that precede a structured cue.

    ``NAME_BEFORE_STRUCTURED_CUE_RE`` finds capitalised word runs followed by
    a comma and a field cue (PPSN, DOB, Address, phone, ...).  Tokens are
    re-extracted from the start of the matched run; the first is the given
    name and the rest the surname.  Both must pass the case-sensitive
    plausibility checks.  Accepted spans are scored 0.64 and replace any
    FIRST_NAME / LAST_NAME spans they overlap.

    Returns:
        A new list; *spans* itself is not modified.
    """
    name_labels = {"FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in NAME_BEFORE_STRUCTURED_CUE_RE.finditer(text):
        bounds = extract_name_tokens_after_cue(text, hit.start(1))
        if len(bounds) < 2:
            continue
        given_start, given_end, given = bounds[0]
        family_start = bounds[1][0]
        family_end = bounds[-1][1]
        family = text[family_start:family_end]
        if not is_plausible_first_name(given):
            continue
        if not is_plausible_last_name_sequence(family):
            continue
        first_span = {
            "start": given_start,
            "end": given_end,
            "label": "FIRST_NAME",
            "score": 0.64,
            "text": text[given_start:given_end],
        }
        last_span = {
            "start": family_start,
            "end": family_end,
            "label": "LAST_NAME",
            "score": 0.64,
            "text": text[family_start:family_end],
        }
        result = [
            other
            for other in result
            if not (
                (spans_overlap(first_span, other) or spans_overlap(last_span, other))
                and other["label"] in name_labels
            )
        ]
        result.append(first_span)
        result.append(last_span)
    return result
|
|
|
|
# ---------------------------------------------------------------------------
# Cue / value patterns for identifiers, contact details and personal names.
# ---------------------------------------------------------------------------

# Passport cue words, searched case-insensitively (no word boundaries, so the
# bare "phas" alternative also matches inside longer words).
PASSPORT_CUE_RE = re.compile(
    r"(?i)(passport(?:\s+number)?|phas|uimhir\s+(?:mo\s+)?phas)"
)

# One or two capital letters, an optional single whitespace character and
# seven digits, not glued to other ASCII alphanumerics on either side.
PASSPORT_VALUE_RE = re.compile(
    r"(?<![A-Za-z0-9])([A-Z]{1,2}\s?\d{7})(?![A-Za-z0-9])"
)

# local-part@domain.tld; the local part stops at whitespace and at @ , ; : ( ) < >.
EMAIL_EXTRACT_RE = re.compile(
    r"([^\s@,;:()<>]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,})",
    re.UNICODE,
)

# Whole-word cues that mark a nearby number as a phone number.
PHONE_CUE_RE = re.compile(
    r"(?i)\b(phone|call|contact|reach\s+me|glaoigh\s+ar|teagmh[aá]il|uimhir|m['’]uimhir|f[oó]n|fon|teileaf[oó]n|telefon)\b"
)

# Self-introduction phrases ("my name is ...", "is mise ...").
NAME_SELF_CUE_RE = re.compile(
    r"(?i)\b(my\s+name\s+is|is\s+mise|is\s+[ée]\s+m['’]?ainm|is\s+[ée]\s+mo\s+ainm)\b"
)

# Role words followed by ':' or ',', or name-field labels followed by ':'.
# "ainm" is excluded when followed by "teaghlaigh" (handled by SURNAME_CUE_RE).
NAME_ROLE_CUE_RE = re.compile(
    r"(?i)(?:\b(?:applicant|customer|claimant|patient|an\s+t-iarratas[oó]ir|iarratas[oó]ir)\b\s*[:,]\s*|\b(?:full\s+name|name|ainm(?!\s+teaghlaigh))\b\s*:\s*)"
)

# Surname labels, with an optional "is" and an optional ':', ',' or '-'.
SURNAME_CUE_RE = re.compile(
    r"(?i)\b(?:my\s+)?(?:surname|last\s+name|family\s+name|ainm\s+teaghlaigh|sloinne)\b(?:\s+is)?\s*[:,-]?\s*"
)

# Surname particles (Ní, Ó, Nic, Mac, Mc, O').
NAME_PARTICLE_SURNAME_RE = re.compile(
    r"(?i)(?:\bN[ií]\b|\bÓ\b|\bNic\b|\bMac\b|\bMc\b|\bO['’])"
)

# Two to four capitalised words immediately followed by a comma and a
# structured-field cue (PPSN, DOB, Address, phone, email, ...). Case-sensitive.
NAME_BEFORE_STRUCTURED_CUE_RE = re.compile(
    r"(?<![A-Za-zÁÉÍÓÚáéíóú])([A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’.-]*(?:\s+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’.-]*){1,3})(?=\s*,\s*(?:PPSN|UPSP|DOB|Date\s+of\s+birth|D[áa]ta\s+breithe|Address|Seoladh|lives?\s+at|my\s+phone|phone|email|r-phost))"
)
# Normalised surfaces of short function words.
# NOTE(review): presumably used to stop lower-case name expansion; the
# consumer is outside this section of the file.
LOWER_NAME_STOP_SURFACES = set(
    map(
        normalize_surface,
        ("and", "agus", "is", "ta", "tá", "my", "mo", "an", "the"),
    )
)

# Normalised surfaces of words that are not part of a name.
# NOTE(review): presumably terminates name capture after a name cue; the
# consumer is outside this section of the file.
NAME_CUE_STOP_SURFACES = set(
    map(
        normalize_surface,
        (
            "and",
            "agus",
            "submitted",
            "provided",
            "gave",
            "her",
            "his",
            "their",
            "she",
            "he",
            "email",
            "phone",
            "fón",
            "fon",
            "ppsn",
            "upsp",
            "address",
            "seoladh",
            "dob",
            "age",
            "aois",
            "bank",
            "iban",
            "swift",
            "chuir",
            "isteach",
            "sí",
            "si",
            "a",
            "huimhir",
        ),
    )
)

# Normalised final tokens that mark a "name" span as an organisation
# (checked against the last token of FIRST_NAME / LAST_NAME spans).
ORG_NAME_TRAILING_SURFACES = set(
    map(
        normalize_surface,
        (
            "centre",
            "center",
            "clinic",
            "hospital",
            "office",
            "service",
            "section",
            "unit",
            "council",
        ),
    )
)
# ---------------------------------------------------------------------------
# Organisation, city, county and numeric identifier patterns.
# ---------------------------------------------------------------------------

# Organisation/venue words at the start of a string, after leading whitespace.
ORG_CITY_TAIL_RE = re.compile(
    r"(?i)^\s+(?:intreo\s+centre|business\s+centre|community\s+centre|shopping\s+centre|retail\s+park|p[áa]irc\s+miond[ií]ola)\b"
)

# Venue words at the very end of a string (trailing whitespace allowed).
ORG_NAME_PREFIX_RE = re.compile(
    r"(?i)(?:retail\s+park|business\s+centre|community\s+centre|shopping\s+centre|p[áa]irc\s+miond[ií]ola)\s*$"
)

# Literal phrase "public contact details".
PUBLIC_CONTACT_DETAILS_RE = re.compile(r"(?i)\bpublic\s+contact\s+details\b")

# Whole-word cues that make a nearby city name count as a location.
CITY_CUE_RE = re.compile(
    r"(?i)\b(address|seoladh|located|suite|centre|center|ionad|intreo|clinic|hospital|ospid[eé]al|hse|fss)\b"
)

# Whole-word cues for bank routing / sort-code context.
BANK_ROUTING_CONTEXT_RE = re.compile(
    r"(?i)\b(sort\s+code|routing\s+number|bank\s+of\s+ireland|aib|cod\s+sort[aá]la|sort[aá]la)\b"
)

# Phone numbers: "+353" (optionally with a bracketed area code) or a leading
# "0", or a bracketed "(0xx)" prefix, followed by digits that may be separated
# by whitespace, '-', '.' or '/'.
PHONE_VALUE_RE = re.compile(
    r"(?<![A-Za-z0-9])((?:\+353(?:\s*\((?:0)?\d{1,2}\))?[\s\-./]?|0)\d(?:[\s\-./]?\d){6,13}|\(\s*0\d{1,2}\s*\)(?:[\s\-./]?\d){6,10})(?![A-Za-z0-9])"
)

# Seven digits followed by one or two letters, each optionally preceded by
# whitespace or hyphens.
PPSN_VALUE_RE = re.compile(
    r"(?<![A-Za-z0-9])(\d{7}(?:[\s-]*[A-Za-z]){1,2})(?![A-Za-z0-9])"
)

# Routing key (letter + two digits, or "D6W"), an optional space / NBSP /
# narrow NBSP, then a letter and three alphanumerics.
POSTCODE_VALUE_RE = re.compile(
    r"(?<![A-Za-z0-9])((?:[A-Za-z]\d{2}|D6W)[\s\u00A0\u202F]?[A-Za-z][A-Za-z0-9]{3})(?![A-Za-z0-9])"
)

# One to three capitalised words directly before ", [County X, ]<routing key>".
CITY_BEFORE_POSTCODE_RE = re.compile(
    r"(?<![A-Za-zÁÉÍÓÚáéíóú])([A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2})(?=\s*,\s*(?:(?:County|Contae|gContae|Co\.)\s+[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2}\s*,\s*)?(?:[A-Z]\d{2}|D6W))"
)

# One to four capitalised words directly before ", County X".
CITY_BEFORE_COUNTY_RE = re.compile(
    r"(?<![A-Za-zÁÉÍÓÚáéíóú])([A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,3})(?=\s*,\s*(?:County|Contae|gContae|Co\.)\s+[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2}\b)"
)

# "<Name> Town" directly before ", [County X, ]<routing key>".
CITY_TOWN_SUFFIX_RE = re.compile(
    r"(?<![A-Za-zÁÉÍÓÚáéíóú])([A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2}\s+Town)(?=\s*,\s*(?:(?:County|Contae|gContae|Co\.)\s+[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,2}\s*,\s*)?(?:[A-Z]\d{2}|D6W))"
)

# City field label at the start of a line, separator optional.
# NOTE(review): the (?i) flag also makes the capitalised-word classes in the
# captured value case-insensitive.
CITY_FIELD_VALUE_RE = re.compile(
    r"(?im)(?:^|[\n\r])\s*(?:city(?:/town)?|town|cathair|baile)\b\s*[:,-]?\s*([A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){0,3})"
)

# City field label anywhere in the text; a ':', ',' or '-' separator is required.
INLINE_CITY_FIELD_VALUE_RE = re.compile(
    r"(?i)\b(?:city(?:/town)?|cathair|baile)\b\s*[:,-]\s*([A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){0,3})"
)

# County field label at the start of a line, separator optional; the value may
# itself start with "Co. ".
COUNTY_FIELD_VALUE_RE = re.compile(
    r"(?im)(?:^|[\n\r])\s*(?:county|co\.|contae|gcontae)\b\s*[:,-]?\s*((?:Co\.[ \t]+)?[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){0,2})"
)

# County field label anywhere in the text; a ':', ',' or '-' separator is required.
INLINE_COUNTY_FIELD_VALUE_RE = re.compile(
    r"(?i)\b(?:county|co\.|contae|gcontae)\b\s*[:,-]\s*((?:Co\.[ \t]+)?[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*(?:[ \t]+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){0,2})"
)

# "County"/"Contae"/"gContae"/"Co." plus one to three capitalised words.
# Case-sensitive.
COUNTY_VALUE_RE = re.compile(
    r"(?<![A-Za-zÁÉÍÓÚáéíóú])((?:County|Contae|gContae|Co\.)(?:\s+[A-ZÁÉÍÓÚ][A-Za-zÁÉÍÓÚáéíóú'’-]*){1,3})(?![A-Za-zÁÉÍÓÚáéíóú])"
)
# ---------------------------------------------------------------------------
# Street-address patterns.
# ---------------------------------------------------------------------------

# An address cue (field label or "live(s) at" style phrase) followed by the
# text up to the next comma, newline or full stop.
ADDRESS_BLOCK_VALUE_RE = re.compile(
    rf"(?i)\b(?:{ADDRESS_FIELD_CUE_PATTERN}|live\s+at|lives\s+at|living\s+at|located\s+at|t[áa]\s+m[ée]\s+i\s+mo\s+ch[oó]na[ií]\s+ag|t[áa]im\s+i\s+mo\s+ch[oó]na[ií]\s+ag)\b\s*[:,-]?\s*([^,\n.]+)"
)

# A whole "Address line 1:" line whose value is an optional unit prefix
# followed by capitalised words or a "Teach ..." house name.
ADDRESS_LINE_PREFIX_VALUE_RE = re.compile(
    r"(?im)^(?:address\s+line\s+1|seoladh\s+l[ií]ne\s+1)\s*:\s*((?:(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\s+[A-Za-z0-9-]+,\s+)?(?:[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,5}|Teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]+)?))\s*$"
)

# An address cue followed by a named venue (business/community/shopping
# centre, retail park, "Páirc Miondíola"), optionally preceded by a unit.
ADDRESS_PLACE_VALUE_RE = re.compile(
    rf"(?i)\b(?:{ADDRESS_FIELD_CUE_PATTERN}|live\s+at|lives\s+at|living\s+at|located\s+at|allocation\s+centre|intreo\s+centre)\b\s*[:,-]?\s*("
    r"(?:(?:apartment|apt\.?|flat|unit|suite|[AaÁá]ras[aá]n|aonad)\s+[A-Za-z0-9-]+,\s+)?"
    r"(?:[A-ZÁÉÍÓÚ][\w'’.-]*(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*){0,4}\s+(?:business\s+centre|community\s+centre|shopping\s+centre|retail\s+park)|P[áa]irc\s+Miond[ií]ola(?:\s+[A-ZÁÉÍÓÚ][\w'’.-]*)?)"
    r")"
)

# A ", <building name>" continuation at the start of a string: up to five
# words ending in a building word, or a "Teach ..." house name.
ADDRESS_BUILDING_TAIL_RE = re.compile(
    r"^\s*,\s*((?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:house|cottage|lodge|villa|apartments?|building|business\s+centre|community\s+centre|shopping\s+centre|retail\s+park)|Teach(?:ín|in)?(?:\s+(?:na|an|an\s+t-)\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]+)?)",
    flags=re.IGNORECASE,
)
# Character class of letters that must NOT touch a gazetteer form on either side.
LOCATION_FORM_EDGE = r"[A-Za-zÁÉÍÓÚáéíóú]"

# Pre-compiled, case-insensitive "bounded" pattern for every known city and
# county form; iter_bounded_form_matches() adds further forms on demand.
_BOUNDED_FORM_PATTERNS = {}
for _location_form in set(IRISH_CITY_FORMS).union(IRISH_COUNTY_FORMS):
    _BOUNDED_FORM_PATTERNS[_location_form] = re.compile(
        rf"(?<!{LOCATION_FORM_EDGE}){re.escape(_location_form)}(?!{LOCATION_FORM_EDGE})",
        flags=re.IGNORECASE,
    )
del _location_form
|
|
|
|
def iter_bounded_form_matches(form: str, text: str):
    """Return an iterator over case-insensitive matches of ``form`` in ``text``.

    A match is only reported when the form is not directly preceded or
    followed by a letter from ``LOCATION_FORM_EDGE``. Compiled patterns are
    memoised in ``_BOUNDED_FORM_PATTERNS``; unknown forms are compiled on first
    use and stored there.
    """
    compiled = _BOUNDED_FORM_PATTERNS.get(form)
    if compiled is not None:
        return compiled.finditer(text)
    compiled = re.compile(
        rf"(?<!{LOCATION_FORM_EDGE}){re.escape(form)}(?!{LOCATION_FORM_EDGE})",
        flags=re.IGNORECASE,
    )
    _BOUNDED_FORM_PATTERNS[form] = compiled
    return compiled.finditer(text)
|
|
|
|
def repair_contextual_passport_numbers(text: str, spans: list[dict]) -> list[dict]:
    """Add PASSPORT_NUMBER spans for passport-shaped values that follow a cue.

    A ``PASSPORT_VALUE_RE`` match is accepted when no existing PASSPORT_NUMBER
    span already covers it and ``PASSPORT_CUE_RE`` matches within the 32
    characters before it. Overlapping phone / PPSN / account / age / passport
    spans are replaced by the new span. Returns a new list; ``spans`` is not
    mutated.
    """
    replaceable = {"PHONE_NUMBER", "PPSN", "ACCOUNT_NUMBER", "AGE", "PASSPORT_NUMBER"}
    result = list(spans)
    for hit in PASSPORT_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        passport = {
            "start": begin,
            "end": stop,
            "label": "PASSPORT_NUMBER",
            "score": 0.67,
            "text": text[begin:stop],
        }

        # Nothing to do when a passport span already covers this value.
        already_covered = False
        for existing in result:
            if (
                existing["label"] == "PASSPORT_NUMBER"
                and int(existing["start"]) <= begin
                and int(existing["end"]) >= stop
            ):
                already_covered = True
                break
        if already_covered:
            continue

        # The cue must appear shortly before the value.
        if not PASSPORT_CUE_RE.search(text[max(0, begin - 32) : begin]):
            continue

        survivors = []
        for existing in result:
            if spans_overlap(passport, existing) and existing["label"] in replaceable:
                continue
            survivors.append(existing)
        survivors.append(passport)
        result = survivors
    return result
|
|
|
|
def repair_ppsn_variants(text: str, spans: list[dict]) -> list[dict]:
    """Add PPSN spans for every well-formed PPSN-like value in ``text``.

    A ``PPSN_VALUE_RE`` match qualifies when its alphanumeric content is seven
    digits followed by one or two letters. The span scores 0.72 if
    ``PPSN_CUE_RE`` matches within 32 characters before / 24 after the value,
    otherwise 0.58. Overlapping phone, passport, account, age, name and PPSN
    spans are replaced. Returns a new list.
    """
    replaceable = {
        "PHONE_NUMBER",
        "PASSPORT_NUMBER",
        "ACCOUNT_NUMBER",
        "AGE",
        "FIRST_NAME",
        "LAST_NAME",
        "PPSN",
    }
    text_length = len(text)
    result = list(spans)
    for hit in PPSN_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        raw = text[begin:stop]
        squeezed = alnum_upper(raw)
        if (
            len(squeezed) not in {8, 9}
            or not squeezed[:7].isdigit()
            or not squeezed[7:].isalpha()
        ):
            continue
        window = text[max(0, begin - 32) : min(text_length, stop + 24)]
        cue_found = bool(PPSN_CUE_RE.search(window))
        ppsn = {
            "start": begin,
            "end": stop,
            "label": "PPSN",
            "score": 0.72 if cue_found else 0.58,
            "text": raw,
        }
        result = [
            existing
            for existing in result
            if not spans_overlap(ppsn, existing) or existing["label"] not in replaceable
        ]
        result.append(ppsn)
    return result
|
|
|
|
def repair_contextual_date_of_birth(text: str, spans: list[dict]) -> list[dict]:
    """Add DATE_OF_BIRTH spans for date values that have date-of-birth context.

    A ``DATE_OF_BIRTH_VALUE_RE`` match is accepted when ``DOB_CONTEXT_RE``
    matches in the 96 characters before it or ``has_dob_suffix_context``
    reports context after it. Overlapping DOB, phone, age, name, account and
    city spans are replaced. Returns a new list.
    """
    replaceable = {
        "DATE_OF_BIRTH",
        "PHONE_NUMBER",
        "AGE",
        "FIRST_NAME",
        "LAST_NAME",
        "ACCOUNT_NUMBER",
        "CITY",
    }
    result = list(spans)
    for hit in DATE_OF_BIRTH_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        lead_in = text[max(0, begin - 96) : begin]
        if DOB_CONTEXT_RE.search(lead_in):
            pass  # cue found before the value
        elif not has_dob_suffix_context(text, stop):
            continue
        birth_date = {
            "start": begin,
            "end": stop,
            "label": "DATE_OF_BIRTH",
            "score": 0.66,
            "text": text[begin:stop],
        }
        survivors = [
            existing
            for existing in result
            if not spans_overlap(birth_date, existing)
            or existing["label"] not in replaceable
        ]
        survivors.append(birth_date)
        result = survivors
    return result
|
|
|
|
def repair_contextual_ages(text: str, spans: list[dict]) -> list[dict]:
    """Add AGE spans for ``AGE_VALUE_RE`` matches that pass the sanity check.

    Matches rejected by ``is_reasonable_span_text("AGE", ...)`` or overlapping
    an existing DATE_OF_BIRTH span are skipped. Overlapping AGE, phone and
    account spans are replaced by the new span. Returns a new list.
    """
    replaceable = {"AGE", "PHONE_NUMBER", "ACCOUNT_NUMBER"}
    result = list(spans)
    for hit in AGE_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        if not is_reasonable_span_text("AGE", text, begin, stop):
            continue

        # A date of birth wins over an age reading of the same characters.
        probe = {"start": begin, "end": stop}
        inside_birth_date = False
        for existing in result:
            if spans_overlap(probe, existing) and existing["label"] == "DATE_OF_BIRTH":
                inside_birth_date = True
                break
        if inside_birth_date:
            continue

        age = {
            "start": begin,
            "end": stop,
            "label": "AGE",
            "score": 0.66,
            "text": text[begin:stop],
        }
        result = [
            existing
            for existing in result
            if not spans_overlap(age, existing) or existing["label"] not in replaceable
        ]
        result.append(age)
    return result
|
|
|
|
# Phrases that announce a bank account number (case-insensitive, unanchored).
ACCOUNT_CUE_RE = re.compile(
    r"(?i)(account\s+number|bank\s+account|uimhir\s+chuntais|cuntas\s+bainc)"
)

# A run of 6 to 12 digits not adjacent to other ASCII alphanumerics.
ACCOUNT_VALUE_RE = re.compile(
    r"(?<![A-Za-z0-9])(\d{6,12})(?![A-Za-z0-9])"
)
|
|
|
|
def repair_contextual_account_numbers(text: str, spans: list[dict]) -> list[dict]:
    """Add ACCOUNT_NUMBER spans for digit runs that follow an account cue.

    An ``ACCOUNT_VALUE_RE`` match is added when it is not already covered by
    an ACCOUNT_NUMBER span, ``ACCOUNT_CUE_RE`` matches in the 40 characters
    before it, and it does not overlap a phone, bank-routing, PPSN, postcode
    or passport span. Existing spans are never removed. Returns a new list.
    """
    blocking = {
        "PHONE_NUMBER",
        "BANK_ROUTING_NUMBER",
        "PPSN",
        "POSTCODE",
        "PASSPORT_NUMBER",
    }
    result = list(spans)
    for hit in ACCOUNT_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        account = {
            "start": begin,
            "end": stop,
            "label": "ACCOUNT_NUMBER",
            "score": 0.51,
            "text": text[begin:stop],
        }

        covered = False
        for existing in result:
            if (
                existing["label"] == "ACCOUNT_NUMBER"
                and int(existing["start"]) <= begin
                and int(existing["end"]) >= stop
            ):
                covered = True
                break
        if covered:
            continue

        if not ACCOUNT_CUE_RE.search(text[max(0, begin - 40) : begin]):
            continue

        # Stronger identifier types keep their claim on these characters.
        blocked = False
        for existing in result:
            if spans_overlap(account, existing) and existing["label"] in blocking:
                blocked = True
                break
        if blocked:
            continue

        result.append(account)
    return result
|
|
|
|
def repair_emails(text: str, spans: list[dict]) -> list[dict]:
    """Add an EMAIL span for every ``EMAIL_EXTRACT_RE`` match in ``text``.

    Overlapping EMAIL, FIRST_NAME and LAST_NAME spans are replaced by the new
    span. Returns a new list; ``spans`` is not mutated.
    """
    replaceable = {"EMAIL", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in EMAIL_EXTRACT_RE.finditer(text):
        begin, stop = hit.span(1)
        email = {
            "start": begin,
            "end": stop,
            "label": "EMAIL",
            "score": 0.74,
            "text": text[begin:stop],
        }
        survivors = []
        for existing in result:
            if spans_overlap(email, existing) and existing["label"] in replaceable:
                continue
            survivors.append(existing)
        survivors.append(email)
        result = survivors
    return result
|
|
|
|
def repair_phone_numbers(text: str, spans: list[dict]) -> list[dict]:
    """Add PHONE_NUMBER spans for phone-shaped values with supporting evidence.

    A ``PHONE_VALUE_RE`` match needs either a ``PHONE_CUE_RE`` hit within 32
    characters before / 16 after it, or an overlapping existing PHONE_NUMBER
    span, and must pass ``is_reasonable_span_text``. Overlapping phone, PPSN,
    account, bank-routing and card spans are replaced. Returns a new list.
    """
    replaceable = {
        "PHONE_NUMBER",
        "PPSN",
        "ACCOUNT_NUMBER",
        "BANK_ROUTING_NUMBER",
        "CREDIT_DEBIT_CARD",
    }
    text_length = len(text)
    result = list(spans)
    for hit in PHONE_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        phone = {
            "start": begin,
            "end": stop,
            "label": "PHONE_NUMBER",
            "score": 0.69,
            "text": text[begin:stop],
        }
        window = text[max(0, begin - 32) : min(text_length, stop + 16)]
        cue_found = bool(PHONE_CUE_RE.search(window))
        touches_phone = False
        for existing in result:
            if spans_overlap(phone, existing) and existing["label"] == "PHONE_NUMBER":
                touches_phone = True
                break
        if not cue_found and not touches_phone:
            continue
        if not is_reasonable_span_text("PHONE_NUMBER", text, begin, stop):
            continue
        result = [
            existing
            for existing in result
            if not spans_overlap(phone, existing) or existing["label"] not in replaceable
        ]
        result.append(phone)
    return result
|
|
|
|
def repair_postcodes(text: str, spans: list[dict]) -> list[dict]:
    """Add a POSTCODE span for every ``POSTCODE_VALUE_RE`` match in ``text``.

    Overlapping POSTCODE, phone, account and name spans are replaced by the
    new span. Returns a new list; ``spans`` is not mutated.
    """
    replaceable = {"POSTCODE", "PHONE_NUMBER", "ACCOUNT_NUMBER", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in POSTCODE_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        postcode = {
            "start": begin,
            "end": stop,
            "label": "POSTCODE",
            "score": 0.71,
            "text": text[begin:stop],
        }
        survivors = [
            existing
            for existing in result
            if not spans_overlap(postcode, existing)
            or existing["label"] not in replaceable
        ]
        survivors.append(postcode)
        result = survivors
    return result
|
|
|
|
def repair_city_spans(text: str, spans: list[dict]) -> list[dict]:
    """Add CITY spans for known city forms that appear in a location context.

    Forms from ``IRISH_CITY_FORMS`` are tried longest first. A match is
    skipped when ``CITY_COUNTY_PREFIX_RE`` matches in the 20 characters before
    it. Otherwise it is accepted when one of the following holds:

    * a STREET_ADDRESS span ends 0-4 characters before it,
    * a COUNTY or POSTCODE span starts 0-6 characters after it,
    * it is followed by ", Co. " or ", <routing key>",
    * ``CITY_CUE_RE`` or ``ADDRESS_CUE_RE`` matches within 40 characters
      before / 32 after it.

    Accepted spans replace overlapping CITY, FIRST_NAME and LAST_NAME spans.

    A shorter form nested inside a longer form that was already accepted in
    this pass (for example "Cork" inside "Cork City") is skipped. Without that
    guard the shorter match had a different ``(start, end)`` key, passed the
    ``seen`` check and then evicted the longer CITY span, defeating the
    longest-first ordering.

    Returns a new list; ``spans`` is not mutated.
    """
    repaired = list(spans)
    seen: set[tuple[int, int]] = set()
    # Bounds of CITY spans added by this call; used to reject nested forms.
    accepted: list[tuple[int, int]] = []
    conflicting_labels = {"CITY", "FIRST_NAME", "LAST_NAME"}
    text_length = len(text)
    for form in sorted(IRISH_CITY_FORMS, key=len, reverse=True):
        for match in iter_bounded_form_matches(form, text):
            start, end = match.span()
            prefix = text[max(0, start - 20) : start]
            if CITY_COUNTY_PREFIX_RE.search(prefix):
                continue
            key = (start, end)
            if key in seen:
                continue
            seen.add(key)
            # Longest form wins: ignore a form lying inside an accepted one.
            if any(
                outer_start <= start and end <= outer_end
                for outer_start, outer_end in accepted
            ):
                continue
            candidate_span = {
                "start": start,
                "end": end,
                "label": "CITY",
                "score": 0.64,
                "text": text[start:end],
            }
            has_context = False
            for other in repaired:
                other_start = int(other["start"])
                other_end = int(other["end"])
                if other["label"] == "STREET_ADDRESS" and 0 <= start - other_end <= 4:
                    has_context = True
                    break
                if other["label"] in {"COUNTY", "POSTCODE"} and 0 <= other_start - end <= 6:
                    has_context = True
                    break
            if not has_context and re.match(
                r"^\s*,\s*(?:Co\.\s+|[A-Z]\d{2}|D6W)", text[end:]
            ):
                has_context = True
            if not has_context:
                cue_window = text[max(0, start - 40) : min(text_length, end + 32)]
                has_context = bool(
                    CITY_CUE_RE.search(cue_window) or ADDRESS_CUE_RE.search(cue_window)
                )
            if not has_context:
                continue
            repaired = [
                other
                for other in repaired
                if not (
                    spans_overlap(candidate_span, other)
                    and other["label"] in conflicting_labels
                )
            ]
            repaired.append(candidate_span)
            accepted.append(key)
    return repaired
|
|
|
|
def repair_city_before_postcode(text: str, spans: list[dict]) -> list[dict]:
    """Add CITY spans for capitalised names that directly precede a postcode.

    A ``CITY_BEFORE_POSTCODE_RE`` match is skipped when it contains a digit or
    when ``CITY_COUNTY_PREFIX_RE`` matches in the 20 characters before it. It
    then needs context: a STREET_ADDRESS span ending 0-4 characters before it,
    a POSTCODE span starting 0-6 characters after it, or a ``CITY_CUE_RE`` /
    ``ADDRESS_CUE_RE`` hit within 40 characters before / 24 after. It is also
    skipped when an overlapping CITY span of equal or greater length exists.
    Accepted spans replace overlapping CITY and name spans. Returns a new list.
    """
    replaceable = {"CITY", "FIRST_NAME", "LAST_NAME"}
    text_length = len(text)
    result = list(spans)
    for hit in CITY_BEFORE_POSTCODE_RE.finditer(text):
        begin, stop = hit.span(1)
        name = text[begin:stop]
        if any(ch.isdigit() for ch in name):
            continue
        if CITY_COUNTY_PREFIX_RE.search(text[max(0, begin - 20) : begin]):
            continue

        supported = False
        for existing in result:
            existing_start = int(existing["start"])
            existing_end = int(existing["end"])
            label = existing["label"]
            if label == "STREET_ADDRESS" and 0 <= begin - existing_end <= 4:
                supported = True
                break
            if label == "POSTCODE" and 0 <= existing_start - stop <= 6:
                supported = True
                break
        if not supported:
            window = text[max(0, begin - 40) : min(text_length, stop + 24)]
            supported = bool(CITY_CUE_RE.search(window) or ADDRESS_CUE_RE.search(window))
        if not supported:
            continue

        city = {
            "start": begin,
            "end": stop,
            "label": "CITY",
            "score": 0.63,
            "text": name,
        }
        width = stop - begin

        # Keep an existing CITY span that is at least as long as this one.
        outranked = False
        for existing in result:
            if (
                existing["label"] == "CITY"
                and spans_overlap(city, existing)
                and (int(existing["end"]) - int(existing["start"])) >= width
            ):
                outranked = True
                break
        if outranked:
            continue

        result = [
            existing
            for existing in result
            if not spans_overlap(city, existing) or existing["label"] not in replaceable
        ]
        result.append(city)
    return result
|
|
|
|
def repair_city_before_county(text: str, spans: list[dict]) -> list[dict]:
    """Add CITY spans for names that directly precede a county (or "... Town").

    ``CITY_TOWN_SUFFIX_RE`` is applied first, then ``CITY_BEFORE_COUNTY_RE``.
    A match is skipped when it contains a digit, when
    ``CITY_COUNTY_PREFIX_RE`` matches in the 20 characters before it, when
    neither a city/address cue (40 before / 24 after) nor a following
    ", County ..." is found, or when an overlapping CITY span of equal or
    greater length exists. Accepted spans replace overlapping CITY and name
    spans. Returns a new list.
    """
    replaceable = {"CITY", "FIRST_NAME", "LAST_NAME"}
    text_length = len(text)
    result = list(spans)
    for pattern in (CITY_TOWN_SUFFIX_RE, CITY_BEFORE_COUNTY_RE):
        for hit in pattern.finditer(text):
            begin, stop = hit.span(1)
            name = text[begin:stop]
            if any(ch.isdigit() for ch in name):
                continue
            if CITY_COUNTY_PREFIX_RE.search(text[max(0, begin - 20) : begin]):
                continue

            window = text[max(0, begin - 40) : min(text_length, stop + 24)]
            supported = (
                (CITY_CUE_RE.search(window) or ADDRESS_CUE_RE.search(window))
                or re.search(r"^\s*,\s*(?:County|Contae|gContae|Co\.)\b", text[stop:])
            )
            if not supported:
                continue

            city = {
                "start": begin,
                "end": stop,
                "label": "CITY",
                "score": 0.64,
                "text": name,
            }
            width = stop - begin

            outranked = False
            for existing in result:
                if (
                    existing["label"] == "CITY"
                    and spans_overlap(city, existing)
                    and (int(existing["end"]) - int(existing["start"])) >= width
                ):
                    outranked = True
                    break
            if outranked:
                continue

            result = [
                existing
                for existing in result
                if not spans_overlap(city, existing)
                or existing["label"] not in replaceable
            ]
            result.append(city)
    return result
|
|
|
|
def repair_city_field_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add CITY spans for values that follow a city/town field label.

    ``CITY_FIELD_VALUE_RE`` is applied first, then
    ``INLINE_CITY_FIELD_VALUE_RE``; a value range found by both is handled
    once. Each value replaces overlapping CITY, COUNTY and name spans.
    Returns a new list.
    """
    replaceable = {"CITY", "COUNTY", "FIRST_NAME", "LAST_NAME"}
    handled: set[tuple[int, int]] = set()
    result = list(spans)
    for pattern in (CITY_FIELD_VALUE_RE, INLINE_CITY_FIELD_VALUE_RE):
        for hit in pattern.finditer(text):
            bounds = hit.span(1)
            if bounds in handled:
                continue
            handled.add(bounds)
            begin, stop = bounds
            city = {
                "start": begin,
                "end": stop,
                "label": "CITY",
                "score": 0.65,
                "text": text[begin:stop],
            }
            survivors = []
            for existing in result:
                if spans_overlap(city, existing) and existing["label"] in replaceable:
                    continue
                survivors.append(existing)
            survivors.append(city)
            result = survivors
    return result
|
|
|
|
def repair_prefixed_city_spans(text: str, spans: list[dict]) -> list[dict]:
    """Widen CITY spans by one character to include a leading prefix letter.

    A CITY span is extended one character to the left when that character
    (lower-cased) is in ``IRISH_CITY_PREFIX_CHARS``, the character before it
    is not alphabetic, and the span's normalised text is in
    ``IRISH_CITY_SURFACES``. The widened copy gets a score of at least 0.66.
    All other spans are passed through unchanged. Returns a new list.
    """
    result: list[dict] = []
    for span in spans:
        widened = None
        if span["label"] == "CITY":
            begin = int(span["start"])
            stop = int(span["end"])
            if (
                begin > 0
                and text[begin - 1].lower() in IRISH_CITY_PREFIX_CHARS
                # The prefix letter must itself start the word.
                and not (begin - 1 > 0 and text[begin - 2].isalpha())
                and normalize_surface(span.get("text", "")) in IRISH_CITY_SURFACES
            ):
                new_begin = begin - 1
                widened = {
                    **span,
                    "start": new_begin,
                    "text": text[new_begin:stop],
                    "score": max(float(span.get("score", 0.0)), 0.66),
                }
        result.append(span if widened is None else widened)
    return result
|
|
|
|
def prefer_long_city_spans(spans: list[dict]) -> list[dict]:
    """Drop CITY spans that lie inside a longer, sufficiently confident CITY span.

    A CITY span is removed when another CITY span covers it, has different
    bounds, and scores at least ``max(0.6, 0.6 * score_of_the_inner_span)``.
    Non-CITY spans are always kept and input order is preserved. An empty
    input is returned as-is.
    """
    if not spans:
        return spans

    def bounds(item: dict) -> tuple[int, int]:
        return int(item["start"]), int(item["end"])

    def is_shadowed(city: dict) -> bool:
        city_bounds = bounds(city)
        required = max(0.6, float(city.get("score", 0.0)) * 0.6)
        for rival in spans:
            if rival is city or rival["label"] != "CITY":
                continue
            rival_bounds = bounds(rival)
            covers = rival_bounds[0] <= city_bounds[0] and rival_bounds[1] >= city_bounds[1]
            if not covers or rival_bounds == city_bounds:
                continue
            if float(rival.get("score", 0.0)) >= required:
                return True
        return False

    return [
        span
        for span in spans
        if span["label"] != "CITY" or not is_shadowed(span)
    ]
|
|
|
|
def repair_county_field_cues(text: str, spans: list[dict]) -> list[dict]:
    """Add COUNTY spans for values that follow a county field label.

    ``COUNTY_FIELD_VALUE_RE`` is applied first, then
    ``INLINE_COUNTY_FIELD_VALUE_RE``; a value range found by both is handled
    once. Each value replaces overlapping COUNTY, CITY and name spans.
    Returns a new list.
    """
    replaceable = {"COUNTY", "CITY", "FIRST_NAME", "LAST_NAME"}
    handled: set[tuple[int, int]] = set()
    result = list(spans)
    for pattern in (COUNTY_FIELD_VALUE_RE, INLINE_COUNTY_FIELD_VALUE_RE):
        for hit in pattern.finditer(text):
            bounds = hit.span(1)
            if bounds in handled:
                continue
            handled.add(bounds)
            begin, stop = bounds
            county = {
                "start": begin,
                "end": stop,
                "label": "COUNTY",
                "score": 0.66,
                "text": text[begin:stop],
            }
            survivors = []
            for existing in result:
                if spans_overlap(county, existing) and existing["label"] in replaceable:
                    continue
                survivors.append(existing)
            survivors.append(county)
            result = survivors
    return result
|
|
|
|
def repair_county_spans(text: str, spans: list[dict]) -> list[dict]:
    """Add COUNTY spans from the county gazetteer and from "County X" phrases.

    Pass 1: every bounded match of a form in ``IRISH_COUNTY_FORMS`` (longest
    form first) that passes ``is_reasonable_span_text`` becomes a COUNTY span.
    It inherits the highest score of any COUNTY span it overlaps (minimum
    0.74) and replaces overlapping COUNTY, CITY and name spans. A shorter form
    nested inside a form already accepted in this pass is skipped; without
    that guard the shorter match, having a different ``(start, end)`` key,
    would evict the longer span and defeat the longest-first ordering.

    Pass 2: every ``COUNTY_VALUE_RE`` match not already seen is added when it
    has address context: an ``ADDRESS_CUE_RE`` or ``POSTCODE_VALUE_RE`` hit
    within 40 characters before / 24 after, or a STREET_ADDRESS / CITY /
    POSTCODE span whose edge lies within 24 characters of the match.

    Returns a new list; ``spans`` is not mutated.
    """
    repaired = list(spans)
    seen: set[tuple[int, int]] = set()
    # Bounds of COUNTY spans added in pass 1; used to reject nested forms.
    accepted: list[tuple[int, int]] = []
    conflicting_labels = {"COUNTY", "CITY", "FIRST_NAME", "LAST_NAME"}
    text_length = len(text)

    # Pass 1: gazetteer forms, longest first.
    for form in sorted(IRISH_COUNTY_FORMS, key=len, reverse=True):
        for match in iter_bounded_form_matches(form, text):
            start, end = match.span()
            key = (start, end)
            if key in seen:
                continue
            seen.add(key)
            # Longest form wins: ignore a form lying inside an accepted one.
            if any(
                outer_start <= start and end <= outer_end
                for outer_start, outer_end in accepted
            ):
                continue
            candidate_span = {
                "start": start,
                "end": end,
                "label": "COUNTY",
                "score": 0.74,
                "text": text[start:end],
            }
            if not is_reasonable_span_text("COUNTY", text, start, end):
                continue
            overlapping_counties = [
                other
                for other in repaired
                if spans_overlap(candidate_span, other) and other["label"] == "COUNTY"
            ]
            if overlapping_counties:
                # Never lower the confidence of a county that was already found.
                candidate_span["score"] = max(
                    float(candidate_span["score"]),
                    max(float(other.get("score", 0.0)) for other in overlapping_counties),
                )
            repaired = [
                other
                for other in repaired
                if not (
                    spans_overlap(candidate_span, other)
                    and other["label"] in conflicting_labels
                )
            ]
            repaired.append(candidate_span)
            accepted.append(key)

    # Pass 2: "County X" / "Co. X" phrases that sit in an address context.
    for match in COUNTY_VALUE_RE.finditer(text):
        start, end = match.span(1)
        key = (start, end)
        if key in seen:
            continue
        context = text[max(0, start - 40) : min(text_length, end + 24)]
        has_context = bool(
            ADDRESS_CUE_RE.search(context) or POSTCODE_VALUE_RE.search(context)
        )
        if not has_context:
            for other in repaired:
                other_start = int(other["start"])
                other_end = int(other["end"])
                if other["label"] in {"STREET_ADDRESS", "CITY", "POSTCODE"} and (
                    abs(other_start - end) <= 24 or abs(start - other_end) <= 24
                ):
                    has_context = True
                    break
        if not has_context:
            continue
        seen.add(key)
        candidate_span = {
            "start": start,
            "end": end,
            "label": "COUNTY",
            "score": 0.74,
            "text": text[start:end],
        }
        if not is_reasonable_span_text("COUNTY", text, start, end):
            continue
        repaired = [
            other
            for other in repaired
            if not (
                spans_overlap(candidate_span, other)
                and other["label"] in conflicting_labels
            )
        ]
        repaired.append(candidate_span)
    return repaired
|
|
|
|
def repair_street_addresses(text: str, spans: list[dict]) -> list[dict]:
    """Add STREET_ADDRESS spans for street-shaped values that have context.

    A ``STREET_ADDRESS_VALUE_RE`` match must pass ``is_reasonable_span_text``
    and have context: an ``ADDRESS_CUE_RE`` hit within 32 characters before /
    24 after it, a CITY / COUNTY / POSTCODE span starting 0-16 characters
    after it, or a name span ending 0-24 characters before it. Accepted spans
    replace overlapping STREET_ADDRESS and name spans. Returns a new list.
    """
    replaceable = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"}
    visited: set[tuple[int, int]] = set()
    text_length = len(text)
    result = list(spans)
    for hit in STREET_ADDRESS_VALUE_RE.finditer(text):
        bounds = hit.span(1)
        if bounds in visited:
            continue
        visited.add(bounds)
        begin, stop = bounds
        address = {
            "start": begin,
            "end": stop,
            "label": "STREET_ADDRESS",
            "score": 0.65,
            "text": text[begin:stop],
        }
        if not is_reasonable_span_text("STREET_ADDRESS", text, begin, stop):
            continue

        window = text[max(0, begin - 32) : min(text_length, stop + 24)]
        supported = bool(ADDRESS_CUE_RE.search(window))
        if not supported:
            for existing in result:
                existing_start = int(existing["start"])
                existing_end = int(existing["end"])
                label = existing["label"]
                if label in {"CITY", "COUNTY", "POSTCODE"} and 0 <= existing_start - stop <= 16:
                    supported = True
                    break
                if label in {"FIRST_NAME", "LAST_NAME"} and 0 <= begin - existing_end <= 24:
                    supported = True
                    break
        if not supported:
            continue

        result = [
            existing
            for existing in result
            if not spans_overlap(address, existing)
            or existing["label"] not in replaceable
        ]
        result.append(address)
    return result
|
|
|
|
def repair_contextual_address_blocks(text: str, spans: list[dict]) -> list[dict]:
    """Add STREET_ADDRESS spans for the text that follows an address cue.

    For each ``ADDRESS_BLOCK_VALUE_RE`` match the captured value must have at
    least two whitespace-separated tokens and either a digit or a unit /
    house-name prefix. When the value has no street suffix it additionally
    needs one of:

    * a prefix plus an address-line cue in the 40 characters before it, or a
      following building name that is itself followed by address context;
    * a prefix, or a digit with three or more tokens, plus following address
      context (street phrase, county word or routing key in the next 48
      characters).

    The candidate is dropped when a strictly larger STREET_ADDRESS span
    already contains it; otherwise it replaces overlapping STREET_ADDRESS and
    name spans. Returns a new list.
    """
    replaceable = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"}
    text_length = len(text)
    result = list(spans)
    for hit in ADDRESS_BLOCK_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        trimmed = text[begin:stop].strip()
        if not trimmed:
            continue
        address = {
            "start": begin,
            "end": stop,
            "label": "STREET_ADDRESS",
            "score": 0.68,
            "text": text[begin:stop],
        }
        words = [word for word in re.split(r"\s+", trimmed) if word]
        if len(words) < 2:
            continue

        contains_digit = any(ch.isdigit() for ch in trimmed)
        starts_with_prefix = bool(
            ADDRESS_UNIT_PREFIX_RE.match(trimmed) or HOUSE_NAME_PREFIX_RE.match(trimmed)
        )
        ends_like_street = bool(STREET_SUFFIX_RE.search(trimmed))
        lead_in = text[max(0, begin - 40) : begin]
        after_address_line_cue = bool(ADDRESS_LINE_CUE_RE.search(lead_in))
        if not contains_digit and not starts_with_prefix:
            continue

        if not ends_like_street:
            # Without a street suffix the value needs corroboration from what follows.
            following = text[stop : min(text_length, stop + 48)]
            street_follows = bool(
                re.match(
                    r"^\s*,\s*((?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana))",
                    following,
                    flags=re.IGNORECASE,
                )
                or re.search(r"(?:County|Contae|gContae|Co\.|(?:[A-Z]\d{2}|D6W))", following)
            )
            building_follows = False
            building_hit = ADDRESS_BUILDING_TAIL_RE.match(following)
            if building_hit:
                after_building = following[int(building_hit.end(1)) :]
                building_follows = bool(
                    re.match(
                        r"^\s*(?:$|,\s*(?:(?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)|County|Contae|gContae|Co\.|(?:[A-Z]\d{2}|D6W)|[A-ZÁÉÍÓÚ]))",
                        after_building,
                        flags=re.IGNORECASE,
                    )
                )
            numbered_multiword = contains_digit and len(words) >= 3
            prefix_route = starts_with_prefix and (after_address_line_cue or building_follows)
            context_route = (starts_with_prefix or numbered_multiword) and street_follows
            if not (prefix_route or context_route):
                continue

        # Do not replace a larger street address with a fragment of it.
        inside_larger = False
        for existing in result:
            if (
                existing["label"] == "STREET_ADDRESS"
                and int(existing["start"]) <= begin
                and int(existing["end"]) >= stop
                and (int(existing["start"]), int(existing["end"])) != (begin, stop)
            ):
                inside_larger = True
                break
        if inside_larger:
            continue

        result = [
            existing
            for existing in result
            if not spans_overlap(address, existing)
            or existing["label"] not in replaceable
        ]
        result.append(address)
    return result
|
|
|
|
def repair_address_line_prefix_spans(text: str, spans: list[dict]) -> list[dict]:
    """Add STREET_ADDRESS spans for "Address line 1:" values with a known prefix.

    A value captured by ``ADDRESS_LINE_PREFIX_VALUE_RE`` is accepted when,
    after stripping, it is non-empty and starts with a unit prefix
    (``ADDRESS_UNIT_PREFIX_RE``) or a house-name prefix
    (``HOUSE_NAME_PREFIX_RE``). It replaces overlapping STREET_ADDRESS and
    name spans. Returns a new list.
    """
    replaceable = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME"}
    result = list(spans)
    for hit in ADDRESS_LINE_PREFIX_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        trimmed = text[begin:stop].strip()
        if not trimmed:
            continue
        if ADDRESS_UNIT_PREFIX_RE.match(trimmed):
            pass  # unit prefix such as "Apt 4, ..."
        elif not HOUSE_NAME_PREFIX_RE.match(trimmed):
            continue
        address = {
            "start": begin,
            "end": stop,
            "label": "STREET_ADDRESS",
            "score": 0.67,
            "text": text[begin:stop],
        }
        survivors = []
        for existing in result:
            if spans_overlap(address, existing) and existing["label"] in replaceable:
                continue
            survivors.append(existing)
        survivors.append(address)
        result = survivors
    return result
|
|
|
|
# ", <up to five words> <street word> [<up to two more words>]" directly after
# a span; compiled once at import time instead of on every call.
_PREFIXED_STREET_TAIL_RE = re.compile(
    r"^\s*,\s*((?:\d{1,4}\s+)?(?:[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*\s+){0,5}(?:street|road|avenue|lane|park|view|square|terrace|drive|close|way|place|crescent|grove|green|court|manor|mews|gardens?|heights|quay|bóthar|bothar|sráid|sraid|lána|lana)(?:\s+[A-ZÁÉÍÓÚa-záéíóú][\w'’.-]*){0,2})",
    flags=re.IGNORECASE,
)


def extend_prefixed_street_address_spans(text: str, spans: list[dict]) -> list[dict]:
    """Extend unit / house-name STREET_ADDRESS spans over a following tail.

    A STREET_ADDRESS span whose stripped text starts with a unit prefix
    (``ADDRESS_UNIT_PREFIX_RE``) or house-name prefix
    (``HOUSE_NAME_PREFIX_RE``) is extended to the end of a directly following
    ", <street phrase>" or, failing that, ", <building name>"
    (``ADDRESS_BUILDING_TAIL_RE``). The extended copy gets a score of at least
    0.67. All other spans are passed through unchanged.

    NOTE(review): an earlier revision compiled a "follow context" pattern and
    sliced the text after the building tail but used neither, so building
    tails have always been accepted unconditionally. That dead work is removed
    here and the observable behaviour is unchanged. If a follow-context check
    was intended, it still needs to be added deliberately.

    Returns a new list; ``spans`` is not mutated.
    """
    repaired: list[dict] = []
    for span in spans:
        if span["label"] != "STREET_ADDRESS":
            repaired.append(span)
            continue
        value = span.get("text", "").strip()
        if not value or not (
            ADDRESS_UNIT_PREFIX_RE.match(value) or HOUSE_NAME_PREFIX_RE.match(value)
        ):
            repaired.append(span)
            continue
        span_end = int(span["end"])
        remainder = text[span_end:]
        # Prefer a street-phrase tail; fall back to a building-name tail.
        tail_match = _PREFIXED_STREET_TAIL_RE.match(remainder)
        if tail_match is None:
            tail_match = ADDRESS_BUILDING_TAIL_RE.match(remainder)
        if tail_match is None:
            repaired.append(span)
            continue
        extended_end = span_end + int(tail_match.end(1))
        repaired.append(
            {
                **span,
                "end": extended_end,
                "text": text[int(span["start"]) : extended_end],
                "score": max(float(span.get("score", 0.0)), 0.67),
            }
        )
    return repaired
|
|
|
|
def merge_adjacent_street_address_spans(text: str, spans: list[dict]) -> list[dict]:
    """Merge consecutive STREET_ADDRESS spans that are separated only by a comma.

    Spans are sorted by ``(start, end, label)``. A STREET_ADDRESS span absorbs
    the next span in that order while the next span is also a STREET_ADDRESS,
    the text between them is just a comma with optional whitespace, and the
    merged text contains a digit and either starts with a unit prefix or
    contains a street or building suffix. The merged span keeps the higher of
    the two scores. Returns the sorted, merged list (street spans are copies);
    an empty input is returned as-is.
    """
    if not spans:
        return spans
    ordered = sorted(
        spans,
        key=lambda item: (int(item["start"]), int(item["end"]), item["label"]),
    )
    total = len(ordered)
    merged: list[dict] = []
    cursor = 0
    while cursor < total:
        head = ordered[cursor]
        cursor += 1
        if head["label"] != "STREET_ADDRESS":
            merged.append(head)
            continue
        combined = dict(head)
        while cursor < total:
            follower = ordered[cursor]
            if follower["label"] != "STREET_ADDRESS":
                break
            between = text[int(combined["end"]) : int(follower["start"])]
            if re.fullmatch(r"\s*,\s*", between) is None:
                break
            new_end = int(follower["end"])
            joined = text[int(combined["start"]) : new_end]
            if not any(ch.isdigit() for ch in joined):
                break
            if not (
                ADDRESS_UNIT_PREFIX_RE.match(joined)
                or STREET_SUFFIX_RE.search(joined)
                or BUILDING_SUFFIX_RE.search(joined)
            ):
                break
            combined["end"] = new_end
            combined["text"] = text[int(combined["start"]) : new_end]
            combined["score"] = max(
                float(combined.get("score", 0.0)),
                float(follower.get("score", 0.0)),
            )
            cursor += 1
        merged.append(combined)
    return merged
|
|
|
|
def repair_contextual_address_place_blocks(text: str, spans: list[dict]) -> list[dict]:
    """Add STREET_ADDRESS spans for named venues that follow an address cue.

    Every value captured by ``ADDRESS_PLACE_VALUE_RE`` becomes a
    STREET_ADDRESS span (score 0.69) and replaces overlapping STREET_ADDRESS,
    name and CITY spans. Returns a new list.
    """
    replaceable = {"STREET_ADDRESS", "FIRST_NAME", "LAST_NAME", "CITY"}
    result = list(spans)
    for hit in ADDRESS_PLACE_VALUE_RE.finditer(text):
        begin, stop = hit.span(1)
        place = {
            "start": begin,
            "end": stop,
            "label": "STREET_ADDRESS",
            "score": 0.69,
            "text": text[begin:stop],
        }
        survivors = []
        for existing in result:
            if spans_overlap(place, existing) and existing["label"] in replaceable:
                continue
            survivors.append(existing)
        survivors.append(place)
        result = survivors
    return result
|
|
|
|
def prefer_long_street_address_spans(spans: list[dict]) -> list[dict]:
    """Drop STREET_ADDRESS spans that lie inside a larger STREET_ADDRESS span.

    A street span is removed when another street span covers it and has
    different bounds. Spans with identical bounds do not shadow each other.
    Non-street spans are always kept and input order is preserved. An empty
    input is returned as-is.
    """
    if not spans:
        return spans

    def bounds(item: dict) -> tuple[int, int]:
        return int(item["start"]), int(item["end"])

    def is_shadowed(street: dict) -> bool:
        inner_start, inner_end = bounds(street)
        for rival in spans:
            if rival is street or rival["label"] != "STREET_ADDRESS":
                continue
            outer_start, outer_end = bounds(rival)
            if outer_start > inner_start or outer_end < inner_end:
                continue
            if (outer_start, outer_end) != (inner_start, inner_end):
                return True
        return False

    return [
        span
        for span in spans
        if span["label"] != "STREET_ADDRESS" or not is_shadowed(span)
    ]
|
|
|
|
def drop_contextual_fragment_spans(spans: list[dict]) -> list[dict]:
    """Remove name and city fragments dominated by an overlapping span.

    * ``FIRST_NAME`` / ``LAST_NAME`` spans are dropped when they overlap a
      ``COUNTY``, ``CITY``, ``POSTCODE``, ``STREET_ADDRESS`` or
      ``DATE_OF_BIRTH`` span that is at least as wide.
    * ``CITY`` spans are dropped when they overlap a ``COUNTY`` span that is
      at least as wide.

    Args:
        spans: Candidate spans.

    Returns:
        The input object itself when it is empty, otherwise a new filtered
        list in the original order.
    """
    if not spans:
        return spans

    name_labels = ("FIRST_NAME", "LAST_NAME")
    name_suppressors = {"COUNTY", "CITY", "POSTCODE", "STREET_ADDRESS", "DATE_OF_BIRTH"}
    city_suppressors = ("COUNTY",)

    def _dominated(fragment: dict, dominating_labels) -> bool:
        for rival in spans:
            if rival is fragment:
                continue
            if rival["label"] not in dominating_labels:
                continue
            if not spans_overlap(fragment, rival):
                continue
            rival_width = int(rival["end"]) - int(rival["start"])
            fragment_width = int(fragment["end"]) - int(fragment["start"])
            if rival_width >= fragment_width:
                return True
        return False

    survivors: list[dict] = []
    for item in spans:
        kind = item["label"]
        if kind in name_labels and _dominated(item, name_suppressors):
            continue
        if kind == "CITY" and _dominated(item, city_suppressors):
            continue
        survivors.append(item)
    return survivors
|
|
|
|
def drop_public_contact_detail_spans(text: str, spans: list[dict]) -> list[dict]:
    """Strip location spans from text that advertises public contact details.

    When ``PUBLIC_CONTACT_DETAILS_RE`` matches anywhere in ``text``, every
    ``STREET_ADDRESS``, ``CITY`` and ``COUNTY`` span is removed.  Otherwise
    the input is returned untouched.

    Args:
        text: Document the spans were extracted from.
        spans: Candidate spans.

    Returns:
        ``spans`` itself when the pattern does not match, otherwise a new list
        without the location spans.
    """
    # The pattern only depends on ``text``, so test it once rather than once
    # per span.
    if not PUBLIC_CONTACT_DETAILS_RE.search(text):
        return spans
    location_labels = {"STREET_ADDRESS", "CITY", "COUNTY"}
    return [span for span in spans if span["label"] not in location_labels]
|
|
|
|
def drop_org_like_name_spans(text: str, spans: list[dict]) -> list[dict]:
    """Discard name spans that look like part of an organisation name.

    A ``FIRST_NAME`` / ``LAST_NAME`` span is dropped when either

    * the normalised last whitespace-separated token of its ``"text"`` is in
      ``ORG_NAME_TRAILING_SURFACES``, or
    * ``ORG_NAME_PREFIX_RE`` matches somewhere in the (up to) 32 characters
      of ``text`` immediately before the span.

    Args:
        text: Document the span offsets refer to.
        spans: Candidate spans.

    Returns:
        A new list with the organisation-like name spans removed.
    """
    name_labels = ("FIRST_NAME", "LAST_NAME")

    def _is_org_like(name_span: dict) -> bool:
        raw_words = re.split(r"\s+", name_span.get("text", "").strip())
        surfaces = [normalize_surface(word) for word in raw_words if word]
        if surfaces and surfaces[-1] in ORG_NAME_TRAILING_SURFACES:
            return True
        begin = int(name_span["start"])
        lead_in = text[max(0, begin - 32) : begin]
        return bool(ORG_NAME_PREFIX_RE.search(lead_in))

    return [
        item
        for item in spans
        if item["label"] not in name_labels or not _is_org_like(item)
    ]
|
|
|
|
def drop_city_org_prefix_spans(text: str, spans: list[dict]) -> list[dict]:
    """Drop CITY spans whose following text matches ``ORG_CITY_TAIL_RE``.

    Only the (up to) 24 characters right after the span are inspected, and the
    pattern must match at the very start of that tail.

    Args:
        text: Document the span offsets refer to.
        spans: Candidate spans.

    Returns:
        A new list without the rejected CITY spans; other labels pass through.
    """
    survivors: list[dict] = []
    for item in spans:
        if item["label"] == "CITY":
            stop = int(item["end"])
            following = text[stop : min(len(text), stop + 24)]
            if ORG_CITY_TAIL_RE.match(following):
                continue
        survivors.append(item)
    return survivors
|
|
|
|
def canonicalize_location_spans(text: str, spans: list[dict]) -> list[dict]:
    """Tidy the boundaries of CITY and COUNTY spans.

    * A CITY span is widened one character to the left when that character
      (lower-cased) is in ``IRISH_CITY_PREFIX_CHARS``, is not itself preceded
      by a letter, and the widened surface normalises to an entry of
      ``IRISH_CITY_SURFACES``.
    * Trailing ``.``, ``,``, ``;`` and ``:`` are trimmed from both CITY and
      COUNTY spans.

    Args:
        text: Document the span offsets refer to.
        spans: Candidate spans.

    Returns:
        A new list.  CITY/COUNTY entries are fresh dicts with updated
        ``start``, ``end`` and ``text``; all other spans are passed through
        as the same objects.
    """
    trailing_punctuation = ".,;:"
    cleaned: list[dict] = []
    for item in spans:
        kind = item["label"]
        if kind != "CITY" and kind != "COUNTY":
            cleaned.append(item)
            continue

        left = int(item["start"])
        right = int(item["end"])

        if kind == "CITY" and left > 0:
            widened_left = left - 1
            preceded_by_letter = widened_left > 0 and text[widened_left - 1].isalpha()
            if (
                text[widened_left].lower() in IRISH_CITY_PREFIX_CHARS
                and not preceded_by_letter
                and normalize_surface(text[widened_left:right]) in IRISH_CITY_SURFACES
            ):
                left = widened_left

        while right > left and text[right - 1] in trailing_punctuation:
            right -= 1

        updated = dict(item)
        updated.update({"start": left, "end": right, "text": text[left:right]})
        cleaned.append(updated)
    return cleaned
|
|
|
|
def canonicalize_street_address_spans(text: str, spans: list[dict]) -> list[dict]:
    """Snap STREET_ADDRESS spans onto the best regex match and tidy their end.

    For each ``STREET_ADDRESS`` span:

    1. ``STREET_ADDRESS_VALUE_RE`` is run over a window reaching 48 characters
       either side of the span.  Among group-1 matches that overlap the span
       and pass ``is_reasonable_span_text``, the longest wins; ties go to the
       match whose bounds are closest to the original span.
    2. The span is cut at the first line break (``"\\n"`` or ``"\\r"``,
       whichever comes first) inside it.
    3. Trailing ``.``, ``,``, ``;`` and ``:`` are trimmed.

    Args:
        text: Document the span offsets refer to.
        spans: Candidate spans.

    Returns:
        A new list.  STREET_ADDRESS entries are fresh dicts with updated
        ``start``, ``end`` and ``text``; all other spans are passed through.
    """
    repaired: list[dict] = []
    for span in spans:
        if span["label"] != "STREET_ADDRESS":
            repaired.append(span)
            continue
        start = int(span["start"])
        end = int(span["end"])
        window_start = max(0, start - 48)
        window_end = min(len(text), end + 48)
        window = text[window_start:window_end]
        best_bounds = None
        best_key = None
        for match in STREET_ADDRESS_VALUE_RE.finditer(window):
            candidate_start = window_start + int(match.start(1))
            candidate_end = window_start + int(match.end(1))
            # Ignore matches that do not touch the predicted span at all.
            if candidate_end <= start or candidate_start >= end:
                continue
            if not is_reasonable_span_text("STREET_ADDRESS", text, candidate_start, candidate_end):
                continue
            # Longer is better; among equal lengths prefer the least displaced.
            key = (
                candidate_end - candidate_start,
                -(abs(candidate_start - start) + abs(candidate_end - end)),
            )
            if best_key is None or key > best_key:
                best_key = key
                best_bounds = (candidate_start, candidate_end)
        if best_bounds is not None:
            start, end = best_bounds
        # Cut at the earliest line break of either kind so CRLF text does not
        # leave a dangling "\r" at the end of the span.
        line_breaks = [
            position
            for position in (text.find("\n", start, end), text.find("\r", start, end))
            if position != -1
        ]
        if line_breaks:
            end = min(line_breaks)
        while end > start and text[end - 1] in ".,;:":
            end -= 1
        repaired.append(
            {
                **span,
                "start": start,
                "end": end,
                "text": text[start:end],
            }
        )
    return repaired
|
|
|
|
def canonicalize_email_spans(text: str, spans: list[dict]) -> list[dict]:
    """Shrink EMAIL spans to capture group 1 of ``EMAIL_EXTRACT_RE``.

    The pattern is searched inside the span's own slice of ``text``.  When it
    matches, the span is replaced by a copy whose ``start``, ``end`` and
    ``text`` cover group 1 only.  EMAIL spans without a match, and spans with
    any other label, are passed through unchanged.

    Args:
        text: Document the span offsets refer to.
        spans: Candidate spans.

    Returns:
        A new list in the original order.
    """

    def _tighten(item: dict) -> dict:
        if item["label"] != "EMAIL":
            return item
        base = int(item["start"])
        found = EMAIL_EXTRACT_RE.search(text[base : int(item["end"])])
        if not found:
            return item
        # Group offsets are relative to the slice, so shift them back.
        left = base + int(found.start(1))
        right = base + int(found.end(1))
        updated = dict(item)
        updated.update({"start": left, "end": right, "text": text[left:right]})
        return updated

    return [_tighten(item) for item in spans]
|
|
|
|
def drop_stacked_first_names(spans: list[dict]) -> list[dict]:
    """Drop a FIRST_NAME that is immediately followed by another FIRST_NAME + LAST_NAME.

    A ``FIRST_NAME`` span is removed when some other ``FIRST_NAME`` span

    * starts after it,
    * starts no more than 2 characters past its end, and
    * is itself followed by a ``LAST_NAME`` span starting 0 to 2 characters
      after that other span's end.

    Args:
        spans: Candidate spans.

    Returns:
        ``spans`` itself when it is empty or contains no FIRST_NAME or no
        LAST_NAME span; otherwise a new filtered list in the original order.
    """
    if not spans:
        return spans
    given = [item for item in spans if item["label"] == "FIRST_NAME"]
    family = [item for item in spans if item["label"] == "LAST_NAME"]
    if not (given and family):
        return spans

    def _leads_into_surname(first: dict) -> bool:
        tail = int(first["end"])
        return any(0 <= int(last["start"]) - tail <= 2 for last in family)

    def _is_stacked(first: dict) -> bool:
        head, tail = int(first["start"]), int(first["end"])
        for later in given:
            if later is first:
                continue
            later_head = int(later["start"])
            if later_head <= head or later_head - tail > 2:
                continue
            if _leads_into_surname(later):
                return True
        return False

    return [
        item
        for item in spans
        if item["label"] != "FIRST_NAME" or not _is_stacked(item)
    ]
|
|
|
|
def _collect_candidate_spans(
    text: str,
    offsets: list[tuple[int, int]],
    span_scores: np.ndarray,
    label_names,
    thresholds: dict[str, float],
    max_span_tokens,
    min_nonspace_chars,
    min_score: float,
) -> list[dict]:
    """Turn above-threshold cells of ``span_scores`` into raw span dicts."""
    num_labels, seq_len, _ = span_scores.shape
    # Clamp to the offsets actually supplied so a score matrix longer than the
    # offset list cannot index past the validity mask built below.
    seq_len = min(seq_len, len(offsets))
    token_offsets = offsets[:seq_len]
    valid = np.array([valid_offset(offset) for offset in token_offsets], dtype=bool)
    start_chars = np.array(
        [int(offset[0]) if ok else -1 for offset, ok in zip(token_offsets, valid)],
        dtype=np.int32,
    )
    end_chars = np.array(
        [int(offset[1]) if ok else -1 for offset, ok in zip(token_offsets, valid)],
        dtype=np.int32,
    )
    # nonspace_prefix[i] is the number of non-whitespace characters in text[:i].
    nonspace_prefix = [0]
    for ch in text:
        nonspace_prefix.append(nonspace_prefix[-1] + (0 if ch.isspace() else 1))

    spans: list[dict] = []
    for label_index in range(min(num_labels, len(label_names))):
        label = label_names[label_index]
        threshold = thresholds.get(label, min_score)
        max_width = max(1, int(max_span_tokens.get(label, 8)))
        min_chars = max(1, int(min_nonspace_chars.get(label, 1)))
        label_scores = span_scores[label_index, :seq_len, :seq_len]
        start_indices, end_indices = np.where(label_scores >= threshold)
        if start_indices.size == 0:
            continue
        # Keep cells where end token >= start token and the width is allowed.
        width_mask = (end_indices >= start_indices) & ((end_indices - start_indices) < max_width)
        if not np.any(width_mask):
            continue
        start_indices = start_indices[width_mask]
        end_indices = end_indices[width_mask]
        # Both boundary tokens must carry a usable character offset.
        valid_mask = valid[start_indices] & valid[end_indices]
        if not np.any(valid_mask):
            continue
        start_indices = start_indices[valid_mask]
        end_indices = end_indices[valid_mask]
        for start_idx, end_idx in zip(start_indices.tolist(), end_indices.tolist()):
            start_char = int(start_chars[start_idx])
            end_char = int(end_chars[end_idx])
            if end_char <= start_char:
                continue
            if (nonspace_prefix[end_char] - nonspace_prefix[start_char]) < min_chars:
                continue
            if not is_reasonable_span_text(label, text, start_char, end_char):
                continue
            spans.append(
                {
                    "start": start_char,
                    "end": end_char,
                    "label": label,
                    "score": float(label_scores[start_idx, end_idx]),
                    "text": text[start_char:end_char],
                }
            )
    return spans


def _apply_contextual_repairs(text: str, spans: list[dict]) -> list[dict]:
    """Run the hint-gated repair, drop and canonicalisation passes in order.

    NOTE(review): this block was restored from a copy that had lost its
    indentation.  The grouping of repair steps under each hint guard follows
    what each step needs (digits, address cues, city cues) - confirm against
    the upstream file.
    """
    text_lower = text.lower()
    has_digit = any(ch.isdigit() for ch in text)
    has_email_hint = "@" in text
    has_address_hint = bool(
        ADDRESS_CUE_RE.search(text)
        or ADDRESS_LINE_CUE_RE.search(text)
        or STREET_SUFFIX_RE.search(text)
        or HOUSE_NAME_PREFIX_RE.search(text)
        or ADDRESS_UNIT_PREFIX_RE.search(text)
    )
    has_city_hint = bool(
        has_address_hint
        or POSTCODE_VALUE_RE.search(text)
        or "city:" in text_lower
        or "city " in text_lower
        or "town:" in text_lower
        or "town " in text_lower
        or "city/town" in text_lower
        or "cathair" in text_lower
        or "baile" in text_lower
        or "county" in text_lower
        or "contae" in text_lower
        or "co." in text_lower
    )

    # --- repairs that add or reshape spans ---------------------------------
    if has_email_hint:
        spans = repair_emails(text, spans)
    if has_digit:
        spans = repair_phone_numbers(text, spans)
        spans = repair_ppsn_variants(text, spans)
        spans = repair_postcodes(text, spans)
    if has_address_hint:
        spans = repair_street_addresses(text, spans)
    if NAME_SELF_CUE_RE.search(text):
        spans = repair_contextual_name_cues(text, spans)
    if NAME_ROLE_CUE_RE.search(text):
        spans = repair_role_name_cues(text, spans)
    if SURNAME_CUE_RE.search(text):
        spans = repair_surname_field_cues(text, spans)
    if NAME_BEFORE_STRUCTURED_CUE_RE.search(text):
        spans = repair_name_before_structured_cues(text, spans)
    if any(span["label"] in {"FIRST_NAME", "LAST_NAME"} for span in spans) and NAME_PARTICLE_SURNAME_RE.search(text):
        spans = repair_name_particle_surnames(text, spans)
    if has_address_hint:
        spans = repair_contextual_address_blocks(text, spans)
        spans = repair_address_line_prefix_spans(text, spans)
        spans = extend_prefixed_street_address_spans(text, spans)
        spans = repair_contextual_address_place_blocks(text, spans)
        spans = merge_adjacent_street_address_spans(text, spans)
        spans = prefer_long_street_address_spans(spans)
    if has_city_hint:
        spans = repair_county_spans(text, spans)
        spans = repair_city_spans(text, spans)
        spans = repair_city_before_postcode(text, spans)
        spans = repair_city_before_county(text, spans)
        spans = repair_city_field_cues(text, spans)
        spans = repair_prefixed_city_spans(text, spans)
        spans = prefer_long_city_spans(spans)
        spans = repair_county_field_cues(text, spans)
    if has_digit and DOB_CONTEXT_RE.search(text):
        spans = repair_contextual_date_of_birth(text, spans)
    if has_digit and AGE_CONTEXT_RE.search(text):
        spans = repair_contextual_ages(text, spans)
    if has_digit and PASSPORT_CUE_RE.search(text):
        spans = repair_contextual_passport_numbers(text, spans)
    if has_digit and ("iban" in text_lower or "account" in text_lower or "bank" in text_lower or "cuntas" in text_lower):
        spans = repair_contextual_account_numbers(text, spans)

    # --- passes that drop spans --------------------------------------------
    if "public contact details" in text_lower:
        spans = drop_public_contact_detail_spans(text, spans)
    if any(span["label"] in {"FIRST_NAME", "LAST_NAME"} for span in spans):
        spans = drop_org_like_name_spans(text, spans)
        spans = drop_stacked_first_names(spans)
    if any(span["label"] == "CITY" for span in spans):
        spans = drop_city_org_prefix_spans(text, spans)
    if any(span["label"] in {"FIRST_NAME", "LAST_NAME", "CITY"} for span in spans):
        spans = drop_contextual_fragment_spans(spans)

    # --- boundary clean-up -------------------------------------------------
    if any(span["label"] in {"CITY", "COUNTY"} for span in spans):
        spans = canonicalize_location_spans(text, spans)
    if any(span["label"] == "STREET_ADDRESS" for span in spans):
        spans = canonicalize_street_address_spans(text, spans)
    if any(span["label"] == "EMAIL" for span in spans):
        spans = canonicalize_email_spans(text, spans)
    return spans


def decode_span_matrix(
    text: str,
    offsets: list[tuple[int, int]],
    span_scores: np.ndarray,
    config,
    min_score: float,
) -> list[dict]:
    """Decode a per-label span score tensor into de-duplicated entity spans.

    Every ``(label, start_token, end_token)`` cell whose score reaches the
    label's threshold becomes a candidate, provided the token width, the
    non-space character count and ``is_reasonable_span_text`` allow it.  The
    candidates are then reduced by the ``prefer_long_*`` passes, run through
    the hint-gated repair/drop/canonicalise pipeline and finally passed to
    ``dedupe_spans``.

    Args:
        text: Document that was tokenised.
        offsets: Per-token ``(start_char, end_char)`` offsets into ``text``.
            Tokens for which ``valid_offset`` is false are ignored, and tokens
            beyond ``len(offsets)`` are never considered.
        span_scores: 3-D array indexed ``[label, start_token, end_token]``.
        config: Model config read by the ``*_from_config`` helpers.
        min_score: Fallback threshold for labels without their own.

    Returns:
        The value returned by ``dedupe_spans``; the spans passed to it are
        dicts with ``start``, ``end``, ``label``, ``score`` and ``text`` keys.

    Raises:
        ValueError: If ``span_scores`` is not 3-dimensional.
    """
    # Validate the tensor shape before touching the config so malformed model
    # output fails fast with a clear message.
    if span_scores.ndim != 3:
        raise ValueError(f"Expected [num_labels, seq_len, seq_len] span scores, got shape {span_scores.shape}")

    label_names = label_names_from_config(config)
    thresholds = label_thresholds_from_config(config, min_score)
    max_span_tokens = label_max_span_tokens_from_config(config)
    min_nonspace_chars = label_min_nonspace_chars_from_config(config)

    spans = _collect_candidate_spans(
        text,
        offsets,
        span_scores,
        label_names,
        thresholds,
        max_span_tokens,
        min_nonspace_chars,
        min_score,
    )
    spans = prefer_long_name_spans(spans, thresholds)
    spans = prefer_long_structured_spans(spans, thresholds)
    spans = repair_first_name_from_last_name(text, spans)
    spans = _apply_contextual_repairs(text, spans)
    return dedupe_spans(spans)
|
|
|
|
def prefer_long_name_spans(spans: list[dict], thresholds: dict[str, float]) -> list[dict]:
    """Resolve FIRST_NAME / LAST_NAME spans that share a start offset.

    Name spans with the same label and ``start`` compete.  The longest one
    (ties broken by score) wins if its score is at least
    ``max(threshold + 0.15, 0.7 * best_score)``; otherwise the best-scoring
    one wins.  The winner takes the position of the first competitor.  The
    result is then passed through ``prefer_same_end_extensions``.

    Args:
        spans: Candidate spans.
        thresholds: Per-label score thresholds; 0.5 is used when missing.

    Returns:
        ``spans`` itself when it is empty, otherwise the resolved list.
    """
    if not spans:
        return spans

    name_labels = ("FIRST_NAME", "LAST_NAME")

    def _score(item: dict) -> float:
        return float(item.get("score", 0.0))

    # Bucket the name spans by (label, start) in encounter order.
    buckets: dict = {}
    for item in spans:
        if item["label"] in name_labels:
            buckets.setdefault((item["label"], item["start"]), []).append(item)

    resolved: set = set()
    chosen: list[dict] = []
    for item in spans:
        kind = item["label"]
        if kind not in name_labels:
            chosen.append(item)
            continue
        bucket_key = (kind, item["start"])
        if bucket_key in resolved:
            continue
        resolved.add(bucket_key)
        rivals = buckets[bucket_key]
        if len(rivals) == 1:
            chosen.append(item)
            continue
        top_scored = max(rivals, key=_score)
        widest = max(rivals, key=lambda rival: (rival["end"] - rival["start"], _score(rival)))
        floor = max(float(thresholds.get(kind, 0.5)) + 0.15, _score(top_scored) * 0.7)
        chosen.append(widest if _score(widest) >= floor else top_scored)
    return prefer_same_end_extensions(chosen, thresholds)
|
|
|
|
def prefer_same_end_extensions(spans: list[dict], thresholds: dict[str, float]) -> list[dict]:
    """Resolve FIRST_NAME / LAST_NAME / EMAIL spans that share an end offset.

    Spans with the same label and ``end`` compete; the winner takes the
    position of the first competitor.

    * EMAIL: the longest span wins when it contains ``"@"`` or is strictly
      longer than the best-scoring span, and its score is within 0.02 of the
      best score.
    * Names: the longest span wins when its stripped text has no space and
      its score is at least ``max(0.8 * threshold, 0.55 * best_score)``.

    In every other case the best-scoring span wins.

    Args:
        spans: Candidate spans.
        thresholds: Per-label score thresholds; 0.5 is used when missing.

    Returns:
        ``spans`` itself when it is empty, otherwise a new resolved list.
    """
    if not spans:
        return spans

    tracked_labels = ("FIRST_NAME", "LAST_NAME", "EMAIL")

    def _score(item: dict) -> float:
        return float(item.get("score", 0.0))

    def _width(item: dict):
        return item["end"] - item["start"]

    def _winner(kind: str, rivals: list[dict]) -> dict:
        top_scored = max(rivals, key=_score)
        widest = max(rivals, key=lambda rival: (_width(rival), _score(rival)))
        widest_score = _score(widest)
        top_score = _score(top_scored)
        if kind == "EMAIL":
            looks_fuller = "@" in widest.get("text", "") or _width(widest) > _width(top_scored)
            if looks_fuller and widest_score >= top_score - 0.02:
                return widest
            return top_scored
        single_word = " " not in widest.get("text", "").strip()
        if single_word and widest_score >= max(float(thresholds.get(kind, 0.5)) * 0.8, top_score * 0.55):
            return widest
        return top_scored

    # Bucket the tracked spans by (label, end) in encounter order.
    buckets: dict = {}
    for item in spans:
        if item["label"] in tracked_labels:
            buckets.setdefault((item["label"], item["end"]), []).append(item)

    resolved: set = set()
    chosen: list[dict] = []
    for item in spans:
        kind = item["label"]
        if kind not in tracked_labels:
            chosen.append(item)
            continue
        bucket_key = (kind, item["end"])
        if bucket_key in resolved:
            continue
        resolved.add(bucket_key)
        rivals = buckets[bucket_key]
        chosen.append(item if len(rivals) == 1 else _winner(kind, rivals))
    return chosen
|
|
|
|
def prefer_long_structured_spans(spans: list[dict], thresholds: dict[str, float]) -> list[dict]:
    """Collapse overlapping STREET_ADDRESS / DATE_OF_BIRTH spans to one winner.

    Scanning in order, each not-yet-consumed target span gathers every
    not-yet-consumed span of the same label that ``spans_overlap`` says it
    overlaps.  When more than one is gathered, the longest (ties broken by
    score) wins if its score is at least ``max(threshold, 0.75 * best_score)``;
    otherwise the best-scoring one wins.  All gathered spans are consumed.

    Args:
        spans: Candidate spans.
        thresholds: Per-label score thresholds; 0.5 is used when missing.

    Returns:
        ``spans`` itself when it is empty, otherwise a new resolved list.
    """
    if not spans:
        return spans

    structured_labels = ("STREET_ADDRESS", "DATE_OF_BIRTH")

    def _score(item: dict) -> float:
        return float(item.get("score", 0.0))

    used: set[int] = set()
    chosen: list[dict] = []
    for position, anchor in enumerate(spans):
        if position in used:
            continue
        kind = anchor["label"]
        if kind not in structured_labels:
            chosen.append(anchor)
            continue

        cluster = []
        for rival_position, rival in enumerate(spans):
            if rival_position in used:
                continue
            if rival["label"] == kind and spans_overlap(anchor, rival):
                cluster.append((rival_position, rival))
        if len(cluster) == 1:
            chosen.append(anchor)
            continue

        used.update(rival_position for rival_position, _ in cluster)
        members = [rival for _, rival in cluster]
        top_scored = max(members, key=_score)
        widest = max(members, key=lambda rival: (rival["end"] - rival["start"], _score(rival)))
        floor = max(float(thresholds.get(kind, 0.5)), _score(top_scored) * 0.75)
        chosen.append(widest if _score(widest) >= floor else top_scored)
    return chosen
|
|
|
|
def sigmoid_np(values: np.ndarray) -> np.ndarray:
    """Return the element-wise logistic sigmoid of ``values``.

    Inputs are clipped to [-60, 60] before exponentiation.
    """
    bounded = np.clip(values, -60.0, 60.0)
    denominator = 1.0 + np.exp(-bounded)
    return 1.0 / denominator
|
|
|
|
def run_onnx_span(session, encoded: dict[str, Any]) -> np.ndarray:
    """Run ``session`` on the encoded inputs and return its first output.

    Only entries of ``encoded`` whose key is one of the session's declared
    input names are fed; ``"offset_mapping"`` is always left out.

    Args:
        session: Object exposing ``get_inputs()`` (items with a ``name``
            attribute) and ``run(output_names, feed)``.
        encoded: Mapping of input name to value.

    Returns:
        The first element of what ``session.run`` returns.

    Raises:
        ValueError: If the session returns no outputs.
    """
    accepted = {declared.name for declared in session.get_inputs()}
    feed = {
        name: tensor
        for name, tensor in encoded.items()
        if name != "offset_mapping" and name in accepted
    }
    results = session.run(None, feed)
    if not results:
        raise ValueError("ONNX session returned no outputs")
    return results[0]
|
|