# Uploaded by pujithapsx -- commit "initial push" (e9084d7)
import re
from datetime import datetime
from typing import List, Dict
import pandas as pd
import logging
try:
import pgeocode
except ImportError:
pgeocode = None
import math
logger = logging.getLogger("rules")
from services.config import (
config,
APARTMENT_IDENTIFIER,
FLAT_NUMBER_IDENTIFIER,
HOUSE_NUMBER_IDENTIFIER,
STREET_KEYWORD,
name_variation_df,
hno_variation_df,
city_prev_pres_df,
state_name_standard_df,
sur_comm_names_df,
pin_city_state_df,
CITY_MAPPING,
STATE_MAPPING,
MATCHING_RULES
)
# =========================================================
# TEXT CLEANING
# =========================================================
def clean_text(text):
    """
    Normalise a raw input value into a compact UPPER-CASE ASCII string.

    Processing order:
    1. Replace HTML tags (e.g. <br>) and HTML entities (e.g. &amp;) with a space
    2. Drop every character outside printable ASCII (0x20-0x7E)
    3. Collapse whitespace runs into one space and trim both ends
    4. Convert to upper case

    Falsy or non-string input is not cleaned: None yields "" and any other
    such value (e.g. "", 0, a number) is returned unchanged.
    """
    if not isinstance(text, str) or not text:
        return text if text is not None else ""
    substitutions = (
        (r'<[^>]+>', ' '),                                  # HTML tags
        (r'&(?:#\d+|#x[0-9a-fA-F]+|[a-zA-Z]+);', ' '),      # HTML entities
        (r'[^\x20-\x7E]', ''),                              # non-printable / non-ASCII
        (r'\s+', ' '),                                      # whitespace runs
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip().upper()
# Module-level log line announcing the cleaning mode. Note that clean_text
# itself still uses regular expressions despite the wording of the message.
logger.info("Using simple text cleaning (no regex/keyword pipeline)")
# =========================================================
# NAME PREPROCESSING
# =========================================================
# Titles/honorifics to remove from names
NAME_TITLES = {
    "dr", "mr", "mrs", "ms", "miss", "master",
    "m/s", "sri", "sree", "shri", "shree",
    "smt", "shrimati", "kumari",
    "prof", "late",
    "er", "adv", "ca",
    "capt", "col", "lt", "major", "brig", "brigadier",
    "cmdr", "commander", "wingcmdr", "groupcapt",
    "justice", "judge", "cj", "chiefjustice",
    "ias", "ips", "ifs",
    "pt", "pandit", "swami", "guru", "maulana", "maulvi",
    "haji", "haj", "imam", "maharaj",
    "sardar",
    "phd", "md", "dphil",
}

# Legacy prefix regex. It has no boundary after the title, so it also strips
# the start of ordinary names ("ERIC" -> "IC", "SRINIVAS" -> "NIVAS").
# remove_name_titles no longer uses it; it is kept unchanged in case code
# outside this section still references it.
_TITLE_PATTERN = re.compile(
    r'^(dr\.?|mr\.?|mrs\.?|ms\.?|miss|m/s\.?|sri|sree|shri|shree|'
    r'smt\.?|prof\.?|late|er|adv|ca|capt|col|lt|major|justice|'
    r'shrimati|kumari|master|brig|brigadier|cmdr|commander|wingcmdr|'
    r'groupcapt|judge|cj|chiefjustice|ias|ips|ifs|pt|pandit|swami|'
    r'guru|maulana|maulvi|haji|haj|imam|maharaj|sardar|phd|dphil)\s*',
    re.IGNORECASE
)

# Title glued to the name by a dot ("dr.rajesh"). The literal dot is required,
# which is what keeps names that merely start with a title's letters intact.
_DOT_ATTACHED_TITLE_PATTERN = re.compile(
    r'^(?:'
    + '|'.join(sorted((re.escape(title) for title in NAME_TITLES), key=len, reverse=True))
    + r')\.\s*',
    re.IGNORECASE,
)

# Relational prefixes: "sita w/o ram" → "sita ram"
_RELATIONAL_PATTERNS = re.compile(
    r'\b(?:s/o|d/o|w/o|h/o|c/o|g/o|'
    r'son\s+of|daughter\s+of|wife\s+of|husband\s+of|care\s+of|guardian\s+of|'
    r'so|do|wo|ho|co|go)\b',
    re.IGNORECASE
)


def remove_name_titles(text):
    """
    Remove title prefixes and suffixes from a name.

    Handles space-separated titles ("mr rajesh", "rajesh kumar phd") and
    dot-attached titles ("dr.rajesh"). Only whole tokens listed in
    NAME_TITLES are removed, so names that start with the same letters as a
    title ("Eric", "Srinivas") are left alone.

    Returns the name upper-cased with single spaces, or "" for falsy input.
    """
    if not text:
        return ""
    tokens = text.upper().split()
    # NAME_TITLES holds lower-case entries, so the comparison is done in
    # lower case even though the output stays upper case.
    while tokens and tokens[0].rstrip('.').lower() in NAME_TITLES:
        tokens.pop(0)
    while tokens and tokens[-1].rstrip('.').lower() in NAME_TITLES:
        tokens.pop()
    text = " ".join(tokens)
    # Fallback for a title joined to the name with a dot and no space.
    text = _DOT_ATTACHED_TITLE_PATTERN.sub('', text)
    return text.strip()
def remove_relational_prefixes(text):
    """
    Strip relational markers from a name and tidy the spacing.

    Every match of _RELATIONAL_PATTERNS (s/o, d/o, w/o, "son of", ... and the
    bare two-letter forms so/do/wo/ho/co/go as whole words) becomes a space.
        "sita w/o ram"    → "sita ram"
        "anil s/o suresh" → "anil suresh"

    Returns "" for falsy input.
    """
    if not text:
        return ""
    without_markers = _RELATIONAL_PATTERNS.sub(' ', text)
    return re.sub(r'\s+', ' ', without_markers).strip()
def remove_non_alpha_trailing(text):
    """
    Keep only the alphabetic content of a name.

    Tokens without any ASCII letter are dropped; in the remaining tokens
    every non-letter character is deleted.
        "anil kumar 1/05/1985" → "anil kumar"
        "rajesh 12345"         → "rajesh"

    Returns "" for falsy input.
    """
    if not text:
        return ""
    letters_only = [
        re.sub(r'[^a-zA-Z]', '', word)
        for word in text.split()
        if re.search(r'[a-zA-Z]', word)
    ]
    return " ".join(letters_only)
def deduplicate_tokens(text):
    """
    Drop repeated tokens (case-insensitively), keeping the first occurrence
    of each in its original position and spelling.
        "rajesh kumar rajesh" → "rajesh kumar"

    Returns "" for falsy input.
    """
    if not text:
        return ""
    first_seen = {}
    for word in text.split():
        # dicts preserve insertion order; setdefault keeps the first spelling
        first_seen.setdefault(word.lower(), word)
    return " ".join(first_seen.values())
def deduplicate_consecutive_tokens(text):
    """
    Collapse runs of the same token (compared case-insensitively) into one;
    intended for addresses. Only adjacent single tokens are compared.
        "road road pune"            → "road pune"
        "mg road bangalore mg road" → unchanged (repeats are not adjacent)

    Returns "" for falsy or whitespace-only input.
    """
    if not text:
        return ""
    kept = []
    for word in text.split():
        if kept and kept[-1].upper() == word.upper():
            continue
        kept.append(word)
    return " ".join(kept)
def collapse_repeated_chars(text):
    """
    Preprocessing step for typo-like repetition:
    1. Runs of 3 or more identical ASCII letters are shortened to 2
       e.g. "MOHAMMMED" → "MOHAMMED" (a double letter such as in
       "SHARRMA" is left as it is)
    2. Runs of 2 or more non-alphanumeric, non-space characters become
       one space, e.g. "---" → " ", "..." → " "
    3. Whitespace is collapsed and the ends are trimmed

    Returns "" for falsy input.
    """
    if not text:
        return ""
    for pattern, replacement in (
        (r'([a-zA-Z])\1{2,}', r'\1\1'),
        (r'[^a-zA-Z0-9\s]{2,}', ' '),
        (r'\s+', ' '),
    ):
        text = re.sub(pattern, replacement, text)
    return text.strip()
def strip_non_alphanumeric(text):
    """
    Replace every character that is not an ASCII letter, digit or whitespace
    with a space, then collapse whitespace and trim.
    Used for address cleanup before sending to the embedding model.

    Returns "" for falsy input.
    """
    if not text:
        return ""
    spaced = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
    return re.sub(r'\s+', ' ', spaced).strip()
def is_subset_match(tokens1, tokens2):
    """
    Tell whether one token list is fully contained in the other
    (case-insensitive, order and repetition ignored).
    e.g. ["rajesh", "kumar"] ⊆ ["rajesh", "kumar", "sharma"] → True

    Returns False when either list is empty.
    """
    if not (tokens1 and tokens2):
        return False
    first = {tok.upper() for tok in tokens1}
    second = {tok.upper() for tok in tokens2}
    return first <= second or second <= first
def standardize_name_variations(text):
    """
    Map each name token to its standard spelling using name_variation_df.

    The VARIATION → STANDARD lookup is built on the first call that needs it
    and cached on the function object as `_lookup`; both columns are
    stripped and upper-cased. Tokens without an entry are kept.
    e.g. "mohommed" → "MOHAMMAD" (when the table contains that row)

    Returns "" for falsy input, the text unchanged when the table is empty,
    otherwise the upper-cased, space-joined result.
    """
    if not text or name_variation_df.empty:
        return text if text else ""
    table = getattr(standardize_name_variations, '_lookup', None)
    if not hasattr(standardize_name_variations, '_lookup'):
        table = {}
        for _, record in name_variation_df.iterrows():
            variant = str(record.get('VARIATION', '')).strip().upper()
            standard = str(record.get('STANDARD', '')).strip().upper()
            if variant and standard:
                table[variant] = standard
        standardize_name_variations._lookup = table
    return " ".join(table.get(word, word) for word in text.upper().split())
# =========================================================
# ADDRESS VARIATION PREPROCESSING (PDF cases 1-19)
# NAME VARIATION PREPROCESSING HELPERS (PDF cases 1-14)
# =========================================================
# ─── already imported in original: re, pd, logging ──────────
# ===========================================================
# ADDRESS VARIATION HANDLERS (all 19 PDF cases)
# ===========================================================
# ── Case 1 & 4 : delimiter / special-char normalisation ──
# Characters treated as stray delimiters inside addresses
_ADDR_SPECIAL_CHARS = re.compile(r'[|#@$%^&*\(\)\[\]\{\};:\'\"\\<>?]')


def _normalize_delimiters(text: str) -> str:
    """
    Turn every character matched by _ADDR_SPECIAL_CHARS into a space, then
    collapse whitespace and trim. Commas, dots, hyphens and slashes are not
    in that set and pass through untouched.
    """
    spaced = _ADDR_SPECIAL_CHARS.sub(' ', text)
    return re.sub(r'\s+', ' ', spaced).strip()
# ── Case 4: hyphen normalisation inside house/flat numbers ──
def _normalize_hyphens(text: str) -> str:
    """
    Remove hyphens that are purely cosmetic inside house/flat numbers
    (e.g. '12-B' → '12B', 'A-110' → 'A110', '12-3B' → '123B') while
    preserving hyphens in compound locality names like 'Pimpri-Chinchwad'.

    A hyphenated pair is merged only when it contains at least one digit and
    each side is either a number with an optional letter or a single letter.
    """
    def _dehyphen(m):
        left, right = m.group(1), m.group(2)
        looks_like_house_no = (
            re.search(r'[0-9]', left + right) is not None
            and re.fullmatch(r'[0-9]+[A-Z]?|[A-Z]', left, re.I) is not None
            and re.fullmatch(r'[A-Z]?[0-9]+[A-Z]?|[A-Z]', right, re.I) is not None
        )
        # Anything else (e.g. two words) is a locality name: leave it intact.
        return left + right if looks_like_house_no else m.group(0)

    return re.sub(r'([A-Z0-9]+)-([A-Z0-9]+)', _dehyphen, text, flags=re.I)
# ── Case 2: abbreviation expansion dictionary ──
# Abbreviation → full form (all keys upper case; some keys are two tokens)
_ADDR_ABBREV = {
    # directions
    'N': 'NORTH',
    'S': 'SOUTH',
    'E': 'EAST',
    'W': 'WEST',
    'NE': 'NORTH EAST',
    'NW': 'NORTH WEST',
    'SE': 'SOUTH EAST',
    'SW': 'SOUTH WEST',
    # administrative
    'NGR': 'NAGAR',
    'NGRS': 'NAGAR',
    'LYT': 'LAYOUT',
    'LT': 'LAYOUT',
    'HYD': 'HYDERABAD',
    'BLR': 'BANGALORE',
    'MUM': 'MUMBAI',
    'DEL': 'DELHI',
    'CHN': 'CHENNAI',
    'KOL': 'KOLKATA',
    'PUN': 'PUNE',
    'AHM': 'AHMEDABAD',
    # road / area
    'RD': 'ROAD',
    'ST': 'STREET',
    'AVE': 'AVENUE',
    'BLVD': 'BOULEVARD',
    'MRG': 'MARG',
    'LN': 'LANE',
    'CR': 'CROSS',
    'CIR': 'CIRCLE',
    # building
    'APT': 'APARTMENT',
    'APTS': 'APARTMENTS',
    'BLDG': 'BUILDING',
    'BLK': 'BLOCK',
    'SECT': 'SECTOR',
    'SEC': 'SECTOR',
    # misc
    'OPP': 'OPPOSITE',
    'NR': 'NEAR',
    'ADJ': 'ADJACENT',
    'JN': 'JUNCTION',
    'STA': 'STATION',
    'PO': 'POST OFFICE',
    'PB': 'POST BOX',
    'PO BOX': 'POST BOX',
    'P.O BOX': 'POST BOX',
    'P.O. BOX': 'POST BOX',
    'DIST': 'DISTRICT',
    'DST': 'DISTRICT',
    'DT': 'DISTRICT',
    'TAL': 'TALUK',
    'TQ': 'TALUK',
    'TEH': 'TEHSIL',
    'VLG': 'VILLAGE',
    'VIL': 'VILLAGE',
    'VILL': 'VILLAGE',
    'CLNY': 'COLONY',
    'COL': 'COLONY',
    'EXT': 'EXTENSION',
    'EXTN': 'EXTENSION',
    'PH': 'PHASE',
}


def _expand_address_abbreviations(text: str) -> str:
    """
    Expand common address abbreviations to their full forms.

    The text is upper-cased and split on whitespace. At each position the
    two-token phrase (looked up verbatim, dots included) wins over the
    single token; single tokens are looked up with their dots removed
    ("P.O." → "PO"). Unknown tokens are kept as they are.
    """
    words = text.upper().split()
    total = len(words)
    out = []
    pos = 0
    while pos < total:
        if pos + 1 < total:
            pair = ' '.join(words[pos:pos + 2])
            if pair in _ADDR_ABBREV:
                out.append(_ADDR_ABBREV[pair])
                pos += 2
                continue
        word = words[pos]
        out.append(_ADDR_ABBREV.get(word.replace('.', ''), word))
        pos += 1
    return ' '.join(out)
# ── Case 9: Roman numeral conversion (already exists; exposed here) ──
# (roman_to_number is already defined in original rules.py – no duplication)
# ── Case 14: Spelled-out numbers ──
# Spelled-out cardinal and ordinal words → digit form
_NUMBER_WORDS = {
    'ZERO':'0','ONE':'1','TWO':'2','THREE':'3','FOUR':'4','FIVE':'5',
    'SIX':'6','SEVEN':'7','EIGHT':'8','NINE':'9','TEN':'10',
    'ELEVEN':'11','TWELVE':'12','THIRTEEN':'13','FOURTEEN':'14','FIFTEEN':'15',
    'SIXTEEN':'16','SEVENTEEN':'17','EIGHTEEN':'18','NINETEEN':'19','TWENTY':'20',
    'TWENTY ONE':'21','TWENTY TWO':'22','TWENTY THREE':'23','TWENTY FOUR':'24',
    'TWENTY FIVE':'25','TWENTY SIX':'26','TWENTY SEVEN':'27','TWENTY EIGHT':'28',
    'TWENTY NINE':'29','THIRTY':'30','THIRTY TWO':'32','FORTY':'40','FIFTY':'50',
    'FIRST':'1ST','SECOND':'2ND','THIRD':'3RD','FOURTH':'4TH','FIFTH':'5TH',
    'SIXTH':'6TH','SEVENTH':'7TH','EIGHTH':'8TH','NINTH':'9TH','TENTH':'10TH',
}
_ORDINAL_MAP = {'FIRST':'1ST','SECOND':'2ND','THIRD':'3RD','FOURTH':'4TH','FIFTH':'5TH',
                'SIXTH':'6TH','SEVENTH':'7TH','EIGHTH':'8TH','NINTH':'9TH','TENTH':'10TH'}

# "<tens> <unit>" or "<tens>-<unit>" for the tens words present in _NUMBER_WORDS
_COMPOUND_NUMBER_RE = re.compile(
    r'\b(TWENTY|THIRTY|FORTY|FIFTY)[\s-]+'
    r'(ONE|TWO|THREE|FOUR|FIVE|SIX|SEVEN|EIGHT|NINE)\b'
)
# Any single _NUMBER_WORDS key as a whole word, longest key first
_NUMBER_WORD_RE = re.compile(
    r'\b(?:'
    + '|'.join(re.escape(word) for word in sorted(_NUMBER_WORDS, key=len, reverse=True))
    + r')\b'
)


def _normalize_spelled_numbers(text: str) -> str:
    """
    Replace spelled-out numbers with digits; the result is upper-cased.

    Tens + unit compounds are combined whether written with a space or a
    hyphen ('Thirty-Two' → '32', 'thirty one' → '31'); every other word
    listed in _NUMBER_WORDS is replaced by its mapped value
    ('seventeen' → '17', 'first' → '1ST'). Only whole words are replaced.
    """
    upper = text.upper()
    # Compounds first, so "THIRTY-TWO" does not degrade into "30-2".
    upper = _COMPOUND_NUMBER_RE.sub(
        lambda m: str(int(_NUMBER_WORDS[m.group(1)]) + int(_NUMBER_WORDS[m.group(2)])),
        upper,
    )
    return _NUMBER_WORD_RE.sub(lambda m: _NUMBER_WORDS[m.group(0)], upper)
# ── Case 15 & 16: Landmark synonym normalisation ──
# Relative-position phrases that are all folded into the single token NEAR
_LANDMARK_SYNONYMS = {
    'OPP': 'NEAR',
    'OPPOSITE': 'NEAR',
    'OPPOSITE TO': 'NEAR',
    'ADJACENT TO': 'NEAR',
    'ADJ TO': 'NEAR',
    'BEHIND': 'NEAR',
    'IN FRONT OF': 'NEAR',
    'BESIDE': 'NEAR',
    'NEXT TO': 'NEAR',
    'CLOSE TO': 'NEAR',
}


def _normalize_landmark_phrases(text: str) -> str:
    """
    Upper-case the text and replace each phrase from _LANDMARK_SYNONYMS
    (matched as whole words, longest phrase first) with its standard token.
    """
    normalized = text.upper()
    # Longest first so "OPPOSITE TO" is consumed before "OPPOSITE".
    for phrase in sorted(_LANDMARK_SYNONYMS, key=len, reverse=True):
        whole_phrase = r'\b' + re.escape(phrase) + r'\b'
        normalized = re.sub(whole_phrase, _LANDMARK_SYNONYMS[phrase], normalized)
    return normalized
# ── Case 16: Relational marker removal in addresses ──
# Relational markers that can leak from the name into an address line
_ADDR_RELATIONAL = re.compile(
    r'\b(S/O|D/O|W/O|H/O|SON\s+OF|DAUGHTER\s+OF|WIFE\s+OF|HUSBAND\s+OF)\b',
    re.IGNORECASE
)


def _remove_addr_relational_markers(text: str) -> str:
    """
    Replace s/o, d/o, w/o, h/o and their spelled-out forms with a space,
    then collapse whitespace and trim.
    """
    without_markers = _ADDR_RELATIONAL.sub(' ', text)
    return ' '.join(re.split(r'\s+', without_markers.strip())) if without_markers.strip() else ''
# ── Case 18: P.O Box / Post Box normalisation ──
def _normalize_po_box(text: str) -> str:
    """
    Normalise P.O Box / P.O. Box / PO Box / Post Box to the single token
    POSTBOX (matching is case-insensitive; the rest of the text is kept).

    Both patterns must start at a word boundary, so words that merely end
    in "PO" or "POST" ("EXPO BOX", "COMPOST BOX") are not rewritten.
    """
    expanded = re.sub(r'\bP\.?\s*O\.?\s*BOX', 'POST BOX', text, flags=re.IGNORECASE)
    return re.sub(r'\bPOST\s+BOX', 'POSTBOX', expanded, flags=re.IGNORECASE)
# ── Case 17: Directional token normalisation ──
# Full direction name → abbreviation, so "Andheri East" == "Andheri E"
_DIR_MAP = {
    'EAST': 'E',
    'WEST': 'W',
    'NORTH': 'N',
    'SOUTH': 'S',
    'NORTH EAST': 'NE',
    'NORTH WEST': 'NW',
    'SOUTH EAST': 'SE',
    'SOUTH WEST': 'SW',
}


def _normalize_directions(text: str) -> str:
    """
    Upper-case the text and abbreviate whole-word direction names using
    _DIR_MAP; two-word directions are handled before one-word ones.
    """
    normalized = text.upper()
    for direction in sorted(_DIR_MAP, key=len, reverse=True):
        whole_word = r'\b' + re.escape(direction) + r'\b'
        normalized = re.sub(whole_word, _DIR_MAP[direction], normalized)
    return normalized
# ── Master address preprocessing pipeline ──
def preprocess_address(text: str) -> str:
    """
    Master address preprocessing pipeline (PDF variation cases plus the
    landmark / PO box / abbreviation requirements).

    Steps, in execution order:
      -    clean_text: HTML/non-ASCII strip, whitespace collapse, UPPER case
      1/4. Delimiter + special character normalisation
      4.   Hyphen removal inside house numbers
      16.  Relational marker removal (s/o, w/o ...)
      NEW. Landmark phrase removal (near / nearby / landmark ...)
      9.   Roman numeral → digit
      14.  Spelled-out numbers → digit
      2.   Comprehensive abbreviation expansion (rural + urban)
      15.  Landmark synonym standardisation
      18.  P.O Box / Post Box normalisation
      17.  Directional token normalisation
      19.  Duplicate token removal
      -    Final whitespace collapse and strip

    Returns "" for non-string or empty input, or when cleaning leaves nothing.
    """
    if not isinstance(text, str) or not text:
        return ""
    t = clean_text(text)
    if not t:
        return ""
    # Order matters: each step consumes the previous step's output.
    pipeline = (
        _normalize_delimiters,              # Case 1/4
        _normalize_hyphens,                 # Case 4
        _remove_addr_relational_markers,    # Case 16
        remove_landmark_phrases,            # NEW
        roman_to_number,                    # Case 9
        _normalize_spelled_numbers,         # Case 14
        _expand_all_address_variations,     # Case 2
        _normalize_landmark_phrases,        # Case 15
        _normalize_po_box,                  # Case 18
        _normalize_directions,              # Case 17
        normalize_and_deduplicate_address,  # Case 19
    )
    for step in pipeline:
        t = step(t)
    return re.sub(r'\s+', ' ', t).strip()
# =========================================================
# LANDMARK REMOVAL (new requirement)
# =========================================================
# Keywords that introduce landmark phrases: strip everything from
# the keyword up to the next comma/delimiter.
_LANDMARK_INTRO_PATTERNS = re.compile(
    r'(?<![a-z])'  # not mid-word
    r'(near\s*to|nearbyto|near\s*by|nearby|near|landmark\s+is|landmark:|landmark)\s*',
    re.IGNORECASE
)


def remove_landmark_phrases(text: str) -> str:
    """
    Remove landmark references from address text.

    The text is split on commas. In any segment containing a landmark
    keyword, everything from the first keyword to the end of that segment
    is dropped; other segments are kept (trimmed). Surviving segments are
    re-joined with ", ".

    Examples:
        "12B Lakshmi Nagar, near Hanuman Temple, Hyderabad"
            → "12B Lakshmi Nagar, Hyderabad"
        "32 Main Road nearbyto Bus Stand Jaipur"
            → "32 Main Road"   (no comma, so the rest of the text is dropped)

    Falsy input is returned unchanged.
    """
    if not text:
        return text
    kept = []
    for segment in text.split(','):
        hit = _LANDMARK_INTRO_PATTERNS.search(segment)
        visible = segment if hit is None else segment[:hit.start()]
        kept.append(visible.strip())
    joined = ', '.join(piece for piece in kept if piece)
    return re.sub(r'\s+', ' ', joined).strip()
# =========================================================
# NAMED COMPONENT EXTRACTION (street, colony, sector, nagar …)
# =========================================================
# Keywords that introduce named locality components
_NAMED_COMPONENT_KEYWORDS = [
    'street', 'colony', 'sector', 'nagar', 'bhavan', 'bhawan',
    'layout', 'enclave', 'vihar', 'phase', 'block', 'ward',
    'galli', 'gali', 'cross', 'main', 'road', 'marg', 'lane',
    'avenue', 'circle', 'plaza', 'park', 'garden', 'gardens',
    'extension', 'extn', 'township', 'town', 'puram', 'pura',
    'nagara', 'bazaar', 'bazar', 'market',
]
_NAMED_COMP_PATTERN = re.compile(
    r'\b(' + '|'.join(re.escape(k) for k in _NAMED_COMPONENT_KEYWORDS) + r')\b',
    re.IGNORECASE
)


def extract_named_components(text: str) -> dict:
    """
    Extract named locality components (street, layout, sector ...) from an
    address.

    The text is upper-cased and processed one comma-separated segment at a
    time, so a phrase never crosses a comma. For every keyword hit the phrase
    is: up to 2 tokens before the keyword + the keyword + up to 2 tokens
    after it, all taken from the same segment.

    Returns dict with:
        'components': list of (KEYWORD, PHRASE) tuples, upper case, in text order
        'remaining':  upper-cased address with the component phrases removed,
                      surviving segments joined by ", "
    Falsy input yields {'components': [], 'remaining': text}.

    Example:
        "Plot 5, HSR Layout, Sector 7, Bengaluru"
            → components: [('LAYOUT', 'HSR LAYOUT'), ('SECTOR', 'SECTOR 7')]
              remaining:  "PLOT 5, BENGALURU"
    """
    if not text:
        return {'components': [], 'remaining': text}

    found = []
    leftovers = []
    for segment in text.upper().split(','):
        spans = []
        for m in _NAMED_COMP_PATTERN.finditer(segment):
            keyword = m.group(1)
            before = list(re.finditer(r'\S+', segment[:m.start()]))[-2:]
            after = list(re.finditer(r'\S+', segment[m.end():]))[:2]
            phrase = ' '.join(
                [tok.group(0) for tok in before] + [keyword] + [tok.group(0) for tok in after]
            )
            found.append((keyword, phrase))
            span_start = before[0].start() if before else m.start()
            span_end = m.end() + (after[-1].end() if after else 0)
            spans.append((span_start, span_end))

        # Cut the (possibly overlapping) phrase spans out of the segment.
        pieces = []
        cursor = 0
        for span_start, span_end in spans:
            if span_start > cursor:
                pieces.append(segment[cursor:span_start])
            cursor = max(cursor, span_end)
        pieces.append(segment[cursor:])
        leftover = ' '.join(' '.join(pieces).split())
        if leftover:
            leftovers.append(leftover)

    return {'components': found, 'remaining': ', '.join(leftovers)}
def compare_named_components(addr1: str, addr2: str) -> dict:
    """
    Compare named locality components between two addresses.

    Returns:
        'verdict': 'match' | 'mismatch' | 'skip' (skip = no keyword is
                   present in both addresses)
        'score_adjustment': float to add to the base address score
        'detail': one entry per shared keyword, sorted by keyword so the
                  output is reproducible between runs

    Logic:
        - For each keyword present in BOTH addresses the associated phrases
          are compared with rapidfuzz token_set_ratio:
              ratio >= 70 → match    (+5 per component)
              ratio <  70 → mismatch (-20 per component)
        - Keywords present in only one address are ignored.
        - When a keyword occurs more than once in an address, its last
          phrase is the one compared.
    """
    from rapidfuzz import fuzz as _fuzz

    phrases1 = dict(extract_named_components(addr1)['components'])
    phrases2 = dict(extract_named_components(addr2)['components'])
    # sorted(): plain set iteration order varies between interpreter runs
    shared_kws = sorted(phrases1.keys() & phrases2.keys())
    if not shared_kws:
        return {'verdict': 'skip', 'score_adjustment': 0.0, 'detail': []}

    detail = []
    score_adj = 0.0
    mismatches = 0
    for kw in shared_kws:
        sim = _fuzz.token_set_ratio(phrases1[kw], phrases2[kw])
        if sim >= 70:
            detail.append({'keyword': kw, 'result': 'match', 'score': sim})
            score_adj += 5.0
        else:
            detail.append({'keyword': kw, 'result': 'mismatch', 'score': sim})
            score_adj -= 20.0
            mismatches += 1

    verdict = 'mismatch' if mismatches > 0 else 'match'
    return {'verdict': verdict, 'score_adjustment': score_adj, 'detail': detail}
# =========================================================
# POST BOX NUMBER EXTRACTION & COMPARISON
# =========================================================
_POSTBOX_PATTERN = re.compile(
r'(?:p\.?\s*o\.?\s*box|post\s*box|postbox|p\.?b\.?\s*no\.?|pb\s*no\.?)'
r'\s*[:\-]?\s*(\d{1,6})',
re.IGNORECASE
)
def extract_postbox_number(text: str) -> str | None:
"""
Extract post box number from address text.
Returns the numeric part as string, or None if not found.
"""
if not text:
return None
m = _POSTBOX_PATTERN.search(text)
return m.group(1).strip() if m else None
def remove_postbox_from_address(text: str) -> str:
"""Remove post box reference entirely from address for remaining comparison."""
if not text:
return text
cleaned = _POSTBOX_PATTERN.sub(' ', text)
return re.sub(r'\s+', ' ', cleaned).strip()
def compare_postbox(addr1: str, addr2: str) -> dict:
"""
Extract and compare post box numbers from two addresses.
Returns:
'found': bool β€” True if PO box detected in either address
'adjustment': float
+10 if both have PO box AND numbers match
-30 if both have PO box AND numbers differ
0 if only one (or neither) has PO box (no signal either way)
"""
pb1 = extract_postbox_number(addr1)
pb2 = extract_postbox_number(addr2)
if pb1 is None and pb2 is None:
return {'found': False, 'adjustment': 0.0, 'pb1': None, 'pb2': None}
if pb1 is not None and pb2 is not None:
adj = 10.0 if pb1 == pb2 else -30.0
return {'found': True, 'adjustment': adj, 'pb1': pb1, 'pb2': pb2}
# Only one side has PO box β€” no adjustment
return {'found': True, 'adjustment': 0.0, 'pb1': pb1, 'pb2': pb2}
# =========================================================
# ENHANCED HOUSE NUMBER EXTRACTION
# =========================================================
# Priority-ordered keywords that precede a house/door number
_HNO_KEYWORD_PATTERN = re.compile(
r'\b(?:'
r'd\.?\s*no\.?|door\s*no\.?|h\.?\s*no\.?|house\s*no\.?|'
r'house\s*number|property\s*no\.?|plot\s*no\.?|'
r'flat\s*no\.?|flat\s*number|'
r'mig\s*no\.?|hig\s*no\.?|lig\s*no\.?|'
r'khata\s*no\.?|khasra\s*no\.?'
r')'
r'\s*[:\-]?\s*([A-Z0-9][A-Z0-9\-/]*)',
re.IGNORECASE
)
# "Sector N", "Ward N", "Phase N" β€” these are NOT house numbers
_NON_HNO_COMPONENT_PATTERN = re.compile(
r'\b(sector|ward|phase|block|zone|taluk|village|vill|dist|district|'
r'plot|survey|sy\.?\s*no\.?|s\.?\s*no\.?)\s*[:\-]?\s*(\d+[A-Z]?)',
re.IGNORECASE
)
def extract_house_number_v2(text: str) -> str | None:
"""
Revamped house number extraction with high priority to explicit keywords.
Priority order:
1. Explicit HNO keyword (H.No, D.No, House No, Door No, Plot No, Flat No …)
2. Leading numeric token (first token if it looks like HNO, not sector/ward)
3. Pattern match for compound numbers (12-B, 45/3, A-110)
Explicitly excludes sector numbers, ward numbers, phase numbers, block numbers
from being treated as house numbers.
Returns the extracted house number string or None.
"""
if not text:
return None
t = text.strip()
# Step 1: keyword-based extraction (highest priority)
m = _HNO_KEYWORD_PATTERN.search(t)
if m:
return m.group(1).strip().upper()
# Build set of non-HNO numbers (sector/ward/phase/block values) to exclude
non_hno_values = set()
for nm in _NON_HNO_COMPONENT_PATTERN.finditer(t):
non_hno_values.add(nm.group(2).strip().upper())
# Step 2: leading numeric heuristic
tokens = t.split()
if tokens:
first = tokens[0].upper()
# Must look like a house number (digit or letter+digit)
if re.fullmatch(r'[A-Z]?\d+[A-Z]?(?:[/\-]\d+[A-Z]?)*', first):
if first not in non_hno_values:
return first
# Step 3: compound number pattern anywhere in text
compound_patterns = [
r'\b(\d+[A-Z]?/\d+[A-Z]?)\b', # 45/3, 45/3A
r'\b(\d+-\d+[A-Z]?)\b', # 12-3B
r'\b([A-Z]-\d+[A-Z]?)\b', # A-110
r'\b(\d+[A-Z])\b', # 12B
r'\b(\d{1,4})\b', # plain number
]
for pat in compound_patterns:
for m in re.finditer(pat, t, re.IGNORECASE):
val = m.group(1).strip().upper()
if val not in non_hno_values:
# Additional check: not preceded by sector/ward/phase keyword
before = t[:m.start()].upper()
if not re.search(r'\b(sector|ward|phase|block|zone)\s*$', before):
return val
return None
def compare_house_numbers(addr1: str, addr2: str) -> dict:
    """
    Extract and compare the house numbers of two addresses.

    Each number comes from extract_house_number_v2 and is reduced to its
    upper-case alphanumeric characters before comparison (144/143 → 144143).

    Returns:
        'h1', 'h2': normalised house numbers, or None when not found
        'verdict': 'match' | 'mismatch' | 'missing'
        'score_adjustment': float
            -30 if both are present and differ
              0 if they match; the +30 boost is deferred to the caller,
                which applies it only when base_score > 50
              0 if one or both are absent
    """
    def _alnum_only(raw):
        return re.sub(r'[^A-Z0-9]', '', raw.upper()) if raw else None

    h1 = _alnum_only(extract_house_number_v2(addr1))
    h2 = _alnum_only(extract_house_number_v2(addr2))

    outcome = {'h1': h1, 'h2': h2, 'verdict': 'missing', 'score_adjustment': 0.0}
    if h1 is None or h2 is None:
        return outcome
    if h1 == h2:
        outcome['verdict'] = 'match'
    else:
        outcome['verdict'] = 'mismatch'
        outcome['score_adjustment'] = -30.0
    return outcome
# =========================================================
# EXPANDED INDIAN ADDRESS ADMINISTRATIVE VARIATIONS
# =========================================================
# Comprehensive dictionary of rural + urban address abbreviations / variations
# with their standard canonical expansions.
_INDIAN_ADDR_VARIATIONS: dict[str, str] = {
    # ── Road / Street ──
    'RD': 'ROAD',
    'STR': 'STREET',
    'ST': 'STREET',
    'AVE': 'AVENUE',
    'MRG': 'MARG',
    'LN': 'LANE',
    'BLVD': 'BOULEVARD',
    'CIR': 'CIRCLE',
    'CR': 'CROSS',
    'CROSS RD': 'CROSS ROAD',
    'X RD': 'CROSS ROAD',
    # ── Locality ──
    'NGR': 'NAGAR',
    'NGRS': 'NAGAR',
    'NAGARA': 'NAGAR',
    'LYT': 'LAYOUT',
    'LOUT': 'LAYOUT',
    'CLY': 'COLONY',
    'CLNY': 'COLONY',
    'COL': 'COLONY',
    'EXT': 'EXTENSION',
    'EXTN': 'EXTENSION',
    'ENCL': 'ENCLAVE',
    'VIHAR': 'VIHAR',  # identity entry; the variant follows
    'VIHARA': 'VIHAR',
    'PURA': 'PURAM',
    'PORA': 'PURAM',
    # ── Directions ──
    'N': 'NORTH',
    'S': 'SOUTH',
    'E': 'EAST',
    'W': 'WEST',
    'NE': 'NORTH EAST',
    'NW': 'NORTH WEST',
    'SE': 'SOUTH EAST',
    'SW': 'SOUTH WEST',
    # ── Administrative (urban) ──
    'SECT': 'SECTOR',
    'SEC': 'SECTOR',
    'SCT': 'SECTOR',
    'BLK': 'BLOCK',
    'BK': 'BLOCK',
    'PH': 'PHASE',
    'PHZ': 'PHASE',
    'APT': 'APARTMENT',
    'APTS': 'APARTMENTS',
    'BLDG': 'BUILDING',
    'BLDGS': 'BUILDINGS',
    'FLR': 'FLOOR',
    'FL': 'FLOOR',
    'OPP': 'OPPOSITE',
    'NR': 'NEAR',
    'ADJ': 'ADJACENT',
    'JN': 'JUNCTION',
    'JCT': 'JUNCTION',
    'STA': 'STATION',
    'STN': 'STATION',
    # ── Administrative (rural) ──
    'VLG': 'VILLAGE',
    'VIL': 'VILLAGE',
    'VILL': 'VILLAGE',
    'VG': 'VILLAGE',
    'GRMA': 'GRAMA',
    'GM': 'GRAMA',
    'PANCHAYAT': 'PANCHAYAT',
    'DIST': 'DISTRICT',
    'DST': 'DISTRICT',
    'DT': 'DISTRICT',
    'ZILLA': 'DISTRICT',
    'JILLA': 'DISTRICT',
    'ZILA': 'DISTRICT',
    'TAL': 'TALUK',
    'TQ': 'TALUK',
    'TALUKA': 'TALUK',
    'TEH': 'TEHSIL',
    'TEHS': 'TEHSIL',
    'MANDAL': 'MANDAL',
    'MD': 'MANDAL',
    'POST': 'POST',
    'PO': 'POST OFFICE',
    'HOBLI': 'HOBLI',
    'HBL': 'HOBLI',
    'REV': 'REVENUE',
    'REV VILLAGE': 'REVENUE VILLAGE',
    'SY NO': 'SURVEY NUMBER',
    'SY. NO': 'SURVEY NUMBER',
    'KHASRA': 'KHASRA',
    'KHATA': 'KHATA',
    # ── Post box ──
    'PB': 'POST BOX',
    'PO BOX': 'POST BOX',
    'P.O BOX': 'POST BOX',
    'P.O. BOX': 'POST BOX',
    # ── State abbreviations (the file also imports STATE_MAPPING) ──
    'AP': 'ANDHRA PRADESH',
    'TS': 'TELANGANA',
    'KA': 'KARNATAKA',
    'TN': 'TAMIL NADU',
    'MH': 'MAHARASHTRA',
    'GJ': 'GUJARAT',
    'RJ': 'RAJASTHAN',
    'UP': 'UTTAR PRADESH',
    'MP': 'MADHYA PRADESH',
    'WB': 'WEST BENGAL',
    'OR': 'ODISHA',
    'OD': 'ODISHA',
}


def _expand_all_address_variations(text: str) -> str:
    """
    Expand Indian address administrative variations (rural + urban) using
    _INDIAN_ADDR_VARIATIONS.

    The text is upper-cased and split on whitespace. Every lookup is done
    on the dot-free form, so "SY. NO" and "P.O. BOX" reach the 'SY NO' and
    'PO BOX' entries. A two-token phrase is tried before the single token;
    unknown tokens are kept exactly as written.
    """
    words = text.upper().split()
    count = len(words)
    out = []
    pos = 0
    while pos < count:
        if pos + 1 < count:
            pair_key = (words[pos] + ' ' + words[pos + 1]).replace('.', '')
            if pair_key in _INDIAN_ADDR_VARIATIONS:
                out.append(_INDIAN_ADDR_VARIATIONS[pair_key])
                pos += 2
                continue
        word = words[pos]
        out.append(_INDIAN_ADDR_VARIATIONS.get(word.replace('.', ''), word))
        pos += 1
    return ' '.join(out)
# ===========================================================
# NAME VARIATION HANDLERS (all 14 PDF cases)
# ===========================================================
# ── Case 7A: Religious/cultural prefix abbreviation map ──
_NAME_PREFIX_EXPANSION = {
    # Mohammed variants
    'MD': 'MOHAMMED',
    'MOHD': 'MOHAMMED',
    'MHD': 'MOHAMMED',
    'MUHAMMAD': 'MOHAMMED',
    'MOHAMAD': 'MOHAMMED',
    'MOHHAMED': 'MOHAMMED',
    'MUHAMED': 'MOHAMMED',
    'MUHAMMED': 'MOHAMMED',
    'MOHAMMD': 'MOHAMMED',
    # Sheikh / Shaikh variants (Case 7A: sk → sheikh)
    'SK': 'SHEIKH',
    'SHK': 'SHEIKH',
    'SHAIKH': 'SHEIKH',
    'SHEKH': 'SHEIKH',
    'SHIEKH': 'SHEIKH',
    'SHEIK': 'SHEIKH',
    'SHEK': 'SHEIKH',
    'SAIKH': 'SHEIKH',
    # Abdul variants
    'ABD': 'ABDUL',
    'ABDL': 'ABDUL',
    'ABDU': 'ABDUL',
    # Syed / Saiyed variants
    'SYD': 'SYED',
    'SYE': 'SYED',
    'SAIYAD': 'SYED',
    'SAIYED': 'SYED',
    'SAYYED': 'SYED',
    'SAYYAD': 'SYED',
    # Kumari / Km variants
    'KUM': 'KUMARI',
    'KM': 'KUMARI',
    # Chaudhary variants
    'CH': 'CHAUDHARY',
    'CHD': 'CHAUDHARY',
    'CHOUDHARY': 'CHAUDHARY',
    'CHOWDHARY': 'CHAUDHARY',
    'CHOWDARY': 'CHAUDHARY',
    # Bala variants
    'BAL': 'BALA',
    # Ranga variants
    'RNG': 'RANGA',
}


def _expand_name_prefix_abbreviations(text: str) -> str:
    """
    Expand religious/cultural name abbreviations using _NAME_PREFIX_EXPANSION.

    The text is upper-cased and every token, wherever it sits in the name,
    is looked up with its trailing dots removed ("MD." → "MOHAMMED").
    Tokens without an entry are kept, including any trailing dot.
    """
    return ' '.join(
        _NAME_PREFIX_EXPANSION.get(word.rstrip('.').upper(), word)
        for word in text.upper().split()
    )
# ── Case 8: Special characters in names ──
def _remove_name_special_chars(text: str) -> str:
    """
    Replace hyphens, slashes and the listed punctuation characters in a
    name with spaces, then collapse whitespace and trim. Dots and commas
    are not in the set and are kept.
    """
    spaced = re.sub(r'[-/\\@$%^&*\(\)\[\]\{\};:\'"<>?!]', ' ', text)
    collapsed = re.sub(r'\s+', ' ', spaced)
    return collapsed.strip()
# ── Case 10: Organisation suffix removal ──
# Organisation words that are not part of a person's name
_ORG_SUFFIXES = re.compile(
    r'\b(AND\s+SONS?|ENTERPRISES?|TRADERS?|INDUSTRIES|LTD|PVT\.?\s*LTD|'
    r'LIMITED|CORP|CORPORATION|INC|LLC|CO\.?\s*LTD|COMPANY|ASSOCIATES?|'
    r'BROTHERS?|BROS?|AGENCIES?)\b',
    re.IGNORECASE
)


def _remove_org_suffixes(text: str) -> str:
    """
    Replace every organisation word matched by _ORG_SUFFIXES (wherever it
    occurs in the text) with a space, then collapse whitespace and trim.
    """
    without_suffixes = _ORG_SUFFIXES.sub(' ', text)
    collapsed = re.sub(r'\s+', ' ', without_suffixes)
    return collapsed.strip()
# ── Case 1 (name): merged token split helper ──
# e.g. "DIGVIJAYSINGH" β†’ "DIGVIJAY SINGH"
# We rely on fuzzy/phonetic similarity rather than a hard split,
# but we add a camel-case splitter as a best-effort normaliser.
def _split_merged_tokens(text: str) -> str:
    """
    Best-effort split of merged CamelCase tokens.

    Only tokens longer than 12 characters are touched; in those a space is
    inserted wherever a lower-case letter is directly followed by an
    upper-case one ('DigvijaySinghRathore' → 'Digvijay Singh Rathore').
    Tokens written in a single case have no such boundary and pass through
    unchanged.
    """
    return ' '.join(
        re.sub(r'([a-z])([A-Z])', r'\1 \2', word) if len(word) > 12 else word
        for word in text.split()
    )
# ── Case 13: Relational name markers ──
# Already handled by remove_relational_prefixes in original code.
# Ensure it is called in preprocess_name (it is).
# ── Case 11: Name with DOB / extra numeric content ──
# Already handled by remove_non_alpha_trailing in original code.
# ── Enhanced preprocess_name ──
def enhanced_preprocess_name(text: str) -> str:
    """
    Extended name preprocessing pipeline: the base steps plus the extra
    variation handlers (prefix expansion, special characters, organisation
    suffixes, merged tokens).

    Returns "" for non-string or empty input, or when cleaning leaves
    nothing; otherwise the processed name, stripped.
    """
    if not isinstance(text, str) or not text:
        return ""
    t = clean_text(text)
    if not t:
        return ""
    # Order matters: each step consumes the previous step's output.
    steps = (
        collapse_repeated_chars,            # Case 6  – typo / repeated chars
        remove_relational_prefixes,         # Case 13 – w/o, s/o
        remove_non_alpha_trailing,          # Case 11 – dates/numbers
        _remove_name_special_chars,         # Case 8  – hyphens/punctuation
        remove_name_titles,                 # Case 7  – Dr, Mr, Shri ...
        _expand_name_prefix_abbreviations,  # Case 7A – Md → Mohammed
        _remove_org_suffixes,               # Case 10 – and Sons, Ltd
        _split_merged_tokens,               # Case 1  – merged tokens
        deduplicate_tokens,                 # dedup
        standardize_name_variations,        # CSV variation map
    )
    for step in steps:
        t = step(t)
    return t.strip()
def preprocess_name(text):
    """
    Name preprocessing pipeline used before embedding-model matching.

    Steps, applied in this order:
        1. clean_text (basic cleanup and case normalisation)
        2. collapse repeated characters (typo fix, e.g. "mohammmed")
        3. remove relational prefixes (s/o, d/o, w/o ...), keeping the names
        4. remove non-alpha content (dates, numbers embedded in names)
        5. remove title prefixes/suffixes (Dr, Mr, Shri ...)
        6. drop duplicate tokens
        7. standardise name variations from the CSV map

    Returns "" for empty or non-string input, or when clean_text leaves
    nothing behind.

    The extra variation handlers (_expand_name_prefix_abbreviations,
    _remove_name_special_chars, _remove_org_suffixes, _split_merged_tokens
    followed by a second deduplicate_tokens pass) are disabled in this
    pipeline; enhanced_preprocess_name applies them.
    """
    if not text or not isinstance(text, str):
        return ""

    name = clean_text(text)
    if not name:
        return ""

    for step in (
        collapse_repeated_chars,
        remove_relational_prefixes,
        remove_non_alpha_trailing,
        remove_name_titles,
        deduplicate_tokens,
        standardize_name_variations,
    ):
        name = step(name)

    return name.strip()
# =========================================================
# SURNAME DETECTION AND INITIAL LETTER MATCHING
# =========================================================
def detect_surnames(text):
    """
    Return the set of whitespace-separated tokens of `text` (upper-cased)
    that appear in the surname list loaded from sur_comm_names_df.

    The lookup set is built on the first call that reaches it and memoised on
    the function object as `_surname_set`.  Values come from the
    'surname_community_extension' column when it exists, otherwise from the
    last column of the frame; nulls are dropped and each value is stripped
    and upper-cased.

    Returns an empty set when `text` is falsy or the surname frame is empty.
    """
    if not text or sur_comm_names_df.empty:
        return set()

    known = getattr(detect_surnames, '_surname_set', None)
    if known is None:
        preferred = 'surname_community_extension'
        columns = sur_comm_names_df.columns
        column = preferred if preferred in columns else columns[-1]
        known = {
            str(value).strip().upper()
            for value in sur_comm_names_df[column].dropna()
        }
        detect_surnames._surname_set = known

    return known.intersection(text.upper().split())
# def compute_initial_letter_boost(name1_tokens, name2_tokens):
# """
# Case 3A: Multi-initial matching.
# After token sorting, checks whether every single-char initial in one name
# corresponds (by first letter) to a full-word token in the other name.
# Logic (applied after alphabetical sort):
# 1. Find common full-word tokens (exact match) between both names.
# 2. From remaining tokens:
# - side A: collect single-char initials → initial_set
# - side B: collect full words → full_words
# 3. For every initial in initial_set, check if a full word in full_words
# starts with that letter (one-to-one pairing, each word used once).
# 4. If ALL initials are matched → return 0.2 (boost).
# If ANY initial has NO match → return -0.2 (mismatch penalty).
# If no initials on either side → return 0.0 (no signal).
# Examples:
# ["k","v","reddy"] vs ["katta","venkata","reddy"]:
# common={"reddy"}, initials={"k","v"}, full={"katta","venkata"}
# k→katta ✓, v→venkata ✓ → +0.2
# ["k","v","reddy"] vs ["krishna","mohan","reddy"]:
# common={"reddy"}, initials={"k","v"}, full={"krishna","mohan"}
# k→krishna ✓, v→? no word starts with v → -0.2 (mismatch)
# """
# if not name1_tokens or not name2_tokens:
# return 0.0
# set1 = set(name1_tokens)
# set2 = set(name2_tokens)
# common = set1 & set2
# rem1 = [t for t in name1_tokens if t not in common]
# rem2 = [t for t in name2_tokens if t not in common]
# if not rem1 and not rem2:
# return 0.0
# # Identify which side has initials (single-char tokens)
# initials1 = [t for t in rem1 if len(t) == 1]
# initials2 = [t for t in rem2 if len(t) == 1]
# full1 = [t for t in rem1 if len(t) > 1]
# full2 = [t for t in rem2 if len(t) > 1]
# def _match_initials_to_full(initials, full_words):
# """
# Try to pair each initial to a distinct full word starting with that letter.
# Returns True if all initials matched, False if any unmatched.
# """
# available = list(full_words) # copy so we can consume
# for init in initials:
# matched = False
# for i, word in enumerate(available):
# if word and word[0].upper() == init.upper():
# available.pop(i)
# matched = True
# break
# if not matched:
# return False
# return True
# # Case: side 1 has initials, side 2 has full words
# if initials1 and full2:
# if _match_initials_to_full(initials1, full2):
# return 0.2 # all initials matched
# else:
# return -0.2 # at least one initial did NOT match → mismatch signal
# # Case: side 2 has initials, side 1 has full words
# if initials2 and full1:
# if _match_initials_to_full(initials2, full1):
# return 0.2
# else:
# return -0.2
# # Both sides have initials (e.g. "K V Reddy" vs "K M Reddy")
# # Compare initials sets directly
# if initials1 and initials2:
# init_set1 = {t.upper() for t in initials1}
# init_set2 = {t.upper() for t in initials2}
# if init_set1 == init_set2:
# return 0.2
# else:
# return -0.2 # initials differ → mismatch
# return 0.0
def compute_initial_letter_boost(name1_tokens, name2_tokens):
    """
    Score boost for names where one side abbreviates tokens to initials.

    Tokens shared by both names are removed.  From what is left, the side with
    fewer tokens is treated as the "initials" side: its single-character
    tokens must all occur among the first letters of the other side's tokens.
    When both remainders have the same length, both orientations are tried so
    the result does not depend on argument order.

    Args:
        name1_tokens: list of tokens of the first name.
        name2_tokens: list of tokens of the second name.

    Returns:
        0.2 when the initials are covered, otherwise 0.0 (also for empty
        input, identical names, or when the candidate side has no
        single-character token).

    Example:
        ["k", "v", "reddy"] vs ["krishna", "venkata", "reddy"]
        common: {"reddy"}; initials {"k", "v"} vs first letters {"k", "v"}
        -> 0.2 (in either argument order).
    """
    if not name1_tokens or not name2_tokens:
        return 0.0

    common = set(name1_tokens) & set(name2_tokens)
    rem1 = [t for t in name1_tokens if t not in common]
    rem2 = [t for t in name2_tokens if t not in common]
    if not rem1 or not rem2:
        return 0.0

    def _initials_covered(initial_side, full_side):
        """True if initial_side has initials and each starts a full_side token."""
        initials = {t[0] for t in initial_side if len(t) == 1}
        if not initials:
            return False
        return initials.issubset({t[0] for t in full_side if t})

    if len(rem1) < len(rem2):
        orientations = [(rem1, rem2)]
    elif len(rem2) < len(rem1):
        orientations = [(rem2, rem1)]
    else:
        # Equal length: neither side is "shorter", so check both directions.
        # Previously only rem1 was examined, which made the boost asymmetric.
        orientations = [(rem1, rem2), (rem2, rem1)]

    if any(_initials_covered(short, long_) for short, long_ in orientations):
        return 0.2
    return 0.0
def replace_with_standard(string_value, df=None):
    """
    Replace a string with its standard value from a variation table.

    Matching is case-insensitive and tried in this order; the first hit wins:
        1. exact match against a VARIATION entry;
        2. a VARIATION entry occurs inside the input on word boundaries
           (e.g. "TRIVANDRUM" inside "TRIVANDRUM KERALA");
        3. the input occurs inside a VARIATION entry on word boundaries
           (e.g. input "TRIVANDRUM" vs. variation "TRIVANDRUM KERALA").

    Args:
        string_value: String to look up.
        df: DataFrame with 'VARIATION' and 'STANDARD' columns.  When None or
            empty, the module-level name_variation_df is used.

    Returns:
        The STANDARD value of the first matching row, otherwise the original
        `string_value` unchanged.  Non-string and blank inputs are returned
        unchanged.
    """
    source_df = df if df is not None and not df.empty else name_variation_df
    if source_df.empty:
        return string_value
    if not isinstance(string_value, str):
        return string_value

    string_upper = string_value.strip().upper()
    if not string_upper:
        # A blank key would build the pattern r'\b\b', which matches any
        # variation and would return the first row's STANDARD.
        return string_value

    # Missing cells become '' so they are skipped instead of reaching
    # re.escape as NaN (which raises TypeError).
    variations = (
        source_df['VARIATION'].fillna('').astype(str).str.strip().str.upper()
    )
    standards = source_df['STANDARD']

    # 1. Exact match first (fastest, most precise).
    exact_mask = variations == string_upper
    if exact_mask.any():
        return standards[exact_mask].iloc[0]

    candidates = [(var, std) for var, std in zip(variations, standards) if var]

    # 2. A variation appears inside the input.  Word boundaries avoid partial
    #    word hits such as "PUNE" inside "IMPUNE".
    for variation, standard in candidates:
        if re.search(r'\b' + re.escape(variation) + r'\b', string_upper):
            return standard

    # 3. The input appears inside a variation.  The pattern depends only on
    #    the input, so it is compiled once.
    input_pattern = re.compile(r'\b' + re.escape(string_upper) + r'\b')
    for variation, standard in candidates:
        if input_pattern.search(variation):
            return standard

    return string_value
def lookup_from_mapping(value, mapping_dict):
    """
    Resolve a value to its standard key in a {standard: [variations]} mapping.

    Example mapping: {"BENGALURU": ["BANGALORE", "BENGALURU"]}

    Matching is case-insensitive and tried in this order, each stage scanning
    the whole mapping before the next one starts:
        1. Exact key match:      "BENGALURU" -> "BENGALURU"
        2. Exact variation:      "BANGALORE" -> "BENGALURU"
        3. Variation in input:   "BANGALORE KARNATAKA" -> "BENGALURU"
        4. Input in variation:   "BANGAL" vs. "BANGAL URBAN" -> "BENGALURU"
    Stages 3 and 4 match on word boundaries only.

    Args:
        value: Value to look up (converted with str()).
        mapping_dict: Mapping of standard name to a list/tuple/set of
            variation strings.  Entries whose value is not such a collection
            are ignored, as are non-string or blank variations.

    Returns:
        The matching key (as stored in the mapping; for stage 1 the
        upper-cased input), or None when nothing matches or the input is
        empty/blank.
    """
    if not value or not mapping_dict:
        return None

    value_upper = str(value).strip().upper()
    if not value_upper:
        # Whitespace-only input would build r'\b\b' in stage 4 and match
        # the first variation of the first key.
        return None

    # 1. Exact key match
    if value_upper in mapping_dict:
        return value_upper

    # Normalise every variation once; drop blanks, which would otherwise
    # produce r'\b\b' in stage 3 and match any input.
    normalized = []
    for standard, variations in mapping_dict.items():
        if not isinstance(variations, (list, tuple, set, frozenset)):
            continue
        cleaned = [v.strip().upper() for v in variations if isinstance(v, str)]
        normalized.append((standard, [v for v in cleaned if v]))

    # 2. Exact variation match
    for standard, variants in normalized:
        if value_upper in variants:
            return standard

    # 3. Variation-in-input
    for standard, variants in normalized:
        for variant in variants:
            if re.search(r'\b' + re.escape(variant) + r'\b', value_upper):
                return standard

    # 4. Input-in-variation (pattern depends only on the input: compile once)
    value_pattern = re.compile(r'\b' + re.escape(value_upper) + r'\b')
    for standard, variants in normalized:
        if any(value_pattern.search(variant) for variant in variants):
            return standard

    return None
# =========================================================
# PINCODE SIMILARITY FUNCTION
# =========================================================
# ---- Configuration for pincode_similarity_india (built once at import) ----

# Placeholder strings treated as "no pincode supplied" (compared upper-cased).
_PINSIM_MISSING_TOKENS = frozenset({"", "-", "NA", "N/A", "NULL"})

# Geo fields reported as unknown in every early-exit result.
_PINSIM_NO_GEO = {
    "pin1_county_name": None,
    "pin2_county_name": None,
    "pin1_state_name": None,
    "pin2_state_name": None,
}

# Major metro city prefixes (first 3 digits of the PIN) -> display name.
_PINSIM_METRO_NAMES = {
    "110": "Delhi", "400": "Mumbai", "560": "Bengaluru",
    "600": "Chennai", "500": "Hyderabad", "700": "Kolkata",
    "411": "Pune", "380": "Ahmedabad",
}

# Extended metro regions (satellite cities, suburbs): name -> prefixes.
_PINSIM_EXTENDED_METROS = (
    ("Delhi NCR", frozenset({"110", "201", "122", "121", "124"})),  # Delhi, Noida, Gurgaon, Faridabad, Ghaziabad
    ("Mumbai Metropolitan Region", frozenset({"400", "421", "410"})),  # Mumbai, Thane, Navi Mumbai
    ("Hyderabad Metro", frozenset({"500", "501"})),                    # Hyderabad, Secunderabad
    ("Bengaluru Metro", frozenset({"560", "562"})),                    # Bengaluru, Whitefield, Electronic City
    ("Chennai Metro", frozenset({"600", "601", "603"})),               # Chennai, Kanchipuram, Chengalpattu
    ("Kolkata Metro", frozenset({"700", "711", "712"})),               # Kolkata, Howrah, Hooghly
)

# Distance thresholds for metro areas (km).
_PINSIM_METRO_THRESHOLDS = {
    "same_locality": 8,     # Very close neighbourhoods
    "nearby": 15,           # Adjacent areas/suburbs
    "same_metro": 35,       # Within metro limits
    "extended_metro": 60,   # Extended metro region
}

# Distance thresholds for non-metro areas (km).
_PINSIM_NON_METRO_THRESHOLDS = {
    "same_locality": 5,     # Same town/village cluster
    "nearby": 12,           # Adjacent towns
    "same_district": 40,    # Within district (approximate)
}


def _pinsim_is_missing(pin):
    """True when the pin is None or one of the known placeholder strings."""
    return pin is None or str(pin).strip().upper() in _PINSIM_MISSING_TOKENS


def _pinsim_rejection(pin1, pin2, reason):
    """Build the standard "no match, score 0" result carrying `reason`."""
    return {
        "match": False,
        "similarity_score": 0,
        "reason": reason,
        "pin1": pin1,
        "pin2": pin2,
        **_PINSIM_NO_GEO,
    }


def _pinsim_geo_field(row, attr):
    """Return row.<attr>, mapping absent / None / NaN values to None."""
    value = getattr(row, attr, None)
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return value


def _pinsim_nominatim():
    """Return the shared pgeocode instance or raise if it is unavailable."""
    nomi = _get_pgeocode_inst()
    if nomi is None:
        raise RuntimeError("pgeocode is not available")
    return nomi


def _pinsim_haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two lat/lon points (Haversine)."""
    earth_radius_km = 6371
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(math.radians(lat1))
        * math.cos(math.radians(lat2))
        * math.sin(dlon / 2) ** 2
    )
    # Clamp: rounding can push sqrt(a) a hair above 1.0, outside asin's domain.
    return earth_radius_km * 2 * math.asin(min(1.0, math.sqrt(a)))


def _pinsim_score(distance, is_metro, is_extended_metro, one_is_metro, same_state):
    """Map a distance and the area classification to a 0-100 similarity score."""
    if is_metro:
        limits = _PINSIM_METRO_THRESHOLDS
        if distance <= limits["same_locality"]:
            return 95
        if distance <= limits["nearby"]:
            return 85
        if distance <= limits["same_metro"]:
            return 70
        if is_extended_metro and distance <= limits["extended_metro"]:
            return 60
        return 35
    if one_is_metro:
        # Exactly one side is a metro prefix (or two different metros).
        return 50 if distance <= 20 else 25
    limits = _PINSIM_NON_METRO_THRESHOLDS
    if distance <= limits["same_locality"]:
        return 92
    if distance <= limits["nearby"]:
        return 75
    if distance <= limits["same_district"]:
        return 55
    if same_state and distance <= 100:
        return 40
    return 20


def pincode_similarity_india(pin1, pin2):
    """
    Calculate similarity between two Indian pincodes based on geographic
    distance and metro/non-metro classification.

    Args:
        pin1: First pincode (string or int).
        pin2: Second pincode (string or int).

    Returns:
        dict that always contains "match", "similarity_score", "pin1",
        "pin2" and the four "pinN_county_name" / "pinN_state_name" fields.
        Depending on the path taken it also carries:
          * missing input: "distance_km" (None), "area_type", "reason";
            "similarity_score" is None;
          * identical pins: score 100, "distance_km" 0.0, "area_type", and,
            when the geocoding lookup worked, prefix/metro/location fields;
          * validation or geocoding failure: "reason", score 0;
          * scored pair: "distance_km", "area_type", "is_metro_logic",
            "is_extended_metro"; "match" is True for scores >= 60.

    Geocoding uses the module's shared pgeocode instance
    (_get_pgeocode_inst) instead of loading the dataset on every call.
    """
    if _pinsim_is_missing(pin1) or _pinsim_is_missing(pin2):
        return {
            "match": False,
            "similarity_score": None,
            "distance_km": None,
            "area_type": "Missing pincode",
            "reason": "One or both pincodes are null / empty / placeholder",
            "pin1": pin1,
            "pin2": pin2,
            **_PINSIM_NO_GEO,
        }

    # ========== INPUT NORMALIZATION ==========
    try:
        pin1 = str(pin1).strip().zfill(6)
        pin2 = str(pin2).strip().zfill(6)
    except (ValueError, AttributeError):
        return _pinsim_rejection(
            pin1, pin2, "Invalid pincode format - cannot convert to string"
        )

    # ========== HARD SHORT-CIRCUIT: EXACT SAME PIN ==========
    # NOTE: runs before format validation, so two identical but malformed
    # values still count as a match (original behaviour, kept deliberately).
    if pin1 == pin2:
        try:
            row = _pinsim_nominatim().query_postal_code(pin1)
            county_name = _pinsim_geo_field(row, "county_name")
            state_name = _pinsim_geo_field(row, "state_name")
        except Exception:
            # Enrichment is best-effort; the match itself does not depend on it.
            return {
                "match": True,
                "similarity_score": 100,
                "distance_km": 0.0,
                "area_type": "Exact same pincode",
                "pin1": pin1,
                "pin2": pin2,
                **_PINSIM_NO_GEO,
            }
        return {
            "match": True,
            "similarity_score": 100,
            "distance_km": 0.0,
            "area_type": "Exact same pincode",
            "is_metro_logic": None,
            "is_extended_metro": None,
            "metro_cluster": None,
            "pin1_prefix": pin1[:3],
            "pin2_prefix": pin2[:3],
            "pin1": pin1,
            "pin2": pin2,
            "pin1_county_name": county_name,
            "pin2_county_name": county_name,
            "pin1_state_name": state_name,
            "pin2_state_name": state_name,
            "pin1_location": None,
            "pin2_location": None,
        }

    # ========== FORMAT VALIDATION ==========
    if len(pin1) != 6 or len(pin2) != 6:
        return _pinsim_rejection(
            pin1, pin2,
            f"Invalid pincode length (pin1: {len(pin1)}, pin2: {len(pin2)})",
        )
    if not pin1.isdigit() or not pin2.isdigit():
        return _pinsim_rejection(pin1, pin2, "Pincode must contain only digits")
    if not all(110001 <= int(pin) <= 855117 for pin in (pin1, pin2)):
        return _pinsim_rejection(
            pin1, pin2, "Pincode outside valid Indian range (110001-855117)"
        )

    # ========== GEOCODING LOOKUP ==========
    try:
        nomi = _pinsim_nominatim()
        p1 = nomi.query_postal_code(pin1)
        p2 = nomi.query_postal_code(pin2)
    except Exception as e:
        return _pinsim_rejection(pin1, pin2, f"Geocoding service error: {str(e)}")

    if p1 is None or p2 is None:
        return _pinsim_rejection(pin1, pin2, "Geocoding returned None")

    coordinates = (p1.latitude, p1.longitude, p2.latitude, p2.longitude)
    # Longitudes are checked too: a NaN longitude would otherwise turn the
    # distance into NaN and silently fall through to the lowest score.
    if any(c is None or math.isnan(c) for c in coordinates):
        return _pinsim_rejection(
            pin1, pin2, "Pincode not found in geocoding database"
        )

    # ========== CITY / STATE FROM GEOCODING ==========
    pin1_county_name = _pinsim_geo_field(p1, "county_name")
    pin2_county_name = _pinsim_geo_field(p2, "county_name")
    pin1_state_name = _pinsim_geo_field(p1, "state_name")
    pin2_state_name = _pinsim_geo_field(p2, "state_name")

    distance = _pinsim_haversine_km(*coordinates)

    # ========== METRO CLASSIFICATION ==========
    prefix1, prefix2 = pin1[:3], pin2[:3]
    metro_cluster_name = None
    is_extended_metro = False
    for cluster_name, prefixes in _PINSIM_EXTENDED_METROS:
        if prefix1 in prefixes and prefix2 in prefixes:
            is_extended_metro = True
            metro_cluster_name = cluster_name
            break

    is_metro = is_extended_metro
    if not is_metro and prefix1 == prefix2 and prefix1 in _PINSIM_METRO_NAMES:
        # Metros without an extended cluster (Pune, Ahmedabad).
        is_metro = True
        metro_cluster_name = _PINSIM_METRO_NAMES[prefix1]

    one_is_metro = prefix1 in _PINSIM_METRO_NAMES or prefix2 in _PINSIM_METRO_NAMES
    same_state = pin1_state_name is not None and pin1_state_name == pin2_state_name

    score = _pinsim_score(
        distance, is_metro, is_extended_metro, one_is_metro, same_state
    )

    return {
        "match": score >= 60,
        "similarity_score": score,
        "distance_km": distance,
        "pin1": pin1,
        "pin2": pin2,
        "pin1_county_name": pin1_county_name,
        "pin2_county_name": pin2_county_name,
        "pin1_state_name": pin1_state_name,
        "pin2_state_name": pin2_state_name,
        "area_type": metro_cluster_name if is_metro else "Non-metro",
        "is_metro_logic": is_metro,
        "is_extended_metro": is_extended_metro,
    }
# =========================================================
# NORMALIZATION & PREPROCESSING
# =========================================================
def preprocess_text(text):
    """Trim both ends and squeeze every whitespace run to a single space.

    Returns "" for falsy input (None, empty string).
    """
    if text:
        return re.sub(r"\s+", " ", text.strip())
    return ""
def normalize_text(text):
    """Upper-case the text, trim it and collapse whitespace runs to one space.

    Returns "" for falsy input (None, empty string).
    """
    if not text:
        return ""
    trimmed_upper = text.upper().strip()
    return re.sub(r"\s+", " ", trimmed_upper)
# =========================================================
# VALIDATION FUNCTIONS
# =========================================================
def validate_and_normalize_pincode(pincode):
    """
    Validate and normalize a pincode to exactly 6 digits.

    Non-digit characters (spaces, hyphens, labels) are discarded before the
    length check.  A purely numeric value with a zero fractional part, such
    as the float 560001.0 that pandas produces for integer columns containing
    gaps, is read as the integer it represents instead of gaining an extra
    digit.

    Args:
        pincode: Raw pincode (str, int or float).

    Returns:
        The 6-digit pincode string, or None if the input is empty or does not
        contain exactly 6 digits.
    """
    if not pincode:
        return None

    raw = str(pincode).strip()
    # "560001.0" -> "560001"; without this the trailing 0 is counted as a
    # seventh digit (and "56000.0" would wrongly become "560000").
    raw = re.sub(r'^(\d+)\.0+$', r'\1', raw)

    digits = re.sub(r'\D', '', raw)
    return digits if len(digits) == 6 else None
def validate_and_normalize_phone(phone):
    """
    Validate and normalize a phone number to exactly 10 digits.

    Handles the formats +91, 91-, 91, a leading trunk 0, or plain 10 digits,
    with arbitrary separators.

    The country code "91" is only removed while more than 10 digits are
    present.  Stripping it unconditionally rejected every genuine 10-digit
    number that itself begins with 91 (e.g. 9123456789, with or without a
    +91 prefix).

    Args:
        phone: Raw phone number (str or int).

    Returns:
        The 10-digit number as a string, or None if the input is empty or
        does not reduce to exactly 10 digits.
    """
    if not phone:
        return None

    digits = re.sub(r'\D', '', str(phone))

    # Country code: only a prefix when there are surplus digits.
    while len(digits) > 10 and digits.startswith('91'):
        digits = digits[2:]

    # Trunk prefix.
    if digits.startswith('0'):
        digits = digits[1:]

    return digits if len(digits) == 10 else None
def validate_and_normalize_email(email):
    """
    Validate an e-mail address with a regex and normalize it.

    The value is converted with str(), trimmed and upper-cased before it is
    checked; the upper-cased form is what gets returned.

    Returns:
        The normalized address, or None when the input is empty or does not
        match the pattern.
    """
    if not email:
        return None

    candidate = str(email).strip().upper()
    matched = re.match(
        r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', candidate
    )
    return candidate if matched else None
def validate_and_normalize_pan(pan):
    """
    Validate and normalize a PAN (Permanent Account Number).

    Expected shape after normalization: 5 letters, 4 digits, 1 letter
    (e.g. ABCDE1234F).  The input is converted with str(), trimmed,
    upper-cased and stripped of whitespace and hyphens first.

    Returns:
        The 10-character PAN, or None when the input is empty or malformed.
    """
    if not pan:
        return None

    compact = re.sub(r'[\s-]', '', str(pan).strip().upper())

    has_valid_shape = (
        len(compact) == 10
        and re.match(r'^[A-Z]{5}[0-9]{4}[A-Z]{1}$', compact)
    )
    if has_valid_shape:
        return compact
    return None
# Verhoeff Algorithm Tables
# d: combination table; d[c][x] merges the running checksum c with a
#    (permuted) digit x.
verhoeff_table_d = [
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    [1, 2, 3, 4, 0, 6, 7, 8, 9, 5],
    [2, 3, 4, 0, 1, 7, 8, 9, 5, 6],
    [3, 4, 0, 1, 2, 8, 9, 5, 6, 7],
    [4, 0, 1, 2, 3, 9, 5, 6, 7, 8],
    [5, 9, 8, 7, 6, 0, 4, 3, 2, 1],
    [6, 5, 9, 8, 7, 1, 0, 4, 3, 2],
    [7, 6, 5, 9, 8, 2, 1, 0, 4, 3],
    [8, 7, 6, 5, 9, 3, 2, 1, 0, 4],
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
]
# p: position-dependent permutation; row (i % 8) is applied to the digit at
#    position i counted from the right.
verhoeff_table_p = [
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    [1, 5, 7, 6, 2, 8, 3, 0, 9, 4],
    [5, 8, 0, 3, 7, 9, 6, 1, 4, 2],
    [8, 9, 1, 6, 0, 4, 3, 5, 2, 7],
    [9, 4, 5, 3, 1, 2, 6, 8, 7, 0],
    [4, 2, 8, 6, 5, 7, 3, 9, 0, 1],
    [2, 7, 9, 3, 8, 0, 6, 4, 1, 5],
    [7, 0, 4, 6, 9, 1, 3, 2, 5, 8]
]
# inv: inverse table; not used by validate_verhoeff (validation only needs
#      d and p).
verhoeff_table_inv = [0, 4, 3, 2, 1, 5, 6, 7, 8, 9]


def validate_verhoeff(num):
    """
    Validate the Verhoeff checksum of a digit string.

    The digits are processed right to left; the number is valid when the
    running checksum ends at 0.

    Args:
        num: The number, including its trailing check digit, as a string of
            digits (other values are converted with str()).

    Returns:
        True if the checksum holds.  False if it does not, if the input is
        empty, or if it contains anything other than digits.
    """
    try:
        digits = [int(ch) for ch in reversed(str(num))]
    except ValueError:
        # Non-digit character: not a valid number rather than a crash.
        return False
    if not digits:
        # An empty input would otherwise leave the checksum at its initial 0
        # and be reported as valid.
        return False

    checksum = 0
    for position, digit in enumerate(digits):
        checksum = verhoeff_table_d[checksum][verhoeff_table_p[position % 8][digit]]
    return checksum == 0
def validate_and_normalize_aadhar(aadhar):
    """
    Validate and normalize an Aadhar number using the Verhoeff algorithm.

    Format: 12 digits, the last one being the checksum.  Whitespace and
    hyphens are removed before validation.  No rule beyond "12 digits with a
    valid checksum" is applied (trivial sequences are not rejected
    separately).

    Returns:
        The 12-digit string, or None when the input is empty, is not exactly
        12 digits, or fails the checksum.
    """
    if not aadhar:
        return None

    compact = re.sub(r'[\s-]', '', str(aadhar).strip())

    if not compact.isdigit() or len(compact) != 12:
        return None
    return compact if validate_verhoeff(compact) else None
def normalize_dob(text: str) -> str:
    """
    Extract a date from text and normalize it to DD-MM-YYYY using regex.

    Patterns are tried in this order; the first one that yields a real
    calendar date wins, and a pattern that fails validation falls through to
    the next one:
        1.   YYYY-MM-DD (also YYYY-DD-MM when the middle part is > 12)
        1.5  YYYY-MMM-DD            (2002-sept-30)
        2.   DD-MM-YYYY / MM-DD-YYYY with -, /, . or space separators
        3.   DDMMYYYY               (8 contiguous digits)
        4.   DD-MMM-YYYY / DD MMM YYYY
        4.5  DDMMMYYYY / DDMMMYY    (05Mar1992, 05MAR92)
        5.   MMM DD, YYYY / MMM-DD-YYYY
        6.   DD-MMM-YY              (05-MAR-92)
    Ambiguous numeric dates (both parts <= 12) are read as DD-MM.  Four-digit
    years must lie in 1900-2026; two-digit years 00-26 map to 20YY, the rest
    to 19YY.

    Month names are matched case-insensitively: the text is lower-cased
    before the month-name patterns run.  (It used to be upper-cased, which
    could never match the lowercase `[a-z]` patterns and month table, so no
    month-name format was ever recognised.)

    Args:
        text: Free text containing a date.

    Returns:
        'DD-MM-YYYY' on success; the string "Invalid DOB" when a numeric
        DD/MM pair (patterns 2 and 3) is out of range; None when the input is
        empty or no valid date is found.
    """
    if not text:
        return None

    text = str(text).strip()
    # Drop spaces around separators: "05 - 03 - 1992" -> "05-03-1992".
    text = re.sub(r'\s*([-/.])\s*', r'\1', text)
    text_lower = text.lower()

    month_names = {
        'jan': '01', 'january': '01', 'feb': '02', 'february': '02',
        'mar': '03', 'march': '03', 'apr': '04', 'april': '04',
        'may': '05', 'jun': '06', 'june': '06', 'jul': '07', 'july': '07',
        'aug': '08', 'august': '08', 'sep': '09', 'sept': '09', 'september': '09',
        'oct': '10', 'october': '10', 'nov': '11', 'november': '11',
        'dec': '12', 'december': '12',
    }

    def normalize_number(num_str: str) -> str:
        """Zero-pad single-digit values 1-9; other values are returned as str(int)."""
        num = int(num_str)
        if 1 <= num <= 9:
            return f'0{num}'
        return str(num)

    def is_valid_year(year_str: str) -> bool:
        try:
            return 1900 <= int(year_str) <= 2026
        except ValueError:
            return False

    def expand_two_digit_year(year_short: str) -> str:
        """Pivot at 26: 00-26 -> 20YY, 27-99 -> 19YY."""
        if 0 <= int(year_short) <= 26:
            return f'20{year_short}'
        return f'19{year_short}'

    def validate_and_determine_format(first: str, second: str) -> tuple:
        """Decide between DD-MM and MM-DD; return (month, day) or (None, None)."""
        try:
            first_int = int(first)
            second_int = int(second)
        except ValueError:
            return (None, None)
        if first_int < 1 or second_int < 1:
            return (None, None)
        if first_int > 12:
            # First part can only be a day, so the second must be a month.
            if first_int > 31 or second_int > 12:
                return (None, None)
            return (normalize_number(second), normalize_number(first))
        if second_int > 12:
            # Second part can only be a day, so the first is the month.
            if second_int > 31:
                return (None, None)
            return (normalize_number(first), normalize_number(second))
        # Both <= 12: ambiguous, assume DD-MM (common in India).
        return (normalize_number(second), normalize_number(first))

    def format_if_real(day: str, month: str, year: str):
        """Return 'DD-MM-YYYY' when the parts form a real calendar date, else None."""
        try:
            datetime(int(year), int(month), int(day))
        except ValueError:
            return None
        return f'{day}-{month}-{year}'

    def from_month_name(day_str: str, month_str: str, year: str):
        """Resolve a named month; None if the name is unknown or the date invalid."""
        if month_str not in month_names:
            return None
        return format_if_real(normalize_number(day_str), month_names[month_str], year)

    # Pattern 1: YYYY-MM-DD or YYYY/MM/DD or YYYY.MM.DD or YYYY MM DD
    # Also handles YYYY-DD-MM when second > 12 (must be day, not month).
    match = re.search(r'(\d{4})[-\/\.\s](\d{1,2})[-\/\.\s](\d{1,2})', text)
    if match:
        year, second, third = match.groups()
        if is_valid_year(year):
            second_int, third_int = int(second), int(third)
            day = month = None
            if second_int > 12 and 1 <= third_int <= 12:
                day, month = normalize_number(second), normalize_number(third)
            elif 1 <= second_int <= 12:
                month, day = normalize_number(second), normalize_number(third)
            if day and month:
                result = format_if_real(day, month, year)
                if result:
                    return result

    # Pattern 1.5: YYYY-MMM-DD or YYYY/MMM/DD or YYYY MMM DD
    match = re.search(r'(\d{4})[-\/\.\s]([a-z]{3,9})[-\/\.\s](\d{1,2})', text_lower)
    if match:
        year, month_str, day_str = match.groups()
        if is_valid_year(year):
            result = from_month_name(day_str, month_str, year)
            if result:
                return result

    # Pattern 2: DD-MM-YYYY with separators; Pattern 3: DDMMYYYY (8 digits).
    numeric_patterns = (
        r'\b(\d{1,2})[-\/\.\s](\d{1,2})[-\/\.\s](\d{4})\b',
        r'\b(\d{2})(\d{2})(\d{4})\b',
    )
    for pattern in numeric_patterns:
        match = re.search(pattern, text)
        if not match:
            continue
        first, second, year = match.groups()
        if not is_valid_year(year):
            continue
        month, day = validate_and_determine_format(first, second)
        if month is None or day is None:
            return "Invalid DOB"
        result = format_if_real(day, month, year)
        if result:
            return result

    # Pattern 4: DD-MMM-YYYY or DD MMM YYYY
    match = re.search(r'\b(\d{1,2})[-\s]([a-z]{3,9})[-\s](\d{4})\b', text_lower)
    if match:
        day_str, month_str, year = match.groups()
        if is_valid_year(year):
            result = from_month_name(day_str, month_str, year)
            if result:
                return result

    # Pattern 4.5: DDMMMYYYY or DDMMMYY (no separators)
    match = re.search(r'\b(\d{1,2})([a-z]{3,9})(\d{4}|\d{2})\b', text_lower)
    if match:
        day_str, month_str, year = match.groups()
        if len(year) == 2:
            year = expand_two_digit_year(year)
        if is_valid_year(year):
            result = from_month_name(day_str, month_str, year)
            if result:
                return result

    # Pattern 5: MMM DD, YYYY or MONTH DD, YYYY or MMM-DD-YYYY
    match = re.search(r'\b([a-z]{3,9})[-\/\.\s](\d{1,2})[-\/\.\s,]+(\d{4})\b', text_lower)
    if match:
        month_str, day_str, year = match.groups()
        if is_valid_year(year):
            result = from_month_name(day_str, month_str, year)
            if result:
                return result

    # Pattern 6: DD-MMM-YY (05-MAR-92)
    match = re.search(r'\b(\d{1,2})[-\s]([a-z]{3,9})[-\s](\d{2})\b', text_lower)
    if match:
        day_str, month_str, year_short = match.groups()
        result = from_month_name(day_str, month_str, expand_two_digit_year(year_short))
        if result:
            return result

    return None
# =========================================================
# PGEOCODE LOOKUP (offline after first run, cached)
# =========================================================
# Shared pgeocode.Nominatim("IN") instance; stays None until a load succeeds.
_PGEOCODE_NOMI_INST = None
# Per-pincode results of lookup_pincode_info, keyed by the 6-digit string.
_PGEOCODE_LOOKUP_CACHE: dict = {}


def _get_pgeocode_inst():
    """
    Return the shared pgeocode.Nominatim("IN") instance, creating it on first use.

    pgeocode is imported lazily inside the function.  Any failure while
    importing or constructing the instance is logged as a warning and None is
    returned; because nothing is stored in that case, the next call tries
    again.
    """
    global _PGEOCODE_NOMI_INST
    if _PGEOCODE_NOMI_INST is not None:
        return _PGEOCODE_NOMI_INST

    try:
        import pgeocode as _pgeocode_lib
        _PGEOCODE_NOMI_INST = _pgeocode_lib.Nominatim("IN")
        logger.info("pgeocode loaded for India (offline pincode DB).")
    except Exception as exc:
        logger.warning("pgeocode unavailable β€” pincode enrichment disabled: %s", exc)
    return _PGEOCODE_NOMI_INST
def lookup_pincode_info(pin: str) -> dict:
    """
    Look up a 6-digit Indian pincode through the shared pgeocode instance.

    The pin is reduced to its digits and left-padded with zeros to 6
    characters; anything that is not exactly 6 digits afterwards yields {}.

    Returns:
        dict with keys district, state, place (strings, "" when the value is
        missing) and lat, lng (float, or None when missing).  An empty dict
        is returned when the pin is unusable, pgeocode is unavailable, the
        query returns None, or the lookup raises.

    Every outcome for a well-formed pin, including the empty ones, is stored
    in _PGEOCODE_LOOKUP_CACHE and served from there on later calls.
    """
    if not pin:
        return {}

    pin_str = re.sub(r"\D", "", str(pin).strip()).zfill(6)
    if len(pin_str) != 6:
        return {}

    try:
        return _PGEOCODE_LOOKUP_CACHE[pin_str]
    except KeyError:
        pass

    def _as_text(val) -> str:
        # None and NaN both mean "no value" in the pgeocode row.
        if val is None or (isinstance(val, float) and math.isnan(val)):
            return ""
        return str(val).strip()

    def _as_number(val):
        try:
            number = float(val)
        except Exception:
            return None
        return None if math.isnan(number) else number

    found = None
    db = _get_pgeocode_inst()
    if db is not None:
        try:
            row = db.query_postal_code(pin_str)
            if row is not None:
                found = {
                    "district": _as_text(getattr(row, "county_name", "")),
                    "state": _as_text(getattr(row, "state_name", "")),
                    "place": _as_text(getattr(row, "place_name", "")),
                    "lat": _as_number(getattr(row, "latitude", None)),
                    "lng": _as_number(getattr(row, "longitude", None)),
                }
        except Exception as e:
            logger.debug("pgeocode lookup error for %s: %s", pin_str, e)
            found = None

    if found is None:
        # Remember the miss so the lookup is not repeated for this pin.
        _PGEOCODE_LOOKUP_CACHE[pin_str] = {}
        return {}

    _PGEOCODE_LOOKUP_CACHE[pin_str] = found
    return found
# =========================================================
# BANK / SYSTEM INTERNAL STATE CODES
# Maps non-standard codes used by banks/systems to canonical
# state names recognised by STATE_MAPPING.
# =========================================================
# Keys are upper-case because standardize_state upper-cases its (cleaned)
# input before looking it up here.  Values are state names that
# standardize_state then tries to resolve through STATE_MAPPING, falling back
# to the value itself.
_BANK_STATE_CODE_MAP: dict = {
    # Delhi internal codes
    "NDH": "DELHI", "SDH": "DELHI", "CDH": "DELHI",
    "EDH": "DELHI", "WDH": "DELHI", "NWD": "DELHI",
    "SWD": "DELHI", "NED": "DELHI",
    # City-based codes used as state
    "MUM": "MAHARASHTRA", "BOM": "MAHARASHTRA",
    "BLR": "KARNATAKA", "BNG": "KARNATAKA",
    "HYD": "TELANGANA", "SCB": "TELANGANA",
    "CHN": "TAMIL NADU", "MAD": "TAMIL NADU",
    "KOL": "WEST BENGAL","CAL": "WEST BENGAL",
    "PUN": "MAHARASHTRA","PCM": "MAHARASHTRA",
    "AHM": "GUJARAT", "AMD": "GUJARAT",
    "JAI": "RAJASTHAN",
    "LKO": "UTTAR PRADESH", "KNP": "UTTAR PRADESH",
    "PAT": "BIHAR",
    "RNC": "JHARKHAND",
    "BHU": "ODISHA",
    "GHY": "ASSAM",
    "CCU": "WEST BENGAL",
    # Dotted abbreviations sometimes seen (with and without the final dot)
    "A.P.": "ANDHRA PRADESH", "A.P": "ANDHRA PRADESH",
    "T.N.": "TAMIL NADU", "T.N": "TAMIL NADU",
    "U.P.": "UTTAR PRADESH", "U.P": "UTTAR PRADESH",
    "M.P.": "MADHYA PRADESH", "M.P": "MADHYA PRADESH",
    "H.P.": "HIMACHAL PRADESH","H.P": "HIMACHAL PRADESH",
    "W.B.": "WEST BENGAL", "W.B": "WEST BENGAL",
}
def standardize_state(state_str):
    """
    Standardize a state value to its canonical UPPERCASE name.

    Resolution order:
      1. Bank/system internal codes and dotted abbreviations from
         ``_BANK_STATE_CODE_MAP`` (NDH -> DELHI, BLR -> KARNATAKA,
         A.P. -> ANDHRA PRADESH ...). The canonical name is then resolved
         through ``STATE_MAPPING`` when that mapping is non-empty.
      2. ``STATE_MAPPING`` via ``lookup_from_mapping``.
      3. ``state_name_standard_df`` via ``replace_with_standard``.
      4. Fallback: the cleaned input itself.

    Args:
        state_str: Raw state value. Non-string values are converted with
            ``str()`` before cleaning.

    Returns:
        The standardized state name, or None when the input is falsy, a
        float NaN, or cleans down to an empty string.
    """
    if not state_str:
        return None
    # A float NaN is truthy and has no string methods; treat it as missing
    # instead of failing later on .strip().
    if isinstance(state_str, float) and math.isnan(state_str):
        return None
    # clean_text() returns non-string input unchanged, so coerce first.
    state_str = clean_text(state_str if isinstance(state_str, str) else str(state_str))
    if not state_str:
        return None

    normalized = state_str.strip()
    lookup_key = normalized.upper()

    # Check bank/system internal codes FIRST (before STATE_MAPPING).
    if lookup_key in _BANK_STATE_CODE_MAP:
        canonical = _BANK_STATE_CODE_MAP[lookup_key]
        # Resolve the canonical name through STATE_MAPPING for full normalisation.
        if STATE_MAPPING:
            std_name = lookup_from_mapping(canonical, STATE_MAPPING)
            if std_name:
                return std_name.upper()
        return canonical.upper()

    if STATE_MAPPING:
        std_name = lookup_from_mapping(lookup_key, STATE_MAPPING)
        if std_name:
            return std_name.upper()

    if not state_name_standard_df.empty:
        # Work on a copy so the shared DataFrame's column names stay untouched.
        state_mapping_df = state_name_standard_df.copy()
        state_mapping_df.columns = state_mapping_df.columns.str.upper()
        state_name = replace_with_standard(lookup_key, state_mapping_df)
        # Truthiness check also guards against a None result.
        if state_name and state_name != lookup_key:
            return state_name.upper()

    return normalized
def standardize_city(city_str):
    """
    Standardize a city value to its canonical UPPERCASE name.

    Resolution order:
      1. ``CITY_MAPPING`` via ``lookup_from_mapping``.
      2. ``city_prev_pres_df`` via ``replace_with_standard``.
      3. Fallback: the cleaned input itself.

    Args:
        city_str: Raw city value. Non-string values are converted with
            ``str()`` before cleaning.

    Returns:
        The standardized city name, or None when the input is falsy, a
        float NaN, or cleans down to an empty string.
    """
    if not city_str:
        return None
    # A float NaN is truthy and has no string methods; treat it as missing
    # instead of failing later on .strip().
    if isinstance(city_str, float) and math.isnan(city_str):
        return None
    # clean_text() returns non-string input unchanged, so coerce first.
    city_str = clean_text(city_str if isinstance(city_str, str) else str(city_str))
    if not city_str:
        return None

    normalized = city_str.strip()
    lookup_key = normalized.upper()

    if CITY_MAPPING:
        std_name = lookup_from_mapping(lookup_key, CITY_MAPPING)
        if std_name:
            return std_name.upper()

    if not city_prev_pres_df.empty:
        # Work on a copy so the shared DataFrame's column names stay untouched.
        city_prev_pres_data = city_prev_pres_df.copy()
        city_prev_pres_data.columns = city_prev_pres_data.columns.str.upper()
        city_name = replace_with_standard(lookup_key, city_prev_pres_data)
        # Truthiness check also guards against a None result.
        if city_name and city_name != lookup_key:
            return city_name.upper()

    return normalized
def standardize_column(text, column_name):
    """
    Standardize a field value according to the column it came from.

    String values are passed through ``clean_text`` first. Then, based on the
    column name (compared case-insensitively):
      - names containing "ADDRESSLINE": the value is run through
        ``replace_with_standard`` using ``hno_variation_df`` (best effort);
      - "PAN": returns ``validate_and_normalize_pan`` of the uppercased value;
      - "AADHAR": returns ``validate_and_normalize_aadhar`` of the value.

    Args:
        text: Raw field value.
        column_name: Name of the column the value belongs to.

    Returns:
        The standardized value, or None when the value is falsy or cleans
        down to an empty string.
    """
    if not text:
        return None
    if isinstance(text, str):
        text = clean_text(text)
        if not text:
            return None

    # The key is uppercased, so every comparison below must use uppercase
    # literals. The previous lowercase literals could never match, which left
    # the address, PAN and AADHAR handling unreachable.
    column_key = str(column_name).upper() if column_name else ""

    if "ADDRESSLINE" in column_key and not hno_variation_df.empty:
        try:
            address_df = hno_variation_df.copy()
            address_df.columns = address_df.columns.str.upper()
            result = replace_with_standard(str(text).upper(), address_df)
            text = result.upper() if result else text
        except Exception:
            # Best effort: keep the cleaned text, but leave a trace instead of
            # silently discarding the failure.
            logger.debug(
                "House-number variation lookup failed for column %s",
                column_name, exc_info=True,
            )

    if column_key == "PAN":
        return validate_and_normalize_pan(str(text).upper() if text else text)
    if column_key == "AADHAR":
        return validate_and_normalize_aadhar(text)
    return text
def standardize_dob(dob_str):
    """
    Normalize a date-of-birth value via ``normalize_dob``.

    Returns None for falsy input; otherwise whatever ``normalize_dob`` returns.
    """
    if not dob_str:
        return None
    # NOTE: Do NOT apply the text cleaning pipeline to DOB values.
    # It is designed for text fields (names, addresses) and corrupts date
    # strings (e.g., '2002-sept-30' -> '2002-SESUB STRING').
    # normalize_dob already handles all date parsing and normalization.
    return normalize_dob(dob_str)
# =========================================================
# FIELD COMPARISON FUNCTIONS
# =========================================================
def compare_exact(val1, val2):
    """Return 100 when both values are truthy and equal ignoring case and
    surrounding whitespace; otherwise 0."""
    if val1 and val2:
        left, right = (str(value).strip().upper() for value in (val1, val2))
        if left == right:
            return 100
    return 0
def compare_any_match(list1, list2, field_type="pincode"):
    """
    1:N comparison between two lists of values (pincodes, states, cities, ...).

    Blank and "-" placeholder entries are dropped, the remaining values are
    normalized according to ``field_type``, and 100 is returned as soon as a
    normalized value from ``list1`` is also present in ``list2``; otherwise 0.
    """
    def _usable(values):
        # Drop falsy entries and blank / "-" placeholders.
        return [v for v in values if v and str(v).strip() not in ["", "-", " "]]

    candidates1 = _usable(list1)
    candidates2 = _usable(list2)
    if not (candidates1 and candidates2):
        return 0

    # Pick the normalizer for this field type; unknown types fall back to a
    # plain strip + uppercase.
    if field_type == "pincode":
        normalize = validate_and_normalize_pincode
    elif field_type == "state":
        normalize = standardize_state
    elif field_type == "city":
        normalize = standardize_city
    elif field_type in ("dob", "birthdate"):
        normalize = standardize_dob
    else:
        def normalize(value):
            return str(value).strip().upper()

    # Values that normalize to something falsy are treated as invalid.
    cleaned1 = [n for n in [normalize(v) for v in candidates1] if n]
    cleaned2 = [n for n in [normalize(v) for v in candidates2] if n]
    if not cleaned1 or not cleaned2:
        return 0

    return 100 if any(item in cleaned2 for item in cleaned1) else 0
def compare_phone_any_match(phones1, phones2):
    """
    1:N phone comparison.

    Each truthy entry is normalized with ``validate_and_normalize_phone``;
    entries that normalize to a falsy value are discarded. Returns 100 if any
    normalized number appears in both lists, else 0.
    """
    normalized1 = [validate_and_normalize_phone(raw) for raw in phones1 if raw]
    normalized2 = [validate_and_normalize_phone(raw) for raw in phones2 if raw]

    usable1 = list(filter(None, normalized1))
    usable2 = list(filter(None, normalized2))

    if usable1 and usable2 and any(number in usable2 for number in usable1):
        return 100
    return 0
def compare_email_any_match(emails1, emails2):
    """
    1:N email comparison.

    Each truthy entry is normalized with ``validate_and_normalize_email``;
    entries that normalize to a falsy value are discarded. Returns 100 if any
    normalized address appears in both lists, else 0.
    """
    normalized1 = [validate_and_normalize_email(raw) for raw in emails1 if raw]
    normalized2 = [validate_and_normalize_email(raw) for raw in emails2 if raw]

    usable1 = list(filter(None, normalized1))
    usable2 = list(filter(None, normalized2))

    if usable1 and usable2 and any(address in usable2 for address in usable1):
        return 100
    return 0
# =========================================================
# MATCHING RULES
# =========================================================
def evaluate_matching_rules(field_scores: Dict[str, float]) -> tuple:
    """
    Decide Match / No Match from per-field scores.

    ``MATCHING_RULES`` is walked in order; each rule is a pair
    ``(conditions, reason)`` where ``conditions`` is an iterable of
    ``(field, threshold)``. A rule is satisfied when no listed field scores
    below its threshold (a field absent from ``field_scores`` counts as 0).

    Returns:
        ("Match", reason) for the first satisfied rule, otherwise
        ("No Match", <fixed explanation>).
    """
    for conditions, reason in MATCHING_RULES:
        below_threshold = any(
            field_scores.get(field, 0) < threshold
            for field, threshold in conditions
        )
        if not below_threshold:
            return "Match", reason
    return "No Match", "None of the defined matching rules were satisfied"
# =========================================================
# PATTERN-BASED FIELD MATCHING
# =========================================================
def apply_pattern_matching_logic(field_name: str, score) -> float:
    """
    Collapse scores of pattern-based fields to all-or-nothing.

    - The sentinel score "missing value" always yields 0.
    - For the pattern fields listed below, a score of at least 100 yields
      100 and anything lower yields 0.
    - Every other field keeps its score unchanged.
    """
    if score == "missing value":
        return 0

    binary_fields = {
        "BIRTHDATE", "PHONE", "EMAIL", "ZIPCODE",
        "TAXID", "LICENSEID", "PASSPORTID", "GENDER",
        "AADHAR", "PAN"
    }
    if field_name not in binary_fields:
        return score
    if score >= 100:
        return 100
    return 0
def roman_to_number(text):
    """
    Convert stand-alone Roman numerals in ``text`` to Arabic numbers.

    Only numerals written with I, V and X (values 1-39) are converted, e.g.
    "PHASE II" -> "PHASE 2", "BLOCK XII" -> "BLOCK 12". Tokens that use
    L, C, D or M are deliberately left alone: in address text they are almost
    always letters or words rather than numbers ("C/O", "D BLOCK", "M G ROAD",
    "MIX", "DIV"), and converting them would corrupt the address
    ("C/O" -> "100/O").

    Malformed numerals such as "IIII" or "VX" are left unchanged. Matching is
    case-insensitive and requires word boundaries on both sides.

    Args:
        text: Address text.

    Returns:
        The converted string. For non-string input, ``str(text)`` when it is
        truthy and "" otherwise, so the result is always a string.
    """
    if not text or not isinstance(text, str):
        return str(text) if text else ""  # Always return a string

    symbol_values = {"I": 1, "V": 5, "X": 10}

    def _numeral_value(numeral):
        """Return the value of a well-formed I/V/X numeral, else None."""
        numeral = numeral.upper()
        # Strict form: up to three tens, then a valid units group
        # (IX, IV, or optional V followed by up to three I).
        if not re.fullmatch(r"X{0,3}(?:IX|IV|V?I{0,3})", numeral):
            return None
        total = 0
        previous = 0
        # Scan right-to-left: a symbol smaller than its right neighbour is
        # subtractive (the I in IV), otherwise additive.
        for symbol in reversed(numeral):
            value = symbol_values[symbol]
            total += -value if value < previous else value
            previous = value
        return total

    def _replace(match):
        token = match.group(1)
        number = _numeral_value(token)
        return token if number is None else str(number)

    return re.sub(r"\b([IVX]+)\b", _replace, text, flags=re.IGNORECASE)
def normalize_and_deduplicate_address(text):
    """
    Drop repeated words (case-insensitive) across the whole address, keeping
    the first occurrence of each word in its original position.

    The text is split on commas; comma-segments that end up empty are
    dropped and the rest are joined with single spaces. Non-string or empty
    input yields "".
    """
    if not (text and isinstance(text, str)):
        return ""

    already_seen = set()
    kept_segments = []
    for chunk in text.split(','):
        fresh_words = []
        for token in chunk.split():
            marker = token.upper()
            if marker in already_seen:
                continue
            already_seen.add(marker)
            fresh_words.append(token)
        if fresh_words:
            kept_segments.append(" ".join(fresh_words))
    return " ".join(kept_segments)
def extract_leading_house_number(segment, street_keywords):
    """
    Return the first token of ``segment`` (uppercased) when it looks like a
    house number: digits with at most one letter before and/or after.

    Returns None when the segment is empty, the first token has another
    shape, or the second token equals one of ``street_keywords``
    (case-insensitive; a single string or an iterable of strings).
    """
    tokens = segment.strip().split()
    if not tokens:
        return None

    candidate = tokens[0].upper()
    if re.fullmatch(r"[A-Z]?\d+[A-Z]?", candidate) is None:
        return None

    if len(tokens) > 1:
        if isinstance(street_keywords, str):
            keywords = [street_keywords]
        else:
            keywords = street_keywords
        blocked = [keyword.upper() for keyword in keywords]
        # "12 ROAD" is a street reference, not a house number.
        if tokens[1].upper() in blocked:
            return None

    return candidate
def is_street_context(text, match_start, street_keywords):
    """
    Report whether any street keyword occurs, as a whole word and ignoring
    case, in the 20 characters immediately before ``match_start``.

    ``street_keywords`` may be a single string or an iterable of strings.
    """
    lookback = text[max(0, match_start - 20):match_start]
    if isinstance(street_keywords, str):
        keywords = [street_keywords]
    else:
        keywords = street_keywords
    return any(
        re.search(rf"\b{re.escape(keyword)}\b", lookback, re.IGNORECASE)
        for keyword in keywords
    )
def extract_component_with_hierarchy(text, identifier, value_patterns=None, street_keywords=None):
    """
    Extract one address component from the full address string (no comma
    segmentation), trying three strategies in priority order.

    1. Identifier: when ``identifier`` occurs as a whole word
       (case-insensitive), the value is the first ``value_patterns`` hit in
       the text after it, or - when no patterns are given - everything after
       it up to the next comma. A found identifier always ends the search,
       even if no value could be extracted.
    2. Leading house number: only when ``street_keywords`` is not None.
    3. Pattern only: first pattern hit anywhere in the text, skipping hits
       that sit right after a street keyword.

    ``identifier`` is passed to ``re.escape`` and must therefore be a string.

    Returns:
        (extracted_value, identifier_found, pattern_value); all three are
        None when nothing was found.
    """
    if not text:
        return None, None, None

    # PRIORITY 1: identifier (+ optional pattern)
    id_hit = None
    if identifier:
        id_hit = re.search(rf"\b{re.escape(identifier)}\b", text, re.IGNORECASE)
    if id_hit:
        tail = text[id_hit.end():]
        if not value_patterns:
            # No pattern needed: take everything up to the next comma.
            value = re.split(r"[,]", tail.strip())[0].strip()
            return (value or None), identifier, None
        for pattern in value_patterns:
            found = re.search(pattern, tail, re.IGNORECASE)
            if found:
                token = found.group(0).strip()
                return token, identifier, token
        # Identifier present but no value matched: report the identifier only.
        return None, identifier, None

    # PRIORITY 2: leading house number heuristic
    if street_keywords is not None:
        leading = extract_leading_house_number(text, street_keywords)
        if leading:
            return leading, None, leading

    # PRIORITY 3: pattern-only match
    for pattern in value_patterns or ():
        for hit in re.finditer(pattern, text, re.IGNORECASE):
            token = hit.group(0).strip()
            if street_keywords and is_street_context(text, hit.start(), street_keywords):
                continue
            return token, None, token

    return None, None, None
def remove_matched_text(text, identifier=None, pattern_value=None):
    """
    Strip an extracted component from the full address string.

    Every whole-word, case-insensitive occurrence of ``identifier`` and of
    ``pattern_value`` (whichever are given) is replaced by a space, together
    with any separator characters (whitespace # . : / -) that directly
    follow it. Afterwards runs of whitespace are collapsed and leading or
    trailing commas/whitespace are trimmed.

    Returns "" for falsy ``text``.
    """
    if not text:
        return ""

    def _drop(fragment, source):
        return re.sub(
            rf"\b{re.escape(fragment)}\b[\s#.:/-]*",
            " ",
            source,
            flags=re.IGNORECASE
        )

    cleaned = text
    # Identifier first, then the value, each only when present.
    for fragment in (identifier, pattern_value):
        if fragment:
            cleaned = _drop(fragment, cleaned)

    # Cleanup
    cleaned = re.sub(r"\s{2,}", " ", cleaned).strip()
    return re.sub(r"^[,\s]+|[,\s]+$", "", cleaned)
def _extract_and_strip_component(remaining, identifier, value_patterns=None, street_keywords=None):
    """Extract one component from ``remaining`` and remove what was matched.

    Returns ``(value, segment, remaining)``: ``segment`` is the text as it was
    when the component was found (None when nothing was found) and
    ``remaining`` is the text left over for the next extraction step.
    """
    value, id_found, pattern_value = extract_component_with_hierarchy(
        remaining, identifier, value_patterns, street_keywords
    )
    if not (id_found or pattern_value):
        return value, None, remaining
    stripped = remove_matched_text(remaining, id_found, pattern_value)
    return value, remaining, (str(stripped) if stripped else "")


def extract_address_components(address_line: str) -> dict:
    """
    Master extraction function - no comma segmentation.
    Works directly on the full address string.

    Steps: clean the text, convert Roman numerals, then extract (and remove
    from the working text) house number, flat number, apartment/building and
    street, in that order. Whatever is left is cleaned up, de-duplicated and
    returned as ``remaining_address``.

    Args:
        address_line: Raw address line; converted with ``str()`` before cleaning.

    Returns:
        dict with keys ``original_address`` (the cleaned input),
        ``house_number``, ``flat_number``, ``apartment``, ``street``, the
        matching ``*_segment`` entries (the working text at the moment each
        component was extracted, or None) and ``remaining_address``.
        For empty input all components are None and both address strings
        are "".
    """
    empty_result = {
        "original_address": "",
        "house_number": None,
        "house_segment": None,
        "flat_number": None,
        "flat_segment": None,
        "apartment": None,
        "apartment_segment": None,
        "street": None,
        "street_segment": None,
        "remaining_address": ""
    }
    if not address_line:
        return empty_result
    address_line = clean_text(str(address_line))
    if not address_line:
        return empty_result

    original_address = address_line

    # Step 1: Roman numeral conversion
    remaining = roman_to_number(address_line)
    remaining = str(remaining) if remaining else ""

    # Augment street keywords to always block GALI NO / LANE NO style phrases
    _base_kws = [STREET_KEYWORD] if isinstance(STREET_KEYWORD, str) else list(STREET_KEYWORD)
    _street_kws = list(dict.fromkeys(
        _base_kws + ["GALI NO", "LANE NO", "GALI", "GALLI"]
    ))

    # Patterns are tried in order, most specific first.
    house_patterns = [
        r"\b(MIG|HIG|LIG)-\d+[a-zA-Z]?\b",
        r"\b\d+(?:-\d+){2,}[a-zA-Z]?\b",
        r"\b\d+-\d+/\d+[a-zA-Z]?\b",
        r"\b\d+-\d+/[a-zA-Z]\b",
        r"\b\d+-\d+/\d+\b",
        r"\b\d+/\d+(?:/\d+)?\s?[a-zA-Z]?\b",
        r"\b[a-zA-Z]{1,3}/\d+[a-zA-Z]?\b",
        r"\b\d+-\d+[a-zA-Z]\b",
        r"\b\d+-\d+\b",
        r"\b[a-zA-Z]{1,2}-?\d+[a-zA-Z]?\b",
        r"\b\d+[a-zA-Z]\b",
        r"\b\d{1,4}\b",
    ]
    flat_patterns = [
        r"\b\d+[a-zA-Z]?\b",
        r"\b[a-zA-Z]-?\d+\b",
    ]

    # 1. HOUSE NUMBER
    house_no, house_segment, remaining = _extract_and_strip_component(
        remaining, HOUSE_NUMBER_IDENTIFIER, house_patterns, _street_kws
    )

    # 2. FLAT NUMBER
    # Street keywords are passed so numbers inside GALI NO / LANE NO / etc.
    # are not mistaken for a flat number.
    flat_no, flat_segment, remaining = _extract_and_strip_component(
        remaining, FLAT_NUMBER_IDENTIFIER, flat_patterns, _street_kws
    )

    # 3. APARTMENT/BUILDING (identifier only, no value patterns)
    apartment, apartment_segment, remaining = _extract_and_strip_component(
        remaining, APARTMENT_IDENTIFIER
    )

    # 4. STREET (identifier only, no value patterns)
    street, street_segment, remaining = _extract_and_strip_component(
        remaining, STREET_KEYWORD, street_keywords=_street_kws
    )

    # Final cleanup
    # Step 1: remove "GALI NO 3A" style - keyword + its value together
    remaining = re.sub(
        r'\b(GALI|LANE|CROSS|MAIN)\s+NO\s+[A-Z0-9][A-Z0-9\-]*\b[\s,]*',
        ' ', remaining, flags=re.IGNORECASE
    )
    # Step 2: remove bare "GALI NO" / "LANE NO" with no value following
    remaining = re.sub(
        r'\b(GALI|LANE|ROAD|MARG|STREET|CROSS|MAIN)\s+NO\b[\s,]*',
        ' ', remaining, flags=re.IGNORECASE
    )
    remaining = re.sub(r"\s+", " ", remaining).strip()
    remaining = re.sub(r"^[,\s]+|[,\s]+$", "", remaining)
    remaining = normalize_and_deduplicate_address(remaining)

    # Diagnostics go through the module logger (DEBUG) rather than stdout so
    # address data is not printed unconditionally.
    logger.debug("[EXTRACT] house_no : %r | segment: %r", house_no, house_segment)
    logger.debug("[EXTRACT] flat_no : %r | segment: %r", flat_no, flat_segment)
    logger.debug("[EXTRACT] apartment : %r | segment: %r", apartment, apartment_segment)
    logger.debug("[EXTRACT] street : %r | segment: %r", street, street_segment)
    logger.debug("[EXTRACT] remaining_addr: %r", remaining)

    return {
        "original_address": original_address,
        "house_number": house_no,
        "house_segment": house_segment,
        "flat_number": flat_no,
        "flat_segment": flat_segment,
        "apartment": apartment,
        "apartment_segment": apartment_segment,
        "street": street,
        "street_segment": street_segment,
        "remaining_address": remaining if remaining else ""
    }
# =========================================================
# STRUCTURED ADDRESS MATCHING
# Handles data format: separate ADDRESSLINE / CITY / ZIPCODE / STATE columns
# =========================================================
# Non-HNO structural numbers (sector/ward/phase) — never treat as house no
# Group 1 is the structural keyword, group 2 the number that follows it.
_STRUCT_NON_HNO = re.compile(
    r'\b(sector|ward|phase|block|zone|gali\s*no|gali\s*number|lane\s*no)\s*'
    r'[:\-]?\s*(\d+[A-Z]?)',
    re.IGNORECASE,
)
# Keyword-based house number extractor; group 1 is the value after the keyword.
# The "... number" alternatives come first and the keyword must not be followed
# directly by a letter. Otherwise "House Number 12" is read as "house no" plus
# the value "MBER", and "H NOIDA" as "h no" plus the value "IDA".
_STRUCT_HNO_KW = re.compile(
    r'\b(?:(?:house|flat)\s*number(?![A-Z])|'
    r'(?:d\.?|door|h\.?|house|plot|flat|mig|hig|lig|khata|khasra)'
    r'\s*no(?![A-Z])\.?)'
    r'\s*[:\-]?\s*([A-Z0-9][A-Z0-9\-/]*)',
    re.IGNORECASE,
)
def extract_house_number_from_addressline(text: str) -> str:
    """
    Extract the house/door number from a raw addressline string.

    No keyword is required; candidates are tried in this priority:
      1. Explicit keyword (H.No, D.No, House No, Flat No ...) via
         ``_STRUCT_HNO_KW``
      2. Compound formats: 2-6-116, 144/143, MIG-25, 1-180a
      3. Simple alpha-numeric: 12B, A-110

    Numbers that ``_STRUCT_NON_HNO`` attributes to sector/ward/phase/block/
    zone/gali/lane are excluded, as are pattern hits directly preceded by one
    of the words sector, ward, phase, block, zone or gali.

    Returns:
        The house number uppercased, or "" when none is found.
    """
    if not text:
        return ""

    excluded = {m.group(2).strip().upper() for m in _STRUCT_NON_HNO.finditer(text)}

    # Priority 1: keyword
    keyword_match = _STRUCT_HNO_KW.search(text)
    if keyword_match:
        val = keyword_match.group(1).strip().upper()
        if val not in excluded:
            return val

    # Priority 2 & 3: patterns, most specific first
    patterns = (
        r'\b((?:MIG|HIG|LIG)-\d+[A-Z]?)\b',
        r'\b(\d+(?:-\d+){2,}[A-Z]?)\b',
        r'\b(\d+-\d+/\d+[A-Z]?)\b',
        r'\b(\d+/\d+(?:/\d+)?[A-Z]?)\b',
        r'\b([A-Z]{1,3}/\d+[A-Z]?)\b',
        r'\b(\d+-\d+[A-Z]?)\b',
        r'\b([A-Z]-?\d+[A-Z]?)\b',
        r'\b(\d+[A-Z])\b',
    )
    for pat in patterns:
        for m in re.finditer(pat, text, re.IGNORECASE):
            val = m.group(1).strip().upper()
            if val in excluded:
                continue
            # Skip values sitting right after a structural keyword
            # ("BLOCK A-12"). The search must be case-insensitive: the old
            # code searched a lowercase pattern in uppercased text, so this
            # guard never fired.
            preceding = text[:m.start()]
            if not re.search(r'\b(sector|ward|phase|block|zone|gali)\s*$',
                             preceding, re.IGNORECASE):
                return val
    return ""
class _StructuredAddressRecord:
    """
    Internal helper: holds one address record with separate column values.

    On construction the record:
      - normalizes zipcode, state and city,
      - extracts address components (house_number, flat_number, apartment,
        street) from the addressline and keeps the remaining address (all
        components removed) as ``addressline`` for model input,
      - fills a missing state/city from the zipcode via
        ``lookup_pincode_info``.
    """
    __slots__ = ('raw_addressline', 'raw_city', 'raw_zipcode', 'raw_state',
                 'addressline', 'city', 'state', 'zipcode', 'pgeocode_info',
                 'house_number', 'flat_number', 'apartment', 'street')

    def __init__(self, addressline="", city="", zipcode="", state=""):
        self.raw_addressline = self._text(addressline)
        self.raw_city = self._text(city)
        self.raw_zipcode = self._text(zipcode)
        self.raw_state = self._text(state)
        self.addressline = ""
        self.city = ""
        self.state = ""
        self.zipcode = ""
        self.pgeocode_info = {}
        self.house_number = ""
        self.flat_number = ""
        self.apartment = ""
        self.street = ""
        self._enrich()

    @staticmethod
    def _text(val):
        """Coerce a raw column value to a stripped string.

        None, other falsy values and float NaN become "". Without the NaN
        check a NaN would turn into the literal text "nan" and two missing
        values would compare as equal.
        """
        if val is None or (isinstance(val, float) and math.isnan(val)):
            return ""
        return str(val or "").strip()

    @staticmethod
    def _clean_zipcode(raw):
        """Return the 6-digit zipcode contained in ``raw``, else "".

        A float-formatted value such as "500001.0" is reduced to its integer
        part first; otherwise the trailing zero would be counted as a seventh
        digit and the zipcode rejected.
        """
        raw = str(raw or "").strip()
        float_like = re.fullmatch(r'(\d+)\.0+', raw)
        if float_like:
            raw = float_like.group(1)
        digits = re.sub(r'\D', '', raw)
        return digits if len(digits) == 6 else ""

    @staticmethod
    def _norm(val):
        """Normalize an extracted component: uppercase, alphanumerics only."""
        if not val:
            return ""
        return re.sub(r'[^A-Z0-9]', '', str(val).upper())

    def _enrich(self):
        """Populate the normalized fields from the raw column values."""
        # Zipcode: digits only, must be 6
        self.zipcode = self._clean_zipcode(self.raw_zipcode)
        # State / city: canonical form
        self.state = standardize_state(self.raw_state) or ""
        self.city = standardize_city(self.raw_city) or ""

        # Extract all address components from the raw addressline
        components = extract_address_components(self.raw_addressline)
        self.house_number = self._norm(components.get("house_number"))
        self.flat_number = self._norm(components.get("flat_number"))
        self.apartment = self._norm(components.get("apartment"))
        self.street = self._norm(components.get("street"))

        # Model input = remaining address after all components are removed.
        # The fully preprocessed addressline is only a fallback, so it is
        # computed only when nothing remains.
        remaining = (components.get("remaining_address") or "").strip()
        if remaining:
            self.addressline = remaining
        elif self.raw_addressline:
            self.addressline = preprocess_address(self.raw_addressline).upper()
        else:
            self.addressline = ""

        # Pincode enrichment: fill missing state/city from the zipcode
        if self.zipcode:
            self.pgeocode_info = lookup_pincode_info(self.zipcode)
            if not self.state and self.pgeocode_info.get("state"):
                self.state = standardize_state(self.pgeocode_info["state"]) or ""
            if not self.city and self.pgeocode_info.get("district"):
                self.city = standardize_city(self.pgeocode_info["district"]) or ""
def _compare_address_component(v1, v2, boost, penalty):
    """Compare two normalized component values and return the result dict.

    The verdict is "missing" when either side is empty, otherwise "match" or
    "mismatch". ``boost``/``penalty`` are carried along for the caller.
    """
    if v1 and v2:
        verdict = "match" if v1 == v2 else "mismatch"
    else:
        verdict = "missing"
    return {"verdict": verdict, "v1": v1, "v2": v2, "boost": boost, "penalty": penalty}


def _address_record_summary(record):
    """Return the per-record section of the structured-match result."""
    return {
        "addressline": record.addressline,
        "city": record.city,
        "state": record.state,
        "zipcode": record.zipcode,
        "house_number": record.house_number or None,
        "flat_number": record.flat_number or None,
        "apartment": record.apartment or None,
        "street": record.street or None,
        "pgeocode": record.pgeocode_info,
    }


def match_structured_address_fields(
    addressline1: str, city1: str, zipcode1: str, state1: str,
    addressline2: str, city2: str, zipcode2: str, state2: str,
) -> dict:
    """
    Match two address records provided as already-split column values
    (ADDRESSLINE, CITY, ZIPCODE, STATE).

    The base score is the similarity of the two remaining addresses (model
    score, with a rapidfuzz fallback when the model path fails). It is then
    adjusted and clamped to 0-100:

      zipcode : match -> +20, mismatch -> -25
      state   : match -> +10, mismatch -> -20
      city    : similarity >= 85 -> +10, >= 60 -> +3, otherwise -10

    Address component scoring (applied only when base_score > 60):
      house_number : match -> +30, mismatch -> -30
      flat_number  : match -> +10, mismatch -> -10
      street       : match -> +10, mismatch -> -10
      apartment    : match -> +10, mismatch -> -10
    A value missing on either side never adjusts the score.

    Returns:
        dict with ``final_score``, ``base_score``, ``adjustment``,
        ``comp_adjustment``, one comparison dict per field/component,
        ``record1``/``record2`` summaries and human-readable ``notes``.
    """
    from rapidfuzz import fuzz as _rfuzz

    r1 = _StructuredAddressRecord(addressline1, city1, zipcode1, state1)
    r2 = _StructuredAddressRecord(addressline2, city2, zipcode2, state2)

    # Zipcode
    if r1.zipcode and r2.zipcode:
        if r1.zipcode == r2.zipcode:
            zip_cmp = {"verdict": "match", "adjustment": 20.0, "z1": r1.zipcode, "z2": r2.zipcode}
        else:
            zip_cmp = {"verdict": "mismatch", "adjustment": -25.0, "z1": r1.zipcode, "z2": r2.zipcode}
    else:
        zip_cmp = {"verdict": "missing", "adjustment": 0.0, "z1": r1.zipcode, "z2": r2.zipcode}

    # State
    s1, s2 = r1.state, r2.state
    if s1 and s2:
        if s1 == s2:
            state_cmp = {"verdict": "match", "adjustment": 10.0, "s1": s1, "s2": s2}
        else:
            state_cmp = {"verdict": "mismatch", "adjustment": -20.0, "s1": s1, "s2": s2}
    else:
        state_cmp = {"verdict": "missing", "adjustment": 0.0, "s1": s1, "s2": s2}

    # City
    c1, c2 = r1.city, r2.city
    if c1 and c2:
        sim = _rfuzz.token_set_ratio(c1, c2)
        if sim >= 85:
            city_cmp = {"verdict": "match", "adjustment": 10.0, "c1": c1, "c2": c2, "similarity": sim}
        elif sim >= 60:
            city_cmp = {"verdict": "partial", "adjustment": 3.0, "c1": c1, "c2": c2, "similarity": sim}
        else:
            city_cmp = {"verdict": "mismatch", "adjustment": -10.0, "c1": c1, "c2": c2, "similarity": sim}
    else:
        city_cmp = {"verdict": "missing", "adjustment": 0.0, "c1": c1, "c2": c2, "similarity": 0}

    # Base addressline text similarity (on the remaining address)
    t1, t2 = r1.addressline, r2.addressline
    if t1 and t2:
        try:
            from services.model import match_entities
            from services.config import ADDRESS_MODEL_WEIGHTS
            base_score = float(match_entities(t1, t2, weights=ADDRESS_MODEL_WEIGHTS))
        except Exception:
            # Best effort: fall back to plain fuzzy ratios, but leave a trace
            # so a broken model path does not go unnoticed.
            logger.debug("Address model scoring failed; using rapidfuzz fallback", exc_info=True)
            base_score = float(max(
                _rfuzz.token_set_ratio(t1, t2),
                _rfuzz.WRatio(t1, t2),
                _rfuzz.ratio(t1, t2),
            ))
    else:
        base_score = 0.0

    # Per-component comparison
    hno_cmp = _compare_address_component(r1.house_number, r2.house_number, boost=30.0, penalty=30.0)
    flat_cmp = _compare_address_component(r1.flat_number, r2.flat_number, boost=10.0, penalty=10.0)
    apt_cmp = _compare_address_component(r1.apartment, r2.apartment, boost=10.0, penalty=10.0)
    str_cmp = _compare_address_component(r1.street, r2.street, boost=10.0, penalty=10.0)

    # Apply component adjustments only when the remaining-address score > 60.
    # Diagnostics go through the module logger (DEBUG) rather than stdout so
    # address data is not printed unconditionally.
    comp_adj = 0.0
    logger.debug(
        "[ADDR_COMPONENTS] base_score=%.2f | threshold=60 | adjustments_applied=%s",
        base_score, base_score > 60,
    )
    logger.debug("  remaining_addr1 : %r", r1.addressline)
    logger.debug("  remaining_addr2 : %r", r2.addressline)
    for cmp, label in [
        (hno_cmp, "house_number"),
        (flat_cmp, "flat_number"),
        (apt_cmp, "apartment"),
        (str_cmp, "street"),
    ]:
        verdict = cmp["verdict"]
        if verdict == "missing":
            adj, status = 0.0, "skipped - component absent"
        elif base_score <= 60:
            adj, status = 0.0, "SKIPPED - base_score<=60"
        elif verdict == "match":
            adj, status = cmp["boost"], "BOOSTED"
        else:
            adj, status = -cmp["penalty"], "PENALISED"
        comp_adj += adj
        logger.debug(
            "  %-15s | verdict=%-9s | v1=%10r v2=%-10r | adjustment=%+.1f [%s]",
            label, verdict, cmp.get("v1", ""), cmp.get("v2", ""), adj, status,
        )
    logger.debug("  total comp_adj : %+.1f", comp_adj)

    # Accumulate and cap
    total_adj = (zip_cmp["adjustment"] + state_cmp["adjustment"]
                 + city_cmp["adjustment"] + comp_adj)
    final_score = max(0.0, min(100.0, base_score + total_adj))

    # Notes
    notes = []
    for cmp, key, v1k, v2k in [
        (zip_cmp, "zipcode", "z1", "z2"),
        (state_cmp, "state", "s1", "s2"),
        (city_cmp, "city", "c1", "c2"),
    ]:
        v = cmp["verdict"]
        if v == "match":
            notes.append(f"{key} match ({cmp.get(v1k, '')})")
        elif v == "mismatch":
            notes.append(f"{key} MISMATCH ({cmp.get(v1k, '')} ≠ {cmp.get(v2k, '')})")
    for cmp, key in [(hno_cmp, "house_no"), (flat_cmp, "flat_no"),
                     (apt_cmp, "apartment"), (str_cmp, "street")]:
        v = cmp["verdict"]
        if v == "match":
            notes.append(f"{key} match ({cmp['v1']})")
        elif v == "mismatch":
            notes.append(f"{key} MISMATCH ({cmp['v1']} ≠ {cmp['v2']})"
                         + (" [applied]" if base_score > 60 else " [skipped, base<=60]"))

    return {
        "final_score": round(final_score, 2),
        "base_score": round(base_score, 2),
        "adjustment": round(total_adj, 2),
        "comp_adjustment": round(comp_adj, 2),
        "zipcode": zip_cmp,
        "state": state_cmp,
        "city": city_cmp,
        "house_number": hno_cmp,
        "flat_number": flat_cmp,
        "apartment": apt_cmp,
        "street": str_cmp,
        "record1": _address_record_summary(r1),
        "record2": _address_record_summary(r2),
        "notes": notes,
    }
def match_structured_address_lists(
    addrs1: list,
    addrs2: list,
) -> float:
    """
    Score every address of record1 against every address of record2 and
    return the best ``final_score`` (0-100, rounded to 2 decimals).

    Each address is a dict read through the keys ``addressline``, ``city``,
    ``zipcode`` and ``state`` (a missing key counts as ""). Returns 0.0 when
    either list is empty.
    """
    if not (addrs1 and addrs2):
        return 0.0

    field_order = ("addressline", "city", "zipcode", "state")
    pair_scores = [
        match_structured_address_fields(
            *(left.get(name, "") for name in field_order),
            *(right.get(name, "") for name in field_order),
        )["final_score"]
        for left in addrs1
        for right in addrs2
    ]
    # The leading 0.0 is the floor, so the result is never negative.
    return round(max([0.0, *pair_scores]), 2)