| import re |
| from datetime import datetime |
| from typing import List, Dict |
| import pandas as pd |
| import logging |
| try: |
| import pgeocode |
| except ImportError: |
| pgeocode = None |
| import math |
|
|
| logger = logging.getLogger("rules") |
|
|
| from services.config import ( |
| config, |
| APARTMENT_IDENTIFIER, |
| FLAT_NUMBER_IDENTIFIER, |
| HOUSE_NUMBER_IDENTIFIER, |
| STREET_KEYWORD, |
| name_variation_df, |
| hno_variation_df, |
| city_prev_pres_df, |
| state_name_standard_df, |
| sur_comm_names_df, |
| pin_city_state_df, |
| CITY_MAPPING, |
| STATE_MAPPING, |
| MATCHING_RULES |
| ) |
|
|
|
|
| |
| |
| |
|
|
def clean_text(text):
    """
    Normalise a raw input value into plain, upper-case ASCII text.

    Steps, in the order they are applied:
        1. Replace HTML tags with a space.
        2. Replace HTML entities (&amp;, &#39;, &#x27; ...) with a space.
        3. Turn every whitespace run (tabs, newlines and non-breaking
           spaces included) into a single space.
        4. Drop every character still outside printable ASCII (0x20-0x7E).
        5. Collapse spaces again and strip both ends.
        6. Convert to UPPER case.

    Whitespace is normalised *before* the non-printable filter on purpose:
    tabs and newlines are outside 0x20-0x7E, so filtering first would delete
    them and glue neighbouring words together
    ("MG Road\\nBangalore" -> "MG ROADBANGALORE").

    Args:
        text: Value to clean.

    Returns:
        The cleaned upper-case string.  ``None`` yields ``""``; any other
        falsy or non-string value is returned unchanged.
    """
    if not text or not isinstance(text, str):
        return "" if text is None else text

    # Markup first, so that tag/entity remnants become word separators.
    text = re.sub(r'<[^>]+>', ' ', text)
    text = re.sub(r'&(?:#\d+|#x[0-9a-fA-F]+|[a-zA-Z]+);', ' ', text)

    # Every kind of whitespace becomes a plain space before filtering.
    text = re.sub(r'\s+', ' ', text)

    # Control characters and non-ASCII artefacts are removed outright.
    text = re.sub(r'[^\x20-\x7E]', '', text)

    # Removing characters can leave doubled spaces behind.
    text = re.sub(r'\s+', ' ', text)

    return text.strip().upper()
|
|
|
|
# Module-level log line: emitted when this module is executed/imported.
logger.info("Using simple text cleaning (no regex/keyword pipeline)")
|
|
|
|
| |
| |
| |
|
|
|
|
| |
# Honorifics, ranks and qualifications that may surround a personal name.
# Entries are lower case: lower-case a token before testing membership.
NAME_TITLES = {
    "dr", "mr", "mrs", "ms", "miss", "master",
    "m/s", "sri", "sree", "shri", "shree",
    "smt", "shrimati", "kumari",
    "prof", "late",
    "er", "adv", "ca",
    "capt", "col", "lt", "major", "brig", "brigadier",
    "cmdr", "commander", "wingcmdr", "groupcapt",
    "justice", "judge", "cj", "chiefjustice",
    "ias", "ips", "ifs",
    "pt", "pandit", "swami", "guru", "maulana", "maulvi",
    "haji", "haj", "imam", "maharaj",
    "sardar",
    "phd", "md", "dphil",
}

# "md" is listed as a qualification, but _NAME_PREFIX_EXPANSION treats the
# same token as the abbreviation of MOHAMMED and _TITLE_PATTERN has always
# left it alone, so it is never stripped as a title.
_AMBIGUOUS_TITLES = frozenset({"md"})

# Leading title at the very start of a name.  The title must be followed by
# a dot, whitespace or the end of the string; without that terminator the
# pattern would eat the first letters of real names
# ("SRINIVAS" -> "NIVAS", "ERIC" -> "IC").
_TITLE_PATTERN = re.compile(
    r'^(dr|mr|mrs|ms|miss|m/s|sri|sree|shri|shree|'
    r'smt|prof|late|er|adv|ca|capt|col|lt|major|justice|'
    r'shrimati|kumari|master|brig|brigadier|cmdr|commander|wingcmdr|'
    r'groupcapt|judge|cj|chiefjustice|ias|ips|ifs|pt|pandit|swami|'
    r'guru|maulana|maulvi|haji|haj|imam|maharaj|sardar|phd|dphil)'
    r'(?:\.\s*|\s+|$)',
    re.IGNORECASE
)

# Relational markers ("s/o", "son of", bare "so" ...) matched as whole words.
_RELATIONAL_PATTERNS = re.compile(
    r'\b(?:s/o|d/o|w/o|h/o|c/o|g/o|'
    r'son\s+of|daughter\s+of|wife\s+of|husband\s+of|care\s+of|guardian\s+of|'
    r'so|do|wo|ho|co|go)\b',
    re.IGNORECASE
)


def remove_name_titles(text):
    """
    Strip title tokens from the front and the back of a name.

    Handles space-separated titles ("mr rajesh", "rajesh kumar phd"),
    several titles in a row ("mr dr rajesh") and a leading title attached
    with a dot ("dr.rajesh").  A title glued to the name *without* a dot is
    left alone, because it cannot be told apart from a real name
    ("SRINIVAS", "ERIC", "COLIN").

    Args:
        text: Name string; falsy input yields "".

    Returns:
        The upper-cased name without leading/trailing titles.

    NOTE(review): some NAME_TITLES entries ("pandit", "swami", "maharaj")
    are also common surnames and are removed when they are the last token.
    """
    if not text:
        return ""

    def _is_title(token):
        # Tokens are upper case while NAME_TITLES is lower case, so the
        # comparison key has to be lower-cased.
        key = token.rstrip('.').lower()
        return key in NAME_TITLES and key not in _AMBIGUOUS_TITLES

    tokens = text.upper().split()
    start, end = 0, len(tokens)
    while start < end and _is_title(tokens[start]):
        start += 1
    while end > start and _is_title(tokens[end - 1]):
        end -= 1

    remainder = " ".join(tokens[start:end])
    # Catches the dot-attached form that survives tokenisation ("DR.RAJESH").
    return _TITLE_PATTERN.sub('', remainder).strip()
|
|
|
|
def remove_relational_prefixes(text):
    """
    Drop relational markers (s/o, d/o, w/o, "son of", ...) from a name.

    Every match of ``_RELATIONAL_PATTERNS`` is replaced by a space and the
    whitespace is normalised afterwards.
        "sita w/o ram"    -> "sita ram"
        "anil s/o suresh" -> "anil suresh"

    Falsy input yields "".
    """
    if text:
        without_markers = _RELATIONAL_PATTERNS.sub(' ', text)
        return re.sub(r'\s+', ' ', without_markers).strip()
    return ""
|
|
|
|
def remove_non_alpha_trailing(text):
    """
    Keep only the alphabetic part of a name.

    Each whitespace-separated token is reduced to its ASCII letters; tokens
    that contain no letter at all (dates, numbers) disappear.
        "anil kumar 1/05/1985" -> "anil kumar"
        "rajesh 12345"         -> "rajesh"

    Note that non-letters are removed from anywhere inside a token, not only
    from its end ("a1b2" -> "ab").  Falsy input yields "".
    """
    if not text:
        return ""
    letters_only = (re.sub(r'[^a-zA-Z]', '', word) for word in text.split())
    return " ".join(word for word in letters_only if word)
|
|
|
|
def deduplicate_tokens(text):
    """
    Drop repeated tokens, keeping the first occurrence of each in order.

    The comparison ignores case; the spelling of the first occurrence wins.
        "rajesh kumar rajesh" -> "rajesh kumar"

    Falsy input yields "".
    """
    if not text:
        return ""
    first_spelling = {}
    for word in text.split():
        # setdefault keeps the earliest spelling; dicts preserve insert order.
        first_spelling.setdefault(word.lower(), word)
    return " ".join(first_spelling.values())
|
|
|
|
def deduplicate_consecutive_tokens(text):
    """
    Remove token sequences that are immediately repeated (for addresses).

    A repeat may be a single token or a whole phrase; only back-to-back
    repeats are removed and the comparison ignores case.
        "mg road mg road bangalore" -> "mg road bangalore"
        "main main road"            -> "main road"
        "mg road bangalore mg road" stays as-is (the repeat is not adjacent)

    Args:
        text: Address string; falsy input yields "".

    Returns:
        The text with every adjacent repeat collapsed to its first copy.
    """
    if not text:
        return ""

    kept = []       # surviving tokens in their original spelling
    kept_keys = []  # upper-cased mirror of `kept`, used for comparisons
    for token in text.split():
        kept.append(token)
        kept_keys.append(token.upper())
        # After each append, check whether the tail now reads "X X" for any
        # phrase length; drop the second copy and re-check, because removing
        # it can expose another repeat.
        collapsed = True
        while collapsed:
            collapsed = False
            for width in range(1, len(kept) // 2 + 1):
                if kept_keys[-width:] == kept_keys[-2 * width:-width]:
                    del kept[-width:]
                    del kept_keys[-width:]
                    collapsed = True
                    break
    return " ".join(kept)
|
|
|
|
def collapse_repeated_chars(text):
    """
    Light typo/punctuation clean-up applied before further processing.

    1. A run of three or more identical ASCII letters is cut down to two:
           "MOHAMMMED" -> "MOHAMMED"
       (a run of exactly two, e.g. "SHARRMA", is left untouched).
    2. A run of two or more non-alphanumeric, non-space characters becomes
       a single space:  "---" -> " ",  "..." -> " ".
    3. Whitespace is collapsed and the ends are stripped.

    Falsy input yields "".
    """
    if not text:
        return ""
    squeezed = re.sub(r'([a-zA-Z])\1{2,}', lambda run: run.group(1) * 2, text)
    spaced = re.sub(r'[^a-zA-Z0-9\s]{2,}', ' ', squeezed)
    return re.sub(r'\s+', ' ', spaced).strip()
|
|
|
|
def strip_non_alphanumeric(text):
    """
    Replace every character that is not an ASCII letter, digit or whitespace
    with a space, then collapse whitespace and strip the ends.

    Falsy input yields "".
    """
    if text:
        only_alnum = re.sub(r'[^a-zA-Z0-9\s]', ' ', text)
        return re.sub(r'\s+', ' ', only_alnum).strip()
    return ""
|
|
|
|
def is_subset_match(tokens1, tokens2):
    """
    Tell whether one token list is fully contained in the other.

    Comparison is case-insensitive and ignores order and repetition.
        ["rajesh", "kumar"] vs ["rajesh", "kumar", "sharma"] -> True

    Returns False when either list is empty.
    """
    if not (tokens1 and tokens2):
        return False
    first = {word.upper() for word in tokens1}
    second = {word.upper() for word in tokens2}
    return first <= second or second <= first
|
|
|
|
def standardize_name_variations(text):
    """
    Map each name token to its standard spelling.

    The mapping comes from ``name_variation_df`` (columns 'VARIATION' and
    'STANDARD').  It is built on the first call and cached on the function
    object as ``_lookup``; both sides are stripped and upper-cased.
    Tokens without an entry are kept (upper-cased).

    Returns "" for falsy input and the input unchanged when the variation
    table is empty.
    """
    if not text:
        return ""
    if name_variation_df.empty:
        return text

    if not hasattr(standardize_name_variations, '_lookup'):
        table = {}
        for _, record in name_variation_df.iterrows():
            variant = str(record.get('VARIATION', '')).strip().upper()
            canonical = str(record.get('STANDARD', '')).strip().upper()
            if variant and canonical:
                table[variant] = canonical
        standardize_name_variations._lookup = table

    table = standardize_name_variations._lookup
    return " ".join(table.get(word, word) for word in text.upper().split())
|
|
|
|
|
|
|
|
| |
| |
| |
| |
|
|
| |
|
|
| |
| |
| |
|
|
| |
# Characters treated as stray delimiters inside address text.
_ADDR_SPECIAL_CHARS = re.compile(r'[|#@$%^&*\(\)\[\]\{\};:\'\"\\<>?]')


def _normalize_delimiters(text: str) -> str:
    """Turn the characters in _ADDR_SPECIAL_CHARS into spaces, then collapse
    whitespace and strip the ends.  Commas, dots, hyphens and slashes are
    not touched."""
    spaced = _ADDR_SPECIAL_CHARS.sub(' ', text)
    collapsed = re.sub(r'\s+', ' ', spaced)
    return collapsed.strip()
|
|
| |
def _normalize_hyphens(text: str) -> str:
    """
    Remove hyphens that are purely cosmetic inside house-number style tokens
    while leaving hyphenated words alone.

    A hyphen between two alphanumeric runs is dropped when the pair is:
        * number-ish on both sides     "12-3"  -> "123", "12A-3B" -> "12A3B"
        * digits + single letter       "12-B"  -> "12B"
        * single letter + number       "A-110" -> "A110"
    Anything else keeps its hyphen ("Pimpri-Chinchwad", "12-BLOCK").

    Matching is case-insensitive; the original casing is preserved.
    """
    def _is_cosmetic(left, right):
        # Number-ish on both sides: the only case the earlier code handled.
        if (re.fullmatch(r'[0-9]+[A-Z]?', left, re.I)
                and re.fullmatch(r'[A-Z]?[0-9]+[A-Z]?', right, re.I)):
            return True
        # "12-B": digits followed by a single suffix letter.
        if re.fullmatch(r'[0-9]+', left) and re.fullmatch(r'[A-Z]', right, re.I):
            return True
        # "A-110": single prefix letter followed by a number.
        if (re.fullmatch(r'[A-Z]', left, re.I)
                and re.fullmatch(r'[0-9]+[A-Z]?', right, re.I)):
            return True
        return False

    def _dehyphen(m):
        left, right = m.group(1), m.group(2)
        return left + right if _is_cosmetic(left, right) else m.group(0)

    return re.sub(r'([A-Z0-9]+)-([A-Z0-9]+)', _dehyphen, text, flags=re.I)
|
|
| |
# Abbreviation -> full form.  Keys are upper case; a few keys span two tokens.
_ADDR_ABBREV = {
    # Directions
    'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST',
    'NE': 'NORTH EAST', 'NW': 'NORTH WEST', 'SE': 'SOUTH EAST', 'SW': 'SOUTH WEST',
    # Localities and cities
    'NGR': 'NAGAR', 'NGRS': 'NAGAR', 'LYT': 'LAYOUT', 'LT': 'LAYOUT',
    'HYD': 'HYDERABAD', 'BLR': 'BANGALORE', 'MUM': 'MUMBAI', 'DEL': 'DELHI',
    'CHN': 'CHENNAI', 'KOL': 'KOLKATA', 'PUN': 'PUNE', 'AHM': 'AHMEDABAD',
    # Street types
    'RD': 'ROAD', 'ST': 'STREET', 'AVE': 'AVENUE', 'BLVD': 'BOULEVARD',
    'MRG': 'MARG', 'LN': 'LANE', 'CR': 'CROSS', 'CIR': 'CIRCLE',
    # Buildings
    'APT': 'APARTMENT', 'APTS': 'APARTMENTS', 'BLDG': 'BUILDING',
    'BLK': 'BLOCK', 'SECT': 'SECTOR', 'SEC': 'SECTOR',
    # Landmarks and administrative units
    'OPP': 'OPPOSITE', 'NR': 'NEAR', 'ADJ': 'ADJACENT',
    'JN': 'JUNCTION', 'STA': 'STATION',
    'PO': 'POST OFFICE', 'PB': 'POST BOX', 'PO BOX': 'POST BOX',
    'P.O BOX': 'POST BOX', 'P.O. BOX': 'POST BOX',
    'DIST': 'DISTRICT', 'DST': 'DISTRICT', 'DT': 'DISTRICT',
    'TAL': 'TALUK', 'TQ': 'TALUK', 'TEH': 'TEHSIL',
    'VLG': 'VILLAGE', 'VIL': 'VILLAGE', 'VILL': 'VILLAGE',
    'CLNY': 'COLONY', 'COL': 'COLONY',
    'EXT': 'EXTENSION', 'EXTN': 'EXTENSION',
    'PH': 'PHASE',
}


def _expand_address_abbreviations(text: str) -> str:
    """
    Expand the abbreviations listed in _ADDR_ABBREV.

    The text is upper-cased and split on whitespace.  At each position the
    two-token phrase (exactly as written) is tried first; otherwise the
    single token is looked up with its dots removed.  Unknown tokens are
    kept unchanged.
    """
    words = text.upper().split()
    expanded = []
    skip_next = False
    for position, word in enumerate(words):
        if skip_next:
            # Second half of a two-token phrase that was already expanded.
            skip_next = False
            continue
        if position + 1 < len(words):
            pair = word + ' ' + words[position + 1]
            if pair in _ADDR_ABBREV:
                expanded.append(_ADDR_ABBREV[pair])
                skip_next = True
                continue
        expanded.append(_ADDR_ABBREV.get(word.replace('.', ''), word))
    return ' '.join(expanded)
|
|
| |
| |
|
|
| |
# Spelled-out cardinals and ordinals -> digit form (upper-case keys).
_NUMBER_WORDS = {
    'ZERO': '0', 'ONE': '1', 'TWO': '2', 'THREE': '3', 'FOUR': '4', 'FIVE': '5',
    'SIX': '6', 'SEVEN': '7', 'EIGHT': '8', 'NINE': '9', 'TEN': '10',
    'ELEVEN': '11', 'TWELVE': '12', 'THIRTEEN': '13', 'FOURTEEN': '14', 'FIFTEEN': '15',
    'SIXTEEN': '16', 'SEVENTEEN': '17', 'EIGHTEEN': '18', 'NINETEEN': '19', 'TWENTY': '20',
    'TWENTY ONE': '21', 'TWENTY TWO': '22', 'TWENTY THREE': '23', 'TWENTY FOUR': '24',
    'TWENTY FIVE': '25', 'TWENTY SIX': '26', 'TWENTY SEVEN': '27', 'TWENTY EIGHT': '28',
    'TWENTY NINE': '29', 'THIRTY': '30', 'THIRTY TWO': '32', 'FORTY': '40', 'FIFTY': '50',
    'SIXTY': '60', 'SEVENTY': '70', 'EIGHTY': '80', 'NINETY': '90',
    'FIRST': '1ST', 'SECOND': '2ND', 'THIRD': '3RD', 'FOURTH': '4TH', 'FIFTH': '5TH',
    'SIXTH': '6TH', 'SEVENTH': '7TH', 'EIGHTH': '8TH', 'NINTH': '9TH', 'TENTH': '10TH',
}
_ORDINAL_MAP = {'FIRST': '1ST', 'SECOND': '2ND', 'THIRD': '3RD', 'FOURTH': '4TH', 'FIFTH': '5TH',
                'SIXTH': '6TH', 'SEVENTH': '7TH', 'EIGHTH': '8TH', 'NINTH': '9TH', 'TENTH': '10TH'}

# Building blocks for compound cardinals such as "THIRTY-TWO" / "FORTY FIVE".
_UNIT_VALUES = {'ONE': 1, 'TWO': 2, 'THREE': 3, 'FOUR': 4, 'FIVE': 5,
                'SIX': 6, 'SEVEN': 7, 'EIGHT': 8, 'NINE': 9}
_TENS_VALUES = {'TWENTY': 20, 'THIRTY': 30, 'FORTY': 40, 'FIFTY': 50,
                'SIXTY': 60, 'SEVENTY': 70, 'EIGHTY': 80, 'NINETY': 90}

# <tens> + hyphen or whitespace + <unit>, as whole words.
_COMPOUND_NUMBER_PATTERN = re.compile(
    r'\b(' + '|'.join(_TENS_VALUES) + r')(?:\s*-\s*|\s+)('
    + '|'.join(_UNIT_VALUES) + r')\b'
)

# Any single entry of _NUMBER_WORDS; longest phrases first so that a
# multi-word entry wins over its first word.
_NUMBER_WORD_PATTERN = re.compile(
    r'\b(?:'
    + '|'.join(re.escape(p) for p in sorted(_NUMBER_WORDS, key=len, reverse=True))
    + r')\b'
)


def _normalize_spelled_numbers(text: str) -> str:
    """
    Replace spelled-out numbers with digits; the result is upper case.

        "Thirty-Two MG Road" -> "32 MG ROAD"
        "forty five"         -> "45"
        "first floor"        -> "1ST FLOOR"

    Compound cardinals (tens + unit, joined by a hyphen or whitespace) are
    resolved first so they become one number instead of two ("30-2").
    The remaining words are then replaced from _NUMBER_WORDS.
    """
    upper = text.upper()
    upper = _COMPOUND_NUMBER_PATTERN.sub(
        lambda m: str(_TENS_VALUES[m.group(1)] + _UNIT_VALUES[m.group(2)]),
        upper,
    )
    return _NUMBER_WORD_PATTERN.sub(lambda m: _NUMBER_WORDS[m.group(0)], upper)
|
|
| |
# Relative-position phrases that are all folded into the single word NEAR.
_LANDMARK_SYNONYMS = {
    'OPP': 'NEAR', 'OPPOSITE': 'NEAR', 'OPPOSITE TO': 'NEAR',
    'ADJACENT TO': 'NEAR', 'ADJ TO': 'NEAR', 'BEHIND': 'NEAR',
    'IN FRONT OF': 'NEAR', 'BESIDE': 'NEAR', 'NEXT TO': 'NEAR',
    'CLOSE TO': 'NEAR',
}


def _normalize_landmark_phrases(text: str) -> str:
    """
    Upper-case the text and replace every phrase of _LANDMARK_SYNONYMS
    (matched as whole words) with its standard token.

    Longer phrases are substituted before shorter ones, so "OPPOSITE TO"
    is consumed as a unit rather than as "OPPOSITE" plus a leftover "TO".
    """
    result = text.upper()
    longest_first = sorted(_LANDMARK_SYNONYMS, key=len, reverse=True)
    for phrase in longest_first:
        whole_phrase = r'\b' + re.escape(phrase) + r'\b'
        result = re.sub(whole_phrase, _LANDMARK_SYNONYMS[phrase], result)
    return result
|
|
| |
# Relational markers that sometimes leak into address lines.
_ADDR_RELATIONAL = re.compile(
    r'\b(S/O|D/O|W/O|H/O|SON\s+OF|DAUGHTER\s+OF|WIFE\s+OF|HUSBAND\s+OF)\b',
    re.IGNORECASE
)


def _remove_addr_relational_markers(text: str) -> str:
    """Replace s/o, d/o, w/o, h/o and their spelled-out forms with a space,
    then collapse whitespace and strip the ends."""
    without_markers = _ADDR_RELATIONAL.sub(' ', text)
    collapsed = re.sub(r'\s+', ' ', without_markers)
    return collapsed.strip()
|
|
| |
def _normalize_po_box(text: str) -> str:
    """
    Rewrite post-box spellings to the single token POSTBOX.

    Two case-insensitive passes: "P.O Box" / "P.O. Box" / "PO Box" become
    "POST BOX", and then "POST BOX" (any spacing) becomes "POSTBOX".
    The rest of the text keeps its casing.
    """
    rewrites = (
        (r'P\.?\s*O\.?\s*BOX', 'POST BOX'),
        (r'POST\s+BOX', 'POSTBOX'),
    )
    canonical = text
    for pattern, replacement in rewrites:
        canonical = re.sub(pattern, replacement, canonical, flags=re.IGNORECASE)
    return canonical
|
|
| |
# Spelled-out compass direction -> short form.
_DIR_MAP = {
    'EAST': 'E', 'WEST': 'W', 'NORTH': 'N', 'SOUTH': 'S',
    'NORTH EAST': 'NE', 'NORTH WEST': 'NW', 'SOUTH EAST': 'SE', 'SOUTH WEST': 'SW',
}


def _normalize_directions(text: str) -> str:
    """Upper-case the text and abbreviate whole-word compass directions
    using _DIR_MAP.  Two-word directions are handled before single words so
    that "NORTH EAST" becomes "NE" rather than "N E"."""
    result = text.upper()
    for direction in sorted(_DIR_MAP, key=len, reverse=True):
        whole_word = r'\b' + re.escape(direction) + r'\b'
        result = re.sub(whole_word, _DIR_MAP[direction], result)
    return result
|
|
| |
def preprocess_address(text: str) -> str:
    """
    Run the full address normalisation pipeline and return the result.

    Non-string or empty input, and input that is empty after clean_text,
    yields "".  Otherwise the stages below run in this exact order:

        1.  clean_text                        markup/whitespace clean-up, upper case
        2.  _normalize_delimiters             stray special characters -> space
        3.  _normalize_hyphens                cosmetic hyphens in house numbers
        4.  _remove_addr_relational_markers   s/o, w/o, ...
        5.  remove_landmark_phrases           "near ..." up to the next comma
        6.  roman_to_number                   (defined elsewhere in this module)
        7.  _normalize_spelled_numbers        number words -> digits
        8.  _expand_all_address_variations    abbreviation expansion
        9.  _normalize_landmark_phrases       OPPOSITE / BEHIND ... -> NEAR
        10. _normalize_po_box                 post-box spellings -> POSTBOX
        11. _normalize_directions             NORTH EAST -> NE ...
        12. normalize_and_deduplicate_address (defined elsewhere in this module)

    followed by a final whitespace collapse and strip.
    """
    if not (text and isinstance(text, str)):
        return ""

    address = clean_text(text)
    if not address:
        return ""

    stages = (
        _normalize_delimiters,
        _normalize_hyphens,
        _remove_addr_relational_markers,
        remove_landmark_phrases,
        roman_to_number,
        _normalize_spelled_numbers,
        _expand_all_address_variations,
        _normalize_landmark_phrases,
        _normalize_po_box,
        _normalize_directions,
        normalize_and_deduplicate_address,
    )
    for stage in stages:
        address = stage(address)

    return re.sub(r'\s+', ' ', address).strip()
|
|
|
|
| |
| |
| |
|
|
| |
| |
# Keyword that introduces a landmark ("near", "nearby", "landmark is" ...).
# The look-behind stops it from firing in the middle of a word ("linear").
_LANDMARK_INTRO_PATTERNS = re.compile(
    r'(?<![a-z])'
    r'(near\s*to|nearbyto|near\s*by|nearby|near|landmark\s+is|landmark:|landmark)\s*',
    re.IGNORECASE
)


def remove_landmark_phrases(text: str) -> str:
    """
    Remove landmark references from address text.

    The text is split on commas.  In every comma-separated part, everything
    from the first landmark keyword to the end of that part is dropped;
    parts that end up empty are discarded and the rest is re-joined with
    ", ".

    Examples:
        "12B Lakshmi Nagar, near Hanuman Temple, Hyderabad"
            -> "12B Lakshmi Nagar, Hyderabad"
        "32 Main Road nearbyto Bus Stand Jaipur"
            -> "32 Main Road"   (no comma, so the rest of the line goes)

    Falsy input is returned unchanged.
    """
    if not text:
        return text

    kept = []
    for segment in text.split(','):
        hit = _LANDMARK_INTRO_PATTERNS.search(segment)
        survivor = segment if hit is None else segment[:hit.start()]
        survivor = survivor.strip()
        if survivor:
            kept.append(survivor)

    return re.sub(r'\s+', ' ', ', '.join(kept)).strip()
|
|
|
|
| |
| |
| |
|
|
| |
# Words that mark a named locality component of an address.
_NAMED_COMPONENT_KEYWORDS = [
    'street', 'colony', 'sector', 'nagar', 'bhavan', 'bhawan',
    'layout', 'enclave', 'vihar', 'phase', 'block', 'ward',
    'galli', 'gali', 'cross', 'main', 'road', 'marg', 'lane',
    'avenue', 'circle', 'plaza', 'park', 'garden', 'gardens',
    'extension', 'extn', 'township', 'town', 'puram', 'pura',
    'nagara', 'nagar', 'bazaar', 'bazar', 'market',
]


_NAMED_COMP_PATTERN = re.compile(
    r'\b(' + '|'.join(re.escape(k) for k in _NAMED_COMPONENT_KEYWORDS) + r')\b',
    re.IGNORECASE
)


def extract_named_components(text: str) -> dict:
    """
    Extract named locality components from an address.

    The address is upper-cased and processed one comma-separated segment at
    a time.  For every keyword found, the component phrase is the keyword
    plus up to two tokens before and up to two tokens after it *within the
    same segment*, so a phrase never reaches across a comma.

    Returns a dict with:
        'components': list of (keyword, phrase) tuples, upper case, in
                      order of appearance
        'remaining':  the address with those phrases removed; surviving
                      segments are joined with ", "

    Example:
        "Plot 5, HSR Layout, Sector 7, Bengaluru"
            -> components: [('LAYOUT', 'HSR LAYOUT'), ('SECTOR', 'SECTOR 7')]
               remaining:  "PLOT 5, BENGALURU"

    Falsy input yields {'components': [], 'remaining': text}.
    """
    if not text:
        return {'components': [], 'remaining': text}

    found = []
    leftovers = []
    for raw_segment in text.upper().split(','):
        # Single-spaced segment, so a phrase rebuilt with ' '.join() is an
        # exact substring of it and can be removed reliably.
        segment = ' '.join(raw_segment.split())
        remainder = segment
        for m in _NAMED_COMP_PATTERN.finditer(segment):
            kw = m.group(1)
            before_toks = segment[:m.start()].split()[-2:]
            after_toks = segment[m.end():].split()[:2]
            phrase = ' '.join(before_toks + [kw] + after_toks)
            found.append((kw, phrase))
            # Overlapping phrases: a later one may already be partly gone,
            # in which case this replace is simply a no-op.
            remainder = remainder.replace(phrase, ' ', 1)
        remainder = ' '.join(remainder.split())
        if remainder:
            leftovers.append(remainder)

    return {'components': found, 'remaining': ', '.join(leftovers)}
|
|
|
|
def compare_named_components(addr1: str, addr2: str) -> dict:
    """
    Compare the named locality components of two addresses.

    Components come from extract_named_components; when a keyword occurs
    more than once in an address, its last phrase is the one compared.
    Only keywords present in BOTH addresses are scored, using rapidfuzz's
    token_set_ratio on the two phrases:
        ratio >= 70  -> 'match'     (+5 to the adjustment)
        otherwise    -> 'mismatch'  (-20 to the adjustment)

    Returns a dict with:
        'verdict':          'skip' when no keyword is shared, 'mismatch'
                            when any shared keyword mismatches, else 'match'
        'score_adjustment': sum of the per-keyword adjustments (0.0 on skip)
        'detail':           one {'keyword', 'result', 'score'} dict per
                            shared keyword ([] on skip)
    """
    from rapidfuzz import fuzz as _fuzz

    phrases1 = dict(extract_named_components(addr1)['components'])
    phrases2 = dict(extract_named_components(addr2)['components'])

    shared_kws = set(phrases1.keys()) & set(phrases2.keys())
    if not shared_kws:
        return {'verdict': 'skip', 'score_adjustment': 0.0, 'detail': []}

    detail = []
    score_adj = 0.0
    saw_mismatch = False
    for kw in shared_kws:
        sim = _fuzz.token_set_ratio(phrases1[kw], phrases2[kw])
        is_match = sim >= 70
        detail.append({
            'keyword': kw,
            'result': 'match' if is_match else 'mismatch',
            'score': sim,
        })
        if is_match:
            score_adj += 5.0
        else:
            score_adj -= 20.0
            saw_mismatch = True

    return {
        'verdict': 'mismatch' if saw_mismatch else 'match',
        'score_adjustment': score_adj,
        'detail': detail,
    }
|
|
|
|
| |
| |
| |
|
|
| _POSTBOX_PATTERN = re.compile( |
| r'(?:p\.?\s*o\.?\s*box|post\s*box|postbox|p\.?b\.?\s*no\.?|pb\s*no\.?)' |
| r'\s*[:\-]?\s*(\d{1,6})', |
| re.IGNORECASE |
| ) |
|
|
| def extract_postbox_number(text: str) -> str | None: |
| """ |
| Extract post box number from address text. |
| Returns the numeric part as string, or None if not found. |
| """ |
| if not text: |
| return None |
| m = _POSTBOX_PATTERN.search(text) |
| return m.group(1).strip() if m else None |
|
|
|
|
def remove_postbox_from_address(text: str) -> str:
    """
    Strip every post-box reference (keyword and number) from the address so
    the rest can be compared on its own.

    Each match of _POSTBOX_PATTERN becomes a space; whitespace is then
    collapsed and the ends stripped.  Falsy input is returned unchanged.
    """
    if text:
        without_box = _POSTBOX_PATTERN.sub(' ', text)
        return re.sub(r'\s+', ' ', without_box).strip()
    return text
|
|
|
|
def compare_postbox(addr1: str, addr2: str) -> dict:
    """
    Extract the post-box number of each address and compare them.

    Returns a dict with:
        'found':      True when at least one address has a post-box number
        'adjustment': +10.0 when both have one and the numbers are equal,
                      -30.0 when both have one and the numbers differ,
                        0.0 when only one, or neither, has one
        'pb1', 'pb2': the extracted numbers (strings) or None
    """
    pb1 = extract_postbox_number(addr1)
    pb2 = extract_postbox_number(addr2)

    has_first = pb1 is not None
    has_second = pb2 is not None

    if has_first and has_second:
        adjustment = 10.0 if pb1 == pb2 else -30.0
    else:
        # One-sided (or absent) post box carries no signal either way.
        adjustment = 0.0

    return {
        'found': has_first or has_second,
        'adjustment': adjustment,
        'pb1': pb1,
        'pb2': pb2,
    }
|
|
|
|
| |
| |
| |
|
|
| |
| _HNO_KEYWORD_PATTERN = re.compile( |
| r'\b(?:' |
| r'd\.?\s*no\.?|door\s*no\.?|h\.?\s*no\.?|house\s*no\.?|' |
| r'house\s*number|property\s*no\.?|plot\s*no\.?|' |
| r'flat\s*no\.?|flat\s*number|' |
| r'mig\s*no\.?|hig\s*no\.?|lig\s*no\.?|' |
| r'khata\s*no\.?|khasra\s*no\.?' |
| r')' |
| r'\s*[:\-]?\s*([A-Z0-9][A-Z0-9\-/]*)', |
| re.IGNORECASE |
| ) |
|
|
| |
| _NON_HNO_COMPONENT_PATTERN = re.compile( |
| r'\b(sector|ward|phase|block|zone|taluk|village|vill|dist|district|' |
| r'plot|survey|sy\.?\s*no\.?|s\.?\s*no\.?)\s*[:\-]?\s*(\d+[A-Z]?)', |
| re.IGNORECASE |
| ) |
|
|
| def extract_house_number_v2(text: str) -> str | None: |
| """ |
| Revamped house number extraction with high priority to explicit keywords. |
| |
| Priority order: |
| 1. Explicit HNO keyword (H.No, D.No, House No, Door No, Plot No, Flat No β¦) |
| 2. Leading numeric token (first token if it looks like HNO, not sector/ward) |
| 3. Pattern match for compound numbers (12-B, 45/3, A-110) |
| |
| Explicitly excludes sector numbers, ward numbers, phase numbers, block numbers |
| from being treated as house numbers. |
| |
| Returns the extracted house number string or None. |
| """ |
| if not text: |
| return None |
| t = text.strip() |
|
|
| |
| m = _HNO_KEYWORD_PATTERN.search(t) |
| if m: |
| return m.group(1).strip().upper() |
|
|
| |
| non_hno_values = set() |
| for nm in _NON_HNO_COMPONENT_PATTERN.finditer(t): |
| non_hno_values.add(nm.group(2).strip().upper()) |
|
|
| |
| tokens = t.split() |
| if tokens: |
| first = tokens[0].upper() |
| |
| if re.fullmatch(r'[A-Z]?\d+[A-Z]?(?:[/\-]\d+[A-Z]?)*', first): |
| if first not in non_hno_values: |
| return first |
|
|
| |
| compound_patterns = [ |
| r'\b(\d+[A-Z]?/\d+[A-Z]?)\b', |
| r'\b(\d+-\d+[A-Z]?)\b', |
| r'\b([A-Z]-\d+[A-Z]?)\b', |
| r'\b(\d+[A-Z])\b', |
| r'\b(\d{1,4})\b', |
| ] |
| for pat in compound_patterns: |
| for m in re.finditer(pat, t, re.IGNORECASE): |
| val = m.group(1).strip().upper() |
| if val not in non_hno_values: |
| |
| before = t[:m.start()].upper() |
| if not re.search(r'\b(sector|ward|phase|block|zone)\s*$', before): |
| return val |
|
|
| return None |
|
|
|
|
def compare_house_numbers(addr1: str, addr2: str) -> dict:
    """
    Extract the house number of each address and compare them.

    Both numbers are reduced to upper-case letters and digits before the
    comparison, so "12-B" and "12B" are equal.

    Returns a dict with:
        'h1', 'h2':         normalised house numbers, or None when absent
        'verdict':          'match', 'mismatch', or 'missing' when at least
                            one side has no house number
        'score_adjustment': -30.0 on mismatch, 0.0 otherwise.  A match also
                            returns 0.0 here; the original notes say the
                            +30 bonus is applied by the caller, only when
                            the base score is above 50.
    """
    def _canonical(raw):
        return re.sub(r'[^A-Z0-9]', '', raw.upper()) if raw else None

    h1 = _canonical(extract_house_number_v2(addr1))
    h2 = _canonical(extract_house_number_v2(addr2))

    if h1 is None or h2 is None:
        verdict, adjustment = 'missing', 0.0
    elif h1 == h2:
        verdict, adjustment = 'match', 0.0
    else:
        verdict, adjustment = 'mismatch', -30.0

    return {'h1': h1, 'h2': h2, 'verdict': verdict, 'score_adjustment': adjustment}
|
|
|
|
| |
| |
| |
| |
| |
|
|
# Abbreviation / spelling variant -> standard form (upper-case keys).
# Lookups are made on tokens with their dots removed, so the dotted keys
# ('P.O BOX', 'SY. NO' ...) are reached through their dot-free twins.
_INDIAN_ADDR_VARIATIONS: dict[str, str] = {
    # Street types
    'RD': 'ROAD', 'STR': 'STREET', 'ST': 'STREET', 'AVE': 'AVENUE',
    'MRG': 'MARG', 'LN': 'LANE', 'BLVD': 'BOULEVARD', 'CIR': 'CIRCLE',
    'CR': 'CROSS', 'CROSS RD': 'CROSS ROAD', 'X RD': 'CROSS ROAD',
    # Localities
    'NGR': 'NAGAR', 'NGRS': 'NAGAR', 'NAGARA': 'NAGAR',
    'LYT': 'LAYOUT', 'LOUT': 'LAYOUT',
    'CLY': 'COLONY', 'CLNY': 'COLONY', 'COL': 'COLONY',
    'EXT': 'EXTENSION', 'EXTN': 'EXTENSION',
    'ENCL': 'ENCLAVE',
    'VIHAR': 'VIHAR',
    'VIHARA': 'VIHAR',
    'PURA': 'PURAM', 'PORA': 'PURAM',
    # Directions
    'N': 'NORTH', 'S': 'SOUTH', 'E': 'EAST', 'W': 'WEST',
    'NE': 'NORTH EAST', 'NW': 'NORTH WEST', 'SE': 'SOUTH EAST', 'SW': 'SOUTH WEST',
    # Urban components
    'SECT': 'SECTOR', 'SEC': 'SECTOR', 'SCT': 'SECTOR',
    'BLK': 'BLOCK', 'BK': 'BLOCK',
    'PH': 'PHASE', 'PHZ': 'PHASE',
    'APT': 'APARTMENT', 'APTS': 'APARTMENTS',
    'BLDG': 'BUILDING', 'BLDGS': 'BUILDINGS',
    'FLR': 'FLOOR', 'FL': 'FLOOR',
    'OPP': 'OPPOSITE', 'NR': 'NEAR', 'ADJ': 'ADJACENT',
    'JN': 'JUNCTION', 'JCT': 'JUNCTION',
    'STA': 'STATION', 'STN': 'STATION',
    # Rural / administrative units
    'VLG': 'VILLAGE', 'VIL': 'VILLAGE', 'VILL': 'VILLAGE', 'VG': 'VILLAGE',
    'GRMA': 'GRAMA', 'GM': 'GRAMA', 'PANCHAYAT': 'PANCHAYAT',
    'DIST': 'DISTRICT', 'DST': 'DISTRICT', 'DT': 'DISTRICT', 'ZILLA': 'DISTRICT',
    'JILLA': 'DISTRICT', 'ZILA': 'DISTRICT',
    'TAL': 'TALUK', 'TQ': 'TALUK', 'TALUKA': 'TALUK',
    'TEH': 'TEHSIL', 'TEHS': 'TEHSIL', 'MANDAL': 'MANDAL', 'MD': 'MANDAL',
    'POST': 'POST', 'PO': 'POST OFFICE',
    'HOBLI': 'HOBLI', 'HBL': 'HOBLI',
    'REV': 'REVENUE', 'REV VILLAGE': 'REVENUE VILLAGE',
    'SY NO': 'SURVEY NUMBER', 'SY. NO': 'SURVEY NUMBER',
    'KHASRA': 'KHASRA', 'KHATA': 'KHATA',
    # Post box
    'PB': 'POST BOX', 'PO BOX': 'POST BOX',
    'P.O BOX': 'POST BOX', 'P.O. BOX': 'POST BOX',
    # States
    'AP': 'ANDHRA PRADESH', 'TS': 'TELANGANA', 'KA': 'KARNATAKA',
    'TN': 'TAMIL NADU', 'MH': 'MAHARASHTRA', 'GJ': 'GUJARAT',
    'RJ': 'RAJASTHAN', 'UP': 'UTTAR PRADESH', 'MP': 'MADHYA PRADESH',
    'WB': 'WEST BENGAL', 'OR': 'ODISHA', 'OD': 'ODISHA',
}


def _expand_all_address_variations(text: str) -> str:
    """
    Expand the address abbreviations listed in _INDIAN_ADDR_VARIATIONS.

    The text is upper-cased and split on whitespace.  At each position a
    two-token phrase is tried first, then the single token; lookups ignore
    dots.  Trailing ',', ';' or ':' is set aside for the lookup and put
    back after the expansion, so "RD," becomes "ROAD,".  A two-token phrase
    is never formed across such punctuation ("CROSS, RD" stays two items).

    Unknown tokens are kept exactly as they were.
    """
    def _split_trailing(token):
        core = token.rstrip(',;:')
        return core, token[len(core):]

    tokens = text.upper().split()
    expanded = []
    i = 0
    while i < len(tokens):
        core, trail = _split_trailing(tokens[i])

        if not trail and i + 1 < len(tokens):
            next_core, next_trail = _split_trailing(tokens[i + 1])
            pair = (core + ' ' + next_core).replace('.', '')
            if pair in _INDIAN_ADDR_VARIATIONS:
                expanded.append(_INDIAN_ADDR_VARIATIONS[pair] + next_trail)
                i += 2
                continue

        replacement = _INDIAN_ADDR_VARIATIONS.get(core.replace('.', ''))
        expanded.append(tokens[i] if replacement is None else replacement + trail)
        i += 1
    return ' '.join(expanded)
|
|
|
|
| |
| |
| |
|
|
| |
# Abbreviated or variant name token -> standard form (upper-case keys).
_NAME_PREFIX_EXPANSION = {
    # Mohammed
    'MD': 'MOHAMMED', 'MOHD': 'MOHAMMED', 'MHD': 'MOHAMMED',
    'MUHAMMAD': 'MOHAMMED', 'MOHAMAD': 'MOHAMMED', 'MOHHAMED': 'MOHAMMED',
    'MUHAMED': 'MOHAMMED', 'MUHAMMED': 'MOHAMMED', 'MOHAMMD': 'MOHAMMED',
    # Sheikh
    'SK': 'SHEIKH', 'SHK': 'SHEIKH', 'SHAIKH': 'SHEIKH',
    'SHEKH': 'SHEIKH', 'SHIEKH': 'SHEIKH', 'SHEIK': 'SHEIKH',
    'SHEK': 'SHEIKH', 'SAIKH': 'SHEIKH',
    # Abdul
    'ABD': 'ABDUL', 'ABDL': 'ABDUL', 'ABDU': 'ABDUL',
    # Syed
    'SYD': 'SYED', 'SYE': 'SYED', 'SAIYAD': 'SYED', 'SAIYED': 'SYED',
    'SAYYED': 'SYED', 'SAYYAD': 'SYED',
    # Kumari
    'KUM': 'KUMARI', 'KM': 'KUMARI',
    # Chaudhary
    'CH': 'CHAUDHARY', 'CHD': 'CHAUDHARY', 'CHOUDHARY': 'CHAUDHARY',
    'CHOWDHARY': 'CHAUDHARY', 'CHOWDARY': 'CHAUDHARY',
    # Bala
    'BAL': 'BALA',
    # Ranga
    'RNG': 'RANGA',
}


def _expand_name_prefix_abbreviations(text: str) -> str:
    """Upper-case the name and replace every token found in
    _NAME_PREFIX_EXPANSION (trailing dots ignored for the lookup) with its
    standard form.  Other tokens are kept as they are, dots included."""
    return ' '.join(
        _NAME_PREFIX_EXPANSION.get(word.rstrip('.'), word)
        for word in text.upper().split()
    )
|
|
| |
def _remove_name_special_chars(text: str) -> str:
    """Replace hyphens, slashes, brackets, quotes and the other listed
    punctuation with a space, then collapse whitespace and strip the ends.
    Dots and commas are not in the list and stay in place."""
    spaced = re.sub(r'[-/\\@$%^&*\(\)\[\]\{\};:\'"<>?!]', ' ', text)
    collapsed = re.sub(r'\s+', ' ', spaced)
    return collapsed.strip()
|
|
| |
# Organisation words ("AND SONS", "TRADERS", "PVT LTD" ...) as whole words.
_ORG_SUFFIXES = re.compile(
    r'\b(AND\s+SONS?|ENTERPRISES?|TRADERS?|INDUSTRIES|LTD|PVT\.?\s*LTD|'
    r'LIMITED|CORP|CORPORATION|INC|LLC|CO\.?\s*LTD|COMPANY|ASSOCIATES?|'
    r'BROTHERS?|BROS?|AGENCIES?)\b',
    re.IGNORECASE
)


def _remove_org_suffixes(text: str) -> str:
    """Blank out every organisation word matched by _ORG_SUFFIXES (wherever
    it occurs in the name), then collapse whitespace and strip the ends."""
    without_suffixes = _ORG_SUFFIXES.sub(' ', text)
    collapsed = re.sub(r'\s+', ' ', without_suffixes)
    return collapsed.strip()
|
|
| |
| |
| |
| |
def _split_merged_tokens(text: str) -> str:
    """
    Best-effort split of merged CamelCase tokens.

    Only tokens longer than 12 characters are touched; in those, a space is
    inserted wherever a lower-case letter is directly followed by an
    upper-case one.
        "RameshKumarSharma" -> "Ramesh Kumar Sharma"
        "DiGVIJAYSINGH"     -> "Di GVIJAYSINGH"   (only one case change)

    Text that is entirely upper case is therefore returned unchanged.
    """
    return ' '.join(
        re.sub(r'([a-z])([A-Z])', r'\1 \2', word) if len(word) > 12 else word
        for word in text.split()
    )
|
|
| |
| |
| |
|
|
| |
| |
|
|
| |
def enhanced_preprocess_name(text: str) -> str:
    """
    Extended name preprocessing pipeline.

    Non-string or empty input, and input that is empty after clean_text,
    yields "".  Otherwise the stages run in this exact order:

        clean_text, collapse_repeated_chars, remove_relational_prefixes,
        remove_non_alpha_trailing, _remove_name_special_chars,
        remove_name_titles, _expand_name_prefix_abbreviations,
        _remove_org_suffixes, _split_merged_tokens, deduplicate_tokens,
        standardize_name_variations

    and the result is stripped.
    """
    if not (text and isinstance(text, str)):
        return ""

    name = clean_text(text)
    if not name:
        return ""

    stages = (
        collapse_repeated_chars,
        remove_relational_prefixes,
        remove_non_alpha_trailing,
        _remove_name_special_chars,
        remove_name_titles,
        _expand_name_prefix_abbreviations,
        _remove_org_suffixes,
        _split_merged_tokens,
        deduplicate_tokens,
        standardize_name_variations,
    )
    for stage in stages:
        name = stage(name)
    return name.strip()
|
|
|
|
def preprocess_name(text):
    """
    Name preprocessing pipeline used before matching.

    Non-string or empty input, and input that is empty after clean_text,
    yields "".  Otherwise the stages run in this exact order:

        1. clean_text                   markup/whitespace clean-up, upper case
        2. collapse_repeated_chars      "MOHAMMMED" -> "MOHAMMED"
        3. remove_relational_prefixes   s/o, d/o, w/o ...
        4. remove_non_alpha_trailing    dates, numbers and other non-letters
        5. remove_name_titles           Dr, Mr, Shri ...
        6. deduplicate_tokens           repeated tokens
        7. standardize_name_variations  spellings from the variation table

    and the result is stripped.
    """
    if not (text and isinstance(text, str)):
        return ""

    name = clean_text(text)
    if not name:
        return ""

    stages = (
        collapse_repeated_chars,
        remove_relational_prefixes,
        remove_non_alpha_trailing,
        remove_name_titles,
        deduplicate_tokens,
        standardize_name_variations,
    )
    for stage in stages:
        name = stage(name)
    return name.strip()
|
|
|
|
| |
| |
| |
|
|
def detect_surnames(text):
    """
    Return the set of tokens in the text that are known surnames.

    The reference set is read from ``sur_comm_names_df``: the column
    'surname_community_extension' when present, otherwise the last column.
    Values are stripped and upper-cased, and the set is cached on the
    function object as ``_surname_set`` after the first call.

    The text is upper-cased before matching.  An empty set is returned for
    falsy text or when the surname table is empty.
    """
    if not text or sur_comm_names_df.empty:
        return set()

    if not hasattr(detect_surnames, '_surname_set'):
        columns = sur_comm_names_df.columns
        if 'surname_community_extension' in columns:
            column = 'surname_community_extension'
        else:
            column = columns[-1]
        detect_surnames._surname_set = {
            str(value).strip().upper()
            for value in sur_comm_names_df[column].dropna()
        }

    return set(text.upper().split()) & detect_surnames._surname_set
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|
|
|
|
|
def compute_initial_letter_boost(name1_tokens, name2_tokens):
    """
    Boost for names where one side abbreviates tokens of the other.

    Tokens common to both names are set aside.  Among the remaining tokens,
    the single-letter tokens of the side with fewer (or equally many)
    leftovers are treated as initials; the boost is granted when every such
    initial is the first letter of some leftover token on the other side.

    When both sides have the same number of leftovers, both directions are
    checked, so the result does not depend on the argument order.

    Example: ["k", "v", "reddy"] vs ["krishna", "venkata", "reddy"]
        common:    {"reddy"}
        leftovers: ["k", "v"] and ["krishna", "venkata"]
        initials {"k", "v"} are covered by first letters {"k", "v"} -> 0.2

    Args:
        name1_tokens, name2_tokens: token lists of the two names.

    Returns:
        0.2 when the initials match, otherwise 0.0 (also for empty input,
        or when either side has no leftover tokens).
    """
    if not name1_tokens or not name2_tokens:
        return 0.0

    common = set(name1_tokens) & set(name2_tokens)
    rem1 = [t for t in name1_tokens if t not in common]
    rem2 = [t for t in name2_tokens if t not in common]
    if not rem1 or not rem2:
        return 0.0

    def _initials_covered(abbreviated, expanded):
        initials = {t for t in abbreviated if len(t) == 1}
        if not initials:
            return False
        return initials <= {t[0] for t in expanded if t}

    if len(rem1) < len(rem2):
        directions = [(rem1, rem2)]
    elif len(rem2) < len(rem1):
        directions = [(rem2, rem1)]
    else:
        # Equal length: either side may be the abbreviated one.
        directions = [(rem1, rem2), (rem2, rem1)]

    if any(_initials_covered(short, full) for short, full in directions):
        return 0.2
    return 0.0
|
|
def replace_with_standard(string_value, df=None):
    """
    Replace a string with its standard value from a variation table.

    Matching is case-insensitive and tried in three tiers, first hit wins:
        1. exact match against VARIATION
        2. a VARIATION occurs as whole word(s) inside the input
           (e.g. "TRIVANDRUM KERALA" matches "TRIVANDRUM")
        3. the input occurs as whole word(s) inside a VARIATION

    Args:
        string_value: String to search for
        df: DataFrame with 'VARIATION' and 'STANDARD' columns (optional);
            falls back to name_variation_df when omitted or empty

    Returns:
        The STANDARD cell of the matching row, otherwise the original value.
        None, non-string and blank inputs are returned unchanged.
    """
    source_df = df if df is not None and not df.empty else name_variation_df

    if source_df.empty:
        return string_value

    # Nothing to look up for None / non-string / blank input. A blank key
    # would otherwise compile to r'\b\b' in tier 3 and match the first row.
    if not isinstance(string_value, str) or not string_value.strip():
        return string_value

    string_upper = string_value.strip().upper()
    variations = source_df['VARIATION'].str.strip().str.upper()

    # Tier 1: exact match
    exact_mask = variations == string_upper
    if exact_mask.any():
        return source_df.loc[exact_mask, 'STANDARD'].iloc[0]

    # Missing (NaN/None) or blank variations can neither be escaped nor
    # meaningfully matched, so they are dropped once up front.
    candidates = [
        (idx, variation)
        for idx, variation in variations.items()
        if isinstance(variation, str) and variation
    ]

    # Tier 2: variation found inside the input
    for idx, variation in candidates:
        if re.search(r'\b' + re.escape(variation) + r'\b', string_upper):
            return source_df.loc[idx, 'STANDARD']

    # Tier 3: input found inside a variation (pattern is loop-invariant)
    input_pattern = re.compile(r'\b' + re.escape(string_upper) + r'\b')
    for idx, variation in candidates:
        if input_pattern.search(variation):
            return source_df.loc[idx, 'STANDARD']

    return string_value
|
|
def lookup_from_mapping(value, mapping_dict):
    """
    Look up a value in a mapping dictionary (Value List -> Key)
    Example: {"BENGALURU": ["BANGALORE", "BENGALURU"]}

    Passes are run in this order over the whole mapping; the first hit wins:
        1. Exact key match: "BENGALURU" -> "BENGALURU"
        2. Exact variation match: "BANGALORE" -> "BENGALURU"
        3. Variation-in-input: "BANGALORE KARNATAKA" -> "BENGALURU"
        4. Input-in-variation: "BANGAL" inside variation "BANGAL URBAN" -> "BENGALURU"

    Passes 3 and 4 match on word boundaries. Only entries whose value is a
    list take part in passes 2-4.

    Returns:
        The matching key, or None when nothing matches or the input is blank.
    """
    if not value or not mapping_dict:
        return None

    value_upper = str(value).strip().upper()
    if not value_upper:
        # Whitespace-only input: without this guard pass 4 would build
        # r'\b\b' and "match" the first variation it sees.
        return None

    # Pass 1: exact key
    if value_upper in mapping_dict:
        return value_upper

    # Normalise every variation list once. Blank variations are dropped for
    # the same r'\b\b' reason as above.
    normalized = []
    for standard, variations in mapping_dict.items():
        if not isinstance(variations, list):
            continue
        cleaned = [str(v).strip().upper() for v in variations if v is not None]
        normalized.append((standard, [v for v in cleaned if v]))

    # Pass 2: exact variation
    for standard, variants in normalized:
        if value_upper in variants:
            return standard

    # Pass 3: variation appears inside the input
    for standard, variants in normalized:
        for variant in variants:
            if re.search(r'\b' + re.escape(variant) + r'\b', value_upper):
                return standard

    # Pass 4: input appears inside a variation
    value_pattern = re.compile(r'\b' + re.escape(value_upper) + r'\b')
    for standard, variants in normalized:
        if any(value_pattern.search(variant) for variant in variants):
            return standard

    return None
|
|
|
|
|
|
| |
| |
| |
def pincode_similarity_india(pin1, pin2):
    """
    Calculate similarity between two Indian pincodes based on geographic distance
    and metro/non-metro classification.

    Processing order:
        1. missing / placeholder values        -> no match, score None
        2. format check (6 characters, digits) -> no match, score 0
        3. identical pincodes                  -> match, score 100
        4. range check (110001-855117)         -> no match, score 0
        5. geocode both pins with pgeocode, compute the haversine distance and
           score it with metro or non-metro distance thresholds

    The format check deliberately runs before the identical-pincode shortcut
    so that two equal junk values (e.g. "NIL" / "NIL") are not reported as a
    100 match. The range check stays after the shortcut, as before.

    Args:
        pin1: First pincode (string or int)
        pin2: Second pincode (string or int)

    Returns:
        dict: always contains "match", "similarity_score", "pin1", "pin2" and
        the county/state names for both pins (None when unknown). Failure
        results carry a "reason"; scored results carry "distance_km",
        "area_type", "is_metro_logic" and "is_extended_metro".
    """
    INVALID_VALUES = {"", "-", "NA", "N/A", "NULL"}

    def is_missing(pin):
        return pin is None or str(pin).strip().upper() in INVALID_VALUES

    def no_match(reason):
        """Build the common zero-score failure result (uses the current pin1/pin2)."""
        return {
            "match": False,
            "similarity_score": 0,
            "reason": reason,
            "pin1": pin1,
            "pin2": pin2,
            "pin1_county_name": None,
            "pin2_county_name": None,
            "pin1_state_name": None,
            "pin2_state_name": None
        }

    def geo_field(row, attr):
        """Read a geocoding field, mapping absent / None / NaN to None."""
        value = getattr(row, attr, None)
        if value is None or (isinstance(value, float) and math.isnan(value)):
            return None
        return value

    def haversine(lat1, lon1, lat2, lon2):
        """Calculate distance between two lat/lon points using Haversine formula"""
        R = 6371  # Earth radius in km

        dlat = math.radians(lat2 - lat1)
        dlon = math.radians(lon2 - lon1)

        a = (
            math.sin(dlat / 2) ** 2 +
            math.cos(math.radians(lat1)) *
            math.cos(math.radians(lat2)) *
            math.sin(dlon / 2) ** 2
        )

        c = 2 * math.asin(math.sqrt(a))
        return R * c

    # ---- 1. missing values ------------------------------------------------
    if is_missing(pin1) or is_missing(pin2):
        return {
            "match": False,
            "similarity_score": None,
            "distance_km": None,
            "area_type": "Missing pincode",
            "reason": "One or both pincodes are null / empty / placeholder",
            "pin1": pin1,
            "pin2": pin2,
            "pin1_county_name": None,
            "pin2_county_name": None,
            "pin1_state_name": None,
            "pin2_state_name": None
        }

    # ---- 2. normalise and validate the format -----------------------------
    try:
        pin1 = str(pin1).strip().zfill(6)
        pin2 = str(pin2).strip().zfill(6)
    except (ValueError, AttributeError):
        return no_match("Invalid pincode format - cannot convert to string")

    if len(pin1) != 6 or len(pin2) != 6:
        return no_match(f"Invalid pincode length (pin1: {len(pin1)}, pin2: {len(pin2)})")

    # isdecimal() rather than isdigit(): the latter also accepts characters
    # such as superscripts that int() below cannot parse.
    if not pin1.isdecimal() or not pin2.isdecimal():
        return no_match("Pincode must contain only digits")

    # ---- 3. identical pincodes --------------------------------------------
    if pin1 == pin2:
        exact = {
            "match": True,
            "similarity_score": 100,
            "distance_km": 0.0,
            "area_type": "Exact same pincode",
            "pin1": pin1,
            "pin2": pin2,
            "pin1_county_name": None,
            "pin2_county_name": None,
            "pin1_state_name": None,
            "pin2_state_name": None
        }
        try:
            row = pgeocode.Nominatim("IN").query_postal_code(pin1)
            county_name = geo_field(row, "county_name")
            state_name = geo_field(row, "state_name")
        except Exception:
            # Enrichment is best-effort; the match itself does not depend on it.
            return exact

        exact.update({
            "is_metro_logic": None,
            "is_extended_metro": None,
            "metro_cluster": None,
            "pin1_prefix": pin1[:3],
            "pin2_prefix": pin2[:3],
            "pin1_county_name": county_name,
            "pin2_county_name": county_name,
            "pin1_state_name": state_name,
            "pin2_state_name": state_name,
            "pin1_location": None,
            "pin2_location": None,
        })
        return exact

    # ---- 4. range check ----------------------------------------------------
    pin1_num = int(pin1)
    pin2_num = int(pin2)

    if pin1_num < 110001 or pin1_num > 855117 or pin2_num < 110001 or pin2_num > 855117:
        return no_match("Pincode outside valid Indian range (110001-855117)")

    # ---- classification tables --------------------------------------------
    # 3-digit prefixes and their display names for the metro cities.
    METRO_NAMES = {
        "110": "Delhi", "400": "Mumbai", "560": "Bengaluru",
        "600": "Chennai", "500": "Hyderabad", "700": "Kolkata",
        "411": "Pune", "380": "Ahmedabad"
    }
    METRO_PIN_PREFIXES = set(METRO_NAMES)

    # Prefix clusters treated as one extended metro area.
    EXTENDED_METROS = [
        ("Delhi NCR", {"110", "201", "122", "121", "124"}),
        ("Mumbai Metropolitan Region", {"400", "421", "410"}),
        ("Hyderabad Metro", {"500", "501"}),
        ("Bengaluru Metro", {"560", "562"}),
        ("Chennai Metro", {"600", "601", "603"}),
        ("Kolkata Metro", {"700", "711", "712"}),
    ]

    # Distance thresholds in km.
    METRO_THRESHOLDS = {
        "same_locality": 8,
        "nearby": 15,
        "same_metro": 35,
        "extended_metro": 60,
    }

    NON_METRO_THRESHOLDS = {
        "same_locality": 5,
        "nearby": 12,
        "same_district": 40,
    }

    # ---- 5. geocode ---------------------------------------------------------
    try:
        nomi = pgeocode.Nominatim("IN")
        p1 = nomi.query_postal_code(pin1)
        p2 = nomi.query_postal_code(pin2)
    except Exception as e:
        return no_match(f"Geocoding service error: {str(e)}")

    if p1 is None or p2 is None:
        return no_match("Geocoding returned None")

    coordinates = (p1.latitude, p1.longitude, p2.latitude, p2.longitude)
    if any(c is None or math.isnan(c) for c in coordinates):
        return no_match("Pincode not found in geocoding database")

    pin1_county_name = geo_field(p1, "county_name")
    pin2_county_name = geo_field(p2, "county_name")
    pin1_state_name = geo_field(p1, "state_name")
    pin2_state_name = geo_field(p2, "state_name")

    distance = haversine(
        p1.latitude, p1.longitude,
        p2.latitude, p2.longitude
    )

    prefix1 = pin1[:3]
    prefix2 = pin2[:3]

    # ---- metro classification ---------------------------------------------
    is_metro = False
    is_extended_metro = False
    metro_cluster_name = None

    for cluster_name, cluster in EXTENDED_METROS:
        if prefix1 in cluster and prefix2 in cluster:
            is_extended_metro = True
            is_metro = True
            metro_cluster_name = cluster_name
            break

    # Metro cities that are not part of any extended cluster.
    if not is_metro and prefix1 == prefix2 and prefix1 in METRO_PIN_PREFIXES:
        is_metro = True
        metro_cluster_name = METRO_NAMES.get(prefix1, "Metro City")

    one_is_metro = prefix1 in METRO_PIN_PREFIXES or prefix2 in METRO_PIN_PREFIXES

    # ---- scoring -----------------------------------------------------------
    score = 0

    if is_metro:
        if distance <= METRO_THRESHOLDS["same_locality"]:
            score = 95
        elif distance <= METRO_THRESHOLDS["nearby"]:
            score = 85
        elif distance <= METRO_THRESHOLDS["same_metro"]:
            score = 70
        elif is_extended_metro and distance <= METRO_THRESHOLDS["extended_metro"]:
            score = 60
        else:
            score = 35

    elif one_is_metro:
        # Only one side is a metro prefix: cap the score.
        if distance <= 20:
            score = 50
        else:
            score = 25

    else:
        same_state = False
        if hasattr(p1, 'state_name') and hasattr(p2, 'state_name'):
            same_state = p1.state_name == p2.state_name

        if distance <= NON_METRO_THRESHOLDS["same_locality"]:
            score = 92
        elif distance <= NON_METRO_THRESHOLDS["nearby"]:
            score = 75
        elif distance <= NON_METRO_THRESHOLDS["same_district"]:
            score = 55
        elif same_state and distance <= 100:
            score = 40
        else:
            score = 20

    return {
        "match": score >= 60,
        "similarity_score": score,
        "distance_km": distance,
        "pin1": pin1,
        "pin2": pin2,
        "pin1_county_name": pin1_county_name,
        "pin2_county_name": pin2_county_name,
        "pin1_state_name": pin1_state_name,
        "pin2_state_name": pin2_state_name,
        "area_type": metro_cluster_name if is_metro else "Non-metro",
        "is_metro_logic": is_metro,
        "is_extended_metro": is_extended_metro
    }
|
|
| |
| |
| |
def preprocess_text(text):
    """Trim both ends of ``text`` and squeeze each whitespace run to one space."""
    if text:
        trimmed = text.strip()
        return re.sub(r"\s+", " ", trimmed)
    return ""
|
|
def normalize_text(text):
    """Upper-case ``text``, trim it and collapse whitespace runs; falsy input gives ""."""
    if not text:
        return ""
    upper_trimmed = text.upper().strip()
    return re.sub(r"\s+", " ", upper_trimmed)
|
|
| |
| |
| |
def validate_and_normalize_pincode(pincode):
    """
    Validate and normalize pincode to exactly 6 digits.

    Non-digit characters (spaces, dashes, labels) are discarded. A trailing
    ".0" left behind by float-typed columns (560001.0, "560001.0") is removed
    first, so it is neither counted as an extra digit nor allowed to pad a
    5-digit value up to 6.

    Returns:
        The 6-digit pincode string, or None if the value is empty or does not
        contain exactly 6 digits.
    """
    if not pincode:
        return None

    raw = str(pincode).strip()
    raw = re.sub(r'\.0+$', '', raw)  # float artefact, e.g. "560001.0"

    digits = re.sub(r'\D', '', raw)
    return digits if len(digits) == 6 else None
|
|
def validate_and_normalize_phone(phone):
    """
    Validate and normalize phone to exactly 10 digits.

    Handles formats: +91, 91-, 91, a leading 0, or plain 10 digits. A bare
    leading "91" is only treated as the country code when more than 10 digits
    are present; otherwise it is part of the subscriber number
    (e.g. "9123456789"). A trailing ".0" from float-typed columns is ignored.

    Returns:
        The 10-digit phone string, or None if invalid.
    """
    if not phone:
        return None

    phone_str = str(phone).strip()
    phone_str = re.sub(r'\.0+$', '', phone_str)  # float artefact

    # Explicit international prefix.
    phone_str = re.sub(r'^\+91[-\s]?', '', phone_str)

    # Bare "91" prefix: strip only if there are digits to spare, so numbers
    # that genuinely start with 91 are kept intact.
    if len(re.sub(r'\D', '', phone_str)) > 10:
        phone_str = re.sub(r'^91[-\s]?', '', phone_str)

    # Trunk prefix.
    phone_str = re.sub(r'^0[-\s]?', '', phone_str)

    digits = re.sub(r'\D', '', phone_str)
    return digits if len(digits) == 10 else None
|
|
def validate_and_normalize_email(email):
    """
    Validate an email address with a regex and normalize it.

    The value is trimmed and upper-cased before validation, so the returned
    address is always upper case.

    Returns:
        The normalized email, or None if the value is empty or does not look
        like ``local@domain.tld``.
    """
    if not email:
        return None

    email_str = str(email).strip().upper()

    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'

    if re.match(email_pattern, email_str):
        return email_str

    return None
|
|
def validate_and_normalize_pan(pan):
    """
    Check a PAN (Permanent Account Number) and return it in canonical form.

    The value is trimmed, upper-cased and stripped of whitespace and dashes;
    it is accepted only if what remains is 5 letters, 4 digits and 1 letter
    (e.g. ABCDE1234F). Anything else, including empty input, gives None.
    """
    if pan:
        candidate = re.sub(r'[\s-]', '', str(pan).strip().upper())
        well_formed = len(candidate) == 10 and re.match(r'^[A-Z]{5}[0-9]{4}[A-Z]{1}$', candidate)
        if well_formed:
            return candidate
    return None
|
|
|
|
|
|
| |
# Lookup tables for the Verhoeff checksum used by validate_verhoeff().

# Combination table: validate_verhoeff() indexes it as
# verhoeff_table_d[running_checksum][permuted_digit] to get the next checksum.
verhoeff_table_d = [
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    [1, 2, 3, 4, 0, 6, 7, 8, 9, 5],
    [2, 3, 4, 0, 1, 7, 8, 9, 5, 6],
    [3, 4, 0, 1, 2, 8, 9, 5, 6, 7],
    [4, 0, 1, 2, 3, 9, 5, 6, 7, 8],
    [5, 9, 8, 7, 6, 0, 4, 3, 2, 1],
    [6, 5, 9, 8, 7, 1, 0, 4, 3, 2],
    [7, 6, 5, 9, 8, 2, 1, 0, 4, 3],
    [8, 7, 6, 5, 9, 3, 2, 1, 0, 4],
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
]


# Permutation table: row = digit position (from the right) modulo 8,
# column = the digit at that position.
verhoeff_table_p = [
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    [1, 5, 7, 6, 2, 8, 3, 0, 9, 4],
    [5, 8, 0, 3, 7, 9, 6, 1, 4, 2],
    [8, 9, 1, 6, 0, 4, 3, 5, 2, 7],
    [9, 4, 5, 3, 1, 2, 6, 8, 7, 0],
    [4, 2, 8, 6, 5, 7, 3, 9, 0, 1],
    [2, 7, 9, 3, 8, 0, 6, 4, 1, 5],
    [7, 0, 4, 6, 9, 1, 3, 2, 5, 8]
]


# Inverse table. NOTE(review): not referenced by validate_verhoeff();
# presumably kept for generating check digits - confirm it is used elsewhere.
verhoeff_table_inv = [0, 4, 3, 2, 1, 5, 6, 7, 8, 9]
|
|
def validate_verhoeff(num):
    """
    Check a digit string against the Verhoeff checksum.

    Digits are consumed right to left; each is permuted according to its
    position and folded into a running checksum. The string is valid when the
    final checksum is 0. Raises ValueError if a character is not a digit.
    """
    right_to_left = [int(ch) for ch in reversed(num)]

    checksum = 0
    for position, digit in enumerate(right_to_left):
        permuted = verhoeff_table_p[position % 8][digit]
        checksum = verhoeff_table_d[checksum][permuted]

    return checksum == 0
|
|
def validate_and_normalize_aadhar(aadhar):
    """
    Validate and normalize Aadhar Number using Verhoeff algorithm.

    Format: 12 digits, last digit is checksum. Spaces and dashes used as
    group separators are removed before validation.

    Returns:
        The 12-digit string when the format and checksum are valid, else None.
    """
    if not aadhar:
        return None

    aadhar_str = re.sub(r'[\s-]', '', str(aadhar).strip())

    # Require ASCII digits explicitly: str.isdigit() also accepts superscripts
    # (which make int() raise in validate_verhoeff) and other scripts' digits.
    if not re.fullmatch(r'[0-9]{12}', aadhar_str):
        return None

    if validate_verhoeff(aadhar_str):
        return aadhar_str

    return None
| |
def normalize_dob(text: str) -> str:
    """
    Extract a date from text and normalize it to DD-MM-YYYY format using regex.

    Patterns are tried in this order; the first one that yields a real
    calendar date wins:
        1. YYYY-MM-DD (read as YYYY-DD-MM when the middle number cannot be a month)
        2. YYYY-Mon-DD
        3. DD-MM-YYYY / MM-DD-YYYY (ambiguous pairs are read as DD-MM)
        4. DDMMYYYY
        5. DD-Mon-YYYY
        6. DDMonYYYY / DDMonYY
        7. Mon DD, YYYY
        8. DD-Mon-YY

    Month names are matched case-insensitively. Years must lie between 1900
    and the current year (never less than 2026); two-digit years up to that
    limit's last two digits are read as 20YY, the rest as 19YY.

    Returns:
        The normalized date string; the literal "Invalid DOB" when a numeric
        day/month/year match (patterns 3-4) has an impossible day/month pair;
        None when the input is empty or nothing parses.
    """
    if not text:
        return None

    text = str(text).strip()
    # Drop spaces around separators so "15 - 01 - 1990" parses like "15-01-1990".
    text = re.sub(r'\s*([-/.])\s*', r'\1', text)
    # The month-name patterns and the lookup table below are lower case.
    text_lower = text.lower()

    max_year = max(2026, datetime.now().year)
    two_digit_pivot = max_year % 100

    month_names = {
        'jan': '01', 'january': '01', 'feb': '02', 'february': '02',
        'mar': '03', 'march': '03', 'apr': '04', 'april': '04',
        'may': '05', 'jun': '06', 'june': '06', 'jul': '07', 'july': '07',
        'aug': '08', 'august': '08', 'sep': '09', 'sept': '09', 'september': '09',
        'oct': '10', 'october': '10', 'nov': '11', 'november': '11',
        'dec': '12', 'december': '12',
    }

    def normalize_number(num_str: str) -> str:
        """Zero-pad a day/month number to two digits."""
        return f'{int(num_str):02d}'

    def is_valid_year(year_str: str) -> bool:
        try:
            return 1900 <= int(year_str) <= max_year
        except ValueError:
            return False

    def expand_two_digit_year(year_short: str) -> str:
        century = '20' if int(year_short) <= two_digit_pivot else '19'
        return f'{century}{year_short}'

    def build_date(day: str, month: str, year: str):
        """Return 'DD-MM-YYYY' if the parts form a real calendar date, else None."""
        try:
            datetime(int(year), int(month), int(day))
        except ValueError:
            return None
        return f'{day}-{month}-{year}'

    def from_named_month(day_str: str, month_str: str, year: str):
        """Build a date from a day, a month name and a 4-digit year, else None."""
        if not is_valid_year(year) or month_str not in month_names:
            return None
        return build_date(normalize_number(day_str), month_names[month_str], year)

    def validate_and_determine_format(first: str, second: str) -> tuple:
        """Determine if DD-MM or MM-DD format and return (month, day)"""
        try:
            first_int = int(first)
            second_int = int(second)
        except ValueError:
            return (None, None)

        if first_int < 1 or second_int < 1:
            return (None, None)

        # First number cannot be a month -> DD-MM
        if first_int > 12:
            if first_int > 31 or second_int > 12:
                return (None, None)
            return (normalize_number(second), normalize_number(first))

        # Second number cannot be a month -> MM-DD
        if second_int > 12:
            if second_int > 31:
                return (None, None)
            return (normalize_number(first), normalize_number(second))

        # Both could be months: default to DD-MM
        return (normalize_number(second), normalize_number(first))

    # Pattern 1: YYYY-MM-DD / YYYY-DD-MM
    match = re.search(r'(\d{4})[-\/\.\s](\d{1,2})[-\/\.\s](\d{1,2})', text)
    if match:
        year, second, third = match.groups()
        if is_valid_year(year):
            second_int = int(second)
            third_int = int(third)
            day = month = None
            if second_int > 12 and 1 <= third_int <= 12:
                day, month = normalize_number(second), normalize_number(third)
            elif 1 <= second_int <= 12:
                month, day = normalize_number(second), normalize_number(third)
            if day and month:
                result = build_date(day, month, year)
                if result:
                    return result

    # Pattern 2: YYYY-Mon-DD
    match = re.search(r'(\d{4})[-\/\.\s]([a-z]{3,9})[-\/\.\s](\d{1,2})', text_lower)
    if match:
        year, month_str, day_str = match.groups()
        result = from_named_month(day_str, month_str, year)
        if result:
            return result

    # Patterns 3 and 4: DD-MM-YYYY / MM-DD-YYYY, then DDMMYYYY
    numeric_patterns = (
        r'\b(\d{1,2})[-\/\.\s](\d{1,2})[-\/\.\s](\d{4})\b',
        r'\b(\d{2})(\d{2})(\d{4})\b',
    )
    for numeric_pattern in numeric_patterns:
        match = re.search(numeric_pattern, text)
        if match:
            first, second, year = match.groups()
            if is_valid_year(year):
                month, day = validate_and_determine_format(first, second)
                if month is None or day is None:
                    return "Invalid DOB"
                result = build_date(day, month, year)
                if result:
                    return result

    # Pattern 5: DD-Mon-YYYY
    match = re.search(r'\b(\d{1,2})[-\s]([a-z]{3,9})[-\s](\d{4})\b', text_lower)
    if match:
        day_str, month_str, year = match.groups()
        result = from_named_month(day_str, month_str, year)
        if result:
            return result

    # Pattern 6: DDMonYYYY / DDMonYY
    match = re.search(r'\b(\d{1,2})([a-z]{3,9})(\d{4}|\d{2})\b', text_lower)
    if match:
        day_str, month_str, year = match.groups()
        if len(year) == 2:
            year = expand_two_digit_year(year)
        result = from_named_month(day_str, month_str, year)
        if result:
            return result

    # Pattern 7: Mon DD, YYYY
    match = re.search(r'\b([a-z]{3,9})[-\/\.\s](\d{1,2})[-\/\.\s,]+(\d{4})\b', text_lower)
    if match:
        month_str, day_str, year = match.groups()
        result = from_named_month(day_str, month_str, year)
        if result:
            return result

    # Pattern 8: DD-Mon-YY
    match = re.search(r'\b(\d{1,2})[-\s]([a-z]{3,9})[-\s](\d{2})\b', text_lower)
    if match:
        day_str, month_str, year_short = match.groups()
        result = from_named_month(day_str, month_str, expand_two_digit_year(year_short))
        if result:
            return result

    return None
|
|
|
|
|
|
|
|
| |
| |
| |
# Shared pgeocode.Nominatim("IN") handle; stays None until
# _get_pgeocode_inst() manages to create it.
_PGEOCODE_NOMI_INST = None
# Results of lookup_pincode_info(), keyed by the normalized pincode string
# (an empty dict is stored for pins that could not be resolved).
_PGEOCODE_LOOKUP_CACHE: dict = {}
|
|
|
|
def _get_pgeocode_inst():
    """
    Hand out the shared pgeocode.Nominatim("IN") object, creating it on demand.

    Returns None when pgeocode cannot be imported or initialised; the failure
    is logged as a warning and the next call tries again.
    """
    global _PGEOCODE_NOMI_INST

    if _PGEOCODE_NOMI_INST is not None:
        return _PGEOCODE_NOMI_INST

    try:
        import pgeocode as _pgeocode_lib
        _PGEOCODE_NOMI_INST = _pgeocode_lib.Nominatim("IN")
        logger.info("pgeocode loaded for India (offline pincode DB).")
    except Exception as load_error:
        logger.warning("pgeocode unavailable β pincode enrichment disabled: %s", load_error)

    return _PGEOCODE_NOMI_INST
|
|
|
|
def lookup_pincode_info(pin: str) -> dict:
    """
    Resolve an Indian pincode to its location details via pgeocode.

    Non-digits are removed from ``pin`` and the rest is left-padded with
    zeros to 6 characters before the lookup.

    Returns dict: {district, state, place, lat, lng}. The three names are
    strings ("" when unknown); lat/lng are float or None. An empty dict is
    returned for empty or over-long input, when pgeocode is unavailable, or
    when the lookup fails. Every outcome that reaches the lookup stage is
    memoised per pincode.
    """
    if not pin:
        return {}

    digits_only = re.sub(r"\D", "", str(pin).strip())
    pin_str = digits_only.zfill(6)
    if len(pin_str) != 6:
        return {}

    try:
        return _PGEOCODE_LOOKUP_CACHE[pin_str]
    except KeyError:
        pass

    def _as_text(val) -> str:
        # None and NaN both mean "no value".
        if val is None:
            return ""
        try:
            missing = isinstance(val, float) and math.isnan(val)
        except Exception:
            missing = False
        return "" if missing else str(val).strip()

    def _as_number(val):
        try:
            number = float(val)
        except Exception:
            return None
        try:
            return None if math.isnan(number) else number
        except Exception:
            return None

    details = {}
    db = _get_pgeocode_inst()
    if db is not None:
        try:
            row = db.query_postal_code(pin_str)
            if row is not None:
                details = {
                    "district": _as_text(getattr(row, "county_name", "")),
                    "state": _as_text(getattr(row, "state_name", "")),
                    "place": _as_text(getattr(row, "place_name", "")),
                    "lat": _as_number(getattr(row, "latitude", None)),
                    "lng": _as_number(getattr(row, "longitude", None)),
                }
        except Exception as e:
            logger.debug("pgeocode lookup error for %s: %s", pin_str, e)
            details = {}

    _PGEOCODE_LOOKUP_CACHE[pin_str] = details
    # A miss hands back a fresh dict rather than the cached one.
    return details if details else {}
|
|
|
|
| |
| |
| |
| |
| |
# Short codes and dotted abbreviations mapped to an upper-case state name.
# standardize_state() consults this table first, before STATE_MAPPING and
# the state-name CSV.
_BANK_STATE_CODE_MAP: dict = {
    # Codes that resolve to Delhi
    "NDH": "DELHI", "SDH": "DELHI", "CDH": "DELHI",
    "EDH": "DELHI", "WDH": "DELHI", "NWD": "DELHI",
    "SWD": "DELHI", "NED": "DELHI",
    # Three-letter codes for the other states
    "MUM": "MAHARASHTRA", "BOM": "MAHARASHTRA",
    "BLR": "KARNATAKA", "BNG": "KARNATAKA",
    "HYD": "TELANGANA", "SCB": "TELANGANA",
    "CHN": "TAMIL NADU", "MAD": "TAMIL NADU",
    "KOL": "WEST BENGAL","CAL": "WEST BENGAL",
    "PUN": "MAHARASHTRA","PCM": "MAHARASHTRA",
    "AHM": "GUJARAT", "AMD": "GUJARAT",
    "JAI": "RAJASTHAN",
    "LKO": "UTTAR PRADESH", "KNP": "UTTAR PRADESH",
    "PAT": "BIHAR",
    "RNC": "JHARKHAND",
    "BHU": "ODISHA",
    "GHY": "ASSAM",
    "CCU": "WEST BENGAL",
    # Dotted abbreviations, with and without the trailing dot
    "A.P.": "ANDHRA PRADESH", "A.P": "ANDHRA PRADESH",
    "T.N.": "TAMIL NADU", "T.N": "TAMIL NADU",
    "U.P.": "UTTAR PRADESH", "U.P": "UTTAR PRADESH",
    "M.P.": "MADHYA PRADESH", "M.P": "MADHYA PRADESH",
    "H.P.": "HIMACHAL PRADESH","H.P": "HIMACHAL PRADESH",
    "W.B.": "WEST BENGAL", "W.B": "WEST BENGAL",
}
|
|
def standardize_state(state_str):
    """
    Map a state value to its standard upper-case name.

    Lookup order, first hit wins:
        1. _BANK_STATE_CODE_MAP (short codes such as NDH, BLR and dotted
           abbreviations such as A.P.), refined through STATE_MAPPING when
           that mapping knows the resulting name
        2. STATE_MAPPING
        3. the state-name CSV (state_name_standard_df)

    Returns None for empty input; when nothing matches, the cleaned input is
    returned as is.
    """
    cleaned = clean_text(state_str) if state_str else None
    if not cleaned:
        return None

    normalized = cleaned.strip()
    lookup_key = normalized.upper()

    is_bank_code = lookup_key in _BANK_STATE_CODE_MAP
    candidate = _BANK_STATE_CODE_MAP[lookup_key] if is_bank_code else lookup_key

    if STATE_MAPPING:
        mapped = lookup_from_mapping(candidate, STATE_MAPPING)
        if mapped:
            return mapped.upper()

    if is_bank_code:
        # Known code whose state name STATE_MAPPING does not refine further.
        return candidate.upper()

    if state_name_standard_df.empty:
        return normalized

    csv_table = state_name_standard_df.copy()
    csv_table.columns = csv_table.columns.str.upper()
    csv_match = replace_with_standard(lookup_key, csv_table)
    if csv_match != "" and csv_match != lookup_key:
        return csv_match.upper()

    return normalized
|
|
def standardize_city(city_str):
    """
    Map a city value to its standard upper-case name.

    CITY_MAPPING is consulted first, then the previous/present city-name CSV
    (city_prev_pres_df). Returns None for empty input; when nothing matches,
    the cleaned input is returned as is.
    """
    cleaned = clean_text(city_str) if city_str else None
    if not cleaned:
        return None

    normalized = cleaned.strip()
    lookup_key = normalized.upper()

    if CITY_MAPPING:
        mapped = lookup_from_mapping(lookup_key, CITY_MAPPING)
        if mapped:
            return mapped.upper()

    if city_prev_pres_df.empty:
        return normalized

    csv_table = city_prev_pres_df.copy()
    csv_table.columns = csv_table.columns.str.upper()
    csv_match = replace_with_standard(lookup_key, csv_table)
    if csv_match != "" and csv_match != lookup_key:
        return csv_match.upper()

    return normalized
|
|
def standardize_column(text, column_name):
    """
    Standardize a field value according to the column it belongs to.

    String values are passed through clean_text first. Column names are
    compared case-insensitively:
        - PAN    -> validate_and_normalize_pan (None when invalid)
        - AADHAR -> validate_and_normalize_aadhar (None when invalid)
        - anything else -> the cleaned value

    Returns:
        The standardized value, or None for empty input.
    """
    if not text:
        return None
    if isinstance(text, str):
        text = clean_text(text)
        if not text:
            return None

    # Upper-cased once; every comparison below must therefore be upper case.
    column_upper = str(column_name).upper() if column_name else ""

    # NOTE(review): this comparison is lower case while column_upper is upper
    # case, so the branch never runs. It is left dormant on purpose:
    # replace_with_standard returns the STANDARD cell of the first variation
    # it finds, so enabling it would replace the whole address line with that
    # single value. Confirm the intended behaviour before fixing the case.
    if "addressline" in column_upper:
        if not hno_variation_df.empty:
            try:
                address_df = hno_variation_df.copy()
                address_df.columns = address_df.columns.str.upper()

                result = replace_with_standard(str(text).upper(), address_df)
                text = result.upper() if result else text
            except Exception as e:
                # Best effort: keep the cleaned text if the lookup fails.
                logger.debug("address standardization failed for %r: %s", column_name, e)

    if column_upper == 'PAN':
        return validate_and_normalize_pan(text)
    elif column_upper == 'AADHAR':
        return validate_and_normalize_aadhar(text)

    return text
|
|
def standardize_dob(dob_str):
    """Run ``dob_str`` through normalize_dob; empty input gives None."""
    return normalize_dob(dob_str) if dob_str else None
|
|
| |
| |
| |
def compare_exact(val1, val2):
    """Return 100 when both values are present and equal ignoring case and outer whitespace, else 0."""
    if val1 and val2:
        left, right = (str(side).strip().upper() for side in (val1, val2))
        if left == right:
            return 100
    return 0
|
|
def compare_any_match(list1, list2, field_type="pincode"):
    """
    1:N matching for lists of values (pincodes, states, cities, dates of birth).

    Each value is normalized according to ``field_type`` ("pincode", "state",
    "city", "dob"/"birthdate"; anything else is trimmed and upper-cased).
    Empty values, "-" placeholders and values that fail normalization are
    ignored. For dates of birth the "Invalid DOB" marker produced by
    normalize_dob is ignored as well, so two unparseable dates never count as
    a match.

    Returns:
        100 if any value in list1 matches any value in list2, otherwise 0
        (also when either list is None or has no usable value).
    """
    is_dob = field_type in ("dob", "birthdate")

    def present(values):
        """Values that are neither empty nor a placeholder."""
        return [v for v in (values or []) if v and str(v).strip() not in ("", "-")]

    def normalize(value):
        if field_type == "pincode":
            return validate_and_normalize_pincode(value)
        if field_type == "state":
            return standardize_state(value)
        if field_type == "city":
            return standardize_city(value)
        if is_dob:
            return standardize_dob(value)
        return str(value).strip().upper()

    def usable(values):
        normalized = [normalize(v) for v in values]
        return [
            v for v in normalized
            if v and not (is_dob and v == "Invalid DOB")
        ]

    valid_list1 = present(list1)
    valid_list2 = present(list2)
    if not valid_list1 or not valid_list2:
        return 0

    normalized_list1 = usable(valid_list1)
    normalized_list2 = usable(valid_list2)
    if not normalized_list1 or not normalized_list2:
        return 0

    if any(v1 in normalized_list2 for v1 in normalized_list1):
        return 100
    return 0
|
|
def compare_phone_any_match(phones1, phones2):
    """Return 100 if any valid phone number from ``phones1`` also occurs in ``phones2``, else 0."""
    def usable(raw_numbers):
        # Empty entries are skipped before validation, invalid ones after it.
        checked = [validate_and_normalize_phone(raw) for raw in raw_numbers if raw]
        return [number for number in checked if number]

    first = usable(phones1)
    second = usable(phones2)

    if first and second and any(number in second for number in first):
        return 100
    return 0
|
|
def compare_email_any_match(emails1, emails2):
    """Return 100 if any valid email address from ``emails1`` also occurs in ``emails2``, else 0."""
    def usable(raw_addresses):
        # Empty entries are skipped before validation, invalid ones after it.
        checked = [validate_and_normalize_email(raw) for raw in raw_addresses if raw]
        return [address for address in checked if address]

    first = usable(emails1)
    second = usable(emails2)

    if first and second and any(address in second for address in first):
        return 100
    return 0
|
|
| |
| |
| |
def evaluate_matching_rules(field_scores: Dict[str, float]) -> tuple:
    """
    Decide whether a record pair matches, using MATCHING_RULES.

    Each rule is a (conditions, reason) pair where conditions is a sequence
    of (field, threshold). A rule holds when no field score is below its
    threshold; fields missing from ``field_scores`` count as 0. Rules are
    checked in order and the first one that holds decides.

    Returns: (decision, reason) - ("Match", <rule reason>) or
    ("No Match", <fixed explanation>).
    """
    for conditions, reason in MATCHING_RULES:
        falls_short = any(
            field_scores.get(field, 0) < threshold
            for field, threshold in conditions
        )
        if not falls_short:
            return "Match", reason

    return "No Match", "None of the defined matching rules were satisfied"
|
|
|
|
|
|
|
|
| |
| |
| |
def apply_pattern_matching_logic(field_name: str, score) -> float:
    """
    Collapse the score of an exact-match field to either 0 or 100.

    - The sentinel string "missing value" always becomes 0.
    - For the pattern-based fields listed below, a score of 100 or more
      becomes 100 and anything lower becomes 0.
    - Scores of every other field are returned untouched.
    """
    if score == "missing value":
        return 0

    all_or_nothing_fields = frozenset((
        "BIRTHDATE", "PHONE", "EMAIL", "ZIPCODE",
        "TAXID", "LICENSEID", "PASSPORTID", "GENDER",
        "AADHAR", "PAN",
    ))

    if field_name not in all_or_nothing_fields:
        return score

    if score >= 100:
        return 100
    return 0
|
|
|
|
def roman_to_number(text):
    """Replace every whole-word Roman numeral in text with its decimal value.

    Falsy input yields "", and a truthy non-string is returned as str(text).
    Matching is case-insensitive. A word is converted only when its
    upper-cased form is a well-formed numeral (so "IIII" is left alone).
    Single letters such as "C" or "I" are well-formed numerals and are
    converted as well.
    """
    if not text:
        return ""
    if not isinstance(text, str):
        return str(text)

    symbol_value = {"I": 1, "V": 5, "X": 10, "L": 50, "C": 100, "D": 500, "M": 1000}

    def _to_decimal(match):
        token = match.group(1)
        numeral = token.upper()

        # Reject malformed sequences (e.g. "IIII", "VX") and keep the word as-is.
        if re.fullmatch(r"^M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$", numeral) is None:
            return token

        amounts = [symbol_value.get(symbol, 0) for symbol in numeral]
        total = 0
        for position, amount in enumerate(amounts):
            # A symbol smaller than its right-hand neighbour is subtractive (IV, XC...).
            right_neighbour = amounts[position + 1] if position + 1 < len(amounts) else 0
            if amount < right_neighbour:
                total -= amount
            else:
                total += amount
        return str(total)

    return re.sub(r'\b([IVXLCDM]+)\b', _to_decimal, text, flags=re.IGNORECASE)
|
|
def normalize_and_deduplicate_address(text):
    """Drop repeated words (case-insensitively) from an address, keeping first occurrences.

    Commas and whitespace both act as word separators. The surviving words
    are returned joined by single spaces, in their original order and with
    their original casing. Falsy or non-string input yields "".
    """
    if not text or not isinstance(text, str):
        return ""

    # Keyed by the upper-cased word; dict insertion order keeps first-seen order.
    first_seen = {}
    for word in text.replace(",", " ").split():
        first_seen.setdefault(word.upper(), word)

    return " ".join(first_seen.values())
|
|
|
|
def extract_leading_house_number(segment, street_keywords):
    """Return the first token of segment when it looks like a house number.

    The token (upper-cased) must be digits with at most one letter in front
    and at most one letter behind, e.g. "12", "A12", "12B". It is rejected
    when the token right after it equals one of street_keywords
    (case-insensitive), since "5 CROSS" names a street rather than a house.

    street_keywords may be a single string or an iterable of strings.
    Returns the upper-cased token, or None.
    """
    parts = segment.split()
    if not parts:
        return None

    candidate = parts[0].upper()
    if re.fullmatch(r"[A-Z]?\d+[A-Z]?", candidate) is None:
        return None

    if len(parts) == 1:
        return candidate

    if isinstance(street_keywords, str):
        blocked = [street_keywords.upper()]
    else:
        blocked = [keyword.upper() for keyword in street_keywords]

    return None if parts[1].upper() in blocked else candidate
|
|
|
|
def is_street_context(text, match_start, street_keywords):
    """Tell whether a street keyword occurs just before position match_start.

    Only the 20 characters preceding match_start are inspected. A keyword
    counts when it appears there as a whole word, ignoring case.
    street_keywords may be a single string or an iterable of strings.
    """
    lookbehind = text[max(match_start - 20, 0):match_start]
    keywords = [street_keywords] if isinstance(street_keywords, str) else street_keywords

    return any(
        re.search(rf"\b{re.escape(keyword)}\b", lookbehind, re.IGNORECASE) is not None
        for keyword in keywords
    )
|
|
|
|
def extract_component_with_hierarchy(text, identifier, value_patterns=None, street_keywords=None):
    """
    Hierarchical extraction working directly on the full address string.
    No comma-based segmentation.

    Steps, in priority order:
      1. Identifier lookup - if an identifier occurs in text as a whole word
         (case-insensitive), the value is taken from the text after it:
         the first entry of value_patterns that matches there, or, when no
         value_patterns are given, everything up to the next comma.
      2. Leading house number - when street_keywords is not None, a house
         number sitting in the first token of text.
      3. Bare pattern scan - the first value_patterns match anywhere in
         text that is not preceded by a street keyword.

    Args:
        text: address string to search.
        identifier: a keyword string, or an iterable of keyword strings
            (matching how street keywords are accepted elsewhere in this
            module). With several keywords, the one occurring earliest in
            text wins; ties go to the one listed first.
        value_patterns: regex strings tried in order of priority.
        street_keywords: keyword string/iterable used for steps 2 and 3.

    Returns: (extracted_value, identifier_found, pattern_value)
        identifier_found is the keyword string that matched (or None), so it
        can be passed straight to remove_matched_text().
    """
    if not text:
        return None, None, None

    # Step 1: explicit identifier.
    if identifier:
        candidates = [identifier] if isinstance(identifier, str) else list(identifier)

        id_match = None
        matched_identifier = None
        for candidate in candidates:
            if not candidate:
                continue
            found = re.search(rf"\b{re.escape(candidate)}\b", text, re.IGNORECASE)
            # Strict "<" keeps the earlier-listed keyword when two start at the same place.
            if found and (id_match is None or found.start() < id_match.start()):
                id_match, matched_identifier = found, candidate

        if id_match:
            if value_patterns:
                # Patterns are ordered most-specific first; search only after the identifier.
                text_after_id = text[id_match.end():]
                for pattern in value_patterns:
                    m = re.search(pattern, text_after_id, re.IGNORECASE)
                    if m:
                        value = m.group(0).strip()
                        return value, matched_identifier, value

                # Identifier present but nothing value-like follows it.
                return None, matched_identifier, None

            # No patterns supplied: the value is the text up to the next comma.
            text_after = text[id_match.end():].strip()
            value = re.split(r"[,]", text_after)[0].strip()
            return (value if value else None), matched_identifier, None

    # Step 2: house number as the very first token.
    if street_keywords is not None:
        leading = extract_leading_house_number(text, street_keywords)
        if leading:
            return leading, None, leading

    # Step 3: first pattern hit that is not part of a street reference.
    if value_patterns:
        for pattern in value_patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                if street_keywords and is_street_context(text, match.start(), street_keywords):
                    continue
                extracted_value = match.group(0).strip()
                return extracted_value, None, extracted_value

    return None, None, None
|
|
|
|
def remove_matched_text(text, identifier=None, pattern_value=None):
    """
    Strip an extracted identifier and/or value out of the full address string.

    Whichever of identifier and pattern_value is given is removed:
    - both given: both are removed (identifier first)
    - identifier only: just the identifier is removed
    - pattern_value only: just the value is removed

    Each is matched as a whole word, ignoring case, at every place it occurs,
    together with any run of whitespace or "#.:/-" characters right after it.
    The result has whitespace runs collapsed and leading/trailing commas and
    whitespace trimmed. Falsy text yields "".
    """
    if not text:
        return ""

    cleaned = text
    for fragment in (identifier, pattern_value):
        if not fragment:
            continue
        cleaned = re.sub(
            rf"\b{re.escape(fragment)}\b[\s#.:/-]*",
            " ",
            cleaned,
            flags=re.IGNORECASE
        )

    # Tidy up the gaps the removals left behind.
    cleaned = re.sub(r"\s{2,}", " ", cleaned).strip()
    return re.sub(r"^[,\s]+|[,\s]+$", "", cleaned)
|
|
|
|
def extract_address_components(address_line: str) -> dict:
    """
    Master extraction function - no comma segmentation.
    Works directly on the full address string.

    The input is passed through clean_text() and roman_to_number(), then the
    components are pulled out one after another (house number, flat number,
    apartment, street). Each extraction runs on what the previous one left
    behind, and whatever it matched is removed before the next step.

    Returns a dict with:
        original_address   - the cleaned address text (before extraction)
        house_number / flat_number / apartment / street
                           - extracted value, or None
        *_segment          - the text that component was extracted from
                             (i.e. the remaining text at that step), or None
        remaining_address  - leftover text after all removals, with street
                             numbering phrases and duplicate words dropped

    A falsy address (before or after cleaning) yields the same dict with
    empty strings / None values.

    Diagnostics are emitted through the module logger at DEBUG level instead
    of being printed, so address data does not reach stdout unless debug
    logging is switched on.
    """
    empty_result = {
        "original_address": "",
        "house_number": None,
        "house_segment": None,
        "flat_number": None,
        "flat_segment": None,
        "apartment": None,
        "apartment_segment": None,
        "street": None,
        "street_segment": None,
        "remaining_address": ""
    }

    if not address_line:
        return empty_result

    address_line = clean_text(str(address_line))
    if not address_line:
        return empty_result

    original_address = address_line

    # Roman numerals become digits before any pattern matching takes place.
    remaining = roman_to_number(address_line)
    remaining = str(remaining) if remaining else ""

    # Street keywords: the configured one(s) plus lane-style variants, de-duplicated in order.
    _base_kws = [STREET_KEYWORD] if isinstance(STREET_KEYWORD, str) else list(STREET_KEYWORD)
    _street_kws = list(dict.fromkeys(
        _base_kws + ["GALI NO", "LANE NO", "GALI", "GALLI"]
    ))

    # Ordered from the most specific house-number shape to the most generic.
    house_patterns = [
        r"\b(MIG|HIG|LIG)-\d+[a-zA-Z]?\b",
        r"\b\d+(?:-\d+){2,}[a-zA-Z]?\b",
        r"\b\d+-\d+/\d+[a-zA-Z]?\b",
        r"\b\d+-\d+/[a-zA-Z]\b",
        r"\b\d+-\d+/\d+\b",
        r"\b\d+/\d+(?:/\d+)?\s?[a-zA-Z]?\b",
        r"\b[a-zA-Z]{1,3}/\d+[a-zA-Z]?\b",
        r"\b\d+-\d+[a-zA-Z]\b",
        r"\b\d+-\d+\b",
        r"\b[a-zA-Z]{1,2}-?\d+[a-zA-Z]?\b",
        r"\b\d+[a-zA-Z]\b",
        r"\b\d{1,4}\b",
    ]

    flat_patterns = [
        r"\b\d+[a-zA-Z]?\b",
        r"\b[a-zA-Z]-?\d+\b",
    ]

    def _take(current, identifier, patterns=None, street_kws=None):
        """Extract one component from current; return (value, segment, text left over)."""
        value, id_found, pat_val = extract_component_with_hierarchy(
            current, identifier, patterns, street_kws
        )
        segment = None
        if id_found or pat_val:
            segment = current
            current = remove_matched_text(current, id_found, pat_val)
        return value, segment, (str(current) if current else "")

    # Order matters: every step only sees what the earlier steps left behind.
    house_no, house_segment, remaining = _take(
        remaining, HOUSE_NUMBER_IDENTIFIER, house_patterns, _street_kws
    )
    flat_no, flat_segment, remaining = _take(
        remaining, FLAT_NUMBER_IDENTIFIER, flat_patterns, _street_kws
    )
    apartment, apartment_segment, remaining = _take(
        remaining, APARTMENT_IDENTIFIER
    )
    street, street_segment, remaining = _take(
        remaining, STREET_KEYWORD, street_kws=_street_kws
    )

    # Drop street-numbering phrases such as "GALI NO 3" ...
    remaining = re.sub(
        r'\b(GALI|LANE|CROSS|MAIN)\s+NO\s+[A-Z0-9][A-Z0-9\-]*\b[\s,]*',
        ' ', remaining, flags=re.IGNORECASE
    )
    # ... and any "<street word> NO" left dangling without its number.
    remaining = re.sub(
        r'\b(GALI|LANE|ROAD|MARG|STREET|CROSS|MAIN)\s+NO\b[\s,]*',
        ' ', remaining, flags=re.IGNORECASE
    )
    remaining = re.sub(r"\s+", " ", remaining).strip()
    remaining = re.sub(r"^[,\s]+|[,\s]+$", "", remaining)
    remaining = normalize_and_deduplicate_address(remaining)

    logger.debug("[EXTRACT] house_no : %r | segment: %r", house_no, house_segment)
    logger.debug("[EXTRACT] flat_no : %r | segment: %r", flat_no, flat_segment)
    logger.debug("[EXTRACT] apartment : %r | segment: %r", apartment, apartment_segment)
    logger.debug("[EXTRACT] street : %r | segment: %r", street, street_segment)
    logger.debug("[EXTRACT] remaining_addr: %r", remaining)

    return {
        "original_address": original_address,
        "house_number": house_no,
        "house_segment": house_segment,
        "flat_number": flat_no,
        "flat_segment": flat_segment,
        "apartment": apartment,
        "apartment_segment": apartment_segment,
        "street": street,
        "street_segment": street_segment,
        "remaining_address": remaining if remaining else ""
    }
| |
| |
| |
| |
|
|
| |
# Locality markers whose trailing number (e.g. "Sector 5", "Gali No 3") is
# NOT a house number. Group 2 captures that number.
_STRUCT_NON_HNO = re.compile(
    r'\b(sector|ward|phase|block|zone|gali\s*no|gali\s*number|lane\s*no)\s*'
    r'[:\-]?\s*(\d+[A-Z]?)',
    re.IGNORECASE,
)


# Explicit house/door/plot/flat style keywords followed by the value they
# label. Group 1 captures that value.
_STRUCT_HNO_KW = re.compile(
    r'\b(?:d\.?\s*no\.?|door\s*no\.?|h\.?\s*no\.?|house\s*no\.?|'
    r'house\s*number|plot\s*no\.?|flat\s*no\.?|flat\s*number|'
    r'mig\s*no\.?|hig\s*no\.?|lig\s*no\.?|'
    r'khata\s*no\.?|khasra\s*no\.?)'
    r'\s*[:\-]?\s*([A-Z0-9][A-Z0-9\-/]*)',
    re.IGNORECASE,
)


def extract_house_number_from_addressline(text: str) -> str:
    """
    Extract house/door number from a raw addressline string.
    No keyword required - uses pattern priority:
      1. Explicit keyword (H.No, D.No, House No, Flat No ...)
      2. Compound formats: 2-6-116, 144/143, MIG-25, 1-180a
      3. Simple alpha-numeric: 12B, A-110
    Excludes sector/ward/phase numbers: both values captured by
    _STRUCT_NON_HNO and pattern hits that directly follow one of the words
    sector/ward/phase/block/zone/gali.
    Returns normalised uppercase string or empty string.
    """
    if not text:
        return ""

    # Numbers that belong to a locality marker and must never be returned.
    excluded = {m.group(2).strip().upper() for m in _STRUCT_NON_HNO.finditer(text)}

    # Priority 1: value labelled by an explicit keyword.
    m = _STRUCT_HNO_KW.search(text)
    if m:
        val = m.group(1).strip().upper()
        if val not in excluded:
            return val

    # Priority 2 and 3: unlabelled shapes, most specific first.
    _pats = [
        r'\b((?:MIG|HIG|LIG)-\d+[A-Z]?)\b',
        r'\b(\d+(?:-\d+){2,}[A-Z]?)\b',
        r'\b(\d+-\d+/\d+[A-Z]?)\b',
        r'\b(\d+/\d+(?:/\d+)?[A-Z]?)\b',
        r'\b([A-Z]{1,3}/\d+[A-Z]?)\b',
        r'\b(\d+-\d+[A-Z]?)\b',
        r'\b([A-Z]-?\d+[A-Z]?)\b',
        r'\b(\d+[A-Z])\b',
    ]
    for pat in _pats:
        for m in re.finditer(pat, text, re.IGNORECASE):
            val = m.group(1).strip().upper()
            if val in excluded:
                continue
            before = text[:m.start()]
            # Case-insensitive on purpose: the guard words are written in
            # lower case, so a case-sensitive search against upper-cased text
            # could never match and "Phase 2-3" would be returned as a house number.
            if not re.search(r'\b(sector|ward|phase|block|zone|gali)\s*$', before, re.IGNORECASE):
                return val
    return ""
|
|
|
|
| class _StructuredAddressRecord: |
| """ |
| Internal helper: holds one address record with separate column values. |
| Enriches missing state/city from zipcode via pgeocode. |
| Extracts all address components (house_number, flat_number, apartment, street) |
| and stores the remaining address (all components removed) for model input. |
| """ |
| __slots__ = ('raw_addressline', 'raw_city', 'raw_zipcode', 'raw_state', |
| 'addressline', 'city', 'state', 'zipcode', 'pgeocode_info', |
| 'house_number', 'flat_number', 'apartment', 'street') |
|
|
| def __init__(self, addressline="", city="", zipcode="", state=""): |
| self.raw_addressline = str(addressline or "").strip() |
| self.raw_city = str(city or "").strip() |
| self.raw_zipcode = str(zipcode or "").strip() |
| self.raw_state = str(state or "").strip() |
| self.addressline = "" |
| self.city = "" |
| self.state = "" |
| self.zipcode = "" |
| self.pgeocode_info = {} |
| self.house_number = "" |
| self.flat_number = "" |
| self.apartment = "" |
| self.street = "" |
| self._enrich() |
|
|
| @staticmethod |
| def _norm(val): |
| """Normalize extracted component: strip non-alphanumerics and spaces.""" |
| if not val: |
| return "" |
| return re.sub(r'[^A-Z0-9]', '', str(val).upper()) |
|
|
| def _enrich(self): |
| |
| preprocessed = preprocess_address(self.raw_addressline).upper() if self.raw_addressline else "" |
|
|
| |
| pin_clean = re.sub(r'\D', '', self.raw_zipcode) |
| self.zipcode = pin_clean if len(pin_clean) == 6 else "" |
|
|
| |
| self.state = standardize_state(self.raw_state) or "" |
|
|
| |
| self.city = standardize_city(self.raw_city) or "" |
|
|
| |
| components = extract_address_components(self.raw_addressline) |
|
|
| self.house_number = self._norm(components.get("house_number")) |
| self.flat_number = self._norm(components.get("flat_number")) |
| self.apartment = self._norm(components.get("apartment")) |
| self.street = self._norm(components.get("street")) |
|
|
| |
| remaining = components.get("remaining_address", "").strip() |
| self.addressline = remaining if remaining else preprocessed |
|
|
| |
| if self.zipcode: |
| self.pgeocode_info = lookup_pincode_info(self.zipcode) |
| if not self.state and self.pgeocode_info.get("state"): |
| self.state = standardize_state(self.pgeocode_info["state"]) or "" |
| if not self.city and self.pgeocode_info.get("district"): |
| self.city = standardize_city(self.pgeocode_info["district"]) or "" |
|
|
|
|
def match_structured_address_fields(
    addressline1: str, city1: str, zipcode1: str, state1: str,
    addressline2: str, city2: str, zipcode2: str, state2: str,
) -> dict:
    """
    Match two address records provided as already-split column values
    (ADDRESSLINE, CITY, ZIPCODE, STATE).

    Base score: similarity of the two remaining address lines, from
    services.model.match_entities when it is usable, otherwise the best of
    three rapidfuzz ratios. 0 when either remaining address is empty.

    Field adjustments (always applied; none when a value is missing on either side):
      zipcode : match -> +20, mismatch -> -25
      state   : match -> +10, mismatch -> -20
      city    : similarity >= 85 -> +10, >= 60 -> +3, otherwise -> -10

    Address component scoring (applied only when base_score > 60):
      house_number : match -> +30, mismatch -> -30
      flat_number  : match -> +10, mismatch -> -10
      street       : match -> +10, mismatch -> -10
      apartment    : match -> +10, mismatch -> -10
      (missing on either side -> no adjustment for that component)

    If base_score <= 60, component adjustments are NOT applied.

    Returns a dict holding final_score (clamped to 0..100), base_score, the
    adjustment totals, one comparison dict per field/component, a summary of
    both enriched records and a list of human-readable notes.

    Diagnostics go to the module logger at DEBUG level rather than stdout.
    """
    from rapidfuzz import fuzz as _rfuzz

    r1 = _StructuredAddressRecord(addressline1, city1, zipcode1, state1)
    r2 = _StructuredAddressRecord(addressline2, city2, zipcode2, state2)

    # --- zipcode ---
    if r1.zipcode and r2.zipcode:
        if r1.zipcode == r2.zipcode:
            zip_cmp = {"verdict": "match", "adjustment": 20.0, "z1": r1.zipcode, "z2": r2.zipcode}
        else:
            zip_cmp = {"verdict": "mismatch", "adjustment": -25.0, "z1": r1.zipcode, "z2": r2.zipcode}
    else:
        zip_cmp = {"verdict": "missing", "adjustment": 0.0, "z1": r1.zipcode, "z2": r2.zipcode}

    # --- state ---
    s1, s2 = r1.state, r2.state
    if s1 and s2:
        if s1 == s2:
            state_cmp = {"verdict": "match", "adjustment": 10.0, "s1": s1, "s2": s2}
        else:
            state_cmp = {"verdict": "mismatch", "adjustment": -20.0, "s1": s1, "s2": s2}
    else:
        state_cmp = {"verdict": "missing", "adjustment": 0.0, "s1": s1, "s2": s2}

    # --- city (fuzzy) ---
    c1, c2 = r1.city, r2.city
    if c1 and c2:
        sim = _rfuzz.token_set_ratio(c1, c2)
        if sim >= 85:
            city_cmp = {"verdict": "match", "adjustment": 10.0, "c1": c1, "c2": c2, "similarity": sim}
        elif sim >= 60:
            city_cmp = {"verdict": "partial", "adjustment": 3.0, "c1": c1, "c2": c2, "similarity": sim}
        else:
            city_cmp = {"verdict": "mismatch", "adjustment": -10.0, "c1": c1, "c2": c2, "similarity": sim}
    else:
        city_cmp = {"verdict": "missing", "adjustment": 0.0, "c1": c1, "c2": c2, "similarity": 0}

    # --- base score on the remaining address text ---
    t1, t2 = r1.addressline, r2.addressline
    if t1 and t2:
        try:
            from services.model import match_entities
            from services.config import ADDRESS_MODEL_WEIGHTS
            base_score = float(match_entities(t1, t2, weights=ADDRESS_MODEL_WEIGHTS))
        except Exception:
            # Best effort: the model scorer is optional, so fall back to plain
            # fuzzy ratios, but leave a trace of why the model was not used.
            logger.debug("Address model scoring unavailable; using rapidfuzz fallback", exc_info=True)
            base_score = float(max(
                _rfuzz.token_set_ratio(t1, t2),
                _rfuzz.WRatio(t1, t2),
                _rfuzz.ratio(t1, t2),
            ))
    else:
        base_score = 0.0

    # --- address components ---
    def _compare_component(v1, v2, boost, penalty):
        """Compare two normalized component values. Returns result dict."""
        if v1 and v2:
            if v1 == v2:
                return {"verdict": "match", "v1": v1, "v2": v2, "boost": boost, "penalty": penalty}
            else:
                return {"verdict": "mismatch", "v1": v1, "v2": v2, "boost": boost, "penalty": penalty}
        return {"verdict": "missing", "v1": v1, "v2": v2, "boost": boost, "penalty": penalty}

    hno_cmp = _compare_component(r1.house_number, r2.house_number, boost=30.0, penalty=30.0)
    flat_cmp = _compare_component(r1.flat_number, r2.flat_number, boost=10.0, penalty=10.0)
    apt_cmp = _compare_component(r1.apartment, r2.apartment, boost=10.0, penalty=10.0)
    str_cmp = _compare_component(r1.street, r2.street, boost=10.0, penalty=10.0)

    # Component boosts/penalties only count once the free-text part already agrees.
    comp_adj = 0.0
    logger.debug(
        "[ADDR_COMPONENTS] base_score=%.2f | threshold=60 | adjustments_applied=%s",
        base_score, base_score > 60,
    )
    logger.debug(" remaining_addr1 : %r", r1.addressline)
    logger.debug(" remaining_addr2 : %r", r2.addressline)
    for result, label in [
        (hno_cmp, "house_number"),
        (flat_cmp, "flat_number"),
        (apt_cmp, "apartment"),
        (str_cmp, "street"),
    ]:
        verdict = result["verdict"]
        v1, v2 = result.get("v1", ""), result.get("v2", "")
        if verdict == "missing":
            logger.debug(
                " %-15s | verdict=missing | v1=%10r v2=%-10r | adjustment=0.0 [skipped - component absent]",
                label, v1, v2,
            )
        elif base_score <= 60:
            logger.debug(
                " %-15s | verdict=%-9s | v1=%10r v2=%-10r | adjustment=0.0 [SKIPPED - base_score<=60]",
                label, verdict, v1, v2,
            )
        elif verdict == "match":
            adj = result["boost"]
            comp_adj += adj
            logger.debug(
                " %-15s | verdict=match | v1=%10r v2=%-10r | adjustment=+%.1f [BOOSTED]",
                label, v1, v2, adj,
            )
        else:
            adj = result["penalty"]
            comp_adj -= adj
            logger.debug(
                " %-15s | verdict=mismatch | v1=%10r v2=%-10r | adjustment=-%.1f [PENALISED]",
                label, v1, v2, adj,
            )
    logger.debug(" total comp_adj : %+.1f", comp_adj)

    # --- final score, clamped to 0..100 ---
    total_adj = (zip_cmp["adjustment"] + state_cmp["adjustment"]
                 + city_cmp["adjustment"] + comp_adj)
    final_score = max(0.0, min(100.0, base_score + total_adj))

    # --- human-readable notes ---
    notes = []
    for result, key, v1k, v2k in [
        (zip_cmp, "zipcode", "z1", "z2"),
        (state_cmp, "state", "s1", "s2"),
        (city_cmp, "city", "c1", "c2"),
    ]:
        v = result["verdict"]
        if v == "match":
            notes.append(f"{key} match ({result.get(v1k,'')})")
        elif v == "mismatch":
            notes.append(f"{key} MISMATCH ({result.get(v1k,'')} β {result.get(v2k,'')})")

    for result, key in [(hno_cmp, "house_no"), (flat_cmp, "flat_no"),
                        (apt_cmp, "apartment"), (str_cmp, "street")]:
        v = result["verdict"]
        if v == "match":
            notes.append(f"{key} match ({result['v1']})")
        elif v == "mismatch":
            notes.append(f"{key} MISMATCH ({result['v1']} β {result['v2']})"
                         + (" [applied]" if base_score > 60 else " [skipped, base<=60]"))

    return {
        "final_score": round(final_score, 2),
        "base_score": round(base_score, 2),
        "adjustment": round(total_adj, 2),
        "comp_adjustment": round(comp_adj, 2),
        "zipcode": zip_cmp,
        "state": state_cmp,
        "city": city_cmp,
        "house_number": hno_cmp,
        "flat_number": flat_cmp,
        "apartment": apt_cmp,
        "street": str_cmp,
        "record1": {
            "addressline": r1.addressline,
            "city": r1.city,
            "state": r1.state,
            "zipcode": r1.zipcode,
            "house_number": r1.house_number or None,
            "flat_number": r1.flat_number or None,
            "apartment": r1.apartment or None,
            "street": r1.street or None,
            "pgeocode": r1.pgeocode_info,
        },
        "record2": {
            "addressline": r2.addressline,
            "city": r2.city,
            "state": r2.state,
            "zipcode": r2.zipcode,
            "house_number": r2.house_number or None,
            "flat_number": r2.flat_number or None,
            "apartment": r2.apartment or None,
            "street": r2.street or None,
            "pgeocode": r2.pgeocode_info,
        },
        "notes": notes,
    }
|
|
|
|
def match_structured_address_lists(
    addrs1: list,
    addrs2: list,
) -> float:
    """
    Score every address of record1 against every address of record2 and
    return the highest final score found (0-100, rounded to 2 decimals).

    Each entry is a dict with the keys addressline, city, zipcode and state;
    a missing key is treated as "". Returns 0.0 when either list is empty.
    """
    if not (addrs1 and addrs2):
        return 0.0

    columns = ("addressline", "city", "zipcode", "state")
    top_score = 0.0
    for left in addrs1:
        for right in addrs2:
            left_values = [left.get(column, "") for column in columns]
            right_values = [right.get(column, "") for column in columns]
            outcome = match_structured_address_fields(*left_values, *right_values)
            top_score = max(top_score, outcome["final_score"])

    return round(top_score, 2)