pujithapsx's picture
without pincode integration
2f8960f
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional, Tuple
from rapidfuzz import fuzz
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import re
import itertools
import unicodedata
from rules import standardize_address
# ---------- Model Store ----------
# Both sentence-embedding models are instantiated once, when this module is
# imported, and are looked up by key ("model1" / "model2") by the similarity
# functions below. The score derived from "model1" is weighted 0.6 and the one
# from "model2" 0.4 when the two are combined.
MODEL_STORE = {
"model1": SentenceTransformer("sentence-transformers/all-mpnet-base-v2"),
"model2": SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2"),
}
class NameAddressPreprocessor:
    """
    Preprocessor for name and address matching in HDFC project.

    Cleans free-text names and addresses before they are compared:
    control characters are dropped, unicode is folded to ASCII, runs of
    special characters are collapsed, unwanted punctuation is removed,
    whitespace is normalised and the text is upper-cased.
    """

    def __init__(self):
        # Special characters that survive remove_special_chars() by default.
        self.preserve_chars = {'-', '/', ',', '.', '&', '#', ':', ';', '_', "'"}
        # Special characters that are kept (as a single character) when they
        # lead a run of special characters; any other run becomes a space.
        self.required_repeated_chars = {'-', '/', ',', '#'}

    def normalize_unicode(self, text):
        """Fold unicode text to ASCII (NFKD decomposition, non-ASCII dropped)."""
        if not text:
            return ""
        decomposed = unicodedata.normalize('NFKD', text)
        return decomposed.encode('ASCII', 'ignore').decode('ASCII')

    def remove_extra_whitespace(self, text):
        """Collapse runs of spaces/tabs to one space and strip both ends."""
        if not text:
            return ""
        return re.sub(r'[ \t]+', ' ', text).strip()

    def handle_repeated_characters(self, text):
        """
        Collapse every run of two or more consecutive SPECIAL characters.

        A "special" character is anything that is not an ASCII letter, an
        ASCII digit or whitespace. The run may mix different characters;
        the FIRST character of the run decides the replacement:
        - first char in required_repeated_chars -> that single character
        - otherwise -> a single space
        Letters and digits are never touched.

        Examples:
        - '---'   -> '-'
        - '////'  -> '/'
        - '!!!!'  -> ' '
        - '....'  -> ' '
        - '.,.,.' -> ' '   (run starts with '.', not a required char)
        - ',.,.'  -> ','   (run starts with ',', a required char)
        - 'AAAA'  -> 'AAAA' (unchanged)
        - '1111'  -> '1111' (unchanged)
        """
        if not text:
            return ""

        def collapse_run(match):
            lead = match.group(0)[0]
            return lead if lead in self.required_repeated_chars else ' '

        # One pass is sufficient: after every maximal run of 2+ special
        # characters has been replaced by a single character, no two special
        # characters can be adjacent, so a second "same character repeated"
        # pass could never match anything.
        return re.sub(r'[^a-zA-Z0-9\s]{2,}', collapse_run, text)

    def remove_repeated_punctuation(self, text):
        """
        Legacy alias kept for compatibility; delegates to
        handle_repeated_characters().
        """
        if not text:
            return ""
        return self.handle_repeated_characters(text)

    def remove_special_chars(self, text, preserve_chars=None):
        """
        Remove every character that is not an ASCII letter, ASCII digit,
        whitespace, or one of preserve_chars.

        preserve_chars: iterable of single characters to keep; defaults to
        self.preserve_chars (- / , . # & : ; _ ').
        """
        if not text:
            return ""
        if preserve_chars is None:
            preserve_chars = self.preserve_chars
        preserve_pattern = ''.join(re.escape(c) for c in preserve_chars)
        # Raw f-string: '\s' must reach the regex engine verbatim. In a
        # non-raw literal it is an invalid escape sequence, which newer
        # Python versions warn about.
        pattern = rf'[^a-zA-Z0-9\s{preserve_pattern}]'
        return re.sub(pattern, '', text)

    def standardize_case(self, text, mode='upper'):
        """
        Standardize text case for comparison.
        mode: 'upper', 'lower', or 'title'; any other value returns the
        text unchanged.
        """
        if not text:
            return ""
        if mode == 'upper':
            return text.upper()
        if mode == 'lower':
            return text.lower()
        if mode == 'title':
            return text.title()
        return text

    def remove_control_characters(self, text):
        """
        Replace newlines and tabs with a space and drop every other
        character whose unicode category starts with 'C'.
        """
        if not text:
            return ""
        cleaned = []
        for char in text:
            if char in '\n\t':
                # Newline/tab act as word separators, so keep a space.
                cleaned.append(' ')
            elif unicodedata.category(char)[0] != 'C':
                cleaned.append(char)
        return ''.join(cleaned)

    def remove_digits(self, text):
        """Remove all digits from text."""
        if not text:
            return ""
        return re.sub(r'\d', '', text)

    def preprocess_name(self, name):
        """
        Preprocess a name for matching.

        Steps: remove control chars -> normalize unicode -> collapse runs of
        special chars -> remove special chars (only - ' . are kept) ->
        remove digits -> remove extra whitespace -> upper-case.

        Returns "" for empty input.
        """
        if not name:
            return ""
        # NOTE(review): the raw and cleaned name are printed to stdout.
        print("org name",name)
        name = self.remove_control_characters(name)
        name = self.normalize_unicode(name)
        name = self.handle_repeated_characters(name)
        # Names keep fewer special characters than addresses.
        name_preserve = {'-', "'", '.'}
        name = self.remove_special_chars(name, preserve_chars=name_preserve)
        # Digits are stripped BEFORE whitespace is normalised; doing it
        # afterwards left doubled or trailing spaces behind
        # (e.g. "John 3 Doe" -> "JOHN  DOE").
        name = self.remove_digits(name)
        name = self.remove_extra_whitespace(name)
        name = self.standardize_case(name, mode='upper')
        print("cleaned name",name)
        return name

    def preprocess_address(self, address):
        """
        Preprocess an address for matching.

        Steps: remove control chars -> normalize unicode -> collapse runs of
        special chars -> remove special chars (self.preserve_chars are kept)
        -> remove extra whitespace -> upper-case -> standardize_address().

        Returns "" for empty input.
        """
        if not address:
            return ""
        # NOTE(review): the raw and cleaned address are printed to stdout.
        print("org address",address)
        address = self.remove_control_characters(address)
        address = self.normalize_unicode(address)
        address = self.handle_repeated_characters(address)
        address = self.remove_special_chars(address)
        address = self.remove_extra_whitespace(address)
        address = self.standardize_case(address, mode='upper')
        # standardize_address is imported from the project's `rules` module;
        # its exact behaviour is not visible here.
        address= standardize_address(address)
        print("cleaned address",address)
        return address
# ---------- Text Preprocessing ----------
def preprocess_for_matching(text: str) -> str:
    """
    Upper-case and strip a value before matching.

    Empty values and the placeholders "-" and " " yield "".
    """
    is_placeholder = not text or text in ["-", " ", ""]
    return "" if is_placeholder else text.upper().strip()
# ---------- Core Matching Functions ----------
def calculate_fuzzy_scores(input1: str, input2: str) -> Dict[str, float]:
    """
    Score two strings with four RapidFuzz scorers.

    Returns a dict keyed by "simple_ratio", "token_set_ratio", "w_ratio"
    and "partial_ratio".
    """
    scorers = (
        ("simple_ratio", fuzz.ratio),
        ("token_set_ratio", fuzz.token_set_ratio),
        ("w_ratio", fuzz.WRatio),
        ("partial_ratio", fuzz.partial_ratio),
    )
    return {label: scorer(input1, input2) for label, scorer in scorers}
def calculate_semantic_similarity(model_name: str, input1: str, input2: str) -> float:
    """
    Cosine similarity between the embeddings of two texts.

    model_name selects the encoder from MODEL_STORE. Each text is encoded
    in its own call and both inputs are printed before encoding.
    """
    encoder = MODEL_STORE[model_name]
    print("input1 to the embedding model:", input1)
    print("input2 to the embedding model:", input2)
    vectors = [encoder.encode([text]) for text in (input1, input2)]
    similarity_matrix = cosine_similarity(vectors[0], vectors[1])
    return similarity_matrix[0][0]
def calculate_final_score(fuzzy_scores: Dict[str, float], semantic_score: float) -> float:
    """
    Blend fuzzy scores and a semantic score into one weighted score.

    Fuzzy scores missing from the dict count as 0. semantic_score is
    multiplied by 100 before weighting. The result is clamped to [0, 100].
    """
    fuzzy_weights = (
        ("simple_ratio", 0.15),
        ("token_set_ratio", 0.40),
        ("partial_ratio", 0.20),
        ("w_ratio", 0.05),
    )
    semantic_weight = 0.20

    contributions = [fuzzy_scores.get(label, 0) * weight for label, weight in fuzzy_weights]
    contributions.append(semantic_score * 100 * semantic_weight)
    total = sum(contributions)
    return max(0, min(100, total))
def calculate_overall_similarity(score1: float, score2: float) -> float:
    """Combine two per-model scores, weighting the first 0.6 and the second 0.4."""
    first_weight, second_weight = 0.6, 0.4
    return score1 * first_weight + score2 * second_weight
def check_substring_match(str1: str, str2: str) -> bool:
    """Return True when either string contains the other; False if either is empty."""
    if str1 and str2:
        return str1 in str2 or str2 in str1
    return False
def check_individual_name_matches(name_full: str, fname: str, mname: str, lname: str) -> Tuple[bool, bool, bool]:
    """
    Test the first, middle and last name against the full name.

    A part matches when check_substring_match() accepts it against
    name_full; an empty part never matches.
    Returns: (first_match, middle_match, last_match)
    """
    def part_matches(part):
        if not part:
            return False
        return check_substring_match(name_full, part)

    return part_matches(fname), part_matches(mname), part_matches(lname)
def concatenate_name_parts(firstname: str, middlename: str, lastname: str) -> str:
    """
    Join the usable name parts into one string.

    Parts that are empty or equal to "-" / " " are skipped. The remaining
    parts are upper-cased, stripped, sorted alphabetically and joined with
    a single space. Returns "" when nothing usable is left.
    """
    placeholders = ["-", " ", ""]
    cleaned = sorted(
        part.upper().strip()
        for part in (firstname, middlename, lastname)
        if part and part not in placeholders
    )
    return " ".join(cleaned)
# ---------- helpers used only inside the new logic ----------
def _normalize_and_sort(name: str) -> str:
"""
1. Split on any non-alphanumeric character (space, underscore, comma, etc.)
2. Remove empty tokens
3. Upper-case
4. Sort alphabetically
5. Re-join with single space
"""
tokens = re.split(r'[^A-Za-z0-9]+', name.strip())
tokens = [t.upper() for t in tokens if t]
return ' '.join(sorted(tokens))
def _all_name_combinations(fname: str, mname: str, lname: str) -> list[str]:
"""
Return every possible ordering of the supplied parts,
dropping any empty/blank components.
"""
parts = []
for p in (fname, mname, lname):
if p and p.strip() not in ('-', '', ' '):
parts.append(p.strip().upper())
if not parts:
return []
# itertools.permutations gives every ordering
return [' '.join(order) for order in itertools.permutations(parts)]
def match_entities(value1: str, value2: str) -> float:
    """
    Score two values with fuzzy plus semantic similarity.

    Both values go through preprocess_for_matching(); if either comes back
    empty the score is 0. Otherwise a weighted score is computed once per
    embedding model and the two are combined.
    Returns: similarity score rounded to 2 decimals.
    """
    left = preprocess_for_matching(value1)
    right = preprocess_for_matching(value2)
    if not (left and right):
        return 0

    fuzzy_match_scores = calculate_fuzzy_scores(left, right)
    print("standardized input1", left)
    print("standardized input2", right)

    with ThreadPoolExecutor() as executor:
        # Both embedding models are queried concurrently.
        cosine_jobs = [
            executor.submit(calculate_semantic_similarity, model_key, left, right)
            for model_key in ("model1", "model2")
        ]
        cosine1, cosine2 = (job.result() for job in cosine_jobs)
        score_jobs = [
            executor.submit(calculate_final_score, fuzzy_match_scores, cosine)
            for cosine in (cosine1, cosine2)
        ]
        final1, final2 = (job.result() for job in score_jobs)

    return round(calculate_overall_similarity(final1, final2), 2)
def calculate_similarity_with_models(text1: str, text2: str) -> float:
    """
    Calculate similarity using fuzzy scores and both embedding models.

    Unlike match_entities(), the inputs are used exactly as given (no
    upper-casing or stripping). The fuzzy scoring, weighting and model
    blending are delegated to calculate_fuzzy_scores(),
    calculate_final_score() and calculate_overall_similarity(), so this
    function cannot drift from match_entities(). In particular each
    per-model score is clamped to [0, 100]; a negative cosine similarity
    can no longer produce a negative percentage.

    Returns:
        Similarity percentage as a builtin float rounded to 2 decimals;
        0.0 when either input is empty.
    """
    if not text1 or not text2:
        return 0.0

    fuzzy_scores = calculate_fuzzy_scores(text1, text2)

    def embedding_cosine(model):
        # Each text is encoded in its own call, as before.
        return cosine_similarity(model.encode([text1]), model.encode([text2]))[0][0]

    # Query both embedding models concurrently.
    with ThreadPoolExecutor() as executor:
        jobs = [
            executor.submit(embedding_cosine, MODEL_STORE[model_key])
            for model_key in ("model1", "model2")
        ]
        cosine1, cosine2 = (job.result() for job in jobs)

    final1 = calculate_final_score(fuzzy_scores, cosine1)
    final2 = calculate_final_score(fuzzy_scores, cosine2)
    overall_similarity = calculate_overall_similarity(final1, final2)
    # float() converts a numpy scalar (the cosine comes from numpy arrays)
    # into the plain Python float the signature promises.
    return round(float(overall_similarity), 2)
def handle_case1(full_name1: str, full_name2: str,
                 r1_fname: str, r1_mname: str, r1_lname: str,
                 r2_fname: str, r2_mname: str, r2_lname: str) -> dict:
    """
    Case-1: both records supply a full name.

    The full names are token-sorted with _normalize_and_sort() before being
    compared; first/middle/last names are compared pairwise as given. A
    component scores 0.0 when either side of the pair is empty, and all
    four scores are 0.0 when either full name is empty.

    Returns:
        dict: {
            'full_name_percent': float,   # full_name1 vs full_name2
            'firstname_percent': float,   # r1_fname vs r2_fname
            'middlename_percent': float,  # r1_mname vs r2_mname
            'lastname_percent': float     # r1_lname vs r2_lname
        }
    """
    score_keys = (
        'full_name_percent',
        'firstname_percent',
        'middlename_percent',
        'lastname_percent',
    )
    if not (full_name1 and full_name2):
        return dict.fromkeys(score_keys, 0.0)

    def pair_score(left, right):
        if left and right:
            return calculate_similarity_with_models(left, right)
        return 0.0

    canonical1 = _normalize_and_sort(full_name1)
    canonical2 = _normalize_and_sort(full_name2)
    return {
        'full_name_percent': calculate_similarity_with_models(canonical1, canonical2),
        'firstname_percent': pair_score(r1_fname, r2_fname),
        'middlename_percent': pair_score(r1_mname, r2_mname),
        'lastname_percent': pair_score(r1_lname, r2_lname),
    }
def handle_case2(full_name: str,
                 fname: str, mname: str, lname: str,
                 concat_name: str) -> dict:
    """
    Case-2: one side has a full name, the other has F/M/L parts.

    If any ordering of the parts equals the upper-cased, stripped full
    name, every component scores 100.0. Otherwise the full name is scored
    against the concatenated name and against each non-empty part
    individually (an empty part scores 0.0).

    Returns:
        dict: {
            'full_name_percent': float,   # full_name vs concat_name
            'firstname_percent': float,   # full_name vs fname
            'middlename_percent': float,  # full_name vs mname
            'lastname_percent': float     # full_name vs lname
        }
    """
    orderings = _all_name_combinations(fname, mname, lname)
    if any(candidate == full_name.upper().strip() for candidate in orderings):
        return {
            'full_name_percent': 100.0,
            'firstname_percent': 100.0,
            'middlename_percent': 100.0,
            'lastname_percent': 100.0,
        }

    def score_against_full(part):
        if not part:
            return 0.0
        return calculate_similarity_with_models(full_name, part)

    return {
        'full_name_percent': calculate_similarity_with_models(full_name, concat_name),
        'firstname_percent': score_against_full(fname),
        'middlename_percent': score_against_full(mname),
        'lastname_percent': score_against_full(lname),
    }
def handle_case3(r1_fname: str, r1_mname: str, r1_lname: str, r1_concat: str,
                 r2_fname: str, r2_mname: str, r2_lname: str, r2_concat: str) -> dict:
    """
    Case-3: both records have F/M/L parts and no full name.

    The concatenated names are scored against each other, then the score
    is raised to a floor when the last names substring-match:
    - last name only                 -> at least 85.0
    - last name plus first or middle -> at least 90.0
    Without a last-name match the computed score is used as-is. Each
    component pair is also scored on its own (0.0 when either side is
    empty).

    Returns:
        dict: {
            'full_name_percent': float,   # r1_concat vs r2_concat
            'firstname_percent': float,   # r1_fname vs r2_fname
            'middlename_percent': float,  # r1_mname vs r2_mname
            'lastname_percent': float     # r1_lname vs r2_lname
        }
    """
    def parts_overlap(left, right):
        return check_substring_match(left, right) if left and right else False

    def pair_score(left, right):
        return calculate_similarity_with_models(left, right) if left and right else 0.0

    first_hit = parts_overlap(r1_fname, r2_fname)
    middle_hit = parts_overlap(r1_mname, r2_mname)
    last_hit = parts_overlap(r1_lname, r2_lname)

    full_name_percent = calculate_similarity_with_models(r1_concat, r2_concat)
    if last_hit:
        # A second matching component raises the floor further.
        floor = 90.0 if (first_hit or middle_hit) else 85.0
        full_name_percent = max(full_name_percent, floor)

    return {
        'full_name_percent': full_name_percent,
        'firstname_percent': pair_score(r1_fname, r2_fname),
        'middlename_percent': pair_score(r1_mname, r2_mname),
        'lastname_percent': pair_score(r1_lname, r2_lname),
    }
def match_name(name: str, firstname: str, lastname: str, middlename: str) -> float:
    """
    Compare a record's NAME with the name built from its own F/M/L parts.

    Returns:
        0   when both the name and the concatenated parts are empty,
        100 when only one of them is present or when they are equal,
        otherwise the match_entities() score of the two.
    """
    name_processed = preprocess_for_matching(name)
    concat_name = concatenate_name_parts(firstname, middlename, lastname)

    # Nothing to compare at all.
    if not name_processed and not concat_name:
        return 0
    # Only one side present, or both identical.
    if not name_processed or not concat_name or name_processed == concat_name:
        return 100
    # Both present but different: use the fuzzy/semantic model score.
    return match_entities(name_processed, concat_name)
def match_names_cross_records(r1_name: str, r1_firstname: str, r1_lastname: str, r1_middlename: str,
                              r2_name: str, r2_firstname: str, r2_lastname: str, r2_middlename: str) -> Dict[str, float]:
    """
    Match names between two records.

    Every input is normalised first (upper-cased and stripped; empty
    values and the placeholders "-" / " " become ""), then one of three
    cases is chosen:
    - Case 1: both records have a full name            -> handle_case1
    - Case 2: one has a full name, the other F/M/L only -> handle_case2
    - Case 3: neither has a full name, both have F/M/L  -> handle_case3

    The F/M/L parts are normalised BEFORE Case 1 is dispatched. Previously
    Case 1 received the raw values, so placeholder parts ("-" vs "-") were
    scored as if they were real names, and differences in letter case
    lowered the component scores, unlike Cases 2 and 3.

    Returns:
        dict with the keys 'full_name_percent', 'firstname_percent',
        'middlename_percent' and 'lastname_percent'. All four are 0.0
        when no case applies (missing data).
    """
    def _clean(value):
        # Same placeholder rule for every field.
        if value and value not in ["-", " ", ""]:
            return value.upper().strip()
        return ""

    r1_name_proc = _clean(r1_name)
    r2_name_proc = _clean(r2_name)
    r1_fname, r1_mname, r1_lname = _clean(r1_firstname), _clean(r1_middlename), _clean(r1_lastname)
    r2_fname, r2_mname, r2_lname = _clean(r2_firstname), _clean(r2_middlename), _clean(r2_lastname)

    r1_has_fullname = bool(r1_name_proc)
    r2_has_fullname = bool(r2_name_proc)

    # CASE 1: Both records have full names
    if r1_has_fullname and r2_has_fullname:
        return handle_case1(r1_name_proc, r2_name_proc,
                            r1_fname, r1_mname, r1_lname,
                            r2_fname, r2_mname, r2_lname)

    r1_concat = concatenate_name_parts(r1_fname, r1_mname, r1_lname)
    r2_concat = concatenate_name_parts(r2_fname, r2_mname, r2_lname)

    # CASE 2: One has full name, other has F/M/L
    if r1_has_fullname and not r2_has_fullname and r2_concat:
        return handle_case2(r1_name_proc, r2_fname, r2_mname, r2_lname, r2_concat)
    if r2_has_fullname and not r1_has_fullname and r1_concat:
        return handle_case2(r2_name_proc, r1_fname, r1_mname, r1_lname, r1_concat)

    # CASE 3: Both have F/M/L
    if not r1_has_fullname and not r2_has_fullname and r1_concat and r2_concat:
        return handle_case3(r1_fname, r1_mname, r1_lname, r1_concat,
                            r2_fname, r2_mname, r2_lname, r2_concat)

    # Missing data
    return {
        'full_name_percent': 0.0,
        'firstname_percent': 0.0,
        'middlename_percent': 0.0,
        'lastname_percent': 0.0,
    }
def match_addresses_1_to_n(addresses_r1: List[str], addresses_r2: List[str]) -> float:
    """
    Best pairwise address score between two records.

    Empty entries and the placeholders "-" / " " are ignored; the rest go
    through preprocess_for_matching(). Every R1 address is scored against
    every R2 address with match_entities() and the highest score is
    returned. Returns 0 when either side has no usable address.
    """
    placeholders = ["-", " ", ""]

    def usable(addresses):
        return [
            preprocess_for_matching(entry)
            for entry in addresses
            if entry and entry not in placeholders
        ]

    valid_addr1 = usable(addresses_r1)
    valid_addr2 = usable(addresses_r2)
    print("address1 for matching:", valid_addr1)
    print("address2 for matching:", valid_addr2)
    if not (valid_addr1 and valid_addr2):
        return 0

    best_score = 0
    pairs = ((left, right) for left in valid_addr1 for right in valid_addr2)
    for left, right in pairs:
        result = match_entities(left, right)
        # float() turns numpy scalars into plain floats; a result that
        # cannot be converted is skipped.
        try:
            score = float(result)
        except (TypeError, ValueError):
            continue
        if score > best_score:
            best_score = score
    return best_score
def match_single_field(value1: str, value2: str) -> float:
    """
    Score a single pair of field values (e.g. SPOUSENAME, MOTHERNAME).

    Thin wrapper: the score is whatever match_entities() returns for the
    two values.
    """
    score = match_entities(value1, value2)
    return score