"""Name/address cleaning and fuzzy + semantic similarity scoring helpers."""

import itertools
import logging
import re
import unicodedata
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple

from rapidfuzz import fuzz
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from rules import standardize_address

logger = logging.getLogger(__name__)

# ---------- Model Store ----------
MODEL_STORE = {
    "model1": SentenceTransformer("sentence-transformers/all-mpnet-base-v2"),
    "model2": SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2"),
}

# Values that mean "no data" once surrounding whitespace has been stripped.
_PLACEHOLDER_VALUES = frozenset({"", "-"})

# Weight of each component in the per-model final score (weights sum to 1.0).
# The iteration order of this dict is the summation order.
_SCORE_WEIGHTS = {
    "simple_ratio": 0.15,
    "token_set_ratio": 0.40,
    "partial_ratio": 0.20,
    "w_ratio": 0.05,
    "semantic_score": 0.20,
}

# Blend applied to the two per-model final scores.
_MODEL1_WEIGHT = 0.6
_MODEL2_WEIGHT = 0.4

# A run of two or more characters that are neither ASCII alphanumeric nor whitespace.
_SPECIAL_RUN_RE = re.compile(r'[^a-zA-Z0-9\s]{2,}')


class NameAddressPreprocessor:
    """
    Preprocessor for name and address matching in HDFC project.

    Cleans text while preserving important characters like -, /, and ,
    Runs of special characters are collapsed either to a single character or
    to a space, depending on whether the leading character is "required".
    """

    def __init__(self):
        # Characters to preserve in addresses
        self.preserve_chars = {'-', '/', ',', '.', '&', '#', ':', ';', '_', "'"}
        # Characters that survive (as a single instance) when they start a run
        self.required_repeated_chars = {'-', '/', ',', '#'}

    def normalize_unicode(self, text):
        """Return ``text`` NFKD-normalised and reduced to ASCII.

        Characters with no ASCII form after decomposition are dropped.
        Falsy input yields ``""``.
        """
        if not text:
            return ""
        decomposed = unicodedata.normalize('NFKD', text)
        return decomposed.encode('ASCII', 'ignore').decode('ASCII')

    def remove_extra_whitespace(self, text):
        """Collapse runs of spaces/tabs to one space and strip both ends.

        Falsy input yields ``""``.
        """
        if not text:
            return ""
        return re.sub(r'[ \t]+', ' ', text).strip()

    def _collapse_special_run(self, match):
        """Return the replacement for one run of 2+ special characters."""
        leading = match.group(0)[0]
        return leading if leading in self.required_repeated_chars else ' '

    def handle_repeated_characters(self, text):
        """
        Collapse every run of two or more special characters.

        A "special" character is anything that is not an ASCII letter, an
        ASCII digit or whitespace. The run may repeat one character ('---')
        or mix several ('.,.,.'). The replacement depends only on the FIRST
        character of the run:

        - first char in ``required_repeated_chars`` -> that single character
        - otherwise -> a single space

        Isolated special characters and repeated letters/digits are untouched.

        Examples:
        - '---'   -> '-'
        - '////'  -> '/'
        - '-.'    -> '-'
        - '!!!!'  -> ' '
        - '....'  -> ' '
        - '.,.,.' -> ' '
        - ',.,.'  -> ','
        - 'AAAA'  -> 'AAAA' (unchanged)
        - '1111'  -> '1111' (unchanged)
        """
        if not text:
            return ""
        # One pass is enough: the match is greedy, so after substitution no
        # two special characters can remain adjacent.
        return _SPECIAL_RUN_RE.sub(self._collapse_special_run, text)

    def remove_repeated_punctuation(self, text):
        """
        Legacy alias kept for compatibility; delegates to
        ``handle_repeated_characters``.
        """
        if not text:
            return ""
        return self.handle_repeated_characters(text)

    def remove_special_chars(self, text, preserve_chars=None):
        """
        Delete every character that is not an ASCII letter, ASCII digit,
        whitespace, or a member of ``preserve_chars``.

        ``preserve_chars`` defaults to ``self.preserve_chars``
        (- / , . # & : ; _ '). Falsy input yields ``""``.
        """
        if not text:
            return ""
        if preserve_chars is None:
            preserve_chars = self.preserve_chars

        # re.escape makes each preserved character safe inside the class.
        allowed = ''.join(re.escape(char) for char in preserve_chars)
        # Raw f-string so that \s reaches the regex engine unmodified.
        pattern = rf'[^a-zA-Z0-9\s{allowed}]'
        return re.sub(pattern, '', text)

    def standardize_case(self, text, mode='upper'):
        """
        Standardize text case for comparison.

        mode: 'upper', 'lower', or 'title'; any other value returns the text
        unchanged. Falsy input yields ``""``.
        """
        if not text:
            return ""
        if mode == 'upper':
            return text.upper()
        if mode == 'lower':
            return text.lower()
        if mode == 'title':
            return text.title()
        return text

    def remove_control_characters(self, text):
        """Replace newlines/tabs with a space and drop other 'C' characters.

        Any other character whose Unicode general category starts with 'C'
        is removed. Falsy input yields ``""``.
        """
        if not text:
            return ""
        kept = []
        for char in text:
            if char in '\n\t':
                kept.append(' ')
            elif unicodedata.category(char)[0] != 'C':
                kept.append(char)
        return ''.join(kept)

    def remove_digits(self, text):
        """Remove all digits from text. Falsy input yields ``""``."""
        if not text:
            return ""
        return re.sub(r'\d', '', text)

    def preprocess_name(self, name):
        """
        Preprocess name for matching.

        Steps: Remove control chars -> Normalize unicode -> Handle repeated
        chars -> Remove special chars (keeping only - ' .) -> Remove digits
        -> Remove extra whitespace -> Standardize case (upper)

        Digits are removed BEFORE whitespace is collapsed so that dropping a
        numeric token cannot leave double or trailing spaces behind.
        """
        if not name:
            return ""
        logger.debug("org name %s", name)

        name = self.remove_control_characters(name)
        name = self.normalize_unicode(name)
        name = self.handle_repeated_characters(name)
        # For names, preserve fewer special chars (only hyphen, apostrophe, and period)
        name = self.remove_special_chars(name, preserve_chars={'-', "'", '.'})
        name = self.remove_digits(name)
        name = self.remove_extra_whitespace(name)
        name = self.standardize_case(name, mode='upper')

        logger.debug("cleaned name %s", name)
        return name

    def preprocess_address(self, address):
        """
        Preprocess address for matching.

        Steps: Remove control chars -> Normalize unicode -> Handle repeated
        chars -> Remove special chars -> Remove extra whitespace ->
        Standardize case (upper) -> ``standardize_address`` from ``rules``
        """
        if not address:
            return ""
        logger.debug("org address %s", address)

        address = self.remove_control_characters(address)
        address = self.normalize_unicode(address)
        address = self.handle_repeated_characters(address)
        address = self.remove_special_chars(address)
        address = self.remove_extra_whitespace(address)
        address = self.standardize_case(address, mode='upper')
        address = standardize_address(address)

        logger.debug("cleaned address %s", address)
        return address


# ---------- Text Preprocessing ----------
def preprocess_for_matching(text: str) -> str:
    """Upper-case and strip ``text``; placeholders become ``""``.

    A value is a placeholder when it is falsy, or when it is empty or "-"
    after stripping.
    """
    if not text:
        return ""
    cleaned = text.upper().strip()
    return "" if cleaned in _PLACEHOLDER_VALUES else cleaned


# ---------- Core Matching Functions ----------
def calculate_fuzzy_scores(input1: str, input2: str) -> Dict[str, float]:
    """Calculate fuzzy matching scores using RapidFuzz.

    Returns a dict with keys ``simple_ratio``, ``token_set_ratio``,
    ``w_ratio`` and ``partial_ratio``.
    """
    return {
        "simple_ratio": fuzz.ratio(input1, input2),
        "token_set_ratio": fuzz.token_set_ratio(input1, input2),
        "w_ratio": fuzz.WRatio(input1, input2),
        "partial_ratio": fuzz.partial_ratio(input1, input2),
    }


def calculate_semantic_similarity(model_name: str, input1: str, input2: str) -> float:
    """Cosine similarity of the two inputs' sentence embeddings.

    ``model_name`` must be a key of ``MODEL_STORE`` (``KeyError`` otherwise).
    The result is converted to a built-in ``float``.
    """
    model = MODEL_STORE[model_name]
    logger.debug("input1 to the embedding model: %s", input1)
    logger.debug("input2 to the embedding model: %s", input2)
    embedding1 = model.encode([input1])
    embedding2 = model.encode([input2])
    # cosine_similarity returns a 1x1 matrix holding a NumPy scalar.
    return float(cosine_similarity(embedding1, embedding2)[0][0])


def calculate_final_score(fuzzy_scores: Dict[str, float], semantic_score: float) -> float:
    """Weighted sum of the fuzzy scores and ``semantic_score * 100``.

    Fuzzy keys missing from ``fuzzy_scores`` count as 0. The result is
    clamped to the range [0, 100].
    """
    components = {
        key: fuzzy_scores.get(key, 0)
        for key in _SCORE_WEIGHTS
        if key != "semantic_score"
    }
    components["semantic_score"] = semantic_score * 100

    weighted_sum = sum(
        components[key] * weight for key, weight in _SCORE_WEIGHTS.items()
    )
    return max(0.0, min(100.0, weighted_sum))


def calculate_overall_similarity(score1: float, score2: float) -> float:
    """Blend two model scores: 60% of ``score1`` plus 40% of ``score2``."""
    return score1 * _MODEL1_WEIGHT + score2 * _MODEL2_WEIGHT


def _blended_similarity(text1: str, text2: str) -> float:
    """Score two non-empty strings with fuzzy metrics and both models.

    The two semantic similarities are computed on separate threads. Each is
    combined with the fuzzy scores by ``calculate_final_score``, the two
    results are blended by ``calculate_overall_similarity``, and the blend
    is rounded to 2 decimals.
    """
    fuzzy_scores = calculate_fuzzy_scores(text1, text2)

    with ThreadPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(calculate_semantic_similarity, "model1", text1, text2)
        future2 = executor.submit(calculate_semantic_similarity, "model2", text1, text2)
        cosine1 = future1.result()
        cosine2 = future2.result()

    final1 = calculate_final_score(fuzzy_scores, cosine1)
    final2 = calculate_final_score(fuzzy_scores, cosine2)
    return round(calculate_overall_similarity(final1, final2), 2)


def check_substring_match(str1: str, str2: str) -> bool:
    """Return True when either string contains the other.

    Returns False if either string is empty/falsy.
    """
    if not str1 or not str2:
        return False
    return str1 in str2 or str2 in str1


def check_individual_name_matches(name_full: str, fname: str, mname: str,
                                  lname: str) -> Tuple[bool, bool, bool]:
    """
    Substring-compare the full name against first, middle and last name.

    Returns: (first_match, middle_match, last_match); a component that is
    empty never matches.
    """
    return (
        check_substring_match(name_full, fname),
        check_substring_match(name_full, mname),
        check_substring_match(name_full, lname),
    )


def concatenate_name_parts(firstname: str, middlename: str, lastname: str) -> str:
    """Join the usable name parts, upper-cased and SORTED alphabetically.

    Parts that are falsy, blank or "-" are skipped. Returns ``""`` when no
    usable part remains.
    """
    parts = [
        cleaned
        for cleaned in map(preprocess_for_matching, (firstname, middlename, lastname))
        if cleaned
    ]
    return " ".join(sorted(parts))


# ---------- helpers used only inside the new logic ----------
def _normalize_and_sort(name: str) -> str:
    """
    1. Split on any non-alphanumeric character (space, underscore, comma, etc.)
    2. Remove empty tokens
    3. Upper-case
    4. Sort alphabetically
    5. Re-join with single space
    """
    tokens = (token.upper() for token in re.findall(r'[A-Za-z0-9]+', name))
    return ' '.join(sorted(tokens))


def _all_name_combinations(fname: str, mname: str, lname: str) -> List[str]:
    """
    Return every possible ordering of the supplied parts (upper-cased,
    space-joined), dropping any empty/blank/"-" components.
    """
    parts = [
        cleaned
        for cleaned in map(preprocess_for_matching, (fname, mname, lname))
        if cleaned
    ]
    if not parts:
        return []
    return [' '.join(order) for order in itertools.permutations(parts)]


def match_entities(value1: str, value2: str) -> float:
    """
    Match two entities using fuzzy + semantic similarity.

    Both values go through ``preprocess_for_matching`` first.
    Returns: similarity score as float (0-100); 0.0 when either value is
    empty after preprocessing.
    """
    standardized_input1 = preprocess_for_matching(value1)
    standardized_input2 = preprocess_for_matching(value2)
    if not standardized_input1 or not standardized_input2:
        return 0.0

    logger.debug("standardized input1 %s", standardized_input1)
    logger.debug("standardized input2 %s", standardized_input2)
    return _blended_similarity(standardized_input1, standardized_input2)


def calculate_similarity_with_models(text1: str, text2: str) -> float:
    """
    Calculate similarity using fuzzy scores and embedding models.

    Unlike ``match_entities`` the inputs are used as given (no
    preprocessing). Uses the same clamped scoring as ``match_entities``.
    Returns similarity percentage as float; 0.0 when either input is falsy.
    """
    if not text1 or not text2:
        return 0.0
    return _blended_similarity(text1, text2)


def _empty_name_result() -> Dict[str, float]:
    """Return the all-zero result used when name data is missing."""
    return _name_result(0.0, 0.0, 0.0, 0.0)


def _name_result(full: float, first: float, middle: float, last: float) -> Dict[str, float]:
    """Pack the four name scores into the dict shape callers expect."""
    return {
        'full_name_percent': full,
        'firstname_percent': first,
        'middlename_percent': middle,
        'lastname_percent': last,
    }


def _component_percent(left: str, right: str) -> float:
    """Model similarity of two components, or 0.0 if either is empty."""
    if not left or not right:
        return 0.0
    return calculate_similarity_with_models(left, right)


def handle_case1(full_name1: str, full_name2: str,
                 r1_fname: str, r1_mname: str, r1_lname: str,
                 r2_fname: str, r2_mname: str, r2_lname: str) -> Dict[str, float]:
    """
    Case-1 (both records supply a full name).

    Returns:
        dict: {
            'full_name_percent': float,   # sorted tokens of full_name1 vs full_name2
            'firstname_percent': float,   # r1_fname vs r2_fname
            'middlename_percent': float,  # r1_mname vs r2_mname
            'lastname_percent': float     # r1_lname vs r2_lname
        }
        All values are 0.0 when either full name is empty; a component score
        is 0.0 when either side of that component is empty.
    """
    if not full_name1 or not full_name2:
        return _empty_name_result()

    # Token-sort both names so that word order does not affect the score.
    sorted1 = _normalize_and_sort(full_name1)
    sorted2 = _normalize_and_sort(full_name2)

    return _name_result(
        calculate_similarity_with_models(sorted1, sorted2),
        _component_percent(r1_fname, r2_fname),
        _component_percent(r1_mname, r2_mname),
        _component_percent(r1_lname, r2_lname),
    )


def handle_case2(full_name: str, fname: str, mname: str, lname: str,
                 concat_name: str) -> Dict[str, float]:
    """
    Case-2 (one side has full name, the other has F/M/L).

    If any ordering of the F/M/L parts equals the upper-cased, stripped full
    name, every component scores 100.0. Otherwise:

    Returns:
        dict: {
            'full_name_percent': float,   # full_name vs concat_name
            'firstname_percent': float,   # full_name vs fname
            'middlename_percent': float,  # full_name vs mname
            'lastname_percent': float     # full_name vs lname
        }
        A component score is 0.0 when that component is empty.
    """
    target = full_name.upper().strip()
    if target in _all_name_combinations(fname, mname, lname):
        # Perfect match - all components get 100%
        return _name_result(100.0, 100.0, 100.0, 100.0)

    return _name_result(
        calculate_similarity_with_models(full_name, concat_name),
        _component_percent(full_name, fname),
        _component_percent(full_name, mname),
        _component_percent(full_name, lname),
    )


def handle_case3(r1_fname: str, r1_mname: str, r1_lname: str, r1_concat: str,
                 r2_fname: str, r2_mname: str, r2_lname: str,
                 r2_concat: str) -> Dict[str, float]:
    """
    Handle Case 3: Both records have F/M/L.

    The full-name score is the model similarity of the two concatenated
    names, raised to a floor when last names substring-match:
    - last name only matches            -> at least 85.0
    - last name + first or middle match -> at least 90.0

    Returns:
        dict: {
            'full_name_percent': float,   # r1_concat vs r2_concat (with floor)
            'firstname_percent': float,   # r1_fname vs r2_fname
            'middlename_percent': float,  # r1_mname vs r2_mname
            'lastname_percent': float     # r1_lname vs r2_lname
        }
    """
    f_match = check_substring_match(r1_fname, r2_fname)
    m_match = check_substring_match(r1_mname, r2_mname)
    l_match = check_substring_match(r1_lname, r2_lname)

    full_name_percent = calculate_similarity_with_models(r1_concat, r2_concat)

    if l_match:
        # Without a last-name match the calculated similarity is used as-is.
        floor = 90.0 if (f_match or m_match) else 85.0
        full_name_percent = max(full_name_percent, floor)

    return _name_result(
        full_name_percent,
        _component_percent(r1_fname, r2_fname),
        _component_percent(r1_mname, r2_mname),
        _component_percent(r1_lname, r2_lname),
    )


def match_name(name: str, firstname: str, lastname: str, middlename: str) -> float:
    """
    Match a record's NAME field against its own first/middle/last parts.

    Returns a float score:
    - 0.0 when both NAME and the concatenated parts are empty
    - 100.0 when only one of them is present, or when they are equal
    - otherwise the ``match_entities`` score of the two strings

    Note that the concatenated name has its parts sorted alphabetically.
    """
    name_processed = preprocess_for_matching(name)
    concat_name = concatenate_name_parts(firstname, middlename, lastname)

    if not name_processed and not concat_name:
        return 0.0
    if not name_processed or not concat_name:
        # Only one source of the name exists; there is nothing to contradict it.
        return 100.0
    if name_processed == concat_name:
        return 100.0
    return match_entities(name_processed, concat_name)


def match_names_cross_records(r1_name: str, r1_firstname: str, r1_lastname: str,
                              r1_middlename: str, r2_name: str, r2_firstname: str,
                              r2_lastname: str, r2_middlename: str) -> Dict[str, float]:
    """
    Match names between two records with three cases.

    - Case 1: both records have a full name           -> ``handle_case1``
    - Case 2: one has a full name, the other only F/M/L -> ``handle_case2``
    - Case 3: neither has a full name, both have F/M/L  -> ``handle_case3``

    Every input is normalised with ``preprocess_for_matching`` before
    dispatch, so all cases compare upper-cased, placeholder-free values.

    Returns a dict with keys 'full_name_percent', 'firstname_percent',
    'middlename_percent' and 'lastname_percent'; all 0.0 when the data
    needed for any case is missing.
    """
    r1_name_proc = preprocess_for_matching(r1_name)
    r2_name_proc = preprocess_for_matching(r2_name)

    r1_fname = preprocess_for_matching(r1_firstname)
    r1_mname = preprocess_for_matching(r1_middlename)
    r1_lname = preprocess_for_matching(r1_lastname)
    r2_fname = preprocess_for_matching(r2_firstname)
    r2_mname = preprocess_for_matching(r2_middlename)
    r2_lname = preprocess_for_matching(r2_lastname)

    # CASE 1: Both records have full names
    if r1_name_proc and r2_name_proc:
        return handle_case1(r1_name_proc, r2_name_proc,
                            r1_fname, r1_mname, r1_lname,
                            r2_fname, r2_mname, r2_lname)

    r1_concat = concatenate_name_parts(r1_fname, r1_mname, r1_lname)
    r2_concat = concatenate_name_parts(r2_fname, r2_mname, r2_lname)

    # CASE 2: One has full name, other has F/M/L
    if r1_name_proc and r2_concat:
        return handle_case2(r1_name_proc, r2_fname, r2_mname, r2_lname, r2_concat)
    if r2_name_proc and r1_concat:
        return handle_case2(r2_name_proc, r1_fname, r1_mname, r1_lname, r1_concat)

    # CASE 3: Both have F/M/L
    if not r1_name_proc and not r2_name_proc and r1_concat and r2_concat:
        return handle_case3(r1_fname, r1_mname, r1_lname, r1_concat,
                            r2_fname, r2_mname, r2_lname, r2_concat)

    # Missing data
    return _empty_name_result()


def match_addresses_1_to_n(addresses_r1: List[str], addresses_r2: List[str]) -> float:
    """
    Match addresses 1:N - compare every address in R1 with every one in R2.

    Addresses that are empty or placeholders after
    ``preprocess_for_matching`` are ignored.
    Returns the best pairwise ``match_entities`` score as a built-in float,
    or 0.0 when either side has no usable address.
    """
    valid_addr1 = [addr for addr in map(preprocess_for_matching, addresses_r1) if addr]
    valid_addr2 = [addr for addr in map(preprocess_for_matching, addresses_r2) if addr]
    logger.debug("address1 for matching: %s", valid_addr1)
    logger.debug("address2 for matching: %s", valid_addr2)

    if not valid_addr1 or not valid_addr2:
        return 0.0

    return max(
        float(match_entities(addr1, addr2))
        for addr1, addr2 in itertools.product(valid_addr1, valid_addr2)
    )


def match_single_field(value1: str, value2: str) -> float:
    """
    Match single fields like SPOUSENAME, MOTHERNAME, etc.

    Returns the ``match_entities`` similarity score as float.
    """
    return match_entities(value1, value2)