import re
from datetime import datetime
from typing import List, Dict
import pandas as pd
import logging
try:
import pgeocode
except ImportError:
pgeocode = None
import math
logger = logging.getLogger("rules")
from services.config import (
config,
APARTMENT_IDENTIFIER,
FLAT_NUMBER_IDENTIFIER,
HOUSE_NUMBER_IDENTIFIER,
STREET_KEYWORD,
name_variation_df,
hno_variation_df,
city_prev_pres_df,
state_name_standard_df,
sur_comm_names_df,
pin_city_state_df,
CITY_MAPPING,
STATE_MAPPING,
MATCHING_RULES
)
# =========================================================
# TEXT CLEANING
# =========================================================
def clean_text(text):
"""
Simple text cleaning for all input values:
1. Strip leading/trailing whitespace
2. Remove HTML tags and HTML entities
3. Remove non-printable/control characters and unicode artifacts
4. Collapse multiple whitespace into single space
5. Convert to lowercase
"""
if not text or not isinstance(text, str):
return "" if text is None else text
# Remove HTML tags (e.g.,
,
...
) text = re.sub(r'<[^>]+>', ' ', text) # Remove HTML entities (e.g., &, , {) text = re.sub(r'&(?:#\d+|#x[0-9a-fA-F]+|[a-zA-Z]+);', ' ', text) # Remove non-printable and control characters (keep printable ASCII range 0x20-0x7E) text = re.sub(r'[^\x20-\x7E]', '', text) # Collapse multiple whitespace into single space text = re.sub(r'\s+', ' ', text) # Strip leading/trailing spaces text = text.strip() # Convert to lowercase text = text.upper() return text logger.info("Using simple text cleaning (no regex/keyword pipeline)") # ========================================================= # NAME PREPROCESSING # ========================================================= # Titles/honorifics to remove from names NAME_TITLES = { "dr", "mr", "mrs", "ms", "miss", "master", "m/s", "sri", "sree", "shri", "shree", "smt", "shrimati", "kumari", "prof", "late", "er", "adv", "ca", "capt", "col", "lt", "major", "brig", "brigadier", "cmdr", "commander", "wingcmdr", "groupcapt", "justice", "judge", "cj", "chiefjustice", "ias", "ips", "ifs", "pt", "pandit", "swami", "guru", "maulana", "maulvi", "haji", "haj", "imam", "maharaj", "sardar", "phd", "md", "dphil", } # Regex for titles that may appear without a space (e.g., "dr.rajesh") _TITLE_PATTERN = re.compile( r'^(dr\.?|mr\.?|mrs\.?|ms\.?|miss|m/s\.?|sri|sree|shri|shree|' r'smt\.?|prof\.?|late|er|adv|ca|capt|col|lt|major|justice|' r'shrimati|kumari|master|brig|brigadier|cmdr|commander|wingcmdr|' r'groupcapt|judge|cj|chiefjustice|ias|ips|ifs|pt|pandit|swami|' r'guru|maulana|maulvi|haji|haj|imam|maharaj|sardar|phd|dphil)\s*', re.IGNORECASE ) # Relational prefixes: "sita w/o ram" → "sita ram" _RELATIONAL_PATTERNS = re.compile( r'\b(?:s/o|d/o|w/o|h/o|c/o|g/o|' r'son\s+of|daughter\s+of|wife\s+of|husband\s+of|care\s+of|guardian\s+of|' r'so|do|wo|ho|co|go)\b', re.IGNORECASE ) def remove_name_titles(text): """ Remove title prefixes and suffixes from name. Handles both space-separated ("mr rajesh") and dot-attached ("dr.rajesh"). """ if not text: return "" # 1. Token-based removal (handles space-separated titles) tokens = text.upper().split() # Remove from front while tokens and tokens[0].rstrip('.') in NAME_TITLES: tokens.pop(0) # Remove from back while tokens and tokens[-1].rstrip('.') in NAME_TITLES: tokens.pop() text = " ".join(tokens) # 2. Regex fallback for no-space cases (e.g., "dr.rajesh") text = _TITLE_PATTERN.sub('', text) return text.strip() def remove_relational_prefixes(text): """ Remove relational prefixes from names. "sita w/o ram" → "sita ram" "anil s/o suresh" → "anil suresh" """ if not text: return "" text = _RELATIONAL_PATTERNS.sub(' ', text) text = re.sub(r'\s+', ' ', text).strip() return text def remove_non_alpha_trailing(text): """ Remove non-alpha trailing content from names. "anil kumar 1/05/1985" → "anil kumar" "rajesh 12345" → "rajesh" Keeps only alphabetic tokens from the name. """ if not text: return "" tokens = text.split() cleaned = [] for token in tokens: # Keep token only if it contains at least one letter if re.search(r'[a-zA-Z]', token): # Remove any non-alpha characters within the token alpha_only = re.sub(r'[^a-zA-Z]', '', token) if alpha_only: cleaned.append(alpha_only) return " ".join(cleaned) def deduplicate_tokens(text): """ Remove repeated tokens, keeping only unique ones in order. "rajesh kumar rajesh" → "rajesh kumar" """ if not text: return "" tokens = text.split() seen = set() unique = [] for t in tokens: key = t.lower() if key not in seen: seen.add(key) unique.append(t) return " ".join(unique) def deduplicate_consecutive_tokens(text): """ Remove only consecutively repeated tokens (for addresses). "mg road mg road bangalore" → "mg road bangalore" "mg road bangalore mg road" stays as-is (non-consecutive) """ if not text: return "" tokens = text.split() if not tokens: return "" result = [tokens[0]] for t in tokens[1:]: if t.upper() != result[-1].upper(): result.append(t) return " ".join(result) def collapse_repeated_chars(text): """ Preprocessing step: 1. Collapse 3+ consecutive identical alpha characters to 2 (typo fix) e.g., "MOHAMMMED" → "MOHAMMED", "SHARRMA" → "SHARMA" 2. Replace consecutive non-alphanumeric chars with single space e.g., "---" → " ", "..." → " " """ if not text: return "" # Collapse 3+ identical letters to 2 text = re.sub(r'([a-zA-Z])\1{2,}', r'\1\1', text) # Replace consecutive non-alphanumeric/non-space chars with single space text = re.sub(r'[^a-zA-Z0-9\s]{2,}', ' ', text) # Collapse multiple spaces text = re.sub(r'\s+', ' ', text) return text.strip() def strip_non_alphanumeric(text): """ Remove non-alphanumeric characters from text, keeping spaces. Used for address cleanup before sending to embedding model. """ if not text: return "" text = re.sub(r'[^a-zA-Z0-9\s]', ' ', text) text = re.sub(r'\s+', ' ', text) return text.strip() def is_subset_match(tokens1, tokens2): """ Check if all tokens of one name are a complete subset of the other. Returns True if name1 tokens ⊆ name2 tokens or vice versa. e.g., ["rajesh", "kumar"] ⊆ ["rajesh", "kumar", "sharma"] → True """ if not tokens1 or not tokens2: return False set1 = {t.upper() for t in tokens1} set2 = {t.upper() for t in tokens2} return set1.issubset(set2) or set2.issubset(set1) def standardize_name_variations(text): """ Replace name token variations with standard forms using name_variation_standard.csv. Iterates through each token and checks if it exists as a variation. e.g., "mohommed" → "mohammad" """ if not text or name_variation_df.empty: return text if text else "" # Build a lookup dict for fast access (done once, cached) if not hasattr(standardize_name_variations, '_lookup'): lookup = {} for _, row in name_variation_df.iterrows(): var = str(row.get('VARIATION', '')).strip().upper() std = str(row.get('STANDARD', '')).strip().upper() if var and std: lookup[var] = std standardize_name_variations._lookup = lookup lookup = standardize_name_variations._lookup tokens = text.upper().split() result = [] for token in tokens: result.append(lookup.get(token, token)) return " ".join(result) # ========================================================= # ADDRESS VARIATION PREPROCESSING (PDF cases 1-19) # NAME VARIATION PREPROCESSING HELPERS (PDF cases 1-14) # ========================================================= # ─── already imported in original: re, pd, logging ────────── # =========================================================== # ADDRESS VARIATION HANDLERS (all 19 PDF cases) # =========================================================== # ── Case 1 & 4 : delimiter / special-char normalisation ── _ADDR_SPECIAL_CHARS = re.compile(r'[|#@$%^&*\(\)\[\]\{\};:\'\"\\<>?]') def _normalize_delimiters(text: str) -> str: """Replace non-standard delimiters with space; collapse whitespace.""" text = _ADDR_SPECIAL_CHARS.sub(' ', text) text = re.sub(r'\s+', ' ', text) return text.strip() # ── Case 4: hyphen normalisation inside house/flat numbers ── def _normalize_hyphens(text: str) -> str: """ Remove hyphens that are purely cosmetic inside alphanumeric tokens (e.g. '12-B' → '12B', 'A-110' → 'A110') while preserving hyphens that form compound locality names like 'Pimpri-Chinchwad'. """ def _dehyphen(m): a, b = m.group(1), m.group(2) # If both sides are digit-or-digit+letter treat as house number variation if re.fullmatch(r'[0-9]+[A-Z]?', a, re.I) and re.fullmatch(r'[A-Z]?[0-9]+[A-Z]?', b, re.I): return a + b return m.group(0) # leave intact (locality name) return re.sub(r'([A-Z0-9]+)-([A-Z0-9]+)', _dehyphen, text, flags=re.I) # ── Case 2: abbreviation expansion dictionary ── _ADDR_ABBREV = { # directions 'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST', 'NE': 'NORTH EAST', 'NW': 'NORTH WEST', 'SE': 'SOUTH EAST', 'SW': 'SOUTH WEST', # administrative 'NGR': 'NAGAR', 'NGRS': 'NAGAR', 'LYT': 'LAYOUT', 'LT': 'LAYOUT', 'HYD': 'HYDERABAD', 'BLR': 'BANGALORE', 'MUM': 'MUMBAI', 'DEL': 'DELHI', 'CHN': 'CHENNAI', 'KOL': 'KOLKATA', 'PUN': 'PUNE', 'AHM': 'AHMEDABAD', # road / area 'RD': 'ROAD', 'ST': 'STREET', 'AVE': 'AVENUE', 'BLVD': 'BOULEVARD', 'MRG': 'MARG', 'LN': 'LANE', 'CR': 'CROSS', 'CIR': 'CIRCLE', # building 'APT': 'APARTMENT', 'APTS': 'APARTMENTS', 'BLDG': 'BUILDING', 'BLK': 'BLOCK', 'SECT': 'SECTOR', 'SEC': 'SECTOR', # misc 'OPP': 'OPPOSITE', 'NR': 'NEAR', 'ADJ': 'ADJACENT', 'JN': 'JUNCTION', 'STA': 'STATION', 'PO': 'POST OFFICE', 'PB': 'POST BOX', 'PO BOX': 'POST BOX', 'P.O BOX': 'POST BOX', 'P.O. BOX': 'POST BOX', 'DIST': 'DISTRICT', 'DST': 'DISTRICT', 'DT': 'DISTRICT', 'TAL': 'TALUK', 'TQ': 'TALUK', 'TEH': 'TEHSIL', 'VLG': 'VILLAGE', 'VIL': 'VILLAGE', 'VILL': 'VILLAGE', 'CLNY': 'COLONY', 'COL': 'COLONY', 'EXT': 'EXTENSION', 'EXTN': 'EXTENSION', 'PH': 'PHASE', } def _expand_address_abbreviations(text: str) -> str: """Expand common address abbreviations to full forms.""" tokens = text.upper().split() expanded = [] i = 0 while i < len(tokens): # try 2-token phrase first (e.g. "P.O BOX") if i + 1 < len(tokens): two = tokens[i] + ' ' + tokens[i+1] if two in _ADDR_ABBREV: expanded.append(_ADDR_ABBREV[two]) i += 2 continue tok = re.sub(r'\.', '', tokens[i]) # strip dots: "P.O." → "PO" expanded.append(_ADDR_ABBREV.get(tok, tokens[i])) i += 1 return ' '.join(expanded) # ── Case 9: Roman numeral conversion (already exists; exposed here) ── # (roman_to_number is already defined in original rules.py – no duplication) # ── Case 14: Spelled-out numbers ── _NUMBER_WORDS = { 'ZERO':'0','ONE':'1','TWO':'2','THREE':'3','FOUR':'4','FIVE':'5', 'SIX':'6','SEVEN':'7','EIGHT':'8','NINE':'9','TEN':'10', 'ELEVEN':'11','TWELVE':'12','THIRTEEN':'13','FOURTEEN':'14','FIFTEEN':'15', 'SIXTEEN':'16','SEVENTEEN':'17','EIGHTEEN':'18','NINETEEN':'19','TWENTY':'20', 'TWENTY ONE':'21','TWENTY TWO':'22','TWENTY THREE':'23','TWENTY FOUR':'24', 'TWENTY FIVE':'25','TWENTY SIX':'26','TWENTY SEVEN':'27','TWENTY EIGHT':'28', 'TWENTY NINE':'29','THIRTY':'30','THIRTY TWO':'32','FORTY':'40','FIFTY':'50', 'FIRST':'1ST','SECOND':'2ND','THIRD':'3RD','FOURTH':'4TH','FIFTH':'5TH', 'SIXTH':'6TH','SEVENTH':'7TH','EIGHTH':'8TH','NINTH':'9TH','TENTH':'10TH', 'FOURTH':'4TH','FIFTH':'5TH', } _ORDINAL_MAP = {'FIRST':'1ST','SECOND':'2ND','THIRD':'3RD','FOURTH':'4TH','FIFTH':'5TH', 'SIXTH':'6TH','SEVENTH':'7TH','EIGHTH':'8TH','NINTH':'9TH','TENTH':'10TH'} def _normalize_spelled_numbers(text: str) -> str: """Replace spelled-out numbers with digits: 'Thirty-Two' → '32'.""" t = text.upper() # Try multi-word first for phrase, digit in sorted(_NUMBER_WORDS.items(), key=lambda x: -len(x[0])): t = re.sub(r'\b' + re.escape(phrase) + r'\b', digit, t) return t # ── Case 15 & 16: Landmark synonym normalisation ── _LANDMARK_SYNONYMS = { 'OPP': 'NEAR', 'OPPOSITE': 'NEAR', 'OPPOSITE TO': 'NEAR', 'ADJACENT TO': 'NEAR', 'ADJ TO': 'NEAR', 'BEHIND': 'NEAR', 'IN FRONT OF': 'NEAR', 'BESIDE': 'NEAR', 'NEXT TO': 'NEAR', 'CLOSE TO': 'NEAR', } def _normalize_landmark_phrases(text: str) -> str: """Standardise landmark relative phrases to a single token.""" t = text.upper() for phrase, std in sorted(_LANDMARK_SYNONYMS.items(), key=lambda x: -len(x[0])): t = re.sub(r'\b' + re.escape(phrase) + r'\b', std, t) return t # ── Case 16: Relational marker removal in addresses ── _ADDR_RELATIONAL = re.compile( r'\b(S/O|D/O|W/O|H/O|SON\s+OF|DAUGHTER\s+OF|WIFE\s+OF|HUSBAND\s+OF)\b', re.IGNORECASE ) def _remove_addr_relational_markers(text: str) -> str: """Remove s/o, d/o, w/o etc. from address lines.""" text = _ADDR_RELATIONAL.sub(' ', text) return re.sub(r'\s+', ' ', text).strip() # ── Case 18: P.O Box / Post Box normalisation ── def _normalize_po_box(text: str) -> str: """Normalise P.O Box / P.O. Box / Post Box to a canonical form.""" t = re.sub(r'P\.?\s*O\.?\s*BOX', 'POST BOX', text, flags=re.IGNORECASE) t = re.sub(r'POST\s+BOX', 'POSTBOX', t, flags=re.IGNORECASE) return t # ── Case 17: Directional token normalisation ── _DIR_MAP = { 'EAST': 'E', 'WEST': 'W', 'NORTH': 'N', 'SOUTH': 'S', 'NORTH EAST': 'NE', 'NORTH WEST': 'NW', 'SOUTH EAST': 'SE', 'SOUTH WEST': 'SW', } # Normalise to abbreviated form so "Andheri East" == "Andheri E" def _normalize_directions(text: str) -> str: t = text.upper() for full, abbr in sorted(_DIR_MAP.items(), key=lambda x: -len(x[0])): t = re.sub(r'\b' + re.escape(full) + r'\b', abbr, t) return t # ── Master address preprocessing pipeline ── def preprocess_address(text: str) -> str: """ Full address preprocessing pipeline covering all 19 PDF variation cases plus new requirements (landmark removal, PO box normalise, comprehensive admin abbreviation expansion): 1/4. Delimiter + special char normalisation, hyphen in house no. 2. Comprehensive abbreviation expansion (rural+urban) 9. Roman numeral → digit 14. Spelled-out numbers → digit 15. Landmark synonym standardisation 16. Relational marker removal (s/o, w/o …) 17. Directional token normalisation 18. P.O Box / Post Box normalisation 19. Duplicate token removal NEW. Landmark phrase removal (near/nearby/landmark is …) All. Case fold, whitespace collapse, strip """ if not text or not isinstance(text, str): return "" t = clean_text(text) # lowercase, HTML strip, unicode clean if not t: return "" t = _normalize_delimiters(t) # Case 1/4 – delimiters t = _normalize_hyphens(t) # Case 4 – hyphen in house no t = _remove_addr_relational_markers(t) # Case 16 – s/o, w/o t = remove_landmark_phrases(t) # NEW – near/nearby/landmark t = roman_to_number(t) # Case 9 – Roman numerals t = _normalize_spelled_numbers(t) # Case 14 – thirty-two → 32 t = _expand_all_address_variations(t) # Case 2 – comprehensive abbrev expansion t = _normalize_landmark_phrases(t) # Case 15 – opp/near synonyms t = _normalize_po_box(t) # Case 18 – P.O Box canonical form t = _normalize_directions(t) # Case 17 – East/West → E/W t = normalize_and_deduplicate_address(t) # Case 19 – dedup tokens t = re.sub(r'\s+', ' ', t).strip() return t # ========================================================= # LANDMARK REMOVAL (new requirement) # ========================================================= # Keywords that introduce landmark phrases — strip everything from # the keyword up to the next comma/delimiter. _LANDMARK_INTRO_PATTERNS = re.compile( r'(? str: """ Remove landmark references from address text. Strips from the landmark keyword up to the next comma (or end of string). Preserves all other address tokens. Examples: "12B Lakshmi Nagar, near Hanuman Temple, Hyderabad" → "12B Lakshmi Nagar, Hyderabad" "32 Main Road nearbyto Bus Stand Jaipur" → "32 Main Road Jaipur" """ if not text: return text # Split on comma to process segment by segment parts = text.split(',') cleaned = [] for part in parts: # If a landmark keyword appears inside this segment, remove from keyword onward stripped = _LANDMARK_INTRO_PATTERNS.sub('', part) # If keyword was found, everything after it was the landmark — keep only the part before if stripped != part: before = _LANDMARK_INTRO_PATTERNS.split(part)[0].strip() if before: cleaned.append(before) else: cleaned.append(part.strip()) result = ', '.join(s for s in cleaned if s) return re.sub(r'\s+', ' ', result).strip() # ========================================================= # NAMED COMPONENT EXTRACTION (street, colony, sector, nagar …) # ========================================================= # Keywords that introduce named locality components _NAMED_COMPONENT_KEYWORDS = [ 'street', 'colony', 'sector', 'nagar', 'bhavan', 'bhawan', 'layout', 'enclave', 'vihar', 'phase', 'block', 'ward', 'galli', 'gali', 'cross', 'main', 'road', 'marg', 'lane', 'avenue', 'circle', 'plaza', 'park', 'garden', 'gardens', 'extension', 'extn', 'township', 'town', 'puram', 'pura', 'nagara', 'nagar', 'bazaar', 'bazar', 'market', ] _NAMED_COMP_PATTERN = re.compile( r'\b(' + '|'.join(re.escape(k) for k in _NAMED_COMPONENT_KEYWORDS) + r')\b', re.IGNORECASE ) def extract_named_components(text: str) -> dict: """ Extract named locality components from an address. Returns dict with: 'components': list of (keyword, full_phrase) tuples found 'remaining': address text with those components removed Example: "Plot 5, HSR Layout, Sector 7, Bengaluru" → components: [('layout','hsr layout'), ('sector','sector 7')] remaining: "Plot 5, Bengaluru" """ if not text: return {'components': [], 'remaining': text} t = text.upper() found = [] consumed_spans = [] for m in _NAMED_COMP_PATTERN.finditer(t): kw = m.group(1).upper() start = m.start() # Grab up to 3 tokens before + 2 tokens after the keyword as the phrase before_chunk = t[max(0, start-30):start].strip() after_chunk = t[m.end():min(len(t), m.end()+30)].strip() # Build phrase: last 1-2 tokens before kw + kw + first 1-2 tokens after before_toks = before_chunk.split()[-2:] if before_chunk else [] after_toks = after_chunk.split()[:2] if after_chunk else [] phrase = ' '.join(before_toks + [kw] + after_toks).strip() found.append((kw, phrase)) consumed_spans.append((max(0, start - len(' '.join(before_toks))), m.end() + len(' '.join(after_toks)))) # Remove found component spans from text for "remaining" remaining = t for kw, phrase in found: remaining = re.sub(re.escape(phrase), ' ', remaining, count=1) remaining = re.sub(r'\s+', ' ', remaining).strip().strip(',').strip() return {'components': found, 'remaining': remaining} def compare_named_components(addr1: str, addr2: str) -> dict: """ Compare named locality components between two addresses. Returns: 'verdict': 'match' | 'mismatch' | 'skip' (skip = one/both sides missing) 'score_adjustment': float to add to base address score 'detail': list of comparison results per keyword Logic: - For each keyword present in BOTH addresses: compare the associated phrase. If phrases are similar (token overlap >= 50%): match (+5 per component) If phrases clearly differ: mismatch (-20 per component) - If keyword only present in one address: remove it, continue with rest (skip). """ from rapidfuzz import fuzz as _fuzz c1 = extract_named_components(addr1) c2 = extract_named_components(addr2) kw_map1 = {kw: phrase for kw, phrase in c1['components']} kw_map2 = {kw: phrase for kw, phrase in c2['components']} shared_kws = set(kw_map1.keys()) & set(kw_map2.keys()) detail = [] score_adj = 0.0 mismatches = 0 for kw in shared_kws: p1, p2 = kw_map1[kw], kw_map2[kw] sim = _fuzz.token_set_ratio(p1, p2) if sim >= 70: detail.append({'keyword': kw, 'result': 'match', 'score': sim}) score_adj += 5.0 else: detail.append({'keyword': kw, 'result': 'mismatch', 'score': sim}) score_adj -= 20.0 mismatches += 1 if not shared_kws: return {'verdict': 'skip', 'score_adjustment': 0.0, 'detail': []} verdict = 'mismatch' if mismatches > 0 else 'match' return {'verdict': verdict, 'score_adjustment': score_adj, 'detail': detail} # ========================================================= # POST BOX NUMBER EXTRACTION & COMPARISON # ========================================================= _POSTBOX_PATTERN = re.compile( r'(?:p\.?\s*o\.?\s*box|post\s*box|postbox|p\.?b\.?\s*no\.?|pb\s*no\.?)' r'\s*[:\-]?\s*(\d{1,6})', re.IGNORECASE ) def extract_postbox_number(text: str) -> str | None: """ Extract post box number from address text. Returns the numeric part as string, or None if not found. """ if not text: return None m = _POSTBOX_PATTERN.search(text) return m.group(1).strip() if m else None def remove_postbox_from_address(text: str) -> str: """Remove post box reference entirely from address for remaining comparison.""" if not text: return text cleaned = _POSTBOX_PATTERN.sub(' ', text) return re.sub(r'\s+', ' ', cleaned).strip() def compare_postbox(addr1: str, addr2: str) -> dict: """ Extract and compare post box numbers from two addresses. Returns: 'found': bool — True if PO box detected in either address 'adjustment': float +10 if both have PO box AND numbers match -30 if both have PO box AND numbers differ 0 if only one (or neither) has PO box (no signal either way) """ pb1 = extract_postbox_number(addr1) pb2 = extract_postbox_number(addr2) if pb1 is None and pb2 is None: return {'found': False, 'adjustment': 0.0, 'pb1': None, 'pb2': None} if pb1 is not None and pb2 is not None: adj = 10.0 if pb1 == pb2 else -30.0 return {'found': True, 'adjustment': adj, 'pb1': pb1, 'pb2': pb2} # Only one side has PO box — no adjustment return {'found': True, 'adjustment': 0.0, 'pb1': pb1, 'pb2': pb2} # ========================================================= # ENHANCED HOUSE NUMBER EXTRACTION # ========================================================= # Priority-ordered keywords that precede a house/door number _HNO_KEYWORD_PATTERN = re.compile( r'\b(?:' r'd\.?\s*no\.?|door\s*no\.?|h\.?\s*no\.?|house\s*no\.?|' r'house\s*number|property\s*no\.?|plot\s*no\.?|' r'flat\s*no\.?|flat\s*number|' r'mig\s*no\.?|hig\s*no\.?|lig\s*no\.?|' r'khata\s*no\.?|khasra\s*no\.?' r')' r'\s*[:\-]?\s*([A-Z0-9][A-Z0-9\-/]*)', re.IGNORECASE ) # "Sector N", "Ward N", "Phase N" — these are NOT house numbers _NON_HNO_COMPONENT_PATTERN = re.compile( r'\b(sector|ward|phase|block|zone|taluk|village|vill|dist|district|' r'plot|survey|sy\.?\s*no\.?|s\.?\s*no\.?)\s*[:\-]?\s*(\d+[A-Z]?)', re.IGNORECASE ) def extract_house_number_v2(text: str) -> str | None: """ Revamped house number extraction with high priority to explicit keywords. Priority order: 1. Explicit HNO keyword (H.No, D.No, House No, Door No, Plot No, Flat No …) 2. Leading numeric token (first token if it looks like HNO, not sector/ward) 3. Pattern match for compound numbers (12-B, 45/3, A-110) Explicitly excludes sector numbers, ward numbers, phase numbers, block numbers from being treated as house numbers. Returns the extracted house number string or None. """ if not text: return None t = text.strip() # Step 1: keyword-based extraction (highest priority) m = _HNO_KEYWORD_PATTERN.search(t) if m: return m.group(1).strip().upper() # Build set of non-HNO numbers (sector/ward/phase/block values) to exclude non_hno_values = set() for nm in _NON_HNO_COMPONENT_PATTERN.finditer(t): non_hno_values.add(nm.group(2).strip().upper()) # Step 2: leading numeric heuristic tokens = t.split() if tokens: first = tokens[0].upper() # Must look like a house number (digit or letter+digit) if re.fullmatch(r'[A-Z]?\d+[A-Z]?(?:[/\-]\d+[A-Z]?)*', first): if first not in non_hno_values: return first # Step 3: compound number pattern anywhere in text compound_patterns = [ r'\b(\d+[A-Z]?/\d+[A-Z]?)\b', # 45/3, 45/3A r'\b(\d+-\d+[A-Z]?)\b', # 12-3B r'\b([A-Z]-\d+[A-Z]?)\b', # A-110 r'\b(\d+[A-Z])\b', # 12B r'\b(\d{1,4})\b', # plain number ] for pat in compound_patterns: for m in re.finditer(pat, t, re.IGNORECASE): val = m.group(1).strip().upper() if val not in non_hno_values: # Additional check: not preceded by sector/ward/phase keyword before = t[:m.start()].upper() if not re.search(r'\b(sector|ward|phase|block|zone)\s*$', before): return val return None def compare_house_numbers(addr1: str, addr2: str) -> dict: """ Extract and compare house numbers from two addresses. Returns: 'h1', 'h2': extracted house numbers (or None), normalized alphanumeric-only 'verdict': 'match' | 'mismatch' | 'missing' 'score_adjustment': float +30 if both present and match AND base_score > 50 (caller must apply conditionally) -30 if both present and clearly different 0 if one/both absent """ h1_raw = extract_house_number_v2(addr1) h2_raw = extract_house_number_v2(addr2) # Normalize: strip all non-alphanumerics (144/143 → 144143) h1 = re.sub(r'[^A-Z0-9]', '', h1_raw.upper()) if h1_raw else None h2 = re.sub(r'[^A-Z0-9]', '', h2_raw.upper()) if h2_raw else None if h1 is None and h2 is None: return {'h1': None, 'h2': None, 'verdict': 'missing', 'score_adjustment': 0.0} if h1 is not None and h2 is not None: if h1 == h2: # Boost deferred: caller applies +30 only when base_score > 50 return {'h1': h1, 'h2': h2, 'verdict': 'match', 'score_adjustment': 0.0} else: return {'h1': h1, 'h2': h2, 'verdict': 'mismatch', 'score_adjustment': -30.0} return {'h1': h1, 'h2': h2, 'verdict': 'missing', 'score_adjustment': 0.0} # ========================================================= # EXPANDED INDIAN ADDRESS ADMINISTRATIVE VARIATIONS # ========================================================= # Comprehensive dictionary of rural + urban address abbreviations / variations # with their standard canonical expansions. _INDIAN_ADDR_VARIATIONS: dict[str, str] = { # ── Road / Street ── 'RD': 'ROAD', 'STR': 'STREET', 'ST': 'STREET', 'AVE': 'AVENUE', 'MRG': 'MARG', 'LN': 'LANE', 'BLVD': 'BOULEVARD', 'CIR': 'CIRCLE', 'CR': 'CROSS', 'CROSS RD': 'CROSS ROAD', 'X RD': 'CROSS ROAD', # ── Locality ── 'NGR': 'NAGAR', 'NGRS': 'NAGAR', 'NAGARA': 'NAGAR', 'LYT': 'LAYOUT', 'LOUT': 'LAYOUT', 'CLY': 'COLONY', 'CLNY': 'COLONY', 'COL': 'COLONY', 'EXT': 'EXTENSION', 'EXTN': 'EXTENSION', 'ENCL': 'ENCLAVE', 'VIHAR': 'VIHAR', # kept as-is but note variants below 'VIHARA': 'VIHAR', 'PURA': 'PURAM', 'PORA': 'PURAM', # ── Directions ── 'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST', 'NE': 'NORTH EAST', 'NW': 'NORTH WEST', 'SE': 'SOUTH EAST', 'SW': 'SOUTH WEST', # ── Administrative (urban) ── 'SECT': 'SECTOR', 'SEC': 'SECTOR', 'SCT': 'SECTOR', 'BLK': 'BLOCK', 'BK': 'BLOCK', 'PH': 'PHASE', 'PHZ': 'PHASE', 'APT': 'APARTMENT', 'APTS': 'APARTMENTS', 'BLDG': 'BUILDING', 'BLDGS': 'BUILDINGS', 'FLR': 'FLOOR', 'FL': 'FLOOR', 'OPP': 'OPPOSITE', 'NR': 'NEAR', 'ADJ': 'ADJACENT', 'JN': 'JUNCTION', 'JCT': 'JUNCTION', 'STA': 'STATION', 'STN': 'STATION', # ── Administrative (rural) ── 'VLG': 'VILLAGE', 'VIL': 'VILLAGE', 'VILL': 'VILLAGE', 'VG': 'VILLAGE', 'GRMA': 'GRAMA', 'GM': 'GRAMA', 'PANCHAYAT': 'PANCHAYAT', 'DIST': 'DISTRICT', 'DST': 'DISTRICT', 'DT': 'DISTRICT', 'ZILLA': 'DISTRICT', 'JILLA': 'DISTRICT', 'ZILA': 'DISTRICT', 'TAL': 'TALUK', 'TQ': 'TALUK', 'TALUKA': 'TALUK', 'TEH': 'TEHSIL', 'TEHS': 'TEHSIL', 'MANDAL': 'MANDAL', 'MD': 'MANDAL', 'POST': 'POST', 'PO': 'POST OFFICE', 'HOBLI': 'HOBLI', 'HBL': 'HOBLI', 'REV': 'REVENUE', 'REV VILLAGE': 'REVENUE VILLAGE', 'SY NO': 'SURVEY NUMBER', 'SY. NO': 'SURVEY NUMBER', 'KHASRA': 'KHASRA', 'KHATA': 'KHATA', # ── Post box ── 'PB': 'POST BOX', 'PO BOX': 'POST BOX', 'P.O BOX': 'POST BOX', 'P.O. BOX': 'POST BOX', # ── State abbreviations (already handled by STATE_MAPPING but kept here too) ── 'AP': 'ANDHRA PRADESH', 'TS': 'TELANGANA', 'KA': 'KARNATAKA', 'TN': 'TAMIL NADU', 'MH': 'MAHARASHTRA', 'GJ': 'GUJARAT', 'RJ': 'RAJASTHAN', 'UP': 'UTTAR PRADESH', 'MP': 'MADHYA PRADESH', 'WB': 'WEST BENGAL', 'OR': 'ODISHA', 'OD': 'ODISHA', } def _expand_all_address_variations(text: str) -> str: """ Expand ALL Indian address administrative variations (rural + urban) using the comprehensive dictionary above. Replaces the earlier _expand_address_abbreviations for address lines. """ tokens = text.upper().split() expanded = [] i = 0 while i < len(tokens): # Try 2-token phrases first (e.g. "SY NO", "PO BOX", "REV VILLAGE") if i + 1 < len(tokens): two = tokens[i] + ' ' + tokens[i+1] two_clean = re.sub(r'\.', '', two) if two_clean in _INDIAN_ADDR_VARIATIONS: expanded.append(_INDIAN_ADDR_VARIATIONS[two_clean]) i += 2 continue tok_clean = re.sub(r'\.', '', tokens[i]) # strip trailing dots expanded.append(_INDIAN_ADDR_VARIATIONS.get(tok_clean, tokens[i])) i += 1 return ' '.join(expanded) # =========================================================== # NAME VARIATION HANDLERS (all 14 PDF cases) # =========================================================== # ── Case 7A: Religious/cultural prefix abbreviation map ── _NAME_PREFIX_EXPANSION = { # Mohammed variants 'MD': 'MOHAMMED', 'MOHD': 'MOHAMMED', 'MHD': 'MOHAMMED', 'MUHAMMAD': 'MOHAMMED', 'MOHAMAD': 'MOHAMMED', 'MOHHAMED': 'MOHAMMED', 'MUHAMED': 'MOHAMMED', 'MUHAMMED': 'MOHAMMED', 'MOHAMMD': 'MOHAMMED', # Sheikh / Shaikh variants (Case 7A: sk → sheikh) 'SK': 'SHEIKH', 'SHK': 'SHEIKH', 'SHAIKH': 'SHEIKH', 'SHEKH': 'SHEIKH', 'SHIEKH': 'SHEIKH', 'SHEIK': 'SHEIKH', 'SHEK': 'SHEIKH', 'SAIKH': 'SHEIKH', # Abdul variants 'ABD': 'ABDUL', 'ABDL': 'ABDUL', 'ABDU': 'ABDUL', # Syed / Saiyed variants 'SYD': 'SYED', 'SYE': 'SYED', 'SAIYAD': 'SYED', 'SAIYED': 'SYED', 'SAYYED': 'SYED', 'SAYYAD': 'SYED', # Kumari / Km variants 'KUM': 'KUMARI', 'KM': 'KUMARI', # Chaudhary variants 'CH': 'CHAUDHARY', 'CHD': 'CHAUDHARY', 'CHOUDHARY': 'CHAUDHARY', 'CHOWDHARY': 'CHAUDHARY', 'CHOWDARY': 'CHAUDHARY', # Bala variants 'BAL': 'BALA', # Ranga variants 'RNG': 'RANGA', } def _expand_name_prefix_abbreviations(text: str) -> str: """Expand religious/cultural name prefix abbreviations.""" tokens = text.upper().split() result = [] for tok in tokens: clean_tok = tok.rstrip('.') result.append(_NAME_PREFIX_EXPANSION.get(clean_tok.upper(), tok)) return ' '.join(result) # ── Case 8: Special characters in names ── def _remove_name_special_chars(text: str) -> str: """Remove hyphens, slashes and punctuation from names.""" text = re.sub(r'[-/\\@$%^&*\(\)\[\]\{\};:\'"<>?!]', ' ', text) return re.sub(r'\s+', ' ', text).strip() # ── Case 10: Organisation suffix removal ── _ORG_SUFFIXES = re.compile( r'\b(AND\s+SONS?|ENTERPRISES?|TRADERS?|INDUSTRIES|LTD|PVT\.?\s*LTD|' r'LIMITED|CORP|CORPORATION|INC|LLC|CO\.?\s*LTD|COMPANY|ASSOCIATES?|' r'BROTHERS?|BROS?|AGENCIES?)\b', re.IGNORECASE ) def _remove_org_suffixes(text: str) -> str: """Remove organisation suffix tokens from name fields.""" return re.sub(r'\s+', ' ', _ORG_SUFFIXES.sub(' ', text)).strip() # ── Case 1 (name): merged token split helper ── # e.g. "DIGVIJAYSINGH" → "DIGVIJAY SINGH" # We rely on fuzzy/phonetic similarity rather than a hard split, # but we add a camel-case splitter as a best-effort normaliser. def _split_merged_tokens(text: str) -> str: """ Best-effort split of CamelCase or merged uppercase tokens. 'DiGVIJAYSINGH' → 'Di GVIJAY SINGH' (rough; embeddings handle remainder). Only applied when token length > 12 and no spaces present. """ tokens = text.split() result = [] for tok in tokens: if len(tok) > 12: # Insert space before uppercase letters preceded by lowercase split = re.sub(r'([a-z])([A-Z])', r'\1 \2', tok) result.append(split) else: result.append(tok) return ' '.join(result) # ── Case 13: Relational name markers ── # Already handled by remove_relational_prefixes in original code. # Ensure it is called in preprocess_name (it is). # ── Case 11: Name with DOB / extra numeric content ── # Already handled by remove_non_alpha_trailing in original code. # ── Enhanced preprocess_name ── def enhanced_preprocess_name(text: str) -> str: """ Extended name preprocessing pipeline covering all 14 PDF cases. Calls original pipeline steps PLUS new variation handlers. """ if not text or not isinstance(text, str): return "" t = clean_text(text) if not t: return "" t = collapse_repeated_chars(t) # Case 6 – typo / repeated chars t = remove_relational_prefixes(t) # Case 13 – w/o, s/o t = remove_non_alpha_trailing(t) # Case 11 – dates/numbers t = _remove_name_special_chars(t) # Case 8 – hyphens/punctuation t = remove_name_titles(t) # Case 7 – Dr, Mr, Shri … t = _expand_name_prefix_abbreviations(t) # Case 7A – Md → Mohammed t = _remove_org_suffixes(t) # Case 10 – and Sons, Ltd t = _split_merged_tokens(t) # Case 1 – merged tokens t = deduplicate_tokens(t) # dedup t = standardize_name_variations(t) # CSV variation map return t.strip() def preprocess_name(text): """ Full name preprocessing pipeline for embedding model matching. Steps: 1. Clean text (strip, remove HTML/unicode, collapse spaces, lowercase) 2. Remove relational prefixes (s/o, d/o, w/o etc.) 3. Remove non-alpha trailing content (dates, numbers) 4. Remove title prefixes/suffixes (Dr, Mr, Shri etc.) 5. Deduplicate tokens 6. Standardize name variations from CSV """ if not text or not isinstance(text, str): return "" # 1. Basic cleaning + lowercase text = clean_text(text) if not text: return "" # 1b. Collapse repeated characters (typo fix: "mohammmed" → "mohammed") text = collapse_repeated_chars(text) # 2. Remove relational prefixes (keep names after s/o etc.) text = remove_relational_prefixes(text) # 3. Remove non-alpha content (dates, numbers embedded in names) text = remove_non_alpha_trailing(text) # 4. Remove title prefixes/suffixes text = remove_name_titles(text) # 5. Remove duplicate tokens text = deduplicate_tokens(text) # 6. Standardize name variations from CSV text = standardize_name_variations(text) # # 7. Enhanced variations (Case 7A, 8, 10, 1-merged-tokens) # text = _expand_name_prefix_abbreviations(text) # text = _remove_name_special_chars(text) # text = _remove_org_suffixes(text) # text = _split_merged_tokens(text) # text = deduplicate_tokens(text) return text.strip() # ========================================================= # SURNAME DETECTION AND INITIAL LETTER MATCHING # ========================================================= def detect_surnames(text): """ Detect which tokens in specified text are common surnames from sur_comm_names.csv. Returns: set of surname tokens found. """ if not text or sur_comm_names_df.empty: return set() # Build surname set (cached on first call) if not hasattr(detect_surnames, '_surname_set'): surname_set = set() col = 'surname_community_extension' if 'surname_community_extension' in sur_comm_names_df.columns else sur_comm_names_df.columns[-1] for val in sur_comm_names_df[col].dropna(): surname_set.add(str(val).strip().upper()) detect_surnames._surname_set = surname_set tokens = text.upper().split() return {t for t in tokens if t in detect_surnames._surname_set} # def compute_initial_letter_boost(name1_tokens, name2_tokens): # """ # Case 3A: Multi-initial matching. # After token sorting, checks whether every single-char initial in one name # corresponds (by first letter) to a full-word token in the other name. # Logic (applied after alphabetical sort): # 1. Find common full-word tokens (exact match) between both names. # 2. From remaining tokens: # - side A: collect single-char initials → initial_set # - side B: collect full words → full_words # 3. For every initial in initial_set, check if a full word in full_words # starts with that letter (one-to-one pairing, each word used once). # 4. If ALL initials are matched → return 0.2 (boost). # If ANY initial has NO match → return -0.2 (mismatch penalty). # If no initials on either side → return 0.0 (no signal). # Examples: # ["k","v","reddy"] vs ["katta","venkata","reddy"]: # common={"reddy"}, initials={"k","v"}, full={"katta","venkata"} # k→katta ✓, v→venkata ✓ → +0.2 # ["k","v","reddy"] vs ["krishna","mohan","reddy"]: # common={"reddy"}, initials={"k","v"}, full={"krishna","mohan"} # k→krishna ✓, v→? no word starts with v → -0.2 (mismatch) # """ # if not name1_tokens or not name2_tokens: # return 0.0 # set1 = set(name1_tokens) # set2 = set(name2_tokens) # common = set1 & set2 # rem1 = [t for t in name1_tokens if t not in common] # rem2 = [t for t in name2_tokens if t not in common] # if not rem1 and not rem2: # return 0.0 # # Identify which side has initials (single-char tokens) # initials1 = [t for t in rem1 if len(t) == 1] # initials2 = [t for t in rem2 if len(t) == 1] # full1 = [t for t in rem1 if len(t) > 1] # full2 = [t for t in rem2 if len(t) > 1] # def _match_initials_to_full(initials, full_words): # """ # Try to pair each initial to a distinct full word starting with that letter. # Returns True if all initials matched, False if any unmatched. # """ # available = list(full_words) # copy so we can consume # for init in initials: # matched = False # for i, word in enumerate(available): # if word and word[0].upper() == init.upper(): # available.pop(i) # matched = True # break # if not matched: # return False # return True # # Case: side 1 has initials, side 2 has full words # if initials1 and full2: # if _match_initials_to_full(initials1, full2): # return 0.2 # all initials matched # else: # return -0.2 # at least one initial did NOT match → mismatch signal # # Case: side 2 has initials, side 1 has full words # if initials2 and full1: # if _match_initials_to_full(initials2, full1): # return 0.2 # else: # return -0.2 # # Both sides have initials (e.g. "K V Reddy" vs "K M Reddy") # # Compare initials sets directly # if initials1 and initials2: # init_set1 = {t.upper() for t in initials1} # init_set2 = {t.upper() for t in initials2} # if init_set1 == init_set2: # return 0.2 # else: # return -0.2 # initials differ → mismatch # return 0.0 def compute_initial_letter_boost(name1_tokens, name2_tokens): """ If one name has more tokens than the other, check if the initials of the shorter name match the first letters of tokens in the longer name. Returns 0.2 boost if initials match, else 0.0. Example: ["k", "v", "reddy"] vs ["krishna", "venkata", "reddy"] Common tokens: {"reddy"} Remaining short: ["k", "v"], remaining long: ["krishna", "venkata"] Initials of short: {"k", "v"}, first-letters of long: {"k", "v"} → match → +0.2 """ if not name1_tokens or not name2_tokens: return 0.0 # Find common tokens set1, set2 = set(name1_tokens), set(name2_tokens) common = set1 & set2 # Get remaining (non-common) tokens rem1 = [t for t in name1_tokens if t not in common] rem2 = [t for t in name2_tokens if t not in common] if not rem1 or not rem2: return 0.0 # Determine shorter and longer remaining lists if len(rem1) <= len(rem2): shorter, longer = rem1, rem2 else: shorter, longer = rem2, rem1 # Check if all tokens in shorter are single-char initials shorter_initials = {t[0] for t in shorter if len(t) == 1} if not shorter_initials: return 0.0 # Get first letters of longer tokens longer_first_letters = {t[0] for t in longer if t} # If every initial in the shorter set matches some first letter in longer if shorter_initials.issubset(longer_first_letters): return 0.2 return 0.0 def replace_with_standard(string_value, df=None): """ Replace string with standard value if found in CSV variation column. Handles exact match AND substring match (e.g., "TRIVANDRUM KERALA" matches "TRIVANDRUM"). Args: string_value: String to search for df: DataFrame with 'VARIATION' and 'STANDARD' columns (optional) Returns: Standard value if found, otherwise original string """ source_df = df if df is not None and not df.empty else name_variation_df if source_df.empty: return string_value string_upper = string_value.strip().upper() variations = source_df['VARIATION'].str.strip().str.upper() # 1. Exact match first (fastest, most precise) exact_mask = variations == string_upper if exact_mask.any(): return source_df.loc[exact_mask, 'STANDARD'].iloc[0] # 2. Substring match: check if any variation is a word-boundary substring of string_value # e.g., "TRIVANDRUM" inside "TRIVANDRUM KERALA" for idx, variation in variations.items(): if not variation: continue # Use word boundary to avoid partial word matches (e.g., "PUNE" in "IMPUNE") pattern = r'\b' + re.escape(variation) + r'\b' if re.search(pattern, string_upper): return source_df.loc[idx, 'STANDARD'] # 3. Reverse check: string_value is a substring of a variation # e.g., input "TRIVANDRUM" matching variation "TRIVANDRUM KERALA" for idx, variation in variations.items(): if not variation: continue pattern = r'\b' + re.escape(string_upper) + r'\b' if re.search(pattern, variation): return source_df.loc[idx, 'STANDARD'] return string_value def lookup_from_mapping(value, mapping_dict): """ Look up a value in a mapping dictionary (Value List -> Key) Example: {"BENGALURU": ["BANGALORE", "BENGALURU"]} Handles: 1. Exact key match: "BENGALURU" -> "BENGALURU" 2. Exact variation match: "BANGALORE" -> "BENGALURU" 3. Variation-in-input: "BANGALORE KARNATAKA" -> "BENGALURU" 4. Input-in-variation: "BANGAL" inside variation "BANGAL URBAN" -> "BENGALURU" """ if not value or not mapping_dict: return None value_upper = str(value).strip().upper() # 1. Exact key match if value_upper in mapping_dict: return value_upper # 2. Exact variation match for standard, variations in mapping_dict.items(): if isinstance(variations, list): if value_upper in [v.strip().upper() for v in variations]: return standard # 3. Variation-in-input (e.g., "BANGALORE" found inside "BANGALORE KARNATAKA") for standard, variations in mapping_dict.items(): if isinstance(variations, list): for variation in variations: pattern = r'\b' + re.escape(variation.strip().upper()) + r'\b' if re.search(pattern, value_upper): return standard # 4. Input-in-variation (e.g., input "BANGAL" found inside variation "BANGAL URBAN") for standard, variations in mapping_dict.items(): if isinstance(variations, list): for variation in variations: pattern = r'\b' + re.escape(value_upper) + r'\b' if re.search(pattern, variation.strip().upper()): return standard return None # ========================================================= # PINCODE SIMILARITY FUNCTION # ========================================================= def pincode_similarity_india(pin1, pin2): """ Calculate similarity between two Indian pincodes based on geographic distance and metro/non-metro classification. Args: pin1: First pincode (string or int) pin2: Second pincode (string or int) Returns: dict: Contains match status, similarity score, distance, and classification details, plus geocoding details (county_name, state_name for both pins) """ INVALID_VALUES = {None, "", "-", "NA", "N/A", "NULL"} def is_missing(pin): return pin is None or str(pin).strip().upper() in INVALID_VALUES if is_missing(pin1) or is_missing(pin2): return { "match": False, "similarity_score": None, "distance_km": None, "area_type": "Missing pincode", "reason": "One or both pincodes are null / empty / placeholder", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } # ========== INPUT VALIDATION & NORMALIZATION ========== try: pin1 = str(pin1).strip().zfill(6) pin2 = str(pin2).strip().zfill(6) # ========== HARD SHORT-CIRCUIT: EXACT SAME PIN ========== if pin1 == pin2: # Still need to get geocoding data for city/state extraction try: nomi = pgeocode.Nominatim("IN") p1 = nomi.query_postal_code(pin1) # Extract city and state county_name = p1.county_name if hasattr(p1, 'county_name') and not (p1.county_name is None or (isinstance(p1.county_name, float) and math.isnan(p1.county_name))) else None state_name = p1.state_name if hasattr(p1, 'state_name') and not (p1.state_name is None or (isinstance(p1.state_name, float) and math.isnan(p1.state_name))) else None return { "match": True, "similarity_score": 100, "distance_km": 0.0, "area_type": "Exact same pincode", "is_metro_logic": None, "is_extended_metro": None, "metro_cluster": None, "pin1_prefix": pin1[:3], "pin2_prefix": pin2[:3], "pin1": pin1, "pin2": pin2, "pin1_county_name": county_name, "pin2_county_name": county_name, "pin1_state_name": state_name, "pin2_state_name": state_name, "pin1_location": None, "pin2_location": None, } except Exception as e: return { "match": True, "similarity_score": 100, "distance_km": 0.0, "area_type": "Exact same pincode", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } except (ValueError, AttributeError): return { "match": False, "similarity_score": 0, "reason": "Invalid pincode format - cannot convert to string", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } # Validate format if len(pin1) != 6 or len(pin2) != 6: return { "match": False, "similarity_score": 0, "reason": f"Invalid pincode length (pin1: {len(pin1)}, pin2: {len(pin2)})", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } if not pin1.isdigit() or not pin2.isdigit(): return { "match": False, "similarity_score": 0, "reason": "Pincode must contain only digits", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } # Check for invalid ranges (Indian pincodes: 110001-855117) pin1_num = int(pin1) pin2_num = int(pin2) if pin1_num < 110001 or pin1_num > 855117 or pin2_num < 110001 or pin2_num > 855117: return { "match": False, "similarity_score": 0, "reason": "Pincode outside valid Indian range (110001-855117)", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } # ========== CONFIGURATION ========== # Major metro city prefixes (3-digit) METRO_PIN_PREFIXES = { "110", # Delhi NCR "400", # Mumbai "560", # Bengaluru "600", # Chennai "500", # Hyderabad "700", # Kolkata "411", # Pune "380", # Ahmedabad } # Extended metro regions (satellite cities, suburbs) EXTENDED_METROS = [ {"110", "201", "122", "121", "124"}, # Delhi—Noida—Gurgaon—Faridabad—Ghaziabad {"400", "421", "410"}, # Mumbai—Thane—Navi Mumbai {"500", "501"}, # Hyderabad—Secunderabad {"560", "562"}, # Bengaluru—Whitefield—Electronic City {"600", "601", "603"}, # Chennai—Kanchipuram—Chengalpattu {"700", "711", "712"}, # Kolkata—Howrah—Hooghly ] # Distance thresholds for metro areas (km) METRO_THRESHOLDS = { "same_locality": 8, # Very close neighborhoods "nearby": 15, # Adjacent areas/suburbs "same_metro": 35, # Within metro limits "extended_metro": 60, # Extended metro region } # Distance thresholds for non-metro areas (km) NON_METRO_THRESHOLDS = { "same_locality": 5, # Same town/village cluster "nearby": 12, # Adjacent towns "same_district": 40, # Within district (approximate) } # ========== UTILITY FUNCTIONS ========== def haversine(lat1, lon1, lat2, lon2): """Calculate distance between two lat/lon points using Haversine formula""" R = 6371 # Earth's radius in kilometers dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = ( math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2 ) c = 2 * math.asin(math.sqrt(a)) return R * c # ========== GEOCODING LOOKUP ========== try: nomi = pgeocode.Nominatim("IN") p1 = nomi.query_postal_code(pin1) p2 = nomi.query_postal_code(pin2) except Exception as e: return { "match": False, "similarity_score": 0, "reason": f"Geocoding service error: {str(e)}", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } # Check if geocoding was successful if p1 is None or p2 is None: return { "match": False, "similarity_score": 0, "reason": "Geocoding returned None", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } if (p1.latitude is None or p1.longitude is None or p2.latitude is None or p2.longitude is None or math.isnan(p1.latitude) or math.isnan(p2.latitude)): return { "match": False, "similarity_score": 0, "reason": "Pincode not found in geocoding database", "pin1": pin1, "pin2": pin2, "pin1_county_name": None, "pin2_county_name": None, "pin1_state_name": None, "pin2_state_name": None } # ========== EXTRACT CITY AND STATE FROM GEOCODING ========== pin1_county_name = p1.county_name if hasattr(p1, 'county_name') and not (p1.county_name is None or (isinstance(p1.county_name, float) and math.isnan(p1.county_name))) else None pin2_county_name = p2.county_name if hasattr(p2, 'county_name') and not (p2.county_name is None or (isinstance(p2.county_name, float) and math.isnan(p2.county_name))) else None pin1_state_name = p1.state_name if hasattr(p1, 'state_name') and not (p1.state_name is None or (isinstance(p1.state_name, float) and math.isnan(p1.state_name))) else None pin2_state_name = p2.state_name if hasattr(p2, 'state_name') and not (p2.state_name is None or (isinstance(p2.state_name, float) and math.isnan(p2.state_name))) else None # ========== DISTANCE CALCULATION ========== distance = haversine( p1.latitude, p1.longitude, p2.latitude, p2.longitude ) # ========== PREFIX EXTRACTION ========== prefix1 = pin1[:3] prefix2 = pin2[:3] # ========== METRO CLASSIFICATION ========== is_metro = False is_extended_metro = False metro_cluster_name = None # Check if both pincodes belong to same extended metro cluster for cluster in EXTENDED_METROS: if prefix1 in cluster and prefix2 in cluster: is_extended_metro = True is_metro = True if "110" in cluster: metro_cluster_name = "Delhi NCR" elif "400" in cluster: metro_cluster_name = "Mumbai Metropolitan Region" elif "500" in cluster: metro_cluster_name = "Hyderabad Metro" elif "560" in cluster: metro_cluster_name = "Bengaluru Metro" elif "600" in cluster: metro_cluster_name = "Chennai Metro" elif "700" in cluster: metro_cluster_name = "Kolkata Metro" break # Check if same metro prefix if not is_metro and prefix1 == prefix2 and prefix1 in METRO_PIN_PREFIXES: is_metro = True metro_map = { "110": "Delhi", "400": "Mumbai", "560": "Bengaluru", "600": "Chennai", "500": "Hyderabad", "700": "Kolkata", "411": "Pune", "380": "Ahmedabad" } metro_cluster_name = metro_map.get(prefix1, "Metro City") one_is_metro = prefix1 in METRO_PIN_PREFIXES or prefix2 in METRO_PIN_PREFIXES # ========== SIMILARITY SCORING LOGIC ========== score = 0 if is_metro: if distance <= METRO_THRESHOLDS["same_locality"]: score = 95 elif distance <= METRO_THRESHOLDS["nearby"]: score = 85 elif distance <= METRO_THRESHOLDS["same_metro"]: score = 70 elif is_extended_metro and distance <= METRO_THRESHOLDS["extended_metro"]: score = 60 else: score = 35 elif one_is_metro and not is_metro: if distance <= 20: score = 50 else: score = 25 else: same_state = False if hasattr(p1, 'state_name') and hasattr(p2, 'state_name'): same_state = p1.state_name == p2.state_name if distance <= NON_METRO_THRESHOLDS["same_locality"]: score = 92 elif distance <= NON_METRO_THRESHOLDS["nearby"]: score = 75 elif distance <= NON_METRO_THRESHOLDS["same_district"]: score = 55 elif same_state and distance <= 100: score = 40 else: score = 20 return { "match": score >= 60, "similarity_score": score, "distance_km": distance, "pin1": pin1, "pin2": pin2, "pin1_county_name": pin1_county_name, "pin2_county_name": pin2_county_name, "pin1_state_name": pin1_state_name, "pin2_state_name": pin2_state_name, "area_type": metro_cluster_name if is_metro else "Non-metro", "is_metro_logic": is_metro, "is_extended_metro": is_extended_metro } # ========================================================= # NORMALIZATION & PREPROCESSING # ========================================================= def preprocess_text(text): """Remove extra trailing/leading spaces and normalize whitespace""" if not text: return "" text = re.sub(r"\s+", " ", text.strip()) return text def normalize_text(text): """Normalize text to uppercase and remove extra spaces""" return re.sub(r"\s+", " ", text.upper().strip()) if text else "" # ========================================================= # VALIDATION FUNCTIONS # ========================================================= def validate_and_normalize_pincode(pincode): """ Validate and normalize pincode to exactly 6 digits Returns normalized pincode or None if invalid """ if not pincode: return None digits = re.sub(r'\D', '', str(pincode).strip()) if len(digits) == 6: return digits return None def validate_and_normalize_phone(phone): """ Validate and normalize phone to exactly 10 digits Handles formats: +91, 91-, 91, or plain 10 digits Returns normalized 10-digit phone or None if invalid """ if not phone: return None phone_str = str(phone).strip() # Remove common prefixes and separators phone_str = re.sub(r'^\+91[-\s]?', '', phone_str) phone_str = re.sub(r'^91[-\s]?', '', phone_str) phone_str = re.sub(r'^0[-\s]?', '', phone_str) digits = re.sub(r'\D', '', phone_str) if len(digits) == 10: return digits return None def validate_and_normalize_email(email): """ Validate and normalize email using regex Returns normalized email or None if invalid """ if not email: return None email_str = str(email).strip().upper() email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if re.match(email_pattern, email_str): return email_str return None return None def validate_and_normalize_pan(pan): """ Validate and normalize PAN (Permanent Account Number) Format: 5 letters, 4 digits, 1 letter (e.g., ABCDE1234F) """ if not pan: return None # Remove spaces and hyphens, convert to uppercase pan_str = str(pan).strip().upper() pan_str = re.sub(r'[\s-]', '', pan_str) # Check length if len(pan_str) != 10: return None # Regex validation pattern = r'^[A-Z]{5}[0-9]{4}[A-Z]{1}$' if re.match(pattern, pan_str): return pan_str return None # Verhoeff Algorithm Tables verhoeff_table_d = [ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [1, 2, 3, 4, 0, 6, 7, 8, 9, 5], [2, 3, 4, 0, 1, 7, 8, 9, 5, 6], [3, 4, 0, 1, 2, 8, 9, 5, 6, 7], [4, 0, 1, 2, 3, 9, 5, 6, 7, 8], [5, 9, 8, 7, 6, 0, 4, 3, 2, 1], [6, 5, 9, 8, 7, 1, 0, 4, 3, 2], [7, 6, 5, 9, 8, 2, 1, 0, 4, 3], [8, 7, 6, 5, 9, 3, 2, 1, 0, 4], [9, 8, 7, 6, 5, 4, 3, 2, 1, 0] ] verhoeff_table_p = [ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [1, 5, 7, 6, 2, 8, 3, 0, 9, 4], [5, 8, 0, 3, 7, 9, 6, 1, 4, 2], [8, 9, 1, 6, 0, 4, 3, 5, 2, 7], [9, 4, 5, 3, 1, 2, 6, 8, 7, 0], [4, 2, 8, 6, 5, 7, 3, 9, 0, 1], [2, 7, 9, 3, 8, 0, 6, 4, 1, 5], [7, 0, 4, 6, 9, 1, 3, 2, 5, 8] ] verhoeff_table_inv = [0, 4, 3, 2, 1, 5, 6, 7, 8, 9] def validate_verhoeff(num): """Validate Verhoeff checksum for a given number string.""" c = 0 ll = list(map(int, reversed(num))) for i, item in enumerate(ll): c = verhoeff_table_d[c][verhoeff_table_p[i % 8][item]] return c == 0 def validate_and_normalize_aadhar(aadhar): """ Validate and normalize Aadhar Number using Verhoeff algorithm Format: 12 digits, last digit is checksum """ if not aadhar: return None # Remove spaces and hyphens aadhar_str = str(aadhar).strip() aadhar_str = re.sub(r'[\s-]', '', aadhar_str) # Check if all digits and length is 12 if aadhar_str.isdigit() and len(aadhar_str) == 12: # Prevent trivial sequences like 0000... or 1111... if desired, but Verhoeff usually catches invalid checksums. # However, 000000000000 is often invalid in practice, but Verhoeff of all 0s is 0. # Aadhar spec: "It is a 12 digit random number" - but checksum must hold. if validate_verhoeff(aadhar_str): return aadhar_str return None def normalize_dob(text: str) -> str: """ Extract and normalize date from text to DD-MM-YYYY format using regex. """ if not text: return None text = text.strip() text = re.sub(r'\s*([-/.])\s*', r'\1', text) text_lower = text.upper() month_names = { 'jan': '01', 'january': '01', 'feb': '02', 'february': '02', 'mar': '03', 'march': '03', 'apr': '04', 'april': '04', 'may': '05', 'jun': '06', 'june': '06', 'jul': '07', 'july': '07', 'aug': '08', 'august': '08', 'sep': '09', 'sept': '09', 'september': '09', 'oct': '10', 'october': '10', 'nov': '11', 'november': '11', 'dec': '12', 'december': '12', '1': '01', '2': '02', '3': '03', '4': '04', '5': '05', '6': '06', '7': '07', '8': '08', '9': '09' } def normalize_number(num_str: str) -> str: num = int(num_str) if 1 <= num <= 9: return f'0{num}' return str(num) def is_valid_year(year_str: str) -> bool: try: year = int(year_str) return 1900 <= year <= 2026 except ValueError: return False def validate_and_determine_format(first: str, second: str) -> tuple: """Determine if DD-MM or MM-DD format and return (month, day)""" try: first_int = int(first) second_int = int(second) except ValueError: return (None, None) if first_int < 1 or second_int < 1: return (None, None) # If first > 12, it must be day, so second is month if first_int > 12: if first_int > 31 or second_int > 12 or second_int < 1: return (None, None) return (normalize_number(second), normalize_number(first)) # (month, day) # If second > 12, it must be day, so first is month if second_int > 12: if second_int > 31 or first_int > 12 or first_int < 1: return (None, None) return (normalize_number(first), normalize_number(second)) # (month, day) # Both <= 12, ambiguous - assume DD-MM format (common in India) if first_int > 31 or second_int > 31: return (None, None) return (normalize_number(second), normalize_number(first)) # (month, day) # Pattern 1: YYYY-MM-DD or YYYY/MM/DD or YYYY.MM.DD or YYYY MM DD # Also handles YYYY-DD-MM when second > 12 (must be day, not month) match = re.search(r'(\d{4})[-\/\.\s](\d{1,2})[-\/\.\s](\d{1,2})', text) if match: year, second, third = match.groups() if not is_valid_year(year): # print(f"Invalid year detected: {year}") pass else: second_int = int(second) third_int = int(third) if second_int > 12 and 1 <= third_int <= 12: # second > 12 means it MUST be the day → YYYY-DD-MM day = normalize_number(second) month = normalize_number(third) elif 1 <= second_int <= 12: # Standard YYYY-MM-DD month = normalize_number(second) day = normalize_number(third) else: # Both > 12 or invalid — skip to next pattern day = None month = None if day and month: try: dt = datetime(int(year), int(month), int(day)) return f'{day}-{month}-{year}' except ValueError: # print(f"Invalid date: {day}-{month}-{year}") pass # Pattern 1.5: YYYY-MMM-DD or YYYY/MMM/DD or YYYY MMM DD (e.g., 2002-sept-30, 2002/Mar/15) match = re.search(r'(\d{4})[-\/\.\s]([a-z]{3,9})[-\/\.\s](\d{1,2})', text_lower) if match: year, month_str, day_str = match.groups() if not is_valid_year(year): # print(f"Invalid year detected: {year}") pass elif month_str in month_names: day = normalize_number(day_str) month = month_names[month_str] try: dt = datetime(int(year), int(month), int(day)) return f'{day}-{month}-{year}' except ValueError: # print(f"Invalid date: {day}-{month}-{year}") pass # Pattern 2: DD-MM-YYYY or DD/MM/YYYY or DD.MM.YYYY or DD MM YYYY match = re.search(r'\b(\d{1,2})[-\/\.\s](\d{1,2})[-\/\.\s](\d{4})\b', text) if match: first, second, year = match.groups() if not is_valid_year(year): # print(f"Invalid year detected: {year}") pass else: month, day = validate_and_determine_format(first, second) if month is None or day is None: return "Invalid DOB" try: # Correct datetime constructor: (year, month, day) dt = datetime(int(year), int(month), int(day)) return f'{day}-{month}-{year}' except ValueError: # print(f"Invalid date: {day}-{month}-{year}") pass # Pattern 3: DDMMYYYY (8 continuous digits) match = re.search(r'\b(\d{2})(\d{2})(\d{4})\b', text) if match: first, second, year = match.groups() if not is_valid_year(year): # print(f"Invalid year detected: {year}") pass else: month, day = validate_and_determine_format(first, second) if month is None or day is None: return "Invalid DOB" try: # Correct datetime constructor: (year, month, day) dt = datetime(int(year), int(month), int(day)) return f'{day}-{month}-{year}' except ValueError: # print(f"Invalid date: {day}-{month}-{year}") pass # Pattern 4: DD-MMM-YYYY or DD MMM YYYY match = re.search(r'\b(\d{1,2})[-\s]([a-z]{3,9})[-\s](\d{4})\b', text_lower) if match: day_str, month_str, year = match.groups() if not is_valid_year(year): # print(f"Invalid year detected: {year}") pass elif month_str in month_names: day = normalize_number(day_str) month = month_names[month_str] try: # Correct datetime constructor: (year, month, day) dt = datetime(int(year), int(month), int(day)) return f'{day}-{month}-{year}' except ValueError: # print(f"Invalid date: {day}-{month}-{year}") pass # Pattern 4.5: DDMMMYYYY or DDMMMYY (no separators) - e.g., 05Mar1992, 05MAR92 match = re.search(r'\b(\d{1,2})([a-z]{3,9})(\d{4}|\d{2})\b', text_lower) if match: day_str, month_str, year = match.groups() # Handle 2-digit year if len(year) == 2: year_int = int(year) if year_int >= 0 and year_int <= 26: year = f'20{year}' else: year = f'19{year}' if not is_valid_year(year): # print(f"Invalid year detected: {year}") pass elif month_str in month_names: day = normalize_number(day_str) month = month_names[month_str] try: dt = datetime(int(year), int(month), int(day)) return f'{day}-{month}-{year}' except ValueError: # print(f"Invalid date: {day}-{month}-{year}") pass # Pattern 5: MMM DD, YYYY or MONTH DD, YYYY or MMM-DD-YYYY (Mar 05, 1992 or sept-30-2000) match = re.search(r'\b([a-z]{3,9})[-\/\.\s](\d{1,2})[-\/\.\s,]+(\d{4})\b', text_lower) if match: month_str, day_str, year = match.groups() if not is_valid_year(year): # print(f"Invalid year detected: {year}") pass elif month_str in month_names: day = normalize_number(day_str) month = month_names[month_str] try: dt = datetime(int(year), int(month), int(day)) return f'{day}-{month}-{year}' except ValueError: # print(f"Invalid date: {day}-{month}-{year}") pass # Pattern 6: DD-MMM-YY (05-MAR-92) match = re.search(r'\b(\d{1,2})[-\s]([a-z]{3,9})[-\s](\d{2})\b', text_lower) if match: day_str, month_str, year_short = match.groups() # Convert 2-digit year to 4-digit year_int = int(year_short) if year_int >= 0 and year_int <= 26: year = f'20{year_short}' else: year = f'19{year_short}' if month_str in month_names: day = normalize_number(day_str) month = month_names[month_str] try: dt = datetime(int(year), int(month), int(day)) return f'{day}-{month}-{year}' except ValueError: print(f"Invalid date: {day}-{month}-{year}") pass return None # ========================================================= # PGEOCODE LOOKUP (offline after first run, cached) # ========================================================= _PGEOCODE_NOMI_INST = None _PGEOCODE_LOOKUP_CACHE: dict = {} def _get_pgeocode_inst(): """Return cached pgeocode.Nominatim("IN") instance.""" global _PGEOCODE_NOMI_INST if _PGEOCODE_NOMI_INST is None: try: import pgeocode as _pgeocode_lib _PGEOCODE_NOMI_INST = _pgeocode_lib.Nominatim("IN") logger.info("pgeocode loaded for India (offline pincode DB).") except Exception as e: logger.warning("pgeocode unavailable — pincode enrichment disabled: %s", e) return _PGEOCODE_NOMI_INST def lookup_pincode_info(pin: str) -> dict: """ Offline lookup of a 6-digit Indian pincode. Returns dict: {district, state, place, lat, lng} All values are strings (empty string if not found), lat/lng are float or None. Result is cached in memory after first call — no repeated disk/network I/O. """ if not pin: return {} pin_str = re.sub(r"\D", "", str(pin).strip()).zfill(6) if len(pin_str) != 6: return {} if pin_str in _PGEOCODE_LOOKUP_CACHE: return _PGEOCODE_LOOKUP_CACHE[pin_str] db = _get_pgeocode_inst() if db is None: _PGEOCODE_LOOKUP_CACHE[pin_str] = {} return {} try: row = db.query_postal_code(pin_str) if row is None: _PGEOCODE_LOOKUP_CACHE[pin_str] = {} return {} def _safe_str(val) -> str: if val is None: return "" try: if isinstance(val, float) and math.isnan(val): return "" except Exception: pass return str(val).strip() def _safe_float(val): try: f = float(val) return None if math.isnan(f) else f except Exception: return None result = { "district": _safe_str(getattr(row, "county_name", "")), "state": _safe_str(getattr(row, "state_name", "")), "place": _safe_str(getattr(row, "place_name", "")), "lat": _safe_float(getattr(row, "latitude", None)), "lng": _safe_float(getattr(row, "longitude", None)), } _PGEOCODE_LOOKUP_CACHE[pin_str] = result return result except Exception as e: logger.debug("pgeocode lookup error for %s: %s", pin_str, e) _PGEOCODE_LOOKUP_CACHE[pin_str] = {} return {} # ========================================================= # BANK / SYSTEM INTERNAL STATE CODES # Maps non-standard codes used by banks/systems to canonical # state names recognised by STATE_MAPPING. # ========================================================= _BANK_STATE_CODE_MAP: dict = { # Delhi internal codes "NDH": "DELHI", "SDH": "DELHI", "CDH": "DELHI", "EDH": "DELHI", "WDH": "DELHI", "NWD": "DELHI", "SWD": "DELHI", "NED": "DELHI", # City-based codes used as state "MUM": "MAHARASHTRA", "BOM": "MAHARASHTRA", "BLR": "KARNATAKA", "BNG": "KARNATAKA", "HYD": "TELANGANA", "SCB": "TELANGANA", "CHN": "TAMIL NADU", "MAD": "TAMIL NADU", "KOL": "WEST BENGAL","CAL": "WEST BENGAL", "PUN": "MAHARASHTRA","PCM": "MAHARASHTRA", "AHM": "GUJARAT", "AMD": "GUJARAT", "JAI": "RAJASTHAN", "LKO": "UTTAR PRADESH", "KNP": "UTTAR PRADESH", "PAT": "BIHAR", "RNC": "JHARKHAND", "BHU": "ODISHA", "GHY": "ASSAM", "CCU": "WEST BENGAL", # Dotted abbreviations sometimes seen "A.P.": "ANDHRA PRADESH", "A.P": "ANDHRA PRADESH", "T.N.": "TAMIL NADU", "T.N": "TAMIL NADU", "U.P.": "UTTAR PRADESH", "U.P": "UTTAR PRADESH", "M.P.": "MADHYA PRADESH", "M.P": "MADHYA PRADESH", "H.P.": "HIMACHAL PRADESH","H.P": "HIMACHAL PRADESH", "W.B.": "WEST BENGAL", "W.B": "WEST BENGAL", } def standardize_state(state_str): """ Standardize state names to canonical lowercase form. Handles: - Standard ISO abbreviations (AP, TS, KA …) - Full state names and common variants - Bank/system internal codes (NDH→DELHI, BLR→KARNATAKA …) - Dotted abbreviations (A.P., T.N. …) """ if not state_str: return None state_str = clean_text(state_str) if not state_str: return None normalized = state_str.strip() lookup_key = normalized.upper() # Check bank/system internal codes FIRST (before STATE_MAPPING) if lookup_key in _BANK_STATE_CODE_MAP: canonical = _BANK_STATE_CODE_MAP[lookup_key] # Now resolve canonical through STATE_MAPPING for full normalisation if STATE_MAPPING: std_name = lookup_from_mapping(canonical, STATE_MAPPING) if std_name: return std_name.upper() return canonical.upper() if STATE_MAPPING: std_name = lookup_from_mapping(lookup_key, STATE_MAPPING) if std_name: return std_name.upper() if not state_name_standard_df.empty: state_mappping_df = state_name_standard_df.copy() state_mappping_df.columns = state_mappping_df.columns.str.upper() state_name = replace_with_standard(lookup_key, state_mappping_df) if state_name != "" and state_name != lookup_key: return state_name.upper() return normalized def standardize_city(city_str): """ Standardize city names to canonical lowercase form. """ if not city_str: return None city_str = clean_text(city_str) if not city_str: return None normalized = city_str.strip() lookup_key = normalized.upper() if CITY_MAPPING: std_name = lookup_from_mapping(lookup_key, CITY_MAPPING) if std_name: return std_name.upper() if not city_prev_pres_df.empty: city_prev_pres_data = city_prev_pres_df.copy() city_prev_pres_data.columns = city_prev_pres_data.columns.str.upper() city_name = replace_with_standard(lookup_key, city_prev_pres_data) if city_name != "" and city_name != lookup_key: return city_name.upper() return normalized def standardize_column(text, column_name): """ Standardize field values to canonical lowercase form. """ if not text: return None if isinstance(text, str): text = clean_text(text) # includes lowercase if not text: return None column_lower = str(column_name).upper() if column_name else "" if "addressline" in column_lower: if not hno_variation_df.empty: try: address_df = hno_variation_df.copy() address_df.columns = address_df.columns.str.upper() # Lookup needs uppercase key but we return lowercase text_upper = text.upper() result = replace_with_standard(text_upper, address_df) text = result.upper() if result else text except Exception as e: pass if column_lower == 'pan': return validate_and_normalize_pan(text.upper() if text else text) elif column_lower == 'aadhar': return validate_and_normalize_aadhar(text) return text def standardize_dob(dob_str): if not dob_str: return None # NOTE: Do NOT apply data cleaning pipeline for DOB. # The cleaning pipeline is designed for text fields (names, addresses) # and corrupts date strings (e.g., '2002-sept-30' -> '2002-SESUB STRING'). # normalize_dob already handles all date parsing and normalization. raw_input = dob_str dob_str = normalize_dob(dob_str) # print(f"DOB: input='{raw_input}' -> normalized='{dob_str}'") return dob_str # ========================================================= # FIELD COMPARISON FUNCTIONS # ========================================================= def compare_exact(val1, val2): """Exact match (case-insensitive)""" if not val1 or not val2: return 0 # print("dob1 value",val1) # print("dob2 value",val2) v1 = str(val1).strip().upper() v2 = str(val2).strip().upper() return 100 if v1 == v2 else 0 def compare_any_match(list1, list2, field_type="pincode"): """ 1:N matching for lists of values (pincodes, states, cities) Returns 100 if any value in list1 matches any value in list2 """ valid_list1 = [v for v in list1 if v and str(v).strip() not in ["", "-", " "]] valid_list2 = [v for v in list2 if v and str(v).strip() not in ["", "-", " "]] if not valid_list1 or not valid_list2: return 0 # Normalize based on field type if field_type == "pincode": normalized_list1 = [validate_and_normalize_pincode(v) for v in valid_list1] normalized_list2 = [validate_and_normalize_pincode(v) for v in valid_list2] elif field_type == "state": normalized_list1 = [standardize_state(v) for v in valid_list1] normalized_list2 = [standardize_state(v) for v in valid_list2] elif field_type == "city": normalized_list1 = [standardize_city(v) for v in valid_list1] normalized_list2 = [standardize_city(v) for v in valid_list2] elif field_type == "dob" or field_type == "birthdate": normalized_list1 = [standardize_dob(v) for v in valid_list1] normalized_list2 = [standardize_dob(v) for v in valid_list2] else: normalized_list1 = [str(v).strip().upper() for v in valid_list1] normalized_list2 = [str(v).strip().upper() for v in valid_list2] normalized_list1 = [v for v in normalized_list1 if v] normalized_list2 = [v for v in normalized_list2 if v] if not normalized_list1 or not normalized_list2: return 0 for v1 in normalized_list1: if v1 in normalized_list2: return 100 return 0 def compare_phone_any_match(phones1, phones2): """1:N matching for phone numbers""" valid_phones1 = [validate_and_normalize_phone(p) for p in phones1 if p] valid_phones2 = [validate_and_normalize_phone(p) for p in phones2 if p] valid_phones1 = [p for p in valid_phones1 if p] valid_phones2 = [p for p in valid_phones2 if p] if not valid_phones1 or not valid_phones2: return 0 for p1 in valid_phones1: if p1 in valid_phones2: return 100 return 0 def compare_email_any_match(emails1, emails2): """1:N matching for email addresses""" valid_emails1 = [validate_and_normalize_email(e) for e in emails1 if e] valid_emails2 = [validate_and_normalize_email(e) for e in emails2 if e] valid_emails1 = [e for e in valid_emails1 if e] valid_emails2 = [e for e in valid_emails2 if e] if not valid_emails1 or not valid_emails2: return 0 for e1 in valid_emails1: if e1 in valid_emails2: return 100 return 0 # ========================================================= # MATCHING RULES # ========================================================= def evaluate_matching_rules(field_scores: Dict[str, float]) -> tuple: """ Evaluate matching rules and return overall decision Returns: (decision, reason) """ def get_score(field_name): return field_scores.get(field_name, 0) def rule_satisfied(conditions): for field, threshold in conditions: if get_score(field) < threshold: return False return True # Matching rules in priority order RULES = MATCHING_RULES for conditions, reason in RULES: if rule_satisfied(conditions): return "Match", reason return "No Match", "None of the defined matching rules were satisfied" # ========================================================= # PATTERN-BASED FIELD MATCHING # ========================================================= def apply_pattern_matching_logic(field_name: str, score) -> float: """ Apply 0 or 100 logic for pattern-based fields """ PATTERN_FIELDS = { "BIRTHDATE", "PHONE", "EMAIL", "ZIPCODE", "TAXID", "LICENSEID", "PASSPORTID", "GENDER", "AADHAR", "PAN" } if score == "missing value": return 0 if field_name in PATTERN_FIELDS: return 100 if score >= 100 else 0 return score def roman_to_number(text): """Convert Roman numerals to Arabic numbers in text""" if not text or not isinstance(text, str): return str(text) if text else "" # Always return a string def roman_to_int(roman): roman = roman.upper() # Strict Roman Numeral Regex # M (1000), CM (900), D (500), CD (400), C (100), XC (90), L (50), XL (40), X (10), IX (9), V (5), IV (4), I (1) # Repeated characters allowed up to 3 times for I, X, C, M. # V, L, D cannot be repeated. strict_regex = r"^M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$" if not re.fullmatch(strict_regex, roman): return None values = {"I": 1, "V": 5, "X": 10, "L": 50, "C": 100, "D": 500, "M": 1000} total = 0 prev = 0 for ch in reversed(roman): val = values.get(ch, 0) if val < prev: total -= val else: total += val prev = val return total # Only match if surrounded by boundaries, and ensure it's a plausible Roman numeral # Filter out common false positives by checking against strict regex inside the replacement function pattern = r'\b([IVXLCDM]+)\b' def replace_roman(match): roman = match.group(1) # Skip if it's likely a word (e.g., "MIX", "DIV", "VILL", "MILL") # But Strict Regex should handle "VILL" (LL invalid), "MILL" (LL invalid), "DIV" (IV valid, D valid... DIV? D=500, IV=4. 504? No, value order. D > I < V. Good.) # Strict regex logic: # V I L L -> L, L invalid repetition for 50. number = roman_to_int(roman) return str(number) if number is not None else roman replaced_roman = re.sub(pattern, replace_roman, text, flags=re.IGNORECASE) return replaced_roman def normalize_and_deduplicate_address(text): """Remove duplicate words from entire address while preserving order""" if not text or not isinstance(text, str): return "" segments = text.split(',') seen = set() deduplicated_segments = [] for segment in segments: words = segment.strip().split() unique_words = [] for word in words: key = word.upper() if key not in seen: seen.add(key) unique_words.append(word) if unique_words: deduplicated_segments.append(" ".join(unique_words)) return " ".join(deduplicated_segments) def extract_leading_house_number(segment, street_keywords): """Extract house number if it appears as the FIRST token""" tokens = segment.strip().split() if len(tokens) < 1: return None first = tokens[0].upper() if not re.fullmatch(r"[A-Z]?\d+[A-Z]?", first): return None if len(tokens) >= 2: second = tokens[1].upper() keywords_list = [street_keywords] if isinstance(street_keywords, str) else street_keywords if second in [kw.upper() for kw in keywords_list]: return None return first def is_street_context(text, match_start, street_keywords): """Check if a match occurs near street keywords""" window = text[max(0, match_start - 20):match_start] keywords_list = [street_keywords] if isinstance(street_keywords, str) else street_keywords for kw in keywords_list: if re.search(rf"\b{re.escape(kw)}\b", window, re.IGNORECASE): return True return False def extract_component_with_hierarchy(text, identifier, value_patterns=None, street_keywords=None): """ Hierarchical extraction working directly on full address string. No comma-based segmentation. Returns: (extracted_value, identifier_found, pattern_value) """ if not text: return None, None, None # PRIORITY 1: Identifier + Pattern match if identifier: id_match = re.search(rf"\b{re.escape(identifier)}\b", text, re.IGNORECASE) if id_match: if value_patterns: # Search for pattern AFTER the identifier text_after_id = text[id_match.end():] for pattern in value_patterns: m = re.search(pattern, text_after_id, re.IGNORECASE) if m: return m.group(0).strip(), identifier, m.group(0).strip() # Identifier found but no pattern matched — return identifier found signal return None, identifier, None else: # No pattern needed, extract everything after identifier till delimiter text_after = text[id_match.end():].strip() # Take until next comma or end value = re.split(r"[,]", text_after)[0].strip() return value if value else None, identifier, None # PRIORITY 2: Leading house number heuristic (only for house extraction) if street_keywords is not None: leading = extract_leading_house_number(text, street_keywords) if leading: return leading, None, leading # PRIORITY 3: Pattern-only match if value_patterns: for pattern in value_patterns: for match in re.finditer(pattern, text, re.IGNORECASE): extracted_value = match.group(0).strip() if street_keywords: if is_street_context(text, match.start(), street_keywords): continue return extracted_value, None, extracted_value return None, None, None def remove_matched_text(text, identifier=None, pattern_value=None): """ Remove identifier and/or pattern value directly from full address string. Rules: - If identifier present AND pattern matched: remove both - If identifier present but no pattern matched: remove identifier only - If no identifier, only pattern matched: remove pattern value only """ if not text: return "" result = text if identifier: result = re.sub( rf"\b{re.escape(identifier)}\b[\s#.:/-]*", " ", result, flags=re.IGNORECASE ) if pattern_value: result = re.sub( rf"\b{re.escape(pattern_value)}\b[\s#.:/-]*", " ", result, flags=re.IGNORECASE ) # Cleanup result = re.sub(r"\s{2,}", " ", result).strip() result = re.sub(r"^[,\s]+|[,\s]+$", "", result) return result def extract_address_components(address_line: str) -> dict: """ Master extraction function — no comma segmentation. Works directly on full address string. """ empty_result = { "original_address": "", "house_number": None, "house_segment": None, "flat_number": None, "flat_segment": None, "apartment": None, "apartment_segment": None, "street": None, "street_segment": None, "remaining_address": "" } if not address_line: return empty_result address_line = clean_text(str(address_line)) if not address_line: return empty_result original_address = address_line remaining = address_line # Step 1: Roman numeral conversion remaining = roman_to_number(remaining) remaining = str(remaining) if remaining else "" # ── Augment street keywords to always block GALI NO / LANE NO style phrases ─ _base_kws = [STREET_KEYWORD] if isinstance(STREET_KEYWORD, str) else list(STREET_KEYWORD) _street_kws = list(dict.fromkeys( _base_kws + ["GALI NO", "LANE NO", "GALI", "GALLI"] )) # ── Patterns ────────────────────────────────────────────────────────────── house_patterns = [ r"\b(MIG|HIG|LIG)-\d+[a-zA-Z]?\b", r"\b\d+(?:-\d+){2,}[a-zA-Z]?\b", r"\b\d+-\d+/\d+[a-zA-Z]?\b", r"\b\d+-\d+/[a-zA-Z]\b", r"\b\d+-\d+/\d+\b", r"\b\d+/\d+(?:/\d+)?\s?[a-zA-Z]?\b", r"\b[a-zA-Z]{1,3}/\d+[a-zA-Z]?\b", r"\b\d+-\d+[a-zA-Z]\b", r"\b\d+-\d+\b", r"\b[a-zA-Z]{1,2}-?\d+[a-zA-Z]?\b", r"\b\d+[a-zA-Z]\b", r"\b\d{1,4}\b", ] flat_patterns = [ r"\b\d+[a-zA-Z]?\b", r"\b[a-zA-Z]-?\d+\b", ] # ── 1. HOUSE NUMBER ─────────────────────────────────────────────────────── house_no, house_id_found, house_pat_val = extract_component_with_hierarchy( remaining, HOUSE_NUMBER_IDENTIFIER, house_patterns, _street_kws ) house_segment = None if house_id_found or house_pat_val: house_segment = remaining # record full text at time of extraction remaining = remove_matched_text(remaining, house_id_found, house_pat_val) remaining = str(remaining) if remaining else "" # ── 2. FLAT NUMBER ──────────────────────────────────────────────────────── # Pass street_keywords so numbers inside GALI NO / LANE NO / etc. are blocked flat_no, flat_id_found, flat_pat_val = extract_component_with_hierarchy( remaining, FLAT_NUMBER_IDENTIFIER, flat_patterns, street_keywords=_street_kws ) flat_segment = None if flat_id_found or flat_pat_val: flat_segment = remaining remaining = remove_matched_text(remaining, flat_id_found, flat_pat_val) remaining = str(remaining) if remaining else "" # ── 3. APARTMENT/BUILDING ───────────────────────────────────────────────── apartment, apt_id_found, apt_pat_val = extract_component_with_hierarchy( remaining, APARTMENT_IDENTIFIER ) apartment_segment = None if apt_id_found or apt_pat_val: apartment_segment = remaining remaining = remove_matched_text(remaining, apt_id_found, apt_pat_val) remaining = str(remaining) if remaining else "" # ── 4. STREET ───────────────────────────────────────────────────────────── street, street_id_found, street_pat_val = extract_component_with_hierarchy( remaining, STREET_KEYWORD, street_keywords=_street_kws ) street_segment = None if street_id_found or street_pat_val: street_segment = remaining remaining = remove_matched_text(remaining, street_id_found, street_pat_val) remaining = str(remaining) if remaining else "" # ── Final cleanup ───────────────────────────────────────────────────────── # Step 1: remove "GALI NO 3A" style — keyword + its value together remaining = re.sub( r'\b(GALI|LANE|CROSS|MAIN)\s+NO\s+[A-Z0-9][A-Z0-9\-]*\b[\s,]*', ' ', remaining, flags=re.IGNORECASE ) # Step 2: remove bare "GALI NO" / "LANE NO" with no value following remaining = re.sub( r'\b(GALI|LANE|ROAD|MARG|STREET|CROSS|MAIN)\s+NO\b[\s,]*', ' ', remaining, flags=re.IGNORECASE ) remaining = re.sub(r"\s+", " ", remaining).strip() remaining = re.sub(r"^[,\s]+|[,\s]+$", "", remaining) remaining = normalize_and_deduplicate_address(remaining) print(f"[EXTRACT] house_no : {house_no!r} | segment: {house_segment!r}") print(f"[EXTRACT] flat_no : {flat_no!r} | segment: {flat_segment!r}") print(f"[EXTRACT] apartment : {apartment!r} | segment: {apartment_segment!r}") print(f"[EXTRACT] street : {street!r} | segment: {street_segment!r}") print(f"[EXTRACT] remaining_addr: {remaining!r}") return { "original_address": original_address, "house_number": house_no, "house_segment": house_segment, "flat_number": flat_no, "flat_segment": flat_segment, "apartment": apartment, "apartment_segment": apartment_segment, "street": street, "street_segment": street_segment, "remaining_address": remaining if remaining else "" } # ========================================================= # STRUCTURED ADDRESS MATCHING # Handles data format: separate ADDRESSLINE / CITY / ZIPCODE / STATE columns # ========================================================= # Non-HNO structural numbers (sector/ward/phase) — never treat as house no _STRUCT_NON_HNO = re.compile( r'\b(sector|ward|phase|block|zone|gali\s*no|gali\s*number|lane\s*no)\s*' r'[:\-]?\s*(\d+[A-Z]?)', re.IGNORECASE, ) # Keyword-based house number extractor _STRUCT_HNO_KW = re.compile( r'\b(?:d\.?\s*no\.?|door\s*no\.?|h\.?\s*no\.?|house\s*no\.?|' r'house\s*number|plot\s*no\.?|flat\s*no\.?|flat\s*number|' r'mig\s*no\.?|hig\s*no\.?|lig\s*no\.?|' r'khata\s*no\.?|khasra\s*no\.?)' r'\s*[:\-]?\s*([A-Z0-9][A-Z0-9\-/]*)', re.IGNORECASE, ) def extract_house_number_from_addressline(text: str) -> str: """ Extract house/door number from a raw addressline string. No keyword required — uses pattern priority: 1. Explicit keyword (H.No, D.No, House No, Flat No …) 2. Compound formats: 2-6-116, 144/143, MIG-25, 1-180a 3. Simple alpha-numeric: 12B, A-110 Excludes sector/ward/phase numbers. Returns normalised uppercase string or empty string. """ if not text: return "" excluded = {m.group(2).strip().upper() for m in _STRUCT_NON_HNO.finditer(text)} # Priority 1: keyword m = _STRUCT_HNO_KW.search(text) if m: val = m.group(1).strip().upper() if val not in excluded: return val # Priority 2 & 3: patterns most-specific first _pats = [ r'\b((?:MIG|HIG|LIG)-\d+[A-Z]?)\b', r'\b(\d+(?:-\d+){2,}[A-Z]?)\b', r'\b(\d+-\d+/\d+[A-Z]?)\b', r'\b(\d+/\d+(?:/\d+)?[A-Z]?)\b', r'\b([A-Z]{1,3}/\d+[A-Z]?)\b', r'\b(\d+-\d+[A-Z]?)\b', r'\b([A-Z]-?\d+[A-Z]?)\b', r'\b(\d+[A-Z])\b', ] for pat in _pats: for m in re.finditer(pat, text, re.IGNORECASE): val = m.group(1).strip().upper() if val not in excluded: before = text[:m.start()].upper() if not re.search(r'\b(sector|ward|phase|block|zone|gali)\s*$', before): return val return "" class _StructuredAddressRecord: """ Internal helper: holds one address record with separate column values. Enriches missing state/city from zipcode via pgeocode. Extracts all address components (house_number, flat_number, apartment, street) and stores the remaining address (all components removed) for model input. """ __slots__ = ('raw_addressline', 'raw_city', 'raw_zipcode', 'raw_state', 'addressline', 'city', 'state', 'zipcode', 'pgeocode_info', 'house_number', 'flat_number', 'apartment', 'street') def __init__(self, addressline="", city="", zipcode="", state=""): self.raw_addressline = str(addressline or "").strip() self.raw_city = str(city or "").strip() self.raw_zipcode = str(zipcode or "").strip() self.raw_state = str(state or "").strip() self.addressline = "" self.city = "" self.state = "" self.zipcode = "" self.pgeocode_info = {} self.house_number = "" self.flat_number = "" self.apartment = "" self.street = "" self._enrich() @staticmethod def _norm(val): """Normalize extracted component: strip non-alphanumerics and spaces.""" if not val: return "" return re.sub(r'[^A-Z0-9]', '', str(val).upper()) def _enrich(self): # Addressline — full preprocessing pipeline preprocessed = preprocess_address(self.raw_addressline).upper() if self.raw_addressline else "" # Zipcode — digits only, must be 6 pin_clean = re.sub(r'\D', '', self.raw_zipcode) self.zipcode = pin_clean if len(pin_clean) == 6 else "" # State — canonical form self.state = standardize_state(self.raw_state) or "" # City — canonical form self.city = standardize_city(self.raw_city) or "" # Extract all address components from raw addressline components = extract_address_components(self.raw_addressline) self.house_number = self._norm(components.get("house_number")) self.flat_number = self._norm(components.get("flat_number")) self.apartment = self._norm(components.get("apartment")) self.street = self._norm(components.get("street")) # Model input = remaining address after all components removed remaining = components.get("remaining_address", "").strip() self.addressline = remaining if remaining else preprocessed # pgeocode enrichment — fill missing state/city from pincode if self.zipcode: self.pgeocode_info = lookup_pincode_info(self.zipcode) if not self.state and self.pgeocode_info.get("state"): self.state = standardize_state(self.pgeocode_info["state"]) or "" if not self.city and self.pgeocode_info.get("district"): self.city = standardize_city(self.pgeocode_info["district"]) or "" def match_structured_address_fields( addressline1: str, city1: str, zipcode1: str, state1: str, addressline2: str, city2: str, zipcode2: str, state2: str, ) -> dict: """ Match two address records provided as already-split column values (ADDRESSLINE, CITY, ZIPCODE, STATE). Address component scoring (applied only when remaining address base_score > 60): house_number : match → +30, mismatch → -30 flat_number : match → +10, mismatch → -10 street : match → +10, mismatch → -10 apartment : match → +10, mismatch → -10 (missing on either side → no adjustment for that component) If base_score <= 60, component adjustments are NOT applied. """ from rapidfuzz import fuzz as _rfuzz r1 = _StructuredAddressRecord(addressline1, city1, zipcode1, state1) r2 = _StructuredAddressRecord(addressline2, city2, zipcode2, state2) # ── Zipcode ────────────────────────────────────────────── if r1.zipcode and r2.zipcode: if r1.zipcode == r2.zipcode: zip_cmp = {"verdict": "match", "adjustment": 20.0, "z1": r1.zipcode, "z2": r2.zipcode} else: zip_cmp = {"verdict": "mismatch", "adjustment": -25.0, "z1": r1.zipcode, "z2": r2.zipcode} else: zip_cmp = {"verdict": "missing", "adjustment": 0.0, "z1": r1.zipcode, "z2": r2.zipcode} # ── State ──────────────────────────────────────────────── s1, s2 = r1.state, r2.state if s1 and s2: if s1 == s2: state_cmp = {"verdict": "match", "adjustment": 10.0, "s1": s1, "s2": s2} else: state_cmp = {"verdict": "mismatch", "adjustment": -20.0, "s1": s1, "s2": s2} else: state_cmp = {"verdict": "missing", "adjustment": 0.0, "s1": s1, "s2": s2} # ── City ───────────────────────────────────────────────── c1, c2 = r1.city, r2.city if c1 and c2: sim = _rfuzz.token_set_ratio(c1, c2) if sim >= 85: city_cmp = {"verdict": "match", "adjustment": 10.0, "c1": c1, "c2": c2, "similarity": sim} elif sim >= 60: city_cmp = {"verdict": "partial", "adjustment": 3.0, "c1": c1, "c2": c2, "similarity": sim} else: city_cmp = {"verdict": "mismatch","adjustment":-10.0, "c1": c1, "c2": c2, "similarity": sim} else: city_cmp = {"verdict": "missing", "adjustment": 0.0, "c1": c1, "c2": c2, "similarity": 0} # ── Base addressline text similarity (on remaining address) ────────────── t1, t2 = r1.addressline, r2.addressline if t1 and t2: try: from services.model import match_entities from services.config import ADDRESS_MODEL_WEIGHTS base_score = float(match_entities(t1, t2, weights=ADDRESS_MODEL_WEIGHTS)) except Exception: base_score = float(max( _rfuzz.token_set_ratio(t1, t2), _rfuzz.WRatio(t1, t2), _rfuzz.ratio(t1, t2), )) else: base_score = 0.0 # ── Per-component comparison (boost/penalty only when base_score > 60) ─── def _compare_component(v1, v2, boost, penalty): """Compare two normalized component values. Returns result dict.""" if v1 and v2: if v1 == v2: return {"verdict": "match", "v1": v1, "v2": v2, "boost": boost, "penalty": penalty} else: return {"verdict": "mismatch", "v1": v1, "v2": v2, "boost": boost, "penalty": penalty} return {"verdict": "missing", "v1": v1, "v2": v2, "boost": boost, "penalty": penalty} hno_cmp = _compare_component(r1.house_number, r2.house_number, boost=30.0, penalty=30.0) flat_cmp = _compare_component(r1.flat_number, r2.flat_number, boost=10.0, penalty=10.0) apt_cmp = _compare_component(r1.apartment, r2.apartment, boost=10.0, penalty=10.0) str_cmp = _compare_component(r1.street, r2.street, boost=10.0, penalty=10.0) # Apply component adjustments only when remaining address score > 60 comp_adj = 0.0 print(f"[ADDR_COMPONENTS] base_score={base_score:.2f} | threshold=60 | adjustments_applied={base_score > 60}") print(f" remaining_addr1 : {r1.addressline!r}") print(f" remaining_addr2 : {r2.addressline!r}") for cmp, label in [ (hno_cmp, "house_number"), (flat_cmp, "flat_number"), (apt_cmp, "apartment"), (str_cmp, "street"), ]: verdict = cmp["verdict"] v1, v2 = cmp.get("v1", ""), cmp.get("v2", "") if verdict == "missing": print(f" {label:<15} | verdict=missing | v1={v1!r:>10} v2={v2!r:<10} | adjustment=0.0 [skipped - component absent]") elif base_score <= 60: sign = "+" if verdict == "match" else "-" pts = cmp["boost"] if verdict == "match" else cmp["penalty"] print(f" {label:<15} | verdict={verdict:<9} | v1={v1!r:>10} v2={v2!r:<10} | adjustment=0.0 [SKIPPED - base_score<=60]") else: if verdict == "match": adj = cmp["boost"] comp_adj += adj print(f" {label:<15} | verdict=match | v1={v1!r:>10} v2={v2!r:<10} | adjustment=+{adj:.1f} [BOOSTED]") else: adj = cmp["penalty"] comp_adj -= adj print(f" {label:<15} | verdict=mismatch | v1={v1!r:>10} v2={v2!r:<10} | adjustment=-{adj:.1f} [PENALISED]") print(f" total comp_adj : {comp_adj:+.1f}") # ── Accumulate and cap ─────────────────────────────────── total_adj = (zip_cmp["adjustment"] + state_cmp["adjustment"] + city_cmp["adjustment"] + comp_adj) final_score = max(0.0, min(100.0, base_score + total_adj)) # ── Notes ──────────────────────────────────────────────── notes = [] for cmp, key, v1k, v2k in [ (zip_cmp, "zipcode", "z1", "z2"), (state_cmp, "state", "s1", "s2"), (city_cmp, "city", "c1", "c2"), ]: v = cmp["verdict"] if v == "match": notes.append(f"{key} match ({cmp.get(v1k,'')})") elif v == "mismatch": notes.append(f"{key} MISMATCH ({cmp.get(v1k,'')} ≠ {cmp.get(v2k,'')})") for cmp, key in [(hno_cmp, "house_no"), (flat_cmp, "flat_no"), (apt_cmp, "apartment"), (str_cmp, "street")]: v = cmp["verdict"] if v == "match": notes.append(f"{key} match ({cmp['v1']})") elif v == "mismatch": notes.append(f"{key} MISMATCH ({cmp['v1']} ≠ {cmp['v2']})" + (" [applied]" if base_score > 60 else " [skipped, base<=60]")) return { "final_score": round(final_score, 2), "base_score": round(base_score, 2), "adjustment": round(total_adj, 2), "comp_adjustment": round(comp_adj, 2), "zipcode": zip_cmp, "state": state_cmp, "city": city_cmp, "house_number": hno_cmp, "flat_number": flat_cmp, "apartment": apt_cmp, "street": str_cmp, "record1": { "addressline": r1.addressline, "city": r1.city, "state": r1.state, "zipcode": r1.zipcode, "house_number": r1.house_number or None, "flat_number": r1.flat_number or None, "apartment": r1.apartment or None, "street": r1.street or None, "pgeocode": r1.pgeocode_info, }, "record2": { "addressline": r2.addressline, "city": r2.city, "state": r2.state, "zipcode": r2.zipcode, "house_number": r2.house_number or None, "flat_number": r2.flat_number or None, "apartment": r2.apartment or None, "street": r2.street or None, "pgeocode": r2.pgeocode_info, }, "notes": notes, } def match_structured_address_lists( addrs1: list, addrs2: list, ) -> float: """ Match N address dicts from record1 against M from record2. Each dict: {addressline, city, zipcode, state}. Returns best score across all N×M combinations (0-100). """ if not addrs1 or not addrs2: return 0.0 best = 0.0 for a1 in addrs1: for a2 in addrs2: r = match_structured_address_fields( a1.get("addressline", ""), a1.get("city", ""), a1.get("zipcode", ""), a1.get("state", ""), a2.get("addressline", ""), a2.get("city", ""), a2.get("zipcode", ""), a2.get("state", ""), ) if r["final_score"] > best: best = r["final_score"] return round(best, 2)