from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple
import pandas as pd
from rapidfuzz import fuzz
from rapidfuzz.distance import JaroWinkler
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import re
import itertools
from services.config import (
    SURNAME_IDENTIFIER,
    MODEL_WEIGHTS,
    MODEL_1_NAME,
    MODEL_2_NAME,
    NAME_MODEL_WEIGHTS,
    NAME_MATCH_ADJUSTMENTS,
    ADDRESS_MODEL_WEIGHTS,
)
from services.rules import detect_surnames, compute_initial_letter_boost, is_subset_match

# ---------- Model Store ----------
# Process-wide cache: logical model key ("model1"/"model2") -> loaded model.
MODEL_STORE = {}


def get_model(model_name: str) -> SentenceTransformer:
    """
    Return the cached SentenceTransformer for a logical model key.

    The model is loaded on first use (on CPU) and kept in MODEL_STORE so
    later calls reuse the same instance.

    Args:
        model_name: "model1" or "model2" (mapped to MODEL_1_NAME / MODEL_2_NAME).

    Raises:
        KeyError: if model_name is neither cached nor one of the known keys.
    """
    if model_name not in MODEL_STORE:
        checkpoints = {"model1": MODEL_1_NAME, "model2": MODEL_2_NAME}
        if model_name not in checkpoints:
            # Fail before printing a misleading "Loading ..." line; KeyError is
            # the exception the previous implementation surfaced as well.
            raise KeyError(
                f"Unknown model name {model_name!r}; expected one of {sorted(checkpoints)}"
            )
        print(f"Loading {model_name} into memory on CPU...")
        MODEL_STORE[model_name] = SentenceTransformer(checkpoints[model_name], device="cpu")
    return MODEL_STORE[model_name]


# ---------- Text Preprocessing ----------
def preprocess_for_matching(text: str) -> str:
    """
    Standardize text for matching.

    Upper-cases and strips the value. Empty values and the "-" placeholder
    (with or without surrounding whitespace) are normalized to "".
    """
    if not text:
        return ""
    standardized = text.upper().strip()
    # Compare AFTER stripping so that " - " or "   " count as missing too.
    if standardized in ("", "-"):
        return ""
    return standardized


# ---------- Core Matching Functions ----------

# ---------- Indic Soundex (phonetic for Indian names) ----------

# def indic_soundex_code(name: str) -> str:
#     """
#     Generate Indic Soundex code for a name token.
#     Handles Indian transliteration phonetics (aspirated consonants, etc.)
#     """
#     if not name:
#         return ""
#     name = name.upper().strip()
#     if not name:
#         return ""
#     # Pre-process: map aspirated/compound consonants to base
#     for digraph, base in [("SH", "S"), ("PH", "F"), ("TH", "T"), ("DH", "D"),
#                           ("KH", "K"), ("GH", "G"), ("BH", "B"), ("CH", "C"), ("JH", "J")]:
#         name = name.replace(digraph, base)
#     SOUNDEX_MAP = {
#         'B': '1', 'F': '1', 'P': '1', 'V': '1', 'W': '1',
#         'C': '2', 'G': '2', 'J': '2', 'K': '2', 'Q': '2', 'S': '2', 'X': '2', 'Z': '2',
#         'D': '3', 'T': '3',
#         'L': '4',
#         'M': '5', 'N': '5',
#         'R': '6',
#     }
#     code = name[0]
#     prev_code = SOUNDEX_MAP.get(name[0], '0')
#     for char in name[1:]:
#         if char in 'AEIOUHY ':
#             prev_code = '0'  # Reset on vowel/separator
#             continue
#         digit = SOUNDEX_MAP.get(char, '0')
#         if digit != '0' and digit != prev_code:
#             code += digit
#             prev_code = digit
#     return (code + '000')[:4]


# Aspirated/compound consonants collapsed to a base letter, applied in order.
_ASPIRATED_DIGRAPHS = (
    ("SH", "S"), ("PH", "F"), ("TH", "T"), ("DH", "D"),
    ("KH", "K"), ("GH", "G"), ("BH", "B"), ("CH", "C"), ("JH", "J"),
)

# Consonant -> phonetic class digit. J/S/Z form their own class (7), separate
# from the C/G/K/Q/X class (2), so that e.g. Rajesh and Rakesh get different codes.
_SOUNDEX_DIGITS = {
    'B': '1', 'F': '1', 'P': '1', 'V': '1', 'W': '1',
    'C': '2', 'G': '2', 'K': '2', 'Q': '2', 'X': '2',
    'D': '3', 'T': '3',
    'L': '4',
    'M': '5', 'N': '5',
    'R': '6',
    'J': '7', 'S': '7', 'Z': '7',
}

# Characters that emit no digit and reset the "previous digit" tracker.
_SOUNDEX_RESET_CHARS = 'AEIOUHY '


def indic_soundex_code(name: str) -> str:
    """
    Generate Indic Soundex code for a name token.
    Handles Indian transliteration phonetics (aspirated consonants, etc.)

    [MODIFIED 2026-03-15]
    - Separated palatal fricatives (J, S, Z) from velars (K, G) in the digit map
      to accurately penalize phonetically distinct names like Rajesh vs Rakesh.

    Returns:
        A 4-character code (first letter + up to 3 digits, zero-padded),
        or "" when the input is empty or whitespace only.
    """
    if not name:
        return ""
    name = name.upper().strip()
    if not name:
        return ""

    # Pre-process: map aspirated/compound consonants to base
    for digraph, base in _ASPIRATED_DIGRAPHS:
        name = name.replace(digraph, base)

    code = name[0]
    prev_code = _SOUNDEX_DIGITS.get(name[0], '0')
    for char in name[1:]:
        if char in _SOUNDEX_RESET_CHARS:
            prev_code = '0'  # Reset on vowel/separator
            continue
        digit = _SOUNDEX_DIGITS.get(char, '0')
        # Skip unmapped characters and immediate repeats of the same class.
        if digit != '0' and digit != prev_code:
            code += digit
            prev_code = digit

    return (code + '000')[:4]


def indic_soundex_similarity(text1: str, text2: str) -> float:
    """
    Compare two texts using Indic Soundex on each token.

    Each token of the text with fewer tokens is greedily paired with the
    best still-unused token of the other text; a pair scores the fraction of
    the 4 code positions that agree.

    Returns 0-100 similarity score (0.0 when either text has no tokens).
    """
    tokens1 = text1.upper().split() if text1 else []
    tokens2 = text2.upper().split() if text2 else []

    if not tokens1 or not tokens2:
        return 0.0

    codes1 = [indic_soundex_code(t) for t in tokens1]
    codes2 = [indic_soundex_code(t) for t in tokens2]

    # Both lists are non-empty here, so the average below never divides by zero.
    shorter, longer = (codes1, codes2) if len(codes1) <= len(codes2) else (codes2, codes1)

    total_match = 0.0
    used = set()
    for s_code in shorter:
        best_match = 0.0
        best_idx = -1
        for i, l_code in enumerate(longer):
            if i in used:
                continue
            match = sum(c1 == c2 for c1, c2 in zip(s_code, l_code)) / 4.0
            if match > best_match:
                best_match = match
                best_idx = i
        # best_idx stays -1 when nothing overlapped; then no slot is consumed.
        if best_idx >= 0:
            used.add(best_idx)
            total_match += best_match

    return (total_match / len(shorter)) * 100


# ---------- Core Matching Functions ----------
def calculate_fuzzy_scores(input1: str, input2: str) -> Dict[str, float]:
    """Calculate fuzzy matching scores using RapidFuzz (5 functions)"""
    return {
        "simple_ratio": fuzz.ratio(input1, input2),
        "token_set_ratio": fuzz.token_set_ratio(input1, input2),
        "w_ratio": fuzz.WRatio(input1, input2),
        "partial_ratio": fuzz.partial_ratio(input1, input2),
        "token_sort_ratio": fuzz.token_sort_ratio(input1, input2),
    }


def calculate_semantic_similarity(model_name: str, input1: str, input2: str) -> float:
    """
    Calculate semantic similarity using sentence transformers.

    Encodes both inputs with the model returned by get_model(model_name) and
    returns the cosine similarity of the two embeddings as a builtin float.
    """
    model = get_model(model_name)
    # print("input1 to model",input1)
    # print("input2 to model",input2)
    embedding1 = model.encode([input1], show_progress_bar=False)
    embedding2 = model.encode([input2], show_progress_bar=False)
    # cosine_similarity yields a 1x1 array; unwrap it to a plain float so the
    # value matches the annotated return type.
    return float(cosine_similarity(embedding1, embedding2)[0][0])


def calculate_final_score(fuzzy_scores: Dict[str, float], semantic_score: float) -> float:
    """
    Calculate weighted final score.

    Combines the fuzzy component scores and the semantic score (scaled by 100)
    using MODEL_WEIGHTS, then clamps the result to [0, 100]. A weight whose
    component is not available contributes 0 instead of raising KeyError,
    matching how calculate_similarity_with_models treats missing components.
    """
    component_scores = dict(fuzzy_scores)
    component_scores["semantic_score"] = semantic_score * 100
    weighted_sum = sum(
        component_scores.get(key, 0) * weight for key, weight in MODEL_WEIGHTS.items()
    )
    return max(0, min(100, weighted_sum))


def calculate_overall_similarity(score1: float, score2: float) -> float:
    """Calculate overall similarity from two model scores (60% / 40% blend)"""
    return score1 * 0.6 + score2 * 0.4


def check_substring_match(str1: str, str2: str) -> bool:
    """Check if one string is a substring of another (False if either is empty)"""
    if not str1 or not str2:
        return False
    return str1 in str2 or str2 in str1


def check_individual_name_matches(name_full: str, fname: str, mname: str, lname: str) -> Tuple[bool, bool, bool]:
    """
    Check if full name contains first, middle, or last name as substring
    Returns: (first_match, middle_match, last_match)
    """
    f_match = check_substring_match(name_full, fname) if fname else False
    m_match = check_substring_match(name_full, mname) if mname else False
    l_match = check_substring_match(name_full, lname) if lname else False
    return f_match, m_match, l_match


def concatenate_name_parts(firstname: str, middlename: str, lastname: str) -> str:
    """
    Concatenate name parts.

    Each part is stripped and upper-cased; missing parts (None, blank, or the
    "-" placeholder, with or without surrounding whitespace) are dropped. The
    remaining parts are sorted alphabetically and joined with single spaces.
    Returns "" when no usable part remains.
    """
    parts = []
    for raw_part in (firstname, middlename, lastname):
        if not raw_part:
            continue
        cleaned = raw_part.upper().strip()
        # Check the placeholder AFTER stripping, otherwise "  " would add an
        # empty part (and a stray leading space) and " - " would survive as "-".
        if cleaned in ("", "-"):
            continue
        parts.append(cleaned)

    if not parts:
        return ""
    parts.sort()
    return " ".join(parts)


# ---------- helpers used only inside the new logic ----------
def _normalize_and_sort(name: str) -> str:
    """
    1. Split on any non-alphanumeric character (space, underscore, comma, etc.)
    2. Remove empty tokens
    3. Upper-case
    4. Sort alphabetically
    5. Re-join with single space
    """
    tokens = re.split(r'[^A-Za-z0-9]+', name.strip())
    tokens = [t.upper() for t in tokens if t]
    return ' '.join(sorted(tokens))


def _all_name_combinations(fname: str, mname: str, lname: str) -> list[str]:
    """
    Return every possible ordering of the supplied parts,
    dropping any empty/blank components.
    """
    parts = [
        p.strip().upper()
        for p in (fname, mname, lname)
        if p and p.strip() not in ('-', '', ' ')
    ]
    if not parts:
        return []
    # itertools.permutations gives every ordering
    return [' '.join(order) for order in itertools.permutations(parts)]


# def match_entities(value1: str, value2: str, weights: Dict[str, float] = None) -> float:
#     """
#     Match two entities using fuzzy + semantic + optional phonetic similarity.
#     Weights dict determines score component contributions.
#     Returns: similarity score as float (0-100)
#     """
#     if weights is None:
#         weights = MODEL_WEIGHTS
#     standardized_input1 = preprocess_for_matching(value1)
#     standardized_input2 = preprocess_for_matching(value2)
#     if not standardized_input1 or not standardized_input2:
#         return 0
#     # Space-agnostic exact match
#     if standardized_input1.replace(" ", "") == standardized_input2.replace(" ", ""):
#         return 100.0
#     return calculate_similarity_with_models(standardized_input1, standardized_input2, weights)


def match_entities(value1: str, value2: str, weights: Dict[str, float] = None) -> float:
    """
    Score how likely two entity strings refer to the same thing (0-100).

    Both values are standardized with preprocess_for_matching and then run
    through a sequence of checks; the first check that decides wins:

    1. Space-agnostic equality      "Pujitha Sharma" vs "pujithasharma" -> 100
    2. Token re-ordering            "Sharma Gari Pujitha" vs "Pujitha Sharma Gari" -> 100
       (only attempted when both sides have at most 4 tokens)
    3. Shared tokens + leftovers:
       - leftovers on one/both sides are single letters (initials):
         compatible initials ("K V Reddy" vs "Katta Venkata Reddy") -> at least 90,
         conflicting initials ("C Anitha" vs "H Anitha") -> model score minus 40
       - leftovers are full words that disagree (fuzz.ratio < 65):
         model score minus 40
    4. Otherwise the weighted model score from calculate_similarity_with_models.
    5. For single-token inputs, a phonetic audit with Indic Soundex: identical
       codes with close spelling ("likitha" vs "likheetha") -> at least 95;
       clearly different codes with a high model score ("rajesh" vs "rakesh")
       -> capped at 55 after a 25 point deduction.

    Args:
        value1, value2: raw strings to compare.
        weights: component weights; MODEL_WEIGHTS is used when None.

    Returns:
        Similarity score; 0 when either standardized value is empty.
    """
    active_weights = MODEL_WEIGHTS if weights is None else weights

    left = preprocess_for_matching(value1)
    right = preprocess_for_matching(value2)
    if not (left and right):
        return 0

    # ---- Check 1: identical once spaces are ignored -------------------------
    left_compact = left.replace(" ", "")
    right_compact = right.replace(" ", "")
    if left_compact == right_compact:
        return 100.0

    # ---- Check 2: same tokens in another order ------------------------------
    tokens1 = left.split()
    tokens2 = right.split()
    # Permutations grow factorially, so only try this for short names.
    if max(len(tokens1), len(tokens2)) <= 4:
        # Try both directions: re-order one side's tokens and compare against
        # the other side, with and without the separating spaces.
        directions = (
            (tokens1, right, right_compact),
            (tokens2, left, left_compact),
        )
        for source_tokens, target, target_compact in directions:
            for ordering in itertools.permutations(source_tokens):
                if " ".join(ordering) == target or "".join(ordering) == target_compact:
                    return 100.0

    # ---- Check 3: shared tokens, differing remainder ------------------------
    if tokens1 and tokens2:
        shared = set(tokens1) & set(tokens2)
        extra1 = [tok for tok in tokens1 if tok not in shared]
        extra2 = [tok for tok in tokens2 if tok not in shared]

        # Only relevant when something is shared (e.g. a surname) and both
        # sides still have unmatched tokens.
        if shared and extra1 and extra2:
            only_initials1 = all(len(tok) == 1 for tok in extra1)
            only_initials2 = all(len(tok) == 1 for tok in extra2)

            if only_initials1 or only_initials2:
                # When both sides are initials, side 1 plays the "initials"
                # role and side 2 the "words" role.
                if only_initials1:
                    initials, words = extra1, extra2
                else:
                    initials, words = extra2, extra1

                initial_letters = {tok[0] for tok in initials}
                leading_letters = {tok[0] for tok in words if tok}
                compatible = (
                    initial_letters <= leading_letters
                    or leading_letters <= initial_letters
                )

                model_score = calculate_similarity_with_models(left, right, active_weights)
                if compatible:
                    # Initials line up with the spelled-out names.
                    return max(90.0, model_score)
                # Initials contradict each other (e.g. C vs H, K vs M).
                return max(0.0, model_score - 40.0)

            # Neither remainder is initials: compare the unmatched words directly.
            # e.g. "M. Manisha Reddy" vs "M. Manoj Reddy"   -> Manisha vs Manoj
            #      "Mukherjee Lakshmi" vs "Prasad Lakshmi"  -> Mukherjee vs Prasad
            leftover_ratio = fuzz.ratio(" ".join(extra1), " ".join(extra2))
            if leftover_ratio < 65.0:
                model_score = calculate_similarity_with_models(left, right, active_weights)
                # The identifying words disagree, so penalize heavily.
                return max(0.0, model_score - 40.0)

    # ---- Check 4: weighted model score --------------------------------------
    base_score = calculate_similarity_with_models(left, right, active_weights)

    # ---- Check 5: phonetic audit for single-token inputs --------------------
    if len(tokens1) == 1 and len(tokens2) == 1:
        phonetic = indic_soundex_similarity(left, right)
        if phonetic == 100.0:
            # Same sound, small spelling difference -> lift to at least 95.
            close_spelling = fuzz.ratio(left, right) > 65
            close_length = abs(len(left) - len(right)) <= 2
            if close_spelling and close_length:
                return max(95.0, base_score)
        elif phonetic <= 80.0 and base_score > 55.0:
            # Different sound but textually close -> pull the score down.
            return min(base_score - 25.0, 55.0)

    return base_score


# def calculate_similarity_with_models(text1: str, text2: str, weights: Dict[str, float] = None) -> float:
#     """
#     Calculate similarity using fuzzy scores, embedding models, and optional phonetic.
#     The weights dict controls which components are active and their contribution.
#     Phonetic components (jaro_winkler, indic_soundex) are used only if present in weights.
#     Returns similarity percentage as float (0-100)
#     """
#     if weights is None:
#         weights = MODEL_WEIGHTS
#     if not text1 or not text2:
#         print(f"[SIMILARITY] either value is empty — text1={text1!r} text2={text2!r}")
#         return 0.0
#     text1 = str(text1).strip()
#     text2 = str(text2).strip()
#     if not text1 or not text2:
#         return 0.0
#     print(f"[SIMILARITY] text1={text1!r}")
#     print(f"[SIMILARITY] text2={text2!r}")
#     # Space-agnostic exact match
#     if text1.replace(" ", "") == text2.replace(" ", ""):
#         return 100.0
#     # --- Fuzzy scores (5 functions) ---
#     fuzzy_scores = {
#         "simple_ratio": fuzz.ratio(text1, text2),
#         "token_set_ratio": fuzz.token_set_ratio(text1, text2),
#         "w_ratio": fuzz.WRatio(text1, text2),
#         "partial_ratio": fuzz.partial_ratio(text1, text2),
#         "token_sort_ratio": fuzz.token_sort_ratio(text1, text2),
#     }
#     # --- Phonetic scores (only if weights include them) ---
#     phonetic_scores = {}
#     if weights.get("jaro_winkler", 0) > 0:
#         phonetic_scores["jaro_winkler"] = JaroWinkler.similarity(text1, text2) * 100
#     if weights.get("indic_soundex", 0) > 0:
#         phonetic_scores["indic_soundex"] = indic_soundex_similarity(text1, text2)
#     # --- Semantic scores (dual model, computed in parallel) ---
#     with ThreadPoolExecutor() as executor:
#         model1 = get_model("model1")
#         model2 = get_model("model2")
#         f1 = executor.submit(
#             lambda: cosine_similarity(
#                 model1.encode([text1], show_progress_bar=False),
#                 model1.encode([text2], show_progress_bar=False)
#             )[0][0]
#         )
#         f2 = executor.submit(
#             lambda: cosine_similarity(
#                 model2.encode([text1], show_progress_bar=False),
#                 model2.encode([text2], show_progress_bar=False)
#             )[0][0]
#         )
#         cosine1 = f1.result()
#         cosine2 = f2.result()
#     def calc_final(semantic_cosine):
#         all_scores = {}
#         all_scores.update(fuzzy_scores)
#         all_scores.update(phonetic_scores)
#         all_scores["semantic_score"] = semantic_cosine * 100
#         return sum(all_scores.get(k, 0) * v for k, v in weights.items())
#     final1 = calc_final(cosine1)
#     final2 = calc_final(cosine2)
#     overall_similarity = final1 * 0.6 + final2 * 0.4
#     print("similarity given by model",overall_similarity)
#     return round(max(0, min(100, overall_similarity)), 2)


def calculate_similarity_with_models(text1: str, text2: str, weights: Optional[Dict[str, float]] = None) -> float:
    """
    Calculate similarity using fuzzy scores, embedding models, and optional phonetic.

    The weights dict controls which components are active and their contribution.
    Phonetic components (jaro_winkler, indic_soundex) are used only if present in
    weights with a positive value. A final score is computed once per embedding
    model and the two are blended 60% ("model1") / 40% ("model2").

    Args:
        text1, text2: values to compare; they are converted with str() and stripped.
        weights: component name -> weight; MODEL_WEIGHTS is used when None.

    Returns:
        Similarity percentage as a builtin float in [0, 100], rounded to 2
        decimals. 0.0 when either text is empty, 100.0 when the texts are equal
        after removing spaces.
    """
    if weights is None:
        weights = MODEL_WEIGHTS

    if not text1 or not text2:
        return 0.0

    text1 = str(text1).strip()
    text2 = str(text2).strip()

    if not text1 or not text2:
        return 0.0

    # Space-agnostic exact match
    if text1.replace(" ", "") == text2.replace(" ", ""):
        return 100.0

    # --- Fuzzy scores (5 functions) ---
    fuzzy_scores = {
        "simple_ratio": fuzz.ratio(text1, text2),
        "token_set_ratio": fuzz.token_set_ratio(text1, text2),
        "w_ratio": fuzz.WRatio(text1, text2),
        "partial_ratio": fuzz.partial_ratio(text1, text2),
        "token_sort_ratio": fuzz.token_sort_ratio(text1, text2),
    }

    # --- Phonetic scores (only if weights include them) ---
    phonetic_scores = {}
    if weights.get("jaro_winkler", 0) > 0:
        phonetic_scores["jaro_winkler"] = JaroWinkler.similarity(text1, text2) * 100
    if weights.get("indic_soundex", 0) > 0:
        phonetic_scores["indic_soundex"] = indic_soundex_similarity(text1, text2)

    # --- Semantic scores (dual model, computed in parallel) ---
    def _pair_cosine(model) -> float:
        # show_progress_bar=False keeps per-call progress bars out of the logs,
        # as calculate_semantic_similarity does. float() turns the NumPy scalar
        # into a builtin float so the final score is a plain float as well.
        return float(
            cosine_similarity(
                model.encode([text1], show_progress_bar=False),
                model.encode([text2], show_progress_bar=False),
            )[0][0]
        )

    model1 = get_model("model1")
    model2 = get_model("model2")
    # Exactly two tasks are submitted, so two workers are enough.
    with ThreadPoolExecutor(max_workers=2) as executor:
        f1 = executor.submit(_pair_cosine, model1)
        f2 = executor.submit(_pair_cosine, model2)
        cosine1 = f1.result()
        cosine2 = f2.result()

    def calc_final(semantic_cosine: float) -> float:
        # Components without a score contribute 0 for their weight.
        all_scores = {}
        all_scores.update(fuzzy_scores)
        all_scores.update(phonetic_scores)
        all_scores["semantic_score"] = semantic_cosine * 100
        return sum(all_scores.get(k, 0) * v for k, v in weights.items())

    final1 = calc_final(cosine1)
    final2 = calc_final(cosine2)

    overall_similarity = final1 * 0.6 + final2 * 0.4
    return round(max(0.0, min(100.0, overall_similarity)), 2)


# def handle_case1(full_name1: str, full_name2: str,
#                  r1_fname: str, r1_mname: str, r1_lname: str,
#                  r2_fname: str, r2_mname: str, r2_lname: str) -> dict:
#     """
#     Case-1 (both records supply a full name)
#     Returns a dictionary with separate similarity scores for each component
#     Returns:
#         dict: {
#             'full_name_percent': float, # full_name1 vs full_name2
#             'firstname_percent': float, # r1_fname vs r2_fname
#             'middlename_percent': float, # r1_mname vs r2_mname
#             'lastname_percent': float # r1_lname vs r2_lname
#         }
#     """
#     result={}
#     # Check space-agnostic exact match on original strings before sorting
#     if full_name1.replace(" ", "").upper() == full_name2.replace(" ", "").upper():
#         full_name_percent = 100.0
#     else:
#         # 1. Normalize + alphabetically sort each full name and calculate similarity
#         sorted1 = _normalize_and_sort(full_name1)
#         sorted2 = _normalize_and_sort(full_name2)
#         full_name_percent = calculate_similarity_with_models(sorted1, sorted2, NAME_MODEL_WEIGHTS)
#     # print("full_name_percent is:",full_name_percent)
#     # 2.
Calculate firstname_percent: compare firstnames # # firstname # if r1_fname and r2_fname: # firstname_percent = calculate_similarity_with_models( # r1_fname, r2_fname, NAME_MODEL_WEIGHTS # ) # # print("firstname_percent is:",firstname_percent) # else: # firstname_percent = 0.0 # # middlename # if r1_mname and r2_mname: # middlename_percent = calculate_similarity_with_models( # r1_mname, r2_mname, NAME_MODEL_WEIGHTS # ) # # print("middlename_percent is:",middlename_percent) # else: # middlename_percent = 0.0 # # lastname # if r1_lname and r2_lname: # lastname_percent = calculate_similarity_with_models( # r1_lname, r2_lname, NAME_MODEL_WEIGHTS # ) # # print("lastname_percent is:",lastname_percent) # else: # lastname_percent = 0.0 # result={ # 'full_name_percent': full_name_percent, # 'firstname_percent': firstname_percent, # 'middlename_percent': middlename_percent, # 'lastname_percent': lastname_percent # } # return result # def handle_case2(full_name: str, # fname: str, mname: str, lname: str, # concat_name: str) -> dict: # """ # Case-2 (one side has full name, the other has F/M/L) # Returns a dictionary with separate similarity scores for each component # Returns: # dict: { # 'full_name_percent': float, # full_name vs concat_name # 'firstname_percent': float, # full_name vs fname # 'middlename_percent': float, # full_name vs mname # 'lastname_percent': float # full_name vs lname # } # """ # # 0. Check if any permutation of F/M/L exactly reconstructs full_name. # # If yes, full_name_percent = 100. Component scores are still computed # # individually — a part inside full_name does NOT score 100% on its own. # # e.g. 
full_name="KALLI LIKHITHA", fname="KALLI", mname="LIKHITHA": # # full_name_percent = 100 (together they reconstruct it exactly) # # firstname_percent != 100 ("KALLI" is only half of "KALLI LIKHITHA") # permutation_full_match = any( # permuted.replace(" ", "") == full_name.upper().strip().replace(" ", "") # for permuted in _all_name_combinations(fname, mname, lname) # ) # # 1. Calculate full_name_percent # if permutation_full_match: # full_name_percent = 100.0 # else: # sorted_full = _normalize_and_sort(full_name) # sorted_concat = _normalize_and_sort(concat_name) # full_name_percent = calculate_similarity_with_models( # sorted_full, # sorted_concat, # NAME_MODEL_WEIGHTS # ) # # Component-level scores: compare full_name vs each individual part (fname/mname/lname). # # # # Requirement: # # - full_name="KALLI LIKHITHA", fname="KALLI" → firstname_percent reflects # # how well "KALLI" matches within the context of the full name, but must # # NOT be 100% just because "KALLI" is a complete subset of "KALLI LIKHITHA". # # - The comparison is full_name vs part (not token-to-token), so the full # # context of the name is preserved. # # # # Why standard weights fail: # # - partial_ratio("KALLI LIKHITHA", "KALLI") = 100 ← subset inflation # # - token_set_ratio produces same inflation # # - w_ratio picks the best of these → also inflated # # - semantic embeddings: short name vs full name share high cosine similarity # # because they encode overlapping meaning → also inflated # # # # Fix: use only LENGTH-SENSITIVE metrics that naturally penalise length # # disparity between the strings. # # - simple_ratio: 2 * matches / total_chars — drops when lengths differ # # - jaro_winkler: character-overlap with length normalisation — same # # - indic_soundex: phonetic token overlap / shorter length — same # # Intentionally excluded: partial_ratio, token_set_ratio, w_ratio, semantic. 
# _COMPONENT_WEIGHTS = { # "simple_ratio": 0.35, # "jaro_winkler": 0.40, # "indic_soundex": 0.25, # } # def _fullname_vs_part(full: str, part: str) -> float: # """ # Compare full_name against a single name part using only length-sensitive # metrics. Returns 0-100. A part that is a strict subset of full_name will # score proportionally to how much of the full_name it covers, not 100%. # """ # if not full or not part: # return 0.0 # full_u = full.upper().strip() # part_u = part.upper().strip() # if full_u == part_u: # return 100.0 # scores = { # "simple_ratio": fuzz.ratio(full_u, part_u), # "jaro_winkler": JaroWinkler.similarity(full_u, part_u) * 100, # "indic_soundex": indic_soundex_similarity(full_u, part_u), # } # return round(max(0.0, min(100.0, # sum(scores[k] * v for k, v in _COMPONENT_WEIGHTS.items()) # )), 2) # # 2. firstname_percent: full_name vs fname # firstname_percent = _fullname_vs_part(full_name, fname) if fname else 0.0 # # 3. middlename_percent: full_name vs mname # middlename_percent = _fullname_vs_part(full_name, mname) if mname else 0.0 # # 4. 
lastname_percent: full_name vs lname # lastname_percent = _fullname_vs_part(full_name, lname) if (lname and lname.upper() not in SURNAME_IDENTIFIER) else 0.0 # result={ # 'full_name_percent': full_name_percent, # 'firstname_percent': firstname_percent, # 'middlename_percent': middlename_percent, # 'lastname_percent': lastname_percent # } # return result # def handle_case3(r1_fname: str, r1_mname: str, r1_lname: str, r1_concat: str, # r2_fname: str, r2_mname: str, r2_lname: str, r2_concat: str) -> dict: # """ # Handle Case 3: Both records have F/M/L # Returns a dictionary with separate similarity scores for each component # Returns: # dict: { # 'full_name_percent': float, # r1_concat vs r2_concat # 'firstname_percent': float, # r1_fname vs r2_fname # 'middlename_percent': float, # r1_mname vs r2_mname # 'lastname_percent': float # r1_lname vs r2_lname # } # """ # # Check substring matches for each component # f_match = check_substring_match(r1_fname, r2_fname) if r1_fname and r2_fname else False # m_match = check_substring_match(r1_mname, r2_mname) if r1_mname and r2_mname else False # l_match = check_substring_match(r1_lname, r2_lname) if r1_lname and r2_lname else False # # Calculate full_name_percent: compare concatenated names # full_name_percent = calculate_similarity_with_models(r1_concat, r2_concat, NAME_MODEL_WEIGHTS) # # Apply boosting logic based on substring matches # # Rule 1: Only lastname matches (family match) # if l_match and not f_match and not m_match: # full_name_percent = max(full_name_percent, 85.0) # Ensure minimum 85% for family match # # Rule 2: Lastname + (firstname or middle) matches (partial match) # # Strong indicator of same person # elif l_match and (f_match or m_match): # full_name_percent = max(full_name_percent, 90.0) # Higher confidence when lastname + another field matches # # Rule 3: No matches at all or only firstname/middlename matches # # Use the calculated similarity as-is # # Calculate individual component percentages # # 2. 
Calculate firstname_percent: compare firstnames # if r1_fname and r2_fname: # firstname_percent = calculate_similarity_with_models( # r1_fname, # r2_fname, # NAME_MODEL_WEIGHTS # ) # else: # firstname_percent=0.0 # # 3. Calculate middlename_percent: compare middlenames # if r1_mname and r2_mname: # middlename_percent = calculate_similarity_with_models( # r1_mname, # r2_mname, # NAME_MODEL_WEIGHTS # ) # else: # middlename_percent=0.0 # # 4. Calculate lastname_percent: compare lastnames # if r1_lname and r2_lname and r1_lname.upper() not in SURNAME_IDENTIFIER and r2_lname.upper() not in SURNAME_IDENTIFIER: # lastname_percent = calculate_similarity_with_models( # r1_lname, # r2_lname, # NAME_MODEL_WEIGHTS # ) # else: # lastname_percent=0.0 # result= { # 'full_name_percent': full_name_percent, # 'firstname_percent': firstname_percent, # 'middlename_percent': middlename_percent, # 'lastname_percent': lastname_percent # } # return result # def match_name(name: str, firstname: str, lastname: str, middlename: str) -> float: # """ # Match name with logic # Returns similarity score as float or "missing value" # """ # name_processed = preprocess_for_matching(name) # concat_name = concatenate_name_parts(firstname, middlename, lastname) # # Case 1: NAME matches concatenated name # if name_processed and concat_name and name_processed == concat_name: # return 100 # # Case 2: NAME is empty, use concatenated # if not name_processed and concat_name: # return 100 # # Case 3: Concat is empty, use NAME # if name_processed and not concat_name: # return 100 # # Case 4: Both exist but different - use model # if name_processed and concat_name and name_processed != concat_name: # # Pass both to model for fuzzy matching # return match_entities(name_processed, concat_name) # # Both empty # return 0 # def match_names_cross_records(r1_name: str, r1_firstname: str, r1_lastname: str, r1_middlename: str, # r2_name: str, r2_firstname: str, r2_lastname: str, r2_middlename: str) -> float: # """ # 
Match names between two records with enhanced preprocessing: # 1. Input is already lowercase + preprocessed (titles removed, variations standardized) # 2. Surname detection — if only common surnames match, return 20% # 3. Token sorting for consistent comparison # 4. Common token detection # 5. Initial letter boost for abbreviated names # 6. Three-case matching (both fullname / one fullname+FML / both FML) # """ # # ── Normalize inputs (already lowercase from preprocess_name) ── # r1_name_proc = r1_name.strip() if r1_name and r1_name.strip() not in ["-", ""] else "" # r2_name_proc = r2_name.strip() if r2_name and r2_name.strip() not in ["-", ""] else "" # r1_fname = r1_firstname.strip() if r1_firstname and r1_firstname.strip() not in ["-", ""] else "" # r1_mname = r1_middlename.strip() if r1_middlename and r1_middlename.strip() not in ["-", ""] else "" # r1_lname = r1_lastname.strip() if r1_lastname and r1_lastname.strip() not in ["-", ""] else "" # r2_fname = r2_firstname.strip() if r2_firstname and r2_firstname.strip() not in ["-", ""] else "" # r2_mname = r2_middlename.strip() if r2_middlename and r2_middlename.strip() not in ["-", ""] else "" # r2_lname = r2_lastname.strip() if r2_lastname and r2_lastname.strip() not in ["-", ""] else "" # # ── Determine case ── # r1_has_fullname = bool(r1_name_proc) # r2_has_fullname = bool(r2_name_proc) # r1_concat = concatenate_name_parts(r1_fname, r1_mname, r1_lname).lower() # r2_concat = concatenate_name_parts(r2_fname, r2_mname, r2_lname).lower() # # Build the effective full name string for each record # name1_effective = r1_name_proc if r1_has_fullname else r1_concat # name2_effective = r2_name_proc if r2_has_fullname else r2_concat # # Both missing → zero # if not name1_effective and not name2_effective: # return { # 'full_name_percent': 0.0, # 'firstname_percent': 0.0, # 'middlename_percent': 0.0, # 'lastname_percent': 0.0 # } # # ── Accumulate adjustments (applied AFTER handle_case computation) ── # adjustment = 0 # 
surname_penalty_val = NAME_MATCH_ADJUSTMENTS.get("surname_penalty", -30) # initial_boost_val = NAME_MATCH_ADJUSTMENTS.get("initial_boost", 30) # subset_boost_val = NAME_MATCH_ADJUSTMENTS.get("subset_boost", 40) # # ── Surname detection (case 2): penalty if surname-only match ── # surname_only_match = False # if name1_effective and name2_effective: # surnames1 = detect_surnames(name1_effective) # surnames2 = detect_surnames(name2_effective) # if surnames1 and surnames2: # common_surnames = surnames1 & surnames2 # if common_surnames: # tokens1_non_surname = [t for t in name1_effective.split() if t not in surnames1] # tokens2_non_surname = [t for t in name2_effective.split() if t not in surnames2] # if tokens1_non_surname and tokens2_non_surname: # non_surname_overlap = set(tokens1_non_surname) & set(tokens2_non_surname) # if not non_surname_overlap: # non_surname1_str = " ".join(tokens1_non_surname) # non_surname2_str = " ".join(tokens2_non_surname) # if fuzz.ratio(non_surname1_str, non_surname2_str) < 60: # surname_only_match = True # adjustment += surname_penalty_val # e.g., -30 # # ── Sort tokens for boost/subset detection ── # name1_tokens = sorted(name1_effective.split()) if name1_effective else [] # name2_tokens = sorted(name2_effective.split()) if name2_effective else [] # # ── Initial letter boost / mismatch penalty (Case 3A) ── # # compute_initial_letter_boost returns: # # +0.2 → all initials matched → add initial_boost_val (+10.5) # # -0.2 → at least one initial did NOT match → subtract initial_boost_val (-10.5) # # 0.0 → no initials present → no change # if name1_tokens and name2_tokens: # boost = compute_initial_letter_boost(name1_tokens, name2_tokens) # if boost > 0: # adjustment += initial_boost_val # initials matched → boost # elif boost < 0: # adjustment -= initial_boost_val # initials mismatched → penalty # # ── Subset match boost (case 5): +40 if one is complete subset ── # if name1_tokens and name2_tokens and len(name1_tokens) != len(name2_tokens): 
#         if is_subset_match(name1_tokens, name2_tokens):
#             adjustment += subset_boost_val  # e.g., +40
#
#     # ── Run the appropriate case handler for base similarity ──
#     result = None
#
#     # CASE 1: Both records have full names
#     if r1_has_fullname and r2_has_fullname:
#         result = handle_case1(r1_name_proc, r2_name_proc,
#                               r1_firstname, r1_middlename, r1_lastname,
#                               r2_firstname, r2_middlename, r2_lastname)
#
#     # CASE 2: One has full name, other has F/M/L
#     elif r1_has_fullname and not r2_has_fullname and r2_concat:
#         result = handle_case2(r1_name_proc, r2_fname, r2_mname, r2_lname, r2_concat)
#     elif r2_has_fullname and not r1_has_fullname and r1_concat:
#         result = handle_case2(r2_name_proc, r1_fname, r1_mname, r1_lname, r1_concat)
#
#     # CASE 3: Both have F/M/L
#     elif not r1_has_fullname and not r2_has_fullname and r1_concat and r2_concat:
#         result = handle_case3(r1_fname, r1_mname, r1_lname, r1_concat,
#                               r2_fname, r2_mname, r2_lname, r2_concat)
#
#     # Fallback if no case matched
#     if result is None:
#         result = {
#             'full_name_percent': 0.0,
#             'firstname_percent': 0.0,
#             'middlename_percent': 0.0,
#             'lastname_percent': 0.0
#         }
#
#     # ── Apply accumulated adjustments to full_name_percent ──
#     if adjustment != 0:
#         result['full_name_percent'] = max(0.0, min(100.0, result['full_name_percent'] + adjustment))
#
#     return result


def _empty_name_scores() -> dict:
    """Return a fresh name-score dictionary with every component at 0.0."""
    return {
        'full_name_percent': 0.0,
        'firstname_percent': 0.0,
        'middlename_percent': 0.0,
        'lastname_percent': 0.0
    }


def _pairwise_name_percent(value1: str, value2: str):
    """Score two name components, or return 0.0 when either one is empty."""
    if value1 and value2:
        return match_entities(value1, value2, NAME_MODEL_WEIGHTS)
    return 0.0


def handle_case1(full_name1: str, full_name2: str,
                 r1_fname: str, r1_mname: str, r1_lname: str,
                 r2_fname: str, r2_mname: str, r2_lname: str) -> dict:
    """
    Case-1 (both records supply a full name).

    The full names are first compared ignoring spaces and letter case; an
    exact hit scores 100.0 without calling the similarity model. Otherwise
    both names go through ``_normalize_and_sort`` and are scored with
    ``match_entities``. Each F/M/L component is scored only when both
    records provide it; a missing component scores 0.0.

    Returns:
        dict: {
            'full_name_percent': float,   # full_name1 vs full_name2
            'firstname_percent': float,   # r1_fname vs r2_fname
            'middlename_percent': float,  # r1_mname vs r2_mname
            'lastname_percent': float     # r1_lname vs r2_lname
        }
    """
    # Space-agnostic exact match on the original strings, before any sorting.
    if full_name1.replace(" ", "").upper() == full_name2.replace(" ", "").upper():
        full_name_percent = 100.0
    else:
        sorted1 = _normalize_and_sort(full_name1)
        sorted2 = _normalize_and_sort(full_name2)
        full_name_percent = match_entities(sorted1, sorted2, NAME_MODEL_WEIGHTS)

    return {
        'full_name_percent': full_name_percent,
        'firstname_percent': _pairwise_name_percent(r1_fname, r2_fname),
        'middlename_percent': _pairwise_name_percent(r1_mname, r2_mname),
        'lastname_percent': _pairwise_name_percent(r1_lname, r2_lname)
    }


def handle_case2(full_name: str, fname: str, mname: str, lname: str, concat_name: str) -> dict:
    """
    Case-2 (one side has a full name, the other has F/M/L).

    Every combination produced by ``_all_name_combinations`` is compared to
    the full name ignoring spaces and letter case; a hit scores 100.0.
    Both sides are upper-cased for that comparison, mirroring handle_case1,
    so lowercase input can still hit the exact-match path.

    Returns:
        dict: {
            'full_name_percent': float,   # full_name vs concat_name
            'firstname_percent': float,   # full_name vs fname
            'middlename_percent': float,  # full_name vs mname
            'lastname_percent': float     # full_name vs lname
        }
    """
    # 0. Try every permutation of F/M/L against the space-stripped full name.
    target = full_name.upper().strip().replace(" ", "")
    exact_hit = any(
        permuted.replace(" ", "").upper() == target
        for permuted in _all_name_combinations(fname, mname, lname)
    )

    # 1. full_name_percent: fall back to sorted-token similarity when no
    #    permutation matched exactly.
    if exact_hit:
        full_name_percent = 100.0
    else:
        sorted_full = _normalize_and_sort(full_name)
        sorted_concat = _normalize_and_sort(concat_name)
        full_name_percent = match_entities(sorted_full, sorted_concat, NAME_MODEL_WEIGHTS)

    # 2. firstname_percent: full name vs first name only.
    if fname:
        firstname_percent = match_entities(full_name, fname, NAME_MODEL_WEIGHTS)
    else:
        firstname_percent = 0.0

    # 3. middlename_percent: full name vs middle name only.
    if mname:
        middlename_percent = match_entities(full_name, mname, NAME_MODEL_WEIGHTS)
    else:
        middlename_percent = 0.0

    # 4. lastname_percent: skipped when the last name is listed in
    #    SURNAME_IDENTIFIER.
    if lname and lname.upper() not in SURNAME_IDENTIFIER:
        lastname_percent = match_entities(full_name, lname, NAME_MODEL_WEIGHTS)
    else:
        lastname_percent = 0.0

    return {
        'full_name_percent': full_name_percent,
        'firstname_percent': firstname_percent,
        'middlename_percent': middlename_percent,
        'lastname_percent': lastname_percent
    }


def handle_case3(r1_fname: str, r1_mname: str, r1_lname: str, r1_concat: str,
                 r2_fname: str, r2_mname: str, r2_lname: str, r2_concat: str) -> dict:
    """
    Case-3 (both records have F/M/L and no full name).

    The concatenated names are scored with ``match_entities`` and the
    result is then floored according to which components pass
    ``check_substring_match``:

    * last name only                 -> at least 85.0
    * last name + first or middle    -> at least 90.0
    * anything else                  -> model score unchanged

    Returns:
        dict: {
            'full_name_percent': float,   # r1_concat vs r2_concat
            'firstname_percent': float,   # r1_fname vs r2_fname
            'middlename_percent': float,  # r1_mname vs r2_mname
            'lastname_percent': float     # r1_lname vs r2_lname
        }
    """
    # Substring checks run only when both sides supply the component.
    f_match = check_substring_match(r1_fname, r2_fname) if r1_fname and r2_fname else False
    m_match = check_substring_match(r1_mname, r2_mname) if r1_mname and r2_mname else False
    l_match = check_substring_match(r1_lname, r2_lname) if r1_lname and r2_lname else False

    # 1. full_name_percent: compare the concatenated names.
    full_name_percent = match_entities(r1_concat, r2_concat, NAME_MODEL_WEIGHTS)

    if l_match:
        # Rule 2 (last name + another field) gets the higher floor;
        # Rule 1 (last name only, "family match") gets the lower one.
        floor = 90.0 if (f_match or m_match) else 85.0
        full_name_percent = max(full_name_percent, floor)
    # Rule 3: no last-name match -> use the calculated similarity as-is.

    # 4. lastname_percent: skipped when either last name is listed in
    #    SURNAME_IDENTIFIER.
    if (r1_lname and r2_lname
            and r1_lname.upper() not in SURNAME_IDENTIFIER
            and r2_lname.upper() not in SURNAME_IDENTIFIER):
        lastname_percent = match_entities(r1_lname, r2_lname, NAME_MODEL_WEIGHTS)
    else:
        lastname_percent = 0.0

    return {
        'full_name_percent': full_name_percent,
        'firstname_percent': _pairwise_name_percent(r1_fname, r2_fname),
        'middlename_percent': _pairwise_name_percent(r1_mname, r2_mname),
        'lastname_percent': lastname_percent
    }


def match_name(name: str, firstname: str, lastname: str, middlename: str) -> float:
    """
    Compare a record's NAME field with its own concatenated F/M/L parts.

    Returns:
        0.0 when both are empty; 100.0 when only one side is present or
        when both are identical; otherwise whatever ``match_entities``
        returns for the two strings.
    """
    name_processed = preprocess_for_matching(name)
    concat_name = concatenate_name_parts(firstname, middlename, lastname)

    # Both empty: nothing to compare.
    if not name_processed and not concat_name:
        return 0.0

    # Exactly one side present: the available value is used as-is.
    if not name_processed or not concat_name:
        return 100.0

    # Both present and identical.
    if name_processed == concat_name:
        return 100.0

    # Both present but different: pass both to the model for fuzzy matching.
    return match_entities(name_processed, concat_name)


def match_names_cross_records(r1_name: str, r1_firstname: str, r1_lastname: str, r1_middlename: str,
                              r2_name: str, r2_firstname: str, r2_lastname: str, r2_middlename: str) -> Dict[str, float]:
    """
    Match names between two records.

    Steps:
      1. Normalize inputs: strip whitespace; empty and "-" become "".
         (Input is expected to be already lowercase and preprocessed.)
      2. Surname check: when both names contain detected surnames,
         - a shared surname with dissimilar remaining tokens adds
           NAME_MATCH_ADJUSTMENTS["surname_penalty"] (default -30);
         - no shared surname at all subtracts 40
           (e.g. "krishna rajput" vs "krishna singh").
      3. Initial-letter boost: adds NAME_MATCH_ADJUSTMENTS["initial_boost"]
         (default 30) when compute_initial_letter_boost is positive.
      4. Subset boost: adds NAME_MATCH_ADJUSTMENTS["subset_boost"]
         (default 40) when token counts differ and is_subset_match holds.
      5. Base similarity from handle_case1 / handle_case2 / handle_case3.
      6. The accumulated adjustment is applied to 'full_name_percent' only
         and the result is clamped to [0, 100].

    The normalized F/M/L values are passed to handle_case1, so placeholder
    values such as "-" are not scored as real name components.

    Returns:
        dict with keys 'full_name_percent', 'firstname_percent',
        'middlename_percent' and 'lastname_percent'. All are 0.0 when both
        names are missing or when no case applies.
    """
    def _clean(value):
        # Empty / None / "-" placeholders all collapse to "".
        if not value:
            return ""
        stripped = value.strip()
        return "" if stripped in ("-", "") else stripped

    # ── Normalize inputs ──
    r1_name_proc = _clean(r1_name)
    r2_name_proc = _clean(r2_name)

    r1_fname, r1_mname, r1_lname = _clean(r1_firstname), _clean(r1_middlename), _clean(r1_lastname)
    r2_fname, r2_mname, r2_lname = _clean(r2_firstname), _clean(r2_middlename), _clean(r2_lastname)

    # ── Determine case ──
    r1_has_fullname = bool(r1_name_proc)
    r2_has_fullname = bool(r2_name_proc)

    r1_concat = concatenate_name_parts(r1_fname, r1_mname, r1_lname).lower()
    r2_concat = concatenate_name_parts(r2_fname, r2_mname, r2_lname).lower()

    # Effective full name: the NAME field when present, else the F/M/L concat.
    name1_effective = r1_name_proc if r1_has_fullname else r1_concat
    name2_effective = r2_name_proc if r2_has_fullname else r2_concat

    # Both missing → zero
    if not name1_effective and not name2_effective:
        return _empty_name_scores()

    # ── Accumulate adjustments (applied AFTER the handle_case computation) ──
    adjustment = 0
    surname_penalty_val = NAME_MATCH_ADJUSTMENTS.get("surname_penalty", -30)
    initial_boost_val = NAME_MATCH_ADJUSTMENTS.get("initial_boost", 30)
    subset_boost_val = NAME_MATCH_ADJUSTMENTS.get("subset_boost", 40)

    # ── Surname detection ──
    if name1_effective and name2_effective:
        surnames1 = detect_surnames(name1_effective)
        surnames2 = detect_surnames(name2_effective)
        if surnames1 and surnames2:
            if surnames1 & surnames2:
                tokens1_non_surname = [t for t in name1_effective.split() if t not in surnames1]
                tokens2_non_surname = [t for t in name2_effective.split() if t not in surnames2]
                # Penalize only when both sides have non-surname tokens,
                # none of them are shared, and they are not even fuzzily close.
                if (tokens1_non_surname and tokens2_non_surname
                        and not set(tokens1_non_surname) & set(tokens2_non_surname)):
                    non_surname1_str = " ".join(tokens1_non_surname)
                    non_surname2_str = " ".join(tokens2_non_surname)
                    if fuzz.ratio(non_surname1_str, non_surname2_str) < 60:
                        adjustment += surname_penalty_val  # e.g., -30
            else:
                # Both have a known surname, but none in common.
                # Example: "krishna rajput" vs "krishna singh"
                adjustment -= 40  # severe penalty for conflicting standard surnames

    # ── Sort tokens for boost/subset detection ──
    name1_tokens = sorted(name1_effective.split()) if name1_effective else []
    name2_tokens = sorted(name2_effective.split()) if name2_effective else []

    if name1_tokens and name2_tokens:
        # ── Initial letter boost: only a positive signal is applied ──
        if compute_initial_letter_boost(name1_tokens, name2_tokens) > 0:
            adjustment += initial_boost_val  # e.g., +30

        # ── Subset match boost: one name is a complete subset of the other ──
        if len(name1_tokens) != len(name2_tokens) and is_subset_match(name1_tokens, name2_tokens):
            adjustment += subset_boost_val  # e.g., +40

    # ── Run the appropriate case handler for base similarity ──
    result = None

    # CASE 1: Both records have full names
    if r1_has_fullname and r2_has_fullname:
        result = handle_case1(r1_name_proc, r2_name_proc,
                              r1_fname, r1_mname, r1_lname,
                              r2_fname, r2_mname, r2_lname)

    # CASE 2: One has full name, other has F/M/L
    elif r1_has_fullname and not r2_has_fullname and r2_concat:
        result = handle_case2(r1_name_proc, r2_fname, r2_mname, r2_lname, r2_concat)
    elif r2_has_fullname and not r1_has_fullname and r1_concat:
        result = handle_case2(r2_name_proc, r1_fname, r1_mname, r1_lname, r1_concat)

    # CASE 3: Both have F/M/L
    elif not r1_has_fullname and not r2_has_fullname and r1_concat and r2_concat:
        result = handle_case3(r1_fname, r1_mname, r1_lname, r1_concat,
                              r2_fname, r2_mname, r2_lname, r2_concat)

    # Fallback if no case matched
    if result is None:
        result = _empty_name_scores()

    # ── Apply accumulated adjustments to full_name_percent ──
    if adjustment != 0:
        result['full_name_percent'] = max(0.0, min(100.0, result['full_name_percent'] + adjustment))

    return result


def match_addresses_1_to_n(addresses_r1: List[str], addresses_r2: List[str]) -> float:
    """
    Match addresses 1:N (plain addressline strings only — no city/zipcode/state).

    Pipeline:
      1. Extract address components (house_number, flat_number, apartment,
         street) from each address.
      2. Pass the remaining address (components removed, post-box stripped)
         to the embedding model → base_score.
      3. If base_score > 60: apply per-component boost/penalty
             house_number : match +30 / mismatch -30
             flat_number  : match +10 / mismatch -10
             apartment    : match +10 / mismatch -10
             street       : match +10 / mismatch -10
         If base_score <= 60: skip all component adjustments.
      4. Add named-component and post-box adjustments.
      5. Cap the final score to [0, 100].

    Returns:
        The best score over all address pairs, rounded to 2 decimals, or
        0.0 when either list has no usable address ("-" and blank entries
        are ignored).
    """
    placeholders = ("-", " ", "")
    raw1 = [a for a in (addresses_r1 or []) if a and str(a).strip() not in placeholders]
    raw2 = [a for a in (addresses_r2 or []) if a and str(a).strip() not in placeholders]

    # Nothing to compare: return before importing the rule helpers.
    if not raw1 or not raw2:
        return 0.0

    from services.rules import (
        preprocess_address as _preprocess_addr,
        compare_named_components as _compare_named,
        compare_postbox as _compare_postbox,
        remove_postbox_from_address as _strip_postbox,
        extract_address_components as _extract_components,
    )

    def _norm(val):
        """Strip all non-alphanumerics — 144/143 → 144143."""
        if not val:
            return ""
        return re.sub(r'[^A-Z0-9]', '', str(val).upper())

    def _component_adj(v1, v2, boost, penalty):
        """Return (verdict, adjustment) for a single component pair."""
        if v1 and v2:
            return ("match", boost) if v1 == v2 else ("mismatch", -penalty)
        return ("missing", 0.0)

    def _prepare(raw):
        """Extract everything needed from one raw address, once."""
        comp = _extract_components(raw)
        full = _preprocess_addr(raw).upper()
        # Fall back to the full preprocessed address when nothing remains
        # after component removal (also tolerates a None value).
        remaining = (comp.get("remaining_address") or "").strip() or full
        return {
            "house_number": _norm(comp.get("house_number")),
            "flat_number": _norm(comp.get("flat_number")),
            "apartment": _norm(comp.get("apartment")),
            "street": _norm(comp.get("street")),
            "clean": _strip_postbox(remaining) or remaining,
            "full": full,
        }

    # (label, boost, penalty) for each extracted component.
    component_specs = [
        ("house_number", 30.0, 30.0),
        ("flat_number", 10.0, 10.0),
        ("apartment", 10.0, 10.0),
        ("street", 10.0, 10.0),
    ]

    # Per-address work is done once here instead of once per N×M pair.
    prepared1 = [_prepare(a) for a in raw1]
    prepared2 = [_prepare(a) for a in raw2]

    best_score = 0.0

    for p1 in prepared1:
        for p2 in prepared2:
            addr1_clean = p1["clean"]
            addr2_clean = p2["clean"]

            # Named components / post-box comparison use the full preprocessed address.
            named_result = _compare_named(p1["full"], p2["full"])
            pb_result = _compare_postbox(p1["full"], p2["full"])

            # A non-numeric result from match_entities is treated as 0.0.
            try:
                base_score = float(match_entities(addr1_clean, addr2_clean, weights=ADDRESS_MODEL_WEIGHTS))
            except (TypeError, ValueError):
                base_score = 0.0

            # ── Component adjustments (only when base_score > 60) ─────────
            comp_adj = 0.0

            print(f"[ADDR_COMPONENTS] base_score={base_score:.2f} | threshold=60 | adjustments_applied={base_score > 60}")
            print(f" remaining_addr1 : {addr1_clean!r}")
            print(f" remaining_addr2 : {addr2_clean!r}")

            for label, boost, penalty in component_specs:
                v1 = p1[label]
                v2 = p2[label]
                verdict, adj = _component_adj(v1, v2, boost, penalty)
                if verdict == "missing":
                    print(f" {label:<15} | verdict=missing | v1={v1!r:>10} v2={v2!r:<10} | adjustment=0.0 [skipped - component absent]")
                elif base_score <= 60:
                    print(f" {label:<15} | verdict={verdict:<9} | v1={v1!r:>10} v2={v2!r:<10} | adjustment=0.0 [SKIPPED - base_score<=60]")
                else:
                    comp_adj += adj
                    sign = "+" if adj >= 0 else ""
                    tag = "BOOSTED" if adj > 0 else "PENALISED"
                    print(f" {label:<15} | verdict={verdict:<9} | v1={v1!r:>10} v2={v2!r:<10} | adjustment={sign}{adj:.1f} [{tag}]")

            print(f" total comp_adj : {comp_adj:+.1f}")

            adjustment = comp_adj + named_result['score_adjustment'] + pb_result['adjustment']
            final_score = max(0.0, min(100.0, base_score + adjustment))

            if final_score > best_score:
                best_score = final_score

    return round(best_score, 2)


def match_addresses_structured(
    addrs_r1: List[dict],
    addrs_r2: List[dict],
) -> float:
    """
    Match addresses when city / zipcode / state are available as separate columns.

    Thin wrapper: delegates entirely to
    ``services.rules.match_structured_address_lists``.

    Each address dict must have keys: addressline, city, zipcode, state.
    Returns best score across all N×M combinations (0-100).

    Handles:
      - Missing state/city → inferred from zipcode via pgeocode (offline)
      - Bank state codes (NDH, BLR …) → canonical form
      - City name variants → canonical via CITY_MAPPING
      - House number extraction + comparison
      - Full addressline text via embedding model

    Example:
        addrs1 = [{"addressline": "A13 GUPTA ENCLAVE...", "city": "NEW DELHI",
                   "zipcode": "110059", "state": "NDH"}]
        addrs2 = [{"addressline": "A13 GUPTA ENCLAVE...", "city": "NEW DELHI",
                   "zipcode": "110059", "state": "DELHI"}]
        score = match_addresses_structured(addrs1, addrs2)  # → ~100
    """
    from services.rules import match_structured_address_lists as _sa_match
    return _sa_match(addrs_r1, addrs_r2)


def match_single_field(value1: str, value2: str) -> float:
    """
    Match single fields like SPOUSENAME, MOTHERNAME, etc.

    Returns whatever ``match_entities`` returns for the two values using
    its default weights.
    """
    return match_entities(value1, value2)