# pujithapsx's picture
# with pincode proper logic added
# ba3c1b6
import logging
import math
import re
from datetime import datetime
from functools import lru_cache
from typing import List, Dict

import pgeocode
# Literal markers looked for inside an address line by the extract_* helpers
# further down in this file.
# Each of these is ONE identifier string, not a collection of identifiers.
APARTMENT_IDENTIFIER="APARTMENT NO"
FLAT_NUMBER_IDENTIFIER="FLAT NO"
# NOTE(review): this marker uses an underscore where the two above use a space;
# confirm that "HOUSE_NO" is the spelling that actually occurs in the input.
HOUSE_NUMBER_IDENTIFIER= "HOUSE_NO"
# Keyword used by is_street_context / extract_leading_house_number to decide
# that a number belongs to a street name rather than to a house.
STREET_KEYWORD="STREET"
# =========================================================
# PINCODE SIMILARITY FUNCTION
# =========================================================
_LOGGER = logging.getLogger(__name__)

# Strings (after str() / strip() / upper()) that mean "no pincode supplied".
_MISSING_PIN_VALUES = frozenset({"", "-", "NA", "N/A", "NULL"})

# Inclusive numeric range accepted as an Indian pincode by this module.
_MIN_INDIAN_PIN = 110001
_MAX_INDIAN_PIN = 855117

# 3-digit prefixes of the major metro cities and the label reported for them.
_METRO_CITY_BY_PREFIX = {
    "110": "Delhi",
    "400": "Mumbai",
    "560": "Bengaluru",
    "600": "Chennai",
    "500": "Hyderabad",
    "700": "Kolkata",
    "411": "Pune",
    "380": "Ahmedabad",
}

# Extended metro regions: two pins whose prefixes fall in the same set are
# scored with the metro distance bands.
_EXTENDED_METROS = (
    ("Delhi NCR", frozenset({"110", "201", "122", "121", "124"})),
    ("Mumbai Metropolitan Region", frozenset({"400", "421", "410"})),
    ("Hyderabad Metro", frozenset({"500", "501"})),
    ("Bengaluru Metro", frozenset({"560", "562"})),
    ("Chennai Metro", frozenset({"600", "601", "603"})),
    ("Kolkata Metro", frozenset({"700", "711", "712"})),
)

# (max distance in km, score) bands, checked in order.
_METRO_BANDS = (
    (8, 95),    # same locality
    (15, 85),   # nearby
    (35, 70),   # same metro
)
_EXTENDED_METRO_LIMIT_KM = 60
_NON_METRO_BANDS = (
    (5, 92),    # same locality
    (12, 75),   # nearby
    (40, 55),   # same district (approximate)
)


@lru_cache(maxsize=1)
def _india_geocoder():
    """Return one shared ``pgeocode.Nominatim("IN")``, built on first use."""
    return pgeocode.Nominatim("IN")


def _is_missing_pin(pin):
    """True when *pin* is None or one of the known placeholder strings."""
    return pin is None or str(pin).strip().upper() in _MISSING_PIN_VALUES


def _geo_text(record, field):
    """Return ``record.<field>``, or None when it is absent, None or NaN."""
    value = getattr(record, field, None)
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return value


def _coordinate(record, field):
    """Return ``record.<field>`` as a float, or None when missing / NaN."""
    value = getattr(record, field, None)
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return None if math.isnan(number) else number


def _haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two lat/lon points (degrees)."""
    earth_radius_km = 6371
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(math.radians(lat1))
        * math.cos(math.radians(lat2))
        * math.sin(dlon / 2) ** 2
    )
    # Rounding can push `a` a hair outside [0, 1]; asin would then raise.
    a = min(1.0, max(0.0, a))
    return earth_radius_km * 2 * math.asin(math.sqrt(a))


def _pincode_format_problem(pin1, pin2):
    """Return the rejection reason for a pair of normalized pins, or None."""
    if len(pin1) != 6 or len(pin2) != 6:
        return f"Invalid pincode length (pin1: {len(pin1)}, pin2: {len(pin2)})"
    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    if not pin1.isdecimal() or not pin2.isdecimal():
        return "Pincode must contain only digits"
    in_range = all(
        _MIN_INDIAN_PIN <= int(pin) <= _MAX_INDIAN_PIN for pin in (pin1, pin2)
    )
    if not in_range:
        return "Pincode outside valid Indian range (110001-855117)"
    return None


def _pincode_result(pin1, pin2, *, match, score,
                    county=(None, None), state=(None, None), **extra):
    """Assemble a result dict with the keys every return path shares."""
    return {
        "match": match,
        "similarity_score": score,
        **extra,
        "pin1": pin1,
        "pin2": pin2,
        "pin1_county_name": county[0],
        "pin2_county_name": county[1],
        "pin1_state_name": state[0],
        "pin2_state_name": state[1],
    }


def _similarity_score(distance, is_metro, is_extended_metro, one_is_metro, same_state):
    """Map a distance in km onto the 0-100 score for the given area class."""
    if is_metro:
        for limit_km, score in _METRO_BANDS:
            if distance <= limit_km:
                return score
        if is_extended_metro and distance <= _EXTENDED_METRO_LIMIT_KM:
            return 60
        return 35
    if one_is_metro:
        # Cross-metro or metro-to-non-metro pair.
        return 50 if distance <= 20 else 25
    for limit_km, score in _NON_METRO_BANDS:
        if distance <= limit_km:
            return score
    if same_state and distance <= 100:
        return 40
    return 20


def pincode_similarity_india(pin1, pin2):
    """
    Calculate similarity between two Indian pincodes based on geographic
    distance and metro/non-metro classification.

    Processing order:
      1. Missing / placeholder values -> "Missing pincode" result (score None).
      2. Normalization (str, strip, zero-pad to 6) and validation
         (length, digits, range 110001-855117) -> score 0 with a "reason".
      3. Identical valid pins -> score 100; county/state are looked up on a
         best-effort basis and stay None if the lookup fails.
      4. Otherwise both pins are geocoded, the haversine distance is computed
         and mapped onto a score; "match" is True when the score is >= 60.

    NOTE(review): validation now runs BEFORE the identical-pin short-circuit,
    so two identical but invalid values (e.g. "abc"/"abc", "0"/"0") no longer
    report a 100% match. Identical pins outside the accepted range are
    therefore rejected as well.

    Args:
        pin1: First pincode (string or int)
        pin2: Second pincode (string or int)
    Returns:
        dict: always contains "match", "similarity_score", "pin1", "pin2" and
        the county/state names for both pins; depending on the path it also
        carries "reason", "distance_km", "area_type", "is_metro_logic",
        "is_extended_metro" and, for identical pins, prefix/cluster fields.
    """
    if _is_missing_pin(pin1) or _is_missing_pin(pin2):
        return _pincode_result(
            pin1, pin2, match=False, score=None,
            distance_km=None,
            area_type="Missing pincode",
            reason="One or both pincodes are null / empty / placeholder",
        )

    # ---- normalization & validation ----
    try:
        pin1 = str(pin1).strip().zfill(6)
        pin2 = str(pin2).strip().zfill(6)
    except (ValueError, AttributeError):
        return _pincode_result(
            pin1, pin2, match=False, score=0,
            reason="Invalid pincode format - cannot convert to string",
        )

    problem = _pincode_format_problem(pin1, pin2)
    if problem is not None:
        return _pincode_result(pin1, pin2, match=False, score=0, reason=problem)

    # ---- identical pins: no distance needed ----
    if pin1 == pin2:
        names = (None, None)
        try:
            record = _india_geocoder().query_postal_code(pin1)
            names = (_geo_text(record, "county_name"), _geo_text(record, "state_name"))
        except Exception:
            # Enrichment only; the match itself does not depend on it.
            _LOGGER.debug("geocoding lookup failed for %s", pin1, exc_info=True)
        county_name, state_name = names
        return _pincode_result(
            pin1, pin2, match=True, score=100,
            distance_km=0.0,
            area_type="Exact same pincode",
            is_metro_logic=None,
            is_extended_metro=None,
            metro_cluster=None,
            pin1_prefix=pin1[:3],
            pin2_prefix=pin2[:3],
            pin1_location=None,
            pin2_location=None,
            county=(county_name, county_name),
            state=(state_name, state_name),
        )

    # ---- geocoding lookup ----
    try:
        geocoder = _india_geocoder()
        p1 = geocoder.query_postal_code(pin1)
        p2 = geocoder.query_postal_code(pin2)
    except Exception as e:
        return _pincode_result(
            pin1, pin2, match=False, score=0,
            reason=f"Geocoding service error: {str(e)}",
        )
    _LOGGER.debug("extracted pincode1 details %s", p1)
    _LOGGER.debug("extracted pincode2 details %s", p2)

    if p1 is None or p2 is None:
        return _pincode_result(
            pin1, pin2, match=False, score=0, reason="Geocoding returned None",
        )

    lat1, lon1 = _coordinate(p1, "latitude"), _coordinate(p1, "longitude")
    lat2, lon2 = _coordinate(p2, "latitude"), _coordinate(p2, "longitude")
    if None in (lat1, lon1, lat2, lon2):
        return _pincode_result(
            pin1, pin2, match=False, score=0,
            reason="Pincode not found in geocoding database",
        )

    county1, county2 = _geo_text(p1, "county_name"), _geo_text(p2, "county_name")
    state1, state2 = _geo_text(p1, "state_name"), _geo_text(p2, "state_name")

    distance = _haversine_km(lat1, lon1, lat2, lon2)

    # ---- metro classification ----
    prefix1, prefix2 = pin1[:3], pin2[:3]
    metro_cluster_name = None
    is_extended_metro = False
    for cluster_name, prefixes in _EXTENDED_METROS:
        if prefix1 in prefixes and prefix2 in prefixes:
            is_extended_metro = True
            metro_cluster_name = cluster_name
            break
    # Extended metros use the metro scoring bands.
    is_metro = is_extended_metro
    if not is_metro and prefix1 == prefix2 and prefix1 in _METRO_CITY_BY_PREFIX:
        is_metro = True
        metro_cluster_name = _METRO_CITY_BY_PREFIX[prefix1]
    one_is_metro = prefix1 in _METRO_CITY_BY_PREFIX or prefix2 in _METRO_CITY_BY_PREFIX

    # An unknown state never counts as "same state".
    same_state = state1 is not None and state1 == state2
    score = _similarity_score(
        distance, is_metro, is_extended_metro, one_is_metro, same_state
    )

    return _pincode_result(
        pin1, pin2, match=score >= 60, score=score,
        distance_km=distance,
        area_type=metro_cluster_name if is_metro else "Non-metro",
        is_metro_logic=is_metro,
        is_extended_metro=is_extended_metro,
        county=(county1, county2),
        state=(state1, state2),
    )
# =========================================================
# NORMALIZATION & PREPROCESSING
# =========================================================
def preprocess_text(text):
    """Trim both ends of *text* and squeeze each whitespace run to one space.

    Falsy input (None, "") yields an empty string.
    """
    if text:
        return re.sub(r"\s+", " ", text.strip())
    return ""
def normalize_text(text):
    """Upper-case *text*, trim it and collapse whitespace runs.

    Falsy input (None, "") yields an empty string.
    """
    if not text:
        return ""
    shouted = text.upper().strip()
    return re.sub(r"\s+", " ", shouted)
# =========================================================
# VALIDATION FUNCTIONS
# =========================================================
def validate_and_normalize_pincode(pincode):
    """
    Validate and normalize pincode to exactly 6 digits.

    Non-digit characters (spaces, dashes, labels) are dropped before the
    length check. A trailing ".0" / ".00" left behind by float formatting
    (e.g. 500001.0 or "500001.0") is removed first, so the zero is not
    counted as a seventh digit.

    Returns the normalized 6-digit string, or None if invalid or falsy.
    """
    if not pincode:
        return None
    raw = str(pincode).strip()
    # Float artefact: without this "12345.0" would pass as "123450".
    raw = re.sub(r"\.0+$", "", raw)
    digits = re.sub(r"\D", "", raw)
    return digits if len(digits) == 6 else None
def validate_and_normalize_phone(phone):
    """
    Validate and normalize phone to exactly 10 digits.

    All non-digit characters are removed, then an optional country code
    ("91", "091", "0091") and an optional single trunk "0" are dropped, but
    only when 10 digits remain afterwards. A plain 10-digit number that
    happens to start with "91" is therefore kept intact instead of being
    truncated. The 10-digit number itself must not start with "0".
    A trailing ".0" from float formatting is ignored.

    Returns the normalized 10-digit string, or None if invalid or falsy.
    """
    if not phone:
        return None
    raw = re.sub(r"\.0+$", "", str(phone).strip())
    digits = re.sub(r"\D", "", raw)
    # fullmatch forces the prefixes to be optional extras around exactly
    # ten significant digits.
    parsed = re.fullmatch(r"(?:0{0,2}91)?0?([1-9]\d{9})", digits)
    return parsed.group(1) if parsed else None
def validate_and_normalize_email(email):
    """
    Lower-case and trim *email*, then check it against a simple
    local@domain.tld regular expression.

    Returns the normalized address, or None when the input is falsy or does
    not match the pattern.
    """
    if not email:
        return None
    candidate = str(email).strip().lower()
    email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    looks_valid = re.match(email_pattern, candidate) is not None
    return candidate if looks_valid else None
def normalize_dob(dob_str):
    """
    Normalize a date of birth to DD-MM-YYYY.

    Accepted text layouts, tried in this order (first success wins):
    YYYY-MM-DD, YYYY/MM/DD, DD-MM-YYYY, DD/MM/YYYY, MM-DD-YYYY, MM/DD/YYYY,
    YYYY-DD-MM, YYYY/DD/MM. Because of that order an ambiguous value such
    as "01-02-2000" is read day-first (1 February).

    Leading/trailing whitespace is ignored. Objects that already expose
    ``strftime`` (date, datetime, ...) are formatted directly.

    Returns the "DD-MM-YYYY" string, or None when the input is falsy or
    matches none of the layouts.
    """
    if not dob_str:
        return None
    output_format = "%d-%m-%Y"
    if hasattr(dob_str, "strftime"):
        return dob_str.strftime(output_format)
    text = str(dob_str).strip()
    formats = [
        "%Y-%m-%d", "%Y/%m/%d",
        "%d-%m-%Y", "%d/%m/%Y",
        "%m-%d-%Y", "%m/%d/%Y",
        "%Y-%d-%m", "%Y/%d/%m",
    ]
    for fmt in formats:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue
        return parsed.strftime(output_format)
    return None
# Canonical state / union-territory name -> every spelling that should be
# folded into it. All entries are lower-case; lookups are expected to
# lower-case and whitespace-normalize their input first.
STATE_MAPPING = {
    # -------------------- STATES --------------------
    "andhra pradesh": ["andhra pradesh", "andhrapradesh", "andhra",
                       "ap", "a.p", "a.p.", "ap state", "in-ap"],
    "arunachal pradesh": ["arunachal pradesh", "arunachal",
                          "ar", "a.r", "arunachal pradesh state", "in-ar"],
    "assam": ["assam", "as", "a.s", "assam state", "in-as"],
    "bihar": ["bihar", "br", "b.r", "bihar state", "in-br"],
    "chhattisgarh": ["chhattisgarh", "chattisgarh", "chhatisgarh",
                     "cg", "c.g", "ct", "chattisgarh state", "in-cg"],
    "goa": ["goa", "ga", "g.a", "in-ga"],
    "gujarat": ["gujarat", "gujrat", "gujarath", "gj", "g.j", "in-gj"],
    "haryana": ["haryana", "hariyana", "hr", "h.r", "in-hr"],
    "himachal pradesh": ["himachal pradesh", "himachal",
                         "hp", "h.p", "h.p.", "in-hp"],
    "jharkhand": ["jharkhand", "jh", "j.h", "in-jh"],
    "karnataka": ["karnataka", "karnatak", "karn",
                  "ka", "k.a", "mysore state", "in-ka"],
    "kerala": ["kerala", "keralam", "kl", "k.l", "in-kl"],
    "madhya pradesh": ["madhya pradesh", "madhyapradesh", "madhya",
                       "mp", "m.p", "m.p.", "mp state", "in-mp"],
    "maharashtra": ["maharashtra", "maharastra", "maha",
                    "mh", "m.h", "maharashtra state", "in-mh"],
    "manipur": ["manipur", "mn", "m.n", "in-mn"],
    "meghalaya": ["meghalaya", "ml", "m.l", "in-ml"],
    "mizoram": ["mizoram", "mz", "m.z", "in-mz"],
    "nagaland": ["nagaland", "nl", "n.l", "in-nl"],
    "odisha": ["odisha", "orissa", "od", "o.d",
               "or", "o.r", "odisha state", "in-od"],
    "punjab": ["punjab", "panjab", "pb", "p.b", "in-pb"],
    "rajasthan": ["rajasthan", "raj", "rj", "r.j", "rajasthan state", "in-rj"],
    "sikkim": ["sikkim", "sk", "s.k", "in-sk"],
    "tamil nadu": ["tamil nadu", "tamilnadu", "tamil",
                   "tn", "t.n", "t.n.", "tamilnadu state", "in-tn"],
    "telangana": ["telangana", "telengana", "in-ts",
                  "tg", "t.g", "ts", "t.s", "telangana state"],
    "tripura": ["tripura", "tr", "t.r", "in-tr"],
    "uttar pradesh": ["uttar pradesh", "uttarpradesh", "uttar",
                      "up", "u.p", "u.p.", "up state", "in-up"],
    "uttarakhand": ["uttarakhand", "uttaranchal",
                    "uk", "u.k", "ua", "uttarakhand state", "in-uk"],
    "west bengal": ["west bengal", "westbengal", "in-wb",
                    "wb", "w.b", "w.b.", "west bengal state"],
    # -------------------- UNION TERRITORIES --------------------
    "andaman and nicobar islands": [
        "andaman and nicobar islands", "andaman nicobar",
        "andaman", "nicobar", "an", "a.n", "a & n islands", "in-an"],
    "chandigarh": ["chandigarh", "ch", "c.h",
                   "in-ch", "mohali", "sas nagar", "kharar",
                   "panchkula", "zirakpur"],
    "dadra and nagar haveli and daman and diu": [
        "dadra and nagar haveli and daman and diu",
        "dadra nagar haveli", "daman diu",
        "dn", "d.n", "dnh", "dd", "in-dh"],
    # The list below also carries Delhi localities and neighbouring NCR
    # cities, and repeats "delhi" / "new delhi" exactly as in the source data.
    "delhi": ["delhi", "new delhi", "dl", "d.l",
              "nct of delhi", "national capital territory of delhi",
              "in-dl", "delhi", "new delhi",
              "north east delhi", "north west delhi",
              "south east delhi", "south west delhi",
              "seelampur", "shahdara",
              "dwarka", "rohini", "pitampura", "karol bagh",
              "lajpat nagar", "saket", "janakpuri",
              "mayur vihar", "vasant kunj", "okhla",
              "noida", "greater noida", "faridabad", "ghz",
              "ghaziabad", "indirapuram", "gurugram", "gurgaon"],
    "jammu and kashmir": ["jammu and kashmir",
                          "jammu", "kashmir", "in-jk",
                          "jk", "j.k", "j&k", "jammu & kashmir"],
    "ladakh": ["ladakh", "la", "l.a", "in-la"],
    "lakshadweep": ["lakshadweep", "lakshadweep islands",
                    "ld", "l.d", "in-ld"],
    "puducherry": ["puducherry", "pondicherry", "py", "p.y", "in-py"],
}
# Canonical city name -> every spelling / alias that should be folded into
# it. All entries are lower-case; the canonical name is always the first
# alias in its own list.
CITY_MAPPING = {
    "mumbai": ["mumbai", "bombay", "mumbai suburban"],
    "delhi": ["delhi", "new delhi", "delhi ncr", "nct of delhi",
              "seelampur", "shahdara", "dwarka", "rohini",
              "pitampura", "karol bagh", "lajpat nagar",
              "saket", "janakpuri", "mayur vihar",
              "vasant kunj", "okhla"],
    "bengaluru": ["bengaluru", "bangalore", "bengaluru urban"],
    "hyderabad": ["hyderabad", "secunderabad", "hyderabad city"],
    "chennai": ["chennai", "madras", "chennai city"],
    "kolkata": ["kolkata", "calcutta", "kolkata city"],
    "pune": ["pune", "poona"],
    "ahmedabad": ["ahmedabad", "amdavad"],
    "jaipur": ["jaipur", "pink city"],
    "lucknow": ["lucknow", "lakhnau"],
    "kanpur": ["kanpur", "cawnpore"],
    "nagpur": ["nagpur"],
    "indore": ["indore"],
    "thane": ["thane", "thana"],
    "bhopal": ["bhopal"],
    "visakhapatnam": ["visakhapatnam", "vizag", "vishakhapatnam"],
    "pimpri-chinchwad": ["pimpri-chinchwad", "pimpri chinchwad", "pcmc"],
    "patna": ["patna", "pataliputra"],
    "vadodara": ["vadodara", "baroda"],
    "ghaziabad": ["ghaziabad", "ghz"],
    "ludhiana": ["ludhiana"],
    "agra": ["agra"],
    "nashik": ["nashik", "nasik"],
    "faridabad": ["faridabad"],
    "meerut": ["meerut"],
    "rajkot": ["rajkot"],
    "kalyan-dombivli": ["kalyan-dombivli", "kalyan", "dombivli"],
    "vasai-virar": ["vasai-virar", "vasai", "virar"],
    "varanasi": ["varanasi", "banaras", "benares", "kashi"],
    "srinagar": ["srinagar"],
    "aurangabad": ["aurangabad"],
    "dhanbad": ["dhanbad"],
    "amritsar": ["amritsar"],
    "navi mumbai": ["navi mumbai", "new bombay"],
    "allahabad": ["allahabad", "prayagraj", "ilahabad"],
    "ranchi": ["ranchi"],
    "howrah": ["howrah", "haora"],
    "coimbatore": ["coimbatore"],
    "jabalpur": ["jabalpur", "jubbulpore"],
    "gwalior": ["gwalior"],
    "vijayawada": ["vijayawada"],
    "jodhpur": ["jodhpur"],
    "madurai": ["madurai"],
    "raipur": ["raipur"],
    "kota": ["kota"],
    "guwahati": ["guwahati", "gauhati"],
    "chandigarh": ["chandigarh", "mohali", "sas nagar", "kharar",
                   "panchkula", "zirakpur"],
    "solapur": ["solapur", "sholapur"],
    "hubli-dharwad": ["hubli-dharwad", "hubli", "dharwad"],
    "bareilly": ["bareilly"],
    "moradabad": ["moradabad"],
    "mysore": ["mysore", "mysuru"],
    "gurgaon": ["gurgaon", "gurugram"],
    "aligarh": ["aligarh"],
    "jalandhar": ["jalandhar"],
    "tiruchirappalli": ["tiruchirappalli", "trichy", "trichinopoly"],
    "bhubaneswar": ["bhubaneswar", "bhubaneshwar"],
    "salem": ["salem"],
    "warangal": ["warangal"],
    "thiruvananthapuram": ["thiruvananthapuram", "trivandrum"],
    "guntur": ["guntur"],
    "bhiwandi": ["bhiwandi"],
    "saharanpur": ["saharanpur"],
    "gorakhpur": ["gorakhpur"],
    "bikaner": ["bikaner"],
    "amravati": ["amravati"],
    "noida": ["noida"],
    "jamshedpur": ["jamshedpur", "tatanagar"],
    "bhilai": ["bhilai", "bhilai nagar"],
    "cuttack": ["cuttack"],
    "firozabad": ["firozabad"],
    "kochi": ["kochi", "cochin"],
    "bhavnagar": ["bhavnagar"],
    "dehradun": ["dehradun", "dehra dun"],
    "durgapur": ["durgapur"],
    "asansol": ["asansol"],
    "nanded": ["nanded"],
    "kolhapur": ["kolhapur"],
    "ajmer": ["ajmer"],
    "gulbarga": ["gulbarga", "kalaburagi"],
    "jamnagar": ["jamnagar"],
    "ujjain": ["ujjain"],
    "loni": ["loni"],
    "siliguri": ["siliguri"],
    "jhansi": ["jhansi"],
    "ulhasnagar": ["ulhasnagar"],
    "nellore": ["nellore"],
    "jammu": ["jammu"],
    "sangli-miraj-kupwad": ["sangli-miraj-kupwad", "sangli", "miraj", "kupwad"],
    "belgaum": ["belgaum", "belagavi"],
    "mangalore": ["mangalore", "mangaluru"],
    "ambattur": ["ambattur"],
    "tirunelveli": ["tirunelveli"],
    "malegaon": ["malegaon"],
    "greater noida": ["greater noida"],
}
def standardize_state(state_str):
    """
    Standardize a state name to its canonical (lower-case) form.

    The input is trimmed, lower-cased and whitespace-collapsed, then looked
    up in STATE_MAPPING, first as a canonical key and then among each key's
    variants.

    Returns the canonical name, the normalized input when no mapping
    entry matches, or None for falsy input. The fallback is the
    whitespace-collapsed form, as in standardize_city, so two unknown
    values that differ only in spacing still compare equal.
    """
    if not state_str:
        return None
    normalized = re.sub(r'\s+', ' ', state_str.strip().lower())
    if normalized in STATE_MAPPING:
        return normalized
    for standard_name, variants in STATE_MAPPING.items():
        if normalized in variants:
            return standard_name
    # Not found: hand back the normalized text rather than the raw input.
    return normalized
def standardize_city(city_str):
    """
    Map a city name onto its canonical (lower-case) form via CITY_MAPPING.

    The input is trimmed, lower-cased and whitespace-collapsed before the
    lookup. Returns the canonical name, the normalized input when nothing
    matches, or None for falsy input.
    """
    if not city_str:
        return None
    key = re.sub(r'\s+', ' ', city_str.strip().lower())
    if key in CITY_MAPPING:
        return key
    canonical = next(
        (name for name, aliases in CITY_MAPPING.items() if key in aliases),
        None,
    )
    return key if canonical is None else canonical
def standardize_address(address_str):
    """
    Standardize address components.

    The address is upper-cased and trimmed, a fixed set of abbreviations
    (STR, RD, AVE, BLVD, APT, FL, STE) is expanded, and whitespace runs are
    collapsed. An abbreviation's trailing dot is consumed together with it,
    so "RD." becomes "ROAD" rather than "ROAD.".

    Returns the standardized string, or None for falsy input.
    """
    if not address_str:
        return None
    address = address_str.upper().strip()
    # The word boundary sits BEFORE the optional dot: with "\.?\b" the dot
    # could never be consumed when followed by a space or end of string.
    replacements = {
        r'\bSTR\b\.?': 'STREET',
        r'\bRD\b\.?': 'ROAD',
        r'\bAVE\b\.?': 'AVENUE',
        r'\bBLVD\b\.?': 'BOULEVARD',
        r'\bAPT\b\.?': 'APARTMENT',
        r'\bFL\b\.?': 'FLOOR',
        r'\bSTE\b\.?': 'SUITE',
    }
    for pattern, replacement in replacements.items():
        address = re.sub(pattern, replacement, address)
    address = re.sub(r'\s+', ' ', address)
    return address.strip()
# =========================================================
# FIELD COMPARISON FUNCTIONS
# =========================================================
def compare_exact(val1, val2):
    """Return 100 when both values are truthy and equal after str/strip/upper, else 0."""
    if val1 and val2:
        left, right = (str(value).strip().upper() for value in (val1, val2))
        if left == right:
            return 100
    return 0
def compare_any_match(list1, list2, field_type="pincode"):
    """
    1:N comparison of two value lists (pincodes, states, cities, ...).

    Falsy values and the placeholders "", "-" are discarded, the rest are
    normalized according to *field_type* ("pincode", "state", "city"; any
    other value means str/strip/upper), and values that normalize to
    something falsy are discarded too.

    Returns 100 if any normalized value of list1 occurs in list2, else 0.
    """
    placeholders = ["", "-", " "]

    def usable(values):
        return [v for v in values if v and str(v).strip() not in placeholders]

    left_raw = usable(list1)
    right_raw = usable(list2)
    if not left_raw or not right_raw:
        return 0

    # Pick the normalizer lazily so only the needed helper is referenced.
    if field_type == "pincode":
        normalizer = validate_and_normalize_pincode
    elif field_type == "state":
        normalizer = standardize_state
    elif field_type == "city":
        normalizer = standardize_city
    else:
        def normalizer(value):
            return str(value).strip().upper()

    left = [item for item in map(normalizer, left_raw) if item]
    right = [item for item in map(normalizer, right_raw) if item]
    if not left or not right:
        return 0

    return 100 if any(item in right for item in left) else 0
def compare_phone_any_match(phones1, phones2):
    """
    1:N comparison of two phone-number lists.

    Each truthy entry is passed through validate_and_normalize_phone and
    entries that fail validation are dropped. Returns 100 when any
    normalized number from phones1 occurs in phones2, else 0.
    """
    def normalized(raw_numbers):
        cleaned = [validate_and_normalize_phone(n) for n in raw_numbers if n]
        return [n for n in cleaned if n]

    first = normalized(phones1)
    second = normalized(phones2)
    if first and second and any(number in second for number in first):
        return 100
    return 0
def compare_email_any_match(emails1, emails2):
    """
    1:N comparison of two e-mail address lists.

    Each truthy entry is passed through validate_and_normalize_email and
    entries that fail validation are dropped. Returns 100 when any
    normalized address from emails1 occurs in emails2, else 0.
    """
    def normalized(raw_addresses):
        cleaned = [validate_and_normalize_email(a) for a in raw_addresses if a]
        return [a for a in cleaned if a]

    first = normalized(emails1)
    second = normalized(emails2)
    if first and second and any(address in second for address in first):
        return 100
    return 0
# =========================================================
# MATCHING RULES
# =========================================================
def evaluate_matching_rules(field_scores: Dict[str, float]) -> tuple:
    """
    Decide whether a pair of records matches, given per-field scores.

    Rules are tried in priority order; each rule maps field names to the
    minimum score they need. A field missing from *field_scores* counts as
    0. The first rule with no field below its minimum wins.

    Returns: (decision, reason), where decision is "Match" or "No Match" and
    reason is the text of the winning rule (or a fixed no-match message).
    """
    # (minimum score per field, human-readable reason), highest priority first.
    rules = (
        ({"NAME": 100, "BIRTHDATE": 100, "PHONE": 100},
         "NAME >= 100 AND DOB >= 100 AND PHONE >= 100"),
        ({"NAME": 100, "BIRTHDATE": 100, "EMAIL": 100},
         "NAME >= 100 AND DOB >= 100 AND EMAIL >= 100"),
        ({"NAME": 100, "BIRTHDATE": 100, "ADDRESSLINE": 70},
         "NAME >= 100 AND DOB >= 100 AND ADDRESS >= 70"),
        ({"NAME": 100, "ZIPCODE": 100, "ADDRESSLINE": 65},
         "NAME >= 100 AND ZIPCODE >= 100 AND ADDRESS >= 65"),
        ({"NAME": 100, "CITY": 100, "ADDRESSLINE": 65},
         "NAME >= 100 AND CITY >= 100 AND ADDRESS >= 65"),
        ({"NAME": 85, "LASTNAME": 85, "BIRTHDATE": 100, "ADDRESSLINE": 60},
         "NAME >= 85 AND LASTNAME >= 85 AND DOB >= 100 AND ADDRESS >= 60"),
        ({"NAME": 85, "BIRTHDATE": 100, "ZIPCODE": 100},
         "NAME >= 85 AND DOB >= 100 AND ZIPCODE >= 100"),
        ({"NAME": 85, "BIRTHDATE": 100, "CITY": 100},
         "NAME >= 85 AND DOB >= 100 AND CITY >= 100"),
        ({"NAME": 85, "ZIPCODE": 100, "ADDRESSLINE": 60},
         "NAME >= 85 AND ZIPCODE >= 100 AND ADDRESS >= 60"),
        ({"NAME": 85, "CITY": 100, "ADDRESSLINE": 60},
         "NAME >= 85 AND CITY >= 100 AND ADDRESS >= 60"),
        ({"BIRTHDATE": 100, "ZIPCODE": 100, "ADDRESSLINE": 65},
         "BIRTHDATE >= 100 AND ZIPCODE >= 100 AND ADDRESS >= 65"),
        ({"BIRTHDATE": 100, "CITY": 100, "ADDRESSLINE": 65},
         "BIRTHDATE >= 100 AND CITY >= 100 AND ADDRESS >= 65"),
        ({"LASTNAME": 85, "ZIPCODE": 100, "ADDRESSLINE": 60},
         "LASTNAME >= 85 AND ZIPCODE >= 100 AND ADDRESS >= 60"),
        ({"NAME": 85, "PHONE": 100},
         "NAME >= 85 AND PHONE >= 100"),
        ({"BIRTHDATE": 100, "PHONE": 100},
         "BIRTHDATE >= 100 AND PHONE >= 100"),
        ({"BIRTHDATE": 100, "NAME": 85},
         "BIRTHDATE >=100 AND NAME>=85"),
        ({"ADDRESSLINE": 60, "TAXID": 100},
         "ADDRESS >= 60 and PAN >= 100"),
        ({"ADDRESSLINE": 60, "LICENSEID": 100},
         "ADDRESS >= 60 and DRIVING_LICN_NO >= 100"),
        ({"BIRTHDATE": 75, "PHONE": 100},
         "BIRTHDATE >= 75 and PHONE >= 100"),
        ({"BIRTHDATE": 75, "TAXID": 100},
         "BIRTHDATE >= 75 and PAN >= 100"),
        ({"BIRTHDATE": 75, "LICENSEID": 100},
         "BIRTHDATE >= 75 and DRIVING_LICN_NO >= 100"),
        ({"BIRTHDATE": 75, "PASSPORTID": 100},
         "BIRTHDATE >= 75 and PASSPORT_NO >= 100"),
        ({"NAME": 60, "PASSPORTID": 100},
         "NAME >= 60 and PASSPORT_NO >= 100"),
        ({"NAME": 60, "LICENSEID": 100},
         "NAME >= 60 and DRIVING_LICN_NO >= 100"),
        ({"NAME": 60, "TAXID": 100},
         "NAME >= 60 and PAN >= 100"),
        ({"PHONE": 100}, "PHONE >= 100"),
        ({"LICENSEID": 100}, "DRIVING_LICN_NO >= 100"),
        ({"PASSPORTID": 100}, "PASSPORT_NO >= 100"),
        ({"TAXID": 100}, "PAN >= 100"),
        ({"EMAIL": 100}, "EMAIL >= 100"),
    )

    for minimums, description in rules:
        # Expressed as "nothing falls short" to keep the original comparison.
        falls_short = any(
            field_scores.get(field, 0) < minimum
            for field, minimum in minimums.items()
        )
        if not falls_short:
            return "Match", description

    return "No Match", "None of the defined matching rules were satisfied"
# =========================================================
# PATTERN-BASED FIELD MATCHING (0 or 100 logic)
# =========================================================
def apply_pattern_matching_logic(field_name: str, score) -> float:
    """
    Collapse the score of an all-or-nothing field to 0 or 100.

    - The sentinel string "missing value" always becomes 0.
    - For BIRTHDATE, PHONE, EMAIL, ZIPCODE, TAXID, LICENSEID, PASSPORTID and
      GENDER the result is 100 when score >= 100 and 0 otherwise.
    - Every other field keeps its score unchanged.
    """
    if score == "missing value":
        return 0

    all_or_nothing = {
        "BIRTHDATE", "PHONE", "EMAIL", "ZIPCODE",
        "TAXID", "LICENSEID", "PASSPORTID", "GENDER",
    }
    if field_name not in all_or_nothing:
        return score
    if score >= 100:
        return 100
    return 0
# -----------------------------
# GIVEN ORDERED HOUSE PATTERNS
# -----------------------------
# House-number regexes, most specific first. Consumers try them in list
# order and stop at the first pattern that matches, so the position of each
# entry is part of the behavior. All patterns expect upper-case text.
HOUSE_NUMBER_PATTERNS_ORDERED = [
    # MIG-/HIG-/LIG- prefix, digits, optional letter: "MIG-12", "LIG-7A"
    r"\b(MIG|HIG|LIG)-\d+[A-Z]?\b",
    # three or more hyphen-separated numeric groups: "1-2-345", "8-3-228B"
    r"\b\d+(?:-\d+){2,}[A-Z]?\b",
    # hyphenated pair, slash, digits, optional letter: "12-3/45", "12-3/45A"
    r"\b\d+-\d+/\d+[A-Z]?\b",
    # hyphenated pair, slash, single letter: "12-3/A"
    r"\b\d+-\d+/[A-Z]\b",
    # hyphenated pair, slash, digits: "12-3/45" (also matched two entries above)
    r"\b\d+-\d+/\d+\b",
    # slash-separated numbers, optional third part and letter: "12/3", "12/3/4 A"
    r"\b\d+/\d+(?:/\d+)?\s?[A-Z]?\b",
    # hyphenated pair with a trailing letter: "12-3A"
    r"\b\d+-\d+[A-Z]\b",
    # plain hyphenated pair: "12-3"
    r"\b\d+-\d+\b",
    # one or two letters, optional hyphen, digits, optional letter: "A12", "AB-12C"
    r"\b[A-Z]{1,2}-?\d+[A-Z]?\b",
    # digits followed by one letter: "12A"
    r"\b\d+[A-Z]\b",
    # bare number of one to four digits: "7", "1024"
    r"\b\d{1,4}\b",
]
# -----------------------------
# NORMALIZATION
# -----------------------------
def normalize(text: str) -> str:
    """
    Prepare an address line for component extraction.

    Steps, in order: upper-case; insert a space between a digit and a letter
    that directly follows it ("12A" -> "12 A"); turn commas and colons into
    spaces; collapse whitespace runs; trim.
    """
    cleaned = text.upper()
    steps = (
        (r"(?<=\d)(?=[a-zA-Z])", " "),
        (r"[,:]", " "),
        (r"\s+", " "),
    )
    for pattern, replacement in steps:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
def is_street_context(text, match_start):
    """
    Tell whether STREET_KEYWORD appears as a whole word in the (up to) 20
    characters of *text* immediately before index *match_start*.
    """
    lookback_start = max(0, match_start - 20)
    preceding = text[lookback_start:match_start]
    return re.search(rf"\b{STREET_KEYWORD}\b", preceding) is not None
# -----------------------------
# CONTEXT-AWARE EXTRACTION
# -----------------------------
def extract_by_identifiers(text, identifier, patterns):
    """
    Extract the value that follows *identifier* in *text*.

    A window of up to 15 characters (letters, digits, "/", "-", space) after
    the identifier is captured case-insensitively and upper-cased; the
    *patterns* are then tried in order against that window and the first hit
    is the extracted value.

    The window is matched case-insensitively because callers pass
    upper-cased text: a lowercase-only class stopped at the first letter and
    could never yield values such as "A-12" or "MIG-7".

    Returns tuple: (extracted_value, match_object). The match object covers
    the identifier through the END OF THE EXTRACTED VALUE (not the whole
    15-character window), so removing its span from the text does not eat
    unrelated words; its group(1) is the window up to that same point.
    Returns (None, None) when the identifier or a value is not found.
    """
    window_pattern = rf"{re.escape(identifier)}\s*([a-z0-9/\- ]{{1,15}})"
    # re.ASCII keeps IGNORECASE from folding non-ASCII look-alikes into a-z.
    window = re.search(window_pattern, text, flags=re.IGNORECASE | re.ASCII)
    if window is None:
        return None, None

    # Not stripped: offsets into the window must stay aligned with `text`.
    candidate = window.group(1).upper()
    for p in patterns:
        hit = re.search(p, candidate)
        if hit:
            value_end = window.start(1) + hit.end()
            prefix = text[window.start():window.start(1)]
            value_part = text[window.start(1):value_end]
            # Re-match the exact consumed text to obtain a real match object
            # with the trimmed span.
            trimmed = re.compile(
                re.escape(prefix) + "(" + re.escape(value_part) + ")"
            ).match(text, window.start())
            return hit.group(), trimmed
    return None, None
# -----------------------------
# HOUSE NUMBER EXTRACTION
# -----------------------------
def extract_leading_house_number(text):
    """
    Treat the first token of *text* as a house number when it looks like one.

    The first token qualifies when, upper-cased, it is digits with at most
    one leading and one trailing letter ("12", "A12", "12B"), there is at
    least one more token, and that second token is not STREET_KEYWORD.

    Returns tuple: (house_number, match_object), where house_number is the
    upper-cased token and match_object locates the token in *text*; or
    (None, None) when the heuristic does not apply.
    """
    tokens = text.strip().split()
    if len(tokens) < 2:
        return None, None

    first = tokens[0].upper()
    # First token must look like a house number
    if not re.fullmatch(r"[A-Z]?\d+[A-Z]?", first):
        return None, None

    # Second token must NOT be a street keyword
    if tokens[1].upper() == STREET_KEYWORD:
        return None, None

    # Search for the token as it appears in the text: looking for the
    # upper-cased form could miss in text that is not upper-cased and return
    # a value with no match object.
    match = re.search(rf"\b{re.escape(tokens[0])}\b", text)
    return first, match
def extract_house_number(text):
    """
    Find the house number in *text* using three strategies, in order:

    1. the value following HOUSE_NUMBER_IDENTIFIER (highest confidence);
    2. the leading-token heuristic (extract_leading_house_number);
    3. the first hit of HOUSE_NUMBER_PATTERNS_ORDERED that is not preceded
       by street context.

    Returns tuple: (house_number, match_object), or (None, None).
    """
    # Identifier-based (highest confidence)
    result, match = extract_by_identifiers(
        text,
        HOUSE_NUMBER_IDENTIFIER,
        HOUSE_NUMBER_PATTERNS_ORDERED
    )
    if result:
        return result, match

    # Leading-token heuristic
    leading, match = extract_leading_house_number(text)
    if leading:
        return leading, match

    # Regex fallback (street-blocked). Matching and the street-context check
    # use the SAME upper-cased copy, so the keyword is found even when the
    # caller passes text that is not upper-cased.
    upper_text = text.upper()
    for pattern in HOUSE_NUMBER_PATTERNS_ORDERED:
        for match in re.finditer(pattern, upper_text):
            if is_street_context(upper_text, match.start()):
                continue
            return match.group(), match
    return None, None
# -----------------------------
# FLAT NUMBER EXTRACTION
# -----------------------------
def extract_flat_number(text):
    """
    Extract the flat number that follows FLAT_NUMBER_IDENTIFIER in *text*.

    Only short forms are accepted: digits with an optional trailing letter,
    or one letter (optionally hyphenated) followed by digits.

    Returns tuple: (flat_number, match_object), or (None, None).
    """
    short_flat_patterns = [
        r"\b\d+[A-Z]?\b",
        r"\b[A-Z]-?\d+\b",
    ]
    flat_no, match = extract_by_identifiers(
        text, FLAT_NUMBER_IDENTIFIER, short_flat_patterns
    )
    return flat_no, match
# -----------------------------
# APARTMENT / BUILDING EXTRACTION
# -----------------------------
def extract_apartment(text):
    """
    Extract the apartment / building value that follows an apartment
    identifier in *text*.

    APARTMENT_IDENTIFIER may be a single string or a collection of strings.
    A plain string is treated as ONE identifier; iterating it directly would
    walk it character by character and match on single letters.

    Returns tuple: (apartment_name, match_object), with the name trimmed and
    title-cased, or (None, None) when no identifier matches.
    """
    if isinstance(APARTMENT_IDENTIFIER, str):
        identifiers = (APARTMENT_IDENTIFIER,)
    else:
        identifiers = tuple(APARTMENT_IDENTIFIER)

    for ident in identifiers:
        # NOTE(review): the value class is lowercase-only, so on upper-cased
        # text it captures digits / spaces / hyphens but stops at the first
        # letter. Left unchanged because widening it would also widen the
        # span removed from the address; confirm the intended behavior.
        pattern = rf"{re.escape(ident)}\s+([a-z0-9\- ]{{2,40}})"
        match = re.search(pattern, text)
        if match:
            return match.group(1).strip().title(), match
    return None, None
# -----------------------------
# HELPER FUNCTION TO REMOVE PATTERN
# -----------------------------
def remove_pattern_from_text(text, match_obj):
    """
    Cut the span covered by *match_obj* out of *text*.

    The text before and after the span is joined, whitespace runs are
    collapsed to one space and the result is trimmed. When *match_obj* is
    None the text is returned unchanged.
    """
    if match_obj is None:
        return text
    remainder = text[:match_obj.start()] + text[match_obj.end():]
    return re.sub(r"\s+", " ", remainder).strip()
# -----------------------------
# MASTER FUNCTION
# -----------------------------
def extract_address_components(address_line: str) -> dict:
    """
    Split an address line into house number, flat number, apartment and the
    remaining text.

    The line is normalized first; then the house number, flat number and
    apartment are extracted in that order, each one being removed from the
    working text before the next extraction runs.

    Falsy input (None, "") yields a result with every component None and an
    empty "remaining_address" instead of raising.

    Returns:
        dict with keys "house_number", "flat_number", "apartment" and
        "remaining_address" (the cleaned address without the extracted
        components).
    """
    if not address_line:
        return {
            "house_number": None,
            "flat_number": None,
            "apartment": None,
            "remaining_address": "",
        }

    remaining_address = normalize(address_line)
    components = {}
    # Order matters: each step works on what the previous one left behind.
    steps = (
        ("house_number", extract_house_number),
        ("flat_number", extract_flat_number),
        ("apartment", extract_apartment),
    )
    for key, extractor in steps:
        value, match = extractor(remaining_address)
        if match:
            remaining_address = remove_pattern_from_text(remaining_address, match)
        components[key] = value

    # Final cleanup of remaining address
    remaining_address = re.sub(r"\s+", " ", remaining_address).strip()
    remaining_address = re.sub(r"^[,\s]+|[,\s]+$", "", remaining_address)  # Remove leading/trailing commas

    components["remaining_address"] = remaining_address
    return components