| from concurrent.futures import ThreadPoolExecutor |
| from typing import Dict, List, Optional, Tuple |
| from rapidfuzz import fuzz |
| from sklearn.metrics.pairwise import cosine_similarity |
| from sentence_transformers import SentenceTransformer |
| import re |
| import itertools |
| import unicodedata |
|
|
| from rules import standardize_address |
|
|
| |
# Sentence-embedding models used for semantic scoring, keyed by the alias
# the scoring functions look them up under. Both models are instantiated
# once, when this module is imported.
MODEL_STORE = dict(
    model1=SentenceTransformer("sentence-transformers/all-mpnet-base-v2"),
    model2=SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2"),
)
|
|
|
|
class NameAddressPreprocessor:
    """
    Clean free-text names and addresses before they are compared.

    Each cleaning step (control-character removal, ASCII folding, collapsing
    runs of punctuation, dropping unwanted symbols, whitespace and case
    normalisation) is available as its own method. ``preprocess_name`` and
    ``preprocess_address`` chain the steps into the two supported pipelines.
    Every method returns "" for empty or ``None`` input.
    """

    # A run of two or more adjacent characters that are neither ASCII
    # letters/digits nor whitespace.
    _SPECIAL_RUN = re.compile(r'[^a-zA-Z0-9\s]{2,}')

    def __init__(self):
        # Symbols that remove_special_chars keeps when no explicit set is given.
        self.preserve_chars = {'-', '/', ',', '.', '&', '#', ':', ';', '_', "'"}
        # Symbols that survive (as a single character) when they start a run
        # of punctuation; any other leading symbol turns the run into a space.
        self.required_repeated_chars = {'-', '/', ',', '#'}

    def normalize_unicode(self, text):
        """Fold text to ASCII: decompose with NFKD, then drop non-ASCII code points."""
        if not text:
            return ""

        decomposed = unicodedata.normalize('NFKD', text)
        return decomposed.encode('ASCII', 'ignore').decode('ASCII')

    def remove_extra_whitespace(self, text):
        """Collapse runs of spaces/tabs to one space and trim both ends."""
        if not text:
            return ""

        return re.sub(r'[ \t]+', ' ', text).strip()

    def handle_repeated_characters(self, text):
        """
        Collapse every run of two or more adjacent special characters.

        A "special" character is anything that is not an ASCII letter, an
        ASCII digit or whitespace. The run is replaced according to its
        FIRST character:

        - first character in ``required_repeated_chars`` -> that character
        - otherwise -> a single space

        Letters, digits and isolated special characters are left untouched.

        Examples:
        - '---'   -> '-'
        - '////'  -> '/'
        - '!!!!'  -> ' '
        - '....'  -> ' '
        - '.,.,.' -> ' '   (starts with '.', which is not required)
        - ',.,.'  -> ','   (starts with ',', which is required)
        - 'AAAA'  -> 'AAAA'
        - '1111'  -> '1111'
        """
        if not text:
            return ""

        def collapse(match):
            lead = match.group(0)[0]
            return lead if lead in self.required_repeated_chars else ' '

        # A second pass for identical repeated characters used to follow this
        # one. It could never match: after this substitution no two special
        # characters are adjacent any more, so it has been removed.
        return self._SPECIAL_RUN.sub(collapse, text)

    def remove_repeated_punctuation(self, text):
        """Legacy alias kept for compatibility; delegates to handle_repeated_characters."""
        if not text:
            return ""

        return self.handle_repeated_characters(text)

    def remove_special_chars(self, text, preserve_chars=None):
        """
        Delete every character that is not an ASCII letter, ASCII digit,
        whitespace, or one of ``preserve_chars``.

        ``preserve_chars`` defaults to ``self.preserve_chars``
        (- / , . # & : ; _ ').
        """
        if not text:
            return ""

        if preserve_chars is None:
            preserve_chars = self.preserve_chars

        # Escape each symbol so '-' or ']' cannot change the meaning of the
        # character class; sorting keeps the generated pattern deterministic.
        kept = ''.join(re.escape(symbol) for symbol in sorted(preserve_chars))
        # Raw f-string: a plain f-string made '\s' an invalid escape sequence.
        pattern = rf'[^a-zA-Z0-9\s{kept}]'
        return re.sub(pattern, '', text)

    def standardize_case(self, text, mode='upper'):
        """
        Return text in the requested case.

        mode: 'upper', 'lower' or 'title'; any other value returns the text
        unchanged.
        """
        if not text:
            return ""

        if mode == 'upper':
            return text.upper()
        if mode == 'lower':
            return text.lower()
        if mode == 'title':
            return text.title()
        return text

    def remove_control_characters(self, text):
        """
        Replace newlines and tabs with a space and drop every other character
        whose Unicode general category starts with 'C'.
        """
        if not text:
            return ""

        kept = []
        for char in text:
            if char in '\n\t':
                # Line breaks and tabs separate words, so keep a gap.
                kept.append(' ')
            elif unicodedata.category(char)[0] != 'C':
                kept.append(char)
        return ''.join(kept)

    def remove_digits(self, text):
        """Remove every character matched by the regex ``\\d``."""
        if not text:
            return ""
        return re.sub(r'\d', '', text)

    def preprocess_name(self, name):
        """
        Clean a person name for matching.

        Steps: remove control chars -> normalize unicode -> handle repeated
        chars -> remove special chars (keeping - ' .) -> remove digits ->
        remove extra whitespace -> upper-case.

        Digits are removed BEFORE whitespace is collapsed so that deleting a
        numeric token cannot leave a double or trailing space behind.
        """
        if not name:
            return ""
        # NOTE(review): prints the raw name to stdout; consider logging instead.
        print("org name",name)

        name = self.remove_control_characters(name)
        name = self.normalize_unicode(name)
        name = self.handle_repeated_characters(name)

        name_preserve = {'-', "'", '.'}
        name = self.remove_special_chars(name, preserve_chars=name_preserve)

        name = self.remove_digits(name)
        name = self.remove_extra_whitespace(name)
        name = self.standardize_case(name, mode='upper')

        print("cleaned name",name)
        return name

    def preprocess_address(self, address):
        """
        Clean an address for matching.

        Steps: remove control chars -> normalize unicode -> handle repeated
        chars -> remove special chars (default preserve set) -> remove extra
        whitespace -> upper-case -> ``standardize_address`` from ``rules``.
        """
        if not address:
            return ""
        # NOTE(review): prints the raw address to stdout; consider logging instead.
        print("org address",address)

        address = self.remove_control_characters(address)
        address = self.normalize_unicode(address)
        address = self.handle_repeated_characters(address)
        address = self.remove_special_chars(address)
        address = self.remove_extra_whitespace(address)
        address = self.standardize_case(address, mode='upper')
        address = standardize_address(address)
        print("cleaned address",address)
        return address
|
|
|
|
| |
def preprocess_for_matching(text: str) -> str:
    """
    Upper-case and trim a value so it can be compared.

    Returns "" when the value is empty/None or, once trimmed, is blank or the
    "-" placeholder. The placeholder test runs AFTER trimming so padded
    placeholders such as " - " are also treated as missing.
    """
    if not text:
        return ""
    cleaned = text.upper().strip()
    if cleaned in ("", "-"):
        return ""
    return cleaned
|
|
| |
def calculate_fuzzy_scores(input1: str, input2: str) -> Dict[str, float]:
    """
    Score two strings with four RapidFuzz scorers.

    Returns a dict keyed "simple_ratio", "token_set_ratio", "w_ratio" and
    "partial_ratio"; each value is whatever the matching ``fuzz`` scorer
    returns for (input1, input2).
    """
    scorers = (
        ("simple_ratio", fuzz.ratio),
        ("token_set_ratio", fuzz.token_set_ratio),
        ("w_ratio", fuzz.WRatio),
        ("partial_ratio", fuzz.partial_ratio),
    )
    return {label: scorer(input1, input2) for label, scorer in scorers}
|
|
def calculate_semantic_similarity(model_name: str, input1: str, input2: str) -> float:
    """
    Return the cosine similarity between the embeddings of two strings.

    model_name selects the encoder from MODEL_STORE (an unknown key raises
    KeyError). Each input is encoded separately and both inputs are echoed
    to stdout before encoding.
    """
    encoder = MODEL_STORE[model_name]
    print("input1 to the embedding model:",input1)
    print("input2 to the embedding model:",input2)
    vector_a = encoder.encode([input1])
    vector_b = encoder.encode([input2])
    # cosine_similarity returns a matrix; with one row per side the only
    # entry is at [0][0].
    similarity_matrix = cosine_similarity(vector_a, vector_b)
    return similarity_matrix[0][0]
|
|
def calculate_final_score(fuzzy_scores: Dict[str, float], semantic_score: float) -> float:
    """
    Blend the fuzzy scores and one semantic score into a single value.

    Weights: simple_ratio 0.15, token_set_ratio 0.40, partial_ratio 0.20,
    w_ratio 0.05 and semantic 0.20 (semantic_score is multiplied by 100
    first). Fuzzy keys that are absent count as 0. The weighted sum is
    clamped to the range 0..100 before it is returned.
    """
    fuzzy_weights = (
        ("simple_ratio", 0.15),
        ("token_set_ratio", 0.40),
        ("partial_ratio", 0.20),
        ("w_ratio", 0.05),
    )
    contributions = [
        fuzzy_scores.get(label, 0) * weight for label, weight in fuzzy_weights
    ]
    # The semantic term is appended last so the summation order is unchanged.
    contributions.append(semantic_score * 100 * 0.20)

    blended = sum(contributions)
    capped = min(100, blended)
    return max(0, capped)
|
|
def calculate_overall_similarity(score1: float, score2: float) -> float:
    """Combine two per-model scores: 60% of score1 plus 40% of score2."""
    primary_share = score1 * 0.6
    secondary_share = score2 * 0.4
    return primary_share + secondary_share
|
|
def check_substring_match(str1: str, str2: str) -> bool:
    """
    Tell whether either string occurs inside the other (case-sensitive).

    Returns False when either argument is empty or None.
    """
    if str1 and str2:
        return str1 in str2 or str2 in str1
    return False
|
|
def check_individual_name_matches(name_full: str, fname: str, mname: str, lname: str) -> Tuple[bool, bool, bool]:
    """
    Test each name part against the full name with check_substring_match.

    Returns (first_match, middle_match, last_match); a part that is empty or
    None yields False without being compared.
    """
    def overlaps_full_name(part: str) -> bool:
        if not part:
            return False
        return check_substring_match(name_full, part)

    return (
        overlaps_full_name(fname),
        overlaps_full_name(mname),
        overlaps_full_name(lname),
    )
|
|
|
|
def concatenate_name_parts(firstname: str, middlename: str, lastname: str) -> str:
    """
    Build one comparison string from first/middle/last name parts.

    Each part is upper-cased and trimmed. Parts that are None, blank or the
    "-" placeholder after trimming are skipped; the placeholder test runs
    after trimming so values such as "  " or " - " cannot leak into the
    result as empty or "-" tokens. The surviving parts are sorted
    alphabetically (not kept in first/middle/last order) and joined with a
    single space. Returns "" when no part survives.
    """
    parts = []
    for raw_part in (firstname, middlename, lastname):
        if not raw_part:
            continue
        cleaned = raw_part.upper().strip()
        if cleaned and cleaned != "-":
            parts.append(cleaned)

    return " ".join(sorted(parts))
|
|
| |
def _normalize_and_sort(name: str) -> str:
    """
    Produce an order-independent form of a name.

    The trimmed name is split on every run of characters that are not ASCII
    letters or digits, empty pieces are discarded, the rest are upper-cased,
    sorted alphabetically and re-joined with single spaces.
    """
    pieces = re.split(r'[^A-Za-z0-9]+', name.strip())
    upper_words = sorted(piece.upper() for piece in pieces if piece)
    return ' '.join(upper_words)
|
|
def _all_name_combinations(fname: str, mname: str, lname: str) -> list[str]:
    """
    List every ordering of the usable name parts as space-joined strings.

    A part is usable when it is non-empty and, once trimmed, is not "-", ""
    or " "; usable parts are trimmed and upper-cased. Returns [] when no part
    is usable.
    """
    usable = [
        part.strip().upper()
        for part in (fname, mname, lname)
        if part and part.strip() not in ('-', '', ' ')
    ]
    # Guard needed: permutations of an empty list would yield one empty tuple.
    if not usable:
        return []

    return [' '.join(ordering) for ordering in itertools.permutations(usable)]
|
|
|
|
def match_entities(value1: str, value2: str) -> float:
    """
    Score two values with fuzzy plus semantic similarity.

    Both values are normalised with preprocess_for_matching; if either is
    empty afterwards the result is 0. Otherwise the fuzzy scores are blended
    with each model's cosine similarity (calculate_final_score), the two
    per-model results are combined (calculate_overall_similarity) and the
    outcome is rounded to two decimals.
    """
    standardized_input1 = preprocess_for_matching(value1)
    standardized_input2 = preprocess_for_matching(value2)

    if not standardized_input1 or not standardized_input2:
        return 0

    fuzzy_match_scores = calculate_fuzzy_scores(standardized_input1, standardized_input2)
    print("standardized input1",standardized_input1)
    print("standardized input2",standardized_input2)

    # Only the two embedding look-ups are worth running concurrently.
    with ThreadPoolExecutor(max_workers=2) as executor:
        pending = [
            executor.submit(
                calculate_semantic_similarity, alias,
                standardized_input1, standardized_input2,
            )
            for alias in ("model1", "model2")
        ]
        cosine1, cosine2 = (future.result() for future in pending)

    # The weighted blend is a handful of multiplications, so it runs inline
    # instead of being handed to the pool and waited on immediately.
    final1 = calculate_final_score(fuzzy_match_scores, cosine1)
    final2 = calculate_final_score(fuzzy_match_scores, cosine2)

    overall_similarity = calculate_overall_similarity(final1, final2)
    return round(overall_similarity, 2)
|
|
def calculate_similarity_with_models(text1: str, text2: str) -> float:
    """
    Score two already-normalised strings with fuzzy plus embedding similarity.

    Returns 0 when either string is empty or None. Otherwise the result is
    the two-model blend rounded to two decimals.

    The fuzzy scorers, the weight table and the model mix come from
    calculate_fuzzy_scores, calculate_final_score and
    calculate_overall_similarity, so they are defined in one place and agree
    with match_entities. As a consequence each per-model score is clamped to
    0..100 by calculate_final_score, which the previous inline copy of the
    formula did not do.
    """
    if not text1 or not text2:
        return 0

    fuzzy_scores = calculate_fuzzy_scores(text1, text2)

    def semantic_score(alias):
        # Encodes directly instead of using calculate_semantic_similarity,
        # which prints its inputs; this path stays silent as before.
        model = MODEL_STORE[alias]
        return cosine_similarity(model.encode([text1]), model.encode([text2]))[0][0]

    with ThreadPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(semantic_score, "model1")
        future2 = executor.submit(semantic_score, "model2")
        cosine1 = future1.result()
        cosine2 = future2.result()

    final1 = calculate_final_score(fuzzy_scores, cosine1)
    final2 = calculate_final_score(fuzzy_scores, cosine2)
    overall_similarity = calculate_overall_similarity(final1, final2)

    return round(overall_similarity, 2)
|
|
def handle_case1(full_name1: str, full_name2: str,
                 r1_fname: str, r1_mname: str, r1_lname: str,
                 r2_fname: str, r2_mname: str, r2_lname: str) -> dict:
    """
    Case 1: both records supply a full name.

    Returns:
        dict: {
            'full_name_percent': float,   # token-sorted full_name1 vs full_name2
            'firstname_percent': float,   # r1_fname vs r2_fname
            'middlename_percent': float,  # r1_mname vs r2_mname
            'lastname_percent': float     # r1_lname vs r2_lname
        }
    Every value is 0.0 when either full name is missing; a part score is 0.0
    when that part is missing on either side.
    """
    scores = {
        'full_name_percent': 0.0,
        'firstname_percent': 0.0,
        'middlename_percent': 0.0,
        'lastname_percent': 0.0
    }
    if not (full_name1 and full_name2):
        return scores

    # Sort the tokens so word order does not affect the full-name score.
    ordered1 = _normalize_and_sort(full_name1)
    ordered2 = _normalize_and_sort(full_name2)
    scores['full_name_percent'] = calculate_similarity_with_models(ordered1, ordered2)

    part_pairs = (
        ('firstname_percent', r1_fname, r2_fname),
        ('middlename_percent', r1_mname, r2_mname),
        ('lastname_percent', r1_lname, r2_lname),
    )
    for key, left, right in part_pairs:
        if left and right:
            scores[key] = calculate_similarity_with_models(left, right)

    return scores
|
|
def handle_case2(full_name: str,
                 fname: str, mname: str, lname: str,
                 concat_name: str) -> dict:
    """
    Case 2: one record has a full name, the other has first/middle/last parts.

    If any ordering of the parts equals the upper-cased, trimmed full name,
    all four scores are 100.0. Otherwise:

    Returns:
        dict: {
            'full_name_percent': float,   # full_name vs concat_name
            'firstname_percent': float,   # full_name vs fname (0.0 if fname empty)
            'middlename_percent': float,  # full_name vs mname (0.0 if mname empty)
            'lastname_percent': float     # full_name vs lname (0.0 if lname empty)
        }
    """
    score_keys = (
        'full_name_percent',
        'firstname_percent',
        'middlename_percent',
        'lastname_percent',
    )

    exact_permutation = any(
        candidate == full_name.upper().strip()
        for candidate in _all_name_combinations(fname, mname, lname)
    )
    if exact_permutation:
        return dict.fromkeys(score_keys, 100.0)

    scores = {
        'full_name_percent': calculate_similarity_with_models(full_name, concat_name)
    }
    for key, part in zip(score_keys[1:], (fname, mname, lname)):
        if part:
            scores[key] = calculate_similarity_with_models(full_name, part)
        else:
            scores[key] = 0.0

    return scores
|
|
|
|
def handle_case3(r1_fname: str, r1_mname: str, r1_lname: str, r1_concat: str,
                 r2_fname: str, r2_mname: str, r2_lname: str, r2_concat: str) -> dict:
    """
    Case 3: both records supply first/middle/last parts only.

    The full-name score compares the two concatenated names and is then
    raised to a floor when the last names overlap as substrings: at least
    90.0 if the first or middle names overlap too, otherwise at least 85.0.

    Returns:
        dict: {
            'full_name_percent': float,   # r1_concat vs r2_concat (with floor)
            'firstname_percent': float,   # r1_fname vs r2_fname
            'middlename_percent': float,  # r1_mname vs r2_mname
            'lastname_percent': float     # r1_lname vs r2_lname
        }
    A part score is 0.0 when that part is missing on either side.
    """
    def parts_overlap(left: str, right: str) -> bool:
        if left and right:
            return check_substring_match(left, right)
        return False

    first_overlap = parts_overlap(r1_fname, r2_fname)
    middle_overlap = parts_overlap(r1_mname, r2_mname)
    last_overlap = parts_overlap(r1_lname, r2_lname)

    full_name_percent = calculate_similarity_with_models(r1_concat, r2_concat)

    if last_overlap:
        floor = 90.0 if (first_overlap or middle_overlap) else 85.0
        full_name_percent = max(full_name_percent, floor)

    def part_score(left: str, right: str) -> float:
        if left and right:
            return calculate_similarity_with_models(left, right)
        return 0.0

    return {
        'full_name_percent': full_name_percent,
        'firstname_percent': part_score(r1_fname, r2_fname),
        'middlename_percent': part_score(r1_mname, r2_mname),
        'lastname_percent': part_score(r1_lname, r2_lname)
    }
|
|
def match_name(name: str, firstname: str, lastname: str, middlename: str) -> float:
    """
    Compare a full name with the name assembled from its parts.

    Note the parameter order: name, firstname, lastname, middlename.

    Returns:
        100 when the normalised full name equals the concatenated parts,
        100 when exactly one of the two is available,
        the match_entities score when both are available but differ,
        0 when neither is available.
    """
    name_processed = preprocess_for_matching(name)
    concat_name = concatenate_name_parts(firstname, middlename, lastname)

    if name_processed and concat_name:
        if name_processed == concat_name:
            return 100
        return match_entities(name_processed, concat_name)

    # Exactly one side is present.
    if name_processed or concat_name:
        return 100

    return 0
|
|
def _clean_name_field(value: Optional[str]) -> str:
    """Upper-case and trim one name field; None, blank or "-" becomes ""."""
    if not value:
        return ""
    cleaned = value.upper().strip()
    return "" if cleaned == "-" else cleaned


def match_names_cross_records(r1_name: str, r1_firstname: str, r1_lastname: str, r1_middlename: str,
                              r2_name: str, r2_firstname: str, r2_lastname: str, r2_middlename: str) -> Dict[str, float]:
    """
    Compare the names of two records and return per-component scores.

    Note the parameter order for each record: name, firstname, lastname,
    middlename.

    Every field is first normalised (upper-cased, trimmed, placeholders such
    as "-" turned into ""). The normalised first/middle/last values are used
    in ALL cases, including case 1, which previously received the raw values
    and could therefore score two "-" placeholders against each other.

    Dispatch:
        case 1 - both records have a full name          -> handle_case1
        case 2 - one has a full name, the other parts   -> handle_case2
        case 3 - neither has a full name, both parts    -> handle_case3
        otherwise every score is 0.0

    Returns:
        dict with keys 'full_name_percent', 'firstname_percent',
        'middlename_percent' and 'lastname_percent'.
    """
    r1_name_proc = _clean_name_field(r1_name)
    r2_name_proc = _clean_name_field(r2_name)

    r1_fname = _clean_name_field(r1_firstname)
    r1_mname = _clean_name_field(r1_middlename)
    r1_lname = _clean_name_field(r1_lastname)

    r2_fname = _clean_name_field(r2_firstname)
    r2_mname = _clean_name_field(r2_middlename)
    r2_lname = _clean_name_field(r2_lastname)

    r1_has_fullname = bool(r1_name_proc)
    r2_has_fullname = bool(r2_name_proc)

    if r1_has_fullname and r2_has_fullname:
        return handle_case1(r1_name_proc, r2_name_proc,
                            r1_fname, r1_mname, r1_lname,
                            r2_fname, r2_mname, r2_lname)

    r1_concat = concatenate_name_parts(r1_fname, r1_mname, r1_lname)
    r2_concat = concatenate_name_parts(r2_fname, r2_mname, r2_lname)

    if r1_has_fullname and r2_concat:
        return handle_case2(r1_name_proc, r2_fname, r2_mname, r2_lname, r2_concat)

    if r2_has_fullname and r1_concat:
        return handle_case2(r2_name_proc, r1_fname, r1_mname, r1_lname, r1_concat)

    if not r1_has_fullname and not r2_has_fullname and r1_concat and r2_concat:
        return handle_case3(r1_fname, r1_mname, r1_lname, r1_concat,
                            r2_fname, r2_mname, r2_lname, r2_concat)

    # Not enough name data on one or both sides to compare anything.
    return {
        'full_name_percent': 0.0,
        'firstname_percent': 0.0,
        'middlename_percent': 0.0,
        'lastname_percent': 0.0
    }
|
|
def match_addresses_1_to_n(addresses_r1: List[str], addresses_r2: List[str]) -> float:
    """
    Return the best match_entities score over every pair of addresses.

    Entries that are None, "", " " or "-" are ignored, and a list that is
    None is treated as empty. Returns 0 when either side has no usable
    address. Both filtered lists are echoed to stdout.
    """
    placeholders = ("-", " ", "")

    def usable(addresses):
        # `or ()` lets a missing (None) list behave like an empty one
        # instead of raising TypeError.
        return [
            preprocess_for_matching(addr)
            for addr in (addresses or ())
            if addr and addr not in placeholders
        ]

    valid_addr1 = usable(addresses_r1)
    valid_addr2 = usable(addresses_r2)
    # NOTE(review): prints addresses to stdout; consider logging instead.
    print("address1 for matching:",valid_addr1)
    print("address2 for matching:",valid_addr2)
    if not valid_addr1 or not valid_addr2:
        return 0

    best_score = 0
    for addr1 in valid_addr1:
        for addr2 in valid_addr2:
            result = match_entities(addr1, addr2)
            try:
                score = float(result)
            except (TypeError, ValueError):
                # Best effort: a non-numeric result is skipped, not fatal.
                continue
            if score > best_score:
                best_score = score

    return best_score
|
|
def match_single_field(value1: str, value2: str) -> float:
    """
    Score a pair of single-valued fields (e.g. SPOUSENAME, MOTHERNAME).

    Thin wrapper: the result is exactly what match_entities returns for the
    two values.
    """
    field_score = match_entities(value1, value2)
    return field_score