"""Streamlit app that compares two hand-entered records field by field.

Field scores come from difflib-based fuzzy ratios and exact comparisons.
A rule-based overall decision is derived from those scores and the result is
shown as JSON.

Score convention used throughout: ``-1`` means "missing on both sides",
otherwise the score is an integer in ``0..100``.
"""

import base64
import difflib
import json
import os
import re

import streamlit as st
from PIL import Image

# =========================================================
# PAGE CONFIG
# =========================================================
st.set_page_config(
    page_title="GEN AI Record Level Matching",
    page_icon="🔍",
    layout="wide",
    initial_sidebar_state="collapsed"
)

# =========================================================
# CUSTOM CSS
# =========================================================
# NOTE(review): the stylesheet body (and the HTML wrappers in the other
# st.markdown calls in this file) appear to have been stripped — confirm
# against the original markup.
st.markdown(""" """, unsafe_allow_html=True)

# =========================================================
# SESSION STATE
# =========================================================
MAX_FIELDS = 20

defaults = {
    'address_ids_r1': [0],
    'address_ids_r2': [0],
    'phone_ids_r1': [0],
    'phone_ids_r2': [0],
    'email_ids_r1': [0],
    'email_ids_r2': [0],
    'custom_fields_r1': [],
    'custom_fields_r2': [],
}
for k, v in defaults.items():
    if k not in st.session_state:
        st.session_state[k] = v


# =========================================================
# PURE PYTHON MATCHING LOGIC (no external ML libs)
# =========================================================
def normalize_text(text):
    """Return *text* lower-cased, trimmed and with whitespace runs collapsed.

    Falsy input yields ``""``.
    """
    if not text:
        return ""
    return re.sub(r"\s+", " ", str(text).strip().lower())


def fuzzy_ratio(a, b):
    """Return the difflib similarity of *a* and *b* as an int in 0..100.

    Returns 0 when either argument is falsy.
    """
    if not a or not b:
        return 0
    return int(difflib.SequenceMatcher(None, a, b).ratio() * 100)


def token_sort_ratio(a, b):
    """Return ``fuzzy_ratio`` of the two strings after sorting their words.

    Returns 0 when either argument is falsy.
    """
    if not a or not b:
        return 0
    a_sorted = " ".join(sorted(a.split()))
    b_sorted = " ".join(sorted(b.split()))
    return fuzzy_ratio(a_sorted, b_sorted)


def name_similarity(a, b):
    """Compare two name strings.

    Returns -1 when both are falsy, 0 when exactly one is, otherwise the
    larger of the plain and token-sorted fuzzy ratios of the normalized
    strings.
    """
    if not a and not b:
        return -1  # both missing
    if not a or not b:
        return 0
    a, b = normalize_text(a), normalize_text(b)
    return max(fuzzy_ratio(a, b), token_sort_ratio(a, b))


def match_names(name1, fn1, ln1, mn1, name2, fn2, ln2, mn2):
    """Score the full name and each name part of two records.

    The full name of a record is built from first/middle/last name when any
    of them is non-blank; otherwise the ``name`` value is used.

    Returns a dict with the keys ``full_name_percent``, ``firstname_percent``,
    ``middlename_percent`` and ``lastname_percent``.
    """
    def build_full(name, fn, mn, ln):
        parts = [p for p in (fn, mn, ln) if p and p.strip()]
        return " ".join(parts) if parts else (name or "")

    def part_score(a, b):
        if not (a or b):
            return -1
        return name_similarity(normalize_text(a), normalize_text(b))

    full1 = normalize_text(build_full(name1, fn1, mn1, ln1))
    full2 = normalize_text(build_full(name2, fn2, mn2, ln2))
    full_score = name_similarity(full1, full2) if (full1 or full2) else -1

    return {
        "full_name_percent": full_score,
        "firstname_percent": part_score(fn1, fn2),
        "middlename_percent": part_score(mn1, mn2),
        "lastname_percent": part_score(ln1, ln2),
    }


def match_single(a, b):
    """Fuzzy-score one free-text field; -1 when both values are falsy."""
    if not a and not b:
        return -1
    return name_similarity(normalize_text(a), normalize_text(b))


def match_addresses(addrs1, addrs2):
    """Return the best fuzzy score over all address pairs.

    Blank entries are ignored. Returns -1 when neither side has an address
    and 0 when only one side has one.
    """
    valid1 = [normalize_text(a) for a in addrs1 if a and a.strip()]
    valid2 = [normalize_text(a) for a in addrs2 if a and a.strip()]
    if not valid1 and not valid2:
        return -1
    if not valid1 or not valid2:
        return 0
    return max(
        max(fuzzy_ratio(a1, a2), token_sort_ratio(a1, a2))
        for a1 in valid1
        for a2 in valid2
    )


def normalize_phone(p):
    """Return only the digits of *p* (``""`` for falsy input)."""
    if not p:
        return ""
    return re.sub(r"[^\d]", "", str(p))


def compare_phones(phones1, phones2):
    """Return 100 if any phone pair is equal or shares its last 10 digits.

    Returns -1 when neither side has a number containing digits, 0 when only
    one side has one or when nothing matches.
    """
    v1 = [digits for digits in map(normalize_phone, phones1) if digits]
    v2 = [digits for digits in map(normalize_phone, phones2) if digits]
    if not v1 and not v2:
        return -1
    if not v1 or not v2:
        return 0
    for p1 in v1:
        for p2 in v2:
            # The suffix comparison lets numbers that differ only by a
            # leading prefix (e.g. a country code) match.
            if p1 == p2 or p1[-10:] == p2[-10:]:
                return 100
    return 0


def compare_emails(emails1, emails2):
    """Return 100 if any e-mail appears on both sides (case-insensitive).

    Returns -1 when both sides are empty, 0 when one side is empty or nothing
    matches.
    """
    v1 = {e.strip().lower() for e in emails1 if e and e.strip()}
    v2 = {e.strip().lower() for e in emails2 if e and e.strip()}
    if not v1 and not v2:
        return -1
    if not v1 or not v2:
        return 0
    return 100 if v1 & v2 else 0


def compare_exact(a, b):
    """Return 100 if the normalized values are equal, else 0.

    Returns -1 when both values are falsy and 0 when exactly one is.
    """
    if not a and not b:
        return -1
    if not a or not b:
        return 0
    return 100 if normalize_text(a) == normalize_text(b) else 0


def standardize_city(city):
    """Return *city* upper-cased, trimmed, with whitespace runs collapsed."""
    if not city:
        return ""
    return re.sub(r"\s+", " ", str(city).strip().upper())


def standardize_state(state):
    """Return *state* upper-cased, trimmed, with whitespace runs collapsed."""
    if not state:
        return ""
    return re.sub(r"\s+", " ", str(state).strip().upper())


# Each entry: (pattern, (year group, month group, day group)).
# The second pattern treats the leading number as the day (DD-MM-YYYY).
_DOB_PATTERNS = (
    (re.compile(r"(\d{4})[/-](\d{1,2})[/-](\d{1,2})"), (0, 1, 2)),
    (re.compile(r"(\d{1,2})[/-](\d{1,2})[/-](\d{4})"), (2, 1, 0)),
)


def standardize_dob(dob):
    """Normalize a date string to ``YYYY-MM-DD`` where it is recognised.

    Accepts ``YYYY-MM-DD`` and ``DD-MM-YYYY`` with ``-`` or ``/`` separators.
    Single-digit day/month values are zero-padded so that "1990-1-5" and
    "1990-01-05" normalize to the same string. Anything else is returned
    stripped but otherwise unchanged; falsy input yields ``""``.
    """
    if not dob:
        return ""
    dob = str(dob).strip()
    for pattern, (year, month, day) in _DOB_PATTERNS:
        found = pattern.match(dob)
        if found:
            parts = found.groups()
            return f"{parts[year]}-{parts[month].zfill(2)}-{parts[day].zfill(2)}"
    return dob


def normalize_gender(val):
    """Map common gender spellings to 'MALE'/'FEMALE'.

    Other non-empty values are returned trimmed and upper-cased; falsy input
    yields ``None``.
    """
    if not val:
        return None
    s = str(val).strip().lower()
    if s in ['m', 'male', 'men', 'man']:
        return 'MALE'
    if s in ['f', 'female', 'women', 'woman']:
        return 'FEMALE'
    return s.upper()


def score_to_label(score, field):
    """Convert a numeric score to its display value.

    -1 becomes the string "missing value"; any other score is rounded to two
    decimals. *field* is accepted but not used.
    """
    if score == -1:
        return "missing value"
    return round(float(score), 2)


def get_dynamic_fields(record, prefix):
    """Return the values of all ``<prefix><int>`` keys, ordered by index.

    The form assigns ids that are not necessarily contiguous (removing the
    first address/phone/email leaves e.g. ``phone_1`` without ``phone_0``),
    so every key with an integer suffix is collected instead of stopping at
    the first missing index.
    """
    indexed = []
    for key, value in record.items():
        key = str(key)
        if not key.startswith(prefix):
            continue
        suffix = key[len(prefix):]
        if suffix.isdecimal():
            indexed.append((int(suffix), value))
    indexed.sort(key=lambda pair: pair[0])
    return [value for _, value in indexed]


def is_valid(val):
    """Return True if *val* is truthy and not a blank/placeholder string."""
    return bool(val) and str(val).strip() not in ["", "-", " ", "NA", "N/A", "NULL"]


def evaluate_rules(scores):
    """Derive the overall decision from the per-field scores.

    Returns a ``(decision, reason)`` tuple. Fields scored -1 (or holding a
    non-numeric value) are left out of the evaluation. A score of 100 on a
    strong identifier decides "MATCH" immediately; otherwise the share of
    evaluated fields scoring at least 80 decides the outcome.
    """
    numeric_scores = {
        k: v for k, v in scores.items()
        if isinstance(v, (int, float)) and v != -1
    }
    if not numeric_scores:
        return "UNABLE TO DETERMINE", "Insufficient data to make a determination."

    # Strong identifiers
    for sid in ("AADHAR", "PAN", "PASSPORTID", "LICENSEID", "VOTERID"):
        if scores.get(sid) == 100:
            return "MATCH", f"Strong identifier match on {sid}."

    high_matches = sum(1 for v in numeric_scores.values() if v >= 80)
    total_evaluated = len(numeric_scores)
    match_ratio = high_matches / total_evaluated

    if match_ratio >= 0.7:
        return "MATCH", f"{high_matches}/{total_evaluated} fields matched at ≥80%."
    if match_ratio >= 0.4:
        return "POSSIBLE MATCH", f"{high_matches}/{total_evaluated} fields matched at ≥80%."
    return "NO MATCH", f"Only {high_matches}/{total_evaluated} fields matched at ≥80%."


def _any_pair_score(values1, values2):
    """Return 100 if the lists share a value, 0 if not, -1 if both are empty."""
    if not values1 and not values2:
        return -1
    return 100 if set(values1) & set(values2) else 0


def match_records(r1, r2):
    """Run the full matching pipeline on two record dicts.

    Returns a dict mapping an upper-case field label to its score. Keys that
    are neither known fields nor indexed address/contact fields are treated
    as custom fields and compared exactly.
    """
    # Name matching
    name_result = match_names(
        r1.get("name"), r1.get("firstname"), r1.get("lastname"), r1.get("middlename"),
        r2.get("name"), r2.get("firstname"), r2.get("lastname"), r2.get("middlename")
    )

    # Address / phone / e-mail
    address_score = match_addresses(
        get_dynamic_fields(r1, "addressline_"),
        get_dynamic_fields(r2, "addressline_"),
    )
    phone_score = compare_phones(
        get_dynamic_fields(r1, "phone_"),
        get_dynamic_fields(r2, "phone_"),
    )
    email_score = compare_emails(
        get_dynamic_fields(r1, "email_"),
        get_dynamic_fields(r2, "email_"),
    )

    # City / State / Zipcode: a match on any pair counts.
    def valid_values(record, prefix, standardize):
        return [standardize(v) for v in get_dynamic_fields(record, prefix) if is_valid(v)]

    city_score = _any_pair_score(
        valid_values(r1, "city_", standardize_city),
        valid_values(r2, "city_", standardize_city),
    )
    state_score = _any_pair_score(
        valid_values(r1, "state_", standardize_state),
        valid_values(r2, "state_", standardize_state),
    )
    zipcode_score = _any_pair_score(
        valid_values(r1, "zipcode_", normalize_text),
        valid_values(r2, "zipcode_", normalize_text),
    )

    # Exact fields
    def safe_exact(k1, k2=None):
        k2 = k2 or k1
        return compare_exact(r1.get(k1), r2.get(k2))

    g1 = normalize_gender(r1.get("gender"))
    g2 = normalize_gender(r2.get("gender"))
    if not g1 and not g2:
        gender_score = -1
    elif g1 and g2:
        gender_score = 100 if g1 == g2 else 0
    else:
        gender_score = 0

    results = {
        "GENDER": gender_score,
        "NAME": name_result["full_name_percent"],
        "FIRSTNAME": name_result["firstname_percent"],
        "MIDDLENAME": name_result["middlename_percent"],
        "LASTNAME": name_result["lastname_percent"],
        "SPOUSENAME": match_single(r1.get("spousename"), r2.get("spousename")),
        "MOTHERNAME": match_single(r1.get("mothername"), r2.get("mothername")),
        "FATHERNAME": match_single(r1.get("fathername"), r2.get("fathername")),
        "COMPANYNAME": match_single(r1.get("companyname"), r2.get("companyname")),
        "PARENTCOMPANYNAME": match_single(r1.get("parentcompanyname"), r2.get("parentcompanyname")),
        "AADHAR": safe_exact("AADHAR"),
        "PAN": safe_exact("pan"),
        "LICENSEID": safe_exact("licenseid"),
        "PASSPORTID": safe_exact("passportid"),
        "VOTERID": safe_exact("voterid"),
        "BIRTHDATE": compare_exact(r1.get("dob"), r2.get("dob")),
        "PHONE": phone_score,
        "EMAIL": email_score,
        "ADDRESSLINE": address_score,
        "CITY": city_score,
        "STATE": state_score,
        "ZIPCODE": zipcode_score,
    }

    # Custom fields
    known = {"name", "firstname", "middlename", "lastname", "spousename", "mothername",
             "fathername", "dob", "gender", "AADHAR", "pan", "licenseid", "passportid",
             "voterid", "companyname", "parentcompanyname"}
    dyn_prefixes = ("zipcode_", "city_", "state_", "phone_", "email_", "addressline_")
    # Sorted so the order of custom fields in the result is stable across runs.
    for key in sorted(set(r1) | set(r2), key=str):
        ks = str(key)
        if ks in known or ks.startswith(dyn_prefixes):
            continue
        v1, v2 = r1.get(key), r2.get(key)
        if v1 or v2:
            results[ks.upper()] = compare_exact(v1, v2)

    return results


# =========================================================
# UI HELPERS
# =========================================================
def preprocess_text(text):
    """Trim *text* and collapse whitespace runs (``""`` for falsy input)."""
    if not text:
        return ""
    return re.sub(r"\s+", " ", text.strip())


def create_section_card(title, icon_svg, content_func, *args, **kwargs):
    """Render a section header, call *content_func* and return its result.

    ``*args``/``**kwargs`` are forwarded to *content_func*.
    """
    st.markdown(f"\n{icon_svg} {title}\n", unsafe_allow_html=True)
    result = content_func(*args, **kwargs)
    st.markdown("\n", unsafe_allow_html=True)
    return result


ICONS = {
    "user": '',
    "id": '',
    "map": '',
    "phone": '',
    "briefcase": ''
}

# Form tables: (label, widget-key fragment, result key, placeholder).
_NAME_FIELDS = (
    ("Full Name", "name", "name", "Enter full name"),
    ("First Name", "firstname", "firstname", "Enter first name"),
    ("Middle Name", "middlename", "middlename", "Enter middle name"),
    ("Last Name", "lastname", "lastname", "Enter last name"),
    ("Mother's Name", "mothername", "mothername", "Enter mother's name"),
    ("Father's Name", "fathername", "fathername", "Enter father's name"),
    ("Spouse's Name", "spousename", "spousename", "Enter spouse's name"),
    ("Other Name", "othername", "othername", "Enter other name"),
    ("Date of Birth", "dob", "dob", "YYYY-MM-DD"),
    ("Gender", "gender", "gender", "Male/Female/Other"),
)
_IDENTIFIER_FIELDS = (
    ("Aadhar Number", "taxid", "AADHAR", "Enter Aadhar number"),
    ("PAN Number", "pan", "pan", "Enter PAN number"),
    ("License Number", "licenseid", "licenseid", "Enter license number"),
    ("Passport Number", "passportid", "passportid", "Enter passport number"),
    ("Voter ID", "voterid", "voterid", "Enter voter ID"),
)
_EMPLOYMENT_FIELDS = (
    ("Company Name", "companyname", "companyname", "Enter company name"),
    ("Parent Company Name", "parentcompanyname", "parentcompanyname", "Enter parent company name"),
)


def _two_column_inputs(fields, record_num, prefix):
    """Render *fields* as text inputs, two per row; return {result key: value}."""
    values = {}
    for start in range(0, len(fields), 2):
        row = fields[start:start + 2]
        # A fresh pair of columns per row; an odd last field uses the left one.
        for column, (label, widget_key, result_key, placeholder) in zip(st.columns(2), row):
            with column:
                values[result_key] = st.text_input(
                    label,
                    key=f"{prefix}{widget_key}_{record_num}",
                    placeholder=placeholder,
                )
    return values


# =========================================================
# SECTION CONTENT FUNCTIONS
# =========================================================
def name_fields_content(record_num, prefix=""):
    """Render the personal-detail inputs and return their values as a dict."""
    return _two_column_inputs(_NAME_FIELDS, record_num, prefix)


def identifier_fields_content(record_num, prefix=""):
    """Render identifier inputs plus user-defined custom fields.

    Returns a dict with the identifier values (keys ``AADHAR``, ``pan``,
    ``licenseid``, ``passportid``, ``voterid``) updated with every custom
    field that has a non-empty name.
    """
    result = _two_column_inputs(_IDENTIFIER_FIELDS, record_num, prefix)

    st.markdown('\nCustom Fields\n', unsafe_allow_html=True)

    custom_fields = st.session_state[f"custom_fields_{prefix.strip('_')}"]
    custom_data = {}

    for idx, field in enumerate(custom_fields):
        # Widget keys use a stable per-field id rather than the list position.
        # With position-based keys, removing a row makes the rows after it
        # pick up the removed row's widget state.
        field_id = field.setdefault('id', idx)
        col_c1, col_c2, col_rem = st.columns([5, 5, 1])
        with col_c1:
            field_name = st.text_input(
                f"Field Name {idx+1}",
                value=field.get('name', ''),
                key=f"{prefix}custom_name_{field_id}_{record_num}",
                placeholder="Field Name",
            )
            field['name'] = field_name
        with col_c2:
            field_val = st.text_input(
                f"Field Value {idx+1}",
                value=field.get('value', ''),
                key=f"{prefix}custom_val_{field_id}_{record_num}",
                placeholder="Value",
            )
            field['value'] = field_val
        if field_name:
            custom_data[field_name] = field_val
        with col_rem:
            st.write("")
            st.write("")
            if st.button("−", key=f"{prefix}remove_custom_{field_id}_{record_num}"):
                custom_fields.pop(idx)
                st.rerun()

    if st.button("+ ADD FIELD", key=f"{prefix}add_custom_{record_num}", type="secondary"):
        next_id = max((f.get('id', -1) for f in custom_fields), default=-1) + 1
        custom_fields.append({'id': next_id, 'name': '', 'value': ''})
        st.rerun()

    result.update(custom_data)
    return result


def address_item_content(record_num, addr_id, prefix=""):
    """Render the inputs of one address; return them keyed by ``<field>_<addr_id>``."""
    address_line = st.text_input("Street Address", key=f"{prefix}addressline_{addr_id}_{record_num}", placeholder="Street, Building, Area")
    city = st.text_input("City", key=f"{prefix}city_{addr_id}_{record_num}", placeholder="Enter city")
    state = st.text_input("State", key=f"{prefix}state_{addr_id}_{record_num}", placeholder="Enter state")
    pincode = st.text_input("Pincode", key=f"{prefix}zipcode_{addr_id}_{record_num}", placeholder="6-digit postal code")
    return {
        f"addressline_{addr_id}": address_line,
        f"city_{addr_id}": city,
        f"state_{addr_id}": state,
        f"zipcode_{addr_id}": pincode,
    }


def addresses_section_content(record_num, prefix=""):
    """Render all addresses of a record with add/remove buttons.

    The address ids live in ``st.session_state``; at most ``MAX_FIELDS``
    addresses can be added and the last remaining one cannot be removed.
    Returns the merged dict of all address inputs.
    """
    ids = st.session_state[f"address_ids_{prefix.strip('_')}"]
    addresses = {}

    col_title, col_add = st.columns([6, 1])
    with col_title:
        st.markdown('\nManage Addresses\n', unsafe_allow_html=True)
    with col_add:
        if len(ids) < MAX_FIELDS:
            if st.button("+", key=f"{prefix}add_address_{record_num}"):
                ids.append(max(ids) + 1 if ids else 0)
                st.rerun()

    for idx, addr_id in enumerate(ids):
        header_cols = st.columns([8, 1])
        with header_cols[0]:
            header_text = f"Address {addr_id + 1}" if addr_id > 0 else "Primary Address"
            st.markdown(f"\n{header_text}\n", unsafe_allow_html=True)
        with header_cols[1]:
            if len(ids) > 1:
                if st.button("−", key=f"{prefix}remove_address_{addr_id}_{record_num}"):
                    ids.remove(addr_id)
                    st.rerun()
        addresses.update(address_item_content(record_num, addr_id, prefix))
        if idx < len(ids) - 1:
            # Separator between consecutive addresses.
            st.markdown("\n", unsafe_allow_html=True)

    return addresses


def _contact_inputs(kind, title, placeholder, ids, record_num, prefix):
    """Render one input row (with +/− buttons) per id; return {"<kind>_<id>": value}."""
    values = {}
    for item_id in list(ids):
        cols = st.columns([8, 1, 1])
        with cols[0]:
            values[f"{kind}_{item_id}"] = st.text_input(
                f"{title} {item_id+1}",
                key=f"{prefix}{kind}_{item_id}_{record_num}",
                placeholder=placeholder,
                label_visibility="collapsed",
            )
        with cols[1]:
            if len(ids) < MAX_FIELDS:
                if st.button("+", key=f"{prefix}add_{kind}_{item_id}_{record_num}"):
                    ids.append(max(ids) + 1 if ids else 0)
                    st.rerun()
        with cols[2]:
            if len(ids) > 1:
                if st.button("−", key=f"{prefix}remove_{kind}_{item_id}_{record_num}"):
                    ids.remove(item_id)
                    st.rerun()
    return values


def contact_section_content(record_num, prefix=""):
    """Render the phone and e-mail inputs of a record and return their values."""
    r = prefix.strip("_")
    phone_ids = st.session_state[f"phone_ids_{r}"]
    email_ids = st.session_state[f"email_ids_{r}"]
    contacts = {}

    st.markdown('\n📞 Phone Numbers\n', unsafe_allow_html=True)
    contacts.update(
        _contact_inputs("phone", "Phone", "Enter phone number", phone_ids, record_num, prefix)
    )

    st.markdown('\n', unsafe_allow_html=True)
    st.markdown('\n✉️ Email Addresses\n', unsafe_allow_html=True)
    contacts.update(
        _contact_inputs("email", "Email", "Enter email address", email_ids, record_num, prefix)
    )

    return contacts


def other_details_content(record_num, prefix=""):
    """Render the employment inputs and return their values as a dict."""
    return _two_column_inputs(_EMPLOYMENT_FIELDS, record_num, prefix)


def _render_record_form(record_num):
    """Render every section of one record and return the merged input dict."""
    prefix = f"r{record_num}_"
    st.markdown(f'\nRecord {record_num}\n', unsafe_allow_html=True)
    names = create_section_card("Personal Details", ICONS["user"], name_fields_content, record_num, prefix)
    identifiers = create_section_card("Equalities", ICONS["id"], identifier_fields_content, record_num, prefix)
    addresses = create_section_card("Address Details", ICONS["map"], addresses_section_content, record_num, prefix)
    contacts = create_section_card("Contact Information", ICONS["phone"], contact_section_content, record_num, prefix)
    other = create_section_card("Employment Details", ICONS["briefcase"], other_details_content, record_num, prefix)
    return {**names, **identifiers, **addresses, **contacts, **other}


def _preprocess_record(record):
    """Standardize dob/city/state values and tidy whitespace of other strings."""
    out = {}
    for k, v in record.items():
        k_str = str(k)
        if k_str == "dob":
            out[k_str] = standardize_dob(v or "")
        elif k_str.startswith("city_"):
            out[k_str] = standardize_city(v) if v else None
        elif k_str.startswith("state_"):
            out[k_str] = standardize_state(v) if v else None
        else:
            out[k_str] = preprocess_text(v) if isinstance(v, str) else v
    return out


# =========================================================
# MAIN
# =========================================================
def main():
    """Render the two record forms and, on request, the matching result."""
    st.markdown(
        '\n🔍\nRecord Level Matching Using Transformer based Models\n',
        unsafe_allow_html=True,
    )
    st.markdown(
        '\nEnter details for two records below and click "Run Record Match" to see the matching result\n',
        unsafe_allow_html=True,
    )

    # Mode selector (UI only — Embedding is the only functional mode here)
    mode_col1, _ = st.columns([4, 6])
    with mode_col1:
        matching_mode = st.radio(
            "Matching Mode",
            ["Embedding Mode", "LLM Mode"],
            key="matching_mode",
            horizontal=True,
            help="Embedding: Fuzzy/Token-based matching | LLM Mode: Requires external LLM server (unavailable in standalone)"
        )
    if matching_mode == "LLM Mode":
        st.warning("⚠️ LLM Mode requires an external vLLM server. Falling back to Embedding (fuzzy) matching for standalone use.")

    col1, col2 = st.columns(2)
    with col1:
        r1 = _render_record_form(1)
    with col2:
        r2 = _render_record_form(2)

    if st.button("🚀 Run Record Match", type="primary", use_container_width=True):
        r1p = _preprocess_record(r1)
        r2p = _preprocess_record(r2)

        with st.spinner("Matching records..."):
            raw_scores = match_records(r1p, r2p)

        field_scores = {k: score_to_label(v, k) for k, v in raw_scores.items()}
        # The decision is made on the raw scores so -1 still means "missing".
        overall_decision, reason = evaluate_rules(raw_scores)

        result = {
            "overall_decision": overall_decision,
            "reason": reason,
            "field_scores": field_scores,
        }

        st.markdown('\n📊 Matching Result (JSON)\n', unsafe_allow_html=True)
        st.json(result, expanded=True)


if __name__ == "__main__":
    main()